Testing Infrastructure
The spot_wrapper.testing submodule offers pytest-compatible machinery to test Spot SDK usage.
At its core, this machinery is nothing but mocks and fixtures: mocks of Spot gRPC services, and fixtures serving those mocks over the wire. This affords complete control over a well-defined interface that spans the entire feature set (effectively emulating a Spot robot) at the expense of additional complexity.
A word of advice. The complexity of spot_wrapper.testing is not always warranted. A single Spot SDK API can involve
multiple gRPC service invocations, and Spot SDK documentation is seldom transparent about this, so an apparent unit test
may in fact be an integration test. For cases in which this is problematic, other mocking frameworks and utilities, such as
unittest.mock or pytest-mock in Python, or gmock in C++, can be used instead of (or in addition to)
spot_wrapper.testing to mock Spot SDK APIs directly, in-process.
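As a minimal sketch of the in-process alternative, the snippet below patches a client method with unittest.mock, so no gRPC server or channel is involved. RobotIdClient and describe_robot are hypothetical stand-ins invented for illustration; they are not Spot SDK or spot_wrapper APIs.

```python
from unittest import mock


class RobotIdClient:
    """Hypothetical stand-in for an SDK client; not a real Spot SDK class."""

    def get_serial_number(self) -> str:
        # A real client would issue one or more gRPC requests here.
        raise ConnectionError("no robot available")


def describe_robot(client: RobotIdClient) -> str:
    """Code under test: formats whatever the client reports."""
    return f"spot-{client.get_serial_number()}"


def test_describe_robot() -> None:
    client = RobotIdClient()
    # Replace the method in-process for the duration of the with block.
    with mock.patch.object(client, "get_serial_number", return_value="1234567890") as patched:
        assert describe_robot(client) == "spot-1234567890"
    # The mock keeps its call record after the patch is undone.
    patched.assert_called_once_with()


test_describe_robot()
```

A test like this exercises only the code under test, which is what keeps it a unit test. The trade-off is that it says nothing about the gRPC exchanges the real API would trigger.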
Mocking with spot_wrapper.testing means mocking gRPC services specific to Spot robots. In practice,
this amounts to subclassing gRPC servicer classes and providing
an implementation for each service stub. Aside from implementation-specific details (typically a dummy implementation
bearing minimal logic, if any), this is no different from any other gRPC service implementation, and thus the same
semantics and guidelines apply. For instance, mocks must be served by a gRPC
server too.
```python
import concurrent.futures

import grpc

from bosdyn.api.robot_id_pb2 import RobotIdRequest, RobotIdResponse
from bosdyn.api.robot_id_service_pb2_grpc import RobotIdServiceServicer
from bosdyn.api.robot_id_service_pb2_grpc import add_RobotIdServiceServicer_to_server


class MockRobotIdService(RobotIdServiceServicer):
    """A mock Spot robot id service."""

    def GetRobotId(self, request: RobotIdRequest, context: grpc.ServicerContext) -> RobotIdResponse:
        response = RobotIdResponse()
        response.robot_id.serial_number = "1234567890"
        return response


def main() -> None:
    server = grpc.server(concurrent.futures.ThreadPoolExecutor(max_workers=10))
    add_RobotIdServiceServicer_to_server(MockRobotIdService(), server)
    server.add_insecure_port("[::]:50051")
    server.start()
    server.wait_for_termination()


if __name__ == "__main__":
    main()
```
A complete Spot robot can be emulated in this way. It is an arduous path, however. Spot robots are complex machines featuring
hundreds of services. To alleviate that burden, spot_wrapper.testing already provides enough built-in mocks to cover
spot_wrapper.SpotWrapper initialization. This includes mocks for emergency stop services, leasing services, keep-alive
services, time synchronization services, and more. Each mock is designed as a standalone component, and multiple mocks
can be aggregated via multiple inheritance, like building blocks of larger, more comprehensive mocks. These mocks can
naturally coexist with user-defined ones.
```python
import concurrent.futures

import grpc

from bosdyn.api.time_sync_pb2 import (
    TimeSyncState,
    TimeSyncUpdateRequest,
    TimeSyncUpdateResponse,
)
from bosdyn.api.time_sync_service_pb2_grpc import (
    TimeSyncServiceServicer,
    add_TimeSyncServiceServicer_to_server,
)
from bosdyn.api.robot_id_service_pb2_grpc import add_RobotIdServiceServicer_to_server
from bosdyn.api.robot_state_service_pb2_grpc import add_RobotStateServiceServicer_to_server

from spot_wrapper.testing.mocks.robot_id import MockRobotIdService
from spot_wrapper.testing.mocks.robot_state import MockRobotStateService


class MockAnyRobotService(MockRobotIdService, MockRobotStateService, TimeSyncServiceServicer):
    def TimeSyncUpdate(self, request: TimeSyncUpdateRequest, context: grpc.ServicerContext) -> TimeSyncUpdateResponse:
        response = TimeSyncUpdateResponse()
        response.state.status = TimeSyncState.STATUS_OK
        return response


def main() -> None:
    server = grpc.server(concurrent.futures.ThreadPoolExecutor(max_workers=10))
    mock = MockAnyRobotService()
    add_TimeSyncServiceServicer_to_server(mock, server)
    add_RobotIdServiceServicer_to_server(mock, server)
    add_RobotStateServiceServicer_to_server(mock, server)
    server.add_insecure_port("[::]:50051")
    server.start()
    server.wait_for_termination()


if __name__ == "__main__":
    main()
```
These mocks are seldom used or even referenced explicitly though. Unless one such basic service is under test,
the spot_wrapper.testing.mocks.MockSpot mock is a far more convenient choice. This mock class inherits from all
gRPC servicer classes that the Spot SDK exposes as well as from all mocks available under spot_wrapper.testing.mocks.
It is thus the natural starting point when writing most tests.
```python
import concurrent.futures

import grpc

from spot_wrapper.testing.mocks import MockSpot


class MyMockSpot(MockSpot):
    pass


def main() -> None:
    server = grpc.server(concurrent.futures.ThreadPoolExecutor(max_workers=10))
    server.add_insecure_port("[::]:50051")
    mock = MyMockSpot()
    mock.add_to(server)
    server.start()
    server.wait_for_termination()


if __name__ == "__main__":
    main()
```
spot_wrapper.testing.mocks.MockSpot also features automatic service specification support. This means that, upon mock
instantiation, all gRPC service stubs left unimplemented will be (re)defined as deferred method handlers. These handlers
are callable objects that allow resolving invocations in advance (by manipulating the handler future) or on a first-come
first-served basis from different threads (serving handler calls).
```python
import concurrent.futures

import grpc

from spot_wrapper.testing.mocks import MockSpot
from bosdyn.api.robot_command_pb2 import RobotCommandResponse


class MyMockSpot(MockSpot):
    pass


def main() -> None:
    server = grpc.server(concurrent.futures.ThreadPoolExecutor(max_workers=10))
    server.add_insecure_port("[::]:50051")
    mock = MyMockSpot()
    mock.add_to(server)
    server.start()

    # proceed to mock a sequence of exchanges
    mock.RobotCommand.future.fails(grpc.StatusCode.INTERNAL)  # first one will fail

    response = RobotCommandResponse()
    response.status = RobotCommandResponse.Status.STATUS_OK
    mock.RobotCommand.future.returns(response)  # second one will succeed

    call = mock.RobotCommand.serve(timeout=5.0)  # third one will be processed here
    assert call is not None
    # WhichOneof is the protobuf accessor that names the field set in a oneof group
    assert call.request.command.WhichOneof("command") == "full_body_command"
    response = RobotCommandResponse()
    response.status = RobotCommandResponse.Status.STATUS_OK
    call.returns(response)

    # done, now wait
    server.wait_for_termination()


if __name__ == "__main__":
    main()
```
As such, instances of MockSpot (and MockSpot subclasses) cover all Spot services with a mixture of built-in, user-defined,
and deferred implementations. Note that it is up to the user to handle deferred service requests in a timely manner. Failing
to do so may result in unexpected and potentially indefinite hangs (e.g. if the Spot SDK is blocked waiting on a service response
for a service request that was left unhandled).
```python
import concurrent.futures

import grpc

from spot_wrapper.testing.mocks import MockSpot


class MyMockSpot(MockSpot):
    pass


def main() -> None:
    server = grpc.server(concurrent.futures.ThreadPoolExecutor(max_workers=10))
    server.add_insecure_port("[::]:50051")
    with MyMockSpot() as mock:
        mock.add_to(server)
        server.start()
        server.wait_for_termination()  # no deferred RobotCommand service request will be handled now
        # when leaving context, a warning will be issued about pending RobotCommand service requests if any


if __name__ == "__main__":
    main()
```
MockSpot subclasses can also enable automatic service tracking support by setting the autotrack class attribute. When autotrack
is True, all gRPC service stub implementations, built-in, user-defined, or deferred, will be decorated to track the number of
calls and all the requests served. Note that this can have a substantial impact on memory usage over time. It can also affect the
timing of streamed services, as streamed service requests are collected in full before service invocation.
```python
import concurrent.futures

import grpc

from spot_wrapper.testing.mocks import MockSpot
from bosdyn.api.robot_command_pb2 import RobotCommandRequest, RobotCommandResponse


class MyMockSpot(MockSpot):
    autotrack = True

    def RobotCommand(self, request: RobotCommandRequest, context: grpc.ServicerContext) -> RobotCommandResponse:
        response = RobotCommandResponse()
        response.status = RobotCommandResponse.Status.STATUS_OK
        return response


def main() -> None:
    server = grpc.server(concurrent.futures.ThreadPoolExecutor(max_workers=10))
    server.add_insecure_port("[::]:50051")
    with MyMockSpot() as mock:
        try:
            mock.add_to(server)
            server.start()
            server.wait_for_termination()  # RobotCommand requests are handled by the implementation above
        finally:
            print(f"RobotCommand service invoked {mock.RobotCommand.num_calls} times with", mock.RobotCommand.requests)


if __name__ == "__main__":
    main()
```
For the odd cases in which better control over how the mock is put together is necessary, there is spot_wrapper.testing.mocks.BaseMockSpot.
BaseMockSpot is not as much a mock as it is a mock interface. It inherits from all gRPC servicer classes that the Spot SDK exposes too,
but uses no built-in mock and disables both automatic service specification and tracking by default. Subclasses can enable them using the
autospec and autotrack class attributes, respectively.
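To make the idea of an opt-in class attribute concrete, here is a toy sketch of how a flag like autotrack could switch call tracking on for a subclass. ToyBaseMock, ToyMock, PlainMock, and tracked are hypothetical names made up for this illustration. This is not how spot_wrapper.testing implements the feature, only a model of the behavior described above.

```python
import functools
from typing import Any, Callable


def tracked(func: Callable[..., Any]) -> Callable[..., Any]:
    """Wrap a handler so that it records every request it serves."""

    @functools.wraps(func)
    def wrapper(self: Any, request: Any, context: Any = None) -> Any:
        wrapper.requests.append(request)  # kept forever, hence the memory cost
        wrapper.num_calls += 1
        return func(self, request, context)

    wrapper.requests = []
    wrapper.num_calls = 0
    return wrapper


class ToyBaseMock:
    """Hypothetical base class: tracking stays off unless a subclass opts in."""

    autotrack = False

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        if not cls.autotrack:
            return
        # Decorate every public callable defined directly on the subclass.
        for name, attr in list(vars(cls).items()):
            if callable(attr) and not name.startswith("_"):
                setattr(cls, name, tracked(attr))


class ToyMock(ToyBaseMock):
    autotrack = True

    def Ping(self, request: Any, context: Any = None) -> str:
        return f"pong: {request}"


class PlainMock(ToyBaseMock):
    def Ping(self, request: Any, context: Any = None) -> str:
        return f"pong: {request}"


toy = ToyMock()
toy.Ping("a")
toy.Ping("b")
print(toy.Ping.num_calls, toy.Ping.requests)
```

In this toy version the counters live on the wrapped function, so they are shared by all instances of the class. That is enough to show the opt-in mechanism, but per-instance bookkeeping would need a different design.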
Fixtures are pytest.fixtures built around mocks. Decorating a mock class with the spot_wrapper.testing.fixture function turns it into a fixture.
When requested by a test, the fixture will start a gRPC server in a background thread to serve mocked services for as long as it remains in scope.
This gRPC server will listen at a unique address and port, both of which are made available as fixture attributes. The mock itself will be available
as an api fixture attribute.
```python
import logging

import spot_wrapper.testing
from spot_wrapper.testing.mocks import MockSpot
from spot_wrapper.wrapper import SpotWrapper


@spot_wrapper.testing.fixture
class simplest_mock(MockSpot):
    name = "bob"


def test_wrapper(simplest_mock):
    wrapper = SpotWrapper(
        username="spot",
        password="spot",
        hostname=simplest_mock.address,
        port=simplest_mock.port,
        robot_name=simplest_mock.api.name,
        logger=logging.getLogger("spot"),
    )
    assert wrapper.is_valid
```
A note about implementation. Fixtures work with insecure (i.e. non-encrypted) gRPC channels only. spot_wrapper.testing takes care
of monkeypatching in-process secure gRPC channels to allow testing existing code with no provisions for insecure gRPC channels (such as the
Spot SDK), but it will not (because it cannot) do this for other processes. It is up to the user to ensure an insecure gRPC channel with the
right address and port number is used.
No built-in fixtures are provided, as these are always user-defined.
The usual pattern for testing using spot_wrapper.testing machinery is shown below:
```python
import logging

import spot_wrapper.testing
from spot_wrapper.testing.mocks import MockSpot
from spot_wrapper.wrapper import SpotWrapper


@spot_wrapper.testing.fixture
class some_mock(MockSpot):
    name = "bob"

    # def secondary dummy services


def test_something(some_mock):
    wrapper = SpotWrapper(
        username="spot",
        password="spot",
        hostname=some_mock.address,
        port=some_mock.port,
        robot_name=some_mock.api.name,
        logger=logging.getLogger("spot"),
    )
    assert wrapper.is_valid

    # do something with some_mock.api
```
Using MockSpot for the fixture takes care of initialization exchanges. User-defined mock services, defined locally or separately as
a mock of its own to enable reuse, can take care of the exchanges that are secondary to (but often required for) the test. The actual
test is carried out through deferred mock services, tying the flow of gRPC exchanges to the execution path of the test.
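The way a deferred service ties a gRPC exchange to the test's execution path can be modeled with standard library primitives alone. The sketch below is a toy model under that assumption: ToyCall and ToyDeferredHandler are hypothetical names, and the actual handlers in spot_wrapper.testing may work differently. A caller blocks inside the handler until the test takes the pending call and resolves it.

```python
import concurrent.futures
import queue
from typing import Any, Optional


class ToyCall:
    """A pending request paired with the future its caller is blocked on."""

    def __init__(self, request: Any) -> None:
        self.request = request
        self._future: concurrent.futures.Future = concurrent.futures.Future()

    def returns(self, response: Any) -> None:
        # Test side: unblock the caller with a response.
        self._future.set_result(response)

    def wait(self, timeout: Optional[float] = None) -> Any:
        # Caller side: block until resolved, or raise TimeoutError.
        return self._future.result(timeout=timeout)


class ToyDeferredHandler:
    """Queues incoming calls until the test decides how to resolve them."""

    def __init__(self) -> None:
        self._pending: "queue.Queue[ToyCall]" = queue.Queue()

    def __call__(self, request: Any, timeout: Optional[float] = None) -> Any:
        call = ToyCall(request)
        self._pending.put(call)
        return call.wait(timeout=timeout)

    def serve(self, timeout: Optional[float] = None) -> Optional[ToyCall]:
        # Take the oldest pending call, or return None if none arrives in time.
        try:
            return self._pending.get(timeout=timeout)
        except queue.Empty:
            return None


handler = ToyDeferredHandler()
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    # The "client" blocks in a worker thread, much like SDK code would.
    pending = executor.submit(handler, "stand", 5.0)
    call = handler.serve(timeout=5.0)
    assert call is not None and call.request == "stand"
    call.returns("STATUS_OK")
    assert pending.result(timeout=5.0) == "STATUS_OK"
```

If the test never serves the call, the caller stays blocked until its timeout expires (or forever without one), which mirrors the hangs that unhandled deferred requests can cause.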
For further information, refer to the spot_wrapper.testing codebase and documentation.