Displacement checking for subsequent ropws

This commit is contained in:
2ManyProjects 2026-01-10 17:59:34 -06:00
parent 6ab292a410
commit db6e225a07

View file

@ -54,6 +54,14 @@ class StitchState:
append_count: int = 0 append_count: int = 0
@dataclass
class RowAlignmentOffset:
"""Stores the alignment offset calculated at the start of each row"""
x_offset: float = 0.0
y_offset: float = 0.0
valid: bool = False
class StitchingScanner: class StitchingScanner:
""" """
Slide scanner using continuous stitching. Slide scanner using continuous stitching.
@ -86,6 +94,10 @@ class StitchingScanner:
self._displacement_since_append_x: float = 0.0 self._displacement_since_append_x: float = 0.0
self._displacement_since_append_y: float = 0.0 self._displacement_since_append_y: float = 0.0
# Row alignment offset - calculated at the start of each row
self._row_alignment = RowAlignmentOffset()
self._is_first_strip_of_row: bool = True
self._thread: Optional[threading.Thread] = None self._thread: Optional[threading.Thread] = None
def log(self, message: str): def log(self, message: str):
@ -134,6 +146,79 @@ class StitchingScanner:
return (dx, dy) return (dx, dy)
def _detect_row_alignment(self, current_frame: np.ndarray, direction: ScanDirection) -> RowAlignmentOffset:
"""
Detect alignment offset at the start of a new row by comparing
the current frame with the overlapping region of the mosaic.
This compensates for backlash in control gears that causes circular
motion instead of right angles when transitioning from vertical to horizontal.
"""
offset = RowAlignmentOffset()
with self._mosaic_lock:
if self.mosaic is None:
self.log("No mosaic for row alignment detection")
return offset
mh, mw = self.mosaic.shape[:2]
fh, fw = current_frame.shape[:2]
# Determine where in the mosaic we expect to overlap
# For LEFT direction: we're at the right edge of the mosaic
# For RIGHT direction: we're at the left edge (but this is row 0, shouldn't need alignment)
if direction == ScanDirection.LEFT:
# We're starting from the right side, moving left
# The current frame should overlap with the right edge of the mosaic
# Extract the rightmost portion of the mosaic that should overlap
# The overlap region is at the bottom-right of the mosaic
# Current Y position in mosaic
y_pos = int(self.state.current_y)
y_pos = max(0, min(y_pos, mh - fh))
# Extract overlap region from mosaic (right edge)
overlap_width = min(fw, mw)
mosaic_region = self.mosaic[y_pos:y_pos + fh, mw - overlap_width:mw]
frame_region = current_frame[:, :overlap_width]
else: # RIGHT direction
# We're starting from the left side, moving right
# Current Y position in mosaic
y_pos = int(self.state.current_y)
y_pos = max(0, min(y_pos, mh - fh))
# Extract overlap region from mosaic (left edge)
overlap_width = min(fw, mw)
mosaic_region = self.mosaic[y_pos:y_pos + fh, :overlap_width]
frame_region = current_frame[:, :overlap_width]
# Ensure regions have the same size
min_h = min(mosaic_region.shape[0], frame_region.shape[0])
min_w = min(mosaic_region.shape[1], frame_region.shape[1])
if min_h < 50 or min_w < 50:
self.log(f"Overlap region too small for alignment: {min_w}x{min_h}")
return offset
mosaic_region = mosaic_region[:min_h, :min_w]
frame_region = frame_region[:min_h, :min_w]
# Detect displacement between mosaic region and current frame
dx, dy = self._detect_displacement_robust(mosaic_region, frame_region)
offset.x_offset = dx
offset.y_offset = dy
offset.valid = True
self.log(f"=== Row Alignment Detection ===")
self.log(f" Direction: {direction.value}")
self.log(f" Mosaic region: {mosaic_region.shape[1]}x{mosaic_region.shape[0]} at Y={y_pos}")
self.log(f" Detected offset: X={dx:.1f}, Y={dy:.1f}")
return offset
# ========================================================================= # =========================================================================
# Mosaic Building # Mosaic Building
# ========================================================================= # =========================================================================
@ -146,6 +231,10 @@ class StitchingScanner:
self._displacement_since_append_x = 0.0 self._displacement_since_append_x = 0.0
self._displacement_since_append_y = 0.0 self._displacement_since_append_y = 0.0
# Reset row alignment for first row (no alignment needed)
self._row_alignment = RowAlignmentOffset()
self._is_first_strip_of_row = False # First row doesn't need alignment
with self._state_lock: with self._state_lock:
h, w = frame.shape[:2] h, w = frame.shape[:2]
self.state.mosaic_width = w self.state.mosaic_width = w
@ -160,20 +249,32 @@ class StitchingScanner:
self.log(f"Initialized mosaic: {frame.shape[1]}x{frame.shape[0]}") self.log(f"Initialized mosaic: {frame.shape[1]}x{frame.shape[0]}")
def _blend_horizontal_at_y(self, base: np.ndarray, strip: np.ndarray, def _blend_horizontal_at_y(self, base: np.ndarray, strip: np.ndarray,
blend_width: int, append_right: bool, x_offset: int = None, blend_width: int, append_right: bool,
y_offset: int = 0) -> np.ndarray: x_offset: int = None, y_offset: int = 0,
alignment_x: float = 0.0, alignment_y: float = 0.0) -> np.ndarray:
"""
Blend strip horizontally onto base at specified Y position.
Args:
base: The existing mosaic
strip: The new strip to append
blend_width: Width of the blending zone
append_right: True to append to right, False to append left
x_offset: X position for left-append mode
y_offset: Y position in the mosaic
alignment_x: Additional X alignment offset (from row alignment detection)
alignment_y: Additional Y alignment offset (from row alignment detection)
"""
h_base, w_base = base.shape[:2] h_base, w_base = base.shape[:2]
h_strip, w_strip = strip.shape[:2] h_strip, w_strip = strip.shape[:2]
# === DEBUG MAGIC NUMBERS ===
DEBUG_SHIFT_RIGHT = -20 # Positive = shift strip right
DEBUG_SHIFT_UP = 75 # Positive = shift strip up
# ===========================
blend_w = min(blend_width, w_strip, w_base) blend_w = min(blend_width, w_strip, w_base)
if append_right: if append_right:
# Clamp y_offset for append_right (no debug shifts here) # Apply alignment offset for Y
y_offset = y_offset + int(round(alignment_y))
# Clamp y_offset
y_offset = max(0, min(y_offset, h_base - h_strip)) y_offset = max(0, min(y_offset, h_base - h_strip))
# Expand mosaic to the right # Expand mosaic to the right
@ -183,13 +284,13 @@ class StitchingScanner:
self.log(f"=== _blend_horizontal_at_y (append_right) ===") self.log(f"=== _blend_horizontal_at_y (append_right) ===")
self.log(f" base: {w_base}x{h_base}, strip: {w_strip}x{h_strip}") self.log(f" base: {w_base}x{h_base}, strip: {w_strip}x{h_strip}")
self.log(f" y_offset: {y_offset}, blend_w: {blend_w}") self.log(f" y_offset: {y_offset}, blend_w: {blend_w}")
self.log(f" alignment: X={alignment_x:.1f}, Y={alignment_y:.1f}")
self.log(f" result: {result_width}x{h_base}") self.log(f" result: {result_width}x{h_base}")
# Step 1: Copy entire base # Step 1: Copy entire base
result[:, :w_base] = base result[:, :w_base] = base
# Step 2: Copy non-overlap portion of strip at correct Y # Step 2: Copy non-overlap portion of strip at correct Y
x_place = w_base
self.log(f" Placing strip at X={w_base - blend_w}:{result_width}, Y={y_offset}:{y_offset + h_strip}") self.log(f" Placing strip at X={w_base - blend_w}:{result_width}, Y={y_offset}:{y_offset + h_strip}")
result[y_offset:y_offset + h_strip, w_base:] = strip[:, blend_w:] result[y_offset:y_offset + h_strip, w_base:] = strip[:, blend_w:]
@ -209,20 +310,19 @@ class StitchingScanner:
if x_offset is None: if x_offset is None:
x_offset = 0 x_offset = 0
# Apply debug shifts # Apply alignment offsets (detected at start of row)
x_offset = x_offset + DEBUG_SHIFT_RIGHT x_offset = x_offset + int(round(alignment_x))
y_offset = y_offset - DEBUG_SHIFT_UP y_offset = y_offset - int(round(alignment_y))
# Clamp x_offset to valid range # Clamp x_offset to valid range
# x_offset = max(0, min(x_offset, w_base - blend_w)) x_offset = max(0, min(x_offset, w_base - blend_w))
x_offset = 0 - min(x_offset, w_base)
# Handle strip cropping if y_offset is negative (strip protrudes above frame) # Handle strip cropping if y_offset is negative (strip protrudes above frame)
strip_y_start = 0 # How much to crop from top of strip strip_y_start = 0 # How much to crop from top of strip
if y_offset < 0: if y_offset < 0:
strip_y_start = -y_offset # Crop this many rows from top of strip strip_y_start = -y_offset # Crop this many rows from top of strip
y_offset = 0 y_offset = 0
self.log(f" Cropping {strip_y_start}px from top of strip (DEBUG_SHIFT_UP={DEBUG_SHIFT_UP})") self.log(f" Cropping {strip_y_start}px from top of strip")
# Handle strip cropping if it protrudes below frame # Handle strip cropping if it protrudes below frame
strip_y_end = h_strip strip_y_end = h_strip
@ -241,7 +341,7 @@ class StitchingScanner:
self.log(f"=== _blend_horizontal_at_y (append_left) ===") self.log(f"=== _blend_horizontal_at_y (append_left) ===")
self.log(f" base: {w_base}x{h_base}, strip: {w_strip}x{h_strip}") self.log(f" base: {w_base}x{h_base}, strip: {w_strip}x{h_strip}")
self.log(f" x_offset: {x_offset}, y_offset: {y_offset}, blend_w: {blend_w}") self.log(f" x_offset: {x_offset}, y_offset: {y_offset}, blend_w: {blend_w}")
self.log(f" DEBUG: shift_right={DEBUG_SHIFT_RIGHT}, shift_up={DEBUG_SHIFT_UP}") self.log(f" alignment: X={alignment_x:.1f}, Y={alignment_y:.1f}")
self.log(f" Strip crop: rows [{strip_y_start}:{strip_y_end}] -> height {h_cropped}") self.log(f" Strip crop: rows [{strip_y_start}:{strip_y_end}] -> height {h_cropped}")
# Result is same size as base (no expansion when going left) # Result is same size as base (no expansion when going left)
@ -376,6 +476,7 @@ class StitchingScanner:
result[h_strip - blend_h:h_strip, :] = blended result[h_strip - blend_h:h_strip, :] = blended
result[h_strip:, :] = base[blend_h:, :] result[h_strip:, :] = base[blend_h:, :]
return result return result
def _blend_vertical(self, base: np.ndarray, strip: np.ndarray, def _blend_vertical(self, base: np.ndarray, strip: np.ndarray,
blend_height: int, append_below: bool) -> np.ndarray: blend_height: int, append_below: bool) -> np.ndarray:
mh, mw = base.shape[:2] mh, mw = base.shape[:2]
@ -435,6 +536,10 @@ class StitchingScanner:
dx = abs(self._displacement_since_append_x) dx = abs(self._displacement_since_append_x)
dy = abs(self._displacement_since_append_y) dy = abs(self._displacement_since_append_y)
# Get alignment offsets for this row
align_x = self._row_alignment.x_offset if self._row_alignment.valid else 0.0
align_y = self._row_alignment.y_offset if self._row_alignment.valid else 0.0
if direction in [ScanDirection.RIGHT, ScanDirection.LEFT]: if direction in [ScanDirection.RIGHT, ScanDirection.LEFT]:
append_width = round(dx) + SAFETY_MARGIN append_width = round(dx) + SAFETY_MARGIN
append_width = min(append_width, w - BLEND_WIDTH - 5) append_width = min(append_width, w - BLEND_WIDTH - 5)
@ -452,12 +557,16 @@ class StitchingScanner:
strip_start = max(0, w - append_width - BLEND_WIDTH) strip_start = max(0, w - append_width - BLEND_WIDTH)
new_strip = frame[:, strip_start:] new_strip = frame[:, strip_start:]
self.mosaic = self._blend_horizontal_at_y( self.mosaic = self._blend_horizontal_at_y(
self.mosaic, new_strip, BLEND_WIDTH, append_right=True, x_offset=int(self.state.current_x), y_offset=y_offset) self.mosaic, new_strip, BLEND_WIDTH, append_right=True,
x_offset=int(self.state.current_x), y_offset=y_offset,
alignment_x=align_x, alignment_y=align_y)
else: else:
strip_end = min(w, append_width + BLEND_WIDTH) strip_end = min(w, append_width + BLEND_WIDTH)
new_strip = frame[:, :strip_end] new_strip = frame[:, :strip_end]
self.mosaic = self._blend_horizontal_at_y( self.mosaic = self._blend_horizontal_at_y(
self.mosaic, new_strip, BLEND_WIDTH, append_right=False, x_offset=int(self.state.current_x), y_offset=y_offset) self.mosaic, new_strip, BLEND_WIDTH, append_right=False,
x_offset=int(self.state.current_x), y_offset=y_offset,
alignment_x=align_x, alignment_y=align_y)
self._displacement_since_append_x = fractional_remainder self._displacement_since_append_x = fractional_remainder
self._displacement_since_append_y = 0.0 self._displacement_since_append_y = 0.0
@ -518,6 +627,8 @@ class StitchingScanner:
self._prev_frame = None self._prev_frame = None
self._displacement_since_append_x = 0.0 self._displacement_since_append_x = 0.0
self._displacement_since_append_y = 0.0 self._displacement_since_append_y = 0.0
self._row_alignment = RowAlignmentOffset()
self._is_first_strip_of_row = False # First row doesn't need alignment
self._thread = threading.Thread(target=self._scan_loop, daemon=True) self._thread = threading.Thread(target=self._scan_loop, daemon=True)
self._thread.start() self._thread.start()
@ -572,8 +683,22 @@ class StitchingScanner:
# Serpentine: even rows right, odd rows left # Serpentine: even rows right, odd rows left
h_direction = ScanDirection.RIGHT if row % 2 == 0 else ScanDirection.LEFT h_direction = ScanDirection.RIGHT if row % 2 == 0 else ScanDirection.LEFT
# For rows after the first, detect alignment at the start
if row > 0:
self._is_first_strip_of_row = True
frame = self._capture_frame()
self._row_alignment = self._detect_row_alignment(frame, h_direction)
self.log(f"Row {row + 1} alignment: X={self._row_alignment.x_offset:.1f}, Y={self._row_alignment.y_offset:.1f}")
else:
# First row - no alignment needed
self._row_alignment = RowAlignmentOffset()
self._is_first_strip_of_row = False
stop_reason = self._scan_direction(h_direction) stop_reason = self._scan_direction(h_direction)
# After first strip is appended, clear the flag
self._is_first_strip_of_row = False
if not self.running: if not self.running:
break break
@ -611,6 +736,7 @@ class StitchingScanner:
frame = self._capture_frame() frame = self._capture_frame()
h, w = frame.shape[:2] h, w = frame.shape[:2]
total_x = 0 total_x = 0
# Setup based on direction # Setup based on direction
if direction in [ScanDirection.RIGHT, ScanDirection.LEFT]: if direction in [ScanDirection.RIGHT, ScanDirection.LEFT]:
threshold_pixels = w * self.config.displacement_threshold threshold_pixels = w * self.config.displacement_threshold
@ -644,10 +770,6 @@ class StitchingScanner:
self.log(f"Max dimension reached ({current_dim()}px)") self.log(f"Max dimension reached ({current_dim()}px)")
stop_reason = 'max_dim' stop_reason = 'max_dim'
break break
# if current_dim() <= 0 and direction == ScanDirection.LEFT:
# self.log(f"Max dimension reached ({current_dim()}px)")
# stop_reason = 'min_dim'
# break
if self.state.current_x >= 0 and direction == ScanDirection.LEFT: if self.state.current_x >= 0 and direction == ScanDirection.LEFT:
self.log(f"Max dimension reached ({self.config.max_mosaic_width}px)") self.log(f"Max dimension reached ({self.config.max_mosaic_width}px)")
@ -660,6 +782,7 @@ class StitchingScanner:
self.log(f"Current X offset ({self.state.current_x}px)") self.log(f"Current X offset ({self.state.current_x}px)")
stop_reason = 'max_dim' stop_reason = 'max_dim'
break break
# Pulse motor # Pulse motor
self.motion.send_command(start_cmd) self.motion.send_command(start_cmd)
time.sleep(self.config.movement_interval) time.sleep(self.config.movement_interval)
@ -940,6 +1063,13 @@ class StitchingScanner:
self.running = True self.running = True
scan_dir = ScanDirection.RIGHT if direction == 'right' else ScanDirection.LEFT scan_dir = ScanDirection.RIGHT if direction == 'right' else ScanDirection.LEFT
# Test row alignment detection if not first row
if self.state.current_row > 0 or self.mosaic is not None:
frame = self._capture_frame()
self._row_alignment = self._detect_row_alignment(frame, scan_dir)
self.log(f"Test row alignment: X={self._row_alignment.x_offset:.1f}, Y={self._row_alignment.y_offset:.1f}")
stop_reason = self._scan_direction(scan_dir) stop_reason = self._scan_direction(scan_dir)
self.running = False self.running = False
@ -954,6 +1084,39 @@ class StitchingScanner:
return results return results
def test_row_alignment(self, direction: str = 'left') -> dict:
"""Test row alignment detection without scanning."""
results = {
'success': False,
'x_offset': 0.0,
'y_offset': 0.0,
'error': None
}
try:
self.log(f"Testing row alignment detection ({direction})...")
if self.mosaic is None:
self.log("No mosaic - initializing...")
frame = self._capture_frame()
self._init_mosaic(frame)
frame = self._capture_frame()
scan_dir = ScanDirection.LEFT if direction == 'left' else ScanDirection.RIGHT
alignment = self._detect_row_alignment(frame, scan_dir)
results['success'] = alignment.valid
results['x_offset'] = alignment.x_offset
results['y_offset'] = alignment.y_offset
self.log(f"Alignment result: valid={alignment.valid}, X={alignment.x_offset:.1f}, Y={alignment.y_offset:.1f}")
except Exception as e:
results['error'] = str(e)
self.log(f"Test error: {e}")
return results
def get_memory_estimate(self) -> dict: def get_memory_estimate(self) -> dict:
current_bytes = self.mosaic.nbytes if self.mosaic is not None else 0 current_bytes = self.mosaic.nbytes if self.mosaic is not None else 0
max_bytes = self.config.max_mosaic_width * self.config.max_mosaic_height * 3 max_bytes = self.config.max_mosaic_width * self.config.max_mosaic_height * 3