Autoscope-Controller/src/stitching_scanner.py
2026-01-09 19:56:49 -06:00

911 lines
No EOL
36 KiB
Python

"""
Stitching Scanner v2 - Simplified unified approach
Same displacement-based stitching for both horizontal rows and vertical row transitions.
No complex visual matching - just track displacement and append strips.
"""
import cv2
import numpy as np
import time
import threading
from dataclasses import dataclass
from typing import Optional, Callable, Tuple
from enum import Enum
class ScanDirection(Enum):
RIGHT = 'right'
LEFT = 'left'
DOWN = 'down'
UP = 'up'
@dataclass
class StitchConfig:
displacement_threshold: float = 0.10 # 10% of frame triggers append
movement_interval: float = 0.001
frame_interval: float = 1.00
settle_time: float = 0.75
max_scan_time: float = 300.0
row_overlap: float = 0.15
max_mosaic_width: int = 1000
max_mosaic_height: int = 1000
scan_speed_index: int = 3
autofocus_every_row: bool = True
@dataclass
class StitchState:
is_scanning: bool = False
direction: str = ''
cumulative_x: float = 0.0
cumulative_y: float = 0.0
current_x: float = 0.0
current_y: float = 0.0
last_displacement: Tuple[float, float] = (0.0, 0.0)
current_row: int = 0
total_rows: int = 0
mosaic_width: int = 0
mosaic_height: int = 0
mosaic_init_width: int = 0
mosaic_init_height: int = 0
frame_count: int = 0
append_count: int = 0
class StitchingScanner:
"""
Slide scanner using continuous stitching.
Unified approach for horizontal and vertical movement.
"""
def __init__(self, camera, motion_controller, autofocus_controller=None,
config: StitchConfig = None,
on_log: Callable[[str], None] = None,
on_progress: Callable[[int, int], None] = None,
on_mosaic_updated: Callable[[], None] = None):
self.camera = camera
self.motion = motion_controller
self.autofocus = autofocus_controller
self.config = config or StitchConfig()
self.on_log = on_log
self.on_progress = on_progress
self.on_mosaic_updated = on_mosaic_updated
self.running = False
self.paused = False
self.state = StitchState()
self._state_lock = threading.Lock()
self.mosaic: Optional[np.ndarray] = None
self._mosaic_lock = threading.Lock()
self._prev_frame: Optional[np.ndarray] = None
self._displacement_since_append_x: float = 0.0
self._displacement_since_append_y: float = 0.0
self._thread: Optional[threading.Thread] = None
def log(self, message: str):
if self.on_log:
self.on_log(f"[Stitch] {message}")
print(f"[Stitch] {message}")
# =========================================================================
# Displacement Detection
# =========================================================================
def _to_grayscale(self, frame: np.ndarray) -> np.ndarray:
if len(frame.shape) == 3:
return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
return frame
def _detect_displacement(self, prev_frame: np.ndarray,
curr_frame: np.ndarray) -> Tuple[float, float]:
prev_gray = self._to_grayscale(prev_frame)
curr_gray = self._to_grayscale(curr_frame)
if prev_gray.shape != curr_gray.shape:
return (0.0, 0.0)
prev_f = prev_gray.astype(np.float32)
curr_f = curr_gray.astype(np.float32)
h, w = prev_gray.shape
window = cv2.createHanningWindow((w, h), cv2.CV_32F)
prev_f = prev_f * window
curr_f = curr_f * window
shift, _ = cv2.phaseCorrelate(prev_f, curr_f)
return shift
def _detect_displacement_robust(self, prev_frame: np.ndarray,
curr_frame: np.ndarray) -> Tuple[float, float]:
dx, dy = self._detect_displacement(prev_frame, curr_frame)
h, w = prev_frame.shape[:2]
max_displacement = max(w, h) * 0.5
if abs(dx) > max_displacement or abs(dy) > max_displacement:
self.log(f"Warning: Large displacement ({dx:.1f}, {dy:.1f}), ignoring")
return (0.0, 0.0)
return (dx, dy)
# =========================================================================
# Mosaic Building
# =========================================================================
def _init_mosaic(self, frame: np.ndarray):
with self._mosaic_lock:
self.mosaic = frame.copy()
self._prev_frame = frame.copy()
self._displacement_since_append_x = 0.0
self._displacement_since_append_y = 0.0
with self._state_lock:
h, w = frame.shape[:2]
self.state.mosaic_width = w
self.state.mosaic_height = h
self.state.mosaic_init_width = w
self.state.mosaic_init_height = h
self.state.frame_count = 1
self.state.append_count = 0
self.state.current_y = 0
self.state.current_x = 0
self.log(f"Initialized mosaic: {frame.shape[1]}x{frame.shape[0]}")
def _blend_horizontal_at_y(self, base: np.ndarray, strip: np.ndarray,
blend_width: int, append_right: bool,
y_offset: int = 0) -> np.ndarray:
h_base, w_base = base.shape[:2]
h_strip, w_strip = strip.shape[:2]
# Clamp y_offset
y_offset = max(0, min(y_offset, h_base - h_strip))
blend_w = min(blend_width, w_strip, w_base)
# Early exit: no blending
if blend_w <= 0 or blend_w >= w_strip:
if append_right:
result_width = w_base + w_strip
result = np.zeros((h_base, result_width, 3), dtype=np.uint8)
result[:, :w_base] = base
result[y_offset:y_offset + h_strip, w_base:] = strip
return result
else:
result_width = w_base + w_strip
result = np.zeros((h_base, result_width, 3), dtype=np.uint8)
result[:, w_strip:] = base
result[y_offset:y_offset + h_strip, :w_strip] = strip
return result
if append_right:
result_width = w_base + w_strip - blend_w
result = np.zeros((h_base, result_width, 3), dtype=np.uint8)
# Step 1: Copy entire base
result[:, :w_base] = base
# Step 2: Copy non-overlap portion of strip at correct Y
result[y_offset:y_offset + h_strip, w_base:] = strip[:, blend_w:]
# Step 3: Create blend
alpha = np.linspace(1, 0, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis]
base_overlap = base[y_offset:y_offset + h_strip, -blend_w:].astype(np.float32)
strip_overlap = strip[:, :blend_w].astype(np.float32)
blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8)
# Step 4: Place blend (overwrites part of base at correct Y)
result[y_offset:y_offset + h_strip, w_base - blend_w:w_base] = blended
return result
else: # append_left
result_width = w_base + w_strip - blend_w
result = np.zeros((h_base, result_width, 3), dtype=np.uint8)
# Step 1: Copy ENTIRE base shifted right
result[:, w_strip - blend_w:] = base
# Step 2: Copy non-overlap portion of strip at correct Y
result[y_offset:y_offset + h_strip, :w_strip - blend_w] = strip[:, :w_strip - blend_w]
# Step 3: Create blend
alpha = np.linspace(0, 1, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis]
strip_overlap = strip[:, -blend_w:].astype(np.float32)
base_overlap = base[y_offset:y_offset + h_strip, :blend_w].astype(np.float32)
blended = (strip_overlap * (1 - alpha) + base_overlap * alpha).astype(np.uint8)
# Step 4: Place blend at correct Y
result[y_offset:y_offset + h_strip, w_strip - blend_w:w_strip] = blended
return result
def _blend_horizontal(self, base: np.ndarray, strip: np.ndarray,
blend_width: int, append_right: bool) -> np.ndarray:
if blend_width <= 0 or blend_width >= strip.shape[1]:
if append_right:
return np.hstack([base, strip])
return np.hstack([strip, base])
h_base, w_base = base.shape[:2]
h_strip, w_strip = strip.shape[:2]
self.log(f"Base Width: {w_base}px")
if h_strip != h_base:
if append_right:
return np.hstack([base, strip])
return np.hstack([strip, base])
blend_w = min(blend_width, w_strip, w_base)
if append_right:
result_width = w_base + w_strip - blend_w
result = np.zeros((h_base, result_width, 3), dtype=np.uint8)
result[:, :w_base] = base
alpha = np.linspace(1, 0, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis]
base_overlap = base[:, -blend_w:].astype(np.float32)
strip_overlap = strip[:, :blend_w].astype(np.float32)
blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8)
result[:, w_base - blend_w:w_base] = blended
result[:, w_base:] = strip[:, blend_w:]
return result
else:
result_width = w_base + w_strip - blend_w
result = np.zeros((h_base, result_width, 3), dtype=np.uint8)
result[:, :w_strip] = strip
alpha = np.linspace(0, 1, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis]
strip_overlap = strip[:, -blend_w:].astype(np.float32)
base_overlap = base[:, :blend_w].astype(np.float32)
blended = (strip_overlap * (1 - alpha) + base_overlap * alpha).astype(np.uint8)
result[:, w_strip - blend_w:w_strip] = blended
result[:, w_strip:] = base[:, blend_w:]
return result
def _blend_vertical_at_x(self, base: np.ndarray, strip: np.ndarray,
blend_height: int, append_below: bool,
x_off: int = 0) -> np.ndarray:
h_base, w_base = base.shape[:2]
h_strip, w_strip = strip.shape[:2]
# Clamp x_offset to valid range
x_offset = max(0, w_base - self.state.mosaic_init_width)
# Create full-width strip with strip placed at x_offset
full_strip = np.zeros((h_strip, w_base, 3), dtype=np.uint8)
available_width = w_base - x_offset
copy_width = min(w_strip, available_width)
full_strip[:, x_offset:x_offset + copy_width] = strip[:, :copy_width]
# Early exit: no blending possible
if blend_height <= 0 or blend_height >= h_strip:
if append_below:
return np.vstack([base, full_strip])
return np.vstack([full_strip, base])
# Height mismatch shouldn't happen with full_strip, but safety check
if w_strip > w_base:
self.log(f"Warning: strip wider than base ({w_strip} > {w_base})")
blend_h = min(blend_height, h_strip, h_base)
if append_below:
result_height = h_base + h_strip - blend_h
result = np.zeros((result_height, w_base, 3), dtype=np.uint8)
result[:h_base, :] = base
alpha = np.linspace(1, 0, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis]
base_overlap = base[-blend_h:, :].astype(np.float32)
strip_overlap = full_strip[:blend_h, :].astype(np.float32)
blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8)
result[h_base - blend_h:h_base, :] = blended
result[h_base:, :] = full_strip[blend_h:, :]
return result
else:
result_height = h_base + h_strip - blend_h
result = np.zeros((result_height, w_base, 3), dtype=np.uint8)
result[:h_strip, :] = full_strip
alpha = np.linspace(0, 1, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis]
strip_overlap = full_strip[-blend_h:, :].astype(np.float32)
base_overlap = base[:blend_h, :].astype(np.float32)
blended = (strip_overlap * (1 - alpha) + base_overlap * alpha).astype(np.uint8)
result[h_strip - blend_h:h_strip, :] = blended
result[h_strip:, :] = base[blend_h:, :]
return result
def _blend_vertical(self, base: np.ndarray, strip: np.ndarray,
blend_height: int, append_below: bool) -> np.ndarray:
mh, mw = base.shape[:2]
sh, sw = strip.shape[:2]
# Match widths
if sw > mw:
strip = strip[:, :mw]
elif sw < mw:
pad = np.zeros((sh, mw - sw, 3), dtype=np.uint8)
strip = np.hstack([strip, pad])
blend_h = min(blend_height, sh, mh)
if blend_h <= 0:
if append_below:
return np.vstack([base, strip])
return np.vstack([strip, base])
if append_below:
alpha = np.linspace(1, 0, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis]
base_overlap = base[-blend_h:].astype(np.float32)
strip_overlap = strip[:blend_h].astype(np.float32)
blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8)
result_h = mh + sh - blend_h
result = np.zeros((result_h, mw, 3), dtype=np.uint8)
result[:mh - blend_h] = base[:-blend_h]
result[mh - blend_h:mh] = blended
result[mh:] = strip[blend_h:]
return result
else:
alpha = np.linspace(0, 1, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis]
strip_overlap = strip[-blend_h:].astype(np.float32)
base_overlap = base[:blend_h].astype(np.float32)
blended = (strip_overlap * (1 - alpha) + base_overlap * alpha).astype(np.uint8)
result_h = mh + sh - blend_h
result = np.zeros((result_h, mw, 3), dtype=np.uint8)
result[:sh - blend_h] = strip[:-blend_h]
result[sh - blend_h:sh] = blended
result[sh:] = base[blend_h:]
return result
def _append_strip(self, frame: np.ndarray, direction: ScanDirection):
"""Append strip to mosaic based on accumulated displacement."""
BLEND_WIDTH = 10
SAFETY_MARGIN = 2
with self._mosaic_lock:
if self.mosaic is None:
return
h, w = frame.shape[:2]
mh, mw = self.mosaic.shape[:2]
dx = abs(self._displacement_since_append_x)
dy = abs(self._displacement_since_append_y)
if direction in [ScanDirection.RIGHT, ScanDirection.LEFT]:
append_width = round(dx) + SAFETY_MARGIN
append_width = min(append_width, w - BLEND_WIDTH - 5)
if append_width < 1:
return
pixels_consumed = append_width - SAFETY_MARGIN
fractional_remainder = dx - pixels_consumed
# Calculate Y offset for current row
y_offset = int(self.state.current_y)
if direction == ScanDirection.RIGHT:
strip_start = max(0, w - append_width - BLEND_WIDTH)
new_strip = frame[:, strip_start:]
self.mosaic = self._blend_horizontal_at_y(
self.mosaic, new_strip, BLEND_WIDTH, append_right=True, y_offset=y_offset)
else:
strip_end = min(w, append_width + BLEND_WIDTH)
new_strip = frame[:, :strip_end]
self.mosaic = self._blend_horizontal_at_y(
self.mosaic, new_strip, BLEND_WIDTH, append_right=False, y_offset=y_offset)
self._displacement_since_append_x = fractional_remainder
self._displacement_since_append_y = 0.0
elif direction in [ScanDirection.DOWN, ScanDirection.UP]:
append_height = round(dy) + SAFETY_MARGIN
append_height = min(append_height, h - BLEND_WIDTH - 5)
if append_height < 1:
return
pixels_consumed = append_height - SAFETY_MARGIN
fractional_remainder = dy - pixels_consumed
if direction == ScanDirection.DOWN:
strip_end = min(h, append_height + BLEND_WIDTH)
new_strip = frame[:strip_end:, :]
self.mosaic = self._blend_vertical_at_x(
self.mosaic, new_strip, BLEND_WIDTH, append_below=False, x_off=int(self.state.current_x))
else:
strip_start = max(0, h - append_height - BLEND_WIDTH)
new_strip = frame[:strip_start, :]
self.mosaic = self._blend_vertical_at_x(
self.mosaic, new_strip, BLEND_WIDTH, append_below=True, x_off=int(self.state.current_x))
self._displacement_since_append_x = 0.0
self._displacement_since_append_y = fractional_remainder
new_mh, new_mw = self.mosaic.shape[:2]
with self._state_lock:
self.state.mosaic_width = new_mw
self.state.mosaic_height = new_mh
self.state.append_count += 1
if self.on_mosaic_updated:
self.on_mosaic_updated()
# =========================================================================
# Scan Control
# =========================================================================
def start(self) -> bool:
if self.running:
self.log("Already running")
return False
self.running = True
self.paused = False
with self._state_lock:
self.state = StitchState()
self.state.is_scanning = True
with self._mosaic_lock:
self.mosaic = None
self._prev_frame = None
self._displacement_since_append_x = 0.0
self._displacement_since_append_y = 0.0
self._thread = threading.Thread(target=self._scan_loop, daemon=True)
self._thread.start()
self.log("Stitching scan started")
return True
def stop(self):
self.running = False
self.paused = False
self.motion.stop_all()
with self._state_lock:
self.state.is_scanning = False
self.log("Scan stopped")
def pause(self):
if self.running and not self.paused:
self.paused = True
self.motion.stop_all()
self.log("Scan paused")
def resume(self):
if self.running and self.paused:
self.paused = False
self.log("Scan resumed")
# =========================================================================
# Scanning Logic
# =========================================================================
def _scan_loop(self):
try:
self.log("Starting scan loop")
self.log(f"Max dimensions: {self.config.max_mosaic_width}x{self.config.max_mosaic_height}")
self.motion.set_speed(self.config.scan_speed_index)
time.sleep(0.1)
frame = self._capture_frame()
self._init_mosaic(frame)
row = 0
while self.running:
with self._state_lock:
self.state.current_row = row
self.state.total_rows = row + 1
self.log(f"=== Row {row + 1} ===")
# Serpentine: even rows right, odd rows left
h_direction = ScanDirection.RIGHT if row % 2 == 0 else ScanDirection.LEFT
stop_reason = self._scan_direction(h_direction)
if not self.running:
break
# Check max height
if self.state.mosaic_height >= self.config.max_mosaic_height:
self.log(f"Max height reached ({self.state.mosaic_height}px)")
break
# Move to next row using same stitching approach
if not self._move_to_next_row():
self.log("Failed to move to next row")
break
row += 1
self.log(f"Scan complete! Final: {self.state.mosaic_width}x{self.state.mosaic_height}")
except Exception as e:
self.log(f"Scan error: {e}")
import traceback
traceback.print_exc()
finally:
self.running = False
self.motion.stop_all()
with self._state_lock:
self.state.is_scanning = False
def _scan_direction(self, direction: ScanDirection) -> str:
"""Scan in a direction until edge or max dimension reached."""
self.log(f"Scanning {direction.value}...")
with self._state_lock:
self.state.direction = direction.value
frame = self._capture_frame()
h, w = frame.shape[:2]
total_x = 0
# Setup based on direction
if direction in [ScanDirection.RIGHT, ScanDirection.LEFT]:
threshold_pixels = w * self.config.displacement_threshold
max_dim = self.config.max_mosaic_width
current_dim = lambda: self.state.mosaic_width
start_cmd = 'E' if direction == ScanDirection.RIGHT else 'W'
stop_cmd = 'e' if direction == ScanDirection.RIGHT else 'w'
else:
threshold_pixels = h * self.config.displacement_threshold
max_dim = self.config.max_mosaic_height
current_dim = lambda: self.state.mosaic_height
start_cmd = 'S' if direction == ScanDirection.DOWN else 'N'
stop_cmd = 's' if direction == ScanDirection.DOWN else 'n'
self._prev_frame = frame.copy()
self._displacement_since_append_x = 0.0
self._displacement_since_append_y = 0.0
start_time = time.time()
no_movement_count = 0
max_no_movement = 50
stop_reason = 'stopped'
while self.running and not self.paused:
if time.time() - start_time > self.config.max_scan_time:
self.log("Scan timeout")
stop_reason = 'timeout'
break
if current_dim() >= max_dim and direction == ScanDirection.RIGHT:
self.log(f"Max dimension reached ({current_dim()}px)")
stop_reason = 'max_dim'
break
# if current_dim() <= 0 and direction == ScanDirection.LEFT:
# self.log(f"Max dimension reached ({current_dim()}px)")
# stop_reason = 'min_dim'
# break
if self.state.current_x >= 0 and direction == ScanDirection.LEFT:
self.log(f"Max dimension reached ({self.config.max_mosaic_width}px)")
self.log(f"Current X offset ({self.state.current_x}px) total_x ({total_x}px)")
stop_reason = 'max_dim'
break
if abs(self.state.current_x) >= self.config.max_mosaic_width and direction == ScanDirection.RIGHT:
self.log(f"Max dimension reached ({self.config.max_mosaic_width}px)")
self.log(f"Current X offset ({self.state.current_x}px)")
stop_reason = 'max_dim'
break
# Pulse motor
self.motion.send_command(start_cmd)
time.sleep(self.config.movement_interval)
self.motion.send_command(stop_cmd)
time.sleep(self.config.frame_interval)
curr_frame = self._capture_frame()
dx, dy = self._detect_displacement_robust(self._prev_frame, curr_frame)
self._displacement_since_append_x += dx
self._displacement_since_append_y += dy
total_x += dx
self.state.current_x += dx
with self._state_lock:
self.state.cumulative_x = self._displacement_since_append_x
self.state.cumulative_y = self._displacement_since_append_y
self.state.last_displacement = (dx, dy)
self.state.frame_count += 1
# Edge detection
movement = abs(dx) if direction in [ScanDirection.RIGHT, ScanDirection.LEFT] else abs(dy)
if movement < 1.0:
no_movement_count += 1
if no_movement_count >= max_no_movement:
self.log(f"Edge detected (no movement)")
stop_reason = 'edge'
break
else:
no_movement_count = 0
# Append when threshold reached
disp = abs(self._displacement_since_append_x) if direction in [ScanDirection.RIGHT, ScanDirection.LEFT] else abs(self._displacement_since_append_y)
if disp >= threshold_pixels:
self._append_strip(curr_frame, direction)
self.log(f"Appended {disp:.1f}px, mosaic: {self.state.mosaic_width}x{self.state.mosaic_height}")
self._prev_frame = curr_frame.copy()
if self.on_progress:
self.on_progress(self.state.append_count, 0)
self.motion.send_command(stop_cmd)
time.sleep(self.config.settle_time)
self.log(f"Direction finished: {stop_reason}")
return stop_reason
def _move_to_next_row(self) -> bool:
"""
Move down to next row using displacement-based stitching.
Same approach as horizontal scanning.
"""
self.log("Moving to next row...")
frame = self._capture_frame()
h, w = frame.shape[:2]
# Target: move (1 - overlap) * frame_height
target_displacement = h * (1 - self.config.row_overlap)
threshold_pixels = h * self.config.displacement_threshold
self.log(f"Target Y: {target_displacement:.0f}px, threshold: {threshold_pixels:.0f}px")
with self._state_lock:
self.state.direction = 'down'
self.state.cumulative_y = 0.0
self._prev_frame = frame.copy()
self._displacement_since_append_x = 0.0
self._displacement_since_append_y = 0.0
total_y = 0.0
no_movement_count = 0
max_no_movement = 30
# Start moving South
self.motion.send_command('S')
try:
while self.running:
self.motion.send_command('S')
time.sleep(self.config.movement_interval)
self.motion.send_command('s')
time.sleep(self.config.frame_interval)
curr_frame = self._capture_frame()
dx, dy = self._detect_displacement_robust(self._prev_frame, curr_frame)
self._displacement_since_append_y += dy
total_y += dy
self.state.current_y += dy
with self._state_lock:
self.state.cumulative_y = total_y
self.state.last_displacement = (dx, dy)
# Edge detection
if abs(dy) < 1.0:
no_movement_count += 1
if no_movement_count >= max_no_movement:
self.log("Edge detected during row transition")
self.motion.send_command('s')
time.sleep(self.config.settle_time)
return False
else:
no_movement_count = 0
# Append strip when threshold reached
if abs(self._displacement_since_append_y) >= threshold_pixels:
self._append_strip(curr_frame, ScanDirection.DOWN)
self.log(f" Row transition: appended, total Y: {abs(total_y):.1f}px")
# Done when we've moved enough
if abs(total_y) >= target_displacement:
self.log(f"Row transition complete: {abs(total_y):.1f}px")
self.state.current_y = 0
self.motion.send_command('s')
time.sleep(self.config.settle_time)
# Reset for next horizontal row
frame = self._capture_frame()
self._prev_frame = frame.copy()
self._displacement_since_append_x = 0.0
self._displacement_since_append_y = 0.0
return True
self._prev_frame = curr_frame.copy()
except Exception as e:
self.log(f"Row transition error: {e}")
self.motion.send_command('s')
return False
self.motion.send_command('s')
time.sleep(self.config.settle_time)
return False
def _capture_frame(self) -> np.ndarray:
frame = self.camera.capture_frame()
frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
return frame
# =========================================================================
# Getters
# =========================================================================
def get_state(self) -> StitchState:
with self._state_lock:
return StitchState(
is_scanning=self.state.is_scanning,
direction=self.state.direction,
cumulative_x=self.state.cumulative_x,
cumulative_y=self.state.cumulative_y,
last_displacement=self.state.last_displacement,
current_row=self.state.current_row,
total_rows=self.state.total_rows,
mosaic_width=self.state.mosaic_width,
mosaic_height=self.state.mosaic_height,
frame_count=self.state.frame_count,
append_count=self.state.append_count
)
def get_mosaic(self) -> Optional[np.ndarray]:
with self._mosaic_lock:
if self.mosaic is not None:
return self.mosaic.copy()
return None
def get_mosaic_preview(self, max_size: int = 600) -> Optional[np.ndarray]:
with self._mosaic_lock:
if self.mosaic is None:
return None
h, w = self.mosaic.shape[:2]
scale = min(max_size / w, max_size / h, 1.0)
if scale < 1.0:
new_w = int(w * scale)
new_h = int(h * scale)
return cv2.resize(self.mosaic, (new_w, new_h))
return self.mosaic.copy()
def save_mosaic(self, filepath: str) -> bool:
with self._mosaic_lock:
if self.mosaic is None:
return False
cv2.imwrite(filepath, self.mosaic)
self.log(f"Saved mosaic to {filepath}")
return True
# =========================================================================
# Testing
# =========================================================================
def test_displacement(self, num_frames: int = 10) -> dict:
results = {'frames': [], 'total_dx': 0.0, 'total_dy': 0.0}
prev_frame = self._capture_frame()
for i in range(num_frames):
time.sleep(0.1)
curr_frame = self._capture_frame()
dx, dy = self._detect_displacement(prev_frame, curr_frame)
results['frames'].append({'frame': i, 'dx': dx, 'dy': dy})
results['total_dx'] += dx
results['total_dy'] += dy
prev_frame = curr_frame
return results
def test_row_transition(self) -> dict:
"""Test row transition using displacement stitching."""
results = {
'success': False,
'y_moved': 0.0,
'mosaic_before': (0, 0),
'mosaic_after': (0, 0),
'error': None
}
try:
self.log("Testing row transition...")
if self.mosaic is None:
frame = self._capture_frame()
self._init_mosaic(frame)
results['mosaic_before'] = (self.state.mosaic_width, self.state.mosaic_height)
with self._state_lock:
self.state.cumulative_y = 0.0
self.running = True
success = self._move_to_next_row()
self.running = False
results['success'] = success
results['y_moved'] = self.state.cumulative_y
results['mosaic_after'] = (self.state.mosaic_width, self.state.mosaic_height)
self.log(f"Row transition: {'SUCCESS' if success else 'FAILED'}, Y: {results['y_moved']:.1f}px")
except Exception as e:
results['error'] = str(e)
self.log(f"Test error: {e}")
self.running = False
return results
def test_single_row(self, direction: str = 'right') -> dict:
"""Test scanning a single row."""
results = {
'success': False,
'stop_reason': None,
'appends': 0,
'mosaic_before': (0, 0),
'mosaic_after': (0, 0),
'error': None
}
try:
self.log(f"Testing single row ({direction})...")
if self.mosaic is None:
frame = self._capture_frame()
self._init_mosaic(frame)
results['mosaic_before'] = (self.state.mosaic_width, self.state.mosaic_height)
appends_before = self.state.append_count
self.motion.set_speed(self.config.scan_speed_index)
time.sleep(0.1)
self.running = True
scan_dir = ScanDirection.RIGHT if direction == 'right' else ScanDirection.LEFT
stop_reason = self._scan_direction(scan_dir)
self.running = False
results['success'] = True
results['stop_reason'] = stop_reason
results['appends'] = self.state.append_count - appends_before
results['mosaic_after'] = (self.state.mosaic_width, self.state.mosaic_height)
except Exception as e:
results['error'] = str(e)
self.running = False
return results
def get_memory_estimate(self) -> dict:
current_bytes = self.mosaic.nbytes if self.mosaic is not None else 0
max_bytes = self.config.max_mosaic_width * self.config.max_mosaic_height * 3
return {
'current_size': (self.state.mosaic_width, self.state.mosaic_height),
'current_mb': current_bytes / (1024 * 1024),
'max_size': (self.config.max_mosaic_width, self.config.max_mosaic_height),
'max_mb': max_bytes / (1024 * 1024),
}