Autoscope-Controller/src/stitching_scanner.py
2026-01-12 00:33:57 -06:00

1612 lines
No EOL
71 KiB
Python

"""
Stitching Scanner v2 - Simplified unified approach
Same displacement-based stitching for both horizontal rows and vertical row transitions.
No complex visual matching - just track displacement and append strips.
Continuous alignment correction for gear slippage compensation.
FIXES:
- Row-start alignment now compares with TRANSITION STRIPS, not old row 1
- Correct X position tracking after row transition
- Fixed LEFT scan stop condition
DEBUG BORDER COLORS (for debugging row 2 placement):
=================================================
In _detect_row_start_alignment():
- WHITE (thick): Outline of where TRANSITION STRIPS are in mosaic (X=x_offset, Y=0 to transition_height)
- BLUE line: Y=transition_height (boundary between transition strips and old row 1)
- CYAN: Mosaic region used for Y alignment comparison (bottom of transition strips)
- MAGENTA: Where the new frame is EXPECTED to be placed
- YELLOW: Mosaic region used for X alignment comparison (edge of transition strips)
In _blend_horizontal_at_y (append_left):
- RED (thick): Where strip WOULD have been placed (ORIGINAL position before alignment)
- GREEN: Where strip was ACTUALLY placed (ADJUSTED position after alignment)
MOSAIC LAYOUT AFTER DOWN TRANSITION:
====================================
Y=0 to Y=transition_height: TRANSITION STRIPS (at X=x_offset to X=x_offset+fw)
Y=transition_height to Y=mh: OLD ROW 1 (shifted down, at X=0 to X=mw)
Where x_offset = mosaic_width - initial_frame_width
And transition_height = mosaic_height - initial_frame_height
"""
import cv2
import numpy as np
import time
import threading
from dataclasses import dataclass
from typing import Optional, Callable, Tuple
from enum import Enum
class ScanDirection(Enum):
    """Closed set of scan directions; each member's value is its lowercase name."""

    RIGHT = "right"
    LEFT = "left"
    DOWN = "down"
    UP = "up"
@dataclass
class StitchConfig:
    """Tunable parameters for a stitching scan (all fields have defaults)."""

    # 10% of frame triggers append
    displacement_threshold: float = 0.10

    # Timing knobs — presumably seconds; confirm against the scan loop.
    movement_interval: float = 0.001
    frame_interval: float = 1.00
    settle_time: float = 0.75
    max_scan_time: float = 300.0

    # Fractional overlap between rows — TODO confirm how the scan loop uses it.
    row_overlap: float = 0.15

    # Upper bounds on the mosaic size — presumably pixels; verify against the caller.
    max_mosaic_width: int = 1000
    max_mosaic_height: int = 1000

    # Index into a speed table defined elsewhere — not visible from this class.
    scan_speed_index: int = 3

    # Whether autofocus is requested on every row.
    autofocus_every_row: bool = True
@dataclass
class StitchState:
    """Mutable progress snapshot of a scan; every field starts at a zero/empty default."""

    # Scan activity and current direction label.
    is_scanning: bool = False
    direction: str = ''

    # Accumulated and current position values (presumably pixels — TODO confirm).
    cumulative_x: float = 0.0
    cumulative_y: float = 0.0
    current_x: float = 0.0
    current_y: float = 0.0
    last_displacement: Tuple[float, float] = (0.0, 0.0)

    # Row bookkeeping.
    current_row: int = 0
    total_rows: int = 0

    # Current mosaic size, and the size it had when it was first initialized.
    mosaic_width: int = 0
    mosaic_height: int = 0
    mosaic_init_width: int = 0
    mosaic_init_height: int = 0

    # Counters for frames seen and strips appended.
    frame_count: int = 0
    append_count: int = 0
@dataclass
class AlignmentOffset:
    """Alignment correction for placing a strip, plus how trustworthy it is."""

    # Correction along X and Y.
    x_offset: float = 0.0
    y_offset: float = 0.0

    # True only when the detector considered the measurement usable.
    valid: bool = False

    # Phase correlation response
    confidence: float = 0.0
class StitchingScanner:
"""
Slide scanner using continuous stitching.
Unified approach for horizontal and vertical movement.
Continuous alignment correction for gear slippage compensation.
"""
def __init__(self, camera, motion_controller, autofocus_controller=None,
             config: Optional[StitchConfig] = None,
             on_log: Optional[Callable[[str], None]] = None,
             on_progress: Optional[Callable[[int, int], None]] = None,
             on_mosaic_updated: Optional[Callable[[], None]] = None):
    """
    Store collaborators and reset all stitching state.

    Args:
        camera: Camera object (stored as-is; not used in this method).
        motion_controller: Motion controller (stored as ``self.motion``).
        autofocus_controller: Optional autofocus controller.
        config: Scan configuration; a default ``StitchConfig()`` is used when None.
        on_log: Optional callback receiving each log line.
        on_progress: Optional progress callback taking two ints.
        on_mosaic_updated: Optional callback taking no arguments.
    """
    self.camera = camera
    self.motion = motion_controller
    self.autofocus = autofocus_controller
    # Explicit None check: only a missing config falls back to the defaults.
    self.config = config if config is not None else StitchConfig()
    self.on_log = on_log
    self.on_progress = on_progress
    self.on_mosaic_updated = on_mosaic_updated

    self.running = False
    self.paused = False
    self.state = StitchState()
    self._state_lock = threading.Lock()

    self.mosaic: Optional[np.ndarray] = None
    self._mosaic_lock = threading.RLock()  # Changed to RLock to avoid deadlocks
    self._prev_frame: Optional[np.ndarray] = None
    self._displacement_since_append_x: float = 0.0
    self._displacement_since_append_y: float = 0.0

    # Cumulative alignment drift - tracks total correction applied
    self._cumulative_align_x: float = 0.0
    self._cumulative_align_y: float = 0.0

    # Track the Y position in the mosaic where the current row starts
    # This is critical for placing strips at the correct vertical position
    self._row_start_y: int = 0

    # Track the X position in the mosaic where the current row starts
    # For row 1 (RIGHT scan): starts at X=0
    # For row 2 (LEFT scan): starts at X = mosaic_width - frame_width (right edge)
    self._row_start_x: int = 0

    # Last strip's alignment for continuity
    self._last_strip_alignment = AlignmentOffset()
    self._thread: Optional[threading.Thread] = None
def log(self, message: str):
    """Emit *message* with a "[Stitch] " prefix to the on_log callback (if set) and to stdout."""
    line = f"[Stitch] {message}"
    callback = self.on_log
    if callback:
        callback(line)
    print(line)
# =========================================================================
# Displacement Detection
# =========================================================================
def _to_grayscale(self, frame: np.ndarray) -> np.ndarray:
if len(frame.shape) == 3:
return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
return frame
def _detect_displacement(self, prev_frame: np.ndarray,
curr_frame: np.ndarray) -> Tuple[float, float]:
prev_gray = self._to_grayscale(prev_frame)
curr_gray = self._to_grayscale(curr_frame)
if prev_gray.shape != curr_gray.shape:
return (0.0, 0.0)
prev_f = prev_gray.astype(np.float32)
curr_f = curr_gray.astype(np.float32)
h, w = prev_gray.shape
window = cv2.createHanningWindow((w, h), cv2.CV_32F)
prev_f = prev_f * window
curr_f = curr_f * window
shift, response = cv2.phaseCorrelate(prev_f, curr_f)
return shift
def _detect_displacement_with_confidence(self, prev_frame: np.ndarray,
curr_frame: np.ndarray) -> Tuple[float, float, float]:
"""Detect displacement and return confidence (phase correlation response)."""
prev_gray = self._to_grayscale(prev_frame)
curr_gray = self._to_grayscale(curr_frame)
if prev_gray.shape != curr_gray.shape:
return (0.0, 0.0, 0.0)
prev_f = prev_gray.astype(np.float32)
curr_f = curr_gray.astype(np.float32)
h, w = prev_gray.shape
window = cv2.createHanningWindow((w, h), cv2.CV_32F)
prev_f = prev_f * window
curr_f = curr_f * window
shift, response = cv2.phaseCorrelate(prev_f, curr_f)
return (shift[0], shift[1], response)
def _detect_displacement_robust(self, prev_frame: np.ndarray,
curr_frame: np.ndarray) -> Tuple[float, float]:
dx, dy = self._detect_displacement(prev_frame, curr_frame)
h, w = prev_frame.shape[:2]
max_displacement = max(w, h) * 0.5
if abs(dx) > max_displacement or abs(dy) > max_displacement:
self.log(f"Warning: Large displacement ({dx:.1f}, {dy:.1f}), ignoring")
return (0.0, 0.0)
return (dx, dy)
def _detect_row_start_alignment(self, frame: np.ndarray, direction: ScanDirection) -> AlignmentOffset:
    """
    Detect alignment at the START of a new row.

    After a DOWN transition with prepend, the mosaic layout is:
    - Y=0 to Y≈transition_height: TRANSITION STRIPS (placed at x_offset)
    - Y≈transition_height to Y=mosaic_height: OLD row 1 content (shifted down)
    The current frame should overlap with the TRANSITION STRIPS, not old row 1.
    The camera is at the X position where transition strips were placed.

    Side effects: draws the debug borders (WHITE/BLUE/CYAN/MAGENTA/YELLOW)
    directly onto ``self.mosaic`` and writes debug PNGs under /tmp. Failing to
    write a debug image is logged and otherwise ignored.

    Args:
        frame: Current camera frame.
        direction: Direction of the row that is about to be scanned.

    Returns:
        AlignmentOffset whose offsets are clamped to ±80 and whose ``valid``
        flag is set when the best confidence exceeds 0.1. A default (invalid)
        offset is returned when there is no mosaic yet.
    """
    offset = AlignmentOffset()
    with self._mosaic_lock:
        if self.mosaic is None:
            return offset
        mh, mw = self.mosaic.shape[:2]
        fh, fw = frame.shape[:2]
        # Calculate where transition strips are in the mosaic
        # x_offset is where the DOWN transition strips were placed horizontally
        x_offset = max(0, mw - self.state.mosaic_init_width)
        # Transition height is how much was added during DOWN transition
        # transition_height = mh - original_height_before_transition
        # But since row 1 was just fh tall, transition_height = mh - fh
        transition_height = mh - fh
        self.log(f" Row-start alignment: mosaic {mw}x{mh}, frame {fw}x{fh}")
        self.log(f" Transition strips at: X={x_offset}, Y=0 to Y={transition_height}")
        self.log(f" Old row 1 shifted to: Y={transition_height} to Y={mh}")
        # ========== DEBUG: Draw borders showing layout ==========
        # WHITE border: Where TRANSITION STRIPS are (this is what we should compare with)
        cv2.rectangle(self.mosaic,
                      (x_offset, 0),
                      (min(x_offset + fw, mw), transition_height),
                      (255, 255, 255), 3)  # WHITE - transition strips location
        self.log(f" DEBUG: WHITE border - TRANSITION STRIPS at X={x_offset}:{min(x_offset + fw, mw)}, Y=0:{transition_height}")
        # BLUE line at transition boundary (where transition ends and old row 1 begins)
        cv2.line(self.mosaic, (0, transition_height), (mw, transition_height), (255, 0, 0), 3)
        self.log(f" DEBUG: BLUE line at Y={transition_height} (transition/row1 boundary)")
        vertical_overlap = min(200, fh // 3)
        min_overlap = 50
        # =============================================
        # Step 1: Detect Y alignment
        # =============================================
        # Compare frame's TOP with the BOTTOM of transition strips
        # Frame's top portion overlaps with mosaic's transition bottom
        #
        # Transition strips are at Y=0 to Y=transition_height
        # So transition bottom is around Y=(transition_height - overlap) to Y=transition_height
        #
        # NOTE(review): the indices below do not match the description above.
        # They select Y = transition_height .. transition_height + overlap and
        # X = x_offset - fw .. x_offset, i.e. rows below the boundary and the
        # columns to the left of the transition strips. When x_offset < fw the
        # slice start is negative and NumPy wraps it around. The logic is left
        # unchanged here; confirm which region is intended.
        transition_compare_y_start = transition_height
        transition_compare_y_end = max(0, transition_height + vertical_overlap)
        # Frame's TOP should overlap with transition BOTTOM
        frame_top = frame[:vertical_overlap, :]
        # Get the X range - centered on where transition strips are
        x_end = x_offset
        x_start = min(x_offset - fw, mw)
        compare_width = x_end - x_start
        if transition_compare_y_start >= 0 and compare_width >= min_overlap:
            # Mosaic region: bottom of transition strips
            mosaic_transition_bottom = self.mosaic[transition_compare_y_start:transition_compare_y_end,
                                                   x_start:x_end]
            # Frame region: top portion (what overlaps with transition)
            frame_compare = frame_top[:, :compare_width]
            # ========== DEBUG: Save frame with comparison region marked ==========
            debug_frame = frame.copy()
            cv2.rectangle(debug_frame, (0, 0), (compare_width, vertical_overlap),
                          (255, 255, 0), 2)  # CYAN - frame's top region used for Y comparison
            self.log(f" DEBUG: Frame Y comparison region: X=0:{compare_width}, Y=0:{vertical_overlap}")
            try:
                cv2.imwrite('/tmp/debug_frame_row2_start.png', debug_frame)
            except Exception as exc:
                # Debug output is best-effort; never let it abort alignment.
                self.log(f" DEBUG: could not save debug frame: {exc}")
            # ========== DEBUG: Draw CYAN border on mosaic for Y comparison region ==========
            cv2.rectangle(self.mosaic,
                          (x_start, transition_compare_y_start),
                          (x_end, transition_compare_y_end),
                          (255, 255, 0), 2)  # CYAN - mosaic comparison region for Y
            self.log(f" DEBUG: CYAN border - mosaic Y comparison region X={x_start}:{x_end}, Y={transition_compare_y_start}:{transition_compare_y_end}")
            # MAGENTA border: Where frame is EXPECTED to be placed (at transition position)
            cv2.rectangle(self.mosaic,
                          (x_start, 0),
                          (x_end, fh),
                          (255, 0, 255), 2)  # MAGENTA - expected frame position
            self.log(f" DEBUG: MAGENTA border - expected frame position X={x_start}:{x_end}, Y=0:{fh}")
            min_w = min(frame_compare.shape[1], mosaic_transition_bottom.shape[1])
            min_h = min(frame_compare.shape[0], mosaic_transition_bottom.shape[0])
            if min_w >= min_overlap and min_h >= min_overlap:
                # Trim both regions to a common size before correlating.
                frame_compare = frame_compare[:min_h, :min_w]
                mosaic_transition_bottom = mosaic_transition_bottom[:min_h, :min_w]
                # ========== DEBUG: Save the comparison regions as images ==========
                try:
                    cv2.imwrite('/tmp/debug_frame_top_region.png', frame_compare)
                    cv2.imwrite('/tmp/debug_mosaic_transition_region.png', mosaic_transition_bottom)
                    self.log(f" DEBUG: Saved comparison regions to /tmp/debug_*.png")
                except Exception as exc:
                    # Debug output is best-effort; never let it abort alignment.
                    self.log(f" DEBUG: could not save comparison regions: {exc}")
                # Detect displacement
                dx_v, dy_v, conf_v = self._detect_displacement_with_confidence(
                    mosaic_transition_bottom, frame_compare)
                self.log(f" Row-start Y alignment: dx={dx_v:.1f}, dy={dy_v:.1f}, conf={conf_v:.3f}")
                self.log(f" Compared frame[0:{vertical_overlap}] with mosaic[{transition_compare_y_start}:{transition_compare_y_end}]")
                if conf_v > 0.1:
                    offset.y_offset = dy_v
                    offset.confidence = conf_v
        # =============================================
        # Step 2: Detect X alignment
        # =============================================
        horizontal_overlap = min(200, fw // 3)
        if direction == ScanDirection.LEFT:
            # For LEFT scan: frame starts at the transition X position
            # Compare frame's RIGHT edge with mosaic's transition strip RIGHT edge
            transition_x_end = min(x_offset + fw, mw)
            transition_x_start = max(x_offset, transition_x_end - horizontal_overlap)
            # Y range: within the transition strip area
            y_start = 0
            y_end = min(transition_height, fh)
            if transition_x_end - transition_x_start >= min_overlap:
                mosaic_edge = self.mosaic[y_start:y_end, transition_x_start:transition_x_end]
                frame_edge = frame[:y_end, fw - (transition_x_end - transition_x_start):fw]
                # ========== DEBUG: Draw YELLOW border for X comparison region ==========
                cv2.rectangle(self.mosaic,
                              (transition_x_start, y_start),
                              (transition_x_end, y_end),
                              (0, 255, 255), 2)  # YELLOW - mosaic comparison region for X
                self.log(f" DEBUG: YELLOW border - mosaic X comparison region X={transition_x_start}:{transition_x_end}, Y={y_start}:{y_end}")
                min_h = min(mosaic_edge.shape[0], frame_edge.shape[0])
                min_w = min(mosaic_edge.shape[1], frame_edge.shape[1])
                if min_h >= min_overlap and min_w >= min_overlap:
                    mosaic_edge = mosaic_edge[:min_h, :min_w]
                    frame_edge = frame_edge[:min_h, :min_w]
                    dx_h, dy_h, conf_h = self._detect_displacement_with_confidence(
                        mosaic_edge, frame_edge)
                    self.log(f" Row-start X alignment: dx={dx_h:.1f}, dy={dy_h:.1f}, conf={conf_h:.3f}")
                    if conf_h > 0.1:
                        offset.x_offset = -dx_h
                        # Keep the better of the two confidences.
                        if conf_h > offset.confidence:
                            offset.confidence = conf_h
        else:
            # For RIGHT scan at row start (similar logic but for left edge)
            transition_x_start = x_offset
            transition_x_end = min(x_offset + horizontal_overlap, mw)
            y_start = 0
            y_end = min(transition_height, fh)
            if transition_x_end - transition_x_start >= min_overlap:
                mosaic_edge = self.mosaic[y_start:y_end, transition_x_start:transition_x_end]
                frame_edge = frame[:y_end, :transition_x_end - transition_x_start]
                cv2.rectangle(self.mosaic,
                              (transition_x_start, y_start),
                              (transition_x_end, y_end),
                              (0, 255, 255), 2)  # YELLOW
                self.log(f" DEBUG: YELLOW border - mosaic X comparison region X={transition_x_start}:{transition_x_end}, Y={y_start}:{y_end}")
                min_h = min(mosaic_edge.shape[0], frame_edge.shape[0])
                min_w = min(mosaic_edge.shape[1], frame_edge.shape[1])
                if min_h >= min_overlap and min_w >= min_overlap:
                    mosaic_edge = mosaic_edge[:min_h, :min_w]
                    frame_edge = frame_edge[:min_h, :min_w]
                    dx_h, dy_h, conf_h = self._detect_displacement_with_confidence(
                        mosaic_edge, frame_edge)
                    self.log(f" Row-start X alignment: dx={dx_h:.1f}, dy={dy_h:.1f}, conf={conf_h:.3f}")
                    if conf_h > 0.1:
                        offset.x_offset = -dx_h
                        # Keep the better of the two confidences.
                        if conf_h > offset.confidence:
                            offset.confidence = conf_h
        # Limit maximum adjustment
        max_adjust = 80
        if abs(offset.x_offset) > max_adjust:
            self.log(f" Limiting X offset from {offset.x_offset:.1f} to ±{max_adjust}")
            offset.x_offset = max(-max_adjust, min(max_adjust, offset.x_offset))
        if abs(offset.y_offset) > max_adjust:
            self.log(f" Limiting Y offset from {offset.y_offset:.1f} to ±{max_adjust}")
            offset.y_offset = max(-max_adjust, min(max_adjust, offset.y_offset))
        offset.valid = offset.confidence > 0.1
        if offset.valid:
            self.log(f" Row-start alignment FINAL: X={offset.x_offset:.1f}, Y={offset.y_offset:.1f}, conf={offset.confidence:.3f}")
        return offset
def _detect_strip_alignment(self, frame: np.ndarray, direction: ScanDirection,
                            expected_x: int, expected_y: int) -> AlignmentOffset:
    """
    Measure how far the current frame is from where the mosaic expects it.

    A patch of the frame is phase-correlated against the mosaic patch it
    should overlap, which gives a per-strip correction for gear slippage.

    Args:
        frame: Current camera frame
        direction: Scan direction
        expected_x: Expected X position in mosaic
        expected_y: Expected Y position in mosaic
    Returns:
        AlignmentOffset with X/Y correction needed. It is left at its defaults
        (invalid) when there is no mosaic, the overlap is too small, or the
        measured shift exceeds 50 px on either axis.
    """
    result = AlignmentOffset()
    overlap_cap = 250    # largest overlap band that is compared
    overlap_floor = 40   # smallest overlap band worth comparing
    shift_cap = 50       # Max pixels to adjust

    with self._mosaic_lock:
        if self.mosaic is None:
            return result

        mosaic_h, mosaic_w = self.mosaic.shape[:2]
        frame_h, frame_w = frame.shape[:2]

        # Clamp expected positions
        expected_y = max(0, min(expected_y, mosaic_h - frame_h))
        expected_x = max(0, min(expected_x, mosaic_w - frame_w))
        rows = slice(expected_y, expected_y + frame_h)
        cols = slice(expected_x, expected_x + frame_w)

        if direction in (ScanDirection.RIGHT, ScanDirection.LEFT):
            band = min(frame_w // 2, mosaic_w - expected_x, overlap_cap)
            if band < overlap_floor:
                return result
            if direction == ScanDirection.RIGHT:
                # Left part of the frame against the right edge of the mosaic.
                mosaic_patch = self.mosaic[rows, mosaic_w - band:mosaic_w]
                frame_patch = frame[:, :band]
            else:
                # Right part of the frame against the mosaic ending at
                # expected_x + frame width (clipped to the mosaic).
                right = min(expected_x + frame_w, mosaic_w)
                left = max(right - band, 0)
                usable = right - left
                if usable < overlap_floor:
                    return result
                mosaic_patch = self.mosaic[rows, left:right]
                frame_patch = frame[:, frame_w - usable:]
        elif direction == ScanDirection.DOWN:
            # Top part of the frame against the bottom edge of the mosaic.
            band = min(frame_h // 2, mosaic_h - expected_y, overlap_cap)
            if band < overlap_floor:
                return result
            mosaic_patch = self.mosaic[mosaic_h - band:mosaic_h, cols]
            frame_patch = frame[:band, :]
        else:  # UP
            # Bottom part of the frame against the top edge of the mosaic.
            band = min(frame_h // 2, expected_y, overlap_cap)
            if band < overlap_floor:
                return result
            mosaic_patch = self.mosaic[:band, cols]
            frame_patch = frame[frame_h - band:, :]

        # Trim both patches to a common size.
        common_h = min(mosaic_patch.shape[0], frame_patch.shape[0])
        common_w = min(mosaic_patch.shape[1], frame_patch.shape[1])
        if common_h < overlap_floor or common_w < overlap_floor:
            self.log(f"Strip alignment: overlap too small ({common_w}x{common_h})")
            return result
        mosaic_patch = mosaic_patch[:common_h, :common_w]
        frame_patch = frame_patch[:common_h, :common_w]

        shift_x, shift_y, response = self._detect_displacement_with_confidence(mosaic_patch, frame_patch)

        # Sanity check - reject large displacements
        if abs(shift_x) > shift_cap or abs(shift_y) > shift_cap:
            self.log(f"Strip alignment: displacement too large ({shift_x:.1f}, {shift_y:.1f}), ignoring")
            return result

        result.x_offset = shift_x
        result.y_offset = shift_y
        result.confidence = response
        result.valid = response > 0.1  # Require minimum confidence
        if result.valid:
            self.log(f" Strip alignment: X={shift_x:.1f}, Y={shift_y:.1f}, conf={response:.3f}")
        return result
# =========================================================================
# Mosaic Building
# =========================================================================
def _init_mosaic(self, frame: np.ndarray):
    """Start a new mosaic from *frame* and reset the per-scan tracking values."""
    frame_h, frame_w = frame.shape[:2]

    with self._mosaic_lock:
        self.mosaic = frame.copy()
        self._prev_frame = frame.copy()
        self._displacement_since_append_x = 0.0
        self._displacement_since_append_y = 0.0
        # Alignment corrections start over with the new mosaic.
        self._cumulative_align_x = 0.0
        self._cumulative_align_y = 0.0
        self._last_strip_alignment = AlignmentOffset()
        # Row 0 starts at Y=0
        # NOTE(review): _row_start_x is not reset here — confirm that is intended.
        self._row_start_y = 0

    with self._state_lock:
        state = self.state
        state.mosaic_width = frame_w
        state.mosaic_height = frame_h
        state.mosaic_init_width = frame_w
        state.mosaic_init_height = frame_h
        state.frame_count = 1
        state.append_count = 0
        state.current_y = 0
        state.current_x = 0

    self.log(f"Initialized mosaic: {frame_w}x{frame_h}")
def _blend_horizontal_at_y(self, base: np.ndarray, strip: np.ndarray,
blend_width: int, append_right: bool,
x_offset: int = None, y_offset: int = 0,
alignment_x: float = 0.0, alignment_y: float = 0.0) -> np.ndarray:
"""
Blend strip horizontally onto base at specified Y position.
Args:
base: The existing mosaic
strip: The new strip to append
blend_width: Width of the blending zone
append_right: True to append to right, False to append left
x_offset: X position for left-append mode
y_offset: Y position in the mosaic
alignment_x: Additional X alignment offset (from strip alignment detection)
alignment_y: Additional Y alignment offset (from strip alignment detection)
"""
h_base, w_base = base.shape[:2]
h_strip, w_strip = strip.shape[:2]
blend_w = min(blend_width, w_strip, w_base)
if append_right:
# Apply alignment offset for Y
y_offset = y_offset + int(round(alignment_y))
# Clamp y_offset
y_offset = max(0, min(y_offset, h_base - h_strip))
# Expand mosaic to the right
result_width = w_base + w_strip - blend_w
result = np.zeros((h_base, result_width, 3), dtype=np.uint8)
self.log(f"=== _blend_horizontal_at_y (append_right) ===")
self.log(f" base: {w_base}x{h_base}, strip: {w_strip}x{h_strip}")
self.log(f" y_offset: {y_offset}, blend_w: {blend_w}")
self.log(f" alignment: X={alignment_x:.1f}, Y={alignment_y:.1f}")
self.log(f" result: {result_width}x{h_base}")
# Step 1: Copy entire base
result[:, :w_base] = base
# Step 2: Copy non-overlap portion of strip at correct Y
self.log(f" Placing strip at X={w_base - blend_w}:{result_width}, Y={y_offset}:{y_offset + h_strip}")
result[y_offset:y_offset + h_strip, w_base:] = strip[:, blend_w:]
# Step 3: Create blend
alpha = np.linspace(1, 0, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis]
base_overlap = base[y_offset:y_offset + h_strip, -blend_w:].astype(np.float32)
strip_overlap = strip[:, :blend_w].astype(np.float32)
blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8)
# Step 4: Place blend
result[y_offset:y_offset + h_strip, w_base - blend_w:w_base] = blended
return result
else: # append_left - place at x_offset, NO width expansion
# x_offset is where the LEFT edge of the strip should go
if x_offset is None:
x_offset = 0
# Store ORIGINAL position before alignment (for debug)
original_x_offset = x_offset
original_y_offset = y_offset
# Apply alignment offsets (continuous correction)
x_offset = x_offset + int(round(alignment_x))
y_offset_before = y_offset
y_offset = y_offset - int(round(alignment_y))
self.log(f" Y offset computation: {y_offset_before} - {int(round(alignment_y))} = {y_offset}")
# Clamp x_offset to valid range
x_offset = max(0, min(x_offset, w_base))
# Handle strip cropping if y_offset is negative (strip protrudes above frame)
strip_y_start = 0 # How much to crop from top of strip
if y_offset < 0:
strip_y_start = -y_offset # Crop this many rows from top of strip
y_offset = 0
self.log(f" Cropping {strip_y_start}px from top of strip")
# Handle strip cropping if it protrudes below frame
strip_y_end = h_strip
if y_offset + (h_strip - strip_y_start) > h_base:
strip_y_end = h_strip - ((y_offset + (h_strip - strip_y_start)) - h_base)
self.log(f" Cropping bottom of strip, new strip_y_end={strip_y_end}")
# Get the cropped strip
if strip_y_start >= strip_y_end:
self.log(f" WARNING: Strip fully cropped, nothing to place")
return base.copy()
cropped_strip = strip[strip_y_start:strip_y_end, :]
h_cropped = cropped_strip.shape[0]
self.log(f"=== _blend_horizontal_at_y (append_left) ===")
self.log(f" base: {w_base}x{h_base}, strip: {w_strip}x{h_strip}")
self.log(f" x_offset: {x_offset}, y_offset: {y_offset}, blend_w: {blend_w}")
self.log(f" alignment: X={alignment_x:.1f}, Y={alignment_y:.1f}")
self.log(f" cumulative: X={self._cumulative_align_x:.1f}, Y={self._cumulative_align_y:.1f}")
self.log(f" row_start_y: {self._row_start_y}")
self.log(f" Strip crop: rows [{strip_y_start}:{strip_y_end}] -> height {h_cropped}")
self.log(f" ORIGINAL position: X={original_x_offset}, Y={original_y_offset}")
self.log(f" ADJUSTED position: X={x_offset}, Y={y_offset}")
# Result is same size as base (no expansion when going left)
result = base.copy()
# Calculate placement bounds
strip_x_start = x_offset
strip_x_end = min(x_offset + w_strip, w_base)
strip_cols_to_copy = strip_x_end - strip_x_start
self.log(f" Placing strip at X={strip_x_start}:{strip_x_end}, Y={y_offset}:{y_offset + h_cropped}")
self.log(f" Strip cols to copy: {strip_cols_to_copy}")
# ========== DEBUG: Draw borders BEFORE placing strip ==========
# RED border: Where strip WOULD have been placed (original position)
orig_x_end = min(original_x_offset + w_strip, w_base)
orig_y_end = min(original_y_offset + h_strip, h_base)
if original_x_offset >= 0 and original_y_offset >= 0:
cv2.rectangle(result,
(original_x_offset, max(0, original_y_offset)),
(orig_x_end, orig_y_end),
(0, 0, 255), 3) # RED - original position
self.log(f" DEBUG: RED border at original position X={original_x_offset}:{orig_x_end}, Y={original_y_offset}:{orig_y_end}")
# Step 1: Copy strip content (non-blend portion) at correct position
# For LEFT scanning, blend is on the RIGHT side of the strip
non_blend_end = strip_cols_to_copy - blend_w
if non_blend_end > 0:
result[y_offset:y_offset + h_cropped, strip_x_start:strip_x_start + non_blend_end] = cropped_strip[:, :non_blend_end]
self.log(f" Step 1: Placed non-blend at X={strip_x_start}:{strip_x_start + non_blend_end}")
# Step 2: Create blend on the RIGHT edge of strip (blending with existing content)
blend_x_start = strip_x_end - blend_w
blend_x_end = strip_x_end
if blend_w > 0 and blend_x_start >= strip_x_start:
alpha = np.linspace(1, 0, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis]
strip_overlap = cropped_strip[:, -blend_w:].astype(np.float32)
base_overlap = base[y_offset:y_offset + h_cropped, blend_x_start:blend_x_end].astype(np.float32)
blended = (strip_overlap * alpha + base_overlap * (1 - alpha)).astype(np.uint8)
result[y_offset:y_offset + h_cropped, blend_x_start:blend_x_end] = blended
self.log(f" Step 2: Blend zone at X={blend_x_start}:{blend_x_end}")
# ========== DEBUG: Draw border AFTER placing strip ==========
# GREEN border: Where strip was ACTUALLY placed (adjusted position)
cv2.rectangle(result,
(strip_x_start, y_offset),
(strip_x_end, y_offset + h_cropped),
(0, 255, 0), 2) # GREEN - actual position
self.log(f" DEBUG: GREEN border at actual position X={strip_x_start}:{strip_x_end}, Y={y_offset}:{y_offset + h_cropped}")
self.log(f" Final: Strip placed at X={strip_x_start}, Y={y_offset}, mosaic size unchanged: {w_base}x{h_base}")
return result
def _blend_horizontal(self, base: np.ndarray, strip: np.ndarray,
blend_width: int, append_right: bool) -> np.ndarray:
if blend_width <= 0 or blend_width >= strip.shape[1]:
if append_right:
return np.hstack([base, strip])
return np.hstack([strip, base])
h_base, w_base = base.shape[:2]
h_strip, w_strip = strip.shape[:2]
self.log(f"Base Width: {w_base}px")
if h_strip != h_base:
if append_right:
return np.hstack([base, strip])
return np.hstack([strip, base])
blend_w = min(blend_width, w_strip, w_base)
if append_right:
result_width = w_base + w_strip - blend_w
result = np.zeros((h_base, result_width, 3), dtype=np.uint8)
result[:, :w_base] = base
alpha = np.linspace(1, 0, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis]
base_overlap = base[:, -blend_w:].astype(np.float32)
strip_overlap = strip[:, :blend_w].astype(np.float32)
blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8)
result[:, w_base - blend_w:w_base] = blended
result[:, w_base:] = strip[:, blend_w:]
return result
else:
result_width = w_base + w_strip - blend_w
result = np.zeros((h_base, result_width, 3), dtype=np.uint8)
result[:, :w_strip] = strip
alpha = np.linspace(0, 1, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis]
strip_overlap = strip[:, -blend_w:].astype(np.float32)
base_overlap = base[:, :blend_w].astype(np.float32)
blended = (strip_overlap * (1 - alpha) + base_overlap * alpha).astype(np.uint8)
result[:, w_strip - blend_w:w_strip] = blended
result[:, w_strip:] = base[:, blend_w:]
return result
def _blend_vertical_at_x(self, base: np.ndarray, strip: np.ndarray,
blend_height: int, append_below: bool,
x_off: int = 0,
alignment_x: float = 0.0, alignment_y: float = 0.0) -> np.ndarray:
h_base, w_base = base.shape[:2]
h_strip, w_strip = strip.shape[:2]
# Apply alignment offset for X position
x_offset = max(0, w_base - self.state.mosaic_init_width)
x_offset = x_offset + int(round(alignment_x))
x_offset = max(0, min(x_offset, w_base - w_strip)) if w_strip < w_base else 0
# Create full-width strip with strip placed at x_offset
full_strip = np.zeros((h_strip, w_base, 3), dtype=np.uint8)
available_width = w_base - x_offset
copy_width = min(w_strip, available_width)
full_strip[:, x_offset:x_offset + copy_width] = strip[:, :copy_width]
self.log(f"=== _blend_vertical_at_x ===")
self.log(f" base: {w_base}x{h_base}, strip: {w_strip}x{h_strip}")
self.log(f" x_offset: {x_offset}, alignment: X={alignment_x:.1f}, Y={alignment_y:.1f}")
# Early exit: no blending possible
if blend_height <= 0 or blend_height >= h_strip:
if append_below:
return np.vstack([base, full_strip])
return np.vstack([full_strip, base])
blend_h = min(blend_height, h_strip, h_base)
if append_below:
result_height = h_base + h_strip - blend_h
result = np.zeros((result_height, w_base, 3), dtype=np.uint8)
result[:h_base, :] = base
alpha = np.linspace(1, 0, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis]
base_overlap = base[-blend_h:, :].astype(np.float32)
strip_overlap = full_strip[:blend_h, :].astype(np.float32)
blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8)
result[h_base - blend_h:h_base, :] = blended
result[h_base:, :] = full_strip[blend_h:, :]
return result
else:
result_height = h_base + h_strip - blend_h
result = np.zeros((result_height, w_base, 3), dtype=np.uint8)
result[:h_strip, :] = full_strip
alpha = np.linspace(0, 1, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis]
strip_overlap = full_strip[-blend_h:, :].astype(np.float32)
base_overlap = base[:blend_h, :].astype(np.float32)
blended = (strip_overlap * (1 - alpha) + base_overlap * alpha).astype(np.uint8)
result[h_strip - blend_h:h_strip, :] = blended
result[h_strip:, :] = base[blend_h:, :]
return result
def _blend_vertical(self, base: np.ndarray, strip: np.ndarray,
blend_height: int, append_below: bool) -> np.ndarray:
mh, mw = base.shape[:2]
sh, sw = strip.shape[:2]
# Match widths
if sw > mw:
strip = strip[:, :mw]
elif sw < mw:
pad = np.zeros((sh, mw - sw, 3), dtype=np.uint8)
strip = np.hstack([strip, pad])
blend_h = min(blend_height, sh, mh)
if blend_h <= 0:
if append_below:
return np.vstack([base, strip])
return np.vstack([strip, base])
if append_below:
alpha = np.linspace(1, 0, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis]
base_overlap = base[-blend_h:].astype(np.float32)
strip_overlap = strip[:blend_h].astype(np.float32)
blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8)
result_h = mh + sh - blend_h
result = np.zeros((result_h, mw, 3), dtype=np.uint8)
result[:mh - blend_h] = base[:-blend_h]
result[mh - blend_h:mh] = blended
result[mh:] = strip[blend_h:]
return result
else:
alpha = np.linspace(0, 1, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis]
strip_overlap = strip[-blend_h:].astype(np.float32)
base_overlap = base[:blend_h].astype(np.float32)
blended = (strip_overlap * (1 - alpha) + base_overlap * alpha).astype(np.uint8)
result_h = mh + sh - blend_h
result = np.zeros((result_h, mw, 3), dtype=np.uint8)
result[:sh - blend_h] = strip[:-blend_h]
result[sh - blend_h:sh] = blended
result[sh:] = base[blend_h:]
return result
def _append_strip(self, frame: np.ndarray, direction: ScanDirection):
"""Append strip to mosaic based on accumulated displacement with continuous alignment."""
# Flow:
#   1. Take the magnitude of the displacement accumulated since the last append.
#   2. Ask _detect_strip_alignment for a correction at the expected position and,
#      when it is valid, fold it into the cumulative alignment.
#   3. Cut a strip of roughly that many pixels from one edge of `frame`
#      (which edge depends on `direction`).
#   4. Hand the strip to the matching blend helper, which returns the new mosaic.
#   5. Publish the new mosaic size and notify on_mosaic_updated if set.
# Returns None.  Nothing is appended when there is no mosaic yet or when the
# computed strip size is below one pixel.
#
# BLEND_WIDTH is the overlap (px) passed to the blend helpers; SAFETY_MARGIN is
# the number of extra pixels added to every strip beyond the measured displacement.
BLEND_WIDTH = 10
SAFETY_MARGIN = 2
with self._mosaic_lock:
if self.mosaic is None:
return
h, w = frame.shape[:2]
mh, mw = self.mosaic.shape[:2]
# Only the magnitude is used below; the sign of the accumulated displacement is dropped here.
dx = abs(self._displacement_since_append_x)
dy = abs(self._displacement_since_append_y)
# Calculate expected position for alignment detection
# Use _row_start_y for the Y position since that's where this row's content belongs
expected_x = int(self.state.current_x + self._cumulative_align_x)
expected_y = int(self._row_start_y + self._cumulative_align_y)
# Detect alignment for this strip
alignment = self._detect_strip_alignment(frame, direction, expected_x, expected_y)
if alignment.valid:
# Update cumulative alignment
self._cumulative_align_x += alignment.x_offset
self._cumulative_align_y += alignment.y_offset
self._last_strip_alignment = alignment
# Get total alignment offsets
align_x = self._cumulative_align_x
align_y = self._cumulative_align_y
if direction in [ScanDirection.RIGHT, ScanDirection.LEFT]:
append_width = round(dx) + SAFETY_MARGIN
# Cap the strip so that strip + blend region always fits inside the frame width.
append_width = min(append_width, w - BLEND_WIDTH - 5)
if append_width < 1:
return
# The safety margin is extra overlap, not travelled distance, so it is
# excluded from the displacement that this append "uses up".
pixels_consumed = append_width - SAFETY_MARGIN
# NOTE(review): the remainder is derived from abs(dx), so the direction of the
# carried-over displacement is lost — confirm this is intended for scans whose
# displacement is negative.
fractional_remainder = dx - pixels_consumed
# Use _row_start_y for Y position - this is the Y position in the mosaic
# where the current row's content belongs
y_offset = self._row_start_y
if direction == ScanDirection.RIGHT:
# RIGHT: take the right-hand columns of the frame (strip plus blend overlap).
strip_start = max(0, w - append_width - BLEND_WIDTH)
new_strip = frame[:, strip_start:]
self.mosaic = self._blend_horizontal_at_y(
self.mosaic, new_strip, BLEND_WIDTH, append_right=True,
x_offset=int(self.state.current_x), y_offset=y_offset,
alignment_x=align_x, alignment_y=align_y)
else:
# LEFT: take the left-hand columns of the frame (strip plus blend overlap).
strip_end = min(w, append_width + BLEND_WIDTH)
new_strip = frame[:, :strip_end]
self.mosaic = self._blend_horizontal_at_y(
self.mosaic, new_strip, BLEND_WIDTH, append_right=False,
x_offset=int(self.state.current_x), y_offset=y_offset,
alignment_x=align_x, alignment_y=align_y)
# Carry the unconsumed X displacement forward; Y is cleared for horizontal appends.
self._displacement_since_append_x = fractional_remainder
self._displacement_since_append_y = 0.0
elif direction in [ScanDirection.DOWN, ScanDirection.UP]:
# Vertical case mirrors the horizontal one, using frame height and dy.
append_height = round(dy) + SAFETY_MARGIN
append_height = min(append_height, h - BLEND_WIDTH - 5)
if append_height < 1:
return
pixels_consumed = append_height - SAFETY_MARGIN
fractional_remainder = dy - pixels_consumed
if direction == ScanDirection.DOWN:
# DOWN: the top rows of the frame are passed with append_below=False.
strip_end = min(h, append_height + BLEND_WIDTH)
new_strip = frame[:strip_end, :]
self.mosaic = self._blend_vertical_at_x(
self.mosaic, new_strip, BLEND_WIDTH, append_below=False,
x_off=int(self.state.current_x),
alignment_x=align_x, alignment_y=align_y)
else:
# UP: the bottom rows of the frame are passed with append_below=True.
strip_start = max(0, h - append_height - BLEND_WIDTH)
new_strip = frame[strip_start:, :]
self.mosaic = self._blend_vertical_at_x(
self.mosaic, new_strip, BLEND_WIDTH, append_below=True,
x_off=int(self.state.current_x),
alignment_x=align_x, alignment_y=align_y)
# Carry the unconsumed Y displacement forward; X is cleared for vertical appends.
self._displacement_since_append_x = 0.0
self._displacement_since_append_y = fractional_remainder
# Publish the size of the mosaic returned by the blend helper and count the append.
new_mh, new_mw = self.mosaic.shape[:2]
with self._state_lock:
self.state.mosaic_width = new_mw
self.state.mosaic_height = new_mh
self.state.append_count += 1
# Optional observer hook; called with no arguments.
if self.on_mosaic_updated:
self.on_mosaic_updated()
# =========================================================================
# Scan Control
# =========================================================================
def start(self) -> bool:
    """Begin a new stitching scan on a background daemon thread.

    Returns:
        ``False`` (after logging "Already running") when a scan is already
        in progress.  Otherwise all scan bookkeeping is reset, a daemon
        thread running ``_scan_loop`` is started, and ``True`` is returned.
    """
    if self.running:
        self.log("Already running")
        return False

    self.running, self.paused = True, False

    # Replace the shared state with a fresh one that is flagged as scanning.
    with self._state_lock:
        self.state = StitchState()
        self.state.is_scanning = True

    # Drop any mosaic left over from a previous scan.
    with self._mosaic_lock:
        self.mosaic = None

    # Forget the previous frame and every displacement / alignment accumulator.
    self._prev_frame = None
    self._displacement_since_append_x = 0.0
    self._displacement_since_append_y = 0.0
    self._cumulative_align_x = 0.0
    self._cumulative_align_y = 0.0
    self._last_strip_alignment = AlignmentOffset()
    self._row_start_y = 0

    worker = threading.Thread(target=self._scan_loop, daemon=True)
    self._thread = worker
    worker.start()
    self.log("Stitching scan started")
    return True
def stop(self):
    """Stop the scan.

    Clears the running and paused flags, tells the motion controller to stop
    all axes, marks the shared state as no longer scanning and logs the stop.
    """
    self.running = self.paused = False
    self.motion.stop_all()
    with self._state_lock:
        self.state.is_scanning = False
    self.log("Scan stopped")
def pause(self):
    """Pause a running scan: set the paused flag, stop all motion and log it.

    Does nothing when no scan is running or the scan is already paused.
    """
    if not self.running or self.paused:
        return
    self.paused = True
    self.motion.stop_all()
    self.log("Scan paused")
def resume(self):
    """Clear the paused flag of a running, paused scan and log the resume.

    Does nothing when the scan is not running or is not paused.
    """
    if not (self.running and self.paused):
        return
    self.paused = False
    self.log("Scan resumed")
# =========================================================================
# Scanning Logic
# =========================================================================
def _scan_loop(self):
# Worker body started by start() on a background thread.
# Sequence: set the motor speed, seed the mosaic from the first captured frame,
# then repeat per row: (optional row-start alignment) -> horizontal scan ->
# height check -> move to the next row.  The loop ends when self.running is
# cleared, the mosaic reaches config.max_mosaic_height, or the row transition
# fails.  Any exception is logged and its traceback printed; the finally
# clause always clears the running flag, stops all motion and marks the shared
# state as not scanning.
try:
self.log("Starting scan loop")
self.log(f"Max dimensions: {self.config.max_mosaic_width}x{self.config.max_mosaic_height}")
self.motion.set_speed(self.config.scan_speed_index)
# Short fixed delay after changing speed — presumably lets the controller apply it; confirm.
time.sleep(0.1)
frame = self._capture_frame()
self._init_mosaic(frame)
row = 0
while self.running:
# Publish the row being scanned; total_rows tracks the number of rows started so far.
with self._state_lock:
self.state.current_row = row
self.state.total_rows = row + 1
self.log(f"=== Row {row + 1} ===")
self.log(f"Row start Y position: {self._row_start_y}")
self.log(f"Row start X position: {self.state.current_x}")
self.log(f"Cumulative alignment at row start: X={self._cumulative_align_x:.1f}, Y={self._cumulative_align_y:.1f}")
# Serpentine: even rows right, odd rows left
h_direction = ScanDirection.RIGHT if row % 2 == 0 else ScanDirection.LEFT
# For rows > 0, detect alignment against both edges before scanning
if row > 0:
frame = self._capture_frame()
row_alignment = self._detect_row_start_alignment(frame, h_direction)
if row_alignment.valid:
self.log(f"Applying row-start alignment: X={row_alignment.x_offset:.1f}, Y={row_alignment.y_offset:.1f}")
self._cumulative_align_x += row_alignment.x_offset
self._cumulative_align_y += row_alignment.y_offset
self.log(f"New cumulative alignment: X={self._cumulative_align_x:.1f}, Y={self._cumulative_align_y:.1f}")
# NOTE(review): stop_reason is assigned but not read anywhere in this method.
# NOTE(review): _scan_direction also returns when self.paused is set; in that
# case execution continues to the row transition below — confirm that is intended.
stop_reason = self._scan_direction(h_direction)
if not self.running:
break
# Check max height
if self.state.mosaic_height >= self.config.max_mosaic_height:
self.log(f"Max height reached ({self.state.mosaic_height}px)")
break
# Move to next row using same stitching approach
if not self._move_to_next_row():
self.log("Failed to move to next row")
break
row += 1
self.log(f"Scan complete! Final: {self.state.mosaic_width}x{self.state.mosaic_height}")
self.log(f"Final cumulative alignment: X={self._cumulative_align_x:.1f}, Y={self._cumulative_align_y:.1f}")
except Exception as e:
# Top-level boundary of the worker: report the failure instead of letting it escape.
self.log(f"Scan error: {e}")
import traceback
traceback.print_exc()
finally:
# Always leave the hardware stopped and the flags consistent, whatever ended the loop.
self.running = False
self.motion.stop_all()
with self._state_lock:
self.state.is_scanning = False
def _scan_direction(self, direction: ScanDirection) -> str:
"""Scan in a direction until edge or max dimension reached."""
# Each loop iteration pulses the motor (start command, short wait, stop command),
# captures a frame, measures its displacement against the previous frame and
# appends a strip once the accumulated displacement reaches
# config.displacement_threshold times the frame width (or height).
# Returns the stop reason: 'timeout', 'max_dim', 'edge', or 'stopped' when the
# loop ended because self.running was cleared or self.paused was set.
self.log(f"Scanning {direction.value}...")
with self._state_lock:
self.state.direction = direction.value
frame = self._capture_frame()
h, w = frame.shape[:2]
# NOTE(review): total_x is accumulated in the loop but never read in this method.
total_x = 0
# Setup based on direction
if direction in [ScanDirection.RIGHT, ScanDirection.LEFT]:
threshold_pixels = w * self.config.displacement_threshold
max_dim = self.config.max_mosaic_width
current_dim = lambda: self.state.mosaic_width
# Upper-case letter is sent as the start command, lower-case as the stop command.
start_cmd = 'E' if direction == ScanDirection.RIGHT else 'W'
stop_cmd = 'e' if direction == ScanDirection.RIGHT else 'w'
else:
threshold_pixels = h * self.config.displacement_threshold
max_dim = self.config.max_mosaic_height
current_dim = lambda: self.state.mosaic_height
start_cmd = 'S' if direction == ScanDirection.DOWN else 'N'
stop_cmd = 's' if direction == ScanDirection.DOWN else 'n'
# Start from a clean slate: the reference frame is the one just captured and
# no displacement has been accumulated yet.
self._prev_frame = frame.copy()
self._displacement_since_append_x = 0.0
self._displacement_since_append_y = 0.0
start_time = time.time()
# Consecutive frames with less than one pixel of movement; max_no_movement of
# them in a row are treated as having reached the edge.
no_movement_count = 0
max_no_movement = 50
stop_reason = 'stopped'
self.log(f"Scanning 2..")
# NOTE(review): the loop also ends when self.paused is set, in which case
# 'stopped' is returned — confirm callers distinguish a pause from a stop.
while self.running and not self.paused:
if time.time() - start_time > self.config.max_scan_time:
self.log("Scan timeout")
stop_reason = 'timeout'
break
# The mosaic-size limit is only enforced for RIGHT scans.
if current_dim() >= max_dim and direction == ScanDirection.RIGHT:
self.log(f"Max dimension reached ({current_dim()}px)")
stop_reason = 'max_dim'
break
# For LEFT scan: stop when we've reached the left edge (current_x <= 0)
if self.state.current_x <= 0 and direction == ScanDirection.LEFT:
self.log(f"Reached left edge (current_x={self.state.current_x:.1f})")
stop_reason = 'edge'
break
# Second RIGHT-only limit, this time on the tracked X position rather than the mosaic width.
if abs(self.state.current_x) >= self.config.max_mosaic_width and direction == ScanDirection.RIGHT:
self.log(f"Max dimension reached ({self.config.max_mosaic_width}px)")
self.log(f"Current X offset ({self.state.current_x}px)")
stop_reason = 'max_dim'
break
# Pulse motor
self.motion.send_command(start_cmd)
time.sleep(self.config.movement_interval)
self.motion.send_command(stop_cmd)
time.sleep(self.config.frame_interval)
curr_frame = self._capture_frame()
dx, dy = self._detect_displacement_robust(self._prev_frame, curr_frame)
self.log(f"Scanning dx{dx} dy{dy}..")
self._displacement_since_append_x += dx
self._displacement_since_append_y += dy
total_x += dx
# current_x is advanced by dx for every direction, including vertical scans.
with self._state_lock:
self.state.current_x += dx
# Mirror the per-append accumulators into the shared state for observers.
with self._state_lock:
self.state.cumulative_x = self._displacement_since_append_x
self.state.cumulative_y = self._displacement_since_append_y
self.state.last_displacement = (dx, dy)
self.state.frame_count += 1
# Edge detection
movement = abs(dx) if direction in [ScanDirection.RIGHT, ScanDirection.LEFT] else abs(dy)
self.log(f"Scanning movement{movement}..")
if movement < 1.0:
no_movement_count += 1
if no_movement_count >= max_no_movement:
self.log(f"Edge detected (no movement)")
stop_reason = 'edge'
break
else:
no_movement_count = 0
# Append when threshold reached (with continuous alignment)
disp = abs(self._displacement_since_append_x) if direction in [ScanDirection.RIGHT, ScanDirection.LEFT] else abs(self._displacement_since_append_y)
self.log(f"Scanning disp{disp}..")
if disp >= threshold_pixels:
self.log(f"Scanning threshold_pixels..")
self._append_strip(curr_frame, direction)
self.log(f"Appended {disp:.1f}px, mosaic: {self.state.mosaic_width}x{self.state.mosaic_height}, align: ({self._cumulative_align_x:.1f}, {self._cumulative_align_y:.1f})")
# The frame just processed becomes the reference for the next displacement measurement.
self._prev_frame = curr_frame.copy()
# Progress hook receives the append count and a constant 0 as second argument.
if self.on_progress:
self.on_progress(self.state.append_count, 0)
# Send the stop command once more after leaving the loop, then wait settle_time.
self.motion.send_command(stop_cmd)
time.sleep(self.config.settle_time)
self.log(f"Direction finished: {stop_reason}")
return stop_reason
def _move_to_next_row(self) -> bool:
"""
Move down to next row using displacement-based stitching.
Same approach as horizontal scanning with continuous alignment.
"""
# Returns True once the accumulated |Y| displacement reaches
# frame_height * (1 - config.row_overlap).  Returns False when an edge is
# detected (too many consecutive frames without movement), when an exception
# is raised inside the loop, or when self.running is cleared before the target
# is reached.
self.log("Moving to next row...")
self.log(f"Alignment before row transition: X={self._cumulative_align_x:.1f}, Y={self._cumulative_align_y:.1f}")
frame = self._capture_frame()
h, w = frame.shape[:2]
# Record mosaic height before transition - needed to calculate new row Y position
# NOTE(review): in this method the value is only written to the log.
mosaic_height_before = self.state.mosaic_height
# Target: move (1 - overlap) * frame_height
target_displacement = h * (1 - self.config.row_overlap)
threshold_pixels = h * self.config.displacement_threshold
self.log(f"Target Y: {target_displacement:.0f}px, threshold: {threshold_pixels:.0f}px")
self.log(f"Mosaic height before row transition: {mosaic_height_before}")
with self._state_lock:
self.state.direction = 'down'
self.state.cumulative_y = 0.0
# Fresh reference frame and accumulators for the transition.
self._prev_frame = frame.copy()
self._displacement_since_append_x = 0.0
self._displacement_since_append_y = 0.0
# total_y is the signed Y displacement over the whole transition; comparisons use its magnitude.
total_y = 0.0
no_movement_count = 0
max_no_movement = 30
# Start moving South
self.motion.send_command('S')
try:
while self.running:
# Pulse the motor: 'S' to move, wait movement_interval, 's' to stop, wait frame_interval.
self.motion.send_command('S')
time.sleep(self.config.movement_interval)
self.motion.send_command('s')
time.sleep(self.config.frame_interval)
curr_frame = self._capture_frame()
dx, dy = self._detect_displacement_robust(self._prev_frame, curr_frame)
# Only the Y component is accumulated during a row transition.
self._displacement_since_append_y += dy
total_y += dy
with self._state_lock:
self.state.current_y += dy
with self._state_lock:
self.state.cumulative_y = total_y
self.state.last_displacement = (dx, dy)
# Edge detection
if abs(dy) < 1.0:
no_movement_count += 1
if no_movement_count >= max_no_movement:
self.log("Edge detected during row transition")
self.motion.send_command('s')
time.sleep(self.config.settle_time)
return False
else:
no_movement_count = 0
# Append strip when threshold reached (with continuous alignment)
if abs(self._displacement_since_append_y) >= threshold_pixels:
self._append_strip(curr_frame, ScanDirection.DOWN)
self.log(f" Row transition: appended, total Y: {abs(total_y):.1f}px, align: ({self._cumulative_align_x:.1f}, {self._cumulative_align_y:.1f})")
# Done when we've moved enough
if abs(total_y) >= target_displacement:
self.log(f"Row transition complete: {abs(total_y):.1f}px")
self.log(f"Alignment after row transition: X={self._cumulative_align_x:.1f}, Y={self._cumulative_align_y:.1f}")
# Calculate the Y position in the mosaic where the new row starts
# Since we use append_below=False during DOWN movement, new content
# is PREPENDED to the TOP of the mosaic. So the new row starts at Y=0
self._row_start_y = 0 # New row is at the TOP after prepending
# Calculate the X position where transition strips were placed
# This is where the camera is now (ready to start row 2)
x_offset = max(0, self.state.mosaic_width - self.state.mosaic_init_width)
self.log(f"New row Y position: {self._row_start_y} (mosaic height: {self.state.mosaic_height})")
self.log(f"New row X position: {x_offset} (transition strip location)")
with self._state_lock:
self.state.current_y = 0
# Set current_x to the x_offset where transition strips are
# This is where row 2 will START (then scan LEFT from here)
self.state.current_x = x_offset
self.motion.send_command('s')
time.sleep(self.config.settle_time)
# Reset for next horizontal row
frame = self._capture_frame()
self._prev_frame = frame.copy()
self._displacement_since_append_x = 0.0
self._displacement_since_append_y = 0.0
return True
# Target not reached yet: the frame just processed becomes the next reference.
self._prev_frame = curr_frame.copy()
except Exception as e:
# Any failure during the transition is logged, the motor is told to stop, and False is returned.
self.log(f"Row transition error: {e}")
self.motion.send_command('s')
return False
# Reached when the loop ends because self.running was cleared.
self.motion.send_command('s')
time.sleep(self.config.settle_time)
return False
def _capture_frame(self) -> np.ndarray:
    """Grab one frame from the camera and return it rotated 90 degrees clockwise."""
    raw = self.camera.capture_frame()
    return cv2.rotate(raw, cv2.ROTATE_90_CLOCKWISE)
# =========================================================================
# Getters
# =========================================================================
def get_state(self) -> StitchState:
    """Return a snapshot of the scan state, taken while holding the state lock.

    Only the fields listed below are copied into the new ``StitchState``;
    any other field of the live state is left at whatever the
    ``StitchState`` constructor assigns by default.
    """
    copied_fields = (
        'is_scanning',
        'direction',
        'cumulative_x',
        'cumulative_y',
        'last_displacement',
        'current_row',
        'total_rows',
        'mosaic_width',
        'mosaic_height',
        'frame_count',
        'append_count',
    )
    with self._state_lock:
        live = self.state
        snapshot = {name: getattr(live, name) for name in copied_fields}
        return StitchState(**snapshot)
def get_alignment_state(self) -> dict:
    """Get current alignment correction state.

    Returns:
        A dict with the cumulative X/Y alignment, the current row start Y,
        and under ``'last_alignment'`` the offsets, confidence and validity
        of the most recently stored strip alignment.
    """
    last = self._last_strip_alignment
    last_alignment = {
        'x': last.x_offset,
        'y': last.y_offset,
        'confidence': last.confidence,
        'valid': last.valid,
    }
    return {
        'cumulative_x': self._cumulative_align_x,
        'cumulative_y': self._cumulative_align_y,
        'row_start_y': self._row_start_y,
        'last_alignment': last_alignment,
    }
def get_mosaic(self) -> Optional[np.ndarray]:
    """Return a copy of the current mosaic, or ``None`` when there is none.

    The copy is made while holding the mosaic lock.
    """
    with self._mosaic_lock:
        return None if self.mosaic is None else self.mosaic.copy()
def get_mosaic_preview(self, max_size: int = 600) -> Optional[np.ndarray]:
    """Return a copy of the mosaic scaled down to fit ``max_size`` pixels.

    The mosaic is never enlarged: when both sides already fit, an unscaled
    copy is returned.  Otherwise it is resized with ``cv2.resize`` (default
    interpolation) using one scale factor for both axes.

    Args:
        max_size: Upper bound for the longer side of the preview, in pixels.

    Returns:
        ``None`` when no mosaic exists, otherwise a new array.
    """
    with self._mosaic_lock:
        if self.mosaic is None:
            return None
        h, w = self.mosaic.shape[:2]
        # A zero-sized mosaic cannot be scaled (and would divide by zero below).
        if h == 0 or w == 0:
            return self.mosaic.copy()
        scale = min(max_size / w, max_size / h, 1.0)
        if scale < 1.0:
            # int() truncation on a very elongated mosaic can give 0 for the
            # short side, which cv2.resize rejects; keep at least one pixel.
            new_w = max(1, int(w * scale))
            new_h = max(1, int(h * scale))
            return cv2.resize(self.mosaic, (new_w, new_h))
        return self.mosaic.copy()
def save_mosaic(self, filepath: str) -> bool:
    """Write the current mosaic to ``filepath`` with ``cv2.imwrite``.

    Args:
        filepath: Destination path; the image format is chosen by OpenCV
            from the file extension.

    Returns:
        ``True`` only when a mosaic exists and OpenCV reports that the file
        was written.  ``False`` when there is no mosaic or the write failed.
    """
    with self._mosaic_lock:
        if self.mosaic is None:
            return False
        # cv2.imwrite signals failure (e.g. an unwritable path) through its
        # return value rather than by raising, so the result must be checked.
        written = bool(cv2.imwrite(filepath, self.mosaic))
        if written:
            self.log(f"Saved mosaic to {filepath}")
        else:
            self.log(f"Failed to save mosaic to {filepath}")
        return written
# =========================================================================
# Testing
# =========================================================================
def test_displacement(self, num_frames: int = 10) -> dict:
    """Measure frame-to-frame displacement over ``num_frames`` captures.

    One reference frame is captured first; each following frame is captured
    0.1 s after the previous one and compared against it with
    ``_detect_displacement``.

    Returns:
        ``{'frames': [...], 'total_dx': float, 'total_dy': float}`` where
        each entry of ``'frames'`` holds the frame index and its dx/dy.
    """
    samples = []
    sum_dx = 0.0
    sum_dy = 0.0
    previous = self._capture_frame()
    for index in range(num_frames):
        time.sleep(0.1)
        current = self._capture_frame()
        dx, dy = self._detect_displacement(previous, current)
        samples.append({'frame': index, 'dx': dx, 'dy': dy})
        sum_dx += dx
        sum_dy += dy
        previous = current
    return {'frames': samples, 'total_dx': sum_dx, 'total_dy': sum_dy}
def test_row_start_alignment(self, direction: str = 'left') -> dict:
    """Test row-start alignment detection.

    Initialises the mosaic from a captured frame when none exists, captures
    a fresh frame and runs ``_detect_row_start_alignment`` on it.

    Args:
        direction: ``'left'`` selects a LEFT scan; any other value selects RIGHT.

    Returns:
        Dict with ``success``, ``x_offset``, ``y_offset``, ``confidence`` and
        ``error`` (the exception text when something failed, else ``None``).
    """
    results = {
        'success': False,
        'x_offset': 0.0,
        'y_offset': 0.0,
        'confidence': 0.0,
        'error': None
    }
    try:
        self.log("Testing row-start alignment detection...")
        if self.mosaic is None:
            self.log("No mosaic - initializing...")
            self._init_mosaic(self._capture_frame())

        probe = self._capture_frame()
        if direction == 'left':
            scan_dir = ScanDirection.LEFT
        else:
            scan_dir = ScanDirection.RIGHT
        alignment = self._detect_row_start_alignment(probe, scan_dir)

        # Copy the detection outcome into the report, one field at a time.
        for key, attr in (('success', 'valid'),
                          ('x_offset', 'x_offset'),
                          ('y_offset', 'y_offset'),
                          ('confidence', 'confidence')):
            results[key] = getattr(alignment, attr)
        self.log(f"Row-start alignment: valid={alignment.valid}, X={alignment.x_offset:.1f}, Y={alignment.y_offset:.1f}, conf={alignment.confidence:.3f}")
    except Exception as e:
        results['error'] = str(e)
        self.log(f"Test error: {e}")
    return results
def test_row_transition(self) -> dict:
    """Test row transition using displacement stitching.

    Initialises the mosaic when none exists, runs ``_move_to_next_row`` with
    the running flag set for its duration, and reports the mosaic size and
    cumulative alignment before and after.

    Returns:
        Dict with ``success``, ``y_moved``, ``mosaic_before``,
        ``mosaic_after``, ``alignment_before``, ``alignment_after`` and
        ``error`` (the exception text when something failed, else ``None``).
    """
    results = {
        'success': False,
        'y_moved': 0.0,
        'mosaic_before': (0, 0),
        'mosaic_after': (0, 0),
        'alignment_before': (0.0, 0.0),
        'alignment_after': (0.0, 0.0),
        'error': None
    }
    try:
        self.log("Testing row transition...")
        if self.mosaic is None:
            self._init_mosaic(self._capture_frame())

        results['mosaic_before'] = (self.state.mosaic_width, self.state.mosaic_height)
        results['alignment_before'] = (self._cumulative_align_x, self._cumulative_align_y)
        with self._state_lock:
            self.state.cumulative_y = 0.0

        # _move_to_next_row only loops while the running flag is set.
        self.running = True
        success = self._move_to_next_row()
        self.running = False

        results['success'] = success
        results['y_moved'] = self.state.cumulative_y
        results['mosaic_after'] = (self.state.mosaic_width, self.state.mosaic_height)
        results['alignment_after'] = (self._cumulative_align_x, self._cumulative_align_y)
        self.log(f"Row transition: {'SUCCESS' if success else 'FAILED'}, Y: {results['y_moved']:.1f}px")
        self.log(f"Alignment change: ({results['alignment_before'][0]:.1f}, {results['alignment_before'][1]:.1f}) -> ({results['alignment_after'][0]:.1f}, {results['alignment_after'][1]:.1f})")
    except Exception as e:
        results['error'] = str(e)
        self.log(f"Test error: {e}")
        self.running = False
    return results
def test_single_row(self, direction: str = 'right') -> dict:
    """Test scanning a single row.

    Initialises the mosaic when none exists, sets the motor speed, runs
    ``_scan_direction`` once with the running flag set for its duration, and
    reports what changed.

    Args:
        direction: ``'right'`` selects a RIGHT scan; any other value selects LEFT.

    Returns:
        Dict with ``success``, ``stop_reason``, ``appends`` (strips added by
        this run), mosaic size and alignment before/after, and ``error``
        (the exception text when something failed, else ``None``).
    """
    results = {
        'success': False,
        'stop_reason': None,
        'appends': 0,
        'mosaic_before': (0, 0),
        'mosaic_after': (0, 0),
        'alignment_before': (0.0, 0.0),
        'alignment_after': (0.0, 0.0),
        'error': None
    }
    try:
        self.log(f"Testing single row ({direction})...")
        if self.mosaic is None:
            self._init_mosaic(self._capture_frame())

        results['mosaic_before'] = (self.state.mosaic_width, self.state.mosaic_height)
        results['alignment_before'] = (self._cumulative_align_x, self._cumulative_align_y)
        appends_before = self.state.append_count

        self.motion.set_speed(self.config.scan_speed_index)
        time.sleep(0.1)

        # _scan_direction only loops while the running flag is set.
        self.running = True
        if direction == 'right':
            scan_dir = ScanDirection.RIGHT
        else:
            scan_dir = ScanDirection.LEFT
        stop_reason = self._scan_direction(scan_dir)
        self.running = False

        results['success'] = True
        results['stop_reason'] = stop_reason
        results['appends'] = self.state.append_count - appends_before
        results['mosaic_after'] = (self.state.mosaic_width, self.state.mosaic_height)
        results['alignment_after'] = (self._cumulative_align_x, self._cumulative_align_y)
        self.log(f"Alignment change: ({results['alignment_before'][0]:.1f}, {results['alignment_before'][1]:.1f}) -> ({results['alignment_after'][0]:.1f}, {results['alignment_after'][1]:.1f})")
    except Exception as e:
        results['error'] = str(e)
        self.running = False
    return results
def test_strip_alignment(self) -> dict:
    """Test strip alignment detection at current position.

    Initialises the mosaic when none exists, captures a frame and runs
    ``_detect_strip_alignment`` for RIGHT and LEFT (logging each outcome),
    then runs it once more for RIGHT and reports that result.

    Returns:
        Dict with ``success``, ``x_offset``, ``y_offset``, ``confidence`` and
        ``error`` (the exception text when something failed, else ``None``).
    """
    results = {
        'success': False,
        'x_offset': 0.0,
        'y_offset': 0.0,
        'confidence': 0.0,
        'error': None
    }
    try:
        self.log("Testing strip alignment detection...")
        if self.mosaic is None:
            self.log("No mosaic - initializing...")
            self._init_mosaic(self._capture_frame())

        frame = self._capture_frame()
        # Same expected position that strip appending uses: tracked position
        # plus the cumulative alignment correction.
        expected_x = int(self.state.current_x + self._cumulative_align_x)
        expected_y = int(self._row_start_y + self._cumulative_align_y)

        # Test for both directions
        for direction in (ScanDirection.RIGHT, ScanDirection.LEFT):
            alignment = self._detect_strip_alignment(frame, direction, expected_x, expected_y)
            self.log(f" {direction.value}: valid={alignment.valid}, X={alignment.x_offset:.1f}, Y={alignment.y_offset:.1f}, conf={alignment.confidence:.3f}")

        # Return RIGHT direction result
        alignment = self._detect_strip_alignment(frame, ScanDirection.RIGHT, expected_x, expected_y)
        for key, attr in (('success', 'valid'),
                          ('x_offset', 'x_offset'),
                          ('y_offset', 'y_offset'),
                          ('confidence', 'confidence')):
            results[key] = getattr(alignment, attr)
    except Exception as e:
        results['error'] = str(e)
        self.log(f"Test error: {e}")
    return results
def reset_alignment(self):
    """Reset cumulative alignment to zero.

    Also replaces the stored last strip alignment with a fresh
    ``AlignmentOffset()`` and logs the reset.
    """
    self._cumulative_align_x = self._cumulative_align_y = 0.0
    self._last_strip_alignment = AlignmentOffset()
    self.log("Alignment reset to (0, 0)")
def get_memory_estimate(self) -> dict:
    """Report the mosaic's current memory use and its configured upper bound.

    Returns:
        Dict with ``current_size`` (width, height from the shared state),
        ``current_mb`` (bytes held by the mosaic array, 0 when there is
        none), ``max_size`` (configured maximum width, height) and
        ``max_mb`` (maximum width * height * 3 bytes), both in MiB.
    """
    bytes_per_mb = 1024 * 1024
    cfg = self.config
    used_bytes = 0 if self.mosaic is None else self.mosaic.nbytes
    ceiling_bytes = cfg.max_mosaic_width * cfg.max_mosaic_height * 3
    return {
        'current_size': (self.state.mosaic_width, self.state.mosaic_height),
        'current_mb': used_bytes / bytes_per_mb,
        'max_size': (cfg.max_mosaic_width, cfg.max_mosaic_height),
        'max_mb': ceiling_bytes / bytes_per_mb,
    }