18.2.11.1. RTPSWriter

class RTPSWriter : public eprosima::fastdds::rtps::Endpoint, public eprosima::fastdds::statistics::StatisticsWriterImpl

Manages the sending of data to the readers. It is always associated with a HistoryCache.

Public Functions

template<typename T>
inline CacheChange_t *new_change(T &data, ChangeKind_t changeKind, InstanceHandle_t handle = c_InstanceHandle_Unknown)

Create a new change with the provided changeKind.

Parameters:
  • data – Data of the change.

  • changeKind – The type of change.

  • handle – InstanceHandle to assign.

Returns:

Pointer to the CacheChange or nullptr if incorrect.

bool release_change(CacheChange_t *change)

Release a change when it is not being used anymore.

Parameters:

change – Pointer to the cache change to be released.

Returns:

whether the operation succeeded or not

Pre:

  • change is not nullptr

  • change points to a cache change obtained from a call to this->new_change

Post:

memory pointed to by change is not accessed
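The contract described by new_change and release_change (obtain a change or nullptr, then hand the same pointer back exactly once) can be illustrated with a small stand-alone sketch. MockChange and MockChangePool are hypothetical stand-ins invented for this example; they are not Fast DDS types and do not reflect how RTPSWriter manages its changes internally.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for CacheChange_t; NOT the Fast DDS type.
struct MockChange
{
    int kind = 0;
    bool in_use = false;
};

// Hypothetical fixed-size pool that mimics the new_change / release_change contract.
class MockChangePool
{
public:

    explicit MockChangePool(std::size_t capacity)
        : slots_(capacity)
    {
    }

    // Returns a pointer to a free change, or nullptr when none is available.
    MockChange* new_change(int kind)
    {
        for (MockChange& slot : slots_)
        {
            if (!slot.in_use)
            {
                slot.in_use = true;
                slot.kind = kind;
                return &slot;
            }
        }
        return nullptr;
    }

    // Returns false when a precondition is violated: null pointer, pointer not
    // obtained from this pool, or change already released.
    bool release_change(MockChange* change)
    {
        if (change == nullptr || !owns(change) || !change->in_use)
        {
            return false;
        }
        change->in_use = false;
        return true;
    }

private:

    // Checks that the pointer refers to one of this pool's slots.
    bool owns(const MockChange* change) const
    {
        for (const MockChange& slot : slots_)
        {
            if (&slot == change)
            {
                return true;
            }
        }
        return false;
    }

    std::vector<MockChange> slots_;
};
```

A caller would check the pointer returned by new_change against nullptr before using it, and would stop dereferencing it once release_change has returned.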

virtual bool matched_reader_add(const ReaderProxyData &data) = 0

Add a matched reader.

Parameters:

data – Reference to the ReaderProxyData object of the reader to add.

Returns:

True if added.

virtual bool matched_reader_remove(const GUID_t &reader_guid) = 0

Remove a matched reader.

Parameters:

reader_guid – GUID of the reader to remove.

Returns:

True if removed.

virtual bool matched_reader_is_matched(const GUID_t &reader_guid) = 0

Tells us if a specific Reader is matched against this writer.

Parameters:

reader_guid – GUID of the reader to check.

Returns:

True if it was matched.

virtual void reader_data_filter(fastdds::rtps::IReaderDataFilter *filter) = 0

Set a content filter to perform content filtering on this writer.

This method sets a content filter that will be used to check whether a cache change is relevant for a reader or not.

Parameters:

filter – The content filter to use on this writer. May be nullptr to remove the content filter (i.e. treat all samples as relevant).
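The effect of setting or clearing the filter can be sketched as follows. All types here (MockChange, MockReaderId, MockReaderDataFilter, MockWriter) and the should_send helper are hypothetical and exist only for illustration; the sketch shows the documented rule that a null filter treats every sample as relevant, not the actual Fast DDS implementation.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-ins; NOT the Fast DDS CacheChange_t or GUID_t.
struct MockChange
{
    std::uint32_t payload_value;
};

struct MockReaderId
{
    std::uint32_t id;
};

// Hypothetical filter interface playing the role of a reader data filter.
class MockReaderDataFilter
{
public:

    virtual ~MockReaderDataFilter() = default;

    virtual bool is_relevant(
            const MockChange& change,
            const MockReaderId& reader) const = 0;
};

// Example filter: only changes carrying an even value are relevant.
class EvenValuesOnly : public MockReaderDataFilter
{
public:

    bool is_relevant(
            const MockChange& change,
            const MockReaderId& /*reader*/) const override
    {
        return change.payload_value % 2 == 0;
    }
};

// Hypothetical writer holding an optional, non-owning filter pointer.
class MockWriter
{
public:

    void reader_data_filter(MockReaderDataFilter* filter)
    {
        filter_ = filter;
    }

    const MockReaderDataFilter* reader_data_filter() const
    {
        return filter_;
    }

    // With no filter installed, every change is treated as relevant.
    bool should_send(
            const MockChange& change,
            const MockReaderId& reader) const
    {
        return filter_ == nullptr || filter_->is_relevant(change, reader);
    }

private:

    MockReaderDataFilter* filter_ = nullptr;
};
```

In this sketch the writer does not own the filter, so the caller keeps the filter object alive for as long as it is installed.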

virtual const fastdds::rtps::IReaderDataFilter *reader_data_filter() const = 0

Get the content filter used to perform content filtering on this writer.

Returns:

The content filter used on this writer.

inline virtual bool has_been_fully_delivered(const SequenceNumber_t &seq_num) const

Check if a specific change has been delivered to the transport layer of every matched remote RTPSReader at least once.

Parameters:

seq_num – Sequence number of the change to check.

Returns:

True if delivered, false otherwise.

inline virtual bool is_acked_by_all(const CacheChange_t*) const

Check if a specific change has been acknowledged by all readers. This is only useful for reliable writers. Best-effort (BE) writers return false while the change is pending to be sent.

Returns:

True if acknowledged by all.

inline virtual bool wait_for_all_acked(const Duration_t&)

Waits until all changes have been acknowledged or the maximum wait time has elapsed.

Returns:

True if all were acknowledged.

virtual void updateAttributes(const WriterAttributes &att) = 0

Update the Attributes of the Writer.

Parameters:

att – New attributes

SequenceNumber_t get_seq_num_min()

Get Min Seq Num in History.

Returns:

Minimum sequence number in history

SequenceNumber_t get_seq_num_max()

Get Max Seq Num in History.

Returns:

Maximum sequence number in history

uint32_t getTypeMaxSerialized()

Get maximum size of the serialized type

Returns:

Maximum size of the serialized type

uint32_t getMaxDataSize()

Get maximum size of the data.

uint32_t calculateMaxDataSize(uint32_t length)

Calculates the maximum size of the data.

inline WriterListener *getListener()

Get listener

Returns:

Listener

inline bool isAsync() const

Check whether the publication mode is asynchronous.

Returns:

True if the publication mode is asynchronous, false if it is synchronous.

bool remove_older_changes(unsigned int max = 0)

Remove up to a specified maximum number of changes.

Parameters:

max – Maximum number of changes to remove.

Returns:

True if at least one change has been removed.
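A minimal sketch of this kind of bounded removal is shown below, using a std::deque of sequence numbers as a hypothetical stand-in for the writer history. The interpretation of the default value max = 0 as "no limit" is an assumption made for this example; this reference does not state what 0 means.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>

// Hypothetical free function; the history is modeled as a deque of sequence
// numbers ordered from oldest (front) to newest (back).
inline bool remove_older_changes(
        std::deque<std::uint64_t>& history,
        unsigned int max = 0)
{
    // ASSUMPTION: max == 0 means "remove without limit".
    const bool limited = (max != 0);
    unsigned int removed = 0;

    while (!history.empty() && (!limited || removed < max))
    {
        history.pop_front();  // drop the oldest change
        ++removed;
    }

    // True if at least one change has been removed.
    return removed > 0;
}
```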

inline virtual bool get_disable_positive_acks() const

Returns whether the disable positive ACKs QoS is enabled.

Returns:

Best effort writers always return false. Reliable writers override this method.

virtual bool try_remove_change(const std::chrono::steady_clock::time_point &max_blocking_time_point, std::unique_lock<RecursiveTimedMutex> &lock) = 0

Tries to remove a change, waiting at most until the provided time point.

Parameters:
  • max_blocking_time_point – Time point at which to stop waiting.

  • lock – Lock of the Change list.

Returns:

True if at least one change has been removed.

virtual bool wait_for_acknowledgement(const SequenceNumber_t &seq, const std::chrono::steady_clock::time_point &max_blocking_time_point, std::unique_lock<RecursiveTimedMutex> &lock) = 0

Waits until a change has been acknowledged.

Parameters:
  • seq – Sequence number to wait for acknowledgement.

  • max_blocking_time_point – Time point at which to stop waiting.

  • lock – Lock of the Change list.

Returns:

true when change was acknowledged, false when timeout is reached.
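The combination of an absolute deadline and a caller-held lock can be sketched with standard library primitives. MockAckTracker is a hypothetical class written for this example, and std::recursive_timed_mutex is used here as an assumed stand-in for RecursiveTimedMutex; this is not the Fast DDS implementation.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Hypothetical tracker of the highest acknowledged sequence number.
class MockAckTracker
{
public:

    // Records that every sequence number up to and including seq is acknowledged
    // and wakes up any waiting thread.
    void acknowledge_up_to(std::uint64_t seq)
    {
        {
            std::lock_guard<std::recursive_timed_mutex> guard(mutex_);
            if (seq > highest_acked_)
            {
                highest_acked_ = seq;
            }
        }
        cv_.notify_all();
    }

    // The caller passes in a lock it already holds on mutex(). Returns true when
    // seq is acknowledged, false when the deadline is reached first.
    bool wait_for_acknowledgement(
            std::uint64_t seq,
            const std::chrono::steady_clock::time_point& max_blocking_time_point,
            std::unique_lock<std::recursive_timed_mutex>& lock)
    {
        return cv_.wait_until(lock, max_blocking_time_point,
                       [this, seq]()
                       {
                           return highest_acked_ >= seq;
                       });
    }

    std::recursive_timed_mutex& mutex()
    {
        return mutex_;
    }

private:

    std::recursive_timed_mutex mutex_;
    std::condition_variable_any cv_;
    std::uint64_t highest_acked_ = 0;
};
```

A caller would typically build the deadline once, for example as std::chrono::steady_clock::now() plus a relative timeout, and reuse it across successive blocking calls so that the total wait stays bounded.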

inline RTPSParticipantImpl *getRTPSParticipant() const

Get RTPS participant

Returns:

RTPS participant

inline void set_separate_sending(bool enable)

Enable or disable sending data to readers separately.

Note: This only works for synchronous writers.

Parameters:

enable – If separate sending should be enabled

inline bool get_separate_sending() const

Indicates whether data is sent to readers separately.

Returns:

true if separate sending is enabled

inline virtual bool process_acknack(const GUID_t &writer_guid, const GUID_t &reader_guid, uint32_t ack_count, const SequenceNumberSet_t &sn_set, bool final_flag, bool &result, fastdds::rtps::VendorId_t origin_vendor_id = c_VendorId_Unknown)

Process an incoming ACKNACK submessage.

Parameters:
  • writer_guid – [in] GUID of the writer the submessage is directed to.

  • reader_guid – [in] GUID of the reader originating the submessage.

  • ack_count – [in] Count field of the submessage.

  • sn_set – [in] Sequence number bitmap field of the submessage.

  • final_flag – [in] Final flag field of the submessage.

  • result – [out] True if the writer could process the submessage. Only valid when the returned value is true.

  • origin_vendor_id – [in] VendorId of the source participant from which the message was received.

Returns:

True when the submessage was addressed to this writer, false otherwise.
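How the sequence number bitmap of an ACKNACK is read can be sketched as follows. In RTPS, the set carries a base sequence number plus a bitmap: every sequence number below the base is acknowledged, and each set bit i marks base + i as requested by the reader. MockSequenceNumberSet, AckNackSummary and summarize_acknack are hypothetical names for this example and simplify the real SequenceNumberSet_t.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical simplified sequence number set; NOT SequenceNumberSet_t.
struct MockSequenceNumberSet
{
    std::uint64_t base;        // first sequence number not yet acknowledged
    std::vector<bool> bitmap;  // bitmap[i] == true means base + i is requested
};

// Hypothetical summary of what the reader reported.
struct AckNackSummary
{
    std::uint64_t acked_below;             // all sequence numbers < this are acknowledged
    std::vector<std::uint64_t> requested;  // sequence numbers the reader is missing
};

inline AckNackSummary summarize_acknack(
        const MockSequenceNumberSet& sn_set)
{
    AckNackSummary summary{sn_set.base, {}};

    for (std::size_t i = 0; i < sn_set.bitmap.size(); ++i)
    {
        if (sn_set.bitmap[i])
        {
            summary.requested.push_back(sn_set.base + i);
        }
    }
    return summary;
}
```

For a set with base 10 and bits {1, 0, 1}, the summary reports everything below 10 as acknowledged and sequence numbers 10 and 12 as requested.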

inline virtual bool process_nack_frag(const GUID_t &writer_guid, const GUID_t &reader_guid, uint32_t ack_count, const SequenceNumber_t &seq_num, const FragmentNumberSet_t fragments_state, bool &result, fastdds::rtps::VendorId_t origin_vendor_id = c_VendorId_Unknown)

Process an incoming NACKFRAG submessage.

Parameters:
  • writer_guid – [in] GUID of the writer the submessage is directed to.

  • reader_guid – [in] GUID of the reader originating the submessage.

  • ack_count – [in] Count field of the submessage.

  • seq_num – [in] Sequence number field of the submessage.

  • fragments_state – [in] Fragment number bitmap field of the submessage.

  • result – [out] True if the writer could process the submessage. Only valid when the returned value is true.

  • origin_vendor_id – [in] VendorId of the source participant from which the message was received.

Returns:

True when the submessage was addressed to this writer, false otherwise.

const dds::LivelinessQosPolicyKind &get_liveliness_kind() const

A method to retrieve the liveliness kind.

Returns:

Liveliness kind

const Duration_t &get_liveliness_lease_duration() const

A method to retrieve the liveliness lease duration.

Returns:

Lease duration

const Duration_t &get_liveliness_announcement_period() const

A method to return the liveliness announcement period.

Returns:

The announcement period

bool is_datasharing_compatible() const

Returns:

Whether the writer is data sharing compatible or not

virtual DeliveryRetCode deliver_sample_nts(CacheChange_t *cache_change, RTPSMessageGroup &group, LocatorSelectorSender &locator_selector, const std::chrono::time_point<std::chrono::steady_clock> &max_blocking_time) = 0

Tells writer the sample can be sent to the network.

This function should be used by a fastdds::rtps::FlowController.

Note

This method is not thread safe.

Parameters:
  • cache_change – Pointer to the CacheChange_t that represents the sample which can be sent.

  • group – RTPSMessageGroup reference used for generating the RTPS message.

  • locator_selector – RTPSMessageSenderInterface reference used for selecting locators. The reference has to be a member of this RTPSWriter object.

  • max_blocking_time – Future timepoint where blocking send should end.

Returns:

Return code.

virtual bool send_nts(const std::vector<eprosima::fastdds::rtps::NetworkBuffer> &buffers, const uint32_t &total_bytes, const LocatorSelectorSender &locator_selector, std::chrono::steady_clock::time_point &max_blocking_time_point) const

Send a message through this interface.

Parameters:
  • buffers – Vector of NetworkBuffers to send with data already serialized.

  • total_bytes – Total number of bytes to send. Should be equal to the sum of the size field of all buffers.

  • locator_selector – RTPSMessageSenderInterface reference used for selecting locators. The reference has to be a member of this RTPSWriter object.

  • max_blocking_time_point – Future timepoint where blocking send should end.
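The relationship between buffers and total_bytes can be illustrated with a short sketch that computes the expected total from a list of buffers. MockNetworkBuffer and total_size are hypothetical names for this example; the struct only models a pointer plus a size and is not the Fast DDS NetworkBuffer type.

```cpp
#include <cassert>
#include <cstdint>
#include <numeric>
#include <vector>

// Hypothetical stand-in for a network buffer: a non-owning view over
// already-serialized bytes.
struct MockNetworkBuffer
{
    const void* buffer;
    std::uint32_t size;
};

// Sums the size field of every buffer; this is the value a caller would be
// expected to pass as total_bytes.
inline std::uint32_t total_size(
        const std::vector<MockNetworkBuffer>& buffers)
{
    return std::accumulate(buffers.begin(), buffers.end(), std::uint32_t{0},
                   [](std::uint32_t sum, const MockNetworkBuffer& b)
                   {
                       return sum + b.size;
                   });
}
```

Computing the total from the buffers themselves, rather than tracking it separately, is one way to keep the two arguments consistent.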

Public Members

LivelinessLostStatus liveliness_lost_status_

Liveliness lost status of this writer.