diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml new file mode 100644 index 0000000..a812dae --- /dev/null +++ b/.github/workflows/docker.yml @@ -0,0 +1,71 @@ +name: Docker Image Build and Publish + +on: + push: # * Run for every push + branches: [ "*" ] + tags: [ '*' ] + schedule: # Run on Tuesday's at 12:00 + - cron: '0 12 * * 2' + workflow_dispatch: # Run manually + +env: + # Use docker.io for Docker Hub if empty + REGISTRY: ghcr.io + # github.repository as / + IMAGE_NAME: ${{ github.repository }} + + +jobs: + build_and_publish: + + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + - name: Set up QEMU + uses: docker/setup-qemu-action@v2 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + + # Login against a Docker registry + # https://github.com/docker/login-action + - name: Log into registry ${{ env.REGISTRY }} + uses: docker/login-action@343f7c4344506bcbf9b4de18042ae17996df046d # v3.0.0 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + # Extract metadata (tags, labels) for Docker + # https://github.com/docker/metadata-action + - name: Extract Docker metadata + id: meta + uses: docker/metadata-action@96383f45573cb7f253c731d3b3ab81c87ef81934 # v5.0.0 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + tags: | + type=schedule + type=ref,event=branch + type=ref,event=tag + type=ref,event=pr + type=raw,value=latest,enable={{is_default_branch}} + + # Build and push Docker image with Buildx + # https://github.com/docker/build-push-action + - name: Build and push Docker image + id: build-and-push + uses: docker/build-push-action@0565240e2d4ab88bba5387d719585280857ece09 # v5.0.0 + with: + context: . + push: true + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + platforms: linux/amd64,linux/arm64 + cache-from: type=gha + cache-to: type=gha,mode=max diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml new file mode 100644 index 0000000..f01c862 --- /dev/null +++ b/.github/workflows/pre-commit.yml @@ -0,0 +1,20 @@ +name: Pre-Commit Checks + +on: + push: # Run for every push + branches: ["*"] + tags: ["*"] + workflow_dispatch: # Run manually + +jobs: + pre-commit: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + name: Checkout code + - uses: actions/setup-python@v4 + name: Setup Python + with: + python-version: 3.9 + - uses: pre-commit/action@v3.0.1 + name: Run Pre-Commit Checks diff --git a/.justfile b/.justfile new file mode 100644 index 0000000..4f0003b --- /dev/null +++ b/.justfile @@ -0,0 +1,28 @@ +# List available commands +default: + @just --list --justfile {{justfile()}} + +# initialize the project +init: + @which pdm || echo "pdm not found, you'll need to install it: https://github.com/pdm-project/pdm" + @pdm install -G:all + @OSTYPE="" . .venv/bin/activate + @which pre-commit && pre-commit install && pre-commit autoupdate || true + +# Build the project +build: init dcb + +# Run the pre-commit checks +checks: + @pre-commit run --all-files || { echo "Checking fixes\n" ; pre-commit run --all-files; } +check: checks + +# Run automated tests +test: + @pytest +tests: test +pytest: test + +# Build docker image +dcb: + @docker compose build diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 22ff296..caed31b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -16,7 +16,7 @@ repos: - id: nbstripout - repo: https://github.com/astral-sh/ruff-pre-commit # Ruff version. - rev: v0.9.9 + rev: v0.11.5 hooks: # Run the linter. - id: ruff diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..4eb26e6 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,25 @@ +FROM ghcr.io/ad-sdl/madsci + +LABEL org.opencontainers.image.source=https://github.com/AD-SDL/hudson_platecrane_module +LABEL org.opencontainers.image.description="Drivers and REST API's for the Hudson Platecrane and Sciclops robots" +LABEL org.opencontainers.image.licenses=MIT + +######################################### +# Module specific logic goes below here # +######################################### + +RUN apt update && apt install -y libusb-1.0-0-dev && rm -rf /var/lib/apt/lists/* + +RUN mkdir -p sciclops_module + +COPY ./src sciclops_module/src +COPY ./README.md sciclops_module/README.md +COPY ./pyproject.toml sciclops_module/pyproject.toml + +RUN --mount=type=cache,target=/root/.cache \ + pip install -e ./sciclops_module + +RUN usermod -aG dialout madsci + +CMD ["python", "-,", "sciclops_rest_node"] +######################################### diff --git a/README.md b/README.md index dd1c035..9eedb34 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,26 @@ # sciclops_module -A MADSci node module for interfacing with the Hudson Robotics Sciclops Platecrane + +A MADSci Node module for interfacing with the Hudson Robotics Sciclops Platecrane. + +## Installation and Usage + +### Python + +```bash +# Create a virtual environment named .venv +python -m venv .venv +# Activate the virtual environment on Linux or macOS +source .venv/bin/activate +# Alternatively, activate the virtual environment on Windows +# .venv\Scripts\activate +# Install the module and dependencies in the venv +pip install . +# Run the environment +python -m sciclops_rest_node --host 127.0.0.1 --port 2000 +``` + +### Docker + +- We provide a `Dockerfile` and example docker compose file (`compose.yaml`) to run this node dockerized. +- There is also a pre-built image available as `ghcr.io/ad-sdl/sciclops_module`. +- You can control the container user's id and group id by setting the `USER_ID` and `GROUP_ID` diff --git a/compose.yaml b/compose.yaml new file mode 100644 index 0000000..577c6bc --- /dev/null +++ b/compose.yaml @@ -0,0 +1,16 @@ +name: sciclops_node +services: + sciclops: + container_name: sciclops + image: ghcr.io/ad-sdl/sciclops_module + build: + context: . + dockerfile: Dockerfile + tags: + - ghcr.io/ad-sdl/sciclops_module:latest + - ghcr.io/ad-sdl/sciclops_module:dev + command: python -m sciclops_rest_node + privileged: true + network_mode: host + volumes: + - ./definitions:/home/madsci/definitions diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..adac53b --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,144 @@ +[project] +name = "hudson_platecrane_module" +version = "1.5.0" +description = "Driver for the Platecrane and Sciclops" +authors = [ + {name = "Ryan D. Lewis", email="ryan.lewis@anl.gov"}, + {name = "Rafael Vescovi", email="ravescovi@anl.gov"}, + {name = "Doga Ozgulbas", email="dozgulbas@anl.gov"}, + {name = "Abe Stroka", email="astroka@anl.gov"}, + {name = "Kyle Hippe", email = "khippe@anl.gov"}, + {name = "Tobias Ginsburg", email = "tginsburg@anl.gov"}, +] +dependencies = [ + "fastapi>=0.103", + "uvicorn>=0.14.0", + "pyusb", + "libusb", + "pyserial", + "madsci.node_module>=0.1.9", + "pydantic>=2.7", + "pytest" +] +requires-python = ">=3.9.1" +readme = "README.md" +license = {text = "MIT"} + +[project.urls] +homepage = "https://github.com/AD-SDL/sciclops_module" + +###################### +# Build Info + Tools # +###################### +[build-system] +requires = ["setuptools", "wheel"] +build-backend = "setuptools.build_meta" + +##################### +# Development Tools # +##################### + +[tool.ruff] +# https://docs.astral.sh/ruff/configuration/ + +# Exclude a variety of commonly ignored directories. +exclude = [ + ".bzr", + ".direnv", + ".eggs", + ".git", + ".git-rewrite", + ".hg", + ".mypy_cache", + ".nox", + ".pants.d", + ".pytype", + ".ruff_cache", + ".svn", + ".tox", + ".venv", + "__pypackages__", + "_build", + "buck-out", + "build", + "dist", + "node_modules", + "venv", + "docs", +] + +# Same as Black. +line-length = 88 +indent-width = 4 + +# Assume Python 3.8 +target-version = "py38" + +[tool.ruff.lint] +# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default. +select = [ + # pycodestyle + "E", + # Pyflakes + "F", + # pyupgrade + # "UP", + # flake8-bugbear + "B", + # flake8-simplify + # "SIM", + # isort + "I", + # Warning + "W", + # pydocstyle + "D100", "D101", "D102", "D103", "D104", "D105", "D106", "D107", + # ruff + # "RUF" +] +ignore = [ + "E501" # Line too long +] + +# Allow fix for all enabled rules (when `--fix`) is provided. +fixable = ["ALL"] +unfixable = [] + +# Allow unused variables when underscore-prefixed. +dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$" + +[tool.ruff.format] +# Like Black, use double quotes for strings. +quote-style = "double" + +# Like Black, indent with spaces, rather than tabs. +indent-style = "space" + +# Like Black, respect magic trailing commas. +skip-magic-trailing-comma = false + +# Like Black, automatically detect the appropriate line ending. +line-ending = "auto" + +[tool.pytest.ini_options] +# https://docs.pytest.org/en/stable/customize.html +addopts = "-x" +junit_family="xunit1" +filterwarnings = [ + "ignore::DeprecationWarning", + "ignore::pottery.exceptions.InefficientAccessWarning", +] +markers = [ + "hardware: marks test as requiring hardware (deselect with '-m \"not hardware\"')", +] + +[tool.mypy] +# https://mypy.readthedocs.io/en/stable/config_file.html#using-a-pyproject-toml +show_error_codes = true +check_untyped_defs = true +follow_imports = "normal" +strict_optional = true +plugins = ["pydantic.mypy"] +strict = true +disallow_untyped_defs = true +implicit_reexport = true diff --git a/src/sciclops_interface.py b/src/sciclops_interface.py new file mode 100644 index 0000000..a45efcc --- /dev/null +++ b/src/sciclops_interface.py @@ -0,0 +1,778 @@ +"""Driver for the Hudson Robotics Sciclops robot.""" + +import asyncio +import re + +import usb.core +import usb.util +from madsci.client.resource_client import ResourceClient +from madsci.common.types.node_types import RestNodeConfig + + +class SCICLOPS: + """ + Description: + Python interface that allows remote commands to be executed to the Sciclops. + """ + + def __init__( + self, config: RestNodeConfig, resource_client: ResourceClient, gripper_id: str + ): + """Creates a new SCICLOPS driver object. The default VENDOR_ID and PRODUCT_ID are for the Sciclops robot.""" + self.VENDOR_ID = config.vendor_id + self.PRODUCT_ID = config.product_id + self.resource_client = resource_client + self.gripper_id = gripper_id + self.neutral_joints = config.neutral_joints + self.host_path = self.connect_sciclops() + self.exchange_location = config.exchange_location + self.current_pos = [0, 0, 0, 0] + self.STATUS = 0 + self.ERROR = "" + self.plate_info = config.plate_info + self.success_count = 0 + self.status = self.get_status() + self.error = self.get_error() + self.movement_state = "READY" + + def connect_sciclops(self): + """ + Connect to USB device. If wrong device, inform user + """ + host_path = usb.core.find(idVendor=self.VENDOR_ID, idProduct=self.PRODUCT_ID) + + if host_path is None: + raise Exception("Could not establish connection.") + + else: + print("Device Connected") + return host_path + + def disconnect_robot(self): + """Disconnects from the sciclops robot.""" + try: + usb.util.dispose_resources(self.host_path) + except Exception as err: + print(err) + else: + print("Robot is disconnected") + + def send_command(self, command): + """ + Sends provided command to Sciclops and stores data outputted by the sciclops. + """ + + self.host_path.write(4, command) + + response_buffer = "Write: " + command + msg = None + + # Adds SciClops output to response_buffer + while msg != command: + # or "success" not in msg or "error" in msg + try: + response = self.host_path.read(0x83, 200, timeout=5000) + except Exception: + break + msg = "".join(chr(i) for i in response) + response_buffer = response_buffer + "Read: " + msg + + print(response_buffer) + + self.success_count = self.success_count + response_buffer.count("0000 Success") + + self.get_error(response_buffer) + + return response_buffer + + def get_error(self, response_buffer=None): + """ + Gets error message from the feedback. + """ + + if not response_buffer: + return + + output_line = response_buffer[response_buffer[:-1].rfind("\n") :] + exp = r"(\d)(\d)(\d)(\d)(.*\w)" # Format of feedback that indicates an error message + output_line = re.search(exp, response_buffer) + + try: + # Checks if specified format is found in the last line of feedback + if output_line[5][5:9] != "0000": + self.ERROR = "ERROR: %s" % output_line[5] + + except Exception: + pass + + ################################ + # Individual Command Functions + + def get_position(self): + """ + Requests and stores sciclops position. + Coordinates: + Z: Vertical axis + R: Base turning axis + Y: Extension axis + P: Gripper turning axis + """ + + command = "GETPOS\r\n" # Command interpreted by Sciclops + out_msg = self.send_command(command) + + try: + print(out_msg) + # Checks if specified format is found in feedback + exp = r"Z:([-.\d]+), R:([-.\d]+), Y:([-.\d]+), P:([-.\d]+)" # Format of coordinates provided in feedback + find_current_pos = re.search(exp, out_msg) + self.current_pos = { + "Z": float(find_current_pos[1]), + "R": float(find_current_pos[2]), + "Y": float(find_current_pos[3]), + "P": float(find_current_pos[4]), + } + + return self.current_pos + except Exception as e: + raise (e) + + def get_status(self): + """ + Checks status of Sciclops + """ + + command = "STATUS\r\n" # Command interpreted by Sciclops + out_msg = self.send_command(command) + + try: + # Checks if specified format is found in feedback + exp = r"0000 (.*\w)" # Format of feedback that indicates that the rest of the line is the status + find_status = re.search(exp, out_msg) + self.status = find_status[1] + + print(self.status) + + except Exception: + pass + + async def check_complete(self): + """ + Checks to see if current sciclops action has completed + """ + print("Checking if complete") + command = "STATUS\r\n" # Command interpreted by Sciclops + out_msg = self.send_command(command) + + try: + # Checks if specified format is found in feedback + exp = r"0000 (.*\w)" # Format of feedback that indicates that the rest of the line is the status + find_status = re.search(exp, out_msg) + self.status = find_status[1] + self.movement_state = "READY" + + return True + + except Exception: + self.movement_state = "BUSY" + + return False + finally: + await asyncio.sleep(0.1) + + async def check_complete_loop(self): + """ + continuously runs check_complete until it returns True + """ + a = False + while not a: + a = await self.check_complete() + + print("ACTION COMPLETE") + + def get_version(self): + """ + Checks version of Sciclops + """ + + command = "VERSION\r\n" # Command interpreted by Sciclops + out_msg = self.send_command(command) + + try: + # Checks if specified format is found in feedback + exp = r"0000 (.*\w)" # Format of feedback that indicates that the rest of the line is the version + find_version = re.search(exp, out_msg) + self.VERSION = find_version[1] + + print(self.VERSION) + + except Exception: + pass + + # TODO: swings outward and collides with pf400 + def reset(self): + """ + Resets Sciclops + """ + + self.set_speed(5) + + command = "RESET\r\n" # Command interpreted by Sciclops + out_msg = self.send_command(command) + + try: + # Checks if specified format is found in feedback + exp = r"0000 (.*\w)" # Format of feedback that indicates that the rest of the line is the version + find_reset = re.search(exp, out_msg) + self.RESET = find_reset[1] + + print(self.RESET) + + except Exception: + pass + + def get_config(self): + """ + Checks configuration of Sciclops + """ + + command = "GETCONFIG\r\n" # Command interpreted by Sciclops + out_msg = self.send_command(command) + + try: + # Checks if specified format is found in feedback + exp = r"0000 (.*\w)" # Format of feedback that indicates that the rest of the line is the configuration + find_config = re.search(exp, out_msg) + self.CONFIG = find_config[1] + + print(self.CONFIG) + + except Exception: + pass + + def get_grip_length(self): + """ + Checks current length of the gripper (units unknown) of Sciclops + """ + + command = "GETGRIPPERLENGTH\r\n" # Command interpreted by Sciclops + out_msg = self.send_command(command) + + try: + # Checks if specified format is found in feedback + exp = r"0000 (.*\w)" # Format of feedback that indicates that the rest of the line is the gripper length + find_grip_length = re.search(exp, out_msg) + self.griplength = find_grip_length[1] + + print(self.griplength) + + except Exception: + pass + + command = "GETCOLLAPSEDISTANCE\r\n" # Command interpreted by Sciclops + out_msg = self.send_command(command) + + try: + # Checks if specified format is found in feedback + exp = r"0000 (.*\w)" # Format of feedback that indicates that the rest of the line is the collapsed distance + find_collapsed_distance = re.search(exp, out_msg) + self.COLLAPSEDDISTANCE = find_collapsed_distance[1] + + print(self.COLLAPSEDDISTANCE) + + except Exception: + pass + + def get_steps_per_unit(self): + """ + ??? + """ + + command = "GETSTEPSPERUNIT\r\n" # Command interpreted by Sciclops + out_msg = self.send_command(command) + + try: + # Checks if specified format is found in feedback + exp = r"Z:([-.\d]+),R:([-.\d]+),Y:([-.\d]+),P:([-.\d]+)" # Format of the coordinates provided in feedback + find_steps_per_unit = re.search(exp, out_msg) + self.STEPSPERUNIT = [ + float(find_steps_per_unit[1]), + float(find_steps_per_unit[2]), + float(find_steps_per_unit[3]), + float(find_steps_per_unit[4]), + ] + + print(self.STEPSPERUNIT) + + except Exception: + pass + + def home(self, axis=""): + """ + Homes all of the axes. Returns to neutral position (above exchange) + """ + + # Moves axes to home position + command = "HOME\r\n" # Command interpreted by Sciclops + out_msg = self.send_command(command) + + try: + # Checks if specified format is found in feedback + exp = r"0000 (.*\w)" # Format of feedback that indicates that the rest of the line is the success message + home_msg = re.search(exp, out_msg) + self.HOMEMSG = home_msg[1] + + print(self.HOMEMSG) + except Exception: + pass + + # Moves axes to neutral position (above exchange) + + def open(self): + """ + Opens gripper + """ + + command = "OPEN\r\n" # Command interpreted by Sciclops + out_msg = self.send_command(command) + + try: + # Checks if specified format is found in feedback + exp = r"0000 (.*\w)" # Format of feedback that indicates that the rest of the line is the success message + open_msg = re.search(exp, out_msg) + self.OPENMSG = open_msg[1] + print(self.OPENMSG) + + except Exception: + pass + + def close(self): + """ + Closes gripper + """ + + command = "CLOSE\r\n" # Command interpreted by Sciclops + out_msg = self.send_command(command) + + try: + # Checks if specified format is found in feedback + exp = r"0000 (.*\w)" # Format of feedback that indicates that the rest of the line is the success message + close_msg = re.search(exp, out_msg) + self.CLOSEMSG = close_msg[1] + + print(self.CLOSEMSG) + except Exception: + pass + + def check_open(self): + """ + Checks if gripper is open + """ + + command = "GETGRIPPERISOPEN\r\n" # Command interpreted by Sciclops + out_msg = self.send_command(command) + + try: + # Checks if specified format is found in feedback + exp = r"0000 (.*\w)" # Format of feedback that indicates that the rest of the line answers if the gripper is open + check_open_msg = re.search(exp, out_msg) + self.CHECKOPENMSG = check_open_msg[1] + + print(self.CHECKOPENMSG) + except Exception: + pass + + def check_closed(self): + """ + Checks if gripper is closed + """ + + command = "GETGRIPPERISCLOSED\r\n" # Command interpreted by Sciclops + out_msg = self.send_command(command) + + try: + # Checks if specified format is found in feedback + exp = r"0000 (.*\w)" # Format of feedback that indicates that the rest of the line answers if the gripper is closed + check_closed_msg = re.search(exp, out_msg) + self.CHECKCLOSEDMSG = check_closed_msg[1] + + print(self.CHECKCLOSEDMSG) + + except Exception: + pass + + def check_plate(self): + """ + ??? + """ + + command = "GETPLATEPRESENT\r\n" # Command interpreted by Sciclops + out_msg = self.send_command(command) + + try: + # Checks if specified format is found in feedback + exp = r"0000 (.*\w)" # Format of feedback that indicates ??? + check_plate_msg = re.search(exp, out_msg) + self.CHECKPLATEMSG = check_plate_msg[1] + + print(self.CHECKPLATEMSG) + + except Exception: + pass + + def set_speed(self, speed): + """ + Changes speed of Sciclops + """ + + command = "SETSPEED %d\r\n" % speed # Command interpreted by Sciclops + out_msg = self.send_command(command) + + try: + # Checks if specified format is found in feedback + exp = r"0000 (.*\w)" # Format of feedback that indicates success message + set_speed_msg = re.search(exp, out_msg) + self.SETSPEEDMSG = set_speed_msg[1] + print(self.SETSPEEDMSG) + except Exception: + pass + + def list_points(self): + """ + Lists all of the preset points + """ + + command = "LISTPOINTS\r\n" # Command interpreted by Sciclops + out_msg = self.send_command(command) + + try: + # Checks if specified format is found in feedback + list_point_msg_index = out_msg.find( + "0000" + ) # Format of feedback that indicates success message + self.LISTPOINTS = out_msg[list_point_msg_index + 4 :] + print(self.LISTPOINTS) + except Exception: + pass + + def jog(self, axis, distance): + """ + Moves the specified axis the specified distance. + """ + + command = "JOG %s,%d\r\n" % (axis, distance) # Command interpreted by Sciclops + out_msg = self.send_command(command) + + try: + # Checks if specified format is found in feedback + jog_msg_index = out_msg.find( + "0000" + ) # Format of feedback that indicates success message + self.JOGMSG = out_msg[jog_msg_index + 4 :] + print(self.JOGMSG) + except Exception: + pass + + def loadpoint(self, R, Z, P, Y): + """ + Adds point to listpoints function + """ + + command = "LOADPOINT R:%s, Z:%s, P:%s, Y:%s, R:%s\r\n" % ( + R, + Z, + P, + Y, + R, + ) # Command interpreted by Sciclops + out_msg = self.send_command(command) + try: + # Checks if specified format is found in feedback + loadpoint_msg_index = out_msg.find( + "0000" + ) # Format of feedback that indicates success message + self.LOADPOINTMSG = out_msg[loadpoint_msg_index + 5 :] + except Exception: + pass + + def deletepoint(self, R, Z, P, Y): + """ + Deletes point from listpoints function + """ + + command = "DELETEPOINT R:%s\r\n" % R # Command interpreted by Sciclops + out_msg = self.send_command(command) + try: + deletepoint_msg_index = out_msg.find( + "0000" + ) # Format of feedback that indicates success message + self.DELETEPOINTMSG = out_msg[deletepoint_msg_index + 5 :] + except Exception: + pass + + def move(self, R, Z, P, Y): + """ + Moves to specified coordinates + """ + + self.loadpoint(R, Z, P, Y) + + command = "MOVE R:%s\r\n" % R + out_msg_move = self.send_command(command) + + try: + # Checks if specified format is found in feedback + move_msg_index = out_msg_move.find( + "0000" + ) # Format of feedback that indicates success message + self.MOVEMSG = out_msg_move[move_msg_index + 4 :] + except Exception: + pass + + self.deletepoint(R, Z, P, Y) + + def move_neutral(self): + """Move the robot arm to the neutral position.""" + self.move( + R=self.neutral_joints["R"], + Z=self.neutral_joints["Z"], + P=self.neutral_joints["P"], + Y=self.neutral_joints["Y"], + ) + + def get_plate(self, source, target): + """ + Grabs plate and places on exchange. Paramater is the stack that the Sciclops is requested to remove the plate from. + Format: "Stack" + remove lid and trash bools tell whether to remove lid from plate and whether to throw said lid in the trash or place in nest + """ + + plate_type = "96_well" + # Move arm up and to neutral position to avoid hitting any objects + self.open() + self.set_speed(10) # + self.jog("Y", -1000) + self.jog("Z", 1000) + self.set_speed(12) + self.move_neutral() + + # check coordinates + asyncio.run(self.check_complete_loop()) + + # Move above desired tower + self.set_speed(100) + self.move( + R=source.location["R"], + Z=23.5188, + P=source.location["P"], + Y=source.location["Y"], + ) + # check coordinates + asyncio.run(self.check_complete_loop()) + + # Remove plate from towertower_info["type"] + self.close() + self.set_speed(15) + self.jog("Z", -1000) + # move up certain amount + self.jog("Z", 10) + self.open() + grab_height = self.plate_info[plate_type]["grab_tower"] + self.jog("Z", grab_height) + self.close() + try: + plate, _ = self.resource_client.pop(source.resource_id) + self.resource_client.push(self.gripper_id, plate) + except Exception as e: + self.open() + self.move_neutral() + raise e + self.set_speed(100) + self.jog("Z", 1000) + # check coordinates + # asyncio.run(self.check_complete_loop()) + + # Place in exchange + self.move( + R=target.location["R"], + Z=23.5188, + P=target.location["P"], + Y=target.location["Y"], + ) + # check coordinates + # asyncio.run(self.check_complete_loop()) + self.jog("Z", -380) + self.set_speed(5) + self.jog("Z", -30) + self.open() + try: + plate, _ = self.resource_client.pop(self.gripper_id) + self.resource_client.push(target.resource_id, plate) + except Exception as e: + self.move_neutral() + raise e + + self.set_speed(100) + self.jog("Z", 1000) + # check coordinates + # asyncio.run(self.check_complete_loop()) + # check if lid needs to be removed + # Move back to neutral + self.move_neutral() + # check coordinates + # asyncio.run(self.check_complete_loop()) + + def return_plate(self, source, target): + """ + Grabs plate and places on exchange. Paramater is the stack that the Sciclops is requested to remove the plate from. + Format: "Stack" + remove lid and trash bools tell whether to remove lid from plate and whether to throw said lid in the trash or place in nest + """ + + plate_type = "96_well" + # Move arm up and to neutral position to avoid hitting any objects + self.open() + self.set_speed(10) # + self.jog("Y", -1000) + self.jog("Z", 1000) + self.set_speed(12) + self.move_neutral() + + # check coordinates + asyncio.run(self.check_complete_loop()) + + # Move above desired tower + self.set_speed(100) + self.move( + R=source.location["R"], + Z=23.5188, + P=source.location["P"], + Y=source.location["Y"], + ) + # check coordinates + asyncio.run(self.check_complete_loop()) + + self.close() + self.set_speed(15) + self.jog("Z", -380) + # move up certain amount + self.open() + self.set_speed(5) + self.jog("Z", -30) + grab_height = self.plate_info[plate_type]["grab_tower"] + self.jog("Z", grab_height) + self.close() + try: + plate, _ = self.resource_client.pop(source.resource_id) + self.resource_client.push(self.gripper_id, plate) + except Exception as e: + self.open() + self.move_neutral() + raise e + self.set_speed(50) + self.jog("Z", 1000) + + # Place in tower + self.move( + R=target.location["R"], + Z=23.5188, + P=target.location["P"], + Y=target.location["Y"], + ) + + self.jog("Z", -1000) + self.jog("Z", 10) + self.open() + try: + plate, _ = self.resource_client.pop(self.gripper_id) + self.resource_client.push(target.resource_id, plate) + except Exception as e: + self.move_neutral() + raise e + self.set_speed(100) + self.jog("Z", 1000) + self.move_neutral() + + def limp(self, limp_bool): + """ + Turns on/off limp mode (allows someone to manually move joints) + """ + if limp_bool: + limp_string = "FALSE" + else: + limp_string = "TRUE" + command = "LIMP %s" % limp_string # Command interpreted by Sciclops + self.send_command(command) + + def plate_to_stack(self, tower, add_lid): + """Plate from exchange to stack (self, tower, plateinfo)""" + # Move arm up and to neutral position to avoid hitting any objects + self.open() + self.set_speed(10) + self.jog("Y", -1000) + self.jog("Z", 1000) + self.set_speed(12) + self.move( + R=self.labware["neutral"]["pos"]["R"], + Z=23.5188, + P=self.labware["neutral"]["pos"]["P"], + Y=self.labware["neutral"]["pos"]["Y"], + ) + asyncio.run(self.check_complete_loop()) + plate_type = self.labware["exchange"]["type"] + if add_lid: + self.check_for_lid() + self.replace_lid() + + # TODO: check to see if given stack is full, use function to account for different labware, maybe checks all stacks to find one with same labware? + + # move over exchange + self.open() + self.move( + R=self.labware["exchange"]["pos"]["R"], + Z=23.5188, + P=self.labware["exchange"]["pos"]["P"], + Y=self.labware["exchange"]["pos"]["Y"], + ) + # check coordinates + asyncio.run(self.check_complete_loop()) + # grab plate + self.set_speed(100) + self.jog("Z", -380) + grab_height = self.plate_info[plate_type]["grab_exchange"] + self.jog("Z", grab_height) + self.close() + self.set_speed(100) + self.jog("Z", 1000) + asyncio.run(self.check_complete_loop()) + + # move above tower, place plate in tower + self.move( + R=self.labware[tower]["pos"]["R"], + Z=23.5188, + P=self.labware[tower]["pos"]["P"], + Y=self.labware[tower]["pos"]["Y"], + ) + # check coordinates + asyncio.run(self.check_complete_loop()) + self.set_speed(10) + self.jog("Z", -1000) + self.open() + self.set_speed(100) + self.jog("Z", 1000) + # check coordinates + asyncio.run(self.check_complete_loop()) + + # move to home + self.move( + R=self.labware["neutral"]["pos"]["R"], + Z=23.5188, + P=self.labware["neutral"]["pos"]["P"], + Y=self.labware["neutral"]["pos"]["Y"], + ) + # check coordinates + asyncio.run(self.check_complete_loop()) + + # update labware dict + self.labware["exchange"]["howmany"] -= 1 + self.labware[tower]["howmany"] += 1 diff --git a/src/sciclops_rest_node.py b/src/sciclops_rest_node.py new file mode 100644 index 0000000..19341da --- /dev/null +++ b/src/sciclops_rest_node.py @@ -0,0 +1,157 @@ +#! /usr/bin/env python3 +"""The server for the Hudson Platecrane/Sciclops that takes incoming WEI flow requests from the experiment application""" + +from typing import Any, Optional + +from madsci.client.resource_client import ResourceClient +from madsci.common.types.action_types import ActionSucceeded +from madsci.common.types.admin_command_types import AdminCommandResponse +from madsci.common.types.auth_types import OwnershipInfo +from madsci.common.types.location_types import LocationArgument +from madsci.common.types.node_types import RestNodeConfig +from madsci.common.types.resource_types.definitions import SlotResourceDefinition +from madsci.node_module.helpers import action +from madsci.node_module.rest_node_module import RestNode +from typing_extensions import Annotated + +from sciclops_interface import SCICLOPS + + +class SciclopsConfig(RestNodeConfig): + """Configuration for the camera node module.""" + + vendor_id: int = 0x7513 + """The sciclops vendor id address, a device path in Linux/Mac.""" + + product_id: int = 0x0002 + + """The sciclops vendor id address, a device path in Linux/Mac.""" + + neutral_joints: dict[str, float] = { + "Z": 23.5188, + "R": 109.2741, + "Y": 32.7484, + "P": 98.2955, + } + """The neutral joint position for the arm""" + + plate_info: Optional[Any] = None + """The specs for picking up different kinds of plates""" + + exchange_location: Optional[Any] = None + """the location of the exchange for placing plates""" + + resource_manager_url: Optional[str] = None + """the resource manager url for the sciclops""" + + +class SciclopsNode(RestNode): + """MADSci node module for the Hudson Robotics Sciclops.""" + + config_model = SciclopsConfig + + def startup_handler(self): + """Initial run function for the app, initializes the state + ParametersNodeLocation + ---------- + app : FastApi + The REST API app being initialized + + Returns + ------- + None""" + print("Hello, World!") + try: + self.resource_client = ResourceClient(self.config.resource_manager_url) + self.gripper = self.resource_client.init_resource( + SlotResourceDefinition( + resource_name="sciclops_gripper_" + + str(self.node_definition.node_name), + owner=OwnershipInfo(node_id=self.node_definition.node_id), + ) + ) + self.sciclops = SCICLOPS( + self.config, self.resource_client, self.gripper.resource_id + ) + except Exception as error_msg: + print("------- SCICLOPS Error message: " + str(error_msg) + (" -------")) + raise (error_msg) + else: + print("SCICLOPS online") + + @action(name="status") + def status(self): + """Action that forces the sciclops to check its status.""" + + self.sciclops.get_status() + return ActionSucceeded() + + @action + def home(self): + """Homes the sciclops""" + self.sciclops.home() + return ActionSucceeded() + + @action(name="get_plate") + def get_plate( + self, + source: Annotated[LocationArgument, "Stack to get plate from"], + target: Annotated[LocationArgument, "Exchange to place plate"], + ): + """Get a plate from a stack position and move it to transfer point (or trash)""" + self.sciclops.get_plate(source, target) + return ActionSucceeded() + + @action(name="return_plate") + def return_plate( + self, + source: Annotated[LocationArgument, "Exchange to get plate from"], + target: Annotated[LocationArgument, "Tower to place plate"], + ): + """Get a plate from a stack position and move it to transfer point (or trash)""" + self.sciclops.return_plate(source, target) + return ActionSucceeded() + + @action(name="limp") + def limp( + self, + toggle: Annotated[bool, "turn on or off bool"] = False, + ): + """Get a plate from a stack position and move it to transfer point (or trash)""" + self.sciclops.limp(toggle) + return ActionSucceeded() + + @action(name="open") + def open( + self, + ): + """Get a plate from a stack position and move it to transfer point (or trash)""" + self.sciclops.open() + return ActionSucceeded() + + @action(name="close") + def close( + self, + ): + """Get a plate from a stack position and move it to transfer point (or trash)""" + self.sciclops.close() + return ActionSucceeded() + + @action(name="move") + def move(self, target: Annotated[LocationArgument, "Target Location to move to"]): + """Get a plate from a stack position and move it to transfer point (or trash)""" + location = target.location + self.sciclops.move(location["Z"], location["R"], location["Y"], location["P"]) + return ActionSucceeded() + + def get_location(self) -> AdminCommandResponse: + """Return the current position of the sciclops""" + try: + return AdminCommandResponse(data={"location": self.sciclops.get_position()}) + except Exception: + return AdminCommandResponse(success=False) + + +if __name__ == "__main__": + sciclops_node = SciclopsNode() + sciclops_node.start_node()