diff --git a/Mos1.png b/Mos1.png new file mode 100644 index 0000000..62a8f10 Binary files /dev/null and b/Mos1.png differ diff --git a/Mos2.png b/Mos2.png new file mode 100644 index 0000000..135eed4 Binary files /dev/null and b/Mos2.png differ diff --git a/src/autofocus.py b/src/autofocus.py index 89ccfff..20499b3 100644 --- a/src/autofocus.py +++ b/src/autofocus.py @@ -4,16 +4,31 @@ from vision import calculate_focus_score_sobel class AutofocusController: - """Manages autofocus operations""" + """ + Manages autofocus operations using visual feedback. + + Key design principle: Never trust step counts due to gear slippage. + Use focus score as the authoritative position feedback. + """ # Default timing settings DEFAULT_SETTINGS = { + # Sweep settings 'sweep_move_time': 0.5, - 'sweep_settle_time': 0.05, + 'sweep_settle_time': 0.01, 'sweep_samples': 10, 'sweep_steps': 30, - 'fine_move_time': 0.15, - 'fine_settle_time': 0.1, + + # Navigation settings (fast approach to peak) + 'nav_move_time': 0.3, # Quick bursts + 'nav_check_samples': 3, # Fewer samples for speed + 'nav_peak_threshold': 0.80, # Consider "near peak" at 80% of target + 'nav_decline_threshold': 1.00, # Detect peak passage at 8% drop + 'nav_max_moves': 100, # Safety limit + + # Fine tuning settings + 'fine_move_time': 0.30, + 'fine_settle_time': 0.01, 'fine_samples': 10, 'fine_max_no_improvement': 5, } @@ -74,6 +89,11 @@ class AutofocusController: return sum(scores) / len(scores) if scores else 0 + def get_quick_focus(self, samples=3): + """Quick focus reading with fewer samples for navigation""" + scores = [self.get_focus_score() for _ in range(samples)] + return sum(scores) / len(scores) + # === Autofocus Control === def start(self, speed_range=(1, 5), coarse=False, fine=False): @@ -111,31 +131,33 @@ class AutofocusController: def _autofocus_routine(self, speed_range): """ - Autofocus using sweep search + hill climbing. - Phase 1: Sweep across range to find approximate peak - Phase 2: Fine tune from best position found + Autofocus using sweep search + visual navigation + hill climbing. + + Phase 1: Sweep to map focus curve and find approximate peak + Phase 2: Navigate to peak using visual feedback + Phase 3: Fine tune with hill climbing """ self.log(f"Starting autofocus (speed range: {speed_range})") min_speed_idx, max_speed_idx = speed_range - s = self.settings # Shorthand + s = self.settings try: - # Phase 1: Sweep search - best_step, best_score = self._sweep_search( + # Phase 1: Sweep search - map the focus curve + best_score, peak_direction = self._sweep_search( min_speed_idx, max_speed_idx, s ) if not self.running: return - # Move to best position found - self._move_to_best_position(best_step, s) + # Phase 2: Navigate to peak + self._navigate_to_peak(best_score, peak_direction, s) if not self.running: return - # Phase 2: Fine tuning + # Phase 3: Fine tuning with hill climbing final_score = self._fine_tune(min_speed_idx, s) self.log(f"Autofocus complete. Final: {final_score:.1f}") @@ -146,30 +168,35 @@ class AutofocusController: self.running = False def _sweep_search(self, min_speed_idx, max_speed_idx, s): - """Phase 1: Sweep to find approximate best position""" + """ + Phase 1: Sweep to map focus curve. + + Returns: + best_score: The highest focus score observed + peak_direction: 'up' or 'down' - which direction has the peak + """ self.log("Phase 1: Sweep search") - # Use medium speed for sweep sweep_speed = (min_speed_idx + max_speed_idx) // 2 self.motion.set_speed(sweep_speed) time.sleep(0.1) - # Record starting position + # Record starting position score time.sleep(s['sweep_settle_time']) start_score = self.get_averaged_focus(samples=s['sweep_samples']) self.update_focus_display(start_score) - - sweep_data = [(0, start_score)] self.log(f"Start position: {start_score:.1f}") - if not self.running: - return 0, start_score + sweep_data = {'start': start_score, 'down': [], 'up': []} - # Sweep DOWN first (away from slide to avoid contact) + if not self.running: + return start_score, 'up' + + # Sweep DOWN first (away from slide) self.log(f"Sweeping down {s['sweep_steps']} steps...") for i in range(1, s['sweep_steps'] + 1): if not self.running: - return 0, start_score + return start_score, 'up' self.motion.move_z_down() time.sleep(s['sweep_move_time']) @@ -177,35 +204,24 @@ class AutofocusController: time.sleep(s['sweep_settle_time']) score = self.get_averaged_focus(samples=s['sweep_samples']) - sweep_data.append((-i, score)) + sweep_data['down'].append(score) self.update_focus_display(score) if i % 5 == 0: - self.log(f" Step -{i}: {score:.1f}") + self.log(f" Down {i}: {score:.1f}") + + # Remember score at bottom + bottom_score = sweep_data['down'][-1] if sweep_data['down'] else start_score if not self.running: - return 0, start_score + return start_score, 'up' - # Return to start - self.log("Returning to start...") - for _ in range(s['sweep_steps']): + # Now sweep UP past start to the other side + # We'll go UP for 2x sweep_steps (to cover both halves) + self.log(f"Sweeping up {s['sweep_steps'] * 2} steps...") + for i in range(1, s['sweep_steps'] * 2 + 1): if not self.running: - return 0, start_score - self.motion.move_z_up() - time.sleep(s['sweep_move_time']) - self.motion.stop_z() - time.sleep(0.1) - - time.sleep(s['sweep_settle_time']) - - if not self.running: - return 0, start_score - - # Sweep UP - self.log(f"Sweeping up {s['sweep_steps']} steps...") - for i in range(1, s['sweep_steps'] + 1): - if not self.running: - return 0, start_score + return start_score, 'down' self.motion.move_z_up() time.sleep(s['sweep_move_time']) @@ -213,73 +229,145 @@ class AutofocusController: time.sleep(s['sweep_settle_time']) score = self.get_averaged_focus(samples=s['sweep_samples']) - sweep_data.append((i, score)) + sweep_data['up'].append(score) self.update_focus_display(score) if i % 5 == 0: - self.log(f" Step +{i}: {score:.1f}") + self.log(f" Up {i}: {score:.1f}") - # Find best position - best_step, best_score = max(sweep_data, key=lambda x: x[1]) - self.log(f"Best found at step {best_step}: {best_score:.1f}") + # Analyze sweep data to find peak + all_scores = sweep_data['down'] + [start_score] + sweep_data['up'] + best_score = max(all_scores) - # Log sweep curve - sorted_data = sorted(sweep_data, key=lambda x: x[0]) - curve_str = " ".join([f"{d[1]:.0f}" for d in sorted_data]) - self.log(f"Sweep curve: {curve_str}") + down_best = max(sweep_data['down']) if sweep_data['down'] else 0 + up_best = max(sweep_data['up']) if sweep_data['up'] else 0 - return best_step, best_score - - def _move_to_best_position(self, best_step, s): - """Move from current position (+SWEEP_STEPS) to best_step""" - steps_to_move = s['sweep_steps'] - best_step - - if steps_to_move > 0: - self.log(f"Moving down {steps_to_move} steps to best position") - move_func = self.motion.move_z_down + # Determine which direction has the peak + # We're currently at the TOP of the sweep (after going up) + # So to reach the peak, we need to go DOWN if peak was in down region or middle + if down_best > up_best and down_best > start_score: + peak_direction = 'down' + self.log(f"Peak in DOWN region: {down_best:.1f}") + elif up_best >= down_best and up_best > start_score: + # Peak is in up region, but we need to check where in the up region + # If peak is in first half of up (returning toward start), go down + # If peak is in second half of up (past start), we might already be close + up_scores = sweep_data['up'] + mid_point = len(up_scores) // 2 + first_half_best = max(up_scores[:mid_point]) if mid_point > 0 else 0 + second_half_best = max(up_scores[mid_point:]) if mid_point < len(up_scores) else 0 + + if second_half_best >= first_half_best: + # Peak is near current position (top), go down slightly + peak_direction = 'down' + else: + # Peak is further down + peak_direction = 'down' + self.log(f"Peak in UP region: {up_best:.1f}") else: - self.log(f"Moving up {abs(steps_to_move)} steps to best position") - move_func = self.motion.move_z_up - steps_to_move = abs(steps_to_move) + peak_direction = 'down' + self.log(f"Peak near start: {start_score:.1f}") - for _ in range(steps_to_move): + # Log sweep curve for debugging + self.log(f"Best score found: {best_score:.1f}") + self.log(f"Navigate direction: {peak_direction}") + + return best_score, peak_direction + + def _navigate_to_peak(self, target_score, direction, s): + """ + Phase 2: Navigate toward peak using camera + """ + self.log(f"Phase 2: Navigate to peak (target: {target_score:.1f}, dir: {direction})") + + move_func = self.motion.move_z_down if direction == 'down' else self.motion.move_z_up + + # Use medium-fast speed for approach + self.motion.set_speed(4) + time.sleep(0.1) + + peak_threshold = target_score * s['nav_peak_threshold'] + decline_threshold = s['nav_decline_threshold'] + self.log(f"(peak_threshold: {peak_threshold:.1f}, decline_threshold: {decline_threshold})") + + prev_score = self.get_quick_focus(s['nav_check_samples']) + max_observed = prev_score + moves_in_peak_region = 0 + declining_streak = 0 + + for move_count in range(s['nav_max_moves']): if not self.running: return + + # Quick movement burst move_func() - time.sleep(s['sweep_move_time']) + time.sleep(s['nav_move_time']) self.motion.stop_z() - time.sleep(0.1) + time.sleep(0.05) # Brief settle + + # Check focus + current_score = self.get_quick_focus(s['nav_check_samples']) + self.log(f" Current: {current_score:.1f} (max was {max_observed:.1f}) (Declining {declining_streak:.1f}) (Decline Thres {max_observed * decline_threshold:.1f})") + self.log(f" Current: {current_score:.1f} >= peak thresh({peak_threshold:.1f}) ") + self.log(f" Current: {current_score:.1f} <= max_observed * decline_threshold:({max_observed * decline_threshold:.1f}) ") + self.update_focus_display(current_score) + + # Track maximum observed + if current_score > max_observed: + max_observed = current_score + declining_streak = 0 + self.log("Setting Max") + elif max_observed > 0.8 * target_score: + declining_streak += 1 + + if declining_streak >=2: + self.log(f" Passed peak after {move_count} moves, stopping") + break + + # # Are we in the peak region? + # if current_score >= peak_threshold: + # moves_in_peak_region += 1 + + # # Check if we're past the peak (score declining from max) + # if current_score < max_observed * decline_threshold: + # declining_streak += 1 + # self.log(f" Declining: {current_score:.1f} (max was {max_observed:.1f})") + + # if declining_streak >= 1: + # self.log(f" Passed peak after {move_count} moves, stopping") + # break + # else: + # declining_streak = 0 + # if moves_in_peak_region % 3 == 0: + # self.log(f" In peak region: {current_score:.1f}") + + prev_score = current_score - time.sleep(s['sweep_settle_time']) - current_score = self.get_averaged_focus(samples=s['sweep_samples']) - self.log(f"At best position: {current_score:.1f}") + final_score = self.get_averaged_focus(samples=s['sweep_samples']) + self.log(f"Navigation complete at: {final_score:.1f}") def _fine_tune(self, min_speed_idx, s): - """Phase 2: Fine hill-climbing from best position""" - self.log("Phase 2: Fine tuning") + """ + Phase 3: Fine hill-climbing from current position. + + Already near the peak from navigation, now do precise adjustments. + """ + self.log("Phase 3: Fine tuning") self.motion.set_speed(min_speed_idx) time.sleep(0.1) best_score = self.get_averaged_focus(samples=s['fine_samples']) + self.log(f"Starting fine tune at: {best_score:.1f}") - # Determine fine direction by testing both + # Try both directions to find improvement fine_direction = self._determine_fine_direction(best_score, s) if not self.running: return best_score - # Fine search - best_score, best_position_offset = self._fine_search( - fine_direction, best_score, s - ) - - if not self.running: - return best_score - - # Return to best position - if best_position_offset > 0: - self._return_to_best(fine_direction, best_position_offset, s) + # Search in best direction + best_score = self._fine_search(fine_direction, best_score, s) # Final reading time.sleep(s['fine_settle_time']) @@ -288,9 +376,9 @@ class AutofocusController: return final_score - def _determine_fine_direction(self, best_score, s): + def _determine_fine_direction(self, current_score, s): """Test both directions to find which improves focus""" - # Try DOWN first + # Try DOWN self.motion.move_z_down() time.sleep(s['fine_move_time']) self.motion.stop_z() @@ -298,11 +386,11 @@ class AutofocusController: down_score = self.get_averaged_focus(samples=s['fine_samples']) - if down_score > best_score: - self.log(f"Fine direction: DOWN ({down_score:.1f})") + if down_score > current_score * 1.02: # 2% improvement threshold + self.log(f"Fine direction: DOWN ({down_score:.1f} > {current_score:.1f})") return 'down' - # Go back and try UP + # Try UP (go back and past) self.motion.move_z_up() time.sleep(s['fine_move_time']) self.motion.stop_z() @@ -315,25 +403,29 @@ class AutofocusController: up_score = self.get_averaged_focus(samples=s['fine_samples']) - if up_score > best_score: - self.log(f"Fine direction: UP ({up_score:.1f})") + if up_score > current_score * 1.02: + self.log(f"Fine direction: UP ({up_score:.1f} > {current_score:.1f})") return 'up' - # Already at peak - self.log("Already at peak, minor adjustment only") + # Already at peak, return to center + self.log("Already at peak") self.motion.move_z_down() time.sleep(s['fine_move_time']) self.motion.stop_z() time.sleep(s['fine_settle_time']) - return 'up' + return 'up' # Default direction for minor adjustments def _fine_search(self, direction, best_score, s): - """Search in given direction until no improvement""" + """ + Search in given direction until no improvement. + Uses visual feedback to track best position. + """ move_func = self.motion.move_z_up if direction == 'up' else self.motion.move_z_down + reverse_func = self.motion.move_z_down if direction == 'up' else self.motion.move_z_up no_improvement_count = 0 - best_position_offset = 0 + moves_since_best = 0 while self.running and no_improvement_count < s['fine_max_no_improvement']: move_func() @@ -347,21 +439,25 @@ class AutofocusController: if current_score > best_score: best_score = current_score no_improvement_count = 0 - best_position_offset = 0 - self.log(f"Fine better: {current_score:.1f}") + moves_since_best = 0 + self.log(f"Fine improved: {current_score:.1f}") else: no_improvement_count += 1 - best_position_offset += 1 + moves_since_best += 1 - return best_score, best_position_offset - - def _return_to_best(self, direction, steps, s): - """Return to best position by reversing direction""" - self.log(f"Returning {steps} steps") - reverse_func = self.motion.move_z_down if direction == 'up' else self.motion.move_z_up + # Go back to best position using visual feedback + if moves_since_best > 0: + self.log(f"Reversing {moves_since_best} moves toward best") + for _ in range(moves_since_best): + reverse_func() + time.sleep(s['fine_move_time']) + self.motion.stop_z() + + # Check if we're back at peak + check_score = self.get_quick_focus(3) + if check_score >= best_score * 0.98: + break + + time.sleep(0.05) - for _ in range(steps): - reverse_func() - time.sleep(s['fine_move_time']) - self.motion.stop_z() - time.sleep(0.1) \ No newline at end of file + return best_score \ No newline at end of file diff --git a/src/camera.py b/src/camera.py index a2e9f30..5a1f82a 100644 --- a/src/camera.py +++ b/src/camera.py @@ -1,22 +1,183 @@ import cv2 +import subprocess +import re + + +def detect_camera_modes(device_id=0): + """ + Detect available camera resolutions using v4l2-ctl. + Returns dict of modes sorted by resolution (smallest to largest). + """ + modes = {} + + try: + # Run v4l2-ctl to get supported formats + device = f"/dev/video{device_id}" + result = subprocess.run( + ['v4l2-ctl', '--device', device, '--list-formats-ext'], + capture_output=True, + text=True, + timeout=5 + ) + + if result.returncode != 0: + print(f"v4l2-ctl failed: {result.stderr}") + return _get_fallback_modes() + + # Parse output for "Size: Discrete WxH" lines + # Example: "Size: Discrete 2592x1944" + size_pattern = re.compile(r'Size:\s+Discrete\s+(\d+)x(\d+)') + + resolutions = set() # Use set to avoid duplicates + for line in result.stdout.split('\n'): + match = size_pattern.search(line) + if match: + width = int(match.group(1)) + height = int(match.group(2)) + resolutions.add((width, height)) + + if not resolutions: + print("No resolutions found in v4l2-ctl output") + return _get_fallback_modes() + + # Sort by total pixels (width * height) + sorted_res = sorted(resolutions, key=lambda r: r[0] * r[1]) + + # Build modes dict with descriptive names + for i, (width, height) in enumerate(sorted_res): + pixels = width * height + + # Generate a name based on position/size + if i == 0: + name = 'low' + desc = 'Low' + elif i == len(sorted_res) - 1: + name = 'high' + desc = 'High' + elif len(sorted_res) == 3 and i == 1: + name = 'medium' + desc = 'Medium' + else: + name = f'res_{width}x{height}' + desc = f'{width}x{height}' + + modes[name] = { + 'width': width, + 'height': height, + 'label': f'{width}x{height} ({desc})' + } + + print(f"Detected {len(modes)} camera modes: {list(modes.keys())}") + return modes + + except FileNotFoundError: + print("v4l2-ctl not found, using fallback modes") + return _get_fallback_modes() + except subprocess.TimeoutExpired: + print("v4l2-ctl timed out, using fallback modes") + return _get_fallback_modes() + except Exception as e: + print(f"Error detecting camera modes: {e}") + return _get_fallback_modes() + + +def _get_fallback_modes(): + """Fallback modes if v4l2-ctl detection fails""" + return { + 'low': {'width': 640, 'height': 480, 'label': '640x480 (Low)'}, + 'medium': {'width': 1280, 'height': 960, 'label': '1280x960 (Medium)'}, + 'high': {'width': 1920, 'height': 1080, 'label': '1920x1080 (High)'}, + } + class Camera: - def __init__(self, device_id=0): + prevFrame = {} + def __init__(self, device_id=0, mode=None): + self.device_id = device_id + + # Detect available modes before opening camera + self.MODES = detect_camera_modes(device_id) + + # Open camera self.cap = cv2.VideoCapture(device_id) if not self.cap.isOpened(): raise RuntimeError("Could not open camera, stop program") + # Default to highest resolution if no mode specified + if mode is None: + mode = list(self.MODES.keys())[0] # Last = highest res + + self.current_mode = mode + self._apply_mode(mode) + # set resolution # self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920) # self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080) - + self.cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0) # Disable auto-exposure + self.cap.set(cv2.CAP_PROP_EXPOSURE, -6) # Set fixed exposure + self.cap.set(cv2.CAP_PROP_AUTO_WB, 0) # Disable auto white balance self.window_name = "AutoScope" + def _apply_mode(self, mode_name): + """Apply resolution settings""" + if mode_name not in self.MODES: + print(f"Unknown mode {mode_name}, using first available") + mode_name = list(self.MODES.keys())[0] + + mode = self.MODES[mode_name] + + # Set resolution + self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, mode['width']) + self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, mode['height']) + + self.current_mode = mode_name + + # Verify settings took effect + actual_w = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + actual_h = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + actual_fps = self.cap.get(cv2.CAP_PROP_FPS) + + print(f"Camera mode: {mode['label']}") + print(f" Actual: {actual_w}x{actual_h} @ {actual_fps:.1f}fps") + + return actual_w, actual_h, actual_fps + + def set_mode(self, mode_name): + """Change camera mode (resolution/framerate)""" + return self._apply_mode(mode_name) + + def get_mode(self): + """Get current mode name""" + return self.current_mode + + def get_mode_info(self): + """Get current mode details""" + return self.MODES.get(self.current_mode, list(self.MODES.values())[0]) + + def get_resolution(self): + """Get current actual resolution""" + w = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + h = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + return w, h + + def get_fps(self): + """Get current actual FPS""" + return self.cap.get(cv2.CAP_PROP_FPS) + + def get_available_modes(self): + """Get list of available mode names""" + return list(self.MODES.keys()) + + def get_mode_labels(self): + """Get mode labels for UI""" + return {k: v['label'] for k, v in self.MODES.items()} + def capture_frame(self): ret, frame = self.cap.read() if not ret: - raise RuntimeError("Failed to capture frame, stop program") + return prevframe + prevframe = frame return frame def show_frame(self, frame): @@ -68,5 +229,4 @@ class Camera: if key == ord('q'): break - self.close_window() - \ No newline at end of file + self.close_window() \ No newline at end of file diff --git a/src/gui.py b/src/gui.py index 262253f..5a3ac36 100644 --- a/src/gui.py +++ b/src/gui.py @@ -1,12 +1,21 @@ +""" +AutoScope GUI - Updated with Stitching Scanner Integration + +Includes both tile-based scanner and new stitching scanner modes. +""" + import tkinter as tk -from tkinter import ttk, scrolledtext +from tkinter import ttk, scrolledtext, filedialog from PIL import Image, ImageTk import cv2 +import numpy as np import threading import queue from motion_controller import MotionController from autofocus import AutofocusController +from scanner import Scanner, ScanConfig, ScanDirection, Tile +from stitching_scanner import StitchingScanner, StitchConfig class AppGUI: @@ -25,227 +34,697 @@ class AppGUI: on_focus_update=self._update_focus_display_threadsafe ) - # Queue for thread-safe serial log updates + # Scanners - both tile-based and stitching + self.scanner = None # Tile-based scanner + self.stitch_scanner = None # Stitching scanner + self.scan_config = ScanConfig() + self.stitch_config = StitchConfig() + + # Scanner mode: 'tile' or 'stitch' + self.scanner_mode = 'stitch' # Default to new stitching mode + + # Queues for thread-safe updates self.serial_queue = queue.Queue() + self.tile_queue = queue.Queue() + + # Mosaic window + self.mosaic_window = None # Build the window self.root = tk.Tk() self.root.title("AutoScope") - self.root.geometry("1080x1080") - self.root.minsize(1080, 1080) + self.root.geometry("720x1280") + self.root.minsize(640, 900) self.root.protocol("WM_DELETE_WINDOW", self.on_close) self._build_ui() + self._init_scanners() # Start serial reader thread self.serial_thread = threading.Thread(target=self._serial_reader, daemon=True) self.serial_thread.start() + def _init_scanners(self): + """Initialize both scanner types""" + # Tile-based scanner + self.scanner = Scanner( + camera=self.camera, + motion_controller=self.motion, + autofocus_controller=self.autofocus, + config=self.scan_config, + on_tile_captured=self._on_tile_captured, + on_log=self.log_message, + on_progress=self._on_scan_progress + ) + + # Stitching scanner + self.stitch_scanner = StitchingScanner( + camera=self.camera, + motion_controller=self.motion, + autofocus_controller=self.autofocus, + config=self.stitch_config, + on_log=self.log_message, + on_progress=self._on_stitch_progress, + on_mosaic_updated=self._on_mosaic_updated + ) + def _on_command_sent(self, cmd): - """Callback when motion controller sends a command""" self.log_message(f"> {cmd}") def _update_focus_display_threadsafe(self, score): - """Thread-safe focus display update""" self.root.after(0, lambda: self.focus_score_label.config(text=f"Focus: {score:.1f}")) - # === UI Building === + def _on_tile_captured(self, tile: Tile): + self.tile_queue.put(tile) + + def _on_scan_progress(self, current: int, total: int): + self.root.after(0, lambda: self._update_progress(current, total)) + + def _on_stitch_progress(self, appends: int, total: int): + self.root.after(0, lambda: self._update_stitch_progress(appends)) + + def _on_mosaic_updated(self): + """Called when stitching scanner updates mosaic""" + self.root.after(0, self._update_mosaic_window) + + # ========================================================================= + # UI Building - Vertical Layout + # ========================================================================= def _build_ui(self): main_frame = ttk.Frame(self.root) main_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) - # Left: Camera view - self.camera_label = ttk.Label(main_frame) - self.camera_label.pack(side=tk.LEFT, padx=(0, 5)) + # === TOP: Camera View (large, full width) === + camera_frame = ttk.LabelFrame(main_frame, text="Camera") + camera_frame.pack(fill=tk.BOTH, expand=True, pady=(0, 5)) - # Right: Control panel - right_frame = ttk.Frame(main_frame, width=320) - right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True) - right_frame.pack_propagate(False) + self.camera_label = ttk.Label(camera_frame) + self.camera_label.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) - self._build_emergency_stop(right_frame) - self._build_speed_controls(right_frame) - self._build_fine_speed_controls(right_frame) - self._build_movement_controls(right_frame) - self._build_autofocus_controls(right_frame) - self._build_status_controls(right_frame) - self._build_mode_controls(right_frame) - self._build_serial_log(right_frame) - self._build_command_entry(right_frame) - - def _build_emergency_stop(self, parent): - frame = ttk.Frame(parent) - frame.pack(fill=tk.X, pady=(0, 10)) + # Camera options row 1: overlays + cam_opts1 = ttk.Frame(camera_frame) + cam_opts1.pack(fill=tk.X, padx=5, pady=(0, 2)) - self.emergency_btn = tk.Button( - frame, - text="⚠ EMERGENCY STOP ⚠", - command=self.motion.stop_all, - bg='red', fg='white', font=('Arial', 12, 'bold'), - height=2 + self.show_tile_overlay_var = tk.BooleanVar(value=True) + ttk.Checkbutton(cam_opts1, text="Bounds", + variable=self.show_tile_overlay_var).pack(side=tk.LEFT) + + self.show_displacement_var = tk.BooleanVar(value=True) + ttk.Checkbutton(cam_opts1, text="Displacement", + variable=self.show_displacement_var).pack(side=tk.LEFT, padx=(10, 0)) + + self.live_focus_var = tk.BooleanVar(value=True) + ttk.Checkbutton(cam_opts1, text="Live focus", + variable=self.live_focus_var).pack(side=tk.LEFT, padx=(10, 0)) + + # Resolution selector + ttk.Separator(cam_opts1, orient=tk.VERTICAL).pack(side=tk.LEFT, fill=tk.Y, padx=10) + ttk.Label(cam_opts1, text="Res:").pack(side=tk.LEFT) + + self.resolution_var = tk.StringVar(value=self.camera.get_mode()) + self.resolution_combo = ttk.Combobox( + cam_opts1, + textvariable=self.resolution_var, + values=list(self.camera.get_mode_labels().values()), + state='readonly', + width=18 ) - self.emergency_btn.pack(fill=tk.X) + # Set display to label + mode_labels = self.camera.get_mode_labels() + self.resolution_combo.set(mode_labels[self.camera.get_mode()]) + self.resolution_combo.pack(side=tk.LEFT, padx=2) + self.resolution_combo.bind('<>', self._on_resolution_change) + + self.focus_score_label = ttk.Label(cam_opts1, text="Focus: --", font=('Arial', 11, 'bold')) + self.focus_score_label.pack(side=tk.RIGHT) + + # Camera options row 2: displacement info + cam_opts2 = ttk.Frame(camera_frame) + cam_opts2.pack(fill=tk.X, padx=5, pady=(0, 5)) + + ttk.Label(cam_opts2, text="Displacement:").pack(side=tk.LEFT) + self.displacement_label = ttk.Label(cam_opts2, text="X: -- Y: --", font=('Arial', 10)) + self.displacement_label.pack(side=tk.LEFT, padx=5) + + ttk.Separator(cam_opts2, orient=tk.VERTICAL).pack(side=tk.LEFT, fill=tk.Y, padx=10) + + ttk.Label(cam_opts2, text="Mosaic:").pack(side=tk.LEFT) + self.mosaic_size_label = ttk.Label(cam_opts2, text="--", font=('Arial', 10)) + self.mosaic_size_label.pack(side=tk.LEFT, padx=5) + + ttk.Separator(cam_opts2, orient=tk.VERTICAL).pack(side=tk.LEFT, fill=tk.Y, padx=10) + + ttk.Label(cam_opts2, text="Cam:").pack(side=tk.LEFT) + self.cam_res_label = ttk.Label(cam_opts2, text="--", font=('Arial', 10)) + self.cam_res_label.pack(side=tk.LEFT, padx=5) + self._update_cam_res_label() + + # === BOTTOM: Control Panel === + control_frame = ttk.Frame(main_frame) + control_frame.pack(fill=tk.X) + + # Row 1: Emergency Stop + Scanner Mode + Controls + self._build_row1_scanner(control_frame) + + # Row 2: Stitching Settings + self._build_row2_stitch_settings(control_frame) + + # Row 3: Movement Controls + self._build_row3_movement(control_frame) + + # Row 4: Speed + Autofocus + self._build_row4_speed_autofocus(control_frame) + + # Row 5: Status + Log + self._build_row5_status_log(control_frame) - def _build_speed_controls(self, parent): - frame = ttk.LabelFrame(parent, text="Speed") - frame.pack(fill=tk.X, pady=(0, 10)) + def _build_row1_scanner(self, parent): + """Row 1: Emergency stop and scanner controls""" + row = ttk.Frame(parent) + row.pack(fill=tk.X, pady=(0, 3)) - btn_frame = ttk.Frame(frame) - btn_frame.pack(fill=tk.X, padx=5, pady=5) + # Emergency stop button + self.emergency_btn = tk.Button( + row, text="⚠ STOP", command=self._emergency_stop, + bg='#d32f2f', fg='white', font=('Arial', 11, 'bold'), + width=7, height=1, relief=tk.RAISED, bd=2 + ) + self.emergency_btn.pack(side=tk.LEFT, padx=(0, 8)) - self.speed_var = tk.StringVar(value="Medium") + # Scanner frame + scanner_frame = ttk.LabelFrame(row, text="Stitching Scanner") + scanner_frame.pack(side=tk.LEFT, fill=tk.X, expand=True) - for text, value, preset in [("Slow", "Slow", "slow"), - ("Medium", "Medium", "medium"), - ("Fast", "Fast", "fast")]: - ttk.Radiobutton( - btn_frame, text=text, variable=self.speed_var, - value=value, command=lambda p=preset: self.motion.set_speed_preset(p) - ).pack(side=tk.LEFT, padx=5) + sf = ttk.Frame(scanner_frame) + sf.pack(fill=tk.X, padx=5, pady=3) + + # Scanner mode toggle + self.scanner_mode_var = tk.StringVar(value='stitch') + ttk.Radiobutton(sf, text="Stitch", variable=self.scanner_mode_var, + value='stitch', command=self._on_mode_change).pack(side=tk.LEFT) + ttk.Radiobutton(sf, text="Tile", variable=self.scanner_mode_var, + value='tile', command=self._on_mode_change).pack(side=tk.LEFT, padx=(5, 10)) + + # Status indicator + self.scan_status_label = ttk.Label(sf, text="Idle", font=('Arial', 9, 'bold'), width=8) + self.scan_status_label.pack(side=tk.LEFT) + + # Progress + self.scan_progress_bar = ttk.Progressbar(sf, mode='indeterminate', length=60) + self.scan_progress_bar.pack(side=tk.LEFT, padx=3) + + self.scan_progress_label = ttk.Label(sf, text="0 appends", width=10) + self.scan_progress_label.pack(side=tk.LEFT) + + # Separator + ttk.Separator(sf, orient=tk.VERTICAL).pack(side=tk.LEFT, fill=tk.Y, padx=5) + + # Control buttons + btn_frame = ttk.Frame(sf) + btn_frame.pack(side=tk.LEFT) + + self.scan_start_btn = tk.Button( + btn_frame, text="▶ Start", width=7, + bg='#4CAF50', fg='white', font=('Arial', 9, 'bold'), + activebackground='#45a049', relief=tk.RAISED, bd=2, + command=self._start_scan + ) + self.scan_start_btn.pack(side=tk.LEFT, padx=2) + + self.scan_pause_btn = tk.Button( + btn_frame, text="⏸", width=3, + bg='#2196F3', fg='white', font=('Arial', 10, 'bold'), + activebackground='#1976D2', relief=tk.RAISED, bd=2, + command=self._pause_scan, state='disabled' + ) + self.scan_pause_btn.pack(side=tk.LEFT, padx=2) + + self.scan_stop_btn = tk.Button( + btn_frame, text="⏹", width=3, + bg='#FF9800', fg='white', font=('Arial', 10, 'bold'), + activebackground='#F57C00', relief=tk.RAISED, bd=2, + command=self._stop_scan, state='disabled' + ) + self.scan_stop_btn.pack(side=tk.LEFT, padx=2) + + # Test button + ttk.Button(btn_frame, text="Test", width=4, + command=self._test_displacement).pack(side=tk.LEFT, padx=2) + + # Mosaic button + ttk.Button(sf, text="Mosaic", width=6, + command=self._show_mosaic_window).pack(side=tk.RIGHT, padx=2) - def _build_fine_speed_controls(self, parent): - frame = ttk.LabelFrame(parent, text="Fine Speed Control") - frame.pack(fill=tk.X, pady=(0, 10)) + def _build_row2_stitch_settings(self, parent): + """Row 2: Stitching settings""" + row = ttk.LabelFrame(parent, text="Stitch Settings") + row.pack(fill=tk.X, pady=(0, 3)) - inner = ttk.Frame(frame) - inner.pack(fill=tk.X, padx=5, pady=5) + inner = ttk.Frame(row) + inner.pack(fill=tk.X, padx=5, pady=3) - self.fine_speed_label = ttk.Label(inner, text="Speed: 50", font=('Arial', 10, 'bold')) - self.fine_speed_label.pack(anchor=tk.W) + # Displacement threshold + ttk.Label(inner, text="Threshold:").pack(side=tk.LEFT) + self.disp_threshold_var = tk.DoubleVar(value=0.10) + self.disp_threshold_spinbox = ttk.Spinbox( + inner, from_=0.05, to=0.30, increment=0.01, width=5, + textvariable=self.disp_threshold_var, + command=self._update_stitch_config + ) + self.disp_threshold_spinbox.pack(side=tk.LEFT, padx=(2, 10)) - self.speed_slider = ttk.Scale(inner, from_=0, to=6, orient=tk.HORIZONTAL) - self.speed_slider.set(5) - self.speed_slider.config(command=self._on_speed_slider_change) - self.speed_slider.pack(fill=tk.X, pady=(5, 0)) + # Number of rows + ttk.Label(inner, text="Rows:").pack(side=tk.LEFT) + self.num_rows_var = tk.IntVar(value=3) + self.num_rows_spinbox = ttk.Spinbox( + inner, from_=1, to=10, width=3, + textvariable=self.num_rows_var, + command=self._update_stitch_config + ) + self.num_rows_spinbox.pack(side=tk.LEFT, padx=(2, 10)) - btn_row = ttk.Frame(inner) - btn_row.pack(fill=tk.X, pady=(5, 0)) + # Row overlap + ttk.Label(inner, text="Overlap:").pack(side=tk.LEFT) + self.row_overlap_var = tk.DoubleVar(value=0.15) + self.row_overlap_spinbox = ttk.Spinbox( + inner, from_=0.05, to=0.50, increment=0.05, width=5, + textvariable=self.row_overlap_var, + command=self._update_stitch_config + ) + self.row_overlap_spinbox.pack(side=tk.LEFT, padx=(2, 10)) - for i in range(6): - ttk.Button( - btn_row, text=str(i), width=2, - command=lambda x=i: self._set_fine_speed(x) - ).pack(side=tk.LEFT, padx=1) + # Scan speed + ttk.Label(inner, text="Speed:").pack(side=tk.LEFT) + self.scan_speed_var = tk.IntVar(value=3) + self.scan_speed_spinbox = ttk.Spinbox( + inner, from_=1, to=6, width=3, + textvariable=self.scan_speed_var, + command=self._update_stitch_config + ) + self.scan_speed_spinbox.pack(side=tk.LEFT, padx=(2, 10)) + + # Autofocus toggle + self.af_every_row_var = tk.BooleanVar(value=True) + ttk.Checkbutton( + inner, text="AF each row", + variable=self.af_every_row_var, + command=self._update_stitch_config + ).pack(side=tk.LEFT, padx=(10, 0)) + + # Row/Direction status on right + status_frame = ttk.Frame(inner) + status_frame.pack(side=tk.RIGHT) + + ttk.Label(status_frame, text="Row:").pack(side=tk.LEFT) + self.row_label = ttk.Label(status_frame, text="--", width=5, font=('Arial', 9)) + self.row_label.pack(side=tk.LEFT) + + ttk.Label(status_frame, text="Dir:").pack(side=tk.LEFT, padx=(8, 0)) + self.direction_label = ttk.Label(status_frame, text="--", width=6, font=('Arial', 9)) + self.direction_label.pack(side=tk.LEFT) - def _build_movement_controls(self, parent): - frame = ttk.LabelFrame(parent, text="Movement Control") - frame.pack(fill=tk.X, pady=(0, 10)) + def _build_row3_movement(self, parent): + """Row 3: Movement controls for all axes""" + row = ttk.LabelFrame(parent, text="Movement") + row.pack(fill=tk.X, pady=(0, 3)) + + inner = ttk.Frame(row) + inner.pack(fill=tk.X, padx=5, pady=3) self.dir_labels = {} self.move_buttons = {} for axis in ["X", "Y", "Z"]: - row = ttk.Frame(frame) - row.pack(fill=tk.X, pady=3, padx=5) + af = ttk.Frame(inner) + af.pack(side=tk.LEFT, padx=(0, 15)) - ttk.Label(row, text=f"{axis}:", width=3, font=('Arial', 10, 'bold')).pack(side=tk.LEFT) + ttk.Label(af, text=f"{axis}:", font=('Arial', 10, 'bold')).pack(side=tk.LEFT) - dir_btn = ttk.Button(row, text="+ →", width=5, + dir_btn = ttk.Button(af, text="+→", width=4, command=lambda a=axis: self._toggle_direction(a)) dir_btn.pack(side=tk.LEFT, padx=2) self.dir_labels[axis] = dir_btn - move_btn = tk.Button(row, text="Move", width=8, bg='green', fg='white', - command=lambda a=axis: self._toggle_movement(a)) + move_btn = tk.Button( + af, text="Move", width=6, + bg='#4CAF50', fg='white', font=('Arial', 9), + activebackground='#45a049', relief=tk.RAISED, + command=lambda a=axis: self._toggle_movement(a) + ) move_btn.pack(side=tk.LEFT, padx=2) self.move_buttons[axis] = move_btn - ttk.Button(row, text="Stop", width=6, - command=lambda a=axis: self._stop_axis(a)).pack(side=tk.LEFT, padx=2) + ttk.Button(af, text="⏹", width=2, + command=lambda a=axis: self._stop_axis(a)).pack(side=tk.LEFT) - ttk.Button(frame, text="Stop All Axes", - command=self.motion.stop_all).pack(fill=tk.X, padx=5, pady=5) + # Stop all button + tk.Button( + inner, text="STOP ALL", width=10, + bg='#f44336', fg='white', font=('Arial', 9, 'bold'), + activebackground='#d32f2f', relief=tk.RAISED, bd=2, + command=self.motion.stop_all + ).pack(side=tk.RIGHT) - def _build_autofocus_controls(self, parent): - frame = ttk.LabelFrame(parent, text="Autofocus") - frame.pack(fill=tk.X, pady=(0, 10)) + def _build_row4_speed_autofocus(self, parent): + """Row 4: Speed and Autofocus controls""" + row = ttk.Frame(parent) + row.pack(fill=tk.X, pady=(0, 3)) - inner = ttk.Frame(frame) - inner.pack(fill=tk.X, padx=5, pady=5) + # Speed controls + speed_frame = ttk.LabelFrame(row, text="Speed") + speed_frame.pack(side=tk.LEFT, fill=tk.Y, padx=(0, 5)) - self.focus_score_label = ttk.Label(inner, text="Focus: --", font=('Arial', 10)) - self.focus_score_label.pack(anchor=tk.W) + sf = ttk.Frame(speed_frame) + sf.pack(fill=tk.X, padx=5, pady=3) - btn_row = ttk.Frame(inner) - btn_row.pack(fill=tk.X, pady=(5, 0)) + self.speed_var = tk.StringVar(value="Medium") + for text, val, preset in [("S", "Slow", "slow"), ("M", "Medium", "medium"), ("F", "Fast", "fast")]: + ttk.Radiobutton(sf, text=text, variable=self.speed_var, value=val, + command=lambda p=preset: self.motion.set_speed_preset(p)).pack(side=tk.LEFT, padx=2) - self.af_button = ttk.Button(btn_row, text="Autofocus", command=self._start_autofocus) + ttk.Separator(sf, orient=tk.VERTICAL).pack(side=tk.LEFT, fill=tk.Y, padx=5) + + self.fine_speed_label = ttk.Label(sf, text="1", width=3) + self.fine_speed_label.pack(side=tk.LEFT) + + self.speed_slider = ttk.Scale(sf, from_=0, to=6, orient=tk.HORIZONTAL, length=80) + self.speed_slider.set(1) + self.speed_slider.config(command=self._on_speed_slider_change) + self._on_speed_slider_change(1) + self.speed_slider.pack(side=tk.LEFT, padx=3) + + # Autofocus controls + af_frame = ttk.LabelFrame(row, text="Autofocus") + af_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) + + af = ttk.Frame(af_frame) + af.pack(fill=tk.X, padx=5, pady=3) + + self.af_button = ttk.Button(af, text="Auto", width=6, command=self._start_autofocus) self.af_button.pack(side=tk.LEFT, padx=2) - ttk.Button(btn_row, text="Coarse AF", + ttk.Button(af, text="Coarse", width=6, command=lambda: self._start_autofocus(coarse=True)).pack(side=tk.LEFT, padx=2) - ttk.Button(btn_row, text="Fine AF", + ttk.Button(af, text="Fine", width=5, command=lambda: self._start_autofocus(fine=True)).pack(side=tk.LEFT, padx=2) - ttk.Button(btn_row, text="Stop AF", + ttk.Button(af, text="Stop", width=5, command=self._stop_autofocus).pack(side=tk.LEFT, padx=2) - - self.live_focus_var = tk.BooleanVar(value=True) - ttk.Checkbutton(inner, text="Show live focus score", - variable=self.live_focus_var).pack(anchor=tk.W, pady=(5, 0)) - def _build_status_controls(self, parent): - frame = ttk.LabelFrame(parent, text="Status") - frame.pack(fill=tk.X, pady=(0, 10)) + def _build_row5_status_log(self, parent): + """Row 5: Status and Log""" + row = ttk.Frame(parent) + row.pack(fill=tk.X, pady=(0, 3)) - btn_frame = ttk.Frame(frame) - btn_frame.pack(fill=tk.X, padx=5, pady=5) + # Status controls + status_frame = ttk.LabelFrame(row, text="Status") + status_frame.pack(side=tk.LEFT, fill=tk.Y, padx=(0, 5)) - ttk.Button(btn_frame, text="Status", command=self.motion.request_status).pack(side=tk.LEFT, padx=2) - ttk.Button(btn_frame, text="Zero All", command=self.motion.zero_all).pack(side=tk.LEFT, padx=2) - ttk.Button(btn_frame, text="Go Origin", command=self.motion.go_origin).pack(side=tk.LEFT, padx=2) - - def _build_mode_controls(self, parent): - frame = ttk.LabelFrame(parent, text="Control Mode") - frame.pack(fill=tk.X, pady=(0, 10)) + stf = ttk.Frame(status_frame) + stf.pack(fill=tk.X, padx=5, pady=3) - btn_frame = ttk.Frame(frame) - btn_frame.pack(fill=tk.X, padx=5, pady=5) + ttk.Button(stf, text="Status", width=6, command=self.motion.request_status).pack(side=tk.LEFT, padx=2) + self.mode_label = ttk.Label(stf, text="Serial", width=8) + self.mode_label.pack(side=tk.LEFT, padx=(10, 0)) + ttk.Button(stf, text="Mode", width=5, command=self.motion.toggle_mode).pack(side=tk.LEFT, padx=2) - self.mode_label = ttk.Label(btn_frame, text="Mode: Serial", font=('Arial', 9)) - self.mode_label.pack(side=tk.LEFT, padx=5) + # Log + log_frame = ttk.LabelFrame(row, text="Log") + log_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) - ttk.Button(btn_frame, text="Toggle Mode", command=self.motion.toggle_mode).pack(side=tk.RIGHT, padx=2) - - def _build_serial_log(self, parent): - ttk.Label(parent, text="Serial Log:").pack(anchor=tk.W) - self.serial_log = scrolledtext.ScrolledText(parent, height=10, width=35, state=tk.DISABLED) - self.serial_log.pack(fill=tk.BOTH, expand=True, pady=(0, 10)) - - def _build_command_entry(self, parent): - ttk.Label(parent, text="Send Command:").pack(anchor=tk.W) - cmd_frame = ttk.Frame(parent) - cmd_frame.pack(fill=tk.X) + self.serial_log = scrolledtext.ScrolledText(log_frame, height=4, state=tk.DISABLED) + self.serial_log.pack(fill=tk.BOTH, expand=True, padx=5, pady=(5, 3)) + + cmd_frame = ttk.Frame(log_frame) + cmd_frame.pack(fill=tk.X, padx=5, pady=(0, 5)) self.cmd_entry = ttk.Entry(cmd_frame) self.cmd_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) self.cmd_entry.bind("", lambda e: self._send_custom_command()) - ttk.Button(cmd_frame, text="Send", command=self._send_custom_command).pack(side=tk.RIGHT, padx=(5, 0)) + ttk.Button(cmd_frame, text="Send", width=6, + command=self._send_custom_command).pack(side=tk.RIGHT, padx=(5, 0)) - # === UI Event Handlers === + # ========================================================================= + # Scanner Mode Switching + # ========================================================================= + + def _on_mode_change(self): + """Handle scanner mode change""" + self.scanner_mode = self.scanner_mode_var.get() + self.log_message(f"Scanner mode: {self.scanner_mode}") + + def _on_resolution_change(self, event=None): + """Handle camera resolution change""" + selected_label = self.resolution_combo.get() + + # Find mode name from label + mode_labels = self.camera.get_mode_labels() + mode_name = None + for name, label in mode_labels.items(): + if label == selected_label: + mode_name = name + break + + if mode_name is None: + self.log_message(f"Unknown resolution: {selected_label}") + return + + self.log_message(f"Changing resolution to: {selected_label}") + + # Stop any active scanning + if self.stitch_scanner and self.stitch_scanner.running: + self.stitch_scanner.stop() + self._scan_finished() + + # Change resolution + actual_w, actual_h, actual_fps = self.camera.set_mode(mode_name) + self.log_message(f"Resolution set: {actual_w}x{actual_h} @ {actual_fps:.1f}fps") + self._update_cam_res_label() + + def _update_cam_res_label(self): + """Update camera resolution display""" + w, h = self.camera.get_resolution() + fps = self.camera.get_fps() + self.cam_res_label.config(text=f"{w}x{h} @ {fps:.1f}fps") + + def _update_stitch_config(self): + """Update stitching scanner config from GUI values""" + self.stitch_config.displacement_threshold = self.disp_threshold_var.get() + self.stitch_config.rows = self.num_rows_var.get() + self.stitch_config.row_overlap = self.row_overlap_var.get() + self.stitch_config.scan_speed_index = self.scan_speed_var.get() + self.stitch_config.autofocus_every_row = self.af_every_row_var.get() + + # Reinitialize stitching scanner with new config + self.stitch_scanner = StitchingScanner( + camera=self.camera, + motion_controller=self.motion, + autofocus_controller=self.autofocus, + config=self.stitch_config, + on_log=self.log_message, + on_progress=self._on_stitch_progress, + on_mosaic_updated=self._on_mosaic_updated + ) + + # ========================================================================= + # Overlay Drawing + # ========================================================================= + + def _draw_overlays(self, frame): + """Draw overlays on camera frame""" + frame = self._draw_crosshair(frame) + if self.show_tile_overlay_var.get(): + frame = self._draw_threshold_box(frame) + if self.show_displacement_var.get(): + frame = self._draw_displacement_overlay(frame) + return frame + + def _draw_crosshair(self, frame, color=(0, 0, 255), thickness=1, size=40): + h, w = frame.shape[:2] + cx, cy = w // 2, h // 2 + cv2.line(frame, (cx - size, cy), (cx + size, cy), color, thickness) + cv2.line(frame, (cx, cy - size), (cx, cy + size), color, thickness) + return frame + + def _draw_threshold_box(self, frame, color=(0, 255, 0), thickness=2): + """Draw box showing 10% threshold region""" + h, w = frame.shape[:2] + threshold = self.stitch_config.displacement_threshold + bh, bw = int(h * threshold), int(w * threshold) + cv2.rectangle(frame, (bw, bh), (w - bw, h - bh), color, thickness) + + # Label + cv2.putText(frame, f"{threshold:.0%}", (bw + 5, bh + 20), + cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1) + return frame + + def _draw_displacement_overlay(self, frame): + """Draw displacement indicator when scanning""" + if not self.stitch_scanner: + return frame + + state = self.stitch_scanner.get_state() + if not state.is_scanning: + return frame + + h, w = frame.shape[:2] + threshold = self.stitch_config.displacement_threshold + + # Calculate fill percentages + threshold_x = w * threshold + threshold_y = h * threshold + + fill_x = min(abs(state.cumulative_x) / threshold_x, 1.0) if threshold_x > 0 else 0 + fill_y = min(abs(state.cumulative_y) / threshold_y, 1.0) if threshold_y > 0 else 0 + + # Draw horizontal progress bar at top + bar_h = 20 + bar_w = int(w * 0.8) + bar_x = (w - bar_w) // 2 + bar_y = 10 + + # Background + cv2.rectangle(frame, (bar_x, bar_y), (bar_x + bar_w, bar_y + bar_h), (50, 50, 50), -1) + # Fill + fill_w = int(bar_w * fill_x) + color = (0, 255, 0) if fill_x >= 1.0 else (0, 165, 255) + cv2.rectangle(frame, (bar_x, bar_y), (bar_x + fill_w, bar_y + bar_h), color, -1) + # Border + cv2.rectangle(frame, (bar_x, bar_y), (bar_x + bar_w, bar_y + bar_h), (200, 200, 200), 1) + + # Text + text = f"X: {state.cumulative_x:.1f}px ({fill_x:.0%})" + cv2.putText(frame, text, (bar_x + 5, bar_y + 15), + cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 255, 255), 1) + + # Direction indicator + dir_text = f"Dir: {state.direction}" + cv2.putText(frame, dir_text, (bar_x + bar_w - 80, bar_y + 15), + cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 255, 255), 1) + + return frame + + # ========================================================================= + # Scanner Handlers + # ========================================================================= + + def _start_scan(self): + """Start the appropriate scanner based on mode""" + if self.scanner_mode == 'stitch': + self._update_stitch_config() + if self.stitch_scanner.start(): + self._scan_started() + else: + # Tile mode + if self.scanner.start(): + self._scan_started() + + def _scan_started(self): + """Update UI when scan starts""" + self.scan_start_btn.config(state='disabled', bg='#888888') + self.scan_pause_btn.config(state='normal') + self.scan_stop_btn.config(state='normal') + self.scan_status_label.config(text="Scanning") + self.scan_progress_bar.start(10) + + def _pause_scan(self): + """Pause/resume the scan""" + scanner = self.stitch_scanner if self.scanner_mode == 'stitch' else self.scanner + + if scanner.paused: + scanner.resume() + self.scan_pause_btn.config(text="⏸", bg='#2196F3') + self.scan_status_label.config(text="Scanning") + self.scan_progress_bar.start(10) + else: + scanner.pause() + self.scan_pause_btn.config(text="▶", bg='#4CAF50') + self.scan_status_label.config(text="Paused") + self.scan_progress_bar.stop() + + def _stop_scan(self): + """Stop the scan""" + if self.scanner_mode == 'stitch': + self.stitch_scanner.stop() + else: + self.scanner.stop() + self._scan_finished() + + def _scan_finished(self): + """Update UI when scan finishes""" + self.scan_start_btn.config(state='normal', bg='#4CAF50') + self.scan_pause_btn.config(state='disabled', text="⏸", bg='#2196F3') + self.scan_stop_btn.config(state='disabled') + self.scan_status_label.config(text="Idle") + self.scan_progress_bar.stop() + + def _update_progress(self, current: int, total: int): + """Update tile scanner progress""" + if total > 0: + self.scan_progress_label.config(text=f"{current}/{total}") + else: + self.scan_progress_label.config(text=f"{current} tiles") + + def _update_stitch_progress(self, appends: int): + """Update stitching scanner progress""" + self.scan_progress_label.config(text=f"{appends} appends") + + def _test_displacement(self): + """Test displacement detection""" + if not self.stitch_scanner: + return + + self.log_message("Testing displacement (10 frames)...") + + def run_test(): + results = self.stitch_scanner.test_displacement(num_frames=10) + self.root.after(0, lambda: self._show_displacement_results(results)) + + threading.Thread(target=run_test, daemon=True).start() + + def _show_displacement_results(self, results): + """Display displacement test results""" + self.log_message(f"Total displacement: X={results['total_dx']:.1f}, Y={results['total_dy']:.1f}") + for fr in results['frames'][:5]: # Show first 5 + self.log_message(f" Frame {fr['frame']}: dx={fr['dx']:.2f}, dy={fr['dy']:.2f}") + + def _emergency_stop(self): + """Emergency stop everything""" + self.motion.stop_all() + if self.stitch_scanner and self.stitch_scanner.running: + self.stitch_scanner.stop() + if self.scanner and self.scanner.running: + self.scanner.stop() + self._scan_finished() + if self.autofocus.is_running(): + self.autofocus.stop() + + # ========================================================================= + # Movement Handlers + # ========================================================================= def _toggle_direction(self, axis): direction = self.motion.toggle_direction(axis) arrow = "→" if direction == 1 else "←" sign = "+" if direction == 1 else "-" - self.dir_labels[axis].config(text=f"{sign} {arrow}") + self.dir_labels[axis].config(text=f"{sign}{arrow}") if self.motion.is_moving(axis): self.motion.stop_axis(axis) self.motion.start_movement(axis) - self.move_buttons[axis].config(text="Moving", bg='orange') + self.move_buttons[axis].config(text="Moving", bg='#FF9800') def _toggle_movement(self, axis): if self.motion.is_moving(axis): self._stop_axis(axis) else: self.motion.start_movement(axis) - self.move_buttons[axis].config(text="Moving", bg='orange') + self.move_buttons[axis].config(text="Moving", bg='#FF9800') def _stop_axis(self, axis): self.motion.stop_axis(axis) - self.move_buttons[axis].config(text="Move", bg='green') + self.move_buttons[axis].config(text="Move", bg='#4CAF50') def _on_speed_slider_change(self, value): if self._updating_slider: @@ -257,7 +736,7 @@ class AppGUI: self._updating_slider = True self.speed_slider.set(index) speed_val = self.motion.get_speed_value(index) - self.fine_speed_label.config(text=f"Speed: {speed_val}") + self.fine_speed_label.config(text=str(speed_val)) self._updating_slider = False self.motion.set_speed(index) @@ -275,23 +754,111 @@ class AppGUI: self.motion.send_command(cmd) self.cmd_entry.delete(0, tk.END) - # === Logging === + # ========================================================================= + # Mosaic Window + # ========================================================================= + + def _show_mosaic_window(self): + if self.mosaic_window is not None: + self.mosaic_window.lift() + self._update_mosaic_window() + return + + self.mosaic_window = tk.Toplevel(self.root) + self.mosaic_window.title("Mosaic") + self.mosaic_window.geometry("600x800") + self.mosaic_window.protocol("WM_DELETE_WINDOW", self._close_mosaic_window) + + self.mosaic_label = ttk.Label(self.mosaic_window, text="No mosaic yet") + self.mosaic_label.pack(fill=tk.BOTH, expand=True, padx=10, pady=10) + + btn_frame = ttk.Frame(self.mosaic_window) + btn_frame.pack(fill=tk.X, padx=10, pady=(0, 10)) + + ttk.Button(btn_frame, text="Save Full Resolution", + command=self._save_mosaic).pack(side=tk.LEFT, padx=5) + ttk.Button(btn_frame, text="Refresh", + command=self._update_mosaic_window).pack(side=tk.LEFT, padx=5) + ttk.Button(btn_frame, text="Clear", + command=self._clear_mosaic).pack(side=tk.LEFT, padx=5) + + self._update_mosaic_window() + + def _update_mosaic_window(self): + if self.mosaic_window is None: + return + + # Get mosaic from appropriate scanner + mosaic = None + if self.scanner_mode == 'stitch' and self.stitch_scanner: + mosaic = self.stitch_scanner.get_mosaic_preview(max_size=580) + elif self.scanner and self.scanner.tiles: + mosaic = self.scanner.get_mosaic_preview(max_size=580) + + if mosaic is None: + return + + mosaic_rgb = cv2.cvtColor(mosaic, cv2.COLOR_BGR2RGB) + img = Image.fromarray(mosaic_rgb) + imgtk = ImageTk.PhotoImage(image=img) + + self.mosaic_label.imgtk = imgtk + self.mosaic_label.config(image=imgtk, text="") + + # Update size label + if self.stitch_scanner: + state = self.stitch_scanner.get_state() + self.mosaic_size_label.config(text=f"{state.mosaic_width}x{state.mosaic_height}") + + def _close_mosaic_window(self): + if self.mosaic_window: + self.mosaic_window.destroy() + self.mosaic_window = None + + def _save_mosaic(self): + filename = filedialog.asksaveasfilename( + defaultextension=".png", + filetypes=[("PNG", "*.png"), ("JPEG", "*.jpg"), ("All", "*.*")] + ) + if not filename: + return + + if self.scanner_mode == 'stitch' and self.stitch_scanner: + if self.stitch_scanner.save_mosaic(filename): + self.log_message(f"Saved: {filename}") + elif self.scanner: + mosaic = self.scanner.build_mosaic(scale=1.0) + if mosaic is not None: + cv2.imwrite(filename, mosaic) + self.log_message(f"Saved: {filename}") + + def _clear_mosaic(self): + """Clear the current mosaic""" + if self.stitch_scanner: + self.stitch_scanner.mosaic = None + self.stitch_scanner.state.mosaic_width = 0 + self.stitch_scanner.state.mosaic_height = 0 + self._update_mosaic_window() + self.log_message("Mosaic cleared") + + # ========================================================================= + # Logging + # ========================================================================= def log_message(self, msg): if not hasattr(self, 'serial_log'): return - self.serial_log.config(state=tk.NORMAL) self.serial_log.insert(tk.END, msg + "\n") self.serial_log.see(tk.END) self.serial_log.config(state=tk.DISABLED) - print(f" Log: {msg}") if msg.startswith("< MODE:"): - mode = msg.split(":")[-1] - self.mode_label.config(text=f"Mode: {mode.capitalize()}") + self.mode_label.config(text=msg.split(":")[-1].capitalize()) - # === Serial & Camera === + # ========================================================================= + # Serial & Camera + # ========================================================================= def _serial_reader(self): while self.running: @@ -306,24 +873,37 @@ class AppGUI: def _process_serial_queue(self): while not self.serial_queue.empty(): - msg = self.serial_queue.get_nowait() - self.log_message(f"< {msg}") + self.log_message(f"< {self.serial_queue.get_nowait()}") + + def _process_tile_queue(self): + updated = False + while not self.tile_queue.empty(): + self.tile_queue.get_nowait() + updated = True + if updated and self.mosaic_window: + self._update_mosaic_window() def update_camera(self): try: frame = self.camera.capture_frame() + frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE) if self.live_focus_var.get() and not self.autofocus.is_running(): score = self.autofocus.get_focus_score() self.focus_score_label.config(text=f"Focus: {score:.1f}") - frame = self._draw_crosshair(frame) + # Apply overlays + frame = self._draw_overlays(frame) + frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + # Scale to fit h, w = frame.shape[:2] - max_width = 640 - if w > max_width: - scale = max_width / w + max_h = 700 + max_w = 680 + + scale = min(max_h / h, max_w / w) + if scale < 1: frame = cv2.resize(frame, (int(w * scale), int(h * scale))) img = Image.fromarray(frame) @@ -334,42 +914,42 @@ class AppGUI: except Exception as e: print(f"Camera error: {e}") - def _draw_crosshair(self, frame, color=(0, 0, 255), thickness=1, gap=0, size=40): - """ - Draw a crosshair at the center of the frame. + def _update_stitch_state_display(self): + """Update stitch scanner state in UI""" + if not self.stitch_scanner: + return - Args: - frame: OpenCV image - color: BGR color tuple (default green) - thickness: Line thickness - gap: Gap in center of crosshair - size: Length of crosshair arms from center - """ - h, w = frame.shape[:2] - cx, cy = w // 2, h // 2 + state = self.stitch_scanner.get_state() - # Horizontal lines (left and right of center gap) - cv2.line(frame, (cx - size, cy), (cx - gap, cy), color, thickness) - cv2.line(frame, (cx + gap, cy), (cx + size, cy), color, thickness) + # Update displacement label + self.displacement_label.config( + text=f"X: {state.cumulative_x:.1f} Y: {state.cumulative_y:.1f}" + ) - # Vertical lines (above and below center gap) - cv2.line(frame, (cx, cy - size), (cx, cy - gap), color, thickness) - cv2.line(frame, (cx, cy + gap), (cx, cy + size), color, thickness) + # Update row/direction + if state.is_scanning: + self.row_label.config(text=f"{state.current_row + 1}/{state.total_rows}") + self.direction_label.config(text=state.direction) + else: + self.row_label.config(text="--") + self.direction_label.config(text="--") - # Optional: center dot - # cv2.circle(frame, (cx, cy), 2, color, -1) - - return frame - - # === Main Loop === + # Check if scan finished + if not state.is_scanning and self.scan_start_btn['state'] == 'disabled': + self._scan_finished() + + # ========================================================================= + # Main Loop + # ========================================================================= def run(self): def update(): if self.running: self.update_camera() self._process_serial_queue() + self._process_tile_queue() + self._update_stitch_state_display() - # Re-enable AF button when done if not self.autofocus.is_running(): self.af_button.config(state='normal') @@ -380,5 +960,10 @@ class AppGUI: def on_close(self): self.running = False + if self.scanner: + self.scanner.stop() + if self.stitch_scanner: + self.stitch_scanner.stop() self.autofocus.stop() + self._close_mosaic_window() self.root.destroy() \ No newline at end of file diff --git a/src/motion_controller.py b/src/motion_controller.py index 8e2f919..1c8147b 100644 --- a/src/motion_controller.py +++ b/src/motion_controller.py @@ -2,12 +2,12 @@ class MotionController: # Command mapping for each axis AXIS_COMMANDS = { 'X': {'pos': 'E', 'neg': 'W', 'stop': 'e'}, - 'Y': {'pos': 'N', 'neg': 'S', 'stop': 'n'}, + 'Y': {'pos': 'S', 'neg': 'N', 'stop': 'n'}, 'Z': {'pos': 'U', 'neg': 'D', 'stop': 'u'} } # Speed values matching Arduino speedArr - SPEED_VALUES = [0, 2, 5, 10, 30, 50, 70] + SPEED_VALUES = [0, 1, 2, 5, 10, 30, 50, 70] def __init__(self, arduino, on_command_sent=None): """ diff --git a/src/scanner.py b/src/scanner.py index e69de29..0d4a353 100644 --- a/src/scanner.py +++ b/src/scanner.py @@ -0,0 +1,1016 @@ +""" +Scanner Module - Automated slide scanning with visual feedback navigation + +Uses binary template matching to detect tile boundaries instead of step counting. +Camera orientation: Portrait (height > width), X=left/right, Y=up/down + +Enhanced with interpolation-based matching to handle discrete stepper motor steps +that can skip over optimal alignment points. +""" + +import cv2 +import numpy as np +import time +import threading +from dataclasses import dataclass, field +from typing import List, Optional, Callable, Tuple +from enum import Enum + + +class ScanDirection(Enum): + """Scan direction constants""" + RIGHT = 'right' # X+ (E command) + LEFT = 'left' # X- (W command) + DOWN = 'down' # Y- (N command) + UP = 'up' # Y+ (S command) + + +@dataclass +class Tile: + """Represents a captured tile""" + image: np.ndarray + row: int + col: int + x_pos: int + y_pos: int + focus_score: float + timestamp: float + + +@dataclass +class ScanConfig: + """Scanner configuration""" + # Tile extraction + tile_percentage: float = 0.40 # Center portion is the tile + border_percentage: float = 0.30 # Border on each side for tracking + + # Binary comparison settings + similarity_threshold: float = 0.12 # 0-1, higher = stricter matching + comparison_method: str = 'template' # 'template', 'ssim', 'mse', 'hst', 'phase' + + # Interpolation settings (NEW) + num_interpolations: int = 10 # Number of sub-steps to simulate between frames + use_interpolation: bool = True # Enable/disable interpolation matching + use_peak_detection: bool = True # Stop at peak similarity, not just threshold + peak_window_size: int = 5 # Number of frames to track for peak detection + peak_drop_threshold: float = 0.02 # How much similarity must drop to confirm peak + + # Adaptive threshold settings + adaptive_block_size: int = 11 # Must be odd + adaptive_c: int = 2 # Constant subtracted from mean + + # Movement timing + move_check_interval: float = 0.1 # Seconds between frame checks + settle_time: float = 0.2 # Seconds to wait after stopping + max_move_time: float = 120.0 # Safety timeout per tile + + # Scan limits (number of tiles) + max_tiles_per_row: int = 10 + max_rows: int = 10 + + # Scan pattern + start_direction: ScanDirection = ScanDirection.RIGHT + + # Autofocus + autofocus_every_n_tiles: int = -1 + autofocus_every_row: bool = False + + +@dataclass +class ComparisonState: + """Current state of edge comparison for visualization""" + is_tracking: bool = False + is_paused: bool = False + direction: str = '' + reference_edge: str = '' + target_edge: str = '' + + # Images for display (already binarized) + reference_image: Optional[np.ndarray] = None + current_image: Optional[np.ndarray] = None + interpolated_image: Optional[np.ndarray] = None # NEW: Show interpolated frame + + # Comparison result + similarity: float = 0.0 + threshold: float = 0.65 + shift_detected: bool = False + + # Interpolation info (NEW) + best_offset: float = 0.0 # 0-1, where in the interpolation the best match was found + similarity_history: List[float] = field(default_factory=list) + peak_detected: bool = False + + +class Scanner: + """ + Automated slide scanner using visual feedback for navigation. + + Uses binary template matching with interpolation to detect when tile + content has shifted from one edge to the opposite edge, handling the + discrete nature of stepper motor steps. + """ + + def __init__(self, camera, motion_controller, autofocus_controller=None, + config: ScanConfig = None, + on_tile_captured: Callable[[Tile], None] = None, + on_log: Callable[[str], None] = None, + on_progress: Callable[[int, int], None] = None): + self.camera = camera + self.motion = motion_controller + self.autofocus = autofocus_controller + self.config = config or ScanConfig() + + # Callbacks + self.on_tile_captured = on_tile_captured + self.on_log = on_log + self.on_progress = on_progress + + # State + self.running = False + self.paused = False + self.tiles: List[Tile] = [] + self.current_row = 0 + self.current_col = 0 + self.tiles_captured = 0 + + # Comparison state for visualization + self.comparison_state = ComparisonState() + self._state_lock = threading.Lock() + + # Thread + self._thread = None + + def log(self, message: str): + """Log a message""" + if self.on_log: + self.on_log(f"[Scanner] {message}") + print(f"[Scanner] {message}") + + # ========================================================================= + # Edge Region Extraction + # ========================================================================= + + def get_edge_region(self, frame: np.ndarray, edge: str) -> np.ndarray: + """ + Extract a strip from the specified edge for comparison. + + Args: + frame: Input frame + edge: 'left', 'right', 'top', 'bottom' + + Returns: + Edge strip image + """ + h, w = frame.shape[:2] + border_h = int(h * self.config.border_percentage) + border_w = int(w * self.config.border_percentage) + + if edge == 'left': + return frame[:, :border_w].copy() + elif edge == 'right': + return frame[:, w-border_w:].copy() + elif edge == 'top': + return frame[:border_h, :].copy() + elif edge == 'bottom': + return frame[h-border_h:, :].copy() + else: + raise ValueError(f"Unknown edge: {edge}") + + def extract_tile(self, frame: np.ndarray) -> np.ndarray: + """Extract the center tile from a frame""" + h, w = frame.shape[:2] + border_h = int(h * self.config.border_percentage) + border_w = int(w * self.config.border_percentage) + + tile = frame[border_h:h-border_h, border_w:w-border_w] + return tile.copy() + + # ========================================================================= + # Binary Image Processing + # ========================================================================= + + def prepare_edge_image(self, region: np.ndarray) -> np.ndarray: + """ + Convert edge region to an enhanced gradient map. + + Uses Sobel gradients to highlight structure (cells, edges) regardless + of brightness, then thresholds and dilates to make features more + prominent for matching. + """ + # 1. Convert to grayscale + if len(region.shape) == 3: + gray = cv2.cvtColor(region, cv2.COLOR_BGR2GRAY) + else: + gray = region + + # 2. Gaussian Blur + blurred = cv2.GaussianBlur(gray, (5, 5), 0) + + # 3. Calculate Gradient Magnitude (Sobel) + grad_x = cv2.Sobel(blurred, cv2.CV_32F, 1, 0, ksize=3) + grad_y = cv2.Sobel(blurred, cv2.CV_32F, 0, 1, ksize=3) + magnitude = cv2.magnitude(grad_x, grad_y) + + # 4. Normalize to 0-255 + magnitude = cv2.normalize(magnitude, None, 0, 255, cv2.NORM_MINMAX) + magnitude = np.uint8(magnitude) + return magnitude + + # # 5. Threshold to make features more prominent + # _, binary = cv2.threshold(magnitude, 100, 255, cv2.THRESH_BINARY) + + # # 6. Dilate to make features "fatter" and overlap more + # kernel = np.ones((3, 3), np.uint8) + # dilated = cv2.dilate(binary, kernel, iterations=2) + + # return dilated + + # ========================================================================= + # Image Comparison Methods + # ========================================================================= + + def compare_edges(self, reference: np.ndarray, target: np.ndarray) -> float: + """ + Compare two binary edge images. + + Args: + reference: Binary image from reference edge + target: Binary image from target edge + + Returns: + Similarity score (0.0 to 1.0) + """ + # Ensure same size + if reference.shape != target.shape: + target = cv2.resize(target, (reference.shape[1], reference.shape[0])) + + method = self.config.comparison_method + + if method == 'template': + return self._compare_template(reference, target) + elif method == 'ssim': + return self._compare_ssim(reference, target) + elif method == 'mse': + return self._compare_mse(reference, target) + elif method == 'hst': + return self._compare_histogram(reference, target) + elif method == 'phase': + return self._compare_phase_correlation(reference, target) + else: + return self._compare_template(reference, target) + + def _compare_histogram(self, reference: np.ndarray, target: np.ndarray) -> float: + """Compare gradient histograms - works better for sparse images""" + hist_ref = cv2.calcHist([reference], [0], None, [256], [0, 256]) + hist_target = cv2.calcHist([target], [0], None, [256], [0, 256]) + + cv2.normalize(hist_ref, hist_ref) + cv2.normalize(hist_target, hist_target) + + similarity = cv2.compareHist(hist_ref, hist_target, cv2.HISTCMP_CORREL) + return max(0.0, similarity) + + def _compare_template(self, reference: np.ndarray, target: np.ndarray) -> float: + """Template matching using normalized cross-correlation""" + result = cv2.matchTemplate(target, reference, cv2.TM_CCOEFF_NORMED) + min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result) + return max(0.0, max_val) + + def _compare_ssim(self, reference: np.ndarray, target: np.ndarray) -> float: + """Structural Similarity Index""" + try: + from skimage.metrics import structural_similarity as ssim + score, _ = ssim(reference, target, full=True) + return float(score) + except ImportError: + self.log("skimage not available, falling back to template matching") + return self._compare_template(reference, target) + + def _compare_mse(self, reference: np.ndarray, target: np.ndarray) -> float: + """Mean Squared Error (inverted to similarity)""" + mse = np.mean((reference.astype(float) - target.astype(float)) ** 2) + similarity = 1 - (mse / 65025) + return max(0.0, similarity) + + def _compare_phase_correlation(self, reference: np.ndarray, target: np.ndarray) -> float: + """ + Phase correlation - good for finding shifts between images. + Returns a similarity score based on correlation peak strength. + """ + ref_f = np.float32(reference) + target_f = np.float32(target) + + # Compute FFT + f_ref = np.fft.fft2(ref_f) + f_target = np.fft.fft2(target_f) + + # Cross-power spectrum + cross_power = (f_ref * np.conj(f_target)) / (np.abs(f_ref * np.conj(f_target)) + 1e-10) + correlation = np.fft.ifft2(cross_power) + correlation = np.abs(correlation) + + # Peak strength as similarity measure + peak_val = np.max(correlation) + # Normalize by image size for consistent scaling + normalized_peak = peak_val / np.sqrt(reference.size) + + return min(1.0, normalized_peak) + + # ========================================================================= + # Interpolation-Based Matching (NEW) + # ========================================================================= + + def _interpolated_match(self, prev_retreat: np.ndarray, curr_retreat: np.ndarray, + reference: np.ndarray, movement_axis: str = 'horizontal' + ) -> Tuple[float, float, np.ndarray]: + """ + Create interpolated frames simulating sub-step positions between + two consecutive frame captures. + + The movement_axis determines how to blend the edge strips: + - 'horizontal': Blend along columns (for top/bottom edge strips during X movement) + - 'vertical': Blend along rows (for left/right edge strips during Y movement) + + Args: + prev_retreat: Previous retreating edge binary image + curr_retreat: Current retreating edge binary image + reference: Reference (leading edge) binary image to match against + movement_axis: 'horizontal' for X movement, 'vertical' for Y movement + + Returns: + Tuple of (best_similarity, best_offset, best_interpolated_image) + - best_offset: 0.0 = prev frame, 1.0 = curr frame + """ + h, w = prev_retreat.shape[:2] + best_similarity = 0.0 + best_offset = 0.0 + best_interpolated = curr_retreat.copy() + + num_interp = self.config.num_interpolations + + for i in range(num_interp + 1): + alpha = i / num_interp # 0.0 to 1.0 + + if movement_axis == 'vertical': + # X movement: blend along columns of horizontal edge strips + interpolated = self._create_horizontal_interpolation( + prev_retreat, curr_retreat, alpha + ) + else: + # Y movement: blend along rows of vertical edge strips + interpolated = self._create_vertical_interpolation( + prev_retreat, curr_retreat, alpha + ) + + similarity = self.compare_edges(reference, interpolated) + + if similarity > best_similarity: + best_similarity = similarity + best_offset = alpha + best_interpolated = interpolated.copy() + + return best_similarity, best_offset, best_interpolated + + def _create_horizontal_interpolation(self, prev: np.ndarray, curr: np.ndarray, + alpha: float) -> np.ndarray: + """ + Create interpolated frame by blending along columns (width dimension). + Used for X movement with top/bottom edge strips (horizontal strips). + + At alpha=0: returns prev + At alpha=1: returns curr + At alpha=0.5: left half from shifted prev, right half from curr + """ + h, w = prev.shape[:2] + split_col = int(w * alpha) + + if split_col <= 0: + return prev.copy() + elif split_col >= w: + return curr.copy() + + interpolated = np.zeros_like(prev) + + # Content from prev that has shifted left (right portion moves to left) + interpolated[:, :w-split_col] = prev[:, split_col:] + + # New content from curr appearing on the right + interpolated[:, w-split_col:] = curr[:, :split_col] + + return interpolated + + def _create_vertical_interpolation(self, prev: np.ndarray, curr: np.ndarray, + alpha: float) -> np.ndarray: + """ + Create interpolated frame by blending along rows (height dimension). + Used for Y movement with left/right edge strips (vertical strips). + + At alpha=0: returns prev + At alpha=1: returns curr + At alpha=0.5: top half from shifted prev, bottom half from curr + """ + h, w = prev.shape[:2] + split_row = int(h * alpha) + + if split_row <= 0: + return prev.copy() + elif split_row >= h: + return curr.copy() + + interpolated = np.zeros_like(prev) + + # Content from prev that has shifted up + interpolated[:h-split_row, :] = prev[split_row:, :] + + # New content from curr appearing at the bottom + interpolated[h-split_row:, :] = curr[:split_row, :] + + return interpolated + + def _detect_peak(self, similarity_history: List[float]) -> Tuple[bool, int]: + """ + Detect if we've passed the similarity peak. + + Returns: + Tuple of (peak_detected, peak_index) + """ + if len(similarity_history) < self.config.peak_window_size: + return False, -1 + + window = similarity_history[-self.config.peak_window_size:] + max_idx = np.argmax(window) + max_val = window[max_idx] + + # Peak is detected if: + # 1. The maximum is not at the edges of the window + # 2. The maximum exceeded our threshold + # 3. Current value has dropped from the peak + if max_idx > 0 and max_idx < len(window) - 1: + if max_val >= self.config.similarity_threshold: + current_drop = max_val - window[-1] + if current_drop >= self.config.peak_drop_threshold: + # Peak confirmed + actual_peak_idx = len(similarity_history) - self.config.peak_window_size + max_idx + return True, actual_peak_idx + + return False, -1 + + # ========================================================================= + # Comparison State Access (for GUI) + # ========================================================================= + + def get_comparison_state(self) -> ComparisonState: + """Get current comparison state (thread-safe)""" + with self._state_lock: + state = ComparisonState( + is_tracking=self.comparison_state.is_tracking, + is_paused=self.paused, + direction=self.comparison_state.direction, + reference_edge=self.comparison_state.reference_edge, + target_edge=self.comparison_state.target_edge, + similarity=self.comparison_state.similarity, + threshold=self.comparison_state.threshold, + shift_detected=self.comparison_state.shift_detected, + best_offset=self.comparison_state.best_offset, + peak_detected=self.comparison_state.peak_detected, + similarity_history=self.comparison_state.similarity_history.copy() + ) + if self.comparison_state.reference_image is not None: + state.reference_image = self.comparison_state.reference_image.copy() + if self.comparison_state.current_image is not None: + state.current_image = self.comparison_state.current_image.copy() + if self.comparison_state.interpolated_image is not None: + state.interpolated_image = self.comparison_state.interpolated_image.copy() + return state + + def _update_comparison_state(self, is_tracking: bool, direction: str = '', + ref_edge: str = '', target_edge: str = '', + ref_image: np.ndarray = None, + current_image: np.ndarray = None, + interpolated_image: np.ndarray = None, + similarity: float = 0.0, + shift_detected: bool = False, + best_offset: float = 0.0, + peak_detected: bool = False, + similarity_history: List[float] = None): + """Update comparison state (thread-safe)""" + with self._state_lock: + self.comparison_state.is_tracking = is_tracking + self.comparison_state.direction = direction + self.comparison_state.reference_edge = ref_edge + self.comparison_state.target_edge = target_edge + self.comparison_state.similarity = similarity + self.comparison_state.threshold = self.config.similarity_threshold + self.comparison_state.shift_detected = shift_detected + self.comparison_state.best_offset = best_offset + self.comparison_state.peak_detected = peak_detected + + if similarity_history is not None: + self.comparison_state.similarity_history = similarity_history.copy() + + if ref_image is not None: + self.comparison_state.reference_image = ref_image.copy() + if current_image is not None: + self.comparison_state.current_image = current_image.copy() + if interpolated_image is not None: + self.comparison_state.interpolated_image = interpolated_image.copy() + + def _clear_comparison_state(self): + """Clear comparison state (but not if paused)""" + if self.paused: + return + with self._state_lock: + self.comparison_state = ComparisonState() + + # ========================================================================= + # Movement with Visual Feedback + # ========================================================================= + + def move_until_tile_shifted(self, direction: ScanDirection) -> bool: + """ + Move in a direction until visual comparison indicates tile shift. + + Uses interpolation between consecutive frames to catch the optimal + alignment point even when discrete stepper steps skip over it. + + Args: + direction: ScanDirection to move + + Returns: + True if tile shift detected, False if timeout/boundary + """ + # Determine edges based on direction (accounting for 90° camera rotation) + # The movement_axis refers to how to interpolate the EDGE STRIPS: + # - X movement: top/bottom edges are horizontal strips, content travels VERTICALLY + # across the frame, so interpolate HORIZONTALLY along the strip width + # - Y movement: left/right edges are vertical strips, content travels HORIZONTALLY + # across the frame, so interpolate VERTICALLY along the strip height + if direction == ScanDirection.RIGHT: + from_edge, to_edge = 'top', 'bottom' + axis = 'X' + movement_axis = 'horizontal' # Interpolate along strip width + self.motion.axis_direction['X'] = 1 + elif direction == ScanDirection.LEFT: + from_edge, to_edge = 'bottom', 'top' + axis = 'X' + movement_axis = 'horizontal' # Interpolate along strip width + self.motion.axis_direction['X'] = -1 + elif direction == ScanDirection.DOWN: + from_edge, to_edge = 'right', 'left' + axis = 'Y' + movement_axis = 'vertical' # Interpolate along strip height + self.motion.axis_direction['Y'] = -1 + elif direction == ScanDirection.UP: + from_edge, to_edge = 'left', 'right' + axis = 'Y' + movement_axis = 'vertical' # Interpolate along strip height + self.motion.axis_direction['Y'] = 1 + else: + raise ValueError(f"Unknown direction: {direction}") + + # Capture and binarize reference edge + frame = self.camera.capture_frame() + reference_region = self.get_edge_region(frame, from_edge) + reference_binary = self.prepare_edge_image(reference_region) + + self.log(f"Captured reference from {from_edge} edge, moving {direction.value}") + self.log(f"Interpolation: {'ON' if self.config.use_interpolation else 'OFF'}, " + f"Peak detection: {'ON' if self.config.use_peak_detection else 'OFF'}") + + # Initialize tracking state + prev_retreat_binary = None + similarity_history = [] + check_count = 0 + + # Update state for visualization + self._update_comparison_state( + is_tracking=True, + direction=direction.value, + ref_edge=from_edge, + target_edge=to_edge, + ref_image=reference_binary, + similarity_history=similarity_history + ) + + # Start movement + self.log(f"Starting movement on {axis} axis") + self.motion.start_movement(axis) + start_time = time.time() + + try: + while self.running and not self.paused: + time.sleep(self.config.move_check_interval) + check_count += 1 + + # Safety timeout + elapsed = time.time() - start_time + if elapsed > self.config.max_move_time: + self.log("Movement timeout - likely at boundary") + return False + + # Capture current retreating edge + current_frame = self.camera.capture_frame() + target_region = self.get_edge_region(current_frame, to_edge) + curr_retreat_binary = self.prepare_edge_image(target_region) + + # Calculate similarity (with or without interpolation) + if self.config.use_interpolation and prev_retreat_binary is not None: + similarity, best_offset, interpolated = self._interpolated_match( + prev_retreat_binary, + curr_retreat_binary, + reference_binary, + movement_axis + ) + else: + similarity = self.compare_edges(reference_binary, curr_retreat_binary) + best_offset = 1.0 + interpolated = curr_retreat_binary + + similarity_history.append(similarity) + + # Update state for visualization + self._update_comparison_state( + is_tracking=True, + direction=direction.value, + ref_edge=from_edge, + target_edge=to_edge, + ref_image=reference_binary, + current_image=curr_retreat_binary, + interpolated_image=interpolated, + similarity=similarity, + best_offset=best_offset, + similarity_history=similarity_history + ) + + # Check for match using peak detection or simple threshold + match_found = False + + if self.config.use_peak_detection: + # Wait for similarity to peak and start declining + peak_detected, peak_idx = self._detect_peak(similarity_history) + if peak_detected: + peak_sim = similarity_history[peak_idx] + self.log(f"Peak detected! Similarity: {peak_sim:.3f} at frame {peak_idx}") + self._update_comparison_state( + is_tracking=True, + direction=direction.value, + ref_edge=from_edge, + target_edge=to_edge, + ref_image=reference_binary, + current_image=curr_retreat_binary, + interpolated_image=interpolated, + similarity=peak_sim, + shift_detected=True, + peak_detected=True, + best_offset=best_offset, + similarity_history=similarity_history + ) + match_found = True + else: + # Simple threshold crossing + if similarity >= self.config.similarity_threshold: + self.log(f"Threshold crossed! Similarity: {similarity:.3f}") + self._update_comparison_state( + is_tracking=True, + direction=direction.value, + ref_edge=from_edge, + target_edge=to_edge, + ref_image=reference_binary, + current_image=curr_retreat_binary, + interpolated_image=interpolated, + similarity=similarity, + shift_detected=True, + best_offset=best_offset, + similarity_history=similarity_history + ) + match_found = True + + if match_found: + return True + + # Log progress periodically + if check_count % 10 == 0: + max_hist = max(similarity_history) if similarity_history else 0 + self.log(f" Frame {check_count}: sim={similarity:.3f}, " + f"offset={best_offset:.2f}, max_seen={max_hist:.3f}") + + # Store for next iteration + prev_retreat_binary = curr_retreat_binary.copy() + + finally: + self.motion.stop_axis(axis) + time.sleep(1) + + # Clear state after a delay, but not if paused + if not self.paused: + threading.Timer(1.5, self._clear_comparison_state).start() + + return False + + # ========================================================================= + # Scanning Operations + # ========================================================================= + + def capture_tile(self) -> Tile: + """Capture a single tile at current position""" + frame = self.camera.capture_frame() + # tile_image = self.extract_tile(frame) + frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE) + tile_image = frame.copy() + + # Calculate focus score for metadata + try: + from vision import calculate_focus_score_sobel + focus_score = calculate_focus_score_sobel(frame) + except ImportError: + # Fallback: use Laplacian variance + gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) if len(frame.shape) == 3 else frame + focus_score = cv2.Laplacian(gray, cv2.CV_64F).var() + + tile = Tile( + image=tile_image, + row=self.current_row, + col=self.current_col, + x_pos=self.current_col, + y_pos=self.current_row, + focus_score=focus_score, + timestamp=time.time() + ) + + self.tiles.append(tile) + self.tiles_captured += 1 + + if self.on_tile_captured: + self.on_tile_captured(tile) + + self.log(f"Captured tile [{self.current_row}, {self.current_col}] focus={focus_score:.1f}") + + return tile + + def scan_row(self, direction: ScanDirection) -> List[Tile]: + """Scan a complete row in the given direction.""" + row_tiles = [] + tiles_in_row = 0 + + self.log(f"Starting row {self.current_row} scan ({direction.value})") + + while self.running and not self.paused: + # Capture tile + tile = self.capture_tile() + row_tiles.append(tile) + tiles_in_row += 1 + + # Update progress + if self.on_progress: + self.on_progress(self.tiles_captured, -1) + + # Safety limit + if tiles_in_row >= self.config.max_tiles_per_row: + self.log(f"Max tiles per row reached ({self.config.max_tiles_per_row})") + break + + # Move to next tile position + tile_shifted = self.move_until_tile_shifted(direction) + + if not tile_shifted: + self.log(f"Row {self.current_row} complete, {len(row_tiles)} tiles") + break + + # Update column position + if direction == ScanDirection.RIGHT: + self.current_col += 1 + else: + self.current_col -= 1 + + return row_tiles + + def step_to_next_row(self) -> bool: + """Move down to the next row.""" + if self.current_row >= self.config.max_rows: + self.log(f"Max rows reached ({self.config.max_rows})") + return False + + self.log("Stepping to next row...") + + tile_shifted = self.move_until_tile_shifted(ScanDirection.DOWN) + + if tile_shifted: + self.current_row += 1 + return True + else: + self.log("Bottom edge reached") + return False + + def full_scan(self): + """Execute a complete serpentine scan of the slide""" + self.log("Starting full scan") + self.running = True + self.tiles = [] + self.current_row = 0 + self.current_col = 0 + self.tiles_captured = 0 + + scan_direction = self.config.start_direction + + try: + while self.running: + # Handle pause + while self.paused and self.running: + time.sleep(0.1) + + if not self.running: + break + + # Scan current row + self.scan_row(scan_direction) + + if not self.running or self.paused: + break + + # Try to move to next row + if not self.step_to_next_row(): + break + + # Reverse direction (serpentine) + if scan_direction == ScanDirection.RIGHT: + scan_direction = ScanDirection.LEFT + else: + scan_direction = ScanDirection.RIGHT + + self.log(f"Scan complete! {self.tiles_captured} tiles captured") + + except Exception as e: + self.log(f"Scan error: {e}") + raise + finally: + self.running = False + self._clear_comparison_state() + + # ========================================================================= + # Control Methods + # ========================================================================= + + def start(self): + """Start scanning in background thread""" + if self.running: + self.log("Scan already running") + return False + + self._thread = threading.Thread(target=self.full_scan, daemon=True) + self._thread.start() + return True + + def stop(self): + """Stop scanning""" + self.running = False + self.paused = False + self.motion.stop_all() + self._clear_comparison_state() + self.log("Scan stopped") + + def pause(self): + """Pause scanning""" + self.paused = True + self.motion.stop_all() + self.log("Scan paused") + + def resume(self): + """Resume scanning""" + self.paused = False + self.log("Scan resumed") + + # ========================================================================= + # Test Methods + # ========================================================================= + + def test_edge_comparison(self) -> dict: + """ + Test edge detection and comparison on current frame. + Returns dict with results for debugging. + """ + frame = self.camera.capture_frame() + + results = {} + for edge in ['left', 'right', 'top', 'bottom']: + region = self.get_edge_region(frame, edge) + binary = self.prepare_edge_image(region) + white_pixels = np.sum(binary == 255) + total_pixels = binary.size + results[edge] = { + 'shape': binary.shape, + 'white_ratio': white_pixels / total_pixels, + 'binary': binary + } + + # Test comparison for X movement (raw top ↔ bottom) + results['x_axis_sim'] = self.compare_edges( + results['top']['binary'], + results['bottom']['binary'] + ) + + # Test comparison for Y movement (raw left ↔ right) + results['y_axis_sim'] = self.compare_edges( + results['left']['binary'], + results['right']['binary'] + ) + + return results + + def test_interpolation(self, num_frames: int = 5) -> dict: + """ + Test interpolation matching by capturing multiple frames. + Useful for debugging and tuning interpolation parameters. + """ + self.log(f"Testing interpolation with {num_frames} frames...") + + frames = [] + for i in range(num_frames): + frame = self.camera.capture_frame() + frames.append(frame) + self.log(f" Captured frame {i+1}/{num_frames}") + time.sleep(0.5) + + # Use first frame's leading edge as reference + reference_region = self.get_edge_region(frames[0], 'top') + reference_binary = self.prepare_edge_image(reference_region) + + results = { + 'reference_shape': reference_binary.shape, + 'frames': [] + } + + prev_binary = None + for i, frame in enumerate(frames): + retreat_region = self.get_edge_region(frame, 'bottom') + curr_binary = self.prepare_edge_image(retreat_region) + + # Direct comparison + direct_sim = self.compare_edges(reference_binary, curr_binary) + + # Interpolated comparison (if we have previous frame) + if prev_binary is not None: + interp_sim, best_offset, _ = self._interpolated_match( + prev_binary, curr_binary, reference_binary, 'horizontal' + ) + else: + interp_sim = direct_sim + best_offset = 1.0 + + results['frames'].append({ + 'frame_idx': i, + 'direct_similarity': direct_sim, + 'interpolated_similarity': interp_sim, + 'best_offset': best_offset + }) + + self.log(f" Frame {i}: direct={direct_sim:.3f}, interp={interp_sim:.3f}, offset={best_offset:.2f}") + prev_binary = curr_binary.copy() + + return results + + # ========================================================================= + # Mosaic Building + # ========================================================================= + + def build_mosaic(self, scale: float = 0.25) -> np.ndarray: + """Build a mosaic image from captured tiles.""" + if not self.tiles: + return None + + max_row = max(t.row for t in self.tiles) + max_col = max(t.col for t in self.tiles) + min_col = min(t.col for t in self.tiles) + + tile_h, tile_w = self.tiles[0].image.shape[:2] + scaled_h = int(tile_h * scale) + scaled_w = int(tile_w * scale) + + num_rows = max_row + 1 + num_cols = max_col - min_col + 1 + + mosaic = np.zeros((num_rows * scaled_h, num_cols * scaled_w, 3), dtype=np.uint8) + + for tile in self.tiles: + row = tile.row + col = tile.col - min_col + + scaled_tile = cv2.resize(tile.image, (scaled_w, scaled_h)) + + y = row * scaled_h + x = col * scaled_w + mosaic[y:y+scaled_h, x:x+scaled_w] = scaled_tile + + return mosaic + + def get_mosaic_preview(self, max_size: int = 800) -> np.ndarray: + """Get a preview-sized mosaic.""" + mosaic = self.build_mosaic(scale=1.0) + if mosaic is None: + return None + + h, w = mosaic.shape[:2] + if max(h, w) > max_size: + scale = max_size / max(h, w) + mosaic = cv2.resize(mosaic, (int(w * scale), int(h * scale))) + + return mosaic \ No newline at end of file diff --git a/src/stitching_scanner.py b/src/stitching_scanner.py new file mode 100644 index 0000000..d47f9a3 --- /dev/null +++ b/src/stitching_scanner.py @@ -0,0 +1,734 @@ +""" +Stitching Scanner v2 - Fixed displacement tracking + +Key fix: Track displacement since last APPEND, not just cumulative. +The strip width must match actual movement since we last added to the mosaic. +""" + +import cv2 +import numpy as np +import time +import threading +from dataclasses import dataclass, field +from typing import List, Optional, Callable, Tuple +from enum import Enum + + +class ScanDirection(Enum): + """Scan direction constants""" + RIGHT = 'right' # X+ (E command) + LEFT = 'left' # X- (W command) + DOWN = 'down' # Y- (N command) + UP = 'up' # Y+ (S command) + + +@dataclass +class StitchConfig: + """Stitching scanner configuration""" + # Displacement threshold (percentage of frame size) + displacement_threshold: float = 0.10 # 10% of frame dimension + + # Movement timing + movement_interval: float = 0.001 # Seconds of motor on time + frame_interval: float = 0.25 # Seconds between frame captures (settle time) + settle_time: float = 0.5 # Seconds to wait after stopping + max_scan_time: float = 2400.0 # Safety timeout (5 minutes) + + # Scan pattern + rows: int = 3 + row_overlap: float = 0.15 + + # Speed setting for scanning + scan_speed_index: int = 3 + + # Focus + autofocus_every_row: bool = True + + # Memory management + max_mosaic_width: int = 11000 + max_mosaic_height: int = 11000 +# 11000, 24500, 450000 + +@dataclass +class StitchState: + """Current state for visualization""" + is_scanning: bool = False + direction: str = '' + + # Displacement tracking + cumulative_x: float = 0.0 + cumulative_y: float = 0.0 + last_displacement: Tuple[float, float] = (0.0, 0.0) + + # Progress + current_row: int = 0 + total_rows: int = 0 + + # Mosaic size + mosaic_width: int = 0 + mosaic_height: int = 0 + + # Debug + frame_count: int = 0 + append_count: int = 0 + + +class StitchingScanner: + """ + Slide scanner using continuous stitching with correct displacement tracking. + + Key insight: We must track displacement since the LAST APPEND, and the + strip we append must exactly match that displacement. + """ + + def __init__(self, camera, motion_controller, autofocus_controller=None, + config: StitchConfig = None, + on_log: Callable[[str], None] = None, + on_progress: Callable[[int, int], None] = None, + on_mosaic_updated: Callable[[], None] = None): + self.camera = camera + self.motion = motion_controller + self.autofocus = autofocus_controller + self.config = config or StitchConfig() + + # Callbacks + self.on_log = on_log + self.on_progress = on_progress + self.on_mosaic_updated = on_mosaic_updated + + # State + self.running = False + self.paused = False + self.state = StitchState() + self._state_lock = threading.Lock() + + # Mosaic data + self.mosaic: Optional[np.ndarray] = None + self._mosaic_lock = threading.Lock() + + # Frame tracking - KEY CHANGE: separate reference for displacement calc vs append + self._prev_frame: Optional[np.ndarray] = None # For frame-to-frame displacement + self._append_ref_frame: Optional[np.ndarray] = None # Reference from last append + self._displacement_since_append_x: float = 0.0 # Accumulated since last append + self._displacement_since_append_y: float = 0.0 + + # Thread + self._thread: Optional[threading.Thread] = None + + def log(self, message: str): + """Log a message""" + if self.on_log: + self.on_log(f"[Stitch] {message}") + print(f"[Stitch] {message}") + + # ========================================================================= + # Displacement Detection + # ========================================================================= + + def _to_grayscale(self, frame: np.ndarray) -> np.ndarray: + """Convert frame to grayscale""" + if len(frame.shape) == 3: + return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) + return frame + + def _detect_displacement(self, prev_frame: np.ndarray, + curr_frame: np.ndarray) -> Tuple[float, float]: + """ + Detect displacement between two frames using phase correlation. + Returns (dx, dy) in pixels. + """ + prev_gray = self._to_grayscale(prev_frame) + curr_gray = self._to_grayscale(curr_frame) + + if prev_gray.shape != curr_gray.shape: + return (0.0, 0.0) + + prev_f = prev_gray.astype(np.float32) + curr_f = curr_gray.astype(np.float32) + + # Apply window function to reduce edge effects + h, w = prev_gray.shape + window = cv2.createHanningWindow((w, h), cv2.CV_32F) + prev_f = prev_f * window + curr_f = curr_f * window + + shift, response = cv2.phaseCorrelate(prev_f, curr_f) + dx, dy = shift + + return (dx, dy) + + def _detect_displacement_robust(self, prev_frame: np.ndarray, + curr_frame: np.ndarray) -> Tuple[float, float]: + """Displacement detection with sanity checks""" + dx, dy = self._detect_displacement(prev_frame, curr_frame) + + h, w = prev_frame.shape[:2] + max_displacement = max(w, h) * 0.5 + + if abs(dx) > max_displacement or abs(dy) > max_displacement: + self.log(f"Warning: Large displacement ({dx:.1f}, {dy:.1f}), ignoring") + return (0.0, 0.0) + + return (dx, dy) + + # ========================================================================= + # Mosaic Building - FIXED VERSION + # ========================================================================= + + def _init_mosaic(self, frame: np.ndarray): + """Initialize mosaic with first frame""" + with self._mosaic_lock: + self.mosaic = frame.copy() + + # Set reference frames + self._prev_frame = frame.copy() + self._append_ref_frame = frame.copy() + self._displacement_since_append_x = 0.0 + self._displacement_since_append_y = 0.0 + + with self._state_lock: + h, w = frame.shape[:2] + self.state.mosaic_width = w + self.state.mosaic_height = h + self.state.frame_count = 1 + self.state.append_count = 0 + + self.log(f"Initialized mosaic: {frame.shape[1]}x{frame.shape[0]}") + + def _blend_strips_horizontal(self, base: np.ndarray, strip: np.ndarray, + blend_width: int, append_right: bool) -> np.ndarray: + """Blend strip onto base with gradient at seam to hide discontinuities.""" + if blend_width <= 0 or blend_width >= strip.shape[1]: + if append_right: + return np.hstack([base, strip]) + else: + return np.hstack([strip, base]) + + h_base, w_base = base.shape[:2] + h_strip, w_strip = strip.shape[:2] + + if h_strip != h_base: + # Height mismatch - can't blend properly + if append_right: + return np.hstack([base, strip]) + return np.hstack([strip, base]) + + blend_w = min(blend_width, w_strip, w_base) + + if append_right: + # base | blend_zone | rest_of_strip + result_width = w_base + w_strip - blend_w + result = np.zeros((h_base, result_width, 3), dtype=np.uint8) + + # Copy base + result[:, :w_base] = base + + # Create gradient: 1->0 for base weight + alpha = np.linspace(1, 0, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis] + + base_overlap = base[:, -blend_w:].astype(np.float32) + strip_overlap = strip[:, :blend_w].astype(np.float32) + blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8) + + result[:, w_base - blend_w:w_base] = blended + result[:, w_base:] = strip[:, blend_w:] + + return result + else: + # rest_of_strip | blend_zone | base + result_width = w_base + w_strip - blend_w + result = np.zeros((h_base, result_width, 3), dtype=np.uint8) + + result[:, :w_strip] = strip + + alpha = np.linspace(0, 1, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis] + + strip_overlap = strip[:, -blend_w:].astype(np.float32) + base_overlap = base[:, :blend_w].astype(np.float32) + blended = (strip_overlap * (1 - alpha) + base_overlap * alpha).astype(np.uint8) + + result[:, w_strip - blend_w:w_strip] = blended + result[:, w_strip:] = base[:, blend_w:] + + return result + + def _append_to_mosaic_fixed(self, frame: np.ndarray, direction: ScanDirection): + """ + FIXED: Append with blending and fractional pixel preservation. + + Key improvements: + 1. Gradient blending at seams to hide color discontinuities + 2. Preserve fractional pixel remainder to prevent cumulative drift + 3. Small safety margin for alignment tolerance + """ + BLEND_WIDTH = 10 # Pixels to blend at seam + SAFETY_MARGIN = 2 # Extra pixels as tolerance + + with self._mosaic_lock: + if self.mosaic is None: + return + + h, w = frame.shape[:2] + mh, mw = self.mosaic.shape[:2] + + dx = abs(self._displacement_since_append_x) + dy = abs(self._displacement_since_append_y) + + if direction in [ScanDirection.RIGHT, ScanDirection.LEFT]: + # Round and add safety margin + append_width = round(dx) + SAFETY_MARGIN + append_width = min(append_width, w - BLEND_WIDTH - 5) + + if append_width < 1: + return + + # Calculate fractional remainder to preserve + pixels_consumed = append_width - SAFETY_MARGIN + fractional_remainder = dx - pixels_consumed + + if direction == ScanDirection.RIGHT: + # Grab strip with extra for blending + strip_start = max(0, w - append_width - BLEND_WIDTH) + new_strip = frame[:, strip_start:] + self.mosaic = self._blend_strips_horizontal( + self.mosaic, new_strip, BLEND_WIDTH, append_right=True) + else: + strip_end = min(w, append_width + BLEND_WIDTH) + new_strip = frame[:, :strip_end] + self.mosaic = self._blend_strips_horizontal( + self.mosaic, new_strip, BLEND_WIDTH, append_right=False) + + # KEEP fractional remainder instead of resetting to 0! + self._displacement_since_append_x = fractional_remainder + self._displacement_since_append_y = 0.0 + + elif direction in [ScanDirection.DOWN, ScanDirection.UP]: + append_height = round(dy) + SAFETY_MARGIN + append_height = min(append_height, h - BLEND_WIDTH - 5) + + if append_height < 1: + return + + pixels_consumed = append_height - SAFETY_MARGIN + fractional_remainder = dy - pixels_consumed + + if direction == ScanDirection.DOWN: + strip_start = max(0, h - append_height - BLEND_WIDTH) + new_strip = frame[strip_start:, :] + + # Match widths + if new_strip.shape[1] > mw: + new_strip = new_strip[:, :mw] + elif new_strip.shape[1] < mw: + pad = np.zeros((new_strip.shape[0], mw - new_strip.shape[1], 3), dtype=np.uint8) + new_strip = np.hstack([new_strip, pad]) + + # Vertical blend + blend_h = min(BLEND_WIDTH, new_strip.shape[0], mh) + alpha = np.linspace(1, 0, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis] + + base_overlap = self.mosaic[-blend_h:].astype(np.float32) + strip_overlap = new_strip[:blend_h].astype(np.float32) + blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8) + + result_h = mh + new_strip.shape[0] - blend_h + result = np.zeros((result_h, mw, 3), dtype=np.uint8) + result[:mh - blend_h] = self.mosaic[:-blend_h] + result[mh - blend_h:mh] = blended + result[mh:] = new_strip[blend_h:] + self.mosaic = result + else: + strip_end = min(h, append_height + BLEND_WIDTH) + new_strip = frame[:strip_end, :] + + if new_strip.shape[1] > mw: + new_strip = new_strip[:, :mw] + elif new_strip.shape[1] < mw: + pad = np.zeros((new_strip.shape[0], mw - new_strip.shape[1], 3), dtype=np.uint8) + new_strip = np.hstack([new_strip, pad]) + + self.mosaic = np.vstack([new_strip, self.mosaic]) + + self._displacement_since_append_x = 0.0 + self._displacement_since_append_y = fractional_remainder + + new_mh, new_mw = self.mosaic.shape[:2] + + # Update state + with self._state_lock: + self.state.mosaic_width = new_mw + self.state.mosaic_height = new_mh + self.state.append_count += 1 + + # Update reference frame (fractional remainder already set above - don't reset!) + self._append_ref_frame = frame.copy() + + if self.on_mosaic_updated: + self.on_mosaic_updated() + + def _start_new_row(self, frame: np.ndarray, direction: ScanDirection): + """Start a new row in the mosaic""" + with self._mosaic_lock: + if self.mosaic is None: + self._init_mosaic(frame) + return + + h, w = frame.shape[:2] + mh, mw = self.mosaic.shape[:2] + + # Calculate overlap + overlap_pixels = int(h * self.config.row_overlap) + append_height = h - overlap_pixels + + if direction == ScanDirection.DOWN: + new_strip = frame[overlap_pixels:, :] + + if new_strip.shape[1] < mw: + pad = np.zeros((new_strip.shape[0], mw - new_strip.shape[1], 3), dtype=np.uint8) + new_strip = np.hstack([new_strip, pad]) + elif new_strip.shape[1] > mw: + new_strip = new_strip[:, :mw] + + self.mosaic = np.vstack([self.mosaic, new_strip]) + else: + new_strip = frame[:append_height, :] + + if new_strip.shape[1] < mw: + pad = np.zeros((new_strip.shape[0], mw - new_strip.shape[1], 3), dtype=np.uint8) + new_strip = np.hstack([new_strip, pad]) + elif new_strip.shape[1] > mw: + new_strip = new_strip[:, :mw] + + self.mosaic = np.vstack([new_strip, self.mosaic]) + + # Reset all tracking for new row + self._prev_frame = frame.copy() + self._append_ref_frame = frame.copy() + self._displacement_since_append_x = 0.0 + self._displacement_since_append_y = 0.0 + + with self._state_lock: + self.state.mosaic_height = self.mosaic.shape[0] + self.state.mosaic_width = self.mosaic.shape[1] + + self.log(f"New row started, mosaic: {self.mosaic.shape[1]}x{self.mosaic.shape[0]}") + + # ========================================================================= + # Scan Control + # ========================================================================= + + def start(self) -> bool: + """Start the stitching scan""" + if self.running: + self.log("Already running") + return False + + self.running = True + self.paused = False + + with self._state_lock: + self.state = StitchState() + self.state.is_scanning = True + self.state.total_rows = self.config.rows + + with self._mosaic_lock: + self.mosaic = None + + self._prev_frame = None + self._append_ref_frame = None + self._displacement_since_append_x = 0.0 + self._displacement_since_append_y = 0.0 + + self._thread = threading.Thread(target=self._scan_loop, daemon=True) + self._thread.start() + + self.log("Stitching scan started") + return True + + def stop(self): + """Stop the scan""" + self.running = False + self.paused = False + self.motion.stop_all() + + with self._state_lock: + self.state.is_scanning = False + + self.log("Scan stopped") + + def pause(self): + """Pause the scan""" + if self.running and not self.paused: + self.paused = True + self.motion.stop_all() + self.log("Scan paused") + + def resume(self): + """Resume the scan""" + if self.running and self.paused: + self.paused = False + self.log("Scan resumed") + + # ========================================================================= + # Main Scan Loop + # ========================================================================= + + def _scan_loop(self): + """Main scanning loop""" + try: + self.log("Starting scan loop") + + self.motion.set_speed(self.config.scan_speed_index) + time.sleep(0.1) + + frame = self._capture_frame() + self._init_mosaic(frame) + + for row in range(self.config.rows): + if not self.running: + break + + with self._state_lock: + self.state.current_row = row + + self.log(f"=== Row {row + 1}/{self.config.rows} ===") + + # Serpentine pattern + if row % 2 == 0: + h_direction = ScanDirection.RIGHT + else: + h_direction = ScanDirection.LEFT + + self._scan_horizontal(h_direction) + + if not self.running: + break + + if row < self.config.rows - 1: + self._move_to_next_row() + + self.log("Scan complete!") + + except Exception as e: + self.log(f"Scan error: {e}") + import traceback + traceback.print_exc() + finally: + self.running = False + self.motion.stop_all() + with self._state_lock: + self.state.is_scanning = False + + def _scan_horizontal(self, direction: ScanDirection): + """Scan horizontally with fixed displacement tracking""" + self.log(f"Scanning {direction.value}...") + + with self._state_lock: + self.state.direction = direction.value + + frame = self._capture_frame() + h, w = frame.shape[:2] + threshold_pixels = w * self.config.displacement_threshold + + # Initialize tracking + self._prev_frame = frame.copy() + self._append_ref_frame = frame.copy() + self._displacement_since_append_x = 0.0 + self._displacement_since_append_y = 0.0 + + start_time = time.time() + no_movement_count = 0 + max_no_movement = 50 + + while self.running and not self.paused: + if time.time() - start_time > self.config.max_scan_time: + self.log("Scan timeout") + break + + # Pulse the motor + if direction == ScanDirection.RIGHT: + self.motion.send_command('E') + else: + self.motion.send_command('W') + + time.sleep(self.config.movement_interval) + + if direction == ScanDirection.RIGHT: + self.motion.send_command('e') + else: + self.motion.send_command('w') + + # Wait for settle + time.sleep(self.config.frame_interval) + + # Capture and measure + curr_frame = self._capture_frame() + dx, dy = self._detect_displacement_robust(self._prev_frame, curr_frame) + + # Accumulate displacement SINCE LAST APPEND + self._displacement_since_append_x += dx + self._displacement_since_append_y += dy + + with self._state_lock: + self.state.cumulative_x = self._displacement_since_append_x + self.state.cumulative_y = self._displacement_since_append_y + self.state.last_displacement = (dx, dy) + self.state.frame_count += 1 + + # Check for no movement + if abs(dx) < 1.0 and abs(dy) < 1.0: + no_movement_count += 1 + if no_movement_count >= max_no_movement: + self.log(f"Edge detected (no movement for {no_movement_count} frames)") + break + else: + no_movement_count = 0 + + # Check threshold and append + if abs(self._displacement_since_append_x) >= threshold_pixels: + self._append_to_mosaic_fixed(curr_frame, direction) + self.log(f"Appended {abs(self._displacement_since_append_x):.1f}px strip, " + f"mosaic: {self.state.mosaic_width}x{self.state.mosaic_height}") + + # Update prev_frame for next displacement calculation + self._prev_frame = curr_frame.copy() + + if self.on_progress: + self.on_progress(self.state.append_count, 0) + + # Stop + if direction == ScanDirection.RIGHT: + self.motion.send_command('e') + else: + self.motion.send_command('w') + + time.sleep(self.config.settle_time) + + def _move_to_next_row(self): + """Move down to next row""" + self.log("Moving to next row...") + + frame = self._capture_frame() + h, w = frame.shape[:2] + move_distance = h * (1 - self.config.row_overlap) + + with self._state_lock: + self.state.direction = 'down' + + self.motion.send_command('N') + + self._prev_frame = frame.copy() + cumulative_y = 0.0 + + while self.running: + time.sleep(self.config.frame_interval) + + curr_frame = self._capture_frame() + dx, dy = self._detect_displacement_robust(self._prev_frame, curr_frame) + + cumulative_y += dy + self._prev_frame = curr_frame.copy() + + with self._state_lock: + self.state.cumulative_y = cumulative_y + + if abs(cumulative_y) >= move_distance: + break + + if abs(cumulative_y) < 5 and self.state.frame_count > 50: + self.log("Warning: Minimal Y movement") + break + + self.motion.send_command('n') + time.sleep(self.config.settle_time) + + frame = self._capture_frame() + self._start_new_row(frame, ScanDirection.DOWN) + + def _capture_frame(self) -> np.ndarray: + """Capture and rotate frame""" + frame = self.camera.capture_frame() + frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE) + return frame + + # ========================================================================= + # Getters + # ========================================================================= + + def get_state(self) -> StitchState: + """Get current scan state""" + with self._state_lock: + return StitchState( + is_scanning=self.state.is_scanning, + direction=self.state.direction, + cumulative_x=self.state.cumulative_x, + cumulative_y=self.state.cumulative_y, + last_displacement=self.state.last_displacement, + current_row=self.state.current_row, + total_rows=self.state.total_rows, + mosaic_width=self.state.mosaic_width, + mosaic_height=self.state.mosaic_height, + frame_count=self.state.frame_count, + append_count=self.state.append_count + ) + + def get_mosaic(self) -> Optional[np.ndarray]: + """Get current mosaic (full resolution)""" + with self._mosaic_lock: + if self.mosaic is not None: + return self.mosaic.copy() + return None + + def get_mosaic_preview(self, max_size: int = 600) -> Optional[np.ndarray]: + """Get scaled mosaic for preview""" + with self._mosaic_lock: + if self.mosaic is None: + return None + + h, w = self.mosaic.shape[:2] + scale = min(max_size / w, max_size / h, 1.0) + + if scale < 1.0: + new_w = int(w * scale) + new_h = int(h * scale) + return cv2.resize(self.mosaic, (new_w, new_h)) + + return self.mosaic.copy() + + def save_mosaic(self, filepath: str) -> bool: + """Save mosaic to file""" + with self._mosaic_lock: + if self.mosaic is None: + return False + + cv2.imwrite(filepath, self.mosaic) + self.log(f"Saved mosaic to {filepath}") + return True + + # ========================================================================= + # Testing + # ========================================================================= + + def test_displacement(self, num_frames: int = 10) -> dict: + """Test displacement detection""" + results = { + 'frames': [], + 'total_dx': 0.0, + 'total_dy': 0.0 + } + + prev_frame = self._capture_frame() + + for i in range(num_frames): + time.sleep(0.1) + curr_frame = self._capture_frame() + + dx, dy = self._detect_displacement(prev_frame, curr_frame) + + results['frames'].append({'frame': i, 'dx': dx, 'dy': dy}) + results['total_dx'] += dx + results['total_dy'] += dy + + prev_frame = curr_frame + + return results \ No newline at end of file