Publisher-Subscriber Layer¶
eProsima Fast RTPS provides a high-level Publisher-Subscriber Layer, which is a simple to use abstraction over the RTPS protocol. By using this layer, you can code a straight-to-the-point application while letting the library take care of the lower level configuration.
How to use the Publisher-Subscriber Layer¶
We are going to use the example built in the previous section to explain how this layer works.
The first step is to create a Participant
instance, which will act as a container for the Publishers and
Subscribers our application needs. For this we use Domain
, a static class that manages RTPS entities.
We also need to pass a configuration structure for the Participant, which can be left in its default configuration for
now:
ParticipantAttributes participant_attr; //Configuration structure
Participant *participant = Domain::createParticipant(participant_attr);
The default configuration provides a basic working set of options with predefined ports for communications. During this tutorial, you will learn to tune eProsima Fast RTPS.
In order to use our topic, we have to register it within the Participant
using the code generated with
fastrtpsgen (see Introduction.
Once again, this is done by using the Domain
class:
HelloWorldPubSubType m_type; //Auto-generated type from FastRTPSGen
Domain::registerType(participant, &m_type);
Once set up, we instantiate a Publisher
within our Participant
:
PublisherAttributes publisher_attr; //Configuration structure
PubListener publisher_listener; //Class that implements callbacks from the publisher
Publisher *publisher = Domain::createPublisher(participant, publisher_attr, &publisher_listener);
Once the Publisher
is functional, posting data is a simple process:
HelloWorld sample; //Auto-generated container class for topic data from FastRTPSGen
sample.msg("Hello there!"); // Add contents to the message
publisher->write(&sample); //Publish
The Publisher
has a set of optional callback functions that are triggered when events happen.
An example is when a Subscriber
starts listening to our topic.
To implement these callbacks we create the class PubListener
, which inherits from the base class
PublisherListener
.
We pass an instance to this class during the creation of the Publisher
.
class PubListener : public PublisherListener
{
public:
PubListener() {}
~PubListener() {}
void onPublicationmatched(Publisher* pub, MatchingInfo& info)
{
//Callback implementation. This is called each time the Publisher finds a Subscriber on the network that listens to the same topic.
}
};
The Subscriber
creation and implementation are symmetric.
SubscriberAttributes subscriber_attr; //Configuration structure
SubListener subscriber_listener; //Class that implements callbacks from the Subscriber
Subscriber *subscriber = Domain::createSubscriber(participant, subscriber_attr, &subscriber_listener);
Incoming messages are processed within the callback that is called when a new message is received:
class SubListener: public SubscriberListener
{
public:
SubListener() {}
~SubListener() {}
void onNewDataMessage(Subscriber * sub)
{
if(sub->takeNextData((void*)&sample, &sample_info))
{
if(sample_info.sampleKind == ALIVE)
{
std::cout << "New message: " << sample.msg() << std::endl;
}
}
}
HelloWorld sample; //Storage for incoming messages
SampleInfo_t sample_info; //Auxiliary structure with meta-data on the message
};
Configuration¶
eProsima Fast RTPS entities can be configured through the code or XML profiles. This section will show both alternatives.
Participant configuration¶
The Participant
can be configured via the ParticipantAttributes
structure.
createParticipant
function accepts an instance of this structure.
ParticipantAttributes participant_attr;
participant_attr.rtps.setName("my_participant");
participant_attr.rtps.builtin.domainId = 80;
Participant *participant = Domain::createParticipant(participant_attr);
Also, it can be configured through an XML profile.
createParticipant
function accepts a name of an XML profile.
Participant *participant = Domain::createParticipant("participant_xml_profile");
About XML profiles you can learn more in XML profiles. This is an example of a participant XML profile.
<participant profile_name="participant_xml_conf_profile">
<rtps>
<name>my_participant</name>
<builtin>
<domainId>80</domainId>
</builtin>
</rtps>
</participant>
We will now go over the most common configuration options.
Participant name: the name of the
Participant
forms part of the meta-data of the RTPS protocol.C++ participant_attr.rtps.setName("my_participant");
XML <participant profile_name="participant_xml_conf_name_profile"> <rtps> <name>my_participant</name> </rtps> </participant>
DomainId: Publishers and Subscribers can only talk to each other if their Participants belong to the same DomainId.
C++ participant_attr.rtps.builtin.domainId = 80;
XML <participant profile_name="participant_xml_conf_domain_profile"> <rtps> <builtin> <domainId>80</domainId> </builtin> </rtps> </participant>
Mutation Tries: The reader’s physical port could be already bound. In that case, the Participant uses its mutation_tries attribute to determine how many different ports must try before failing. These mutated ports will modify the locator’s information. By default, its value is 100.
C++ participant_attr.rtps.builtin.mutation_tries = 55;
XML <participant profile_name="participant_xml_conf_mutation_tries_profile"> <rtps> <builtin> <mutation_tries>55</mutation_tries> </builtin> </rtps> </participant>
Publisher and Subscriber configuration¶
The Publisher
can be configured via the PublisherAttributes
structure and
createPublisher
function accepts an instance of this structure. The Subscriber
can be configured via the
SubscriberAttributes
structure and createSubscriber
function accepts an instance of this structure.
PublisherAttributes publisher_attr;
Publisher *publisher = Domain::createPublisher(participant, publisher_attr);
SubscriberAttributes subscriber_attr;
Subscriber *subscriber = Domain::createSubscriber(participant, subscriber_attr);
Also, these entities can be configured through an XML profile. createPublisher
and createSubscriber
functions
accept the name of an XML profile.
Publisher *publisher = Domain::createPublisher(participant, "publisher_xml_profile");
Subscriber *subscriber = Domain::createSubscriber(participant, "subscriber_xml_profile");
We will now go over the most common configuration options.
Topic information¶
The topic name and data type are used as meta-data to determine whether Publishers and Subscribers can exchange messages.
C++ |
publisher_attr.topic.topicDataType = "HelloWorldType";
publisher_attr.topic.topicName = "HelloWorldTopic";
subscriber_attr.topic.topicDataType = "HelloWorldType";
subscriber_attr.topic.topicName = "HelloWorldTopic";
|
XML |
<publisher profile_name="publisher_xml_conf_topic_profile">
<topic>
<dataType>HelloWorldType</dataType>
<name>HelloWorldTopic</name>
</topic>
</publisher>
<subscriber profile_name="subscriber_xml_conf_topic_profile">
<topic>
<dataType>HelloWorldType</dataType>
<name>HelloWorldTopic</name>
</topic>
</subscriber>
|
Reliability¶
The RTPS standard defines two behavior modes for message delivery:
- Best-Effort (default): Messages are sent without arrival confirmation from the receiver (subscriber). It is fast, but messages can be lost.
- Reliable: The sender agent (publisher) expects arrival confirmation from the receiver (subscriber). It is slower but prevents data loss.
C++ |
publisher_attr.qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;
subscriber_attr.qos.m_reliability.kind = BEST_EFFORT_RELIABILITY_QOS;
|
XML |
<publisher profile_name="publisher_xml_conf_reliability_profile">
<qos>
<reliability>
<kind>RELIABLE</kind>
</reliability>
</qos>
</publisher>
<subscriber profile_name="subscriber_xml_conf_reliability_profile">
<qos>
<reliability>
<kind>BEST_EFFORT</kind>
</reliability>
</qos>
</subscriber>
|
Some reliability combinations make a publisher and a subscriber incompatible and unable to talk to each other. Next table shows the incompatibilities.
Publisher \ Subscriber | Best Effort | Reliable |
Best Effort | ✓ | ✕ |
Reliable | ✓ | ✓ |
History¶
There are two policies for sample storage:
- Keep-All: Store all samples in memory.
- Keep-Last (Default): Store samples up to a maximum depth. When this limit is reached, they start to become overwritten.
C++ |
publisher_attr.topic.historyQos.kind = KEEP_ALL_HISTORY_QOS;
subscriber_attr.topic.historyQos.kind = KEEP_LAST_HISTORY_QOS;
subscriber_attr.topic.historyQos.depth = 5;
|
XML |
<publisher profile_name="publisher_xml_conf_history_profile">
<topic>
<historyQos>
<kind>KEEP_ALL</kind>
</historyQos>
</topic>
</publisher>
<subscriber profile_name="subscriber_xml_conf_history_profile">
<topic>
<historyQos>
<kind>KEEP_LAST</kind>
<depth>5</depth>
</historyQos>
</topic>
</subscriber>
|
Durability¶
Durability configuration of the endpoint defines how it behaves regarding samples that existed on the topic before a subscriber joins
- Volatile: Past samples are ignored, a joining subscriber receives samples generated after the moment it matches.
- Transient Local (Default): When a new subscriber joins, its History is filled with past samples.
- Transient: When a new subscriber joins, its History is filled with past samples, which are stored on persistent storage (see Persistence).
C++ |
publisher_attr.qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS;
subscriber_attr.qos.m_durability.kind = VOLATILE_DURABILITY_QOS;
|
XML |
<publisher profile_name="publisher_xml_conf_durability_profile">
<qos>
<durability>
<kind>TRANSIENT_LOCAL</kind>
</durability>
</qos>
</publisher>
<subscriber profile_name="subscriber_xml_conf_durability_profile">
<qos>
<durability>
<kind>VOLATILE</kind>
</durability>
</qos>
</subscriber>
|
Deadline¶
The deadline QoS raises an alarm when the frequency of new samples falls below a certain threshold. It is useful for cases where data is expected to be updated periodically, requiring that each instance is updated periodically for topics with key.
On the publishing side, the deadline QoS defines the maximum period in which the application is expected to supply a new sample. On the subscribing side, it defines the maximum period in which new samples should be received. For publishers and subscribers to match, the offered deadline period must be less than or equal to the requested deadline period, otherwise the entities are considered to be incompatible.
C++ |
publisher_attr.qos.m_deadline.period = 1;
subscriber_attr.qos.m_deadline.period = 1;
|
XML |
<publisher profile_name="publisher_xml_conf_deadline_profile">
<qos>
<deadline>
<period>
<sec>1</sec>
</period>
</deadline>
</qos>
</publisher>
<subscriber profile_name="subscriber_xml_conf_deadline_profile">
<qos>
<deadline>
<period>
<sec>1</sec>
</period>
</deadline>
</qos>
</subscriber>
|
Lifespan¶
Specifies the maximum duration of validity of the data written by the publisher. When the lifespan period expires, data is removed from the history.
C++ |
publisher_attr.qos.m_lifespan.duration = 1;
subscriber_attr.qos.m_lifespan.duration = 1;
|
XML |
<publisher profile_name="publisher_xml_conf_lifespan_profile">
<qos>
<lifespan>
<duration>
<sec>1</sec>
</duration>
</lifespan>
</qos>
</publisher>
<subscriber profile_name="subscriber_xml_conf_lifespan_profile">
<qos>
<lifespan>
<duration>
<sec>1</sec>
</duration>
</lifespan>
</qos>
</subscriber>
|
Liveliness¶
Liveliness is a quality of service that can be used to ensure that particular entities on the network are “alive”. There are different settings that allow distinguishing between applications where data is updated periodically and applications where data is changed sporadically. It also allows customizing the application regarding the kind of failures that should be detected by the liveliness mechanism.
The AUTOMATIC liveliness kind is suitable for applications that only need to detect whether a remote application is still running. Therefore, as long as the local process where the participant is running and the link connecting it to remote participants exists, the entities within the remote participant will be considered alive.
The two manual settings require that liveliness is asserted periodically on the publishing side to consider that remote entities are alive. Liveliness can be asserted explicitly by calling the assert_liveliness operations on the publisher, or implicitly by writing data. The MANUAL_BY_PARTICIPANT setting only requires that one entity in the publishing side asserts liveliness to deduce that all other entities within that participant are also alive. The MANUAL_BY_TOPIC mode is more restrictive and requires that at least one instance within the publisher is asserted to consider that the publisher is alive.
Besides the liveliness kind, two additional parameters allow defining the application behavior. They are all listed in the table below.
Name | Description | Values | Default |
---|---|---|---|
<kind> |
Specifies how to manage liveliness. | AUTOMATIC ,
MANUAL_BY_TOPIC ,
MANUAL_BY_TOPIC |
AUTOMATIC |
<lease_duration> |
Amount of time to wait since the last message from a writer to consider that it is no longer alive. | DurationType | c_TimeInfinite |
<announcement_period> |
Amount of time between consecutive liveliness messages sent by the publisher. Only used for AUTOMATIC and MANUAL_BY_PARTICIPANT liveliness kinds. | DurationType | c_TimeInfinite |
C++ |
publisher_attr.qos.m_liveliness.announcement_period = 0.5;
publisher_attr.qos.m_liveliness.lease_duration = 1;
publisher_attr.qos.m_liveliness.kind = AUTOMATIC_LIVELINESS_QOS;
subscriber_attr.qos.m_liveliness.lease_duration = 1;
subscriber_attr.qos.m_liveliness.kind = AUTOMATIC_LIVELINESS_QOS;
|
XML |
<publisher profile_name="publisher_xml_conf_liveliness_profile">
<qos>
<liveliness>
<announcement_period>
<sec>0</sec>
<nanosec>1000000</nanosec>
</announcement_period>
<lease_duration>
<sec>1</sec>
</lease_duration>
<kind>AUTOMATIC</kind>
</liveliness>
</qos>
</publisher>
<subscriber profile_name="subscriber_xml_conf_liveliness_profile">
<qos>
<liveliness>
<lease_duration>
<sec>1</sec>
</lease_duration>
<kind>AUTOMATIC</kind>
</liveliness>
</qos>
</subscriber>
|
Resource limits¶
Allow controlling the maximum size of the History and other resources.
C++ |
publisher_attr.topic.resourceLimitsQos.max_samples = 200;
subscriber_attr.topic.resourceLimitsQos.max_samples = 200;
|
XML |
<publisher profile_name="publisher_xml_conf_resource_limits_profile">
<topic>
<resourceLimitsQos>
<max_samples>200</max_samples>
</resourceLimitsQos>
</topic>
</publisher>
<subscriber profile_name="subscriber_xml_conf_resource_limits_profile">
<topic>
<resourceLimitsQos>
<max_samples>200</max_samples>
</resourceLimitsQos>
</topic>
</subscriber>
|
Disable positive acks¶
This is an additional QoS that allows reducing network traffic when strict reliable communication is not required and bandwidth is limited. It consists in changing the default behavior by which positive acks are sent from readers to writers. Instead, only negative acks will be sent when a reader is missing a sample, but writers will keep data for a sufficient keep duration before considering it as acknowledged. A writer and a reader are incompatible (i.e. they will not match) if the latter is using this QoS but the former is not.
C++ |
publisher_attr.qos.m_disablePositiveACKs.enabled = true;
publisher_attr.qos.m_disablePositiveACKs.duration = 1;
subscriber_attr.qos.m_disablePositiveACKs.enabled = true;
|
XML |
<publisher profile_name="publisher_xml_conf_disable_positive_acks_profile">
<qos>
<disablePositiveAcks>
<enabled>true</enabled>
<duration>
<sec>1</sec>
</duration>
</disablePositiveAcks>
</qos>
</publisher>
<subscriber profile_name="subscriber_xml_conf_disable_positive_acks_profile">
<qos>
<disablePositiveAcks>
<enabled>true</enabled>
</disablePositiveAcks>
</qos>
</subscriber>
|
Unicast locators¶
They are network endpoints where the entity will receive data. For more information about the network, see Transports. Publishers and subscribers inherit unicast locators from the participant. You can set a different set of locators through this attribute.
C++ |
Locator_t new_locator;
new_locator.port = 7800;
subscriber_attr.unicastLocatorList.push_back(new_locator);
publisher_attr.unicastLocatorList.push_back(new_locator);
|
XML |
<publisher profile_name="publisher_xml_conf_unicast_locators_profile">
<unicastLocatorList>
<locator>
<udpv4>
<port>7800</port>
</udpv4>
</locator>
</unicastLocatorList>
</publisher>
<subscriber profile_name="subscriber_xml_conf_unicast_locators_profile">
<unicastLocatorList>
<locator>
<udpv4>
<port>7800</port>
</udpv4>
</locator>
</unicastLocatorList>
</subscriber>
|
Multicast locators¶
They are network endpoints where the entity will receive data. For more information about network configuration, see Transports. By default publishers and subscribers don’t use any multicast locator. This attribute is useful when you have a lot of entities and you want to reduce the network usage.
C++ |
Locator_t new_locator;
IPLocator::setIPv4(new_locator, "239.255.0.4");
new_locator.port = 7900;
subscriber_attr.multicastLocatorList.push_back(new_locator);
publisher_attr.multicastLocatorList.push_back(new_locator);
|
XML |
<publisher profile_name="publisher_xml_conf_multicast_locators_profile">
<multicastLocatorList>
<locator>
<udpv4>
<address>239.255.0.4</address>
<port>7900</port>
</udpv4>
</locator>
</multicastLocatorList>
</publisher>
<subscriber profile_name="subscriber_xml_conf_multicast_locators_profile">
<multicastLocatorList>
<locator>
<udpv4>
<address>239.255.0.4</address>
<port>7900</port>
</udpv4>
</locator>
</multicastLocatorList>
</subscriber>
|
Additional Concepts¶
Using message meta-data¶
When a message is taken from the Subscriber, an auxiliary SampleInfo_t
structure instance is also returned.
Static types |
HelloWorld sample;
SampleInfo_t sample_info;
subscriber->takeNextData((void*)&sample, &sample_info);
|
Dynamic types |
// input_type is an instance of DynamicPubSubType of out current dynamic type
DynamicPubSubType *pst = dynamic_cast<DynamicPubSubType*>(input_type);
DynamicData *sample = DynamicDataFactory::get_instance()->create_data(pst->GetDynamicType());
subscriber->takeNextData(sample, &sample_info);
|
This SampleInfo_t
structure contains meta-data on the incoming message:
- sampleKind: type of the sample, as defined by the RTPS Standard. Healthy messages from a topic are always ALIVE.
- WriterGUID: Signature of the sender (Publisher) the message comes from.
- OwnershipStrength: When several senders are writing the same data, this field can be used to determine which data is more reliable.
- SourceTimestamp: A timestamp on the sender side that indicates the moment the sample was encapsulated and sent.
This meta-data can be used to implement filters:
if( (sample_info.sampleKind == ALIVE) & (sample_info.ownershipStrength > 25) )
{
//Process data
}
Defining callbacks¶
As we saw in the example, both the Publisher
and Subscriber
have a set of callbacks you can use
in your application. These callbacks are to be implemented within classes that derive from
SubscriberListener
or PublisherListener
. The following table gathers information about
the possible callbacks that can be implemented in both cases:
Callback | Publisher | Subscriber |
---|---|---|
onNewDataMessage | N | Y |
onSubscriptionMatched | N | Y |
onPublicationMatched | Y | N |
on_offered_deadline_missed | Y | N |
on_requested_deadline_missed | N | Y |
on_liveliness_lost | Y | N |
on_liveliness_changed | N | Y |