From 2a894d7d9886b30939c725b7933455d4508d34ed Mon Sep 17 00:00:00 2001 From: Shaiv Kamat Date: Mon, 5 Jan 2026 07:59:38 -0800 Subject: [PATCH] Moved away from tile based stiching to continuouse stiching --- src/camera.py | 170 ++++++- src/gui.py | 981 +++++++++++++++++---------------------- src/stitching_scanner.py | 734 +++++++++++++++++++++++++++++ 3 files changed, 1329 insertions(+), 556 deletions(-) create mode 100644 src/stitching_scanner.py diff --git a/src/camera.py b/src/camera.py index a2e9f30..5a1f82a 100644 --- a/src/camera.py +++ b/src/camera.py @@ -1,22 +1,183 @@ import cv2 +import subprocess +import re + + +def detect_camera_modes(device_id=0): + """ + Detect available camera resolutions using v4l2-ctl. + Returns dict of modes sorted by resolution (smallest to largest). + """ + modes = {} + + try: + # Run v4l2-ctl to get supported formats + device = f"/dev/video{device_id}" + result = subprocess.run( + ['v4l2-ctl', '--device', device, '--list-formats-ext'], + capture_output=True, + text=True, + timeout=5 + ) + + if result.returncode != 0: + print(f"v4l2-ctl failed: {result.stderr}") + return _get_fallback_modes() + + # Parse output for "Size: Discrete WxH" lines + # Example: "Size: Discrete 2592x1944" + size_pattern = re.compile(r'Size:\s+Discrete\s+(\d+)x(\d+)') + + resolutions = set() # Use set to avoid duplicates + for line in result.stdout.split('\n'): + match = size_pattern.search(line) + if match: + width = int(match.group(1)) + height = int(match.group(2)) + resolutions.add((width, height)) + + if not resolutions: + print("No resolutions found in v4l2-ctl output") + return _get_fallback_modes() + + # Sort by total pixels (width * height) + sorted_res = sorted(resolutions, key=lambda r: r[0] * r[1]) + + # Build modes dict with descriptive names + for i, (width, height) in enumerate(sorted_res): + pixels = width * height + + # Generate a name based on position/size + if i == 0: + name = 'low' + desc = 'Low' + elif i == len(sorted_res) - 1: + name = 'high' + desc = 'High' + elif len(sorted_res) == 3 and i == 1: + name = 'medium' + desc = 'Medium' + else: + name = f'res_{width}x{height}' + desc = f'{width}x{height}' + + modes[name] = { + 'width': width, + 'height': height, + 'label': f'{width}x{height} ({desc})' + } + + print(f"Detected {len(modes)} camera modes: {list(modes.keys())}") + return modes + + except FileNotFoundError: + print("v4l2-ctl not found, using fallback modes") + return _get_fallback_modes() + except subprocess.TimeoutExpired: + print("v4l2-ctl timed out, using fallback modes") + return _get_fallback_modes() + except Exception as e: + print(f"Error detecting camera modes: {e}") + return _get_fallback_modes() + + +def _get_fallback_modes(): + """Fallback modes if v4l2-ctl detection fails""" + return { + 'low': {'width': 640, 'height': 480, 'label': '640x480 (Low)'}, + 'medium': {'width': 1280, 'height': 960, 'label': '1280x960 (Medium)'}, + 'high': {'width': 1920, 'height': 1080, 'label': '1920x1080 (High)'}, + } + class Camera: - def __init__(self, device_id=0): + prevFrame = {} + def __init__(self, device_id=0, mode=None): + self.device_id = device_id + + # Detect available modes before opening camera + self.MODES = detect_camera_modes(device_id) + + # Open camera self.cap = cv2.VideoCapture(device_id) if not self.cap.isOpened(): raise RuntimeError("Could not open camera, stop program") + # Default to highest resolution if no mode specified + if mode is None: + mode = list(self.MODES.keys())[0] # Last = highest res + + self.current_mode = mode + self._apply_mode(mode) + # set resolution # self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920) # self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080) - + self.cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0) # Disable auto-exposure + self.cap.set(cv2.CAP_PROP_EXPOSURE, -6) # Set fixed exposure + self.cap.set(cv2.CAP_PROP_AUTO_WB, 0) # Disable auto white balance self.window_name = "AutoScope" + def _apply_mode(self, mode_name): + """Apply resolution settings""" + if mode_name not in self.MODES: + print(f"Unknown mode {mode_name}, using first available") + mode_name = list(self.MODES.keys())[0] + + mode = self.MODES[mode_name] + + # Set resolution + self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, mode['width']) + self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, mode['height']) + + self.current_mode = mode_name + + # Verify settings took effect + actual_w = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + actual_h = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + actual_fps = self.cap.get(cv2.CAP_PROP_FPS) + + print(f"Camera mode: {mode['label']}") + print(f" Actual: {actual_w}x{actual_h} @ {actual_fps:.1f}fps") + + return actual_w, actual_h, actual_fps + + def set_mode(self, mode_name): + """Change camera mode (resolution/framerate)""" + return self._apply_mode(mode_name) + + def get_mode(self): + """Get current mode name""" + return self.current_mode + + def get_mode_info(self): + """Get current mode details""" + return self.MODES.get(self.current_mode, list(self.MODES.values())[0]) + + def get_resolution(self): + """Get current actual resolution""" + w = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + h = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + return w, h + + def get_fps(self): + """Get current actual FPS""" + return self.cap.get(cv2.CAP_PROP_FPS) + + def get_available_modes(self): + """Get list of available mode names""" + return list(self.MODES.keys()) + + def get_mode_labels(self): + """Get mode labels for UI""" + return {k: v['label'] for k, v in self.MODES.items()} + def capture_frame(self): ret, frame = self.cap.read() if not ret: - raise RuntimeError("Failed to capture frame, stop program") + return prevframe + prevframe = frame return frame def show_frame(self, frame): @@ -68,5 +229,4 @@ class Camera: if key == ord('q'): break - self.close_window() - \ No newline at end of file + self.close_window() \ No newline at end of file diff --git a/src/gui.py b/src/gui.py index d8ccc2f..5a3ac36 100644 --- a/src/gui.py +++ b/src/gui.py @@ -1,12 +1,11 @@ """ -AutoScope GUI - Vertical Monitor Layout with Edge Comparison Overlay +AutoScope GUI - Updated with Stitching Scanner Integration -Shows reference and current binary edge images overlaid directly on camera view. -Updated with interpolation controls and improved scanner UI. +Includes both tile-based scanner and new stitching scanner modes. """ import tkinter as tk -from tkinter import ttk, scrolledtext +from tkinter import ttk, scrolledtext, filedialog from PIL import Image, ImageTk import cv2 import numpy as np @@ -16,6 +15,7 @@ import queue from motion_controller import MotionController from autofocus import AutofocusController from scanner import Scanner, ScanConfig, ScanDirection, Tile +from stitching_scanner import StitchingScanner, StitchConfig class AppGUI: @@ -34,9 +34,14 @@ class AppGUI: on_focus_update=self._update_focus_display_threadsafe ) - # Scanner - self.scanner = None + # Scanners - both tile-based and stitching + self.scanner = None # Tile-based scanner + self.stitch_scanner = None # Stitching scanner self.scan_config = ScanConfig() + self.stitch_config = StitchConfig() + + # Scanner mode: 'tile' or 'stitch' + self.scanner_mode = 'stitch' # Default to new stitching mode # Queues for thread-safe updates self.serial_queue = queue.Queue() @@ -53,14 +58,15 @@ class AppGUI: self.root.protocol("WM_DELETE_WINDOW", self.on_close) self._build_ui() - self._init_scanner() + self._init_scanners() # Start serial reader thread self.serial_thread = threading.Thread(target=self._serial_reader, daemon=True) self.serial_thread.start() - def _init_scanner(self): - """Initialize scanner with current config""" + def _init_scanners(self): + """Initialize both scanner types""" + # Tile-based scanner self.scanner = Scanner( camera=self.camera, motion_controller=self.motion, @@ -70,6 +76,17 @@ class AppGUI: on_log=self.log_message, on_progress=self._on_scan_progress ) + + # Stitching scanner + self.stitch_scanner = StitchingScanner( + camera=self.camera, + motion_controller=self.motion, + autofocus_controller=self.autofocus, + config=self.stitch_config, + on_log=self.log_message, + on_progress=self._on_stitch_progress, + on_mosaic_updated=self._on_mosaic_updated + ) def _on_command_sent(self, cmd): self.log_message(f"> {cmd}") @@ -83,6 +100,13 @@ class AppGUI: def _on_scan_progress(self, current: int, total: int): self.root.after(0, lambda: self._update_progress(current, total)) + def _on_stitch_progress(self, appends: int, total: int): + self.root.after(0, lambda: self._update_stitch_progress(appends)) + + def _on_mosaic_updated(self): + """Called when stitching scanner updates mosaic""" + self.root.after(0, self._update_mosaic_window) + # ========================================================================= # UI Building - Vertical Layout # ========================================================================= @@ -103,69 +127,68 @@ class AppGUI: cam_opts1.pack(fill=tk.X, padx=5, pady=(0, 2)) self.show_tile_overlay_var = tk.BooleanVar(value=True) - ttk.Checkbutton(cam_opts1, text="Tile bounds", + ttk.Checkbutton(cam_opts1, text="Bounds", variable=self.show_tile_overlay_var).pack(side=tk.LEFT) - self.show_edge_regions_var = tk.BooleanVar(value=False) - ttk.Checkbutton(cam_opts1, text="Edge regions", - variable=self.show_edge_regions_var).pack(side=tk.LEFT, padx=(10, 0)) - - self.show_comparison_var = tk.BooleanVar(value=True) - ttk.Checkbutton(cam_opts1, text="Comparison overlay", - variable=self.show_comparison_var).pack(side=tk.LEFT, padx=(10, 0)) + self.show_displacement_var = tk.BooleanVar(value=True) + ttk.Checkbutton(cam_opts1, text="Displacement", + variable=self.show_displacement_var).pack(side=tk.LEFT, padx=(10, 0)) self.live_focus_var = tk.BooleanVar(value=True) ttk.Checkbutton(cam_opts1, text="Live focus", variable=self.live_focus_var).pack(side=tk.LEFT, padx=(10, 0)) + # Resolution selector + ttk.Separator(cam_opts1, orient=tk.VERTICAL).pack(side=tk.LEFT, fill=tk.Y, padx=10) + ttk.Label(cam_opts1, text="Res:").pack(side=tk.LEFT) + + self.resolution_var = tk.StringVar(value=self.camera.get_mode()) + self.resolution_combo = ttk.Combobox( + cam_opts1, + textvariable=self.resolution_var, + values=list(self.camera.get_mode_labels().values()), + state='readonly', + width=18 + ) + # Set display to label + mode_labels = self.camera.get_mode_labels() + self.resolution_combo.set(mode_labels[self.camera.get_mode()]) + self.resolution_combo.pack(side=tk.LEFT, padx=2) + self.resolution_combo.bind('<>', self._on_resolution_change) + self.focus_score_label = ttk.Label(cam_opts1, text="Focus: --", font=('Arial', 11, 'bold')) self.focus_score_label.pack(side=tk.RIGHT) - # Camera options row 2: threshold and similarity + # Camera options row 2: displacement info cam_opts2 = ttk.Frame(camera_frame) cam_opts2.pack(fill=tk.X, padx=5, pady=(0, 5)) - ttk.Label(cam_opts2, text="Match threshold:").pack(side=tk.LEFT) + ttk.Label(cam_opts2, text="Displacement:").pack(side=tk.LEFT) + self.displacement_label = ttk.Label(cam_opts2, text="X: -- Y: --", font=('Arial', 10)) + self.displacement_label.pack(side=tk.LEFT, padx=5) - self.threshold_var = tk.DoubleVar(value=0.12) - self.threshold_slider = ttk.Scale( - cam_opts2, - from_=0.05, - to=0.8, - orient=tk.HORIZONTAL, - variable=self.threshold_var, - command=self._on_threshold_change, - length=100 - ) - self.threshold_slider.pack(side=tk.LEFT, padx=5) - - self.threshold_label = ttk.Label(cam_opts2, text="0.12", width=5) - self.threshold_label.pack(side=tk.LEFT) - - ttk.Button(cam_opts2, text="Test Edges", width=10, - command=self._test_edge_comparison).pack(side=tk.LEFT, padx=(10, 0)) - - ttk.Button(cam_opts2, text="Test Interp", width=10, - command=self._test_interpolation).pack(side=tk.LEFT, padx=(5, 0)) - - # Similarity display ttk.Separator(cam_opts2, orient=tk.VERTICAL).pack(side=tk.LEFT, fill=tk.Y, padx=10) - ttk.Label(cam_opts2, text="Sim:").pack(side=tk.LEFT) - self.similarity_label = ttk.Label(cam_opts2, text="--", font=('Arial', 10, 'bold'), width=6) - self.similarity_label.pack(side=tk.LEFT) - self.match_status_label = ttk.Label(cam_opts2, text="", width=10) - self.match_status_label.pack(side=tk.LEFT, padx=(3, 0)) + ttk.Label(cam_opts2, text="Mosaic:").pack(side=tk.LEFT) + self.mosaic_size_label = ttk.Label(cam_opts2, text="--", font=('Arial', 10)) + self.mosaic_size_label.pack(side=tk.LEFT, padx=5) + + ttk.Separator(cam_opts2, orient=tk.VERTICAL).pack(side=tk.LEFT, fill=tk.Y, padx=10) + + ttk.Label(cam_opts2, text="Cam:").pack(side=tk.LEFT) + self.cam_res_label = ttk.Label(cam_opts2, text="--", font=('Arial', 10)) + self.cam_res_label.pack(side=tk.LEFT, padx=5) + self._update_cam_res_label() # === BOTTOM: Control Panel === control_frame = ttk.Frame(main_frame) control_frame.pack(fill=tk.X) - # Row 1: Emergency Stop + Scanner Controls - self._build_row1_emergency_scanner(control_frame) + # Row 1: Emergency Stop + Scanner Mode + Controls + self._build_row1_scanner(control_frame) - # Row 2: Interpolation Settings (NEW) - self._build_row2_interpolation(control_frame) + # Row 2: Stitching Settings + self._build_row2_stitch_settings(control_frame) # Row 3: Movement Controls self._build_row3_movement(control_frame) @@ -176,7 +199,7 @@ class AppGUI: # Row 5: Status + Log self._build_row5_status_log(control_frame) - def _build_row1_emergency_scanner(self, parent): + def _build_row1_scanner(self, parent): """Row 1: Emergency stop and scanner controls""" row = ttk.Frame(parent) row.pack(fill=tk.X, pady=(0, 3)) @@ -189,13 +212,20 @@ class AppGUI: ) self.emergency_btn.pack(side=tk.LEFT, padx=(0, 8)) - # Scanner controls - scanner_frame = ttk.LabelFrame(row, text="Scanner") + # Scanner frame + scanner_frame = ttk.LabelFrame(row, text="Stitching Scanner") scanner_frame.pack(side=tk.LEFT, fill=tk.X, expand=True) sf = ttk.Frame(scanner_frame) sf.pack(fill=tk.X, padx=5, pady=3) + # Scanner mode toggle + self.scanner_mode_var = tk.StringVar(value='stitch') + ttk.Radiobutton(sf, text="Stitch", variable=self.scanner_mode_var, + value='stitch', command=self._on_mode_change).pack(side=tk.LEFT) + ttk.Radiobutton(sf, text="Tile", variable=self.scanner_mode_var, + value='tile', command=self._on_mode_change).pack(side=tk.LEFT, padx=(5, 10)) + # Status indicator self.scan_status_label = ttk.Label(sf, text="Idle", font=('Arial', 9, 'bold'), width=8) self.scan_status_label.pack(side=tk.LEFT) @@ -204,13 +234,13 @@ class AppGUI: self.scan_progress_bar = ttk.Progressbar(sf, mode='indeterminate', length=60) self.scan_progress_bar.pack(side=tk.LEFT, padx=3) - self.scan_progress_label = ttk.Label(sf, text="0 tiles", width=8) + self.scan_progress_label = ttk.Label(sf, text="0 appends", width=10) self.scan_progress_label.pack(side=tk.LEFT) # Separator ttk.Separator(sf, orient=tk.VERTICAL).pack(side=tk.LEFT, fill=tk.Y, padx=5) - # Control buttons with proper styling + # Control buttons btn_frame = ttk.Frame(sf) btn_frame.pack(side=tk.LEFT) @@ -238,96 +268,81 @@ class AppGUI: ) self.scan_stop_btn.pack(side=tk.LEFT, padx=2) - # Capture single tile - ttk.Button(btn_frame, text="📷", width=3, - command=self._capture_single_tile).pack(side=tk.LEFT, padx=2) + # Test button + ttk.Button(btn_frame, text="Test", width=4, + command=self._test_displacement).pack(side=tk.LEFT, padx=2) # Mosaic button ttk.Button(sf, text="Mosaic", width=6, command=self._show_mosaic_window).pack(side=tk.RIGHT, padx=2) - def _build_row2_interpolation(self, parent): - """Row 2: Interpolation and peak detection settings""" - row = ttk.LabelFrame(parent, text="Matching Settings") + def _build_row2_stitch_settings(self, parent): + """Row 2: Stitching settings""" + row = ttk.LabelFrame(parent, text="Stitch Settings") row.pack(fill=tk.X, pady=(0, 3)) inner = ttk.Frame(row) inner.pack(fill=tk.X, padx=5, pady=3) - # Interpolation toggle - self.use_interpolation_var = tk.BooleanVar(value=True) + # Displacement threshold + ttk.Label(inner, text="Threshold:").pack(side=tk.LEFT) + self.disp_threshold_var = tk.DoubleVar(value=0.10) + self.disp_threshold_spinbox = ttk.Spinbox( + inner, from_=0.05, to=0.30, increment=0.01, width=5, + textvariable=self.disp_threshold_var, + command=self._update_stitch_config + ) + self.disp_threshold_spinbox.pack(side=tk.LEFT, padx=(2, 10)) + + # Number of rows + ttk.Label(inner, text="Rows:").pack(side=tk.LEFT) + self.num_rows_var = tk.IntVar(value=3) + self.num_rows_spinbox = ttk.Spinbox( + inner, from_=1, to=10, width=3, + textvariable=self.num_rows_var, + command=self._update_stitch_config + ) + self.num_rows_spinbox.pack(side=tk.LEFT, padx=(2, 10)) + + # Row overlap + ttk.Label(inner, text="Overlap:").pack(side=tk.LEFT) + self.row_overlap_var = tk.DoubleVar(value=0.15) + self.row_overlap_spinbox = ttk.Spinbox( + inner, from_=0.05, to=0.50, increment=0.05, width=5, + textvariable=self.row_overlap_var, + command=self._update_stitch_config + ) + self.row_overlap_spinbox.pack(side=tk.LEFT, padx=(2, 10)) + + # Scan speed + ttk.Label(inner, text="Speed:").pack(side=tk.LEFT) + self.scan_speed_var = tk.IntVar(value=3) + self.scan_speed_spinbox = ttk.Spinbox( + inner, from_=1, to=6, width=3, + textvariable=self.scan_speed_var, + command=self._update_stitch_config + ) + self.scan_speed_spinbox.pack(side=tk.LEFT, padx=(2, 10)) + + # Autofocus toggle + self.af_every_row_var = tk.BooleanVar(value=True) ttk.Checkbutton( - inner, text="Interpolation", - variable=self.use_interpolation_var, - command=self._update_scan_config - ).pack(side=tk.LEFT) + inner, text="AF each row", + variable=self.af_every_row_var, + command=self._update_stitch_config + ).pack(side=tk.LEFT, padx=(10, 0)) - # Number of interpolations - ttk.Label(inner, text="Steps:").pack(side=tk.LEFT, padx=(10, 2)) - self.num_interp_var = tk.IntVar(value=10) - self.num_interp_spinbox = ttk.Spinbox( - inner, from_=5, to=30, width=4, - textvariable=self.num_interp_var, - command=self._update_scan_config - ) - self.num_interp_spinbox.pack(side=tk.LEFT) - - # Separator - ttk.Separator(inner, orient=tk.VERTICAL).pack(side=tk.LEFT, fill=tk.Y, padx=10) - - # Peak detection toggle - self.use_peak_detection_var = tk.BooleanVar(value=True) - ttk.Checkbutton( - inner, text="Peak Detection", - variable=self.use_peak_detection_var, - command=self._update_scan_config - ).pack(side=tk.LEFT) - - # Peak window size - ttk.Label(inner, text="Window:").pack(side=tk.LEFT, padx=(10, 2)) - self.peak_window_var = tk.IntVar(value=5) - self.peak_window_spinbox = ttk.Spinbox( - inner, from_=3, to=15, width=3, - textvariable=self.peak_window_var, - command=self._update_scan_config - ) - self.peak_window_spinbox.pack(side=tk.LEFT) - - # Peak drop threshold - ttk.Label(inner, text="Drop:").pack(side=tk.LEFT, padx=(10, 2)) - self.peak_drop_var = tk.DoubleVar(value=0.02) - self.peak_drop_spinbox = ttk.Spinbox( - inner, from_=0.01, to=0.1, increment=0.01, width=5, - textvariable=self.peak_drop_var, - command=self._update_scan_config - ) - self.peak_drop_spinbox.pack(side=tk.LEFT) - - # Separator - ttk.Separator(inner, orient=tk.VERTICAL).pack(side=tk.LEFT, fill=tk.Y, padx=10) - - # Comparison method dropdown - ttk.Label(inner, text="Method:").pack(side=tk.LEFT, padx=(0, 2)) - self.comparison_method_var = tk.StringVar(value='template') - method_combo = ttk.Combobox( - inner, textvariable=self.comparison_method_var, - values=['template', 'ssim', 'mse', 'hst', 'phase'], - width=8, state='readonly' - ) - method_combo.pack(side=tk.LEFT) - method_combo.bind('<>', lambda e: self._update_scan_config()) - - # Status indicators on right + # Row/Direction status on right status_frame = ttk.Frame(inner) status_frame.pack(side=tk.RIGHT) - ttk.Label(status_frame, text="Offset:").pack(side=tk.LEFT) - self.offset_label = ttk.Label(status_frame, text="--", width=5, font=('Arial', 9)) - self.offset_label.pack(side=tk.LEFT) + ttk.Label(status_frame, text="Row:").pack(side=tk.LEFT) + self.row_label = ttk.Label(status_frame, text="--", width=5, font=('Arial', 9)) + self.row_label.pack(side=tk.LEFT) - ttk.Label(status_frame, text="Peak:").pack(side=tk.LEFT, padx=(8, 0)) - self.peak_label = ttk.Label(status_frame, text="--", width=5, font=('Arial', 9)) - self.peak_label.pack(side=tk.LEFT) + ttk.Label(status_frame, text="Dir:").pack(side=tk.LEFT, padx=(8, 0)) + self.direction_label = ttk.Label(status_frame, text="--", width=6, font=('Arial', 9)) + self.direction_label.pack(side=tk.LEFT) def _build_row3_movement(self, parent): """Row 3: Movement controls for all axes""" @@ -451,364 +466,193 @@ class AppGUI: command=self._send_custom_command).pack(side=tk.RIGHT, padx=(5, 0)) # ========================================================================= - # Comparison Overlay Drawing + # Scanner Mode Switching # ========================================================================= - def _draw_comparison_overlay(self, frame: np.ndarray) -> np.ndarray: - """ - Draw edge comparison overlay directly on camera frame. - Shows reference, current, and interpolated binary images. - """ - if not self.scanner or not self.show_comparison_var.get(): - return frame + def _on_mode_change(self): + """Handle scanner mode change""" + self.scanner_mode = self.scanner_mode_var.get() + self.log_message(f"Scanner mode: {self.scanner_mode}") + + def _on_resolution_change(self, event=None): + """Handle camera resolution change""" + selected_label = self.resolution_combo.get() - state = self.scanner.get_comparison_state() + # Find mode name from label + mode_labels = self.camera.get_mode_labels() + mode_name = None + for name, label in mode_labels.items(): + if label == selected_label: + mode_name = name + break - # Show overlay if tracking OR if paused with valid reference image - if not state.is_tracking and not (self.scanner.paused and state.reference_image is not None): - return frame + if mode_name is None: + self.log_message(f"Unknown resolution: {selected_label}") + return - h, w = frame.shape[:2] - border_pct = self.scan_config.border_percentage + self.log_message(f"Changing resolution to: {selected_label}") - # Map raw edges to display positions after 90° CW rotation - edge_to_display = { - 'top': 'right', - 'bottom': 'left', - 'left': 'top', - 'right': 'bottom' - } + # Stop any active scanning + if self.stitch_scanner and self.stitch_scanner.running: + self.stitch_scanner.stop() + self._scan_finished() - ref_display_edge = edge_to_display.get(state.reference_edge, '') - target_display_edge = edge_to_display.get(state.target_edge, '') + # Change resolution + actual_w, actual_h, actual_fps = self.camera.set_mode(mode_name) + self.log_message(f"Resolution set: {actual_w}x{actual_h} @ {actual_fps:.1f}fps") + self._update_cam_res_label() + + def _update_cam_res_label(self): + """Update camera resolution display""" + w, h = self.camera.get_resolution() + fps = self.camera.get_fps() + self.cam_res_label.config(text=f"{w}x{h} @ {fps:.1f}fps") + + def _update_stitch_config(self): + """Update stitching scanner config from GUI values""" + self.stitch_config.displacement_threshold = self.disp_threshold_var.get() + self.stitch_config.rows = self.num_rows_var.get() + self.stitch_config.row_overlap = self.row_overlap_var.get() + self.stitch_config.scan_speed_index = self.scan_speed_var.get() + self.stitch_config.autofocus_every_row = self.af_every_row_var.get() - # Draw reference image (cyan) - if state.reference_image is not None: - frame = self._draw_binary_at_edge( - frame, state.reference_image, ref_display_edge, - border_pct, color=(0, 255, 255), label="REF" - ) - - # Draw current/interpolated image (yellow) - # Prefer interpolated if available, otherwise current - display_image = state.interpolated_image if state.interpolated_image is not None else state.current_image - if display_image is not None: - label = "INT" if state.interpolated_image is not None else "CUR" - frame = self._draw_binary_at_edge( - frame, display_image, target_display_edge, - border_pct, color=(255, 255, 0), label=label - ) - - # Draw status box with extended info - frame = self._draw_comparison_status_box(frame, state) - - # Draw similarity history graph if available - if state.similarity_history and len(state.similarity_history) > 1: - frame = self._draw_similarity_graph(frame, state.similarity_history, state.threshold) - + # Reinitialize stitching scanner with new config + self.stitch_scanner = StitchingScanner( + camera=self.camera, + motion_controller=self.motion, + autofocus_controller=self.autofocus, + config=self.stitch_config, + on_log=self.log_message, + on_progress=self._on_stitch_progress, + on_mosaic_updated=self._on_mosaic_updated + ) + + # ========================================================================= + # Overlay Drawing + # ========================================================================= + + def _draw_overlays(self, frame): + """Draw overlays on camera frame""" + frame = self._draw_crosshair(frame) + if self.show_tile_overlay_var.get(): + frame = self._draw_threshold_box(frame) + if self.show_displacement_var.get(): + frame = self._draw_displacement_overlay(frame) return frame - def _draw_binary_at_edge(self, frame: np.ndarray, binary_img: np.ndarray, - display_edge: str, border_pct: float, - color: tuple, label: str) -> np.ndarray: - """Draw a binary image overlaid at the specified display edge.""" + def _draw_crosshair(self, frame, color=(0, 0, 255), thickness=1, size=40): h, w = frame.shape[:2] - - # Convert binary to BGR and apply color tint - binary_bgr = cv2.cvtColor(binary_img, cv2.COLOR_GRAY2BGR) - mask = binary_img > 127 - binary_bgr[mask] = color - - # Rotate binary to match 90° CW display rotation - binary_bgr = cv2.rotate(binary_bgr, cv2.ROTATE_90_CLOCKWISE) - - alpha = 0.7 - - if display_edge == 'left': - border_w = int(w * border_pct) - resized = cv2.resize(binary_bgr, (border_w, h)) - frame[:, :border_w] = cv2.addWeighted( - frame[:, :border_w], 1 - alpha, resized, alpha, 0 - ) - cv2.putText(frame, label, (5, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2) - - elif display_edge == 'right': - border_w = int(w * border_pct) - resized = cv2.resize(binary_bgr, (border_w, h)) - frame[:, w-border_w:] = cv2.addWeighted( - frame[:, w-border_w:], 1 - alpha, resized, alpha, 0 - ) - cv2.putText(frame, label, (w - border_w + 5, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2) - - elif display_edge == 'top': - border_h = int(h * border_pct) - resized = cv2.resize(binary_bgr, (w, border_h)) - frame[:border_h, :] = cv2.addWeighted( - frame[:border_h, :], 1 - alpha, resized, alpha, 0 - ) - cv2.putText(frame, label, (5, border_h - 8), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2) - - elif display_edge == 'bottom': - border_h = int(h * border_pct) - resized = cv2.resize(binary_bgr, (w, border_h)) - frame[h-border_h:, :] = cv2.addWeighted( - frame[h-border_h:, :], 1 - alpha, resized, alpha, 0 - ) - cv2.putText(frame, label, (5, h - 8), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2) - + cx, cy = w // 2, h // 2 + cv2.line(frame, (cx - size, cy), (cx + size, cy), color, thickness) + cv2.line(frame, (cx, cy - size), (cx, cy + size), color, thickness) return frame - def _draw_comparison_status_box(self, frame: np.ndarray, state) -> np.ndarray: - """Draw a status box showing similarity, offset, and match status""" + def _draw_threshold_box(self, frame, color=(0, 255, 0), thickness=2): + """Draw box showing 10% threshold region""" h, w = frame.shape[:2] - - # Position: top center - box_w, box_h = 240, 90 - box_x = (w - box_w) // 2 - box_y = 10 - - # Semi-transparent background - overlay = frame.copy() - cv2.rectangle(overlay, (box_x, box_y), (box_x + box_w, box_y + box_h), (0, 0, 0), -1) - frame = cv2.addWeighted(overlay, 0.6, frame, 0.4, 0) - - # Text positioning - tx = box_x + 10 - ty = box_y + 18 - line_h = 18 - - # Direction info - cv2.putText(frame, f"Dir: {state.direction}", (tx, ty), - cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 255, 255), 1) - ty += line_h - - # Similarity with color coding - sim = state.similarity - threshold = state.threshold - is_paused = self.scanner and self.scanner.paused - - if state.shift_detected or sim >= threshold: - sim_color = (0, 255, 0) # Green - elif is_paused: - sim_color = (255, 200, 0) # Cyan - else: - sim_color = (0, 165, 255) # Orange - - cv2.putText(frame, f"Similarity: {sim:.3f}", (tx, ty), - cv2.FONT_HERSHEY_SIMPLEX, 0.45, sim_color, 1) - - # Threshold on same line - cv2.putText(frame, f"(thr: {threshold:.2f})", (tx + 120, ty), - cv2.FONT_HERSHEY_SIMPLEX, 0.4, (150, 150, 150), 1) - ty += line_h - - # Offset info (from interpolation) - offset_text = f"Offset: {state.best_offset:.2f}" if state.best_offset > 0 else "Offset: --" - cv2.putText(frame, offset_text, (tx, ty), - cv2.FONT_HERSHEY_SIMPLEX, 0.45, (200, 200, 200), 1) - ty += line_h - - # Status - if state.shift_detected: - if state.peak_detected: - status = "PEAK MATCHED!" - else: - status = "MATCHED!" - status_color = (0, 255, 0) - elif sim >= threshold: - status = "ABOVE THRESHOLD" - status_color = (0, 255, 0) - elif is_paused: - status = "PAUSED" - status_color = (255, 200, 0) - else: - status = "Searching..." - status_color = (200, 200, 200) - - cv2.putText(frame, status, (tx, ty), - cv2.FONT_HERSHEY_SIMPLEX, 0.45, status_color, 1) - - return frame - - def _draw_similarity_graph(self, frame: np.ndarray, history: list, threshold: float) -> np.ndarray: - """Draw a small similarity history graph in the corner""" - h, w = frame.shape[:2] - - # Graph dimensions and position (bottom right) - graph_w, graph_h = 150, 60 - margin = 10 - gx = w - graph_w - margin - gy = h - graph_h - margin - 50 # Above the bottom edge region - - # Semi-transparent background - overlay = frame.copy() - cv2.rectangle(overlay, (gx, gy), (gx + graph_w, gy + graph_h), (0, 0, 0), -1) - frame = cv2.addWeighted(overlay, 0.5, frame, 0.5, 0) - - # Draw threshold line - thr_y = int(gy + graph_h - (threshold * graph_h)) - cv2.line(frame, (gx, thr_y), (gx + graph_w, thr_y), (0, 100, 255), 1) - - # Draw history line - if len(history) > 1: - # Take last N points that fit in the graph - max_points = graph_w // 2 - plot_history = history[-max_points:] if len(history) > max_points else history - - points = [] - for i, val in enumerate(plot_history): - px = gx + int((i / max(len(plot_history) - 1, 1)) * (graph_w - 1)) - py = int(gy + graph_h - (min(val, 1.0) * graph_h)) - points.append((px, py)) - - # Draw the line - for i in range(1, len(points)): - color = (0, 255, 0) if plot_history[i] >= threshold else (0, 165, 255) - cv2.line(frame, points[i-1], points[i], color, 2) + threshold = self.stitch_config.displacement_threshold + bh, bw = int(h * threshold), int(w * threshold) + cv2.rectangle(frame, (bw, bh), (w - bw, h - bh), color, thickness) # Label - cv2.putText(frame, "History", (gx + 5, gy + 12), - cv2.FONT_HERSHEY_SIMPLEX, 0.35, (200, 200, 200), 1) - - # Max value in history - if history: - max_sim = max(history) - cv2.putText(frame, f"max:{max_sim:.2f}", (gx + graph_w - 55, gy + 12), - cv2.FONT_HERSHEY_SIMPLEX, 0.3, (150, 255, 150), 1) - + cv2.putText(frame, f"{threshold:.0%}", (bw + 5, bh + 20), + cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1) return frame - def _on_threshold_change(self, value): - """Handle threshold slider change""" - val = float(value) - self.threshold_label.config(text=f"{val:.2f}") - if self.scanner: - self.scanner.config.similarity_threshold = val - - def _update_similarity_display(self): - """Update the similarity labels in the UI""" - if not self.scanner: - return + def _draw_displacement_overlay(self, frame): + """Draw displacement indicator when scanning""" + if not self.stitch_scanner: + return frame - state = self.scanner.get_comparison_state() + state = self.stitch_scanner.get_state() + if not state.is_scanning: + return frame - # Show status if tracking OR if paused with valid data - if state.is_tracking or (self.scanner.paused and state.reference_image is not None): - sim = state.similarity - self.similarity_label.config(text=f"{sim:.3f}") - - # Update offset and peak labels - self.offset_label.config(text=f"{state.best_offset:.2f}") - if state.similarity_history: - max_sim = max(state.similarity_history) - self.peak_label.config(text=f"{max_sim:.2f}") - - if state.shift_detected or sim >= state.threshold: - self.similarity_label.config(foreground='green') - status_text = "PEAK!" if state.peak_detected else "MATCH!" - self.match_status_label.config(text=status_text, foreground='green') - elif self.scanner.paused: - self.similarity_label.config(foreground='blue') - self.match_status_label.config(text="PAUSED", foreground='blue') - else: - self.similarity_label.config(foreground='orange') - self.match_status_label.config(text="Searching", foreground='gray') - else: - self.similarity_label.config(text="--", foreground='black') - self.match_status_label.config(text="", foreground='black') - self.offset_label.config(text="--") - self.peak_label.config(text="--") - - def _test_edge_comparison(self): - """Test edge detection on current frame""" - if not self.scanner: - self.log_message("Scanner not initialized") - return + h, w = frame.shape[:2] + threshold = self.stitch_config.displacement_threshold - try: - results = self.scanner.test_edge_comparison() - - self.log_message("Edge test results (raw frame coordinates):") - for edge in ['left', 'right', 'top', 'bottom']: - r = results[edge] - self.log_message(f" {edge.upper()}: {r['shape']}, white: {r['white_ratio']:.1%}") - - self.log_message(f" X-axis (top↔bottom): {results['x_axis_sim']:.3f}") - self.log_message(f" Y-axis (left↔right): {results['y_axis_sim']:.3f}") - - except Exception as e: - self.log_message(f"Edge test error: {e}") - - def _test_interpolation(self): - """Test interpolation matching""" - if not self.scanner: - self.log_message("Scanner not initialized") - return + # Calculate fill percentages + threshold_x = w * threshold + threshold_y = h * threshold - try: - self.log_message("Starting interpolation test (5 frames)...") - - def run_test(): - results = self.scanner.test_interpolation(num_frames=5) - self.root.after(0, lambda: self._show_interp_results(results)) - - threading.Thread(target=run_test, daemon=True).start() - - except Exception as e: - self.log_message(f"Interpolation test error: {e}") - - def _show_interp_results(self, results): - """Display interpolation test results""" - self.log_message("Interpolation test results:") - for fr in results['frames']: - self.log_message(f" Frame {fr['frame_idx']}: " - f"direct={fr['direct_similarity']:.3f}, " - f"interp={fr['interpolated_similarity']:.3f}, " - f"offset={fr['best_offset']:.2f}") + fill_x = min(abs(state.cumulative_x) / threshold_x, 1.0) if threshold_x > 0 else 0 + fill_y = min(abs(state.cumulative_y) / threshold_y, 1.0) if threshold_y > 0 else 0 + + # Draw horizontal progress bar at top + bar_h = 20 + bar_w = int(w * 0.8) + bar_x = (w - bar_w) // 2 + bar_y = 10 + + # Background + cv2.rectangle(frame, (bar_x, bar_y), (bar_x + bar_w, bar_y + bar_h), (50, 50, 50), -1) + # Fill + fill_w = int(bar_w * fill_x) + color = (0, 255, 0) if fill_x >= 1.0 else (0, 165, 255) + cv2.rectangle(frame, (bar_x, bar_y), (bar_x + fill_w, bar_y + bar_h), color, -1) + # Border + cv2.rectangle(frame, (bar_x, bar_y), (bar_x + bar_w, bar_y + bar_h), (200, 200, 200), 1) + + # Text + text = f"X: {state.cumulative_x:.1f}px ({fill_x:.0%})" + cv2.putText(frame, text, (bar_x + 5, bar_y + 15), + cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 255, 255), 1) + + # Direction indicator + dir_text = f"Dir: {state.direction}" + cv2.putText(frame, dir_text, (bar_x + bar_w - 80, bar_y + 15), + cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 255, 255), 1) + + return frame # ========================================================================= # Scanner Handlers # ========================================================================= - def _update_scan_config(self): - """Update scanner config from GUI values""" - self.scan_config.similarity_threshold = self.threshold_var.get() - self.scan_config.use_interpolation = self.use_interpolation_var.get() - self.scan_config.num_interpolations = self.num_interp_var.get() - self.scan_config.use_peak_detection = self.use_peak_detection_var.get() - self.scan_config.peak_window_size = self.peak_window_var.get() - self.scan_config.peak_drop_threshold = self.peak_drop_var.get() - self.scan_config.comparison_method = self.comparison_method_var.get() - - # Reinitialize scanner with new config - self._init_scanner() - def _start_scan(self): - self._update_scan_config() - if self.scanner.start(): - self.scan_start_btn.config(state='disabled', bg='#888888') - self.scan_pause_btn.config(state='normal') - self.scan_stop_btn.config(state='normal') - self.scan_status_label.config(text="Scanning") - self.scan_progress_bar.start(10) + """Start the appropriate scanner based on mode""" + if self.scanner_mode == 'stitch': + self._update_stitch_config() + if self.stitch_scanner.start(): + self._scan_started() + else: + # Tile mode + if self.scanner.start(): + self._scan_started() + + def _scan_started(self): + """Update UI when scan starts""" + self.scan_start_btn.config(state='disabled', bg='#888888') + self.scan_pause_btn.config(state='normal') + self.scan_stop_btn.config(state='normal') + self.scan_status_label.config(text="Scanning") + self.scan_progress_bar.start(10) def _pause_scan(self): - if self.scanner.paused: - self.scanner.resume() + """Pause/resume the scan""" + scanner = self.stitch_scanner if self.scanner_mode == 'stitch' else self.scanner + + if scanner.paused: + scanner.resume() self.scan_pause_btn.config(text="⏸", bg='#2196F3') self.scan_status_label.config(text="Scanning") self.scan_progress_bar.start(10) else: - self.scanner.pause() + scanner.pause() self.scan_pause_btn.config(text="▶", bg='#4CAF50') self.scan_status_label.config(text="Paused") self.scan_progress_bar.stop() def _stop_scan(self): - self.scanner.stop() + """Stop the scan""" + if self.scanner_mode == 'stitch': + self.stitch_scanner.stop() + else: + self.scanner.stop() self._scan_finished() def _scan_finished(self): + """Update UI when scan finishes""" self.scan_start_btn.config(state='normal', bg='#4CAF50') self.scan_pause_btn.config(state='disabled', text="⏸", bg='#2196F3') self.scan_stop_btn.config(state='disabled') @@ -816,88 +660,46 @@ class AppGUI: self.scan_progress_bar.stop() def _update_progress(self, current: int, total: int): + """Update tile scanner progress""" if total > 0: self.scan_progress_label.config(text=f"{current}/{total}") else: self.scan_progress_label.config(text=f"{current} tiles") - def _capture_single_tile(self): - if self.scanner: - self.scanner.capture_tile() - self._update_mosaic_window() + def _update_stitch_progress(self, appends: int): + """Update stitching scanner progress""" + self.scan_progress_label.config(text=f"{appends} appends") + + def _test_displacement(self): + """Test displacement detection""" + if not self.stitch_scanner: + return + + self.log_message("Testing displacement (10 frames)...") + + def run_test(): + results = self.stitch_scanner.test_displacement(num_frames=10) + self.root.after(0, lambda: self._show_displacement_results(results)) + + threading.Thread(target=run_test, daemon=True).start() + + def _show_displacement_results(self, results): + """Display displacement test results""" + self.log_message(f"Total displacement: X={results['total_dx']:.1f}, Y={results['total_dy']:.1f}") + for fr in results['frames'][:5]: # Show first 5 + self.log_message(f" Frame {fr['frame']}: dx={fr['dx']:.2f}, dy={fr['dy']:.2f}") def _emergency_stop(self): + """Emergency stop everything""" self.motion.stop_all() + if self.stitch_scanner and self.stitch_scanner.running: + self.stitch_scanner.stop() if self.scanner and self.scanner.running: self.scanner.stop() - self._scan_finished() + self._scan_finished() if self.autofocus.is_running(): self.autofocus.stop() - # ========================================================================= - # Mosaic Window - # ========================================================================= - - def _show_mosaic_window(self): - if self.mosaic_window is not None: - self.mosaic_window.lift() - self._update_mosaic_window() - return - - self.mosaic_window = tk.Toplevel(self.root) - self.mosaic_window.title("Mosaic") - self.mosaic_window.geometry("600x800") - self.mosaic_window.protocol("WM_DELETE_WINDOW", self._close_mosaic_window) - - self.mosaic_label = ttk.Label(self.mosaic_window, text="No tiles captured") - self.mosaic_label.pack(fill=tk.BOTH, expand=True, padx=10, pady=10) - - btn_frame = ttk.Frame(self.mosaic_window) - btn_frame.pack(fill=tk.X, padx=10, pady=(0, 10)) - - ttk.Button(btn_frame, text="Save Full Resolution", - command=self._save_mosaic).pack(side=tk.LEFT, padx=5) - ttk.Button(btn_frame, text="Refresh", - command=self._update_mosaic_window).pack(side=tk.LEFT, padx=5) - - self._update_mosaic_window() - - def _update_mosaic_window(self): - if self.mosaic_window is None or not self.scanner or not self.scanner.tiles: - return - - mosaic = self.scanner.get_mosaic_preview(max_size=580) - if mosaic is None: - return - - mosaic_rgb = cv2.cvtColor(mosaic, cv2.COLOR_BGR2RGB) - img = Image.fromarray(mosaic_rgb) - imgtk = ImageTk.PhotoImage(image=img) - - self.mosaic_label.imgtk = imgtk - self.mosaic_label.config(image=imgtk, text="") - - def _close_mosaic_window(self): - if self.mosaic_window: - self.mosaic_window.destroy() - self.mosaic_window = None - - def _save_mosaic(self): - if not self.scanner or not self.scanner.tiles: - self.log_message("No tiles to save") - return - - from tkinter import filedialog - filename = filedialog.asksaveasfilename( - defaultextension=".png", - filetypes=[("PNG", "*.png"), ("JPEG", "*.jpg"), ("All", "*.*")] - ) - if filename: - mosaic = self.scanner.build_mosaic(scale=1.0) - if mosaic is not None: - cv2.imwrite(filename, mosaic) - self.log_message(f"Saved: {filename}") - # ========================================================================= # Movement Handlers # ========================================================================= @@ -952,6 +754,93 @@ class AppGUI: self.motion.send_command(cmd) self.cmd_entry.delete(0, tk.END) + # ========================================================================= + # Mosaic Window + # ========================================================================= + + def _show_mosaic_window(self): + if self.mosaic_window is not None: + self.mosaic_window.lift() + self._update_mosaic_window() + return + + self.mosaic_window = tk.Toplevel(self.root) + self.mosaic_window.title("Mosaic") + self.mosaic_window.geometry("600x800") + self.mosaic_window.protocol("WM_DELETE_WINDOW", self._close_mosaic_window) + + self.mosaic_label = ttk.Label(self.mosaic_window, text="No mosaic yet") + self.mosaic_label.pack(fill=tk.BOTH, expand=True, padx=10, pady=10) + + btn_frame = ttk.Frame(self.mosaic_window) + btn_frame.pack(fill=tk.X, padx=10, pady=(0, 10)) + + ttk.Button(btn_frame, text="Save Full Resolution", + command=self._save_mosaic).pack(side=tk.LEFT, padx=5) + ttk.Button(btn_frame, text="Refresh", + command=self._update_mosaic_window).pack(side=tk.LEFT, padx=5) + ttk.Button(btn_frame, text="Clear", + command=self._clear_mosaic).pack(side=tk.LEFT, padx=5) + + self._update_mosaic_window() + + def _update_mosaic_window(self): + if self.mosaic_window is None: + return + + # Get mosaic from appropriate scanner + mosaic = None + if self.scanner_mode == 'stitch' and self.stitch_scanner: + mosaic = self.stitch_scanner.get_mosaic_preview(max_size=580) + elif self.scanner and self.scanner.tiles: + mosaic = self.scanner.get_mosaic_preview(max_size=580) + + if mosaic is None: + return + + mosaic_rgb = cv2.cvtColor(mosaic, cv2.COLOR_BGR2RGB) + img = Image.fromarray(mosaic_rgb) + imgtk = ImageTk.PhotoImage(image=img) + + self.mosaic_label.imgtk = imgtk + self.mosaic_label.config(image=imgtk, text="") + + # Update size label + if self.stitch_scanner: + state = self.stitch_scanner.get_state() + self.mosaic_size_label.config(text=f"{state.mosaic_width}x{state.mosaic_height}") + + def _close_mosaic_window(self): + if self.mosaic_window: + self.mosaic_window.destroy() + self.mosaic_window = None + + def _save_mosaic(self): + filename = filedialog.asksaveasfilename( + defaultextension=".png", + filetypes=[("PNG", "*.png"), ("JPEG", "*.jpg"), ("All", "*.*")] + ) + if not filename: + return + + if self.scanner_mode == 'stitch' and self.stitch_scanner: + if self.stitch_scanner.save_mosaic(filename): + self.log_message(f"Saved: {filename}") + elif self.scanner: + mosaic = self.scanner.build_mosaic(scale=1.0) + if mosaic is not None: + cv2.imwrite(filename, mosaic) + self.log_message(f"Saved: {filename}") + + def _clear_mosaic(self): + """Clear the current mosaic""" + if self.stitch_scanner: + self.stitch_scanner.mosaic = None + self.stitch_scanner.state.mosaic_width = 0 + self.stitch_scanner.state.mosaic_height = 0 + self._update_mosaic_window() + self.log_message("Mosaic cleared") + # ========================================================================= # Logging # ========================================================================= @@ -1003,12 +892,9 @@ class AppGUI: score = self.autofocus.get_focus_score() self.focus_score_label.config(text=f"Focus: {score:.1f}") - # Apply standard overlays + # Apply overlays frame = self._draw_overlays(frame) - # Apply comparison overlay (shows binary edges during tracking) - frame = self._draw_comparison_overlay(frame) - frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # Scale to fit @@ -1028,35 +914,29 @@ class AppGUI: except Exception as e: print(f"Camera error: {e}") - def _draw_overlays(self, frame): - frame = self._draw_crosshair(frame) - if self.show_tile_overlay_var.get(): - frame = self._draw_tile_boundary(frame) - if self.show_edge_regions_var.get(): - frame = self._draw_edge_regions(frame) - return frame - - def _draw_crosshair(self, frame, color=(0, 0, 255), thickness=1, size=40): - h, w = frame.shape[:2] - cx, cy = w // 2, h // 2 - cv2.line(frame, (cx - size, cy), (cx + size, cy), color, thickness) - cv2.line(frame, (cx, cy - size), (cx, cy + size), color, thickness) - return frame - - def _draw_tile_boundary(self, frame, color=(0, 255, 0), thickness=2): - h, w = frame.shape[:2] - bh, bw = int(h * 0.10), int(w * 0.10) - cv2.rectangle(frame, (bw, bh), (w - bw, h - bh), color, thickness) - return frame - - def _draw_edge_regions(self, frame, color=(255, 255, 0), thickness=1): - h, w = frame.shape[:2] - bh, bw = int(h * 0.10), int(w * 0.10) - cv2.rectangle(frame, (0, 0), (bw, h), color, thickness) # Left - cv2.rectangle(frame, (w - bw, 0), (w, h), color, thickness) # Right - cv2.rectangle(frame, (0, 0), (w, bh), color, thickness) # Top - cv2.rectangle(frame, (0, h - bh), (w, h), color, thickness) # Bottom - return frame + def _update_stitch_state_display(self): + """Update stitch scanner state in UI""" + if not self.stitch_scanner: + return + + state = self.stitch_scanner.get_state() + + # Update displacement label + self.displacement_label.config( + text=f"X: {state.cumulative_x:.1f} Y: {state.cumulative_y:.1f}" + ) + + # Update row/direction + if state.is_scanning: + self.row_label.config(text=f"{state.current_row + 1}/{state.total_rows}") + self.direction_label.config(text=state.direction) + else: + self.row_label.config(text="--") + self.direction_label.config(text="--") + + # Check if scan finished + if not state.is_scanning and self.scan_start_btn['state'] == 'disabled': + self._scan_finished() # ========================================================================= # Main Loop @@ -1068,14 +948,11 @@ class AppGUI: self.update_camera() self._process_serial_queue() self._process_tile_queue() - self._update_similarity_display() + self._update_stitch_state_display() if not self.autofocus.is_running(): self.af_button.config(state='normal') - if self.scanner and not self.scanner.running and self.scan_start_btn['state'] == 'disabled': - self._scan_finished() - self.root.after(33, update) update() @@ -1085,6 +962,8 @@ class AppGUI: self.running = False if self.scanner: self.scanner.stop() + if self.stitch_scanner: + self.stitch_scanner.stop() self.autofocus.stop() self._close_mosaic_window() self.root.destroy() \ No newline at end of file diff --git a/src/stitching_scanner.py b/src/stitching_scanner.py new file mode 100644 index 0000000..d47f9a3 --- /dev/null +++ b/src/stitching_scanner.py @@ -0,0 +1,734 @@ +""" +Stitching Scanner v2 - Fixed displacement tracking + +Key fix: Track displacement since last APPEND, not just cumulative. +The strip width must match actual movement since we last added to the mosaic. +""" + +import cv2 +import numpy as np +import time +import threading +from dataclasses import dataclass, field +from typing import List, Optional, Callable, Tuple +from enum import Enum + + +class ScanDirection(Enum): + """Scan direction constants""" + RIGHT = 'right' # X+ (E command) + LEFT = 'left' # X- (W command) + DOWN = 'down' # Y- (N command) + UP = 'up' # Y+ (S command) + + +@dataclass +class StitchConfig: + """Stitching scanner configuration""" + # Displacement threshold (percentage of frame size) + displacement_threshold: float = 0.10 # 10% of frame dimension + + # Movement timing + movement_interval: float = 0.001 # Seconds of motor on time + frame_interval: float = 0.25 # Seconds between frame captures (settle time) + settle_time: float = 0.5 # Seconds to wait after stopping + max_scan_time: float = 2400.0 # Safety timeout (5 minutes) + + # Scan pattern + rows: int = 3 + row_overlap: float = 0.15 + + # Speed setting for scanning + scan_speed_index: int = 3 + + # Focus + autofocus_every_row: bool = True + + # Memory management + max_mosaic_width: int = 11000 + max_mosaic_height: int = 11000 +# 11000, 24500, 450000 + +@dataclass +class StitchState: + """Current state for visualization""" + is_scanning: bool = False + direction: str = '' + + # Displacement tracking + cumulative_x: float = 0.0 + cumulative_y: float = 0.0 + last_displacement: Tuple[float, float] = (0.0, 0.0) + + # Progress + current_row: int = 0 + total_rows: int = 0 + + # Mosaic size + mosaic_width: int = 0 + mosaic_height: int = 0 + + # Debug + frame_count: int = 0 + append_count: int = 0 + + +class StitchingScanner: + """ + Slide scanner using continuous stitching with correct displacement tracking. + + Key insight: We must track displacement since the LAST APPEND, and the + strip we append must exactly match that displacement. + """ + + def __init__(self, camera, motion_controller, autofocus_controller=None, + config: StitchConfig = None, + on_log: Callable[[str], None] = None, + on_progress: Callable[[int, int], None] = None, + on_mosaic_updated: Callable[[], None] = None): + self.camera = camera + self.motion = motion_controller + self.autofocus = autofocus_controller + self.config = config or StitchConfig() + + # Callbacks + self.on_log = on_log + self.on_progress = on_progress + self.on_mosaic_updated = on_mosaic_updated + + # State + self.running = False + self.paused = False + self.state = StitchState() + self._state_lock = threading.Lock() + + # Mosaic data + self.mosaic: Optional[np.ndarray] = None + self._mosaic_lock = threading.Lock() + + # Frame tracking - KEY CHANGE: separate reference for displacement calc vs append + self._prev_frame: Optional[np.ndarray] = None # For frame-to-frame displacement + self._append_ref_frame: Optional[np.ndarray] = None # Reference from last append + self._displacement_since_append_x: float = 0.0 # Accumulated since last append + self._displacement_since_append_y: float = 0.0 + + # Thread + self._thread: Optional[threading.Thread] = None + + def log(self, message: str): + """Log a message""" + if self.on_log: + self.on_log(f"[Stitch] {message}") + print(f"[Stitch] {message}") + + # ========================================================================= + # Displacement Detection + # ========================================================================= + + def _to_grayscale(self, frame: np.ndarray) -> np.ndarray: + """Convert frame to grayscale""" + if len(frame.shape) == 3: + return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) + return frame + + def _detect_displacement(self, prev_frame: np.ndarray, + curr_frame: np.ndarray) -> Tuple[float, float]: + """ + Detect displacement between two frames using phase correlation. + Returns (dx, dy) in pixels. + """ + prev_gray = self._to_grayscale(prev_frame) + curr_gray = self._to_grayscale(curr_frame) + + if prev_gray.shape != curr_gray.shape: + return (0.0, 0.0) + + prev_f = prev_gray.astype(np.float32) + curr_f = curr_gray.astype(np.float32) + + # Apply window function to reduce edge effects + h, w = prev_gray.shape + window = cv2.createHanningWindow((w, h), cv2.CV_32F) + prev_f = prev_f * window + curr_f = curr_f * window + + shift, response = cv2.phaseCorrelate(prev_f, curr_f) + dx, dy = shift + + return (dx, dy) + + def _detect_displacement_robust(self, prev_frame: np.ndarray, + curr_frame: np.ndarray) -> Tuple[float, float]: + """Displacement detection with sanity checks""" + dx, dy = self._detect_displacement(prev_frame, curr_frame) + + h, w = prev_frame.shape[:2] + max_displacement = max(w, h) * 0.5 + + if abs(dx) > max_displacement or abs(dy) > max_displacement: + self.log(f"Warning: Large displacement ({dx:.1f}, {dy:.1f}), ignoring") + return (0.0, 0.0) + + return (dx, dy) + + # ========================================================================= + # Mosaic Building - FIXED VERSION + # ========================================================================= + + def _init_mosaic(self, frame: np.ndarray): + """Initialize mosaic with first frame""" + with self._mosaic_lock: + self.mosaic = frame.copy() + + # Set reference frames + self._prev_frame = frame.copy() + self._append_ref_frame = frame.copy() + self._displacement_since_append_x = 0.0 + self._displacement_since_append_y = 0.0 + + with self._state_lock: + h, w = frame.shape[:2] + self.state.mosaic_width = w + self.state.mosaic_height = h + self.state.frame_count = 1 + self.state.append_count = 0 + + self.log(f"Initialized mosaic: {frame.shape[1]}x{frame.shape[0]}") + + def _blend_strips_horizontal(self, base: np.ndarray, strip: np.ndarray, + blend_width: int, append_right: bool) -> np.ndarray: + """Blend strip onto base with gradient at seam to hide discontinuities.""" + if blend_width <= 0 or blend_width >= strip.shape[1]: + if append_right: + return np.hstack([base, strip]) + else: + return np.hstack([strip, base]) + + h_base, w_base = base.shape[:2] + h_strip, w_strip = strip.shape[:2] + + if h_strip != h_base: + # Height mismatch - can't blend properly + if append_right: + return np.hstack([base, strip]) + return np.hstack([strip, base]) + + blend_w = min(blend_width, w_strip, w_base) + + if append_right: + # base | blend_zone | rest_of_strip + result_width = w_base + w_strip - blend_w + result = np.zeros((h_base, result_width, 3), dtype=np.uint8) + + # Copy base + result[:, :w_base] = base + + # Create gradient: 1->0 for base weight + alpha = np.linspace(1, 0, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis] + + base_overlap = base[:, -blend_w:].astype(np.float32) + strip_overlap = strip[:, :blend_w].astype(np.float32) + blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8) + + result[:, w_base - blend_w:w_base] = blended + result[:, w_base:] = strip[:, blend_w:] + + return result + else: + # rest_of_strip | blend_zone | base + result_width = w_base + w_strip - blend_w + result = np.zeros((h_base, result_width, 3), dtype=np.uint8) + + result[:, :w_strip] = strip + + alpha = np.linspace(0, 1, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis] + + strip_overlap = strip[:, -blend_w:].astype(np.float32) + base_overlap = base[:, :blend_w].astype(np.float32) + blended = (strip_overlap * (1 - alpha) + base_overlap * alpha).astype(np.uint8) + + result[:, w_strip - blend_w:w_strip] = blended + result[:, w_strip:] = base[:, blend_w:] + + return result + + def _append_to_mosaic_fixed(self, frame: np.ndarray, direction: ScanDirection): + """ + FIXED: Append with blending and fractional pixel preservation. + + Key improvements: + 1. Gradient blending at seams to hide color discontinuities + 2. Preserve fractional pixel remainder to prevent cumulative drift + 3. Small safety margin for alignment tolerance + """ + BLEND_WIDTH = 10 # Pixels to blend at seam + SAFETY_MARGIN = 2 # Extra pixels as tolerance + + with self._mosaic_lock: + if self.mosaic is None: + return + + h, w = frame.shape[:2] + mh, mw = self.mosaic.shape[:2] + + dx = abs(self._displacement_since_append_x) + dy = abs(self._displacement_since_append_y) + + if direction in [ScanDirection.RIGHT, ScanDirection.LEFT]: + # Round and add safety margin + append_width = round(dx) + SAFETY_MARGIN + append_width = min(append_width, w - BLEND_WIDTH - 5) + + if append_width < 1: + return + + # Calculate fractional remainder to preserve + pixels_consumed = append_width - SAFETY_MARGIN + fractional_remainder = dx - pixels_consumed + + if direction == ScanDirection.RIGHT: + # Grab strip with extra for blending + strip_start = max(0, w - append_width - BLEND_WIDTH) + new_strip = frame[:, strip_start:] + self.mosaic = self._blend_strips_horizontal( + self.mosaic, new_strip, BLEND_WIDTH, append_right=True) + else: + strip_end = min(w, append_width + BLEND_WIDTH) + new_strip = frame[:, :strip_end] + self.mosaic = self._blend_strips_horizontal( + self.mosaic, new_strip, BLEND_WIDTH, append_right=False) + + # KEEP fractional remainder instead of resetting to 0! + self._displacement_since_append_x = fractional_remainder + self._displacement_since_append_y = 0.0 + + elif direction in [ScanDirection.DOWN, ScanDirection.UP]: + append_height = round(dy) + SAFETY_MARGIN + append_height = min(append_height, h - BLEND_WIDTH - 5) + + if append_height < 1: + return + + pixels_consumed = append_height - SAFETY_MARGIN + fractional_remainder = dy - pixels_consumed + + if direction == ScanDirection.DOWN: + strip_start = max(0, h - append_height - BLEND_WIDTH) + new_strip = frame[strip_start:, :] + + # Match widths + if new_strip.shape[1] > mw: + new_strip = new_strip[:, :mw] + elif new_strip.shape[1] < mw: + pad = np.zeros((new_strip.shape[0], mw - new_strip.shape[1], 3), dtype=np.uint8) + new_strip = np.hstack([new_strip, pad]) + + # Vertical blend + blend_h = min(BLEND_WIDTH, new_strip.shape[0], mh) + alpha = np.linspace(1, 0, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis] + + base_overlap = self.mosaic[-blend_h:].astype(np.float32) + strip_overlap = new_strip[:blend_h].astype(np.float32) + blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8) + + result_h = mh + new_strip.shape[0] - blend_h + result = np.zeros((result_h, mw, 3), dtype=np.uint8) + result[:mh - blend_h] = self.mosaic[:-blend_h] + result[mh - blend_h:mh] = blended + result[mh:] = new_strip[blend_h:] + self.mosaic = result + else: + strip_end = min(h, append_height + BLEND_WIDTH) + new_strip = frame[:strip_end, :] + + if new_strip.shape[1] > mw: + new_strip = new_strip[:, :mw] + elif new_strip.shape[1] < mw: + pad = np.zeros((new_strip.shape[0], mw - new_strip.shape[1], 3), dtype=np.uint8) + new_strip = np.hstack([new_strip, pad]) + + self.mosaic = np.vstack([new_strip, self.mosaic]) + + self._displacement_since_append_x = 0.0 + self._displacement_since_append_y = fractional_remainder + + new_mh, new_mw = self.mosaic.shape[:2] + + # Update state + with self._state_lock: + self.state.mosaic_width = new_mw + self.state.mosaic_height = new_mh + self.state.append_count += 1 + + # Update reference frame (fractional remainder already set above - don't reset!) + self._append_ref_frame = frame.copy() + + if self.on_mosaic_updated: + self.on_mosaic_updated() + + def _start_new_row(self, frame: np.ndarray, direction: ScanDirection): + """Start a new row in the mosaic""" + with self._mosaic_lock: + if self.mosaic is None: + self._init_mosaic(frame) + return + + h, w = frame.shape[:2] + mh, mw = self.mosaic.shape[:2] + + # Calculate overlap + overlap_pixels = int(h * self.config.row_overlap) + append_height = h - overlap_pixels + + if direction == ScanDirection.DOWN: + new_strip = frame[overlap_pixels:, :] + + if new_strip.shape[1] < mw: + pad = np.zeros((new_strip.shape[0], mw - new_strip.shape[1], 3), dtype=np.uint8) + new_strip = np.hstack([new_strip, pad]) + elif new_strip.shape[1] > mw: + new_strip = new_strip[:, :mw] + + self.mosaic = np.vstack([self.mosaic, new_strip]) + else: + new_strip = frame[:append_height, :] + + if new_strip.shape[1] < mw: + pad = np.zeros((new_strip.shape[0], mw - new_strip.shape[1], 3), dtype=np.uint8) + new_strip = np.hstack([new_strip, pad]) + elif new_strip.shape[1] > mw: + new_strip = new_strip[:, :mw] + + self.mosaic = np.vstack([new_strip, self.mosaic]) + + # Reset all tracking for new row + self._prev_frame = frame.copy() + self._append_ref_frame = frame.copy() + self._displacement_since_append_x = 0.0 + self._displacement_since_append_y = 0.0 + + with self._state_lock: + self.state.mosaic_height = self.mosaic.shape[0] + self.state.mosaic_width = self.mosaic.shape[1] + + self.log(f"New row started, mosaic: {self.mosaic.shape[1]}x{self.mosaic.shape[0]}") + + # ========================================================================= + # Scan Control + # ========================================================================= + + def start(self) -> bool: + """Start the stitching scan""" + if self.running: + self.log("Already running") + return False + + self.running = True + self.paused = False + + with self._state_lock: + self.state = StitchState() + self.state.is_scanning = True + self.state.total_rows = self.config.rows + + with self._mosaic_lock: + self.mosaic = None + + self._prev_frame = None + self._append_ref_frame = None + self._displacement_since_append_x = 0.0 + self._displacement_since_append_y = 0.0 + + self._thread = threading.Thread(target=self._scan_loop, daemon=True) + self._thread.start() + + self.log("Stitching scan started") + return True + + def stop(self): + """Stop the scan""" + self.running = False + self.paused = False + self.motion.stop_all() + + with self._state_lock: + self.state.is_scanning = False + + self.log("Scan stopped") + + def pause(self): + """Pause the scan""" + if self.running and not self.paused: + self.paused = True + self.motion.stop_all() + self.log("Scan paused") + + def resume(self): + """Resume the scan""" + if self.running and self.paused: + self.paused = False + self.log("Scan resumed") + + # ========================================================================= + # Main Scan Loop + # ========================================================================= + + def _scan_loop(self): + """Main scanning loop""" + try: + self.log("Starting scan loop") + + self.motion.set_speed(self.config.scan_speed_index) + time.sleep(0.1) + + frame = self._capture_frame() + self._init_mosaic(frame) + + for row in range(self.config.rows): + if not self.running: + break + + with self._state_lock: + self.state.current_row = row + + self.log(f"=== Row {row + 1}/{self.config.rows} ===") + + # Serpentine pattern + if row % 2 == 0: + h_direction = ScanDirection.RIGHT + else: + h_direction = ScanDirection.LEFT + + self._scan_horizontal(h_direction) + + if not self.running: + break + + if row < self.config.rows - 1: + self._move_to_next_row() + + self.log("Scan complete!") + + except Exception as e: + self.log(f"Scan error: {e}") + import traceback + traceback.print_exc() + finally: + self.running = False + self.motion.stop_all() + with self._state_lock: + self.state.is_scanning = False + + def _scan_horizontal(self, direction: ScanDirection): + """Scan horizontally with fixed displacement tracking""" + self.log(f"Scanning {direction.value}...") + + with self._state_lock: + self.state.direction = direction.value + + frame = self._capture_frame() + h, w = frame.shape[:2] + threshold_pixels = w * self.config.displacement_threshold + + # Initialize tracking + self._prev_frame = frame.copy() + self._append_ref_frame = frame.copy() + self._displacement_since_append_x = 0.0 + self._displacement_since_append_y = 0.0 + + start_time = time.time() + no_movement_count = 0 + max_no_movement = 50 + + while self.running and not self.paused: + if time.time() - start_time > self.config.max_scan_time: + self.log("Scan timeout") + break + + # Pulse the motor + if direction == ScanDirection.RIGHT: + self.motion.send_command('E') + else: + self.motion.send_command('W') + + time.sleep(self.config.movement_interval) + + if direction == ScanDirection.RIGHT: + self.motion.send_command('e') + else: + self.motion.send_command('w') + + # Wait for settle + time.sleep(self.config.frame_interval) + + # Capture and measure + curr_frame = self._capture_frame() + dx, dy = self._detect_displacement_robust(self._prev_frame, curr_frame) + + # Accumulate displacement SINCE LAST APPEND + self._displacement_since_append_x += dx + self._displacement_since_append_y += dy + + with self._state_lock: + self.state.cumulative_x = self._displacement_since_append_x + self.state.cumulative_y = self._displacement_since_append_y + self.state.last_displacement = (dx, dy) + self.state.frame_count += 1 + + # Check for no movement + if abs(dx) < 1.0 and abs(dy) < 1.0: + no_movement_count += 1 + if no_movement_count >= max_no_movement: + self.log(f"Edge detected (no movement for {no_movement_count} frames)") + break + else: + no_movement_count = 0 + + # Check threshold and append + if abs(self._displacement_since_append_x) >= threshold_pixels: + self._append_to_mosaic_fixed(curr_frame, direction) + self.log(f"Appended {abs(self._displacement_since_append_x):.1f}px strip, " + f"mosaic: {self.state.mosaic_width}x{self.state.mosaic_height}") + + # Update prev_frame for next displacement calculation + self._prev_frame = curr_frame.copy() + + if self.on_progress: + self.on_progress(self.state.append_count, 0) + + # Stop + if direction == ScanDirection.RIGHT: + self.motion.send_command('e') + else: + self.motion.send_command('w') + + time.sleep(self.config.settle_time) + + def _move_to_next_row(self): + """Move down to next row""" + self.log("Moving to next row...") + + frame = self._capture_frame() + h, w = frame.shape[:2] + move_distance = h * (1 - self.config.row_overlap) + + with self._state_lock: + self.state.direction = 'down' + + self.motion.send_command('N') + + self._prev_frame = frame.copy() + cumulative_y = 0.0 + + while self.running: + time.sleep(self.config.frame_interval) + + curr_frame = self._capture_frame() + dx, dy = self._detect_displacement_robust(self._prev_frame, curr_frame) + + cumulative_y += dy + self._prev_frame = curr_frame.copy() + + with self._state_lock: + self.state.cumulative_y = cumulative_y + + if abs(cumulative_y) >= move_distance: + break + + if abs(cumulative_y) < 5 and self.state.frame_count > 50: + self.log("Warning: Minimal Y movement") + break + + self.motion.send_command('n') + time.sleep(self.config.settle_time) + + frame = self._capture_frame() + self._start_new_row(frame, ScanDirection.DOWN) + + def _capture_frame(self) -> np.ndarray: + """Capture and rotate frame""" + frame = self.camera.capture_frame() + frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE) + return frame + + # ========================================================================= + # Getters + # ========================================================================= + + def get_state(self) -> StitchState: + """Get current scan state""" + with self._state_lock: + return StitchState( + is_scanning=self.state.is_scanning, + direction=self.state.direction, + cumulative_x=self.state.cumulative_x, + cumulative_y=self.state.cumulative_y, + last_displacement=self.state.last_displacement, + current_row=self.state.current_row, + total_rows=self.state.total_rows, + mosaic_width=self.state.mosaic_width, + mosaic_height=self.state.mosaic_height, + frame_count=self.state.frame_count, + append_count=self.state.append_count + ) + + def get_mosaic(self) -> Optional[np.ndarray]: + """Get current mosaic (full resolution)""" + with self._mosaic_lock: + if self.mosaic is not None: + return self.mosaic.copy() + return None + + def get_mosaic_preview(self, max_size: int = 600) -> Optional[np.ndarray]: + """Get scaled mosaic for preview""" + with self._mosaic_lock: + if self.mosaic is None: + return None + + h, w = self.mosaic.shape[:2] + scale = min(max_size / w, max_size / h, 1.0) + + if scale < 1.0: + new_w = int(w * scale) + new_h = int(h * scale) + return cv2.resize(self.mosaic, (new_w, new_h)) + + return self.mosaic.copy() + + def save_mosaic(self, filepath: str) -> bool: + """Save mosaic to file""" + with self._mosaic_lock: + if self.mosaic is None: + return False + + cv2.imwrite(filepath, self.mosaic) + self.log(f"Saved mosaic to {filepath}") + return True + + # ========================================================================= + # Testing + # ========================================================================= + + def test_displacement(self, num_frames: int = 10) -> dict: + """Test displacement detection""" + results = { + 'frames': [], + 'total_dx': 0.0, + 'total_dy': 0.0 + } + + prev_frame = self._capture_frame() + + for i in range(num_frames): + time.sleep(0.1) + curr_frame = self._capture_frame() + + dx, dy = self._detect_displacement(prev_frame, curr_frame) + + results['frames'].append({'frame': i, 'dx': dx, 'dy': dy}) + results['total_dx'] += dx + results['total_dy'] += dy + + prev_frame = curr_frame + + return results \ No newline at end of file