Executors and Callback Groups
synchros2 uses and extends the same abstractions and patterns that rclpy offers to manage execution. Namely, executors for task and callback dispatch and callback groups for concurrency control. However, executors and callback groups provided by rclpy do not generally afford blocking calls. synchros2 adds implementations of its own to better support these use cases.
Executors
Executors in ROS 2 are components that manage how nodes’ callbacks and tasks are scheduled and run. They handle the coordination of message processing, timers, and service requests by dispatching these events to one or more threads. Executors allow developers to control concurrency and execution flow.
rclpy.executors.SingleThreadedExecutor
The simplest executor. Tasks and callbacks are dispatched and run by the same thread that is spinning it.
import rclpy
from rclpy.executors import SingleThreadedExecutor
def main():
rclpy.init()
node = rclpy.create_node("my_node")
executor = SingleThreadedExecutor()
executor.add_node(node)
try:
executor.spin() # blocking!
except KeyboardInterrupt:
pass
finally:
executor.shutdown()
executor.remove_node(node)
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()
Concurrency is therefore impossible. A blocking call within a callback will block the entire executor, and if it relies on other callbacks to unblock it will invariably deadlock it.
import rclpy
from std_srvs.srv import Trigger
from rclpy.executors import SingleThreadedExecutor
def poll(client):
print("About to deadlock...")
client.call(Trigger.Request()) # deadlock!
def trigger_callback(request, response):
response.success = True
return response
def main():
rclpy.init()
node = rclpy.create_node("my_node")
executor = SingleThreadedExecutor()
node.create_service(Trigger, "trigger", trigger_callback)
client = node.create_client(Trigger, "trigger")
node.create_timer(1.0, lambda: poll(client))
executor.add_node(node)
try:
executor.spin() # blocking!
except KeyboardInterrupt:
pass
finally:
executor.shutdown()
executor.remove_node(node)
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()
On the flip side, lack of context switch makes the single threaded executor a good choice when time keeping is important (e.g. a node with a timer that needs to fire at a high rate).
import time
import rclpy
from rclpy.executors import SingleThreadedExecutor
def main():
rclpy.init()
node = rclpy.create_node("my_node")
executor = SingleThreadedExecutor()
node.create_timer(0.1, lambda: print(time.time()))
executor.add_node(node)
try:
executor.spin() # blocking!
except KeyboardInterrupt:
pass
finally:
executor.shutdown()
executor.remove_node(node)
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()
rclpy.executors.MultiThreadedExecutor
Simple threaded executor for concurrent execution. Tasks and callbacks are dispatched by the thread that is spinning it but run on a fixed size thread pool.
import time
import rclpy
from rclpy.executors import MultiThreadedExecutor
def timer_callback():
print(time.time())
time.sleep(0.5)
def main():
rclpy.init()
node = rclpy.create_node("my_node")
node.create_timer(1.0, timer_callback)
node.create_timer(1.0, timer_callback)
executor = MultiThreadedExecutor(2)
executor.add_node(node)
try:
executor.spin() # blocking!
except KeyboardInterrupt:
pass
finally:
executor.shutdown()
executor.remove_node(node)
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()
Blocking calls are possible but its thread pool size must be chosen wisely. If there are fewer threads than standing blocking calls at any point in time, dispatch delays and deadlocks are back on the table.
synchros2.executors.AutoScalingMultiThreadedExecutor
Akin to the rclpy.executors.MultiThreadedExecutor but managing thread pools that scale up and down on demand. Tasks and callbacks are dispatched by the thread that is spinning it but run on a thread pool that adds workers to match the demand.
import time
import rclpy
from synchros2.executors import AutoScalingMultiThreadedExecutor
def callback(i):
print(f"[{time.time()}] Ahoy! (#{i})")
def main():
rclpy.init()
executor = AutoScalingMultiThreadedExecutor()
for i in range(20):
executor.create_task(callback, i)
try:
executor.spin() # blocking!
except KeyboardInterrupt:
pass
finally:
executor.shutdown()
rclpy.try_shutdown()
if __name__ == "__main__":
main()
If the number of workers hits a configurable upper bound (max_threads), work is queued. If a worker is idle for too long (max_thread_idle_time), and the number of workers has not yet hit a configurable lower bound (0), it will be stopped. A concurrency quota can be configured for callback groups to avoid any given callback group from starving the rest (max_threads_per_callback_group).
By default, this executor manages a single thread pool, but more may be added. Callback groups can then be assigned to specific thread pools to further control callback dispatch and execution.
import time
import threading
import rclpy
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
from synchros2.executors import AutoScalingMultiThreadedExecutor
def callback():
current_thread = threading.current_thread()
print(f"Ahoy! from {current_thread.name}")
def timer_callback(executor):
current_thread = threading.current_thread()
print(f"Dispatch from {current_thread.name} at time {time.time()}")
for i in range(10):
executor.create_task(callback)
def main():
rclpy.init()
node = rclpy.create_node("my_node")
executor = AutoScalingMultiThreadedExecutor()
thread_pool = executor.add_static_thread_pool(1)
custom_callback_group = MutuallyExclusiveCallbackGroup()
executor.bind(custom_callback_group, thread_pool)
node.create_timer(0.1, lambda: timer_callback(executor), custom_callback_group)
executor.add_node(node)
try:
executor.spin() # blocking!
except KeyboardInterrupt:
pass
finally:
executor.shutdown()
executor.remove_node(node)
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()
Callback groups
Callback groups in ROS 2 are mechanisms that control how callbacks (functions triggered by events like messages or timers) are scheduled and executed within a node. They let you specify concurrency rules, such as whether callbacks can run simultaneously or must run one at a time. There is little value to them in single threaded execution scenarios but they are crucial when using multi-threaded executors. By assigning callbacks to different groups, you can fine-tune how your system handles incoming events and tasks.
rclpy.callback_groups.MutuallyExclusiveCallbackGroup
Tasks and callbacks may only run one at a time when in this callback group.
This is the default callback group for rclpy.node.Node instances.
rclpy.callback_groups.ReentrantCallbackGroup
Tasks and callbacks may run concurrently when in this callback group, including multiple instances of the same callback e.g. when a topic subscription message arrives before it is done processing the previous one.
synchros2.callback_groups.NonReentrantCallbackGroup
Tasks and callbacks may run concurrently when in this callback group, but there may only be one instance of a given callback running at any given time e.g. topic subscription messages will always be processed sequentially.
This is the default callback group for synchros2.node.Node instances.
Utilities
To simplify executor management, synchros2 offers a few additional utilities.
If you need to spin in foreground (i.e. block on the executor), you may:
from rclpy.executors import SingleThreadedExecutor
from synchros2.executors import foreground
with foreground(SingleThreadedExecutor()) as executor:
executor.spin()
If you need to spin in the background (i.e. keep the executor running while blocking on something else), you may:
from rclpy.executors import SingleThreadedExecutor
from synchros2.executors import background
with background(SingleThreadedExecutor()) as executor:
# do work, no need to spin
# executor.spin() would raise
Either way, synchros2 context managers will take care of the executor lifecycle for you.