"""
Stitching Scanner v2 - Simplified unified approach.

The same displacement-based stitching is used for horizontal rows and for
vertical row transitions. There is no visual feature matching: the scanner
tracks frame-to-frame displacement and appends strips to a growing mosaic.
"""

import cv2
import numpy as np
import time
import threading
from dataclasses import dataclass, replace
from typing import Optional, Callable, Tuple
from enum import Enum


class ScanDirection(Enum):
    """Directions along which strips can be stitched."""

    RIGHT = 'right'
    LEFT = 'left'
    DOWN = 'down'
    UP = 'up'


@dataclass
class StitchConfig:
    """Tunable parameters for a stitching scan."""

    # Fraction of the frame size that must accumulate before a strip is
    # appended (0.10 == 10% of the frame).
    displacement_threshold: float = 0.10
    # Seconds slept between the motor start and stop commands of one pulse.
    movement_interval: float = 0.001
    # Seconds slept after a pulse before the next frame is captured.
    frame_interval: float = 1.00
    # Seconds slept after a direction / row transition finishes.
    settle_time: float = 0.75
    # Timeout, in seconds, for scanning one direction.
    max_scan_time: float = 300.0
    # Row transitions target (1 - row_overlap) * frame height of travel.
    row_overlap: float = 0.15
    # Mosaic size limits (pixels) used as stop conditions.
    max_mosaic_width: int = 1000
    max_mosaic_height: int = 1000
    # Value passed to motion_controller.set_speed().
    scan_speed_index: int = 3
    # NOTE(review): not read anywhere in this module.
    autofocus_every_row: bool = True


@dataclass
class StitchState:
    """Progress information published by the scanner."""

    is_scanning: bool = False
    direction: str = ''
    # Displacement accumulated since the last append (during row
    # transitions cumulative_y holds the total Y travelled instead).
    cumulative_x: float = 0.0
    cumulative_y: float = 0.0
    # Running sums of every measured dx / dy.
    current_x: float = 0.0
    current_y: float = 0.0
    last_displacement: Tuple[float, float] = (0.0, 0.0)
    current_row: int = 0
    total_rows: int = 0
    mosaic_width: int = 0
    mosaic_height: int = 0
    # Size of the first frame the mosaic was initialised with.
    mosaic_init_width: int = 0
    mosaic_init_height: int = 0
    frame_count: int = 0
    append_count: int = 0


class StitchingScanner:
    """
    Slide scanner using continuous stitching.

    Horizontal rows and vertical row transitions use one approach: pulse
    the motor, measure the shift between consecutive frames with phase
    correlation, and append a strip once enough displacement accumulated.
    """

    # Seconds between checks of the pause flag while the scan is paused.
    _PAUSE_POLL_INTERVAL = 0.05

    def __init__(self, camera, motion_controller, autofocus_controller=None,
                 config: Optional[StitchConfig] = None,
                 on_log: Optional[Callable[[str], None]] = None,
                 on_progress: Optional[Callable[[int, int], None]] = None,
                 on_mosaic_updated: Optional[Callable[[], None]] = None):
        """
        Args:
            camera: object providing ``capture_frame()``.
            motion_controller: object providing ``send_command()``,
                ``set_speed()`` and ``stop_all()``.
            autofocus_controller: stored but not used in this module.
            config: scan parameters; defaults to ``StitchConfig()``.
            on_log: optional sink for log messages.
            on_progress: optional callback, called as
                ``on_progress(append_count, 0)`` after every scan step.
            on_mosaic_updated: optional callback fired after each append.
        """
        self.camera = camera
        self.motion = motion_controller
        self.autofocus = autofocus_controller
        self.config = config or StitchConfig()

        self.on_log = on_log
        self.on_progress = on_progress
        self.on_mosaic_updated = on_mosaic_updated

        self.running = False
        self.paused = False

        self.state = StitchState()
        self._state_lock = threading.Lock()

        self.mosaic: Optional[np.ndarray] = None
        self._mosaic_lock = threading.Lock()

        self._prev_frame: Optional[np.ndarray] = None
        self._displacement_since_append_x: float = 0.0
        self._displacement_since_append_y: float = 0.0

        self._thread: Optional[threading.Thread] = None

    def log(self, message: str):
        """Send a prefixed message to ``on_log`` (if set) and to stdout."""
        if self.on_log:
            self.on_log(f"[Stitch] {message}")
        print(f"[Stitch] {message}")

    # =========================================================================
    # Displacement Detection
    # =========================================================================

    def _to_grayscale(self, frame: np.ndarray) -> np.ndarray:
        """Return a single-channel version of ``frame`` (BGR is converted)."""
        if len(frame.shape) == 3:
            return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        return frame

    def _detect_displacement(self, prev_frame: np.ndarray,
                             curr_frame: np.ndarray) -> Tuple[float, float]:
        """
        Estimate the (dx, dy) shift between two frames by phase correlation.

        Returns ``(0.0, 0.0)`` when the frames differ in size.
        """
        prev_gray = self._to_grayscale(prev_frame)
        curr_gray = self._to_grayscale(curr_frame)

        if prev_gray.shape != curr_gray.shape:
            return (0.0, 0.0)

        prev_f = prev_gray.astype(np.float32)
        curr_f = curr_gray.astype(np.float32)

        # Both frames are multiplied by a Hanning window before correlating.
        h, w = prev_gray.shape
        window = cv2.createHanningWindow((w, h), cv2.CV_32F)
        prev_f = prev_f * window
        curr_f = curr_f * window

        shift, _ = cv2.phaseCorrelate(prev_f, curr_f)
        return shift

    def _detect_displacement_robust(self, prev_frame: np.ndarray,
                                    curr_frame: np.ndarray) -> Tuple[float, float]:
        """
        Like ``_detect_displacement`` but rejects implausible shifts.

        A shift larger than half of the longer frame side on either axis is
        logged and replaced by ``(0.0, 0.0)``.
        """
        dx, dy = self._detect_displacement(prev_frame, curr_frame)

        h, w = prev_frame.shape[:2]
        max_displacement = max(w, h) * 0.5

        if abs(dx) > max_displacement or abs(dy) > max_displacement:
            self.log(f"Warning: Large displacement ({dx:.1f}, {dy:.1f}), ignoring")
            return (0.0, 0.0)

        return (dx, dy)

    # =========================================================================
    # Mosaic Building
    # =========================================================================

    def _init_mosaic(self, frame: np.ndarray):
        """Start a new mosaic from ``frame`` and reset the related state."""
        with self._mosaic_lock:
            self.mosaic = frame.copy()
            self._prev_frame = frame.copy()
            self._displacement_since_append_x = 0.0
            self._displacement_since_append_y = 0.0

        with self._state_lock:
            h, w = frame.shape[:2]
            self.state.mosaic_width = w
            self.state.mosaic_height = h
            self.state.mosaic_init_width = w
            self.state.mosaic_init_height = h
            self.state.frame_count = 1
            self.state.append_count = 0
            self.state.current_y = 0
            self.state.current_x = 0

        self.log(f"Initialized mosaic: {frame.shape[1]}x{frame.shape[0]}")

    def _blend_horizontal_at_y(self, base: np.ndarray, strip: np.ndarray,
                               blend_width: int, append_right: bool,
                               y_offset: int = 0) -> np.ndarray:
        """
        Blend ``strip`` onto the left or right edge of ``base`` at a given row.

        Args:
            base: current mosaic.
            strip: strip to attach; its rows land at ``y_offset``.
            blend_width: width in pixels of the linear cross-fade.
            append_right: attach on the right edge if True, else on the left.
            y_offset: top row of the strip inside the mosaic (clamped so the
                strip stays inside ``base``).

        Returns:
            A new array as tall as ``base`` and wider by the strip width
            minus the blended overlap.

        NOTE(review): the output is always allocated as 3-channel uint8.
        """
        h_base, w_base = base.shape[:2]
        h_strip, w_strip = strip.shape[:2]

        # Clamp y_offset to valid range
        y_offset = max(0, min(y_offset, h_base - h_strip))
        rows = slice(y_offset, y_offset + h_strip)

        # Early exit: no blending possible, place the strip edge to edge.
        if blend_width <= 0 or blend_width >= w_strip:
            result = np.zeros((h_base, w_base + w_strip, 3), dtype=np.uint8)
            if append_right:
                result[:, :w_base] = base
                result[rows, w_base:] = strip
            else:
                result[rows, :w_strip] = strip
                result[:, w_strip:] = base
            return result

        blend_w = min(blend_width, w_strip, w_base)
        result = np.zeros((h_base, w_base + w_strip - blend_w, 3), dtype=np.uint8)

        if append_right:
            result[:, :w_base] = base

            # Fade from base (alpha 1) to strip (alpha 0) across the overlap.
            alpha = np.linspace(1, 0, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis]
            base_overlap = base[rows, -blend_w:].astype(np.float32)
            strip_overlap = strip[:, :blend_w].astype(np.float32)
            blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8)

            result[rows, w_base - blend_w:w_base] = blended
            result[rows, w_base:] = strip[:, blend_w:]
            return result

        # append_left
        strip_only_w = w_strip - blend_w

        # Copy the whole base first so rows outside the strip keep their
        # leading blend_w columns instead of being left black.
        result[:, strip_only_w:] = base
        result[rows, :strip_only_w] = strip[:, :strip_only_w]

        # Fade from strip (alpha 0) to base (alpha 1) across the overlap.
        alpha = np.linspace(0, 1, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis]
        strip_overlap = strip[:, -blend_w:].astype(np.float32)
        base_overlap = base[rows, :blend_w].astype(np.float32)
        blended = (strip_overlap * (1 - alpha) + base_overlap * alpha).astype(np.uint8)

        result[rows, strip_only_w:w_strip] = blended
        return result

    def _blend_horizontal(self, base: np.ndarray, strip: np.ndarray,
                          blend_width: int, append_right: bool) -> np.ndarray:
        """
        Blend ``strip`` onto the left or right edge of an equally tall ``base``.

        Falls back to a plain ``np.hstack`` when no blending is possible or
        when the heights differ.

        NOTE(review): not called anywhere in this module.
        """
        if blend_width <= 0 or blend_width >= strip.shape[1]:
            if append_right:
                return np.hstack([base, strip])
            return np.hstack([strip, base])

        h_base, w_base = base.shape[:2]
        h_strip, w_strip = strip.shape[:2]
        self.log(f"Base Width: {w_base}px")

        if h_strip != h_base:
            if append_right:
                return np.hstack([base, strip])
            return np.hstack([strip, base])

        blend_w = min(blend_width, w_strip, w_base)
        result = np.zeros((h_base, w_base + w_strip - blend_w, 3), dtype=np.uint8)

        if append_right:
            result[:, :w_base] = base

            alpha = np.linspace(1, 0, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis]
            base_overlap = base[:, -blend_w:].astype(np.float32)
            strip_overlap = strip[:, :blend_w].astype(np.float32)
            blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8)

            result[:, w_base - blend_w:w_base] = blended
            result[:, w_base:] = strip[:, blend_w:]
            return result

        result[:, :w_strip] = strip

        alpha = np.linspace(0, 1, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis]
        strip_overlap = strip[:, -blend_w:].astype(np.float32)
        base_overlap = base[:, :blend_w].astype(np.float32)
        blended = (strip_overlap * (1 - alpha) + base_overlap * alpha).astype(np.uint8)

        result[:, w_strip - blend_w:w_strip] = blended
        result[:, w_strip:] = base[:, blend_w:]
        return result

    def _blend_vertical_at_x(self, base: np.ndarray, strip: np.ndarray,
                             blend_height: int, append_below: bool,
                             x_off: int = 0) -> np.ndarray:
        """
        Blend ``strip`` onto the top or bottom edge of ``base``.

        The strip is placed at column ``w_base - mosaic_init_width`` (never
        below 0) and padded with zeros to the full mosaic width.

        Args:
            base: current mosaic.
            strip: strip to attach.
            blend_height: height in pixels of the linear cross-fade.
            append_below: attach below ``base`` if True, else above it.
            x_off: accepted for interface compatibility.
                NOTE(review): this value is ignored; the column offset is
                derived from the mosaic width as described above. Confirm
                whether callers expect ``x_off`` to be honoured.

        Returns:
            A new array as wide as ``base`` and taller by the strip height
            minus the blended overlap.

        NOTE(review): the output is always allocated as 3-channel uint8.
        """
        h_base, w_base = base.shape[:2]
        h_strip, w_strip = strip.shape[:2]

        x_offset = max(0, w_base - self.state.mosaic_init_width)

        # Create full-width strip with strip placed at x_offset
        full_strip = np.zeros((h_strip, w_base, 3), dtype=np.uint8)
        copy_width = min(w_strip, w_base - x_offset)
        cols = slice(x_offset, x_offset + copy_width)
        full_strip[:, cols] = strip[:, :copy_width]

        # Early exit: no blending possible
        if blend_height <= 0 or blend_height >= h_strip:
            if append_below:
                return np.vstack([base, full_strip])
            return np.vstack([full_strip, base])

        # The strip is cropped to fit above; warn so the crop is visible.
        if w_strip > w_base:
            self.log(f"Warning: strip wider than base ({w_strip} > {w_base})")

        blend_h = min(blend_height, h_strip, h_base)
        result = np.zeros((h_base + h_strip - blend_h, w_base, 3), dtype=np.uint8)

        if append_below:
            result[:h_base] = base

            # Cross-fade only where the strip has pixels; blending against
            # the zero padding would darken existing mosaic data.
            alpha = np.linspace(1, 0, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis]
            base_overlap = base[-blend_h:, cols].astype(np.float32)
            strip_overlap = full_strip[:blend_h, cols].astype(np.float32)
            blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8)

            result[h_base - blend_h:h_base, cols] = blended
            result[h_base:] = full_strip[blend_h:]
            return result

        strip_only_h = h_strip - blend_h
        result[:strip_only_h] = full_strip[:strip_only_h]
        result[strip_only_h:] = base

        alpha = np.linspace(0, 1, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis]
        strip_overlap = full_strip[-blend_h:, cols].astype(np.float32)
        base_overlap = base[:blend_h, cols].astype(np.float32)
        blended = (strip_overlap * (1 - alpha) + base_overlap * alpha).astype(np.uint8)

        result[strip_only_h:h_strip, cols] = blended
        return result

    def _blend_vertical(self, base: np.ndarray, strip: np.ndarray,
                        blend_height: int, append_below: bool) -> np.ndarray:
        """
        Blend ``strip`` onto the top or bottom edge of ``base``.

        The strip is cropped or zero-padded on the right to the width of
        ``base`` before blending.

        NOTE(review): not called anywhere in this module.
        """
        mh, mw = base.shape[:2]
        sh, sw = strip.shape[:2]

        # Match widths
        if sw > mw:
            strip = strip[:, :mw]
        elif sw < mw:
            pad = np.zeros((sh, mw - sw, 3), dtype=np.uint8)
            strip = np.hstack([strip, pad])

        blend_h = min(blend_height, sh, mh)

        if blend_h <= 0:
            if append_below:
                return np.vstack([base, strip])
            return np.vstack([strip, base])

        result = np.zeros((mh + sh - blend_h, mw, 3), dtype=np.uint8)

        if append_below:
            alpha = np.linspace(1, 0, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis]
            base_overlap = base[-blend_h:].astype(np.float32)
            strip_overlap = strip[:blend_h].astype(np.float32)
            blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8)

            result[:mh - blend_h] = base[:-blend_h]
            result[mh - blend_h:mh] = blended
            result[mh:] = strip[blend_h:]
            return result

        alpha = np.linspace(0, 1, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis]
        strip_overlap = strip[-blend_h:].astype(np.float32)
        base_overlap = base[:blend_h].astype(np.float32)
        blended = (strip_overlap * (1 - alpha) + base_overlap * alpha).astype(np.uint8)

        result[:sh - blend_h] = strip[:-blend_h]
        result[sh - blend_h:sh] = blended
        result[sh:] = base[blend_h:]
        return result

    def _append_strip(self, frame: np.ndarray, direction: ScanDirection):
        """
        Append a strip of ``frame`` to the mosaic.

        The strip size is the displacement accumulated since the previous
        append (rounded) plus a small safety margin and the blend overlap.
        The sub-pixel remainder is carried over, with its sign, to the next
        append. Does nothing when there is no mosaic yet or the strip would
        be empty. ``on_mosaic_updated`` fires after a successful append.
        """
        BLEND_WIDTH = 10
        SAFETY_MARGIN = 2

        with self._mosaic_lock:
            if self.mosaic is None:
                return

            h, w = frame.shape[:2]

            signed_x = self._displacement_since_append_x
            signed_y = self._displacement_since_append_y
            dx = abs(signed_x)
            dy = abs(signed_y)

            if direction in [ScanDirection.RIGHT, ScanDirection.LEFT]:
                append_width = round(dx) + SAFETY_MARGIN
                append_width = min(append_width, w - BLEND_WIDTH - 5)

                if append_width < 1:
                    return

                pixels_consumed = append_width - SAFETY_MARGIN
                fractional_remainder = dx - pixels_consumed

                # Calculate Y offset for current row
                y_offset = int(self.state.current_y)

                if direction == ScanDirection.RIGHT:
                    strip_start = max(0, w - append_width - BLEND_WIDTH)
                    new_strip = frame[:, strip_start:]
                    self.mosaic = self._blend_horizontal_at_y(
                        self.mosaic, new_strip, BLEND_WIDTH,
                        append_right=True, y_offset=y_offset)
                else:
                    strip_end = min(w, append_width + BLEND_WIDTH)
                    new_strip = frame[:, :strip_end]
                    self.mosaic = self._blend_horizontal_at_y(
                        self.mosaic, new_strip, BLEND_WIDTH,
                        append_right=False, y_offset=y_offset)

                # Keep the remainder in the direction of the measured
                # travel so it adds up correctly with later (signed) shifts.
                sign = -1.0 if signed_x < 0 else 1.0
                self._displacement_since_append_x = sign * fractional_remainder
                self._displacement_since_append_y = 0.0

            elif direction in [ScanDirection.DOWN, ScanDirection.UP]:
                append_height = round(dy) + SAFETY_MARGIN
                append_height = min(append_height, h - BLEND_WIDTH - 5)

                if append_height < 1:
                    return

                pixels_consumed = append_height - SAFETY_MARGIN
                fractional_remainder = dy - pixels_consumed

                if direction == ScanDirection.DOWN:
                    strip_end = min(h, append_height + BLEND_WIDTH)
                    new_strip = frame[:strip_end, :]
                    self.mosaic = self._blend_vertical_at_x(
                        self.mosaic, new_strip, BLEND_WIDTH,
                        append_below=False, x_off=int(self.state.current_x))
                else:
                    # Take the trailing rows of the frame (mirrors the RIGHT
                    # case); slicing [:strip_start] would select everything
                    # except the new strip.
                    strip_start = max(0, h - append_height - BLEND_WIDTH)
                    new_strip = frame[strip_start:, :]
                    self.mosaic = self._blend_vertical_at_x(
                        self.mosaic, new_strip, BLEND_WIDTH,
                        append_below=True, x_off=int(self.state.current_x))

                sign = -1.0 if signed_y < 0 else 1.0
                self._displacement_since_append_x = 0.0
                self._displacement_since_append_y = sign * fractional_remainder

            new_mh, new_mw = self.mosaic.shape[:2]

            with self._state_lock:
                self.state.mosaic_width = new_mw
                self.state.mosaic_height = new_mh
                self.state.append_count += 1

        # Called without holding any lock so the callback may safely use
        # get_mosaic() / get_state().
        if self.on_mosaic_updated:
            self.on_mosaic_updated()

    # =========================================================================
    # Scan Control
    # =========================================================================

    def start(self) -> bool:
        """Start a scan in a daemon thread; return False if one is running."""
        if self.running:
            self.log("Already running")
            return False

        self.running = True
        self.paused = False

        with self._state_lock:
            self.state = StitchState()
            self.state.is_scanning = True

        with self._mosaic_lock:
            self.mosaic = None
            self._prev_frame = None
            self._displacement_since_append_x = 0.0
            self._displacement_since_append_y = 0.0

        self._thread = threading.Thread(target=self._scan_loop, daemon=True)
        self._thread.start()

        self.log("Stitching scan started")
        return True

    def stop(self):
        """Request the scan to stop and halt all motion."""
        self.running = False
        self.paused = False
        self.motion.stop_all()
        with self._state_lock:
            self.state.is_scanning = False
        self.log("Scan stopped")

    def pause(self):
        """Pause a running scan and halt all motion."""
        if self.running and not self.paused:
            self.paused = True
            self.motion.stop_all()
            self.log("Scan paused")

    def resume(self):
        """Resume a paused scan."""
        if self.running and self.paused:
            self.paused = False
            self.log("Scan resumed")

    def _wait_while_paused(self) -> float:
        """Block while the scan is paused; return the seconds spent waiting."""
        if not self.paused:
            return 0.0
        paused_at = time.time()
        while self.running and self.paused:
            time.sleep(self._PAUSE_POLL_INTERVAL)
        return time.time() - paused_at

    # =========================================================================
    # Scanning Logic
    # =========================================================================

    def _scan_loop(self):
        """
        Thread body: scan rows in a serpentine pattern until stopped.

        Even rows are scanned RIGHT, odd rows LEFT. The loop ends when the
        scan is stopped, the mosaic reaches ``max_mosaic_height``, or the
        transition to the next row fails.
        """
        try:
            self.log("Starting scan loop")
            self.log(f"Max dimensions: {self.config.max_mosaic_width}x{self.config.max_mosaic_height}")

            self.motion.set_speed(self.config.scan_speed_index)
            time.sleep(0.1)

            frame = self._capture_frame()
            self._init_mosaic(frame)

            row = 0
            while self.running:
                with self._state_lock:
                    self.state.current_row = row
                    self.state.total_rows = row + 1

                self.log(f"=== Row {row + 1} ===")

                # Serpentine: even rows right, odd rows left
                h_direction = ScanDirection.RIGHT if row % 2 == 0 else ScanDirection.LEFT
                self._scan_direction(h_direction)

                if not self.running:
                    break

                # Check max height
                if self.state.mosaic_height >= self.config.max_mosaic_height:
                    self.log(f"Max height reached ({self.state.mosaic_height}px)")
                    break

                # Move to next row using same stitching approach
                if not self._move_to_next_row():
                    self.log("Failed to move to next row")
                    break

                row += 1

            self.log(f"Scan complete! Final: {self.state.mosaic_width}x{self.state.mosaic_height}")

        except Exception as e:
            # Top-level thread boundary: report and fall through to cleanup.
            self.log(f"Scan error: {e}")
            import traceback
            traceback.print_exc()
        finally:
            self.running = False
            self.motion.stop_all()
            with self._state_lock:
                self.state.is_scanning = False

    def _scan_direction(self, direction: ScanDirection) -> str:
        """
        Scan in one direction until an edge, a size limit, or a timeout.

        While the scan is paused the loop waits in place; time spent paused
        does not count towards ``max_scan_time``.

        Returns:
            The stop reason: ``'timeout'``, ``'max_dim'``, ``'edge'`` or
            ``'stopped'`` (the scan was stopped externally).
        """
        self.log(f"Scanning {direction.value}...")

        with self._state_lock:
            self.state.direction = direction.value

        frame = self._capture_frame()
        h, w = frame.shape[:2]
        total_x = 0
        horizontal = direction in [ScanDirection.RIGHT, ScanDirection.LEFT]

        # Setup based on direction
        if horizontal:
            threshold_pixels = w * self.config.displacement_threshold
            max_dim = self.config.max_mosaic_width
            start_cmd = 'E' if direction == ScanDirection.RIGHT else 'W'
            stop_cmd = 'e' if direction == ScanDirection.RIGHT else 'w'
        else:
            threshold_pixels = h * self.config.displacement_threshold
            max_dim = self.config.max_mosaic_height
            start_cmd = 'S' if direction == ScanDirection.DOWN else 'N'
            stop_cmd = 's' if direction == ScanDirection.DOWN else 'n'

        def current_dim():
            # Re-read on every call: the mosaic grows during the scan.
            if horizontal:
                return self.state.mosaic_width
            return self.state.mosaic_height

        self._prev_frame = frame.copy()
        self._displacement_since_append_x = 0.0
        self._displacement_since_append_y = 0.0

        start_time = time.time()
        no_movement_count = 0
        max_no_movement = 50
        stop_reason = 'stopped'

        while self.running:
            # Hold position while paused instead of ending the row.
            start_time += self._wait_while_paused()
            if not self.running:
                break

            if time.time() - start_time > self.config.max_scan_time:
                self.log("Scan timeout")
                stop_reason = 'timeout'
                break

            if current_dim() >= max_dim and direction == ScanDirection.RIGHT:
                self.log(f"Max dimension reached ({current_dim()}px)")
                stop_reason = 'max_dim'
                break

            # A LEFT row ends once the running X position is back at >= 0.
            if self.state.current_x >= 0 and direction == ScanDirection.LEFT:
                self.log(f"Max dimension reached ({self.config.max_mosaic_width}px)")
                self.log(f"Current X offset ({self.state.current_x}px) total_x ({total_x}px)")
                stop_reason = 'max_dim'
                break

            if abs(self.state.current_x) >= self.config.max_mosaic_width and direction == ScanDirection.RIGHT:
                self.log(f"Max dimension reached ({self.config.max_mosaic_width}px)")
                self.log(f"Current X offset ({self.state.current_x}px)")
                stop_reason = 'max_dim'
                break

            # Pulse motor
            self.motion.send_command(start_cmd)
            time.sleep(self.config.movement_interval)
            self.motion.send_command(stop_cmd)
            time.sleep(self.config.frame_interval)

            curr_frame = self._capture_frame()
            dx, dy = self._detect_displacement_robust(self._prev_frame, curr_frame)

            self._displacement_since_append_x += dx
            self._displacement_since_append_y += dy
            total_x += dx

            with self._state_lock:
                self.state.current_x += dx
                self.state.cumulative_x = self._displacement_since_append_x
                self.state.cumulative_y = self._displacement_since_append_y
                self.state.last_displacement = (dx, dy)
                self.state.frame_count += 1

            # Edge detection: many consecutive near-zero shifts.
            movement = abs(dx) if horizontal else abs(dy)

            if movement < 1.0:
                no_movement_count += 1
                if no_movement_count >= max_no_movement:
                    self.log(f"Edge detected (no movement)")
                    stop_reason = 'edge'
                    break
            else:
                no_movement_count = 0

            # Append when threshold reached
            if horizontal:
                disp = abs(self._displacement_since_append_x)
            else:
                disp = abs(self._displacement_since_append_y)

            if disp >= threshold_pixels:
                self._append_strip(curr_frame, direction)
                self.log(f"Appended {disp:.1f}px, mosaic: {self.state.mosaic_width}x{self.state.mosaic_height}")

            self._prev_frame = curr_frame.copy()

            if self.on_progress:
                self.on_progress(self.state.append_count, 0)

        self.motion.send_command(stop_cmd)
        time.sleep(self.config.settle_time)

        self.log(f"Direction finished: {stop_reason}")
        return stop_reason

    def _move_to_next_row(self) -> bool:
        """
        Move down to the next row using displacement-based stitching.

        Same approach as horizontal scanning: pulse the motor, measure the
        shift and append strips, until ``(1 - row_overlap) * frame height``
        of Y travel has accumulated. While the scan is paused the loop waits
        in place.

        Returns:
            True when the target travel was reached; False on an edge, an
            error, or when the scan was stopped.
        """
        self.log("Moving to next row...")

        frame = self._capture_frame()
        h, w = frame.shape[:2]

        # Target: move (1 - overlap) * frame_height
        target_displacement = h * (1 - self.config.row_overlap)
        threshold_pixels = h * self.config.displacement_threshold

        self.log(f"Target Y: {target_displacement:.0f}px, threshold: {threshold_pixels:.0f}px")

        with self._state_lock:
            self.state.direction = 'down'
            self.state.cumulative_y = 0.0

        self._prev_frame = frame.copy()
        self._displacement_since_append_x = 0.0
        self._displacement_since_append_y = 0.0

        total_y = 0.0
        no_movement_count = 0
        max_no_movement = 30

        # Start moving South
        self.motion.send_command('S')

        try:
            while self.running:
                # Never drive the motors while the scan is paused.
                self._wait_while_paused()
                if not self.running:
                    break

                self.motion.send_command('S')
                time.sleep(self.config.movement_interval)
                self.motion.send_command('s')
                time.sleep(self.config.frame_interval)

                curr_frame = self._capture_frame()
                dx, dy = self._detect_displacement_robust(self._prev_frame, curr_frame)

                self._displacement_since_append_y += dy
                total_y += dy

                with self._state_lock:
                    self.state.current_y += dy
                    self.state.cumulative_y = total_y
                    self.state.last_displacement = (dx, dy)

                # Edge detection
                if abs(dy) < 1.0:
                    no_movement_count += 1
                    if no_movement_count >= max_no_movement:
                        self.log("Edge detected during row transition")
                        self.motion.send_command('s')
                        time.sleep(self.config.settle_time)
                        return False
                else:
                    no_movement_count = 0

                # Append strip when threshold reached
                if abs(self._displacement_since_append_y) >= threshold_pixels:
                    self._append_strip(curr_frame, ScanDirection.DOWN)
                    self.log(f"  Row transition: appended, total Y: {abs(total_y):.1f}px")

                # Done when we've moved enough
                if abs(total_y) >= target_displacement:
                    self.log(f"Row transition complete: {abs(total_y):.1f}px")
                    self.state.current_y = 0
                    self.motion.send_command('s')
                    time.sleep(self.config.settle_time)

                    # Reset for next horizontal row
                    frame = self._capture_frame()
                    self._prev_frame = frame.copy()
                    self._displacement_since_append_x = 0.0
                    self._displacement_since_append_y = 0.0
                    return True

                self._prev_frame = curr_frame.copy()

        except Exception as e:
            self.log(f"Row transition error: {e}")
            self.motion.send_command('s')
            return False

        self.motion.send_command('s')
        time.sleep(self.config.settle_time)
        return False

    def _capture_frame(self) -> np.ndarray:
        """Capture a frame from the camera, rotated 90 degrees clockwise."""
        frame = self.camera.capture_frame()
        frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
        return frame

    # =========================================================================
    # Getters
    # =========================================================================

    def get_state(self) -> StitchState:
        """Return a snapshot copy of every field of the current state."""
        with self._state_lock:
            return replace(self.state)

    def get_mosaic(self) -> Optional[np.ndarray]:
        """Return a copy of the mosaic, or None if there is none yet."""
        with self._mosaic_lock:
            if self.mosaic is not None:
                return self.mosaic.copy()
        return None

    def get_mosaic_preview(self, max_size: int = 600) -> Optional[np.ndarray]:
        """
        Return the mosaic scaled down to fit ``max_size`` on both sides.

        A mosaic that already fits is returned as an unscaled copy. Returns
        None if there is no mosaic yet.
        """
        with self._mosaic_lock:
            if self.mosaic is None:
                return None

            h, w = self.mosaic.shape[:2]
            scale = min(max_size / w, max_size / h, 1.0)

            if scale < 1.0:
                new_w = int(w * scale)
                new_h = int(h * scale)
                return cv2.resize(self.mosaic, (new_w, new_h))
            return self.mosaic.copy()

    def save_mosaic(self, filepath: str) -> bool:
        """
        Write the mosaic to ``filepath``.

        Returns:
            True if the image was written; False if there is no mosaic or
            ``cv2.imwrite`` reported a failure.
        """
        with self._mosaic_lock:
            if self.mosaic is None:
                return False
            written = cv2.imwrite(filepath, self.mosaic)

        if not written:
            self.log(f"Failed to save mosaic to {filepath}")
            return False

        self.log(f"Saved mosaic to {filepath}")
        return True

    # =========================================================================
    # Testing
    # =========================================================================

    def test_displacement(self, num_frames: int = 10) -> dict:
        """
        Measure displacement between ``num_frames`` consecutive captures.

        Returns a dict with per-frame ``dx``/``dy`` entries under
        ``'frames'`` and their sums under ``'total_dx'``/``'total_dy'``.
        """
        results = {'frames': [], 'total_dx': 0.0, 'total_dy': 0.0}

        prev_frame = self._capture_frame()

        for i in range(num_frames):
            time.sleep(0.1)
            curr_frame = self._capture_frame()
            dx, dy = self._detect_displacement(prev_frame, curr_frame)

            results['frames'].append({'frame': i, 'dx': dx, 'dy': dy})
            results['total_dx'] += dx
            results['total_dy'] += dy
            prev_frame = curr_frame

        return results

    def test_row_transition(self) -> dict:
        """
        Test row transition using displacement stitching.

        Initialises a mosaic if needed, runs one ``_move_to_next_row`` and
        reports success, the Y travel, and the mosaic size before/after.
        Exceptions are captured in ``results['error']``.
        """
        results = {
            'success': False,
            'y_moved': 0.0,
            'mosaic_before': (0, 0),
            'mosaic_after': (0, 0),
            'error': None
        }

        try:
            self.log("Testing row transition...")

            if self.mosaic is None:
                frame = self._capture_frame()
                self._init_mosaic(frame)

            results['mosaic_before'] = (self.state.mosaic_width, self.state.mosaic_height)

            with self._state_lock:
                self.state.cumulative_y = 0.0

            self.running = True
            success = self._move_to_next_row()

            results['success'] = success
            results['y_moved'] = self.state.cumulative_y
            results['mosaic_after'] = (self.state.mosaic_width, self.state.mosaic_height)

            self.log(f"Row transition: {'SUCCESS' if success else 'FAILED'}, Y: {results['y_moved']:.1f}px")

        except Exception as e:
            results['error'] = str(e)
            self.log(f"Test error: {e}")
        finally:
            self.running = False

        return results

    def test_single_row(self, direction: str = 'right') -> dict:
        """
        Test scanning a single row.

        ``direction`` of ``'right'`` scans RIGHT; any other value scans
        LEFT. Reports the stop reason, the number of appends, and the mosaic
        size before/after. Exceptions are captured in ``results['error']``.
        """
        results = {
            'success': False,
            'stop_reason': None,
            'appends': 0,
            'mosaic_before': (0, 0),
            'mosaic_after': (0, 0),
            'error': None
        }

        try:
            self.log(f"Testing single row ({direction})...")

            if self.mosaic is None:
                frame = self._capture_frame()
                self._init_mosaic(frame)

            results['mosaic_before'] = (self.state.mosaic_width, self.state.mosaic_height)
            appends_before = self.state.append_count

            self.motion.set_speed(self.config.scan_speed_index)
            time.sleep(0.1)

            self.running = True
            scan_dir = ScanDirection.RIGHT if direction == 'right' else ScanDirection.LEFT
            stop_reason = self._scan_direction(scan_dir)

            results['success'] = True
            results['stop_reason'] = stop_reason
            results['appends'] = self.state.append_count - appends_before
            results['mosaic_after'] = (self.state.mosaic_width, self.state.mosaic_height)

        except Exception as e:
            results['error'] = str(e)
        finally:
            self.running = False

        return results

    def get_memory_estimate(self) -> dict:
        """
        Report the current and maximum mosaic size in pixels and megabytes.

        The maximum assumes 3 bytes per pixel at the configured size limits.
        """
        current_bytes = self.mosaic.nbytes if self.mosaic is not None else 0
        max_bytes = self.config.max_mosaic_width * self.config.max_mosaic_height * 3

        return {
            'current_size': (self.state.mosaic_width, self.state.mosaic_height),
            'current_mb': current_bytes / (1024 * 1024),
            'max_size': (self.config.max_mosaic_width, self.config.max_mosaic_height),
            'max_mb': max_bytes / (1024 * 1024),
        }