Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 31 additions & 29 deletions crazyflie_py/crazyflie_py/crazyflie.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
from crazyflie_interfaces.srv import Takeoff, Land, GoTo, UploadTrajectory, StartTrajectory, NotifySetpointsStop
from crazyflie_interfaces.msg import TrajectoryPolynomialPiece, FullState, Position, VelocityWorld
from tf2_msgs.msg import TFMessage
from uav_trajectory import Trajectory
from typing import Sequence, Mapping, Any

def arrayToGeometryPoint(a):
def arrayToGeometryPoint(a: Sequence[float]) -> Point:
result = Point()
result.x = a[0]
result.y = a[1]
Expand All @@ -46,25 +48,25 @@ class TimeHelper:
simulation scripts. Maintains the property that scripts should not
know/care if they are running in simulation or not.
"""
def __init__(self, node):
def __init__(self, node: rclpy.node.Node):
self.node = node
# self.rosRate = None
self.rateHz = None
self.nextTime = None
# self.visualizer = visNull.VisNull()

def time(self):
def time(self) -> float:
"""Returns the current time in seconds."""
return self.node.get_clock().now().nanoseconds / 1e9

def sleep(self, duration):
def sleep(self, duration: float):
"""Sleeps for the provided duration in seconds."""
start = self.time()
end = start + duration
while self.time() < end:
rclpy.spin_once(self.node, timeout_sec=0)

def sleepForRate(self, rateHz):
def sleepForRate(self, rateHz: float):
"""Sleeps so that, if called in a loop, executes at specified rate."""
# Note: The following ROS 2 construct cannot easily be used, because in ROS 2
# there is no implicit threading anymore. Thus, the rosRate.sleep() call
Expand All @@ -80,7 +82,7 @@ def sleepForRate(self, rateHz):
rclpy.spin_once(self.node, timeout_sec=0)
self.nextTime += 1.0 / rateHz

def isShutdown(self):
def isShutdown(self) -> bool:
"""Returns true if the script should abort, e.g. from Ctrl-C."""
return not rclpy.ok()

Expand All @@ -91,7 +93,7 @@ class Crazyflie:
The bulk of the module's functionality is contained in this class.
"""

def __init__(self, node, cfname, paramTypeDict, index=0, log_pose=False):
def __init__(self, node: rclpy.node.Node, cfname: str, paramTypeDict: Mapping[str, ParameterType], index: int = 0, log_pose: bool = False):
"""Constructor.

Args:
Expand Down Expand Up @@ -237,7 +239,7 @@ def __init__(self, node, cfname, paramTypeDict, index=0, log_pose=False):
# """Disables onboard collision avoidance."""
# self.setParam("colAv/enable", 0)

def pose_cb(self, msg):
def pose_cb(self, msg: TFMessage):
"""Update the server's understanding of the poses for each crazyflie
This is utilized when a real-time understanding of the cf positions
and/or rotations is required. The result will update the internal
Expand All @@ -259,7 +261,7 @@ def pose_cb(self, msg):
if self.log_pose:
self.poses.append(copy.copy(self.pose))

def position(self):
def position(self) -> np.typing.NDArray[float]:
return np.vstack([pose[0] for pose in self.pose.values()])[self.index]

def emergency(self):
Expand All @@ -277,7 +279,7 @@ def emergency(self):
req = Empty.Request()
self.emergencyService.call_async(req)

def takeoff(self, targetHeight, duration, groupMask = 0):
def takeoff(self, targetHeight: float, duration: float, groupMask: int = 0):
"""Execute a takeoff - fly straight up, then hover indefinitely.

Asynchronous command; returns immediately.
Expand All @@ -293,7 +295,7 @@ def takeoff(self, targetHeight, duration, groupMask = 0):
req.duration = rclpy.duration.Duration(seconds=duration).to_msg()
self.takeoffService.call_async(req)

def land(self, targetHeight, duration, groupMask = 0):
def land(self, targetHeight: float, duration: float, groupMask: int = 0):
"""Execute a landing - fly straight down. User must cut power after.

Asynchronous command; returns immediately.
Expand Down Expand Up @@ -324,7 +326,7 @@ def land(self, targetHeight, duration, groupMask = 0):
# """
# self.stopService(groupMask)

def goTo(self, goal, yaw, duration, relative = False, groupMask = 0):
def goTo(self, goal: Sequence[float], yaw: float, duration: float, relative: bool = False, groupMask: int = 0):
"""Move smoothly to the goal, then hover indefinitely.

Asynchronous command; returns immediately.
Expand All @@ -348,7 +350,7 @@ def goTo(self, goal, yaw, duration, relative = False, groupMask = 0):
controller to become unstable.

Args:
goal (iterable of 3 floats): The goal position. Meters.
goal (sequence of 3 floats): The goal position. Meters.
yaw (float): The goal yaw angle (heading). Radians.
duration (float): How long until the goal is reached. Seconds.
relative (bool): If true, the goal position is interpreted as a
Expand All @@ -366,7 +368,7 @@ def goTo(self, goal, yaw, duration, relative = False, groupMask = 0):
req.duration = rclpy.duration.Duration(seconds=duration).to_msg()
self.goToService.call_async(req)

def uploadTrajectory(self, trajectoryId, pieceOffset, trajectory):
def uploadTrajectory(self, trajectoryId: int, pieceOffset: int, trajectory: Trajectory):
"""Uploads a piecewise polynomial trajectory for later execution.

See uav_trajectory.py for more information about piecewise polynomial
Expand All @@ -393,7 +395,7 @@ def uploadTrajectory(self, trajectoryId, pieceOffset, trajectory):
req.pieces = pieces
self.uploadTrajectoryService.call_async(req)

def startTrajectory(self, trajectoryId, timescale = 1.0, reverse = False, relative = True, groupMask = 0):
def startTrajectory(self, trajectoryId: int, timescale: float = 1.0, reverse: bool = False, relative: bool = True, groupMask: int = 0):
"""Begins executing a previously uploaded trajectory.

Asynchronous command; returns immediately.
Expand All @@ -417,7 +419,7 @@ def startTrajectory(self, trajectoryId, timescale = 1.0, reverse = False, relati
req.relative = relative
self.startTrajectoryService.call_async(req)

def notifySetpointsStop(self, remainValidMillisecs=100, groupMask=0):
def notifySetpointsStop(self, remainValidMillisecs:float=100, groupMask:int=0):
"""Informs that streaming low-level setpoint packets are about to stop.

Streaming setpoints are :meth:`cmdVelocityWorld`, :meth:`cmdFullState`,
Expand Down Expand Up @@ -484,7 +486,7 @@ def notifySetpointsStop(self, remainValidMillisecs=100, groupMask=0):
# """
# return rospy.get_param(self.prefix + "/" + name)

def setParam(self, name, value):
def setParam(self, name: str, value: Any):
"""Changes the value of the given parameter.

See :meth:`getParam()` docs for overview of the parameter system.
Expand Down Expand Up @@ -515,7 +517,7 @@ def setParam(self, name, value):
# rospy.set_param(self.prefix + "/" + name, value)
# self.updateParamsService(params.keys())

def cmdFullState(self, pos, vel, acc, yaw, omega):
def cmdFullState(self, pos: Sequence[float], vel: Sequence[float], acc: Sequence[float], yaw: float, omega: float):
"""Sends a streaming full-state controller setpoint command.

The full-state setpoint is most useful for aggressive maneuvers where
Expand Down Expand Up @@ -558,7 +560,7 @@ def cmdFullState(self, pos, vel, acc, yaw, omega):
self.cmdFullStateMsg.twist.angular.z = omega[2]
self.cmdFullStatePublisher.publish(self.cmdFullStateMsg)

def cmdVelocityWorld(self, vel, yawRate):
def cmdVelocityWorld(self, vel: Sequence[float], yawRate: float):
"""Sends a streaming velocity-world controller setpoint command.

In this mode, the PC specifies desired velocity vector and yaw rate.
Expand Down Expand Up @@ -631,7 +633,7 @@ def cmdVelocityWorld(self, vel, yawRate):
# msg.linear.z = thrust
# self.cmdVelPublisher.publish(msg)

def cmdPosition(self, pos, yaw = 0.):
def cmdPosition(self, pos: Sequence[float], yaw: float = 0.):
"""Sends a streaming command of absolute position and yaw setpoint.

Useful for slow maneuvers where a high-level planner determines the
Expand All @@ -650,7 +652,7 @@ def cmdPosition(self, pos, yaw = 0.):
self.cmdPositionMsg.yaw = yaw
self.cmdPositionPublisher.publish(self.cmdPositionMsg)

def setLEDColor(self, r, g, b):
def setLEDColor(self, r: float, g: float, b: float):
"""Sets the color of the LED ring deck.

While most params (such as PID gains) only need to be set once, it is
Expand Down Expand Up @@ -690,7 +692,7 @@ class CrazyflieServer(rclpy.node.Node):
crazyfliesById (Dict[int, Crazyflie]): Index to the same Crazyflie
objects by their ID number (last byte of radio address).
"""
def __init__(self, log_poses=False):
def __init__(self, log_poses: bool = False):
"""Initialize the server. Waits for all ROS services before returning.
"""
super().__init__("CrazyflieAPI")
Expand Down Expand Up @@ -791,7 +793,7 @@ def emergency(self):
req = Empty.Request()
self.emergencyService.call_async(req)

def takeoff(self, targetHeight, duration, groupMask = 0):
def takeoff(self, targetHeight: float, duration: float, groupMask: int = 0):
"""Broadcasted takeoff - fly straight up, then hover indefinitely.

Broadcast version of :meth:`Crazyflie.takeoff()`. All robots that match the
Expand All @@ -809,7 +811,7 @@ def takeoff(self, targetHeight, duration, groupMask = 0):
req.duration = rclpy.duration.Duration(seconds=duration).to_msg()
self.takeoffService.call_async(req)

def land(self, targetHeight, duration, groupMask = 0):
def land(self, targetHeight: float, duration: float, groupMask: int = 0):
"""Broadcasted landing - fly straight down. User must cut power after.

Broadcast version of :meth:`Crazyflie.land()`. All robots that match the
Expand All @@ -830,7 +832,7 @@ def land(self, targetHeight, duration, groupMask = 0):
req.duration = rclpy.duration.Duration(seconds=duration).to_msg()
self.landService.call_async(req)

def goTo(self, goal, yaw, duration, groupMask = 0):
def goTo(self, goal: Sequence[float], yaw: float, duration: float, groupMask: int = 0):
"""Broadcasted goTo - Move smoothly to goal, then hover indefinitely.

Broadcast version of :meth:`Crazyflie.goTo()`. All robots that match the
Expand All @@ -845,7 +847,7 @@ def goTo(self, goal, yaw, duration, groupMask = 0):
See docstring of :meth:`Crazyflie.goTo()` for additional details.

Args:
goal (iterable of 3 floats): The goal offset. Meters.
goal (sequence of 3 floats): The goal offset. Meters.
yaw (float): The goal yaw angle (heading). Radians.
duration (float): How long until the goal is reached. Seconds.
groupMask (int): Group mask bits. See :meth:`Crazyflie.setGroupMask()` doc.
Expand All @@ -859,7 +861,7 @@ def goTo(self, goal, yaw, duration, groupMask = 0):
req.duration = rclpy.duration.Duration(seconds=duration).to_msg()
self.goToService.call_async(req)

def startTrajectory(self, trajectoryId, timescale = 1.0, reverse = False, relative = True, groupMask = 0):
def startTrajectory(self, trajectoryId: int, timescale: float = 1.0, reverse: bool = False, relative: bool = True, groupMask: int = 0):
"""Broadcasted - begins executing a previously uploaded trajectory.

Broadcast version of :meth:`Crazyflie.startTrajectory()`.
Expand All @@ -883,7 +885,7 @@ def startTrajectory(self, trajectoryId, timescale = 1.0, reverse = False, relati
req.relative = relative
self.startTrajectoryService.call_async(req)

def setParam(self, name, value):
def setParam(self, name: str, value: ParameterType):
"""Broadcasted setParam. See Crazyflie.setParam() for details."""
param_name = "all.params." + name
param_type = self.paramTypeDict[name]
Expand All @@ -895,7 +897,7 @@ def setParam(self, name, value):
req.parameters = [Parameter(name=param_name, value=param_value)]
self.setParamsService.call_async(req)

def cmdFullState(self, pos, vel, acc, yaw, omega):
def cmdFullState(self, pos: Sequence[float], vel: Sequence[float], acc: Sequence[float], yaw: float, omega: float):
"""Sends a streaming full-state controller setpoint command.

The full-state setpoint is most useful for aggressive maneuvers where
Expand Down