diff --git a/Mos1.png b/Mos1.png new file mode 100644 index 0000000..62a8f10 Binary files /dev/null and b/Mos1.png differ diff --git a/Mos2.png b/Mos2.png new file mode 100644 index 0000000..135eed4 Binary files /dev/null and b/Mos2.png differ diff --git a/src/gui.py b/src/gui.py index d8244ca..d8ccc2f 100644 --- a/src/gui.py +++ b/src/gui.py @@ -1,7 +1,8 @@ """ -AutoScope GUI - Vertical Monitor Layout with Feature Visualization +AutoScope GUI - Vertical Monitor Layout with Edge Comparison Overlay -Enhanced with feature overlay debugging for scanner development. +Shows reference and current binary edge images overlaid directly on camera view. +Updated with interpolation controls and improved scanner UI. """ import tkinter as tk @@ -97,30 +98,65 @@ class AppGUI: self.camera_label = ttk.Label(camera_frame) self.camera_label.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) - # Camera options row - cam_opts = ttk.Frame(camera_frame) - cam_opts.pack(fill=tk.X, padx=5, pady=(0, 5)) + # Camera options row 1: overlays + cam_opts1 = ttk.Frame(camera_frame) + cam_opts1.pack(fill=tk.X, padx=5, pady=(0, 2)) self.show_tile_overlay_var = tk.BooleanVar(value=True) - ttk.Checkbutton(cam_opts, text="Tile bounds", + ttk.Checkbutton(cam_opts1, text="Tile bounds", variable=self.show_tile_overlay_var).pack(side=tk.LEFT) self.show_edge_regions_var = tk.BooleanVar(value=False) - ttk.Checkbutton(cam_opts, text="Track regions", + ttk.Checkbutton(cam_opts1, text="Edge regions", variable=self.show_edge_regions_var).pack(side=tk.LEFT, padx=(10, 0)) - # NEW: Feature overlay checkbox - self.show_features_var = tk.BooleanVar(value=False) - ttk.Checkbutton(cam_opts, text="Features", - variable=self.show_features_var).pack(side=tk.LEFT, padx=(10, 0)) + self.show_comparison_var = tk.BooleanVar(value=True) + ttk.Checkbutton(cam_opts1, text="Comparison overlay", + variable=self.show_comparison_var).pack(side=tk.LEFT, padx=(10, 0)) self.live_focus_var = tk.BooleanVar(value=True) - 
ttk.Checkbutton(cam_opts, text="Live focus", + ttk.Checkbutton(cam_opts1, text="Live focus", variable=self.live_focus_var).pack(side=tk.LEFT, padx=(10, 0)) - self.focus_score_label = ttk.Label(cam_opts, text="Focus: --", font=('Arial', 11, 'bold')) + self.focus_score_label = ttk.Label(cam_opts1, text="Focus: --", font=('Arial', 11, 'bold')) self.focus_score_label.pack(side=tk.RIGHT) + # Camera options row 2: threshold and similarity + cam_opts2 = ttk.Frame(camera_frame) + cam_opts2.pack(fill=tk.X, padx=5, pady=(0, 5)) + + ttk.Label(cam_opts2, text="Match threshold:").pack(side=tk.LEFT) + + self.threshold_var = tk.DoubleVar(value=0.12) + self.threshold_slider = ttk.Scale( + cam_opts2, + from_=0.05, + to=0.8, + orient=tk.HORIZONTAL, + variable=self.threshold_var, + command=self._on_threshold_change, + length=100 + ) + self.threshold_slider.pack(side=tk.LEFT, padx=5) + + self.threshold_label = ttk.Label(cam_opts2, text="0.12", width=5) + self.threshold_label.pack(side=tk.LEFT) + + ttk.Button(cam_opts2, text="Test Edges", width=10, + command=self._test_edge_comparison).pack(side=tk.LEFT, padx=(10, 0)) + + ttk.Button(cam_opts2, text="Test Interp", width=10, + command=self._test_interpolation).pack(side=tk.LEFT, padx=(5, 0)) + + # Similarity display + ttk.Separator(cam_opts2, orient=tk.VERTICAL).pack(side=tk.LEFT, fill=tk.Y, padx=10) + ttk.Label(cam_opts2, text="Sim:").pack(side=tk.LEFT) + self.similarity_label = ttk.Label(cam_opts2, text="--", font=('Arial', 10, 'bold'), width=6) + self.similarity_label.pack(side=tk.LEFT) + + self.match_status_label = ttk.Label(cam_opts2, text="", width=10) + self.match_status_label.pack(side=tk.LEFT, padx=(3, 0)) + # === BOTTOM: Control Panel === control_frame = ttk.Frame(main_frame) control_frame.pack(fill=tk.X) @@ -128,14 +164,17 @@ class AppGUI: # Row 1: Emergency Stop + Scanner Controls self._build_row1_emergency_scanner(control_frame) - # Row 2: Movement Controls - self._build_row2_movement(control_frame) + # Row 2: Interpolation 
Settings (NEW) + self._build_row2_interpolation(control_frame) - # Row 3: Speed + Autofocus - self._build_row3_speed_autofocus(control_frame) + # Row 3: Movement Controls + self._build_row3_movement(control_frame) - # Row 4: Status + Log - self._build_row4_status_log(control_frame) + # Row 4: Speed + Autofocus + self._build_row4_speed_autofocus(control_frame) + + # Row 5: Status + Log + self._build_row5_status_log(control_frame) def _build_row1_emergency_scanner(self, parent): """Row 1: Emergency stop and scanner controls""" @@ -145,10 +184,10 @@ class AppGUI: # Emergency stop button self.emergency_btn = tk.Button( row, text="⚠ STOP", command=self._emergency_stop, - bg='red', fg='white', font=('Arial', 12, 'bold'), - width=8, height=1 + bg='#d32f2f', fg='white', font=('Arial', 11, 'bold'), + width=7, height=1, relief=tk.RAISED, bd=2 ) - self.emergency_btn.pack(side=tk.LEFT, padx=(0, 10)) + self.emergency_btn.pack(side=tk.LEFT, padx=(0, 8)) # Scanner controls scanner_frame = ttk.LabelFrame(row, text="Scanner") @@ -157,50 +196,141 @@ class AppGUI: sf = ttk.Frame(scanner_frame) sf.pack(fill=tk.X, padx=5, pady=3) - # Status + Progress + # Status indicator self.scan_status_label = ttk.Label(sf, text="Idle", font=('Arial', 9, 'bold'), width=8) self.scan_status_label.pack(side=tk.LEFT) - self.scan_progress_bar = ttk.Progressbar(sf, mode='indeterminate', length=80) - self.scan_progress_bar.pack(side=tk.LEFT, padx=5) + # Progress + self.scan_progress_bar = ttk.Progressbar(sf, mode='indeterminate', length=60) + self.scan_progress_bar.pack(side=tk.LEFT, padx=3) - self.scan_progress_label = ttk.Label(sf, text="", width=10) + self.scan_progress_label = ttk.Label(sf, text="0 tiles", width=8) self.scan_progress_label.pack(side=tk.LEFT) - # Buttons + # Separator + ttk.Separator(sf, orient=tk.VERTICAL).pack(side=tk.LEFT, fill=tk.Y, padx=5) + + # Control buttons with proper styling + btn_frame = ttk.Frame(sf) + btn_frame.pack(side=tk.LEFT) + self.scan_start_btn = tk.Button( - sf, 
text="▶ Start", width=7, bg='green', fg='white', - font=('Arial', 9, 'bold'), command=self._start_scan + btn_frame, text="▶ Start", width=7, + bg='#4CAF50', fg='white', font=('Arial', 9, 'bold'), + activebackground='#45a049', relief=tk.RAISED, bd=2, + command=self._start_scan ) self.scan_start_btn.pack(side=tk.LEFT, padx=2) - self.scan_pause_btn = ttk.Button(sf, text="⏸", width=3, - command=self._pause_scan, state='disabled') - self.scan_pause_btn.pack(side=tk.LEFT, padx=1) + self.scan_pause_btn = tk.Button( + btn_frame, text="⏸", width=3, + bg='#2196F3', fg='white', font=('Arial', 10, 'bold'), + activebackground='#1976D2', relief=tk.RAISED, bd=2, + command=self._pause_scan, state='disabled' + ) + self.scan_pause_btn.pack(side=tk.LEFT, padx=2) self.scan_stop_btn = tk.Button( - sf, text="⏹", width=3, bg='orange', fg='white', + btn_frame, text="⏹", width=3, + bg='#FF9800', fg='white', font=('Arial', 10, 'bold'), + activebackground='#F57C00', relief=tk.RAISED, bd=2, command=self._stop_scan, state='disabled' ) - self.scan_stop_btn.pack(side=tk.LEFT, padx=1) + self.scan_stop_btn.pack(side=tk.LEFT, padx=2) - ttk.Button(sf, text="📷", width=3, - command=self._capture_single_tile).pack(side=tk.LEFT, padx=1) + # Capture single tile + ttk.Button(btn_frame, text="📷", width=3, + command=self._capture_single_tile).pack(side=tk.LEFT, padx=2) + # Mosaic button ttk.Button(sf, text="Mosaic", width=6, - command=self._show_mosaic_window).pack(side=tk.LEFT, padx=(5, 2)) - - # AF interval - ttk.Label(sf, text="AF:").pack(side=tk.LEFT, padx=(10, 2)) - self.af_every_var = tk.StringVar(value="5") - ttk.Spinbox(sf, from_=1, to=20, width=3, - textvariable=self.af_every_var).pack(side=tk.LEFT) - - self.af_every_row_var = tk.BooleanVar(value=True) - ttk.Checkbutton(sf, text="Row", variable=self.af_every_row_var).pack(side=tk.LEFT) + command=self._show_mosaic_window).pack(side=tk.RIGHT, padx=2) - def _build_row2_movement(self, parent): - """Row 2: Movement controls for all axes""" + def 
_build_row2_interpolation(self, parent): + """Row 2: Interpolation and peak detection settings""" + row = ttk.LabelFrame(parent, text="Matching Settings") + row.pack(fill=tk.X, pady=(0, 3)) + + inner = ttk.Frame(row) + inner.pack(fill=tk.X, padx=5, pady=3) + + # Interpolation toggle + self.use_interpolation_var = tk.BooleanVar(value=True) + ttk.Checkbutton( + inner, text="Interpolation", + variable=self.use_interpolation_var, + command=self._update_scan_config + ).pack(side=tk.LEFT) + + # Number of interpolations + ttk.Label(inner, text="Steps:").pack(side=tk.LEFT, padx=(10, 2)) + self.num_interp_var = tk.IntVar(value=10) + self.num_interp_spinbox = ttk.Spinbox( + inner, from_=5, to=30, width=4, + textvariable=self.num_interp_var, + command=self._update_scan_config + ) + self.num_interp_spinbox.pack(side=tk.LEFT) + + # Separator + ttk.Separator(inner, orient=tk.VERTICAL).pack(side=tk.LEFT, fill=tk.Y, padx=10) + + # Peak detection toggle + self.use_peak_detection_var = tk.BooleanVar(value=True) + ttk.Checkbutton( + inner, text="Peak Detection", + variable=self.use_peak_detection_var, + command=self._update_scan_config + ).pack(side=tk.LEFT) + + # Peak window size + ttk.Label(inner, text="Window:").pack(side=tk.LEFT, padx=(10, 2)) + self.peak_window_var = tk.IntVar(value=5) + self.peak_window_spinbox = ttk.Spinbox( + inner, from_=3, to=15, width=3, + textvariable=self.peak_window_var, + command=self._update_scan_config + ) + self.peak_window_spinbox.pack(side=tk.LEFT) + + # Peak drop threshold + ttk.Label(inner, text="Drop:").pack(side=tk.LEFT, padx=(10, 2)) + self.peak_drop_var = tk.DoubleVar(value=0.02) + self.peak_drop_spinbox = ttk.Spinbox( + inner, from_=0.01, to=0.1, increment=0.01, width=5, + textvariable=self.peak_drop_var, + command=self._update_scan_config + ) + self.peak_drop_spinbox.pack(side=tk.LEFT) + + # Separator + ttk.Separator(inner, orient=tk.VERTICAL).pack(side=tk.LEFT, fill=tk.Y, padx=10) + + # Comparison method dropdown + ttk.Label(inner, 
text="Method:").pack(side=tk.LEFT, padx=(0, 2)) + self.comparison_method_var = tk.StringVar(value='template') + method_combo = ttk.Combobox( + inner, textvariable=self.comparison_method_var, + values=['template', 'ssim', 'mse', 'hst', 'phase'], + width=8, state='readonly' + ) + method_combo.pack(side=tk.LEFT) + method_combo.bind('<<ComboboxSelected>>', lambda e: self._update_scan_config()) + + # Status indicators on right + status_frame = ttk.Frame(inner) + status_frame.pack(side=tk.RIGHT) + + ttk.Label(status_frame, text="Offset:").pack(side=tk.LEFT) + self.offset_label = ttk.Label(status_frame, text="--", width=5, font=('Arial', 9)) + self.offset_label.pack(side=tk.LEFT) + + ttk.Label(status_frame, text="Peak:").pack(side=tk.LEFT, padx=(8, 0)) + self.peak_label = ttk.Label(status_frame, text="--", width=5, font=('Arial', 9)) + self.peak_label.pack(side=tk.LEFT) + + def _build_row3_movement(self, parent): + """Row 3: Movement controls for all axes""" row = ttk.LabelFrame(parent, text="Movement") row.pack(fill=tk.X, pady=(0, 3)) @@ -212,7 +342,7 @@ class AppGUI: for axis in ["X", "Y", "Z"]: af = ttk.Frame(inner) - af.pack(side=tk.LEFT, padx=(0, 20)) + af.pack(side=tk.LEFT, padx=(0, 15)) ttk.Label(af, text=f"{axis}:", font=('Arial', 10, 'bold')).pack(side=tk.LEFT) @@ -221,25 +351,28 @@ class AppGUI: dir_btn.pack(side=tk.LEFT, padx=2) self.dir_labels[axis] = dir_btn - move_btn = tk.Button(af, text="Move", width=6, bg='#4CAF50', fg='white', - command=lambda a=axis: self._toggle_movement(a)) + move_btn = tk.Button( + af, text="Move", width=6, + bg='#4CAF50', fg='white', font=('Arial', 9), + activebackground='#45a049', relief=tk.RAISED, + command=lambda a=axis: self._toggle_movement(a) + ) move_btn.pack(side=tk.LEFT, padx=2) self.move_buttons[axis] = move_btn ttk.Button(af, text="⏹", width=2, command=lambda a=axis: self._stop_axis(a)).pack(side=tk.LEFT) - # Stop all - tk.Button(inner, text="STOP ALL", width=10, bg='#f44336', fg='white', - font=('Arial', 9, 'bold'), -
command=self.motion.stop_all).pack(side=tk.RIGHT) - - # Test feature detection button - ttk.Button(inner, text="Test Features", width=11, - command=self._test_feature_detection).pack(side=tk.RIGHT, padx=(0, 10)) + # Stop all button + tk.Button( + inner, text="STOP ALL", width=10, + bg='#f44336', fg='white', font=('Arial', 9, 'bold'), + activebackground='#d32f2f', relief=tk.RAISED, bd=2, + command=self.motion.stop_all + ).pack(side=tk.RIGHT) - def _build_row3_speed_autofocus(self, parent): - """Row 3: Speed and Autofocus controls""" + def _build_row4_speed_autofocus(self, parent): + """Row 4: Speed and Autofocus controls""" row = ttk.Frame(parent) row.pack(fill=tk.X, pady=(0, 3)) @@ -257,12 +390,13 @@ class AppGUI: ttk.Separator(sf, orient=tk.VERTICAL).pack(side=tk.LEFT, fill=tk.Y, padx=5) - self.fine_speed_label = ttk.Label(sf, text="50", width=3) + self.fine_speed_label = ttk.Label(sf, text="1", width=3) self.fine_speed_label.pack(side=tk.LEFT) self.speed_slider = ttk.Scale(sf, from_=0, to=6, orient=tk.HORIZONTAL, length=80) - self.speed_slider.set(5) + self.speed_slider.set(1) self.speed_slider.config(command=self._on_speed_slider_change) + self._on_speed_slider_change(1) self.speed_slider.pack(side=tk.LEFT, padx=3) # Autofocus controls @@ -282,8 +416,8 @@ class AppGUI: ttk.Button(af, text="Stop", width=5, command=self._stop_autofocus).pack(side=tk.LEFT, padx=2) - def _build_row4_status_log(self, parent): - """Row 4: Status and Log""" + def _build_row5_status_log(self, parent): + """Row 5: Status and Log""" row = ttk.Frame(parent) row.pack(fill=tk.X, pady=(0, 3)) @@ -317,35 +451,320 @@ class AppGUI: command=self._send_custom_command).pack(side=tk.RIGHT, padx=(5, 0)) # ========================================================================= - # Feature Detection Testing + # Comparison Overlay Drawing # ========================================================================= - def _test_feature_detection(self): - """Test and log feature detection on current 
frame""" + def _draw_comparison_overlay(self, frame: np.ndarray) -> np.ndarray: + """ + Draw edge comparison overlay directly on camera frame. + Shows reference, current, and interpolated binary images. + """ + if not self.scanner or not self.show_comparison_var.get(): + return frame + + state = self.scanner.get_comparison_state() + + # Show overlay if tracking OR if paused with valid reference image + if not state.is_tracking and not (self.scanner.paused and state.reference_image is not None): + return frame + + h, w = frame.shape[:2] + border_pct = self.scan_config.border_percentage + + # Map raw edges to display positions after 90° CW rotation + edge_to_display = { + 'top': 'right', + 'bottom': 'left', + 'left': 'top', + 'right': 'bottom' + } + + ref_display_edge = edge_to_display.get(state.reference_edge, '') + target_display_edge = edge_to_display.get(state.target_edge, '') + + # Draw reference image (cyan) + if state.reference_image is not None: + frame = self._draw_binary_at_edge( + frame, state.reference_image, ref_display_edge, + border_pct, color=(0, 255, 255), label="REF" + ) + + # Draw current/interpolated image (yellow) + # Prefer interpolated if available, otherwise current + display_image = state.interpolated_image if state.interpolated_image is not None else state.current_image + if display_image is not None: + label = "INT" if state.interpolated_image is not None else "CUR" + frame = self._draw_binary_at_edge( + frame, display_image, target_display_edge, + border_pct, color=(255, 255, 0), label=label + ) + + # Draw status box with extended info + frame = self._draw_comparison_status_box(frame, state) + + # Draw similarity history graph if available + if state.similarity_history and len(state.similarity_history) > 1: + frame = self._draw_similarity_graph(frame, state.similarity_history, state.threshold) + + return frame + + def _draw_binary_at_edge(self, frame: np.ndarray, binary_img: np.ndarray, + display_edge: str, border_pct: float, + color: 
tuple, label: str) -> np.ndarray: + """Draw a binary image overlaid at the specified display edge.""" + h, w = frame.shape[:2] + + # Convert binary to BGR and apply color tint + binary_bgr = cv2.cvtColor(binary_img, cv2.COLOR_GRAY2BGR) + mask = binary_img > 127 + binary_bgr[mask] = color + + # Rotate binary to match 90° CW display rotation + binary_bgr = cv2.rotate(binary_bgr, cv2.ROTATE_90_CLOCKWISE) + + alpha = 0.7 + + if display_edge == 'left': + border_w = int(w * border_pct) + resized = cv2.resize(binary_bgr, (border_w, h)) + frame[:, :border_w] = cv2.addWeighted( + frame[:, :border_w], 1 - alpha, resized, alpha, 0 + ) + cv2.putText(frame, label, (5, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2) + + elif display_edge == 'right': + border_w = int(w * border_pct) + resized = cv2.resize(binary_bgr, (border_w, h)) + frame[:, w-border_w:] = cv2.addWeighted( + frame[:, w-border_w:], 1 - alpha, resized, alpha, 0 + ) + cv2.putText(frame, label, (w - border_w + 5, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2) + + elif display_edge == 'top': + border_h = int(h * border_pct) + resized = cv2.resize(binary_bgr, (w, border_h)) + frame[:border_h, :] = cv2.addWeighted( + frame[:border_h, :], 1 - alpha, resized, alpha, 0 + ) + cv2.putText(frame, label, (5, border_h - 8), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2) + + elif display_edge == 'bottom': + border_h = int(h * border_pct) + resized = cv2.resize(binary_bgr, (w, border_h)) + frame[h-border_h:, :] = cv2.addWeighted( + frame[h-border_h:, :], 1 - alpha, resized, alpha, 0 + ) + cv2.putText(frame, label, (5, h - 8), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2) + + return frame + + def _draw_comparison_status_box(self, frame: np.ndarray, state) -> np.ndarray: + """Draw a status box showing similarity, offset, and match status""" + h, w = frame.shape[:2] + + # Position: top center + box_w, box_h = 240, 90 + box_x = (w - box_w) // 2 + box_y = 10 + + # Semi-transparent background + overlay = frame.copy() + cv2.rectangle(overlay, 
(box_x, box_y), (box_x + box_w, box_y + box_h), (0, 0, 0), -1) + frame = cv2.addWeighted(overlay, 0.6, frame, 0.4, 0) + + # Text positioning + tx = box_x + 10 + ty = box_y + 18 + line_h = 18 + + # Direction info + cv2.putText(frame, f"Dir: {state.direction}", (tx, ty), + cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 255, 255), 1) + ty += line_h + + # Similarity with color coding + sim = state.similarity + threshold = state.threshold + is_paused = self.scanner and self.scanner.paused + + if state.shift_detected or sim >= threshold: + sim_color = (0, 255, 0) # Green + elif is_paused: + sim_color = (255, 200, 0) # Cyan + else: + sim_color = (0, 165, 255) # Orange + + cv2.putText(frame, f"Similarity: {sim:.3f}", (tx, ty), + cv2.FONT_HERSHEY_SIMPLEX, 0.45, sim_color, 1) + + # Threshold on same line + cv2.putText(frame, f"(thr: {threshold:.2f})", (tx + 120, ty), + cv2.FONT_HERSHEY_SIMPLEX, 0.4, (150, 150, 150), 1) + ty += line_h + + # Offset info (from interpolation) + offset_text = f"Offset: {state.best_offset:.2f}" if state.best_offset > 0 else "Offset: --" + cv2.putText(frame, offset_text, (tx, ty), + cv2.FONT_HERSHEY_SIMPLEX, 0.45, (200, 200, 200), 1) + ty += line_h + + # Status + if state.shift_detected: + if state.peak_detected: + status = "PEAK MATCHED!" + else: + status = "MATCHED!" + status_color = (0, 255, 0) + elif sim >= threshold: + status = "ABOVE THRESHOLD" + status_color = (0, 255, 0) + elif is_paused: + status = "PAUSED" + status_color = (255, 200, 0) + else: + status = "Searching..." 
+ status_color = (200, 200, 200) + + cv2.putText(frame, status, (tx, ty), + cv2.FONT_HERSHEY_SIMPLEX, 0.45, status_color, 1) + + return frame + + def _draw_similarity_graph(self, frame: np.ndarray, history: list, threshold: float) -> np.ndarray: + """Draw a small similarity history graph in the corner""" + h, w = frame.shape[:2] + + # Graph dimensions and position (bottom right) + graph_w, graph_h = 150, 60 + margin = 10 + gx = w - graph_w - margin + gy = h - graph_h - margin - 50 # Above the bottom edge region + + # Semi-transparent background + overlay = frame.copy() + cv2.rectangle(overlay, (gx, gy), (gx + graph_w, gy + graph_h), (0, 0, 0), -1) + frame = cv2.addWeighted(overlay, 0.5, frame, 0.5, 0) + + # Draw threshold line + thr_y = int(gy + graph_h - (threshold * graph_h)) + cv2.line(frame, (gx, thr_y), (gx + graph_w, thr_y), (0, 100, 255), 1) + + # Draw history line + if len(history) > 1: + # Take last N points that fit in the graph + max_points = graph_w // 2 + plot_history = history[-max_points:] if len(history) > max_points else history + + points = [] + for i, val in enumerate(plot_history): + px = gx + int((i / max(len(plot_history) - 1, 1)) * (graph_w - 1)) + py = int(gy + graph_h - (min(val, 1.0) * graph_h)) + points.append((px, py)) + + # Draw the line + for i in range(1, len(points)): + color = (0, 255, 0) if plot_history[i] >= threshold else (0, 165, 255) + cv2.line(frame, points[i-1], points[i], color, 2) + + # Label + cv2.putText(frame, "History", (gx + 5, gy + 12), + cv2.FONT_HERSHEY_SIMPLEX, 0.35, (200, 200, 200), 1) + + # Max value in history + if history: + max_sim = max(history) + cv2.putText(frame, f"max:{max_sim:.2f}", (gx + graph_w - 55, gy + 12), + cv2.FONT_HERSHEY_SIMPLEX, 0.3, (150, 255, 150), 1) + + return frame + + def _on_threshold_change(self, value): + """Handle threshold slider change""" + val = float(value) + self.threshold_label.config(text=f"{val:.2f}") + if self.scanner: + self.scanner.config.similarity_threshold = val + + def 
_update_similarity_display(self): + """Update the similarity labels in the UI""" + if not self.scanner: + return + + state = self.scanner.get_comparison_state() + + # Show status if tracking OR if paused with valid data + if state.is_tracking or (self.scanner.paused and state.reference_image is not None): + sim = state.similarity + self.similarity_label.config(text=f"{sim:.3f}") + + # Update offset and peak labels + self.offset_label.config(text=f"{state.best_offset:.2f}") + if state.similarity_history: + max_sim = max(state.similarity_history) + self.peak_label.config(text=f"{max_sim:.2f}") + + if state.shift_detected or sim >= state.threshold: + self.similarity_label.config(foreground='green') + status_text = "PEAK!" if state.peak_detected else "MATCH!" + self.match_status_label.config(text=status_text, foreground='green') + elif self.scanner.paused: + self.similarity_label.config(foreground='blue') + self.match_status_label.config(text="PAUSED", foreground='blue') + else: + self.similarity_label.config(foreground='orange') + self.match_status_label.config(text="Searching", foreground='gray') + else: + self.similarity_label.config(text="--", foreground='black') + self.match_status_label.config(text="", foreground='black') + self.offset_label.config(text="--") + self.peak_label.config(text="--") + + def _test_edge_comparison(self): + """Test edge detection on current frame""" if not self.scanner: self.log_message("Scanner not initialized") return try: - frame = self.camera.capture_frame() + results = self.scanner.test_edge_comparison() - # Detect features on all edges + self.log_message("Edge test results (raw frame coordinates):") for edge in ['left', 'right', 'top', 'bottom']: - region = self.scanner.get_edge_region(frame, edge) - kp, desc = self.scanner.detect_features(region) - count = len(kp) if kp else 0 - self.log_message(f" {edge.upper()}: {count} features") + r = results[edge] + self.log_message(f" {edge.upper()}: {r['shape']}, white: 
{r['white_ratio']:.1%}") - # Also update scanner's visualization data - self.scanner._update_edge_features(frame) - - # Enable feature view - self.show_features_var.set(True) - - self.log_message("Feature detection test complete - enable 'Features' checkbox to see overlay") + self.log_message(f" X-axis (top↔bottom): {results['x_axis_sim']:.3f}") + self.log_message(f" Y-axis (left↔right): {results['y_axis_sim']:.3f}") except Exception as e: - self.log_message(f"Feature detection error: {e}") + self.log_message(f"Edge test error: {e}") + + def _test_interpolation(self): + """Test interpolation matching""" + if not self.scanner: + self.log_message("Scanner not initialized") + return + + try: + self.log_message("Starting interpolation test (5 frames)...") + + def run_test(): + results = self.scanner.test_interpolation(num_frames=5) + self.root.after(0, lambda: self._show_interp_results(results)) + + threading.Thread(target=run_test, daemon=True).start() + + except Exception as e: + self.log_message(f"Interpolation test error: {e}") + + def _show_interp_results(self, results): + """Display interpolation test results""" + self.log_message("Interpolation test results:") + for fr in results['frames']: + self.log_message(f" Frame {fr['frame_idx']}: " + f"direct={fr['direct_similarity']:.3f}, " + f"interp={fr['interpolated_similarity']:.3f}, " + f"offset={fr['best_offset']:.2f}") # ========================================================================= # Scanner Handlers @@ -353,16 +772,21 @@ class AppGUI: def _update_scan_config(self): """Update scanner config from GUI values""" - self.scan_config.autofocus_every_n_tiles = int(self.af_every_var.get()) - self.scan_config.autofocus_every_row = self.af_every_row_var.get() + self.scan_config.similarity_threshold = self.threshold_var.get() + self.scan_config.use_interpolation = self.use_interpolation_var.get() + self.scan_config.num_interpolations = self.num_interp_var.get() + self.scan_config.use_peak_detection = 
self.use_peak_detection_var.get() + self.scan_config.peak_window_size = self.peak_window_var.get() + self.scan_config.peak_drop_threshold = self.peak_drop_var.get() + self.scan_config.comparison_method = self.comparison_method_var.get() + + # Reinitialize scanner with new config self._init_scanner() def _start_scan(self): self._update_scan_config() - # Enable feature view during scan - self.show_features_var.set(True) if self.scanner.start(): - self.scan_start_btn.config(state='disabled') + self.scan_start_btn.config(state='disabled', bg='#888888') self.scan_pause_btn.config(state='normal') self.scan_stop_btn.config(state='normal') self.scan_status_label.config(text="Scanning") @@ -371,12 +795,12 @@ class AppGUI: def _pause_scan(self): if self.scanner.paused: self.scanner.resume() - self.scan_pause_btn.config(text="⏸") + self.scan_pause_btn.config(text="⏸", bg='#2196F3') self.scan_status_label.config(text="Scanning") self.scan_progress_bar.start(10) else: self.scanner.pause() - self.scan_pause_btn.config(text="▶") + self.scan_pause_btn.config(text="▶", bg='#4CAF50') self.scan_status_label.config(text="Paused") self.scan_progress_bar.stop() @@ -385,8 +809,8 @@ class AppGUI: self._scan_finished() def _scan_finished(self): - self.scan_start_btn.config(state='normal') - self.scan_pause_btn.config(state='disabled', text="⏸") + self.scan_start_btn.config(state='normal', bg='#4CAF50') + self.scan_pause_btn.config(state='disabled', text="⏸", bg='#2196F3') self.scan_stop_btn.config(state='disabled') self.scan_status_label.config(text="Idle") self.scan_progress_bar.stop() @@ -487,14 +911,14 @@ class AppGUI: if self.motion.is_moving(axis): self.motion.stop_axis(axis) self.motion.start_movement(axis) - self.move_buttons[axis].config(text="Moving", bg='orange') + self.move_buttons[axis].config(text="Moving", bg='#FF9800') def _toggle_movement(self, axis): if self.motion.is_moving(axis): self._stop_axis(axis) else: self.motion.start_movement(axis) - 
self.move_buttons[axis].config(text="Moving", bg='orange') + self.move_buttons[axis].config(text="Moving", bg='#FF9800') def _stop_axis(self, axis): self.motion.stop_axis(axis) @@ -579,22 +1003,18 @@ class AppGUI: score = self.autofocus.get_focus_score() self.focus_score_label.config(text=f"Focus: {score:.1f}") - # Apply overlays + # Apply standard overlays frame = self._draw_overlays(frame) - # Apply feature overlay if enabled - if self.show_features_var.get() and self.scanner: - # Update edge features periodically - self.scanner._update_edge_features(frame) - # Get overlay from scanner - frame = self.scanner.get_feature_overlay(frame) + # Apply comparison overlay (shows binary edges during tracking) + frame = self._draw_comparison_overlay(frame) frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) - # Scale to fit - maximize for vertical monitor + # Scale to fit h, w = frame.shape[:2] max_h = 700 - max_w = 650 + max_w = 680 scale = min(max_h / h, max_w / w) if scale < 1: @@ -632,10 +1052,10 @@ class AppGUI: def _draw_edge_regions(self, frame, color=(255, 255, 0), thickness=1): h, w = frame.shape[:2] bh, bw = int(h * 0.10), int(w * 0.10) - cv2.rectangle(frame, (0, 0), (bw, h), color, thickness) - cv2.rectangle(frame, (w - bw, 0), (w, h), color, thickness) - cv2.rectangle(frame, (0, 0), (w, bh), color, thickness) - cv2.rectangle(frame, (0, h - bh), (w, h), color, thickness) + cv2.rectangle(frame, (0, 0), (bw, h), color, thickness) # Left + cv2.rectangle(frame, (w - bw, 0), (w, h), color, thickness) # Right + cv2.rectangle(frame, (0, 0), (w, bh), color, thickness) # Top + cv2.rectangle(frame, (0, h - bh), (w, h), color, thickness) # Bottom return frame # ========================================================================= @@ -648,6 +1068,7 @@ class AppGUI: self.update_camera() self._process_serial_queue() self._process_tile_queue() + self._update_similarity_display() if not self.autofocus.is_running(): self.af_button.config(state='normal') diff --git a/src/scanner.py 
b/src/scanner.py index aa193b2..0d4a353 100644 --- a/src/scanner.py +++ b/src/scanner.py @@ -1,10 +1,11 @@ """ Scanner Module - Automated slide scanning with visual feedback navigation -Uses feature tracking to detect tile boundaries instead of step counting. +Uses binary template matching to detect tile boundaries instead of step counting. Camera orientation: Portrait (height > width), X=left/right, Y=up/down -Enhanced with feature visualization for debugging. +Enhanced with interpolation-based matching to handle discrete stepper motor steps +that can skip over optimal alignment points. """ import cv2 @@ -12,9 +13,8 @@ import numpy as np import time import threading from dataclasses import dataclass, field -from typing import List, Tuple, Optional, Callable, Dict +from typing import List, Optional, Callable, Tuple from enum import Enum -from collections import deque class ScanDirection(Enum): @@ -31,7 +31,7 @@ class Tile: image: np.ndarray row: int col: int - x_pos: int # Grid position + x_pos: int y_pos: int focus_score: float timestamp: float @@ -41,72 +41,73 @@ class Tile: class ScanConfig: """Scanner configuration""" # Tile extraction - tile_percentage: float = 0.80 # Center 80% is the tile - border_percentage: float = 0.10 # 10% on each side for tracking + tile_percentage: float = 0.40 # Center portion is the tile + border_percentage: float = 0.30 # Border on each side for tracking - # Feature detection - feature_detector: str = 'SIFT' # 'ORB', 'SIFT', or 'AKAZE' - min_features: int = 10 # Minimum features to track - match_threshold: float = 0.75 # Feature match confidence - min_good_matches: int = 5 # Matches needed to confirm movement + # Binary comparison settings + similarity_threshold: float = 0.12 # 0-1, higher = stricter matching + comparison_method: str = 'template' # 'template', 'ssim', 'mse', 'hst', 'phase' + + # Interpolation settings (NEW) + num_interpolations: int = 10 # Number of sub-steps to simulate between frames + use_interpolation: bool = 
True # Enable/disable interpolation matching + use_peak_detection: bool = True # Stop at peak similarity, not just threshold + peak_window_size: int = 5 # Number of frames to track for peak detection + peak_drop_threshold: float = 0.02 # How much similarity must drop to confirm peak + + # Adaptive threshold settings + adaptive_block_size: int = 11 # Must be odd + adaptive_c: int = 2 # Constant subtracted from mean # Movement timing move_check_interval: float = 0.1 # Seconds between frame checks settle_time: float = 0.2 # Seconds to wait after stopping - max_move_time: float = 10.0 # Safety timeout per tile + max_move_time: float = 120.0 # Safety timeout per tile # Scan limits (number of tiles) - max_tiles_per_row: int = 50 # Safety limit - max_rows: int = 50 # Safety limit + max_tiles_per_row: int = 10 + max_rows: int = 10 # Scan pattern start_direction: ScanDirection = ScanDirection.RIGHT # Autofocus - autofocus_every_n_tiles: int = -1 # Refocus periodically + autofocus_every_n_tiles: int = -1 autofocus_every_row: bool = False @dataclass -class FeatureVisualization: - """Stores feature visualization data for debugging overlay""" - # Current features on each edge - left_features: List[cv2.KeyPoint] = field(default_factory=list) - right_features: List[cv2.KeyPoint] = field(default_factory=list) - top_features: List[cv2.KeyPoint] = field(default_factory=list) - bottom_features: List[cv2.KeyPoint] = field(default_factory=list) - - # Reference features being tracked (from leading edge) - reference_features: List[cv2.KeyPoint] = field(default_factory=list) +class ComparisonState: + """Current state of edge comparison for visualization""" + is_tracking: bool = False + is_paused: bool = False + direction: str = '' reference_edge: str = '' - - # Matched features on target edge - matched_features: List[cv2.KeyPoint] = field(default_factory=list) target_edge: str = '' - # Match quality info - num_good_matches: int = 0 - match_threshold: int = 5 + # Images for display 
(already binarized) + reference_image: Optional[np.ndarray] = None + current_image: Optional[np.ndarray] = None + interpolated_image: Optional[np.ndarray] = None # NEW: Show interpolated frame - # Feature position history for trails (deque of (x, y, timestamp)) - feature_trails: Dict[int, deque] = field(default_factory=dict) - - # Active tracking state - is_tracking: bool = False - tracking_direction: str = '' - - # Movement detection info + # Comparison result + similarity: float = 0.0 + threshold: float = 0.65 shift_detected: bool = False - movement_progress: float = 0.0 # Estimated 0-1 progress + + # Interpolation info (NEW) + best_offset: float = 0.0 # 0-1, where in the interpolation the best match was found + similarity_history: List[float] = field(default_factory=list) + peak_detected: bool = False class Scanner: """ Automated slide scanner using visual feedback for navigation. - Key principle: Use camera as position sensor, not step counting. - Boundary detection: Feature matching - when features stop shifting, - we've likely hit the edge of the slide content. + Uses binary template matching with interpolation to detect when tile + content has shifted from one edge to the opposite edge, handling the + discrete nature of stepper motor steps. 
""" def __init__(self, camera, motion_controller, autofocus_controller=None, @@ -114,16 +115,6 @@ class Scanner: on_tile_captured: Callable[[Tile], None] = None, on_log: Callable[[str], None] = None, on_progress: Callable[[int, int], None] = None): - """ - Args: - camera: Camera instance - motion_controller: MotionController instance - autofocus_controller: Optional AutofocusController - config: ScanConfig settings - on_tile_captured: Callback(tile) when tile is captured - on_log: Callback(message) for logging - on_progress: Callback(current, total) for progress updates - """ self.camera = camera self.motion = motion_controller self.autofocus = autofocus_controller @@ -142,32 +133,12 @@ class Scanner: self.current_col = 0 self.tiles_captured = 0 - # Feature detector - self._init_feature_detector() + # Comparison state for visualization + self.comparison_state = ComparisonState() + self._state_lock = threading.Lock() # Thread self._thread = None - - # Feature visualization for debugging - self.feature_viz = FeatureVisualization() - self._viz_lock = threading.Lock() - - # Trail history length (frames) - self.trail_history_length = 30 - - def _init_feature_detector(self): - """Initialize the feature detector based on config""" - if self.config.feature_detector == 'ORB': - self.detector = cv2.ORB_create(nfeatures=500) - self.matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False) - elif self.config.feature_detector == 'SIFT': - self.detector = cv2.SIFT_create() - self.matcher = cv2.BFMatcher(cv2.NORM_L2, crossCheck=False) - elif self.config.feature_detector == 'AKAZE': - self.detector = cv2.AKAZE_create() - self.matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False) - else: - raise ValueError(f"Unknown feature detector: {self.config.feature_detector}") def log(self, message: str): """Log a message""" @@ -176,221 +147,12 @@ class Scanner: print(f"[Scanner] {message}") # ========================================================================= - # Feature 
Visualization + # Edge Region Extraction # ========================================================================= - def get_feature_overlay(self, frame: np.ndarray) -> np.ndarray: - """ - Draw feature visualization overlay on the frame. - - Args: - frame: Input frame to draw on (will be copied) - - Returns: - Frame with feature overlays drawn - """ - overlay = frame.copy() - h, w = frame.shape[:2] - border_h = int(h * self.config.border_percentage) - border_w = int(w * self.config.border_percentage) - - with self._viz_lock: - viz = self.feature_viz - - # Color scheme - COLORS = { - 'left': (255, 100, 100), # Blue-ish - 'right': (100, 100, 255), # Red-ish - 'top': (100, 255, 100), # Green-ish - 'bottom': (255, 255, 100), # Cyan-ish - 'reference': (0, 255, 255), # Yellow - reference features - 'matched': (0, 255, 0), # Green - matched features - 'trail': (255, 0, 255), # Magenta - feature trails - } - - # Draw edge region boundaries (semi-transparent) - overlay_alpha = overlay.copy() - - # Left edge region - cv2.rectangle(overlay_alpha, (0, 0), (border_w, h), COLORS['left'], -1) - # Right edge region - cv2.rectangle(overlay_alpha, (w - border_w, 0), (w, h), COLORS['right'], -1) - # Top edge region - cv2.rectangle(overlay_alpha, (0, 0), (w, border_h), COLORS['top'], -1) - # Bottom edge region - cv2.rectangle(overlay_alpha, (0, h - border_h), (w, h), COLORS['bottom'], -1) - - # Blend with transparency - cv2.addWeighted(overlay_alpha, 0.1, overlay, 0.9, 0, overlay) - - # Draw features on each edge - self._draw_edge_features(overlay, viz.left_features, 'left', - border_w, COLORS['left']) - self._draw_edge_features(overlay, viz.right_features, 'right', - border_w, COLORS['right'], x_offset=w-border_w) - self._draw_edge_features(overlay, viz.top_features, 'top', - border_h, COLORS['top']) - self._draw_edge_features(overlay, viz.bottom_features, 'bottom', - border_h, COLORS['bottom'], y_offset=h-border_h) - - # Draw reference features with larger circles - if 
viz.is_tracking and viz.reference_features: - ref_offset = self._get_edge_offset(viz.reference_edge, w, h, border_w, border_h) - for kp in viz.reference_features: - pt = (int(kp.pt[0] + ref_offset[0]), int(kp.pt[1] + ref_offset[1])) - cv2.circle(overlay, pt, 8, COLORS['reference'], 2) - cv2.circle(overlay, pt, 3, COLORS['reference'], -1) - - # Draw matched features with connecting lines - if viz.is_tracking and viz.matched_features: - target_offset = self._get_edge_offset(viz.target_edge, w, h, border_w, border_h) - for kp in viz.matched_features: - pt = (int(kp.pt[0] + target_offset[0]), int(kp.pt[1] + target_offset[1])) - cv2.circle(overlay, pt, 8, COLORS['matched'], 2) - cv2.circle(overlay, pt, 3, COLORS['matched'], -1) - - # Draw feature trails - for trail_id, trail in viz.feature_trails.items(): - if len(trail) > 1: - points = [(int(p[0]), int(p[1])) for p in trail] - for i in range(1, len(points)): - # Fade trail from bright to dim - alpha = i / len(points) - color = tuple(int(c * alpha) for c in COLORS['trail']) - cv2.line(overlay, points[i-1], points[i], color, 2) - - # Draw tracking status info - self._draw_tracking_info(overlay, viz, w, h) - - return overlay - - def _draw_edge_features(self, frame, keypoints, edge, region_size, color, - x_offset=0, y_offset=0): - """Draw feature points for an edge region""" - for kp in keypoints: - pt = (int(kp.pt[0] + x_offset), int(kp.pt[1] + y_offset)) - # Draw circle with size proportional to keypoint size - size = max(3, int(kp.size / 4)) if kp.size > 0 else 4 - cv2.circle(frame, pt, size, color, 1) - # Draw center dot - cv2.circle(frame, pt, 2, color, -1) - - def _get_edge_offset(self, edge, w, h, border_w, border_h): - """Get x, y offset for an edge region""" - if edge == 'left': - return (0, 0) - elif edge == 'right': - return (w - border_w, 0) - elif edge == 'top': - return (0, 0) - elif edge == 'bottom': - return (0, h - border_h) - return (0, 0) - - def _draw_tracking_info(self, frame, viz, w, h): - """Draw 
tracking status text overlay""" - y_pos = 30 - line_height = 25 - - def draw_text(text, color=(255, 255, 255)): - nonlocal y_pos - cv2.putText(frame, text, (10, y_pos), - cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 3) - cv2.putText(frame, text, (10, y_pos), - cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 1) - y_pos += line_height - - if viz.is_tracking: - draw_text(f"TRACKING: {viz.tracking_direction}", (0, 255, 255)) - draw_text(f"Ref edge: {viz.reference_edge} ({len(viz.reference_features)} features)") - draw_text(f"Target edge: {viz.target_edge}") - - # Match status with color coding - match_color = (0, 255, 0) if viz.num_good_matches >= viz.match_threshold else (0, 165, 255) - draw_text(f"Matches: {viz.num_good_matches}/{viz.match_threshold}", match_color) - - if viz.shift_detected: - draw_text("SHIFT DETECTED!", (0, 255, 0)) - - # Progress bar - bar_width = 200 - bar_height = 15 - bar_x = 10 - bar_y = y_pos - progress = min(1.0, viz.movement_progress) - cv2.rectangle(frame, (bar_x, bar_y), - (bar_x + bar_width, bar_y + bar_height), (100, 100, 100), -1) - cv2.rectangle(frame, (bar_x, bar_y), - (bar_x + int(bar_width * progress), bar_y + bar_height), - (0, 255, 0), -1) - cv2.rectangle(frame, (bar_x, bar_y), - (bar_x + bar_width, bar_y + bar_height), (255, 255, 255), 1) - else: - # Show feature counts on all edges - draw_text(f"L:{len(viz.left_features)} R:{len(viz.right_features)} " - f"T:{len(viz.top_features)} B:{len(viz.bottom_features)}") - - def _update_edge_features(self, frame): - """Update feature visualization for all edges""" - with self._viz_lock: - self.feature_viz.left_features = self._detect_edge_keypoints(frame, 'left') - self.feature_viz.right_features = self._detect_edge_keypoints(frame, 'right') - self.feature_viz.top_features = self._detect_edge_keypoints(frame, 'top') - self.feature_viz.bottom_features = self._detect_edge_keypoints(frame, 'bottom') - - def _detect_edge_keypoints(self, frame, edge) -> List[cv2.KeyPoint]: - """Detect keypoints in an edge 
region""" - region = self.get_edge_region(frame, edge) - gray = cv2.cvtColor(region, cv2.COLOR_BGR2GRAY) if len(region.shape) == 3 else region - keypoints, _ = self.detector.detectAndCompute(gray, None) - return list(keypoints) if keypoints else [] - - def _update_tracking_visualization(self, ref_kps, ref_edge, target_edge, - current_frame, good_matches, direction): - """Update visualization during tracking""" - with self._viz_lock: - viz = self.feature_viz - viz.is_tracking = True - viz.tracking_direction = direction - viz.reference_edge = ref_edge - viz.target_edge = target_edge - viz.reference_features = list(ref_kps) if ref_kps else [] - viz.num_good_matches = len(good_matches) if good_matches else 0 - viz.match_threshold = self.config.min_good_matches - - # Detect features on target edge - target_kps = self._detect_edge_keypoints(current_frame, target_edge) - viz.matched_features = target_kps # Show all features on target edge - - def _clear_tracking_visualization(self): - """Clear tracking visualization state""" - with self._viz_lock: - viz = self.feature_viz - viz.is_tracking = False - viz.tracking_direction = '' - viz.reference_features = [] - viz.matched_features = [] - viz.num_good_matches = 0 - viz.shift_detected = False - viz.movement_progress = 0.0 - viz.feature_trails.clear() - - # ========================================================================= - # Tile Extraction - # ========================================================================= - - def extract_tile(self, frame: np.ndarray) -> np.ndarray: - """Extract the center tile (80%) from a frame""" - h, w = frame.shape[:2] - border_h = int(h * self.config.border_percentage) - border_w = int(w * self.config.border_percentage) - - tile = frame[border_h:h-border_h, border_w:w-border_w] - return tile.copy() - def get_edge_region(self, frame: np.ndarray, edge: str) -> np.ndarray: """ - Extract a strip from the specified edge for feature tracking. 
+ Extract a strip from the specified edge for comparison. Args: frame: Input frame @@ -414,83 +176,351 @@ class Scanner: else: raise ValueError(f"Unknown edge: {edge}") + def extract_tile(self, frame: np.ndarray) -> np.ndarray: + """Extract the center tile from a frame""" + h, w = frame.shape[:2] + border_h = int(h * self.config.border_percentage) + border_w = int(w * self.config.border_percentage) + + tile = frame[border_h:h-border_h, border_w:w-border_w] + return tile.copy() + # ========================================================================= - # Feature Detection and Matching + # Binary Image Processing # ========================================================================= - def detect_features(self, region: np.ndarray) -> Tuple[list, np.ndarray]: + def prepare_edge_image(self, region: np.ndarray) -> np.ndarray: """ - Detect features in a region. + Convert edge region to an enhanced gradient map. - Returns: - (keypoints, descriptors) + Uses Sobel gradients to highlight structure (cells, edges) regardless + of brightness, then thresholds and dilates to make features more + prominent for matching. """ - gray = cv2.cvtColor(region, cv2.COLOR_BGR2GRAY) if len(region.shape) == 3 else region - keypoints, descriptors = self.detector.detectAndCompute(gray, None) - return keypoints, descriptors + # 1. Convert to grayscale + if len(region.shape) == 3: + gray = cv2.cvtColor(region, cv2.COLOR_BGR2GRAY) + else: + gray = region + + # 2. Gaussian Blur + blurred = cv2.GaussianBlur(gray, (5, 5), 0) + + # 3. Calculate Gradient Magnitude (Sobel) + grad_x = cv2.Sobel(blurred, cv2.CV_32F, 1, 0, ksize=3) + grad_y = cv2.Sobel(blurred, cv2.CV_32F, 0, 1, ksize=3) + magnitude = cv2.magnitude(grad_x, grad_y) + + # 4. Normalize to 0-255 + magnitude = cv2.normalize(magnitude, None, 0, 255, cv2.NORM_MINMAX) + magnitude = np.uint8(magnitude) + return magnitude + + # # 5. 
Threshold to make features more prominent + # _, binary = cv2.threshold(magnitude, 100, 255, cv2.THRESH_BINARY) + + # # 6. Dilate to make features "fatter" and overlap more + # kernel = np.ones((3, 3), np.uint8) + # dilated = cv2.dilate(binary, kernel, iterations=2) + + # return dilated - def match_features(self, desc1: np.ndarray, desc2: np.ndarray) -> List: - """ - Match features between two descriptor sets. - - Returns: - List of good matches - """ - if desc1 is None or desc2 is None: - return [] - - if len(desc1) < 2 or len(desc2) < 2: - return [] - - # KNN match - try: - matches = self.matcher.knnMatch(desc1, desc2, k=2) - except cv2.error: - return [] - - # Apply ratio test - good_matches = [] - for match_pair in matches: - if len(match_pair) == 2: - m, n = match_pair - if m.distance < self.config.match_threshold * n.distance: - good_matches.append(m) - - return good_matches + # ========================================================================= + # Image Comparison Methods + # ========================================================================= - def features_have_shifted(self, - reference_desc: np.ndarray, - current_frame: np.ndarray, - from_edge: str, - to_edge: str) -> bool: + def compare_edges(self, reference: np.ndarray, target: np.ndarray) -> float: """ - Check if features from one edge now appear on the opposite edge. + Compare two binary edge images. 
Args: - reference_desc: Descriptors from the reference edge - current_frame: Current camera frame - from_edge: Edge where features were originally ('left', 'right') - to_edge: Edge to check for features ('right', 'left') + reference: Binary image from reference edge + target: Binary image from target edge + + Returns: + Similarity score (0.0 to 1.0) + """ + # Ensure same size + if reference.shape != target.shape: + target = cv2.resize(target, (reference.shape[1], reference.shape[0])) + + method = self.config.comparison_method + + if method == 'template': + return self._compare_template(reference, target) + elif method == 'ssim': + return self._compare_ssim(reference, target) + elif method == 'mse': + return self._compare_mse(reference, target) + elif method == 'hst': + return self._compare_histogram(reference, target) + elif method == 'phase': + return self._compare_phase_correlation(reference, target) + else: + return self._compare_template(reference, target) + + def _compare_histogram(self, reference: np.ndarray, target: np.ndarray) -> float: + """Compare gradient histograms - works better for sparse images""" + hist_ref = cv2.calcHist([reference], [0], None, [256], [0, 256]) + hist_target = cv2.calcHist([target], [0], None, [256], [0, 256]) + + cv2.normalize(hist_ref, hist_ref) + cv2.normalize(hist_target, hist_target) + + similarity = cv2.compareHist(hist_ref, hist_target, cv2.HISTCMP_CORREL) + return max(0.0, similarity) + + def _compare_template(self, reference: np.ndarray, target: np.ndarray) -> float: + """Template matching using normalized cross-correlation""" + result = cv2.matchTemplate(target, reference, cv2.TM_CCOEFF_NORMED) + min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result) + return max(0.0, max_val) + + def _compare_ssim(self, reference: np.ndarray, target: np.ndarray) -> float: + """Structural Similarity Index""" + try: + from skimage.metrics import structural_similarity as ssim + score, _ = ssim(reference, target, full=True) + return 
float(score) + except ImportError: + self.log("skimage not available, falling back to template matching") + return self._compare_template(reference, target) + + def _compare_mse(self, reference: np.ndarray, target: np.ndarray) -> float: + """Mean Squared Error (inverted to similarity)""" + mse = np.mean((reference.astype(float) - target.astype(float)) ** 2) + similarity = 1 - (mse / 65025) + return max(0.0, similarity) + + def _compare_phase_correlation(self, reference: np.ndarray, target: np.ndarray) -> float: + """ + Phase correlation - good for finding shifts between images. + Returns a similarity score based on correlation peak strength. + """ + ref_f = np.float32(reference) + target_f = np.float32(target) + + # Compute FFT + f_ref = np.fft.fft2(ref_f) + f_target = np.fft.fft2(target_f) + + # Cross-power spectrum + cross_power = (f_ref * np.conj(f_target)) / (np.abs(f_ref * np.conj(f_target)) + 1e-10) + correlation = np.fft.ifft2(cross_power) + correlation = np.abs(correlation) + + # Peak strength as similarity measure + peak_val = np.max(correlation) + # Normalize by image size for consistent scaling + normalized_peak = peak_val / np.sqrt(reference.size) + + return min(1.0, normalized_peak) + + # ========================================================================= + # Interpolation-Based Matching (NEW) + # ========================================================================= + + def _interpolated_match(self, prev_retreat: np.ndarray, curr_retreat: np.ndarray, + reference: np.ndarray, movement_axis: str = 'horizontal' + ) -> Tuple[float, float, np.ndarray]: + """ + Create interpolated frames simulating sub-step positions between + two consecutive frame captures. 
+ + The movement_axis determines how to blend the edge strips: + - 'horizontal': Blend along columns (for top/bottom edge strips during X movement) + - 'vertical': Blend along rows (for left/right edge strips during Y movement) + + Args: + prev_retreat: Previous retreating edge binary image + curr_retreat: Current retreating edge binary image + reference: Reference (leading edge) binary image to match against + movement_axis: 'horizontal' for X movement, 'vertical' for Y movement Returns: - True if enough features matched (tile movement complete) + Tuple of (best_similarity, best_offset, best_interpolated_image) + - best_offset: 0.0 = prev frame, 1.0 = curr frame """ - # Get the target edge region - target_region = self.get_edge_region(current_frame, to_edge) + h, w = prev_retreat.shape[:2] + best_similarity = 0.0 + best_offset = 0.0 + best_interpolated = curr_retreat.copy() - # Detect features in target region - _, target_desc = self.detect_features(target_region) + num_interp = self.config.num_interpolations - # Match with reference - good_matches = self.match_features(reference_desc, target_desc) + for i in range(num_interp + 1): + alpha = i / num_interp # 0.0 to 1.0 + + if movement_axis == 'vertical': + # X movement: blend along columns of horizontal edge strips + interpolated = self._create_horizontal_interpolation( + prev_retreat, curr_retreat, alpha + ) + else: + # Y movement: blend along rows of vertical edge strips + interpolated = self._create_vertical_interpolation( + prev_retreat, curr_retreat, alpha + ) + + similarity = self.compare_edges(reference, interpolated) + + if similarity > best_similarity: + best_similarity = similarity + best_offset = alpha + best_interpolated = interpolated.copy() - return len(good_matches) >= self.config.min_good_matches + return best_similarity, best_offset, best_interpolated - def count_features_on_edge(self, frame: np.ndarray, edge: str) -> int: - """Count features visible on an edge region""" - region = 
self.get_edge_region(frame, edge) - kp, desc = self.detect_features(region) - return len(kp) if kp else 0 + def _create_horizontal_interpolation(self, prev: np.ndarray, curr: np.ndarray, + alpha: float) -> np.ndarray: + """ + Create interpolated frame by blending along columns (width dimension). + Used for X movement with top/bottom edge strips (horizontal strips). + + At alpha=0: returns prev + At alpha=1: returns curr + At alpha=0.5: left half from shifted prev, right half from curr + """ + h, w = prev.shape[:2] + split_col = int(w * alpha) + + if split_col <= 0: + return prev.copy() + elif split_col >= w: + return curr.copy() + + interpolated = np.zeros_like(prev) + + # Content from prev that has shifted left (right portion moves to left) + interpolated[:, :w-split_col] = prev[:, split_col:] + + # New content from curr appearing on the right + interpolated[:, w-split_col:] = curr[:, :split_col] + + return interpolated + + def _create_vertical_interpolation(self, prev: np.ndarray, curr: np.ndarray, + alpha: float) -> np.ndarray: + """ + Create interpolated frame by blending along rows (height dimension). + Used for Y movement with left/right edge strips (vertical strips). + + At alpha=0: returns prev + At alpha=1: returns curr + At alpha=0.5: top half from shifted prev, bottom half from curr + """ + h, w = prev.shape[:2] + split_row = int(h * alpha) + + if split_row <= 0: + return prev.copy() + elif split_row >= h: + return curr.copy() + + interpolated = np.zeros_like(prev) + + # Content from prev that has shifted up + interpolated[:h-split_row, :] = prev[split_row:, :] + + # New content from curr appearing at the bottom + interpolated[h-split_row:, :] = curr[:split_row, :] + + return interpolated + + def _detect_peak(self, similarity_history: List[float]) -> Tuple[bool, int]: + """ + Detect if we've passed the similarity peak. 
+ + Returns: + Tuple of (peak_detected, peak_index) + """ + if len(similarity_history) < self.config.peak_window_size: + return False, -1 + + window = similarity_history[-self.config.peak_window_size:] + max_idx = np.argmax(window) + max_val = window[max_idx] + + # Peak is detected if: + # 1. The maximum is not at the edges of the window + # 2. The maximum exceeded our threshold + # 3. Current value has dropped from the peak + if max_idx > 0 and max_idx < len(window) - 1: + if max_val >= self.config.similarity_threshold: + current_drop = max_val - window[-1] + if current_drop >= self.config.peak_drop_threshold: + # Peak confirmed + actual_peak_idx = len(similarity_history) - self.config.peak_window_size + max_idx + return True, actual_peak_idx + + return False, -1 + + # ========================================================================= + # Comparison State Access (for GUI) + # ========================================================================= + + def get_comparison_state(self) -> ComparisonState: + """Get current comparison state (thread-safe)""" + with self._state_lock: + state = ComparisonState( + is_tracking=self.comparison_state.is_tracking, + is_paused=self.paused, + direction=self.comparison_state.direction, + reference_edge=self.comparison_state.reference_edge, + target_edge=self.comparison_state.target_edge, + similarity=self.comparison_state.similarity, + threshold=self.comparison_state.threshold, + shift_detected=self.comparison_state.shift_detected, + best_offset=self.comparison_state.best_offset, + peak_detected=self.comparison_state.peak_detected, + similarity_history=self.comparison_state.similarity_history.copy() + ) + if self.comparison_state.reference_image is not None: + state.reference_image = self.comparison_state.reference_image.copy() + if self.comparison_state.current_image is not None: + state.current_image = self.comparison_state.current_image.copy() + if self.comparison_state.interpolated_image is not None: + 
state.interpolated_image = self.comparison_state.interpolated_image.copy() + return state + + def _update_comparison_state(self, is_tracking: bool, direction: str = '', + ref_edge: str = '', target_edge: str = '', + ref_image: np.ndarray = None, + current_image: np.ndarray = None, + interpolated_image: np.ndarray = None, + similarity: float = 0.0, + shift_detected: bool = False, + best_offset: float = 0.0, + peak_detected: bool = False, + similarity_history: List[float] = None): + """Update comparison state (thread-safe)""" + with self._state_lock: + self.comparison_state.is_tracking = is_tracking + self.comparison_state.direction = direction + self.comparison_state.reference_edge = ref_edge + self.comparison_state.target_edge = target_edge + self.comparison_state.similarity = similarity + self.comparison_state.threshold = self.config.similarity_threshold + self.comparison_state.shift_detected = shift_detected + self.comparison_state.best_offset = best_offset + self.comparison_state.peak_detected = peak_detected + + if similarity_history is not None: + self.comparison_state.similarity_history = similarity_history.copy() + + if ref_image is not None: + self.comparison_state.reference_image = ref_image.copy() + if current_image is not None: + self.comparison_state.current_image = current_image.copy() + if interpolated_image is not None: + self.comparison_state.interpolated_image = interpolated_image.copy() + + def _clear_comparison_state(self): + """Clear comparison state (but not if paused)""" + if self.paused: + return + with self._state_lock: + self.comparison_state = ComparisonState() # ========================================================================= # Movement with Visual Feedback @@ -498,10 +528,10 @@ class Scanner: def move_until_tile_shifted(self, direction: ScanDirection) -> bool: """ - Move in a direction until features indicate we've moved one tile. + Move in a direction until visual comparison indicates tile shift. 
- Uses feature tracking to detect when tile content has shifted. - Returns False if timeout or no more content detected. + Uses interpolation between consecutive frames to catch the optimal + alignment point even when discrete stepper steps skip over it. Args: direction: ScanDirection to move @@ -509,52 +539,63 @@ class Scanner: Returns: True if tile shift detected, False if timeout/boundary """ - # Determine edges based on direction + # Determine edges based on direction (accounting for 90° camera rotation) + # The movement_axis refers to how to interpolate the EDGE STRIPS: + # - X movement: top/bottom edges are horizontal strips, content travels VERTICALLY + # across the frame, so interpolate HORIZONTALLY along the strip width + # - Y movement: left/right edges are vertical strips, content travels HORIZONTALLY + # across the frame, so interpolate VERTICALLY along the strip height if direction == ScanDirection.RIGHT: - from_edge, to_edge = 'right', 'left' - move_cmd = lambda: self.motion.start_movement('X') - self.motion.axis_direction['X'] = 1 - axis = 'X' - elif direction == ScanDirection.LEFT: - from_edge, to_edge = 'left', 'right' - move_cmd = lambda: self.motion.start_movement('X') - self.motion.axis_direction['X'] = -1 - axis = 'X' - elif direction == ScanDirection.DOWN: - from_edge, to_edge = 'bottom', 'top' - move_cmd = lambda: self.motion.start_movement('Y') - self.motion.axis_direction['Y'] = -1 - axis = 'Y' - elif direction == ScanDirection.UP: from_edge, to_edge = 'top', 'bottom' - move_cmd = lambda: self.motion.start_movement('Y') - self.motion.axis_direction['Y'] = 1 + axis = 'X' + movement_axis = 'horizontal' # Interpolate along strip width + self.motion.axis_direction['X'] = 1 + elif direction == ScanDirection.LEFT: + from_edge, to_edge = 'bottom', 'top' + axis = 'X' + movement_axis = 'horizontal' # Interpolate along strip width + self.motion.axis_direction['X'] = -1 + elif direction == ScanDirection.DOWN: + from_edge, to_edge = 'right', 'left' 
axis = 'Y' + movement_axis = 'vertical' # Interpolate along strip height + self.motion.axis_direction['Y'] = -1 + elif direction == ScanDirection.UP: + from_edge, to_edge = 'left', 'right' + axis = 'Y' + movement_axis = 'vertical' # Interpolate along strip height + self.motion.axis_direction['Y'] = 1 else: raise ValueError(f"Unknown direction: {direction}") - # Capture reference features from the leading edge + # Capture and binarize reference edge frame = self.camera.capture_frame() reference_region = self.get_edge_region(frame, from_edge) - ref_kp, ref_desc = self.detect_features(reference_region) + reference_binary = self.prepare_edge_image(reference_region) - if ref_desc is None or len(ref_desc) < self.config.min_features: - self.log(f"Warning: Only {len(ref_kp) if ref_kp else 0} features found on {from_edge} edge") + self.log(f"Captured reference from {from_edge} edge, moving {direction.value}") + self.log(f"Interpolation: {'ON' if self.config.use_interpolation else 'OFF'}, " + f"Peak detection: {'ON' if self.config.use_peak_detection else 'OFF'}") - # Initialize tracking visualization - with self._viz_lock: - self.feature_viz.is_tracking = True - self.feature_viz.tracking_direction = direction.value - self.feature_viz.reference_edge = from_edge - self.feature_viz.target_edge = to_edge - self.feature_viz.reference_features = list(ref_kp) if ref_kp else [] - self.feature_viz.shift_detected = False - self.feature_viz.movement_progress = 0.0 + # Initialize tracking state + prev_retreat_binary = None + similarity_history = [] + check_count = 0 + + # Update state for visualization + self._update_comparison_state( + is_tracking=True, + direction=direction.value, + ref_edge=from_edge, + target_edge=to_edge, + ref_image=reference_binary, + similarity_history=similarity_history + ) # Start movement - move_cmd() + self.log(f"Starting movement on {axis} axis") + self.motion.start_movement(axis) start_time = time.time() - check_count = 0 try: while self.running and not 
self.paused: @@ -567,48 +608,102 @@ class Scanner: self.log("Movement timeout - likely at boundary") return False - # Update progress estimate - with self._viz_lock: - self.feature_viz.movement_progress = min(1.0, elapsed / (self.config.max_move_time * 0.5)) - - # Capture current frame + # Capture current retreating edge current_frame = self.camera.capture_frame() - - # Update edge features for visualization - self._update_edge_features(current_frame) - - # Get target edge features target_region = self.get_edge_region(current_frame, to_edge) - target_kp, target_desc = self.detect_features(target_region) + curr_retreat_binary = self.prepare_edge_image(target_region) - # Match with reference - good_matches = self.match_features(ref_desc, target_desc) if ref_desc is not None else [] + # Calculate similarity (with or without interpolation) + if self.config.use_interpolation and prev_retreat_binary is not None: + similarity, best_offset, interpolated = self._interpolated_match( + prev_retreat_binary, + curr_retreat_binary, + reference_binary, + movement_axis + ) + else: + similarity = self.compare_edges(reference_binary, curr_retreat_binary) + best_offset = 1.0 + interpolated = curr_retreat_binary - # Update visualization - self._update_tracking_visualization( - ref_kp, from_edge, to_edge, current_frame, good_matches, direction.value + similarity_history.append(similarity) + + # Update state for visualization + self._update_comparison_state( + is_tracking=True, + direction=direction.value, + ref_edge=from_edge, + target_edge=to_edge, + ref_image=reference_binary, + current_image=curr_retreat_binary, + interpolated_image=interpolated, + similarity=similarity, + best_offset=best_offset, + similarity_history=similarity_history ) - # Check if features have shifted (tile boundary crossed) - if ref_desc is not None and len(good_matches) >= self.config.min_good_matches: - self.log(f"Tile shift detected via feature matching ({len(good_matches)} matches)") - with self._viz_lock: 
- self.feature_viz.shift_detected = True - self.feature_viz.movement_progress = 1.0 + # Check for match using peak detection or simple threshold + match_found = False + + if self.config.use_peak_detection: + # Wait for similarity to peak and start declining + peak_detected, peak_idx = self._detect_peak(similarity_history) + if peak_detected: + peak_sim = similarity_history[peak_idx] + self.log(f"Peak detected! Similarity: {peak_sim:.3f} at frame {peak_idx}") + self._update_comparison_state( + is_tracking=True, + direction=direction.value, + ref_edge=from_edge, + target_edge=to_edge, + ref_image=reference_binary, + current_image=curr_retreat_binary, + interpolated_image=interpolated, + similarity=peak_sim, + shift_detected=True, + peak_detected=True, + best_offset=best_offset, + similarity_history=similarity_history + ) + match_found = True + else: + # Simple threshold crossing + if similarity >= self.config.similarity_threshold: + self.log(f"Threshold crossed! Similarity: {similarity:.3f}") + self._update_comparison_state( + is_tracking=True, + direction=direction.value, + ref_edge=from_edge, + target_edge=to_edge, + ref_image=reference_binary, + current_image=curr_retreat_binary, + interpolated_image=interpolated, + similarity=similarity, + shift_detected=True, + best_offset=best_offset, + similarity_history=similarity_history + ) + match_found = True + + if match_found: return True # Log progress periodically if check_count % 10 == 0: - self.log(f" Checking... 
{len(good_matches)} matches so far") + max_hist = max(similarity_history) if similarity_history else 0 + self.log(f" Frame {check_count}: sim={similarity:.3f}, " + f"offset={best_offset:.2f}, max_seen={max_hist:.3f}") + + # Store for next iteration + prev_retreat_binary = curr_retreat_binary.copy() finally: - # Stop movement self.motion.stop_axis(axis) - time.sleep(self.config.settle_time) + time.sleep(1) - # Clear tracking visualization after a delay - # (keep it visible briefly so user can see final state) - threading.Timer(1.0, self._clear_tracking_visualization).start() + # Clear state after a delay, but not if paused + if not self.paused: + threading.Timer(1.5, self._clear_comparison_state).start() return False @@ -619,11 +714,18 @@ class Scanner: def capture_tile(self) -> Tile: """Capture a single tile at current position""" frame = self.camera.capture_frame() - tile_image = self.extract_tile(frame) + # tile_image = self.extract_tile(frame) + frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE) + tile_image = frame.copy() # Calculate focus score for metadata - from vision import calculate_focus_score_sobel - focus_score = calculate_focus_score_sobel(frame) + try: + from vision import calculate_focus_score_sobel + focus_score = calculate_focus_score_sobel(frame) + except ImportError: + # Fallback: use Laplacian variance + gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) if len(frame.shape) == 3 else frame + focus_score = cv2.Laplacian(gray, cv2.CV_64F).var() tile = Tile( image=tile_image, @@ -646,40 +748,13 @@ class Scanner: return tile def scan_row(self, direction: ScanDirection) -> List[Tile]: - """ - Scan a complete row in the given direction. 
- - Args: - direction: ScanDirection.LEFT or ScanDirection.RIGHT - - Returns: - List of tiles captured in this row - """ + """Scan a complete row in the given direction.""" row_tiles = [] tiles_in_row = 0 self.log(f"Starting row {self.current_row} scan ({direction.value})") while self.running and not self.paused: - # Safety limit - if tiles_in_row >= self.config.max_tiles_per_row: - self.log(f"Max tiles per row reached ({self.config.max_tiles_per_row})") - break - - # Autofocus check - # if (self.autofocus and - # self.config.autofocus_every_n_tiles > 0 and - # self.tiles_captured > 0 and - # self.tiles_captured % self.config.autofocus_every_n_tiles == 0): - # self.log("Running autofocus...") - # self.autofocus.start() - # while self.autofocus.is_running(): - # time.sleep(0.1) - - # Update edge features before capture - frame = self.camera.capture_frame() - self._update_edge_features(frame) - # Capture tile tile = self.capture_tile() row_tiles.append(tile) @@ -689,11 +764,15 @@ class Scanner: if self.on_progress: self.on_progress(self.tiles_captured, -1) + # Safety limit + if tiles_in_row >= self.config.max_tiles_per_row: + self.log(f"Max tiles per row reached ({self.config.max_tiles_per_row})") + break + # Move to next tile position tile_shifted = self.move_until_tile_shifted(direction) if not tile_shifted: - # Timeout or boundary = end of row self.log(f"Row {self.current_row} complete, {len(row_tiles)} tiles") break @@ -706,32 +785,17 @@ class Scanner: return row_tiles def step_to_next_row(self) -> bool: - """ - Move down to the next row. 
- - Returns: - True if successful, False if bottom edge reached - """ - # Safety limit + """Move down to the next row.""" if self.current_row >= self.config.max_rows: self.log(f"Max rows reached ({self.config.max_rows})") return False self.log("Stepping to next row...") - # Move down tile_shifted = self.move_until_tile_shifted(ScanDirection.DOWN) if tile_shifted: self.current_row += 1 - - # Autofocus at start of each row if configured - # if self.autofocus and self.config.autofocus_every_row: - # self.log("Row start autofocus...") - # self.autofocus.start() - # while self.autofocus.is_running(): - # time.sleep(0.1) - return True else: self.log("Bottom edge reached") @@ -746,7 +810,6 @@ class Scanner: self.current_col = 0 self.tiles_captured = 0 - # Alternate scan direction each row (serpentine) scan_direction = self.config.start_direction try: @@ -761,14 +824,14 @@ class Scanner: # Scan current row self.scan_row(scan_direction) - if not self.running: + if not self.running or self.paused: break # Try to move to next row if not self.step_to_next_row(): break - # Reverse direction for next row (serpentine) + # Reverse direction (serpentine) if scan_direction == ScanDirection.RIGHT: scan_direction = ScanDirection.LEFT else: @@ -781,7 +844,7 @@ class Scanner: raise finally: self.running = False - self._clear_tracking_visualization() + self._clear_comparison_state() # ========================================================================= # Control Methods @@ -800,8 +863,9 @@ class Scanner: def stop(self): """Stop scanning""" self.running = False + self.paused = False self.motion.stop_all() - self._clear_tracking_visualization() + self._clear_comparison_state() self.log("Scan stopped") def pause(self): @@ -815,40 +879,117 @@ class Scanner: self.paused = False self.log("Scan resumed") + # ========================================================================= + # Test Methods + # ========================================================================= + + def 
test_edge_comparison(self) -> dict: + """ + Test edge detection and comparison on current frame. + Returns dict with results for debugging. + """ + frame = self.camera.capture_frame() + + results = {} + for edge in ['left', 'right', 'top', 'bottom']: + region = self.get_edge_region(frame, edge) + binary = self.prepare_edge_image(region) + white_pixels = np.sum(binary == 255) + total_pixels = binary.size + results[edge] = { + 'shape': binary.shape, + 'white_ratio': white_pixels / total_pixels, + 'binary': binary + } + + # Test comparison for X movement (raw top ↔ bottom) + results['x_axis_sim'] = self.compare_edges( + results['top']['binary'], + results['bottom']['binary'] + ) + + # Test comparison for Y movement (raw left ↔ right) + results['y_axis_sim'] = self.compare_edges( + results['left']['binary'], + results['right']['binary'] + ) + + return results + + def test_interpolation(self, num_frames: int = 5) -> dict: + """ + Test interpolation matching by capturing multiple frames. + Useful for debugging and tuning interpolation parameters. 
+ """ + self.log(f"Testing interpolation with {num_frames} frames...") + + frames = [] + for i in range(num_frames): + frame = self.camera.capture_frame() + frames.append(frame) + self.log(f" Captured frame {i+1}/{num_frames}") + time.sleep(0.5) + + # Use first frame's leading edge as reference + reference_region = self.get_edge_region(frames[0], 'top') + reference_binary = self.prepare_edge_image(reference_region) + + results = { + 'reference_shape': reference_binary.shape, + 'frames': [] + } + + prev_binary = None + for i, frame in enumerate(frames): + retreat_region = self.get_edge_region(frame, 'bottom') + curr_binary = self.prepare_edge_image(retreat_region) + + # Direct comparison + direct_sim = self.compare_edges(reference_binary, curr_binary) + + # Interpolated comparison (if we have previous frame) + if prev_binary is not None: + interp_sim, best_offset, _ = self._interpolated_match( + prev_binary, curr_binary, reference_binary, 'horizontal' + ) + else: + interp_sim = direct_sim + best_offset = 1.0 + + results['frames'].append({ + 'frame_idx': i, + 'direct_similarity': direct_sim, + 'interpolated_similarity': interp_sim, + 'best_offset': best_offset + }) + + self.log(f" Frame {i}: direct={direct_sim:.3f}, interp={interp_sim:.3f}, offset={best_offset:.2f}") + prev_binary = curr_binary.copy() + + return results + # ========================================================================= # Mosaic Building # ========================================================================= def build_mosaic(self, scale: float = 0.25) -> np.ndarray: - """ - Build a mosaic image from captured tiles. 
- - Args: - scale: Scale factor for output (0.25 = 25% size) - - Returns: - Mosaic image - """ + """Build a mosaic image from captured tiles.""" if not self.tiles: return None - # Find grid dimensions max_row = max(t.row for t in self.tiles) max_col = max(t.col for t in self.tiles) min_col = min(t.col for t in self.tiles) - # Get tile dimensions (from first tile) tile_h, tile_w = self.tiles[0].image.shape[:2] scaled_h = int(tile_h * scale) scaled_w = int(tile_w * scale) - # Calculate mosaic size num_rows = max_row + 1 num_cols = max_col - min_col + 1 mosaic = np.zeros((num_rows * scaled_h, num_cols * scaled_w, 3), dtype=np.uint8) - # Place tiles for tile in self.tiles: row = tile.row col = tile.col - min_col @@ -862,15 +1003,7 @@ class Scanner: return mosaic def get_mosaic_preview(self, max_size: int = 800) -> np.ndarray: - """ - Get a preview-sized mosaic. - - Args: - max_size: Maximum dimension of output - - Returns: - Preview mosaic image - """ + """Get a preview-sized mosaic.""" mosaic = self.build_mosaic(scale=1.0) if mosaic is None: return None