diff --git a/src/stitching_scanner.py b/src/stitching_scanner.py index b6edc07..7b0607d 100644 --- a/src/stitching_scanner.py +++ b/src/stitching_scanner.py @@ -54,6 +54,14 @@ class StitchState: append_count: int = 0 +@dataclass +class RowAlignmentOffset: + """Stores the alignment offset calculated at the start of each row""" + x_offset: float = 0.0 + y_offset: float = 0.0 + valid: bool = False + + class StitchingScanner: """ Slide scanner using continuous stitching. @@ -86,6 +94,10 @@ class StitchingScanner: self._displacement_since_append_x: float = 0.0 self._displacement_since_append_y: float = 0.0 + # Row alignment offset - calculated at the start of each row + self._row_alignment = RowAlignmentOffset() + self._is_first_strip_of_row: bool = True + self._thread: Optional[threading.Thread] = None def log(self, message: str): @@ -134,6 +146,79 @@ class StitchingScanner: return (dx, dy) + def _detect_row_alignment(self, current_frame: np.ndarray, direction: ScanDirection) -> RowAlignmentOffset: + """ + Detect alignment offset at the start of a new row by comparing + the current frame with the overlapping region of the mosaic. + + This compensates for backlash in control gears that causes circular + motion instead of right angles when transitioning from vertical to horizontal. + """ + offset = RowAlignmentOffset() + + with self._mosaic_lock: + if self.mosaic is None: + self.log("No mosaic for row alignment detection") + return offset + + mh, mw = self.mosaic.shape[:2] + fh, fw = current_frame.shape[:2] + + # Determine where in the mosaic we expect to overlap + # For LEFT direction: we're at the right edge of the mosaic + # For RIGHT direction: we're at the left edge (but this is row 0, shouldn't need alignment) + + if direction == ScanDirection.LEFT: + # We're starting from the right side, moving left + # The current frame should overlap with the right edge of the mosaic + # Extract the rightmost portion of the mosaic that should overlap + + # The overlap region is at the bottom-right of the mosaic + # Current Y position in mosaic + y_pos = int(self.state.current_y) + y_pos = max(0, min(y_pos, mh - fh)) + + # Extract overlap region from mosaic (right edge) + overlap_width = min(fw, mw) + mosaic_region = self.mosaic[y_pos:y_pos + fh, mw - overlap_width:mw] + frame_region = current_frame[:, :overlap_width] + + else: # RIGHT direction + # We're starting from the left side, moving right + # Current Y position in mosaic + y_pos = int(self.state.current_y) + y_pos = max(0, min(y_pos, mh - fh)) + + # Extract overlap region from mosaic (left edge) + overlap_width = min(fw, mw) + mosaic_region = self.mosaic[y_pos:y_pos + fh, :overlap_width] + frame_region = current_frame[:, :overlap_width] + + # Ensure regions have the same size + min_h = min(mosaic_region.shape[0], frame_region.shape[0]) + min_w = min(mosaic_region.shape[1], frame_region.shape[1]) + + if min_h < 50 or min_w < 50: + self.log(f"Overlap region too small for alignment: {min_w}x{min_h}") + return offset + + mosaic_region = mosaic_region[:min_h, :min_w] + frame_region = frame_region[:min_h, :min_w] + + # Detect displacement between mosaic region and current frame + dx, dy = self._detect_displacement_robust(mosaic_region, frame_region) + + offset.x_offset = dx + offset.y_offset = dy + offset.valid = True + + self.log(f"=== Row Alignment Detection ===") + self.log(f" Direction: {direction.value}") + self.log(f" Mosaic region: {mosaic_region.shape[1]}x{mosaic_region.shape[0]} at Y={y_pos}") + self.log(f" Detected offset: X={dx:.1f}, Y={dy:.1f}") + + return offset + # ========================================================================= # Mosaic Building # ========================================================================= @@ -146,6 +231,10 @@ class StitchingScanner: self._displacement_since_append_x = 0.0 self._displacement_since_append_y = 0.0 + # Reset row alignment for first row (no alignment needed) + self._row_alignment = RowAlignmentOffset() + self._is_first_strip_of_row = False # First row doesn't need alignment + with self._state_lock: h, w = frame.shape[:2] self.state.mosaic_width = w @@ -160,20 +249,32 @@ class StitchingScanner: self.log(f"Initialized mosaic: {frame.shape[1]}x{frame.shape[0]}") def _blend_horizontal_at_y(self, base: np.ndarray, strip: np.ndarray, - blend_width: int, append_right: bool, x_offset: int = None, - y_offset: int = 0) -> np.ndarray: + blend_width: int, append_right: bool, + x_offset: int = None, y_offset: int = 0, + alignment_x: float = 0.0, alignment_y: float = 0.0) -> np.ndarray: + """ + Blend strip horizontally onto base at specified Y position. + + Args: + base: The existing mosaic + strip: The new strip to append + blend_width: Width of the blending zone + append_right: True to append to right, False to append left + x_offset: X position for left-append mode + y_offset: Y position in the mosaic + alignment_x: Additional X alignment offset (from row alignment detection) + alignment_y: Additional Y alignment offset (from row alignment detection) + """ h_base, w_base = base.shape[:2] h_strip, w_strip = strip.shape[:2] - # === DEBUG MAGIC NUMBERS === - DEBUG_SHIFT_RIGHT = -20 # Positive = shift strip right - DEBUG_SHIFT_UP = 75 # Positive = shift strip up - # =========================== - blend_w = min(blend_width, w_strip, w_base) if append_right: - # Clamp y_offset for append_right (no debug shifts here) + # Apply alignment offset for Y + y_offset = y_offset + int(round(alignment_y)) + + # Clamp y_offset y_offset = max(0, min(y_offset, h_base - h_strip)) # Expand mosaic to the right @@ -183,13 +284,13 @@ class StitchingScanner: self.log(f"=== _blend_horizontal_at_y (append_right) ===") self.log(f" base: {w_base}x{h_base}, strip: {w_strip}x{h_strip}") self.log(f" y_offset: {y_offset}, blend_w: {blend_w}") + self.log(f" alignment: X={alignment_x:.1f}, Y={alignment_y:.1f}") self.log(f" result: {result_width}x{h_base}") # Step 1: Copy entire base result[:, :w_base] = base # Step 2: Copy non-overlap portion of strip at correct Y - x_place = w_base self.log(f" Placing strip at X={w_base - blend_w}:{result_width}, Y={y_offset}:{y_offset + h_strip}") result[y_offset:y_offset + h_strip, w_base:] = strip[:, blend_w:] @@ -209,20 +310,19 @@ class StitchingScanner: if x_offset is None: x_offset = 0 - # Apply debug shifts - x_offset = x_offset + DEBUG_SHIFT_RIGHT - y_offset = y_offset - DEBUG_SHIFT_UP + # Apply alignment offsets (detected at start of row) + x_offset = x_offset + int(round(alignment_x)) + y_offset = y_offset - int(round(alignment_y)) # Clamp x_offset to valid range - # x_offset = max(0, min(x_offset, w_base - blend_w)) - x_offset = 0 - min(x_offset, w_base) + x_offset = max(0, min(x_offset, w_base - blend_w)) # Handle strip cropping if y_offset is negative (strip protrudes above frame) strip_y_start = 0 # How much to crop from top of strip if y_offset < 0: strip_y_start = -y_offset # Crop this many rows from top of strip y_offset = 0 - self.log(f" Cropping {strip_y_start}px from top of strip (DEBUG_SHIFT_UP={DEBUG_SHIFT_UP})") + self.log(f" Cropping {strip_y_start}px from top of strip") # Handle strip cropping if it protrudes below frame strip_y_end = h_strip @@ -241,7 +341,7 @@ class StitchingScanner: self.log(f"=== _blend_horizontal_at_y (append_left) ===") self.log(f" base: {w_base}x{h_base}, strip: {w_strip}x{h_strip}") self.log(f" x_offset: {x_offset}, y_offset: {y_offset}, blend_w: {blend_w}") - self.log(f" DEBUG: shift_right={DEBUG_SHIFT_RIGHT}, shift_up={DEBUG_SHIFT_UP}") + self.log(f" alignment: X={alignment_x:.1f}, Y={alignment_y:.1f}") self.log(f" Strip crop: rows [{strip_y_start}:{strip_y_end}] -> height {h_cropped}") # Result is same size as base (no expansion when going left) @@ -376,6 +476,7 @@ class StitchingScanner: result[h_strip - blend_h:h_strip, :] = blended result[h_strip:, :] = base[blend_h:, :] return result + def _blend_vertical(self, base: np.ndarray, strip: np.ndarray, blend_height: int, append_below: bool) -> np.ndarray: mh, mw = base.shape[:2] @@ -435,6 +536,10 @@ class StitchingScanner: dx = abs(self._displacement_since_append_x) dy = abs(self._displacement_since_append_y) + # Get alignment offsets for this row + align_x = self._row_alignment.x_offset if self._row_alignment.valid else 0.0 + align_y = self._row_alignment.y_offset if self._row_alignment.valid else 0.0 + if direction in [ScanDirection.RIGHT, ScanDirection.LEFT]: append_width = round(dx) + SAFETY_MARGIN append_width = min(append_width, w - BLEND_WIDTH - 5) @@ -452,12 +557,16 @@ class StitchingScanner: strip_start = max(0, w - append_width - BLEND_WIDTH) new_strip = frame[:, strip_start:] self.mosaic = self._blend_horizontal_at_y( - self.mosaic, new_strip, BLEND_WIDTH, append_right=True, x_offset=int(self.state.current_x), y_offset=y_offset) + self.mosaic, new_strip, BLEND_WIDTH, append_right=True, + x_offset=int(self.state.current_x), y_offset=y_offset, + alignment_x=align_x, alignment_y=align_y) else: strip_end = min(w, append_width + BLEND_WIDTH) new_strip = frame[:, :strip_end] self.mosaic = self._blend_horizontal_at_y( - self.mosaic, new_strip, BLEND_WIDTH, append_right=False, x_offset=int(self.state.current_x), y_offset=y_offset) + self.mosaic, new_strip, BLEND_WIDTH, append_right=False, + x_offset=int(self.state.current_x), y_offset=y_offset, + alignment_x=align_x, alignment_y=align_y) self._displacement_since_append_x = fractional_remainder self._displacement_since_append_y = 0.0 @@ -518,6 +627,8 @@ class StitchingScanner: self._prev_frame = None self._displacement_since_append_x = 0.0 self._displacement_since_append_y = 0.0 + self._row_alignment = RowAlignmentOffset() + self._is_first_strip_of_row = False # First row doesn't need alignment self._thread = threading.Thread(target=self._scan_loop, daemon=True) self._thread.start() @@ -572,8 +683,22 @@ class StitchingScanner: # Serpentine: even rows right, odd rows left h_direction = ScanDirection.RIGHT if row % 2 == 0 else ScanDirection.LEFT + # For rows after the first, detect alignment at the start + if row > 0: + self._is_first_strip_of_row = True + frame = self._capture_frame() + self._row_alignment = self._detect_row_alignment(frame, h_direction) + self.log(f"Row {row + 1} alignment: X={self._row_alignment.x_offset:.1f}, Y={self._row_alignment.y_offset:.1f}") + else: + # First row - no alignment needed + self._row_alignment = RowAlignmentOffset() + self._is_first_strip_of_row = False + stop_reason = self._scan_direction(h_direction) + # After first strip is appended, clear the flag + self._is_first_strip_of_row = False + if not self.running: break @@ -611,6 +736,7 @@ class StitchingScanner: frame = self._capture_frame() h, w = frame.shape[:2] total_x = 0 + # Setup based on direction if direction in [ScanDirection.RIGHT, ScanDirection.LEFT]: threshold_pixels = w * self.config.displacement_threshold @@ -640,26 +766,23 @@ class StitchingScanner: stop_reason = 'timeout' break - if current_dim() >= max_dim and direction == ScanDirection.RIGHT: + if current_dim() >= max_dim and direction == ScanDirection.RIGHT: self.log(f"Max dimension reached ({current_dim()}px)") stop_reason = 'max_dim' break - # if current_dim() <= 0 and direction == ScanDirection.LEFT: - # self.log(f"Max dimension reached ({current_dim()}px)") - # stop_reason = 'min_dim' - # break - if self.state.current_x >= 0 and direction == ScanDirection.LEFT: + if self.state.current_x >= 0 and direction == ScanDirection.LEFT: self.log(f"Max dimension reached ({self.config.max_mosaic_width}px)") self.log(f"Current X offset ({self.state.current_x}px) total_x ({total_x}px)") stop_reason = 'max_dim' break - if abs(self.state.current_x) >= self.config.max_mosaic_width and direction == ScanDirection.RIGHT: + if abs(self.state.current_x) >= self.config.max_mosaic_width and direction == ScanDirection.RIGHT: self.log(f"Max dimension reached ({self.config.max_mosaic_width}px)") self.log(f"Current X offset ({self.state.current_x}px)") stop_reason = 'max_dim' break + # Pulse motor self.motion.send_command(start_cmd) time.sleep(self.config.movement_interval) @@ -940,6 +1063,13 @@ class StitchingScanner: self.running = True scan_dir = ScanDirection.RIGHT if direction == 'right' else ScanDirection.LEFT + + # Test row alignment detection if not first row + if self.state.current_row > 0 or self.mosaic is not None: + frame = self._capture_frame() + self._row_alignment = self._detect_row_alignment(frame, scan_dir) + self.log(f"Test row alignment: X={self._row_alignment.x_offset:.1f}, Y={self._row_alignment.y_offset:.1f}") + stop_reason = self._scan_direction(scan_dir) self.running = False @@ -954,6 +1084,39 @@ class StitchingScanner: return results + def test_row_alignment(self, direction: str = 'left') -> dict: + """Test row alignment detection without scanning.""" + results = { + 'success': False, + 'x_offset': 0.0, + 'y_offset': 0.0, + 'error': None + } + + try: + self.log(f"Testing row alignment detection ({direction})...") + + if self.mosaic is None: + self.log("No mosaic - initializing...") + frame = self._capture_frame() + self._init_mosaic(frame) + + frame = self._capture_frame() + scan_dir = ScanDirection.LEFT if direction == 'left' else ScanDirection.RIGHT + alignment = self._detect_row_alignment(frame, scan_dir) + + results['success'] = alignment.valid + results['x_offset'] = alignment.x_offset + results['y_offset'] = alignment.y_offset + + self.log(f"Alignment result: valid={alignment.valid}, X={alignment.x_offset:.1f}, Y={alignment.y_offset:.1f}") + + except Exception as e: + results['error'] = str(e) + self.log(f"Test error: {e}") + + return results + def get_memory_estimate(self) -> dict: current_bytes = self.mosaic.nbytes if self.mosaic is not None else 0 max_bytes = self.config.max_mosaic_width * self.config.max_mosaic_height * 3