diff --git a/src/gui.py b/src/gui.py index 5a3ac36..e35052d 100644 --- a/src/gui.py +++ b/src/gui.py @@ -87,6 +87,18 @@ class AppGUI: on_progress=self._on_stitch_progress, on_mosaic_updated=self._on_mosaic_updated ) + + # Update initial memory estimate + self._update_memory_estimate() + + def _update_memory_estimate(self): + """Update memory estimate label""" + bytes_per_pixel = 3 + mem_mb = (self.stitch_config.max_mosaic_width * + self.stitch_config.max_mosaic_height * + bytes_per_pixel) / (1024 * 1024) + if hasattr(self, 'memory_label'): + self.memory_label.config(text=f"~{mem_mb:.0f} MB") def _on_command_sent(self, cmd): self.log_message(f"> {cmd}") @@ -277,63 +289,54 @@ class AppGUI: command=self._show_mosaic_window).pack(side=tk.RIGHT, padx=2) def _build_row2_stitch_settings(self, parent): - """Row 2: Stitching settings""" + """Row 2: Stitching settings with max dimensions""" row = ttk.LabelFrame(parent, text="Stitch Settings") row.pack(fill=tk.X, pady=(0, 3)) - inner = ttk.Frame(row) - inner.pack(fill=tk.X, padx=5, pady=3) + # First row: Threshold, Speed, Overlap + inner1 = ttk.Frame(row) + inner1.pack(fill=tk.X, padx=5, pady=3) # Displacement threshold - ttk.Label(inner, text="Threshold:").pack(side=tk.LEFT) + ttk.Label(inner1, text="Threshold:").pack(side=tk.LEFT) self.disp_threshold_var = tk.DoubleVar(value=0.10) self.disp_threshold_spinbox = ttk.Spinbox( - inner, from_=0.05, to=0.30, increment=0.01, width=5, + inner1, from_=0.05, to=0.30, increment=0.01, width=5, textvariable=self.disp_threshold_var, command=self._update_stitch_config ) self.disp_threshold_spinbox.pack(side=tk.LEFT, padx=(2, 10)) - # Number of rows - ttk.Label(inner, text="Rows:").pack(side=tk.LEFT) - self.num_rows_var = tk.IntVar(value=3) - self.num_rows_spinbox = ttk.Spinbox( - inner, from_=1, to=10, width=3, - textvariable=self.num_rows_var, - command=self._update_stitch_config - ) - self.num_rows_spinbox.pack(side=tk.LEFT, padx=(2, 10)) - - # Row overlap - ttk.Label(inner, text="Overlap:").pack(side=tk.LEFT) - self.row_overlap_var = tk.DoubleVar(value=0.15) - self.row_overlap_spinbox = ttk.Spinbox( - inner, from_=0.05, to=0.50, increment=0.05, width=5, - textvariable=self.row_overlap_var, - command=self._update_stitch_config - ) - self.row_overlap_spinbox.pack(side=tk.LEFT, padx=(2, 10)) - # Scan speed - ttk.Label(inner, text="Speed:").pack(side=tk.LEFT) + ttk.Label(inner1, text="Speed:").pack(side=tk.LEFT) self.scan_speed_var = tk.IntVar(value=3) self.scan_speed_spinbox = ttk.Spinbox( - inner, from_=1, to=6, width=3, + inner1, from_=1, to=6, width=3, textvariable=self.scan_speed_var, command=self._update_stitch_config ) self.scan_speed_spinbox.pack(side=tk.LEFT, padx=(2, 10)) + # Row overlap + ttk.Label(inner1, text="Overlap:").pack(side=tk.LEFT) + self.row_overlap_var = tk.DoubleVar(value=0.15) + self.row_overlap_spinbox = ttk.Spinbox( + inner1, from_=0.05, to=0.50, increment=0.05, width=5, + textvariable=self.row_overlap_var, + command=self._update_stitch_config + ) + self.row_overlap_spinbox.pack(side=tk.LEFT, padx=(2, 10)) + # Autofocus toggle self.af_every_row_var = tk.BooleanVar(value=True) ttk.Checkbutton( - inner, text="AF each row", + inner1, text="AF each row", variable=self.af_every_row_var, command=self._update_stitch_config ).pack(side=tk.LEFT, padx=(10, 0)) # Row/Direction status on right - status_frame = ttk.Frame(inner) + status_frame = ttk.Frame(inner1) status_frame.pack(side=tk.RIGHT) ttk.Label(status_frame, text="Row:").pack(side=tk.LEFT) @@ -343,6 +346,49 @@ class AppGUI: ttk.Label(status_frame, text="Dir:").pack(side=tk.LEFT, padx=(8, 0)) self.direction_label = ttk.Label(status_frame, text="--", width=6, font=('Arial', 9)) self.direction_label.pack(side=tk.LEFT) + + # Second row: Max dimensions and test buttons + inner2 = ttk.Frame(row) + inner2.pack(fill=tk.X, padx=5, pady=(0, 3)) + + # Max Width + ttk.Label(inner2, text="Max W:").pack(side=tk.LEFT) + self.max_width_var = tk.IntVar(value=5000) + self.max_width_entry = ttk.Entry(inner2, textvariable=self.max_width_var, width=7) + self.max_width_entry.pack(side=tk.LEFT, padx=(2, 5)) + self.max_width_entry.bind('', lambda e: self._update_stitch_config()) + + # Max Height + ttk.Label(inner2, text="Max H:").pack(side=tk.LEFT) + self.max_height_var = tk.IntVar(value=5000) + self.max_height_entry = ttk.Entry(inner2, textvariable=self.max_height_var, width=7) + self.max_height_entry.pack(side=tk.LEFT, padx=(2, 5)) + self.max_height_entry.bind('', lambda e: self._update_stitch_config()) + + # Apply button + ttk.Button(inner2, text="Apply", width=5, + command=self._update_stitch_config).pack(side=tk.LEFT, padx=(5, 10)) + + ttk.Separator(inner2, orient=tk.VERTICAL).pack(side=tk.LEFT, fill=tk.Y, padx=5) + + # Test buttons + ttk.Label(inner2, text="Test:").pack(side=tk.LEFT) + + ttk.Button(inner2, text="Row↓", width=5, + command=self._test_row_transition).pack(side=tk.LEFT, padx=2) + + ttk.Button(inner2, text="Scan→", width=5, + command=lambda: self._test_horizontal('right')).pack(side=tk.LEFT, padx=2) + + ttk.Button(inner2, text="Scan←", width=5, + command=lambda: self._test_horizontal('left')).pack(side=tk.LEFT, padx=2) + + ttk.Button(inner2, text="Est", width=4, + command=self._show_scan_estimate).pack(side=tk.LEFT, padx=2) + + # Memory estimate label + self.memory_label = ttk.Label(inner2, text="~-- MB", font=('Arial', 9)) + self.memory_label.pack(side=tk.RIGHT) def _build_row3_movement(self, parent): """Row 3: Movement controls for all axes""" @@ -511,11 +557,15 @@ class AppGUI: def _update_stitch_config(self): """Update stitching scanner config from GUI values""" self.stitch_config.displacement_threshold = self.disp_threshold_var.get() - self.stitch_config.rows = self.num_rows_var.get() + self.stitch_config.max_mosaic_width = self.max_width_var.get() + self.stitch_config.max_mosaic_height = self.max_height_var.get() self.stitch_config.row_overlap = self.row_overlap_var.get() self.stitch_config.scan_speed_index = self.scan_speed_var.get() self.stitch_config.autofocus_every_row = self.af_every_row_var.get() + # Update memory estimate + self._update_memory_estimate() + # Reinitialize stitching scanner with new config self.stitch_scanner = StitchingScanner( camera=self.camera, @@ -526,6 +576,54 @@ class AppGUI: on_progress=self._on_stitch_progress, on_mosaic_updated=self._on_mosaic_updated ) + + self.log_message(f"Config updated: {self.stitch_config.max_mosaic_width}x{self.stitch_config.max_mosaic_height}") + + def _test_row_transition(self): + """Test row transition without full scan""" + if not self.stitch_scanner: + self._update_stitch_config() + + self.log_message("Testing row transition...") + + def run_test(): + result = self.stitch_scanner.test_row_transition() + self.root.after(0, lambda: self.log_message( + f"Row transition: {'SUCCESS' if result['success'] else 'FAILED'}, " + f"Y moved: {result['y_moved']:.1f}px")) + self.root.after(0, self._update_mosaic_window) + + threading.Thread(target=run_test, daemon=True).start() + + def _test_horizontal(self, direction: str): + """Test single row scan""" + if not self.stitch_scanner: + self._update_stitch_config() + + self.log_message(f"Testing single row scan ({direction})...") + + def run_test(): + result = self.stitch_scanner.test_single_row(direction) + self.root.after(0, lambda: self.log_message( + f"Row scan: {result['stop_reason']}, {result['appends']} appends, " + f"mosaic: {result['mosaic_after'][0]}x{result['mosaic_after'][1]}")) + self.root.after(0, self._update_mosaic_window) + + threading.Thread(target=run_test, daemon=True).start() + + def _show_scan_estimate(self): + """Show memory estimate based on current settings""" + if not self.stitch_scanner: + self._update_stitch_config() + + est = self.stitch_scanner.get_memory_estimate() + + msg = (f"Memory Estimate:\n" + f" Current: {est['current_size'][0]}x{est['current_size'][1]} = {est['current_mb']:.1f} MB\n" + f" Max: {est['max_size'][0]}x{est['max_size'][1]} = {est['max_mb']:.0f} MB") + + self.log_message(msg) + self.memory_label.config(text=f"~{est['max_mb']:.0f} MB") # ========================================================================= # Overlay Drawing @@ -926,10 +1024,16 @@ class AppGUI: text=f"X: {state.cumulative_x:.1f} Y: {state.cumulative_y:.1f}" ) - # Update row/direction + # Update row/direction and size if state.is_scanning: - self.row_label.config(text=f"{state.current_row + 1}/{state.total_rows}") + # Show current row and progress percentage + width_pct = min(100, (state.mosaic_width / self.stitch_config.max_mosaic_width) * 100) + height_pct = min(100, (state.mosaic_height / self.stitch_config.max_mosaic_height) * 100) + self.row_label.config(text=f"R{state.current_row + 1}") self.direction_label.config(text=state.direction) + # Update mosaic size label with progress + self.mosaic_size_label.config( + text=f"{state.mosaic_width}x{state.mosaic_height} ({height_pct:.0f}%)") else: self.row_label.config(text="--") self.direction_label.config(text="--") diff --git a/src/stitching_scanner.py b/src/stitching_scanner.py index c8b6b4d..dfdc0f7 100644 --- a/src/stitching_scanner.py +++ b/src/stitching_scanner.py @@ -1,84 +1,59 @@ """ -Stitching Scanner v2 - Fixed displacement tracking +Stitching Scanner v2 - Simplified unified approach -Key fix: Track displacement since last APPEND, not just cumulative. -The strip width must match actual movement since we last added to the mosaic. +Same displacement-based stitching for both horizontal rows and vertical row transitions. +No complex visual matching - just track displacement and append strips. """ import cv2 import numpy as np import time import threading -from dataclasses import dataclass, field -from typing import List, Optional, Callable, Tuple +from dataclasses import dataclass +from typing import Optional, Callable, Tuple from enum import Enum class ScanDirection(Enum): - """Scan direction constants""" - RIGHT = 'right' # X+ (E command) - LEFT = 'left' # X- (W command) - DOWN = 'down' # Y- (N command) - UP = 'up' # Y+ (S command) + RIGHT = 'right' + LEFT = 'left' + DOWN = 'down' + UP = 'up' @dataclass class StitchConfig: - """Stitching scanner configuration""" - # Displacement threshold (percentage of frame size) - displacement_threshold: float = 0.10 # 10% of frame dimension - - # Movement timing - movement_interval: float = 0.001 # Seconds of motor on time - frame_interval: float = 0.25 # Seconds between frame captures (settle time) - settle_time: float = 0.5 # Seconds to wait after stopping - max_row_scan_time: float = 3000.0 # Safety timeout (50 minutes) - - # Scan pattern - rows: int = 3 + displacement_threshold: float = 0.10 # 10% of frame triggers append + movement_interval: float = 0.001 + frame_interval: float = 1.00 + settle_time: float = 0.75 + max_scan_time: float = 300.0 row_overlap: float = 0.15 - - # Speed setting for scanning + max_mosaic_width: int = 15000 + max_mosaic_height: int = 12000 scan_speed_index: int = 3 - - # Focus autofocus_every_row: bool = True - - # Memory management - max_mosaic_width: int = 11000 - max_mosaic_height: int = 11000 -# 11000, 24500, 450000 + @dataclass class StitchState: - """Current state for visualization""" is_scanning: bool = False direction: str = '' - - # Displacement tracking cumulative_x: float = 0.0 cumulative_y: float = 0.0 last_displacement: Tuple[float, float] = (0.0, 0.0) - - # Progress current_row: int = 0 total_rows: int = 0 - - # Mosaic size mosaic_width: int = 0 mosaic_height: int = 0 - - # Debug frame_count: int = 0 append_count: int = 0 class StitchingScanner: """ - Slide scanner using continuous stitching with correct displacement tracking. - - Key insight: We must track displacement since the LAST APPEND, and the - strip we append must exactly match that displacement. + Slide scanner using continuous stitching. + Unified approach for horizontal and vertical movement. """ def __init__(self, camera, motion_controller, autofocus_controller=None, @@ -91,32 +66,25 @@ class StitchingScanner: self.autofocus = autofocus_controller self.config = config or StitchConfig() - # Callbacks self.on_log = on_log self.on_progress = on_progress self.on_mosaic_updated = on_mosaic_updated - # State self.running = False self.paused = False self.state = StitchState() self._state_lock = threading.Lock() - # Mosaic data self.mosaic: Optional[np.ndarray] = None self._mosaic_lock = threading.Lock() - # Frame tracking - KEY CHANGE: separate reference for displacement calc vs append - self._prev_frame: Optional[np.ndarray] = None # For frame-to-frame displacement - self._append_ref_frame: Optional[np.ndarray] = None # Reference from last append - self._displacement_since_append_x: float = 0.0 # Accumulated since last append + self._prev_frame: Optional[np.ndarray] = None + self._displacement_since_append_x: float = 0.0 self._displacement_since_append_y: float = 0.0 - # Thread self._thread: Optional[threading.Thread] = None def log(self, message: str): - """Log a message""" if self.on_log: self.on_log(f"[Stitch] {message}") print(f"[Stitch] {message}") @@ -126,17 +94,12 @@ class StitchingScanner: # ========================================================================= def _to_grayscale(self, frame: np.ndarray) -> np.ndarray: - """Convert frame to grayscale""" if len(frame.shape) == 3: return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) return frame def _detect_displacement(self, prev_frame: np.ndarray, curr_frame: np.ndarray) -> Tuple[float, float]: - """ - Detect displacement between two frames using phase correlation. - Returns (dx, dy) in pixels. - """ prev_gray = self._to_grayscale(prev_frame) curr_gray = self._to_grayscale(curr_frame) @@ -146,20 +109,16 @@ class StitchingScanner: prev_f = prev_gray.astype(np.float32) curr_f = curr_gray.astype(np.float32) - # Apply window function to reduce edge effects h, w = prev_gray.shape window = cv2.createHanningWindow((w, h), cv2.CV_32F) prev_f = prev_f * window curr_f = curr_f * window - shift, response = cv2.phaseCorrelate(prev_f, curr_f) - dx, dy = shift - - return (dx, dy) + shift, _ = cv2.phaseCorrelate(prev_f, curr_f) + return shift def _detect_displacement_robust(self, prev_frame: np.ndarray, curr_frame: np.ndarray) -> Tuple[float, float]: - """Displacement detection with sanity checks""" dx, dy = self._detect_displacement(prev_frame, curr_frame) h, w = prev_frame.shape[:2] @@ -172,17 +131,14 @@ class StitchingScanner: return (dx, dy) # ========================================================================= - # Mosaic Building - FIXED VERSION + # Mosaic Building # ========================================================================= def _init_mosaic(self, frame: np.ndarray): - """Initialize mosaic with first frame""" with self._mosaic_lock: self.mosaic = frame.copy() - # Set reference frames self._prev_frame = frame.copy() - self._append_ref_frame = frame.copy() self._displacement_since_append_x = 0.0 self._displacement_since_append_y = 0.0 @@ -195,20 +151,17 @@ class StitchingScanner: self.log(f"Initialized mosaic: {frame.shape[1]}x{frame.shape[0]}") - def _blend_strips_horizontal(self, base: np.ndarray, strip: np.ndarray, - blend_width: int, append_right: bool) -> np.ndarray: - """Blend strip onto base with gradient at seam to hide discontinuities.""" + def _blend_horizontal(self, base: np.ndarray, strip: np.ndarray, + blend_width: int, append_right: bool) -> np.ndarray: if blend_width <= 0 or blend_width >= strip.shape[1]: if append_right: return np.hstack([base, strip]) - else: - return np.hstack([strip, base]) + return np.hstack([strip, base]) h_base, w_base = base.shape[:2] h_strip, w_strip = strip.shape[:2] if h_strip != h_base: - # Height mismatch - can't blend properly if append_right: return np.hstack([base, strip]) return np.hstack([strip, base]) @@ -216,53 +169,80 @@ class StitchingScanner: blend_w = min(blend_width, w_strip, w_base) if append_right: - # base | blend_zone | rest_of_strip result_width = w_base + w_strip - blend_w result = np.zeros((h_base, result_width, 3), dtype=np.uint8) - - # Copy base result[:, :w_base] = base - # Create gradient: 1->0 for base weight alpha = np.linspace(1, 0, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis] - base_overlap = base[:, -blend_w:].astype(np.float32) strip_overlap = strip[:, :blend_w].astype(np.float32) blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8) result[:, w_base - blend_w:w_base] = blended result[:, w_base:] = strip[:, blend_w:] - return result else: - # rest_of_strip | blend_zone | base result_width = w_base + w_strip - blend_w result = np.zeros((h_base, result_width, 3), dtype=np.uint8) - result[:, :w_strip] = strip alpha = np.linspace(0, 1, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis] - strip_overlap = strip[:, -blend_w:].astype(np.float32) base_overlap = base[:, :blend_w].astype(np.float32) blended = (strip_overlap * (1 - alpha) + base_overlap * alpha).astype(np.uint8) result[:, w_strip - blend_w:w_strip] = blended result[:, w_strip:] = base[:, blend_w:] - return result - - def _append_to_mosaic_fixed(self, frame: np.ndarray, direction: ScanDirection): - """ - FIXED: Append with blending and fractional pixel preservation. + + def _blend_vertical(self, base: np.ndarray, strip: np.ndarray, + blend_height: int, append_below: bool) -> np.ndarray: + mh, mw = base.shape[:2] + sh, sw = strip.shape[:2] - Key improvements: - 1. Gradient blending at seams to hide color discontinuities - 2. Preserve fractional pixel remainder to prevent cumulative drift - 3. Small safety margin for alignment tolerance - """ - BLEND_WIDTH = 10 # Pixels to blend at seam - SAFETY_MARGIN = 2 # Extra pixels as tolerance + # Match widths + if sw > mw: + strip = strip[:, :mw] + elif sw < mw: + pad = np.zeros((sh, mw - sw, 3), dtype=np.uint8) + strip = np.hstack([strip, pad]) + + blend_h = min(blend_height, sh, mh) + + if blend_h <= 0: + if append_below: + return np.vstack([base, strip]) + return np.vstack([strip, base]) + + if append_below: + alpha = np.linspace(1, 0, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis] + base_overlap = base[-blend_h:].astype(np.float32) + strip_overlap = strip[:blend_h].astype(np.float32) + blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8) + + result_h = mh + sh - blend_h + result = np.zeros((result_h, mw, 3), dtype=np.uint8) + result[:mh - blend_h] = base[:-blend_h] + result[mh - blend_h:mh] = blended + result[mh:] = strip[blend_h:] + return result + else: + alpha = np.linspace(0, 1, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis] + strip_overlap = strip[-blend_h:].astype(np.float32) + base_overlap = base[:blend_h].astype(np.float32) + blended = (strip_overlap * (1 - alpha) + base_overlap * alpha).astype(np.uint8) + + result_h = mh + sh - blend_h + result = np.zeros((result_h, mw, 3), dtype=np.uint8) + result[:sh - blend_h] = strip[:-blend_h] + result[sh - blend_h:sh] = blended + result[sh:] = base[blend_h:] + return result + + def _append_strip(self, frame: np.ndarray, direction: ScanDirection): + """Append strip to mosaic based on accumulated displacement.""" + BLEND_WIDTH = 10 + SAFETY_MARGIN = 2 with self._mosaic_lock: if self.mosaic is None: @@ -275,30 +255,26 @@ class StitchingScanner: dy = abs(self._displacement_since_append_y) if direction in [ScanDirection.RIGHT, ScanDirection.LEFT]: - # Round and add safety margin append_width = round(dx) + SAFETY_MARGIN append_width = min(append_width, w - BLEND_WIDTH - 5) if append_width < 1: return - # Calculate fractional remainder to preserve pixels_consumed = append_width - SAFETY_MARGIN fractional_remainder = dx - pixels_consumed if direction == ScanDirection.RIGHT: - # Grab strip with extra for blending strip_start = max(0, w - append_width - BLEND_WIDTH) new_strip = frame[:, strip_start:] - self.mosaic = self._blend_strips_horizontal( + self.mosaic = self._blend_horizontal( self.mosaic, new_strip, BLEND_WIDTH, append_right=True) else: strip_end = min(w, append_width + BLEND_WIDTH) new_strip = frame[:, :strip_end] - self.mosaic = self._blend_strips_horizontal( + self.mosaic = self._blend_horizontal( self.mosaic, new_strip, BLEND_WIDTH, append_right=False) - # KEEP fractional remainder instead of resetting to 0! self._displacement_since_append_x = fractional_remainder self._displacement_since_append_y = 0.0 @@ -313,112 +289,34 @@ class StitchingScanner: fractional_remainder = dy - pixels_consumed if direction == ScanDirection.DOWN: - strip_start = max(0, h - append_height - BLEND_WIDTH) - new_strip = frame[strip_start:, :] - - # Match widths - if new_strip.shape[1] > mw: - new_strip = new_strip[:, :mw] - elif new_strip.shape[1] < mw: - pad = np.zeros((new_strip.shape[0], mw - new_strip.shape[1], 3), dtype=np.uint8) - new_strip = np.hstack([new_strip, pad]) - - # Vertical blend - blend_h = min(BLEND_WIDTH, new_strip.shape[0], mh) - alpha = np.linspace(1, 0, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis] - - base_overlap = self.mosaic[-blend_h:].astype(np.float32) - strip_overlap = new_strip[:blend_h].astype(np.float32) - blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8) - - result_h = mh + new_strip.shape[0] - blend_h - result = np.zeros((result_h, mw, 3), dtype=np.uint8) - result[:mh - blend_h] = self.mosaic[:-blend_h] - result[mh - blend_h:mh] = blended - result[mh:] = new_strip[blend_h:] - self.mosaic = result - else: strip_end = min(h, append_height + BLEND_WIDTH) - new_strip = frame[:strip_end, :] - - if new_strip.shape[1] > mw: - new_strip = new_strip[:, :mw] - elif new_strip.shape[1] < mw: - pad = np.zeros((new_strip.shape[0], mw - new_strip.shape[1], 3), dtype=np.uint8) - new_strip = np.hstack([new_strip, pad]) - - self.mosaic = np.vstack([new_strip, self.mosaic]) + new_strip = frame[:strip_end:, :] + self.mosaic = self._blend_vertical( + self.mosaic, new_strip, BLEND_WIDTH, append_below=False) + else: + strip_start = max(0, h - append_height - BLEND_WIDTH) + new_strip = frame[:strip_start, :] + self.mosaic = self._blend_vertical( + self.mosaic, new_strip, BLEND_WIDTH, append_below=True) self._displacement_since_append_x = 0.0 self._displacement_since_append_y = fractional_remainder new_mh, new_mw = self.mosaic.shape[:2] - # Update state with self._state_lock: self.state.mosaic_width = new_mw self.state.mosaic_height = new_mh self.state.append_count += 1 - # Update reference frame (fractional remainder already set above - don't reset!) - self._append_ref_frame = frame.copy() - if self.on_mosaic_updated: self.on_mosaic_updated() - def _start_new_row(self, frame: np.ndarray, direction: ScanDirection): - """Start a new row in the mosaic""" - with self._mosaic_lock: - if self.mosaic is None: - self._init_mosaic(frame) - return - - h, w = frame.shape[:2] - mh, mw = self.mosaic.shape[:2] - - # Calculate overlap - overlap_pixels = int(h * self.config.row_overlap) - append_height = h - overlap_pixels - - if direction == ScanDirection.DOWN: - new_strip = frame[overlap_pixels:, :] - - if new_strip.shape[1] < mw: - pad = np.zeros((new_strip.shape[0], mw - new_strip.shape[1], 3), dtype=np.uint8) - new_strip = np.hstack([new_strip, pad]) - elif new_strip.shape[1] > mw: - new_strip = new_strip[:, :mw] - - self.mosaic = np.vstack([self.mosaic, new_strip]) - else: - new_strip = frame[:append_height, :] - - if new_strip.shape[1] < mw: - pad = np.zeros((new_strip.shape[0], mw - new_strip.shape[1], 3), dtype=np.uint8) - new_strip = np.hstack([new_strip, pad]) - elif new_strip.shape[1] > mw: - new_strip = new_strip[:, :mw] - - self.mosaic = np.vstack([new_strip, self.mosaic]) - - # Reset all tracking for new row - self._prev_frame = frame.copy() - self._append_ref_frame = frame.copy() - self._displacement_since_append_x = 0.0 - self._displacement_since_append_y = 0.0 - - with self._state_lock: - self.state.mosaic_height = self.mosaic.shape[0] - self.state.mosaic_width = self.mosaic.shape[1] - - self.log(f"New row started, mosaic: {self.mosaic.shape[1]}x{self.mosaic.shape[0]}") - # ========================================================================= # Scan Control # ========================================================================= def start(self) -> bool: - """Start the stitching scan""" if self.running: self.log("Already running") return False @@ -429,13 +327,11 @@ class StitchingScanner: with self._state_lock: self.state = StitchState() self.state.is_scanning = True - self.state.total_rows = self.config.rows with self._mosaic_lock: self.mosaic = None self._prev_frame = None - self._append_ref_frame = None self._displacement_since_append_x = 0.0 self._displacement_since_append_y = 0.0 @@ -446,7 +342,6 @@ class StitchingScanner: return True def stop(self): - """Stop the scan""" self.running = False self.paused = False self.motion.stop_all() @@ -457,26 +352,24 @@ class StitchingScanner: self.log("Scan stopped") def pause(self): - """Pause the scan""" if self.running and not self.paused: self.paused = True self.motion.stop_all() self.log("Scan paused") def resume(self): - """Resume the scan""" if self.running and self.paused: self.paused = False self.log("Scan resumed") # ========================================================================= - # Main Scan Loop + # Scanning Logic # ========================================================================= def _scan_loop(self): - """Main scanning loop""" try: self.log("Starting scan loop") + self.log(f"Max dimensions: {self.config.max_mosaic_width}x{self.config.max_mosaic_height}") self.motion.set_speed(self.config.scan_speed_index) time.sleep(0.1) @@ -484,30 +377,35 @@ class StitchingScanner: frame = self._capture_frame() self._init_mosaic(frame) - for row in range(self.config.rows): - if not self.running: - break - + row = 0 + while self.running: with self._state_lock: self.state.current_row = row + self.state.total_rows = row + 1 - self.log(f"=== Row {row + 1}/{self.config.rows} ===") + self.log(f"=== Row {row + 1} ===") - # Serpentine pattern - if row % 2 == 0: - h_direction = ScanDirection.RIGHT - else: - h_direction = ScanDirection.LEFT + # Serpentine: even rows right, odd rows left + h_direction = ScanDirection.RIGHT if row % 2 == 0 else ScanDirection.LEFT - self._scan_horizontal(h_direction) + stop_reason = self._scan_direction(h_direction) if not self.running: break - if row < self.config.rows - 1: - self._move_to_next_row() + # Check max height + if self.state.mosaic_height >= self.config.max_mosaic_height: + self.log(f"Max height reached ({self.state.mosaic_height}px)") + break + + # Move to next row using same stitching approach + if not self._move_to_next_row(): + self.log("Failed to move to next row") + break + + row += 1 - self.log("Scan complete!") + self.log(f"Scan complete! Final: {self.state.mosaic_width}x{self.state.mosaic_height}") except Exception as e: self.log(f"Scan error: {e}") @@ -519,8 +417,8 @@ class StitchingScanner: with self._state_lock: self.state.is_scanning = False - def _scan_horizontal(self, direction: ScanDirection): - """Scan horizontally with fixed displacement tracking""" + def _scan_direction(self, direction: ScanDirection) -> str: + """Scan in a direction until edge or max dimension reached.""" self.log(f"Scanning {direction.value}...") with self._state_lock: @@ -528,44 +426,51 @@ class StitchingScanner: frame = self._capture_frame() h, w = frame.shape[:2] - threshold_pixels = w * self.config.displacement_threshold - # Initialize tracking + # Setup based on direction + if direction in [ScanDirection.RIGHT, ScanDirection.LEFT]: + threshold_pixels = w * self.config.displacement_threshold + max_dim = self.config.max_mosaic_width + current_dim = lambda: self.state.mosaic_width + start_cmd = 'E' if direction == ScanDirection.RIGHT else 'W' + stop_cmd = 'e' if direction == ScanDirection.RIGHT else 'w' + else: + threshold_pixels = h * self.config.displacement_threshold + max_dim = self.config.max_mosaic_height + current_dim = lambda: self.state.mosaic_height + start_cmd = 'S' if direction == ScanDirection.DOWN else 'N' + stop_cmd = 's' if direction == ScanDirection.DOWN else 'n' + self._prev_frame = frame.copy() - self._append_ref_frame = frame.copy() self._displacement_since_append_x = 0.0 self._displacement_since_append_y = 0.0 start_time = time.time() no_movement_count = 0 max_no_movement = 50 + stop_reason = 'stopped' while self.running and not self.paused: - if time.time() - start_time > self.config.max_row_scan_time: + if time.time() - start_time > self.config.max_scan_time: self.log("Scan timeout") + stop_reason = 'timeout' break - # Pulse the motor - if direction == ScanDirection.RIGHT: - self.motion.send_command('E') - else: - self.motion.send_command('W') + if current_dim() >= max_dim: + self.log(f"Max dimension reached ({current_dim()}px)") + stop_reason = 'max_dim' + break + # Pulse motor + self.motion.send_command(start_cmd) time.sleep(self.config.movement_interval) + self.motion.send_command(stop_cmd) - if direction == ScanDirection.RIGHT: - self.motion.send_command('e') - else: - self.motion.send_command('w') - - # Wait for settle time.sleep(self.config.frame_interval) - # Capture and measure curr_frame = self._capture_frame() dx, dy = self._detect_displacement_robust(self._prev_frame, curr_frame) - # Accumulate displacement SINCE LAST APPEND self._displacement_since_append_x += dx self._displacement_since_append_y += dy @@ -575,78 +480,119 @@ class StitchingScanner: self.state.last_displacement = (dx, dy) self.state.frame_count += 1 - # Check for no movement - if abs(dx) < 1.0 and abs(dy) < 1.0: + # Edge detection + movement = abs(dx) if direction in [ScanDirection.RIGHT, ScanDirection.LEFT] else abs(dy) + if movement < 1.0: no_movement_count += 1 if no_movement_count >= max_no_movement: - self.log(f"Edge detected (no movement for {no_movement_count} frames)") + self.log(f"Edge detected (no movement)") + stop_reason = 'edge' break else: no_movement_count = 0 - # Check threshold and append - if abs(self._displacement_since_append_x) >= threshold_pixels: - self._append_to_mosaic_fixed(curr_frame, direction) - self.log(f"Appended {abs(self._displacement_since_append_x):.1f}px strip, " - f"mosaic: {self.state.mosaic_width}x{self.state.mosaic_height}") + # Append when threshold reached + disp = abs(self._displacement_since_append_x) if direction in [ScanDirection.RIGHT, ScanDirection.LEFT] else abs(self._displacement_since_append_y) + if disp >= threshold_pixels: + self._append_strip(curr_frame, direction) + self.log(f"Appended {disp:.1f}px, mosaic: {self.state.mosaic_width}x{self.state.mosaic_height}") - # Update prev_frame for next displacement calculation self._prev_frame = curr_frame.copy() if self.on_progress: self.on_progress(self.state.append_count, 0) - # Stop - if direction == ScanDirection.RIGHT: - self.motion.send_command('e') - else: - self.motion.send_command('w') - + self.motion.send_command(stop_cmd) time.sleep(self.config.settle_time) + self.log(f"Direction finished: {stop_reason}") + return stop_reason - def _move_to_next_row(self): - """Move down to next row""" + def _move_to_next_row(self) -> bool: + """ + Move down to next row using displacement-based stitching. + Same approach as horizontal scanning. + """ self.log("Moving to next row...") frame = self._capture_frame() h, w = frame.shape[:2] - move_distance = h * (1 - self.config.row_overlap) + + # Target: move (1 - overlap) * frame_height + target_displacement = h * (1 - self.config.row_overlap) + threshold_pixels = h * self.config.displacement_threshold + + self.log(f"Target Y: {target_displacement:.0f}px, threshold: {threshold_pixels:.0f}px") with self._state_lock: self.state.direction = 'down' - - self.motion.send_command('N') + self.state.cumulative_y = 0.0 self._prev_frame = frame.copy() - cumulative_y = 0.0 + self._displacement_since_append_x = 0.0 + self._displacement_since_append_y = 0.0 - while self.running: - time.sleep(self.config.frame_interval) - - curr_frame = self._capture_frame() - dx, dy = self._detect_displacement_robust(self._prev_frame, curr_frame) - - cumulative_y += dy - self._prev_frame = curr_frame.copy() - - with self._state_lock: - self.state.cumulative_y = cumulative_y - - if abs(cumulative_y) >= move_distance: - break - - if abs(cumulative_y) < 5 and self.state.frame_count > 50: - self.log("Warning: Minimal Y movement") - break + total_y = 0.0 + no_movement_count = 0 + max_no_movement = 30 - self.motion.send_command('n') + # Start moving South + self.motion.send_command('S') + + try: + while self.running: + time.sleep(self.config.frame_interval) + + curr_frame = self._capture_frame() + dx, dy = self._detect_displacement_robust(self._prev_frame, curr_frame) + + self._displacement_since_append_y += dy + total_y += dy + + with self._state_lock: + self.state.cumulative_y = total_y + self.state.last_displacement = (dx, dy) + + # Edge detection + if abs(dy) < 1.0: + no_movement_count += 1 + if no_movement_count >= max_no_movement: + self.log("Edge detected during row transition") + self.motion.send_command('s') + time.sleep(self.config.settle_time) + return False + else: + no_movement_count = 0 + + # Append strip when threshold reached + if abs(self._displacement_since_append_y) >= threshold_pixels: + self._append_strip(curr_frame, ScanDirection.DOWN) + self.log(f" Row transition: appended, total Y: {abs(total_y):.1f}px") + + # Done when we've moved enough + if abs(total_y) >= target_displacement: + self.log(f"Row transition complete: {abs(total_y):.1f}px") + self.motion.send_command('s') + time.sleep(self.config.settle_time) + + # Reset for next horizontal row + frame = self._capture_frame() + self._prev_frame = frame.copy() + self._displacement_since_append_x = 0.0 + self._displacement_since_append_y = 0.0 + return True + + self._prev_frame = curr_frame.copy() + + except Exception as e: + self.log(f"Row transition error: {e}") + self.motion.send_command('s') + return False + + self.motion.send_command('s') time.sleep(self.config.settle_time) - - frame = self._capture_frame() - self._start_new_row(frame, ScanDirection.DOWN) + return False def _capture_frame(self) -> np.ndarray: - """Capture and rotate frame""" frame = self.camera.capture_frame() frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE) return frame @@ -656,7 +602,6 @@ class StitchingScanner: # ========================================================================= def get_state(self) -> StitchState: - """Get current scan state""" with self._state_lock: return StitchState( is_scanning=self.state.is_scanning, @@ -673,14 +618,12 @@ class StitchingScanner: ) def get_mosaic(self) -> Optional[np.ndarray]: - """Get current mosaic (full resolution)""" with self._mosaic_lock: if self.mosaic is not None: return self.mosaic.copy() return None def get_mosaic_preview(self, max_size: int = 600) -> Optional[np.ndarray]: - """Get scaled mosaic for preview""" with self._mosaic_lock: if self.mosaic is None: return None @@ -696,11 +639,9 @@ class StitchingScanner: return self.mosaic.copy() def save_mosaic(self, filepath: str) -> bool: - """Save mosaic to file""" with self._mosaic_lock: if self.mosaic is None: return False - cv2.imwrite(filepath, self.mosaic) self.log(f"Saved mosaic to {filepath}") return True @@ -710,25 +651,106 @@ class StitchingScanner: # ========================================================================= def test_displacement(self, num_frames: int = 10) -> dict: - """Test displacement detection""" - results = { - 'frames': [], - 'total_dx': 0.0, - 'total_dy': 0.0 - } - + results = {'frames': [], 'total_dx': 0.0, 'total_dy': 0.0} prev_frame = self._capture_frame() for i in range(num_frames): time.sleep(0.1) curr_frame = self._capture_frame() - dx, dy = self._detect_displacement(prev_frame, curr_frame) - results['frames'].append({'frame': i, 'dx': dx, 'dy': dy}) results['total_dx'] += dx results['total_dy'] += dy - prev_frame = curr_frame - return results \ No newline at end of file + return results + + def test_row_transition(self) -> dict: + """Test row transition using displacement stitching.""" + results = { + 'success': False, + 'y_moved': 0.0, + 'mosaic_before': (0, 0), + 'mosaic_after': (0, 0), + 'error': None + } + + try: + self.log("Testing row transition...") + + if self.mosaic is None: + frame = self._capture_frame() + self._init_mosaic(frame) + + results['mosaic_before'] = (self.state.mosaic_width, self.state.mosaic_height) + + with self._state_lock: + self.state.cumulative_y = 0.0 + + self.running = True + success = self._move_to_next_row() + self.running = False + + results['success'] = success + results['y_moved'] = self.state.cumulative_y + results['mosaic_after'] = (self.state.mosaic_width, self.state.mosaic_height) + + self.log(f"Row transition: {'SUCCESS' if success else 'FAILED'}, Y: {results['y_moved']:.1f}px") + + except Exception as e: + results['error'] = str(e) + self.log(f"Test error: {e}") + self.running = False + + return results + + def test_single_row(self, direction: str = 'right') -> dict: + """Test scanning a single row.""" + results = { + 'success': False, + 'stop_reason': None, + 'appends': 0, + 'mosaic_before': (0, 0), + 'mosaic_after': (0, 0), + 'error': None + } + + try: + self.log(f"Testing single row ({direction})...") + + if self.mosaic is None: + frame = self._capture_frame() + self._init_mosaic(frame) + + results['mosaic_before'] = (self.state.mosaic_width, self.state.mosaic_height) + appends_before = self.state.append_count + + self.motion.set_speed(self.config.scan_speed_index) + time.sleep(0.1) + + self.running = True + scan_dir = ScanDirection.RIGHT if direction == 'right' else ScanDirection.LEFT + stop_reason = self._scan_direction(scan_dir) + self.running = False + + results['success'] = True + results['stop_reason'] = stop_reason + results['appends'] = self.state.append_count - appends_before + results['mosaic_after'] = (self.state.mosaic_width, self.state.mosaic_height) + + except Exception as e: + results['error'] = str(e) + self.running = False + + return results + + def get_memory_estimate(self) -> dict: + current_bytes = self.mosaic.nbytes if self.mosaic is not None else 0 + max_bytes = self.config.max_mosaic_width * self.config.max_mosaic_height * 3 + + return { + 'current_size': (self.state.mosaic_width, self.state.mosaic_height), + 'current_mb': current_bytes / (1024 * 1024), + 'max_size': (self.config.max_mosaic_width, self.config.max_mosaic_height), + 'max_mb': max_bytes / (1024 * 1024), + } \ No newline at end of file