Merge pull request 'Stitching' (#1) from Stitching into master

Reviewed-on: #1
This commit is contained in:
root 2026-01-05 16:02:49 +00:00
commit 7d8d1733c0
8 changed files with 2875 additions and 284 deletions

BIN
Mos1.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 513 KiB

BIN
Mos2.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 194 KiB

View file

@ -4,16 +4,31 @@ from vision import calculate_focus_score_sobel
class AutofocusController:
"""Manages autofocus operations"""
"""
Manages autofocus operations using visual feedback.
Key design principle: Never trust step counts due to gear slippage.
Use focus score as the authoritative position feedback.
"""
# Default timing settings
DEFAULT_SETTINGS = {
# Sweep settings
'sweep_move_time': 0.5,
'sweep_settle_time': 0.05,
'sweep_settle_time': 0.01,
'sweep_samples': 10,
'sweep_steps': 30,
'fine_move_time': 0.15,
'fine_settle_time': 0.1,
# Navigation settings (fast approach to peak)
'nav_move_time': 0.3, # Quick bursts
'nav_check_samples': 3, # Fewer samples for speed
'nav_peak_threshold': 0.80, # Consider "near peak" at 80% of target
'nav_decline_threshold': 1.00, # Detect peak passage at 8% drop
'nav_max_moves': 100, # Safety limit
# Fine tuning settings
'fine_move_time': 0.30,
'fine_settle_time': 0.01,
'fine_samples': 10,
'fine_max_no_improvement': 5,
}
@ -74,6 +89,11 @@ class AutofocusController:
return sum(scores) / len(scores) if scores else 0
def get_quick_focus(self, samples=3):
"""Quick focus reading with fewer samples for navigation"""
scores = [self.get_focus_score() for _ in range(samples)]
return sum(scores) / len(scores)
# === Autofocus Control ===
def start(self, speed_range=(1, 5), coarse=False, fine=False):
@ -111,31 +131,33 @@ class AutofocusController:
def _autofocus_routine(self, speed_range):
"""
Autofocus using sweep search + hill climbing.
Phase 1: Sweep across range to find approximate peak
Phase 2: Fine tune from best position found
Autofocus using sweep search + visual navigation + hill climbing.
Phase 1: Sweep to map focus curve and find approximate peak
Phase 2: Navigate to peak using visual feedback
Phase 3: Fine tune with hill climbing
"""
self.log(f"Starting autofocus (speed range: {speed_range})")
min_speed_idx, max_speed_idx = speed_range
s = self.settings # Shorthand
s = self.settings
try:
# Phase 1: Sweep search
best_step, best_score = self._sweep_search(
# Phase 1: Sweep search - map the focus curve
best_score, peak_direction = self._sweep_search(
min_speed_idx, max_speed_idx, s
)
if not self.running:
return
# Move to best position found
self._move_to_best_position(best_step, s)
# Phase 2: Navigate to peak
self._navigate_to_peak(best_score, peak_direction, s)
if not self.running:
return
# Phase 2: Fine tuning
# Phase 3: Fine tuning with hill climbing
final_score = self._fine_tune(min_speed_idx, s)
self.log(f"Autofocus complete. Final: {final_score:.1f}")
@ -146,30 +168,35 @@ class AutofocusController:
self.running = False
def _sweep_search(self, min_speed_idx, max_speed_idx, s):
"""Phase 1: Sweep to find approximate best position"""
"""
Phase 1: Sweep to map focus curve.
Returns:
best_score: The highest focus score observed
peak_direction: 'up' or 'down' - which direction has the peak
"""
self.log("Phase 1: Sweep search")
# Use medium speed for sweep
sweep_speed = (min_speed_idx + max_speed_idx) // 2
self.motion.set_speed(sweep_speed)
time.sleep(0.1)
# Record starting position
# Record starting position score
time.sleep(s['sweep_settle_time'])
start_score = self.get_averaged_focus(samples=s['sweep_samples'])
self.update_focus_display(start_score)
sweep_data = [(0, start_score)]
self.log(f"Start position: {start_score:.1f}")
if not self.running:
return 0, start_score
sweep_data = {'start': start_score, 'down': [], 'up': []}
# Sweep DOWN first (away from slide to avoid contact)
if not self.running:
return start_score, 'up'
# Sweep DOWN first (away from slide)
self.log(f"Sweeping down {s['sweep_steps']} steps...")
for i in range(1, s['sweep_steps'] + 1):
if not self.running:
return 0, start_score
return start_score, 'up'
self.motion.move_z_down()
time.sleep(s['sweep_move_time'])
@ -177,35 +204,24 @@ class AutofocusController:
time.sleep(s['sweep_settle_time'])
score = self.get_averaged_focus(samples=s['sweep_samples'])
sweep_data.append((-i, score))
sweep_data['down'].append(score)
self.update_focus_display(score)
if i % 5 == 0:
self.log(f" Step -{i}: {score:.1f}")
self.log(f" Down {i}: {score:.1f}")
# Remember score at bottom
bottom_score = sweep_data['down'][-1] if sweep_data['down'] else start_score
if not self.running:
return 0, start_score
return start_score, 'up'
# Return to start
self.log("Returning to start...")
for _ in range(s['sweep_steps']):
# Now sweep UP past start to the other side
# We'll go UP for 2x sweep_steps (to cover both halves)
self.log(f"Sweeping up {s['sweep_steps'] * 2} steps...")
for i in range(1, s['sweep_steps'] * 2 + 1):
if not self.running:
return 0, start_score
self.motion.move_z_up()
time.sleep(s['sweep_move_time'])
self.motion.stop_z()
time.sleep(0.1)
time.sleep(s['sweep_settle_time'])
if not self.running:
return 0, start_score
# Sweep UP
self.log(f"Sweeping up {s['sweep_steps']} steps...")
for i in range(1, s['sweep_steps'] + 1):
if not self.running:
return 0, start_score
return start_score, 'down'
self.motion.move_z_up()
time.sleep(s['sweep_move_time'])
@ -213,73 +229,145 @@ class AutofocusController:
time.sleep(s['sweep_settle_time'])
score = self.get_averaged_focus(samples=s['sweep_samples'])
sweep_data.append((i, score))
sweep_data['up'].append(score)
self.update_focus_display(score)
if i % 5 == 0:
self.log(f" Step +{i}: {score:.1f}")
self.log(f" Up {i}: {score:.1f}")
# Find best position
best_step, best_score = max(sweep_data, key=lambda x: x[1])
self.log(f"Best found at step {best_step}: {best_score:.1f}")
# Analyze sweep data to find peak
all_scores = sweep_data['down'] + [start_score] + sweep_data['up']
best_score = max(all_scores)
# Log sweep curve
sorted_data = sorted(sweep_data, key=lambda x: x[0])
curve_str = " ".join([f"{d[1]:.0f}" for d in sorted_data])
self.log(f"Sweep curve: {curve_str}")
down_best = max(sweep_data['down']) if sweep_data['down'] else 0
up_best = max(sweep_data['up']) if sweep_data['up'] else 0
return best_step, best_score
# Determine which direction has the peak
# We're currently at the TOP of the sweep (after going up)
# So to reach the peak, we need to go DOWN if peak was in down region or middle
if down_best > up_best and down_best > start_score:
peak_direction = 'down'
self.log(f"Peak in DOWN region: {down_best:.1f}")
elif up_best >= down_best and up_best > start_score:
# Peak is in up region, but we need to check where in the up region
# If peak is in first half of up (returning toward start), go down
# If peak is in second half of up (past start), we might already be close
up_scores = sweep_data['up']
mid_point = len(up_scores) // 2
first_half_best = max(up_scores[:mid_point]) if mid_point > 0 else 0
second_half_best = max(up_scores[mid_point:]) if mid_point < len(up_scores) else 0
def _move_to_best_position(self, best_step, s):
"""Move from current position (+SWEEP_STEPS) to best_step"""
steps_to_move = s['sweep_steps'] - best_step
if steps_to_move > 0:
self.log(f"Moving down {steps_to_move} steps to best position")
move_func = self.motion.move_z_down
if second_half_best >= first_half_best:
# Peak is near current position (top), go down slightly
peak_direction = 'down'
else:
self.log(f"Moving up {abs(steps_to_move)} steps to best position")
move_func = self.motion.move_z_up
steps_to_move = abs(steps_to_move)
# Peak is further down
peak_direction = 'down'
self.log(f"Peak in UP region: {up_best:.1f}")
else:
peak_direction = 'down'
self.log(f"Peak near start: {start_score:.1f}")
for _ in range(steps_to_move):
if not self.running:
return
move_func()
time.sleep(s['sweep_move_time'])
self.motion.stop_z()
# Log sweep curve for debugging
self.log(f"Best score found: {best_score:.1f}")
self.log(f"Navigate direction: {peak_direction}")
return best_score, peak_direction
def _navigate_to_peak(self, target_score, direction, s):
"""
Phase 2: Navigate toward peak using camera
"""
self.log(f"Phase 2: Navigate to peak (target: {target_score:.1f}, dir: {direction})")
move_func = self.motion.move_z_down if direction == 'down' else self.motion.move_z_up
# Use medium-fast speed for approach
self.motion.set_speed(4)
time.sleep(0.1)
time.sleep(s['sweep_settle_time'])
current_score = self.get_averaged_focus(samples=s['sweep_samples'])
self.log(f"At best position: {current_score:.1f}")
peak_threshold = target_score * s['nav_peak_threshold']
decline_threshold = s['nav_decline_threshold']
self.log(f"(peak_threshold: {peak_threshold:.1f}, decline_threshold: {decline_threshold})")
prev_score = self.get_quick_focus(s['nav_check_samples'])
max_observed = prev_score
moves_in_peak_region = 0
declining_streak = 0
for move_count in range(s['nav_max_moves']):
if not self.running:
return
# Quick movement burst
move_func()
time.sleep(s['nav_move_time'])
self.motion.stop_z()
time.sleep(0.05) # Brief settle
# Check focus
current_score = self.get_quick_focus(s['nav_check_samples'])
self.log(f" Current: {current_score:.1f} (max was {max_observed:.1f}) (Declining {declining_streak:.1f}) (Decline Thres {max_observed * decline_threshold:.1f})")
self.log(f" Current: {current_score:.1f} >= peak thresh({peak_threshold:.1f}) ")
self.log(f" Current: {current_score:.1f} <= max_observed * decline_threshold:({max_observed * decline_threshold:.1f}) ")
self.update_focus_display(current_score)
# Track maximum observed
if current_score > max_observed:
max_observed = current_score
declining_streak = 0
self.log("Setting Max")
elif max_observed > 0.8 * target_score:
declining_streak += 1
if declining_streak >=2:
self.log(f" Passed peak after {move_count} moves, stopping")
break
# # Are we in the peak region?
# if current_score >= peak_threshold:
# moves_in_peak_region += 1
# # Check if we're past the peak (score declining from max)
# if current_score < max_observed * decline_threshold:
# declining_streak += 1
# self.log(f" Declining: {current_score:.1f} (max was {max_observed:.1f})")
# if declining_streak >= 1:
# self.log(f" Passed peak after {move_count} moves, stopping")
# break
# else:
# declining_streak = 0
# if moves_in_peak_region % 3 == 0:
# self.log(f" In peak region: {current_score:.1f}")
prev_score = current_score
final_score = self.get_averaged_focus(samples=s['sweep_samples'])
self.log(f"Navigation complete at: {final_score:.1f}")
def _fine_tune(self, min_speed_idx, s):
"""Phase 2: Fine hill-climbing from best position"""
self.log("Phase 2: Fine tuning")
"""
Phase 3: Fine hill-climbing from current position.
Already near the peak from navigation, now do precise adjustments.
"""
self.log("Phase 3: Fine tuning")
self.motion.set_speed(min_speed_idx)
time.sleep(0.1)
best_score = self.get_averaged_focus(samples=s['fine_samples'])
self.log(f"Starting fine tune at: {best_score:.1f}")
# Determine fine direction by testing both
# Try both directions to find improvement
fine_direction = self._determine_fine_direction(best_score, s)
if not self.running:
return best_score
# Fine search
best_score, best_position_offset = self._fine_search(
fine_direction, best_score, s
)
if not self.running:
return best_score
# Return to best position
if best_position_offset > 0:
self._return_to_best(fine_direction, best_position_offset, s)
# Search in best direction
best_score = self._fine_search(fine_direction, best_score, s)
# Final reading
time.sleep(s['fine_settle_time'])
@ -288,9 +376,9 @@ class AutofocusController:
return final_score
def _determine_fine_direction(self, best_score, s):
def _determine_fine_direction(self, current_score, s):
"""Test both directions to find which improves focus"""
# Try DOWN first
# Try DOWN
self.motion.move_z_down()
time.sleep(s['fine_move_time'])
self.motion.stop_z()
@ -298,11 +386,11 @@ class AutofocusController:
down_score = self.get_averaged_focus(samples=s['fine_samples'])
if down_score > best_score:
self.log(f"Fine direction: DOWN ({down_score:.1f})")
if down_score > current_score * 1.02: # 2% improvement threshold
self.log(f"Fine direction: DOWN ({down_score:.1f} > {current_score:.1f})")
return 'down'
# Go back and try UP
# Try UP (go back and past)
self.motion.move_z_up()
time.sleep(s['fine_move_time'])
self.motion.stop_z()
@ -315,25 +403,29 @@ class AutofocusController:
up_score = self.get_averaged_focus(samples=s['fine_samples'])
if up_score > best_score:
self.log(f"Fine direction: UP ({up_score:.1f})")
if up_score > current_score * 1.02:
self.log(f"Fine direction: UP ({up_score:.1f} > {current_score:.1f})")
return 'up'
# Already at peak
self.log("Already at peak, minor adjustment only")
# Already at peak, return to center
self.log("Already at peak")
self.motion.move_z_down()
time.sleep(s['fine_move_time'])
self.motion.stop_z()
time.sleep(s['fine_settle_time'])
return 'up'
return 'up' # Default direction for minor adjustments
def _fine_search(self, direction, best_score, s):
"""Search in given direction until no improvement"""
"""
Search in given direction until no improvement.
Uses visual feedback to track best position.
"""
move_func = self.motion.move_z_up if direction == 'up' else self.motion.move_z_down
reverse_func = self.motion.move_z_down if direction == 'up' else self.motion.move_z_up
no_improvement_count = 0
best_position_offset = 0
moves_since_best = 0
while self.running and no_improvement_count < s['fine_max_no_improvement']:
move_func()
@ -347,21 +439,25 @@ class AutofocusController:
if current_score > best_score:
best_score = current_score
no_improvement_count = 0
best_position_offset = 0
self.log(f"Fine better: {current_score:.1f}")
moves_since_best = 0
self.log(f"Fine improved: {current_score:.1f}")
else:
no_improvement_count += 1
best_position_offset += 1
moves_since_best += 1
return best_score, best_position_offset
def _return_to_best(self, direction, steps, s):
"""Return to best position by reversing direction"""
self.log(f"Returning {steps} steps")
reverse_func = self.motion.move_z_down if direction == 'up' else self.motion.move_z_up
for _ in range(steps):
# Go back to best position using visual feedback
if moves_since_best > 0:
self.log(f"Reversing {moves_since_best} moves toward best")
for _ in range(moves_since_best):
reverse_func()
time.sleep(s['fine_move_time'])
self.motion.stop_z()
time.sleep(0.1)
# Check if we're back at peak
check_score = self.get_quick_focus(3)
if check_score >= best_score * 0.98:
break
time.sleep(0.05)
return best_score

View file

@ -1,22 +1,183 @@
import cv2
import subprocess
import re
def detect_camera_modes(device_id=0):
"""
Detect available camera resolutions using v4l2-ctl.
Returns dict of modes sorted by resolution (smallest to largest).
"""
modes = {}
try:
# Run v4l2-ctl to get supported formats
device = f"/dev/video{device_id}"
result = subprocess.run(
['v4l2-ctl', '--device', device, '--list-formats-ext'],
capture_output=True,
text=True,
timeout=5
)
if result.returncode != 0:
print(f"v4l2-ctl failed: {result.stderr}")
return _get_fallback_modes()
# Parse output for "Size: Discrete WxH" lines
# Example: "Size: Discrete 2592x1944"
size_pattern = re.compile(r'Size:\s+Discrete\s+(\d+)x(\d+)')
resolutions = set() # Use set to avoid duplicates
for line in result.stdout.split('\n'):
match = size_pattern.search(line)
if match:
width = int(match.group(1))
height = int(match.group(2))
resolutions.add((width, height))
if not resolutions:
print("No resolutions found in v4l2-ctl output")
return _get_fallback_modes()
# Sort by total pixels (width * height)
sorted_res = sorted(resolutions, key=lambda r: r[0] * r[1])
# Build modes dict with descriptive names
for i, (width, height) in enumerate(sorted_res):
pixels = width * height
# Generate a name based on position/size
if i == 0:
name = 'low'
desc = 'Low'
elif i == len(sorted_res) - 1:
name = 'high'
desc = 'High'
elif len(sorted_res) == 3 and i == 1:
name = 'medium'
desc = 'Medium'
else:
name = f'res_{width}x{height}'
desc = f'{width}x{height}'
modes[name] = {
'width': width,
'height': height,
'label': f'{width}x{height} ({desc})'
}
print(f"Detected {len(modes)} camera modes: {list(modes.keys())}")
return modes
except FileNotFoundError:
print("v4l2-ctl not found, using fallback modes")
return _get_fallback_modes()
except subprocess.TimeoutExpired:
print("v4l2-ctl timed out, using fallback modes")
return _get_fallback_modes()
except Exception as e:
print(f"Error detecting camera modes: {e}")
return _get_fallback_modes()
def _get_fallback_modes():
"""Fallback modes if v4l2-ctl detection fails"""
return {
'low': {'width': 640, 'height': 480, 'label': '640x480 (Low)'},
'medium': {'width': 1280, 'height': 960, 'label': '1280x960 (Medium)'},
'high': {'width': 1920, 'height': 1080, 'label': '1920x1080 (High)'},
}
class Camera:
def __init__(self, device_id=0):
prevFrame = {}
def __init__(self, device_id=0, mode=None):
self.device_id = device_id
# Detect available modes before opening camera
self.MODES = detect_camera_modes(device_id)
# Open camera
self.cap = cv2.VideoCapture(device_id)
if not self.cap.isOpened():
raise RuntimeError("Could not open camera, stop program")
# Default to highest resolution if no mode specified
if mode is None:
mode = list(self.MODES.keys())[0] # Last = highest res
self.current_mode = mode
self._apply_mode(mode)
# set resolution
# self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
# self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)
self.cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0) # Disable auto-exposure
self.cap.set(cv2.CAP_PROP_EXPOSURE, -6) # Set fixed exposure
self.cap.set(cv2.CAP_PROP_AUTO_WB, 0) # Disable auto white balance
self.window_name = "AutoScope"
def _apply_mode(self, mode_name):
"""Apply resolution settings"""
if mode_name not in self.MODES:
print(f"Unknown mode {mode_name}, using first available")
mode_name = list(self.MODES.keys())[0]
mode = self.MODES[mode_name]
# Set resolution
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, mode['width'])
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, mode['height'])
self.current_mode = mode_name
# Verify settings took effect
actual_w = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
actual_h = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
actual_fps = self.cap.get(cv2.CAP_PROP_FPS)
print(f"Camera mode: {mode['label']}")
print(f" Actual: {actual_w}x{actual_h} @ {actual_fps:.1f}fps")
return actual_w, actual_h, actual_fps
def set_mode(self, mode_name):
"""Change camera mode (resolution/framerate)"""
return self._apply_mode(mode_name)
def get_mode(self):
"""Get current mode name"""
return self.current_mode
def get_mode_info(self):
"""Get current mode details"""
return self.MODES.get(self.current_mode, list(self.MODES.values())[0])
def get_resolution(self):
"""Get current actual resolution"""
w = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
return w, h
def get_fps(self):
"""Get current actual FPS"""
return self.cap.get(cv2.CAP_PROP_FPS)
def get_available_modes(self):
"""Get list of available mode names"""
return list(self.MODES.keys())
def get_mode_labels(self):
"""Get mode labels for UI"""
return {k: v['label'] for k, v in self.MODES.items()}
def capture_frame(self):
ret, frame = self.cap.read()
if not ret:
raise RuntimeError("Failed to capture frame, stop program")
return prevframe
prevframe = frame
return frame
def show_frame(self, frame):
@ -69,4 +230,3 @@ class Camera:
break
self.close_window()

File diff suppressed because it is too large Load diff

View file

@ -2,12 +2,12 @@ class MotionController:
# Command mapping for each axis
AXIS_COMMANDS = {
'X': {'pos': 'E', 'neg': 'W', 'stop': 'e'},
'Y': {'pos': 'N', 'neg': 'S', 'stop': 'n'},
'Y': {'pos': 'S', 'neg': 'N', 'stop': 'n'},
'Z': {'pos': 'U', 'neg': 'D', 'stop': 'u'}
}
# Speed values matching Arduino speedArr
SPEED_VALUES = [0, 2, 5, 10, 30, 50, 70]
SPEED_VALUES = [0, 1, 2, 5, 10, 30, 50, 70]
def __init__(self, arduino, on_command_sent=None):
"""

File diff suppressed because it is too large Load diff

734
src/stitching_scanner.py Normal file
View file

@ -0,0 +1,734 @@
"""
Stitching Scanner v2 - Fixed displacement tracking
Key fix: Track displacement since last APPEND, not just cumulative.
The strip width must match actual movement since we last added to the mosaic.
"""
import cv2
import numpy as np
import time
import threading
from dataclasses import dataclass, field
from typing import List, Optional, Callable, Tuple
from enum import Enum
class ScanDirection(Enum):
"""Scan direction constants"""
RIGHT = 'right' # X+ (E command)
LEFT = 'left' # X- (W command)
DOWN = 'down' # Y- (N command)
UP = 'up' # Y+ (S command)
@dataclass
class StitchConfig:
"""Stitching scanner configuration"""
# Displacement threshold (percentage of frame size)
displacement_threshold: float = 0.10 # 10% of frame dimension
# Movement timing
movement_interval: float = 0.001 # Seconds of motor on time
frame_interval: float = 0.25 # Seconds between frame captures (settle time)
settle_time: float = 0.5 # Seconds to wait after stopping
max_scan_time: float = 2400.0 # Safety timeout (5 minutes)
# Scan pattern
rows: int = 3
row_overlap: float = 0.15
# Speed setting for scanning
scan_speed_index: int = 3
# Focus
autofocus_every_row: bool = True
# Memory management
max_mosaic_width: int = 11000
max_mosaic_height: int = 11000
# 11000, 24500, 450000
@dataclass
class StitchState:
"""Current state for visualization"""
is_scanning: bool = False
direction: str = ''
# Displacement tracking
cumulative_x: float = 0.0
cumulative_y: float = 0.0
last_displacement: Tuple[float, float] = (0.0, 0.0)
# Progress
current_row: int = 0
total_rows: int = 0
# Mosaic size
mosaic_width: int = 0
mosaic_height: int = 0
# Debug
frame_count: int = 0
append_count: int = 0
class StitchingScanner:
"""
Slide scanner using continuous stitching with correct displacement tracking.
Key insight: We must track displacement since the LAST APPEND, and the
strip we append must exactly match that displacement.
"""
def __init__(self, camera, motion_controller, autofocus_controller=None,
config: StitchConfig = None,
on_log: Callable[[str], None] = None,
on_progress: Callable[[int, int], None] = None,
on_mosaic_updated: Callable[[], None] = None):
self.camera = camera
self.motion = motion_controller
self.autofocus = autofocus_controller
self.config = config or StitchConfig()
# Callbacks
self.on_log = on_log
self.on_progress = on_progress
self.on_mosaic_updated = on_mosaic_updated
# State
self.running = False
self.paused = False
self.state = StitchState()
self._state_lock = threading.Lock()
# Mosaic data
self.mosaic: Optional[np.ndarray] = None
self._mosaic_lock = threading.Lock()
# Frame tracking - KEY CHANGE: separate reference for displacement calc vs append
self._prev_frame: Optional[np.ndarray] = None # For frame-to-frame displacement
self._append_ref_frame: Optional[np.ndarray] = None # Reference from last append
self._displacement_since_append_x: float = 0.0 # Accumulated since last append
self._displacement_since_append_y: float = 0.0
# Thread
self._thread: Optional[threading.Thread] = None
def log(self, message: str):
"""Log a message"""
if self.on_log:
self.on_log(f"[Stitch] {message}")
print(f"[Stitch] {message}")
# =========================================================================
# Displacement Detection
# =========================================================================
def _to_grayscale(self, frame: np.ndarray) -> np.ndarray:
"""Convert frame to grayscale"""
if len(frame.shape) == 3:
return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
return frame
def _detect_displacement(self, prev_frame: np.ndarray,
curr_frame: np.ndarray) -> Tuple[float, float]:
"""
Detect displacement between two frames using phase correlation.
Returns (dx, dy) in pixels.
"""
prev_gray = self._to_grayscale(prev_frame)
curr_gray = self._to_grayscale(curr_frame)
if prev_gray.shape != curr_gray.shape:
return (0.0, 0.0)
prev_f = prev_gray.astype(np.float32)
curr_f = curr_gray.astype(np.float32)
# Apply window function to reduce edge effects
h, w = prev_gray.shape
window = cv2.createHanningWindow((w, h), cv2.CV_32F)
prev_f = prev_f * window
curr_f = curr_f * window
shift, response = cv2.phaseCorrelate(prev_f, curr_f)
dx, dy = shift
return (dx, dy)
def _detect_displacement_robust(self, prev_frame: np.ndarray,
curr_frame: np.ndarray) -> Tuple[float, float]:
"""Displacement detection with sanity checks"""
dx, dy = self._detect_displacement(prev_frame, curr_frame)
h, w = prev_frame.shape[:2]
max_displacement = max(w, h) * 0.5
if abs(dx) > max_displacement or abs(dy) > max_displacement:
self.log(f"Warning: Large displacement ({dx:.1f}, {dy:.1f}), ignoring")
return (0.0, 0.0)
return (dx, dy)
# =========================================================================
# Mosaic Building - FIXED VERSION
# =========================================================================
def _init_mosaic(self, frame: np.ndarray):
"""Initialize mosaic with first frame"""
with self._mosaic_lock:
self.mosaic = frame.copy()
# Set reference frames
self._prev_frame = frame.copy()
self._append_ref_frame = frame.copy()
self._displacement_since_append_x = 0.0
self._displacement_since_append_y = 0.0
with self._state_lock:
h, w = frame.shape[:2]
self.state.mosaic_width = w
self.state.mosaic_height = h
self.state.frame_count = 1
self.state.append_count = 0
self.log(f"Initialized mosaic: {frame.shape[1]}x{frame.shape[0]}")
def _blend_strips_horizontal(self, base: np.ndarray, strip: np.ndarray,
blend_width: int, append_right: bool) -> np.ndarray:
"""Blend strip onto base with gradient at seam to hide discontinuities."""
if blend_width <= 0 or blend_width >= strip.shape[1]:
if append_right:
return np.hstack([base, strip])
else:
return np.hstack([strip, base])
h_base, w_base = base.shape[:2]
h_strip, w_strip = strip.shape[:2]
if h_strip != h_base:
# Height mismatch - can't blend properly
if append_right:
return np.hstack([base, strip])
return np.hstack([strip, base])
blend_w = min(blend_width, w_strip, w_base)
if append_right:
# base | blend_zone | rest_of_strip
result_width = w_base + w_strip - blend_w
result = np.zeros((h_base, result_width, 3), dtype=np.uint8)
# Copy base
result[:, :w_base] = base
# Create gradient: 1->0 for base weight
alpha = np.linspace(1, 0, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis]
base_overlap = base[:, -blend_w:].astype(np.float32)
strip_overlap = strip[:, :blend_w].astype(np.float32)
blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8)
result[:, w_base - blend_w:w_base] = blended
result[:, w_base:] = strip[:, blend_w:]
return result
else:
# rest_of_strip | blend_zone | base
result_width = w_base + w_strip - blend_w
result = np.zeros((h_base, result_width, 3), dtype=np.uint8)
result[:, :w_strip] = strip
alpha = np.linspace(0, 1, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis]
strip_overlap = strip[:, -blend_w:].astype(np.float32)
base_overlap = base[:, :blend_w].astype(np.float32)
blended = (strip_overlap * (1 - alpha) + base_overlap * alpha).astype(np.uint8)
result[:, w_strip - blend_w:w_strip] = blended
result[:, w_strip:] = base[:, blend_w:]
return result
def _append_to_mosaic_fixed(self, frame: np.ndarray, direction: ScanDirection):
"""
FIXED: Append with blending and fractional pixel preservation.
Key improvements:
1. Gradient blending at seams to hide color discontinuities
2. Preserve fractional pixel remainder to prevent cumulative drift
3. Small safety margin for alignment tolerance
"""
BLEND_WIDTH = 10 # Pixels to blend at seam
SAFETY_MARGIN = 2 # Extra pixels as tolerance
with self._mosaic_lock:
if self.mosaic is None:
return
h, w = frame.shape[:2]
mh, mw = self.mosaic.shape[:2]
dx = abs(self._displacement_since_append_x)
dy = abs(self._displacement_since_append_y)
if direction in [ScanDirection.RIGHT, ScanDirection.LEFT]:
# Round and add safety margin
append_width = round(dx) + SAFETY_MARGIN
append_width = min(append_width, w - BLEND_WIDTH - 5)
if append_width < 1:
return
# Calculate fractional remainder to preserve
pixels_consumed = append_width - SAFETY_MARGIN
fractional_remainder = dx - pixels_consumed
if direction == ScanDirection.RIGHT:
# Grab strip with extra for blending
strip_start = max(0, w - append_width - BLEND_WIDTH)
new_strip = frame[:, strip_start:]
self.mosaic = self._blend_strips_horizontal(
self.mosaic, new_strip, BLEND_WIDTH, append_right=True)
else:
strip_end = min(w, append_width + BLEND_WIDTH)
new_strip = frame[:, :strip_end]
self.mosaic = self._blend_strips_horizontal(
self.mosaic, new_strip, BLEND_WIDTH, append_right=False)
# KEEP fractional remainder instead of resetting to 0!
self._displacement_since_append_x = fractional_remainder
self._displacement_since_append_y = 0.0
elif direction in [ScanDirection.DOWN, ScanDirection.UP]:
append_height = round(dy) + SAFETY_MARGIN
append_height = min(append_height, h - BLEND_WIDTH - 5)
if append_height < 1:
return
pixels_consumed = append_height - SAFETY_MARGIN
fractional_remainder = dy - pixels_consumed
if direction == ScanDirection.DOWN:
strip_start = max(0, h - append_height - BLEND_WIDTH)
new_strip = frame[strip_start:, :]
# Match widths
if new_strip.shape[1] > mw:
new_strip = new_strip[:, :mw]
elif new_strip.shape[1] < mw:
pad = np.zeros((new_strip.shape[0], mw - new_strip.shape[1], 3), dtype=np.uint8)
new_strip = np.hstack([new_strip, pad])
# Vertical blend
blend_h = min(BLEND_WIDTH, new_strip.shape[0], mh)
alpha = np.linspace(1, 0, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis]
base_overlap = self.mosaic[-blend_h:].astype(np.float32)
strip_overlap = new_strip[:blend_h].astype(np.float32)
blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8)
result_h = mh + new_strip.shape[0] - blend_h
result = np.zeros((result_h, mw, 3), dtype=np.uint8)
result[:mh - blend_h] = self.mosaic[:-blend_h]
result[mh - blend_h:mh] = blended
result[mh:] = new_strip[blend_h:]
self.mosaic = result
else:
strip_end = min(h, append_height + BLEND_WIDTH)
new_strip = frame[:strip_end, :]
if new_strip.shape[1] > mw:
new_strip = new_strip[:, :mw]
elif new_strip.shape[1] < mw:
pad = np.zeros((new_strip.shape[0], mw - new_strip.shape[1], 3), dtype=np.uint8)
new_strip = np.hstack([new_strip, pad])
self.mosaic = np.vstack([new_strip, self.mosaic])
self._displacement_since_append_x = 0.0
self._displacement_since_append_y = fractional_remainder
new_mh, new_mw = self.mosaic.shape[:2]
# Update state
with self._state_lock:
self.state.mosaic_width = new_mw
self.state.mosaic_height = new_mh
self.state.append_count += 1
# Update reference frame (fractional remainder already set above - don't reset!)
self._append_ref_frame = frame.copy()
if self.on_mosaic_updated:
self.on_mosaic_updated()
def _start_new_row(self, frame: np.ndarray, direction: ScanDirection):
"""Start a new row in the mosaic"""
with self._mosaic_lock:
if self.mosaic is None:
self._init_mosaic(frame)
return
h, w = frame.shape[:2]
mh, mw = self.mosaic.shape[:2]
# Calculate overlap
overlap_pixels = int(h * self.config.row_overlap)
append_height = h - overlap_pixels
if direction == ScanDirection.DOWN:
new_strip = frame[overlap_pixels:, :]
if new_strip.shape[1] < mw:
pad = np.zeros((new_strip.shape[0], mw - new_strip.shape[1], 3), dtype=np.uint8)
new_strip = np.hstack([new_strip, pad])
elif new_strip.shape[1] > mw:
new_strip = new_strip[:, :mw]
self.mosaic = np.vstack([self.mosaic, new_strip])
else:
new_strip = frame[:append_height, :]
if new_strip.shape[1] < mw:
pad = np.zeros((new_strip.shape[0], mw - new_strip.shape[1], 3), dtype=np.uint8)
new_strip = np.hstack([new_strip, pad])
elif new_strip.shape[1] > mw:
new_strip = new_strip[:, :mw]
self.mosaic = np.vstack([new_strip, self.mosaic])
# Reset all tracking for new row
self._prev_frame = frame.copy()
self._append_ref_frame = frame.copy()
self._displacement_since_append_x = 0.0
self._displacement_since_append_y = 0.0
with self._state_lock:
self.state.mosaic_height = self.mosaic.shape[0]
self.state.mosaic_width = self.mosaic.shape[1]
self.log(f"New row started, mosaic: {self.mosaic.shape[1]}x{self.mosaic.shape[0]}")
# =========================================================================
# Scan Control
# =========================================================================
def start(self) -> bool:
"""Start the stitching scan"""
if self.running:
self.log("Already running")
return False
self.running = True
self.paused = False
with self._state_lock:
self.state = StitchState()
self.state.is_scanning = True
self.state.total_rows = self.config.rows
with self._mosaic_lock:
self.mosaic = None
self._prev_frame = None
self._append_ref_frame = None
self._displacement_since_append_x = 0.0
self._displacement_since_append_y = 0.0
self._thread = threading.Thread(target=self._scan_loop, daemon=True)
self._thread.start()
self.log("Stitching scan started")
return True
def stop(self):
"""Stop the scan"""
self.running = False
self.paused = False
self.motion.stop_all()
with self._state_lock:
self.state.is_scanning = False
self.log("Scan stopped")
def pause(self):
"""Pause the scan"""
if self.running and not self.paused:
self.paused = True
self.motion.stop_all()
self.log("Scan paused")
def resume(self):
"""Resume the scan"""
if self.running and self.paused:
self.paused = False
self.log("Scan resumed")
# =========================================================================
# Main Scan Loop
# =========================================================================
def _scan_loop(self):
"""Main scanning loop"""
try:
self.log("Starting scan loop")
self.motion.set_speed(self.config.scan_speed_index)
time.sleep(0.1)
frame = self._capture_frame()
self._init_mosaic(frame)
for row in range(self.config.rows):
if not self.running:
break
with self._state_lock:
self.state.current_row = row
self.log(f"=== Row {row + 1}/{self.config.rows} ===")
# Serpentine pattern
if row % 2 == 0:
h_direction = ScanDirection.RIGHT
else:
h_direction = ScanDirection.LEFT
self._scan_horizontal(h_direction)
if not self.running:
break
if row < self.config.rows - 1:
self._move_to_next_row()
self.log("Scan complete!")
except Exception as e:
self.log(f"Scan error: {e}")
import traceback
traceback.print_exc()
finally:
self.running = False
self.motion.stop_all()
with self._state_lock:
self.state.is_scanning = False
def _scan_horizontal(self, direction: ScanDirection):
"""Scan horizontally with fixed displacement tracking"""
self.log(f"Scanning {direction.value}...")
with self._state_lock:
self.state.direction = direction.value
frame = self._capture_frame()
h, w = frame.shape[:2]
threshold_pixels = w * self.config.displacement_threshold
# Initialize tracking
self._prev_frame = frame.copy()
self._append_ref_frame = frame.copy()
self._displacement_since_append_x = 0.0
self._displacement_since_append_y = 0.0
start_time = time.time()
no_movement_count = 0
max_no_movement = 50
while self.running and not self.paused:
if time.time() - start_time > self.config.max_scan_time:
self.log("Scan timeout")
break
# Pulse the motor
if direction == ScanDirection.RIGHT:
self.motion.send_command('E')
else:
self.motion.send_command('W')
time.sleep(self.config.movement_interval)
if direction == ScanDirection.RIGHT:
self.motion.send_command('e')
else:
self.motion.send_command('w')
# Wait for settle
time.sleep(self.config.frame_interval)
# Capture and measure
curr_frame = self._capture_frame()
dx, dy = self._detect_displacement_robust(self._prev_frame, curr_frame)
# Accumulate displacement SINCE LAST APPEND
self._displacement_since_append_x += dx
self._displacement_since_append_y += dy
with self._state_lock:
self.state.cumulative_x = self._displacement_since_append_x
self.state.cumulative_y = self._displacement_since_append_y
self.state.last_displacement = (dx, dy)
self.state.frame_count += 1
# Check for no movement
if abs(dx) < 1.0 and abs(dy) < 1.0:
no_movement_count += 1
if no_movement_count >= max_no_movement:
self.log(f"Edge detected (no movement for {no_movement_count} frames)")
break
else:
no_movement_count = 0
# Check threshold and append
if abs(self._displacement_since_append_x) >= threshold_pixels:
self._append_to_mosaic_fixed(curr_frame, direction)
self.log(f"Appended {abs(self._displacement_since_append_x):.1f}px strip, "
f"mosaic: {self.state.mosaic_width}x{self.state.mosaic_height}")
# Update prev_frame for next displacement calculation
self._prev_frame = curr_frame.copy()
if self.on_progress:
self.on_progress(self.state.append_count, 0)
# Stop
if direction == ScanDirection.RIGHT:
self.motion.send_command('e')
else:
self.motion.send_command('w')
time.sleep(self.config.settle_time)
def _move_to_next_row(self):
"""Move down to next row"""
self.log("Moving to next row...")
frame = self._capture_frame()
h, w = frame.shape[:2]
move_distance = h * (1 - self.config.row_overlap)
with self._state_lock:
self.state.direction = 'down'
self.motion.send_command('N')
self._prev_frame = frame.copy()
cumulative_y = 0.0
while self.running:
time.sleep(self.config.frame_interval)
curr_frame = self._capture_frame()
dx, dy = self._detect_displacement_robust(self._prev_frame, curr_frame)
cumulative_y += dy
self._prev_frame = curr_frame.copy()
with self._state_lock:
self.state.cumulative_y = cumulative_y
if abs(cumulative_y) >= move_distance:
break
if abs(cumulative_y) < 5 and self.state.frame_count > 50:
self.log("Warning: Minimal Y movement")
break
self.motion.send_command('n')
time.sleep(self.config.settle_time)
frame = self._capture_frame()
self._start_new_row(frame, ScanDirection.DOWN)
def _capture_frame(self) -> np.ndarray:
"""Capture and rotate frame"""
frame = self.camera.capture_frame()
frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
return frame
# =========================================================================
# Getters
# =========================================================================
def get_state(self) -> StitchState:
"""Get current scan state"""
with self._state_lock:
return StitchState(
is_scanning=self.state.is_scanning,
direction=self.state.direction,
cumulative_x=self.state.cumulative_x,
cumulative_y=self.state.cumulative_y,
last_displacement=self.state.last_displacement,
current_row=self.state.current_row,
total_rows=self.state.total_rows,
mosaic_width=self.state.mosaic_width,
mosaic_height=self.state.mosaic_height,
frame_count=self.state.frame_count,
append_count=self.state.append_count
)
def get_mosaic(self) -> Optional[np.ndarray]:
"""Get current mosaic (full resolution)"""
with self._mosaic_lock:
if self.mosaic is not None:
return self.mosaic.copy()
return None
def get_mosaic_preview(self, max_size: int = 600) -> Optional[np.ndarray]:
"""Get scaled mosaic for preview"""
with self._mosaic_lock:
if self.mosaic is None:
return None
h, w = self.mosaic.shape[:2]
scale = min(max_size / w, max_size / h, 1.0)
if scale < 1.0:
new_w = int(w * scale)
new_h = int(h * scale)
return cv2.resize(self.mosaic, (new_w, new_h))
return self.mosaic.copy()
def save_mosaic(self, filepath: str) -> bool:
"""Save mosaic to file"""
with self._mosaic_lock:
if self.mosaic is None:
return False
cv2.imwrite(filepath, self.mosaic)
self.log(f"Saved mosaic to {filepath}")
return True
# =========================================================================
# Testing
# =========================================================================
def test_displacement(self, num_frames: int = 10) -> dict:
"""Test displacement detection"""
results = {
'frames': [],
'total_dx': 0.0,
'total_dy': 0.0
}
prev_frame = self._capture_frame()
for i in range(num_frames):
time.sleep(0.1)
curr_frame = self._capture_frame()
dx, dy = self._detect_displacement(prev_frame, curr_frame)
results['frames'].append({'frame': i, 'dx': dx, 'dy': dy})
results['total_dx'] += dx
results['total_dy'] += dy
prev_frame = curr_frame
return results