rosbridge_library.protocol module
- class rosbridge_library.protocol.Protocol(client_id: str, node_handle: Node)
Bases: object
The interface for a single client to interact with ROS.
See rosbridge_protocol for the default protocol used by rosbridge.
The lifecycle of a Protocol instance is as follows: (1) pass incoming messages from the client to incoming; (2) propagate outgoing messages to the client by overriding outgoing; (3) call finish to clean up resources when the client is finished.
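The lifecycle above can be sketched as follows. To keep the example runnable without a ROS installation, `StandInProtocol` is a hypothetical stand-in that only mirrors the documented method names (`incoming`, `outgoing`, `finish`). It is not the real `Protocol` class, and the echo behaviour in `incoming` is invented purely for illustration. With rosbridge installed, the subclass would presumably derive from `rosbridge_library.protocol.Protocol` and pass a node handle as well.

```python
class StandInProtocol:
    """Hypothetical stand-in mirroring the documented Protocol method names."""

    def __init__(self, client_id: str) -> None:
        self.client_id = client_id
        self.finished = False

    def incoming(self, message_string: str = "") -> None:
        # Illustration only: echo the message straight back through outgoing().
        self.outgoing(message_string)

    def outgoing(self, message: str, compression: str = "none") -> None:
        raise NotImplementedError("subclasses override outgoing()")

    def finish(self) -> None:
        # The real class releases its resources here; the stand-in sets a flag.
        self.finished = True


class CollectingClient(StandInProtocol):
    """Transport-side subclass: overrides outgoing() to deliver messages."""

    def __init__(self, client_id: str) -> None:
        super().__init__(client_id)
        self.sent: list[str] = []

    def outgoing(self, message: str, compression: str = "none") -> None:
        # A real transport would write to a socket here.
        self.sent.append(message)


def run_session(messages: list[str]) -> CollectingClient:
    """Feed messages to incoming(), then always call finish()."""
    client = CollectingClient("client-0")
    try:
        for raw in messages:
            client.incoming(raw)
    finally:
        client.finish()
    return client
```

Calling `finish` in a `finally` block is one way to make sure cleanup runs even if handling a message raises.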
- add_capability(capability_class: type[Capability]) → None
Add a capability to the protocol.
This is a convenience method; it assumes the capability class uses the default capability constructor.
- Parameters:
capability_class – The class of the capability to add
- bson_only_mode = False
- buffer = ''
- busy = False
- delay_between_messages = 0
- deserialize(msg: str | bytes | bytearray, cid: str | None = None) → dict[str, Any]
Turn the wire-level representation into a dictionary of values.
Default behaviour assumes JSON. Override to use a different container.
- Parameters:
msg – The wire-level message to deserialize
cid – (optional) An ID associated with this message. It is logged on error.
- Returns:
a dictionary of values
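A minimal sketch of the JSON default described above, written as a standalone function rather than the library's own method. The name `deserialize_json` is hypothetical, and returning `None` on malformed input is an assumption: the source only states that the cid is logged on error.

```python
import json
from typing import Any, Optional, Union


def deserialize_json(msg: Union[str, bytes, bytearray]) -> Optional[dict[str, Any]]:
    """Decode a wire-level JSON message into a dict.

    Returns None (an assumption, not documented behaviour) when the input
    is not valid JSON or does not decode to a JSON object.
    """
    try:
        # json.loads accepts str, bytes, and bytearray.
        value = json.loads(msg)
    except ValueError:
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        return None
    return value if isinstance(value, dict) else None
```

An override that uses a different container would keep the same shape: take the wire-level bytes or string, and return a dictionary of values.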
- external_action_list: dict[str, AdvertisedActionHandler]
- external_service_list: dict[str, AdvertisedServiceHandler]
- finish() → None
Indicate that the client is finished and clean up resources.
All clients should call this method after disconnecting.
- fragment_size = None
- incoming(message_string: str = '') → None
Process an incoming message from the client.
- Parameters:
message_string – The wire-level message sent by the client
- log(level: str, message: str, lid: str | None = None) → None
Log a message to the client.
By default just sends the message to the node logger.
- Parameters:
level – The logger level of this message
message – The string message to send to the user
lid – An associated id for this log message
- old_buffer = ''
- outgoing(message: bson.BSON | bytearray | str, compression: str = 'none') → None
Pass an outgoing message to the client.
This method should be overridden.
- Parameters:
message – The wire-level message to send to the client
- parameters = None
- png = None
- register_operation(opcode: str, handler: Callable[[dict[str, Any]], None]) → None
Register a handler for an opcode.
- Parameters:
opcode – The opcode to register this handler for
handler – A callback function to call for messages with this opcode
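The opcode registration described above can be pictured as a lookup table from opcode to callback. The sketch below is a hypothetical stand-in (`OpcodeRegistry` and its `dispatch` method are not part of the documented API). It assumes each message carries its opcode under an `op` key, following the rosbridge protocol convention; this section itself does not say so.

```python
from typing import Any, Callable

Handler = Callable[[dict[str, Any]], None]


class OpcodeRegistry:
    """Hypothetical stand-in for an opcode-to-handler table."""

    def __init__(self) -> None:
        self._handlers: dict[str, Handler] = {}

    def register_operation(self, opcode: str, handler: Handler) -> None:
        self._handlers[opcode] = handler

    def unregister_operation(self, opcode: str) -> None:
        # Removing an unknown opcode is treated as a no-op (an assumption).
        self._handlers.pop(opcode, None)

    def dispatch(self, message: dict[str, Any]) -> bool:
        """Call the handler for message["op"]; return False if none is registered."""
        handler = self._handlers.get(message.get("op"))
        if handler is None:
            return False
        handler(message)
        return True
```

A handler receives the whole message dictionary, which matches the documented `Callable[[dict[str, Any]], None]` signature.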
- send(message: dict[str, Any] | bytes, cid: str | None = None, compression: str = 'none') → None
Prepare a message for sending to the client.
Called internally in preparation for sending messages to the client.
This method pre-processes the message then passes it to the overridden outgoing method.
- Parameters:
message – A dict of message values to be marshalled and sent
cid – (optional) An associated id
- serialize(msg: bytearray | bson.BSON | dict[str, Any], cid: str | None = None) → bson.BSON | bytearray | str | None
Turn a dictionary of values into the appropriate wire-level representation.
Default behaviour uses JSON. Override to use a different container.
- Parameters:
msg – The dictionary of values to serialize
cid – (optional) An ID associated with this message. It is logged on error.
- Returns:
a JSON string representing the dictionary
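A minimal sketch of the JSON default for the dictionary case, again as a standalone function with a hypothetical name (`serialize_json`). The documented return type includes `None`; treating that as the result for values that cannot be encoded is an assumption here, and the BSON and bytearray inputs from the signature are left out.

```python
import json
from typing import Any, Optional


def serialize_json(msg: dict[str, Any]) -> Optional[str]:
    """Encode a dictionary of values as a JSON string.

    Returns None (an assumption) when the dictionary contains values that
    the json module cannot encode, such as raw bytes.
    """
    try:
        return json.dumps(msg)
    except (TypeError, ValueError):
        # TypeError: unsupported value type; ValueError: circular reference.
        return None
```

Raw `bytes` values are a typical reason for JSON encoding to fail, which is where a binary container such as BSON becomes relevant.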
- unregister_operation(opcode: str) → None
Unregister a handler for an opcode.
- Parameters:
opcode – The opcode to unregister the handler for
- rosbridge_library.protocol.has_binary(obj: object) → bool
Return True if obj is a binary or contains a binary attribute.
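One way to read the description above is as a recursive search through nested containers. The sketch below uses a hypothetical name (`contains_binary`) and treats only the built-in `bytes` and `bytearray` types as binary; the library function may recognise other types, such as BSON binary values, so this is an illustration rather than its implementation.

```python
def contains_binary(obj: object) -> bool:
    """Return True if obj is binary data or holds binary data at any depth."""
    if isinstance(obj, (bytes, bytearray)):
        return True
    if isinstance(obj, dict):
        # Only values are inspected; keys are assumed to be strings.
        return any(contains_binary(value) for value in obj.values())
    if isinstance(obj, (list, tuple)):
        return any(contains_binary(item) for item in obj)
    return False
```

A check like this can decide whether a message is safe to encode as JSON or needs a binary-capable container.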
- rosbridge_library.protocol.is_number(s: object) → bool | None