Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

to support a feature of content filtered topic #1

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 131 additions & 0 deletions rcl/include/rcl/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ extern "C"
#include "rcl/macros.h"
#include "rcl/node.h"
#include "rcl/visibility_control.h"
#include "rcutils/types/string_array.h"

#include "rmw/message_sequence.h"

Expand Down Expand Up @@ -204,6 +205,136 @@ RCL_WARN_UNUSED
rcl_subscription_options_t
rcl_subscription_get_default_options(void);

/// Copy one rcl_subscription_options_t structure into another.
/**
* <hr>
* Attribute | Adherence
* ------------------ | -------------
* Allocates Memory | Yes
* Thread-Safe | No
* Uses Atomics | No
* Lock-Free | Yes
*
* \param[in] src The structure to be copied.
* Its allocator is used to copy memory into the new structure.
* \param[out] dst A rcl_subscription_options_t structure to be copied into.
* \return `RCL_RET_OK` if the structure was copied successfully, or
* \return `RCL_RET_INVALID_ARGUMENT` if src is NULL, or
* if src allocator is invalid, or
* if dst is NULL, or
* if dst contains already allocated memory, or
* \return `RCL_RET_BAD_ALLOC` if allocating memory failed.
*/
RCL_PUBLIC
RCL_WARN_UNUSED
rcl_ret_t
rcl_subscription_options_copy(
const rcl_subscription_options_t * src,
rcl_subscription_options_t * dst
);

/// Reclaim resources held inside rcl_subscription_options_t structure.
/**
* <hr>
* Attribute | Adherence
* ------------------ | -------------
* Allocates Memory | No
* Thread-Safe | No
* Uses Atomics | No
* Lock-Free | Yes
*
* \param[in] option The structure which its resources have to be deallocated.
* \return `RCL_RET_OK` if the memory was successfully freed, or
* \return `RCL_RET_INVALID_ARGUMENT` if log_levels is NULL, or
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* \return `RCL_RET_INVALID_ARGUMENT` if log_levels is NULL, or
* \return `RCL_RET_INVALID_ARGUMENT` if option is NULL, or

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

* if its allocator is invalid and the structure contains initialized memory.
*/
RCL_PUBLIC
RCL_WARN_UNUSED
rcl_ret_t
rcl_subscription_options_fini(rcl_subscription_options_t * option);

/// Check if the content filter topic feature is supported in the subscription.
/**
* Depending on the middleware and whether cft is supported in the subscription.
* this will return true if the middleware can support ContentFilteredTopic in the subscription.
*/
RCL_PUBLIC
RCL_WARN_UNUSED
bool
rcl_subscription_is_cft_supported(const rcl_subscription_t * subscription);

/// Set the filter expression and expression parameters for the subscription.
/**
* This function will set a filter expression and an array of expression parameters
* for the given subscription, but not to update the original rcl_subscription_options_t
* of subscription.
*
* <hr>
* Attribute | Adherence
* ------------------ | -------------
* Allocates Memory | No
* Thread-Safe | No
* Uses Atomics | Maybe [1]
* Lock-Free | Maybe [1]
* <i>[1] implementation defined, check the implementation documentation</i>
*
* \param[in] subscription subscription the subscription object to inspect.
iuhilnehc-ynos marked this conversation as resolved.
Show resolved Hide resolved
* \param[in] filter_expression An filter expression to set.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* \param[in] filter_expression An filter expression to set.
* \param[in] filter_expression A filter expression to set.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

* \param[in] expression_parameters Array of expression parameters to set,
* it can be NULL if there is no placeholder in filter_expression.
* \return `RCL_RET_OK` if the query was successful, or
* \return `RCL_RET_INVALID_ARGUMENT` if `subscription` is NULL, or
* \return `RCL_RET_INVALID_ARGUMENT` if `filter_expression` is NULL, or
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How we can reset the filtering on subscription? i think that there should be a way to do that, since the application is likely to do the following.

  1. create subscription.
  2. receive messages via subscription.
  3. set content filtering.
  4. receive messages via cft subscription.
  5. reset content filtering.
  6. receive messages via subsciption.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking as following,

  1. If a subscription can be filtered, users should set this subscription with filter_expression(""), not NULL.
  2. receive all messages
  3. update filter_expression with custom value by rcl_subscription_set_cft_expression_parameters, such as "data MATCH 'Hello World: 5'"
  4. receive messages via cft subscription
  5. set filter_expression("") to reset.
  6. receive all messages

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay, i see and it works

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay, i see and it works

I am sorry. After checking and trying to reproduce the above case, I found it's not supported.

Currently,
User must set content filter topic with a non-empty string for subscription at the beginning, and then update with a new filter (not empty string).
it means if users create a subscription without a content filter expression, this subscription can't subscribe to the content filter topic anymore.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't be 😄 we just trying some new stuff, there will be something like this.

okay that is said, the followings should be done in user application aspect?

  • user creates normal subscription.
  • if user wants to use contentfilter, application provides the filtering expression and parameters which are not NULL. (actually this is the 1st time to create contentfilteredtopic based on the parent topic)
  • user can only have events from contentfilteredtopic but not from parent topic.
  • user can change and modify the filtering expression and parameters which are not NULL.
  • if the user wants to use normal topic (in other words, parent topic) user can specify NULL filtering expression and parameters? rmw takes NULL expression and then in the backend it will delete the contentfilteredtopic?

i am not sure if anything like this feasible, but if we do, it just appears to be the filtering expression and parameters interfaces only for user application. in other words, it can conceal creating/destroying contentfilteredtopic to user application, but rmw should take care of.

* \return `RCL_RET_INCORRECT_RMW_IMPLEMENTATION` if the `node` implementation
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RCL_RET_INCORRECT_RMW_IMPLEMENTATION is going to be defined in rcl later? i do not see this is already supported in rcl. and actually never used.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll remove it

* identifier does not match this implementation, or
* \return `RCL_RET_ERROR` if an unspecified error occurs.
*/
RCL_PUBLIC
RCL_WARN_UNUSED
rcl_ret_t
rcl_subscription_set_cft_expression_parameters(
const rcl_subscription_t * subscription,
const char * filter_expression,
const rcutils_string_array_t * expression_parameters
);

/// Retrieve the filter expression of the subscription.
/**
* This function will return an filter expression by the given subscription.
*
* <hr>
* Attribute | Adherence
* ------------------ | -------------
* Allocates Memory | Yes
* Thread-Safe | No
* Uses Atomics | Maybe [1]
* Lock-Free | Maybe [1]
* <i>[1] implementation defined, check the implementation documentation</i>
iuhilnehc-ynos marked this conversation as resolved.
Show resolved Hide resolved
*
* \param[in] subscription subscription the subscription object to inspect.
* \param[out] filter_expression an filter expression, populated on success.
* It is up to the caller to deallocate the filter expression later on,
* using rcutils_get_default_allocator().deallocate().
* \param[out] expression_parameters Array of expression parameters, populated on success.
* It is up to the caller to finalize this array later on, using rcutils_string_array_fini().
* \return `RCL_RET_OK` if the query was successful, or
* \return `RCL_RET_INVALID_ARGUMENT` if `subscription` is NULL, or
* \return `RCL_RET_INVALID_ARGUMENT` if `filter_expression` is NULL, or
* \return `RMW_RET_INVALID_ARGUMENT` if `expression_parameters` is NULL, or
* \return `RCL_RET_INCORRECT_RMW_IMPLEMENTATION` if the `node` implementation
* identifier does not match this implementation, or
* \return `RCL_RET_BAD_ALLOC` if memory allocation fails, or
* \return `RCL_RET_ERROR` if an unspecified error occurs.
*/
RCL_PUBLIC
RCL_WARN_UNUSED
rcl_ret_t
rcl_subscription_get_cft_expression_parameters(
const rcl_subscription_t * subscription,
char ** filter_expression,
rcutils_string_array_t * expression_parameters
);

/// Take a ROS message from a topic using a rcl subscription.
/**
* It is the job of the caller to ensure that the type of the ros_message
Expand Down
175 changes: 173 additions & 2 deletions rcl/src/rcl/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ extern "C"
#include "rcl/error_handling.h"
#include "rcl/node.h"
#include "rcutils/logging_macros.h"
#include "rcutils/strdup.h"
#include "rcutils/types/string_array.h"
#include "rmw/error_handling.h"
#include "rmw/validate_full_topic_name.h"
#include "tracetools/tracetools.h"
Expand Down Expand Up @@ -92,6 +94,7 @@ rcl_subscription_init(
sizeof(rcl_subscription_impl_t), allocator->state);
RCL_CHECK_FOR_NULL_WITH_MSG(
subscription->impl, "allocating memory failed", ret = RCL_RET_BAD_ALLOC; goto cleanup);
subscription->impl->options = rcl_subscription_get_default_options();
// Fill out the implemenation struct.
// rmw_handle
// TODO(wjwwood): pass allocator once supported in rmw api.
Expand All @@ -116,8 +119,12 @@ rcl_subscription_init(
}
subscription->impl->actual_qos.avoid_ros_namespace_conventions =
options->qos.avoid_ros_namespace_conventions;
// options
subscription->impl->options = *options;
ret = rcl_subscription_options_copy(options, &subscription->impl->options);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think at this time, calling rnw_create_subscription is to create parent subscription for content filtering. so actually it just saves filter_expression and expression_parameters, not to be used during creating subscription. am i understanding right? and beside, at this moment, it is likely that filter_expression and expression_parameters are empty, right? because rcl_subscription_set_cft_expression_parameters cannot be called before.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think at this time, calling rnw_create_subscription is to create parent subscription for content filtering. so actually it just saves filter_expression and expression_parameters, not to be used during creating subscription. am i understanding right?

The concrete rmw(rmw_connextdds) had already directly used filter_expression and expression_parameters

subscription->impl->rmw_handle = rmw_create_subscription(
,
I thought that these two parameters should be backed up into rcl_subscription_impl_t, so it can prevent from accessing a pointer that might be changed outside.

and beside, at this moment, it is likely that filter_expression and expression_parameters are empty, right? because rcl_subscription_set_cft_expression_parameters cannot be called before.

Yes.

if (RCL_RET_OK != ret) {
ret = RCL_RET_ERROR;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we need to overwrite the error code here?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

goto fail;
}

RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Subscription initialized");
ret = RCL_RET_OK;
TRACEPOINT(
Expand All @@ -139,6 +146,12 @@ rcl_subscription_init(
}
}

ret = rcl_subscription_options_fini(&subscription->impl->options);
if (RCL_RET_OK != ret) {
RCUTILS_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
RCUTILS_SAFE_FWRITE_TO_STDERR("\n");
}

allocator->deallocate(subscription->impl, allocator->state);
subscription->impl = NULL;
}
Expand Down Expand Up @@ -175,6 +188,13 @@ rcl_subscription_fini(rcl_subscription_t * subscription, rcl_node_t * node)
RCL_SET_ERROR_MSG(rmw_get_error_string().str);
result = RCL_RET_ERROR;
}
rcl_ret_t rcl_ret = rcl_subscription_options_fini(&subscription->impl->options);
if (RCL_RET_OK != rcl_ret) {
RCUTILS_SAFE_FWRITE_TO_STDERR(rcl_get_error_string().str);
RCUTILS_SAFE_FWRITE_TO_STDERR("\n");
result = RCL_RET_ERROR;
}

allocator.deallocate(subscription->impl, allocator.state);
subscription->impl = NULL;
}
Expand All @@ -194,6 +214,157 @@ rcl_subscription_get_default_options()
return default_options;
}

RCL_PUBLIC
RCL_WARN_UNUSED
rcl_ret_t
rcl_subscription_options_copy(
const rcl_subscription_options_t * src,
rcl_subscription_options_t * dst
)
{
RCL_CHECK_ARGUMENT_FOR_NULL(src, RCL_RET_INVALID_ARGUMENT);
RCL_CHECK_ARGUMENT_FOR_NULL(dst, RCL_RET_INVALID_ARGUMENT);
const rcl_allocator_t * allocator = &src->allocator;
RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, "invalid allocator", return RCL_RET_INVALID_ARGUMENT);

rcl_ret_t ret = RCL_RET_OK;
dst->qos = src->qos;
dst->allocator = src->allocator;
// copy rmw_subscription_options_t
dst->rmw_subscription_options.rmw_specific_subscription_payload =
src->rmw_subscription_options.rmw_specific_subscription_payload;
dst->rmw_subscription_options.ignore_local_publications =
src->rmw_subscription_options.ignore_local_publications;
if (src->rmw_subscription_options.filter_expression) {
dst->rmw_subscription_options.filter_expression =
rcutils_strdup(src->rmw_subscription_options.filter_expression, *allocator);
if (!dst->rmw_subscription_options.filter_expression) {
ret = RCL_RET_BAD_ALLOC;
goto clean;
}
}

if (src->rmw_subscription_options.expression_parameters) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably return error if filter_expression is not null but expression_parameters is null. i believe this case cannot exist.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll move it inside the above if statement.

rcutils_string_array_t * parameters =
(rcutils_string_array_t *)allocator->allocate(
sizeof(rcutils_string_array_t),
allocator->state);
if (!parameters) {
ret = RCL_RET_BAD_ALLOC;
goto clean;
}

rcutils_ret_t ret = rcutils_string_array_init(
parameters, src->rmw_subscription_options.expression_parameters->size, allocator);
if (RCUTILS_RET_OK != ret) {
ret = RCL_RET_BAD_ALLOC;
goto clean;
}

for (size_t i = 0; i < src->rmw_subscription_options.expression_parameters->size; ++i) {
char * parameter = rcutils_strdup(
src->rmw_subscription_options.expression_parameters->data[i], *allocator);
if (!parameter) {
ret = RCL_RET_BAD_ALLOC;
goto clean;
}
parameters->data[i] = parameter;
}

dst->rmw_subscription_options.expression_parameters = parameters;
}

return RCL_RET_OK;

clean:
if (RCL_RET_OK != rcl_subscription_options_fini(dst)) {
RCL_SET_ERROR_MSG("Error while finalizing rcl subscription options due to another error");
}
return ret;
}

RCL_PUBLIC
RCL_WARN_UNUSED
rcl_ret_t
rcl_subscription_options_fini(rcl_subscription_options_t * option)
{
RCL_CHECK_ARGUMENT_FOR_NULL(option, RCL_RET_INVALID_ARGUMENT);
// fini rmw_subscription_options_t
const rcl_allocator_t * allocator = &option->allocator;
RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, "invalid allocator", return RCL_RET_INVALID_ARGUMENT);
if (option->rmw_subscription_options.filter_expression) {
allocator->deallocate(option->rmw_subscription_options.filter_expression, allocator->state);
option->rmw_subscription_options.filter_expression = NULL;
}

if (option->rmw_subscription_options.expression_parameters) {
rcutils_ret_t ret = rcutils_string_array_fini(
option->rmw_subscription_options.expression_parameters);
assert(ret == RCUTILS_RET_OK);
allocator->deallocate(option->rmw_subscription_options.expression_parameters, allocator->state);
option->rmw_subscription_options.expression_parameters = NULL;
}
return RCL_RET_OK;
}

bool
rcl_subscription_is_cft_supported(const rcl_subscription_t * subscription)
{
if (!rcl_subscription_is_valid(subscription)) {
return false; // error message already set
}
return subscription->impl->rmw_handle->is_cft_supported;
}

rcl_ret_t
rcl_subscription_set_cft_expression_parameters(
const rcl_subscription_t * subscription,
const char * filter_expression,
const rcutils_string_array_t * expression_parameters
)
{
RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_SUBSCRIPTION_INVALID);
RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_INVALID_ARGUMENT);

if (!rcl_subscription_is_valid(subscription)) {
return RCL_RET_SUBSCRIPTION_INVALID;
}
RCL_CHECK_ARGUMENT_FOR_NULL(filter_expression, RCL_RET_INVALID_ARGUMENT);
rmw_ret_t ret = rmw_subscription_set_cft_expression_parameters(
subscription->impl->rmw_handle, filter_expression, expression_parameters);

if (ret != RMW_RET_OK) {
RCL_SET_ERROR_MSG(rmw_get_error_string().str);
return rcl_convert_rmw_ret_to_rcl_ret(ret);
}
return RCL_RET_OK;
}

rcl_ret_t
rcl_subscription_get_cft_expression_parameters(
const rcl_subscription_t * subscription,
char ** filter_expression,
rcutils_string_array_t * expression_parameters
)
{
RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_SUBSCRIPTION_INVALID);
RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_INVALID_ARGUMENT);

if (!rcl_subscription_is_valid(subscription)) {
return RCL_RET_SUBSCRIPTION_INVALID;
}
RCL_CHECK_ARGUMENT_FOR_NULL(filter_expression, RCL_RET_INVALID_ARGUMENT);
RCL_CHECK_ARGUMENT_FOR_NULL(expression_parameters, RCL_RET_INVALID_ARGUMENT);
rmw_ret_t ret = rmw_subscription_get_cft_expression_parameters(
subscription->impl->rmw_handle, filter_expression, expression_parameters);

if (ret != RMW_RET_OK) {
RCL_SET_ERROR_MSG(rmw_get_error_string().str);
return rcl_convert_rmw_ret_to_rcl_ret(ret);
}
return RCL_RET_OK;
}

rcl_ret_t
rcl_take(
const rcl_subscription_t * subscription,
Expand Down
Loading