Migrating from rclpy to synchros2
The synchros2 programming model is a superset of rclpy’s. Code written for rclpy, which requires everything to be asynchronous, can be used with synchros2 directly, while simultaneously enabling synchronous patterns. To do so, just:
Use
@synchros2.process.main()for your application entrypoint.Make sure to spin your node subclasses explicitly.
This allows you to migrate incrementally: you can keep your existing node subclasses, and still get access to a process-wide background executor for synchronous programming if desired.
We also recommend you subclass synchros2.node.Node instead of rclpy.node.Node. By default it is better behaved in multi-threaded settings.
Example
Before (on rclpy)
import rclpy
from rclpy.node import Node
class MyNode(Node):
def __init__(self) -> None:
super().__init__('my_node')
# ... setup publishers, timers, etc ...
def main() -> None:
rclpy.init()
node = MyNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == '__main__':
main()
After (on synchros2)
import synchros2.process as ros_process
from synchros2.node import Node
class MyNode(Node):
def __init__(self) -> None:
super().__init__('my_node')
# ... setup publishers, timers, etc ...
@ros_process.main(prebaked=False)
def main() -> None:
ros_process.spin(MyNode)
if __name__ == '__main__':
main()
And if single-threaded execution is (still) a must to ensure equivalent behavior:
from rclpy.executors import SingleThreadedExecutor
import synchros2.process as ros_process
from synchros2.executors import foreground
from synchros2.node import Node
class MyNode(Node):
def __init__(self) -> None:
super().__init__('my_node')
# ... setup publishers, timers, etc ...
@ros_process.main(prebaked=False)
def main() -> None:
with foreground(SingleThreadedExecutor()) as main.executor:
main.spin(MyNode)
if __name__ == '__main__':
main()