constant displacement check
This commit is contained in:
parent
16cb8b360b
commit
5177eea77e
1 changed files with 217 additions and 107 deletions
|
|
@ -3,6 +3,7 @@ Stitching Scanner v2 - Simplified unified approach
|
||||||
|
|
||||||
Same displacement-based stitching for both horizontal rows and vertical row transitions.
|
Same displacement-based stitching for both horizontal rows and vertical row transitions.
|
||||||
No complex visual matching - just track displacement and append strips.
|
No complex visual matching - just track displacement and append strips.
|
||||||
|
Continuous alignment correction for gear slippage compensation.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import cv2
|
import cv2
|
||||||
|
|
@ -55,17 +56,19 @@ class StitchState:
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class RowAlignmentOffset:
|
class AlignmentOffset:
|
||||||
"""Stores the alignment offset calculated at the start of each row"""
|
"""Stores alignment offset for strip placement"""
|
||||||
x_offset: float = 0.0
|
x_offset: float = 0.0
|
||||||
y_offset: float = 0.0
|
y_offset: float = 0.0
|
||||||
valid: bool = False
|
valid: bool = False
|
||||||
|
confidence: float = 0.0 # Phase correlation response
|
||||||
|
|
||||||
|
|
||||||
class StitchingScanner:
|
class StitchingScanner:
|
||||||
"""
|
"""
|
||||||
Slide scanner using continuous stitching.
|
Slide scanner using continuous stitching.
|
||||||
Unified approach for horizontal and vertical movement.
|
Unified approach for horizontal and vertical movement.
|
||||||
|
Continuous alignment correction for gear slippage compensation.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, camera, motion_controller, autofocus_controller=None,
|
def __init__(self, camera, motion_controller, autofocus_controller=None,
|
||||||
|
|
@ -94,9 +97,12 @@ class StitchingScanner:
|
||||||
self._displacement_since_append_x: float = 0.0
|
self._displacement_since_append_x: float = 0.0
|
||||||
self._displacement_since_append_y: float = 0.0
|
self._displacement_since_append_y: float = 0.0
|
||||||
|
|
||||||
# Row alignment offset - calculated at the start of each row
|
# Cumulative alignment drift - tracks total correction applied
|
||||||
self._row_alignment = RowAlignmentOffset()
|
self._cumulative_align_x: float = 0.0
|
||||||
self._is_first_strip_of_row: bool = True
|
self._cumulative_align_y: float = 0.0
|
||||||
|
|
||||||
|
# Last strip's alignment for continuity
|
||||||
|
self._last_strip_alignment = AlignmentOffset()
|
||||||
|
|
||||||
self._thread: Optional[threading.Thread] = None
|
self._thread: Optional[threading.Thread] = None
|
||||||
|
|
||||||
|
|
@ -130,9 +136,29 @@ class StitchingScanner:
|
||||||
prev_f = prev_f * window
|
prev_f = prev_f * window
|
||||||
curr_f = curr_f * window
|
curr_f = curr_f * window
|
||||||
|
|
||||||
shift, _ = cv2.phaseCorrelate(prev_f, curr_f)
|
shift, response = cv2.phaseCorrelate(prev_f, curr_f)
|
||||||
return shift
|
return shift
|
||||||
|
|
||||||
|
def _detect_displacement_with_confidence(self, prev_frame: np.ndarray,
|
||||||
|
curr_frame: np.ndarray) -> Tuple[float, float, float]:
|
||||||
|
"""Detect displacement and return confidence (phase correlation response)."""
|
||||||
|
prev_gray = self._to_grayscale(prev_frame)
|
||||||
|
curr_gray = self._to_grayscale(curr_frame)
|
||||||
|
|
||||||
|
if prev_gray.shape != curr_gray.shape:
|
||||||
|
return (0.0, 0.0, 0.0)
|
||||||
|
|
||||||
|
prev_f = prev_gray.astype(np.float32)
|
||||||
|
curr_f = curr_gray.astype(np.float32)
|
||||||
|
|
||||||
|
h, w = prev_gray.shape
|
||||||
|
window = cv2.createHanningWindow((w, h), cv2.CV_32F)
|
||||||
|
prev_f = prev_f * window
|
||||||
|
curr_f = curr_f * window
|
||||||
|
|
||||||
|
shift, response = cv2.phaseCorrelate(prev_f, curr_f)
|
||||||
|
return (shift[0], shift[1], response)
|
||||||
|
|
||||||
def _detect_displacement_robust(self, prev_frame: np.ndarray,
|
def _detect_displacement_robust(self, prev_frame: np.ndarray,
|
||||||
curr_frame: np.ndarray) -> Tuple[float, float]:
|
curr_frame: np.ndarray) -> Tuple[float, float]:
|
||||||
dx, dy = self._detect_displacement(prev_frame, curr_frame)
|
dx, dy = self._detect_displacement(prev_frame, curr_frame)
|
||||||
|
|
@ -146,76 +172,115 @@ class StitchingScanner:
|
||||||
|
|
||||||
return (dx, dy)
|
return (dx, dy)
|
||||||
|
|
||||||
def _detect_row_alignment(self, current_frame: np.ndarray, direction: ScanDirection) -> RowAlignmentOffset:
|
def _detect_strip_alignment(self, frame: np.ndarray, direction: ScanDirection,
|
||||||
|
expected_x: int, expected_y: int) -> AlignmentOffset:
|
||||||
"""
|
"""
|
||||||
Detect alignment offset at the start of a new row by comparing
|
Detect alignment offset for a strip by comparing the current frame
|
||||||
the current frame with the overlapping region of the mosaic.
|
with the expected overlap region of the mosaic.
|
||||||
|
|
||||||
This compensates for backlash in control gears that causes circular
|
This provides continuous correction for gear slippage during scanning.
|
||||||
motion instead of right angles when transitioning from vertical to horizontal.
|
|
||||||
|
Args:
|
||||||
|
frame: Current camera frame
|
||||||
|
direction: Scan direction
|
||||||
|
expected_x: Expected X position in mosaic
|
||||||
|
expected_y: Expected Y position in mosaic
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
AlignmentOffset with X/Y correction needed
|
||||||
"""
|
"""
|
||||||
offset = RowAlignmentOffset()
|
offset = AlignmentOffset()
|
||||||
|
|
||||||
with self._mosaic_lock:
|
with self._mosaic_lock:
|
||||||
if self.mosaic is None:
|
if self.mosaic is None:
|
||||||
self.log("No mosaic for row alignment detection")
|
|
||||||
return offset
|
return offset
|
||||||
|
|
||||||
mh, mw = self.mosaic.shape[:2]
|
mh, mw = self.mosaic.shape[:2]
|
||||||
fh, fw = current_frame.shape[:2]
|
fh, fw = frame.shape[:2]
|
||||||
|
|
||||||
# Determine where in the mosaic we expect to overlap
|
# Clamp expected positions
|
||||||
# For LEFT direction: we're at the right edge of the mosaic
|
expected_y = max(0, min(expected_y, mh - fh))
|
||||||
# For RIGHT direction: we're at the left edge (but this is row 0, shouldn't need alignment)
|
expected_x = max(0, min(expected_x, mw - fw))
|
||||||
|
|
||||||
if direction == ScanDirection.LEFT:
|
if direction == ScanDirection.RIGHT:
|
||||||
# We're starting from the right side, moving left
|
# We're appending to the right
|
||||||
# The current frame should overlap with the right edge of the mosaic
|
# Compare left portion of frame with right edge of mosaic
|
||||||
# Extract the rightmost portion of the mosaic that should overlap
|
overlap_width = min(fw // 2, mw - expected_x, 200) # Use up to 200px overlap
|
||||||
|
|
||||||
# The overlap region is at the bottom-right of the mosaic
|
if overlap_width < 30:
|
||||||
# Current Y position in mosaic
|
return offset
|
||||||
y_pos = int(self.state.current_y)
|
|
||||||
y_pos = max(0, min(y_pos, mh - fh))
|
|
||||||
|
|
||||||
# Extract overlap region from mosaic (right edge)
|
# Extract regions
|
||||||
overlap_width = min(fw, mw)
|
mosaic_region = self.mosaic[expected_y:expected_y + fh, mw - overlap_width:mw]
|
||||||
mosaic_region = self.mosaic[y_pos:y_pos + fh, mw - overlap_width:mw]
|
frame_region = frame[:, :overlap_width]
|
||||||
frame_region = current_frame[:, :overlap_width]
|
|
||||||
|
|
||||||
else: # RIGHT direction
|
elif direction == ScanDirection.LEFT:
|
||||||
# We're starting from the left side, moving right
|
# We're placing within existing mosaic, moving left
|
||||||
# Current Y position in mosaic
|
# Compare right portion of frame with mosaic at expected position
|
||||||
y_pos = int(self.state.current_y)
|
overlap_width = min(fw // 2, mw - expected_x, 200)
|
||||||
y_pos = max(0, min(y_pos, mh - fh))
|
|
||||||
|
|
||||||
# Extract overlap region from mosaic (left edge)
|
if overlap_width < 30:
|
||||||
overlap_width = min(fw, mw)
|
return offset
|
||||||
mosaic_region = self.mosaic[y_pos:y_pos + fh, :overlap_width]
|
|
||||||
frame_region = current_frame[:, :overlap_width]
|
# The frame's right edge should align with mosaic at expected_x + fw
|
||||||
|
mosaic_x_end = min(expected_x + fw, mw)
|
||||||
|
mosaic_x_start = max(mosaic_x_end - overlap_width, 0)
|
||||||
|
actual_overlap = mosaic_x_end - mosaic_x_start
|
||||||
|
|
||||||
|
if actual_overlap < 30:
|
||||||
|
return offset
|
||||||
|
|
||||||
|
mosaic_region = self.mosaic[expected_y:expected_y + fh, mosaic_x_start:mosaic_x_end]
|
||||||
|
frame_region = frame[:, fw - actual_overlap:]
|
||||||
|
|
||||||
|
elif direction == ScanDirection.DOWN:
|
||||||
|
# We're appending below
|
||||||
|
# Compare top portion of frame with bottom edge of mosaic
|
||||||
|
overlap_height = min(fh // 2, mh - expected_y, 200)
|
||||||
|
|
||||||
|
if overlap_height < 30:
|
||||||
|
return offset
|
||||||
|
|
||||||
|
mosaic_region = self.mosaic[mh - overlap_height:mh, expected_x:expected_x + fw]
|
||||||
|
frame_region = frame[:overlap_height, :]
|
||||||
|
|
||||||
|
else: # UP
|
||||||
|
# Compare bottom portion of frame with top edge of mosaic
|
||||||
|
overlap_height = min(fh // 2, expected_y, 200)
|
||||||
|
|
||||||
|
if overlap_height < 30:
|
||||||
|
return offset
|
||||||
|
|
||||||
|
mosaic_region = self.mosaic[:overlap_height, expected_x:expected_x + fw]
|
||||||
|
frame_region = frame[fh - overlap_height:, :]
|
||||||
|
|
||||||
# Ensure regions have the same size
|
# Ensure regions have the same size
|
||||||
min_h = min(mosaic_region.shape[0], frame_region.shape[0])
|
min_h = min(mosaic_region.shape[0], frame_region.shape[0])
|
||||||
min_w = min(mosaic_region.shape[1], frame_region.shape[1])
|
min_w = min(mosaic_region.shape[1], frame_region.shape[1])
|
||||||
|
|
||||||
if min_h < 50 or min_w < 50:
|
if min_h < 30 or min_w < 30:
|
||||||
self.log(f"Overlap region too small for alignment: {min_w}x{min_h}")
|
self.log(f"Strip alignment: overlap too small ({min_w}x{min_h})")
|
||||||
return offset
|
return offset
|
||||||
|
|
||||||
mosaic_region = mosaic_region[:min_h, :min_w]
|
mosaic_region = mosaic_region[:min_h, :min_w]
|
||||||
frame_region = frame_region[:min_h, :min_w]
|
frame_region = frame_region[:min_h, :min_w]
|
||||||
|
|
||||||
# Detect displacement between mosaic region and current frame
|
# Detect displacement with confidence
|
||||||
dx, dy = self._detect_displacement_robust(mosaic_region, frame_region)
|
dx, dy, confidence = self._detect_displacement_with_confidence(mosaic_region, frame_region)
|
||||||
|
|
||||||
|
# Sanity check - reject large displacements
|
||||||
|
max_adjust = 50 # Max pixels to adjust
|
||||||
|
if abs(dx) > max_adjust or abs(dy) > max_adjust:
|
||||||
|
self.log(f"Strip alignment: displacement too large ({dx:.1f}, {dy:.1f}), ignoring")
|
||||||
|
return offset
|
||||||
|
|
||||||
offset.x_offset = dx
|
offset.x_offset = dx
|
||||||
offset.y_offset = dy
|
offset.y_offset = dy
|
||||||
offset.valid = True
|
offset.confidence = confidence
|
||||||
|
offset.valid = confidence > 0.1 # Require minimum confidence
|
||||||
|
|
||||||
self.log(f"=== Row Alignment Detection ===")
|
if offset.valid:
|
||||||
self.log(f" Direction: {direction.value}")
|
self.log(f" Strip alignment: X={dx:.1f}, Y={dy:.1f}, conf={confidence:.3f}")
|
||||||
self.log(f" Mosaic region: {mosaic_region.shape[1]}x{mosaic_region.shape[0]} at Y={y_pos}")
|
|
||||||
self.log(f" Detected offset: X={dx:.1f}, Y={dy:.1f}")
|
|
||||||
|
|
||||||
return offset
|
return offset
|
||||||
|
|
||||||
|
|
@ -231,9 +296,10 @@ class StitchingScanner:
|
||||||
self._displacement_since_append_x = 0.0
|
self._displacement_since_append_x = 0.0
|
||||||
self._displacement_since_append_y = 0.0
|
self._displacement_since_append_y = 0.0
|
||||||
|
|
||||||
# Reset row alignment for first row (no alignment needed)
|
# Reset cumulative alignment
|
||||||
self._row_alignment = RowAlignmentOffset()
|
self._cumulative_align_x = 0.0
|
||||||
self._is_first_strip_of_row = False # First row doesn't need alignment
|
self._cumulative_align_y = 0.0
|
||||||
|
self._last_strip_alignment = AlignmentOffset()
|
||||||
|
|
||||||
with self._state_lock:
|
with self._state_lock:
|
||||||
h, w = frame.shape[:2]
|
h, w = frame.shape[:2]
|
||||||
|
|
@ -262,8 +328,8 @@ class StitchingScanner:
|
||||||
append_right: True to append to right, False to append left
|
append_right: True to append to right, False to append left
|
||||||
x_offset: X position for left-append mode
|
x_offset: X position for left-append mode
|
||||||
y_offset: Y position in the mosaic
|
y_offset: Y position in the mosaic
|
||||||
alignment_x: Additional X alignment offset (from row alignment detection)
|
alignment_x: Additional X alignment offset (from strip alignment detection)
|
||||||
alignment_y: Additional Y alignment offset (from row alignment detection)
|
alignment_y: Additional Y alignment offset (from strip alignment detection)
|
||||||
"""
|
"""
|
||||||
h_base, w_base = base.shape[:2]
|
h_base, w_base = base.shape[:2]
|
||||||
h_strip, w_strip = strip.shape[:2]
|
h_strip, w_strip = strip.shape[:2]
|
||||||
|
|
@ -310,13 +376,12 @@ class StitchingScanner:
|
||||||
if x_offset is None:
|
if x_offset is None:
|
||||||
x_offset = 0
|
x_offset = 0
|
||||||
|
|
||||||
# Apply alignment offsets (detected at start of row)
|
# Apply alignment offsets (continuous correction)
|
||||||
x_offset = x_offset + int(round(alignment_x))
|
x_offset = x_offset + int(round(alignment_x))
|
||||||
y_offset = y_offset - int(round(alignment_y))
|
y_offset = y_offset - int(round(alignment_y))
|
||||||
|
|
||||||
# Clamp x_offset to valid range
|
# Clamp x_offset to valid range
|
||||||
# x_offset = max(0, min(x_offset, w_base - blend_w))
|
x_offset = max(0, min(x_offset, w_base - blend_w))
|
||||||
x_offset = 0 - min(x_offset, w_base)
|
|
||||||
|
|
||||||
# Handle strip cropping if y_offset is negative (strip protrudes above frame)
|
# Handle strip cropping if y_offset is negative (strip protrudes above frame)
|
||||||
strip_y_start = 0 # How much to crop from top of strip
|
strip_y_start = 0 # How much to crop from top of strip
|
||||||
|
|
@ -343,6 +408,7 @@ class StitchingScanner:
|
||||||
self.log(f" base: {w_base}x{h_base}, strip: {w_strip}x{h_strip}")
|
self.log(f" base: {w_base}x{h_base}, strip: {w_strip}x{h_strip}")
|
||||||
self.log(f" x_offset: {x_offset}, y_offset: {y_offset}, blend_w: {blend_w}")
|
self.log(f" x_offset: {x_offset}, y_offset: {y_offset}, blend_w: {blend_w}")
|
||||||
self.log(f" alignment: X={alignment_x:.1f}, Y={alignment_y:.1f}")
|
self.log(f" alignment: X={alignment_x:.1f}, Y={alignment_y:.1f}")
|
||||||
|
self.log(f" cumulative: X={self._cumulative_align_x:.1f}, Y={self._cumulative_align_y:.1f}")
|
||||||
self.log(f" Strip crop: rows [{strip_y_start}:{strip_y_end}] -> height {h_cropped}")
|
self.log(f" Strip crop: rows [{strip_y_start}:{strip_y_end}] -> height {h_cropped}")
|
||||||
|
|
||||||
# Result is same size as base (no expansion when going left)
|
# Result is same size as base (no expansion when going left)
|
||||||
|
|
@ -426,12 +492,15 @@ class StitchingScanner:
|
||||||
|
|
||||||
def _blend_vertical_at_x(self, base: np.ndarray, strip: np.ndarray,
|
def _blend_vertical_at_x(self, base: np.ndarray, strip: np.ndarray,
|
||||||
blend_height: int, append_below: bool,
|
blend_height: int, append_below: bool,
|
||||||
x_off: int = 0) -> np.ndarray:
|
x_off: int = 0,
|
||||||
|
alignment_x: float = 0.0, alignment_y: float = 0.0) -> np.ndarray:
|
||||||
h_base, w_base = base.shape[:2]
|
h_base, w_base = base.shape[:2]
|
||||||
h_strip, w_strip = strip.shape[:2]
|
h_strip, w_strip = strip.shape[:2]
|
||||||
|
|
||||||
# Clamp x_offset to valid range
|
# Apply alignment offset for X position
|
||||||
x_offset = max(0, w_base - self.state.mosaic_init_width)
|
x_offset = max(0, w_base - self.state.mosaic_init_width)
|
||||||
|
x_offset = x_offset + int(round(alignment_x))
|
||||||
|
x_offset = max(0, min(x_offset, w_base - w_strip)) if w_strip < w_base else 0
|
||||||
|
|
||||||
# Create full-width strip with strip placed at x_offset
|
# Create full-width strip with strip placed at x_offset
|
||||||
full_strip = np.zeros((h_strip, w_base, 3), dtype=np.uint8)
|
full_strip = np.zeros((h_strip, w_base, 3), dtype=np.uint8)
|
||||||
|
|
@ -439,16 +508,16 @@ class StitchingScanner:
|
||||||
copy_width = min(w_strip, available_width)
|
copy_width = min(w_strip, available_width)
|
||||||
full_strip[:, x_offset:x_offset + copy_width] = strip[:, :copy_width]
|
full_strip[:, x_offset:x_offset + copy_width] = strip[:, :copy_width]
|
||||||
|
|
||||||
|
self.log(f"=== _blend_vertical_at_x ===")
|
||||||
|
self.log(f" base: {w_base}x{h_base}, strip: {w_strip}x{h_strip}")
|
||||||
|
self.log(f" x_offset: {x_offset}, alignment: X={alignment_x:.1f}, Y={alignment_y:.1f}")
|
||||||
|
|
||||||
# Early exit: no blending possible
|
# Early exit: no blending possible
|
||||||
if blend_height <= 0 or blend_height >= h_strip:
|
if blend_height <= 0 or blend_height >= h_strip:
|
||||||
if append_below:
|
if append_below:
|
||||||
return np.vstack([base, full_strip])
|
return np.vstack([base, full_strip])
|
||||||
return np.vstack([full_strip, base])
|
return np.vstack([full_strip, base])
|
||||||
|
|
||||||
# Height mismatch shouldn't happen with full_strip, but safety check
|
|
||||||
if w_strip > w_base:
|
|
||||||
self.log(f"Warning: strip wider than base ({w_strip} > {w_base})")
|
|
||||||
|
|
||||||
blend_h = min(blend_height, h_strip, h_base)
|
blend_h = min(blend_height, h_strip, h_base)
|
||||||
|
|
||||||
if append_below:
|
if append_below:
|
||||||
|
|
@ -523,7 +592,7 @@ class StitchingScanner:
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def _append_strip(self, frame: np.ndarray, direction: ScanDirection):
|
def _append_strip(self, frame: np.ndarray, direction: ScanDirection):
|
||||||
"""Append strip to mosaic based on accumulated displacement."""
|
"""Append strip to mosaic based on accumulated displacement with continuous alignment."""
|
||||||
BLEND_WIDTH = 10
|
BLEND_WIDTH = 10
|
||||||
SAFETY_MARGIN = 2
|
SAFETY_MARGIN = 2
|
||||||
|
|
||||||
|
|
@ -537,9 +606,22 @@ class StitchingScanner:
|
||||||
dx = abs(self._displacement_since_append_x)
|
dx = abs(self._displacement_since_append_x)
|
||||||
dy = abs(self._displacement_since_append_y)
|
dy = abs(self._displacement_since_append_y)
|
||||||
|
|
||||||
# Get alignment offsets for this row
|
# Calculate expected position for alignment detection
|
||||||
align_x = self._row_alignment.x_offset if self._row_alignment.valid else 0.0
|
expected_x = int(self.state.current_x + self._cumulative_align_x)
|
||||||
align_y = self._row_alignment.y_offset if self._row_alignment.valid else 0.0
|
expected_y = int(self.state.current_y + self._cumulative_align_y)
|
||||||
|
|
||||||
|
# Detect alignment for this strip
|
||||||
|
alignment = self._detect_strip_alignment(frame, direction, expected_x, expected_y)
|
||||||
|
|
||||||
|
if alignment.valid:
|
||||||
|
# Update cumulative alignment
|
||||||
|
self._cumulative_align_x += alignment.x_offset
|
||||||
|
self._cumulative_align_y += alignment.y_offset
|
||||||
|
self._last_strip_alignment = alignment
|
||||||
|
|
||||||
|
# Get total alignment offsets
|
||||||
|
align_x = self._cumulative_align_x
|
||||||
|
align_y = self._cumulative_align_y
|
||||||
|
|
||||||
if direction in [ScanDirection.RIGHT, ScanDirection.LEFT]:
|
if direction in [ScanDirection.RIGHT, ScanDirection.LEFT]:
|
||||||
append_width = round(dx) + SAFETY_MARGIN
|
append_width = round(dx) + SAFETY_MARGIN
|
||||||
|
|
@ -584,14 +666,18 @@ class StitchingScanner:
|
||||||
|
|
||||||
if direction == ScanDirection.DOWN:
|
if direction == ScanDirection.DOWN:
|
||||||
strip_end = min(h, append_height + BLEND_WIDTH)
|
strip_end = min(h, append_height + BLEND_WIDTH)
|
||||||
new_strip = frame[:strip_end:, :]
|
new_strip = frame[:strip_end, :]
|
||||||
self.mosaic = self._blend_vertical_at_x(
|
self.mosaic = self._blend_vertical_at_x(
|
||||||
self.mosaic, new_strip, BLEND_WIDTH, append_below=False, x_off=int(self.state.current_x))
|
self.mosaic, new_strip, BLEND_WIDTH, append_below=False,
|
||||||
|
x_off=int(self.state.current_x),
|
||||||
|
alignment_x=align_x, alignment_y=align_y)
|
||||||
else:
|
else:
|
||||||
strip_start = max(0, h - append_height - BLEND_WIDTH)
|
strip_start = max(0, h - append_height - BLEND_WIDTH)
|
||||||
new_strip = frame[:strip_start, :]
|
new_strip = frame[strip_start:, :]
|
||||||
self.mosaic = self._blend_vertical_at_x(
|
self.mosaic = self._blend_vertical_at_x(
|
||||||
self.mosaic, new_strip, BLEND_WIDTH, append_below=True, x_off=int(self.state.current_x))
|
self.mosaic, new_strip, BLEND_WIDTH, append_below=True,
|
||||||
|
x_off=int(self.state.current_x),
|
||||||
|
alignment_x=align_x, alignment_y=align_y)
|
||||||
|
|
||||||
self._displacement_since_append_x = 0.0
|
self._displacement_since_append_x = 0.0
|
||||||
self._displacement_since_append_y = fractional_remainder
|
self._displacement_since_append_y = fractional_remainder
|
||||||
|
|
@ -628,8 +714,11 @@ class StitchingScanner:
|
||||||
self._prev_frame = None
|
self._prev_frame = None
|
||||||
self._displacement_since_append_x = 0.0
|
self._displacement_since_append_x = 0.0
|
||||||
self._displacement_since_append_y = 0.0
|
self._displacement_since_append_y = 0.0
|
||||||
self._row_alignment = RowAlignmentOffset()
|
|
||||||
self._is_first_strip_of_row = False # First row doesn't need alignment
|
# Reset cumulative alignment
|
||||||
|
self._cumulative_align_x = 0.0
|
||||||
|
self._cumulative_align_y = 0.0
|
||||||
|
self._last_strip_alignment = AlignmentOffset()
|
||||||
|
|
||||||
self._thread = threading.Thread(target=self._scan_loop, daemon=True)
|
self._thread = threading.Thread(target=self._scan_loop, daemon=True)
|
||||||
self._thread.start()
|
self._thread.start()
|
||||||
|
|
@ -680,26 +769,13 @@ class StitchingScanner:
|
||||||
self.state.total_rows = row + 1
|
self.state.total_rows = row + 1
|
||||||
|
|
||||||
self.log(f"=== Row {row + 1} ===")
|
self.log(f"=== Row {row + 1} ===")
|
||||||
|
self.log(f"Cumulative alignment at row start: X={self._cumulative_align_x:.1f}, Y={self._cumulative_align_y:.1f}")
|
||||||
|
|
||||||
# Serpentine: even rows right, odd rows left
|
# Serpentine: even rows right, odd rows left
|
||||||
h_direction = ScanDirection.RIGHT if row % 2 == 0 else ScanDirection.LEFT
|
h_direction = ScanDirection.RIGHT if row % 2 == 0 else ScanDirection.LEFT
|
||||||
|
|
||||||
# For rows after the first, detect alignment at the start
|
|
||||||
if row > 0:
|
|
||||||
self._is_first_strip_of_row = True
|
|
||||||
frame = self._capture_frame()
|
|
||||||
self._row_alignment = self._detect_row_alignment(frame, h_direction)
|
|
||||||
self.log(f"Row {row + 1} alignment: X={self._row_alignment.x_offset:.1f}, Y={self._row_alignment.y_offset:.1f}")
|
|
||||||
else:
|
|
||||||
# First row - no alignment needed
|
|
||||||
self._row_alignment = RowAlignmentOffset()
|
|
||||||
self._is_first_strip_of_row = False
|
|
||||||
|
|
||||||
stop_reason = self._scan_direction(h_direction)
|
stop_reason = self._scan_direction(h_direction)
|
||||||
|
|
||||||
# After first strip is appended, clear the flag
|
|
||||||
self._is_first_strip_of_row = False
|
|
||||||
|
|
||||||
if not self.running:
|
if not self.running:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|
@ -716,6 +792,7 @@ class StitchingScanner:
|
||||||
row += 1
|
row += 1
|
||||||
|
|
||||||
self.log(f"Scan complete! Final: {self.state.mosaic_width}x{self.state.mosaic_height}")
|
self.log(f"Scan complete! Final: {self.state.mosaic_width}x{self.state.mosaic_height}")
|
||||||
|
self.log(f"Final cumulative alignment: X={self._cumulative_align_x:.1f}, Y={self._cumulative_align_y:.1f}")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.log(f"Scan error: {e}")
|
self.log(f"Scan error: {e}")
|
||||||
|
|
@ -773,7 +850,7 @@ class StitchingScanner:
|
||||||
break
|
break
|
||||||
|
|
||||||
if self.state.current_x >= 0 and direction == ScanDirection.LEFT:
|
if self.state.current_x >= 0 and direction == ScanDirection.LEFT:
|
||||||
self.log(f"Max dimension reached ({self.config.max_mosaic_width}px)")
|
self.log(f"Returned to start ({self.config.max_mosaic_width}px)")
|
||||||
self.log(f"Current X offset ({self.state.current_x}px) total_x ({total_x}px)")
|
self.log(f"Current X offset ({self.state.current_x}px) total_x ({total_x}px)")
|
||||||
stop_reason = 'max_dim'
|
stop_reason = 'max_dim'
|
||||||
break
|
break
|
||||||
|
|
@ -799,7 +876,6 @@ class StitchingScanner:
|
||||||
total_x += dx
|
total_x += dx
|
||||||
with self._state_lock:
|
with self._state_lock:
|
||||||
self.state.current_x += dx
|
self.state.current_x += dx
|
||||||
self.log(f"Current X offset ({self.state.current_x}px)")
|
|
||||||
|
|
||||||
with self._state_lock:
|
with self._state_lock:
|
||||||
self.state.cumulative_x = self._displacement_since_append_x
|
self.state.cumulative_x = self._displacement_since_append_x
|
||||||
|
|
@ -818,11 +894,11 @@ class StitchingScanner:
|
||||||
else:
|
else:
|
||||||
no_movement_count = 0
|
no_movement_count = 0
|
||||||
|
|
||||||
# Append when threshold reached
|
# Append when threshold reached (with continuous alignment)
|
||||||
disp = abs(self._displacement_since_append_x) if direction in [ScanDirection.RIGHT, ScanDirection.LEFT] else abs(self._displacement_since_append_y)
|
disp = abs(self._displacement_since_append_x) if direction in [ScanDirection.RIGHT, ScanDirection.LEFT] else abs(self._displacement_since_append_y)
|
||||||
if disp >= threshold_pixels:
|
if disp >= threshold_pixels:
|
||||||
self._append_strip(curr_frame, direction)
|
self._append_strip(curr_frame, direction)
|
||||||
self.log(f"Appended {disp:.1f}px, mosaic: {self.state.mosaic_width}x{self.state.mosaic_height}")
|
self.log(f"Appended {disp:.1f}px, mosaic: {self.state.mosaic_width}x{self.state.mosaic_height}, align: ({self._cumulative_align_x:.1f}, {self._cumulative_align_y:.1f})")
|
||||||
|
|
||||||
self._prev_frame = curr_frame.copy()
|
self._prev_frame = curr_frame.copy()
|
||||||
|
|
||||||
|
|
@ -837,9 +913,10 @@ class StitchingScanner:
|
||||||
def _move_to_next_row(self) -> bool:
|
def _move_to_next_row(self) -> bool:
|
||||||
"""
|
"""
|
||||||
Move down to next row using displacement-based stitching.
|
Move down to next row using displacement-based stitching.
|
||||||
Same approach as horizontal scanning.
|
Same approach as horizontal scanning with continuous alignment.
|
||||||
"""
|
"""
|
||||||
self.log("Moving to next row...")
|
self.log("Moving to next row...")
|
||||||
|
self.log(f"Alignment before row transition: X={self._cumulative_align_x:.1f}, Y={self._cumulative_align_y:.1f}")
|
||||||
|
|
||||||
frame = self._capture_frame()
|
frame = self._capture_frame()
|
||||||
h, w = frame.shape[:2]
|
h, w = frame.shape[:2]
|
||||||
|
|
@ -895,14 +972,16 @@ class StitchingScanner:
|
||||||
else:
|
else:
|
||||||
no_movement_count = 0
|
no_movement_count = 0
|
||||||
|
|
||||||
# Append strip when threshold reached
|
# Append strip when threshold reached (with continuous alignment)
|
||||||
if abs(self._displacement_since_append_y) >= threshold_pixels:
|
if abs(self._displacement_since_append_y) >= threshold_pixels:
|
||||||
self._append_strip(curr_frame, ScanDirection.DOWN)
|
self._append_strip(curr_frame, ScanDirection.DOWN)
|
||||||
self.log(f" Row transition: appended, total Y: {abs(total_y):.1f}px")
|
self.log(f" Row transition: appended, total Y: {abs(total_y):.1f}px, align: ({self._cumulative_align_x:.1f}, {self._cumulative_align_y:.1f})")
|
||||||
|
|
||||||
# Done when we've moved enough
|
# Done when we've moved enough
|
||||||
if abs(total_y) >= target_displacement:
|
if abs(total_y) >= target_displacement:
|
||||||
self.log(f"Row transition complete: {abs(total_y):.1f}px")
|
self.log(f"Row transition complete: {abs(total_y):.1f}px")
|
||||||
|
self.log(f"Alignment after row transition: X={self._cumulative_align_x:.1f}, Y={self._cumulative_align_y:.1f}")
|
||||||
|
|
||||||
with self._state_lock:
|
with self._state_lock:
|
||||||
self.state.current_y = 0
|
self.state.current_y = 0
|
||||||
self.motion.send_command('s')
|
self.motion.send_command('s')
|
||||||
|
|
@ -951,6 +1030,19 @@ class StitchingScanner:
|
||||||
append_count=self.state.append_count
|
append_count=self.state.append_count
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def get_alignment_state(self) -> dict:
|
||||||
|
"""Get current alignment correction state."""
|
||||||
|
return {
|
||||||
|
'cumulative_x': self._cumulative_align_x,
|
||||||
|
'cumulative_y': self._cumulative_align_y,
|
||||||
|
'last_alignment': {
|
||||||
|
'x': self._last_strip_alignment.x_offset,
|
||||||
|
'y': self._last_strip_alignment.y_offset,
|
||||||
|
'confidence': self._last_strip_alignment.confidence,
|
||||||
|
'valid': self._last_strip_alignment.valid
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
def get_mosaic(self) -> Optional[np.ndarray]:
|
def get_mosaic(self) -> Optional[np.ndarray]:
|
||||||
with self._mosaic_lock:
|
with self._mosaic_lock:
|
||||||
if self.mosaic is not None:
|
if self.mosaic is not None:
|
||||||
|
|
@ -1006,6 +1098,8 @@ class StitchingScanner:
|
||||||
'y_moved': 0.0,
|
'y_moved': 0.0,
|
||||||
'mosaic_before': (0, 0),
|
'mosaic_before': (0, 0),
|
||||||
'mosaic_after': (0, 0),
|
'mosaic_after': (0, 0),
|
||||||
|
'alignment_before': (0.0, 0.0),
|
||||||
|
'alignment_after': (0.0, 0.0),
|
||||||
'error': None
|
'error': None
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1017,6 +1111,7 @@ class StitchingScanner:
|
||||||
self._init_mosaic(frame)
|
self._init_mosaic(frame)
|
||||||
|
|
||||||
results['mosaic_before'] = (self.state.mosaic_width, self.state.mosaic_height)
|
results['mosaic_before'] = (self.state.mosaic_width, self.state.mosaic_height)
|
||||||
|
results['alignment_before'] = (self._cumulative_align_x, self._cumulative_align_y)
|
||||||
|
|
||||||
with self._state_lock:
|
with self._state_lock:
|
||||||
self.state.cumulative_y = 0.0
|
self.state.cumulative_y = 0.0
|
||||||
|
|
@ -1028,8 +1123,10 @@ class StitchingScanner:
|
||||||
results['success'] = success
|
results['success'] = success
|
||||||
results['y_moved'] = self.state.cumulative_y
|
results['y_moved'] = self.state.cumulative_y
|
||||||
results['mosaic_after'] = (self.state.mosaic_width, self.state.mosaic_height)
|
results['mosaic_after'] = (self.state.mosaic_width, self.state.mosaic_height)
|
||||||
|
results['alignment_after'] = (self._cumulative_align_x, self._cumulative_align_y)
|
||||||
|
|
||||||
self.log(f"Row transition: {'SUCCESS' if success else 'FAILED'}, Y: {results['y_moved']:.1f}px")
|
self.log(f"Row transition: {'SUCCESS' if success else 'FAILED'}, Y: {results['y_moved']:.1f}px")
|
||||||
|
self.log(f"Alignment change: ({results['alignment_before'][0]:.1f}, {results['alignment_before'][1]:.1f}) -> ({results['alignment_after'][0]:.1f}, {results['alignment_after'][1]:.1f})")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
results['error'] = str(e)
|
results['error'] = str(e)
|
||||||
|
|
@ -1046,6 +1143,8 @@ class StitchingScanner:
|
||||||
'appends': 0,
|
'appends': 0,
|
||||||
'mosaic_before': (0, 0),
|
'mosaic_before': (0, 0),
|
||||||
'mosaic_after': (0, 0),
|
'mosaic_after': (0, 0),
|
||||||
|
'alignment_before': (0.0, 0.0),
|
||||||
|
'alignment_after': (0.0, 0.0),
|
||||||
'error': None
|
'error': None
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1057,6 +1156,7 @@ class StitchingScanner:
|
||||||
self._init_mosaic(frame)
|
self._init_mosaic(frame)
|
||||||
|
|
||||||
results['mosaic_before'] = (self.state.mosaic_width, self.state.mosaic_height)
|
results['mosaic_before'] = (self.state.mosaic_width, self.state.mosaic_height)
|
||||||
|
results['alignment_before'] = (self._cumulative_align_x, self._cumulative_align_y)
|
||||||
appends_before = self.state.append_count
|
appends_before = self.state.append_count
|
||||||
|
|
||||||
self.motion.set_speed(self.config.scan_speed_index)
|
self.motion.set_speed(self.config.scan_speed_index)
|
||||||
|
|
@ -1064,13 +1164,6 @@ class StitchingScanner:
|
||||||
|
|
||||||
self.running = True
|
self.running = True
|
||||||
scan_dir = ScanDirection.RIGHT if direction == 'right' else ScanDirection.LEFT
|
scan_dir = ScanDirection.RIGHT if direction == 'right' else ScanDirection.LEFT
|
||||||
|
|
||||||
# Test row alignment detection if not first row
|
|
||||||
if self.state.current_row > 0 or self.mosaic is not None:
|
|
||||||
frame = self._capture_frame()
|
|
||||||
self._row_alignment = self._detect_row_alignment(frame, scan_dir)
|
|
||||||
self.log(f"Test row alignment: X={self._row_alignment.x_offset:.1f}, Y={self._row_alignment.y_offset:.1f}")
|
|
||||||
|
|
||||||
stop_reason = self._scan_direction(scan_dir)
|
stop_reason = self._scan_direction(scan_dir)
|
||||||
self.running = False
|
self.running = False
|
||||||
|
|
||||||
|
|
@ -1078,6 +1171,9 @@ class StitchingScanner:
|
||||||
results['stop_reason'] = stop_reason
|
results['stop_reason'] = stop_reason
|
||||||
results['appends'] = self.state.append_count - appends_before
|
results['appends'] = self.state.append_count - appends_before
|
||||||
results['mosaic_after'] = (self.state.mosaic_width, self.state.mosaic_height)
|
results['mosaic_after'] = (self.state.mosaic_width, self.state.mosaic_height)
|
||||||
|
results['alignment_after'] = (self._cumulative_align_x, self._cumulative_align_y)
|
||||||
|
|
||||||
|
self.log(f"Alignment change: ({results['alignment_before'][0]:.1f}, {results['alignment_before'][1]:.1f}) -> ({results['alignment_after'][0]:.1f}, {results['alignment_after'][1]:.1f})")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
results['error'] = str(e)
|
results['error'] = str(e)
|
||||||
|
|
@ -1085,17 +1181,18 @@ class StitchingScanner:
|
||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
def test_row_alignment(self, direction: str = 'left') -> dict:
|
def test_strip_alignment(self) -> dict:
|
||||||
"""Test row alignment detection without scanning."""
|
"""Test strip alignment detection at current position."""
|
||||||
results = {
|
results = {
|
||||||
'success': False,
|
'success': False,
|
||||||
'x_offset': 0.0,
|
'x_offset': 0.0,
|
||||||
'y_offset': 0.0,
|
'y_offset': 0.0,
|
||||||
|
'confidence': 0.0,
|
||||||
'error': None
|
'error': None
|
||||||
}
|
}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.log(f"Testing row alignment detection ({direction})...")
|
self.log("Testing strip alignment detection...")
|
||||||
|
|
||||||
if self.mosaic is None:
|
if self.mosaic is None:
|
||||||
self.log("No mosaic - initializing...")
|
self.log("No mosaic - initializing...")
|
||||||
|
|
@ -1103,14 +1200,20 @@ class StitchingScanner:
|
||||||
self._init_mosaic(frame)
|
self._init_mosaic(frame)
|
||||||
|
|
||||||
frame = self._capture_frame()
|
frame = self._capture_frame()
|
||||||
scan_dir = ScanDirection.LEFT if direction == 'left' else ScanDirection.RIGHT
|
expected_x = int(self.state.current_x + self._cumulative_align_x)
|
||||||
alignment = self._detect_row_alignment(frame, scan_dir)
|
expected_y = int(self.state.current_y + self._cumulative_align_y)
|
||||||
|
|
||||||
|
# Test for both directions
|
||||||
|
for direction in [ScanDirection.RIGHT, ScanDirection.LEFT]:
|
||||||
|
alignment = self._detect_strip_alignment(frame, direction, expected_x, expected_y)
|
||||||
|
self.log(f" {direction.value}: valid={alignment.valid}, X={alignment.x_offset:.1f}, Y={alignment.y_offset:.1f}, conf={alignment.confidence:.3f}")
|
||||||
|
|
||||||
|
# Return RIGHT direction result
|
||||||
|
alignment = self._detect_strip_alignment(frame, ScanDirection.RIGHT, expected_x, expected_y)
|
||||||
results['success'] = alignment.valid
|
results['success'] = alignment.valid
|
||||||
results['x_offset'] = alignment.x_offset
|
results['x_offset'] = alignment.x_offset
|
||||||
results['y_offset'] = alignment.y_offset
|
results['y_offset'] = alignment.y_offset
|
||||||
|
results['confidence'] = alignment.confidence
|
||||||
self.log(f"Alignment result: valid={alignment.valid}, X={alignment.x_offset:.1f}, Y={alignment.y_offset:.1f}")
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
results['error'] = str(e)
|
results['error'] = str(e)
|
||||||
|
|
@ -1118,6 +1221,13 @@ class StitchingScanner:
|
||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
def reset_alignment(self):
|
||||||
|
"""Reset cumulative alignment to zero."""
|
||||||
|
self._cumulative_align_x = 0.0
|
||||||
|
self._cumulative_align_y = 0.0
|
||||||
|
self._last_strip_alignment = AlignmentOffset()
|
||||||
|
self.log("Alignment reset to (0, 0)")
|
||||||
|
|
||||||
def get_memory_estimate(self) -> dict:
|
def get_memory_estimate(self) -> dict:
|
||||||
current_bytes = self.mosaic.nbytes if self.mosaic is not None else 0
|
current_bytes = self.mosaic.nbytes if self.mosaic is not None else 0
|
||||||
max_bytes = self.config.max_mosaic_width * self.config.max_mosaic_height * 3
|
max_bytes = self.config.max_mosaic_width * self.config.max_mosaic_height * 3
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue