Class MessageQueue

Nested Relationships

Nested Types

Inheritance Relationships

Base Type

  • public std::enable_shared_from_this< MessageQueue >

Derived Type

Class Documentation

class MessageQueue : public std::enable_shared_from_this<MessageQueue>

Thread-safe queue used to send messages between nodes

Subclassed by dai::Node::Input

Public Types

using CallbackId = int

Alias for callback id.

Public Functions

explicit MessageQueue(unsigned int maxSize = 16, bool blocking = true)
explicit MessageQueue(std::string name, unsigned int maxSize = 16, bool blocking = true)
inline MessageQueue(const MessageQueue &c)
inline MessageQueue(MessageQueue &&m) noexcept
inline MessageQueue &operator=(const MessageQueue &c)
inline MessageQueue &operator=(MessageQueue &&m) noexcept
virtual ~MessageQueue()
std::string getName() const

Get name of the queue.

void setName(std::string name)

Set the name of the queue

bool isClosed() const

Check whether queue is closed

void close()

Closes the queue and unblocks any waiting consumers or producers

void setBlocking(bool blocking)

Sets queue behavior when full (maxSize)

Parameters:

blocking – Specifies whether to block or to overwrite the oldest message in the queue when it is full

bool getBlocking() const

Gets current queue behavior when full (maxSize)

Returns:

True if blocking, false otherwise

void setMaxSize(unsigned int maxSize)

Sets queue maximum size

Note

If maxSize is smaller than the current size, the queue will not be truncated immediately, but only after messages are popped

Parameters:

maxSize – Specifies maximum number of messages in the queue

unsigned int getMaxSize() const

Gets queue maximum size

Returns:

Maximum queue size

unsigned int getSize() const

Gets queue current size

Returns:

Current queue size

unsigned int isFull() const

Gets whether queue is full

Returns:

True if queue is full, false otherwise

CallbackId addCallback(std::function<void(std::string, std::shared_ptr<ADatatype>)>)

Adds a callback on message received

Parameters:

callback – Callback function with queue name and message pointer

Returns:

Callback id

CallbackId addCallback(const std::function<void(std::shared_ptr<ADatatype>)>&)

Adds a callback on message received

Parameters:

callback – Callback function with message pointer

Returns:

Callback id

CallbackId addCallback(const std::function<void()> &callback)

Adds a callback on message received

Parameters:

callback – Callback function without any parameters

Returns:

Callback id

bool removeCallback(CallbackId callbackId)

Removes a callback

Parameters:

callbackId – Id of callback to be removed

Returns:

True if callback was removed, false otherwise

template<class T>
inline bool has()

Check whether front of the queue has message of type T

Returns:

True if queue isn’t empty and the first element is of type T, false otherwise

inline bool has()

Check whether front of the queue has a message (isn’t empty)

Returns:

True if queue isn’t empty, false otherwise

template<class T>
inline std::shared_ptr<T> tryGet()

Try to retrieve message T from queue. If message isn’t of type T it returns nullptr

Returns:

Message of type T or nullptr if no message available

inline std::shared_ptr<ADatatype> tryGet()

Try to retrieve message from queue. If no message available, return immediately with nullptr

Returns:

Message or nullptr if no message available

template<class T>
inline std::shared_ptr<T> get()

Block until a message is available.

Returns:

Message of type T or nullptr if no message available

inline std::shared_ptr<ADatatype> get()

Block until a message is available.

Returns:

Message or nullptr if no message available

template<class T>
inline std::shared_ptr<T> front()

Gets first message in the queue.

Returns:

Message of type T or nullptr if no message available

inline std::shared_ptr<ADatatype> front()

Gets first message in the queue.

Returns:

Message or nullptr if no message available

template<class T, typename Rep, typename Period>
inline std::shared_ptr<T> get(std::chrono::duration<Rep, Period> timeout, bool &hasTimedout)

Block until a message is available with a timeout.

Parameters:
  • timeout – Duration for which the function should block

  • hasTimedout – [out] Outputs true if timeout occurred, false otherwise

Returns:

Message of type T, or nullptr if the message isn’t of type T or a timeout occurred

template<typename Rep, typename Period>
inline std::shared_ptr<ADatatype> get(std::chrono::duration<Rep, Period> timeout, bool &hasTimedout)

Block until a message is available with a timeout.

Parameters:
  • timeout – Duration for which the function should block

  • hasTimedout – [out] Outputs true if timeout occurred, false otherwise

Returns:

Message, or nullptr if a timeout occurred

template<class T>
inline std::vector<std::shared_ptr<T>> tryGetAll()

Try to retrieve all messages in the queue.

Returns:

Vector of messages which can either be of type T or nullptr

inline std::vector<std::shared_ptr<ADatatype>> tryGetAll()

Try to retrieve all messages in the queue.

Returns:

Vector of messages

template<class T>
inline std::vector<std::shared_ptr<T>> getAll()

Block until at least one message is in the queue. Then return all messages from the queue.

Returns:

Vector of messages which can either be of type T or nullptr

inline std::vector<std::shared_ptr<ADatatype>> getAll()

Block until at least one message is in the queue. Then return all messages from the queue.

Returns:

Vector of messages

template<class T, typename Rep, typename Period>
inline std::vector<std::shared_ptr<T>> getAll(std::chrono::duration<Rep, Period> timeout, bool &hasTimedout)

Block for maximum timeout duration. Then return all messages from the queue.

Parameters:
  • timeout – Maximum duration to block

  • hasTimedout – [out] Outputs true if timeout occurred, false otherwise

Returns:

Vector of messages which can either be of type T or nullptr

template<typename Rep, typename Period>
inline std::vector<std::shared_ptr<ADatatype>> getAll(std::chrono::duration<Rep, Period> timeout, bool &hasTimedout)

Block for maximum timeout duration. Then return all messages from the queue.

Parameters:
  • timeout – Maximum duration to block

  • hasTimedout – [out] Outputs true if timeout occurred, false otherwise

Returns:

Vector of messages

void send(const std::shared_ptr<ADatatype> &msg)

Adds a message to the queue, which will be picked up and sent to the device. Either blocks if ‘blocking’ behavior is true, or overwrites the oldest message

Parameters:

msg – Message to add to the queue

bool send(const std::shared_ptr<ADatatype> &msg, std::chrono::milliseconds timeout)

Adds a message to the queue, which will be picked up and sent to the device. Either blocks until the timeout expires if ‘blocking’ behavior is true, or overwrites the oldest message

Parameters:
  • msg – Message to add to the queue

  • timeout – Maximum duration to block in milliseconds

bool send(const ADatatype &msg, std::chrono::milliseconds timeout)

Adds a message to the queue, which will be picked up and sent to the device. Either blocks until the timeout expires if ‘blocking’ behavior is true, or overwrites the oldest message

Parameters:
  • msg – Message to add to the queue

  • timeout – Maximum duration to block in milliseconds

bool trySend(const std::shared_ptr<ADatatype> &msg)

Tries sending a message

Parameters:

msg – Message to send

Public Members

std::mutex callbacksMtx
std::unordered_map<CallbackId, std::function<void(std::string, std::shared_ptr<ADatatype>)>> callbacks
CallbackId uniqueCallbackId = {0}
class QueueException : public std::runtime_error

Public Functions

inline explicit QueueException(const std::string &message)
~QueueException() noexcept override