"""
Stitching Scanner v2 - Simplified unified approach

Same displacement-based stitching for both horizontal rows and vertical
row transitions. No complex visual matching - just track displacement
and append strips.
"""

import threading
import time
from dataclasses import dataclass, replace
from enum import Enum
from typing import Callable, Optional, Tuple

import cv2
import numpy as np


class ScanDirection(Enum):
    """Direction of stage travel during a scan pass."""

    RIGHT = 'right'
    LEFT = 'left'
    DOWN = 'down'
    UP = 'up'


@dataclass
class StitchConfig:
    """Tunable parameters for a stitching scan."""

    displacement_threshold: float = 0.10  # 10% of frame triggers append
    movement_interval: float = 0.001      # seconds the motor is pulsed per step
    frame_interval: float = 1.00          # seconds to wait before grabbing a frame
    settle_time: float = 0.75             # seconds to wait after a pass stops
    max_scan_time: float = 300.0          # per-direction timeout in seconds
    row_overlap: float = 0.15             # fraction of frame height shared by rows
    max_mosaic_width: int = 1000
    max_mosaic_height: int = 1000
    scan_speed_index: int = 3             # passed to motion.set_speed()
    autofocus_every_row: bool = True      # not read anywhere in this module


@dataclass
class StitchState:
    """Progress snapshot of a scan; copied out by StitchingScanner.get_state()."""

    is_scanning: bool = False
    direction: str = ''
    cumulative_x: float = 0.0
    cumulative_y: float = 0.0
    current_x: float = 0.0
    current_y: float = 0.0
    last_displacement: Tuple[float, float] = (0.0, 0.0)
    current_row: int = 0
    total_rows: int = 0
    mosaic_width: int = 0
    mosaic_height: int = 0
    mosaic_init_width: int = 0
    mosaic_init_height: int = 0
    frame_count: int = 0
    append_count: int = 0


class StitchingScanner:
    """
    Slide scanner using continuous stitching.

    Unified approach for horizontal and vertical movement: the stage is
    pulsed, the frame-to-frame displacement is measured by phase
    correlation, and a strip is appended to the mosaic whenever the
    accumulated displacement crosses a threshold.
    """

    def __init__(self, camera, motion_controller, autofocus_controller=None,
                 config: Optional[StitchConfig] = None,
                 on_log: Optional[Callable[[str], None]] = None,
                 on_progress: Optional[Callable[[int, int], None]] = None,
                 on_mosaic_updated: Optional[Callable[[], None]] = None):
        """
        Args:
            camera: object exposing ``capture_frame()``.
            motion_controller: object exposing ``send_command()``,
                ``set_speed()`` and ``stop_all()``.
            autofocus_controller: stored but not used in this module.
            config: scan parameters; a default StitchConfig if omitted.
            on_log: optional sink for log messages.
            on_progress: optional callback ``(append_count, 0)``.
            on_mosaic_updated: optional callback fired after each append.
        """
        self.camera = camera
        self.motion = motion_controller
        self.autofocus = autofocus_controller
        self.config = config or StitchConfig()

        self.on_log = on_log
        self.on_progress = on_progress
        self.on_mosaic_updated = on_mosaic_updated

        self.running = False
        self.paused = False

        self.state = StitchState()
        self._state_lock = threading.Lock()

        self.mosaic: Optional[np.ndarray] = None
        self._mosaic_lock = threading.Lock()

        self._prev_frame: Optional[np.ndarray] = None
        self._displacement_since_append_x: float = 0.0
        self._displacement_since_append_y: float = 0.0

        self._thread: Optional[threading.Thread] = None

    def log(self, message: str):
        """Send a prefixed message to ``on_log`` (if set) and to stdout."""
        if self.on_log:
            self.on_log(f"[Stitch] {message}")
        print(f"[Stitch] {message}")

    # =========================================================================
    # Displacement Detection
    # =========================================================================

    def _to_grayscale(self, frame: np.ndarray) -> np.ndarray:
        """Return ``frame`` as single-channel; 3-D input is treated as BGR."""
        if len(frame.shape) == 3:
            return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        return frame

    def _detect_displacement(self, prev_frame: np.ndarray,
                             curr_frame: np.ndarray) -> Tuple[float, float]:
        """
        Estimate the (dx, dy) shift between two frames by phase correlation.

        Returns (0.0, 0.0) when the two frames differ in size.
        """
        prev_gray = self._to_grayscale(prev_frame)
        curr_gray = self._to_grayscale(curr_frame)

        if prev_gray.shape != curr_gray.shape:
            return (0.0, 0.0)

        prev_f = prev_gray.astype(np.float32)
        curr_f = curr_gray.astype(np.float32)

        # A Hanning window suppresses edge effects in the FFT.
        h, w = prev_gray.shape
        window = cv2.createHanningWindow((w, h), cv2.CV_32F)
        prev_f = prev_f * window
        curr_f = curr_f * window

        shift, _ = cv2.phaseCorrelate(prev_f, curr_f)
        return shift

    def _detect_displacement_robust(self, prev_frame: np.ndarray,
                                    curr_frame: np.ndarray) -> Tuple[float, float]:
        """
        Like ``_detect_displacement`` but rejects implausible jumps.

        A shift larger than half of the longer frame side on either axis
        is logged and replaced by (0.0, 0.0).
        """
        dx, dy = self._detect_displacement(prev_frame, curr_frame)

        h, w = prev_frame.shape[:2]
        max_displacement = max(w, h) * 0.5

        if abs(dx) > max_displacement or abs(dy) > max_displacement:
            self.log(f"Warning: Large displacement ({dx:.1f}, {dy:.1f}), ignoring")
            return (0.0, 0.0)

        return (dx, dy)

    # =========================================================================
    # Mosaic Building
    # =========================================================================

    def _init_mosaic(self, frame: np.ndarray):
        """Start a new mosaic from ``frame`` and reset all tracking state."""
        with self._mosaic_lock:
            self.mosaic = frame.copy()

        self._prev_frame = frame.copy()
        self._displacement_since_append_x = 0.0
        self._displacement_since_append_y = 0.0

        with self._state_lock:
            h, w = frame.shape[:2]
            self.state.mosaic_width = w
            self.state.mosaic_height = h
            self.state.mosaic_init_width = w
            self.state.mosaic_init_height = h
            self.state.frame_count = 1
            self.state.append_count = 0
            self.state.current_y = 0
            self.state.current_x = 0

        self.log(f"Initialized mosaic: {frame.shape[1]}x{frame.shape[0]}")

    def _blend_horizontal_at_y(self, base: np.ndarray, strip: np.ndarray,
                               blend_width: int, append_right: bool,
                               x_offset: int = None, y_offset: int = 0) -> np.ndarray:
        """
        Place ``strip`` into ``base`` at a given row offset with a linear blend.

        Args:
            base: current mosaic (3-channel uint8).
            strip: new image strip (3-channel uint8).
            blend_width: requested width of the cross-fade zone in pixels.
            append_right: True grows the mosaic to the right; False pastes
                the strip inside the existing mosaic without resizing it.
            x_offset: only used when ``append_right`` is False; see the
                sign handling below.
            y_offset: row at which the top of the strip is placed.

        Returns:
            A new array; ``base`` is never modified in place.
        """
        h_base, w_base = base.shape[:2]
        h_strip, w_strip = strip.shape[:2]

        # === DEBUG MAGIC NUMBERS ===
        # NOTE(review): hand-tuned alignment offsets left in by the author;
        # they only affect the append_left branch.
        DEBUG_SHIFT_RIGHT = -15  # Positive = shift strip right
        DEBUG_SHIFT_UP = 5       # Positive = shift strip up
        # ===========================

        blend_w = min(blend_width, w_strip, w_base)

        if append_right:
            # Clamp y_offset for append_right (no debug shifts here)
            y_offset = max(0, min(y_offset, h_base - h_strip))

            # Expand mosaic to the right
            result_width = w_base + w_strip - blend_w
            result = np.zeros((h_base, result_width, 3), dtype=np.uint8)

            self.log("=== _blend_horizontal_at_y (append_right) ===")
            self.log(f" base: {w_base}x{h_base}, strip: {w_strip}x{h_strip}")
            self.log(f" y_offset: {y_offset}, blend_w: {blend_w}")
            self.log(f" result: {result_width}x{h_base}")

            # Step 1: Copy entire base
            result[:, :w_base] = base

            # Step 2: Copy non-overlap portion of strip at correct Y
            self.log(f" Placing strip at X={w_base - blend_w}:{result_width}, "
                     f"Y={y_offset}:{y_offset + h_strip}")
            result[y_offset:y_offset + h_strip, w_base:] = strip[:, blend_w:]

            # Step 3: Create blend (base fades out, strip fades in)
            alpha = np.linspace(1, 0, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis]
            base_overlap = base[y_offset:y_offset + h_strip, -blend_w:].astype(np.float32)
            strip_overlap = strip[:, :blend_w].astype(np.float32)
            blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8)

            # Step 4: Place blend
            result[y_offset:y_offset + h_strip, w_base - blend_w:w_base] = blended

            return result

        # append_left - place at x_offset, NO width expansion
        if x_offset is None:
            x_offset = 0

        # Apply debug shifts
        x_offset = x_offset + DEBUG_SHIFT_RIGHT
        y_offset = y_offset - DEBUG_SHIFT_UP

        # The incoming offset is negated here, so a negative tracked X
        # becomes a positive column index in the mosaic.
        x_offset = 0 - min(x_offset, w_base)

        # Crop strip columns that would land left of the mosaic. Without
        # this a negative x_offset turns into a negative slice start and
        # the assignment below fails with a shape mismatch.
        strip_x_skip = 0
        if x_offset < 0:
            strip_x_skip = -x_offset
            x_offset = 0
            self.log(f" Cropping {strip_x_skip}px from left of strip")

        # Handle strip cropping if y_offset is negative (strip protrudes above frame)
        strip_y_start = 0  # How much to crop from top of strip
        if y_offset < 0:
            strip_y_start = -y_offset
            y_offset = 0
            self.log(f" Cropping {strip_y_start}px from top of strip "
                     f"(DEBUG_SHIFT_UP={DEBUG_SHIFT_UP})")

        # Handle strip cropping if it protrudes below frame
        strip_y_end = h_strip
        if y_offset + (h_strip - strip_y_start) > h_base:
            strip_y_end = h_strip - ((y_offset + (h_strip - strip_y_start)) - h_base)
            self.log(f" Cropping bottom of strip, new strip_y_end={strip_y_end}")

        # Get the cropped strip
        if strip_y_start >= strip_y_end or strip_x_skip >= w_strip:
            self.log(" WARNING: Strip fully cropped, nothing to place")
            return base.copy()

        cropped_strip = strip[strip_y_start:strip_y_end, strip_x_skip:]
        h_cropped, w_cropped = cropped_strip.shape[:2]

        self.log("=== _blend_horizontal_at_y (append_left) ===")
        self.log(f" base: {w_base}x{h_base}, strip: {w_strip}x{h_strip}")
        self.log(f" x_offset: {x_offset}, y_offset: {y_offset}, blend_w: {blend_w}")
        self.log(f" DEBUG: shift_right={DEBUG_SHIFT_RIGHT}, shift_up={DEBUG_SHIFT_UP}")
        self.log(f" Strip crop: rows [{strip_y_start}:{strip_y_end}] -> height {h_cropped}")

        # Result is same size as base (no expansion when going left)
        result = base.copy()

        # Calculate placement bounds (clipped at the mosaic's right edge)
        strip_x_start = x_offset
        strip_x_end = min(x_offset + w_cropped, w_base)
        strip_cols_to_copy = strip_x_end - strip_x_start

        self.log(f" Placing strip at X={strip_x_start}:{strip_x_end}, "
                 f"Y={y_offset}:{y_offset + h_cropped}")
        self.log(f" Strip cols to copy: {strip_cols_to_copy}")

        # Step 1: Copy strip content (non-blend portion) at correct position
        # For LEFT scanning, blend is on the RIGHT side of the strip
        non_blend_end = strip_cols_to_copy - blend_w
        if non_blend_end > 0:
            result[y_offset:y_offset + h_cropped,
                   strip_x_start:strip_x_start + non_blend_end] = cropped_strip[:, :non_blend_end]
            self.log(f" Step 1: Placed non-blend at X={strip_x_start}:{strip_x_start + non_blend_end}")

        # Step 2: Create blend on the RIGHT edge of strip (blending with existing content)
        blend_x_start = strip_x_end - blend_w
        blend_x_end = strip_x_end

        if blend_w > 0 and blend_x_start >= strip_x_start:
            alpha = np.linspace(1, 0, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis]

            # Use the strip columns that actually sit in the blend zone, so
            # a strip clipped at the right edge still lines up.
            strip_overlap = cropped_strip[:, non_blend_end:strip_cols_to_copy].astype(np.float32)
            base_overlap = base[y_offset:y_offset + h_cropped,
                                blend_x_start:blend_x_end].astype(np.float32)

            blended = (strip_overlap * alpha + base_overlap * (1 - alpha)).astype(np.uint8)
            result[y_offset:y_offset + h_cropped, blend_x_start:blend_x_end] = blended
            self.log(f" Step 2: Blend zone at X={blend_x_start}:{blend_x_end}")

        self.log(f" Final: Strip placed at X={strip_x_start}, Y={y_offset}, "
                 f"mosaic size unchanged: {w_base}x{h_base}")

        return result

    def _blend_horizontal(self, base: np.ndarray, strip: np.ndarray,
                          blend_width: int, append_right: bool) -> np.ndarray:
        """
        Join ``strip`` to the left or right of ``base`` with a linear blend.

        Falls back to a plain ``np.hstack`` when ``blend_width`` is not in
        ``(0, strip width)`` or when the two heights differ.
        """
        if blend_width <= 0 or blend_width >= strip.shape[1]:
            if append_right:
                return np.hstack([base, strip])
            return np.hstack([strip, base])

        h_base, w_base = base.shape[:2]
        h_strip, w_strip = strip.shape[:2]

        self.log(f"Base Width: {w_base}px")

        if h_strip != h_base:
            if append_right:
                return np.hstack([base, strip])
            return np.hstack([strip, base])

        blend_w = min(blend_width, w_strip, w_base)
        result_width = w_base + w_strip - blend_w
        result = np.zeros((h_base, result_width, 3), dtype=np.uint8)

        if append_right:
            result[:, :w_base] = base

            alpha = np.linspace(1, 0, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis]
            base_overlap = base[:, -blend_w:].astype(np.float32)
            strip_overlap = strip[:, :blend_w].astype(np.float32)
            blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8)

            result[:, w_base - blend_w:w_base] = blended
            result[:, w_base:] = strip[:, blend_w:]
            return result

        result[:, :w_strip] = strip

        alpha = np.linspace(0, 1, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis]
        strip_overlap = strip[:, -blend_w:].astype(np.float32)
        base_overlap = base[:, :blend_w].astype(np.float32)
        blended = (strip_overlap * (1 - alpha) + base_overlap * alpha).astype(np.uint8)

        result[:, w_strip - blend_w:w_strip] = blended
        result[:, w_strip:] = base[:, blend_w:]
        return result

    def _blend_vertical_at_x(self, base: np.ndarray, strip: np.ndarray,
                             blend_height: int, append_below: bool,
                             x_off: int = 0) -> np.ndarray:
        """
        Join ``strip`` above or below ``base``, padded to the mosaic width.

        The strip is placed at column ``mosaic width - initial frame width``
        and the remaining columns are filled with black.

        NOTE(review): ``x_off`` is accepted but not used; the column is
        always derived from the mosaic width. Confirm whether that is
        intended for rows that end on the left side.
        """
        h_base, w_base = base.shape[:2]
        h_strip, w_strip = strip.shape[:2]

        x_offset = max(0, w_base - self.state.mosaic_init_width)

        # Create full-width strip with strip placed at x_offset
        full_strip = np.zeros((h_strip, w_base, 3), dtype=np.uint8)
        available_width = w_base - x_offset
        copy_width = min(w_strip, available_width)
        full_strip[:, x_offset:x_offset + copy_width] = strip[:, :copy_width]

        # Early exit: no blending possible
        if blend_height <= 0 or blend_height >= h_strip:
            if append_below:
                return np.vstack([base, full_strip])
            return np.vstack([full_strip, base])

        if w_strip > w_base:
            self.log(f"Warning: strip wider than base ({w_strip} > {w_base})")

        blend_h = min(blend_height, h_strip, h_base)
        result_height = h_base + h_strip - blend_h
        result = np.zeros((result_height, w_base, 3), dtype=np.uint8)

        if append_below:
            result[:h_base, :] = base

            alpha = np.linspace(1, 0, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis]
            base_overlap = base[-blend_h:, :].astype(np.float32)
            strip_overlap = full_strip[:blend_h, :].astype(np.float32)
            blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8)

            result[h_base - blend_h:h_base, :] = blended
            result[h_base:, :] = full_strip[blend_h:, :]
            return result

        result[:h_strip, :] = full_strip

        alpha = np.linspace(0, 1, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis]
        strip_overlap = full_strip[-blend_h:, :].astype(np.float32)
        base_overlap = base[:blend_h, :].astype(np.float32)
        blended = (strip_overlap * (1 - alpha) + base_overlap * alpha).astype(np.uint8)

        result[h_strip - blend_h:h_strip, :] = blended
        result[h_strip:, :] = base[blend_h:, :]
        return result

    def _blend_vertical(self, base: np.ndarray, strip: np.ndarray,
                        blend_height: int, append_below: bool) -> np.ndarray:
        """
        Join ``strip`` above or below ``base`` with a linear blend.

        The strip is cropped or right-padded with black so its width
        matches ``base`` before joining.
        """
        mh, mw = base.shape[:2]
        sh, sw = strip.shape[:2]

        # Match widths
        if sw > mw:
            strip = strip[:, :mw]
        elif sw < mw:
            pad = np.zeros((sh, mw - sw, 3), dtype=np.uint8)
            strip = np.hstack([strip, pad])

        blend_h = min(blend_height, sh, mh)

        if blend_h <= 0:
            if append_below:
                return np.vstack([base, strip])
            return np.vstack([strip, base])

        result_h = mh + sh - blend_h
        result = np.zeros((result_h, mw, 3), dtype=np.uint8)

        if append_below:
            alpha = np.linspace(1, 0, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis]
            base_overlap = base[-blend_h:].astype(np.float32)
            strip_overlap = strip[:blend_h].astype(np.float32)
            blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8)

            result[:mh - blend_h] = base[:-blend_h]
            result[mh - blend_h:mh] = blended
            result[mh:] = strip[blend_h:]
            return result

        alpha = np.linspace(0, 1, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis]
        strip_overlap = strip[-blend_h:].astype(np.float32)
        base_overlap = base[:blend_h].astype(np.float32)
        blended = (strip_overlap * (1 - alpha) + base_overlap * alpha).astype(np.uint8)

        result[:sh - blend_h] = strip[:-blend_h]
        result[sh - blend_h:sh] = blended
        result[sh:] = base[blend_h:]
        return result

    def _append_strip(self, frame: np.ndarray, direction: ScanDirection):
        """
        Append strip to mosaic based on accumulated displacement.

        The strip size is the rounded displacement since the last append
        plus a small safety margin; the sub-pixel remainder is carried
        over to the next append. Does nothing when no mosaic exists or
        the computed strip would be empty.
        """
        BLEND_WIDTH = 10
        SAFETY_MARGIN = 2

        with self._mosaic_lock:
            if self.mosaic is None:
                return

            h, w = frame.shape[:2]

            dx = abs(self._displacement_since_append_x)
            dy = abs(self._displacement_since_append_y)

            if direction in (ScanDirection.RIGHT, ScanDirection.LEFT):
                append_width = round(dx) + SAFETY_MARGIN
                append_width = min(append_width, w - BLEND_WIDTH - 5)

                if append_width < 1:
                    return

                pixels_consumed = append_width - SAFETY_MARGIN
                fractional_remainder = dx - pixels_consumed

                # Calculate Y offset for current row
                y_offset = int(self.state.current_y)

                if direction == ScanDirection.RIGHT:
                    strip_start = max(0, w - append_width - BLEND_WIDTH)
                    new_strip = frame[:, strip_start:]
                    self.mosaic = self._blend_horizontal_at_y(
                        self.mosaic, new_strip, BLEND_WIDTH, append_right=True,
                        x_offset=int(self.state.current_x), y_offset=y_offset)
                else:
                    strip_end = min(w, append_width + BLEND_WIDTH)
                    new_strip = frame[:, :strip_end]
                    self.mosaic = self._blend_horizontal_at_y(
                        self.mosaic, new_strip, BLEND_WIDTH, append_right=False,
                        x_offset=int(self.state.current_x), y_offset=y_offset)

                self._displacement_since_append_x = fractional_remainder
                self._displacement_since_append_y = 0.0

            elif direction in (ScanDirection.DOWN, ScanDirection.UP):
                append_height = round(dy) + SAFETY_MARGIN
                append_height = min(append_height, h - BLEND_WIDTH - 5)

                if append_height < 1:
                    return

                pixels_consumed = append_height - SAFETY_MARGIN
                fractional_remainder = dy - pixels_consumed

                if direction == ScanDirection.DOWN:
                    # NOTE(review): DOWN takes the top of the frame and
                    # appends it above the mosaic; kept as the author had it.
                    strip_end = min(h, append_height + BLEND_WIDTH)
                    new_strip = frame[:strip_end, :]
                    self.mosaic = self._blend_vertical_at_x(
                        self.mosaic, new_strip, BLEND_WIDTH, append_below=False,
                        x_off=int(self.state.current_x))
                else:
                    # Take the bottom rows of the frame (mirrors the RIGHT
                    # case, which takes the right-hand columns).
                    strip_start = max(0, h - append_height - BLEND_WIDTH)
                    new_strip = frame[strip_start:, :]
                    self.mosaic = self._blend_vertical_at_x(
                        self.mosaic, new_strip, BLEND_WIDTH, append_below=True,
                        x_off=int(self.state.current_x))

                self._displacement_since_append_x = 0.0
                self._displacement_since_append_y = fractional_remainder

            new_mh, new_mw = self.mosaic.shape[:2]

            with self._state_lock:
                self.state.mosaic_width = new_mw
                self.state.mosaic_height = new_mh
                self.state.append_count += 1

        # Fire the callback after releasing the mosaic lock: a listener
        # that calls get_mosaic() would otherwise block on it forever.
        if self.on_mosaic_updated:
            self.on_mosaic_updated()

    # =========================================================================
    # Scan Control
    # =========================================================================

    def start(self) -> bool:
        """Start a scan on a background thread; False if one is running."""
        if self.running:
            self.log("Already running")
            return False

        self.running = True
        self.paused = False

        with self._state_lock:
            self.state = StitchState()
            self.state.is_scanning = True

        with self._mosaic_lock:
            self.mosaic = None

        self._prev_frame = None
        self._displacement_since_append_x = 0.0
        self._displacement_since_append_y = 0.0

        self._thread = threading.Thread(target=self._scan_loop, daemon=True)
        self._thread.start()

        self.log("Stitching scan started")
        return True

    def stop(self):
        """Request the scan to end and halt all motion."""
        self.running = False
        self.paused = False
        self.motion.stop_all()
        with self._state_lock:
            self.state.is_scanning = False
        self.log("Scan stopped")

    def pause(self):
        """Halt motion and hold the scan until resume() or stop()."""
        if self.running and not self.paused:
            self.paused = True
            self.motion.stop_all()
            self.log("Scan paused")

    def resume(self):
        """Continue a paused scan."""
        if self.running and self.paused:
            self.paused = False
            self.log("Scan resumed")

    def _wait_while_paused(self) -> float:
        """Block while the scan is paused; return the seconds spent waiting."""
        if not self.paused:
            return 0.0
        began = time.time()
        while self.running and self.paused:
            time.sleep(0.05)
        return time.time() - began

    # =========================================================================
    # Scanning Logic
    # =========================================================================

    def _scan_loop(self):
        """Thread body: serpentine scan, row by row, until stopped or full."""
        try:
            self.log("Starting scan loop")
            self.log(f"Max dimensions: {self.config.max_mosaic_width}x{self.config.max_mosaic_height}")

            self.motion.set_speed(self.config.scan_speed_index)
            time.sleep(0.1)

            frame = self._capture_frame()
            self._init_mosaic(frame)

            row = 0
            while self.running:
                with self._state_lock:
                    self.state.current_row = row
                    self.state.total_rows = row + 1

                self.log(f"=== Row {row + 1} ===")

                # Serpentine: even rows right, odd rows left
                h_direction = ScanDirection.RIGHT if row % 2 == 0 else ScanDirection.LEFT
                self._scan_direction(h_direction)

                if not self.running:
                    break

                # Check max height
                if self.state.mosaic_height >= self.config.max_mosaic_height:
                    self.log(f"Max height reached ({self.state.mosaic_height}px)")
                    break

                # Move to next row using same stitching approach
                if not self._move_to_next_row():
                    self.log("Failed to move to next row")
                    break

                row += 1

            self.log(f"Scan complete! Final: {self.state.mosaic_width}x{self.state.mosaic_height}")

        except Exception as e:
            # Top-level thread boundary: report and fall through to cleanup.
            self.log(f"Scan error: {e}")
            import traceback
            traceback.print_exc()
        finally:
            self.running = False
            # Clear a pause request that raced with the end of the scan so
            # it cannot stall a later run.
            self.paused = False
            self.motion.stop_all()
            with self._state_lock:
                self.state.is_scanning = False

    def _scan_direction(self, direction: ScanDirection) -> str:
        """
        Scan in a direction until edge or max dimension reached.

        Returns:
            One of 'stopped', 'timeout', 'max_dim' or 'edge'.
        """
        self.log(f"Scanning {direction.value}...")

        with self._state_lock:
            self.state.direction = direction.value

        frame = self._capture_frame()
        h, w = frame.shape[:2]
        total_x = 0.0

        horizontal = direction in (ScanDirection.RIGHT, ScanDirection.LEFT)

        # Setup based on direction
        if horizontal:
            threshold_pixels = w * self.config.displacement_threshold
            max_dim = self.config.max_mosaic_width
            start_cmd = 'E' if direction == ScanDirection.RIGHT else 'W'
            stop_cmd = 'e' if direction == ScanDirection.RIGHT else 'w'
        else:
            threshold_pixels = h * self.config.displacement_threshold
            max_dim = self.config.max_mosaic_height
            start_cmd = 'S' if direction == ScanDirection.DOWN else 'N'
            stop_cmd = 's' if direction == ScanDirection.DOWN else 'n'

        def current_dim() -> int:
            if horizontal:
                return self.state.mosaic_width
            return self.state.mosaic_height

        self._prev_frame = frame.copy()
        self._displacement_since_append_x = 0.0
        self._displacement_since_append_y = 0.0

        start_time = time.time()
        no_movement_count = 0
        max_no_movement = 50
        stop_reason = 'stopped'

        while self.running:
            if self.paused:
                # Hold position; time spent paused does not count toward
                # the scan timeout.
                start_time += self._wait_while_paused()
                continue

            if time.time() - start_time > self.config.max_scan_time:
                self.log("Scan timeout")
                stop_reason = 'timeout'
                break

            if current_dim() >= max_dim and direction == ScanDirection.RIGHT:
                self.log(f"Max dimension reached ({current_dim()}px)")
                stop_reason = 'max_dim'
                break

            # A LEFT pass ends once the tracked X offset is back at or
            # past its starting value.
            if self.state.current_x >= 0 and direction == ScanDirection.LEFT:
                self.log(f"Max dimension reached ({self.config.max_mosaic_width}px)")
                self.log(f"Current X offset ({self.state.current_x}px) total_x ({total_x}px)")
                stop_reason = 'max_dim'
                break

            if (abs(self.state.current_x) >= self.config.max_mosaic_width
                    and direction == ScanDirection.RIGHT):
                self.log(f"Max dimension reached ({self.config.max_mosaic_width}px)")
                self.log(f"Current X offset ({self.state.current_x}px)")
                stop_reason = 'max_dim'
                break

            # Pulse motor
            self.motion.send_command(start_cmd)
            time.sleep(self.config.movement_interval)
            self.motion.send_command(stop_cmd)
            time.sleep(self.config.frame_interval)

            curr_frame = self._capture_frame()
            dx, dy = self._detect_displacement_robust(self._prev_frame, curr_frame)

            self._displacement_since_append_x += dx
            self._displacement_since_append_y += dy
            total_x += dx

            with self._state_lock:
                self.state.current_x += dx
            self.log(f"Current X offset ({self.state.current_x}px)")

            with self._state_lock:
                self.state.cumulative_x = self._displacement_since_append_x
                self.state.cumulative_y = self._displacement_since_append_y
                self.state.last_displacement = (dx, dy)
                self.state.frame_count += 1

            # Edge detection
            movement = abs(dx) if horizontal else abs(dy)
            if movement < 1.0:
                no_movement_count += 1
                if no_movement_count >= max_no_movement:
                    self.log("Edge detected (no movement)")
                    stop_reason = 'edge'
                    break
            else:
                no_movement_count = 0

            # Append when threshold reached
            if horizontal:
                disp = abs(self._displacement_since_append_x)
            else:
                disp = abs(self._displacement_since_append_y)

            if disp >= threshold_pixels:
                self._append_strip(curr_frame, direction)
                self.log(f"Appended {disp:.1f}px, mosaic: "
                         f"{self.state.mosaic_width}x{self.state.mosaic_height}")

            # Displacement is accumulated frame to frame, so the reference
            # frame advances on every iteration.
            self._prev_frame = curr_frame.copy()

            if self.on_progress:
                self.on_progress(self.state.append_count, 0)

        self.motion.send_command(stop_cmd)
        time.sleep(self.config.settle_time)

        self.log(f"Direction finished: {stop_reason}")
        return stop_reason

    def _move_to_next_row(self) -> bool:
        """
        Move down to next row using displacement-based stitching.

        Same approach as horizontal scanning: pulse the stage, accumulate
        the measured Y displacement and append strips until
        ``(1 - row_overlap) * frame height`` has been covered.

        Returns:
            True when the target displacement was reached; False on an
            edge, an error, or when the scan was stopped.
        """
        self.log("Moving to next row...")

        frame = self._capture_frame()
        h, w = frame.shape[:2]

        # Target: move (1 - overlap) * frame_height
        target_displacement = h * (1 - self.config.row_overlap)
        threshold_pixels = h * self.config.displacement_threshold

        self.log(f"Target Y: {target_displacement:.0f}px, threshold: {threshold_pixels:.0f}px")

        with self._state_lock:
            self.state.direction = 'down'
            self.state.cumulative_y = 0.0

        self._prev_frame = frame.copy()
        self._displacement_since_append_x = 0.0
        self._displacement_since_append_y = 0.0

        total_y = 0.0
        no_movement_count = 0
        max_no_movement = 30

        # Start moving South
        self.motion.send_command('S')

        try:
            while self.running:
                if self.paused:
                    # Hold position until resumed or stopped.
                    self._wait_while_paused()
                    continue

                self.motion.send_command('S')
                time.sleep(self.config.movement_interval)
                self.motion.send_command('s')
                time.sleep(self.config.frame_interval)

                curr_frame = self._capture_frame()
                dx, dy = self._detect_displacement_robust(self._prev_frame, curr_frame)

                self._displacement_since_append_y += dy
                total_y += dy

                with self._state_lock:
                    self.state.current_y += dy
                    self.state.cumulative_y = total_y
                    self.state.last_displacement = (dx, dy)

                # Edge detection
                if abs(dy) < 1.0:
                    no_movement_count += 1
                    if no_movement_count >= max_no_movement:
                        self.log("Edge detected during row transition")
                        self.motion.send_command('s')
                        time.sleep(self.config.settle_time)
                        return False
                else:
                    no_movement_count = 0

                # Append strip when threshold reached
                if abs(self._displacement_since_append_y) >= threshold_pixels:
                    self._append_strip(curr_frame, ScanDirection.DOWN)
                    self.log(f" Row transition: appended, total Y: {abs(total_y):.1f}px")

                # Done when we've moved enough
                if abs(total_y) >= target_displacement:
                    self.log(f"Row transition complete: {abs(total_y):.1f}px")
                    with self._state_lock:
                        self.state.current_y = 0

                    self.motion.send_command('s')
                    time.sleep(self.config.settle_time)

                    # Reset for next horizontal row
                    frame = self._capture_frame()
                    self._prev_frame = frame.copy()
                    self._displacement_since_append_x = 0.0
                    self._displacement_since_append_y = 0.0
                    return True

                self._prev_frame = curr_frame.copy()

        except Exception as e:
            self.log(f"Row transition error: {e}")
            self.motion.send_command('s')
            return False

        self.motion.send_command('s')
        time.sleep(self.config.settle_time)
        return False

    def _capture_frame(self) -> np.ndarray:
        """Grab a frame from the camera, rotated 90 degrees clockwise."""
        frame = self.camera.capture_frame()
        frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
        return frame

    # =========================================================================
    # Getters
    # =========================================================================

    def get_state(self) -> StitchState:
        """Return a copy of the full scan state (every field included)."""
        with self._state_lock:
            return replace(self.state)

    def get_mosaic(self) -> Optional[np.ndarray]:
        """Return a copy of the mosaic, or None if none exists yet."""
        with self._mosaic_lock:
            if self.mosaic is not None:
                return self.mosaic.copy()
        return None

    def get_mosaic_preview(self, max_size: int = 600) -> Optional[np.ndarray]:
        """
        Return the mosaic scaled down to fit ``max_size`` on its longer side.

        A mosaic that already fits is returned as an unscaled copy; None
        is returned when no mosaic exists.
        """
        with self._mosaic_lock:
            if self.mosaic is None:
                return None

            h, w = self.mosaic.shape[:2]
            scale = min(max_size / w, max_size / h, 1.0)

            if scale < 1.0:
                # Keep at least one pixel per side; cv2.resize rejects a
                # zero-sized target.
                new_w = max(1, int(w * scale))
                new_h = max(1, int(h * scale))
                return cv2.resize(self.mosaic, (new_w, new_h))
            return self.mosaic.copy()

    def save_mosaic(self, filepath: str) -> bool:
        """
        Write the mosaic to ``filepath``.

        Returns:
            True on success; False when there is no mosaic or the image
            writer reports failure.
        """
        with self._mosaic_lock:
            if self.mosaic is None:
                return False
            written = cv2.imwrite(filepath, self.mosaic)

        if not written:
            self.log(f"Failed to save mosaic to {filepath}")
            return False

        self.log(f"Saved mosaic to {filepath}")
        return True

    # =========================================================================
    # Testing
    # =========================================================================

    def test_displacement(self, num_frames: int = 10) -> dict:
        """Measure frame-to-frame displacement over ``num_frames`` captures."""
        results = {'frames': [], 'total_dx': 0.0, 'total_dy': 0.0}

        prev_frame = self._capture_frame()

        for i in range(num_frames):
            time.sleep(0.1)
            curr_frame = self._capture_frame()
            dx, dy = self._detect_displacement(prev_frame, curr_frame)

            results['frames'].append({'frame': i, 'dx': dx, 'dy': dy})
            results['total_dx'] += dx
            results['total_dy'] += dy

            prev_frame = curr_frame

        return results

    def test_row_transition(self) -> dict:
        """Test row transition using displacement stitching."""
        results = {
            'success': False,
            'y_moved': 0.0,
            'mosaic_before': (0, 0),
            'mosaic_after': (0, 0),
            'error': None
        }

        try:
            self.log("Testing row transition...")

            if self.mosaic is None:
                frame = self._capture_frame()
                self._init_mosaic(frame)

            results['mosaic_before'] = (self.state.mosaic_width, self.state.mosaic_height)

            with self._state_lock:
                self.state.cumulative_y = 0.0

            self.running = True
            self.paused = False
            success = self._move_to_next_row()
            self.running = False

            results['success'] = success
            results['y_moved'] = self.state.cumulative_y
            results['mosaic_after'] = (self.state.mosaic_width, self.state.mosaic_height)

            self.log(f"Row transition: {'SUCCESS' if success else 'FAILED'}, "
                     f"Y: {results['y_moved']:.1f}px")

        except Exception as e:
            results['error'] = str(e)
            self.log(f"Test error: {e}")
        finally:
            self.running = False

        return results

    def test_single_row(self, direction: str = 'right') -> dict:
        """Test scanning a single row."""
        results = {
            'success': False,
            'stop_reason': None,
            'appends': 0,
            'mosaic_before': (0, 0),
            'mosaic_after': (0, 0),
            'error': None
        }

        try:
            self.log(f"Testing single row ({direction})...")

            if self.mosaic is None:
                frame = self._capture_frame()
                self._init_mosaic(frame)

            results['mosaic_before'] = (self.state.mosaic_width, self.state.mosaic_height)
            appends_before = self.state.append_count

            self.motion.set_speed(self.config.scan_speed_index)
            time.sleep(0.1)

            self.running = True
            self.paused = False
            scan_dir = ScanDirection.RIGHT if direction == 'right' else ScanDirection.LEFT
            stop_reason = self._scan_direction(scan_dir)
            self.running = False

            results['success'] = True
            results['stop_reason'] = stop_reason
            results['appends'] = self.state.append_count - appends_before
            results['mosaic_after'] = (self.state.mosaic_width, self.state.mosaic_height)

        except Exception as e:
            results['error'] = str(e)
        finally:
            self.running = False

        return results

    def get_memory_estimate(self) -> dict:
        """Report current and maximum mosaic size in pixels and megabytes."""
        current_bytes = self.mosaic.nbytes if self.mosaic is not None else 0
        max_bytes = self.config.max_mosaic_width * self.config.max_mosaic_height * 3

        return {
            'current_size': (self.state.mosaic_width, self.state.mosaic_height),
            'current_mb': current_bytes / (1024 * 1024),
            'max_size': (self.config.max_mosaic_width, self.config.max_mosaic_height),
            'max_mb': max_bytes / (1024 * 1024),
        }