Skip to content

Testing Infrastructure

mhidalgo-bdai edited this page Jan 31, 2024 · 9 revisions

The spot_wrapper.testing submodule offers pytest compatible machinery to test Spot SDK usage.

At its core, this machinery is nothing but mocks and fixtures: mocks of Spot gRPC services, and fixtures serving those mocks over the wire. This affords complete control over a well-defined interface that spans the entire feature set (effectively emulating an Spot robot) at the expense of additional complexity.

A word of advice. spot_wrapper.testing complexity is not always warranted. A single Spot SDK API can involve multiple gRPC service invocations, and Spot SDK documentation is seldom transparent about this. An apparent unit test is in fact an integration test. For cases in which this is problematic, other mocking frameworks and utilities, such as unittest.mock or pytest-mocker in Python, o gmock in C++, can be used instead of (and also in addition to) spot_wrapper.testing to mock Spot SDK APIs directly, in-process.

Mocks

Mocking with spot_wrapper.testing means mocking gRPC services specific to Spot robots. In practice, this amounts to subclassing gRPC servicer classes and providing an implementation for each service stub. Aside from implementation specific details -- typically a dummy implementation bearing minimal logic, if any --, this is no different from any other gRPC service implementation, and thus the same semantics and guidelines apply. For instance, mocks must served by a gRPC server too.

import concurrent.futures

import grpc
from bosdyn.api.robot_id_pb2 import RobotIdRequest, RobotIdResponse
from bosdyn.api.robot_id_service_pb2_grpc import RobotIdServiceServicer
from bosdyn.api.robot_id_service_pb2_grpc import add_RobotIdServiceServicer_to_server


class MockRobotIdService(RobotIdServiceServicer):
    """A mock Spot robot id service."""

    def GetRobotId(self, request: RobotIdRequest, context: grpc.ServicerContext) -> RobotIdResponse:
        response = RobotIdResponse()
        response.robot_id.serial_number = "1234567890"
        return response


def main() -> None:
    server = grpc.server(concurrent.futures.ThreadPoolExecutor(max_workers=10))
    add_RobotIdServiceServicer_to_server(MockRobotIdService(), server)
    server.add_insecure_port("[::]:50051")
    server.start()
    server.wait_for_termination()


if __name__ == "__main__":
    main()

A complete Spot robot can be emulated in this way. It is an arduous path, however. Spot robots are complex machines featuring hundreds of services. To alleviate that burden, spot_wrapper.testing already provides enough built-in mocks to cover for spot_wrapper.SpotWrapper initialization. This includes mocks for emergency stop services, leasing services, keep-alive services, time synchronization services, and more. Each mock is designed as an standalone component, and multiple mocks can be aggregated via multiple inheritance, like building blocks of larger, more comprehensive mocks. These mocks can naturally coexist with user-defined ones.

import concurrent.futures

import grpc
from bosdyn.api.time_sync_pb2 import (
    TimeSyncState,
    TimeSyncUpdateRequest,
    TimeSyncUpdateResponse,
)
from bosdyn.api.time_sync_service_pb2_grpc import (
    TimeSyncServiceServicer,
    add_TimeSyncServiceServicer_to_server,
)

from bosdyn.api.robot_id_service_pb2_grpc import add_RobotIdServiceServicer_to_server
from bosdyn.api.robot_state_service_pb2_grpc import add_RobotStateServiceServicer_to_server

from spot_wrapper.testing.mocks.robot_id import MockRobotIdService
from spot_wrapper.testing.mocks.robot_state import MockRobotStateService


class MockAnyRobotService(MockRobotIdService, MockRobotStateService, TimeSyncServiceServicer):

     def TimeSyncUpdate(self, request: TimeSyncUpdateRequest, context: grpc.ServicerContext) -> TimeSyncUpdateResponse:
        response = TimeSyncUpdateResponse()
        response.state.status = TimeSyncState.STATUS_OK
        return response


def main() -> None:
    server = grpc.server(concurrent.futures.ThreadPoolExecutor(max_workers=10))
    mock = MockAnyRobotService()
    add_TimeSyncServiceServicer_to_server(mock, server)
    add_RobotIdServiceServicer_to_server(mock, server)
    add_RobotStateServiceServicer_to_server(mock, server)
    server.add_insecure_port("[::]:50051")
    server.start()
    server.wait_for_termination()


if __name__ == "__main__":
    main()

These mocks are seldom used or even referenced explicitly though. Unless one such basic service is under test, the spot_wrapper.testing.mocks.MockSpot mock is a far more convenient choice. This mock class inherits from all gRPC servicer classes that the Spot SDK exposes as well as from all mocks available under spot_wrapper.testing.mocks. It is thus the natural starting point when writing most tests.

import concurrent.futures

import grpc

from spot_wrapper.testing.mocks import MockSpot


class MyMockSpot(MockSpot):
    pass


def main() -> None:
    server = grpc.server(concurrent.futures.ThreadPoolExecutor(max_workers=10))
    server.add_insecure_port("[::]:50051")
    mock = MyMockSpot()
    mock.add_to(server)
    server.start()
    server.wait_for_termination()


if __name__ == "__main__":
    main()

spot_wrapper.testing.mocks.MockSpot also features automatic service specification support. This means that, upon mock instantiation, all gRPC service stubs left unimplemented will be (re)defined as deferred method handlers. These handlers are callable objects that allow resolving invocations in advance (by manipulating the handler future) or on a first-come first-served basis from different threads (serving handler calls).

import concurrent.futures

import grpc

from spot_wrapper.testing.mocks import MockSpot

from bosdyn.api.robot_command_pb2 import RobotCommandResponse


class MyMockSpot(MockSpot):
    pass


def main() -> None:
    server = grpc.server(concurrent.futures.ThreadPoolExecutor(max_workers=10))
    server.add_insecure_port("[::]:50051")
    mock = MyMockSpot()
    mock.add_to(server)
    server.start()
    # proceed to mock a sequence of exchanges
    mock.RobotCommand.future.fails(grpc.StatusCode.INTERNAL)  # first one will fail
    response = RobotCommandResponse()
    response.status = RobotCommandResponse.Status.STATUS_OK
    mock.RobotCommand.future.returns(response)  # second one will succeed
    call = mock.RobotCommand.serve(timeout=5.0)  # third one will be processed here
    assert call is not None
    assert call.request.command.Which("command") == "full_body_command"
    response = RobotCommandResponse()
    response.status = RobotCommandResponse.Status.STATUS_OK
    call.returns(response)
    # done, now wait
    server.wait_for_termination()


if __name__ == "__main__":
    main()

As such, instances of MockSpot (and MockSpot subclasses) cover all Spot services with a mixture of built-in, user-defined, and deferred implementations. Note that it is up to the user to handle deferred service requests in a timely manner. Failing to do so may result in unexpected and potentially indefinite hangs (e.g. if the Spot SDK is blocked waiting on a service response for a service request that was left unhandled).

import concurrent.futures

import grpc

from spot_wrapper.testing.mocks import MockSpot


class MyMockSpot(MockSpot):
    pass


def main() -> None:
    server = grpc.server(concurrent.futures.ThreadPoolExecutor(max_workers=10))
    server.add_insecure_port("[::]:50051")
    with MyMockSpot() as mock:
        mock.add_to(server)
        server.start()
        server.wait_for_termination()  # no deferred RobotCommand service request will be handled now
    # when leaving context, a warning will be issued about pending RobotCommand service requests if any


if __name__ == "__main__":
    main()

MockSpot subclasses can also enable automatic service tracking support by setting the autotrack class attribute. When autotrack is True, all gRPC service stub implementations, built-in, user-defined, or deferred, will be decorated to track the number of calls and all the requests served. Note this can have a substantial impact on memory usage over time. It can also affect streamed services timing, as streamed service requests are collected in full before service invocation.

import concurrent.futures

import grpc

from spot_wrapper.testing.mocks import MockSpot

from bosdyn.api.robot_command_pb2 import RobotCommandRequest, RobotCommandResponse


class MyMockSpot(MockSpot):
    autotrack = True

    def RobotCommand(self, request: RobotCommandRequest, context: grpc.ServicerContext) -> RobotCommandResponse:
        response = RobotCommandResponse()
        response.state.status = RobotCommandResponse.Status.STATUS_OK
        return response


def main() -> None:
    server = grpc.server(concurrent.futures.ThreadPoolExecutor(max_workers=10))
    server.add_insecure_port("[::]:50051")
    try:
        with MyMockSpot() as mock:
            mock.add_to(server)
            server.start()
            server.wait_for_termination()  # no deferred RobotCommand service request will be handled now
    finally:
        print(f"RobotCommand service invoked {mock.RobotCommand.num_calls} times with", mock.RobotCommand.requests)


if __name__ == "__main__":
    main()

For the odd cases in which better control over how the mock is put together is necessary, there is spot_wrapper.testing.mocks.BaseMockSpot. BaseMockSpot is not as much a mock as it is a mock interface. It inherits from all gRPC servicer classes that the Spot SDK exposes too, but uses no built-in mock and disables both automatic service specification and tracking by default. Subclasses can enable them using the autospec and autotrack class attributes, respectively.

Fixtures

Fixtures are pytest.fixtures built around mocks. Decorating a mock class with the spot_wrapper.testing.fixture function turns it into a fixture. When requested by a test, the fixture will start a gRPC server in a background thread to serve mocked services for as long as it remains in scope. This gRPC server will listen at a unique address and port, both of which are made available as fixture attributes. The mock itself will be available as an api fixture attribute.

import logging

import spot_wrapper.testing
from spot_wrapper.testing.mocks import MockSpot
from spot_wrapper.wrapper import SpotWrapper


@spot_wrapper.testing.fixture
class simplest_mock(MockSpot):
    name = "bob"


def test_wrapper(simplest_mock):
    wrapper = SpotWrapper(
        username="spot",
        password="spot",
        hostname=simplest_mock.address,
        port=simplest_mock.port,
        robot_name=simplest_mock.api.name,
        logger=logging.getLogger("spot"),
    )
    assert wrapper.is_valid

A note about implementation. Fixtures work with insecure (i.e. non-encrypted) gRPC channels only. spot_wrapper.testing takes care of monkeypatching in-process secure gRPC channels to allow testing existing code with no provisions for insecure gRPC channels (such as the Spot SDK), but it will not (because it cannot) do this for other processes. It is up to the user to ensure an insecure gRPC channel with the right address and port number is used.

No built-in fixtures are provided, as these are always user-defined.

Patterns

The usual pattern for testing using spot_wrapper.testing machinery is shown below:

import logging

import spot_wrapper.testing
from spot_wrapper.testing.mocks import MockSpot
from spot_wrapper.wrapper import SpotWrapper


@spot_wrapper.testing.fixture
class some_mock(MockSpot):
    name = "bob"

    # def secondary dummy services


def test_something(some_mock):
    wrapper = SpotWrapper(
        username="spot",
        password="spot",
        hostname=some_mock.address,
        port=some_mock.port,
        robot_name=some_mock.api.name,
        logger=logging.getLogger("spot"),
    )
    assert wrapper.is_valid

    # do something with some_mock.api

Using MockSpot for the fixture takes care of initialization exchanges. User-defined mock services, defined locally or separately as a mock of its own to enable reuse, can take care of the exchanges that are secondary to (but often required for) the test. The actual test is carried out through deferred mock services, tying the flow of gRPC exchanges to the execution path of the test.

Reference

For further information, refere to the spot_wrapper.testing codebase and documentation.

Clone this wiki locally