Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument client/service for end-to-end request/response tracking #521

Open
wants to merge 1 commit into
base: rolling
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 56 additions & 6 deletions rmw_cyclonedds_cpp/src/rmw_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4676,6 +4676,13 @@ extern "C" rmw_ret_t rmw_take_response(
info->reqtime.erase(seq);
}
#endif
TRACETOOLS_TRACEPOINT(
rmw_take_response,
static_cast<const void *>(client),
static_cast<const void *>(ros_response),
(nullptr != request_header ? request_header->request_id.sequence_number : 0LL),
(nullptr != request_header ? request_header->source_timestamp : 0LL),
*taken);
return ret;
}

Expand Down Expand Up @@ -4706,17 +4713,30 @@ extern "C" rmw_ret_t rmw_take_request(
service, service->implementation_identifier, eclipse_cyclonedds_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
auto info = static_cast<CddsService *>(service->data);
return rmw_take_response_request(
rmw_ret_t ret = rmw_take_response_request(
&info->service, request_header, ros_request, taken, nullptr,
false);
if (TRACETOOLS_TRACEPOINT_ENABLED(rmw_take_request)) {
// Do not use the whole request_header->writer_guid, see the rmw_client_init tracepoint trigger
rmw_gid_t gid{};
memcpy(gid.data, &request_header->request_id.writer_guid, sizeof(info->service.pub->pubiid));
TRACETOOLS_DO_TRACEPOINT(
rmw_take_request,
static_cast<const void *>(service),
static_cast<const void *>(ros_request),
gid.data,
(nullptr != request_header ? request_header->request_id.sequence_number : 0LL),
*taken);
}
return ret;
}

static rmw_ret_t rmw_send_response_request(
CddsCS * cs, const cdds_request_header_t & header,
const void * ros_data)
const void * ros_data, const dds_time_t timestamp)
{
const cdds_request_wrapper_t wrap = {header, const_cast<void *>(ros_data)};
if (dds_write(cs->pub->enth, static_cast<const void *>(&wrap)) >= 0) {
if (dds_write_ts(cs->pub->enth, static_cast<const void *>(&wrap), timestamp) >= 0) {
return RMW_RET_OK;
} else {
RMW_SET_ERROR_MSG("cannot publish data");
Expand Down Expand Up @@ -4818,7 +4838,22 @@ extern "C" rmw_ret_t rmw_send_response(
case client_present_t::MAYBE:
return RMW_RET_TIMEOUT;
case client_present_t::YES:
return rmw_send_response_request(&info->service, header, ros_response);
{
const dds_time_t timestamp = dds_time();
if (TRACETOOLS_TRACEPOINT_ENABLED(rmw_send_response)) {
// Do not use request_header->writer_guid, see the rmw_client_init tracepoint trigger
rmw_gid_t gid{};
memcpy(gid.data, &header.guid, sizeof(header.guid));
TRACETOOLS_DO_TRACEPOINT(
rmw_send_response,
static_cast<const void *>(service),
static_cast<const void *>(ros_response),
gid.data,
header.seq,
timestamp);
}
return rmw_send_response_request(&info->service, header, ros_response, timestamp);
}
case client_present_t::GONE:
return RMW_RET_OK;
}
Expand All @@ -4841,15 +4876,21 @@ extern "C" rmw_ret_t rmw_send_request(
cdds_request_header_t header;
header.guid = info->client.pub->pubiid;
header.seq = *sequence_id = ++next_request_id;
const dds_time_t timestamp = dds_time();

#if REPORT_BLOCKED_REQUESTS
{
std::lock_guard<std::mutex> lock(info->lock);
info->reqtime[header.seq] = dds_time();
info->reqtime[header.seq] = timestamp;
}
#endif

return rmw_send_response_request(&info->client, header, ros_request);
TRACETOOLS_TRACEPOINT(
rmw_send_request,
static_cast<const void *>(client),
static_cast<const void *>(ros_request),
header.seq);
return rmw_send_response_request(&info->client, header, ros_request, timestamp);
}

static const rosidl_service_type_support_t * get_service_typesupport(
Expand Down Expand Up @@ -5151,6 +5192,15 @@ extern "C" rmw_client_t * rmw_create_client(
cleanup_client.cancel();
cleanup_fini_cs.cancel();
cleanup_info.cancel();
if (TRACETOOLS_TRACEPOINT_ENABLED(rmw_client_init)) {
// rmw_cyclonedds uses info->client.pub->pubiid as the internal request header GUID, which is
// the first half (8 bytes out of 16 bytes) of the rmw_request_id_t's writer_guid. The second
// half doesn't match when read from the client side and the service side, so only use the first
// half. The second half will be zeros on both client side and service side.
rmw_gid_t gid{};
memcpy(gid.data, &info->client.pub->pubiid, sizeof(info->client.pub->pubiid));
TRACETOOLS_DO_TRACEPOINT(rmw_client_init, static_cast<const void *>(rmw_client), gid.data);
}
return rmw_client;
}

Expand Down