Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 80 additions & 17 deletions uc2rest/motor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ def __init__(self, parent=None):
self.currentDirection = np.zeros((self.nMotors))
self.currentPosition = np.zeros((self.nMotors))

# Per-axis hardware-direction sign (+1 or -1), indexed as [A, X, Y, Z]
# Applied internally at the firmware boundary: outbound commands are
# multiplied by this sign before being sent, and inbound positions are
# multiplied by this sign before being reported. This lets callers keep
# a consistent physical/user coordinate frame while motor.py absorbs
# any wiring polarity flips. Configured via setup_motor() (either via
# an explicit `direction` argument or by passing a negative stepSize).
self.direction = np.ones((self.nMotors), dtype=np.int8)

self.minPosX = -np.inf
self.minPosY = -np.inf
self.minPosZ = -np.inf
Expand Down Expand Up @@ -83,8 +92,14 @@ def _callback_motor_status(self, data):
stepSizes = np.array((self.stepSizeA, self.stepSizeX, self.stepSizeY, self.stepSizeZ))
for iMotor in range(nSteppers):
stepperID = data["steppers"][iMotor]["stepperid"]
# Hardware returns steps, convert to physical units: (steps * stepSize)
self.currentPosition[stepperID] = data["steppers"][iMotor]["position"] * stepSizes[stepperID]
# Hardware returns raw steps in firmware frame; convert to physical units
# in user frame: phys = hw_steps * stepSize * direction. The direction sign
# hides any wiring polarity flip from the caller.
self.currentPosition[stepperID] = (
data["steppers"][iMotor]["position"]
* stepSizes[stepperID]
* self.direction[stepperID]
)
if callable(self._callbackPerKey[0]):
self._callbackPerKey[0](self.currentPosition) # we call the function with the value
except Exception as e:
Expand Down Expand Up @@ -254,7 +269,42 @@ def stopFocusScanning(self):
'''################################################################################################################################################
HIGH-LEVEL Functions that rely on basic REST-API functions
################################################################################################################################################'''
def setup_motor(self, axis, minPos, maxPos, stepSize, backlash):
def setup_motor(self, axis, minPos, maxPos, stepSize, backlash, direction=None):
"""Configure one motor axis.

The ``stepSize`` is stored internally as a positive magnitude (physical
units per step). The wiring polarity is tracked separately in
``self.direction[axis]`` so the user/physical coordinate frame stays
consistent in both directions of communication.

Parameters
----------
axis : str
One of "X", "Y", "Z", "A".
minPos, maxPos : float
Soft limits in physical (user-frame) units.
stepSize : float
Physical units per step. A negative value is interpreted as a
request to flip the hardware direction for this axis (equivalent
to passing ``direction=-1``) and is split into magnitude + sign.
backlash : float
Backlash in hardware steps (sign handled internally).
direction : int or None, optional
Explicit hardware-direction sign (+1 or -1). If ``None`` it is
derived from the sign of ``stepSize`` (default +1).
"""
# Split sign from magnitude. An explicit `direction` always wins;
# otherwise the sign of stepSize is consumed and stepSize becomes
# a positive scale factor.
if direction is None:
sign = -1 if stepSize < 0 else 1
else:
sign = -1 if direction < 0 else 1
stepSize = abs(stepSize)

axisIdx = self.xyztTo1230(axis)
self.direction[axisIdx] = sign
Comment on lines +296 to +306

if axis == "X":
self.minPosX = minPos
self.maxPosX = maxPos
Expand All @@ -271,7 +321,7 @@ def setup_motor(self, axis, minPos, maxPos, stepSize, backlash):
self.minPosA = minPos
self.maxPosA = maxPos
self.stepSizeA = stepSize
self.backlash[self.xyztTo1230(axis)] = backlash
self.backlash[axisIdx] = backlash

def xyztTo1230(self, axis):
axis = axis.upper()
Expand Down Expand Up @@ -504,12 +554,16 @@ def move_stepper(self, steps=(0,0,0,0), speed=(1000,1000,1000,1000), is_absolute

# Store the target position in physical units BEFORE conversion to hardware steps
targetPositionPhysical = steps.copy()

# convert from physical units to steps
steps[0] *= 1/self.stepSizeA
steps[1] *= 1/self.stepSizeX
steps[2] *= 1/self.stepSizeY
steps[3] *= 1/self.stepSizeZ

# Convert from physical (user-frame) units to firmware (hardware-frame) steps.
# Apply the per-axis direction sign so wiring polarity is hidden from callers:
# hw_steps = phys / |stepSize| * direction
# (For relative moves this flips the requested delta; for absolute targets it
# flips the absolute position the firmware should drive to.)
steps[0] = steps[0] * self.direction[0] / self.stepSizeA
steps[1] = steps[1] * self.direction[1] / self.stepSizeX
steps[2] = steps[2] * self.direction[2] / self.stepSizeY
steps[3] = steps[3] * self.direction[3] / self.stepSizeZ

# detect change in direction and compute distances in HARDWARE STEPS for travel time calculation
absoluteDistances_steps = np.zeros((4)) # Distance in hardware steps
Expand Down Expand Up @@ -617,16 +671,19 @@ def move_stepper(self, steps=(0,0,0,0), speed=(1000,1000,1000,1000), is_absolute
}
}

# Update currentPosition to track expected position in physical units
# steps are now in hardware units, so we need to convert back to physical
# Update currentPosition to track expected position in physical (user-frame) units.
# ``steps`` is in hardware/firmware frame at this point, so we convert back with
# phys = hw_steps * stepSize * direction
stepSizes = np.array((self.stepSizeA, self.stepSizeX, self.stepSizeY, self.stepSizeZ))
for iMotor in range(self.nMotors):
if isAbsoluteArray[iMotor]:
# For absolute: convert hardware steps back to physical units: (steps * stepSize)
self.currentPosition[iMotor] = steps[iMotor] * stepSizes[iMotor]
# For absolute: convert hardware steps back to physical (user) units.
self.currentPosition[iMotor] = steps[iMotor] * stepSizes[iMotor] * self.direction[iMotor]
else:
# For relative: convert step delta to physical delta and add to current position
self.currentPosition[iMotor] = self.currentPosition[iMotor] + (steps[iMotor] * stepSizes[iMotor])
# For relative: convert hw step delta to physical delta and accumulate.
self.currentPosition[iMotor] = self.currentPosition[iMotor] + (
steps[iMotor] * stepSizes[iMotor] * self.direction[iMotor]
)

# drive motor
self.isRunning = True
Expand Down Expand Up @@ -757,7 +814,13 @@ def get_position(self, axis=None, timeout=1):
if "motor" in r :
for index, istepper in enumerate(r["motor"]["steppers"]):
if index >3: break # TODO: We would need to handle other values too soon
_position[istepper["stepperid"]]=istepper["position"]*_physicalStepSizes[self.motorAxisOrder[index]]
stepperID = istepper["stepperid"]
# phys = hw_steps * stepSize * direction (apply wiring sign at boundary)
_position[stepperID] = (
istepper["position"]
* _physicalStepSizes[self.motorAxisOrder[index]]
* self.direction[stepperID]
)


return _position
Expand Down
2 changes: 1 addition & 1 deletion uc2rest/mserial.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ def sendMessage(self, command, nResponses=1, timeout = 20):
return "communication interrupted by timeout or reset: "+str(identifier) + " and code:"+str(self.commands[identifier])
with self.lock:
if identifier in self.responses:
if len(self.responses[identifier])==nResponses:
if len(self.responses[identifier])>=nResponses:
returnMessage = self.responses[identifier]
# remove the response from the list
del self.responses[identifier]
Expand Down