rosbridge_library.protocol module

class rosbridge_library.protocol.Protocol(client_id: str, node_handle: Node)

Bases: object

The interface for a single client to interact with ROS.

See rosbridge_protocol for the default protocol used by rosbridge.

The lifecycle for a Protocol instance is as follows:
  • Pass incoming messages from the client to incoming

  • Propagate outgoing messages to the client by overriding outgoing

  • Call finish to clean up resources when the client is finished
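The lifecycle can be sketched without a ROS installation by using a stand-in class. In the example below, SketchProtocol and ListTransportProtocol are hypothetical names that only mirror the documented method names (incoming, outgoing, finish); the echo behaviour is an assumption made for illustration and is not what the real Protocol does with a message.

```python
import json
from typing import Any


class SketchProtocol:
    """Hypothetical stand-in that mirrors the documented method names.

    This is NOT rosbridge_library.protocol.Protocol. It only imitates the
    incoming -> outgoing -> finish lifecycle so the sketch runs without ROS.
    """

    def __init__(self, client_id: str) -> None:
        self.client_id = client_id
        self.finished = False

    def incoming(self, message_string: str = "") -> None:
        # Assumption: echo the decoded message back to the client.
        msg: Any = json.loads(message_string)
        self.outgoing(json.dumps({"op": "echo", "received": msg}))

    def outgoing(self, message: str, compression: str = "none") -> None:
        # Subclasses are expected to deliver the message to their transport.
        raise NotImplementedError("override outgoing in a subclass")

    def finish(self) -> None:
        # Stand-in for releasing resources held on behalf of the client.
        self.finished = True


class ListTransportProtocol(SketchProtocol):
    """Collects outgoing messages in a list instead of writing to a socket."""

    def __init__(self, client_id: str) -> None:
        super().__init__(client_id)
        self.sent = []

    def outgoing(self, message: str, compression: str = "none") -> None:
        self.sent.append(message)


protocol = ListTransportProtocol("client-1")
try:
    # Step 1: pass the client's wire-level message to incoming.
    protocol.incoming('{"op": "ping"}')
finally:
    # Step 3: always call finish once the client is done.
    protocol.finish()
```

Wrapping the message handling in try/finally is one way to make sure finish runs even when a message raises.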

add_capability(capability_class: type[Capability]) None

Add a capability to the protocol.

This is a convenience method; it assumes the capability class uses the default capability constructor.

Parameters:

capability_class – The class of the capability to add

bson_only_mode = False
buffer = ''
busy = False
delay_between_messages = 0
deserialize(msg: str | bytes | bytearray, cid: str | None = None) dict[str, Any]

Turn the wire-level representation into a dictionary of values.

Default behaviour assumes JSON. Override to use a different container.

Parameters:
  • msg – The wire-level message to deserialize

  • cid – (optional) An ID associated with this message. It is logged on error

Returns:

a dictionary of values
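As a rough illustration of the default JSON behaviour described above, the following standalone function decodes a wire-level message with the standard json module. The name deserialize_json and the choice to raise ValueError on bad input are assumptions for this sketch; the library's own error handling may differ (the documentation only says the cid is logged on error).

```python
import json
from typing import Any, Optional, Union


def deserialize_json(
    msg: Union[str, bytes, bytearray], cid: Optional[str] = None
) -> dict[str, Any]:
    """Decode a JSON wire message into a dict (hypothetical helper)."""
    try:
        # json.loads accepts str, bytes and bytearray input.
        value = json.loads(msg)
    except ValueError as exc:
        # Include the caller-supplied ID so the failure can be traced.
        raise ValueError(f"[id: {cid}] could not deserialize message: {exc}") from exc
    if not isinstance(value, dict):
        raise ValueError(f"[id: {cid}] expected a JSON object, got {type(value).__name__}")
    return value


decoded = deserialize_json(b'{"op": "subscribe", "topic": "/chatter"}', cid="req-1")
```

A subclass that overrides deserialize to use a different container would replace the json.loads call with the matching decoder.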

external_action_list: dict[str, AdvertisedActionHandler]
external_service_list: dict[str, AdvertisedServiceHandler]
finish() None

Indicate that the client is finished and clean up resources.

All clients should call this method after disconnecting.

fragment_size = None
incoming(message_string: str = '') None

Process an incoming message from the client.

Parameters:

message_string – The wire-level message sent by the client

log(level: str, message: str, lid: str | None = None) None

Log a message to the client.

By default, this just sends the message to the node logger.

Parameters:
  • level – The logger level of this message

  • message – The string message to send to the user

  • lid – An associated id for this log message

old_buffer = ''
outgoing(message: bson.BSON | bytearray | str, compression: str = 'none') None

Pass an outgoing message to the client.

This method should be overridden.

Parameters:

message – The wire-level message to send to the client

parameters = None
png = None
register_operation(opcode: str, handler: Callable[[dict[str, Any]], None]) None

Register a handler for an opcode.

Parameters:
  • opcode – The opcode to register this handler for

  • handler – A callback function to call for messages with this opcode
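The opcode-to-handler mapping can be pictured as a small dispatch table. The sketch below is a standalone illustration: OperationRegistry and its dispatch method are hypothetical names, and looking up the opcode under the "op" key is an assumption. It is not the library's internal data structure.

```python
from typing import Any, Callable

Handler = Callable[[dict[str, Any]], None]


class OperationRegistry:
    """Hypothetical opcode table mirroring register/unregister_operation."""

    def __init__(self) -> None:
        self._operations: dict[str, Handler] = {}

    def register_operation(self, opcode: str, handler: Handler) -> None:
        # A later registration for the same opcode replaces the earlier one.
        self._operations[opcode] = handler

    def unregister_operation(self, opcode: str) -> None:
        # Removing an unknown opcode is treated as a no-op in this sketch.
        self._operations.pop(opcode, None)

    def dispatch(self, message: dict[str, Any]) -> bool:
        """Call the handler for message["op"]; return False if none exists."""
        handler = self._operations.get(message.get("op", ""))
        if handler is None:
            return False
        handler(message)
        return True


seen: list[str] = []
registry = OperationRegistry()
registry.register_operation("publish", lambda m: seen.append(m["topic"]))

handled = registry.dispatch({"op": "publish", "topic": "/chatter"})
registry.unregister_operation("publish")
handled_after_removal = registry.dispatch({"op": "publish", "topic": "/chatter"})
```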

send(message: dict[str, Any] | bytes, cid: str | None = None, compression: str = 'none') None

Prepare a message for sending to the client.

Called internally in preparation for sending messages to the client.

This method pre-processes the message then passes it to the overridden outgoing method.

Parameters:
  • message – A dict of message values to be marshalled and sent

  • cid – (optional) An associated id

serialize(msg: bytearray | bson.BSON | dict[str, Any], cid: str | None = None) bson.BSON | bytearray | str | None

Turn a dictionary of values into the appropriate wire-level representation.

Default behaviour uses JSON. Override to use a different container.

Parameters:
  • msg – The dictionary of values to serialize

  • cid – (optional) An ID associated with this message. It is logged on error

Returns:

a JSON string representing the dictionary
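The default JSON behaviour can be approximated with the standard json module, as in the standalone sketch below. The name serialize_json is hypothetical, and returning None when the dictionary cannot be encoded is an assumption suggested by the None in the documented return type; the library may handle failures differently.

```python
import json
from typing import Any, Optional


def serialize_json(msg: dict[str, Any], cid: Optional[str] = None) -> Optional[str]:
    """Encode a dict as a JSON string, or return None on failure (sketch)."""
    try:
        return json.dumps(msg)
    except (TypeError, ValueError):
        # Assumption: signal failure with None; a real implementation
        # would also log the problem together with cid.
        return None


wire = serialize_json({"op": "publish", "msg": {"data": 1}})
failed = serialize_json({"data": b"\x00"})  # bytes are not JSON-serializable
```

A subclass that overrides serialize to use a different container would swap json.dumps for the matching encoder.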

unregister_operation(opcode: str) None

Unregister a handler for an opcode.

Parameters:

opcode – The opcode to unregister the handler for

rosbridge_library.protocol.has_binary(obj: object) bool

Return True if obj is a binary or contains a binary attribute.
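One plausible reading of this check is a recursive walk over nested containers. The function below is a sketch under that assumption: contains_binary is a hypothetical name, and it treats only bytes and bytearray as binary, whereas the library may also recognise BSON binary types.

```python
def contains_binary(obj: object) -> bool:
    """Return True if obj is binary or holds binary data at any depth."""
    if isinstance(obj, (bytes, bytearray)):
        return True
    if isinstance(obj, dict):
        # Only values are inspected; keys are assumed to be strings.
        return any(contains_binary(value) for value in obj.values())
    if isinstance(obj, (list, tuple)):
        return any(contains_binary(item) for item in obj)
    return False


nested = {"op": "publish", "msg": {"data": [1, 2, b"\x01\x02"]}}
plain = {"op": "publish", "msg": {"data": [1, 2, 3]}}
nested_result = contains_binary(nested)
plain_result = contains_binary(plain)
```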

rosbridge_library.protocol.is_number(s: object) bool | None