From 2acfccf6e1cfb317c9a207363e427519f13f9021 Mon Sep 17 00:00:00 2001 From: Shaiv Kamat Date: Tue, 30 Dec 2025 21:15:04 -0800 Subject: [PATCH] Sweift tracking --- src/autofocus.py | 318 +++++++++----- src/gui.py | 623 +++++++++++++++++++-------- src/motion_controller.py | 4 +- src/scanner.py | 883 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 1546 insertions(+), 282 deletions(-) diff --git a/src/autofocus.py b/src/autofocus.py index 89ccfff..20499b3 100644 --- a/src/autofocus.py +++ b/src/autofocus.py @@ -4,16 +4,31 @@ from vision import calculate_focus_score_sobel class AutofocusController: - """Manages autofocus operations""" + """ + Manages autofocus operations using visual feedback. + + Key design principle: Never trust step counts due to gear slippage. + Use focus score as the authoritative position feedback. + """ # Default timing settings DEFAULT_SETTINGS = { + # Sweep settings 'sweep_move_time': 0.5, - 'sweep_settle_time': 0.05, + 'sweep_settle_time': 0.01, 'sweep_samples': 10, 'sweep_steps': 30, - 'fine_move_time': 0.15, - 'fine_settle_time': 0.1, + + # Navigation settings (fast approach to peak) + 'nav_move_time': 0.3, # Quick bursts + 'nav_check_samples': 3, # Fewer samples for speed + 'nav_peak_threshold': 0.80, # Consider "near peak" at 80% of target + 'nav_decline_threshold': 1.00, # Detect peak passage at 8% drop + 'nav_max_moves': 100, # Safety limit + + # Fine tuning settings + 'fine_move_time': 0.30, + 'fine_settle_time': 0.01, 'fine_samples': 10, 'fine_max_no_improvement': 5, } @@ -74,6 +89,11 @@ class AutofocusController: return sum(scores) / len(scores) if scores else 0 + def get_quick_focus(self, samples=3): + """Quick focus reading with fewer samples for navigation""" + scores = [self.get_focus_score() for _ in range(samples)] + return sum(scores) / len(scores) + # === Autofocus Control === def start(self, speed_range=(1, 5), coarse=False, fine=False): @@ -111,31 +131,33 @@ class AutofocusController: def _autofocus_routine(self, speed_range): """ - Autofocus using sweep search + hill climbing. - Phase 1: Sweep across range to find approximate peak - Phase 2: Fine tune from best position found + Autofocus using sweep search + visual navigation + hill climbing. + + Phase 1: Sweep to map focus curve and find approximate peak + Phase 2: Navigate to peak using visual feedback + Phase 3: Fine tune with hill climbing """ self.log(f"Starting autofocus (speed range: {speed_range})") min_speed_idx, max_speed_idx = speed_range - s = self.settings # Shorthand + s = self.settings try: - # Phase 1: Sweep search - best_step, best_score = self._sweep_search( + # Phase 1: Sweep search - map the focus curve + best_score, peak_direction = self._sweep_search( min_speed_idx, max_speed_idx, s ) if not self.running: return - # Move to best position found - self._move_to_best_position(best_step, s) + # Phase 2: Navigate to peak + self._navigate_to_peak(best_score, peak_direction, s) if not self.running: return - # Phase 2: Fine tuning + # Phase 3: Fine tuning with hill climbing final_score = self._fine_tune(min_speed_idx, s) self.log(f"Autofocus complete. Final: {final_score:.1f}") @@ -146,30 +168,35 @@ class AutofocusController: self.running = False def _sweep_search(self, min_speed_idx, max_speed_idx, s): - """Phase 1: Sweep to find approximate best position""" + """ + Phase 1: Sweep to map focus curve. + + Returns: + best_score: The highest focus score observed + peak_direction: 'up' or 'down' - which direction has the peak + """ self.log("Phase 1: Sweep search") - # Use medium speed for sweep sweep_speed = (min_speed_idx + max_speed_idx) // 2 self.motion.set_speed(sweep_speed) time.sleep(0.1) - # Record starting position + # Record starting position score time.sleep(s['sweep_settle_time']) start_score = self.get_averaged_focus(samples=s['sweep_samples']) self.update_focus_display(start_score) - - sweep_data = [(0, start_score)] self.log(f"Start position: {start_score:.1f}") - if not self.running: - return 0, start_score + sweep_data = {'start': start_score, 'down': [], 'up': []} - # Sweep DOWN first (away from slide to avoid contact) + if not self.running: + return start_score, 'up' + + # Sweep DOWN first (away from slide) self.log(f"Sweeping down {s['sweep_steps']} steps...") for i in range(1, s['sweep_steps'] + 1): if not self.running: - return 0, start_score + return start_score, 'up' self.motion.move_z_down() time.sleep(s['sweep_move_time']) @@ -177,35 +204,24 @@ class AutofocusController: time.sleep(s['sweep_settle_time']) score = self.get_averaged_focus(samples=s['sweep_samples']) - sweep_data.append((-i, score)) + sweep_data['down'].append(score) self.update_focus_display(score) if i % 5 == 0: - self.log(f" Step -{i}: {score:.1f}") + self.log(f" Down {i}: {score:.1f}") + + # Remember score at bottom + bottom_score = sweep_data['down'][-1] if sweep_data['down'] else start_score if not self.running: - return 0, start_score + return start_score, 'up' - # Return to start - self.log("Returning to start...") - for _ in range(s['sweep_steps']): + # Now sweep UP past start to the other side + # We'll go UP for 2x sweep_steps (to cover both halves) + self.log(f"Sweeping up {s['sweep_steps'] * 2} steps...") + for i in range(1, s['sweep_steps'] * 2 + 1): if not self.running: - return 0, start_score - self.motion.move_z_up() - time.sleep(s['sweep_move_time']) - self.motion.stop_z() - time.sleep(0.1) - - time.sleep(s['sweep_settle_time']) - - if not self.running: - return 0, start_score - - # Sweep UP - self.log(f"Sweeping up {s['sweep_steps']} steps...") - for i in range(1, s['sweep_steps'] + 1): - if not self.running: - return 0, start_score + return start_score, 'down' self.motion.move_z_up() time.sleep(s['sweep_move_time']) @@ -213,73 +229,145 @@ class AutofocusController: time.sleep(s['sweep_settle_time']) score = self.get_averaged_focus(samples=s['sweep_samples']) - sweep_data.append((i, score)) + sweep_data['up'].append(score) self.update_focus_display(score) if i % 5 == 0: - self.log(f" Step +{i}: {score:.1f}") + self.log(f" Up {i}: {score:.1f}") - # Find best position - best_step, best_score = max(sweep_data, key=lambda x: x[1]) - self.log(f"Best found at step {best_step}: {best_score:.1f}") + # Analyze sweep data to find peak + all_scores = sweep_data['down'] + [start_score] + sweep_data['up'] + best_score = max(all_scores) - # Log sweep curve - sorted_data = sorted(sweep_data, key=lambda x: x[0]) - curve_str = " ".join([f"{d[1]:.0f}" for d in sorted_data]) - self.log(f"Sweep curve: {curve_str}") + down_best = max(sweep_data['down']) if sweep_data['down'] else 0 + up_best = max(sweep_data['up']) if sweep_data['up'] else 0 - return best_step, best_score - - def _move_to_best_position(self, best_step, s): - """Move from current position (+SWEEP_STEPS) to best_step""" - steps_to_move = s['sweep_steps'] - best_step - - if steps_to_move > 0: - self.log(f"Moving down {steps_to_move} steps to best position") - move_func = self.motion.move_z_down + # Determine which direction has the peak + # We're currently at the TOP of the sweep (after going up) + # So to reach the peak, we need to go DOWN if peak was in down region or middle + if down_best > up_best and down_best > start_score: + peak_direction = 'down' + self.log(f"Peak in DOWN region: {down_best:.1f}") + elif up_best >= down_best and up_best > start_score: + # Peak is in up region, but we need to check where in the up region + # If peak is in first half of up (returning toward start), go down + # If peak is in second half of up (past start), we might already be close + up_scores = sweep_data['up'] + mid_point = len(up_scores) // 2 + first_half_best = max(up_scores[:mid_point]) if mid_point > 0 else 0 + second_half_best = max(up_scores[mid_point:]) if mid_point < len(up_scores) else 0 + + if second_half_best >= first_half_best: + # Peak is near current position (top), go down slightly + peak_direction = 'down' + else: + # Peak is further down + peak_direction = 'down' + self.log(f"Peak in UP region: {up_best:.1f}") else: - self.log(f"Moving up {abs(steps_to_move)} steps to best position") - move_func = self.motion.move_z_up - steps_to_move = abs(steps_to_move) + peak_direction = 'down' + self.log(f"Peak near start: {start_score:.1f}") - for _ in range(steps_to_move): + # Log sweep curve for debugging + self.log(f"Best score found: {best_score:.1f}") + self.log(f"Navigate direction: {peak_direction}") + + return best_score, peak_direction + + def _navigate_to_peak(self, target_score, direction, s): + """ + Phase 2: Navigate toward peak using camera + """ + self.log(f"Phase 2: Navigate to peak (target: {target_score:.1f}, dir: {direction})") + + move_func = self.motion.move_z_down if direction == 'down' else self.motion.move_z_up + + # Use medium-fast speed for approach + self.motion.set_speed(4) + time.sleep(0.1) + + peak_threshold = target_score * s['nav_peak_threshold'] + decline_threshold = s['nav_decline_threshold'] + self.log(f"(peak_threshold: {peak_threshold:.1f}, decline_threshold: {decline_threshold})") + + prev_score = self.get_quick_focus(s['nav_check_samples']) + max_observed = prev_score + moves_in_peak_region = 0 + declining_streak = 0 + + for move_count in range(s['nav_max_moves']): if not self.running: return + + # Quick movement burst move_func() - time.sleep(s['sweep_move_time']) + time.sleep(s['nav_move_time']) self.motion.stop_z() - time.sleep(0.1) + time.sleep(0.05) # Brief settle + + # Check focus + current_score = self.get_quick_focus(s['nav_check_samples']) + self.log(f" Current: {current_score:.1f} (max was {max_observed:.1f}) (Declining {declining_streak:.1f}) (Decline Thres {max_observed * decline_threshold:.1f})") + self.log(f" Current: {current_score:.1f} >= peak thresh({peak_threshold:.1f}) ") + self.log(f" Current: {current_score:.1f} <= max_observed * decline_threshold:({max_observed * decline_threshold:.1f}) ") + self.update_focus_display(current_score) + + # Track maximum observed + if current_score > max_observed: + max_observed = current_score + declining_streak = 0 + self.log("Setting Max") + elif max_observed > 0.8 * target_score: + declining_streak += 1 + + if declining_streak >=2: + self.log(f" Passed peak after {move_count} moves, stopping") + break + + # # Are we in the peak region? + # if current_score >= peak_threshold: + # moves_in_peak_region += 1 + + # # Check if we're past the peak (score declining from max) + # if current_score < max_observed * decline_threshold: + # declining_streak += 1 + # self.log(f" Declining: {current_score:.1f} (max was {max_observed:.1f})") + + # if declining_streak >= 1: + # self.log(f" Passed peak after {move_count} moves, stopping") + # break + # else: + # declining_streak = 0 + # if moves_in_peak_region % 3 == 0: + # self.log(f" In peak region: {current_score:.1f}") + + prev_score = current_score - time.sleep(s['sweep_settle_time']) - current_score = self.get_averaged_focus(samples=s['sweep_samples']) - self.log(f"At best position: {current_score:.1f}") + final_score = self.get_averaged_focus(samples=s['sweep_samples']) + self.log(f"Navigation complete at: {final_score:.1f}") def _fine_tune(self, min_speed_idx, s): - """Phase 2: Fine hill-climbing from best position""" - self.log("Phase 2: Fine tuning") + """ + Phase 3: Fine hill-climbing from current position. + + Already near the peak from navigation, now do precise adjustments. + """ + self.log("Phase 3: Fine tuning") self.motion.set_speed(min_speed_idx) time.sleep(0.1) best_score = self.get_averaged_focus(samples=s['fine_samples']) + self.log(f"Starting fine tune at: {best_score:.1f}") - # Determine fine direction by testing both + # Try both directions to find improvement fine_direction = self._determine_fine_direction(best_score, s) if not self.running: return best_score - # Fine search - best_score, best_position_offset = self._fine_search( - fine_direction, best_score, s - ) - - if not self.running: - return best_score - - # Return to best position - if best_position_offset > 0: - self._return_to_best(fine_direction, best_position_offset, s) + # Search in best direction + best_score = self._fine_search(fine_direction, best_score, s) # Final reading time.sleep(s['fine_settle_time']) @@ -288,9 +376,9 @@ class AutofocusController: return final_score - def _determine_fine_direction(self, best_score, s): + def _determine_fine_direction(self, current_score, s): """Test both directions to find which improves focus""" - # Try DOWN first + # Try DOWN self.motion.move_z_down() time.sleep(s['fine_move_time']) self.motion.stop_z() @@ -298,11 +386,11 @@ class AutofocusController: down_score = self.get_averaged_focus(samples=s['fine_samples']) - if down_score > best_score: - self.log(f"Fine direction: DOWN ({down_score:.1f})") + if down_score > current_score * 1.02: # 2% improvement threshold + self.log(f"Fine direction: DOWN ({down_score:.1f} > {current_score:.1f})") return 'down' - # Go back and try UP + # Try UP (go back and past) self.motion.move_z_up() time.sleep(s['fine_move_time']) self.motion.stop_z() @@ -315,25 +403,29 @@ class AutofocusController: up_score = self.get_averaged_focus(samples=s['fine_samples']) - if up_score > best_score: - self.log(f"Fine direction: UP ({up_score:.1f})") + if up_score > current_score * 1.02: + self.log(f"Fine direction: UP ({up_score:.1f} > {current_score:.1f})") return 'up' - # Already at peak - self.log("Already at peak, minor adjustment only") + # Already at peak, return to center + self.log("Already at peak") self.motion.move_z_down() time.sleep(s['fine_move_time']) self.motion.stop_z() time.sleep(s['fine_settle_time']) - return 'up' + return 'up' # Default direction for minor adjustments def _fine_search(self, direction, best_score, s): - """Search in given direction until no improvement""" + """ + Search in given direction until no improvement. + Uses visual feedback to track best position. + """ move_func = self.motion.move_z_up if direction == 'up' else self.motion.move_z_down + reverse_func = self.motion.move_z_down if direction == 'up' else self.motion.move_z_up no_improvement_count = 0 - best_position_offset = 0 + moves_since_best = 0 while self.running and no_improvement_count < s['fine_max_no_improvement']: move_func() @@ -347,21 +439,25 @@ class AutofocusController: if current_score > best_score: best_score = current_score no_improvement_count = 0 - best_position_offset = 0 - self.log(f"Fine better: {current_score:.1f}") + moves_since_best = 0 + self.log(f"Fine improved: {current_score:.1f}") else: no_improvement_count += 1 - best_position_offset += 1 + moves_since_best += 1 - return best_score, best_position_offset - - def _return_to_best(self, direction, steps, s): - """Return to best position by reversing direction""" - self.log(f"Returning {steps} steps") - reverse_func = self.motion.move_z_down if direction == 'up' else self.motion.move_z_up + # Go back to best position using visual feedback + if moves_since_best > 0: + self.log(f"Reversing {moves_since_best} moves toward best") + for _ in range(moves_since_best): + reverse_func() + time.sleep(s['fine_move_time']) + self.motion.stop_z() + + # Check if we're back at peak + check_score = self.get_quick_focus(3) + if check_score >= best_score * 0.98: + break + + time.sleep(0.05) - for _ in range(steps): - reverse_func() - time.sleep(s['fine_move_time']) - self.motion.stop_z() - time.sleep(0.1) \ No newline at end of file + return best_score \ No newline at end of file diff --git a/src/gui.py b/src/gui.py index 262253f..d8244ca 100644 --- a/src/gui.py +++ b/src/gui.py @@ -1,12 +1,20 @@ +""" +AutoScope GUI - Vertical Monitor Layout with Feature Visualization + +Enhanced with feature overlay debugging for scanner development. +""" + import tkinter as tk from tkinter import ttk, scrolledtext from PIL import Image, ImageTk import cv2 +import numpy as np import threading import queue from motion_controller import MotionController from autofocus import AutofocusController +from scanner import Scanner, ScanConfig, ScanDirection, Tile class AppGUI: @@ -25,211 +33,456 @@ class AppGUI: on_focus_update=self._update_focus_display_threadsafe ) - # Queue for thread-safe serial log updates + # Scanner + self.scanner = None + self.scan_config = ScanConfig() + + # Queues for thread-safe updates self.serial_queue = queue.Queue() + self.tile_queue = queue.Queue() + + # Mosaic window + self.mosaic_window = None # Build the window self.root = tk.Tk() self.root.title("AutoScope") - self.root.geometry("1080x1080") - self.root.minsize(1080, 1080) + self.root.geometry("720x1280") + self.root.minsize(640, 900) self.root.protocol("WM_DELETE_WINDOW", self.on_close) self._build_ui() + self._init_scanner() # Start serial reader thread self.serial_thread = threading.Thread(target=self._serial_reader, daemon=True) self.serial_thread.start() + def _init_scanner(self): + """Initialize scanner with current config""" + self.scanner = Scanner( + camera=self.camera, + motion_controller=self.motion, + autofocus_controller=self.autofocus, + config=self.scan_config, + on_tile_captured=self._on_tile_captured, + on_log=self.log_message, + on_progress=self._on_scan_progress + ) + def _on_command_sent(self, cmd): - """Callback when motion controller sends a command""" self.log_message(f"> {cmd}") def _update_focus_display_threadsafe(self, score): - """Thread-safe focus display update""" self.root.after(0, lambda: self.focus_score_label.config(text=f"Focus: {score:.1f}")) - # === UI Building === + def _on_tile_captured(self, tile: Tile): + self.tile_queue.put(tile) + + def _on_scan_progress(self, current: int, total: int): + self.root.after(0, lambda: self._update_progress(current, total)) + + # ========================================================================= + # UI Building - Vertical Layout + # ========================================================================= def _build_ui(self): main_frame = ttk.Frame(self.root) main_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) - # Left: Camera view - self.camera_label = ttk.Label(main_frame) - self.camera_label.pack(side=tk.LEFT, padx=(0, 5)) + # === TOP: Camera View (large, full width) === + camera_frame = ttk.LabelFrame(main_frame, text="Camera") + camera_frame.pack(fill=tk.BOTH, expand=True, pady=(0, 5)) - # Right: Control panel - right_frame = ttk.Frame(main_frame, width=320) - right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True) - right_frame.pack_propagate(False) + self.camera_label = ttk.Label(camera_frame) + self.camera_label.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) - self._build_emergency_stop(right_frame) - self._build_speed_controls(right_frame) - self._build_fine_speed_controls(right_frame) - self._build_movement_controls(right_frame) - self._build_autofocus_controls(right_frame) - self._build_status_controls(right_frame) - self._build_mode_controls(right_frame) - self._build_serial_log(right_frame) - self._build_command_entry(right_frame) + # Camera options row + cam_opts = ttk.Frame(camera_frame) + cam_opts.pack(fill=tk.X, padx=5, pady=(0, 5)) + + self.show_tile_overlay_var = tk.BooleanVar(value=True) + ttk.Checkbutton(cam_opts, text="Tile bounds", + variable=self.show_tile_overlay_var).pack(side=tk.LEFT) + + self.show_edge_regions_var = tk.BooleanVar(value=False) + ttk.Checkbutton(cam_opts, text="Track regions", + variable=self.show_edge_regions_var).pack(side=tk.LEFT, padx=(10, 0)) + + # NEW: Feature overlay checkbox + self.show_features_var = tk.BooleanVar(value=False) + ttk.Checkbutton(cam_opts, text="Features", + variable=self.show_features_var).pack(side=tk.LEFT, padx=(10, 0)) + + self.live_focus_var = tk.BooleanVar(value=True) + ttk.Checkbutton(cam_opts, text="Live focus", + variable=self.live_focus_var).pack(side=tk.LEFT, padx=(10, 0)) + + self.focus_score_label = ttk.Label(cam_opts, text="Focus: --", font=('Arial', 11, 'bold')) + self.focus_score_label.pack(side=tk.RIGHT) + + # === BOTTOM: Control Panel === + control_frame = ttk.Frame(main_frame) + control_frame.pack(fill=tk.X) + + # Row 1: Emergency Stop + Scanner Controls + self._build_row1_emergency_scanner(control_frame) + + # Row 2: Movement Controls + self._build_row2_movement(control_frame) + + # Row 3: Speed + Autofocus + self._build_row3_speed_autofocus(control_frame) + + # Row 4: Status + Log + self._build_row4_status_log(control_frame) - def _build_emergency_stop(self, parent): - frame = ttk.Frame(parent) - frame.pack(fill=tk.X, pady=(0, 10)) + def _build_row1_emergency_scanner(self, parent): + """Row 1: Emergency stop and scanner controls""" + row = ttk.Frame(parent) + row.pack(fill=tk.X, pady=(0, 3)) + # Emergency stop button self.emergency_btn = tk.Button( - frame, - text="⚠ EMERGENCY STOP ⚠", - command=self.motion.stop_all, + row, text="⚠ STOP", command=self._emergency_stop, bg='red', fg='white', font=('Arial', 12, 'bold'), - height=2 + width=8, height=1 ) - self.emergency_btn.pack(fill=tk.X) + self.emergency_btn.pack(side=tk.LEFT, padx=(0, 10)) + + # Scanner controls + scanner_frame = ttk.LabelFrame(row, text="Scanner") + scanner_frame.pack(side=tk.LEFT, fill=tk.X, expand=True) + + sf = ttk.Frame(scanner_frame) + sf.pack(fill=tk.X, padx=5, pady=3) + + # Status + Progress + self.scan_status_label = ttk.Label(sf, text="Idle", font=('Arial', 9, 'bold'), width=8) + self.scan_status_label.pack(side=tk.LEFT) + + self.scan_progress_bar = ttk.Progressbar(sf, mode='indeterminate', length=80) + self.scan_progress_bar.pack(side=tk.LEFT, padx=5) + + self.scan_progress_label = ttk.Label(sf, text="", width=10) + self.scan_progress_label.pack(side=tk.LEFT) + + # Buttons + self.scan_start_btn = tk.Button( + sf, text="▶ Start", width=7, bg='green', fg='white', + font=('Arial', 9, 'bold'), command=self._start_scan + ) + self.scan_start_btn.pack(side=tk.LEFT, padx=2) + + self.scan_pause_btn = ttk.Button(sf, text="⏸", width=3, + command=self._pause_scan, state='disabled') + self.scan_pause_btn.pack(side=tk.LEFT, padx=1) + + self.scan_stop_btn = tk.Button( + sf, text="⏹", width=3, bg='orange', fg='white', + command=self._stop_scan, state='disabled' + ) + self.scan_stop_btn.pack(side=tk.LEFT, padx=1) + + ttk.Button(sf, text="📷", width=3, + command=self._capture_single_tile).pack(side=tk.LEFT, padx=1) + + ttk.Button(sf, text="Mosaic", width=6, + command=self._show_mosaic_window).pack(side=tk.LEFT, padx=(5, 2)) + + # AF interval + ttk.Label(sf, text="AF:").pack(side=tk.LEFT, padx=(10, 2)) + self.af_every_var = tk.StringVar(value="5") + ttk.Spinbox(sf, from_=1, to=20, width=3, + textvariable=self.af_every_var).pack(side=tk.LEFT) + + self.af_every_row_var = tk.BooleanVar(value=True) + ttk.Checkbutton(sf, text="Row", variable=self.af_every_row_var).pack(side=tk.LEFT) - def _build_speed_controls(self, parent): - frame = ttk.LabelFrame(parent, text="Speed") - frame.pack(fill=tk.X, pady=(0, 10)) + def _build_row2_movement(self, parent): + """Row 2: Movement controls for all axes""" + row = ttk.LabelFrame(parent, text="Movement") + row.pack(fill=tk.X, pady=(0, 3)) - btn_frame = ttk.Frame(frame) - btn_frame.pack(fill=tk.X, padx=5, pady=5) - - self.speed_var = tk.StringVar(value="Medium") - - for text, value, preset in [("Slow", "Slow", "slow"), - ("Medium", "Medium", "medium"), - ("Fast", "Fast", "fast")]: - ttk.Radiobutton( - btn_frame, text=text, variable=self.speed_var, - value=value, command=lambda p=preset: self.motion.set_speed_preset(p) - ).pack(side=tk.LEFT, padx=5) - - def _build_fine_speed_controls(self, parent): - frame = ttk.LabelFrame(parent, text="Fine Speed Control") - frame.pack(fill=tk.X, pady=(0, 10)) - - inner = ttk.Frame(frame) - inner.pack(fill=tk.X, padx=5, pady=5) - - self.fine_speed_label = ttk.Label(inner, text="Speed: 50", font=('Arial', 10, 'bold')) - self.fine_speed_label.pack(anchor=tk.W) - - self.speed_slider = ttk.Scale(inner, from_=0, to=6, orient=tk.HORIZONTAL) - self.speed_slider.set(5) - self.speed_slider.config(command=self._on_speed_slider_change) - self.speed_slider.pack(fill=tk.X, pady=(5, 0)) - - btn_row = ttk.Frame(inner) - btn_row.pack(fill=tk.X, pady=(5, 0)) - - for i in range(6): - ttk.Button( - btn_row, text=str(i), width=2, - command=lambda x=i: self._set_fine_speed(x) - ).pack(side=tk.LEFT, padx=1) - - def _build_movement_controls(self, parent): - frame = ttk.LabelFrame(parent, text="Movement Control") - frame.pack(fill=tk.X, pady=(0, 10)) + inner = ttk.Frame(row) + inner.pack(fill=tk.X, padx=5, pady=3) self.dir_labels = {} self.move_buttons = {} for axis in ["X", "Y", "Z"]: - row = ttk.Frame(frame) - row.pack(fill=tk.X, pady=3, padx=5) + af = ttk.Frame(inner) + af.pack(side=tk.LEFT, padx=(0, 20)) - ttk.Label(row, text=f"{axis}:", width=3, font=('Arial', 10, 'bold')).pack(side=tk.LEFT) + ttk.Label(af, text=f"{axis}:", font=('Arial', 10, 'bold')).pack(side=tk.LEFT) - dir_btn = ttk.Button(row, text="+ →", width=5, + dir_btn = ttk.Button(af, text="+→", width=4, command=lambda a=axis: self._toggle_direction(a)) dir_btn.pack(side=tk.LEFT, padx=2) self.dir_labels[axis] = dir_btn - move_btn = tk.Button(row, text="Move", width=8, bg='green', fg='white', + move_btn = tk.Button(af, text="Move", width=6, bg='#4CAF50', fg='white', command=lambda a=axis: self._toggle_movement(a)) move_btn.pack(side=tk.LEFT, padx=2) self.move_buttons[axis] = move_btn - ttk.Button(row, text="Stop", width=6, - command=lambda a=axis: self._stop_axis(a)).pack(side=tk.LEFT, padx=2) + ttk.Button(af, text="⏹", width=2, + command=lambda a=axis: self._stop_axis(a)).pack(side=tk.LEFT) - ttk.Button(frame, text="Stop All Axes", - command=self.motion.stop_all).pack(fill=tk.X, padx=5, pady=5) + # Stop all + tk.Button(inner, text="STOP ALL", width=10, bg='#f44336', fg='white', + font=('Arial', 9, 'bold'), + command=self.motion.stop_all).pack(side=tk.RIGHT) + + # Test feature detection button + ttk.Button(inner, text="Test Features", width=11, + command=self._test_feature_detection).pack(side=tk.RIGHT, padx=(0, 10)) - def _build_autofocus_controls(self, parent): - frame = ttk.LabelFrame(parent, text="Autofocus") - frame.pack(fill=tk.X, pady=(0, 10)) + def _build_row3_speed_autofocus(self, parent): + """Row 3: Speed and Autofocus controls""" + row = ttk.Frame(parent) + row.pack(fill=tk.X, pady=(0, 3)) - inner = ttk.Frame(frame) - inner.pack(fill=tk.X, padx=5, pady=5) + # Speed controls + speed_frame = ttk.LabelFrame(row, text="Speed") + speed_frame.pack(side=tk.LEFT, fill=tk.Y, padx=(0, 5)) - self.focus_score_label = ttk.Label(inner, text="Focus: --", font=('Arial', 10)) - self.focus_score_label.pack(anchor=tk.W) + sf = ttk.Frame(speed_frame) + sf.pack(fill=tk.X, padx=5, pady=3) - btn_row = ttk.Frame(inner) - btn_row.pack(fill=tk.X, pady=(5, 0)) + self.speed_var = tk.StringVar(value="Medium") + for text, val, preset in [("S", "Slow", "slow"), ("M", "Medium", "medium"), ("F", "Fast", "fast")]: + ttk.Radiobutton(sf, text=text, variable=self.speed_var, value=val, + command=lambda p=preset: self.motion.set_speed_preset(p)).pack(side=tk.LEFT, padx=2) - self.af_button = ttk.Button(btn_row, text="Autofocus", command=self._start_autofocus) + ttk.Separator(sf, orient=tk.VERTICAL).pack(side=tk.LEFT, fill=tk.Y, padx=5) + + self.fine_speed_label = ttk.Label(sf, text="50", width=3) + self.fine_speed_label.pack(side=tk.LEFT) + + self.speed_slider = ttk.Scale(sf, from_=0, to=6, orient=tk.HORIZONTAL, length=80) + self.speed_slider.set(5) + self.speed_slider.config(command=self._on_speed_slider_change) + self.speed_slider.pack(side=tk.LEFT, padx=3) + + # Autofocus controls + af_frame = ttk.LabelFrame(row, text="Autofocus") + af_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) + + af = ttk.Frame(af_frame) + af.pack(fill=tk.X, padx=5, pady=3) + + self.af_button = ttk.Button(af, text="Auto", width=6, command=self._start_autofocus) self.af_button.pack(side=tk.LEFT, padx=2) - ttk.Button(btn_row, text="Coarse AF", + ttk.Button(af, text="Coarse", width=6, command=lambda: self._start_autofocus(coarse=True)).pack(side=tk.LEFT, padx=2) - ttk.Button(btn_row, text="Fine AF", + ttk.Button(af, text="Fine", width=5, command=lambda: self._start_autofocus(fine=True)).pack(side=tk.LEFT, padx=2) - ttk.Button(btn_row, text="Stop AF", + ttk.Button(af, text="Stop", width=5, command=self._stop_autofocus).pack(side=tk.LEFT, padx=2) - - self.live_focus_var = tk.BooleanVar(value=True) - ttk.Checkbutton(inner, text="Show live focus score", - variable=self.live_focus_var).pack(anchor=tk.W, pady=(5, 0)) - def _build_status_controls(self, parent): - frame = ttk.LabelFrame(parent, text="Status") - frame.pack(fill=tk.X, pady=(0, 10)) + def _build_row4_status_log(self, parent): + """Row 4: Status and Log""" + row = ttk.Frame(parent) + row.pack(fill=tk.X, pady=(0, 3)) - btn_frame = ttk.Frame(frame) - btn_frame.pack(fill=tk.X, padx=5, pady=5) + # Status controls + status_frame = ttk.LabelFrame(row, text="Status") + status_frame.pack(side=tk.LEFT, fill=tk.Y, padx=(0, 5)) - ttk.Button(btn_frame, text="Status", command=self.motion.request_status).pack(side=tk.LEFT, padx=2) - ttk.Button(btn_frame, text="Zero All", command=self.motion.zero_all).pack(side=tk.LEFT, padx=2) - ttk.Button(btn_frame, text="Go Origin", command=self.motion.go_origin).pack(side=tk.LEFT, padx=2) - - def _build_mode_controls(self, parent): - frame = ttk.LabelFrame(parent, text="Control Mode") - frame.pack(fill=tk.X, pady=(0, 10)) + stf = ttk.Frame(status_frame) + stf.pack(fill=tk.X, padx=5, pady=3) - btn_frame = ttk.Frame(frame) - btn_frame.pack(fill=tk.X, padx=5, pady=5) + ttk.Button(stf, text="Status", width=6, command=self.motion.request_status).pack(side=tk.LEFT, padx=2) + self.mode_label = ttk.Label(stf, text="Serial", width=8) + self.mode_label.pack(side=tk.LEFT, padx=(10, 0)) + ttk.Button(stf, text="Mode", width=5, command=self.motion.toggle_mode).pack(side=tk.LEFT, padx=2) - self.mode_label = ttk.Label(btn_frame, text="Mode: Serial", font=('Arial', 9)) - self.mode_label.pack(side=tk.LEFT, padx=5) + # Log + log_frame = ttk.LabelFrame(row, text="Log") + log_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) - ttk.Button(btn_frame, text="Toggle Mode", command=self.motion.toggle_mode).pack(side=tk.RIGHT, padx=2) - - def _build_serial_log(self, parent): - ttk.Label(parent, text="Serial Log:").pack(anchor=tk.W) - self.serial_log = scrolledtext.ScrolledText(parent, height=10, width=35, state=tk.DISABLED) - self.serial_log.pack(fill=tk.BOTH, expand=True, pady=(0, 10)) - - def _build_command_entry(self, parent): - ttk.Label(parent, text="Send Command:").pack(anchor=tk.W) - cmd_frame = ttk.Frame(parent) - cmd_frame.pack(fill=tk.X) + self.serial_log = scrolledtext.ScrolledText(log_frame, height=4, state=tk.DISABLED) + self.serial_log.pack(fill=tk.BOTH, expand=True, padx=5, pady=(5, 3)) + + cmd_frame = ttk.Frame(log_frame) + cmd_frame.pack(fill=tk.X, padx=5, pady=(0, 5)) self.cmd_entry = ttk.Entry(cmd_frame) self.cmd_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) self.cmd_entry.bind("", lambda e: self._send_custom_command()) - ttk.Button(cmd_frame, text="Send", command=self._send_custom_command).pack(side=tk.RIGHT, padx=(5, 0)) + ttk.Button(cmd_frame, text="Send", width=6, + command=self._send_custom_command).pack(side=tk.RIGHT, padx=(5, 0)) - # === UI Event Handlers === + # ========================================================================= + # Feature Detection Testing + # ========================================================================= + + def _test_feature_detection(self): + """Test and log feature detection on current frame""" + if not self.scanner: + self.log_message("Scanner not initialized") + return + + try: + frame = self.camera.capture_frame() + + # Detect features on all edges + for edge in ['left', 'right', 'top', 'bottom']: + region = self.scanner.get_edge_region(frame, edge) + kp, desc = self.scanner.detect_features(region) + count = len(kp) if kp else 0 + self.log_message(f" {edge.upper()}: {count} features") + + # Also update scanner's visualization data + self.scanner._update_edge_features(frame) + + # Enable feature view + self.show_features_var.set(True) + + self.log_message("Feature detection test complete - enable 'Features' checkbox to see overlay") + + except Exception as e: + self.log_message(f"Feature detection error: {e}") + + # ========================================================================= + # Scanner Handlers + # ========================================================================= + + def _update_scan_config(self): + """Update scanner config from GUI values""" + self.scan_config.autofocus_every_n_tiles = int(self.af_every_var.get()) + self.scan_config.autofocus_every_row = self.af_every_row_var.get() + self._init_scanner() + + def _start_scan(self): + self._update_scan_config() + # Enable feature view during scan + self.show_features_var.set(True) + if self.scanner.start(): + self.scan_start_btn.config(state='disabled') + self.scan_pause_btn.config(state='normal') + self.scan_stop_btn.config(state='normal') + self.scan_status_label.config(text="Scanning") + self.scan_progress_bar.start(10) + + def _pause_scan(self): + if self.scanner.paused: + self.scanner.resume() + self.scan_pause_btn.config(text="⏸") + self.scan_status_label.config(text="Scanning") + self.scan_progress_bar.start(10) + else: + self.scanner.pause() + self.scan_pause_btn.config(text="▶") + self.scan_status_label.config(text="Paused") + self.scan_progress_bar.stop() + + def _stop_scan(self): + self.scanner.stop() + self._scan_finished() + + def _scan_finished(self): + self.scan_start_btn.config(state='normal') + self.scan_pause_btn.config(state='disabled', text="⏸") + self.scan_stop_btn.config(state='disabled') + self.scan_status_label.config(text="Idle") + self.scan_progress_bar.stop() + + def _update_progress(self, current: int, total: int): + if total > 0: + self.scan_progress_label.config(text=f"{current}/{total}") + else: + self.scan_progress_label.config(text=f"{current} tiles") + + def _capture_single_tile(self): + if self.scanner: + self.scanner.capture_tile() + self._update_mosaic_window() + + def _emergency_stop(self): + self.motion.stop_all() + if self.scanner and self.scanner.running: + self.scanner.stop() + self._scan_finished() + if self.autofocus.is_running(): + self.autofocus.stop() + + # ========================================================================= + # Mosaic Window + # ========================================================================= + + def _show_mosaic_window(self): + if self.mosaic_window is not None: + self.mosaic_window.lift() + self._update_mosaic_window() + return + + self.mosaic_window = tk.Toplevel(self.root) + self.mosaic_window.title("Mosaic") + self.mosaic_window.geometry("600x800") + self.mosaic_window.protocol("WM_DELETE_WINDOW", self._close_mosaic_window) + + self.mosaic_label = ttk.Label(self.mosaic_window, text="No tiles captured") + self.mosaic_label.pack(fill=tk.BOTH, expand=True, padx=10, pady=10) + + btn_frame = ttk.Frame(self.mosaic_window) + btn_frame.pack(fill=tk.X, padx=10, pady=(0, 10)) + + ttk.Button(btn_frame, text="Save Full Resolution", + command=self._save_mosaic).pack(side=tk.LEFT, padx=5) + ttk.Button(btn_frame, text="Refresh", + command=self._update_mosaic_window).pack(side=tk.LEFT, padx=5) + + self._update_mosaic_window() + + def _update_mosaic_window(self): + if self.mosaic_window is None or not self.scanner or not self.scanner.tiles: + return + + mosaic = self.scanner.get_mosaic_preview(max_size=580) + if mosaic is None: + return + + mosaic_rgb = cv2.cvtColor(mosaic, cv2.COLOR_BGR2RGB) + img = Image.fromarray(mosaic_rgb) + imgtk = ImageTk.PhotoImage(image=img) + + self.mosaic_label.imgtk = imgtk + self.mosaic_label.config(image=imgtk, text="") + + def _close_mosaic_window(self): + if self.mosaic_window: + self.mosaic_window.destroy() + self.mosaic_window = None + + def _save_mosaic(self): + if not self.scanner or not self.scanner.tiles: + self.log_message("No tiles to save") + return + + from tkinter import filedialog + filename = filedialog.asksaveasfilename( + defaultextension=".png", + filetypes=[("PNG", "*.png"), ("JPEG", "*.jpg"), ("All", "*.*")] + ) + if filename: + mosaic = self.scanner.build_mosaic(scale=1.0) + if mosaic is not None: + cv2.imwrite(filename, mosaic) + self.log_message(f"Saved: {filename}") + + # ========================================================================= + # Movement Handlers + # ========================================================================= def _toggle_direction(self, axis): direction = self.motion.toggle_direction(axis) arrow = "→" if direction == 1 else "←" sign = "+" if direction == 1 else "-" - self.dir_labels[axis].config(text=f"{sign} {arrow}") + self.dir_labels[axis].config(text=f"{sign}{arrow}") if self.motion.is_moving(axis): self.motion.stop_axis(axis) @@ -245,7 +498,7 @@ class AppGUI: def _stop_axis(self, axis): self.motion.stop_axis(axis) - self.move_buttons[axis].config(text="Move", bg='green') + self.move_buttons[axis].config(text="Move", bg='#4CAF50') def _on_speed_slider_change(self, value): if self._updating_slider: @@ -257,7 +510,7 @@ class AppGUI: self._updating_slider = True self.speed_slider.set(index) speed_val = self.motion.get_speed_value(index) - self.fine_speed_label.config(text=f"Speed: {speed_val}") + self.fine_speed_label.config(text=str(speed_val)) self._updating_slider = False self.motion.set_speed(index) @@ -275,23 +528,24 @@ class AppGUI: self.motion.send_command(cmd) self.cmd_entry.delete(0, tk.END) - # === Logging === + # ========================================================================= + # Logging + # ========================================================================= def log_message(self, msg): if not hasattr(self, 'serial_log'): return - self.serial_log.config(state=tk.NORMAL) self.serial_log.insert(tk.END, msg + "\n") self.serial_log.see(tk.END) self.serial_log.config(state=tk.DISABLED) - print(f" Log: {msg}") if msg.startswith("< MODE:"): - mode = msg.split(":")[-1] - self.mode_label.config(text=f"Mode: {mode.capitalize()}") + self.mode_label.config(text=msg.split(":")[-1].capitalize()) - # === Serial & Camera === + # ========================================================================= + # Serial & Camera + # ========================================================================= def _serial_reader(self): while self.running: @@ -306,24 +560,44 @@ class AppGUI: def _process_serial_queue(self): while not self.serial_queue.empty(): - msg = self.serial_queue.get_nowait() - self.log_message(f"< {msg}") + self.log_message(f"< {self.serial_queue.get_nowait()}") + + def _process_tile_queue(self): + updated = False + while not self.tile_queue.empty(): + self.tile_queue.get_nowait() + updated = True + if updated and self.mosaic_window: + self._update_mosaic_window() def update_camera(self): try: frame = self.camera.capture_frame() + frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE) if self.live_focus_var.get() and not self.autofocus.is_running(): score = self.autofocus.get_focus_score() self.focus_score_label.config(text=f"Focus: {score:.1f}") - frame = self._draw_crosshair(frame) + # Apply overlays + frame = self._draw_overlays(frame) + + # Apply feature overlay if enabled + if self.show_features_var.get() and self.scanner: + # Update edge features periodically + self.scanner._update_edge_features(frame) + # Get overlay from scanner + frame = self.scanner.get_feature_overlay(frame) + frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + # Scale to fit - maximize for vertical monitor h, w = frame.shape[:2] - max_width = 640 - if w > max_width: - scale = max_width / w + max_h = 700 + max_w = 650 + + scale = min(max_h / h, max_w / w) + if scale < 1: frame = cv2.resize(frame, (int(w * scale), int(h * scale))) img = Image.fromarray(frame) @@ -334,45 +608,53 @@ class AppGUI: except Exception as e: print(f"Camera error: {e}") - def _draw_crosshair(self, frame, color=(0, 0, 255), thickness=1, gap=0, size=40): - """ - Draw a crosshair at the center of the frame. - - Args: - frame: OpenCV image - color: BGR color tuple (default green) - thickness: Line thickness - gap: Gap in center of crosshair - size: Length of crosshair arms from center - """ + def _draw_overlays(self, frame): + frame = self._draw_crosshair(frame) + if self.show_tile_overlay_var.get(): + frame = self._draw_tile_boundary(frame) + if self.show_edge_regions_var.get(): + frame = self._draw_edge_regions(frame) + return frame + + def _draw_crosshair(self, frame, color=(0, 0, 255), thickness=1, size=40): h, w = frame.shape[:2] cx, cy = w // 2, h // 2 - - # Horizontal lines (left and right of center gap) - cv2.line(frame, (cx - size, cy), (cx - gap, cy), color, thickness) - cv2.line(frame, (cx + gap, cy), (cx + size, cy), color, thickness) - - # Vertical lines (above and below center gap) - cv2.line(frame, (cx, cy - size), (cx, cy - gap), color, thickness) - cv2.line(frame, (cx, cy + gap), (cx, cy + size), color, thickness) - - # Optional: center dot - # cv2.circle(frame, (cx, cy), 2, color, -1) - + cv2.line(frame, (cx - size, cy), (cx + size, cy), color, thickness) + cv2.line(frame, (cx, cy - size), (cx, cy + size), color, thickness) return frame - - # === Main Loop === + + def _draw_tile_boundary(self, frame, color=(0, 255, 0), thickness=2): + h, w = frame.shape[:2] + bh, bw = int(h * 0.10), int(w * 0.10) + cv2.rectangle(frame, (bw, bh), (w - bw, h - bh), color, thickness) + return frame + + def _draw_edge_regions(self, frame, color=(255, 255, 0), thickness=1): + h, w = frame.shape[:2] + bh, bw = int(h * 0.10), int(w * 0.10) + cv2.rectangle(frame, (0, 0), (bw, h), color, thickness) + cv2.rectangle(frame, (w - bw, 0), (w, h), color, thickness) + cv2.rectangle(frame, (0, 0), (w, bh), color, thickness) + cv2.rectangle(frame, (0, h - bh), (w, h), color, thickness) + return frame + + # ========================================================================= + # Main Loop + # ========================================================================= def run(self): def update(): if self.running: self.update_camera() self._process_serial_queue() + self._process_tile_queue() - # Re-enable AF button when done if not self.autofocus.is_running(): self.af_button.config(state='normal') + if self.scanner and not self.scanner.running and self.scan_start_btn['state'] == 'disabled': + self._scan_finished() + self.root.after(33, update) update() @@ -380,5 +662,8 @@ class AppGUI: def on_close(self): self.running = False + if self.scanner: + self.scanner.stop() self.autofocus.stop() + self._close_mosaic_window() self.root.destroy() \ No newline at end of file diff --git a/src/motion_controller.py b/src/motion_controller.py index 8e2f919..1c8147b 100644 --- a/src/motion_controller.py +++ b/src/motion_controller.py @@ -2,12 +2,12 @@ class MotionController: # Command mapping for each axis AXIS_COMMANDS = { 'X': {'pos': 'E', 'neg': 'W', 'stop': 'e'}, - 'Y': {'pos': 'N', 'neg': 'S', 'stop': 'n'}, + 'Y': {'pos': 'S', 'neg': 'N', 'stop': 'n'}, 'Z': {'pos': 'U', 'neg': 'D', 'stop': 'u'} } # Speed values matching Arduino speedArr - SPEED_VALUES = [0, 2, 5, 10, 30, 50, 70] + SPEED_VALUES = [0, 1, 2, 5, 10, 30, 50, 70] def __init__(self, arduino, on_command_sent=None): """ diff --git a/src/scanner.py b/src/scanner.py index e69de29..aa193b2 100644 --- a/src/scanner.py +++ b/src/scanner.py @@ -0,0 +1,883 @@ +""" +Scanner Module - Automated slide scanning with visual feedback navigation + +Uses feature tracking to detect tile boundaries instead of step counting. +Camera orientation: Portrait (height > width), X=left/right, Y=up/down + +Enhanced with feature visualization for debugging. +""" + +import cv2 +import numpy as np +import time +import threading +from dataclasses import dataclass, field +from typing import List, Tuple, Optional, Callable, Dict +from enum import Enum +from collections import deque + + +class ScanDirection(Enum): + """Scan direction constants""" + RIGHT = 'right' # X+ (E command) + LEFT = 'left' # X- (W command) + DOWN = 'down' # Y- (N command) + UP = 'up' # Y+ (S command) + + +@dataclass +class Tile: + """Represents a captured tile""" + image: np.ndarray + row: int + col: int + x_pos: int # Grid position + y_pos: int + focus_score: float + timestamp: float + + +@dataclass +class ScanConfig: + """Scanner configuration""" + # Tile extraction + tile_percentage: float = 0.80 # Center 80% is the tile + border_percentage: float = 0.10 # 10% on each side for tracking + + # Feature detection + feature_detector: str = 'SIFT' # 'ORB', 'SIFT', or 'AKAZE' + min_features: int = 10 # Minimum features to track + match_threshold: float = 0.75 # Feature match confidence + min_good_matches: int = 5 # Matches needed to confirm movement + + # Movement timing + move_check_interval: float = 0.1 # Seconds between frame checks + settle_time: float = 0.2 # Seconds to wait after stopping + max_move_time: float = 10.0 # Safety timeout per tile + + # Scan limits (number of tiles) + max_tiles_per_row: int = 50 # Safety limit + max_rows: int = 50 # Safety limit + + # Scan pattern + start_direction: ScanDirection = ScanDirection.RIGHT + + # Autofocus + autofocus_every_n_tiles: int = -1 # Refocus periodically + autofocus_every_row: bool = False + + +@dataclass +class FeatureVisualization: + """Stores feature visualization data for debugging overlay""" + # Current features on each edge + left_features: List[cv2.KeyPoint] = field(default_factory=list) + right_features: List[cv2.KeyPoint] = field(default_factory=list) + top_features: List[cv2.KeyPoint] = field(default_factory=list) + bottom_features: List[cv2.KeyPoint] = field(default_factory=list) + + # Reference features being tracked (from leading edge) + reference_features: List[cv2.KeyPoint] = field(default_factory=list) + reference_edge: str = '' + + # Matched features on target edge + matched_features: List[cv2.KeyPoint] = field(default_factory=list) + target_edge: str = '' + + # Match quality info + num_good_matches: int = 0 + match_threshold: int = 5 + + # Feature position history for trails (deque of (x, y, timestamp)) + feature_trails: Dict[int, deque] = field(default_factory=dict) + + # Active tracking state + is_tracking: bool = False + tracking_direction: str = '' + + # Movement detection info + shift_detected: bool = False + movement_progress: float = 0.0 # Estimated 0-1 progress + + +class Scanner: + """ + Automated slide scanner using visual feedback for navigation. + + Key principle: Use camera as position sensor, not step counting. + Boundary detection: Feature matching - when features stop shifting, + we've likely hit the edge of the slide content. + """ + + def __init__(self, camera, motion_controller, autofocus_controller=None, + config: ScanConfig = None, + on_tile_captured: Callable[[Tile], None] = None, + on_log: Callable[[str], None] = None, + on_progress: Callable[[int, int], None] = None): + """ + Args: + camera: Camera instance + motion_controller: MotionController instance + autofocus_controller: Optional AutofocusController + config: ScanConfig settings + on_tile_captured: Callback(tile) when tile is captured + on_log: Callback(message) for logging + on_progress: Callback(current, total) for progress updates + """ + self.camera = camera + self.motion = motion_controller + self.autofocus = autofocus_controller + self.config = config or ScanConfig() + + # Callbacks + self.on_tile_captured = on_tile_captured + self.on_log = on_log + self.on_progress = on_progress + + # State + self.running = False + self.paused = False + self.tiles: List[Tile] = [] + self.current_row = 0 + self.current_col = 0 + self.tiles_captured = 0 + + # Feature detector + self._init_feature_detector() + + # Thread + self._thread = None + + # Feature visualization for debugging + self.feature_viz = FeatureVisualization() + self._viz_lock = threading.Lock() + + # Trail history length (frames) + self.trail_history_length = 30 + + def _init_feature_detector(self): + """Initialize the feature detector based on config""" + if self.config.feature_detector == 'ORB': + self.detector = cv2.ORB_create(nfeatures=500) + self.matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False) + elif self.config.feature_detector == 'SIFT': + self.detector = cv2.SIFT_create() + self.matcher = cv2.BFMatcher(cv2.NORM_L2, crossCheck=False) + elif self.config.feature_detector == 'AKAZE': + self.detector = cv2.AKAZE_create() + self.matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False) + else: + raise ValueError(f"Unknown feature detector: {self.config.feature_detector}") + + def log(self, message: str): + """Log a message""" + if self.on_log: + self.on_log(f"[Scanner] {message}") + print(f"[Scanner] {message}") + + # ========================================================================= + # Feature Visualization + # ========================================================================= + + def get_feature_overlay(self, frame: np.ndarray) -> np.ndarray: + """ + Draw feature visualization overlay on the frame. + + Args: + frame: Input frame to draw on (will be copied) + + Returns: + Frame with feature overlays drawn + """ + overlay = frame.copy() + h, w = frame.shape[:2] + border_h = int(h * self.config.border_percentage) + border_w = int(w * self.config.border_percentage) + + with self._viz_lock: + viz = self.feature_viz + + # Color scheme + COLORS = { + 'left': (255, 100, 100), # Blue-ish + 'right': (100, 100, 255), # Red-ish + 'top': (100, 255, 100), # Green-ish + 'bottom': (255, 255, 100), # Cyan-ish + 'reference': (0, 255, 255), # Yellow - reference features + 'matched': (0, 255, 0), # Green - matched features + 'trail': (255, 0, 255), # Magenta - feature trails + } + + # Draw edge region boundaries (semi-transparent) + overlay_alpha = overlay.copy() + + # Left edge region + cv2.rectangle(overlay_alpha, (0, 0), (border_w, h), COLORS['left'], -1) + # Right edge region + cv2.rectangle(overlay_alpha, (w - border_w, 0), (w, h), COLORS['right'], -1) + # Top edge region + cv2.rectangle(overlay_alpha, (0, 0), (w, border_h), COLORS['top'], -1) + # Bottom edge region + cv2.rectangle(overlay_alpha, (0, h - border_h), (w, h), COLORS['bottom'], -1) + + # Blend with transparency + cv2.addWeighted(overlay_alpha, 0.1, overlay, 0.9, 0, overlay) + + # Draw features on each edge + self._draw_edge_features(overlay, viz.left_features, 'left', + border_w, COLORS['left']) + self._draw_edge_features(overlay, viz.right_features, 'right', + border_w, COLORS['right'], x_offset=w-border_w) + self._draw_edge_features(overlay, viz.top_features, 'top', + border_h, COLORS['top']) + self._draw_edge_features(overlay, viz.bottom_features, 'bottom', + border_h, COLORS['bottom'], y_offset=h-border_h) + + # Draw reference features with larger circles + if viz.is_tracking and viz.reference_features: + ref_offset = self._get_edge_offset(viz.reference_edge, w, h, border_w, border_h) + for kp in viz.reference_features: + pt = (int(kp.pt[0] + ref_offset[0]), int(kp.pt[1] + ref_offset[1])) + cv2.circle(overlay, pt, 8, COLORS['reference'], 2) + cv2.circle(overlay, pt, 3, COLORS['reference'], -1) + + # Draw matched features with connecting lines + if viz.is_tracking and viz.matched_features: + target_offset = self._get_edge_offset(viz.target_edge, w, h, border_w, border_h) + for kp in viz.matched_features: + pt = (int(kp.pt[0] + target_offset[0]), int(kp.pt[1] + target_offset[1])) + cv2.circle(overlay, pt, 8, COLORS['matched'], 2) + cv2.circle(overlay, pt, 3, COLORS['matched'], -1) + + # Draw feature trails + for trail_id, trail in viz.feature_trails.items(): + if len(trail) > 1: + points = [(int(p[0]), int(p[1])) for p in trail] + for i in range(1, len(points)): + # Fade trail from bright to dim + alpha = i / len(points) + color = tuple(int(c * alpha) for c in COLORS['trail']) + cv2.line(overlay, points[i-1], points[i], color, 2) + + # Draw tracking status info + self._draw_tracking_info(overlay, viz, w, h) + + return overlay + + def _draw_edge_features(self, frame, keypoints, edge, region_size, color, + x_offset=0, y_offset=0): + """Draw feature points for an edge region""" + for kp in keypoints: + pt = (int(kp.pt[0] + x_offset), int(kp.pt[1] + y_offset)) + # Draw circle with size proportional to keypoint size + size = max(3, int(kp.size / 4)) if kp.size > 0 else 4 + cv2.circle(frame, pt, size, color, 1) + # Draw center dot + cv2.circle(frame, pt, 2, color, -1) + + def _get_edge_offset(self, edge, w, h, border_w, border_h): + """Get x, y offset for an edge region""" + if edge == 'left': + return (0, 0) + elif edge == 'right': + return (w - border_w, 0) + elif edge == 'top': + return (0, 0) + elif edge == 'bottom': + return (0, h - border_h) + return (0, 0) + + def _draw_tracking_info(self, frame, viz, w, h): + """Draw tracking status text overlay""" + y_pos = 30 + line_height = 25 + + def draw_text(text, color=(255, 255, 255)): + nonlocal y_pos + cv2.putText(frame, text, (10, y_pos), + cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 3) + cv2.putText(frame, text, (10, y_pos), + cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 1) + y_pos += line_height + + if viz.is_tracking: + draw_text(f"TRACKING: {viz.tracking_direction}", (0, 255, 255)) + draw_text(f"Ref edge: {viz.reference_edge} ({len(viz.reference_features)} features)") + draw_text(f"Target edge: {viz.target_edge}") + + # Match status with color coding + match_color = (0, 255, 0) if viz.num_good_matches >= viz.match_threshold else (0, 165, 255) + draw_text(f"Matches: {viz.num_good_matches}/{viz.match_threshold}", match_color) + + if viz.shift_detected: + draw_text("SHIFT DETECTED!", (0, 255, 0)) + + # Progress bar + bar_width = 200 + bar_height = 15 + bar_x = 10 + bar_y = y_pos + progress = min(1.0, viz.movement_progress) + cv2.rectangle(frame, (bar_x, bar_y), + (bar_x + bar_width, bar_y + bar_height), (100, 100, 100), -1) + cv2.rectangle(frame, (bar_x, bar_y), + (bar_x + int(bar_width * progress), bar_y + bar_height), + (0, 255, 0), -1) + cv2.rectangle(frame, (bar_x, bar_y), + (bar_x + bar_width, bar_y + bar_height), (255, 255, 255), 1) + else: + # Show feature counts on all edges + draw_text(f"L:{len(viz.left_features)} R:{len(viz.right_features)} " + f"T:{len(viz.top_features)} B:{len(viz.bottom_features)}") + + def _update_edge_features(self, frame): + """Update feature visualization for all edges""" + with self._viz_lock: + self.feature_viz.left_features = self._detect_edge_keypoints(frame, 'left') + self.feature_viz.right_features = self._detect_edge_keypoints(frame, 'right') + self.feature_viz.top_features = self._detect_edge_keypoints(frame, 'top') + self.feature_viz.bottom_features = self._detect_edge_keypoints(frame, 'bottom') + + def _detect_edge_keypoints(self, frame, edge) -> List[cv2.KeyPoint]: + """Detect keypoints in an edge region""" + region = self.get_edge_region(frame, edge) + gray = cv2.cvtColor(region, cv2.COLOR_BGR2GRAY) if len(region.shape) == 3 else region + keypoints, _ = self.detector.detectAndCompute(gray, None) + return list(keypoints) if keypoints else [] + + def _update_tracking_visualization(self, ref_kps, ref_edge, target_edge, + current_frame, good_matches, direction): + """Update visualization during tracking""" + with self._viz_lock: + viz = self.feature_viz + viz.is_tracking = True + viz.tracking_direction = direction + viz.reference_edge = ref_edge + viz.target_edge = target_edge + viz.reference_features = list(ref_kps) if ref_kps else [] + viz.num_good_matches = len(good_matches) if good_matches else 0 + viz.match_threshold = self.config.min_good_matches + + # Detect features on target edge + target_kps = self._detect_edge_keypoints(current_frame, target_edge) + viz.matched_features = target_kps # Show all features on target edge + + def _clear_tracking_visualization(self): + """Clear tracking visualization state""" + with self._viz_lock: + viz = self.feature_viz + viz.is_tracking = False + viz.tracking_direction = '' + viz.reference_features = [] + viz.matched_features = [] + viz.num_good_matches = 0 + viz.shift_detected = False + viz.movement_progress = 0.0 + viz.feature_trails.clear() + + # ========================================================================= + # Tile Extraction + # ========================================================================= + + def extract_tile(self, frame: np.ndarray) -> np.ndarray: + """Extract the center tile (80%) from a frame""" + h, w = frame.shape[:2] + border_h = int(h * self.config.border_percentage) + border_w = int(w * self.config.border_percentage) + + tile = frame[border_h:h-border_h, border_w:w-border_w] + return tile.copy() + + def get_edge_region(self, frame: np.ndarray, edge: str) -> np.ndarray: + """ + Extract a strip from the specified edge for feature tracking. + + Args: + frame: Input frame + edge: 'left', 'right', 'top', 'bottom' + + Returns: + Edge strip image + """ + h, w = frame.shape[:2] + border_h = int(h * self.config.border_percentage) + border_w = int(w * self.config.border_percentage) + + if edge == 'left': + return frame[:, :border_w].copy() + elif edge == 'right': + return frame[:, w-border_w:].copy() + elif edge == 'top': + return frame[:border_h, :].copy() + elif edge == 'bottom': + return frame[h-border_h:, :].copy() + else: + raise ValueError(f"Unknown edge: {edge}") + + # ========================================================================= + # Feature Detection and Matching + # ========================================================================= + + def detect_features(self, region: np.ndarray) -> Tuple[list, np.ndarray]: + """ + Detect features in a region. + + Returns: + (keypoints, descriptors) + """ + gray = cv2.cvtColor(region, cv2.COLOR_BGR2GRAY) if len(region.shape) == 3 else region + keypoints, descriptors = self.detector.detectAndCompute(gray, None) + return keypoints, descriptors + + def match_features(self, desc1: np.ndarray, desc2: np.ndarray) -> List: + """ + Match features between two descriptor sets. + + Returns: + List of good matches + """ + if desc1 is None or desc2 is None: + return [] + + if len(desc1) < 2 or len(desc2) < 2: + return [] + + # KNN match + try: + matches = self.matcher.knnMatch(desc1, desc2, k=2) + except cv2.error: + return [] + + # Apply ratio test + good_matches = [] + for match_pair in matches: + if len(match_pair) == 2: + m, n = match_pair + if m.distance < self.config.match_threshold * n.distance: + good_matches.append(m) + + return good_matches + + def features_have_shifted(self, + reference_desc: np.ndarray, + current_frame: np.ndarray, + from_edge: str, + to_edge: str) -> bool: + """ + Check if features from one edge now appear on the opposite edge. + + Args: + reference_desc: Descriptors from the reference edge + current_frame: Current camera frame + from_edge: Edge where features were originally ('left', 'right') + to_edge: Edge to check for features ('right', 'left') + + Returns: + True if enough features matched (tile movement complete) + """ + # Get the target edge region + target_region = self.get_edge_region(current_frame, to_edge) + + # Detect features in target region + _, target_desc = self.detect_features(target_region) + + # Match with reference + good_matches = self.match_features(reference_desc, target_desc) + + return len(good_matches) >= self.config.min_good_matches + + def count_features_on_edge(self, frame: np.ndarray, edge: str) -> int: + """Count features visible on an edge region""" + region = self.get_edge_region(frame, edge) + kp, desc = self.detect_features(region) + return len(kp) if kp else 0 + + # ========================================================================= + # Movement with Visual Feedback + # ========================================================================= + + def move_until_tile_shifted(self, direction: ScanDirection) -> bool: + """ + Move in a direction until features indicate we've moved one tile. + + Uses feature tracking to detect when tile content has shifted. + Returns False if timeout or no more content detected. + + Args: + direction: ScanDirection to move + + Returns: + True if tile shift detected, False if timeout/boundary + """ + # Determine edges based on direction + if direction == ScanDirection.RIGHT: + from_edge, to_edge = 'right', 'left' + move_cmd = lambda: self.motion.start_movement('X') + self.motion.axis_direction['X'] = 1 + axis = 'X' + elif direction == ScanDirection.LEFT: + from_edge, to_edge = 'left', 'right' + move_cmd = lambda: self.motion.start_movement('X') + self.motion.axis_direction['X'] = -1 + axis = 'X' + elif direction == ScanDirection.DOWN: + from_edge, to_edge = 'bottom', 'top' + move_cmd = lambda: self.motion.start_movement('Y') + self.motion.axis_direction['Y'] = -1 + axis = 'Y' + elif direction == ScanDirection.UP: + from_edge, to_edge = 'top', 'bottom' + move_cmd = lambda: self.motion.start_movement('Y') + self.motion.axis_direction['Y'] = 1 + axis = 'Y' + else: + raise ValueError(f"Unknown direction: {direction}") + + # Capture reference features from the leading edge + frame = self.camera.capture_frame() + reference_region = self.get_edge_region(frame, from_edge) + ref_kp, ref_desc = self.detect_features(reference_region) + + if ref_desc is None or len(ref_desc) < self.config.min_features: + self.log(f"Warning: Only {len(ref_kp) if ref_kp else 0} features found on {from_edge} edge") + + # Initialize tracking visualization + with self._viz_lock: + self.feature_viz.is_tracking = True + self.feature_viz.tracking_direction = direction.value + self.feature_viz.reference_edge = from_edge + self.feature_viz.target_edge = to_edge + self.feature_viz.reference_features = list(ref_kp) if ref_kp else [] + self.feature_viz.shift_detected = False + self.feature_viz.movement_progress = 0.0 + + # Start movement + move_cmd() + start_time = time.time() + check_count = 0 + + try: + while self.running and not self.paused: + time.sleep(self.config.move_check_interval) + check_count += 1 + + # Safety timeout + elapsed = time.time() - start_time + if elapsed > self.config.max_move_time: + self.log("Movement timeout - likely at boundary") + return False + + # Update progress estimate + with self._viz_lock: + self.feature_viz.movement_progress = min(1.0, elapsed / (self.config.max_move_time * 0.5)) + + # Capture current frame + current_frame = self.camera.capture_frame() + + # Update edge features for visualization + self._update_edge_features(current_frame) + + # Get target edge features + target_region = self.get_edge_region(current_frame, to_edge) + target_kp, target_desc = self.detect_features(target_region) + + # Match with reference + good_matches = self.match_features(ref_desc, target_desc) if ref_desc is not None else [] + + # Update visualization + self._update_tracking_visualization( + ref_kp, from_edge, to_edge, current_frame, good_matches, direction.value + ) + + # Check if features have shifted (tile boundary crossed) + if ref_desc is not None and len(good_matches) >= self.config.min_good_matches: + self.log(f"Tile shift detected via feature matching ({len(good_matches)} matches)") + with self._viz_lock: + self.feature_viz.shift_detected = True + self.feature_viz.movement_progress = 1.0 + return True + + # Log progress periodically + if check_count % 10 == 0: + self.log(f" Checking... {len(good_matches)} matches so far") + + finally: + # Stop movement + self.motion.stop_axis(axis) + time.sleep(self.config.settle_time) + + # Clear tracking visualization after a delay + # (keep it visible briefly so user can see final state) + threading.Timer(1.0, self._clear_tracking_visualization).start() + + return False + + # ========================================================================= + # Scanning Operations + # ========================================================================= + + def capture_tile(self) -> Tile: + """Capture a single tile at current position""" + frame = self.camera.capture_frame() + tile_image = self.extract_tile(frame) + + # Calculate focus score for metadata + from vision import calculate_focus_score_sobel + focus_score = calculate_focus_score_sobel(frame) + + tile = Tile( + image=tile_image, + row=self.current_row, + col=self.current_col, + x_pos=self.current_col, + y_pos=self.current_row, + focus_score=focus_score, + timestamp=time.time() + ) + + self.tiles.append(tile) + self.tiles_captured += 1 + + if self.on_tile_captured: + self.on_tile_captured(tile) + + self.log(f"Captured tile [{self.current_row}, {self.current_col}] focus={focus_score:.1f}") + + return tile + + def scan_row(self, direction: ScanDirection) -> List[Tile]: + """ + Scan a complete row in the given direction. + + Args: + direction: ScanDirection.LEFT or ScanDirection.RIGHT + + Returns: + List of tiles captured in this row + """ + row_tiles = [] + tiles_in_row = 0 + + self.log(f"Starting row {self.current_row} scan ({direction.value})") + + while self.running and not self.paused: + # Safety limit + if tiles_in_row >= self.config.max_tiles_per_row: + self.log(f"Max tiles per row reached ({self.config.max_tiles_per_row})") + break + + # Autofocus check + # if (self.autofocus and + # self.config.autofocus_every_n_tiles > 0 and + # self.tiles_captured > 0 and + # self.tiles_captured % self.config.autofocus_every_n_tiles == 0): + # self.log("Running autofocus...") + # self.autofocus.start() + # while self.autofocus.is_running(): + # time.sleep(0.1) + + # Update edge features before capture + frame = self.camera.capture_frame() + self._update_edge_features(frame) + + # Capture tile + tile = self.capture_tile() + row_tiles.append(tile) + tiles_in_row += 1 + + # Update progress + if self.on_progress: + self.on_progress(self.tiles_captured, -1) + + # Move to next tile position + tile_shifted = self.move_until_tile_shifted(direction) + + if not tile_shifted: + # Timeout or boundary = end of row + self.log(f"Row {self.current_row} complete, {len(row_tiles)} tiles") + break + + # Update column position + if direction == ScanDirection.RIGHT: + self.current_col += 1 + else: + self.current_col -= 1 + + return row_tiles + + def step_to_next_row(self) -> bool: + """ + Move down to the next row. + + Returns: + True if successful, False if bottom edge reached + """ + # Safety limit + if self.current_row >= self.config.max_rows: + self.log(f"Max rows reached ({self.config.max_rows})") + return False + + self.log("Stepping to next row...") + + # Move down + tile_shifted = self.move_until_tile_shifted(ScanDirection.DOWN) + + if tile_shifted: + self.current_row += 1 + + # Autofocus at start of each row if configured + # if self.autofocus and self.config.autofocus_every_row: + # self.log("Row start autofocus...") + # self.autofocus.start() + # while self.autofocus.is_running(): + # time.sleep(0.1) + + return True + else: + self.log("Bottom edge reached") + return False + + def full_scan(self): + """Execute a complete serpentine scan of the slide""" + self.log("Starting full scan") + self.running = True + self.tiles = [] + self.current_row = 0 + self.current_col = 0 + self.tiles_captured = 0 + + # Alternate scan direction each row (serpentine) + scan_direction = self.config.start_direction + + try: + while self.running: + # Handle pause + while self.paused and self.running: + time.sleep(0.1) + + if not self.running: + break + + # Scan current row + self.scan_row(scan_direction) + + if not self.running: + break + + # Try to move to next row + if not self.step_to_next_row(): + break + + # Reverse direction for next row (serpentine) + if scan_direction == ScanDirection.RIGHT: + scan_direction = ScanDirection.LEFT + else: + scan_direction = ScanDirection.RIGHT + + self.log(f"Scan complete! {self.tiles_captured} tiles captured") + + except Exception as e: + self.log(f"Scan error: {e}") + raise + finally: + self.running = False + self._clear_tracking_visualization() + + # ========================================================================= + # Control Methods + # ========================================================================= + + def start(self): + """Start scanning in background thread""" + if self.running: + self.log("Scan already running") + return False + + self._thread = threading.Thread(target=self.full_scan, daemon=True) + self._thread.start() + return True + + def stop(self): + """Stop scanning""" + self.running = False + self.motion.stop_all() + self._clear_tracking_visualization() + self.log("Scan stopped") + + def pause(self): + """Pause scanning""" + self.paused = True + self.motion.stop_all() + self.log("Scan paused") + + def resume(self): + """Resume scanning""" + self.paused = False + self.log("Scan resumed") + + # ========================================================================= + # Mosaic Building + # ========================================================================= + + def build_mosaic(self, scale: float = 0.25) -> np.ndarray: + """ + Build a mosaic image from captured tiles. + + Args: + scale: Scale factor for output (0.25 = 25% size) + + Returns: + Mosaic image + """ + if not self.tiles: + return None + + # Find grid dimensions + max_row = max(t.row for t in self.tiles) + max_col = max(t.col for t in self.tiles) + min_col = min(t.col for t in self.tiles) + + # Get tile dimensions (from first tile) + tile_h, tile_w = self.tiles[0].image.shape[:2] + scaled_h = int(tile_h * scale) + scaled_w = int(tile_w * scale) + + # Calculate mosaic size + num_rows = max_row + 1 + num_cols = max_col - min_col + 1 + + mosaic = np.zeros((num_rows * scaled_h, num_cols * scaled_w, 3), dtype=np.uint8) + + # Place tiles + for tile in self.tiles: + row = tile.row + col = tile.col - min_col + + scaled_tile = cv2.resize(tile.image, (scaled_w, scaled_h)) + + y = row * scaled_h + x = col * scaled_w + mosaic[y:y+scaled_h, x:x+scaled_w] = scaled_tile + + return mosaic + + def get_mosaic_preview(self, max_size: int = 800) -> np.ndarray: + """ + Get a preview-sized mosaic. + + Args: + max_size: Maximum dimension of output + + Returns: + Preview mosaic image + """ + mosaic = self.build_mosaic(scale=1.0) + if mosaic is None: + return None + + h, w = mosaic.shape[:2] + if max(h, w) > max_size: + scale = max_size / max(h, w) + mosaic = cv2.resize(mosaic, (int(w * scale), int(h * scale))) + + return mosaic \ No newline at end of file