-
Notifications
You must be signed in to change notification settings - Fork 0
/
example_subscriber_3.c
370 lines (324 loc) · 14.2 KB
/
example_subscriber_3.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
/*
* (c) Copyright, Real-Time Innovations, 2021. All rights reserved.
* RTI grants Licensee a license to use, modify, compile, and create derivative
* works of the software solely for use with RTI Connext DDS. Licensee may
* redistribute copies of the software provided that all such copies are subject
* to this license. The software is provided "as is", with no warranty of any
* type, including any warranty for fitness for any purpose. RTI is under no
* obligation to maintain or support the software. RTI shall not be liable for
* any incidental or consequential damages arising out of the use or inability
* to use the software.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
// headers from Connext DDS Micro installation
#include "rti_me_c.h"
#include "disc_dpse/disc_dpse_dpsediscovery.h"
#include "wh_sm/wh_sm_history.h"
#include "rh_sm/rh_sm_history.h"
#include "netio/netio_udp.h"
// rtiddsgen generated headers
#include "example.h"
#include "examplePlugin.h"
#include "exampleSupport.h"
// DPSE Discovery-related constants defined in this header
#include "discovery_constants.h"
// Names of network interfaces on the target machine
#include "nic_config.h"
/*
 * DataReaderListener callback for subscription-matched status changes.
 *
 * Prints an INFO line naming the reader's Topic whenever
 * status->current_count_change is positive (a DataWriter was matched) or
 * negative (a DataWriter was unmatched). A change of zero prints nothing.
 *
 * listener_data: unused.
 * reader:        the DataReader whose match status changed.
 * status:        the new subscription-matched status.
 */
void my_typeSubscriber_on_subscription_matched(
        void *listener_data,
        DDS_DataReader * reader,
        const struct DDS_SubscriptionMatchedStatus *status)
{
    (void)(listener_data); // to suppress -Wunused-parameter warning

    // Use the name in place rather than copying it into a fixed-size stack
    // buffer: an unbounded copy would overflow on a long topic name.
    const char *topic_name = DDS_TopicDescription_get_name(
            DDS_DataReader_get_topicdescription(reader));
    if (topic_name == NULL) {
        topic_name = "(unknown)";
    }

    if (status->current_count_change > 0) {
        printf("INFO: Matched a DataWriter on Topic: '%s'\n", topic_name);
    } else if (status->current_count_change < 0) {
        printf("INFO: Unmatched a DataWriter on Topic: '%s'\n", topic_name);
    }
}
/*
 * Takes up to MAX_SAMPLES_PER_TAKE samples from the given DataReader and
 * prints each one.
 *
 * For every sample whose SampleInfo has valid_data set, the 'id' and 'msg'
 * members are printed; otherwise an "INVALID DATA" notice is printed.
 * When take() reports DDS_RETCODE_NO_DATA an INFO line is printed; any other
 * non-OK return code is reported as an ERROR. The loan is returned only when
 * take() succeeded, and a failure to return it is reported.
 *
 * reader: the generic DataReader; it is narrowed to my_typeDataReader here.
 */
void take_the_data(DDS_DataReader * reader)
{
    my_typeDataReader *narrowed_reader = my_typeDataReader_narrow(reader);
    struct DDS_SampleInfoSeq info_seq = DDS_SEQUENCE_INITIALIZER;
    struct my_typeSeq sample_seq = DDS_SEQUENCE_INITIALIZER;
    const DDS_Long MAX_SAMPLES_PER_TAKE = 32;
    DDS_ReturnCode_t retcode;
    DDS_Long i;

    if (narrowed_reader == NULL) {
        printf("ERROR: failed to narrow DataReader\n");
        return;
    }

    retcode = my_typeDataReader_take(
            narrowed_reader,
            &sample_seq,
            &info_seq,
            MAX_SAMPLES_PER_TAKE,
            DDS_ANY_SAMPLE_STATE,
            DDS_ANY_VIEW_STATE,
            DDS_ANY_INSTANCE_STATE);

    if (retcode == DDS_RETCODE_NO_DATA) {
        printf("INFO: no data\n");
    } else if (retcode != DDS_RETCODE_OK) {
        printf("ERROR: failed to take data, retcode = %d\n", (int)retcode);
    } else {
        // print each valid sample taken
        for (i = 0; i < my_typeSeq_get_length(&sample_seq); ++i) {
            struct DDS_SampleInfo *sample_info =
                    DDS_SampleInfoSeq_get_reference(&info_seq, i);

            if (sample_info->valid_data) {
                my_type *sample = my_typeSeq_get_reference(&sample_seq, i);
                printf("\nValid sample received\n");
                printf("\tsample id = %d\n", sample->id);
                printf("\tsample msg = %s\n", sample->msg);
            } else {
                printf("\nSample received\n\tINVALID DATA\n");
            }
        }

        // Only a successful take() produces a loan that has to be returned.
        retcode = my_typeDataReader_return_loan(
                narrowed_reader,
                &sample_seq,
                &info_seq);
        if (retcode != DDS_RETCODE_OK) {
            printf("ERROR: failed to return loan, retcode = %d\n",
                   (int)retcode);
        }
    }

    my_typeSeq_finalize(&sample_seq);
    DDS_SampleInfoSeq_finalize(&info_seq);
}
int main(void)
{
DDS_ReturnCode_t retcode;
// create the DomainParticipantFactory and registry so that we can change
// some of the default values
DDS_DomainParticipantFactory *dpf = NULL;
dpf = DDS_DomainParticipantFactory_get_instance();
RT_Registry_T *registry = NULL;
registry = DDS_DomainParticipantFactory_get_registry(dpf);
// register writer history
if (!RT_Registry_register(
registry,
DDSHST_WRITER_DEFAULT_HISTORY_NAME,
WHSM_HistoryFactory_get_interface(),
NULL,
NULL))
{
printf("ERROR: failed to register wh\n");
}
// register reader history
if (!RT_Registry_register(
registry,
DDSHST_READER_DEFAULT_HISTORY_NAME,
RHSM_HistoryFactory_get_interface(),
NULL,
NULL))
{
printf("ERROR: failed to register rh\n");
}
// Set up the UDP transport's allowed interfaces. To do this we:
// (1) unregister the UDP transport
// (2) name the allowed interfaces
// (3) re-register the transport
if(!RT_Registry_unregister(
registry,
NETIO_DEFAULT_UDP_NAME,
NULL,
NULL))
{
printf("ERROR: failed to unregister udp\n");
}
struct UDP_InterfaceFactoryProperty *udp_property = NULL;
udp_property = (struct UDP_InterfaceFactoryProperty *)
malloc(sizeof(struct UDP_InterfaceFactoryProperty));
if (udp_property == NULL) {
printf("ERROR: failed to allocate udp properties\n");
}
*udp_property = UDP_INTERFACE_FACTORY_PROPERTY_DEFAULT;
// For additional allowed interface(s), increase maximum and length, and
// set interface below:
REDA_StringSeq_set_maximum(&udp_property->allow_interface,2);
REDA_StringSeq_set_length(&udp_property->allow_interface,2);
*REDA_StringSeq_get_reference(&udp_property->allow_interface,0) =
DDS_String_dup(loopback_name);
*REDA_StringSeq_get_reference(&udp_property->allow_interface,1) =
DDS_String_dup(eth_nic_name);
#if 0
// When you are working on an RTOS or other lightweight OS, the middleware
// may not be able to get the NIC information automatically. In that case,
// the code below can be used to manually tell the middleware about an
// interface. The interface name ("en0" below) could be anything, but should
// match what you included in the "allow_interface" calls above.
if (!UDP_InterfaceTable_add_entry(
&udp_property->if_table,
0xc0a864c8, // IP address of 192.168.100.200
0xffffff00, // netmask of 255.255.255.0
"en0",
UDP_INTERFACE_INTERFACE_UP_FLAG |
UDP_INTERFACE_INTERFACE_MULTICAST_FLAG)) {
LOG(1, "failed to add interface")
}
#endif
if(!RT_Registry_register(
registry,
NETIO_DEFAULT_UDP_NAME,
UDP_InterfaceFactory_get_interface(),
(struct RT_ComponentFactoryProperty*)udp_property, NULL))
{
printf("ERROR: failed to re-register udp\n");
}
struct DPSE_DiscoveryPluginProperty discovery_plugin_properties =
DPSE_DiscoveryPluginProperty_INITIALIZER;
struct DDS_Duration_t my_lease = {5,0};
struct DDS_Duration_t my_assert_period = {2,0};
discovery_plugin_properties.participant_liveliness_lease_duration = my_lease;
discovery_plugin_properties.participant_liveliness_assert_period = my_assert_period;
// register the dpse (discovery) component
if (!RT_Registry_register(
registry,
"dpse",
DPSE_DiscoveryFactory_get_interface(),
&discovery_plugin_properties._parent,
NULL))
{
printf("ERROR: failed to register dpse\n");
}
// Now that we've finished the changes to the registry, we will start
// creating DDS entities. By setting autoenable_created_entities to false
// until all of the DDS entities are created, we limit all dynamic memory
// allocation to happen *before* the point where we enable everything.
struct DDS_DomainParticipantFactoryQos dpf_qos =
DDS_DomainParticipantFactoryQos_INITIALIZER;
DDS_DomainParticipantFactory_get_qos(dpf, &dpf_qos);
dpf_qos.entity_factory.autoenable_created_entities = DDS_BOOLEAN_FALSE;
DDS_DomainParticipantFactory_set_qos(dpf, &dpf_qos);
struct DDS_DomainParticipantQos dp_qos =
DDS_DomainParticipantQos_INITIALIZER;
// configure discovery prior to creating our DomainParticipant
if(!RT_ComponentFactoryId_set_name(&dp_qos.discovery.discovery.name, "dpse")) {
printf("ERROR: failed to set discovery plugin name\n");
}
if(!DDS_StringSeq_set_maximum(&dp_qos.discovery.initial_peers, 1)) {
printf("ERROR: failed to set initial peers maximum\n");
}
if (!DDS_StringSeq_set_length(&dp_qos.discovery.initial_peers, 1)) {
printf("ERROR: failed to set initial peers length\n");
}
*DDS_StringSeq_get_reference(&dp_qos.discovery.initial_peers, 0) =
DDS_String_dup(peer);
// configure the DomainParticipant's resource limits... these are just
// examples, if there are more remote or local endpoints these values would
// need to be increased
dp_qos.resource_limits.max_destination_ports = 32;
dp_qos.resource_limits.max_receive_ports = 32;
dp_qos.resource_limits.local_topic_allocation = 1;
dp_qos.resource_limits.local_type_allocation = 1;
dp_qos.resource_limits.local_reader_allocation = 1;
dp_qos.resource_limits.local_writer_allocation = 1;
dp_qos.resource_limits.remote_participant_allocation = 8;
dp_qos.resource_limits.remote_reader_allocation = 8;
dp_qos.resource_limits.remote_writer_allocation = 8;
// set the name of the local DomainParticipant (i.e. - this application)
// from the constants defined in discovery_constants.h
// (this is required for DPSE discovery)
strcpy(dp_qos.participant_name.name, k_PARTICIPANT04_NAME);
// now the DomainParticipant can be created
DDS_DomainParticipant *dp = NULL;
dp = DDS_DomainParticipantFactory_create_participant(
dpf,
domain_id,
&dp_qos,
NULL,
DDS_STATUS_MASK_NONE);
if(dp == NULL) {
printf("ERROR: failed to create participant\n");
}
// register the type (my_type, from the idl) with the middleware
retcode = DDS_DomainParticipant_register_type(
dp,
my_typeTypePlugin_get_default_type_name(),
my_typeTypePlugin_get());
if(retcode != DDS_RETCODE_OK) {
printf("ERROR: failed to register type\n");
}
// Create the Topic to which we will subscribe. Note that the name of the
// Topic is stored in my_topic_name, which was defined in the IDL
DDS_Topic *topic = NULL;
topic = DDS_DomainParticipant_create_topic(
dp,
my_topic_name, // this constant is defined in the *.idl file
my_typeTypePlugin_get_default_type_name(),
&DDS_TOPIC_QOS_DEFAULT,
NULL,
DDS_STATUS_MASK_NONE);
if(topic == NULL) {
printf("ERROR: topic == NULL\n");
}
// assert the remote DomainParticipant who's name is held in
// the constant k_PARTICIANT01_NAME, defined in discovery_constants.h
retcode = DPSE_RemoteParticipant_assert(dp, k_PARTICIPANT01_NAME);
if(retcode != DDS_RETCODE_OK) {
printf("ERROR: failed to assert remote participant\n");
}
// create the Subscriber
DDS_Subscriber *subscriber = NULL;
subscriber = DDS_DomainParticipant_create_subscriber(
dp,
&DDS_SUBSCRIBER_QOS_DEFAULT,
NULL,
DDS_STATUS_MASK_NONE);
if(subscriber == NULL) {
printf("ERROR: subscriber == NULL\n");
}
// Configure the DataReader's QoS. Note that the 'rtps_object_id' that we
// assign to our own DataReader here needs to be the same number the remote
// DataWriter will configure for its remote peer. We are defining these IDs
// and other constants in discovery_constants.h
struct DDS_DataReaderQos dr_qos = DDS_DataReaderQos_INITIALIZER;
dr_qos.protocol.rtps_object_id = k_OBJ_ID_PARTICIPANT04_DR01;
dr_qos.reliability.kind = DDS_RELIABLE_RELIABILITY_QOS;
dr_qos.resource_limits.max_instances = 2;
dr_qos.resource_limits.max_samples_per_instance = 16;
dr_qos.resource_limits.max_samples = dr_qos.resource_limits.max_instances *
dr_qos.resource_limits.max_samples_per_instance;
dr_qos.reader_resource_limits.max_remote_writers = 10;
dr_qos.reader_resource_limits.max_remote_writers_per_instance = 10;
dr_qos.history.depth = 16;
// Configure the DataReaderListener callback
struct DDS_DataReaderListener dr_listener = DDS_DataReaderListener_INITIALIZER;
dr_listener.on_subscription_matched = my_typeSubscriber_on_subscription_matched;
// create the DataReader
DDS_DataReader *datareader = NULL;
datareader = DDS_Subscriber_create_datareader(
subscriber,
DDS_Topic_as_topicdescription(topic),
&dr_qos,
&dr_listener,
DDS_SUBSCRIPTION_MATCHED_STATUS);
if(datareader == NULL) {
printf("ERROR: datareader == NULL\n");
}
// When we use DPSE discovery we must manually setup information about any
// DataWriters we are expecting to discover. This information includes a
// unique object ID for the remote peer (we are defining this in
// discovery_constants.h), as well as its Topic, type, and QoS.
struct DDS_PublicationBuiltinTopicData rem_publication_data =
DDS_PublicationBuiltinTopicData_INITIALIZER;
rem_publication_data.key.value[DDS_BUILTIN_TOPIC_KEY_OBJECT_ID] =
k_OBJ_ID_PARTICIPANT01_DW01;
rem_publication_data.topic_name = DDS_String_dup(my_topic_name);
rem_publication_data.type_name =
DDS_String_dup(my_typeTypePlugin_get_default_type_name());
rem_publication_data.reliability.kind = DDS_RELIABLE_RELIABILITY_QOS;
// Now we can assert that a remote DomainParticipant (with the name held in
// k_PARTICIANT01_NAME) will contain a DataWriter described by the
// information in the rem_publication_data struct.
retcode = DPSE_RemotePublication_assert(
dp,
k_PARTICIPANT01_NAME,
&rem_publication_data,
my_type_get_key_kind(my_typeTypePlugin_get(),
NULL));
if (retcode != DDS_RETCODE_OK) {
printf("ERROR: failed to assert remote publication\n");
}
// Finally, now that all of the entities are created, we can enable them all
retcode = DDS_Entity_enable(DDS_DomainParticipant_as_entity(dp));
if(retcode != DDS_RETCODE_OK) {
printf("ERROR: failed to enable entity\n");
}
printf("Waiting for samples to arrive, press Ctrl-C to exit\n");
while (1) {
take_the_data(datareader);
OSAPI_Thread_sleep(500); // sleep 500ms before we check for data again
}
}