Autoscope-Controller/src/stitching_scanner.py
2026-01-11 21:32:15 -06:00

1629 lines
No EOL
72 KiB
Python

"""
Stitching Scanner v2 - Simplified unified approach
Same displacement-based stitching for both horizontal rows and vertical row transitions.
No complex visual matching - just track displacement and append strips.
Continuous alignment correction for gear slippage compensation.
FIXES:
- Y comparison: Compare frame's BOTTOM with ROW 1's TOP (at Y=transition_height)
- X comparison (LEFT scan): Compare frame's RIGHT with transition strip's LEFT edge
- row_start_y now positions row 2 so its bottom overlaps with row 1's top
DEBUG BORDER COLORS (for debugging row 2 placement):
=================================================
In _detect_row_start_alignment():
- WHITE (thick): Outline of TRANSITION STRIPS (X=x_offset, Y=0 to transition_height)
- BLUE line: Y=transition_height (boundary between transition strips and row 1)
- CYAN: Y comparison region - TOP of ROW 1 (just below blue line)
- YELLOW: X comparison region - LEFT edge of transition strips (for LEFT scan)
- MAGENTA: Where the new frame is EXPECTED to be placed
In _blend_horizontal_at_y (append_left):
- RED (thick): Where strip WOULD have been placed (ORIGINAL position)
- GREEN: Where strip was ACTUALLY placed (ADJUSTED position)
MOSAIC LAYOUT AFTER DOWN TRANSITION:
====================================
Y=0 to Y=transition_height: TRANSITION STRIPS (at X=x_offset)
Y=transition_height to Y=mh: ROW 1 (shifted down)
Row 2 placement: Y = transition_height - fh + overlap
(so row 2's bottom overlaps with row 1's top)
"""
import cv2
import numpy as np
import time
import threading
from dataclasses import dataclass
from typing import Optional, Callable, Tuple
from enum import Enum
class ScanDirection(Enum):
    """Scan direction passed to the alignment and strip-append routines.

    The value of each member is the lowercase name of the direction.
    """
    RIGHT = 'right'
    LEFT = 'left'
    DOWN = 'down'
    UP = 'up'
@dataclass
class StitchConfig:
    """Tunable parameters for a stitching scan (defaults shown below).

    NOTE(review): the code that reads these fields is not visible in this
    part of the file. Apart from ``displacement_threshold`` the per-field
    notes are inferred from the names and must be confirmed at the call sites.
    """
    displacement_threshold: float = 0.10 # 10% of frame triggers append
    movement_interval: float = 0.001  # presumably seconds between motion commands — TODO confirm
    frame_interval: float = 1.00  # presumably seconds between captured frames — TODO confirm
    settle_time: float = 0.75  # presumably seconds to wait after a move — TODO confirm
    max_scan_time: float = 300.0  # presumably an overall scan time limit in seconds — TODO confirm
    row_overlap: float = 0.15  # presumably fraction of frame height shared by adjacent rows — TODO confirm
    max_mosaic_width: int = 1000  # presumably a size limit for the mosaic; units not visible here
    max_mosaic_height: int = 1000  # presumably a size limit for the mosaic; units not visible here
    scan_speed_index: int = 3  # presumably an index into the motion controller's speed table — TODO confirm
    autofocus_every_row: bool = True  # presumably triggers autofocus at each row start — TODO confirm
@dataclass
class StitchState:
    """Mutable progress/state record of the scanner.

    The scanner guards writes to this object with its ``_state_lock`` when
    it initialises the mosaic. Fields whose use is not visible in this part
    of the file are marked as assumptions.
    """
    is_scanning: bool = False  # presumably True while a scan is running — TODO confirm
    direction: str = ''  # presumably the current ScanDirection value — TODO confirm
    cumulative_x: float = 0.0  # presumably total measured X displacement — TODO confirm
    cumulative_y: float = 0.0  # presumably total measured Y displacement — TODO confirm
    current_x: float = 0.0  # reset to 0 on mosaic init; used as the expected strip X in the mosaic
    current_y: float = 0.0  # reset to 0 on mosaic init
    last_displacement: Tuple[float, float] = (0.0, 0.0)  # presumably the latest (dx, dy) — TODO confirm
    current_row: int = 0  # presumably index of the row being scanned — TODO confirm
    total_rows: int = 0  # presumably number of rows planned — TODO confirm
    mosaic_width: int = 0  # set to the first frame's width on mosaic init
    mosaic_height: int = 0  # set to the first frame's height on mosaic init
    mosaic_init_width: int = 0  # first frame's width; used to locate the transition strips' X offset
    mosaic_init_height: int = 0  # first frame's height
    frame_count: int = 0  # set to 1 on mosaic init
    append_count: int = 0  # set to 0 on mosaic init
@dataclass
class AlignmentOffset:
    """Stores alignment offset for strip placement"""
    x_offset: float = 0.0  # X correction in pixels, taken from a phase-correlation shift
    y_offset: float = 0.0  # Y correction in pixels, taken from a phase-correlation shift
    valid: bool = False  # set by the detectors to ``confidence > 0.1``
    confidence: float = 0.0 # Phase correlation response
class StitchingScanner:
"""
Slide scanner using continuous stitching.
Unified approach for horizontal and vertical movement.
Continuous alignment correction for gear slippage compensation.
"""
def __init__(self, camera, motion_controller, autofocus_controller=None,
             config: StitchConfig = None,
             on_log: Callable[[str], None] = None,
             on_progress: Callable[[int, int], None] = None,
             on_mosaic_updated: Callable[[], None] = None):
    """Store the collaborators and callbacks and reset all stitching state.

    Args:
        camera: Frame source object (stored as ``self.camera``).
        motion_controller: Stage controller (stored as ``self.motion``).
        autofocus_controller: Optional autofocus helper.
        config: Scan parameters; a default ``StitchConfig`` is created when
            a falsy value is passed.
        on_log: Optional callback receiving each formatted log line.
        on_progress: Optional callback taking two ints.
        on_mosaic_updated: Optional zero-argument callback.
    """
    # --- callbacks -------------------------------------------------------
    self.on_log = on_log
    self.on_progress = on_progress
    self.on_mosaic_updated = on_mosaic_updated

    # --- collaborators and configuration ---------------------------------
    self.camera = camera
    self.motion = motion_controller
    self.autofocus = autofocus_controller
    self.config = config if config else StitchConfig()

    # --- run control and shared state ------------------------------------
    self._thread: Optional[threading.Thread] = None
    self.running = False
    self.paused = False
    self._state_lock = threading.Lock()
    self.state = StitchState()

    # --- mosaic buffer ---------------------------------------------------
    # Re-entrant lock: alignment helpers take it while the append path
    # already holds it.
    self._mosaic_lock = threading.RLock()
    self.mosaic: Optional[np.ndarray] = None
    self._prev_frame: Optional[np.ndarray] = None

    # --- displacement accumulated since the last appended strip ----------
    self._displacement_since_append_x: float = 0.0
    self._displacement_since_append_y: float = 0.0

    # --- running total of the alignment corrections applied --------------
    self._cumulative_align_x: float = 0.0
    self._cumulative_align_y: float = 0.0
    # Alignment of the most recent strip, kept for continuity.
    self._last_strip_alignment = AlignmentOffset()

    # --- origin of the current row inside the mosaic ---------------------
    # Row 1 (RIGHT scan) starts at X=0; row 2 (LEFT scan) starts at
    # X = mosaic_width - frame_width (right edge).
    self._row_start_x: int = 0
    self._row_start_y: int = 0
def log(self, message: str):
    """Send ``message`` (prefixed with ``[Stitch]``) to ``on_log`` if set, then print it."""
    line = f"[Stitch] {message}"
    sink = self.on_log
    if sink:
        sink(line)
    print(line)
# =========================================================================
# Displacement Detection
# =========================================================================
def _to_grayscale(self, frame: np.ndarray) -> np.ndarray:
    """Return a 2-D single-channel version of ``frame``.

    * 2-D input (or anything that is not 3-D) is returned unchanged.
    * 3-D input with one channel is returned as a 2-D view of that channel.
    * 3-D input with four channels is converted with ``COLOR_BGRA2GRAY``.
    * Any other 3-D input is converted with ``COLOR_BGR2GRAY`` (as before).

    The one- and four-channel cases previously reached ``BGR2GRAY`` and
    made ``cv2.cvtColor`` raise.
    """
    if frame.ndim != 3:
        return frame
    channels = frame.shape[2]
    if channels == 1:
        # Already grayscale, just carrying a trailing channel axis.
        return frame[:, :, 0]
    if channels == 4:
        return cv2.cvtColor(frame, cv2.COLOR_BGRA2GRAY)
    return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
def _detect_displacement(self, prev_frame: np.ndarray,
                         curr_frame: np.ndarray) -> Tuple[float, float]:
    """Estimate the (dx, dy) shift between two frames by phase correlation.

    Both frames are reduced to grayscale and multiplied by a Hanning window
    before ``cv2.phaseCorrelate``. Returns ``(0.0, 0.0)`` when the grayscale
    shapes differ; otherwise returns the shift reported by OpenCV.
    """
    reference = self._to_grayscale(prev_frame)
    current = self._to_grayscale(curr_frame)
    if reference.shape != current.shape:
        return (0.0, 0.0)

    rows, cols = reference.shape
    # createHanningWindow takes the size as (width, height).
    taper = cv2.createHanningWindow((cols, rows), cv2.CV_32F)
    shift, _response = cv2.phaseCorrelate(
        reference.astype(np.float32) * taper,
        current.astype(np.float32) * taper)
    return shift
def _detect_displacement_with_confidence(self, prev_frame: np.ndarray,
                                         curr_frame: np.ndarray) -> Tuple[float, float, float]:
    """Phase-correlate two frames and return ``(dx, dy, response)``.

    ``response`` is the value OpenCV reports alongside the shift. Frames
    whose grayscale shapes differ yield ``(0.0, 0.0, 0.0)``.
    """
    reference = self._to_grayscale(prev_frame)
    current = self._to_grayscale(curr_frame)
    if reference.shape != current.shape:
        return (0.0, 0.0, 0.0)

    rows, cols = reference.shape
    # createHanningWindow takes the size as (width, height).
    taper = cv2.createHanningWindow((cols, rows), cv2.CV_32F)
    shift, response = cv2.phaseCorrelate(
        reference.astype(np.float32) * taper,
        current.astype(np.float32) * taper)
    return (shift[0], shift[1], response)
def _detect_displacement_robust(self, prev_frame: np.ndarray,
                                curr_frame: np.ndarray) -> Tuple[float, float]:
    """Return the measured (dx, dy), or ``(0.0, 0.0)`` if it is implausibly large.

    A shift is rejected (and a warning logged) when either component
    exceeds half of the larger frame dimension.
    """
    shift_x, shift_y = self._detect_displacement(prev_frame, curr_frame)
    frame_h, frame_w = prev_frame.shape[:2]
    limit = max(frame_w, frame_h) * 0.5

    if any(abs(component) > limit for component in (shift_x, shift_y)):
        self.log(f"Warning: Large displacement ({shift_x:.1f}, {shift_y:.1f}), ignoring")
        return (0.0, 0.0)
    return (shift_x, shift_y)
def _write_debug_image(self, path: str, image: np.ndarray) -> bool:
    """Best-effort write of a debug image; logs and returns False on failure."""
    try:
        return bool(cv2.imwrite(path, image))
    except Exception as exc:  # debug output must never abort a scan
        self.log(f" DEBUG: could not write {path}: {exc}")
        return False

def _detect_row_start_alignment(self, frame: np.ndarray, direction: ScanDirection) -> AlignmentOffset:
    """
    Detect alignment at the START of a new row.

    After a DOWN transition with prepend, the mosaic layout is:
    - Y=0 to Y=transition_height: TRANSITION STRIPS (at X=x_offset)
    - Y=transition_height to Y=mh: OLD ROW 1 (shifted down)

    For Y alignment: the frame's BOTTOM band is phase-correlated with
    ROW 1's TOP band (starting at Y=transition_height).
    For X alignment (LEFT scan): the frame's RIGHT band is compared with the
    LEFT edge of the transition strips; for any other direction the frame's
    LEFT band is compared with the right part of the strip columns.

    The mosaic regions used for correlation are COPIED before any debug
    overlay is drawn. Slices of ``self.mosaic`` are views, so drawing first
    (as the previous version did) put the debug line/rectangles into the
    data being correlated.

    Side effects: draws the WHITE/BLUE/CYAN/MAGENTA/YELLOW debug overlays
    permanently into ``self.mosaic`` and writes debug PNGs under ``/tmp``
    (best effort).

    Args:
        frame: First camera frame of the new row.
        direction: Scan direction of the new row.

    Returns:
        AlignmentOffset with each axis clamped to +/-80 px; ``valid`` is True
        when the best correlation response exceeds 0.1.
    """
    offset = AlignmentOffset()
    min_overlap = 50       # smallest region side (px) accepted for correlation
    min_confidence = 0.1   # minimum phase-correlation response to trust a shift
    max_adjust = 80        # per-axis clamp (px) for the returned correction

    with self._mosaic_lock:
        if self.mosaic is None:
            return offset

        mh, mw = self.mosaic.shape[:2]
        fh, fw = frame.shape[:2]

        # x_offset is where the DOWN transition strips were placed horizontally;
        # transition_height is how much was added during the DOWN transition.
        x_offset = max(0, mw - self.state.mosaic_init_width)
        transition_height = mh - fh
        vertical_overlap = min(200, fh // 3)
        horizontal_overlap = min(200, fw // 3)

        # ---- geometry of the Y comparison (top of row 1) ----
        row1_top_start = transition_height
        row1_top_end = min(transition_height + vertical_overlap, mh)
        x_start = x_offset
        x_end = min(x_offset + fw, mw)
        compare_width = x_end - x_start
        compare_y = row1_top_end > row1_top_start and compare_width >= min_overlap

        # ---- geometry of the X comparison ----
        if direction == ScanDirection.LEFT:
            # Frame's RIGHT band vs. LEFT edge of the transition strips.
            mosaic_x_start = x_offset
            mosaic_x_end = min(x_offset + horizontal_overlap, mw)
        else:
            # Frame's LEFT band vs. right part of the strip columns.
            mosaic_x_end = min(x_offset + fw, mw)
            mosaic_x_start = max(mosaic_x_end - horizontal_overlap, x_offset)
        y_start = max(0, transition_height - fh // 2)
        y_end = min(transition_height + fh // 2, mh)
        compare_x = mosaic_x_end > mosaic_x_start and y_end > y_start

        # ---- snapshot clean pixels BEFORE drawing any overlay ----
        mosaic_row1_top = None
        if compare_y:
            mosaic_row1_top = self.mosaic[row1_top_start:row1_top_end, x_start:x_end].copy()
        mosaic_edge = None
        if compare_x:
            mosaic_edge = self.mosaic[y_start:y_end, mosaic_x_start:mosaic_x_end].copy()

        self.log(f" Row-start alignment: mosaic {mw}x{mh}, frame {fw}x{fh}")
        self.log(f" Transition strips at: X={x_offset}, Y=0 to Y={transition_height}")
        self.log(f" Old row 1 at: Y={transition_height} to Y={mh}")

        # ========== DEBUG: Draw borders showing layout ==========
        cv2.rectangle(self.mosaic,
                      (x_offset, 0),
                      (min(x_offset + fw, mw), transition_height),
                      (255, 255, 255), 3)  # WHITE - transition strips location
        self.log(f" DEBUG: WHITE border - TRANSITION STRIPS at X={x_offset}:{min(x_offset + fw, mw)}, Y=0:{transition_height}")
        cv2.line(self.mosaic, (0, transition_height), (mw, transition_height), (255, 0, 0), 3)
        self.log(f" DEBUG: BLUE line at Y={transition_height} (transition/row1 boundary)")

        # =============================================
        # Step 1: Detect Y alignment
        # =============================================
        if compare_y:
            frame_compare = frame[fh - vertical_overlap:fh, :compare_width]

            debug_frame = frame.copy()
            cv2.rectangle(debug_frame, (0, fh - vertical_overlap), (compare_width, fh),
                          (255, 255, 0), 2)  # CYAN - frame's bottom region used for Y comparison
            self.log(f" DEBUG: Frame Y comparison region: X=0:{compare_width}, Y={fh - vertical_overlap}:{fh}")
            self._write_debug_image('/tmp/debug_frame_row2_start.png', debug_frame)

            cv2.rectangle(self.mosaic,
                          (x_start, row1_top_start),
                          (x_end, row1_top_end),
                          (255, 255, 0), 2)  # CYAN - mosaic comparison region for Y
            self.log(f" DEBUG: CYAN border - mosaic Y comparison region X={x_start}:{x_end}, Y={row1_top_start}:{row1_top_end}")

            # MAGENTA border: where the frame is EXPECTED to be placed
            expected_y_start = max(0, transition_height - fh + vertical_overlap)
            cv2.rectangle(self.mosaic,
                          (x_start, expected_y_start),
                          (x_end, expected_y_start + fh),
                          (255, 0, 255), 2)  # MAGENTA - expected frame position
            self.log(f" DEBUG: MAGENTA border - expected frame position X={x_start}:{x_end}, Y={expected_y_start}:{expected_y_start + fh}")

            min_w = min(frame_compare.shape[1], mosaic_row1_top.shape[1])
            min_h = min(frame_compare.shape[0], mosaic_row1_top.shape[0])
            if min_w >= min_overlap and min_h >= min_overlap:
                frame_compare = frame_compare[:min_h, :min_w]
                mosaic_row1_top = mosaic_row1_top[:min_h, :min_w]

                wrote_frame = self._write_debug_image('/tmp/debug_frame_bottom_region.png', frame_compare)
                wrote_mosaic = self._write_debug_image('/tmp/debug_mosaic_row1top_region.png', mosaic_row1_top)
                if wrote_frame and wrote_mosaic:
                    self.log(f" DEBUG: Saved comparison regions to /tmp/debug_*.png")

                dx_v, dy_v, conf_v = self._detect_displacement_with_confidence(
                    mosaic_row1_top, frame_compare)
                self.log(f" Row-start Y alignment: dx={dx_v:.1f}, dy={dy_v:.1f}, conf={conf_v:.3f}")
                self.log(f" Compared frame[{fh - vertical_overlap}:{fh}] with mosaic[{row1_top_start}:{row1_top_end}]")
                if conf_v > min_confidence:
                    offset.y_offset = dy_v
                    offset.confidence = conf_v

        # =============================================
        # Step 2: Detect X alignment
        # =============================================
        if compare_x:
            edge_rows = min(y_end - y_start, fh)
            edge_cols = mosaic_x_end - mosaic_x_start
            if direction == ScanDirection.LEFT:
                frame_edge = frame[:edge_rows, fw - edge_cols:fw]
            else:
                frame_edge = frame[:edge_rows, :edge_cols]

            cv2.rectangle(self.mosaic,
                          (mosaic_x_start, y_start),
                          (mosaic_x_end, y_end),
                          (0, 255, 255), 2)  # YELLOW - mosaic comparison region for X
            self.log(f" DEBUG: YELLOW border - mosaic X comparison region X={mosaic_x_start}:{mosaic_x_end}, Y={y_start}:{y_end}")

            min_h = min(mosaic_edge.shape[0], frame_edge.shape[0])
            min_w = min(mosaic_edge.shape[1], frame_edge.shape[1])
            if min_h >= min_overlap and min_w >= min_overlap:
                dx_h, dy_h, conf_h = self._detect_displacement_with_confidence(
                    mosaic_edge[:min_h, :min_w], frame_edge[:min_h, :min_w])
                self.log(f" Row-start X alignment: dx={dx_h:.1f}, dy={dy_h:.1f}, conf={conf_h:.3f}")
                if conf_h > min_confidence:
                    offset.x_offset = -dx_h
                    # Report the better of the two responses.
                    if conf_h > offset.confidence:
                        offset.confidence = conf_h

        # Limit maximum adjustment
        if abs(offset.x_offset) > max_adjust:
            self.log(f" Limiting X offset from {offset.x_offset:.1f} to ±{max_adjust}")
            offset.x_offset = max(-max_adjust, min(max_adjust, offset.x_offset))
        if abs(offset.y_offset) > max_adjust:
            self.log(f" Limiting Y offset from {offset.y_offset:.1f} to ±{max_adjust}")
            offset.y_offset = max(-max_adjust, min(max_adjust, offset.y_offset))

        offset.valid = offset.confidence > min_confidence
        if offset.valid:
            self.log(f" Row-start alignment FINAL: X={offset.x_offset:.1f}, Y={offset.y_offset:.1f}, conf={offset.confidence:.3f}")
    return offset
def _detect_strip_alignment(self, frame: np.ndarray, direction: ScanDirection,
                            expected_x: int, expected_y: int) -> AlignmentOffset:
    """
    Measure how far the current frame is from its expected place in the mosaic.

    A band of the frame is phase-correlated against the band of the mosaic
    it should overlap (which band depends on ``direction``). This is used
    as a continuous correction for gear slippage while scanning.

    Args:
        frame: Current camera frame
        direction: Scan direction; anything other than RIGHT/LEFT/DOWN is
            handled as UP
        expected_x: Expected X position in mosaic (clamped to the mosaic)
        expected_y: Expected Y position in mosaic (clamped to the mosaic)
    Returns:
        AlignmentOffset. It is left at its defaults when there is no mosaic,
        the overlap is under 40 px, or either shift component exceeds
        50 px; otherwise it carries the measured shift and ``valid`` is
        ``confidence > 0.1``.
    """
    result = AlignmentOffset()
    overlap_cap = 250    # widest/tallest band compared (px)
    overlap_floor = 40   # narrowest band worth comparing (px)
    shift_limit = 50     # larger shifts are treated as mismatches (px)

    with self._mosaic_lock:
        mosaic = self.mosaic
        if mosaic is None:
            return result

        mosaic_h, mosaic_w = mosaic.shape[:2]
        frame_h, frame_w = frame.shape[:2]

        # Keep the expected position inside the mosaic.
        top = max(0, min(expected_y, mosaic_h - frame_h))
        left = max(0, min(expected_x, mosaic_w - frame_w))

        if direction in (ScanDirection.RIGHT, ScanDirection.LEFT):
            span = min(frame_w // 2, mosaic_w - left, overlap_cap)
            if span < overlap_floor:
                return result
            rows = slice(top, top + frame_h)
            if direction == ScanDirection.RIGHT:
                # Left band of the frame vs. right edge of the mosaic.
                mosaic_patch = mosaic[rows, mosaic_w - span:mosaic_w]
                frame_patch = frame[:, :span]
            else:
                # Right band of the frame vs. mosaic columns ending at
                # expected_x + frame width.
                right = min(left + frame_w, mosaic_w)
                start = max(right - span, 0)
                usable = right - start
                if usable < overlap_floor:
                    return result
                mosaic_patch = mosaic[rows, start:right]
                frame_patch = frame[:, frame_w - usable:]
        else:
            cols = slice(left, left + frame_w)
            if direction == ScanDirection.DOWN:
                # Top band of the frame vs. bottom edge of the mosaic.
                span = min(frame_h // 2, mosaic_h - top, overlap_cap)
                if span < overlap_floor:
                    return result
                mosaic_patch = mosaic[mosaic_h - span:mosaic_h, cols]
                frame_patch = frame[:span, :]
            else:  # UP
                # Bottom band of the frame vs. top edge of the mosaic.
                span = min(frame_h // 2, top, overlap_cap)
                if span < overlap_floor:
                    return result
                mosaic_patch = mosaic[:span, cols]
                frame_patch = frame[frame_h - span:, :]

        # Trim both patches to their common size.
        common_h = min(mosaic_patch.shape[0], frame_patch.shape[0])
        common_w = min(mosaic_patch.shape[1], frame_patch.shape[1])
        if common_h < overlap_floor or common_w < overlap_floor:
            self.log(f"Strip alignment: overlap too small ({common_w}x{common_h})")
            return result

        shift_x, shift_y, score = self._detect_displacement_with_confidence(
            mosaic_patch[:common_h, :common_w],
            frame_patch[:common_h, :common_w])

        if abs(shift_x) > shift_limit or abs(shift_y) > shift_limit:
            self.log(f"Strip alignment: displacement too large ({shift_x:.1f}, {shift_y:.1f}), ignoring")
            return result

        result.x_offset = shift_x
        result.y_offset = shift_y
        result.confidence = score
        result.valid = score > 0.1  # Require minimum confidence
        if result.valid:
            self.log(f" Strip alignment: X={shift_x:.1f}, Y={shift_y:.1f}, conf={score:.3f}")
    return result
# =========================================================================
# Mosaic Building
# =========================================================================
def _init_mosaic(self, frame: np.ndarray):
    """Start a new mosaic from ``frame`` and reset all per-scan tracking.

    The mosaic and the previous-frame buffer each get their own copy of
    ``frame``. Accumulated displacement, cumulative alignment, the last
    strip alignment and the row origin are cleared, and the state's size
    and counter fields are set from the frame.

    The row X origin is now reset together with the row Y origin, so a
    scanner object that is reused for a second scan does not start with the
    previous scan's ``_row_start_x``.
    """
    with self._mosaic_lock:
        self.mosaic = frame.copy()
        self._prev_frame = frame.copy()
        self._displacement_since_append_x = 0.0
        self._displacement_since_append_y = 0.0
        # Reset cumulative alignment
        self._cumulative_align_x = 0.0
        self._cumulative_align_y = 0.0
        self._last_strip_alignment = AlignmentOffset()
        # Row 0 starts at the mosaic origin
        self._row_start_y = 0
        self._row_start_x = 0
    with self._state_lock:
        h, w = frame.shape[:2]
        self.state.mosaic_width = w
        self.state.mosaic_height = h
        self.state.mosaic_init_width = w
        self.state.mosaic_init_height = h
        self.state.frame_count = 1
        self.state.append_count = 0
        self.state.current_y = 0
        self.state.current_x = 0
    self.log(f"Initialized mosaic: {frame.shape[1]}x{frame.shape[0]}")
def _blend_horizontal_at_y(self, base: np.ndarray, strip: np.ndarray,
                           blend_width: int, append_right: bool,
                           x_offset: int = None, y_offset: int = 0,
                           alignment_x: float = 0.0, alignment_y: float = 0.0) -> np.ndarray:
    """
    Blend strip horizontally onto base at specified Y position.

    ``append_right=True`` widens the mosaic by the strip's width minus the
    blend zone. ``append_right=False`` writes the strip inside the existing
    mosaic at ``x_offset`` without changing its size, and draws the
    RED (original position) / GREEN (actual position) debug borders.

    Fixes relative to the previous version:
    - a zero-width blend zone no longer crashes the right-append path
      (``base[..., -0:]`` selected the whole width);
    - a strip taller than the base is cropped instead of raising;
    - when a left-placed strip is clipped at the right mosaic edge, the
      blend uses the strip columns that actually land in the blend zone.

    Args:
        base: The existing mosaic (3-channel)
        strip: The new strip to append (3-channel)
        blend_width: Width of the blending zone
        append_right: True to append to right, False to append left
        x_offset: X position for left-append mode (None means 0)
        y_offset: Y position in the mosaic
        alignment_x: Additional X alignment offset; only logged in
            right-append mode
        alignment_y: Additional Y alignment offset
    Returns:
        A new array; ``base`` is not modified.
    """
    h_base, w_base = base.shape[:2]
    h_strip, w_strip = strip.shape[:2]
    blend_w = max(0, min(blend_width, w_strip, w_base))

    if append_right:
        if h_strip > h_base:
            # The strip cannot be taller than the mosaic it is pasted into.
            strip = strip[:h_base]
            h_strip = h_base
        # Apply alignment offset for Y, then keep the strip inside the mosaic.
        y_offset = y_offset + int(round(alignment_y))
        y_offset = max(0, min(y_offset, h_base - h_strip))
        rows = slice(y_offset, y_offset + h_strip)

        # Expand mosaic to the right
        result_width = w_base + w_strip - blend_w
        result = np.zeros((h_base, result_width, 3), dtype=np.uint8)
        self.log(f"=== _blend_horizontal_at_y (append_right) ===")
        self.log(f" base: {w_base}x{h_base}, strip: {w_strip}x{h_strip}")
        self.log(f" y_offset: {y_offset}, blend_w: {blend_w}")
        self.log(f" alignment: X={alignment_x:.1f}, Y={alignment_y:.1f}")
        self.log(f" result: {result_width}x{h_base}")

        # Step 1: Copy entire base
        result[:, :w_base] = base
        # Step 2: Copy non-overlap portion of strip at correct Y
        self.log(f" Placing strip at X={w_base - blend_w}:{result_width}, Y={y_offset}:{y_offset + h_strip}")
        result[rows, w_base:] = strip[:, blend_w:]
        # Steps 3+4: cross-fade base -> strip over the blend zone
        if blend_w > 0:
            alpha = np.linspace(1, 0, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis]
            base_overlap = base[rows, w_base - blend_w:w_base].astype(np.float32)
            strip_overlap = strip[:, :blend_w].astype(np.float32)
            blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8)
            result[rows, w_base - blend_w:w_base] = blended
        return result

    # append_left - place at x_offset, NO width expansion
    # x_offset is where the LEFT edge of the strip should go
    if x_offset is None:
        x_offset = 0
    # Store ORIGINAL position before alignment (for debug)
    original_x_offset = x_offset
    original_y_offset = y_offset

    # Apply alignment offsets (continuous correction)
    # NOTE(review): Y alignment is SUBTRACTED here but ADDED in the
    # right-append branch above — confirm the sign convention is intended.
    x_offset = x_offset + int(round(alignment_x))
    y_offset_before = y_offset
    y_offset = y_offset - int(round(alignment_y))
    self.log(f" Y offset computation: {y_offset_before} - {int(round(alignment_y))} = {y_offset}")

    # Clamp x_offset to valid range
    x_offset = max(0, min(x_offset, w_base))

    # Crop the strip where it protrudes above the mosaic
    strip_y_start = 0
    if y_offset < 0:
        strip_y_start = -y_offset
        y_offset = 0
        self.log(f" Cropping {strip_y_start}px from top of strip")
    # Crop the strip where it protrudes below the mosaic
    strip_y_end = h_strip
    if y_offset + (h_strip - strip_y_start) > h_base:
        strip_y_end = h_strip - ((y_offset + (h_strip - strip_y_start)) - h_base)
        self.log(f" Cropping bottom of strip, new strip_y_end={strip_y_end}")
    if strip_y_start >= strip_y_end:
        self.log(f" WARNING: Strip fully cropped, nothing to place")
        return base.copy()
    cropped_strip = strip[strip_y_start:strip_y_end, :]
    h_cropped = cropped_strip.shape[0]

    self.log(f"=== _blend_horizontal_at_y (append_left) ===")
    self.log(f" base: {w_base}x{h_base}, strip: {w_strip}x{h_strip}")
    self.log(f" x_offset: {x_offset}, y_offset: {y_offset}, blend_w: {blend_w}")
    self.log(f" alignment: X={alignment_x:.1f}, Y={alignment_y:.1f}")
    self.log(f" cumulative: X={self._cumulative_align_x:.1f}, Y={self._cumulative_align_y:.1f}")
    self.log(f" row_start_y: {self._row_start_y}")
    self.log(f" Strip crop: rows [{strip_y_start}:{strip_y_end}] -> height {h_cropped}")
    self.log(f" ORIGINAL position: X={original_x_offset}, Y={original_y_offset}")
    self.log(f" ADJUSTED position: X={x_offset}, Y={y_offset}")

    # Result is same size as base (no expansion when going left)
    result = base.copy()
    rows = slice(y_offset, y_offset + h_cropped)

    # Calculate placement bounds (the strip may be clipped at the right edge)
    strip_x_start = x_offset
    strip_x_end = min(x_offset + w_strip, w_base)
    strip_cols_to_copy = strip_x_end - strip_x_start
    self.log(f" Placing strip at X={strip_x_start}:{strip_x_end}, Y={y_offset}:{y_offset + h_cropped}")
    self.log(f" Strip cols to copy: {strip_cols_to_copy}")

    # ========== DEBUG: Draw borders BEFORE placing strip ==========
    # RED border: Where strip WOULD have been placed (original position)
    orig_x_end = min(original_x_offset + w_strip, w_base)
    orig_y_end = min(original_y_offset + h_strip, h_base)
    if original_x_offset >= 0 and original_y_offset >= 0:
        cv2.rectangle(result,
                      (original_x_offset, max(0, original_y_offset)),
                      (orig_x_end, orig_y_end),
                      (0, 0, 255), 3)  # RED - original position
        self.log(f" DEBUG: RED border at original position X={original_x_offset}:{orig_x_end}, Y={original_y_offset}:{orig_y_end}")

    # Step 1: Copy strip content (non-blend portion) at correct position
    # For LEFT scanning, blend is on the RIGHT side of the strip
    non_blend_end = strip_cols_to_copy - blend_w
    if non_blend_end > 0:
        result[rows, strip_x_start:strip_x_start + non_blend_end] = cropped_strip[:, :non_blend_end]
        self.log(f" Step 1: Placed non-blend at X={strip_x_start}:{strip_x_start + non_blend_end}")

    # Step 2: Create blend on the RIGHT edge of strip (blending with existing content)
    blend_x_start = strip_x_end - blend_w
    blend_x_end = strip_x_end
    if blend_w > 0 and blend_x_start >= strip_x_start:
        alpha = np.linspace(1, 0, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis]
        # Use the strip columns that land in the blend zone; these are the
        # strip's last columns only when the strip was not clipped.
        strip_overlap = cropped_strip[:, non_blend_end:strip_cols_to_copy].astype(np.float32)
        # Read from `base`, not `result`, so the RED debug border is not blended in.
        base_overlap = base[rows, blend_x_start:blend_x_end].astype(np.float32)
        blended = (strip_overlap * alpha + base_overlap * (1 - alpha)).astype(np.uint8)
        result[rows, blend_x_start:blend_x_end] = blended
        self.log(f" Step 2: Blend zone at X={blend_x_start}:{blend_x_end}")

    # ========== DEBUG: Draw border AFTER placing strip ==========
    # GREEN border: Where strip was ACTUALLY placed (adjusted position)
    cv2.rectangle(result,
                  (strip_x_start, y_offset),
                  (strip_x_end, y_offset + h_cropped),
                  (0, 255, 0), 2)  # GREEN - actual position
    self.log(f" DEBUG: GREEN border at actual position X={strip_x_start}:{strip_x_end}, Y={y_offset}:{y_offset + h_cropped}")
    self.log(f" Final: Strip placed at X={strip_x_start}, Y={y_offset}, mosaic size unchanged: {w_base}x{h_base}")
    return result
def _blend_horizontal(self, base: np.ndarray, strip: np.ndarray,
                      blend_width: int, append_right: bool) -> np.ndarray:
    """Join ``strip`` to the right or left of ``base`` with a linear cross-fade.

    The result is ``blend_width`` columns narrower than a plain
    concatenation. The two images are concatenated without blending when
    ``blend_width`` is not in ``(0, strip width)`` or when their heights
    differ.

    Previously the height-mismatch fallback called ``np.hstack`` on arrays
    of different heights, which always raised. The strip is now cropped or
    zero-padded at the bottom to the base height first.

    Args:
        base: Existing 3-channel mosaic.
        strip: 3-channel strip to attach.
        blend_width: Requested width of the cross-fade zone in pixels.
        append_right: True to attach on the right, False on the left.
    Returns:
        A new array with the height of ``base``.
    """
    h_base, w_base = base.shape[:2]
    h_strip, w_strip = strip.shape[:2]

    heights_differ = h_strip != h_base
    if h_strip > h_base:
        strip = strip[:h_base]
    elif h_strip < h_base:
        padding = np.zeros((h_base - h_strip,) + strip.shape[1:], dtype=strip.dtype)
        strip = np.vstack([strip, padding])

    def _concatenate():
        return np.hstack([base, strip] if append_right else [strip, base])

    if blend_width <= 0 or blend_width >= w_strip:
        return _concatenate()
    self.log(f"Base Width: {w_base}px")
    if heights_differ:
        # Rows do not correspond, so a cross-fade would be meaningless.
        return _concatenate()

    blend_w = min(blend_width, w_strip, w_base)
    if blend_w <= 0:
        return _concatenate()

    result_width = w_base + w_strip - blend_w
    result = np.zeros((h_base, result_width, 3), dtype=np.uint8)
    if append_right:
        result[:, :w_base] = base
        alpha = np.linspace(1, 0, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis]
        base_overlap = base[:, -blend_w:].astype(np.float32)
        strip_overlap = strip[:, :blend_w].astype(np.float32)
        blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8)
        result[:, w_base - blend_w:w_base] = blended
        result[:, w_base:] = strip[:, blend_w:]
    else:
        result[:, :w_strip] = strip
        alpha = np.linspace(0, 1, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis]
        strip_overlap = strip[:, -blend_w:].astype(np.float32)
        base_overlap = base[:, :blend_w].astype(np.float32)
        blended = (strip_overlap * (1 - alpha) + base_overlap * alpha).astype(np.uint8)
        result[:, w_strip - blend_w:w_strip] = blended
        result[:, w_strip:] = base[:, blend_w:]
    return result
def _blend_vertical_at_x(self, base: np.ndarray, strip: np.ndarray,
                         blend_height: int, append_below: bool,
                         x_off: int = 0,
                         alignment_x: float = 0.0, alignment_y: float = 0.0) -> np.ndarray:
    """Attach ``strip`` above or below ``base`` at a computed X position.

    The strip's left edge is placed at
    ``max(0, base width - state.mosaic_init_width) + round(alignment_x)``,
    clamped so the strip stays inside the mosaic width; the rest of the new
    rows is black. A linear cross-fade of ``blend_height`` rows joins the
    two images.

    The cross-fade is now applied only in the columns the strip occupies.
    Previously it was computed over the full width against the zero-padded
    strip, which faded the base's seam rows towards black everywhere
    outside the strip.

    Args:
        base: Existing 3-channel mosaic.
        strip: 3-channel strip to attach.
        blend_height: Requested height of the cross-fade zone in pixels;
            values outside ``(0, strip height)`` give a plain stack.
        append_below: True to attach below ``base``, False to attach above.
        x_off: Accepted for interface compatibility; not used, the X
            position is derived from the state as described above.
        alignment_x: Extra X shift in pixels added to the computed position.
        alignment_y: Only logged.
    Returns:
        A new array with the width of ``base``.
    """
    h_base, w_base = base.shape[:2]
    h_strip, w_strip = strip.shape[:2]

    # Apply alignment offset for X position
    x_offset = max(0, w_base - self.state.mosaic_init_width)
    x_offset = x_offset + int(round(alignment_x))
    x_offset = max(0, min(x_offset, w_base - w_strip)) if w_strip < w_base else 0

    # Create full-width strip with strip placed at x_offset
    full_strip = np.zeros((h_strip, w_base, 3), dtype=np.uint8)
    copy_width = min(w_strip, w_base - x_offset)
    cols = slice(x_offset, x_offset + copy_width)
    full_strip[:, cols] = strip[:, :copy_width]

    self.log(f"=== _blend_vertical_at_x ===")
    self.log(f" base: {w_base}x{h_base}, strip: {w_strip}x{h_strip}")
    self.log(f" x_offset: {x_offset}, alignment: X={alignment_x:.1f}, Y={alignment_y:.1f}")

    blend_h = min(blend_height, h_strip, h_base)
    # Early exit: no blending possible
    if blend_height <= 0 or blend_height >= h_strip or blend_h <= 0:
        if append_below:
            return np.vstack([base, full_strip])
        return np.vstack([full_strip, base])

    result_height = h_base + h_strip - blend_h
    result = np.zeros((result_height, w_base, 3), dtype=np.uint8)
    if append_below:
        # Base rows (including the seam rows) are kept as they are ...
        result[:h_base, :] = base
        result[h_base:, :] = full_strip[blend_h:, :]
        # ... and only the strip's columns are cross-faded into the seam.
        alpha = np.linspace(1, 0, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis]
        base_overlap = base[-blend_h:, cols].astype(np.float32)
        strip_overlap = full_strip[:blend_h, cols].astype(np.float32)
        blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8)
        result[h_base - blend_h:h_base, cols] = blended
    else:
        result[:h_strip, :] = full_strip
        result[h_strip:, :] = base[blend_h:, :]
        # Seam rows start as the base's top rows so base pixels outside the
        # strip's columns survive unchanged.
        result[h_strip - blend_h:h_strip, :] = base[:blend_h, :]
        alpha = np.linspace(0, 1, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis]
        strip_overlap = full_strip[-blend_h:, cols].astype(np.float32)
        base_overlap = base[:blend_h, cols].astype(np.float32)
        blended = (strip_overlap * (1 - alpha) + base_overlap * alpha).astype(np.uint8)
        result[h_strip - blend_h:h_strip, cols] = blended
    return result
def _blend_vertical(self, base: np.ndarray, strip: np.ndarray,
                    blend_height: int, append_below: bool) -> np.ndarray:
    """Stack ``strip`` below or above ``base`` with a linear cross-fade seam.

    The strip is first cropped or zero-padded on the right to the width of
    ``base``. The seam is ``min(blend_height, strip height, base height)``
    rows tall; when that is not positive the images are stacked unblended.
    """
    base_h, base_w = base.shape[:2]
    strip_h, strip_w = strip.shape[:2]

    # Bring the strip to the mosaic width.
    if strip_w > base_w:
        strip = strip[:, :base_w]
    elif strip_w < base_w:
        filler = np.zeros((strip_h, base_w - strip_w, 3), dtype=np.uint8)
        strip = np.hstack([strip, filler])

    seam = min(blend_height, strip_h, base_h)
    if seam <= 0:
        return np.vstack([base, strip] if append_below else [strip, base])

    # `upper` ends up on top of `lower` in the output.
    upper, lower = (base, strip) if append_below else (strip, base)
    upper_h = upper.shape[0]
    upper_rows = upper[-seam:].astype(np.float32)
    lower_rows = lower[:seam].astype(np.float32)

    if append_below:
        ramp = np.linspace(1, 0, seam, dtype=np.float32)[:, np.newaxis, np.newaxis]
        mixed = (upper_rows * ramp + lower_rows * (1 - ramp)).astype(np.uint8)
    else:
        ramp = np.linspace(0, 1, seam, dtype=np.float32)[:, np.newaxis, np.newaxis]
        mixed = (upper_rows * (1 - ramp) + lower_rows * ramp).astype(np.uint8)

    out = np.zeros((base_h + strip_h - seam, base_w, 3), dtype=np.uint8)
    out[:upper_h - seam] = upper[:-seam]
    out[upper_h - seam:upper_h] = mixed
    out[upper_h:] = lower[seam:]
    return out
def _append_strip(self, frame: np.ndarray, direction: ScanDirection):
    """Append the newly exposed part of ``frame`` to the mosaic.

    The strip size is the magnitude of the displacement accumulated since
    the last append plus ``SAFETY_MARGIN``, capped so the strip plus blend
    zone fits inside the frame. Before blending, the frame is passed to
    ``_detect_strip_alignment``; a valid result is folded into the
    cumulative alignment that is handed to the blend helpers.

    The part of the displacement not covered by the appended strip is
    carried over to the next append. The carry-over keeps the sign of the
    accumulated displacement: the accumulators are signed sums of per-frame
    displacements, so storing the remainder as a bare magnitude would
    corrupt the running total whenever the stage moves in the negative
    direction.

    Args:
        frame: Frame to cut the strip from.
        direction: Scan direction; selects which frame edge is cut and on
            which side of the mosaic it is attached.
    """
    BLEND_WIDTH = 10
    SAFETY_MARGIN = 2

    with self._mosaic_lock:
        if self.mosaic is None:
            return

        h, w = frame.shape[:2]

        # Signed accumulators: the magnitude sizes the strip, the sign is
        # needed again when the unconsumed remainder is stored back.
        signed_dx = self._displacement_since_append_x
        signed_dy = self._displacement_since_append_y
        dx = abs(signed_dx)
        dy = abs(signed_dy)

        # Calculate expected position for alignment detection.
        # Use _row_start_y for the Y position since that's where this row's content belongs.
        expected_x = int(self.state.current_x + self._cumulative_align_x)
        expected_y = int(self._row_start_y + self._cumulative_align_y)

        # Detect alignment for this strip
        alignment = self._detect_strip_alignment(frame, direction, expected_x, expected_y)
        if alignment.valid:
            # Update cumulative alignment
            self._cumulative_align_x += alignment.x_offset
            self._cumulative_align_y += alignment.y_offset
            self._last_strip_alignment = alignment

        # Get total alignment offsets
        align_x = self._cumulative_align_x
        align_y = self._cumulative_align_y

        if direction in [ScanDirection.RIGHT, ScanDirection.LEFT]:
            append_width = round(dx) + SAFETY_MARGIN
            append_width = min(append_width, w - BLEND_WIDTH - 5)
            if append_width < 1:
                return

            pixels_consumed = append_width - SAFETY_MARGIN
            fractional_remainder = dx - pixels_consumed

            # Use _row_start_y for Y position - this is the Y position in the
            # mosaic where the current row's content belongs.
            y_offset = self._row_start_y

            if direction == ScanDirection.RIGHT:
                # Moving right exposes new content on the frame's right edge.
                strip_start = max(0, w - append_width - BLEND_WIDTH)
                new_strip = frame[:, strip_start:]
                self.mosaic = self._blend_horizontal_at_y(
                    self.mosaic, new_strip, BLEND_WIDTH, append_right=True,
                    x_offset=int(self.state.current_x), y_offset=y_offset,
                    alignment_x=align_x, alignment_y=align_y)
            else:
                # Moving left exposes new content on the frame's left edge.
                strip_end = min(w, append_width + BLEND_WIDTH)
                new_strip = frame[:, :strip_end]
                self.mosaic = self._blend_horizontal_at_y(
                    self.mosaic, new_strip, BLEND_WIDTH, append_right=False,
                    x_offset=int(self.state.current_x), y_offset=y_offset,
                    alignment_x=align_x, alignment_y=align_y)

            # Carry the remainder in the same signed space as the accumulator.
            self._displacement_since_append_x = (
                -fractional_remainder if signed_dx < 0 else fractional_remainder)
            self._displacement_since_append_y = 0.0

        elif direction in [ScanDirection.DOWN, ScanDirection.UP]:
            append_height = round(dy) + SAFETY_MARGIN
            append_height = min(append_height, h - BLEND_WIDTH - 5)
            if append_height < 1:
                return

            pixels_consumed = append_height - SAFETY_MARGIN
            fractional_remainder = dy - pixels_consumed

            if direction == ScanDirection.DOWN:
                # DOWN takes the top of the frame and attaches it above the mosaic.
                strip_end = min(h, append_height + BLEND_WIDTH)
                new_strip = frame[:strip_end, :]
                self.mosaic = self._blend_vertical_at_x(
                    self.mosaic, new_strip, BLEND_WIDTH, append_below=False,
                    x_off=int(self.state.current_x),
                    alignment_x=align_x, alignment_y=align_y)
            else:
                # UP takes the bottom of the frame and attaches it below the mosaic.
                strip_start = max(0, h - append_height - BLEND_WIDTH)
                new_strip = frame[strip_start:, :]
                self.mosaic = self._blend_vertical_at_x(
                    self.mosaic, new_strip, BLEND_WIDTH, append_below=True,
                    x_off=int(self.state.current_x),
                    alignment_x=align_x, alignment_y=align_y)

            self._displacement_since_append_x = 0.0
            # Carry the remainder in the same signed space as the accumulator.
            self._displacement_since_append_y = (
                -fractional_remainder if signed_dy < 0 else fractional_remainder)

        new_mh, new_mw = self.mosaic.shape[:2]

    with self._state_lock:
        self.state.mosaic_width = new_mw
        self.state.mosaic_height = new_mh
        self.state.append_count += 1

    # Notify outside the mosaic lock so a listener may read the mosaic.
    if self.on_mosaic_updated:
        self.on_mosaic_updated()
# =========================================================================
# Scan Control
# =========================================================================
def start(self) -> bool:
    """Begin a new stitching scan on a background thread.

    Resets the scan state, the mosaic, the displacement accumulators and
    the alignment bookkeeping, then launches ``_scan_loop`` in a daemon
    thread.

    Returns:
        True when the scan thread was started; False when a scan is
        already running (nothing is reset in that case).
    """
    if self.running:
        self.log("Already running")
        return False

    self.running, self.paused = True, False

    with self._state_lock:
        self.state = StitchState(is_scanning=True)

    with self._mosaic_lock:
        self.mosaic = None

    self._prev_frame = None
    self._displacement_since_append_x = 0.0
    self._displacement_since_append_y = 0.0

    # Alignment corrections always start from zero for a fresh scan.
    self._cumulative_align_x = 0.0
    self._cumulative_align_y = 0.0
    self._last_strip_alignment = AlignmentOffset()

    # The first row is placed at the top of the mosaic.
    self._row_start_y = 0

    worker = threading.Thread(target=self._scan_loop, daemon=True)
    self._thread = worker
    worker.start()

    self.log("Stitching scan started")
    return True
def stop(self):
    """Request the scan to end and halt all motion.

    Clears the ``running`` and ``paused`` flags, tells the motion
    controller to stop every axis, and marks the shared state as no
    longer scanning.
    """
    # Flags are cleared before the motors are stopped so the scan loop
    # does not issue another pulse afterwards.
    self.running = self.paused = False
    self.motion.stop_all()

    with self._state_lock:
        self.state.is_scanning = False

    self.log("Scan stopped")
def pause(self):
    """Pause a running scan and stop all motion.

    Does nothing unless a scan is running and not already paused.
    """
    if not self.running or self.paused:
        return

    self.paused = True
    self.motion.stop_all()
    self.log("Scan paused")
def resume(self):
    """Clear the paused flag of a running scan.

    Does nothing unless a scan is running and currently paused.
    """
    if not (self.running and self.paused):
        return

    self.paused = False
    self.log("Scan resumed")
# =========================================================================
# Scanning Logic
# =========================================================================
def _scan_loop(self):
    """Worker-thread body: scan rows in a serpentine pattern until done.

    Initialises the mosaic from the first frame, then alternates a
    horizontal pass (right on even rows, left on odd rows) with a move to
    the next row. The loop ends when ``running`` is cleared, the mosaic
    reaches the configured maximum height, or the row transition fails.
    Any exception is logged and its traceback printed; the motors are
    always stopped and the scanning flags cleared on the way out.
    """
    try:
        self.log("Starting scan loop")
        self.log(f"Max dimensions: {self.config.max_mosaic_width}x{self.config.max_mosaic_height}")

        self.motion.set_speed(self.config.scan_speed_index)
        time.sleep(0.1)

        self._init_mosaic(self._capture_frame())

        row = 0
        while self.running:
            with self._state_lock:
                self.state.current_row, self.state.total_rows = row, row + 1

            self.log(f"=== Row {row + 1} ===")
            self.log(f"Row start Y position: {self._row_start_y}")
            self.log(f"Row start X position: {self.state.current_x}")
            self.log(f"Cumulative alignment at row start: X={self._cumulative_align_x:.1f}, Y={self._cumulative_align_y:.1f}")

            # Serpentine: even rows right, odd rows left
            h_direction = (ScanDirection.RIGHT, ScanDirection.LEFT)[row % 2]

            # Every row after the first is aligned against the existing
            # mosaic before any strip is appended.
            if row > 0:
                row_alignment = self._detect_row_start_alignment(
                    self._capture_frame(), h_direction)
                if row_alignment.valid:
                    self.log(f"Applying row-start alignment: X={row_alignment.x_offset:.1f}, Y={row_alignment.y_offset:.1f}")
                    self._cumulative_align_x += row_alignment.x_offset
                    self._cumulative_align_y += row_alignment.y_offset
                    self.log(f"New cumulative alignment: X={self._cumulative_align_x:.1f}, Y={self._cumulative_align_y:.1f}")

            # The stop reason is logged inside _scan_direction; it is not
            # needed here.
            self._scan_direction(h_direction)

            if not self.running:
                break

            # Check max height
            if self.state.mosaic_height >= self.config.max_mosaic_height:
                self.log(f"Max height reached ({self.state.mosaic_height}px)")
                break

            # Move to next row using same stitching approach
            if not self._move_to_next_row():
                self.log("Failed to move to next row")
                break

            row += 1

        self.log(f"Scan complete! Final: {self.state.mosaic_width}x{self.state.mosaic_height}")
        self.log(f"Final cumulative alignment: X={self._cumulative_align_x:.1f}, Y={self._cumulative_align_y:.1f}")
    except Exception as e:
        self.log(f"Scan error: {e}")
        import traceback
        traceback.print_exc()
    finally:
        # Leave the hardware and the flags in a stopped state no matter
        # how the loop ended.
        self.running = False
        self.motion.stop_all()
        with self._state_lock:
            self.state.is_scanning = False
def _scan_direction(self, direction: ScanDirection) -> str:
    """Scan in a direction until edge or max dimension reached.

    The stage is moved in short pulses. After each pulse a frame is
    captured, the displacement against the previous frame is measured and
    accumulated, and a strip is appended once the accumulated displacement
    reaches ``displacement_threshold`` of the frame size.

    While the scan is paused the loop holds its position instead of
    returning, so a pause no longer ends the current pass. Time spent
    paused does not count towards ``max_scan_time``.

    Args:
        direction: Direction to drive the stage.

    Returns:
        Why the pass ended: ``'timeout'``, ``'max_dim'``, ``'edge'``, or
        ``'stopped'`` when ``running`` was cleared.
    """
    PAUSE_POLL_INTERVAL = 0.05  # seconds between checks while paused

    self.log(f"Scanning {direction.value}...")
    with self._state_lock:
        self.state.direction = direction.value

    frame = self._capture_frame()
    h, w = frame.shape[:2]
    horizontal = direction in [ScanDirection.RIGHT, ScanDirection.LEFT]

    # Setup based on direction
    if horizontal:
        threshold_pixels = w * self.config.displacement_threshold
        max_dim = self.config.max_mosaic_width
        current_dim = lambda: self.state.mosaic_width
        start_cmd = 'E' if direction == ScanDirection.RIGHT else 'W'
        stop_cmd = 'e' if direction == ScanDirection.RIGHT else 'w'
    else:
        threshold_pixels = h * self.config.displacement_threshold
        max_dim = self.config.max_mosaic_height
        current_dim = lambda: self.state.mosaic_height
        start_cmd = 'S' if direction == ScanDirection.DOWN else 'N'
        stop_cmd = 's' if direction == ScanDirection.DOWN else 'n'

    self._prev_frame = frame.copy()
    self._displacement_since_append_x = 0.0
    self._displacement_since_append_y = 0.0

    start_time = time.time()
    no_movement_count = 0
    max_no_movement = 50
    stop_reason = 'stopped'
    self.log(f"Scanning 2..")

    while self.running:
        if self.paused:
            # Hold here until resumed or stopped. Shift the start time by
            # the paused duration so it is not charged to the timeout.
            paused_at = time.time()
            while self.running and self.paused:
                time.sleep(PAUSE_POLL_INTERVAL)
            start_time += time.time() - paused_at
            continue

        if time.time() - start_time > self.config.max_scan_time:
            self.log("Scan timeout")
            stop_reason = 'timeout'
            break

        if current_dim() >= max_dim and direction == ScanDirection.RIGHT:
            self.log(f"Max dimension reached ({current_dim()}px)")
            stop_reason = 'max_dim'
            break

        # For LEFT scan: stop when we've reached the left edge (current_x <= 0)
        if self.state.current_x <= 0 and direction == ScanDirection.LEFT:
            self.log(f"Reached left edge (current_x={self.state.current_x:.1f})")
            stop_reason = 'edge'
            break

        if abs(self.state.current_x) >= self.config.max_mosaic_width and direction == ScanDirection.RIGHT:
            self.log(f"Max dimension reached ({self.config.max_mosaic_width}px)")
            self.log(f"Current X offset ({self.state.current_x}px)")
            stop_reason = 'max_dim'
            break

        # Pulse motor
        self.motion.send_command(start_cmd)
        time.sleep(self.config.movement_interval)
        self.motion.send_command(stop_cmd)
        time.sleep(self.config.frame_interval)

        curr_frame = self._capture_frame()
        dx, dy = self._detect_displacement_robust(self._prev_frame, curr_frame)
        self.log(f"Scanning dx{dx} dy{dy}..")

        self._displacement_since_append_x += dx
        self._displacement_since_append_y += dy

        with self._state_lock:
            self.state.current_x += dx
            self.state.cumulative_x = self._displacement_since_append_x
            self.state.cumulative_y = self._displacement_since_append_y
            self.state.last_displacement = (dx, dy)
            self.state.frame_count += 1

        # Edge detection: many consecutive pulses without measurable
        # movement along the scan axis mean the stage cannot go further.
        movement = abs(dx) if horizontal else abs(dy)
        self.log(f"Scanning movement{movement}..")
        if movement < 1.0:
            no_movement_count += 1
            if no_movement_count >= max_no_movement:
                self.log(f"Edge detected (no movement)")
                stop_reason = 'edge'
                break
        else:
            no_movement_count = 0

        # Append when threshold reached (with continuous alignment)
        disp = abs(self._displacement_since_append_x) if horizontal else abs(self._displacement_since_append_y)
        self.log(f"Scanning disp{disp}..")
        if disp >= threshold_pixels:
            self.log(f"Scanning threshold_pixels..")
            self._append_strip(curr_frame, direction)
            self.log(f"Appended {disp:.1f}px, mosaic: {self.state.mosaic_width}x{self.state.mosaic_height}, align: ({self._cumulative_align_x:.1f}, {self._cumulative_align_y:.1f})")

        self._prev_frame = curr_frame.copy()

        if self.on_progress:
            self.on_progress(self.state.append_count, 0)

    # Make sure the axis is stopped and let the stage settle.
    self.motion.send_command(stop_cmd)
    time.sleep(self.config.settle_time)

    self.log(f"Direction finished: {stop_reason}")
    return stop_reason
def _move_to_next_row(self) -> bool:
    """
    Move down to next row using displacement-based stitching.
    Same approach as horizontal scanning with continuous alignment.

    The stage is pulsed South until the accumulated Y displacement reaches
    ``(1 - row_overlap) * frame_height``; strips are appended on the way
    whenever the displacement since the last append reaches the threshold.
    On success the row start position (``_row_start_y``, ``current_x``)
    is updated for the next horizontal pass.

    While the scan is paused no motor commands are sent; the transition
    continues from where it was once the scan is resumed.

    Returns:
        True when the target displacement was reached. False when the
        stage stopped moving (edge), ``running`` was cleared, or an error
        occurred.
    """
    PAUSE_POLL_INTERVAL = 0.05  # seconds between checks while paused

    self.log("Moving to next row...")
    self.log(f"Alignment before row transition: X={self._cumulative_align_x:.1f}, Y={self._cumulative_align_y:.1f}")

    frame = self._capture_frame()
    h = frame.shape[0]

    # Mosaic height before the transition (logged for debugging).
    mosaic_height_before = self.state.mosaic_height

    # Target: move (1 - overlap) * frame_height
    target_displacement = h * (1 - self.config.row_overlap)
    threshold_pixels = h * self.config.displacement_threshold
    self.log(f"Target Y: {target_displacement:.0f}px, threshold: {threshold_pixels:.0f}px")
    self.log(f"Mosaic height before row transition: {mosaic_height_before}")

    with self._state_lock:
        self.state.direction = 'down'
        self.state.cumulative_y = 0.0

    self._prev_frame = frame.copy()
    self._displacement_since_append_x = 0.0
    self._displacement_since_append_y = 0.0

    total_y = 0.0
    no_movement_count = 0
    max_no_movement = 30

    # No start command is sent ahead of the loop: each iteration issues its
    # own start/stop pulse, and starting early would move the stage before
    # the pause check below.
    try:
        while self.running:
            if self.paused:
                # Hold position; the loop condition re-checks ``running``.
                time.sleep(PAUSE_POLL_INTERVAL)
                continue

            self.motion.send_command('S')
            time.sleep(self.config.movement_interval)
            self.motion.send_command('s')
            time.sleep(self.config.frame_interval)

            curr_frame = self._capture_frame()
            dx, dy = self._detect_displacement_robust(self._prev_frame, curr_frame)

            self._displacement_since_append_y += dy
            total_y += dy

            with self._state_lock:
                self.state.current_y += dy
                self.state.cumulative_y = total_y
                self.state.last_displacement = (dx, dy)

            # Edge detection
            if abs(dy) < 1.0:
                no_movement_count += 1
                if no_movement_count >= max_no_movement:
                    self.log("Edge detected during row transition")
                    self.motion.send_command('s')
                    time.sleep(self.config.settle_time)
                    return False
            else:
                no_movement_count = 0

            # Append strip when threshold reached (with continuous alignment)
            if abs(self._displacement_since_append_y) >= threshold_pixels:
                self._append_strip(curr_frame, ScanDirection.DOWN)
                self.log(f" Row transition: appended, total Y: {abs(total_y):.1f}px, align: ({self._cumulative_align_x:.1f}, {self._cumulative_align_y:.1f})")

            # Done when we've moved enough
            if abs(total_y) >= target_displacement:
                self.log(f"Row transition complete: {abs(total_y):.1f}px")
                self.log(f"Alignment after row transition: X={self._cumulative_align_x:.1f}, Y={self._cumulative_align_y:.1f}")

                # Calculate the Y position in the mosaic where the new row should be placed.
                # After prepending, the layout is:
                #   Y=0 to Y=transition_height: transition strips
                #   Y=transition_height to Y=mh: old row 1 (shifted down)
                #
                # Row 2 should be placed so its BOTTOM overlaps with row 1's TOP:
                #   transition_height = mh - fh (height added during transition)
                #   overlap_pixels = h * row_overlap
                #   row 2 Y = transition_height - fh + overlap_pixels
                # which puts row 2's bottom at transition_height + overlap,
                # overlapping row 1's top at Y=transition_height.
                overlap_pixels = int(h * self.config.row_overlap)
                transition_height = self.state.mosaic_height - h
                self._row_start_y = max(0, transition_height - h + overlap_pixels)

                # Calculate the X position where transition strips were placed
                x_offset = max(0, self.state.mosaic_width - self.state.mosaic_init_width)
                self.log(f"New row Y position: {self._row_start_y} (transition_height={transition_height}, overlap={overlap_pixels})")
                self.log(f"New row X position: {x_offset} (transition strip location)")

                with self._state_lock:
                    self.state.current_y = 0
                    # Set current_x to the x_offset where transition strips are
                    self.state.current_x = x_offset

                self.motion.send_command('s')
                time.sleep(self.config.settle_time)

                # Reset for next horizontal row
                frame = self._capture_frame()
                self._prev_frame = frame.copy()
                self._displacement_since_append_x = 0.0
                self._displacement_since_append_y = 0.0
                return True

            self._prev_frame = curr_frame.copy()
    except Exception as e:
        self.log(f"Row transition error: {e}")
        self.motion.send_command('s')
        return False

    # Reached only when ``running`` was cleared before the target was hit.
    self.motion.send_command('s')
    time.sleep(self.config.settle_time)
    return False
def _capture_frame(self) -> np.ndarray:
    """Grab one frame from the camera and return it rotated 90 degrees clockwise."""
    raw = self.camera.capture_frame()
    return cv2.rotate(raw, cv2.ROTATE_90_CLOCKWISE)
# =========================================================================
# Getters
# =========================================================================
def get_state(self) -> StitchState:
    """Return a snapshot of the scan state taken under the state lock.

    Only the fields listed below are copied into the new ``StitchState``;
    any other field keeps the value the constructor gives it.
    """
    copied_fields = (
        'is_scanning',
        'direction',
        'cumulative_x',
        'cumulative_y',
        'last_displacement',
        'current_row',
        'total_rows',
        'mosaic_width',
        'mosaic_height',
        'frame_count',
        'append_count',
    )
    with self._state_lock:
        live = self.state
        return StitchState(**{name: getattr(live, name) for name in copied_fields})
def get_alignment_state(self) -> dict:
    """Get current alignment correction state.

    Returns:
        A dict with the cumulative X/Y correction, the Y position of the
        current row in the mosaic, and the most recently stored strip
        alignment (offsets, confidence and validity).
    """
    last = self._last_strip_alignment
    last_alignment = {
        'x': last.x_offset,
        'y': last.y_offset,
        'confidence': last.confidence,
        'valid': last.valid,
    }
    return {
        'cumulative_x': self._cumulative_align_x,
        'cumulative_y': self._cumulative_align_y,
        'row_start_y': self._row_start_y,
        'last_alignment': last_alignment,
    }
def get_mosaic(self) -> Optional[np.ndarray]:
    """Return a copy of the current mosaic, or None when there is none yet.

    The copy is made while holding the mosaic lock.
    """
    with self._mosaic_lock:
        return None if self.mosaic is None else self.mosaic.copy()
def get_mosaic_preview(self, max_size: int = 600) -> Optional[np.ndarray]:
    """Return the mosaic scaled down to fit within ``max_size`` pixels.

    Args:
        max_size: Upper bound for both the preview width and height.

    Returns:
        None when no mosaic exists; a resized image when the mosaic
        exceeds ``max_size`` in either dimension; otherwise a plain copy
        (the mosaic is never enlarged).
    """
    with self._mosaic_lock:
        mosaic = self.mosaic
        if mosaic is None:
            return None

        height, width = mosaic.shape[:2]
        # Cap the factor at 1.0 so small mosaics are not upscaled.
        scale = min(max_size / width, max_size / height, 1.0)
        if scale < 1.0:
            target = (int(width * scale), int(height * scale))
            return cv2.resize(mosaic, target)
        return mosaic.copy()
def save_mosaic(self, filepath: str) -> bool:
    """Write the current mosaic to ``filepath``.

    Args:
        filepath: Destination path; passed unchanged to ``cv2.imwrite``.

    Returns:
        True when the image was written. False when there is no mosaic
        or when ``cv2.imwrite`` reports that the write failed.
    """
    with self._mosaic_lock:
        if self.mosaic is None:
            return False
        # imwrite signals failure through its return value, so it must be
        # checked instead of assuming the file was written.
        written = cv2.imwrite(filepath, self.mosaic)

    if not written:
        self.log(f"Failed to save mosaic to {filepath}")
        return False

    self.log(f"Saved mosaic to {filepath}")
    return True
# =========================================================================
# Testing
# =========================================================================
def test_displacement(self, num_frames: int = 10) -> dict:
    """Measure frame-to-frame displacement without moving the stage.

    Captures ``num_frames`` frames 0.1 s apart and runs
    ``_detect_displacement`` on each consecutive pair.

    Args:
        num_frames: Number of frame pairs to measure.

    Returns:
        A dict with a per-frame list under ``'frames'`` and the summed
        displacement under ``'total_dx'`` / ``'total_dy'``.
    """
    results = {'frames': [], 'total_dx': 0.0, 'total_dy': 0.0}
    samples = results['frames']

    reference = self._capture_frame()
    for index in range(num_frames):
        time.sleep(0.1)
        latest = self._capture_frame()
        dx, dy = self._detect_displacement(reference, latest)

        samples.append({'frame': index, 'dx': dx, 'dy': dy})
        results['total_dx'] += dx
        results['total_dy'] += dy

        # The frame just measured becomes the reference for the next pair.
        reference = latest

    return results
def test_row_start_alignment(self, direction: str = 'left') -> dict:
    """Test row-start alignment detection.

    Initialises the mosaic from a captured frame when none exists, then
    runs ``_detect_row_start_alignment`` on a fresh frame.

    Args:
        direction: ``'left'`` selects a LEFT scan; any other value
            selects RIGHT.

    Returns:
        A dict with ``success`` (the alignment's validity), the detected
        offsets and confidence, and ``error`` (the exception text when
        something failed, otherwise None).
    """
    results = {
        'success': False,
        'x_offset': 0.0,
        'y_offset': 0.0,
        'confidence': 0.0,
        'error': None
    }
    try:
        self.log("Testing row-start alignment detection...")

        if self.mosaic is None:
            self.log("No mosaic - initializing...")
            self._init_mosaic(self._capture_frame())

        frame = self._capture_frame()
        if direction == 'left':
            scan_dir = ScanDirection.LEFT
        else:
            scan_dir = ScanDirection.RIGHT
        alignment = self._detect_row_start_alignment(frame, scan_dir)

        # Copy the detection outcome into the report, one field at a time.
        for key, attr in (('success', 'valid'),
                          ('x_offset', 'x_offset'),
                          ('y_offset', 'y_offset'),
                          ('confidence', 'confidence')):
            results[key] = getattr(alignment, attr)

        self.log(f"Row-start alignment: valid={alignment.valid}, X={alignment.x_offset:.1f}, Y={alignment.y_offset:.1f}, conf={alignment.confidence:.3f}")
    except Exception as e:
        results['error'] = str(e)
        self.log(f"Test error: {e}")
    return results
def test_row_transition(self) -> dict:
    """Test row transition using displacement stitching.

    Initialises the mosaic when none exists, sets ``running`` for the
    duration of a single ``_move_to_next_row`` call, and records the
    mosaic size and cumulative alignment before and after.

    Returns:
        A dict with ``success``, ``y_moved``, the before/after mosaic
        sizes and alignments, and ``error`` (exception text or None).
    """
    results = {
        'success': False,
        'y_moved': 0.0,
        'mosaic_before': (0, 0),
        'mosaic_after': (0, 0),
        'alignment_before': (0.0, 0.0),
        'alignment_after': (0.0, 0.0),
        'error': None
    }

    def mosaic_size():
        return (self.state.mosaic_width, self.state.mosaic_height)

    def alignment_now():
        return (self._cumulative_align_x, self._cumulative_align_y)

    try:
        self.log("Testing row transition...")

        if self.mosaic is None:
            self._init_mosaic(self._capture_frame())

        results['mosaic_before'] = mosaic_size()
        results['alignment_before'] = alignment_now()

        with self._state_lock:
            self.state.cumulative_y = 0.0

        # _move_to_next_row only loops while ``running`` is set.
        self.running = True
        success = self._move_to_next_row()
        self.running = False

        results['success'] = success
        results['y_moved'] = self.state.cumulative_y
        results['mosaic_after'] = mosaic_size()
        results['alignment_after'] = alignment_now()

        self.log(f"Row transition: {'SUCCESS' if success else 'FAILED'}, Y: {results['y_moved']:.1f}px")
        self.log(f"Alignment change: ({results['alignment_before'][0]:.1f}, {results['alignment_before'][1]:.1f}) -> ({results['alignment_after'][0]:.1f}, {results['alignment_after'][1]:.1f})")
    except Exception as e:
        results['error'] = str(e)
        self.log(f"Test error: {e}")
        self.running = False
    return results
def test_single_row(self, direction: str = 'right') -> dict:
    """Test scanning a single row.

    Initialises the mosaic when none exists, sets the scan speed, and
    runs one ``_scan_direction`` pass with ``running`` set.

    Args:
        direction: ``'right'`` selects a RIGHT scan; any other value
            selects LEFT.

    Returns:
        A dict with ``success`` (True when no exception occurred), the
        stop reason, the number of strips appended, the before/after
        mosaic sizes and alignments, and ``error``.
    """
    results = {
        'success': False,
        'stop_reason': None,
        'appends': 0,
        'mosaic_before': (0, 0),
        'mosaic_after': (0, 0),
        'alignment_before': (0.0, 0.0),
        'alignment_after': (0.0, 0.0),
        'error': None
    }

    def mosaic_size():
        return (self.state.mosaic_width, self.state.mosaic_height)

    def alignment_now():
        return (self._cumulative_align_x, self._cumulative_align_y)

    try:
        self.log(f"Testing single row ({direction})...")

        if self.mosaic is None:
            self._init_mosaic(self._capture_frame())

        results['mosaic_before'] = mosaic_size()
        results['alignment_before'] = alignment_now()
        appends_before = self.state.append_count

        self.motion.set_speed(self.config.scan_speed_index)
        time.sleep(0.1)

        # _scan_direction only loops while ``running`` is set.
        self.running = True
        if direction == 'right':
            scan_dir = ScanDirection.RIGHT
        else:
            scan_dir = ScanDirection.LEFT
        stop_reason = self._scan_direction(scan_dir)
        self.running = False

        results['success'] = True
        results['stop_reason'] = stop_reason
        results['appends'] = self.state.append_count - appends_before
        results['mosaic_after'] = mosaic_size()
        results['alignment_after'] = alignment_now()

        self.log(f"Alignment change: ({results['alignment_before'][0]:.1f}, {results['alignment_before'][1]:.1f}) -> ({results['alignment_after'][0]:.1f}, {results['alignment_after'][1]:.1f})")
    except Exception as e:
        results['error'] = str(e)
        self.running = False
    return results
def test_strip_alignment(self) -> dict:
    """Test strip alignment detection at current position.

    Initialises the mosaic when none exists, logs the detection result
    for both horizontal directions, and reports the RIGHT result.

    Returns:
        A dict with ``success`` (validity of the RIGHT alignment), its
        offsets and confidence, and ``error`` (exception text or None).
    """
    results = {
        'success': False,
        'x_offset': 0.0,
        'y_offset': 0.0,
        'confidence': 0.0,
        'error': None
    }
    try:
        self.log("Testing strip alignment detection...")

        if self.mosaic is None:
            self.log("No mosaic - initializing...")
            self._init_mosaic(self._capture_frame())

        frame = self._capture_frame()
        expected_x = int(self.state.current_x + self._cumulative_align_x)
        expected_y = int(self._row_start_y + self._cumulative_align_y)

        # Test for both directions
        for direction in (ScanDirection.RIGHT, ScanDirection.LEFT):
            alignment = self._detect_strip_alignment(frame, direction, expected_x, expected_y)
            self.log(f" {direction.value}: valid={alignment.valid}, X={alignment.x_offset:.1f}, Y={alignment.y_offset:.1f}, conf={alignment.confidence:.3f}")

        # Return RIGHT direction result (detected again, as a separate call).
        alignment = self._detect_strip_alignment(frame, ScanDirection.RIGHT, expected_x, expected_y)
        for key, attr in (('success', 'valid'),
                          ('x_offset', 'x_offset'),
                          ('y_offset', 'y_offset'),
                          ('confidence', 'confidence')):
            results[key] = getattr(alignment, attr)
    except Exception as e:
        results['error'] = str(e)
        self.log(f"Test error: {e}")
    return results
def reset_alignment(self):
    """Reset cumulative alignment to zero.

    Also replaces the stored last strip alignment with a fresh
    ``AlignmentOffset``.
    """
    self._cumulative_align_x = self._cumulative_align_y = 0.0
    self._last_strip_alignment = AlignmentOffset()
    self.log("Alignment reset to (0, 0)")
def get_memory_estimate(self) -> dict:
    """Report the current and the maximum mosaic memory footprint.

    Returns:
        A dict with the current mosaic size and its size in MiB (0 when
        there is no mosaic), plus the configured maximum size and the
        MiB it would need at 3 bytes per pixel.
    """
    bytes_per_mib = 1024 * 1024
    cfg = self.config

    used_bytes = 0 if self.mosaic is None else self.mosaic.nbytes
    limit_bytes = cfg.max_mosaic_width * cfg.max_mosaic_height * 3

    return {
        'current_size': (self.state.mosaic_width, self.state.mosaic_height),
        'current_mb': used_bytes / bytes_per_mib,
        'max_size': (cfg.max_mosaic_width, cfg.max_mosaic_height),
        'max_mb': limit_bytes / bytes_per_mib,
    }