Skip to content

Commit ea64d78

Browse files
tcappellari-bdaijbarry-bdai
authored andcommitted
Merge branch 'main' into migration-readme
2 parents ba68e47 + 97fb7b6 commit ea64d78

8 files changed

Lines changed: 347 additions & 28 deletions

File tree

CONTRIBUTING.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ All types of contributions are encouraged and valued. See the [Table of Contents
2323

2424
## I Have a Question
2525

26-
Before you ask a question, it is best to search for existing [issues](https://github.com/bdaiinstitute/synchros2/issues) that might help you. In case you have found a suitable issue and still need clarification, you can write your question in this issue. It is also advisable to search the internet for answers first.
26+
Before you ask a question, it is best to search for existing [issues](https://github.com/rai-opensource/synchros2/issues) that might help you. In case you have found a suitable issue and still need clarification, you can write your question in this issue. It is also advisable to search the internet for answers first.
2727

2828
If you then still feel the need to ask a question and need clarification, we recommend the following:
2929

@@ -47,7 +47,7 @@ A good bug report shouldn't leave others needing to chase you up for more inform
4747

4848
- Make sure that you are using the latest version.
4949
- Determine if your bug is really a bug and not an error on your side. If you are looking for support, you might want to check [this section](#i-have-a-question)).
50-
- To see if other users have experienced (and potentially already solved) the same issue you are having, check if there is not already a bug report existing for your bug or error in the [bug tracker](https://github.com/bdaiinstitute/synchros2/issues?q=label%3Abug).
50+
- To see if other users have experienced (and potentially already solved) the same issue you are having, check if there is not already a bug report existing for your bug or error in the [bug tracker](https://github.com/rai-opensource/synchros2/issues?q=label%3Abug).
5151
- Collect information about the bug:
5252
- Stack trace (Traceback)
5353
- OS, ROS, Platform and Version (Windows, Linux, macOS, x86, ARM)
@@ -60,7 +60,7 @@ A good bug report shouldn't leave others needing to chase you up for more inform
6060

6161
We use GitHub issues to track bugs and errors. If you run into an issue with the project:
6262

63-
- Open an [issue](https://github.com/bdaiinstitute/synchros2/issues/new).
63+
- Open an [issue](https://github.com/rai-opensource/synchros2/issues/new).
6464
- Explain the behavior you would expect and the actual behavior.
6565
- Please provide as much context as possible and describe the *reproduction steps* that someone else can follow to recreate the issue on their own. This usually includes your code. For good bug reports you should isolate the problem and create a reduced test case.
6666
- Provide the information you collected in the previous section.
@@ -78,13 +78,13 @@ Once it's filed:
7878

7979
- Make sure that you are using the latest version.
8080
- Read the documentation carefully and ensure the functionality is indeed missing.
81-
- Perform a [search](https://github.com/bdaiinstitute/synchros2/issues) to see if the feature has already been requested. If it has, add a comment to the existing issue instead of opening a new one.
81+
- Perform a [search](https://github.com/rai-opensource/synchros2/issues) to see if the feature has already been requested. If it has, add a comment to the existing issue instead of opening a new one.
8282
- Find out whether your idea fits with the scope and aims of the project. It's up to you to make a strong case to convince the project's developers of the merits of this feature. Keep in mind that we want features that will be useful to the majority of our users and not just a small subset. If you're just targeting a minority of users, consider writing an add-on/plugin library.
8383

8484
<!-- omit in toc -->
8585
#### How Do I Submit a Good Feature Request?
8686

87-
Feature requests are tracked as [GitHub issues](https://github.com/bdaiinstitute/synchros2/issues).
87+
Feature requests are tracked as [GitHub issues](https://github.com/rai-opensource/synchros2/issues).
8888

8989
- Use a **clear and descriptive title** for the issue to identify the suggestion.
9090
- Provide a **step-by-step description of the suggested enhancement** in as many details as possible.

synchros2/docs/concepts/message_feeds.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ Any message filter can become a feed, allowing:
1212

1313
Like message filters, most message feeds can be chained. This is true for all but those that externally source messages, ROS 2 topic subscriptions being the prime example. These are sources only. Other message feeds built into `synchros2` offer a vehicle for generic map-filter-reduce patterns, time synchronization across multiple message feeds, and synchronized `tf` lookups.
1414

15-
**Note:** While any message filter can become a feed, standard ROS 2 message filters are usually not thread-safe. See [`synchros2.filters`](https://github.com/bdaiinstitute/synchros2/tree/main/synchros2/synchros2/filters.py) for thread-safe (re)implementations.
15+
**Note:** While any message filter can become a feed, standard ROS 2 message filters are usually not thread-safe. See [`synchros2.filters`](https://github.com/rai-opensource/synchros2/tree/main/synchros2/synchros2/filters.py) for thread-safe (re)implementations.
1616

1717
## Looping over topic messages
1818

synchros2/docs/getting_started/installation.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ sudo apt install ros-$ROS_DISTRO-synchros2
2828
```bash
2929
mkdir -p path/to/workspace/src # for a new workspace
3030
cd path/to/workspace/src
31-
git clone https://github.com/bdaiinstitute/synchros2.git
31+
git clone https://github.com/rai-opensource/synchros2.git
3232
```
3333

3434
2. Install `synchros2` dependencies with `rosdep`:

synchros2/synchros2/executors.py

Lines changed: 55 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import rclpy.callback_groups
1818
import rclpy.executors
1919
import rclpy.node
20+
import rclpy.timer
2021

2122
from synchros2.futures import FutureLike
2223
from synchros2.utilities import bind_to_thread, fqn
@@ -592,6 +593,7 @@ def __init__(
592593
max_thread_idle_time: typing.Optional[float] = None,
593594
max_threads_per_callback_group: typing.Optional[int] = None,
594595
*,
596+
num_threads_for_timers: typing.Optional[int] = None,
595597
context: typing.Optional[rclpy.context.Context] = None,
596598
logger: typing.Optional[logging.Logger] = None,
597599
) -> None:
@@ -607,24 +609,41 @@ def __init__(
607609
max_threads_per_callback_group: optional maximum number of concurrent callbacks the
608610
default thread pool should service for a given callback group. Useful to avoid
609611
reentrant callback groups from starving the default thread pool.
612+
num_threads_for_timers: optional number of threads to dedicate to timer callbacks.
613+
Defaults to 10% of all available threads, which may be 0 if there are less than
614+
10 threads, in which case timer callbacks will be serviced by the default thread pool.
610615
context: An optional instance of the ros context.
611616
logger: An optional logger instance.
612617
"""
613618
super().__init__(context=context)
614619
if logger is None:
615620
logger = rclpy.logging.get_logger(fqn(self.__class__))
621+
if max_threads is None:
622+
max_threads = 32 * (os.cpu_count() or 1)
623+
if num_threads_for_timers is None:
624+
num_threads_for_timers = max_threads // 10
625+
if num_threads_for_timers == 0:
626+
logger.warning("Not enough threads available, timers will be serviced by the default thread pool")
627+
max_threads -= num_threads_for_timers
616628
self._logger = logger
617629
self._is_shutdown = False
618630
self._spin_lock = threading.Lock()
619631
self._shutdown_lock = threading.RLock()
620-
self._thread_pools = [
621-
AutoScalingThreadPool(
622-
max_workers=max_threads,
623-
max_idle_time=max_thread_idle_time,
632+
self._default_thread_pool = AutoScalingThreadPool(
633+
max_workers=max_threads,
634+
max_idle_time=max_thread_idle_time,
635+
submission_quota=max_threads_per_callback_group,
636+
logger=self._logger,
637+
)
638+
self._timers_thread_pool: typing.Optional[AutoScalingThreadPool] = None
639+
if num_threads_for_timers != 0:
640+
self._timers_thread_pool = AutoScalingThreadPool(
641+
min_workers=num_threads_for_timers,
642+
max_workers=num_threads_for_timers,
624643
submission_quota=max_threads_per_callback_group,
625644
logger=self._logger,
626-
),
627-
]
645+
)
646+
self._static_thread_pools: typing.List[AutoScalingThreadPool] = []
628647
self._callback_group_affinity: weakref.WeakKeyDictionary[
629648
rclpy.callback_groups.CallbackGroup,
630649
AutoScalingThreadPool,
@@ -637,12 +656,21 @@ def __init__(
637656
@property
638657
def default_thread_pool(self) -> AutoScalingThreadPool:
639658
"""Default autoscaling thread pool."""
640-
return self._thread_pools[0]
659+
return self._default_thread_pool
660+
661+
@property
662+
def timers_thread_pool(self) -> typing.Optional[AutoScalingThreadPool]:
663+
"""Autoscaling thread pool for timer callbacks."""
664+
return self._timers_thread_pool
641665

642666
@property
643667
def thread_pools(self) -> typing.List[AutoScalingThreadPool]:
644668
"""Autoscaling thread pools in use."""
645-
return list(self._thread_pools)
669+
thread_pools = [self._default_thread_pool]
670+
if self._timers_thread_pool is not None:
671+
thread_pools.append(self._timers_thread_pool)
672+
thread_pools.extend(self._static_thread_pools)
673+
return thread_pools
646674

647675
def add_static_thread_pool(self, num_threads: typing.Optional[int] = None) -> AutoScalingThreadPool:
648676
"""Add a thread pool that keeps a steady number of workers."""
@@ -653,8 +681,8 @@ def add_static_thread_pool(self, num_threads: typing.Optional[int] = None) -> Au
653681
max_workers=num_threads,
654682
logger=self._logger,
655683
)
656-
self._thread_pools.append(thread_pool)
657-
self._logger.debug(f"Added static thread pool #{len(self._thread_pools) - 1}")
684+
self._static_thread_pools.append(thread_pool)
685+
self._logger.debug(f"Added static thread pool #{len(self._static_thread_pools) - 1}")
658686
return thread_pool
659687

660688
def bind(self, callback_group: rclpy.callback_groups.CallbackGroup, thread_pool: AutoScalingThreadPool) -> None:
@@ -663,9 +691,13 @@ def bind(self, callback_group: rclpy.callback_groups.CallbackGroup, thread_pool:
663691
Thread pool must be known to the executor. That is, instantiated through add_*_thread_pool() methods.
664692
"""
665693
with self._shutdown_lock:
666-
if thread_pool not in self._thread_pools:
694+
if thread_pool not in self._static_thread_pools:
695+
if thread_pool is self._default_thread_pool:
696+
raise ValueError("cannot rebind to default thread pool")
697+
if thread_pool is self._timers_thread_pool:
698+
raise ValueError("cannot bind to timers thread pool")
667699
raise ValueError("thread pool unknown to executor")
668-
thread_pool_index = self._thread_pools.index(thread_pool)
700+
thread_pool_index = self._static_thread_pools.index(thread_pool)
669701
callback_group_name = f"{fqn(type(callback_group))}@{id(callback_group)}"
670702
self._logger.debug(f"Binding {callback_group_name} to thread pool #{thread_pool_index}...")
671703
self._callback_group_affinity[callback_group] = thread_pool
@@ -698,14 +730,16 @@ def _do_spin_once(self, *args: typing.Any, **kwargs: typing.Any) -> None:
698730
# dispatch and be missed. Fortunately, this will only delay dispatch until the
699731
# next spin cycle.
700732
if task not in self._work_in_progress or (self._work_in_progress[task].done() and not task.done()):
701-
if task.callback_group is not None:
702-
if task.callback_group not in self._callback_group_affinity:
703-
self._callback_group_affinity[task.callback_group] = self._thread_pools[0]
733+
if task.callback_group is not None and task.callback_group in self._callback_group_affinity:
704734
thread_pool = self._callback_group_affinity[task.callback_group]
735+
thread_pool_index = self._static_thread_pools.index(thread_pool)
736+
self._logger.debug(f"Task '{task}' submitted to static thread pool #{thread_pool_index}")
737+
elif self._timers_thread_pool is not None and isinstance(task.entity, rclpy.timer.Timer):
738+
thread_pool = self._timers_thread_pool
739+
self._logger.debug(f"Task '{task}' submitted to timers thread pool")
705740
else:
706-
thread_pool = self._thread_pools[0]
707-
thread_pool_index = self._thread_pools.index(thread_pool)
708-
self._logger.debug(f"Task '{task}' submitted to thread pool #{thread_pool_index}")
741+
thread_pool = self._default_thread_pool
742+
self._logger.debug(f"Task '{task}' submitted to default thread pool")
709743
self._work_in_progress[task] = thread_pool.submit(task)
710744
for task in list(self._work_in_progress):
711745
if not task.done():
@@ -781,10 +815,11 @@ def shutdown(self, timeout_sec: typing.Optional[float] = None) -> bool:
781815
# must be waited on. Work tracking in rclpy.executors.Executor
782816
# base implementation is subject to races, so block thread pool
783817
# submissions and wait for all futures to finish. Then shutdown.
784-
done = all(thread_pool.wait(timeout_sec) for thread_pool in self._thread_pools)
818+
819+
done = all(thread_pool.wait(timeout_sec) for thread_pool in self.thread_pools)
785820
if done:
786821
assert super().shutdown(timeout_sec=0)
787-
for thread_pool in self._thread_pools:
822+
for thread_pool in self.thread_pools:
788823
thread_pool.shutdown()
789824
self._is_shutdown = True
790825
if done:

synchros2/synchros2/node.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,21 @@
33
import functools
44
from typing import Any, Callable, Iterable, Optional, Type
55

6+
try:
7+
from typing import override # type: ignore[attr-defined]
8+
except ImportError:
9+
override = lambda func: func # noqa
10+
611
from rclpy.callback_groups import CallbackGroup
12+
from rclpy.clock import Clock
713
from rclpy.exceptions import InvalidHandle
814
from rclpy.node import Node as BaseNode
15+
from rclpy.timer import Rate
916
from rclpy.waitable import Waitable
1017

1118
from synchros2.callback_groups import NonReentrantCallbackGroup
1219
from synchros2.logging import MemoizingRcutilsLogger
20+
from synchros2.time import SteadyRate
1321

1422

1523
def suppressed(exception: Type[BaseException], func: Callable) -> Callable:
@@ -55,6 +63,32 @@ def default_callback_group(self) -> CallbackGroup:
5563
# NOTE(hidmic): this overrides the hardcoded default group in rclpy.node.Node implementation
5664
return self._default_callback_group_override
5765

66+
@override
67+
def create_rate(
68+
self,
69+
frequency: float,
70+
clock: Optional[Clock] = None,
71+
) -> Rate:
72+
"""Create a Rate object.
73+
74+
:param frequency: The frequency the Rate runs at (Hz).
75+
:param clock: The clock the Rate gets time from.
76+
"""
77+
if clock is None:
78+
clock = self.get_clock()
79+
return SteadyRate(frequency, clock, context=self._context)
80+
81+
@override
82+
def destroy_rate(self, rate: Rate) -> bool:
83+
"""Destroy a Rate object created by the node.
84+
85+
:return: ``True`` if successful, ``False`` otherwise.
86+
"""
87+
if isinstance(rate, SteadyRate):
88+
rate.destroy()
89+
return True
90+
return super().destroy_rate(rate)
91+
5892
@property
5993
def waitables(self) -> Iterable[Waitable]:
6094
"""Get patched node waitables.

synchros2/synchros2/time.py

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,20 @@
11
# Copyright (c) 2024 Robotics and AI Institute LLC dba RAI Institute. All rights reserved.
22

3+
import threading
34
from datetime import datetime, timedelta
4-
from typing import Union
5+
from typing import Optional, Union
56

7+
try:
8+
from typing import override # type: ignore[attr-defined]
9+
except ImportError:
10+
override = lambda func: func # noqa
11+
12+
from rclpy.context import Context
613
from rclpy.duration import Duration
14+
from rclpy.exceptions import ROSInterruptException
715
from rclpy.time import Time
16+
from rclpy.timer import Rate
17+
from rclpy.utilities import get_default_context
818

919

1020
def as_proper_time(time: Union[int, float, datetime, Time]) -> Time:
@@ -57,3 +67,66 @@ def as_proper_duration(duration: Union[int, float, timedelta, Duration]) -> Dura
5767
if not isinstance(duration, Duration):
5868
raise ValueError(f"unsupported duration type: {duration}")
5969
return duration
70+
71+
72+
class SteadyRate(Rate):
73+
"""An rclpy.Rate equivalent that uses clock functionality directly, without timer overhead."""
74+
75+
def __init__(self, frequency: float, clock: Time, *, context: Optional[Context] = None) -> None:
76+
# NOTE: SteadyRate subclasses Rate for type consistency but does not use any of its functionality.
77+
# Thus, we skip the constructor call entirely.
78+
self._clock = clock
79+
if context is None:
80+
context = get_default_context()
81+
self._context = context
82+
self._period = as_proper_duration(1.0 / frequency)
83+
self._deadline = self._clock.now() + self._period
84+
85+
self._lock = threading.Lock()
86+
self._num_sleepers = 0
87+
88+
self._is_shutdown = False
89+
self._is_destroyed = False
90+
self._context.on_shutdown(self._on_shutdown)
91+
92+
@override
93+
def _on_shutdown(self) -> None:
94+
self._is_shutdown = True
95+
self.destroy()
96+
97+
@override
98+
def destroy(self) -> None:
99+
"""Destroy the rate."""
100+
self._is_destroyed = True
101+
102+
@override
103+
def _presleep(self) -> None:
104+
if self._is_shutdown:
105+
raise ROSInterruptException()
106+
if self._is_destroyed:
107+
raise RuntimeError("MonotonicRate cannot sleep because it has been destroyed")
108+
with self._lock:
109+
self._num_sleepers += 1
110+
111+
@override
112+
def _postsleep(self) -> None:
113+
with self._lock:
114+
self._num_sleepers -= 1
115+
if self._num_sleepers == 0:
116+
now = self._clock.now()
117+
next_deadline = self._deadline + self._period
118+
if now < self._deadline or now > next_deadline:
119+
next_deadline = now + self._period
120+
self._deadline = next_deadline
121+
if self._is_shutdown:
122+
self.destroy()
123+
raise ROSInterruptException()
124+
125+
@override
126+
def sleep(self) -> None:
127+
"""Block until the current period is over."""
128+
self._presleep()
129+
try:
130+
self._clock.sleep_until(self._deadline, context=self._context)
131+
finally:
132+
self._postsleep()

0 commit comments

Comments
 (0)