Stitching #1
4 changed files with 1546 additions and 282 deletions
318
src/autofocus.py
318
src/autofocus.py
|
|
@ -4,16 +4,31 @@ from vision import calculate_focus_score_sobel
|
|||
|
||||
|
||||
class AutofocusController:
|
||||
"""Manages autofocus operations"""
|
||||
"""
|
||||
Manages autofocus operations using visual feedback.
|
||||
|
||||
Key design principle: Never trust step counts due to gear slippage.
|
||||
Use focus score as the authoritative position feedback.
|
||||
"""
|
||||
|
||||
# Default timing settings
|
||||
DEFAULT_SETTINGS = {
|
||||
# Sweep settings
|
||||
'sweep_move_time': 0.5,
|
||||
'sweep_settle_time': 0.05,
|
||||
'sweep_settle_time': 0.01,
|
||||
'sweep_samples': 10,
|
||||
'sweep_steps': 30,
|
||||
'fine_move_time': 0.15,
|
||||
'fine_settle_time': 0.1,
|
||||
|
||||
# Navigation settings (fast approach to peak)
|
||||
'nav_move_time': 0.3, # Quick bursts
|
||||
'nav_check_samples': 3, # Fewer samples for speed
|
||||
'nav_peak_threshold': 0.80, # Consider "near peak" at 80% of target
|
||||
'nav_decline_threshold': 1.00, # Detect peak passage at 8% drop
|
||||
'nav_max_moves': 100, # Safety limit
|
||||
|
||||
# Fine tuning settings
|
||||
'fine_move_time': 0.30,
|
||||
'fine_settle_time': 0.01,
|
||||
'fine_samples': 10,
|
||||
'fine_max_no_improvement': 5,
|
||||
}
|
||||
|
|
@ -74,6 +89,11 @@ class AutofocusController:
|
|||
|
||||
return sum(scores) / len(scores) if scores else 0
|
||||
|
||||
def get_quick_focus(self, samples=3):
|
||||
"""Quick focus reading with fewer samples for navigation"""
|
||||
scores = [self.get_focus_score() for _ in range(samples)]
|
||||
return sum(scores) / len(scores)
|
||||
|
||||
# === Autofocus Control ===
|
||||
|
||||
def start(self, speed_range=(1, 5), coarse=False, fine=False):
|
||||
|
|
@ -111,31 +131,33 @@ class AutofocusController:
|
|||
|
||||
def _autofocus_routine(self, speed_range):
|
||||
"""
|
||||
Autofocus using sweep search + hill climbing.
|
||||
Phase 1: Sweep across range to find approximate peak
|
||||
Phase 2: Fine tune from best position found
|
||||
Autofocus using sweep search + visual navigation + hill climbing.
|
||||
|
||||
Phase 1: Sweep to map focus curve and find approximate peak
|
||||
Phase 2: Navigate to peak using visual feedback
|
||||
Phase 3: Fine tune with hill climbing
|
||||
"""
|
||||
self.log(f"Starting autofocus (speed range: {speed_range})")
|
||||
|
||||
min_speed_idx, max_speed_idx = speed_range
|
||||
s = self.settings # Shorthand
|
||||
s = self.settings
|
||||
|
||||
try:
|
||||
# Phase 1: Sweep search
|
||||
best_step, best_score = self._sweep_search(
|
||||
# Phase 1: Sweep search - map the focus curve
|
||||
best_score, peak_direction = self._sweep_search(
|
||||
min_speed_idx, max_speed_idx, s
|
||||
)
|
||||
|
||||
if not self.running:
|
||||
return
|
||||
|
||||
# Move to best position found
|
||||
self._move_to_best_position(best_step, s)
|
||||
# Phase 2: Navigate to peak
|
||||
self._navigate_to_peak(best_score, peak_direction, s)
|
||||
|
||||
if not self.running:
|
||||
return
|
||||
|
||||
# Phase 2: Fine tuning
|
||||
# Phase 3: Fine tuning with hill climbing
|
||||
final_score = self._fine_tune(min_speed_idx, s)
|
||||
|
||||
self.log(f"Autofocus complete. Final: {final_score:.1f}")
|
||||
|
|
@ -146,30 +168,35 @@ class AutofocusController:
|
|||
self.running = False
|
||||
|
||||
def _sweep_search(self, min_speed_idx, max_speed_idx, s):
|
||||
"""Phase 1: Sweep to find approximate best position"""
|
||||
"""
|
||||
Phase 1: Sweep to map focus curve.
|
||||
|
||||
Returns:
|
||||
best_score: The highest focus score observed
|
||||
peak_direction: 'up' or 'down' - which direction has the peak
|
||||
"""
|
||||
self.log("Phase 1: Sweep search")
|
||||
|
||||
# Use medium speed for sweep
|
||||
sweep_speed = (min_speed_idx + max_speed_idx) // 2
|
||||
self.motion.set_speed(sweep_speed)
|
||||
time.sleep(0.1)
|
||||
|
||||
# Record starting position
|
||||
# Record starting position score
|
||||
time.sleep(s['sweep_settle_time'])
|
||||
start_score = self.get_averaged_focus(samples=s['sweep_samples'])
|
||||
self.update_focus_display(start_score)
|
||||
|
||||
sweep_data = [(0, start_score)]
|
||||
self.log(f"Start position: {start_score:.1f}")
|
||||
|
||||
if not self.running:
|
||||
return 0, start_score
|
||||
sweep_data = {'start': start_score, 'down': [], 'up': []}
|
||||
|
||||
# Sweep DOWN first (away from slide to avoid contact)
|
||||
if not self.running:
|
||||
return start_score, 'up'
|
||||
|
||||
# Sweep DOWN first (away from slide)
|
||||
self.log(f"Sweeping down {s['sweep_steps']} steps...")
|
||||
for i in range(1, s['sweep_steps'] + 1):
|
||||
if not self.running:
|
||||
return 0, start_score
|
||||
return start_score, 'up'
|
||||
|
||||
self.motion.move_z_down()
|
||||
time.sleep(s['sweep_move_time'])
|
||||
|
|
@ -177,35 +204,24 @@ class AutofocusController:
|
|||
time.sleep(s['sweep_settle_time'])
|
||||
|
||||
score = self.get_averaged_focus(samples=s['sweep_samples'])
|
||||
sweep_data.append((-i, score))
|
||||
sweep_data['down'].append(score)
|
||||
self.update_focus_display(score)
|
||||
|
||||
if i % 5 == 0:
|
||||
self.log(f" Step -{i}: {score:.1f}")
|
||||
self.log(f" Down {i}: {score:.1f}")
|
||||
|
||||
# Remember score at bottom
|
||||
bottom_score = sweep_data['down'][-1] if sweep_data['down'] else start_score
|
||||
|
||||
if not self.running:
|
||||
return 0, start_score
|
||||
return start_score, 'up'
|
||||
|
||||
# Return to start
|
||||
self.log("Returning to start...")
|
||||
for _ in range(s['sweep_steps']):
|
||||
# Now sweep UP past start to the other side
|
||||
# We'll go UP for 2x sweep_steps (to cover both halves)
|
||||
self.log(f"Sweeping up {s['sweep_steps'] * 2} steps...")
|
||||
for i in range(1, s['sweep_steps'] * 2 + 1):
|
||||
if not self.running:
|
||||
return 0, start_score
|
||||
self.motion.move_z_up()
|
||||
time.sleep(s['sweep_move_time'])
|
||||
self.motion.stop_z()
|
||||
time.sleep(0.1)
|
||||
|
||||
time.sleep(s['sweep_settle_time'])
|
||||
|
||||
if not self.running:
|
||||
return 0, start_score
|
||||
|
||||
# Sweep UP
|
||||
self.log(f"Sweeping up {s['sweep_steps']} steps...")
|
||||
for i in range(1, s['sweep_steps'] + 1):
|
||||
if not self.running:
|
||||
return 0, start_score
|
||||
return start_score, 'down'
|
||||
|
||||
self.motion.move_z_up()
|
||||
time.sleep(s['sweep_move_time'])
|
||||
|
|
@ -213,73 +229,145 @@ class AutofocusController:
|
|||
time.sleep(s['sweep_settle_time'])
|
||||
|
||||
score = self.get_averaged_focus(samples=s['sweep_samples'])
|
||||
sweep_data.append((i, score))
|
||||
sweep_data['up'].append(score)
|
||||
self.update_focus_display(score)
|
||||
|
||||
if i % 5 == 0:
|
||||
self.log(f" Step +{i}: {score:.1f}")
|
||||
self.log(f" Up {i}: {score:.1f}")
|
||||
|
||||
# Find best position
|
||||
best_step, best_score = max(sweep_data, key=lambda x: x[1])
|
||||
self.log(f"Best found at step {best_step}: {best_score:.1f}")
|
||||
# Analyze sweep data to find peak
|
||||
all_scores = sweep_data['down'] + [start_score] + sweep_data['up']
|
||||
best_score = max(all_scores)
|
||||
|
||||
# Log sweep curve
|
||||
sorted_data = sorted(sweep_data, key=lambda x: x[0])
|
||||
curve_str = " ".join([f"{d[1]:.0f}" for d in sorted_data])
|
||||
self.log(f"Sweep curve: {curve_str}")
|
||||
down_best = max(sweep_data['down']) if sweep_data['down'] else 0
|
||||
up_best = max(sweep_data['up']) if sweep_data['up'] else 0
|
||||
|
||||
return best_step, best_score
|
||||
# Determine which direction has the peak
|
||||
# We're currently at the TOP of the sweep (after going up)
|
||||
# So to reach the peak, we need to go DOWN if peak was in down region or middle
|
||||
if down_best > up_best and down_best > start_score:
|
||||
peak_direction = 'down'
|
||||
self.log(f"Peak in DOWN region: {down_best:.1f}")
|
||||
elif up_best >= down_best and up_best > start_score:
|
||||
# Peak is in up region, but we need to check where in the up region
|
||||
# If peak is in first half of up (returning toward start), go down
|
||||
# If peak is in second half of up (past start), we might already be close
|
||||
up_scores = sweep_data['up']
|
||||
mid_point = len(up_scores) // 2
|
||||
first_half_best = max(up_scores[:mid_point]) if mid_point > 0 else 0
|
||||
second_half_best = max(up_scores[mid_point:]) if mid_point < len(up_scores) else 0
|
||||
|
||||
def _move_to_best_position(self, best_step, s):
|
||||
"""Move from current position (+SWEEP_STEPS) to best_step"""
|
||||
steps_to_move = s['sweep_steps'] - best_step
|
||||
|
||||
if steps_to_move > 0:
|
||||
self.log(f"Moving down {steps_to_move} steps to best position")
|
||||
move_func = self.motion.move_z_down
|
||||
if second_half_best >= first_half_best:
|
||||
# Peak is near current position (top), go down slightly
|
||||
peak_direction = 'down'
|
||||
else:
|
||||
# Peak is further down
|
||||
peak_direction = 'down'
|
||||
self.log(f"Peak in UP region: {up_best:.1f}")
|
||||
else:
|
||||
self.log(f"Moving up {abs(steps_to_move)} steps to best position")
|
||||
move_func = self.motion.move_z_up
|
||||
steps_to_move = abs(steps_to_move)
|
||||
peak_direction = 'down'
|
||||
self.log(f"Peak near start: {start_score:.1f}")
|
||||
|
||||
for _ in range(steps_to_move):
|
||||
# Log sweep curve for debugging
|
||||
self.log(f"Best score found: {best_score:.1f}")
|
||||
self.log(f"Navigate direction: {peak_direction}")
|
||||
|
||||
return best_score, peak_direction
|
||||
|
||||
def _navigate_to_peak(self, target_score, direction, s):
|
||||
"""
|
||||
Phase 2: Navigate toward peak using camera
|
||||
"""
|
||||
self.log(f"Phase 2: Navigate to peak (target: {target_score:.1f}, dir: {direction})")
|
||||
|
||||
move_func = self.motion.move_z_down if direction == 'down' else self.motion.move_z_up
|
||||
|
||||
# Use medium-fast speed for approach
|
||||
self.motion.set_speed(4)
|
||||
time.sleep(0.1)
|
||||
|
||||
peak_threshold = target_score * s['nav_peak_threshold']
|
||||
decline_threshold = s['nav_decline_threshold']
|
||||
self.log(f"(peak_threshold: {peak_threshold:.1f}, decline_threshold: {decline_threshold})")
|
||||
|
||||
prev_score = self.get_quick_focus(s['nav_check_samples'])
|
||||
max_observed = prev_score
|
||||
moves_in_peak_region = 0
|
||||
declining_streak = 0
|
||||
|
||||
for move_count in range(s['nav_max_moves']):
|
||||
if not self.running:
|
||||
return
|
||||
move_func()
|
||||
time.sleep(s['sweep_move_time'])
|
||||
self.motion.stop_z()
|
||||
time.sleep(0.1)
|
||||
|
||||
time.sleep(s['sweep_settle_time'])
|
||||
current_score = self.get_averaged_focus(samples=s['sweep_samples'])
|
||||
self.log(f"At best position: {current_score:.1f}")
|
||||
# Quick movement burst
|
||||
move_func()
|
||||
time.sleep(s['nav_move_time'])
|
||||
self.motion.stop_z()
|
||||
time.sleep(0.05) # Brief settle
|
||||
|
||||
# Check focus
|
||||
current_score = self.get_quick_focus(s['nav_check_samples'])
|
||||
self.log(f" Current: {current_score:.1f} (max was {max_observed:.1f}) (Declining {declining_streak:.1f}) (Decline Thres {max_observed * decline_threshold:.1f})")
|
||||
self.log(f" Current: {current_score:.1f} >= peak thresh({peak_threshold:.1f}) ")
|
||||
self.log(f" Current: {current_score:.1f} <= max_observed * decline_threshold:({max_observed * decline_threshold:.1f}) ")
|
||||
self.update_focus_display(current_score)
|
||||
|
||||
# Track maximum observed
|
||||
if current_score > max_observed:
|
||||
max_observed = current_score
|
||||
declining_streak = 0
|
||||
self.log("Setting Max")
|
||||
elif max_observed > 0.8 * target_score:
|
||||
declining_streak += 1
|
||||
|
||||
if declining_streak >=2:
|
||||
self.log(f" Passed peak after {move_count} moves, stopping")
|
||||
break
|
||||
|
||||
# # Are we in the peak region?
|
||||
# if current_score >= peak_threshold:
|
||||
# moves_in_peak_region += 1
|
||||
|
||||
# # Check if we're past the peak (score declining from max)
|
||||
# if current_score < max_observed * decline_threshold:
|
||||
# declining_streak += 1
|
||||
# self.log(f" Declining: {current_score:.1f} (max was {max_observed:.1f})")
|
||||
|
||||
# if declining_streak >= 1:
|
||||
# self.log(f" Passed peak after {move_count} moves, stopping")
|
||||
# break
|
||||
# else:
|
||||
# declining_streak = 0
|
||||
# if moves_in_peak_region % 3 == 0:
|
||||
# self.log(f" In peak region: {current_score:.1f}")
|
||||
|
||||
prev_score = current_score
|
||||
|
||||
final_score = self.get_averaged_focus(samples=s['sweep_samples'])
|
||||
self.log(f"Navigation complete at: {final_score:.1f}")
|
||||
|
||||
def _fine_tune(self, min_speed_idx, s):
|
||||
"""Phase 2: Fine hill-climbing from best position"""
|
||||
self.log("Phase 2: Fine tuning")
|
||||
"""
|
||||
Phase 3: Fine hill-climbing from current position.
|
||||
|
||||
Already near the peak from navigation, now do precise adjustments.
|
||||
"""
|
||||
self.log("Phase 3: Fine tuning")
|
||||
|
||||
self.motion.set_speed(min_speed_idx)
|
||||
time.sleep(0.1)
|
||||
|
||||
best_score = self.get_averaged_focus(samples=s['fine_samples'])
|
||||
self.log(f"Starting fine tune at: {best_score:.1f}")
|
||||
|
||||
# Determine fine direction by testing both
|
||||
# Try both directions to find improvement
|
||||
fine_direction = self._determine_fine_direction(best_score, s)
|
||||
|
||||
if not self.running:
|
||||
return best_score
|
||||
|
||||
# Fine search
|
||||
best_score, best_position_offset = self._fine_search(
|
||||
fine_direction, best_score, s
|
||||
)
|
||||
|
||||
if not self.running:
|
||||
return best_score
|
||||
|
||||
# Return to best position
|
||||
if best_position_offset > 0:
|
||||
self._return_to_best(fine_direction, best_position_offset, s)
|
||||
# Search in best direction
|
||||
best_score = self._fine_search(fine_direction, best_score, s)
|
||||
|
||||
# Final reading
|
||||
time.sleep(s['fine_settle_time'])
|
||||
|
|
@ -288,9 +376,9 @@ class AutofocusController:
|
|||
|
||||
return final_score
|
||||
|
||||
def _determine_fine_direction(self, best_score, s):
|
||||
def _determine_fine_direction(self, current_score, s):
|
||||
"""Test both directions to find which improves focus"""
|
||||
# Try DOWN first
|
||||
# Try DOWN
|
||||
self.motion.move_z_down()
|
||||
time.sleep(s['fine_move_time'])
|
||||
self.motion.stop_z()
|
||||
|
|
@ -298,11 +386,11 @@ class AutofocusController:
|
|||
|
||||
down_score = self.get_averaged_focus(samples=s['fine_samples'])
|
||||
|
||||
if down_score > best_score:
|
||||
self.log(f"Fine direction: DOWN ({down_score:.1f})")
|
||||
if down_score > current_score * 1.02: # 2% improvement threshold
|
||||
self.log(f"Fine direction: DOWN ({down_score:.1f} > {current_score:.1f})")
|
||||
return 'down'
|
||||
|
||||
# Go back and try UP
|
||||
# Try UP (go back and past)
|
||||
self.motion.move_z_up()
|
||||
time.sleep(s['fine_move_time'])
|
||||
self.motion.stop_z()
|
||||
|
|
@ -315,25 +403,29 @@ class AutofocusController:
|
|||
|
||||
up_score = self.get_averaged_focus(samples=s['fine_samples'])
|
||||
|
||||
if up_score > best_score:
|
||||
self.log(f"Fine direction: UP ({up_score:.1f})")
|
||||
if up_score > current_score * 1.02:
|
||||
self.log(f"Fine direction: UP ({up_score:.1f} > {current_score:.1f})")
|
||||
return 'up'
|
||||
|
||||
# Already at peak
|
||||
self.log("Already at peak, minor adjustment only")
|
||||
# Already at peak, return to center
|
||||
self.log("Already at peak")
|
||||
self.motion.move_z_down()
|
||||
time.sleep(s['fine_move_time'])
|
||||
self.motion.stop_z()
|
||||
time.sleep(s['fine_settle_time'])
|
||||
|
||||
return 'up'
|
||||
return 'up' # Default direction for minor adjustments
|
||||
|
||||
def _fine_search(self, direction, best_score, s):
|
||||
"""Search in given direction until no improvement"""
|
||||
"""
|
||||
Search in given direction until no improvement.
|
||||
Uses visual feedback to track best position.
|
||||
"""
|
||||
move_func = self.motion.move_z_up if direction == 'up' else self.motion.move_z_down
|
||||
reverse_func = self.motion.move_z_down if direction == 'up' else self.motion.move_z_up
|
||||
|
||||
no_improvement_count = 0
|
||||
best_position_offset = 0
|
||||
moves_since_best = 0
|
||||
|
||||
while self.running and no_improvement_count < s['fine_max_no_improvement']:
|
||||
move_func()
|
||||
|
|
@ -347,21 +439,25 @@ class AutofocusController:
|
|||
if current_score > best_score:
|
||||
best_score = current_score
|
||||
no_improvement_count = 0
|
||||
best_position_offset = 0
|
||||
self.log(f"Fine better: {current_score:.1f}")
|
||||
moves_since_best = 0
|
||||
self.log(f"Fine improved: {current_score:.1f}")
|
||||
else:
|
||||
no_improvement_count += 1
|
||||
best_position_offset += 1
|
||||
moves_since_best += 1
|
||||
|
||||
return best_score, best_position_offset
|
||||
# Go back to best position using visual feedback
|
||||
if moves_since_best > 0:
|
||||
self.log(f"Reversing {moves_since_best} moves toward best")
|
||||
for _ in range(moves_since_best):
|
||||
reverse_func()
|
||||
time.sleep(s['fine_move_time'])
|
||||
self.motion.stop_z()
|
||||
|
||||
def _return_to_best(self, direction, steps, s):
|
||||
"""Return to best position by reversing direction"""
|
||||
self.log(f"Returning {steps} steps")
|
||||
reverse_func = self.motion.move_z_down if direction == 'up' else self.motion.move_z_up
|
||||
# Check if we're back at peak
|
||||
check_score = self.get_quick_focus(3)
|
||||
if check_score >= best_score * 0.98:
|
||||
break
|
||||
|
||||
for _ in range(steps):
|
||||
reverse_func()
|
||||
time.sleep(s['fine_move_time'])
|
||||
self.motion.stop_z()
|
||||
time.sleep(0.1)
|
||||
time.sleep(0.05)
|
||||
|
||||
return best_score
|
||||
605
src/gui.py
605
src/gui.py
|
|
@ -1,12 +1,20 @@
|
|||
"""
|
||||
AutoScope GUI - Vertical Monitor Layout with Feature Visualization
|
||||
|
||||
Enhanced with feature overlay debugging for scanner development.
|
||||
"""
|
||||
|
||||
import tkinter as tk
|
||||
from tkinter import ttk, scrolledtext
|
||||
from PIL import Image, ImageTk
|
||||
import cv2
|
||||
import numpy as np
|
||||
import threading
|
||||
import queue
|
||||
|
||||
from motion_controller import MotionController
|
||||
from autofocus import AutofocusController
|
||||
from scanner import Scanner, ScanConfig, ScanDirection, Tile
|
||||
|
||||
|
||||
class AppGUI:
|
||||
|
|
@ -25,211 +33,456 @@ class AppGUI:
|
|||
on_focus_update=self._update_focus_display_threadsafe
|
||||
)
|
||||
|
||||
# Queue for thread-safe serial log updates
|
||||
# Scanner
|
||||
self.scanner = None
|
||||
self.scan_config = ScanConfig()
|
||||
|
||||
# Queues for thread-safe updates
|
||||
self.serial_queue = queue.Queue()
|
||||
self.tile_queue = queue.Queue()
|
||||
|
||||
# Mosaic window
|
||||
self.mosaic_window = None
|
||||
|
||||
# Build the window
|
||||
self.root = tk.Tk()
|
||||
self.root.title("AutoScope")
|
||||
self.root.geometry("1080x1080")
|
||||
self.root.minsize(1080, 1080)
|
||||
self.root.geometry("720x1280")
|
||||
self.root.minsize(640, 900)
|
||||
self.root.protocol("WM_DELETE_WINDOW", self.on_close)
|
||||
|
||||
self._build_ui()
|
||||
self._init_scanner()
|
||||
|
||||
# Start serial reader thread
|
||||
self.serial_thread = threading.Thread(target=self._serial_reader, daemon=True)
|
||||
self.serial_thread.start()
|
||||
|
||||
def _init_scanner(self):
|
||||
"""Initialize scanner with current config"""
|
||||
self.scanner = Scanner(
|
||||
camera=self.camera,
|
||||
motion_controller=self.motion,
|
||||
autofocus_controller=self.autofocus,
|
||||
config=self.scan_config,
|
||||
on_tile_captured=self._on_tile_captured,
|
||||
on_log=self.log_message,
|
||||
on_progress=self._on_scan_progress
|
||||
)
|
||||
|
||||
def _on_command_sent(self, cmd):
|
||||
"""Callback when motion controller sends a command"""
|
||||
self.log_message(f"> {cmd}")
|
||||
|
||||
def _update_focus_display_threadsafe(self, score):
|
||||
"""Thread-safe focus display update"""
|
||||
self.root.after(0, lambda: self.focus_score_label.config(text=f"Focus: {score:.1f}"))
|
||||
|
||||
# === UI Building ===
|
||||
def _on_tile_captured(self, tile: Tile):
|
||||
self.tile_queue.put(tile)
|
||||
|
||||
def _on_scan_progress(self, current: int, total: int):
|
||||
self.root.after(0, lambda: self._update_progress(current, total))
|
||||
|
||||
# =========================================================================
|
||||
# UI Building - Vertical Layout
|
||||
# =========================================================================
|
||||
|
||||
def _build_ui(self):
|
||||
main_frame = ttk.Frame(self.root)
|
||||
main_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
|
||||
|
||||
# Left: Camera view
|
||||
self.camera_label = ttk.Label(main_frame)
|
||||
self.camera_label.pack(side=tk.LEFT, padx=(0, 5))
|
||||
# === TOP: Camera View (large, full width) ===
|
||||
camera_frame = ttk.LabelFrame(main_frame, text="Camera")
|
||||
camera_frame.pack(fill=tk.BOTH, expand=True, pady=(0, 5))
|
||||
|
||||
# Right: Control panel
|
||||
right_frame = ttk.Frame(main_frame, width=320)
|
||||
right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True)
|
||||
right_frame.pack_propagate(False)
|
||||
self.camera_label = ttk.Label(camera_frame)
|
||||
self.camera_label.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
|
||||
|
||||
self._build_emergency_stop(right_frame)
|
||||
self._build_speed_controls(right_frame)
|
||||
self._build_fine_speed_controls(right_frame)
|
||||
self._build_movement_controls(right_frame)
|
||||
self._build_autofocus_controls(right_frame)
|
||||
self._build_status_controls(right_frame)
|
||||
self._build_mode_controls(right_frame)
|
||||
self._build_serial_log(right_frame)
|
||||
self._build_command_entry(right_frame)
|
||||
# Camera options row
|
||||
cam_opts = ttk.Frame(camera_frame)
|
||||
cam_opts.pack(fill=tk.X, padx=5, pady=(0, 5))
|
||||
|
||||
def _build_emergency_stop(self, parent):
|
||||
frame = ttk.Frame(parent)
|
||||
frame.pack(fill=tk.X, pady=(0, 10))
|
||||
self.show_tile_overlay_var = tk.BooleanVar(value=True)
|
||||
ttk.Checkbutton(cam_opts, text="Tile bounds",
|
||||
variable=self.show_tile_overlay_var).pack(side=tk.LEFT)
|
||||
|
||||
self.show_edge_regions_var = tk.BooleanVar(value=False)
|
||||
ttk.Checkbutton(cam_opts, text="Track regions",
|
||||
variable=self.show_edge_regions_var).pack(side=tk.LEFT, padx=(10, 0))
|
||||
|
||||
# NEW: Feature overlay checkbox
|
||||
self.show_features_var = tk.BooleanVar(value=False)
|
||||
ttk.Checkbutton(cam_opts, text="Features",
|
||||
variable=self.show_features_var).pack(side=tk.LEFT, padx=(10, 0))
|
||||
|
||||
self.live_focus_var = tk.BooleanVar(value=True)
|
||||
ttk.Checkbutton(cam_opts, text="Live focus",
|
||||
variable=self.live_focus_var).pack(side=tk.LEFT, padx=(10, 0))
|
||||
|
||||
self.focus_score_label = ttk.Label(cam_opts, text="Focus: --", font=('Arial', 11, 'bold'))
|
||||
self.focus_score_label.pack(side=tk.RIGHT)
|
||||
|
||||
# === BOTTOM: Control Panel ===
|
||||
control_frame = ttk.Frame(main_frame)
|
||||
control_frame.pack(fill=tk.X)
|
||||
|
||||
# Row 1: Emergency Stop + Scanner Controls
|
||||
self._build_row1_emergency_scanner(control_frame)
|
||||
|
||||
# Row 2: Movement Controls
|
||||
self._build_row2_movement(control_frame)
|
||||
|
||||
# Row 3: Speed + Autofocus
|
||||
self._build_row3_speed_autofocus(control_frame)
|
||||
|
||||
# Row 4: Status + Log
|
||||
self._build_row4_status_log(control_frame)
|
||||
|
||||
def _build_row1_emergency_scanner(self, parent):
|
||||
"""Row 1: Emergency stop and scanner controls"""
|
||||
row = ttk.Frame(parent)
|
||||
row.pack(fill=tk.X, pady=(0, 3))
|
||||
|
||||
# Emergency stop button
|
||||
self.emergency_btn = tk.Button(
|
||||
frame,
|
||||
text="⚠ EMERGENCY STOP ⚠",
|
||||
command=self.motion.stop_all,
|
||||
row, text="⚠ STOP", command=self._emergency_stop,
|
||||
bg='red', fg='white', font=('Arial', 12, 'bold'),
|
||||
height=2
|
||||
width=8, height=1
|
||||
)
|
||||
self.emergency_btn.pack(fill=tk.X)
|
||||
self.emergency_btn.pack(side=tk.LEFT, padx=(0, 10))
|
||||
|
||||
def _build_speed_controls(self, parent):
|
||||
frame = ttk.LabelFrame(parent, text="Speed")
|
||||
frame.pack(fill=tk.X, pady=(0, 10))
|
||||
# Scanner controls
|
||||
scanner_frame = ttk.LabelFrame(row, text="Scanner")
|
||||
scanner_frame.pack(side=tk.LEFT, fill=tk.X, expand=True)
|
||||
|
||||
btn_frame = ttk.Frame(frame)
|
||||
btn_frame.pack(fill=tk.X, padx=5, pady=5)
|
||||
sf = ttk.Frame(scanner_frame)
|
||||
sf.pack(fill=tk.X, padx=5, pady=3)
|
||||
|
||||
self.speed_var = tk.StringVar(value="Medium")
|
||||
# Status + Progress
|
||||
self.scan_status_label = ttk.Label(sf, text="Idle", font=('Arial', 9, 'bold'), width=8)
|
||||
self.scan_status_label.pack(side=tk.LEFT)
|
||||
|
||||
for text, value, preset in [("Slow", "Slow", "slow"),
|
||||
("Medium", "Medium", "medium"),
|
||||
("Fast", "Fast", "fast")]:
|
||||
ttk.Radiobutton(
|
||||
btn_frame, text=text, variable=self.speed_var,
|
||||
value=value, command=lambda p=preset: self.motion.set_speed_preset(p)
|
||||
).pack(side=tk.LEFT, padx=5)
|
||||
self.scan_progress_bar = ttk.Progressbar(sf, mode='indeterminate', length=80)
|
||||
self.scan_progress_bar.pack(side=tk.LEFT, padx=5)
|
||||
|
||||
def _build_fine_speed_controls(self, parent):
|
||||
frame = ttk.LabelFrame(parent, text="Fine Speed Control")
|
||||
frame.pack(fill=tk.X, pady=(0, 10))
|
||||
self.scan_progress_label = ttk.Label(sf, text="", width=10)
|
||||
self.scan_progress_label.pack(side=tk.LEFT)
|
||||
|
||||
inner = ttk.Frame(frame)
|
||||
inner.pack(fill=tk.X, padx=5, pady=5)
|
||||
# Buttons
|
||||
self.scan_start_btn = tk.Button(
|
||||
sf, text="▶ Start", width=7, bg='green', fg='white',
|
||||
font=('Arial', 9, 'bold'), command=self._start_scan
|
||||
)
|
||||
self.scan_start_btn.pack(side=tk.LEFT, padx=2)
|
||||
|
||||
self.fine_speed_label = ttk.Label(inner, text="Speed: 50", font=('Arial', 10, 'bold'))
|
||||
self.fine_speed_label.pack(anchor=tk.W)
|
||||
self.scan_pause_btn = ttk.Button(sf, text="⏸", width=3,
|
||||
command=self._pause_scan, state='disabled')
|
||||
self.scan_pause_btn.pack(side=tk.LEFT, padx=1)
|
||||
|
||||
self.speed_slider = ttk.Scale(inner, from_=0, to=6, orient=tk.HORIZONTAL)
|
||||
self.speed_slider.set(5)
|
||||
self.speed_slider.config(command=self._on_speed_slider_change)
|
||||
self.speed_slider.pack(fill=tk.X, pady=(5, 0))
|
||||
self.scan_stop_btn = tk.Button(
|
||||
sf, text="⏹", width=3, bg='orange', fg='white',
|
||||
command=self._stop_scan, state='disabled'
|
||||
)
|
||||
self.scan_stop_btn.pack(side=tk.LEFT, padx=1)
|
||||
|
||||
btn_row = ttk.Frame(inner)
|
||||
btn_row.pack(fill=tk.X, pady=(5, 0))
|
||||
ttk.Button(sf, text="📷", width=3,
|
||||
command=self._capture_single_tile).pack(side=tk.LEFT, padx=1)
|
||||
|
||||
for i in range(6):
|
||||
ttk.Button(
|
||||
btn_row, text=str(i), width=2,
|
||||
command=lambda x=i: self._set_fine_speed(x)
|
||||
).pack(side=tk.LEFT, padx=1)
|
||||
ttk.Button(sf, text="Mosaic", width=6,
|
||||
command=self._show_mosaic_window).pack(side=tk.LEFT, padx=(5, 2))
|
||||
|
||||
def _build_movement_controls(self, parent):
|
||||
frame = ttk.LabelFrame(parent, text="Movement Control")
|
||||
frame.pack(fill=tk.X, pady=(0, 10))
|
||||
# AF interval
|
||||
ttk.Label(sf, text="AF:").pack(side=tk.LEFT, padx=(10, 2))
|
||||
self.af_every_var = tk.StringVar(value="5")
|
||||
ttk.Spinbox(sf, from_=1, to=20, width=3,
|
||||
textvariable=self.af_every_var).pack(side=tk.LEFT)
|
||||
|
||||
self.af_every_row_var = tk.BooleanVar(value=True)
|
||||
ttk.Checkbutton(sf, text="Row", variable=self.af_every_row_var).pack(side=tk.LEFT)
|
||||
|
||||
def _build_row2_movement(self, parent):
|
||||
"""Row 2: Movement controls for all axes"""
|
||||
row = ttk.LabelFrame(parent, text="Movement")
|
||||
row.pack(fill=tk.X, pady=(0, 3))
|
||||
|
||||
inner = ttk.Frame(row)
|
||||
inner.pack(fill=tk.X, padx=5, pady=3)
|
||||
|
||||
self.dir_labels = {}
|
||||
self.move_buttons = {}
|
||||
|
||||
for axis in ["X", "Y", "Z"]:
|
||||
row = ttk.Frame(frame)
|
||||
row.pack(fill=tk.X, pady=3, padx=5)
|
||||
af = ttk.Frame(inner)
|
||||
af.pack(side=tk.LEFT, padx=(0, 20))
|
||||
|
||||
ttk.Label(row, text=f"{axis}:", width=3, font=('Arial', 10, 'bold')).pack(side=tk.LEFT)
|
||||
ttk.Label(af, text=f"{axis}:", font=('Arial', 10, 'bold')).pack(side=tk.LEFT)
|
||||
|
||||
dir_btn = ttk.Button(row, text="+ →", width=5,
|
||||
dir_btn = ttk.Button(af, text="+→", width=4,
|
||||
command=lambda a=axis: self._toggle_direction(a))
|
||||
dir_btn.pack(side=tk.LEFT, padx=2)
|
||||
self.dir_labels[axis] = dir_btn
|
||||
|
||||
move_btn = tk.Button(row, text="Move", width=8, bg='green', fg='white',
|
||||
move_btn = tk.Button(af, text="Move", width=6, bg='#4CAF50', fg='white',
|
||||
command=lambda a=axis: self._toggle_movement(a))
|
||||
move_btn.pack(side=tk.LEFT, padx=2)
|
||||
self.move_buttons[axis] = move_btn
|
||||
|
||||
ttk.Button(row, text="Stop", width=6,
|
||||
command=lambda a=axis: self._stop_axis(a)).pack(side=tk.LEFT, padx=2)
|
||||
ttk.Button(af, text="⏹", width=2,
|
||||
command=lambda a=axis: self._stop_axis(a)).pack(side=tk.LEFT)
|
||||
|
||||
ttk.Button(frame, text="Stop All Axes",
|
||||
command=self.motion.stop_all).pack(fill=tk.X, padx=5, pady=5)
|
||||
# Stop all
|
||||
tk.Button(inner, text="STOP ALL", width=10, bg='#f44336', fg='white',
|
||||
font=('Arial', 9, 'bold'),
|
||||
command=self.motion.stop_all).pack(side=tk.RIGHT)
|
||||
|
||||
def _build_autofocus_controls(self, parent):
|
||||
frame = ttk.LabelFrame(parent, text="Autofocus")
|
||||
frame.pack(fill=tk.X, pady=(0, 10))
|
||||
# Test feature detection button
|
||||
ttk.Button(inner, text="Test Features", width=11,
|
||||
command=self._test_feature_detection).pack(side=tk.RIGHT, padx=(0, 10))
|
||||
|
||||
inner = ttk.Frame(frame)
|
||||
inner.pack(fill=tk.X, padx=5, pady=5)
|
||||
def _build_row3_speed_autofocus(self, parent):
|
||||
"""Row 3: Speed and Autofocus controls"""
|
||||
row = ttk.Frame(parent)
|
||||
row.pack(fill=tk.X, pady=(0, 3))
|
||||
|
||||
self.focus_score_label = ttk.Label(inner, text="Focus: --", font=('Arial', 10))
|
||||
self.focus_score_label.pack(anchor=tk.W)
|
||||
# Speed controls
|
||||
speed_frame = ttk.LabelFrame(row, text="Speed")
|
||||
speed_frame.pack(side=tk.LEFT, fill=tk.Y, padx=(0, 5))
|
||||
|
||||
btn_row = ttk.Frame(inner)
|
||||
btn_row.pack(fill=tk.X, pady=(5, 0))
|
||||
sf = ttk.Frame(speed_frame)
|
||||
sf.pack(fill=tk.X, padx=5, pady=3)
|
||||
|
||||
self.af_button = ttk.Button(btn_row, text="Autofocus", command=self._start_autofocus)
|
||||
self.speed_var = tk.StringVar(value="Medium")
|
||||
for text, val, preset in [("S", "Slow", "slow"), ("M", "Medium", "medium"), ("F", "Fast", "fast")]:
|
||||
ttk.Radiobutton(sf, text=text, variable=self.speed_var, value=val,
|
||||
command=lambda p=preset: self.motion.set_speed_preset(p)).pack(side=tk.LEFT, padx=2)
|
||||
|
||||
ttk.Separator(sf, orient=tk.VERTICAL).pack(side=tk.LEFT, fill=tk.Y, padx=5)
|
||||
|
||||
self.fine_speed_label = ttk.Label(sf, text="50", width=3)
|
||||
self.fine_speed_label.pack(side=tk.LEFT)
|
||||
|
||||
self.speed_slider = ttk.Scale(sf, from_=0, to=6, orient=tk.HORIZONTAL, length=80)
|
||||
self.speed_slider.set(5)
|
||||
self.speed_slider.config(command=self._on_speed_slider_change)
|
||||
self.speed_slider.pack(side=tk.LEFT, padx=3)
|
||||
|
||||
# Autofocus controls
|
||||
af_frame = ttk.LabelFrame(row, text="Autofocus")
|
||||
af_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
|
||||
|
||||
af = ttk.Frame(af_frame)
|
||||
af.pack(fill=tk.X, padx=5, pady=3)
|
||||
|
||||
self.af_button = ttk.Button(af, text="Auto", width=6, command=self._start_autofocus)
|
||||
self.af_button.pack(side=tk.LEFT, padx=2)
|
||||
|
||||
ttk.Button(btn_row, text="Coarse AF",
|
||||
ttk.Button(af, text="Coarse", width=6,
|
||||
command=lambda: self._start_autofocus(coarse=True)).pack(side=tk.LEFT, padx=2)
|
||||
ttk.Button(btn_row, text="Fine AF",
|
||||
ttk.Button(af, text="Fine", width=5,
|
||||
command=lambda: self._start_autofocus(fine=True)).pack(side=tk.LEFT, padx=2)
|
||||
ttk.Button(btn_row, text="Stop AF",
|
||||
ttk.Button(af, text="Stop", width=5,
|
||||
command=self._stop_autofocus).pack(side=tk.LEFT, padx=2)
|
||||
|
||||
self.live_focus_var = tk.BooleanVar(value=True)
|
||||
ttk.Checkbutton(inner, text="Show live focus score",
|
||||
variable=self.live_focus_var).pack(anchor=tk.W, pady=(5, 0))
|
||||
def _build_row4_status_log(self, parent):
|
||||
"""Row 4: Status and Log"""
|
||||
row = ttk.Frame(parent)
|
||||
row.pack(fill=tk.X, pady=(0, 3))
|
||||
|
||||
def _build_status_controls(self, parent):
|
||||
frame = ttk.LabelFrame(parent, text="Status")
|
||||
frame.pack(fill=tk.X, pady=(0, 10))
|
||||
# Status controls
|
||||
status_frame = ttk.LabelFrame(row, text="Status")
|
||||
status_frame.pack(side=tk.LEFT, fill=tk.Y, padx=(0, 5))
|
||||
|
||||
btn_frame = ttk.Frame(frame)
|
||||
btn_frame.pack(fill=tk.X, padx=5, pady=5)
|
||||
stf = ttk.Frame(status_frame)
|
||||
stf.pack(fill=tk.X, padx=5, pady=3)
|
||||
|
||||
ttk.Button(btn_frame, text="Status", command=self.motion.request_status).pack(side=tk.LEFT, padx=2)
|
||||
ttk.Button(btn_frame, text="Zero All", command=self.motion.zero_all).pack(side=tk.LEFT, padx=2)
|
||||
ttk.Button(btn_frame, text="Go Origin", command=self.motion.go_origin).pack(side=tk.LEFT, padx=2)
|
||||
ttk.Button(stf, text="Status", width=6, command=self.motion.request_status).pack(side=tk.LEFT, padx=2)
|
||||
self.mode_label = ttk.Label(stf, text="Serial", width=8)
|
||||
self.mode_label.pack(side=tk.LEFT, padx=(10, 0))
|
||||
ttk.Button(stf, text="Mode", width=5, command=self.motion.toggle_mode).pack(side=tk.LEFT, padx=2)
|
||||
|
||||
def _build_mode_controls(self, parent):
|
||||
frame = ttk.LabelFrame(parent, text="Control Mode")
|
||||
frame.pack(fill=tk.X, pady=(0, 10))
|
||||
# Log
|
||||
log_frame = ttk.LabelFrame(row, text="Log")
|
||||
log_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
|
||||
|
||||
btn_frame = ttk.Frame(frame)
|
||||
btn_frame.pack(fill=tk.X, padx=5, pady=5)
|
||||
self.serial_log = scrolledtext.ScrolledText(log_frame, height=4, state=tk.DISABLED)
|
||||
self.serial_log.pack(fill=tk.BOTH, expand=True, padx=5, pady=(5, 3))
|
||||
|
||||
self.mode_label = ttk.Label(btn_frame, text="Mode: Serial", font=('Arial', 9))
|
||||
self.mode_label.pack(side=tk.LEFT, padx=5)
|
||||
|
||||
ttk.Button(btn_frame, text="Toggle Mode", command=self.motion.toggle_mode).pack(side=tk.RIGHT, padx=2)
|
||||
|
||||
def _build_serial_log(self, parent):
|
||||
ttk.Label(parent, text="Serial Log:").pack(anchor=tk.W)
|
||||
self.serial_log = scrolledtext.ScrolledText(parent, height=10, width=35, state=tk.DISABLED)
|
||||
self.serial_log.pack(fill=tk.BOTH, expand=True, pady=(0, 10))
|
||||
|
||||
def _build_command_entry(self, parent):
|
||||
ttk.Label(parent, text="Send Command:").pack(anchor=tk.W)
|
||||
cmd_frame = ttk.Frame(parent)
|
||||
cmd_frame.pack(fill=tk.X)
|
||||
cmd_frame = ttk.Frame(log_frame)
|
||||
cmd_frame.pack(fill=tk.X, padx=5, pady=(0, 5))
|
||||
|
||||
self.cmd_entry = ttk.Entry(cmd_frame)
|
||||
self.cmd_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
|
||||
self.cmd_entry.bind("<Return>", lambda e: self._send_custom_command())
|
||||
|
||||
ttk.Button(cmd_frame, text="Send", command=self._send_custom_command).pack(side=tk.RIGHT, padx=(5, 0))
|
||||
ttk.Button(cmd_frame, text="Send", width=6,
|
||||
command=self._send_custom_command).pack(side=tk.RIGHT, padx=(5, 0))
|
||||
|
||||
# === UI Event Handlers ===
|
||||
# =========================================================================
|
||||
# Feature Detection Testing
|
||||
# =========================================================================
|
||||
|
||||
def _test_feature_detection(self):
|
||||
"""Test and log feature detection on current frame"""
|
||||
if not self.scanner:
|
||||
self.log_message("Scanner not initialized")
|
||||
return
|
||||
|
||||
try:
|
||||
frame = self.camera.capture_frame()
|
||||
|
||||
# Detect features on all edges
|
||||
for edge in ['left', 'right', 'top', 'bottom']:
|
||||
region = self.scanner.get_edge_region(frame, edge)
|
||||
kp, desc = self.scanner.detect_features(region)
|
||||
count = len(kp) if kp else 0
|
||||
self.log_message(f" {edge.upper()}: {count} features")
|
||||
|
||||
# Also update scanner's visualization data
|
||||
self.scanner._update_edge_features(frame)
|
||||
|
||||
# Enable feature view
|
||||
self.show_features_var.set(True)
|
||||
|
||||
self.log_message("Feature detection test complete - enable 'Features' checkbox to see overlay")
|
||||
|
||||
except Exception as e:
|
||||
self.log_message(f"Feature detection error: {e}")
|
||||
|
||||
# =========================================================================
|
||||
# Scanner Handlers
|
||||
# =========================================================================
|
||||
|
||||
def _update_scan_config(self):
|
||||
"""Update scanner config from GUI values"""
|
||||
self.scan_config.autofocus_every_n_tiles = int(self.af_every_var.get())
|
||||
self.scan_config.autofocus_every_row = self.af_every_row_var.get()
|
||||
self._init_scanner()
|
||||
|
||||
def _start_scan(self):
|
||||
self._update_scan_config()
|
||||
# Enable feature view during scan
|
||||
self.show_features_var.set(True)
|
||||
if self.scanner.start():
|
||||
self.scan_start_btn.config(state='disabled')
|
||||
self.scan_pause_btn.config(state='normal')
|
||||
self.scan_stop_btn.config(state='normal')
|
||||
self.scan_status_label.config(text="Scanning")
|
||||
self.scan_progress_bar.start(10)
|
||||
|
||||
def _pause_scan(self):
|
||||
if self.scanner.paused:
|
||||
self.scanner.resume()
|
||||
self.scan_pause_btn.config(text="⏸")
|
||||
self.scan_status_label.config(text="Scanning")
|
||||
self.scan_progress_bar.start(10)
|
||||
else:
|
||||
self.scanner.pause()
|
||||
self.scan_pause_btn.config(text="▶")
|
||||
self.scan_status_label.config(text="Paused")
|
||||
self.scan_progress_bar.stop()
|
||||
|
||||
def _stop_scan(self):
|
||||
self.scanner.stop()
|
||||
self._scan_finished()
|
||||
|
||||
def _scan_finished(self):
|
||||
self.scan_start_btn.config(state='normal')
|
||||
self.scan_pause_btn.config(state='disabled', text="⏸")
|
||||
self.scan_stop_btn.config(state='disabled')
|
||||
self.scan_status_label.config(text="Idle")
|
||||
self.scan_progress_bar.stop()
|
||||
|
||||
def _update_progress(self, current: int, total: int):
|
||||
if total > 0:
|
||||
self.scan_progress_label.config(text=f"{current}/{total}")
|
||||
else:
|
||||
self.scan_progress_label.config(text=f"{current} tiles")
|
||||
|
||||
def _capture_single_tile(self):
|
||||
if self.scanner:
|
||||
self.scanner.capture_tile()
|
||||
self._update_mosaic_window()
|
||||
|
||||
def _emergency_stop(self):
|
||||
self.motion.stop_all()
|
||||
if self.scanner and self.scanner.running:
|
||||
self.scanner.stop()
|
||||
self._scan_finished()
|
||||
if self.autofocus.is_running():
|
||||
self.autofocus.stop()
|
||||
|
||||
# =========================================================================
|
||||
# Mosaic Window
|
||||
# =========================================================================
|
||||
|
||||
def _show_mosaic_window(self):
|
||||
if self.mosaic_window is not None:
|
||||
self.mosaic_window.lift()
|
||||
self._update_mosaic_window()
|
||||
return
|
||||
|
||||
self.mosaic_window = tk.Toplevel(self.root)
|
||||
self.mosaic_window.title("Mosaic")
|
||||
self.mosaic_window.geometry("600x800")
|
||||
self.mosaic_window.protocol("WM_DELETE_WINDOW", self._close_mosaic_window)
|
||||
|
||||
self.mosaic_label = ttk.Label(self.mosaic_window, text="No tiles captured")
|
||||
self.mosaic_label.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
|
||||
|
||||
btn_frame = ttk.Frame(self.mosaic_window)
|
||||
btn_frame.pack(fill=tk.X, padx=10, pady=(0, 10))
|
||||
|
||||
ttk.Button(btn_frame, text="Save Full Resolution",
|
||||
command=self._save_mosaic).pack(side=tk.LEFT, padx=5)
|
||||
ttk.Button(btn_frame, text="Refresh",
|
||||
command=self._update_mosaic_window).pack(side=tk.LEFT, padx=5)
|
||||
|
||||
self._update_mosaic_window()
|
||||
|
||||
def _update_mosaic_window(self):
|
||||
if self.mosaic_window is None or not self.scanner or not self.scanner.tiles:
|
||||
return
|
||||
|
||||
mosaic = self.scanner.get_mosaic_preview(max_size=580)
|
||||
if mosaic is None:
|
||||
return
|
||||
|
||||
mosaic_rgb = cv2.cvtColor(mosaic, cv2.COLOR_BGR2RGB)
|
||||
img = Image.fromarray(mosaic_rgb)
|
||||
imgtk = ImageTk.PhotoImage(image=img)
|
||||
|
||||
self.mosaic_label.imgtk = imgtk
|
||||
self.mosaic_label.config(image=imgtk, text="")
|
||||
|
||||
def _close_mosaic_window(self):
|
||||
if self.mosaic_window:
|
||||
self.mosaic_window.destroy()
|
||||
self.mosaic_window = None
|
||||
|
||||
def _save_mosaic(self):
|
||||
if not self.scanner or not self.scanner.tiles:
|
||||
self.log_message("No tiles to save")
|
||||
return
|
||||
|
||||
from tkinter import filedialog
|
||||
filename = filedialog.asksaveasfilename(
|
||||
defaultextension=".png",
|
||||
filetypes=[("PNG", "*.png"), ("JPEG", "*.jpg"), ("All", "*.*")]
|
||||
)
|
||||
if filename:
|
||||
mosaic = self.scanner.build_mosaic(scale=1.0)
|
||||
if mosaic is not None:
|
||||
cv2.imwrite(filename, mosaic)
|
||||
self.log_message(f"Saved: {filename}")
|
||||
|
||||
# =========================================================================
|
||||
# Movement Handlers
|
||||
# =========================================================================
|
||||
|
||||
def _toggle_direction(self, axis):
|
||||
direction = self.motion.toggle_direction(axis)
|
||||
arrow = "→" if direction == 1 else "←"
|
||||
sign = "+" if direction == 1 else "-"
|
||||
self.dir_labels[axis].config(text=f"{sign} {arrow}")
|
||||
self.dir_labels[axis].config(text=f"{sign}{arrow}")
|
||||
|
||||
if self.motion.is_moving(axis):
|
||||
self.motion.stop_axis(axis)
|
||||
|
|
@ -245,7 +498,7 @@ class AppGUI:
|
|||
|
||||
def _stop_axis(self, axis):
|
||||
self.motion.stop_axis(axis)
|
||||
self.move_buttons[axis].config(text="Move", bg='green')
|
||||
self.move_buttons[axis].config(text="Move", bg='#4CAF50')
|
||||
|
||||
def _on_speed_slider_change(self, value):
|
||||
if self._updating_slider:
|
||||
|
|
@ -257,7 +510,7 @@ class AppGUI:
|
|||
self._updating_slider = True
|
||||
self.speed_slider.set(index)
|
||||
speed_val = self.motion.get_speed_value(index)
|
||||
self.fine_speed_label.config(text=f"Speed: {speed_val}")
|
||||
self.fine_speed_label.config(text=str(speed_val))
|
||||
self._updating_slider = False
|
||||
self.motion.set_speed(index)
|
||||
|
||||
|
|
@ -275,23 +528,24 @@ class AppGUI:
|
|||
self.motion.send_command(cmd)
|
||||
self.cmd_entry.delete(0, tk.END)
|
||||
|
||||
# === Logging ===
|
||||
# =========================================================================
|
||||
# Logging
|
||||
# =========================================================================
|
||||
|
||||
def log_message(self, msg):
|
||||
if not hasattr(self, 'serial_log'):
|
||||
return
|
||||
|
||||
self.serial_log.config(state=tk.NORMAL)
|
||||
self.serial_log.insert(tk.END, msg + "\n")
|
||||
self.serial_log.see(tk.END)
|
||||
self.serial_log.config(state=tk.DISABLED)
|
||||
print(f" Log: {msg}")
|
||||
|
||||
if msg.startswith("< MODE:"):
|
||||
mode = msg.split(":")[-1]
|
||||
self.mode_label.config(text=f"Mode: {mode.capitalize()}")
|
||||
self.mode_label.config(text=msg.split(":")[-1].capitalize())
|
||||
|
||||
# === Serial & Camera ===
|
||||
# =========================================================================
|
||||
# Serial & Camera
|
||||
# =========================================================================
|
||||
|
||||
def _serial_reader(self):
|
||||
while self.running:
|
||||
|
|
@ -306,24 +560,44 @@ class AppGUI:
|
|||
|
||||
def _process_serial_queue(self):
|
||||
while not self.serial_queue.empty():
|
||||
msg = self.serial_queue.get_nowait()
|
||||
self.log_message(f"< {msg}")
|
||||
self.log_message(f"< {self.serial_queue.get_nowait()}")
|
||||
|
||||
def _process_tile_queue(self):
|
||||
updated = False
|
||||
while not self.tile_queue.empty():
|
||||
self.tile_queue.get_nowait()
|
||||
updated = True
|
||||
if updated and self.mosaic_window:
|
||||
self._update_mosaic_window()
|
||||
|
||||
def update_camera(self):
|
||||
try:
|
||||
frame = self.camera.capture_frame()
|
||||
frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
|
||||
|
||||
if self.live_focus_var.get() and not self.autofocus.is_running():
|
||||
score = self.autofocus.get_focus_score()
|
||||
self.focus_score_label.config(text=f"Focus: {score:.1f}")
|
||||
|
||||
frame = self._draw_crosshair(frame)
|
||||
# Apply overlays
|
||||
frame = self._draw_overlays(frame)
|
||||
|
||||
# Apply feature overlay if enabled
|
||||
if self.show_features_var.get() and self.scanner:
|
||||
# Update edge features periodically
|
||||
self.scanner._update_edge_features(frame)
|
||||
# Get overlay from scanner
|
||||
frame = self.scanner.get_feature_overlay(frame)
|
||||
|
||||
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
||||
|
||||
# Scale to fit - maximize for vertical monitor
|
||||
h, w = frame.shape[:2]
|
||||
max_width = 640
|
||||
if w > max_width:
|
||||
scale = max_width / w
|
||||
max_h = 700
|
||||
max_w = 650
|
||||
|
||||
scale = min(max_h / h, max_w / w)
|
||||
if scale < 1:
|
||||
frame = cv2.resize(frame, (int(w * scale), int(h * scale)))
|
||||
|
||||
img = Image.fromarray(frame)
|
||||
|
|
@ -334,45 +608,53 @@ class AppGUI:
|
|||
except Exception as e:
|
||||
print(f"Camera error: {e}")
|
||||
|
||||
def _draw_crosshair(self, frame, color=(0, 0, 255), thickness=1, gap=0, size=40):
|
||||
"""
|
||||
Draw a crosshair at the center of the frame.
|
||||
|
||||
Args:
|
||||
frame: OpenCV image
|
||||
color: BGR color tuple (default green)
|
||||
thickness: Line thickness
|
||||
gap: Gap in center of crosshair
|
||||
size: Length of crosshair arms from center
|
||||
"""
|
||||
h, w = frame.shape[:2]
|
||||
cx, cy = w // 2, h // 2
|
||||
|
||||
# Horizontal lines (left and right of center gap)
|
||||
cv2.line(frame, (cx - size, cy), (cx - gap, cy), color, thickness)
|
||||
cv2.line(frame, (cx + gap, cy), (cx + size, cy), color, thickness)
|
||||
|
||||
# Vertical lines (above and below center gap)
|
||||
cv2.line(frame, (cx, cy - size), (cx, cy - gap), color, thickness)
|
||||
cv2.line(frame, (cx, cy + gap), (cx, cy + size), color, thickness)
|
||||
|
||||
# Optional: center dot
|
||||
# cv2.circle(frame, (cx, cy), 2, color, -1)
|
||||
|
||||
def _draw_overlays(self, frame):
|
||||
frame = self._draw_crosshair(frame)
|
||||
if self.show_tile_overlay_var.get():
|
||||
frame = self._draw_tile_boundary(frame)
|
||||
if self.show_edge_regions_var.get():
|
||||
frame = self._draw_edge_regions(frame)
|
||||
return frame
|
||||
|
||||
# === Main Loop ===
|
||||
def _draw_crosshair(self, frame, color=(0, 0, 255), thickness=1, size=40):
|
||||
h, w = frame.shape[:2]
|
||||
cx, cy = w // 2, h // 2
|
||||
cv2.line(frame, (cx - size, cy), (cx + size, cy), color, thickness)
|
||||
cv2.line(frame, (cx, cy - size), (cx, cy + size), color, thickness)
|
||||
return frame
|
||||
|
||||
def _draw_tile_boundary(self, frame, color=(0, 255, 0), thickness=2):
|
||||
h, w = frame.shape[:2]
|
||||
bh, bw = int(h * 0.10), int(w * 0.10)
|
||||
cv2.rectangle(frame, (bw, bh), (w - bw, h - bh), color, thickness)
|
||||
return frame
|
||||
|
||||
def _draw_edge_regions(self, frame, color=(255, 255, 0), thickness=1):
|
||||
h, w = frame.shape[:2]
|
||||
bh, bw = int(h * 0.10), int(w * 0.10)
|
||||
cv2.rectangle(frame, (0, 0), (bw, h), color, thickness)
|
||||
cv2.rectangle(frame, (w - bw, 0), (w, h), color, thickness)
|
||||
cv2.rectangle(frame, (0, 0), (w, bh), color, thickness)
|
||||
cv2.rectangle(frame, (0, h - bh), (w, h), color, thickness)
|
||||
return frame
|
||||
|
||||
# =========================================================================
|
||||
# Main Loop
|
||||
# =========================================================================
|
||||
|
||||
def run(self):
|
||||
def update():
|
||||
if self.running:
|
||||
self.update_camera()
|
||||
self._process_serial_queue()
|
||||
self._process_tile_queue()
|
||||
|
||||
# Re-enable AF button when done
|
||||
if not self.autofocus.is_running():
|
||||
self.af_button.config(state='normal')
|
||||
|
||||
if self.scanner and not self.scanner.running and self.scan_start_btn['state'] == 'disabled':
|
||||
self._scan_finished()
|
||||
|
||||
self.root.after(33, update)
|
||||
|
||||
update()
|
||||
|
|
@ -380,5 +662,8 @@ class AppGUI:
|
|||
|
||||
def on_close(self):
|
||||
self.running = False
|
||||
if self.scanner:
|
||||
self.scanner.stop()
|
||||
self.autofocus.stop()
|
||||
self._close_mosaic_window()
|
||||
self.root.destroy()
|
||||
|
|
@ -2,12 +2,12 @@ class MotionController:
|
|||
# Command mapping for each axis
|
||||
AXIS_COMMANDS = {
|
||||
'X': {'pos': 'E', 'neg': 'W', 'stop': 'e'},
|
||||
'Y': {'pos': 'N', 'neg': 'S', 'stop': 'n'},
|
||||
'Y': {'pos': 'S', 'neg': 'N', 'stop': 'n'},
|
||||
'Z': {'pos': 'U', 'neg': 'D', 'stop': 'u'}
|
||||
}
|
||||
|
||||
# Speed values matching Arduino speedArr
|
||||
SPEED_VALUES = [0, 2, 5, 10, 30, 50, 70]
|
||||
SPEED_VALUES = [0, 1, 2, 5, 10, 30, 50, 70]
|
||||
|
||||
def __init__(self, arduino, on_command_sent=None):
|
||||
"""
|
||||
|
|
|
|||
883
src/scanner.py
883
src/scanner.py
|
|
@ -0,0 +1,883 @@
|
|||
"""
|
||||
Scanner Module - Automated slide scanning with visual feedback navigation
|
||||
|
||||
Uses feature tracking to detect tile boundaries instead of step counting.
|
||||
Camera orientation: Portrait (height > width), X=left/right, Y=up/down
|
||||
|
||||
Enhanced with feature visualization for debugging.
|
||||
"""
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
import time
|
||||
import threading
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Tuple, Optional, Callable, Dict
|
||||
from enum import Enum
|
||||
from collections import deque
|
||||
|
||||
|
||||
class ScanDirection(Enum):
|
||||
"""Scan direction constants"""
|
||||
RIGHT = 'right' # X+ (E command)
|
||||
LEFT = 'left' # X- (W command)
|
||||
DOWN = 'down' # Y- (N command)
|
||||
UP = 'up' # Y+ (S command)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Tile:
|
||||
"""Represents a captured tile"""
|
||||
image: np.ndarray
|
||||
row: int
|
||||
col: int
|
||||
x_pos: int # Grid position
|
||||
y_pos: int
|
||||
focus_score: float
|
||||
timestamp: float
|
||||
|
||||
|
||||
@dataclass
|
||||
class ScanConfig:
|
||||
"""Scanner configuration"""
|
||||
# Tile extraction
|
||||
tile_percentage: float = 0.80 # Center 80% is the tile
|
||||
border_percentage: float = 0.10 # 10% on each side for tracking
|
||||
|
||||
# Feature detection
|
||||
feature_detector: str = 'SIFT' # 'ORB', 'SIFT', or 'AKAZE'
|
||||
min_features: int = 10 # Minimum features to track
|
||||
match_threshold: float = 0.75 # Feature match confidence
|
||||
min_good_matches: int = 5 # Matches needed to confirm movement
|
||||
|
||||
# Movement timing
|
||||
move_check_interval: float = 0.1 # Seconds between frame checks
|
||||
settle_time: float = 0.2 # Seconds to wait after stopping
|
||||
max_move_time: float = 10.0 # Safety timeout per tile
|
||||
|
||||
# Scan limits (number of tiles)
|
||||
max_tiles_per_row: int = 50 # Safety limit
|
||||
max_rows: int = 50 # Safety limit
|
||||
|
||||
# Scan pattern
|
||||
start_direction: ScanDirection = ScanDirection.RIGHT
|
||||
|
||||
# Autofocus
|
||||
autofocus_every_n_tiles: int = -1 # Refocus periodically
|
||||
autofocus_every_row: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class FeatureVisualization:
|
||||
"""Stores feature visualization data for debugging overlay"""
|
||||
# Current features on each edge
|
||||
left_features: List[cv2.KeyPoint] = field(default_factory=list)
|
||||
right_features: List[cv2.KeyPoint] = field(default_factory=list)
|
||||
top_features: List[cv2.KeyPoint] = field(default_factory=list)
|
||||
bottom_features: List[cv2.KeyPoint] = field(default_factory=list)
|
||||
|
||||
# Reference features being tracked (from leading edge)
|
||||
reference_features: List[cv2.KeyPoint] = field(default_factory=list)
|
||||
reference_edge: str = ''
|
||||
|
||||
# Matched features on target edge
|
||||
matched_features: List[cv2.KeyPoint] = field(default_factory=list)
|
||||
target_edge: str = ''
|
||||
|
||||
# Match quality info
|
||||
num_good_matches: int = 0
|
||||
match_threshold: int = 5
|
||||
|
||||
# Feature position history for trails (deque of (x, y, timestamp))
|
||||
feature_trails: Dict[int, deque] = field(default_factory=dict)
|
||||
|
||||
# Active tracking state
|
||||
is_tracking: bool = False
|
||||
tracking_direction: str = ''
|
||||
|
||||
# Movement detection info
|
||||
shift_detected: bool = False
|
||||
movement_progress: float = 0.0 # Estimated 0-1 progress
|
||||
|
||||
|
||||
class Scanner:
|
||||
"""
|
||||
Automated slide scanner using visual feedback for navigation.
|
||||
|
||||
Key principle: Use camera as position sensor, not step counting.
|
||||
Boundary detection: Feature matching - when features stop shifting,
|
||||
we've likely hit the edge of the slide content.
|
||||
"""
|
||||
|
||||
def __init__(self, camera, motion_controller, autofocus_controller=None,
|
||||
config: ScanConfig = None,
|
||||
on_tile_captured: Callable[[Tile], None] = None,
|
||||
on_log: Callable[[str], None] = None,
|
||||
on_progress: Callable[[int, int], None] = None):
|
||||
"""
|
||||
Args:
|
||||
camera: Camera instance
|
||||
motion_controller: MotionController instance
|
||||
autofocus_controller: Optional AutofocusController
|
||||
config: ScanConfig settings
|
||||
on_tile_captured: Callback(tile) when tile is captured
|
||||
on_log: Callback(message) for logging
|
||||
on_progress: Callback(current, total) for progress updates
|
||||
"""
|
||||
self.camera = camera
|
||||
self.motion = motion_controller
|
||||
self.autofocus = autofocus_controller
|
||||
self.config = config or ScanConfig()
|
||||
|
||||
# Callbacks
|
||||
self.on_tile_captured = on_tile_captured
|
||||
self.on_log = on_log
|
||||
self.on_progress = on_progress
|
||||
|
||||
# State
|
||||
self.running = False
|
||||
self.paused = False
|
||||
self.tiles: List[Tile] = []
|
||||
self.current_row = 0
|
||||
self.current_col = 0
|
||||
self.tiles_captured = 0
|
||||
|
||||
# Feature detector
|
||||
self._init_feature_detector()
|
||||
|
||||
# Thread
|
||||
self._thread = None
|
||||
|
||||
# Feature visualization for debugging
|
||||
self.feature_viz = FeatureVisualization()
|
||||
self._viz_lock = threading.Lock()
|
||||
|
||||
# Trail history length (frames)
|
||||
self.trail_history_length = 30
|
||||
|
||||
def _init_feature_detector(self):
|
||||
"""Initialize the feature detector based on config"""
|
||||
if self.config.feature_detector == 'ORB':
|
||||
self.detector = cv2.ORB_create(nfeatures=500)
|
||||
self.matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)
|
||||
elif self.config.feature_detector == 'SIFT':
|
||||
self.detector = cv2.SIFT_create()
|
||||
self.matcher = cv2.BFMatcher(cv2.NORM_L2, crossCheck=False)
|
||||
elif self.config.feature_detector == 'AKAZE':
|
||||
self.detector = cv2.AKAZE_create()
|
||||
self.matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)
|
||||
else:
|
||||
raise ValueError(f"Unknown feature detector: {self.config.feature_detector}")
|
||||
|
||||
def log(self, message: str):
|
||||
"""Log a message"""
|
||||
if self.on_log:
|
||||
self.on_log(f"[Scanner] {message}")
|
||||
print(f"[Scanner] {message}")
|
||||
|
||||
# =========================================================================
|
||||
# Feature Visualization
|
||||
# =========================================================================
|
||||
|
||||
def get_feature_overlay(self, frame: np.ndarray) -> np.ndarray:
|
||||
"""
|
||||
Draw feature visualization overlay on the frame.
|
||||
|
||||
Args:
|
||||
frame: Input frame to draw on (will be copied)
|
||||
|
||||
Returns:
|
||||
Frame with feature overlays drawn
|
||||
"""
|
||||
overlay = frame.copy()
|
||||
h, w = frame.shape[:2]
|
||||
border_h = int(h * self.config.border_percentage)
|
||||
border_w = int(w * self.config.border_percentage)
|
||||
|
||||
with self._viz_lock:
|
||||
viz = self.feature_viz
|
||||
|
||||
# Color scheme
|
||||
COLORS = {
|
||||
'left': (255, 100, 100), # Blue-ish
|
||||
'right': (100, 100, 255), # Red-ish
|
||||
'top': (100, 255, 100), # Green-ish
|
||||
'bottom': (255, 255, 100), # Cyan-ish
|
||||
'reference': (0, 255, 255), # Yellow - reference features
|
||||
'matched': (0, 255, 0), # Green - matched features
|
||||
'trail': (255, 0, 255), # Magenta - feature trails
|
||||
}
|
||||
|
||||
# Draw edge region boundaries (semi-transparent)
|
||||
overlay_alpha = overlay.copy()
|
||||
|
||||
# Left edge region
|
||||
cv2.rectangle(overlay_alpha, (0, 0), (border_w, h), COLORS['left'], -1)
|
||||
# Right edge region
|
||||
cv2.rectangle(overlay_alpha, (w - border_w, 0), (w, h), COLORS['right'], -1)
|
||||
# Top edge region
|
||||
cv2.rectangle(overlay_alpha, (0, 0), (w, border_h), COLORS['top'], -1)
|
||||
# Bottom edge region
|
||||
cv2.rectangle(overlay_alpha, (0, h - border_h), (w, h), COLORS['bottom'], -1)
|
||||
|
||||
# Blend with transparency
|
||||
cv2.addWeighted(overlay_alpha, 0.1, overlay, 0.9, 0, overlay)
|
||||
|
||||
# Draw features on each edge
|
||||
self._draw_edge_features(overlay, viz.left_features, 'left',
|
||||
border_w, COLORS['left'])
|
||||
self._draw_edge_features(overlay, viz.right_features, 'right',
|
||||
border_w, COLORS['right'], x_offset=w-border_w)
|
||||
self._draw_edge_features(overlay, viz.top_features, 'top',
|
||||
border_h, COLORS['top'])
|
||||
self._draw_edge_features(overlay, viz.bottom_features, 'bottom',
|
||||
border_h, COLORS['bottom'], y_offset=h-border_h)
|
||||
|
||||
# Draw reference features with larger circles
|
||||
if viz.is_tracking and viz.reference_features:
|
||||
ref_offset = self._get_edge_offset(viz.reference_edge, w, h, border_w, border_h)
|
||||
for kp in viz.reference_features:
|
||||
pt = (int(kp.pt[0] + ref_offset[0]), int(kp.pt[1] + ref_offset[1]))
|
||||
cv2.circle(overlay, pt, 8, COLORS['reference'], 2)
|
||||
cv2.circle(overlay, pt, 3, COLORS['reference'], -1)
|
||||
|
||||
# Draw matched features with connecting lines
|
||||
if viz.is_tracking and viz.matched_features:
|
||||
target_offset = self._get_edge_offset(viz.target_edge, w, h, border_w, border_h)
|
||||
for kp in viz.matched_features:
|
||||
pt = (int(kp.pt[0] + target_offset[0]), int(kp.pt[1] + target_offset[1]))
|
||||
cv2.circle(overlay, pt, 8, COLORS['matched'], 2)
|
||||
cv2.circle(overlay, pt, 3, COLORS['matched'], -1)
|
||||
|
||||
# Draw feature trails
|
||||
for trail_id, trail in viz.feature_trails.items():
|
||||
if len(trail) > 1:
|
||||
points = [(int(p[0]), int(p[1])) for p in trail]
|
||||
for i in range(1, len(points)):
|
||||
# Fade trail from bright to dim
|
||||
alpha = i / len(points)
|
||||
color = tuple(int(c * alpha) for c in COLORS['trail'])
|
||||
cv2.line(overlay, points[i-1], points[i], color, 2)
|
||||
|
||||
# Draw tracking status info
|
||||
self._draw_tracking_info(overlay, viz, w, h)
|
||||
|
||||
return overlay
|
||||
|
||||
def _draw_edge_features(self, frame, keypoints, edge, region_size, color,
|
||||
x_offset=0, y_offset=0):
|
||||
"""Draw feature points for an edge region"""
|
||||
for kp in keypoints:
|
||||
pt = (int(kp.pt[0] + x_offset), int(kp.pt[1] + y_offset))
|
||||
# Draw circle with size proportional to keypoint size
|
||||
size = max(3, int(kp.size / 4)) if kp.size > 0 else 4
|
||||
cv2.circle(frame, pt, size, color, 1)
|
||||
# Draw center dot
|
||||
cv2.circle(frame, pt, 2, color, -1)
|
||||
|
||||
def _get_edge_offset(self, edge, w, h, border_w, border_h):
|
||||
"""Get x, y offset for an edge region"""
|
||||
if edge == 'left':
|
||||
return (0, 0)
|
||||
elif edge == 'right':
|
||||
return (w - border_w, 0)
|
||||
elif edge == 'top':
|
||||
return (0, 0)
|
||||
elif edge == 'bottom':
|
||||
return (0, h - border_h)
|
||||
return (0, 0)
|
||||
|
||||
def _draw_tracking_info(self, frame, viz, w, h):
|
||||
"""Draw tracking status text overlay"""
|
||||
y_pos = 30
|
||||
line_height = 25
|
||||
|
||||
def draw_text(text, color=(255, 255, 255)):
|
||||
nonlocal y_pos
|
||||
cv2.putText(frame, text, (10, y_pos),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 3)
|
||||
cv2.putText(frame, text, (10, y_pos),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 1)
|
||||
y_pos += line_height
|
||||
|
||||
if viz.is_tracking:
|
||||
draw_text(f"TRACKING: {viz.tracking_direction}", (0, 255, 255))
|
||||
draw_text(f"Ref edge: {viz.reference_edge} ({len(viz.reference_features)} features)")
|
||||
draw_text(f"Target edge: {viz.target_edge}")
|
||||
|
||||
# Match status with color coding
|
||||
match_color = (0, 255, 0) if viz.num_good_matches >= viz.match_threshold else (0, 165, 255)
|
||||
draw_text(f"Matches: {viz.num_good_matches}/{viz.match_threshold}", match_color)
|
||||
|
||||
if viz.shift_detected:
|
||||
draw_text("SHIFT DETECTED!", (0, 255, 0))
|
||||
|
||||
# Progress bar
|
||||
bar_width = 200
|
||||
bar_height = 15
|
||||
bar_x = 10
|
||||
bar_y = y_pos
|
||||
progress = min(1.0, viz.movement_progress)
|
||||
cv2.rectangle(frame, (bar_x, bar_y),
|
||||
(bar_x + bar_width, bar_y + bar_height), (100, 100, 100), -1)
|
||||
cv2.rectangle(frame, (bar_x, bar_y),
|
||||
(bar_x + int(bar_width * progress), bar_y + bar_height),
|
||||
(0, 255, 0), -1)
|
||||
cv2.rectangle(frame, (bar_x, bar_y),
|
||||
(bar_x + bar_width, bar_y + bar_height), (255, 255, 255), 1)
|
||||
else:
|
||||
# Show feature counts on all edges
|
||||
draw_text(f"L:{len(viz.left_features)} R:{len(viz.right_features)} "
|
||||
f"T:{len(viz.top_features)} B:{len(viz.bottom_features)}")
|
||||
|
||||
def _update_edge_features(self, frame):
|
||||
"""Update feature visualization for all edges"""
|
||||
with self._viz_lock:
|
||||
self.feature_viz.left_features = self._detect_edge_keypoints(frame, 'left')
|
||||
self.feature_viz.right_features = self._detect_edge_keypoints(frame, 'right')
|
||||
self.feature_viz.top_features = self._detect_edge_keypoints(frame, 'top')
|
||||
self.feature_viz.bottom_features = self._detect_edge_keypoints(frame, 'bottom')
|
||||
|
||||
def _detect_edge_keypoints(self, frame, edge) -> List[cv2.KeyPoint]:
|
||||
"""Detect keypoints in an edge region"""
|
||||
region = self.get_edge_region(frame, edge)
|
||||
gray = cv2.cvtColor(region, cv2.COLOR_BGR2GRAY) if len(region.shape) == 3 else region
|
||||
keypoints, _ = self.detector.detectAndCompute(gray, None)
|
||||
return list(keypoints) if keypoints else []
|
||||
|
||||
def _update_tracking_visualization(self, ref_kps, ref_edge, target_edge,
|
||||
current_frame, good_matches, direction):
|
||||
"""Update visualization during tracking"""
|
||||
with self._viz_lock:
|
||||
viz = self.feature_viz
|
||||
viz.is_tracking = True
|
||||
viz.tracking_direction = direction
|
||||
viz.reference_edge = ref_edge
|
||||
viz.target_edge = target_edge
|
||||
viz.reference_features = list(ref_kps) if ref_kps else []
|
||||
viz.num_good_matches = len(good_matches) if good_matches else 0
|
||||
viz.match_threshold = self.config.min_good_matches
|
||||
|
||||
# Detect features on target edge
|
||||
target_kps = self._detect_edge_keypoints(current_frame, target_edge)
|
||||
viz.matched_features = target_kps # Show all features on target edge
|
||||
|
||||
def _clear_tracking_visualization(self):
|
||||
"""Clear tracking visualization state"""
|
||||
with self._viz_lock:
|
||||
viz = self.feature_viz
|
||||
viz.is_tracking = False
|
||||
viz.tracking_direction = ''
|
||||
viz.reference_features = []
|
||||
viz.matched_features = []
|
||||
viz.num_good_matches = 0
|
||||
viz.shift_detected = False
|
||||
viz.movement_progress = 0.0
|
||||
viz.feature_trails.clear()
|
||||
|
||||
# =========================================================================
|
||||
# Tile Extraction
|
||||
# =========================================================================
|
||||
|
||||
def extract_tile(self, frame: np.ndarray) -> np.ndarray:
|
||||
"""Extract the center tile (80%) from a frame"""
|
||||
h, w = frame.shape[:2]
|
||||
border_h = int(h * self.config.border_percentage)
|
||||
border_w = int(w * self.config.border_percentage)
|
||||
|
||||
tile = frame[border_h:h-border_h, border_w:w-border_w]
|
||||
return tile.copy()
|
||||
|
||||
def get_edge_region(self, frame: np.ndarray, edge: str) -> np.ndarray:
|
||||
"""
|
||||
Extract a strip from the specified edge for feature tracking.
|
||||
|
||||
Args:
|
||||
frame: Input frame
|
||||
edge: 'left', 'right', 'top', 'bottom'
|
||||
|
||||
Returns:
|
||||
Edge strip image
|
||||
"""
|
||||
h, w = frame.shape[:2]
|
||||
border_h = int(h * self.config.border_percentage)
|
||||
border_w = int(w * self.config.border_percentage)
|
||||
|
||||
if edge == 'left':
|
||||
return frame[:, :border_w].copy()
|
||||
elif edge == 'right':
|
||||
return frame[:, w-border_w:].copy()
|
||||
elif edge == 'top':
|
||||
return frame[:border_h, :].copy()
|
||||
elif edge == 'bottom':
|
||||
return frame[h-border_h:, :].copy()
|
||||
else:
|
||||
raise ValueError(f"Unknown edge: {edge}")
|
||||
|
||||
# =========================================================================
|
||||
# Feature Detection and Matching
|
||||
# =========================================================================
|
||||
|
||||
def detect_features(self, region: np.ndarray) -> Tuple[list, np.ndarray]:
|
||||
"""
|
||||
Detect features in a region.
|
||||
|
||||
Returns:
|
||||
(keypoints, descriptors)
|
||||
"""
|
||||
gray = cv2.cvtColor(region, cv2.COLOR_BGR2GRAY) if len(region.shape) == 3 else region
|
||||
keypoints, descriptors = self.detector.detectAndCompute(gray, None)
|
||||
return keypoints, descriptors
|
||||
|
||||
def match_features(self, desc1: np.ndarray, desc2: np.ndarray) -> List:
|
||||
"""
|
||||
Match features between two descriptor sets.
|
||||
|
||||
Returns:
|
||||
List of good matches
|
||||
"""
|
||||
if desc1 is None or desc2 is None:
|
||||
return []
|
||||
|
||||
if len(desc1) < 2 or len(desc2) < 2:
|
||||
return []
|
||||
|
||||
# KNN match
|
||||
try:
|
||||
matches = self.matcher.knnMatch(desc1, desc2, k=2)
|
||||
except cv2.error:
|
||||
return []
|
||||
|
||||
# Apply ratio test
|
||||
good_matches = []
|
||||
for match_pair in matches:
|
||||
if len(match_pair) == 2:
|
||||
m, n = match_pair
|
||||
if m.distance < self.config.match_threshold * n.distance:
|
||||
good_matches.append(m)
|
||||
|
||||
return good_matches
|
||||
|
||||
def features_have_shifted(self,
|
||||
reference_desc: np.ndarray,
|
||||
current_frame: np.ndarray,
|
||||
from_edge: str,
|
||||
to_edge: str) -> bool:
|
||||
"""
|
||||
Check if features from one edge now appear on the opposite edge.
|
||||
|
||||
Args:
|
||||
reference_desc: Descriptors from the reference edge
|
||||
current_frame: Current camera frame
|
||||
from_edge: Edge where features were originally ('left', 'right')
|
||||
to_edge: Edge to check for features ('right', 'left')
|
||||
|
||||
Returns:
|
||||
True if enough features matched (tile movement complete)
|
||||
"""
|
||||
# Get the target edge region
|
||||
target_region = self.get_edge_region(current_frame, to_edge)
|
||||
|
||||
# Detect features in target region
|
||||
_, target_desc = self.detect_features(target_region)
|
||||
|
||||
# Match with reference
|
||||
good_matches = self.match_features(reference_desc, target_desc)
|
||||
|
||||
return len(good_matches) >= self.config.min_good_matches
|
||||
|
||||
def count_features_on_edge(self, frame: np.ndarray, edge: str) -> int:
|
||||
"""Count features visible on an edge region"""
|
||||
region = self.get_edge_region(frame, edge)
|
||||
kp, desc = self.detect_features(region)
|
||||
return len(kp) if kp else 0
|
||||
|
||||
# =========================================================================
|
||||
# Movement with Visual Feedback
|
||||
# =========================================================================
|
||||
|
||||
def move_until_tile_shifted(self, direction: ScanDirection) -> bool:
|
||||
"""
|
||||
Move in a direction until features indicate we've moved one tile.
|
||||
|
||||
Uses feature tracking to detect when tile content has shifted.
|
||||
Returns False if timeout or no more content detected.
|
||||
|
||||
Args:
|
||||
direction: ScanDirection to move
|
||||
|
||||
Returns:
|
||||
True if tile shift detected, False if timeout/boundary
|
||||
"""
|
||||
# Determine edges based on direction
|
||||
if direction == ScanDirection.RIGHT:
|
||||
from_edge, to_edge = 'right', 'left'
|
||||
move_cmd = lambda: self.motion.start_movement('X')
|
||||
self.motion.axis_direction['X'] = 1
|
||||
axis = 'X'
|
||||
elif direction == ScanDirection.LEFT:
|
||||
from_edge, to_edge = 'left', 'right'
|
||||
move_cmd = lambda: self.motion.start_movement('X')
|
||||
self.motion.axis_direction['X'] = -1
|
||||
axis = 'X'
|
||||
elif direction == ScanDirection.DOWN:
|
||||
from_edge, to_edge = 'bottom', 'top'
|
||||
move_cmd = lambda: self.motion.start_movement('Y')
|
||||
self.motion.axis_direction['Y'] = -1
|
||||
axis = 'Y'
|
||||
elif direction == ScanDirection.UP:
|
||||
from_edge, to_edge = 'top', 'bottom'
|
||||
move_cmd = lambda: self.motion.start_movement('Y')
|
||||
self.motion.axis_direction['Y'] = 1
|
||||
axis = 'Y'
|
||||
else:
|
||||
raise ValueError(f"Unknown direction: {direction}")
|
||||
|
||||
# Capture reference features from the leading edge
|
||||
frame = self.camera.capture_frame()
|
||||
reference_region = self.get_edge_region(frame, from_edge)
|
||||
ref_kp, ref_desc = self.detect_features(reference_region)
|
||||
|
||||
if ref_desc is None or len(ref_desc) < self.config.min_features:
|
||||
self.log(f"Warning: Only {len(ref_kp) if ref_kp else 0} features found on {from_edge} edge")
|
||||
|
||||
# Initialize tracking visualization
|
||||
with self._viz_lock:
|
||||
self.feature_viz.is_tracking = True
|
||||
self.feature_viz.tracking_direction = direction.value
|
||||
self.feature_viz.reference_edge = from_edge
|
||||
self.feature_viz.target_edge = to_edge
|
||||
self.feature_viz.reference_features = list(ref_kp) if ref_kp else []
|
||||
self.feature_viz.shift_detected = False
|
||||
self.feature_viz.movement_progress = 0.0
|
||||
|
||||
# Start movement
|
||||
move_cmd()
|
||||
start_time = time.time()
|
||||
check_count = 0
|
||||
|
||||
try:
|
||||
while self.running and not self.paused:
|
||||
time.sleep(self.config.move_check_interval)
|
||||
check_count += 1
|
||||
|
||||
# Safety timeout
|
||||
elapsed = time.time() - start_time
|
||||
if elapsed > self.config.max_move_time:
|
||||
self.log("Movement timeout - likely at boundary")
|
||||
return False
|
||||
|
||||
# Update progress estimate
|
||||
with self._viz_lock:
|
||||
self.feature_viz.movement_progress = min(1.0, elapsed / (self.config.max_move_time * 0.5))
|
||||
|
||||
# Capture current frame
|
||||
current_frame = self.camera.capture_frame()
|
||||
|
||||
# Update edge features for visualization
|
||||
self._update_edge_features(current_frame)
|
||||
|
||||
# Get target edge features
|
||||
target_region = self.get_edge_region(current_frame, to_edge)
|
||||
target_kp, target_desc = self.detect_features(target_region)
|
||||
|
||||
# Match with reference
|
||||
good_matches = self.match_features(ref_desc, target_desc) if ref_desc is not None else []
|
||||
|
||||
# Update visualization
|
||||
self._update_tracking_visualization(
|
||||
ref_kp, from_edge, to_edge, current_frame, good_matches, direction.value
|
||||
)
|
||||
|
||||
# Check if features have shifted (tile boundary crossed)
|
||||
if ref_desc is not None and len(good_matches) >= self.config.min_good_matches:
|
||||
self.log(f"Tile shift detected via feature matching ({len(good_matches)} matches)")
|
||||
with self._viz_lock:
|
||||
self.feature_viz.shift_detected = True
|
||||
self.feature_viz.movement_progress = 1.0
|
||||
return True
|
||||
|
||||
# Log progress periodically
|
||||
if check_count % 10 == 0:
|
||||
self.log(f" Checking... {len(good_matches)} matches so far")
|
||||
|
||||
finally:
|
||||
# Stop movement
|
||||
self.motion.stop_axis(axis)
|
||||
time.sleep(self.config.settle_time)
|
||||
|
||||
# Clear tracking visualization after a delay
|
||||
# (keep it visible briefly so user can see final state)
|
||||
threading.Timer(1.0, self._clear_tracking_visualization).start()
|
||||
|
||||
return False
|
||||
|
||||
# =========================================================================
|
||||
# Scanning Operations
|
||||
# =========================================================================
|
||||
|
||||
def capture_tile(self) -> Tile:
|
||||
"""Capture a single tile at current position"""
|
||||
frame = self.camera.capture_frame()
|
||||
tile_image = self.extract_tile(frame)
|
||||
|
||||
# Calculate focus score for metadata
|
||||
from vision import calculate_focus_score_sobel
|
||||
focus_score = calculate_focus_score_sobel(frame)
|
||||
|
||||
tile = Tile(
|
||||
image=tile_image,
|
||||
row=self.current_row,
|
||||
col=self.current_col,
|
||||
x_pos=self.current_col,
|
||||
y_pos=self.current_row,
|
||||
focus_score=focus_score,
|
||||
timestamp=time.time()
|
||||
)
|
||||
|
||||
self.tiles.append(tile)
|
||||
self.tiles_captured += 1
|
||||
|
||||
if self.on_tile_captured:
|
||||
self.on_tile_captured(tile)
|
||||
|
||||
self.log(f"Captured tile [{self.current_row}, {self.current_col}] focus={focus_score:.1f}")
|
||||
|
||||
return tile
|
||||
|
||||
def scan_row(self, direction: ScanDirection) -> List[Tile]:
|
||||
"""
|
||||
Scan a complete row in the given direction.
|
||||
|
||||
Args:
|
||||
direction: ScanDirection.LEFT or ScanDirection.RIGHT
|
||||
|
||||
Returns:
|
||||
List of tiles captured in this row
|
||||
"""
|
||||
row_tiles = []
|
||||
tiles_in_row = 0
|
||||
|
||||
self.log(f"Starting row {self.current_row} scan ({direction.value})")
|
||||
|
||||
while self.running and not self.paused:
|
||||
# Safety limit
|
||||
if tiles_in_row >= self.config.max_tiles_per_row:
|
||||
self.log(f"Max tiles per row reached ({self.config.max_tiles_per_row})")
|
||||
break
|
||||
|
||||
# Autofocus check
|
||||
# if (self.autofocus and
|
||||
# self.config.autofocus_every_n_tiles > 0 and
|
||||
# self.tiles_captured > 0 and
|
||||
# self.tiles_captured % self.config.autofocus_every_n_tiles == 0):
|
||||
# self.log("Running autofocus...")
|
||||
# self.autofocus.start()
|
||||
# while self.autofocus.is_running():
|
||||
# time.sleep(0.1)
|
||||
|
||||
# Update edge features before capture
|
||||
frame = self.camera.capture_frame()
|
||||
self._update_edge_features(frame)
|
||||
|
||||
# Capture tile
|
||||
tile = self.capture_tile()
|
||||
row_tiles.append(tile)
|
||||
tiles_in_row += 1
|
||||
|
||||
# Update progress
|
||||
if self.on_progress:
|
||||
self.on_progress(self.tiles_captured, -1)
|
||||
|
||||
# Move to next tile position
|
||||
tile_shifted = self.move_until_tile_shifted(direction)
|
||||
|
||||
if not tile_shifted:
|
||||
# Timeout or boundary = end of row
|
||||
self.log(f"Row {self.current_row} complete, {len(row_tiles)} tiles")
|
||||
break
|
||||
|
||||
# Update column position
|
||||
if direction == ScanDirection.RIGHT:
|
||||
self.current_col += 1
|
||||
else:
|
||||
self.current_col -= 1
|
||||
|
||||
return row_tiles
|
||||
|
||||
def step_to_next_row(self) -> bool:
|
||||
"""
|
||||
Move down to the next row.
|
||||
|
||||
Returns:
|
||||
True if successful, False if bottom edge reached
|
||||
"""
|
||||
# Safety limit
|
||||
if self.current_row >= self.config.max_rows:
|
||||
self.log(f"Max rows reached ({self.config.max_rows})")
|
||||
return False
|
||||
|
||||
self.log("Stepping to next row...")
|
||||
|
||||
# Move down
|
||||
tile_shifted = self.move_until_tile_shifted(ScanDirection.DOWN)
|
||||
|
||||
if tile_shifted:
|
||||
self.current_row += 1
|
||||
|
||||
# Autofocus at start of each row if configured
|
||||
# if self.autofocus and self.config.autofocus_every_row:
|
||||
# self.log("Row start autofocus...")
|
||||
# self.autofocus.start()
|
||||
# while self.autofocus.is_running():
|
||||
# time.sleep(0.1)
|
||||
|
||||
return True
|
||||
else:
|
||||
self.log("Bottom edge reached")
|
||||
return False
|
||||
|
||||
def full_scan(self):
|
||||
"""Execute a complete serpentine scan of the slide"""
|
||||
self.log("Starting full scan")
|
||||
self.running = True
|
||||
self.tiles = []
|
||||
self.current_row = 0
|
||||
self.current_col = 0
|
||||
self.tiles_captured = 0
|
||||
|
||||
# Alternate scan direction each row (serpentine)
|
||||
scan_direction = self.config.start_direction
|
||||
|
||||
try:
|
||||
while self.running:
|
||||
# Handle pause
|
||||
while self.paused and self.running:
|
||||
time.sleep(0.1)
|
||||
|
||||
if not self.running:
|
||||
break
|
||||
|
||||
# Scan current row
|
||||
self.scan_row(scan_direction)
|
||||
|
||||
if not self.running:
|
||||
break
|
||||
|
||||
# Try to move to next row
|
||||
if not self.step_to_next_row():
|
||||
break
|
||||
|
||||
# Reverse direction for next row (serpentine)
|
||||
if scan_direction == ScanDirection.RIGHT:
|
||||
scan_direction = ScanDirection.LEFT
|
||||
else:
|
||||
scan_direction = ScanDirection.RIGHT
|
||||
|
||||
self.log(f"Scan complete! {self.tiles_captured} tiles captured")
|
||||
|
||||
except Exception as e:
|
||||
self.log(f"Scan error: {e}")
|
||||
raise
|
||||
finally:
|
||||
self.running = False
|
||||
self._clear_tracking_visualization()
|
||||
|
||||
# =========================================================================
|
||||
# Control Methods
|
||||
# =========================================================================
|
||||
|
||||
def start(self):
|
||||
"""Start scanning in background thread"""
|
||||
if self.running:
|
||||
self.log("Scan already running")
|
||||
return False
|
||||
|
||||
self._thread = threading.Thread(target=self.full_scan, daemon=True)
|
||||
self._thread.start()
|
||||
return True
|
||||
|
||||
def stop(self):
|
||||
"""Stop scanning"""
|
||||
self.running = False
|
||||
self.motion.stop_all()
|
||||
self._clear_tracking_visualization()
|
||||
self.log("Scan stopped")
|
||||
|
||||
def pause(self):
|
||||
"""Pause scanning"""
|
||||
self.paused = True
|
||||
self.motion.stop_all()
|
||||
self.log("Scan paused")
|
||||
|
||||
def resume(self):
|
||||
"""Resume scanning"""
|
||||
self.paused = False
|
||||
self.log("Scan resumed")
|
||||
|
||||
# =========================================================================
|
||||
# Mosaic Building
|
||||
# =========================================================================
|
||||
|
||||
def build_mosaic(self, scale: float = 0.25) -> np.ndarray:
|
||||
"""
|
||||
Build a mosaic image from captured tiles.
|
||||
|
||||
Args:
|
||||
scale: Scale factor for output (0.25 = 25% size)
|
||||
|
||||
Returns:
|
||||
Mosaic image
|
||||
"""
|
||||
if not self.tiles:
|
||||
return None
|
||||
|
||||
# Find grid dimensions
|
||||
max_row = max(t.row for t in self.tiles)
|
||||
max_col = max(t.col for t in self.tiles)
|
||||
min_col = min(t.col for t in self.tiles)
|
||||
|
||||
# Get tile dimensions (from first tile)
|
||||
tile_h, tile_w = self.tiles[0].image.shape[:2]
|
||||
scaled_h = int(tile_h * scale)
|
||||
scaled_w = int(tile_w * scale)
|
||||
|
||||
# Calculate mosaic size
|
||||
num_rows = max_row + 1
|
||||
num_cols = max_col - min_col + 1
|
||||
|
||||
mosaic = np.zeros((num_rows * scaled_h, num_cols * scaled_w, 3), dtype=np.uint8)
|
||||
|
||||
# Place tiles
|
||||
for tile in self.tiles:
|
||||
row = tile.row
|
||||
col = tile.col - min_col
|
||||
|
||||
scaled_tile = cv2.resize(tile.image, (scaled_w, scaled_h))
|
||||
|
||||
y = row * scaled_h
|
||||
x = col * scaled_w
|
||||
mosaic[y:y+scaled_h, x:x+scaled_w] = scaled_tile
|
||||
|
||||
return mosaic
|
||||
|
||||
def get_mosaic_preview(self, max_size: int = 800) -> np.ndarray:
|
||||
"""
|
||||
Get a preview-sized mosaic.
|
||||
|
||||
Args:
|
||||
max_size: Maximum dimension of output
|
||||
|
||||
Returns:
|
||||
Preview mosaic image
|
||||
"""
|
||||
mosaic = self.build_mosaic(scale=1.0)
|
||||
if mosaic is None:
|
||||
return None
|
||||
|
||||
h, w = mosaic.shape[:2]
|
||||
if max(h, w) > max_size:
|
||||
scale = max_size / max(h, w)
|
||||
mosaic = cv2.resize(mosaic, (int(w * scale), int(h * scale)))
|
||||
|
||||
return mosaic
|
||||
Loading…
Reference in a new issue