Compare commits
No commits in common. "7d8d1733c04627ce8357f052dbec36beb554deac" and "65bc86eb52cae312abf403912025df9df29f5c01" have entirely different histories.
7d8d1733c0
...
65bc86eb52
8 changed files with 285 additions and 2876 deletions
BIN
Mos1.png
BIN
Mos1.png
Binary file not shown.
|
Before Width: | Height: | Size: 513 KiB |
BIN
Mos2.png
BIN
Mos2.png
Binary file not shown.
|
Before Width: | Height: | Size: 194 KiB |
310
src/autofocus.py
310
src/autofocus.py
|
|
@ -4,31 +4,16 @@ from vision import calculate_focus_score_sobel
|
|||
|
||||
|
||||
class AutofocusController:
|
||||
"""
|
||||
Manages autofocus operations using visual feedback.
|
||||
|
||||
Key design principle: Never trust step counts due to gear slippage.
|
||||
Use focus score as the authoritative position feedback.
|
||||
"""
|
||||
"""Manages autofocus operations"""
|
||||
|
||||
# Default timing settings
|
||||
DEFAULT_SETTINGS = {
|
||||
# Sweep settings
|
||||
'sweep_move_time': 0.5,
|
||||
'sweep_settle_time': 0.01,
|
||||
'sweep_settle_time': 0.05,
|
||||
'sweep_samples': 10,
|
||||
'sweep_steps': 30,
|
||||
|
||||
# Navigation settings (fast approach to peak)
|
||||
'nav_move_time': 0.3, # Quick bursts
|
||||
'nav_check_samples': 3, # Fewer samples for speed
|
||||
'nav_peak_threshold': 0.80, # Consider "near peak" at 80% of target
|
||||
'nav_decline_threshold': 1.00, # Detect peak passage at 8% drop
|
||||
'nav_max_moves': 100, # Safety limit
|
||||
|
||||
# Fine tuning settings
|
||||
'fine_move_time': 0.30,
|
||||
'fine_settle_time': 0.01,
|
||||
'fine_move_time': 0.15,
|
||||
'fine_settle_time': 0.1,
|
||||
'fine_samples': 10,
|
||||
'fine_max_no_improvement': 5,
|
||||
}
|
||||
|
|
@ -89,11 +74,6 @@ class AutofocusController:
|
|||
|
||||
return sum(scores) / len(scores) if scores else 0
|
||||
|
||||
def get_quick_focus(self, samples=3):
|
||||
"""Quick focus reading with fewer samples for navigation"""
|
||||
scores = [self.get_focus_score() for _ in range(samples)]
|
||||
return sum(scores) / len(scores)
|
||||
|
||||
# === Autofocus Control ===
|
||||
|
||||
def start(self, speed_range=(1, 5), coarse=False, fine=False):
|
||||
|
|
@ -131,33 +111,31 @@ class AutofocusController:
|
|||
|
||||
def _autofocus_routine(self, speed_range):
|
||||
"""
|
||||
Autofocus using sweep search + visual navigation + hill climbing.
|
||||
|
||||
Phase 1: Sweep to map focus curve and find approximate peak
|
||||
Phase 2: Navigate to peak using visual feedback
|
||||
Phase 3: Fine tune with hill climbing
|
||||
Autofocus using sweep search + hill climbing.
|
||||
Phase 1: Sweep across range to find approximate peak
|
||||
Phase 2: Fine tune from best position found
|
||||
"""
|
||||
self.log(f"Starting autofocus (speed range: {speed_range})")
|
||||
|
||||
min_speed_idx, max_speed_idx = speed_range
|
||||
s = self.settings
|
||||
s = self.settings # Shorthand
|
||||
|
||||
try:
|
||||
# Phase 1: Sweep search - map the focus curve
|
||||
best_score, peak_direction = self._sweep_search(
|
||||
# Phase 1: Sweep search
|
||||
best_step, best_score = self._sweep_search(
|
||||
min_speed_idx, max_speed_idx, s
|
||||
)
|
||||
|
||||
if not self.running:
|
||||
return
|
||||
|
||||
# Phase 2: Navigate to peak
|
||||
self._navigate_to_peak(best_score, peak_direction, s)
|
||||
# Move to best position found
|
||||
self._move_to_best_position(best_step, s)
|
||||
|
||||
if not self.running:
|
||||
return
|
||||
|
||||
# Phase 3: Fine tuning with hill climbing
|
||||
# Phase 2: Fine tuning
|
||||
final_score = self._fine_tune(min_speed_idx, s)
|
||||
|
||||
self.log(f"Autofocus complete. Final: {final_score:.1f}")
|
||||
|
|
@ -168,35 +146,30 @@ class AutofocusController:
|
|||
self.running = False
|
||||
|
||||
def _sweep_search(self, min_speed_idx, max_speed_idx, s):
|
||||
"""
|
||||
Phase 1: Sweep to map focus curve.
|
||||
|
||||
Returns:
|
||||
best_score: The highest focus score observed
|
||||
peak_direction: 'up' or 'down' - which direction has the peak
|
||||
"""
|
||||
"""Phase 1: Sweep to find approximate best position"""
|
||||
self.log("Phase 1: Sweep search")
|
||||
|
||||
# Use medium speed for sweep
|
||||
sweep_speed = (min_speed_idx + max_speed_idx) // 2
|
||||
self.motion.set_speed(sweep_speed)
|
||||
time.sleep(0.1)
|
||||
|
||||
# Record starting position score
|
||||
# Record starting position
|
||||
time.sleep(s['sweep_settle_time'])
|
||||
start_score = self.get_averaged_focus(samples=s['sweep_samples'])
|
||||
self.update_focus_display(start_score)
|
||||
|
||||
sweep_data = [(0, start_score)]
|
||||
self.log(f"Start position: {start_score:.1f}")
|
||||
|
||||
sweep_data = {'start': start_score, 'down': [], 'up': []}
|
||||
|
||||
if not self.running:
|
||||
return start_score, 'up'
|
||||
return 0, start_score
|
||||
|
||||
# Sweep DOWN first (away from slide)
|
||||
# Sweep DOWN first (away from slide to avoid contact)
|
||||
self.log(f"Sweeping down {s['sweep_steps']} steps...")
|
||||
for i in range(1, s['sweep_steps'] + 1):
|
||||
if not self.running:
|
||||
return start_score, 'up'
|
||||
return 0, start_score
|
||||
|
||||
self.motion.move_z_down()
|
||||
time.sleep(s['sweep_move_time'])
|
||||
|
|
@ -204,24 +177,35 @@ class AutofocusController:
|
|||
time.sleep(s['sweep_settle_time'])
|
||||
|
||||
score = self.get_averaged_focus(samples=s['sweep_samples'])
|
||||
sweep_data['down'].append(score)
|
||||
sweep_data.append((-i, score))
|
||||
self.update_focus_display(score)
|
||||
|
||||
if i % 5 == 0:
|
||||
self.log(f" Down {i}: {score:.1f}")
|
||||
|
||||
# Remember score at bottom
|
||||
bottom_score = sweep_data['down'][-1] if sweep_data['down'] else start_score
|
||||
self.log(f" Step -{i}: {score:.1f}")
|
||||
|
||||
if not self.running:
|
||||
return start_score, 'up'
|
||||
return 0, start_score
|
||||
|
||||
# Now sweep UP past start to the other side
|
||||
# We'll go UP for 2x sweep_steps (to cover both halves)
|
||||
self.log(f"Sweeping up {s['sweep_steps'] * 2} steps...")
|
||||
for i in range(1, s['sweep_steps'] * 2 + 1):
|
||||
# Return to start
|
||||
self.log("Returning to start...")
|
||||
for _ in range(s['sweep_steps']):
|
||||
if not self.running:
|
||||
return start_score, 'down'
|
||||
return 0, start_score
|
||||
self.motion.move_z_up()
|
||||
time.sleep(s['sweep_move_time'])
|
||||
self.motion.stop_z()
|
||||
time.sleep(0.1)
|
||||
|
||||
time.sleep(s['sweep_settle_time'])
|
||||
|
||||
if not self.running:
|
||||
return 0, start_score
|
||||
|
||||
# Sweep UP
|
||||
self.log(f"Sweeping up {s['sweep_steps']} steps...")
|
||||
for i in range(1, s['sweep_steps'] + 1):
|
||||
if not self.running:
|
||||
return 0, start_score
|
||||
|
||||
self.motion.move_z_up()
|
||||
time.sleep(s['sweep_move_time'])
|
||||
|
|
@ -229,145 +213,73 @@ class AutofocusController:
|
|||
time.sleep(s['sweep_settle_time'])
|
||||
|
||||
score = self.get_averaged_focus(samples=s['sweep_samples'])
|
||||
sweep_data['up'].append(score)
|
||||
sweep_data.append((i, score))
|
||||
self.update_focus_display(score)
|
||||
|
||||
if i % 5 == 0:
|
||||
self.log(f" Up {i}: {score:.1f}")
|
||||
self.log(f" Step +{i}: {score:.1f}")
|
||||
|
||||
# Analyze sweep data to find peak
|
||||
all_scores = sweep_data['down'] + [start_score] + sweep_data['up']
|
||||
best_score = max(all_scores)
|
||||
# Find best position
|
||||
best_step, best_score = max(sweep_data, key=lambda x: x[1])
|
||||
self.log(f"Best found at step {best_step}: {best_score:.1f}")
|
||||
|
||||
down_best = max(sweep_data['down']) if sweep_data['down'] else 0
|
||||
up_best = max(sweep_data['up']) if sweep_data['up'] else 0
|
||||
# Log sweep curve
|
||||
sorted_data = sorted(sweep_data, key=lambda x: x[0])
|
||||
curve_str = " ".join([f"{d[1]:.0f}" for d in sorted_data])
|
||||
self.log(f"Sweep curve: {curve_str}")
|
||||
|
||||
# Determine which direction has the peak
|
||||
# We're currently at the TOP of the sweep (after going up)
|
||||
# So to reach the peak, we need to go DOWN if peak was in down region or middle
|
||||
if down_best > up_best and down_best > start_score:
|
||||
peak_direction = 'down'
|
||||
self.log(f"Peak in DOWN region: {down_best:.1f}")
|
||||
elif up_best >= down_best and up_best > start_score:
|
||||
# Peak is in up region, but we need to check where in the up region
|
||||
# If peak is in first half of up (returning toward start), go down
|
||||
# If peak is in second half of up (past start), we might already be close
|
||||
up_scores = sweep_data['up']
|
||||
mid_point = len(up_scores) // 2
|
||||
first_half_best = max(up_scores[:mid_point]) if mid_point > 0 else 0
|
||||
second_half_best = max(up_scores[mid_point:]) if mid_point < len(up_scores) else 0
|
||||
return best_step, best_score
|
||||
|
||||
if second_half_best >= first_half_best:
|
||||
# Peak is near current position (top), go down slightly
|
||||
peak_direction = 'down'
|
||||
def _move_to_best_position(self, best_step, s):
|
||||
"""Move from current position (+SWEEP_STEPS) to best_step"""
|
||||
steps_to_move = s['sweep_steps'] - best_step
|
||||
|
||||
if steps_to_move > 0:
|
||||
self.log(f"Moving down {steps_to_move} steps to best position")
|
||||
move_func = self.motion.move_z_down
|
||||
else:
|
||||
# Peak is further down
|
||||
peak_direction = 'down'
|
||||
self.log(f"Peak in UP region: {up_best:.1f}")
|
||||
else:
|
||||
peak_direction = 'down'
|
||||
self.log(f"Peak near start: {start_score:.1f}")
|
||||
self.log(f"Moving up {abs(steps_to_move)} steps to best position")
|
||||
move_func = self.motion.move_z_up
|
||||
steps_to_move = abs(steps_to_move)
|
||||
|
||||
# Log sweep curve for debugging
|
||||
self.log(f"Best score found: {best_score:.1f}")
|
||||
self.log(f"Navigate direction: {peak_direction}")
|
||||
|
||||
return best_score, peak_direction
|
||||
|
||||
def _navigate_to_peak(self, target_score, direction, s):
|
||||
"""
|
||||
Phase 2: Navigate toward peak using camera
|
||||
"""
|
||||
self.log(f"Phase 2: Navigate to peak (target: {target_score:.1f}, dir: {direction})")
|
||||
|
||||
move_func = self.motion.move_z_down if direction == 'down' else self.motion.move_z_up
|
||||
|
||||
# Use medium-fast speed for approach
|
||||
self.motion.set_speed(4)
|
||||
time.sleep(0.1)
|
||||
|
||||
peak_threshold = target_score * s['nav_peak_threshold']
|
||||
decline_threshold = s['nav_decline_threshold']
|
||||
self.log(f"(peak_threshold: {peak_threshold:.1f}, decline_threshold: {decline_threshold})")
|
||||
|
||||
prev_score = self.get_quick_focus(s['nav_check_samples'])
|
||||
max_observed = prev_score
|
||||
moves_in_peak_region = 0
|
||||
declining_streak = 0
|
||||
|
||||
for move_count in range(s['nav_max_moves']):
|
||||
for _ in range(steps_to_move):
|
||||
if not self.running:
|
||||
return
|
||||
|
||||
# Quick movement burst
|
||||
move_func()
|
||||
time.sleep(s['nav_move_time'])
|
||||
time.sleep(s['sweep_move_time'])
|
||||
self.motion.stop_z()
|
||||
time.sleep(0.05) # Brief settle
|
||||
time.sleep(0.1)
|
||||
|
||||
# Check focus
|
||||
current_score = self.get_quick_focus(s['nav_check_samples'])
|
||||
self.log(f" Current: {current_score:.1f} (max was {max_observed:.1f}) (Declining {declining_streak:.1f}) (Decline Thres {max_observed * decline_threshold:.1f})")
|
||||
self.log(f" Current: {current_score:.1f} >= peak thresh({peak_threshold:.1f}) ")
|
||||
self.log(f" Current: {current_score:.1f} <= max_observed * decline_threshold:({max_observed * decline_threshold:.1f}) ")
|
||||
self.update_focus_display(current_score)
|
||||
|
||||
# Track maximum observed
|
||||
if current_score > max_observed:
|
||||
max_observed = current_score
|
||||
declining_streak = 0
|
||||
self.log("Setting Max")
|
||||
elif max_observed > 0.8 * target_score:
|
||||
declining_streak += 1
|
||||
|
||||
if declining_streak >=2:
|
||||
self.log(f" Passed peak after {move_count} moves, stopping")
|
||||
break
|
||||
|
||||
# # Are we in the peak region?
|
||||
# if current_score >= peak_threshold:
|
||||
# moves_in_peak_region += 1
|
||||
|
||||
# # Check if we're past the peak (score declining from max)
|
||||
# if current_score < max_observed * decline_threshold:
|
||||
# declining_streak += 1
|
||||
# self.log(f" Declining: {current_score:.1f} (max was {max_observed:.1f})")
|
||||
|
||||
# if declining_streak >= 1:
|
||||
# self.log(f" Passed peak after {move_count} moves, stopping")
|
||||
# break
|
||||
# else:
|
||||
# declining_streak = 0
|
||||
# if moves_in_peak_region % 3 == 0:
|
||||
# self.log(f" In peak region: {current_score:.1f}")
|
||||
|
||||
prev_score = current_score
|
||||
|
||||
final_score = self.get_averaged_focus(samples=s['sweep_samples'])
|
||||
self.log(f"Navigation complete at: {final_score:.1f}")
|
||||
time.sleep(s['sweep_settle_time'])
|
||||
current_score = self.get_averaged_focus(samples=s['sweep_samples'])
|
||||
self.log(f"At best position: {current_score:.1f}")
|
||||
|
||||
def _fine_tune(self, min_speed_idx, s):
|
||||
"""
|
||||
Phase 3: Fine hill-climbing from current position.
|
||||
|
||||
Already near the peak from navigation, now do precise adjustments.
|
||||
"""
|
||||
self.log("Phase 3: Fine tuning")
|
||||
"""Phase 2: Fine hill-climbing from best position"""
|
||||
self.log("Phase 2: Fine tuning")
|
||||
|
||||
self.motion.set_speed(min_speed_idx)
|
||||
time.sleep(0.1)
|
||||
|
||||
best_score = self.get_averaged_focus(samples=s['fine_samples'])
|
||||
self.log(f"Starting fine tune at: {best_score:.1f}")
|
||||
|
||||
# Try both directions to find improvement
|
||||
# Determine fine direction by testing both
|
||||
fine_direction = self._determine_fine_direction(best_score, s)
|
||||
|
||||
if not self.running:
|
||||
return best_score
|
||||
|
||||
# Search in best direction
|
||||
best_score = self._fine_search(fine_direction, best_score, s)
|
||||
# Fine search
|
||||
best_score, best_position_offset = self._fine_search(
|
||||
fine_direction, best_score, s
|
||||
)
|
||||
|
||||
if not self.running:
|
||||
return best_score
|
||||
|
||||
# Return to best position
|
||||
if best_position_offset > 0:
|
||||
self._return_to_best(fine_direction, best_position_offset, s)
|
||||
|
||||
# Final reading
|
||||
time.sleep(s['fine_settle_time'])
|
||||
|
|
@ -376,9 +288,9 @@ class AutofocusController:
|
|||
|
||||
return final_score
|
||||
|
||||
def _determine_fine_direction(self, current_score, s):
|
||||
def _determine_fine_direction(self, best_score, s):
|
||||
"""Test both directions to find which improves focus"""
|
||||
# Try DOWN
|
||||
# Try DOWN first
|
||||
self.motion.move_z_down()
|
||||
time.sleep(s['fine_move_time'])
|
||||
self.motion.stop_z()
|
||||
|
|
@ -386,11 +298,11 @@ class AutofocusController:
|
|||
|
||||
down_score = self.get_averaged_focus(samples=s['fine_samples'])
|
||||
|
||||
if down_score > current_score * 1.02: # 2% improvement threshold
|
||||
self.log(f"Fine direction: DOWN ({down_score:.1f} > {current_score:.1f})")
|
||||
if down_score > best_score:
|
||||
self.log(f"Fine direction: DOWN ({down_score:.1f})")
|
||||
return 'down'
|
||||
|
||||
# Try UP (go back and past)
|
||||
# Go back and try UP
|
||||
self.motion.move_z_up()
|
||||
time.sleep(s['fine_move_time'])
|
||||
self.motion.stop_z()
|
||||
|
|
@ -403,29 +315,25 @@ class AutofocusController:
|
|||
|
||||
up_score = self.get_averaged_focus(samples=s['fine_samples'])
|
||||
|
||||
if up_score > current_score * 1.02:
|
||||
self.log(f"Fine direction: UP ({up_score:.1f} > {current_score:.1f})")
|
||||
if up_score > best_score:
|
||||
self.log(f"Fine direction: UP ({up_score:.1f})")
|
||||
return 'up'
|
||||
|
||||
# Already at peak, return to center
|
||||
self.log("Already at peak")
|
||||
# Already at peak
|
||||
self.log("Already at peak, minor adjustment only")
|
||||
self.motion.move_z_down()
|
||||
time.sleep(s['fine_move_time'])
|
||||
self.motion.stop_z()
|
||||
time.sleep(s['fine_settle_time'])
|
||||
|
||||
return 'up' # Default direction for minor adjustments
|
||||
return 'up'
|
||||
|
||||
def _fine_search(self, direction, best_score, s):
|
||||
"""
|
||||
Search in given direction until no improvement.
|
||||
Uses visual feedback to track best position.
|
||||
"""
|
||||
"""Search in given direction until no improvement"""
|
||||
move_func = self.motion.move_z_up if direction == 'up' else self.motion.move_z_down
|
||||
reverse_func = self.motion.move_z_down if direction == 'up' else self.motion.move_z_up
|
||||
|
||||
no_improvement_count = 0
|
||||
moves_since_best = 0
|
||||
best_position_offset = 0
|
||||
|
||||
while self.running and no_improvement_count < s['fine_max_no_improvement']:
|
||||
move_func()
|
||||
|
|
@ -439,25 +347,21 @@ class AutofocusController:
|
|||
if current_score > best_score:
|
||||
best_score = current_score
|
||||
no_improvement_count = 0
|
||||
moves_since_best = 0
|
||||
self.log(f"Fine improved: {current_score:.1f}")
|
||||
best_position_offset = 0
|
||||
self.log(f"Fine better: {current_score:.1f}")
|
||||
else:
|
||||
no_improvement_count += 1
|
||||
moves_since_best += 1
|
||||
best_position_offset += 1
|
||||
|
||||
# Go back to best position using visual feedback
|
||||
if moves_since_best > 0:
|
||||
self.log(f"Reversing {moves_since_best} moves toward best")
|
||||
for _ in range(moves_since_best):
|
||||
return best_score, best_position_offset
|
||||
|
||||
def _return_to_best(self, direction, steps, s):
|
||||
"""Return to best position by reversing direction"""
|
||||
self.log(f"Returning {steps} steps")
|
||||
reverse_func = self.motion.move_z_down if direction == 'up' else self.motion.move_z_up
|
||||
|
||||
for _ in range(steps):
|
||||
reverse_func()
|
||||
time.sleep(s['fine_move_time'])
|
||||
self.motion.stop_z()
|
||||
|
||||
# Check if we're back at peak
|
||||
check_score = self.get_quick_focus(3)
|
||||
if check_score >= best_score * 0.98:
|
||||
break
|
||||
|
||||
time.sleep(0.05)
|
||||
|
||||
return best_score
|
||||
time.sleep(0.1)
|
||||
168
src/camera.py
168
src/camera.py
|
|
@ -1,183 +1,22 @@
|
|||
import cv2
|
||||
import subprocess
|
||||
import re
|
||||
|
||||
|
||||
def detect_camera_modes(device_id=0):
|
||||
"""
|
||||
Detect available camera resolutions using v4l2-ctl.
|
||||
Returns dict of modes sorted by resolution (smallest to largest).
|
||||
"""
|
||||
modes = {}
|
||||
|
||||
try:
|
||||
# Run v4l2-ctl to get supported formats
|
||||
device = f"/dev/video{device_id}"
|
||||
result = subprocess.run(
|
||||
['v4l2-ctl', '--device', device, '--list-formats-ext'],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=5
|
||||
)
|
||||
|
||||
if result.returncode != 0:
|
||||
print(f"v4l2-ctl failed: {result.stderr}")
|
||||
return _get_fallback_modes()
|
||||
|
||||
# Parse output for "Size: Discrete WxH" lines
|
||||
# Example: "Size: Discrete 2592x1944"
|
||||
size_pattern = re.compile(r'Size:\s+Discrete\s+(\d+)x(\d+)')
|
||||
|
||||
resolutions = set() # Use set to avoid duplicates
|
||||
for line in result.stdout.split('\n'):
|
||||
match = size_pattern.search(line)
|
||||
if match:
|
||||
width = int(match.group(1))
|
||||
height = int(match.group(2))
|
||||
resolutions.add((width, height))
|
||||
|
||||
if not resolutions:
|
||||
print("No resolutions found in v4l2-ctl output")
|
||||
return _get_fallback_modes()
|
||||
|
||||
# Sort by total pixels (width * height)
|
||||
sorted_res = sorted(resolutions, key=lambda r: r[0] * r[1])
|
||||
|
||||
# Build modes dict with descriptive names
|
||||
for i, (width, height) in enumerate(sorted_res):
|
||||
pixels = width * height
|
||||
|
||||
# Generate a name based on position/size
|
||||
if i == 0:
|
||||
name = 'low'
|
||||
desc = 'Low'
|
||||
elif i == len(sorted_res) - 1:
|
||||
name = 'high'
|
||||
desc = 'High'
|
||||
elif len(sorted_res) == 3 and i == 1:
|
||||
name = 'medium'
|
||||
desc = 'Medium'
|
||||
else:
|
||||
name = f'res_{width}x{height}'
|
||||
desc = f'{width}x{height}'
|
||||
|
||||
modes[name] = {
|
||||
'width': width,
|
||||
'height': height,
|
||||
'label': f'{width}x{height} ({desc})'
|
||||
}
|
||||
|
||||
print(f"Detected {len(modes)} camera modes: {list(modes.keys())}")
|
||||
return modes
|
||||
|
||||
except FileNotFoundError:
|
||||
print("v4l2-ctl not found, using fallback modes")
|
||||
return _get_fallback_modes()
|
||||
except subprocess.TimeoutExpired:
|
||||
print("v4l2-ctl timed out, using fallback modes")
|
||||
return _get_fallback_modes()
|
||||
except Exception as e:
|
||||
print(f"Error detecting camera modes: {e}")
|
||||
return _get_fallback_modes()
|
||||
|
||||
|
||||
def _get_fallback_modes():
|
||||
"""Fallback modes if v4l2-ctl detection fails"""
|
||||
return {
|
||||
'low': {'width': 640, 'height': 480, 'label': '640x480 (Low)'},
|
||||
'medium': {'width': 1280, 'height': 960, 'label': '1280x960 (Medium)'},
|
||||
'high': {'width': 1920, 'height': 1080, 'label': '1920x1080 (High)'},
|
||||
}
|
||||
|
||||
|
||||
class Camera:
|
||||
prevFrame = {}
|
||||
def __init__(self, device_id=0, mode=None):
|
||||
self.device_id = device_id
|
||||
|
||||
# Detect available modes before opening camera
|
||||
self.MODES = detect_camera_modes(device_id)
|
||||
|
||||
# Open camera
|
||||
def __init__(self, device_id=0):
|
||||
self.cap = cv2.VideoCapture(device_id)
|
||||
|
||||
if not self.cap.isOpened():
|
||||
raise RuntimeError("Could not open camera, stop program")
|
||||
|
||||
# Default to highest resolution if no mode specified
|
||||
if mode is None:
|
||||
mode = list(self.MODES.keys())[0] # Last = highest res
|
||||
|
||||
self.current_mode = mode
|
||||
self._apply_mode(mode)
|
||||
|
||||
# set resolution
|
||||
# self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
|
||||
# self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)
|
||||
self.cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0) # Disable auto-exposure
|
||||
self.cap.set(cv2.CAP_PROP_EXPOSURE, -6) # Set fixed exposure
|
||||
self.cap.set(cv2.CAP_PROP_AUTO_WB, 0) # Disable auto white balance
|
||||
|
||||
self.window_name = "AutoScope"
|
||||
|
||||
def _apply_mode(self, mode_name):
|
||||
"""Apply resolution settings"""
|
||||
if mode_name not in self.MODES:
|
||||
print(f"Unknown mode {mode_name}, using first available")
|
||||
mode_name = list(self.MODES.keys())[0]
|
||||
|
||||
mode = self.MODES[mode_name]
|
||||
|
||||
# Set resolution
|
||||
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, mode['width'])
|
||||
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, mode['height'])
|
||||
|
||||
self.current_mode = mode_name
|
||||
|
||||
# Verify settings took effect
|
||||
actual_w = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
||||
actual_h = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
||||
actual_fps = self.cap.get(cv2.CAP_PROP_FPS)
|
||||
|
||||
print(f"Camera mode: {mode['label']}")
|
||||
print(f" Actual: {actual_w}x{actual_h} @ {actual_fps:.1f}fps")
|
||||
|
||||
return actual_w, actual_h, actual_fps
|
||||
|
||||
def set_mode(self, mode_name):
|
||||
"""Change camera mode (resolution/framerate)"""
|
||||
return self._apply_mode(mode_name)
|
||||
|
||||
def get_mode(self):
|
||||
"""Get current mode name"""
|
||||
return self.current_mode
|
||||
|
||||
def get_mode_info(self):
|
||||
"""Get current mode details"""
|
||||
return self.MODES.get(self.current_mode, list(self.MODES.values())[0])
|
||||
|
||||
def get_resolution(self):
|
||||
"""Get current actual resolution"""
|
||||
w = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
||||
h = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
||||
return w, h
|
||||
|
||||
def get_fps(self):
|
||||
"""Get current actual FPS"""
|
||||
return self.cap.get(cv2.CAP_PROP_FPS)
|
||||
|
||||
def get_available_modes(self):
|
||||
"""Get list of available mode names"""
|
||||
return list(self.MODES.keys())
|
||||
|
||||
def get_mode_labels(self):
|
||||
"""Get mode labels for UI"""
|
||||
return {k: v['label'] for k, v in self.MODES.items()}
|
||||
|
||||
def capture_frame(self):
|
||||
ret, frame = self.cap.read()
|
||||
if not ret:
|
||||
return prevframe
|
||||
prevframe = frame
|
||||
raise RuntimeError("Failed to capture frame, stop program")
|
||||
return frame
|
||||
|
||||
def show_frame(self, frame):
|
||||
|
|
@ -230,3 +69,4 @@ class Camera:
|
|||
break
|
||||
|
||||
self.close_window()
|
||||
|
||||
907
src/gui.py
907
src/gui.py
File diff suppressed because it is too large
Load diff
|
|
@ -2,12 +2,12 @@ class MotionController:
|
|||
# Command mapping for each axis
|
||||
AXIS_COMMANDS = {
|
||||
'X': {'pos': 'E', 'neg': 'W', 'stop': 'e'},
|
||||
'Y': {'pos': 'S', 'neg': 'N', 'stop': 'n'},
|
||||
'Y': {'pos': 'N', 'neg': 'S', 'stop': 'n'},
|
||||
'Z': {'pos': 'U', 'neg': 'D', 'stop': 'u'}
|
||||
}
|
||||
|
||||
# Speed values matching Arduino speedArr
|
||||
SPEED_VALUES = [0, 1, 2, 5, 10, 30, 50, 70]
|
||||
SPEED_VALUES = [0, 2, 5, 10, 30, 50, 70]
|
||||
|
||||
def __init__(self, arduino, on_command_sent=None):
|
||||
"""
|
||||
|
|
|
|||
1016
src/scanner.py
1016
src/scanner.py
File diff suppressed because it is too large
Load diff
|
|
@ -1,734 +0,0 @@
|
|||
"""
|
||||
Stitching Scanner v2 - Fixed displacement tracking
|
||||
|
||||
Key fix: Track displacement since last APPEND, not just cumulative.
|
||||
The strip width must match actual movement since we last added to the mosaic.
|
||||
"""
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
import time
|
||||
import threading
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Optional, Callable, Tuple
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class ScanDirection(Enum):
|
||||
"""Scan direction constants"""
|
||||
RIGHT = 'right' # X+ (E command)
|
||||
LEFT = 'left' # X- (W command)
|
||||
DOWN = 'down' # Y- (N command)
|
||||
UP = 'up' # Y+ (S command)
|
||||
|
||||
|
||||
@dataclass
|
||||
class StitchConfig:
|
||||
"""Stitching scanner configuration"""
|
||||
# Displacement threshold (percentage of frame size)
|
||||
displacement_threshold: float = 0.10 # 10% of frame dimension
|
||||
|
||||
# Movement timing
|
||||
movement_interval: float = 0.001 # Seconds of motor on time
|
||||
frame_interval: float = 0.25 # Seconds between frame captures (settle time)
|
||||
settle_time: float = 0.5 # Seconds to wait after stopping
|
||||
max_scan_time: float = 2400.0 # Safety timeout (5 minutes)
|
||||
|
||||
# Scan pattern
|
||||
rows: int = 3
|
||||
row_overlap: float = 0.15
|
||||
|
||||
# Speed setting for scanning
|
||||
scan_speed_index: int = 3
|
||||
|
||||
# Focus
|
||||
autofocus_every_row: bool = True
|
||||
|
||||
# Memory management
|
||||
max_mosaic_width: int = 11000
|
||||
max_mosaic_height: int = 11000
|
||||
# 11000, 24500, 450000
|
||||
|
||||
@dataclass
|
||||
class StitchState:
|
||||
"""Current state for visualization"""
|
||||
is_scanning: bool = False
|
||||
direction: str = ''
|
||||
|
||||
# Displacement tracking
|
||||
cumulative_x: float = 0.0
|
||||
cumulative_y: float = 0.0
|
||||
last_displacement: Tuple[float, float] = (0.0, 0.0)
|
||||
|
||||
# Progress
|
||||
current_row: int = 0
|
||||
total_rows: int = 0
|
||||
|
||||
# Mosaic size
|
||||
mosaic_width: int = 0
|
||||
mosaic_height: int = 0
|
||||
|
||||
# Debug
|
||||
frame_count: int = 0
|
||||
append_count: int = 0
|
||||
|
||||
|
||||
class StitchingScanner:
|
||||
"""
|
||||
Slide scanner using continuous stitching with correct displacement tracking.
|
||||
|
||||
Key insight: We must track displacement since the LAST APPEND, and the
|
||||
strip we append must exactly match that displacement.
|
||||
"""
|
||||
|
||||
def __init__(self, camera, motion_controller, autofocus_controller=None,
|
||||
config: StitchConfig = None,
|
||||
on_log: Callable[[str], None] = None,
|
||||
on_progress: Callable[[int, int], None] = None,
|
||||
on_mosaic_updated: Callable[[], None] = None):
|
||||
self.camera = camera
|
||||
self.motion = motion_controller
|
||||
self.autofocus = autofocus_controller
|
||||
self.config = config or StitchConfig()
|
||||
|
||||
# Callbacks
|
||||
self.on_log = on_log
|
||||
self.on_progress = on_progress
|
||||
self.on_mosaic_updated = on_mosaic_updated
|
||||
|
||||
# State
|
||||
self.running = False
|
||||
self.paused = False
|
||||
self.state = StitchState()
|
||||
self._state_lock = threading.Lock()
|
||||
|
||||
# Mosaic data
|
||||
self.mosaic: Optional[np.ndarray] = None
|
||||
self._mosaic_lock = threading.Lock()
|
||||
|
||||
# Frame tracking - KEY CHANGE: separate reference for displacement calc vs append
|
||||
self._prev_frame: Optional[np.ndarray] = None # For frame-to-frame displacement
|
||||
self._append_ref_frame: Optional[np.ndarray] = None # Reference from last append
|
||||
self._displacement_since_append_x: float = 0.0 # Accumulated since last append
|
||||
self._displacement_since_append_y: float = 0.0
|
||||
|
||||
# Thread
|
||||
self._thread: Optional[threading.Thread] = None
|
||||
|
||||
def log(self, message: str):
|
||||
"""Log a message"""
|
||||
if self.on_log:
|
||||
self.on_log(f"[Stitch] {message}")
|
||||
print(f"[Stitch] {message}")
|
||||
|
||||
# =========================================================================
|
||||
# Displacement Detection
|
||||
# =========================================================================
|
||||
|
||||
def _to_grayscale(self, frame: np.ndarray) -> np.ndarray:
|
||||
"""Convert frame to grayscale"""
|
||||
if len(frame.shape) == 3:
|
||||
return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
||||
return frame
|
||||
|
||||
def _detect_displacement(self, prev_frame: np.ndarray,
|
||||
curr_frame: np.ndarray) -> Tuple[float, float]:
|
||||
"""
|
||||
Detect displacement between two frames using phase correlation.
|
||||
Returns (dx, dy) in pixels.
|
||||
"""
|
||||
prev_gray = self._to_grayscale(prev_frame)
|
||||
curr_gray = self._to_grayscale(curr_frame)
|
||||
|
||||
if prev_gray.shape != curr_gray.shape:
|
||||
return (0.0, 0.0)
|
||||
|
||||
prev_f = prev_gray.astype(np.float32)
|
||||
curr_f = curr_gray.astype(np.float32)
|
||||
|
||||
# Apply window function to reduce edge effects
|
||||
h, w = prev_gray.shape
|
||||
window = cv2.createHanningWindow((w, h), cv2.CV_32F)
|
||||
prev_f = prev_f * window
|
||||
curr_f = curr_f * window
|
||||
|
||||
shift, response = cv2.phaseCorrelate(prev_f, curr_f)
|
||||
dx, dy = shift
|
||||
|
||||
return (dx, dy)
|
||||
|
||||
def _detect_displacement_robust(self, prev_frame: np.ndarray,
|
||||
curr_frame: np.ndarray) -> Tuple[float, float]:
|
||||
"""Displacement detection with sanity checks"""
|
||||
dx, dy = self._detect_displacement(prev_frame, curr_frame)
|
||||
|
||||
h, w = prev_frame.shape[:2]
|
||||
max_displacement = max(w, h) * 0.5
|
||||
|
||||
if abs(dx) > max_displacement or abs(dy) > max_displacement:
|
||||
self.log(f"Warning: Large displacement ({dx:.1f}, {dy:.1f}), ignoring")
|
||||
return (0.0, 0.0)
|
||||
|
||||
return (dx, dy)
|
||||
|
||||
# =========================================================================
|
||||
# Mosaic Building - FIXED VERSION
|
||||
# =========================================================================
|
||||
|
||||
def _init_mosaic(self, frame: np.ndarray):
|
||||
"""Initialize mosaic with first frame"""
|
||||
with self._mosaic_lock:
|
||||
self.mosaic = frame.copy()
|
||||
|
||||
# Set reference frames
|
||||
self._prev_frame = frame.copy()
|
||||
self._append_ref_frame = frame.copy()
|
||||
self._displacement_since_append_x = 0.0
|
||||
self._displacement_since_append_y = 0.0
|
||||
|
||||
with self._state_lock:
|
||||
h, w = frame.shape[:2]
|
||||
self.state.mosaic_width = w
|
||||
self.state.mosaic_height = h
|
||||
self.state.frame_count = 1
|
||||
self.state.append_count = 0
|
||||
|
||||
self.log(f"Initialized mosaic: {frame.shape[1]}x{frame.shape[0]}")
|
||||
|
||||
def _blend_strips_horizontal(self, base: np.ndarray, strip: np.ndarray,
|
||||
blend_width: int, append_right: bool) -> np.ndarray:
|
||||
"""Blend strip onto base with gradient at seam to hide discontinuities."""
|
||||
if blend_width <= 0 or blend_width >= strip.shape[1]:
|
||||
if append_right:
|
||||
return np.hstack([base, strip])
|
||||
else:
|
||||
return np.hstack([strip, base])
|
||||
|
||||
h_base, w_base = base.shape[:2]
|
||||
h_strip, w_strip = strip.shape[:2]
|
||||
|
||||
if h_strip != h_base:
|
||||
# Height mismatch - can't blend properly
|
||||
if append_right:
|
||||
return np.hstack([base, strip])
|
||||
return np.hstack([strip, base])
|
||||
|
||||
blend_w = min(blend_width, w_strip, w_base)
|
||||
|
||||
if append_right:
|
||||
# base | blend_zone | rest_of_strip
|
||||
result_width = w_base + w_strip - blend_w
|
||||
result = np.zeros((h_base, result_width, 3), dtype=np.uint8)
|
||||
|
||||
# Copy base
|
||||
result[:, :w_base] = base
|
||||
|
||||
# Create gradient: 1->0 for base weight
|
||||
alpha = np.linspace(1, 0, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis]
|
||||
|
||||
base_overlap = base[:, -blend_w:].astype(np.float32)
|
||||
strip_overlap = strip[:, :blend_w].astype(np.float32)
|
||||
blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8)
|
||||
|
||||
result[:, w_base - blend_w:w_base] = blended
|
||||
result[:, w_base:] = strip[:, blend_w:]
|
||||
|
||||
return result
|
||||
else:
|
||||
# rest_of_strip | blend_zone | base
|
||||
result_width = w_base + w_strip - blend_w
|
||||
result = np.zeros((h_base, result_width, 3), dtype=np.uint8)
|
||||
|
||||
result[:, :w_strip] = strip
|
||||
|
||||
alpha = np.linspace(0, 1, blend_w, dtype=np.float32)[np.newaxis, :, np.newaxis]
|
||||
|
||||
strip_overlap = strip[:, -blend_w:].astype(np.float32)
|
||||
base_overlap = base[:, :blend_w].astype(np.float32)
|
||||
blended = (strip_overlap * (1 - alpha) + base_overlap * alpha).astype(np.uint8)
|
||||
|
||||
result[:, w_strip - blend_w:w_strip] = blended
|
||||
result[:, w_strip:] = base[:, blend_w:]
|
||||
|
||||
return result
|
||||
|
||||
def _append_to_mosaic_fixed(self, frame: np.ndarray, direction: ScanDirection):
|
||||
"""
|
||||
FIXED: Append with blending and fractional pixel preservation.
|
||||
|
||||
Key improvements:
|
||||
1. Gradient blending at seams to hide color discontinuities
|
||||
2. Preserve fractional pixel remainder to prevent cumulative drift
|
||||
3. Small safety margin for alignment tolerance
|
||||
"""
|
||||
BLEND_WIDTH = 10 # Pixels to blend at seam
|
||||
SAFETY_MARGIN = 2 # Extra pixels as tolerance
|
||||
|
||||
with self._mosaic_lock:
|
||||
if self.mosaic is None:
|
||||
return
|
||||
|
||||
h, w = frame.shape[:2]
|
||||
mh, mw = self.mosaic.shape[:2]
|
||||
|
||||
dx = abs(self._displacement_since_append_x)
|
||||
dy = abs(self._displacement_since_append_y)
|
||||
|
||||
if direction in [ScanDirection.RIGHT, ScanDirection.LEFT]:
|
||||
# Round and add safety margin
|
||||
append_width = round(dx) + SAFETY_MARGIN
|
||||
append_width = min(append_width, w - BLEND_WIDTH - 5)
|
||||
|
||||
if append_width < 1:
|
||||
return
|
||||
|
||||
# Calculate fractional remainder to preserve
|
||||
pixels_consumed = append_width - SAFETY_MARGIN
|
||||
fractional_remainder = dx - pixels_consumed
|
||||
|
||||
if direction == ScanDirection.RIGHT:
|
||||
# Grab strip with extra for blending
|
||||
strip_start = max(0, w - append_width - BLEND_WIDTH)
|
||||
new_strip = frame[:, strip_start:]
|
||||
self.mosaic = self._blend_strips_horizontal(
|
||||
self.mosaic, new_strip, BLEND_WIDTH, append_right=True)
|
||||
else:
|
||||
strip_end = min(w, append_width + BLEND_WIDTH)
|
||||
new_strip = frame[:, :strip_end]
|
||||
self.mosaic = self._blend_strips_horizontal(
|
||||
self.mosaic, new_strip, BLEND_WIDTH, append_right=False)
|
||||
|
||||
# KEEP fractional remainder instead of resetting to 0!
|
||||
self._displacement_since_append_x = fractional_remainder
|
||||
self._displacement_since_append_y = 0.0
|
||||
|
||||
elif direction in [ScanDirection.DOWN, ScanDirection.UP]:
|
||||
append_height = round(dy) + SAFETY_MARGIN
|
||||
append_height = min(append_height, h - BLEND_WIDTH - 5)
|
||||
|
||||
if append_height < 1:
|
||||
return
|
||||
|
||||
pixels_consumed = append_height - SAFETY_MARGIN
|
||||
fractional_remainder = dy - pixels_consumed
|
||||
|
||||
if direction == ScanDirection.DOWN:
|
||||
strip_start = max(0, h - append_height - BLEND_WIDTH)
|
||||
new_strip = frame[strip_start:, :]
|
||||
|
||||
# Match widths
|
||||
if new_strip.shape[1] > mw:
|
||||
new_strip = new_strip[:, :mw]
|
||||
elif new_strip.shape[1] < mw:
|
||||
pad = np.zeros((new_strip.shape[0], mw - new_strip.shape[1], 3), dtype=np.uint8)
|
||||
new_strip = np.hstack([new_strip, pad])
|
||||
|
||||
# Vertical blend
|
||||
blend_h = min(BLEND_WIDTH, new_strip.shape[0], mh)
|
||||
alpha = np.linspace(1, 0, blend_h, dtype=np.float32)[:, np.newaxis, np.newaxis]
|
||||
|
||||
base_overlap = self.mosaic[-blend_h:].astype(np.float32)
|
||||
strip_overlap = new_strip[:blend_h].astype(np.float32)
|
||||
blended = (base_overlap * alpha + strip_overlap * (1 - alpha)).astype(np.uint8)
|
||||
|
||||
result_h = mh + new_strip.shape[0] - blend_h
|
||||
result = np.zeros((result_h, mw, 3), dtype=np.uint8)
|
||||
result[:mh - blend_h] = self.mosaic[:-blend_h]
|
||||
result[mh - blend_h:mh] = blended
|
||||
result[mh:] = new_strip[blend_h:]
|
||||
self.mosaic = result
|
||||
else:
|
||||
strip_end = min(h, append_height + BLEND_WIDTH)
|
||||
new_strip = frame[:strip_end, :]
|
||||
|
||||
if new_strip.shape[1] > mw:
|
||||
new_strip = new_strip[:, :mw]
|
||||
elif new_strip.shape[1] < mw:
|
||||
pad = np.zeros((new_strip.shape[0], mw - new_strip.shape[1], 3), dtype=np.uint8)
|
||||
new_strip = np.hstack([new_strip, pad])
|
||||
|
||||
self.mosaic = np.vstack([new_strip, self.mosaic])
|
||||
|
||||
self._displacement_since_append_x = 0.0
|
||||
self._displacement_since_append_y = fractional_remainder
|
||||
|
||||
new_mh, new_mw = self.mosaic.shape[:2]
|
||||
|
||||
# Update state
|
||||
with self._state_lock:
|
||||
self.state.mosaic_width = new_mw
|
||||
self.state.mosaic_height = new_mh
|
||||
self.state.append_count += 1
|
||||
|
||||
# Update reference frame (fractional remainder already set above - don't reset!)
|
||||
self._append_ref_frame = frame.copy()
|
||||
|
||||
if self.on_mosaic_updated:
|
||||
self.on_mosaic_updated()
|
||||
|
||||
def _start_new_row(self, frame: np.ndarray, direction: ScanDirection):
|
||||
"""Start a new row in the mosaic"""
|
||||
with self._mosaic_lock:
|
||||
if self.mosaic is None:
|
||||
self._init_mosaic(frame)
|
||||
return
|
||||
|
||||
h, w = frame.shape[:2]
|
||||
mh, mw = self.mosaic.shape[:2]
|
||||
|
||||
# Calculate overlap
|
||||
overlap_pixels = int(h * self.config.row_overlap)
|
||||
append_height = h - overlap_pixels
|
||||
|
||||
if direction == ScanDirection.DOWN:
|
||||
new_strip = frame[overlap_pixels:, :]
|
||||
|
||||
if new_strip.shape[1] < mw:
|
||||
pad = np.zeros((new_strip.shape[0], mw - new_strip.shape[1], 3), dtype=np.uint8)
|
||||
new_strip = np.hstack([new_strip, pad])
|
||||
elif new_strip.shape[1] > mw:
|
||||
new_strip = new_strip[:, :mw]
|
||||
|
||||
self.mosaic = np.vstack([self.mosaic, new_strip])
|
||||
else:
|
||||
new_strip = frame[:append_height, :]
|
||||
|
||||
if new_strip.shape[1] < mw:
|
||||
pad = np.zeros((new_strip.shape[0], mw - new_strip.shape[1], 3), dtype=np.uint8)
|
||||
new_strip = np.hstack([new_strip, pad])
|
||||
elif new_strip.shape[1] > mw:
|
||||
new_strip = new_strip[:, :mw]
|
||||
|
||||
self.mosaic = np.vstack([new_strip, self.mosaic])
|
||||
|
||||
# Reset all tracking for new row
|
||||
self._prev_frame = frame.copy()
|
||||
self._append_ref_frame = frame.copy()
|
||||
self._displacement_since_append_x = 0.0
|
||||
self._displacement_since_append_y = 0.0
|
||||
|
||||
with self._state_lock:
|
||||
self.state.mosaic_height = self.mosaic.shape[0]
|
||||
self.state.mosaic_width = self.mosaic.shape[1]
|
||||
|
||||
self.log(f"New row started, mosaic: {self.mosaic.shape[1]}x{self.mosaic.shape[0]}")
|
||||
|
||||
# =========================================================================
|
||||
# Scan Control
|
||||
# =========================================================================
|
||||
|
||||
def start(self) -> bool:
|
||||
"""Start the stitching scan"""
|
||||
if self.running:
|
||||
self.log("Already running")
|
||||
return False
|
||||
|
||||
self.running = True
|
||||
self.paused = False
|
||||
|
||||
with self._state_lock:
|
||||
self.state = StitchState()
|
||||
self.state.is_scanning = True
|
||||
self.state.total_rows = self.config.rows
|
||||
|
||||
with self._mosaic_lock:
|
||||
self.mosaic = None
|
||||
|
||||
self._prev_frame = None
|
||||
self._append_ref_frame = None
|
||||
self._displacement_since_append_x = 0.0
|
||||
self._displacement_since_append_y = 0.0
|
||||
|
||||
self._thread = threading.Thread(target=self._scan_loop, daemon=True)
|
||||
self._thread.start()
|
||||
|
||||
self.log("Stitching scan started")
|
||||
return True
|
||||
|
||||
def stop(self):
|
||||
"""Stop the scan"""
|
||||
self.running = False
|
||||
self.paused = False
|
||||
self.motion.stop_all()
|
||||
|
||||
with self._state_lock:
|
||||
self.state.is_scanning = False
|
||||
|
||||
self.log("Scan stopped")
|
||||
|
||||
def pause(self):
|
||||
"""Pause the scan"""
|
||||
if self.running and not self.paused:
|
||||
self.paused = True
|
||||
self.motion.stop_all()
|
||||
self.log("Scan paused")
|
||||
|
||||
def resume(self):
|
||||
"""Resume the scan"""
|
||||
if self.running and self.paused:
|
||||
self.paused = False
|
||||
self.log("Scan resumed")
|
||||
|
||||
# =========================================================================
|
||||
# Main Scan Loop
|
||||
# =========================================================================
|
||||
|
||||
def _scan_loop(self):
|
||||
"""Main scanning loop"""
|
||||
try:
|
||||
self.log("Starting scan loop")
|
||||
|
||||
self.motion.set_speed(self.config.scan_speed_index)
|
||||
time.sleep(0.1)
|
||||
|
||||
frame = self._capture_frame()
|
||||
self._init_mosaic(frame)
|
||||
|
||||
for row in range(self.config.rows):
|
||||
if not self.running:
|
||||
break
|
||||
|
||||
with self._state_lock:
|
||||
self.state.current_row = row
|
||||
|
||||
self.log(f"=== Row {row + 1}/{self.config.rows} ===")
|
||||
|
||||
# Serpentine pattern
|
||||
if row % 2 == 0:
|
||||
h_direction = ScanDirection.RIGHT
|
||||
else:
|
||||
h_direction = ScanDirection.LEFT
|
||||
|
||||
self._scan_horizontal(h_direction)
|
||||
|
||||
if not self.running:
|
||||
break
|
||||
|
||||
if row < self.config.rows - 1:
|
||||
self._move_to_next_row()
|
||||
|
||||
self.log("Scan complete!")
|
||||
|
||||
except Exception as e:
|
||||
self.log(f"Scan error: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
finally:
|
||||
self.running = False
|
||||
self.motion.stop_all()
|
||||
with self._state_lock:
|
||||
self.state.is_scanning = False
|
||||
|
||||
def _scan_horizontal(self, direction: ScanDirection):
|
||||
"""Scan horizontally with fixed displacement tracking"""
|
||||
self.log(f"Scanning {direction.value}...")
|
||||
|
||||
with self._state_lock:
|
||||
self.state.direction = direction.value
|
||||
|
||||
frame = self._capture_frame()
|
||||
h, w = frame.shape[:2]
|
||||
threshold_pixels = w * self.config.displacement_threshold
|
||||
|
||||
# Initialize tracking
|
||||
self._prev_frame = frame.copy()
|
||||
self._append_ref_frame = frame.copy()
|
||||
self._displacement_since_append_x = 0.0
|
||||
self._displacement_since_append_y = 0.0
|
||||
|
||||
start_time = time.time()
|
||||
no_movement_count = 0
|
||||
max_no_movement = 50
|
||||
|
||||
while self.running and not self.paused:
|
||||
if time.time() - start_time > self.config.max_scan_time:
|
||||
self.log("Scan timeout")
|
||||
break
|
||||
|
||||
# Pulse the motor
|
||||
if direction == ScanDirection.RIGHT:
|
||||
self.motion.send_command('E')
|
||||
else:
|
||||
self.motion.send_command('W')
|
||||
|
||||
time.sleep(self.config.movement_interval)
|
||||
|
||||
if direction == ScanDirection.RIGHT:
|
||||
self.motion.send_command('e')
|
||||
else:
|
||||
self.motion.send_command('w')
|
||||
|
||||
# Wait for settle
|
||||
time.sleep(self.config.frame_interval)
|
||||
|
||||
# Capture and measure
|
||||
curr_frame = self._capture_frame()
|
||||
dx, dy = self._detect_displacement_robust(self._prev_frame, curr_frame)
|
||||
|
||||
# Accumulate displacement SINCE LAST APPEND
|
||||
self._displacement_since_append_x += dx
|
||||
self._displacement_since_append_y += dy
|
||||
|
||||
with self._state_lock:
|
||||
self.state.cumulative_x = self._displacement_since_append_x
|
||||
self.state.cumulative_y = self._displacement_since_append_y
|
||||
self.state.last_displacement = (dx, dy)
|
||||
self.state.frame_count += 1
|
||||
|
||||
# Check for no movement
|
||||
if abs(dx) < 1.0 and abs(dy) < 1.0:
|
||||
no_movement_count += 1
|
||||
if no_movement_count >= max_no_movement:
|
||||
self.log(f"Edge detected (no movement for {no_movement_count} frames)")
|
||||
break
|
||||
else:
|
||||
no_movement_count = 0
|
||||
|
||||
# Check threshold and append
|
||||
if abs(self._displacement_since_append_x) >= threshold_pixels:
|
||||
self._append_to_mosaic_fixed(curr_frame, direction)
|
||||
self.log(f"Appended {abs(self._displacement_since_append_x):.1f}px strip, "
|
||||
f"mosaic: {self.state.mosaic_width}x{self.state.mosaic_height}")
|
||||
|
||||
# Update prev_frame for next displacement calculation
|
||||
self._prev_frame = curr_frame.copy()
|
||||
|
||||
if self.on_progress:
|
||||
self.on_progress(self.state.append_count, 0)
|
||||
|
||||
# Stop
|
||||
if direction == ScanDirection.RIGHT:
|
||||
self.motion.send_command('e')
|
||||
else:
|
||||
self.motion.send_command('w')
|
||||
|
||||
time.sleep(self.config.settle_time)
|
||||
|
||||
def _move_to_next_row(self):
|
||||
"""Move down to next row"""
|
||||
self.log("Moving to next row...")
|
||||
|
||||
frame = self._capture_frame()
|
||||
h, w = frame.shape[:2]
|
||||
move_distance = h * (1 - self.config.row_overlap)
|
||||
|
||||
with self._state_lock:
|
||||
self.state.direction = 'down'
|
||||
|
||||
self.motion.send_command('N')
|
||||
|
||||
self._prev_frame = frame.copy()
|
||||
cumulative_y = 0.0
|
||||
|
||||
while self.running:
|
||||
time.sleep(self.config.frame_interval)
|
||||
|
||||
curr_frame = self._capture_frame()
|
||||
dx, dy = self._detect_displacement_robust(self._prev_frame, curr_frame)
|
||||
|
||||
cumulative_y += dy
|
||||
self._prev_frame = curr_frame.copy()
|
||||
|
||||
with self._state_lock:
|
||||
self.state.cumulative_y = cumulative_y
|
||||
|
||||
if abs(cumulative_y) >= move_distance:
|
||||
break
|
||||
|
||||
if abs(cumulative_y) < 5 and self.state.frame_count > 50:
|
||||
self.log("Warning: Minimal Y movement")
|
||||
break
|
||||
|
||||
self.motion.send_command('n')
|
||||
time.sleep(self.config.settle_time)
|
||||
|
||||
frame = self._capture_frame()
|
||||
self._start_new_row(frame, ScanDirection.DOWN)
|
||||
|
||||
def _capture_frame(self) -> np.ndarray:
|
||||
"""Capture and rotate frame"""
|
||||
frame = self.camera.capture_frame()
|
||||
frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
|
||||
return frame
|
||||
|
||||
# =========================================================================
|
||||
# Getters
|
||||
# =========================================================================
|
||||
|
||||
def get_state(self) -> StitchState:
|
||||
"""Get current scan state"""
|
||||
with self._state_lock:
|
||||
return StitchState(
|
||||
is_scanning=self.state.is_scanning,
|
||||
direction=self.state.direction,
|
||||
cumulative_x=self.state.cumulative_x,
|
||||
cumulative_y=self.state.cumulative_y,
|
||||
last_displacement=self.state.last_displacement,
|
||||
current_row=self.state.current_row,
|
||||
total_rows=self.state.total_rows,
|
||||
mosaic_width=self.state.mosaic_width,
|
||||
mosaic_height=self.state.mosaic_height,
|
||||
frame_count=self.state.frame_count,
|
||||
append_count=self.state.append_count
|
||||
)
|
||||
|
||||
def get_mosaic(self) -> Optional[np.ndarray]:
|
||||
"""Get current mosaic (full resolution)"""
|
||||
with self._mosaic_lock:
|
||||
if self.mosaic is not None:
|
||||
return self.mosaic.copy()
|
||||
return None
|
||||
|
||||
def get_mosaic_preview(self, max_size: int = 600) -> Optional[np.ndarray]:
|
||||
"""Get scaled mosaic for preview"""
|
||||
with self._mosaic_lock:
|
||||
if self.mosaic is None:
|
||||
return None
|
||||
|
||||
h, w = self.mosaic.shape[:2]
|
||||
scale = min(max_size / w, max_size / h, 1.0)
|
||||
|
||||
if scale < 1.0:
|
||||
new_w = int(w * scale)
|
||||
new_h = int(h * scale)
|
||||
return cv2.resize(self.mosaic, (new_w, new_h))
|
||||
|
||||
return self.mosaic.copy()
|
||||
|
||||
def save_mosaic(self, filepath: str) -> bool:
|
||||
"""Save mosaic to file"""
|
||||
with self._mosaic_lock:
|
||||
if self.mosaic is None:
|
||||
return False
|
||||
|
||||
cv2.imwrite(filepath, self.mosaic)
|
||||
self.log(f"Saved mosaic to {filepath}")
|
||||
return True
|
||||
|
||||
# =========================================================================
|
||||
# Testing
|
||||
# =========================================================================
|
||||
|
||||
def test_displacement(self, num_frames: int = 10) -> dict:
|
||||
"""Test displacement detection"""
|
||||
results = {
|
||||
'frames': [],
|
||||
'total_dx': 0.0,
|
||||
'total_dy': 0.0
|
||||
}
|
||||
|
||||
prev_frame = self._capture_frame()
|
||||
|
||||
for i in range(num_frames):
|
||||
time.sleep(0.1)
|
||||
curr_frame = self._capture_frame()
|
||||
|
||||
dx, dy = self._detect_displacement(prev_frame, curr_frame)
|
||||
|
||||
results['frames'].append({'frame': i, 'dx': dx, 'dy': dy})
|
||||
results['total_dx'] += dx
|
||||
results['total_dy'] += dy
|
||||
|
||||
prev_frame = curr_frame
|
||||
|
||||
return results
|
||||
Loading…
Reference in a new issue