Compare commits

..

No commits in common. "master" and "Stitching" have entirely different histories.

4 changed files with 315 additions and 441 deletions

BIN
Mos1.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 513 KiB

BIN
Mos2.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 194 KiB

View file

@ -88,18 +88,6 @@ class AppGUI:
on_mosaic_updated=self._on_mosaic_updated on_mosaic_updated=self._on_mosaic_updated
) )
# Update initial memory estimate
self._update_memory_estimate()
def _update_memory_estimate(self):
"""Update memory estimate label"""
bytes_per_pixel = 3
mem_mb = (self.stitch_config.max_mosaic_width *
self.stitch_config.max_mosaic_height *
bytes_per_pixel) / (1024 * 1024)
if hasattr(self, 'memory_label'):
self.memory_label.config(text=f"~{mem_mb:.0f} MB")
def _on_command_sent(self, cmd): def _on_command_sent(self, cmd):
self.log_message(f"> {cmd}") self.log_message(f"> {cmd}")
@ -289,54 +277,63 @@ class AppGUI:
command=self._show_mosaic_window).pack(side=tk.RIGHT, padx=2) command=self._show_mosaic_window).pack(side=tk.RIGHT, padx=2)
def _build_row2_stitch_settings(self, parent): def _build_row2_stitch_settings(self, parent):
"""Row 2: Stitching settings with max dimensions""" """Row 2: Stitching settings"""
row = ttk.LabelFrame(parent, text="Stitch Settings") row = ttk.LabelFrame(parent, text="Stitch Settings")
row.pack(fill=tk.X, pady=(0, 3)) row.pack(fill=tk.X, pady=(0, 3))
# First row: Threshold, Speed, Overlap inner = ttk.Frame(row)
inner1 = ttk.Frame(row) inner.pack(fill=tk.X, padx=5, pady=3)
inner1.pack(fill=tk.X, padx=5, pady=3)
# Displacement threshold # Displacement threshold
ttk.Label(inner1, text="Threshold:").pack(side=tk.LEFT) ttk.Label(inner, text="Threshold:").pack(side=tk.LEFT)
self.disp_threshold_var = tk.DoubleVar(value=0.10) self.disp_threshold_var = tk.DoubleVar(value=0.10)
self.disp_threshold_spinbox = ttk.Spinbox( self.disp_threshold_spinbox = ttk.Spinbox(
inner1, from_=0.05, to=0.30, increment=0.01, width=5, inner, from_=0.05, to=0.30, increment=0.01, width=5,
textvariable=self.disp_threshold_var, textvariable=self.disp_threshold_var,
command=self._update_stitch_config command=self._update_stitch_config
) )
self.disp_threshold_spinbox.pack(side=tk.LEFT, padx=(2, 10)) self.disp_threshold_spinbox.pack(side=tk.LEFT, padx=(2, 10))
# Scan speed # Number of rows
ttk.Label(inner1, text="Speed:").pack(side=tk.LEFT) ttk.Label(inner, text="Rows:").pack(side=tk.LEFT)
self.scan_speed_var = tk.IntVar(value=3) self.num_rows_var = tk.IntVar(value=3)
self.scan_speed_spinbox = ttk.Spinbox( self.num_rows_spinbox = ttk.Spinbox(
inner1, from_=1, to=6, width=3, inner, from_=1, to=10, width=3,
textvariable=self.scan_speed_var, textvariable=self.num_rows_var,
command=self._update_stitch_config command=self._update_stitch_config
) )
self.scan_speed_spinbox.pack(side=tk.LEFT, padx=(2, 10)) self.num_rows_spinbox.pack(side=tk.LEFT, padx=(2, 10))
# Row overlap # Row overlap
ttk.Label(inner1, text="Overlap:").pack(side=tk.LEFT) ttk.Label(inner, text="Overlap:").pack(side=tk.LEFT)
self.row_overlap_var = tk.DoubleVar(value=0.15) self.row_overlap_var = tk.DoubleVar(value=0.15)
self.row_overlap_spinbox = ttk.Spinbox( self.row_overlap_spinbox = ttk.Spinbox(
inner1, from_=0.05, to=0.50, increment=0.05, width=5, inner, from_=0.05, to=0.50, increment=0.05, width=5,
textvariable=self.row_overlap_var, textvariable=self.row_overlap_var,
command=self._update_stitch_config command=self._update_stitch_config
) )
self.row_overlap_spinbox.pack(side=tk.LEFT, padx=(2, 10)) self.row_overlap_spinbox.pack(side=tk.LEFT, padx=(2, 10))
# Scan speed
ttk.Label(inner, text="Speed:").pack(side=tk.LEFT)
self.scan_speed_var = tk.IntVar(value=3)
self.scan_speed_spinbox = ttk.Spinbox(
inner, from_=1, to=6, width=3,
textvariable=self.scan_speed_var,
command=self._update_stitch_config
)
self.scan_speed_spinbox.pack(side=tk.LEFT, padx=(2, 10))
# Autofocus toggle # Autofocus toggle
self.af_every_row_var = tk.BooleanVar(value=True) self.af_every_row_var = tk.BooleanVar(value=True)
ttk.Checkbutton( ttk.Checkbutton(
inner1, text="AF each row", inner, text="AF each row",
variable=self.af_every_row_var, variable=self.af_every_row_var,
command=self._update_stitch_config command=self._update_stitch_config
).pack(side=tk.LEFT, padx=(10, 0)) ).pack(side=tk.LEFT, padx=(10, 0))
# Row/Direction status on right # Row/Direction status on right
status_frame = ttk.Frame(inner1) status_frame = ttk.Frame(inner)
status_frame.pack(side=tk.RIGHT) status_frame.pack(side=tk.RIGHT)
ttk.Label(status_frame, text="Row:").pack(side=tk.LEFT) ttk.Label(status_frame, text="Row:").pack(side=tk.LEFT)
@ -347,49 +344,6 @@ class AppGUI:
self.direction_label = ttk.Label(status_frame, text="--", width=6, font=('Arial', 9)) self.direction_label = ttk.Label(status_frame, text="--", width=6, font=('Arial', 9))
self.direction_label.pack(side=tk.LEFT) self.direction_label.pack(side=tk.LEFT)
# Second row: Max dimensions and test buttons
inner2 = ttk.Frame(row)
inner2.pack(fill=tk.X, padx=5, pady=(0, 3))
# Max Width
ttk.Label(inner2, text="Max W:").pack(side=tk.LEFT)
self.max_width_var = tk.IntVar(value=5000)
self.max_width_entry = ttk.Entry(inner2, textvariable=self.max_width_var, width=7)
self.max_width_entry.pack(side=tk.LEFT, padx=(2, 5))
self.max_width_entry.bind('<Return>', lambda e: self._update_stitch_config())
# Max Height
ttk.Label(inner2, text="Max H:").pack(side=tk.LEFT)
self.max_height_var = tk.IntVar(value=5000)
self.max_height_entry = ttk.Entry(inner2, textvariable=self.max_height_var, width=7)
self.max_height_entry.pack(side=tk.LEFT, padx=(2, 5))
self.max_height_entry.bind('<Return>', lambda e: self._update_stitch_config())
# Apply button
ttk.Button(inner2, text="Apply", width=5,
command=self._update_stitch_config).pack(side=tk.LEFT, padx=(5, 10))
ttk.Separator(inner2, orient=tk.VERTICAL).pack(side=tk.LEFT, fill=tk.Y, padx=5)
# Test buttons
ttk.Label(inner2, text="Test:").pack(side=tk.LEFT)
ttk.Button(inner2, text="Row↓", width=5,
command=self._test_row_transition).pack(side=tk.LEFT, padx=2)
ttk.Button(inner2, text="Scan→", width=5,
command=lambda: self._test_horizontal('right')).pack(side=tk.LEFT, padx=2)
ttk.Button(inner2, text="Scan←", width=5,
command=lambda: self._test_horizontal('left')).pack(side=tk.LEFT, padx=2)
ttk.Button(inner2, text="Est", width=4,
command=self._show_scan_estimate).pack(side=tk.LEFT, padx=2)
# Memory estimate label
self.memory_label = ttk.Label(inner2, text="~-- MB", font=('Arial', 9))
self.memory_label.pack(side=tk.RIGHT)
def _build_row3_movement(self, parent): def _build_row3_movement(self, parent):
"""Row 3: Movement controls for all axes""" """Row 3: Movement controls for all axes"""
row = ttk.LabelFrame(parent, text="Movement") row = ttk.LabelFrame(parent, text="Movement")
@ -557,15 +511,11 @@ class AppGUI:
def _update_stitch_config(self): def _update_stitch_config(self):
"""Update stitching scanner config from GUI values""" """Update stitching scanner config from GUI values"""
self.stitch_config.displacement_threshold = self.disp_threshold_var.get() self.stitch_config.displacement_threshold = self.disp_threshold_var.get()
self.stitch_config.max_mosaic_width = self.max_width_var.get() self.stitch_config.rows = self.num_rows_var.get()
self.stitch_config.max_mosaic_height = self.max_height_var.get()
self.stitch_config.row_overlap = self.row_overlap_var.get() self.stitch_config.row_overlap = self.row_overlap_var.get()
self.stitch_config.scan_speed_index = self.scan_speed_var.get() self.stitch_config.scan_speed_index = self.scan_speed_var.get()
self.stitch_config.autofocus_every_row = self.af_every_row_var.get() self.stitch_config.autofocus_every_row = self.af_every_row_var.get()
# Update memory estimate
self._update_memory_estimate()
# Reinitialize stitching scanner with new config # Reinitialize stitching scanner with new config
self.stitch_scanner = StitchingScanner( self.stitch_scanner = StitchingScanner(
camera=self.camera, camera=self.camera,
@ -577,54 +527,6 @@ class AppGUI:
on_mosaic_updated=self._on_mosaic_updated on_mosaic_updated=self._on_mosaic_updated
) )
self.log_message(f"Config updated: {self.stitch_config.max_mosaic_width}x{self.stitch_config.max_mosaic_height}")
def _test_row_transition(self):
"""Test row transition without full scan"""
if not self.stitch_scanner:
self._update_stitch_config()
self.log_message("Testing row transition...")
def run_test():
result = self.stitch_scanner.test_row_transition()
self.root.after(0, lambda: self.log_message(
f"Row transition: {'SUCCESS' if result['success'] else 'FAILED'}, "
f"Y moved: {result['y_moved']:.1f}px"))
self.root.after(0, self._update_mosaic_window)
threading.Thread(target=run_test, daemon=True).start()
def _test_horizontal(self, direction: str):
"""Test single row scan"""
if not self.stitch_scanner:
self._update_stitch_config()
self.log_message(f"Testing single row scan ({direction})...")
def run_test():
result = self.stitch_scanner.test_single_row(direction)
self.root.after(0, lambda: self.log_message(
f"Row scan: {result['stop_reason']}, {result['appends']} appends, "
f"mosaic: {result['mosaic_after'][0]}x{result['mosaic_after'][1]}"))
self.root.after(0, self._update_mosaic_window)
threading.Thread(target=run_test, daemon=True).start()
def _show_scan_estimate(self):
"""Show memory estimate based on current settings"""
if not self.stitch_scanner:
self._update_stitch_config()
est = self.stitch_scanner.get_memory_estimate()
msg = (f"Memory Estimate:\n"
f" Current: {est['current_size'][0]}x{est['current_size'][1]} = {est['current_mb']:.1f} MB\n"
f" Max: {est['max_size'][0]}x{est['max_size'][1]} = {est['max_mb']:.0f} MB")
self.log_message(msg)
self.memory_label.config(text=f"~{est['max_mb']:.0f} MB")
# ========================================================================= # =========================================================================
# Overlay Drawing # Overlay Drawing
# ========================================================================= # =========================================================================
@ -1024,16 +926,10 @@ class AppGUI:
text=f"X: {state.cumulative_x:.1f} Y: {state.cumulative_y:.1f}" text=f"X: {state.cumulative_x:.1f} Y: {state.cumulative_y:.1f}"
) )
# Update row/direction and size # Update row/direction
if state.is_scanning: if state.is_scanning:
# Show current row and progress percentage self.row_label.config(text=f"{state.current_row + 1}/{state.total_rows}")
width_pct = min(100, (state.mosaic_width / self.stitch_config.max_mosaic_width) * 100)
height_pct = min(100, (state.mosaic_height / self.stitch_config.max_mosaic_height) * 100)
self.row_label.config(text=f"R{state.current_row + 1}")
self.direction_label.config(text=state.direction) self.direction_label.config(text=state.direction)
# Update mosaic size label with progress
self.mosaic_size_label.config(
text=f"{state.mosaic_width}x{state.mosaic_height} ({height_pct:.0f}%)")
else: else:
self.row_label.config(text="--") self.row_label.config(text="--")
self.direction_label.config(text="--") self.direction_label.config(text="--")

View file

@ -1,59 +1,84 @@
""" """
Stitching Scanner v2 - Simplified unified approach Stitching Scanner v2 - Fixed displacement tracking
Same displacement-based stitching for both horizontal rows and vertical row transitions. Key fix: Track displacement since last APPEND, not just cumulative.
No complex visual matching - just track displacement and append strips. The strip width must match actual movement since we last added to the mosaic.
""" """
import cv2 import cv2
import numpy as np import numpy as np
import time import time
import threading import threading
from dataclasses import dataclass from dataclasses import dataclass, field
from typing import Optional, Callable, Tuple from typing import List, Optional, Callable, Tuple
from enum import Enum from enum import Enum
class ScanDirection(Enum): class ScanDirection(Enum):
RIGHT = 'right' """Scan direction constants"""
LEFT = 'left' RIGHT = 'right' # X+ (E command)
DOWN = 'down' LEFT = 'left' # X- (W command)
UP = 'up' DOWN = 'down' # Y- (N command)
UP = 'up' # Y+ (S command)
@dataclass @dataclass
class StitchConfig: class StitchConfig:
displacement_threshold: float = 0.10 # 10% of frame triggers append """Stitching scanner configuration"""
movement_interval: float = 0.001 # Displacement threshold (percentage of frame size)
frame_interval: float = 1.00 displacement_threshold: float = 0.10 # 10% of frame dimension
settle_time: float = 0.75
max_scan_time: float = 300.0 # Movement timing
movement_interval: float = 0.001 # Seconds of motor on time
frame_interval: float = 0.25 # Seconds between frame captures (settle time)
settle_time: float = 0.5 # Seconds to wait after stopping
max_scan_time: float = 2400.0 # Safety timeout (5 minutes)
# Scan pattern
rows: int = 3
row_overlap: float = 0.15 row_overlap: float = 0.15
max_mosaic_width: int = 15000
max_mosaic_height: int = 12000 # Speed setting for scanning
scan_speed_index: int = 3 scan_speed_index: int = 3
# Focus
autofocus_every_row: bool = True autofocus_every_row: bool = True
# Memory management
max_mosaic_width: int = 11000
max_mosaic_height: int = 11000
# 11000, 24500, 450000
@dataclass @dataclass
class StitchState: class StitchState:
"""Current state for visualization"""
is_scanning: bool = False is_scanning: bool = False
direction: str = '' direction: str = ''
# Displacement tracking
cumulative_x: float = 0.0 cumulative_x: float = 0.0
cumulative_y: float = 0.0 cumulative_y: float = 0.0
last_displacement: Tuple[float, float] = (0.0, 0.0) last_displacement: Tuple[float, float] = (0.0, 0.0)
# Progress
current_row: int = 0 current_row: int = 0
total_rows: int = 0 total_rows: int = 0
# Mosaic size
mosaic_width: int = 0 mosaic_width: int = 0
mosaic_height: int = 0 mosaic_height: int = 0
# Debug
frame_count: int = 0 frame_count: int = 0
append_count: int = 0 append_count: int = 0
class StitchingScanner: class StitchingScanner:
""" """
Slide scanner using continuous stitching. Slide scanner using continuous stitching with correct displacement tracking.
Unified approach for horizontal and vertical movement.
Key insight: We must track displacement since the LAST APPEND, and the
strip we append must exactly match that displacement.
""" """
def __init__(self, camera, motion_controller, autofocus_controller=None, def __init__(self, camera, motion_controller, autofocus_controller=None,
@ -66,25 +91,32 @@ class StitchingScanner:
self.autofocus = autofocus_controller self.autofocus = autofocus_controller
self.config = config or StitchConfig() self.config = config or StitchConfig()
# Callbacks
self.on_log = on_log self.on_log = on_log
self.on_progress = on_progress self.on_progress = on_progress
self.on_mosaic_updated = on_mosaic_updated self.on_mosaic_updated = on_mosaic_updated
# State
self.running = False self.running = False
self.paused = False self.paused = False
self.state = StitchState() self.state = StitchState()
self._state_lock = threading.Lock() self._state_lock = threading.Lock()
# Mosaic data
self.mosaic: Optional[np.ndarray] = None self.mosaic: Optional[np.ndarray] = None
self._mosaic_lock = threading.Lock() self._mosaic_lock = threading.Lock()
self._prev_frame: Optional[np.ndarray] = None # Frame tracking - KEY CHANGE: separate reference for displacement calc vs append
self._displacement_since_append_x: float = 0.0 self._prev_frame: Optional[np.ndarray] = None # For frame-to-frame displacement
self._append_ref_frame: Optional[np.ndarray] = None # Reference from last append
self._displacement_since_append_x: float = 0.0 # Accumulated since last append
self._displacement_since_append_y: float = 0.0 self._displacement_since_append_y: float = 0.0
# Thread
self._thread: Optional[threading.Thread] = None self._thread: Optional[threading.Thread] = None
def log(self, message: str): def log(self, message: str):
"""Log a message"""
if self.on_log: if self.on_log:
self.on_log(f"[Stitch] {message}") self.on_log(f"[Stitch] {message}")
print(f"[Stitch] {message}") print(f"[Stitch] {message}")
@ -94,12 +126,17 @@ class StitchingScanner:
# ========================================================================= # =========================================================================
def _to_grayscale(self, frame: np.ndarray) -> np.ndarray: def _to_grayscale(self, frame: np.ndarray) -> np.ndarray:
"""Convert frame to grayscale"""
if len(frame.shape) == 3: if len(frame.shape) == 3:
return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
return frame return frame
def _detect_displacement(self, prev_frame: np.ndarray, def _detect_displacement(self, prev_frame: np.ndarray,
curr_frame: np.ndarray) -> Tuple[float, float]: curr_frame: np.ndarray) -> Tuple[float, float]:
"""
Detect displacement between two frames using phase correlation.
Returns (dx, dy) in pixels.
"""
prev_gray = self._to_grayscale(prev_frame) prev_gray = self._to_grayscale(prev_frame)
curr_gray = self._to_grayscale(curr_frame) curr_gray = self._to_grayscale(curr_frame)
@ -109,16 +146,20 @@ class StitchingScanner:
prev_f = prev_gray.astype(np.float32) prev_f = prev_gray.astype(np.float32)
curr_f = curr_gray.astype(np.float32) curr_f = curr_gray.astype(np.float32)
# Apply window function to reduce edge effects
h, w = prev_gray.shape h, w = prev_gray.shape
window = cv2.createHanningWindow((w, h), cv2.CV_32F) window = cv2.createHanningWindow((w, h), cv2.CV_32F)
prev_f = prev_f * window prev_f = prev_f * window
curr_f = curr_f * window curr_f = curr_f * window
shift, _ = cv2.phaseCorrelate(prev_f, curr_f) shift, response = cv2.phaseCorrelate(prev_f, curr_f)
return shift dx, dy = shift
return (dx, dy)
def _detect_displacement_robust(self, prev_frame: np.ndarray, def _detect_displacement_robust(self, prev_frame: np.ndarray,
curr_frame: np.ndarray) -> Tuple[float, float]: curr_frame: np.ndarray) -> Tuple[float, float]:
"""Displacement detection with sanity checks"""
dx, dy = self._detect_displacement(prev_frame, curr_frame) dx, dy = self._detect_displacement(prev_frame, curr_frame)
h, w = prev_frame.shape[:2] h, w = prev_frame.shape[:2]
@ -131,14 +172,17 @@ class StitchingScanner:
return (dx, dy) return (dx, dy)
# ========================================================================= # =========================================================================
# Mosaic Building # Mosaic Building - FIXED VERSION
# ========================================================================= # =========================================================================
def _init_mosaic(self, frame: np.ndarray): def _init_mosaic(self, frame: np.ndarray):
"""Initialize mosaic with first frame"""
with self._mosaic_lock: with self._mosaic_lock:
self.mosaic = frame.copy() self.mosaic = frame.copy()
# Set reference frames
self._prev_frame = frame.copy() self._prev_frame = frame.copy()
self._append_ref_frame = frame.copy()
self._displacement_since_append_x = 0.0 self._displacement_since_append_x = 0.0
self._displacement_since_append_y = 0.0 self._displacement_since_append_y = 0.0
@ -151,17 +195,20 @@ class StitchingScanner:
self.log(f"Initialized mosaic: {frame.shape[1]}x{frame.shape[0]}") self.log(f"Initialized mosaic: {frame.shape[1]}x{frame.shape[0]}")
def _blend_horizontal(self, base: np.ndarray, strip: np.ndarray, def _blend_strips_horizontal(self, base: np.ndarray, strip: np.ndarray,
blend_width: int, append_right: bool) -> np.ndarray: blend_width: int, append_right: bool) -> np.ndarray:
"""Blend strip onto base with gradient at seam to hide discontinuities."""
if blend_width <= 0 or blend_width >= strip.shape[1]: if blend_width <= 0 or blend_width >= strip.shape[1]:
if append_right: if append_right:
return np.hstack([base, strip]) return np.hstack([base, strip])
else:
return np.hstack([strip, base]) return np.hstack([strip, base])
h_base, w_base = base.shape[:2] h_base, w_base = base.shape[:2]
h_strip, w_strip = strip.shape[:2] h_strip, w_strip = strip.shape[:2]
if h_strip != h_base: if h_strip != h_base:
# Height mismatch - can't blend properly
if append_right: if append_right:
return np.hstack([base, strip]) return np.hstack([base, strip])
return np.hstack([strip, base]) return np.hstack([strip, base])
@ -169,80 +216,53 @@ class StitchingScanner:
blend_w = min(blend_width, w_strip, w_base) blend_w = min(blend_width, w_strip, w_base)
if append_right: if append_right:
# base | blend_zone | rest_of_strip
result_width = w_base + w_strip - blend_w result_width = w_base + w_strip - blend_w
result = np.zeros((h_base, result_width, 3), dtype=np.uint8) result = np.zeros((h_base, result_width, 3), dtype=np.uint8)
# Copy base
result[:, :w_base] = base result[:, :w_base] = base
# Create gradient: 1->0 for base weight
alpha = np.linspace(1, 0, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis] alpha = np.linspace(1, 0, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis]
base_overlap = base[:, -blend_w:].astype(np.float32) base_overlap = base[:, -blend_w:].astype(np.float32)
strip_overlap = strip[:, :blend_w].astype(np.float32) strip_overlap = strip[:, :blend_w].astype(np.float32)
blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8) blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8)
result[:, w_base - blend_w:w_base] = blended result[:, w_base - blend_w:w_base] = blended
result[:, w_base:] = strip[:, blend_w:] result[:, w_base:] = strip[:, blend_w:]
return result return result
else: else:
# rest_of_strip | blend_zone | base
result_width = w_base + w_strip - blend_w result_width = w_base + w_strip - blend_w
result = np.zeros((h_base, result_width, 3), dtype=np.uint8) result = np.zeros((h_base, result_width, 3), dtype=np.uint8)
result[:, :w_strip] = strip result[:, :w_strip] = strip
alpha = np.linspace(0, 1, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis] alpha = np.linspace(0, 1, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis]
strip_overlap = strip[:, -blend_w:].astype(np.float32) strip_overlap = strip[:, -blend_w:].astype(np.float32)
base_overlap = base[:, :blend_w].astype(np.float32) base_overlap = base[:, :blend_w].astype(np.float32)
blended = (strip_overlap * (1 - alpha) + base_overlap * alpha).astype(np.uint8) blended = (strip_overlap * (1 - alpha) + base_overlap * alpha).astype(np.uint8)
result[:, w_strip - blend_w:w_strip] = blended result[:, w_strip - blend_w:w_strip] = blended
result[:, w_strip:] = base[:, blend_w:] result[:, w_strip:] = base[:, blend_w:]
return result return result
def _blend_vertical(self, base: np.ndarray, strip: np.ndarray, def _append_to_mosaic_fixed(self, frame: np.ndarray, direction: ScanDirection):
blend_height: int, append_below: bool) -> np.ndarray: """
mh, mw = base.shape[:2] FIXED: Append with blending and fractional pixel preservation.
sh, sw = strip.shape[:2]
# Match widths Key improvements:
if sw > mw: 1. Gradient blending at seams to hide color discontinuities
strip = strip[:, :mw] 2. Preserve fractional pixel remainder to prevent cumulative drift
elif sw < mw: 3. Small safety margin for alignment tolerance
pad = np.zeros((sh, mw - sw, 3), dtype=np.uint8) """
strip = np.hstack([strip, pad]) BLEND_WIDTH = 10 # Pixels to blend at seam
SAFETY_MARGIN = 2 # Extra pixels as tolerance
blend_h = min(blend_height, sh, mh)
if blend_h <= 0:
if append_below:
return np.vstack([base, strip])
return np.vstack([strip, base])
if append_below:
alpha = np.linspace(1, 0, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis]
base_overlap = base[-blend_h:].astype(np.float32)
strip_overlap = strip[:blend_h].astype(np.float32)
blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8)
result_h = mh + sh - blend_h
result = np.zeros((result_h, mw, 3), dtype=np.uint8)
result[:mh - blend_h] = base[:-blend_h]
result[mh - blend_h:mh] = blended
result[mh:] = strip[blend_h:]
return result
else:
alpha = np.linspace(0, 1, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis]
strip_overlap = strip[-blend_h:].astype(np.float32)
base_overlap = base[:blend_h].astype(np.float32)
blended = (strip_overlap * (1 - alpha) + base_overlap * alpha).astype(np.uint8)
result_h = mh + sh - blend_h
result = np.zeros((result_h, mw, 3), dtype=np.uint8)
result[:sh - blend_h] = strip[:-blend_h]
result[sh - blend_h:sh] = blended
result[sh:] = base[blend_h:]
return result
def _append_strip(self, frame: np.ndarray, direction: ScanDirection):
"""Append strip to mosaic based on accumulated displacement."""
BLEND_WIDTH = 10
SAFETY_MARGIN = 2
with self._mosaic_lock: with self._mosaic_lock:
if self.mosaic is None: if self.mosaic is None:
@ -255,26 +275,30 @@ class StitchingScanner:
dy = abs(self._displacement_since_append_y) dy = abs(self._displacement_since_append_y)
if direction in [ScanDirection.RIGHT, ScanDirection.LEFT]: if direction in [ScanDirection.RIGHT, ScanDirection.LEFT]:
# Round and add safety margin
append_width = round(dx) + SAFETY_MARGIN append_width = round(dx) + SAFETY_MARGIN
append_width = min(append_width, w - BLEND_WIDTH - 5) append_width = min(append_width, w - BLEND_WIDTH - 5)
if append_width < 1: if append_width < 1:
return return
# Calculate fractional remainder to preserve
pixels_consumed = append_width - SAFETY_MARGIN pixels_consumed = append_width - SAFETY_MARGIN
fractional_remainder = dx - pixels_consumed fractional_remainder = dx - pixels_consumed
if direction == ScanDirection.RIGHT: if direction == ScanDirection.RIGHT:
# Grab strip with extra for blending
strip_start = max(0, w - append_width - BLEND_WIDTH) strip_start = max(0, w - append_width - BLEND_WIDTH)
new_strip = frame[:, strip_start:] new_strip = frame[:, strip_start:]
self.mosaic = self._blend_horizontal( self.mosaic = self._blend_strips_horizontal(
self.mosaic, new_strip, BLEND_WIDTH, append_right=True) self.mosaic, new_strip, BLEND_WIDTH, append_right=True)
else: else:
strip_end = min(w, append_width + BLEND_WIDTH) strip_end = min(w, append_width + BLEND_WIDTH)
new_strip = frame[:, :strip_end] new_strip = frame[:, :strip_end]
self.mosaic = self._blend_horizontal( self.mosaic = self._blend_strips_horizontal(
self.mosaic, new_strip, BLEND_WIDTH, append_right=False) self.mosaic, new_strip, BLEND_WIDTH, append_right=False)
# KEEP fractional remainder instead of resetting to 0!
self._displacement_since_append_x = fractional_remainder self._displacement_since_append_x = fractional_remainder
self._displacement_since_append_y = 0.0 self._displacement_since_append_y = 0.0
@ -289,34 +313,112 @@ class StitchingScanner:
fractional_remainder = dy - pixels_consumed fractional_remainder = dy - pixels_consumed
if direction == ScanDirection.DOWN: if direction == ScanDirection.DOWN:
strip_end = min(h, append_height + BLEND_WIDTH)
new_strip = frame[:strip_end:, :]
self.mosaic = self._blend_vertical(
self.mosaic, new_strip, BLEND_WIDTH, append_below=False)
else:
strip_start = max(0, h - append_height - BLEND_WIDTH) strip_start = max(0, h - append_height - BLEND_WIDTH)
new_strip = frame[:strip_start, :] new_strip = frame[strip_start:, :]
self.mosaic = self._blend_vertical(
self.mosaic, new_strip, BLEND_WIDTH, append_below=True) # Match widths
if new_strip.shape[1] > mw:
new_strip = new_strip[:, :mw]
elif new_strip.shape[1] < mw:
pad = np.zeros((new_strip.shape[0], mw - new_strip.shape[1], 3), dtype=np.uint8)
new_strip = np.hstack([new_strip, pad])
# Vertical blend
blend_h = min(BLEND_WIDTH, new_strip.shape[0], mh)
alpha = np.linspace(1, 0, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis]
base_overlap = self.mosaic[-blend_h:].astype(np.float32)
strip_overlap = new_strip[:blend_h].astype(np.float32)
blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8)
result_h = mh + new_strip.shape[0] - blend_h
result = np.zeros((result_h, mw, 3), dtype=np.uint8)
result[:mh - blend_h] = self.mosaic[:-blend_h]
result[mh - blend_h:mh] = blended
result[mh:] = new_strip[blend_h:]
self.mosaic = result
else:
strip_end = min(h, append_height + BLEND_WIDTH)
new_strip = frame[:strip_end, :]
if new_strip.shape[1] > mw:
new_strip = new_strip[:, :mw]
elif new_strip.shape[1] < mw:
pad = np.zeros((new_strip.shape[0], mw - new_strip.shape[1], 3), dtype=np.uint8)
new_strip = np.hstack([new_strip, pad])
self.mosaic = np.vstack([new_strip, self.mosaic])
self._displacement_since_append_x = 0.0 self._displacement_since_append_x = 0.0
self._displacement_since_append_y = fractional_remainder self._displacement_since_append_y = fractional_remainder
new_mh, new_mw = self.mosaic.shape[:2] new_mh, new_mw = self.mosaic.shape[:2]
# Update state
with self._state_lock: with self._state_lock:
self.state.mosaic_width = new_mw self.state.mosaic_width = new_mw
self.state.mosaic_height = new_mh self.state.mosaic_height = new_mh
self.state.append_count += 1 self.state.append_count += 1
# Update reference frame (fractional remainder already set above - don't reset!)
self._append_ref_frame = frame.copy()
if self.on_mosaic_updated: if self.on_mosaic_updated:
self.on_mosaic_updated() self.on_mosaic_updated()
def _start_new_row(self, frame: np.ndarray, direction: ScanDirection):
"""Start a new row in the mosaic"""
with self._mosaic_lock:
if self.mosaic is None:
self._init_mosaic(frame)
return
h, w = frame.shape[:2]
mh, mw = self.mosaic.shape[:2]
# Calculate overlap
overlap_pixels = int(h * self.config.row_overlap)
append_height = h - overlap_pixels
if direction == ScanDirection.DOWN:
new_strip = frame[overlap_pixels:, :]
if new_strip.shape[1] < mw:
pad = np.zeros((new_strip.shape[0], mw - new_strip.shape[1], 3), dtype=np.uint8)
new_strip = np.hstack([new_strip, pad])
elif new_strip.shape[1] > mw:
new_strip = new_strip[:, :mw]
self.mosaic = np.vstack([self.mosaic, new_strip])
else:
new_strip = frame[:append_height, :]
if new_strip.shape[1] < mw:
pad = np.zeros((new_strip.shape[0], mw - new_strip.shape[1], 3), dtype=np.uint8)
new_strip = np.hstack([new_strip, pad])
elif new_strip.shape[1] > mw:
new_strip = new_strip[:, :mw]
self.mosaic = np.vstack([new_strip, self.mosaic])
# Reset all tracking for new row
self._prev_frame = frame.copy()
self._append_ref_frame = frame.copy()
self._displacement_since_append_x = 0.0
self._displacement_since_append_y = 0.0
with self._state_lock:
self.state.mosaic_height = self.mosaic.shape[0]
self.state.mosaic_width = self.mosaic.shape[1]
self.log(f"New row started, mosaic: {self.mosaic.shape[1]}x{self.mosaic.shape[0]}")
# ========================================================================= # =========================================================================
# Scan Control # Scan Control
# ========================================================================= # =========================================================================
def start(self) -> bool: def start(self) -> bool:
"""Start the stitching scan"""
if self.running: if self.running:
self.log("Already running") self.log("Already running")
return False return False
@ -327,11 +429,13 @@ class StitchingScanner:
with self._state_lock: with self._state_lock:
self.state = StitchState() self.state = StitchState()
self.state.is_scanning = True self.state.is_scanning = True
self.state.total_rows = self.config.rows
with self._mosaic_lock: with self._mosaic_lock:
self.mosaic = None self.mosaic = None
self._prev_frame = None self._prev_frame = None
self._append_ref_frame = None
self._displacement_since_append_x = 0.0 self._displacement_since_append_x = 0.0
self._displacement_since_append_y = 0.0 self._displacement_since_append_y = 0.0
@ -342,6 +446,7 @@ class StitchingScanner:
return True return True
def stop(self): def stop(self):
"""Stop the scan"""
self.running = False self.running = False
self.paused = False self.paused = False
self.motion.stop_all() self.motion.stop_all()
@ -352,24 +457,26 @@ class StitchingScanner:
self.log("Scan stopped") self.log("Scan stopped")
def pause(self): def pause(self):
"""Pause the scan"""
if self.running and not self.paused: if self.running and not self.paused:
self.paused = True self.paused = True
self.motion.stop_all() self.motion.stop_all()
self.log("Scan paused") self.log("Scan paused")
def resume(self): def resume(self):
"""Resume the scan"""
if self.running and self.paused: if self.running and self.paused:
self.paused = False self.paused = False
self.log("Scan resumed") self.log("Scan resumed")
# ========================================================================= # =========================================================================
# Scanning Logic # Main Scan Loop
# ========================================================================= # =========================================================================
def _scan_loop(self): def _scan_loop(self):
"""Main scanning loop"""
try: try:
self.log("Starting scan loop") self.log("Starting scan loop")
self.log(f"Max dimensions: {self.config.max_mosaic_width}x{self.config.max_mosaic_height}")
self.motion.set_speed(self.config.scan_speed_index) self.motion.set_speed(self.config.scan_speed_index)
time.sleep(0.1) time.sleep(0.1)
@ -377,35 +484,30 @@ class StitchingScanner:
frame = self._capture_frame() frame = self._capture_frame()
self._init_mosaic(frame) self._init_mosaic(frame)
row = 0 for row in range(self.config.rows):
while self.running: if not self.running:
break
with self._state_lock: with self._state_lock:
self.state.current_row = row self.state.current_row = row
self.state.total_rows = row + 1
self.log(f"=== Row {row + 1} ===") self.log(f"=== Row {row + 1}/{self.config.rows} ===")
# Serpentine: even rows right, odd rows left # Serpentine pattern
h_direction = ScanDirection.RIGHT if row % 2 == 0 else ScanDirection.LEFT if row % 2 == 0:
h_direction = ScanDirection.RIGHT
else:
h_direction = ScanDirection.LEFT
stop_reason = self._scan_direction(h_direction) self._scan_horizontal(h_direction)
if not self.running: if not self.running:
break break
# Check max height if row < self.config.rows - 1:
if self.state.mosaic_height >= self.config.max_mosaic_height: self._move_to_next_row()
self.log(f"Max height reached ({self.state.mosaic_height}px)")
break
# Move to next row using same stitching approach self.log("Scan complete!")
if not self._move_to_next_row():
self.log("Failed to move to next row")
break
row += 1
self.log(f"Scan complete! Final: {self.state.mosaic_width}x{self.state.mosaic_height}")
except Exception as e: except Exception as e:
self.log(f"Scan error: {e}") self.log(f"Scan error: {e}")
@ -417,8 +519,8 @@ class StitchingScanner:
with self._state_lock: with self._state_lock:
self.state.is_scanning = False self.state.is_scanning = False
def _scan_direction(self, direction: ScanDirection) -> str: def _scan_horizontal(self, direction: ScanDirection):
"""Scan in a direction until edge or max dimension reached.""" """Scan horizontally with fixed displacement tracking"""
self.log(f"Scanning {direction.value}...") self.log(f"Scanning {direction.value}...")
with self._state_lock: with self._state_lock:
@ -426,51 +528,44 @@ class StitchingScanner:
frame = self._capture_frame() frame = self._capture_frame()
h, w = frame.shape[:2] h, w = frame.shape[:2]
# Setup based on direction
if direction in [ScanDirection.RIGHT, ScanDirection.LEFT]:
threshold_pixels = w * self.config.displacement_threshold threshold_pixels = w * self.config.displacement_threshold
max_dim = self.config.max_mosaic_width
current_dim = lambda: self.state.mosaic_width
start_cmd = 'E' if direction == ScanDirection.RIGHT else 'W'
stop_cmd = 'e' if direction == ScanDirection.RIGHT else 'w'
else:
threshold_pixels = h * self.config.displacement_threshold
max_dim = self.config.max_mosaic_height
current_dim = lambda: self.state.mosaic_height
start_cmd = 'S' if direction == ScanDirection.DOWN else 'N'
stop_cmd = 's' if direction == ScanDirection.DOWN else 'n'
# Initialize tracking
self._prev_frame = frame.copy() self._prev_frame = frame.copy()
self._append_ref_frame = frame.copy()
self._displacement_since_append_x = 0.0 self._displacement_since_append_x = 0.0
self._displacement_since_append_y = 0.0 self._displacement_since_append_y = 0.0
start_time = time.time() start_time = time.time()
no_movement_count = 0 no_movement_count = 0
max_no_movement = 50 max_no_movement = 50
stop_reason = 'stopped'
while self.running and not self.paused: while self.running and not self.paused:
if time.time() - start_time > self.config.max_scan_time: if time.time() - start_time > self.config.max_scan_time:
self.log("Scan timeout") self.log("Scan timeout")
stop_reason = 'timeout'
break break
if current_dim() >= max_dim: # Pulse the motor
self.log(f"Max dimension reached ({current_dim()}px)") if direction == ScanDirection.RIGHT:
stop_reason = 'max_dim' self.motion.send_command('E')
break else:
self.motion.send_command('W')
# Pulse motor
self.motion.send_command(start_cmd)
time.sleep(self.config.movement_interval) time.sleep(self.config.movement_interval)
self.motion.send_command(stop_cmd)
if direction == ScanDirection.RIGHT:
self.motion.send_command('e')
else:
self.motion.send_command('w')
# Wait for settle
time.sleep(self.config.frame_interval) time.sleep(self.config.frame_interval)
# Capture and measure
curr_frame = self._capture_frame() curr_frame = self._capture_frame()
dx, dy = self._detect_displacement_robust(self._prev_frame, curr_frame) dx, dy = self._detect_displacement_robust(self._prev_frame, curr_frame)
# Accumulate displacement SINCE LAST APPEND
self._displacement_since_append_x += dx self._displacement_since_append_x += dx
self._displacement_since_append_y += dy self._displacement_since_append_y += dy
@ -480,119 +575,78 @@ class StitchingScanner:
self.state.last_displacement = (dx, dy) self.state.last_displacement = (dx, dy)
self.state.frame_count += 1 self.state.frame_count += 1
# Edge detection # Check for no movement
movement = abs(dx) if direction in [ScanDirection.RIGHT, ScanDirection.LEFT] else abs(dy) if abs(dx) < 1.0 and abs(dy) < 1.0:
if movement < 1.0:
no_movement_count += 1 no_movement_count += 1
if no_movement_count >= max_no_movement: if no_movement_count >= max_no_movement:
self.log(f"Edge detected (no movement)") self.log(f"Edge detected (no movement for {no_movement_count} frames)")
stop_reason = 'edge'
break break
else: else:
no_movement_count = 0 no_movement_count = 0
# Append when threshold reached # Check threshold and append
disp = abs(self._displacement_since_append_x) if direction in [ScanDirection.RIGHT, ScanDirection.LEFT] else abs(self._displacement_since_append_y) if abs(self._displacement_since_append_x) >= threshold_pixels:
if disp >= threshold_pixels: self._append_to_mosaic_fixed(curr_frame, direction)
self._append_strip(curr_frame, direction) self.log(f"Appended {abs(self._displacement_since_append_x):.1f}px strip, "
self.log(f"Appended {disp:.1f}px, mosaic: {self.state.mosaic_width}x{self.state.mosaic_height}") f"mosaic: {self.state.mosaic_width}x{self.state.mosaic_height}")
# Update prev_frame for next displacement calculation
self._prev_frame = curr_frame.copy() self._prev_frame = curr_frame.copy()
if self.on_progress: if self.on_progress:
self.on_progress(self.state.append_count, 0) self.on_progress(self.state.append_count, 0)
self.motion.send_command(stop_cmd) # Stop
time.sleep(self.config.settle_time) if direction == ScanDirection.RIGHT:
self.log(f"Direction finished: {stop_reason}") self.motion.send_command('e')
return stop_reason else:
self.motion.send_command('w')
def _move_to_next_row(self) -> bool: time.sleep(self.config.settle_time)
"""
Move down to next row using displacement-based stitching. def _move_to_next_row(self):
Same approach as horizontal scanning. """Move down to next row"""
"""
self.log("Moving to next row...") self.log("Moving to next row...")
frame = self._capture_frame() frame = self._capture_frame()
h, w = frame.shape[:2] h, w = frame.shape[:2]
move_distance = h * (1 - self.config.row_overlap)
# Target: move (1 - overlap) * frame_height
target_displacement = h * (1 - self.config.row_overlap)
threshold_pixels = h * self.config.displacement_threshold
self.log(f"Target Y: {target_displacement:.0f}px, threshold: {threshold_pixels:.0f}px")
with self._state_lock: with self._state_lock:
self.state.direction = 'down' self.state.direction = 'down'
self.state.cumulative_y = 0.0
self.motion.send_command('N')
self._prev_frame = frame.copy() self._prev_frame = frame.copy()
self._displacement_since_append_x = 0.0 cumulative_y = 0.0
self._displacement_since_append_y = 0.0
total_y = 0.0
no_movement_count = 0
max_no_movement = 30
# Start moving South
self.motion.send_command('S')
try:
while self.running: while self.running:
time.sleep(self.config.frame_interval) time.sleep(self.config.frame_interval)
curr_frame = self._capture_frame() curr_frame = self._capture_frame()
dx, dy = self._detect_displacement_robust(self._prev_frame, curr_frame) dx, dy = self._detect_displacement_robust(self._prev_frame, curr_frame)
self._displacement_since_append_y += dy cumulative_y += dy
total_y += dy
with self._state_lock:
self.state.cumulative_y = total_y
self.state.last_displacement = (dx, dy)
# Edge detection
if abs(dy) < 1.0:
no_movement_count += 1
if no_movement_count >= max_no_movement:
self.log("Edge detected during row transition")
self.motion.send_command('s')
time.sleep(self.config.settle_time)
return False
else:
no_movement_count = 0
# Append strip when threshold reached
if abs(self._displacement_since_append_y) >= threshold_pixels:
self._append_strip(curr_frame, ScanDirection.DOWN)
self.log(f" Row transition: appended, total Y: {abs(total_y):.1f}px")
# Done when we've moved enough
if abs(total_y) >= target_displacement:
self.log(f"Row transition complete: {abs(total_y):.1f}px")
self.motion.send_command('s')
time.sleep(self.config.settle_time)
# Reset for next horizontal row
frame = self._capture_frame()
self._prev_frame = frame.copy()
self._displacement_since_append_x = 0.0
self._displacement_since_append_y = 0.0
return True
self._prev_frame = curr_frame.copy() self._prev_frame = curr_frame.copy()
except Exception as e: with self._state_lock:
self.log(f"Row transition error: {e}") self.state.cumulative_y = cumulative_y
self.motion.send_command('s')
return False
self.motion.send_command('s') if abs(cumulative_y) >= move_distance:
break
if abs(cumulative_y) < 5 and self.state.frame_count > 50:
self.log("Warning: Minimal Y movement")
break
self.motion.send_command('n')
time.sleep(self.config.settle_time) time.sleep(self.config.settle_time)
return False
frame = self._capture_frame()
self._start_new_row(frame, ScanDirection.DOWN)
def _capture_frame(self) -> np.ndarray: def _capture_frame(self) -> np.ndarray:
"""Capture and rotate frame"""
frame = self.camera.capture_frame() frame = self.camera.capture_frame()
frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE) frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
return frame return frame
@ -602,6 +656,7 @@ class StitchingScanner:
# ========================================================================= # =========================================================================
def get_state(self) -> StitchState: def get_state(self) -> StitchState:
"""Get current scan state"""
with self._state_lock: with self._state_lock:
return StitchState( return StitchState(
is_scanning=self.state.is_scanning, is_scanning=self.state.is_scanning,
@ -618,12 +673,14 @@ class StitchingScanner:
) )
def get_mosaic(self) -> Optional[np.ndarray]: def get_mosaic(self) -> Optional[np.ndarray]:
"""Get current mosaic (full resolution)"""
with self._mosaic_lock: with self._mosaic_lock:
if self.mosaic is not None: if self.mosaic is not None:
return self.mosaic.copy() return self.mosaic.copy()
return None return None
def get_mosaic_preview(self, max_size: int = 600) -> Optional[np.ndarray]: def get_mosaic_preview(self, max_size: int = 600) -> Optional[np.ndarray]:
"""Get scaled mosaic for preview"""
with self._mosaic_lock: with self._mosaic_lock:
if self.mosaic is None: if self.mosaic is None:
return None return None
@ -639,9 +696,11 @@ class StitchingScanner:
return self.mosaic.copy() return self.mosaic.copy()
def save_mosaic(self, filepath: str) -> bool: def save_mosaic(self, filepath: str) -> bool:
"""Save mosaic to file"""
with self._mosaic_lock: with self._mosaic_lock:
if self.mosaic is None: if self.mosaic is None:
return False return False
cv2.imwrite(filepath, self.mosaic) cv2.imwrite(filepath, self.mosaic)
self.log(f"Saved mosaic to {filepath}") self.log(f"Saved mosaic to {filepath}")
return True return True
@ -651,106 +710,25 @@ class StitchingScanner:
# ========================================================================= # =========================================================================
def test_displacement(self, num_frames: int = 10) -> dict: def test_displacement(self, num_frames: int = 10) -> dict:
results = {'frames': [], 'total_dx': 0.0, 'total_dy': 0.0} """Test displacement detection"""
results = {
'frames': [],
'total_dx': 0.0,
'total_dy': 0.0
}
prev_frame = self._capture_frame() prev_frame = self._capture_frame()
for i in range(num_frames): for i in range(num_frames):
time.sleep(0.1) time.sleep(0.1)
curr_frame = self._capture_frame() curr_frame = self._capture_frame()
dx, dy = self._detect_displacement(prev_frame, curr_frame) dx, dy = self._detect_displacement(prev_frame, curr_frame)
results['frames'].append({'frame': i, 'dx': dx, 'dy': dy}) results['frames'].append({'frame': i, 'dx': dx, 'dy': dy})
results['total_dx'] += dx results['total_dx'] += dx
results['total_dy'] += dy results['total_dy'] += dy
prev_frame = curr_frame prev_frame = curr_frame
return results return results
def test_row_transition(self) -> dict:
"""Test row transition using displacement stitching."""
results = {
'success': False,
'y_moved': 0.0,
'mosaic_before': (0, 0),
'mosaic_after': (0, 0),
'error': None
}
try:
self.log("Testing row transition...")
if self.mosaic is None:
frame = self._capture_frame()
self._init_mosaic(frame)
results['mosaic_before'] = (self.state.mosaic_width, self.state.mosaic_height)
with self._state_lock:
self.state.cumulative_y = 0.0
self.running = True
success = self._move_to_next_row()
self.running = False
results['success'] = success
results['y_moved'] = self.state.cumulative_y
results['mosaic_after'] = (self.state.mosaic_width, self.state.mosaic_height)
self.log(f"Row transition: {'SUCCESS' if success else 'FAILED'}, Y: {results['y_moved']:.1f}px")
except Exception as e:
results['error'] = str(e)
self.log(f"Test error: {e}")
self.running = False
return results
def test_single_row(self, direction: str = 'right') -> dict:
"""Test scanning a single row."""
results = {
'success': False,
'stop_reason': None,
'appends': 0,
'mosaic_before': (0, 0),
'mosaic_after': (0, 0),
'error': None
}
try:
self.log(f"Testing single row ({direction})...")
if self.mosaic is None:
frame = self._capture_frame()
self._init_mosaic(frame)
results['mosaic_before'] = (self.state.mosaic_width, self.state.mosaic_height)
appends_before = self.state.append_count
self.motion.set_speed(self.config.scan_speed_index)
time.sleep(0.1)
self.running = True
scan_dir = ScanDirection.RIGHT if direction == 'right' else ScanDirection.LEFT
stop_reason = self._scan_direction(scan_dir)
self.running = False
results['success'] = True
results['stop_reason'] = stop_reason
results['appends'] = self.state.append_count - appends_before
results['mosaic_after'] = (self.state.mosaic_width, self.state.mosaic_height)
except Exception as e:
results['error'] = str(e)
self.running = False
return results
def get_memory_estimate(self) -> dict:
current_bytes = self.mosaic.nbytes if self.mosaic is not None else 0
max_bytes = self.config.max_mosaic_width * self.config.max_mosaic_height * 3
return {
'current_size': (self.state.mosaic_width, self.state.mosaic_height),
'current_mb': current_bytes / (1024 * 1024),
'max_size': (self.config.max_mosaic_width, self.config.max_mosaic_height),
'max_mb': max_bytes / (1024 * 1024),
}