1.4. Writing a simple Python publisher and subscriber application
This section explains, step by step, how to create a simple Fast DDS application with a publisher and a subscriber using the Python API.
1.4.1. Background
DDS is a data-centric communications middleware that implements the DCPS model. This model is based on the development of a publisher, a data generating element; and a subscriber, a data consuming element. These entities communicate by means of the topic, an element that binds both DDS entities. Publishers generate information under a topic and subscribers subscribe to this same topic to receive information.
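To make these roles concrete, here is a minimal in-memory sketch of the publisher, topic and subscriber relationship in plain Python. It does not use Fast DDS at all; the names TopicBus, subscribe and publish are hypothetical and only illustrate how a topic name binds the two sides.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, List


class TopicBus:
    """Toy stand-in for the DCPS model: callbacks are grouped by topic name."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Callable[[Any], None]]] = defaultdict(list)

    def subscribe(self, topic: str, callback: Callable[[Any], None]) -> None:
        # A subscriber registers interest in a topic.
        self._subscribers[topic].append(callback)

    def publish(self, topic: str, sample: Any) -> int:
        # A publisher hands a sample to every subscriber of the same topic.
        # .get() is used so that unknown topics do not create empty entries.
        callbacks = self._subscribers.get(topic, [])
        for callback in callbacks:
            callback(sample)
        return len(callbacks)


bus = TopicBus()
received = []
bus.subscribe("HelloWorldTopic", received.append)
delivered = bus.publish("HelloWorldTopic", {"index": 0, "message": "Hello World"})
ignored = bus.publish("OtherTopic", {"index": 1, "message": "nobody listens"})
```

Only the sample published under HelloWorldTopic reaches the subscriber; the one published under OtherTopic is delivered to nobody. Real DDS adds discovery, QoS and network transport on top of this idea.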
1.4.2. Prerequisites
First of all, you need to follow the steps outlined in the Installation Manual for the installation of eProsima Fast DDS and all its dependencies. You also need to have completed the steps outlined in the Installation Manual for the installation of the eProsima Fast DDS-Gen tool. Moreover, all the commands provided in this tutorial are outlined for a Linux environment.
1.4.3. Create the application workspace
The application workspace will have the following structure at the end of the project.
The files HelloWorldPublisher.py and HelloWorldSubscriber.py are the publisher application and the subscriber application, respectively.
.
├── CMakeCache.txt
├── CMakeFiles
├── CMakeLists.txt
├── HelloWorld.hpp
├── HelloWorld.i
├── HelloWorld.idl
├── HelloWorld.py
├── HelloWorldCdrAux.hpp
├── HelloWorldCdrAux.ipp
├── HelloWorldPubSubTypes.cxx
├── HelloWorldPubSubTypes.h
├── HelloWorldPubSubTypes.i
├── HelloWorldPublisher.py
├── HelloWorldSubscriber.py
├── HelloWorldTypeObjectSupport.cxx
├── HelloWorldTypeObjectSupport.hpp
├── Makefile
├── _HelloWorldWrapper.so
├── cmake_install.cmake
└── libHelloWorld.so
Let’s create the directory tree first.
mkdir workspace_HelloWorld && cd workspace_HelloWorld
1.4.4. Import linked libraries and their dependencies
The DDS application requires the Fast DDS, Fast CDR and Fast DDS Python bindings libraries. Depending on the installation procedure followed, the process of making these libraries available to our DDS application will be slightly different.
1.4.4.1. Colcon installation
From a Colcon installation there are several ways to import the libraries. If the libraries need to be available just for the current session, run the following command.
source <path/to/Fast-DDS-python/workspace>/install/setup.bash
They can be made accessible from any session by sourcing the setup script from the current user's shell configuration file. The following command appends the required line to ~/.bashrc.
echo 'source <path/to/Fast-DDS-python/workspace>/install/setup.bash' >> ~/.bashrc
This will set up the environment after each of this user’s logins.
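A quick way to confirm that the environment is set up is to check whether the bindings can be found by the Python interpreter. The sketch below uses only the standard library; the helper name missing_modules is hypothetical, and fastdds is the module name imported by the applications later in this tutorial.

```python
import importlib.util


def missing_modules(names):
    """Return the top-level module names that cannot be found on sys.path."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_modules(["fastdds"])
    if missing:
        print("Not importable (was setup.bash sourced?):", ", ".join(missing))
    else:
        print("Fast DDS Python bindings found.")
```

Running it in a shell where setup.bash has not been sourced should report fastdds as not importable.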
1.4.5. Build the topic data type
eProsima Fast DDS-Gen is a Java application that generates source code using the data types defined in an Interface Description Language (IDL) file. This application can do two different things:
Generate C++ definitions for your custom topic.
Generate SWIG interface files to generate the Python bindings for your custom topic.
For this project, we will use the Fast DDS-Gen application to define the data type of the messages that will be sent by the publishers and received by the subscribers.
In the workspace directory, execute the following command:
touch HelloWorld.idl
This creates the HelloWorld.idl file. Open the file in a text editor and copy and paste the following snippet of code.
struct HelloWorld
{
    unsigned long index;
    string message;
};
By doing this we have defined the HelloWorld data type, which has two elements: an index of type uint32_t and a message of type std::string.
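On the Python side, the applications in this tutorial access each member through a method that acts as a getter when called without arguments and as a setter when called with one (for example data.index() and data.message("Hello World")). The sketch below mimics that accessor style in plain Python. HelloWorldSketch is a hypothetical stand-in, not the generated class, and the range check is only there to illustrate that index is an unsigned 32-bit value.

```python
UINT32_MAX = 2**32 - 1


class HelloWorldSketch:
    """Hypothetical stand-in mirroring the two members of the HelloWorld struct."""

    def __init__(self):
        self._index = 0
        self._message = ""

    def index(self, value=None):
        # Called with no argument: getter. Called with a value: setter.
        if value is None:
            return self._index
        if not 0 <= value <= UINT32_MAX:
            raise ValueError("index must fit in an unsigned 32-bit integer")
        self._index = value

    def message(self, value=None):
        # Same getter/setter convention as index().
        if value is None:
            return self._message
        self._message = str(value)


sample = HelloWorldSketch()
sample.index(7)
sample.message("Hello World")
```

How the real generated class reacts to out-of-range values is not covered here; the ValueError is a choice made for this sketch only.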
All that remains is to generate the source code that implements this data type in C++11 and the SWIG interface files for the Python bindings. To do this, run the following command.
<path/to/Fast DDS-Gen>/scripts/fastddsgen -python HelloWorld.idl
This should have generated the following files:
HelloWorld.hpp: HelloWorld C++ type definition.
HelloWorld.i: SWIG interface file for HelloWorld C++ type definition.
HelloWorldPubSubTypes.cxx: C++ interface used by Fast DDS to support HelloWorld type.
HelloWorldPubSubTypes.h: C++ header file for HelloWorldPubSubTypes.cxx.
HelloWorldPubSubTypes.i: SWIG interface file for C++ Serialization and Deserialization code.
HelloWorldCdrAux.ipp: C++ serialization and deserialization code for the HelloWorld type.
HelloWorldCdrAux.hpp: C++ header file for HelloWorldCdrAux.ipp.
HelloWorldTypeObjectSupport.cxx: TypeObject representation registration code.
HelloWorldTypeObjectSupport.hpp: Header file for HelloWorldTypeObjectSupport.cxx.
CMakeLists.txt: CMake file to generate C++ source code and Python module from the SWIG interface files, compile and generate C++ libraries.
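Before moving on to the build, it can be handy to verify that the generator really produced everything in the list above. The following is a small sketch using only the standard library; missing_files is a hypothetical helper, and the file names are copied from that list.

```python
from pathlib import Path

# File names taken from the list of generated files above.
EXPECTED_FILES = [
    "HelloWorld.hpp",
    "HelloWorld.i",
    "HelloWorldPubSubTypes.cxx",
    "HelloWorldPubSubTypes.h",
    "HelloWorldPubSubTypes.i",
    "HelloWorldCdrAux.ipp",
    "HelloWorldCdrAux.hpp",
    "HelloWorldTypeObjectSupport.cxx",
    "HelloWorldTypeObjectSupport.hpp",
    "CMakeLists.txt",
]


def missing_files(workspace, expected=EXPECTED_FILES):
    """Return the expected file names that are not present in workspace."""
    root = Path(workspace)
    return [name for name in expected if not (root / name).is_file()]


if __name__ == "__main__":
    # Run from the workspace directory after calling fastddsgen.
    print("Missing:", missing_files(".") or "none")
```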
After that, the Python bindings can be generated by running the following commands.
cmake .
make
This should have generated the Python binding:
HelloWorld.py: Python module to be imported by your Python example.
1.4.5.1. CMakeLists.txt
At this point the project is ready for building, compiling and generating Python bindings for this data type. From the workspace, run the following commands.
cmake .
make
1.4.6. Write the Fast DDS publisher
From the workspace, run the following command to download the HelloWorldPublisher.py file.
wget -O HelloWorldPublisher.py \
https://raw.githubusercontent.com/eProsima/Fast-RTPS-docs/master/code/Examples/Python/HelloWorld/HelloWorldPublisher.py
This is the Python source code for the publisher application. It is going to send 10 publications under the topic HelloWorldTopic.
# Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
HelloWorld Publisher
"""
from threading import Condition
import time

import fastdds
import HelloWorld

DESCRIPTION = """HelloWorld Publisher example for Fast DDS python bindings"""
USAGE = ('python3 HelloWorldPublisher.py')

class WriterListener (fastdds.DataWriterListener) :
    def __init__(self, writer) :
        self._writer = writer
        super().__init__()


    def on_publication_matched(self, datawriter, info) :
        if (0 < info.current_count_change) :
            print ("Publisher matched subscriber {}".format(info.last_subscription_handle))
            self._writer._cvDiscovery.acquire()
            self._writer._matched_reader += 1
            self._writer._cvDiscovery.notify()
            self._writer._cvDiscovery.release()
        else :
            print ("Publisher unmatched subscriber {}".format(info.last_subscription_handle))
            self._writer._cvDiscovery.acquire()
            self._writer._matched_reader -= 1
            self._writer._cvDiscovery.notify()
            self._writer._cvDiscovery.release()


class Writer:


    def __init__(self):
        self._matched_reader = 0
        self._cvDiscovery = Condition()
        self.index = 0

        factory = fastdds.DomainParticipantFactory.get_instance()
        self.participant_qos = fastdds.DomainParticipantQos()
        factory.get_default_participant_qos(self.participant_qos)
        self.participant = factory.create_participant(0, self.participant_qos)

        self.topic_data_type = HelloWorld.HelloWorldPubSubType()
        self.topic_data_type.setName("HelloWorld")
        self.type_support = fastdds.TypeSupport(self.topic_data_type)
        self.participant.register_type(self.type_support)

        self.topic_qos = fastdds.TopicQos()
        self.participant.get_default_topic_qos(self.topic_qos)
        self.topic = self.participant.create_topic("HelloWorldTopic", self.topic_data_type.getName(), self.topic_qos)

        self.publisher_qos = fastdds.PublisherQos()
        self.participant.get_default_publisher_qos(self.publisher_qos)
        self.publisher = self.participant.create_publisher(self.publisher_qos)

        self.listener = WriterListener(self)
        self.writer_qos = fastdds.DataWriterQos()
        self.publisher.get_default_datawriter_qos(self.writer_qos)
        self.writer = self.publisher.create_datawriter(self.topic, self.writer_qos, self.listener)


    def write(self):
        data = HelloWorld.HelloWorld()
        data.message("Hello World")
        data.index(self.index)
        self.writer.write(data)
        print("Sending {message} : {index}".format(message=data.message(), index=data.index()))
        self.index = self.index + 1


    def wait_discovery(self) :
        self._cvDiscovery.acquire()
        print ("Writer is waiting discovery...")
        self._cvDiscovery.wait_for(lambda : self._matched_reader != 0)
        self._cvDiscovery.release()
        print("Writer discovery finished...")


    def run(self):
        self.wait_discovery()
        for x in range(10) :
            time.sleep(1)
            self.write()
        self.delete()


    def delete(self):
        factory = fastdds.DomainParticipantFactory.get_instance()
        self.participant.delete_contained_entities()
        factory.delete_participant(self.participant)


if __name__ == '__main__':
    print('Starting publisher.')
    writer = Writer()
    writer.run()
    exit()
1.4.6.1. Examining the code
At the beginning of the file we import the Fast DDS Python bindings.
import fastdds
We also import the Python module generated by Fast DDS-Gen, as described in the Build the topic data type section.
import HelloWorld
Then, the WriterListener class is defined by inheriting from the DataWriterListener class.
This class overrides the default DataWriter listener callbacks, which allows the execution of routines in case of an event.
The overridden callback on_publication_matched() allows the definition of a series of actions when a new DataReader is detected listening to the topic under which the DataWriter is publishing.
The info.current_count_change member reports changes in the set of DataReaders matched to the DataWriter: a positive value means a DataReader has matched, and the callback treats any other value as an unmatch.
It is a member of the MatchedStatus structure, which allows tracking changes in the status of subscriptions.
class WriterListener (fastdds.DataWriterListener) :
    def __init__(self, writer) :
        self._writer = writer
        super().__init__()

    def on_publication_matched(self, datawriter, info) :
        if (0 < info.current_count_change) :
            print ("Publisher matched subscriber {}".format(info.last_subscription_handle))
            self._writer._cvDiscovery.acquire()
            self._writer._matched_reader += 1
            self._writer._cvDiscovery.notify()
            self._writer._cvDiscovery.release()
        else :
            print ("Publisher unmatched subscriber {}".format(info.last_subscription_handle))
            self._writer._cvDiscovery.acquire()
            self._writer._matched_reader -= 1
            self._writer._cvDiscovery.notify()
            self._writer._cvDiscovery.release()
The next block creates the Writer class that implements a publisher.
class Writer:
The initialization member function of the Writer class is defined below.
This function performs several actions:
Uses the DomainParticipantFactory to create the participant.
Registers the data type defined in the IDL.
Creates the topic for the publications.
Creates the publisher.
Creates the DataWriter with the listener previously created.
    def __init__(self):
        self._matched_reader = 0
        self._cvDiscovery = Condition()
        self.index = 0

        factory = fastdds.DomainParticipantFactory.get_instance()
        self.participant_qos = fastdds.DomainParticipantQos()
        factory.get_default_participant_qos(self.participant_qos)
        self.participant = factory.create_participant(0, self.participant_qos)

        self.topic_data_type = HelloWorld.HelloWorldPubSubType()
        self.topic_data_type.setName("HelloWorld")
        self.type_support = fastdds.TypeSupport(self.topic_data_type)
        self.participant.register_type(self.type_support)

        self.topic_qos = fastdds.TopicQos()
        self.participant.get_default_topic_qos(self.topic_qos)
        self.topic = self.participant.create_topic("HelloWorldTopic", self.topic_data_type.getName(), self.topic_qos)

        self.publisher_qos = fastdds.PublisherQos()
        self.participant.get_default_publisher_qos(self.publisher_qos)
        self.publisher = self.participant.create_publisher(self.publisher_qos)

        self.listener = WriterListener(self)
        self.writer_qos = fastdds.DataWriterQos()
        self.publisher.get_default_datawriter_qos(self.writer_qos)
        self.writer = self.publisher.create_datawriter(self.topic, self.writer_qos, self.listener)
To make the publication, the public member function write() is implemented.
This is simply the writing of a change by the DataWriter object.
    def write(self):
        data = HelloWorld.HelloWorld()
        data.message("Hello World")
        data.index(self.index)
        self.writer.write(data)
        print("Sending {message} : {index}".format(message=data.message(), index=data.index()))
        self.index = self.index + 1
To detect when a DataReader has matched, the public member function wait_discovery() is implemented.
The DataWriter's listener callback, which is invoked when the DataWriter matches a DataReader listening to the publication topic, updates the data member _matched_reader.
It contains the number of DataReaders discovered.
Therefore, when the first DataReader has been discovered, the application starts to publish.
    def wait_discovery(self) :
        self._cvDiscovery.acquire()
        print ("Writer is waiting discovery...")
        self._cvDiscovery.wait_for(lambda : self._matched_reader != 0)
        self._cvDiscovery.release()
        print("Writer discovery finished...")
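The same wait-and-notify pattern can be tried out without Fast DDS. The sketch below is a standard-library illustration of it, not the tutorial's code: MatchCounter is a hypothetical helper, the condition variable is used as a context manager instead of explicit acquire() and release() calls, and a timeout is added so the waiting thread cannot block forever.

```python
import threading


class MatchCounter:
    """Hypothetical helper: counts matched readers and lets a thread wait for one."""

    def __init__(self):
        self._cv = threading.Condition()
        self._matched = 0

    def on_change(self, delta):
        # Would be called from the listener with +1 (matched) or -1 (unmatched).
        with self._cv:
            self._matched += delta
            self._cv.notify_all()

    def wait_for_match(self, timeout=None):
        # Returns True once at least one reader is matched, False on timeout.
        with self._cv:
            return self._cv.wait_for(lambda: self._matched > 0, timeout=timeout)


counter = MatchCounter()
timed_out = not counter.wait_for_match(timeout=0.05)  # nobody has matched yet

# Simulate the discovery callback arriving from another thread.
threading.Timer(0.05, counter.on_change, args=(1,)).start()
matched = counter.wait_for_match(timeout=5.0)
```

The first wait gives up after the timeout because no match has been reported; the second returns as soon as the simulated callback increments the counter.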
The public run function waits until a DataReader is discovered and executes the action of publishing 10 samples.
    def run(self):
        self.wait_discovery()
        for x in range(10) :
            time.sleep(1)
            self.write()
        self.delete()
Finally, the Writer is initialized and run in main.
if __name__ == '__main__':
    print('Starting publisher.')
    writer = Writer()
    writer.run()
    exit()
1.4.7. Write the Fast DDS subscriber
From the workspace, run the following command to download the HelloWorldSubscriber.py file.
wget -O HelloWorldSubscriber.py \
https://raw.githubusercontent.com/eProsima/Fast-RTPS-docs/master/code/Examples/Python/HelloWorld/HelloWorldSubscriber.py
This is the Python source code for the subscriber application. The application runs a subscriber that receives samples under the topic HelloWorldTopic until the user presses Ctrl+C.
# Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
HelloWorld Subscriber
"""
import signal

import fastdds
import HelloWorld

DESCRIPTION = """HelloWorld Subscriber example for Fast DDS python bindings"""
USAGE = ('python3 HelloWorldSubscriber.py')

# To capture ctrl+C
def signal_handler(sig, frame):
    print('Interrupted!')

class ReaderListener(fastdds.DataReaderListener):


    def __init__(self):
        super().__init__()


    def on_subscription_matched(self, datareader, info) :
        if (0 < info.current_count_change) :
            print ("Subscriber matched publisher {}".format(info.last_publication_handle))
        else :
            print ("Subscriber unmatched publisher {}".format(info.last_publication_handle))


    def on_data_available(self, reader):
        info = fastdds.SampleInfo()
        data = HelloWorld.HelloWorld()
        reader.take_next_sample(data, info)

        print("Received {message} : {index}".format(message=data.message(), index=data.index()))


class Reader:


    def __init__(self):
        factory = fastdds.DomainParticipantFactory.get_instance()
        self.participant_qos = fastdds.DomainParticipantQos()
        factory.get_default_participant_qos(self.participant_qos)
        self.participant = factory.create_participant(0, self.participant_qos)

        self.topic_data_type = HelloWorld.HelloWorldPubSubType()
        self.topic_data_type.setName("HelloWorld")
        self.type_support = fastdds.TypeSupport(self.topic_data_type)
        self.participant.register_type(self.type_support)

        self.topic_qos = fastdds.TopicQos()
        self.participant.get_default_topic_qos(self.topic_qos)
        self.topic = self.participant.create_topic("HelloWorldTopic", self.topic_data_type.getName(), self.topic_qos)

        self.subscriber_qos = fastdds.SubscriberQos()
        self.participant.get_default_subscriber_qos(self.subscriber_qos)
        self.subscriber = self.participant.create_subscriber(self.subscriber_qos)

        self.listener = ReaderListener()
        self.reader_qos = fastdds.DataReaderQos()
        self.subscriber.get_default_datareader_qos(self.reader_qos)
        self.reader = self.subscriber.create_datareader(self.topic, self.reader_qos, self.listener)


    def delete(self):
        factory = fastdds.DomainParticipantFactory.get_instance()
        self.participant.delete_contained_entities()
        factory.delete_participant(self.participant)


    def run(self):
        signal.signal(signal.SIGINT, signal_handler)
        print('Press Ctrl+C to stop')
        signal.pause()
        self.delete()


if __name__ == '__main__':
    print('Creating subscriber.')
    reader = Reader()
    reader.run()
    exit()
1.4.7.1. Examining the code
Since the source code of both the publisher and subscriber applications is mostly identical, this document will focus on the main differences between them, omitting the parts of the code that have already been explained.
Following the same structure as in the publisher explanation, the first step is the implementation of the data reader listener.
The first overridden callback of the ReaderListener is on_subscription_matched(), which is the analog of the on_publication_matched() callback of the DataWriter.
    def on_subscription_matched(self, datareader, info) :
        if (0 < info.current_count_change) :
            print ("Subscriber matched publisher {}".format(info.last_publication_handle))
        else :
            print ("Subscriber unmatched publisher {}".format(info.last_publication_handle))
The second overridden callback is on_data_available().
In it, the next received sample that the data reader can access is taken and processed to display its content.
This is also where the SampleInfo object is created; it is filled with metadata about the sample, such as whether it has already been read or taken.
    def on_data_available(self, reader):
        info = fastdds.SampleInfo()
        data = HelloWorld.HelloWorld()
        reader.take_next_sample(data, info)
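The callback above takes a single sample and ignores the value returned by take_next_sample(). A more defensive variant could keep taking samples until the reader reports that nothing is left, and skip samples that carry no data. The sketch below shows that loop under stated assumptions: take_all is a hypothetical helper, FakeReader and FakeInfo are stand-ins so the code runs without Fast DDS, and the OK return code is passed in as a parameter because its name may differ between versions of the bindings. With the real bindings one would pass HelloWorld.HelloWorld and fastdds.SampleInfo as the two factories and rely on the valid_data field of SampleInfo.

```python
def take_all(reader, new_data, new_info, ok_code):
    """Take samples until take_next_sample stops returning ok_code."""
    samples = []
    while True:
        data, info = new_data(), new_info()
        if reader.take_next_sample(data, info) != ok_code:
            break
        if info.valid_data:  # skip samples that carry no payload
            samples.append(data)
    return samples


# --- Stand-ins so the sketch runs without Fast DDS (all hypothetical) ---
OK, NO_DATA = 0, 1


class FakeInfo:
    valid_data = False


class FakeReader:
    def __init__(self, queue):
        self._queue = list(queue)

    def take_next_sample(self, data, info):
        if not self._queue:
            return NO_DATA
        payload = self._queue.pop(0)
        info.valid_data = payload is not None
        if payload is not None:
            data.update(payload)
        return OK


reader = FakeReader([{"index": 0}, None, {"index": 1}])
taken = take_all(reader, dict, FakeInfo, OK)
```

Here the middle entry of the fake queue simulates a sample without valid data, so only the two payloads end up in taken.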
The next line defines the Reader class that implements a subscriber.
class Reader:
Next comes the subscriber initialization public member function.
It mirrors the initialization member function defined for the Writer, except that it creates a Subscriber and a DataReader instead of a Publisher and a DataWriter.
    def __init__(self):
        factory = fastdds.DomainParticipantFactory.get_instance()
        self.participant_qos = fastdds.DomainParticipantQos()
        factory.get_default_participant_qos(self.participant_qos)
        self.participant = factory.create_participant(0, self.participant_qos)

        self.topic_data_type = HelloWorld.HelloWorldPubSubType()
        self.topic_data_type.setName("HelloWorld")
        self.type_support = fastdds.TypeSupport(self.topic_data_type)
        self.participant.register_type(self.type_support)

        self.topic_qos = fastdds.TopicQos()
        self.participant.get_default_topic_qos(self.topic_qos)
        self.topic = self.participant.create_topic("HelloWorldTopic", self.topic_data_type.getName(), self.topic_qos)

        self.subscriber_qos = fastdds.SubscriberQos()
        self.participant.get_default_subscriber_qos(self.subscriber_qos)
        self.subscriber = self.participant.create_subscriber(self.subscriber_qos)

        self.listener = ReaderListener()
        self.reader_qos = fastdds.DataReaderQos()
        self.subscriber.get_default_datareader_qos(self.reader_qos)
        self.reader = self.subscriber.create_datareader(self.topic, self.reader_qos, self.listener)
The public member function run() ensures that the subscriber runs until the user presses Ctrl+C.
    def run(self):
        signal.signal(signal.SIGINT, signal_handler)
        print('Press Ctrl+C to stop')
        signal.pause()
        self.delete()
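Note that signal.pause() is only available on Unix, which is fine for the Linux environment assumed by this tutorial. As an alternative sketch, the same wait-until-interrupted behaviour can be built on threading.Event. The names stop_requested, request_stop and run_until_stopped are hypothetical, and a timer stands in for the user pressing Ctrl+C so that the example terminates on its own.

```python
import signal
import threading

stop_requested = threading.Event()


def request_stop(signum=None, frame=None):
    # Signal handlers receive (signum, frame); both are unused here.
    stop_requested.set()


def run_until_stopped(cleanup, poll_interval=0.1):
    """Block until request_stop() is called, then run cleanup once."""
    signal.signal(signal.SIGINT, request_stop)
    while not stop_requested.wait(poll_interval):
        pass  # waking up periodically keeps the wait responsive to signals
    cleanup()


cleaned = []
threading.Timer(0.1, request_stop).start()  # stands in for the user pressing Ctrl+C
run_until_stopped(lambda: cleaned.append(True))
```

In the subscriber, the cleanup argument would be the delete() method shown earlier.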
Finally, the participant that implements a subscriber is initialized and run in main.
if __name__ == '__main__':
    print('Creating subscriber.')
    reader = Reader()
    reader.run()
    exit()
1.4.8. Putting it all together
Finally, from the workspace directory where the bindings were built, run the publisher and subscriber applications in two separate terminals.
python3 HelloWorldPublisher.py
python3 HelloWorldSubscriber.py
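If opening two terminals is inconvenient, both applications can also be driven from a single script. The following is a rough sketch for Linux using only the standard library; run_pair and its startup_delay parameter are hypothetical. It starts the subscriber, runs the publisher to completion, and then sends the subscriber the same SIGINT that Ctrl+C would deliver.

```python
import signal
import subprocess
import sys
import time


def run_pair(subscriber_cmd, publisher_cmd, startup_delay=1.0):
    """Start the subscriber, run the publisher to completion, then stop the subscriber."""
    subscriber = subprocess.Popen(subscriber_cmd)
    try:
        time.sleep(startup_delay)  # give the subscriber time to start up
        publisher_rc = subprocess.run(publisher_cmd).returncode
    finally:
        if subscriber.poll() is None:
            subscriber.send_signal(signal.SIGINT)  # same signal as Ctrl+C
            try:
                subscriber.wait(timeout=5)
            except subprocess.TimeoutExpired:
                subscriber.kill()
                subscriber.wait()
    return publisher_rc
```

From the workspace, a call such as run_pair([sys.executable, "HelloWorldSubscriber.py"], [sys.executable, "HelloWorldPublisher.py"]) would launch both applications and return the publisher's exit code.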
1.4.9. Summary
In this tutorial you have built a Python publisher and a Python subscriber DDS application. You have also learned how to generate, from an IDL file, the Python module for your topic data type.
1.4.10. Next steps
In the eProsima Fast DDS GitHub repository you will find more complex examples that implement DDS communication for a multitude of use cases and scenarios.