Compare commits
5 commits
Simplified
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
| f422284a66 | |||
| c63e02b582 | |||
| de8b3944b0 | |||
| 7d8d1733c0 | |||
| 2a894d7d98 |
5 changed files with 1455 additions and 556 deletions
BIN
Mos1.png
BIN
Mos1.png
Binary file not shown.
|
Before Width: | Height: | Size: 513 KiB |
BIN
Mos2.png
BIN
Mos2.png
Binary file not shown.
|
Before Width: | Height: | Size: 194 KiB |
170
src/camera.py
170
src/camera.py
|
|
@ -1,22 +1,183 @@
|
||||||
import cv2
|
import cv2
|
||||||
|
import subprocess
|
||||||
|
import re
|
||||||
|
|
||||||
|
|
||||||
|
def detect_camera_modes(device_id=0):
|
||||||
|
"""
|
||||||
|
Detect available camera resolutions using v4l2-ctl.
|
||||||
|
Returns dict of modes sorted by resolution (smallest to largest).
|
||||||
|
"""
|
||||||
|
modes = {}
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Run v4l2-ctl to get supported formats
|
||||||
|
device = f"/dev/video{device_id}"
|
||||||
|
result = subprocess.run(
|
||||||
|
['v4l2-ctl', '--device', device, '--list-formats-ext'],
|
||||||
|
capture_output=True,
|
||||||
|
text=True,
|
||||||
|
timeout=5
|
||||||
|
)
|
||||||
|
|
||||||
|
if result.returncode != 0:
|
||||||
|
print(f"v4l2-ctl failed: {result.stderr}")
|
||||||
|
return _get_fallback_modes()
|
||||||
|
|
||||||
|
# Parse output for "Size: Discrete WxH" lines
|
||||||
|
# Example: "Size: Discrete 2592x1944"
|
||||||
|
size_pattern = re.compile(r'Size:\s+Discrete\s+(\d+)x(\d+)')
|
||||||
|
|
||||||
|
resolutions = set() # Use set to avoid duplicates
|
||||||
|
for line in result.stdout.split('\n'):
|
||||||
|
match = size_pattern.search(line)
|
||||||
|
if match:
|
||||||
|
width = int(match.group(1))
|
||||||
|
height = int(match.group(2))
|
||||||
|
resolutions.add((width, height))
|
||||||
|
|
||||||
|
if not resolutions:
|
||||||
|
print("No resolutions found in v4l2-ctl output")
|
||||||
|
return _get_fallback_modes()
|
||||||
|
|
||||||
|
# Sort by total pixels (width * height)
|
||||||
|
sorted_res = sorted(resolutions, key=lambda r: r[0] * r[1])
|
||||||
|
|
||||||
|
# Build modes dict with descriptive names
|
||||||
|
for i, (width, height) in enumerate(sorted_res):
|
||||||
|
pixels = width * height
|
||||||
|
|
||||||
|
# Generate a name based on position/size
|
||||||
|
if i == 0:
|
||||||
|
name = 'low'
|
||||||
|
desc = 'Low'
|
||||||
|
elif i == len(sorted_res) - 1:
|
||||||
|
name = 'high'
|
||||||
|
desc = 'High'
|
||||||
|
elif len(sorted_res) == 3 and i == 1:
|
||||||
|
name = 'medium'
|
||||||
|
desc = 'Medium'
|
||||||
|
else:
|
||||||
|
name = f'res_{width}x{height}'
|
||||||
|
desc = f'{width}x{height}'
|
||||||
|
|
||||||
|
modes[name] = {
|
||||||
|
'width': width,
|
||||||
|
'height': height,
|
||||||
|
'label': f'{width}x{height} ({desc})'
|
||||||
|
}
|
||||||
|
|
||||||
|
print(f"Detected {len(modes)} camera modes: {list(modes.keys())}")
|
||||||
|
return modes
|
||||||
|
|
||||||
|
except FileNotFoundError:
|
||||||
|
print("v4l2-ctl not found, using fallback modes")
|
||||||
|
return _get_fallback_modes()
|
||||||
|
except subprocess.TimeoutExpired:
|
||||||
|
print("v4l2-ctl timed out, using fallback modes")
|
||||||
|
return _get_fallback_modes()
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error detecting camera modes: {e}")
|
||||||
|
return _get_fallback_modes()
|
||||||
|
|
||||||
|
|
||||||
|
def _get_fallback_modes():
|
||||||
|
"""Fallback modes if v4l2-ctl detection fails"""
|
||||||
|
return {
|
||||||
|
'low': {'width': 640, 'height': 480, 'label': '640x480 (Low)'},
|
||||||
|
'medium': {'width': 1280, 'height': 960, 'label': '1280x960 (Medium)'},
|
||||||
|
'high': {'width': 1920, 'height': 1080, 'label': '1920x1080 (High)'},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
class Camera:
|
class Camera:
|
||||||
def __init__(self, device_id=0):
|
prevFrame = {}
|
||||||
|
def __init__(self, device_id=0, mode=None):
|
||||||
|
self.device_id = device_id
|
||||||
|
|
||||||
|
# Detect available modes before opening camera
|
||||||
|
self.MODES = detect_camera_modes(device_id)
|
||||||
|
|
||||||
|
# Open camera
|
||||||
self.cap = cv2.VideoCapture(device_id)
|
self.cap = cv2.VideoCapture(device_id)
|
||||||
|
|
||||||
if not self.cap.isOpened():
|
if not self.cap.isOpened():
|
||||||
raise RuntimeError("Could not open camera, stop program")
|
raise RuntimeError("Could not open camera, stop program")
|
||||||
|
|
||||||
|
# Default to highest resolution if no mode specified
|
||||||
|
if mode is None:
|
||||||
|
mode = list(self.MODES.keys())[0] # Last = highest res
|
||||||
|
|
||||||
|
self.current_mode = mode
|
||||||
|
self._apply_mode(mode)
|
||||||
|
|
||||||
# set resolution
|
# set resolution
|
||||||
# self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
|
# self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
|
||||||
# self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)
|
# self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)
|
||||||
|
self.cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0) # Disable auto-exposure
|
||||||
|
self.cap.set(cv2.CAP_PROP_EXPOSURE, -6) # Set fixed exposure
|
||||||
|
self.cap.set(cv2.CAP_PROP_AUTO_WB, 0) # Disable auto white balance
|
||||||
self.window_name = "AutoScope"
|
self.window_name = "AutoScope"
|
||||||
|
|
||||||
|
def _apply_mode(self, mode_name):
|
||||||
|
"""Apply resolution settings"""
|
||||||
|
if mode_name not in self.MODES:
|
||||||
|
print(f"Unknown mode {mode_name}, using first available")
|
||||||
|
mode_name = list(self.MODES.keys())[0]
|
||||||
|
|
||||||
|
mode = self.MODES[mode_name]
|
||||||
|
|
||||||
|
# Set resolution
|
||||||
|
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, mode['width'])
|
||||||
|
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, mode['height'])
|
||||||
|
|
||||||
|
self.current_mode = mode_name
|
||||||
|
|
||||||
|
# Verify settings took effect
|
||||||
|
actual_w = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
||||||
|
actual_h = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
||||||
|
actual_fps = self.cap.get(cv2.CAP_PROP_FPS)
|
||||||
|
|
||||||
|
print(f"Camera mode: {mode['label']}")
|
||||||
|
print(f" Actual: {actual_w}x{actual_h} @ {actual_fps:.1f}fps")
|
||||||
|
|
||||||
|
return actual_w, actual_h, actual_fps
|
||||||
|
|
||||||
|
def set_mode(self, mode_name):
|
||||||
|
"""Change camera mode (resolution/framerate)"""
|
||||||
|
return self._apply_mode(mode_name)
|
||||||
|
|
||||||
|
def get_mode(self):
|
||||||
|
"""Get current mode name"""
|
||||||
|
return self.current_mode
|
||||||
|
|
||||||
|
def get_mode_info(self):
|
||||||
|
"""Get current mode details"""
|
||||||
|
return self.MODES.get(self.current_mode, list(self.MODES.values())[0])
|
||||||
|
|
||||||
|
def get_resolution(self):
|
||||||
|
"""Get current actual resolution"""
|
||||||
|
w = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
||||||
|
h = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
||||||
|
return w, h
|
||||||
|
|
||||||
|
def get_fps(self):
|
||||||
|
"""Get current actual FPS"""
|
||||||
|
return self.cap.get(cv2.CAP_PROP_FPS)
|
||||||
|
|
||||||
|
def get_available_modes(self):
|
||||||
|
"""Get list of available mode names"""
|
||||||
|
return list(self.MODES.keys())
|
||||||
|
|
||||||
|
def get_mode_labels(self):
|
||||||
|
"""Get mode labels for UI"""
|
||||||
|
return {k: v['label'] for k, v in self.MODES.items()}
|
||||||
|
|
||||||
def capture_frame(self):
|
def capture_frame(self):
|
||||||
ret, frame = self.cap.read()
|
ret, frame = self.cap.read()
|
||||||
if not ret:
|
if not ret:
|
||||||
raise RuntimeError("Failed to capture frame, stop program")
|
return prevframe
|
||||||
|
prevframe = frame
|
||||||
return frame
|
return frame
|
||||||
|
|
||||||
def show_frame(self, frame):
|
def show_frame(self, frame):
|
||||||
|
|
@ -68,5 +229,4 @@ class Camera:
|
||||||
if key == ord('q'):
|
if key == ord('q'):
|
||||||
break
|
break
|
||||||
|
|
||||||
self.close_window()
|
self.close_window()
|
||||||
|
|
||||||
1085
src/gui.py
1085
src/gui.py
File diff suppressed because it is too large
Load diff
756
src/stitching_scanner.py
Normal file
756
src/stitching_scanner.py
Normal file
|
|
@ -0,0 +1,756 @@
|
||||||
|
"""
|
||||||
|
Stitching Scanner v2 - Simplified unified approach
|
||||||
|
|
||||||
|
Same displacement-based stitching for both horizontal rows and vertical row transitions.
|
||||||
|
No complex visual matching - just track displacement and append strips.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import cv2
|
||||||
|
import numpy as np
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Optional, Callable, Tuple
|
||||||
|
from enum import Enum
|
||||||
|
|
||||||
|
|
||||||
|
class ScanDirection(Enum):
|
||||||
|
RIGHT = 'right'
|
||||||
|
LEFT = 'left'
|
||||||
|
DOWN = 'down'
|
||||||
|
UP = 'up'
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class StitchConfig:
|
||||||
|
displacement_threshold: float = 0.10 # 10% of frame triggers append
|
||||||
|
movement_interval: float = 0.001
|
||||||
|
frame_interval: float = 1.00
|
||||||
|
settle_time: float = 0.75
|
||||||
|
max_scan_time: float = 300.0
|
||||||
|
row_overlap: float = 0.15
|
||||||
|
max_mosaic_width: int = 15000
|
||||||
|
max_mosaic_height: int = 12000
|
||||||
|
scan_speed_index: int = 3
|
||||||
|
autofocus_every_row: bool = True
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class StitchState:
|
||||||
|
is_scanning: bool = False
|
||||||
|
direction: str = ''
|
||||||
|
cumulative_x: float = 0.0
|
||||||
|
cumulative_y: float = 0.0
|
||||||
|
last_displacement: Tuple[float, float] = (0.0, 0.0)
|
||||||
|
current_row: int = 0
|
||||||
|
total_rows: int = 0
|
||||||
|
mosaic_width: int = 0
|
||||||
|
mosaic_height: int = 0
|
||||||
|
frame_count: int = 0
|
||||||
|
append_count: int = 0
|
||||||
|
|
||||||
|
|
||||||
|
class StitchingScanner:
|
||||||
|
"""
|
||||||
|
Slide scanner using continuous stitching.
|
||||||
|
Unified approach for horizontal and vertical movement.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, camera, motion_controller, autofocus_controller=None,
|
||||||
|
config: StitchConfig = None,
|
||||||
|
on_log: Callable[[str], None] = None,
|
||||||
|
on_progress: Callable[[int, int], None] = None,
|
||||||
|
on_mosaic_updated: Callable[[], None] = None):
|
||||||
|
self.camera = camera
|
||||||
|
self.motion = motion_controller
|
||||||
|
self.autofocus = autofocus_controller
|
||||||
|
self.config = config or StitchConfig()
|
||||||
|
|
||||||
|
self.on_log = on_log
|
||||||
|
self.on_progress = on_progress
|
||||||
|
self.on_mosaic_updated = on_mosaic_updated
|
||||||
|
|
||||||
|
self.running = False
|
||||||
|
self.paused = False
|
||||||
|
self.state = StitchState()
|
||||||
|
self._state_lock = threading.Lock()
|
||||||
|
|
||||||
|
self.mosaic: Optional[np.ndarray] = None
|
||||||
|
self._mosaic_lock = threading.Lock()
|
||||||
|
|
||||||
|
self._prev_frame: Optional[np.ndarray] = None
|
||||||
|
self._displacement_since_append_x: float = 0.0
|
||||||
|
self._displacement_since_append_y: float = 0.0
|
||||||
|
|
||||||
|
self._thread: Optional[threading.Thread] = None
|
||||||
|
|
||||||
|
def log(self, message: str):
|
||||||
|
if self.on_log:
|
||||||
|
self.on_log(f"[Stitch] {message}")
|
||||||
|
print(f"[Stitch] {message}")
|
||||||
|
|
||||||
|
# =========================================================================
|
||||||
|
# Displacement Detection
|
||||||
|
# =========================================================================
|
||||||
|
|
||||||
|
def _to_grayscale(self, frame: np.ndarray) -> np.ndarray:
|
||||||
|
if len(frame.shape) == 3:
|
||||||
|
return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
||||||
|
return frame
|
||||||
|
|
||||||
|
def _detect_displacement(self, prev_frame: np.ndarray,
|
||||||
|
curr_frame: np.ndarray) -> Tuple[float, float]:
|
||||||
|
prev_gray = self._to_grayscale(prev_frame)
|
||||||
|
curr_gray = self._to_grayscale(curr_frame)
|
||||||
|
|
||||||
|
if prev_gray.shape != curr_gray.shape:
|
||||||
|
return (0.0, 0.0)
|
||||||
|
|
||||||
|
prev_f = prev_gray.astype(np.float32)
|
||||||
|
curr_f = curr_gray.astype(np.float32)
|
||||||
|
|
||||||
|
h, w = prev_gray.shape
|
||||||
|
window = cv2.createHanningWindow((w, h), cv2.CV_32F)
|
||||||
|
prev_f = prev_f * window
|
||||||
|
curr_f = curr_f * window
|
||||||
|
|
||||||
|
shift, _ = cv2.phaseCorrelate(prev_f, curr_f)
|
||||||
|
return shift
|
||||||
|
|
||||||
|
def _detect_displacement_robust(self, prev_frame: np.ndarray,
|
||||||
|
curr_frame: np.ndarray) -> Tuple[float, float]:
|
||||||
|
dx, dy = self._detect_displacement(prev_frame, curr_frame)
|
||||||
|
|
||||||
|
h, w = prev_frame.shape[:2]
|
||||||
|
max_displacement = max(w, h) * 0.5
|
||||||
|
|
||||||
|
if abs(dx) > max_displacement or abs(dy) > max_displacement:
|
||||||
|
self.log(f"Warning: Large displacement ({dx:.1f}, {dy:.1f}), ignoring")
|
||||||
|
return (0.0, 0.0)
|
||||||
|
|
||||||
|
return (dx, dy)
|
||||||
|
|
||||||
|
# =========================================================================
|
||||||
|
# Mosaic Building
|
||||||
|
# =========================================================================
|
||||||
|
|
||||||
|
def _init_mosaic(self, frame: np.ndarray):
|
||||||
|
with self._mosaic_lock:
|
||||||
|
self.mosaic = frame.copy()
|
||||||
|
|
||||||
|
self._prev_frame = frame.copy()
|
||||||
|
self._displacement_since_append_x = 0.0
|
||||||
|
self._displacement_since_append_y = 0.0
|
||||||
|
|
||||||
|
with self._state_lock:
|
||||||
|
h, w = frame.shape[:2]
|
||||||
|
self.state.mosaic_width = w
|
||||||
|
self.state.mosaic_height = h
|
||||||
|
self.state.frame_count = 1
|
||||||
|
self.state.append_count = 0
|
||||||
|
|
||||||
|
self.log(f"Initialized mosaic: {frame.shape[1]}x{frame.shape[0]}")
|
||||||
|
|
||||||
|
def _blend_horizontal(self, base: np.ndarray, strip: np.ndarray,
|
||||||
|
blend_width: int, append_right: bool) -> np.ndarray:
|
||||||
|
if blend_width <= 0 or blend_width >= strip.shape[1]:
|
||||||
|
if append_right:
|
||||||
|
return np.hstack([base, strip])
|
||||||
|
return np.hstack([strip, base])
|
||||||
|
|
||||||
|
h_base, w_base = base.shape[:2]
|
||||||
|
h_strip, w_strip = strip.shape[:2]
|
||||||
|
|
||||||
|
if h_strip != h_base:
|
||||||
|
if append_right:
|
||||||
|
return np.hstack([base, strip])
|
||||||
|
return np.hstack([strip, base])
|
||||||
|
|
||||||
|
blend_w = min(blend_width, w_strip, w_base)
|
||||||
|
|
||||||
|
if append_right:
|
||||||
|
result_width = w_base + w_strip - blend_w
|
||||||
|
result = np.zeros((h_base, result_width, 3), dtype=np.uint8)
|
||||||
|
result[:, :w_base] = base
|
||||||
|
|
||||||
|
alpha = np.linspace(1, 0, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis]
|
||||||
|
base_overlap = base[:, -blend_w:].astype(np.float32)
|
||||||
|
strip_overlap = strip[:, :blend_w].astype(np.float32)
|
||||||
|
blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8)
|
||||||
|
|
||||||
|
result[:, w_base - blend_w:w_base] = blended
|
||||||
|
result[:, w_base:] = strip[:, blend_w:]
|
||||||
|
return result
|
||||||
|
else:
|
||||||
|
result_width = w_base + w_strip - blend_w
|
||||||
|
result = np.zeros((h_base, result_width, 3), dtype=np.uint8)
|
||||||
|
result[:, :w_strip] = strip
|
||||||
|
|
||||||
|
alpha = np.linspace(0, 1, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis]
|
||||||
|
strip_overlap = strip[:, -blend_w:].astype(np.float32)
|
||||||
|
base_overlap = base[:, :blend_w].astype(np.float32)
|
||||||
|
blended = (strip_overlap * (1 - alpha) + base_overlap * alpha).astype(np.uint8)
|
||||||
|
|
||||||
|
result[:, w_strip - blend_w:w_strip] = blended
|
||||||
|
result[:, w_strip:] = base[:, blend_w:]
|
||||||
|
return result
|
||||||
|
|
||||||
|
def _blend_vertical(self, base: np.ndarray, strip: np.ndarray,
|
||||||
|
blend_height: int, append_below: bool) -> np.ndarray:
|
||||||
|
mh, mw = base.shape[:2]
|
||||||
|
sh, sw = strip.shape[:2]
|
||||||
|
|
||||||
|
# Match widths
|
||||||
|
if sw > mw:
|
||||||
|
strip = strip[:, :mw]
|
||||||
|
elif sw < mw:
|
||||||
|
pad = np.zeros((sh, mw - sw, 3), dtype=np.uint8)
|
||||||
|
strip = np.hstack([strip, pad])
|
||||||
|
|
||||||
|
blend_h = min(blend_height, sh, mh)
|
||||||
|
|
||||||
|
if blend_h <= 0:
|
||||||
|
if append_below:
|
||||||
|
return np.vstack([base, strip])
|
||||||
|
return np.vstack([strip, base])
|
||||||
|
|
||||||
|
if append_below:
|
||||||
|
alpha = np.linspace(1, 0, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis]
|
||||||
|
base_overlap = base[-blend_h:].astype(np.float32)
|
||||||
|
strip_overlap = strip[:blend_h].astype(np.float32)
|
||||||
|
blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8)
|
||||||
|
|
||||||
|
result_h = mh + sh - blend_h
|
||||||
|
result = np.zeros((result_h, mw, 3), dtype=np.uint8)
|
||||||
|
result[:mh - blend_h] = base[:-blend_h]
|
||||||
|
result[mh - blend_h:mh] = blended
|
||||||
|
result[mh:] = strip[blend_h:]
|
||||||
|
return result
|
||||||
|
else:
|
||||||
|
alpha = np.linspace(0, 1, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis]
|
||||||
|
strip_overlap = strip[-blend_h:].astype(np.float32)
|
||||||
|
base_overlap = base[:blend_h].astype(np.float32)
|
||||||
|
blended = (strip_overlap * (1 - alpha) + base_overlap * alpha).astype(np.uint8)
|
||||||
|
|
||||||
|
result_h = mh + sh - blend_h
|
||||||
|
result = np.zeros((result_h, mw, 3), dtype=np.uint8)
|
||||||
|
result[:sh - blend_h] = strip[:-blend_h]
|
||||||
|
result[sh - blend_h:sh] = blended
|
||||||
|
result[sh:] = base[blend_h:]
|
||||||
|
return result
|
||||||
|
|
||||||
|
def _append_strip(self, frame: np.ndarray, direction: ScanDirection):
|
||||||
|
"""Append strip to mosaic based on accumulated displacement."""
|
||||||
|
BLEND_WIDTH = 10
|
||||||
|
SAFETY_MARGIN = 2
|
||||||
|
|
||||||
|
with self._mosaic_lock:
|
||||||
|
if self.mosaic is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
h, w = frame.shape[:2]
|
||||||
|
mh, mw = self.mosaic.shape[:2]
|
||||||
|
|
||||||
|
dx = abs(self._displacement_since_append_x)
|
||||||
|
dy = abs(self._displacement_since_append_y)
|
||||||
|
|
||||||
|
if direction in [ScanDirection.RIGHT, ScanDirection.LEFT]:
|
||||||
|
append_width = round(dx) + SAFETY_MARGIN
|
||||||
|
append_width = min(append_width, w - BLEND_WIDTH - 5)
|
||||||
|
|
||||||
|
if append_width < 1:
|
||||||
|
return
|
||||||
|
|
||||||
|
pixels_consumed = append_width - SAFETY_MARGIN
|
||||||
|
fractional_remainder = dx - pixels_consumed
|
||||||
|
|
||||||
|
if direction == ScanDirection.RIGHT:
|
||||||
|
strip_start = max(0, w - append_width - BLEND_WIDTH)
|
||||||
|
new_strip = frame[:, strip_start:]
|
||||||
|
self.mosaic = self._blend_horizontal(
|
||||||
|
self.mosaic, new_strip, BLEND_WIDTH, append_right=True)
|
||||||
|
else:
|
||||||
|
strip_end = min(w, append_width + BLEND_WIDTH)
|
||||||
|
new_strip = frame[:, :strip_end]
|
||||||
|
self.mosaic = self._blend_horizontal(
|
||||||
|
self.mosaic, new_strip, BLEND_WIDTH, append_right=False)
|
||||||
|
|
||||||
|
self._displacement_since_append_x = fractional_remainder
|
||||||
|
self._displacement_since_append_y = 0.0
|
||||||
|
|
||||||
|
elif direction in [ScanDirection.DOWN, ScanDirection.UP]:
|
||||||
|
append_height = round(dy) + SAFETY_MARGIN
|
||||||
|
append_height = min(append_height, h - BLEND_WIDTH - 5)
|
||||||
|
|
||||||
|
if append_height < 1:
|
||||||
|
return
|
||||||
|
|
||||||
|
pixels_consumed = append_height - SAFETY_MARGIN
|
||||||
|
fractional_remainder = dy - pixels_consumed
|
||||||
|
|
||||||
|
if direction == ScanDirection.DOWN:
|
||||||
|
strip_end = min(h, append_height + BLEND_WIDTH)
|
||||||
|
new_strip = frame[:strip_end:, :]
|
||||||
|
self.mosaic = self._blend_vertical(
|
||||||
|
self.mosaic, new_strip, BLEND_WIDTH, append_below=False)
|
||||||
|
else:
|
||||||
|
strip_start = max(0, h - append_height - BLEND_WIDTH)
|
||||||
|
new_strip = frame[:strip_start, :]
|
||||||
|
self.mosaic = self._blend_vertical(
|
||||||
|
self.mosaic, new_strip, BLEND_WIDTH, append_below=True)
|
||||||
|
|
||||||
|
self._displacement_since_append_x = 0.0
|
||||||
|
self._displacement_since_append_y = fractional_remainder
|
||||||
|
|
||||||
|
new_mh, new_mw = self.mosaic.shape[:2]
|
||||||
|
|
||||||
|
with self._state_lock:
|
||||||
|
self.state.mosaic_width = new_mw
|
||||||
|
self.state.mosaic_height = new_mh
|
||||||
|
self.state.append_count += 1
|
||||||
|
|
||||||
|
if self.on_mosaic_updated:
|
||||||
|
self.on_mosaic_updated()
|
||||||
|
|
||||||
|
# =========================================================================
|
||||||
|
# Scan Control
|
||||||
|
# =========================================================================
|
||||||
|
|
||||||
|
def start(self) -> bool:
|
||||||
|
if self.running:
|
||||||
|
self.log("Already running")
|
||||||
|
return False
|
||||||
|
|
||||||
|
self.running = True
|
||||||
|
self.paused = False
|
||||||
|
|
||||||
|
with self._state_lock:
|
||||||
|
self.state = StitchState()
|
||||||
|
self.state.is_scanning = True
|
||||||
|
|
||||||
|
with self._mosaic_lock:
|
||||||
|
self.mosaic = None
|
||||||
|
|
||||||
|
self._prev_frame = None
|
||||||
|
self._displacement_since_append_x = 0.0
|
||||||
|
self._displacement_since_append_y = 0.0
|
||||||
|
|
||||||
|
self._thread = threading.Thread(target=self._scan_loop, daemon=True)
|
||||||
|
self._thread.start()
|
||||||
|
|
||||||
|
self.log("Stitching scan started")
|
||||||
|
return True
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self.running = False
|
||||||
|
self.paused = False
|
||||||
|
self.motion.stop_all()
|
||||||
|
|
||||||
|
with self._state_lock:
|
||||||
|
self.state.is_scanning = False
|
||||||
|
|
||||||
|
self.log("Scan stopped")
|
||||||
|
|
||||||
|
def pause(self):
|
||||||
|
if self.running and not self.paused:
|
||||||
|
self.paused = True
|
||||||
|
self.motion.stop_all()
|
||||||
|
self.log("Scan paused")
|
||||||
|
|
||||||
|
def resume(self):
|
||||||
|
if self.running and self.paused:
|
||||||
|
self.paused = False
|
||||||
|
self.log("Scan resumed")
|
||||||
|
|
||||||
|
# =========================================================================
|
||||||
|
# Scanning Logic
|
||||||
|
# =========================================================================
|
||||||
|
|
||||||
|
def _scan_loop(self):
|
||||||
|
try:
|
||||||
|
self.log("Starting scan loop")
|
||||||
|
self.log(f"Max dimensions: {self.config.max_mosaic_width}x{self.config.max_mosaic_height}")
|
||||||
|
|
||||||
|
self.motion.set_speed(self.config.scan_speed_index)
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
frame = self._capture_frame()
|
||||||
|
self._init_mosaic(frame)
|
||||||
|
|
||||||
|
row = 0
|
||||||
|
while self.running:
|
||||||
|
with self._state_lock:
|
||||||
|
self.state.current_row = row
|
||||||
|
self.state.total_rows = row + 1
|
||||||
|
|
||||||
|
self.log(f"=== Row {row + 1} ===")
|
||||||
|
|
||||||
|
# Serpentine: even rows right, odd rows left
|
||||||
|
h_direction = ScanDirection.RIGHT if row % 2 == 0 else ScanDirection.LEFT
|
||||||
|
|
||||||
|
stop_reason = self._scan_direction(h_direction)
|
||||||
|
|
||||||
|
if not self.running:
|
||||||
|
break
|
||||||
|
|
||||||
|
# Check max height
|
||||||
|
if self.state.mosaic_height >= self.config.max_mosaic_height:
|
||||||
|
self.log(f"Max height reached ({self.state.mosaic_height}px)")
|
||||||
|
break
|
||||||
|
|
||||||
|
# Move to next row using same stitching approach
|
||||||
|
if not self._move_to_next_row():
|
||||||
|
self.log("Failed to move to next row")
|
||||||
|
break
|
||||||
|
|
||||||
|
row += 1
|
||||||
|
|
||||||
|
self.log(f"Scan complete! Final: {self.state.mosaic_width}x{self.state.mosaic_height}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.log(f"Scan error: {e}")
|
||||||
|
import traceback
|
||||||
|
traceback.print_exc()
|
||||||
|
finally:
|
||||||
|
self.running = False
|
||||||
|
self.motion.stop_all()
|
||||||
|
with self._state_lock:
|
||||||
|
self.state.is_scanning = False
|
||||||
|
|
||||||
|
def _scan_direction(self, direction: ScanDirection) -> str:
|
||||||
|
"""Scan in a direction until edge or max dimension reached."""
|
||||||
|
self.log(f"Scanning {direction.value}...")
|
||||||
|
|
||||||
|
with self._state_lock:
|
||||||
|
self.state.direction = direction.value
|
||||||
|
|
||||||
|
frame = self._capture_frame()
|
||||||
|
h, w = frame.shape[:2]
|
||||||
|
|
||||||
|
# Setup based on direction
|
||||||
|
if direction in [ScanDirection.RIGHT, ScanDirection.LEFT]:
|
||||||
|
threshold_pixels = w * self.config.displacement_threshold
|
||||||
|
max_dim = self.config.max_mosaic_width
|
||||||
|
current_dim = lambda: self.state.mosaic_width
|
||||||
|
start_cmd = 'E' if direction == ScanDirection.RIGHT else 'W'
|
||||||
|
stop_cmd = 'e' if direction == ScanDirection.RIGHT else 'w'
|
||||||
|
else:
|
||||||
|
threshold_pixels = h * self.config.displacement_threshold
|
||||||
|
max_dim = self.config.max_mosaic_height
|
||||||
|
current_dim = lambda: self.state.mosaic_height
|
||||||
|
start_cmd = 'S' if direction == ScanDirection.DOWN else 'N'
|
||||||
|
stop_cmd = 's' if direction == ScanDirection.DOWN else 'n'
|
||||||
|
|
||||||
|
self._prev_frame = frame.copy()
|
||||||
|
self._displacement_since_append_x = 0.0
|
||||||
|
self._displacement_since_append_y = 0.0
|
||||||
|
|
||||||
|
start_time = time.time()
|
||||||
|
no_movement_count = 0
|
||||||
|
max_no_movement = 50
|
||||||
|
stop_reason = 'stopped'
|
||||||
|
|
||||||
|
while self.running and not self.paused:
|
||||||
|
if time.time() - start_time > self.config.max_scan_time:
|
||||||
|
self.log("Scan timeout")
|
||||||
|
stop_reason = 'timeout'
|
||||||
|
break
|
||||||
|
|
||||||
|
if current_dim() >= max_dim:
|
||||||
|
self.log(f"Max dimension reached ({current_dim()}px)")
|
||||||
|
stop_reason = 'max_dim'
|
||||||
|
break
|
||||||
|
|
||||||
|
# Pulse motor
|
||||||
|
self.motion.send_command(start_cmd)
|
||||||
|
time.sleep(self.config.movement_interval)
|
||||||
|
self.motion.send_command(stop_cmd)
|
||||||
|
|
||||||
|
time.sleep(self.config.frame_interval)
|
||||||
|
|
||||||
|
curr_frame = self._capture_frame()
|
||||||
|
dx, dy = self._detect_displacement_robust(self._prev_frame, curr_frame)
|
||||||
|
|
||||||
|
self._displacement_since_append_x += dx
|
||||||
|
self._displacement_since_append_y += dy
|
||||||
|
|
||||||
|
with self._state_lock:
|
||||||
|
self.state.cumulative_x = self._displacement_since_append_x
|
||||||
|
self.state.cumulative_y = self._displacement_since_append_y
|
||||||
|
self.state.last_displacement = (dx, dy)
|
||||||
|
self.state.frame_count += 1
|
||||||
|
|
||||||
|
# Edge detection
|
||||||
|
movement = abs(dx) if direction in [ScanDirection.RIGHT, ScanDirection.LEFT] else abs(dy)
|
||||||
|
if movement < 1.0:
|
||||||
|
no_movement_count += 1
|
||||||
|
if no_movement_count >= max_no_movement:
|
||||||
|
self.log(f"Edge detected (no movement)")
|
||||||
|
stop_reason = 'edge'
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
no_movement_count = 0
|
||||||
|
|
||||||
|
# Append when threshold reached
|
||||||
|
disp = abs(self._displacement_since_append_x) if direction in [ScanDirection.RIGHT, ScanDirection.LEFT] else abs(self._displacement_since_append_y)
|
||||||
|
if disp >= threshold_pixels:
|
||||||
|
self._append_strip(curr_frame, direction)
|
||||||
|
self.log(f"Appended {disp:.1f}px, mosaic: {self.state.mosaic_width}x{self.state.mosaic_height}")
|
||||||
|
|
||||||
|
self._prev_frame = curr_frame.copy()
|
||||||
|
|
||||||
|
if self.on_progress:
|
||||||
|
self.on_progress(self.state.append_count, 0)
|
||||||
|
|
||||||
|
self.motion.send_command(stop_cmd)
|
||||||
|
time.sleep(self.config.settle_time)
|
||||||
|
self.log(f"Direction finished: {stop_reason}")
|
||||||
|
return stop_reason
|
||||||
|
|
||||||
|
def _move_to_next_row(self) -> bool:
|
||||||
|
"""
|
||||||
|
Move down to next row using displacement-based stitching.
|
||||||
|
Same approach as horizontal scanning.
|
||||||
|
"""
|
||||||
|
self.log("Moving to next row...")
|
||||||
|
|
||||||
|
frame = self._capture_frame()
|
||||||
|
h, w = frame.shape[:2]
|
||||||
|
|
||||||
|
# Target: move (1 - overlap) * frame_height
|
||||||
|
target_displacement = h * (1 - self.config.row_overlap)
|
||||||
|
threshold_pixels = h * self.config.displacement_threshold
|
||||||
|
|
||||||
|
self.log(f"Target Y: {target_displacement:.0f}px, threshold: {threshold_pixels:.0f}px")
|
||||||
|
|
||||||
|
with self._state_lock:
|
||||||
|
self.state.direction = 'down'
|
||||||
|
self.state.cumulative_y = 0.0
|
||||||
|
|
||||||
|
self._prev_frame = frame.copy()
|
||||||
|
self._displacement_since_append_x = 0.0
|
||||||
|
self._displacement_since_append_y = 0.0
|
||||||
|
|
||||||
|
total_y = 0.0
|
||||||
|
no_movement_count = 0
|
||||||
|
max_no_movement = 30
|
||||||
|
|
||||||
|
# Start moving South
|
||||||
|
self.motion.send_command('S')
|
||||||
|
|
||||||
|
try:
|
||||||
|
while self.running:
|
||||||
|
time.sleep(self.config.frame_interval)
|
||||||
|
|
||||||
|
curr_frame = self._capture_frame()
|
||||||
|
dx, dy = self._detect_displacement_robust(self._prev_frame, curr_frame)
|
||||||
|
|
||||||
|
self._displacement_since_append_y += dy
|
||||||
|
total_y += dy
|
||||||
|
|
||||||
|
with self._state_lock:
|
||||||
|
self.state.cumulative_y = total_y
|
||||||
|
self.state.last_displacement = (dx, dy)
|
||||||
|
|
||||||
|
# Edge detection
|
||||||
|
if abs(dy) < 1.0:
|
||||||
|
no_movement_count += 1
|
||||||
|
if no_movement_count >= max_no_movement:
|
||||||
|
self.log("Edge detected during row transition")
|
||||||
|
self.motion.send_command('s')
|
||||||
|
time.sleep(self.config.settle_time)
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
no_movement_count = 0
|
||||||
|
|
||||||
|
# Append strip when threshold reached
|
||||||
|
if abs(self._displacement_since_append_y) >= threshold_pixels:
|
||||||
|
self._append_strip(curr_frame, ScanDirection.DOWN)
|
||||||
|
self.log(f" Row transition: appended, total Y: {abs(total_y):.1f}px")
|
||||||
|
|
||||||
|
# Done when we've moved enough
|
||||||
|
if abs(total_y) >= target_displacement:
|
||||||
|
self.log(f"Row transition complete: {abs(total_y):.1f}px")
|
||||||
|
self.motion.send_command('s')
|
||||||
|
time.sleep(self.config.settle_time)
|
||||||
|
|
||||||
|
# Reset for next horizontal row
|
||||||
|
frame = self._capture_frame()
|
||||||
|
self._prev_frame = frame.copy()
|
||||||
|
self._displacement_since_append_x = 0.0
|
||||||
|
self._displacement_since_append_y = 0.0
|
||||||
|
return True
|
||||||
|
|
||||||
|
self._prev_frame = curr_frame.copy()
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.log(f"Row transition error: {e}")
|
||||||
|
self.motion.send_command('s')
|
||||||
|
return False
|
||||||
|
|
||||||
|
self.motion.send_command('s')
|
||||||
|
time.sleep(self.config.settle_time)
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _capture_frame(self) -> np.ndarray:
|
||||||
|
frame = self.camera.capture_frame()
|
||||||
|
frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
|
||||||
|
return frame
|
||||||
|
|
||||||
|
# =========================================================================
|
||||||
|
# Getters
|
||||||
|
# =========================================================================
|
||||||
|
|
||||||
|
def get_state(self) -> StitchState:
|
||||||
|
with self._state_lock:
|
||||||
|
return StitchState(
|
||||||
|
is_scanning=self.state.is_scanning,
|
||||||
|
direction=self.state.direction,
|
||||||
|
cumulative_x=self.state.cumulative_x,
|
||||||
|
cumulative_y=self.state.cumulative_y,
|
||||||
|
last_displacement=self.state.last_displacement,
|
||||||
|
current_row=self.state.current_row,
|
||||||
|
total_rows=self.state.total_rows,
|
||||||
|
mosaic_width=self.state.mosaic_width,
|
||||||
|
mosaic_height=self.state.mosaic_height,
|
||||||
|
frame_count=self.state.frame_count,
|
||||||
|
append_count=self.state.append_count
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_mosaic(self) -> Optional[np.ndarray]:
|
||||||
|
with self._mosaic_lock:
|
||||||
|
if self.mosaic is not None:
|
||||||
|
return self.mosaic.copy()
|
||||||
|
return None
|
||||||
|
|
||||||
|
def get_mosaic_preview(self, max_size: int = 600) -> Optional[np.ndarray]:
|
||||||
|
with self._mosaic_lock:
|
||||||
|
if self.mosaic is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
h, w = self.mosaic.shape[:2]
|
||||||
|
scale = min(max_size / w, max_size / h, 1.0)
|
||||||
|
|
||||||
|
if scale < 1.0:
|
||||||
|
new_w = int(w * scale)
|
||||||
|
new_h = int(h * scale)
|
||||||
|
return cv2.resize(self.mosaic, (new_w, new_h))
|
||||||
|
|
||||||
|
return self.mosaic.copy()
|
||||||
|
|
||||||
|
def save_mosaic(self, filepath: str) -> bool:
|
||||||
|
with self._mosaic_lock:
|
||||||
|
if self.mosaic is None:
|
||||||
|
return False
|
||||||
|
cv2.imwrite(filepath, self.mosaic)
|
||||||
|
self.log(f"Saved mosaic to {filepath}")
|
||||||
|
return True
|
||||||
|
|
||||||
|
# =========================================================================
|
||||||
|
# Testing
|
||||||
|
# =========================================================================
|
||||||
|
|
||||||
|
def test_displacement(self, num_frames: int = 10) -> dict:
|
||||||
|
results = {'frames': [], 'total_dx': 0.0, 'total_dy': 0.0}
|
||||||
|
prev_frame = self._capture_frame()
|
||||||
|
|
||||||
|
for i in range(num_frames):
|
||||||
|
time.sleep(0.1)
|
||||||
|
curr_frame = self._capture_frame()
|
||||||
|
dx, dy = self._detect_displacement(prev_frame, curr_frame)
|
||||||
|
results['frames'].append({'frame': i, 'dx': dx, 'dy': dy})
|
||||||
|
results['total_dx'] += dx
|
||||||
|
results['total_dy'] += dy
|
||||||
|
prev_frame = curr_frame
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
def test_row_transition(self) -> dict:
|
||||||
|
"""Test row transition using displacement stitching."""
|
||||||
|
results = {
|
||||||
|
'success': False,
|
||||||
|
'y_moved': 0.0,
|
||||||
|
'mosaic_before': (0, 0),
|
||||||
|
'mosaic_after': (0, 0),
|
||||||
|
'error': None
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.log("Testing row transition...")
|
||||||
|
|
||||||
|
if self.mosaic is None:
|
||||||
|
frame = self._capture_frame()
|
||||||
|
self._init_mosaic(frame)
|
||||||
|
|
||||||
|
results['mosaic_before'] = (self.state.mosaic_width, self.state.mosaic_height)
|
||||||
|
|
||||||
|
with self._state_lock:
|
||||||
|
self.state.cumulative_y = 0.0
|
||||||
|
|
||||||
|
self.running = True
|
||||||
|
success = self._move_to_next_row()
|
||||||
|
self.running = False
|
||||||
|
|
||||||
|
results['success'] = success
|
||||||
|
results['y_moved'] = self.state.cumulative_y
|
||||||
|
results['mosaic_after'] = (self.state.mosaic_width, self.state.mosaic_height)
|
||||||
|
|
||||||
|
self.log(f"Row transition: {'SUCCESS' if success else 'FAILED'}, Y: {results['y_moved']:.1f}px")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
results['error'] = str(e)
|
||||||
|
self.log(f"Test error: {e}")
|
||||||
|
self.running = False
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
def test_single_row(self, direction: str = 'right') -> dict:
|
||||||
|
"""Test scanning a single row."""
|
||||||
|
results = {
|
||||||
|
'success': False,
|
||||||
|
'stop_reason': None,
|
||||||
|
'appends': 0,
|
||||||
|
'mosaic_before': (0, 0),
|
||||||
|
'mosaic_after': (0, 0),
|
||||||
|
'error': None
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.log(f"Testing single row ({direction})...")
|
||||||
|
|
||||||
|
if self.mosaic is None:
|
||||||
|
frame = self._capture_frame()
|
||||||
|
self._init_mosaic(frame)
|
||||||
|
|
||||||
|
results['mosaic_before'] = (self.state.mosaic_width, self.state.mosaic_height)
|
||||||
|
appends_before = self.state.append_count
|
||||||
|
|
||||||
|
self.motion.set_speed(self.config.scan_speed_index)
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
self.running = True
|
||||||
|
scan_dir = ScanDirection.RIGHT if direction == 'right' else ScanDirection.LEFT
|
||||||
|
stop_reason = self._scan_direction(scan_dir)
|
||||||
|
self.running = False
|
||||||
|
|
||||||
|
results['success'] = True
|
||||||
|
results['stop_reason'] = stop_reason
|
||||||
|
results['appends'] = self.state.append_count - appends_before
|
||||||
|
results['mosaic_after'] = (self.state.mosaic_width, self.state.mosaic_height)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
results['error'] = str(e)
|
||||||
|
self.running = False
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
def get_memory_estimate(self) -> dict:
|
||||||
|
current_bytes = self.mosaic.nbytes if self.mosaic is not None else 0
|
||||||
|
max_bytes = self.config.max_mosaic_width * self.config.max_mosaic_height * 3
|
||||||
|
|
||||||
|
return {
|
||||||
|
'current_size': (self.state.mosaic_width, self.state.mosaic_height),
|
||||||
|
'current_mb': current_bytes / (1024 * 1024),
|
||||||
|
'max_size': (self.config.max_mosaic_width, self.config.max_mosaic_height),
|
||||||
|
'max_mb': max_bytes / (1024 * 1024),
|
||||||
|
}
|
||||||
Loading…
Reference in a new issue