1.4. Writing a simple Python publisher and subscriber application

This section explains, step by step, how to create a simple Fast DDS application with a publisher and a subscriber using the Python API.

1.4.1. Background

DDS is a data-centric communications middleware that implements the DCPS model. This model is based on the development of a publisher, a data generating element; and a subscriber, a data consuming element. These entities communicate by means of the topic, an element that binds both DDS entities. Publishers generate information under a topic and subscribers subscribe to this same topic to receive information.
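The role of the topic as the element that binds publishers and subscribers can be illustrated with a toy, in-process model. This is only a sketch of the idea: the `ToyBus` class and its methods are hypothetical and are not part of the Fast DDS API, which is introduced later in this section.

```python
from collections import defaultdict


class ToyBus:
    """In-process stand-in for the middleware; not part of Fast DDS."""

    def __init__(self):
        # topic name -> list of subscriber callbacks
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        self._subscribers[topic].append(callback)

    def publish(self, topic, sample):
        # Deliver the sample only to the subscribers of this topic.
        for callback in self._subscribers.get(topic, []):
            callback(sample)


bus = ToyBus()
received = []
bus.subscribe("HelloWorldTopic", received.append)

bus.publish("HelloWorldTopic", "Hello World")  # delivered
bus.publish("OtherTopic", "ignored")           # nobody subscribed to this topic

print(received)
```

Only the sample published under the subscribed topic reaches the subscriber; the publisher never refers to the subscriber directly.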

1.4.2. Prerequisites

First, follow the steps in the Installation Manual to install eProsima Fast DDS and all its dependencies, as well as the steps to install the eProsima Fast DDS-Gen tool. All the commands in this tutorial assume a Linux environment.

1.4.3. Create the application workspace

The application workspace will have the following structure at the end of the project. Files HelloWorldPublisher.py and HelloWorldSubscriber.py are the Publisher application and Subscriber application respectively.

.
├── CMakeCache.txt
├── CMakeFiles
├── CMakeLists.txt
├── HelloWorld.cxx
├── HelloWorld.h
├── HelloWorld.i
├── HelloWorld.idl
├── HelloWorld.py
├── HelloWorldPubSubTypes.cxx
├── HelloWorldPubSubTypes.h
├── HelloWorldPubSubTypes.i
├── HelloWorldPublisher.py
├── HelloWorldSubscriber.py
├── Makefile
├── _HelloWorldWrapper.so
├── cmake_install.cmake
└── libHelloWorld.so

Let’s create the directory tree first.

mkdir workspace_HelloWorld && cd workspace_HelloWorld

1.4.4. Import linked libraries and their dependencies

The DDS application requires the Fast DDS, Fast CDR and Fast DDS Python bindings libraries. Depending on the installation procedure followed, the process of making these libraries available for our DDS application will be slightly different.

1.4.4.1. Colcon installation

From a Colcon installation there are several ways to import the libraries. If the libraries need to be available just for the current session, run the following command.

source <path/to/Fast-DDS-python/workspace>/install/setup.bash

They can be made accessible from any session by sourcing the same setup script from the shell configuration file of the current user. The following command appends the required line to ~/.bashrc.

echo 'source <path/to/Fast-DDS-python/workspace>/install/setup.bash' >> ~/.bashrc

This will set up the environment after each of this user’s logins.

1.4.5. Build the topic data type

eProsima Fast DDS-Gen is a Java application that generates source code using the data types defined in an Interface Description Language (IDL) file. This application can do two different things:

  1. Generate C++ definitions for your custom topic.

  2. Generate SWIG interface files to generate the Python bindings for your custom topic.

For this project, we will use the Fast DDS-Gen application to define the data type of the messages that will be sent by the publishers and received by the subscribers.

In the workspace directory, execute the following command:

touch HelloWorld.idl

This creates the HelloWorld.idl file. Open the file in a text editor and copy and paste the following snippet of code.

struct HelloWorld
{
    unsigned long index;
    string message;
};

By doing this we have defined the HelloWorld data type, which has two elements: an index of type uint32_t and a message of type std::string. All that remains is to generate the source code that implements this data type in C++11 and the SWIG interface files for the Python bindings. To do this, run the following command.

<path/to/Fast DDS-Gen>/scripts/fastddsgen -python HelloWorld.idl

This should have generated the following files:

  • HelloWorld.cxx: HelloWorld C++ type definition.

  • HelloWorld.h: C++ header file for HelloWorld.cxx.

  • HelloWorld.i: SWIG interface file for HelloWorld C++ type definition.

  • HelloWorldPubSubTypes.cxx: C++ Serialization and Deserialization code for the HelloWorld type.

  • HelloWorldPubSubTypes.h: C++ header file for HelloWorldPubSubTypes.cxx.

  • HelloWorldPubSubTypes.i: SWIG interface file for C++ Serialization and Deserialization code.

  • CMakeLists.txt: CMake file to generate C++ source code and Python module from the SWIG interface files, compile and generate C++ libraries.

  • HelloWorld.py: Python module to be imported by your Python example.

1.4.5.1. CMakeLists.txt

At this point the project is ready for building, compiling and generating Python bindings for this data type. From the workspace, run the following commands.

cmake .
make
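Before writing the applications, it can be useful to confirm that Python is able to locate both the Fast DDS bindings and the module that was just built. The following is a minimal sketch that uses only the standard library; the helper name `find_missing_modules` is hypothetical. It assumes the script is run from the workspace directory in a session where the environment has been set up as described above. Note that it only checks that the modules can be located, not that the underlying shared libraries load correctly.

```python
import importlib.util


def find_missing_modules(names=("fastdds", "HelloWorld")):
    """Return the module names from `names` that Python cannot locate."""
    return [name for name in names if importlib.util.find_spec(name) is None]


missing = find_missing_modules()
if missing:
    print("Not importable from this directory:", ", ".join(missing))
else:
    print("fastdds and HelloWorld are importable.")
```

If `fastdds` is reported as missing, the setup script has probably not been sourced in the current session; if `HelloWorld` is missing, the build step probably did not complete in this directory.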

1.4.6. Write the Fast DDS publisher

From the workspace, run the following command to download the HelloWorldPublisher.py file.

wget -O HelloWorldPublisher.py \
    https://raw.githubusercontent.com/eProsima/Fast-RTPS-docs/master/code/Examples/Python/HelloWorld/HelloWorldPublisher.py

This is the Python source code for the publisher application. It is going to send 10 publications under the topic HelloWorldTopic.

# Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
HelloWorld Publisher
"""
from threading import Condition
import time

import fastdds
import HelloWorld

DESCRIPTION = """HelloWorld Publisher example for Fast DDS python bindings"""
USAGE = ('python3 HelloWorldPublisher.py')

class WriterListener (fastdds.DataWriterListener) :
    def __init__(self, writer) :
        self._writer = writer
        super().__init__()


    def on_publication_matched(self, datawriter, info) :
        if (0 < info.current_count_change) :
            print ("Publisher matched subscriber {}".format(info.last_subscription_handle))
            self._writer._cvDiscovery.acquire()
            self._writer._matched_reader += 1
            self._writer._cvDiscovery.notify()
            self._writer._cvDiscovery.release()
        else :
            print ("Publisher unmatched subscriber {}".format(info.last_subscription_handle))
            self._writer._cvDiscovery.acquire()
            self._writer._matched_reader -= 1
            self._writer._cvDiscovery.notify()
            self._writer._cvDiscovery.release()


class Writer:


    def __init__(self):
        self._matched_reader = 0
        self._cvDiscovery = Condition()
        self.index = 0

        factory = fastdds.DomainParticipantFactory.get_instance()
        self.participant_qos = fastdds.DomainParticipantQos()
        factory.get_default_participant_qos(self.participant_qos)
        self.participant = factory.create_participant(0, self.participant_qos)

        self.topic_data_type = HelloWorld.HelloWorldPubSubType()
        self.topic_data_type.setName("HelloWorld")
        self.type_support = fastdds.TypeSupport(self.topic_data_type)
        self.participant.register_type(self.type_support)

        self.topic_qos = fastdds.TopicQos()
        self.participant.get_default_topic_qos(self.topic_qos)
        self.topic = self.participant.create_topic("HelloWorldTopic", self.topic_data_type.getName(), self.topic_qos)

        self.publisher_qos = fastdds.PublisherQos()
        self.participant.get_default_publisher_qos(self.publisher_qos)
        self.publisher = self.participant.create_publisher(self.publisher_qos)

        self.listener = WriterListener(self)
        self.writer_qos = fastdds.DataWriterQos()
        self.publisher.get_default_datawriter_qos(self.writer_qos)
        self.writer = self.publisher.create_datawriter(self.topic, self.writer_qos, self.listener)


    def write(self):
        data = HelloWorld.HelloWorld()
        data.message("Hello World")
        data.index(self.index)
        self.writer.write(data)
        print("Sending {message} : {index}".format(message=data.message(), index=data.index()))
        self.index = self.index + 1


    def wait_discovery(self) :
        self._cvDiscovery.acquire()
        print ("Writer is waiting discovery...")
        self._cvDiscovery.wait_for(lambda : self._matched_reader != 0)
        self._cvDiscovery.release()
        print("Writer discovery finished...")


    def run(self):
        self.wait_discovery()
        for x in range(10) :
            time.sleep(1)
            self.write()
        self.delete()


    def delete(self):
        factory = fastdds.DomainParticipantFactory.get_instance()
        self.participant.delete_contained_entities()
        factory.delete_participant(self.participant)


if __name__ == '__main__':
    print('Starting publisher.')
    writer = Writer()
    writer.run()
    exit()

1.4.6.1. Examining the code

At the beginning of the file we import the Fast DDS Python bindings.

import fastdds

and also the Python module generated by Fast DDS-Gen, as described in the Build the topic data type section.

import HelloWorld

Then, the WriterListener class is defined by inheriting from the DataWriterListener class. This class overrides the default DataWriter listener callbacks, which allows routines to be executed when an event occurs. The overridden callback on_publication_matched() defines the actions to take when a DataReader listening to the topic under which the DataWriter is publishing is matched or unmatched. The info.current_count_change attribute (an attribute, not a method, in the Python bindings) reports the change in the number of DataReaders matched to the DataWriter since the last notification: a positive value is handled as a new match, and any other value as an unmatch. It is a member of the MatchedStatus structure, which allows tracking changes in the status of subscriptions.

class WriterListener (fastdds.DataWriterListener) :
    def __init__(self, writer) :
        self._writer = writer
        super().__init__()


    def on_publication_matched(self, datawriter, info) :
        if (0 < info.current_count_change) :
            print ("Publisher matched subscriber {}".format(info.last_subscription_handle))
            self._writer._cvDiscovery.acquire()
            self._writer._matched_reader += 1
            self._writer._cvDiscovery.notify()
            self._writer._cvDiscovery.release()
        else :
            print ("Publisher unmatched subscriber {}".format(info.last_subscription_handle))
            self._writer._cvDiscovery.acquire()
            self._writer._matched_reader -= 1
            self._writer._cvDiscovery.notify()
            self._writer._cvDiscovery.release()
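The explicit acquire() and release() calls in the callback can also be written with a `with` statement, because threading.Condition supports the context manager protocol. The following sketch shows that form without depending on Fast DDS: `MatchCounter` is a hypothetical stand-in for the state kept in the Writer, and the `SimpleNamespace` objects stand in for the status object passed to the callback.

```python
from threading import Condition
from types import SimpleNamespace


class MatchCounter:
    """Hypothetical stand-in for the state the listener updates on the Writer."""

    def __init__(self):
        self.matched_reader = 0
        self.cv_discovery = Condition()

    def on_match_change(self, info):
        # Same rule as on_publication_matched(): a positive change is a new
        # match, anything else is treated as an unmatch.
        step = 1 if info.current_count_change > 0 else -1
        with self.cv_discovery:  # acquire() on entry, release() on exit
            self.matched_reader += step
            self.cv_discovery.notify()


counter = MatchCounter()
counter.on_match_change(SimpleNamespace(current_count_change=1))
counter.on_match_change(SimpleNamespace(current_count_change=1))
counter.on_match_change(SimpleNamespace(current_count_change=-1))
print(counter.matched_reader)
```

The `with` form also releases the lock if the body raises an exception, which the explicit acquire()/release() pair does not do on its own.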

The next block creates the Writer class that implements a publisher.

class Writer:

The initialization member function of the Writer class is defined below. This function performs several actions:

  1. Uses the DomainParticipantFactory to create the participant.

  2. Registers the data type defined in the IDL.

  3. Creates the topic for the publications.

  4. Creates the publisher.

  5. Creates the DataWriter with the listener previously created.

def __init__(self):
    self._matched_reader = 0
    self._cvDiscovery = Condition()
    self.index = 0

    factory = fastdds.DomainParticipantFactory.get_instance()
    self.participant_qos = fastdds.DomainParticipantQos()
    factory.get_default_participant_qos(self.participant_qos)
    self.participant = factory.create_participant(0, self.participant_qos)

    self.topic_data_type = HelloWorld.HelloWorldPubSubType()
    self.topic_data_type.setName("HelloWorld")
    self.type_support = fastdds.TypeSupport(self.topic_data_type)
    self.participant.register_type(self.type_support)

    self.topic_qos = fastdds.TopicQos()
    self.participant.get_default_topic_qos(self.topic_qos)
    self.topic = self.participant.create_topic("HelloWorldTopic", self.topic_data_type.getName(), self.topic_qos)

    self.publisher_qos = fastdds.PublisherQos()
    self.participant.get_default_publisher_qos(self.publisher_qos)
    self.publisher = self.participant.create_publisher(self.publisher_qos)

    self.listener = WriterListener(self)
    self.writer_qos = fastdds.DataWriterQos()
    self.publisher.get_default_datawriter_qos(self.writer_qos)
    self.writer = self.publisher.create_datawriter(self.topic, self.writer_qos, self.listener)

To make the publication, the public member function write() is implemented. This is simply the writing of a change by the DataWriter object.

    def write(self):
        data = HelloWorld.HelloWorld()
        data.message("Hello World")
        data.index(self.index)
        self.writer.write(data)
        print("Sending {message} : {index}".format(message=data.message(), index=data.index()))
        self.index = self.index + 1
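As the calls in write() show, each field of the generated type is accessed through a method that sets the value when called with an argument and returns it when called without one. The pure-Python class below imitates that accessor style so it can be tried without the bindings. `HelloWorldStandIn` is hypothetical and is not the generated class; in particular, the range check on index is a choice made for this sketch to reflect the 32-bit unsigned IDL type, and this tutorial does not show how the real bindings react to out-of-range values.

```python
class HelloWorldStandIn:
    """Pure-Python imitation of the accessor style; not the generated class."""

    _UINT32_MAX = 2**32 - 1

    def __init__(self):
        self._index = 0
        self._message = ""

    def index(self, value=None):
        # No argument: act as a getter. With an argument: act as a setter.
        if value is None:
            return self._index
        # Range check chosen for this sketch only (IDL unsigned long is 32 bits).
        if not 0 <= value <= self._UINT32_MAX:
            raise ValueError("index must fit in an unsigned 32-bit integer")
        self._index = value

    def message(self, value=None):
        if value is None:
            return self._message
        self._message = value


data = HelloWorldStandIn()
data.message("Hello World")
data.index(3)
print("Sending {message} : {index}".format(message=data.message(), index=data.index()))
```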

To detect when a DataReader has matched, the public member function wait_discovery() is implemented. In the DataWriter’s listener callback which states that the DataWriter has matched with a DataReader that listens to the publication topic, the data member _matched_reader is updated. It contains the number of DataReaders discovered. Therefore, when the first DataReader has been discovered, the application starts to publish.

def wait_discovery(self) :
    self._cvDiscovery.acquire()
    print ("Writer is waiting discovery...")
    self._cvDiscovery.wait_for(lambda : self._matched_reader != 0)
    self._cvDiscovery.release()
    print("Writer discovery finished...")
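wait_discovery() blocks indefinitely if no DataReader ever appears. Condition.wait_for() also accepts a timeout and returns the last value of the predicate, so a bounded wait is a small variation. The sketch below demonstrates this with a hypothetical `DiscoveryWaiter` class in which a timer thread simulates the listener callback; only the Condition usage mirrors the tutorial code.

```python
from threading import Condition, Timer


class DiscoveryWaiter:
    """Hypothetical helper; only the Condition usage mirrors the tutorial."""

    def __init__(self):
        self.matched_reader = 0
        self.cv_discovery = Condition()

    def reader_matched(self):
        # Plays the role of the listener callback on a new match.
        with self.cv_discovery:
            self.matched_reader += 1
            self.cv_discovery.notify()

    def wait_discovery(self, timeout=None):
        # wait_for() returns the last value of the predicate, so the result is
        # False if the timeout expires before any reader has matched.
        with self.cv_discovery:
            return self.cv_discovery.wait_for(
                lambda: self.matched_reader != 0, timeout=timeout)


waiter = DiscoveryWaiter()
print(waiter.wait_discovery(timeout=0.1))  # no match has happened yet

Timer(0.1, waiter.reader_matched).start()  # simulate a match from another thread
print(waiter.wait_discovery(timeout=5.0))
```

With the default `timeout=None` the behavior is the same as in the tutorial: the call returns only once a reader has matched.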

The public run function waits until a DataReader is discovered and executes the action of publishing 10 samples.

def run(self):
    self.wait_discovery()
    for x in range(10) :
        time.sleep(1)
        self.write()
    self.delete()

Finally, the Writer is initialized and run in main.

if __name__ == '__main__':
    print('Starting publisher.')
    writer = Writer()
    writer.run()
    exit()

1.4.7. Write the Fast DDS subscriber

From the workspace, run the following command to download the HelloWorldSubscriber.py file.

wget -O HelloWorldSubscriber.py \
    https://raw.githubusercontent.com/eProsima/Fast-RTPS-docs/master/code/Examples/Python/HelloWorld/HelloWorldSubscriber.py

This is the Python source code for the subscriber application. The application runs a subscriber that receives samples under the topic HelloWorldTopic until the user presses Ctrl+C.

# Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
HelloWorld Subscriber
"""
import signal

import fastdds
import HelloWorld

DESCRIPTION = """HelloWorld Subscriber example for Fast DDS python bindings"""
USAGE = ('python3 HelloWorldSubscriber.py')

# To capture ctrl+C
def signal_handler(sig, frame):
    print('Interrupted!')

class ReaderListener(fastdds.DataReaderListener):


    def __init__(self):
        super().__init__()


    def on_subscription_matched(self, datareader, info) :
        if (0 < info.current_count_change) :
            print ("Subscriber matched publisher {}".format(info.last_publication_handle))
        else :
            print ("Subscriber unmatched publisher {}".format(info.last_publication_handle))


    def on_data_available(self, reader):
        info = fastdds.SampleInfo()
        data = HelloWorld.HelloWorld()
        reader.take_next_sample(data, info)

        print("Received {message} : {index}".format(message=data.message(), index=data.index()))


class Reader:


    def __init__(self):
        factory = fastdds.DomainParticipantFactory.get_instance()
        self.participant_qos = fastdds.DomainParticipantQos()
        factory.get_default_participant_qos(self.participant_qos)
        self.participant = factory.create_participant(0, self.participant_qos)

        self.topic_data_type = HelloWorld.HelloWorldPubSubType()
        self.topic_data_type.setName("HelloWorld")
        self.type_support = fastdds.TypeSupport(self.topic_data_type)
        self.participant.register_type(self.type_support)

        self.topic_qos = fastdds.TopicQos()
        self.participant.get_default_topic_qos(self.topic_qos)
        self.topic = self.participant.create_topic("HelloWorldTopic", self.topic_data_type.getName(), self.topic_qos)

        self.subscriber_qos = fastdds.SubscriberQos()
        self.participant.get_default_subscriber_qos(self.subscriber_qos)
        self.subscriber = self.participant.create_subscriber(self.subscriber_qos)

        self.listener = ReaderListener()
        self.reader_qos = fastdds.DataReaderQos()
        self.subscriber.get_default_datareader_qos(self.reader_qos)
        self.reader = self.subscriber.create_datareader(self.topic, self.reader_qos, self.listener)


    def delete(self):
        factory = fastdds.DomainParticipantFactory.get_instance()
        self.participant.delete_contained_entities()
        factory.delete_participant(self.participant)


    def run(self):
        signal.signal(signal.SIGINT, signal_handler)
        print('Press Ctrl+C to stop')
        signal.pause()
        self.delete()


if __name__ == '__main__':
    print('Creating subscriber.')
    reader = Reader()
    reader.run()
    exit()

1.4.7.1. Examining the code

Since the source code of both the publisher and subscriber applications is mostly identical, this document will focus on the main differences between them, omitting the parts of the code that have already been explained.

Following the same structure as in the publisher explanation, the first step is the implementation of the data reader listener. The first overridden callback of the ReaderListener is the on_subscription_matched(), which is the analog of the on_publication_matched() callback of the DataWriter.

def on_subscription_matched(self, datareader, info) :
    if (0 < info.current_count_change) :
        print ("Subscriber matched publisher {}".format(info.last_publication_handle))
    else :
        print ("Subscriber unmatched publisher {}".format(info.last_publication_handle))

The second overridden callback is on_data_available(). Here, the next received sample that the data reader can access is taken and processed to display its content. A SampleInfo object is also created here and passed to take_next_sample() together with the data object; it indicates whether a sample has already been read or taken.

def on_data_available(self, reader):
    info = fastdds.SampleInfo()
    data = HelloWorld.HelloWorld()
    reader.take_next_sample(data, info)
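The callback takes the sample rather than reading it. In DDS, taking a sample removes it from the reader, whereas reading leaves it available for later access. The toy queue below illustrates only that difference; `ToyReaderQueue` and its methods are hypothetical, they are not the Fast DDS DataReader API, and the sketch ignores the per-sample state that SampleInfo reports.

```python
from collections import deque


class ToyReaderQueue:
    """Toy model of a reader's sample queue; not the Fast DDS DataReader."""

    def __init__(self):
        self._samples = deque()

    def deliver(self, sample):
        self._samples.append(sample)

    def read(self):
        # Reading returns the oldest sample but leaves it in the queue.
        return self._samples[0] if self._samples else None

    def take(self):
        # Taking returns the oldest sample and removes it from the queue.
        return self._samples.popleft() if self._samples else None


queue = ToyReaderQueue()
queue.deliver("Hello World : 0")
queue.deliver("Hello World : 1")

print(queue.read())  # oldest sample, still queued afterwards
print(queue.take())  # the same sample, now removed
print(queue.take())  # the next sample
print(queue.take())  # nothing left, so None
```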

The next line defines the Reader class that implements a subscriber.

class Reader:

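Next comes the subscriber initialization public member function. It follows the same steps as the initialization member function of the Writer, except that it creates a subscriber and a DataReader (with a ReaderListener) instead of a publisher and a DataWriter.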

def __init__(self):
    factory = fastdds.DomainParticipantFactory.get_instance()
    self.participant_qos = fastdds.DomainParticipantQos()
    factory.get_default_participant_qos(self.participant_qos)
    self.participant = factory.create_participant(0, self.participant_qos)

    self.topic_data_type = HelloWorld.HelloWorldPubSubType()
    self.topic_data_type.setName("HelloWorld")
    self.type_support = fastdds.TypeSupport(self.topic_data_type)
    self.participant.register_type(self.type_support)

    self.topic_qos = fastdds.TopicQos()
    self.participant.get_default_topic_qos(self.topic_qos)
    self.topic = self.participant.create_topic("HelloWorldTopic", self.topic_data_type.getName(), self.topic_qos)

    self.subscriber_qos = fastdds.SubscriberQos()
    self.participant.get_default_subscriber_qos(self.subscriber_qos)
    self.subscriber = self.participant.create_subscriber(self.subscriber_qos)

    self.listener = ReaderListener()
    self.reader_qos = fastdds.DataReaderQos()
    self.subscriber.get_default_datareader_qos(self.reader_qos)
    self.reader = self.subscriber.create_datareader(self.topic, self.reader_qos, self.listener)

The public member function run() ensures that the subscriber runs until the user presses Ctrl+C.

def run(self):
    signal.signal(signal.SIGINT, signal_handler)
    print('Press Ctrl+C to stop')
    signal.pause()
    self.delete()
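signal.pause() is only available on Unix, which matches the Linux environment assumed by this tutorial. Where that function is not available, one common alternative is to have the signal handler set a threading.Event and let the main thread wait on it in short intervals. The following is a sketch of that pattern, not part of the example sources; `run_until_interrupted`, `poll_interval` and `max_wait` are hypothetical names, and `max_wait` exists only so the function can be exercised without pressing Ctrl+C. It must be called from the main thread, as signal.signal() requires.

```python
import signal
import threading
import time

stop_requested = threading.Event()


def request_stop(sig, frame):
    print('Interrupted!')
    stop_requested.set()


def run_until_interrupted(cleanup, poll_interval=0.5, max_wait=None):
    """Block until SIGINT arrives (or max_wait seconds pass), then clean up."""
    signal.signal(signal.SIGINT, request_stop)
    deadline = None if max_wait is None else time.monotonic() + max_wait
    try:
        print('Press Ctrl+C to stop')
        # Waking up periodically gives the main thread a chance to run the
        # signal handler instead of staying in one long blocking wait.
        while not stop_requested.wait(poll_interval):
            if deadline is not None and time.monotonic() >= deadline:
                break
    finally:
        cleanup()
    return stop_requested.is_set()
```

In the Reader class, the body of run() could then be reduced to a call such as run_until_interrupted(self.delete).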

Finally, the Reader, which creates the participant that implements a subscriber, is initialized and run in main.

if __name__ == '__main__':
    print('Creating subscriber.')
    reader = Reader()
    reader.run()
    exit()

1.4.8. Putting it all together

Finally, from the workspace directory (which is also the build directory, since cmake was run in place), run the publisher and the subscriber applications, each in its own terminal.

python3 HelloWorldPublisher.py
python3 HelloWorldSubscriber.py
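If opening two terminals is inconvenient, both applications can be driven from one small launcher script. The sketch below is one possible approach and is not part of the example sources: the function name `run_pair` and its parameters are hypothetical. It starts the subscriber in the background, runs the publisher to completion, and then sends the subscriber the same SIGINT that Ctrl+C would produce. It assumes a POSIX system such as the Linux environment used in this tutorial.

```python
import signal
import subprocess
import sys


def run_pair(subscriber_cmd, publisher_cmd, stop_timeout=5.0):
    """Run the publisher while the subscriber runs in the background.

    Returns the (publisher, subscriber) exit codes.
    """
    subscriber = subprocess.Popen(subscriber_cmd)
    try:
        # Blocks until the publisher has sent its samples and exited.
        publisher_rc = subprocess.run(publisher_cmd).returncode
    finally:
        if subscriber.poll() is None:
            # Equivalent to pressing Ctrl+C in the subscriber's terminal.
            subscriber.send_signal(signal.SIGINT)
            try:
                subscriber.wait(timeout=stop_timeout)
            except subprocess.TimeoutExpired:
                subscriber.kill()
                subscriber.wait()
    return publisher_rc, subscriber.returncode


# Example call, to be run from the workspace directory:
# run_pair([sys.executable, "HelloWorldSubscriber.py"],
#          [sys.executable, "HelloWorldPublisher.py"])
```

Because the publisher waits for discovery before sending, starting the subscriber first does not cause samples to be lost in this setup.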

1.4.9. Summary

In this tutorial you have built a Python DDS application consisting of a publisher and a subscriber. You have also learned how to generate the Python module for your Topic data type from an IDL file.

1.4.10. Next steps

In the eProsima Fast DDS GitHub repository you will find more complex examples that implement DDS communication for a multitude of use cases and scenarios.