Autoscope-Controller/src/autofocus.py
2025-12-29 15:46:12 -08:00

367 lines
No EOL
12 KiB
Python

import time
import threading
from vision import calculate_focus_score_sobel
class AutofocusController:
"""Manages autofocus operations"""
# Default timing settings
DEFAULT_SETTINGS = {
'sweep_move_time': 0.5,
'sweep_settle_time': 0.05,
'sweep_samples': 10,
'sweep_steps': 30,
'fine_move_time': 0.15,
'fine_settle_time': 0.1,
'fine_samples': 10,
'fine_max_no_improvement': 5,
}
def __init__(self, camera, motion_controller, on_log=None, on_focus_update=None):
"""
Args:
camera: Camera instance for capturing frames
motion_controller: MotionController for Z axis control
on_log: Callback(message) for logging
on_focus_update: Callback(score) for updating focus display
"""
self.camera = camera
self.motion = motion_controller
self.on_log = on_log
self.on_focus_update = on_focus_update
self.running = False
self.settings = self.DEFAULT_SETTINGS.copy()
self._thread = None
def log(self, message):
"""Log a message"""
if self.on_log:
self.on_log(message)
def update_focus_display(self, score):
"""Update focus display"""
if self.on_focus_update:
self.on_focus_update(score)
# === Focus Measurement ===
def get_focus_score(self):
"""Get current focus score from camera"""
try:
frame = self.camera.capture_frame()
return calculate_focus_score_sobel(frame)
except Exception as e:
self.log(f"Focus score error: {e}")
return 0
def get_averaged_focus(self, samples=5, delay_between=0.05):
"""
Get averaged focus score from multiple samples.
Removes outliers for more stable readings.
"""
scores = []
for _ in range(samples):
scores.append(self.get_focus_score())
time.sleep(delay_between)
# Remove outliers (top and bottom 20%)
scores.sort()
trim = len(scores) // 5
if trim > 0:
scores = scores[trim:-trim]
return sum(scores) / len(scores) if scores else 0
# === Autofocus Control ===
def start(self, speed_range=(1, 5), coarse=False, fine=False):
"""Start autofocus in background thread"""
if self.running:
self.log("Autofocus already running")
return False
# Determine speed range based on mode
if coarse:
speed_range = (4, 5)
elif fine:
speed_range = (1, 3)
self.running = True
self._thread = threading.Thread(
target=self._autofocus_routine,
args=(speed_range,),
daemon=True
)
self._thread.start()
return True
def stop(self):
"""Stop autofocus routine"""
self.running = False
self.motion.stop_z()
self.log("Autofocus stopped")
def is_running(self):
"""Check if autofocus is currently running"""
return self.running
# === Autofocus Algorithm ===
def _autofocus_routine(self, speed_range):
"""
Autofocus using sweep search + hill climbing.
Phase 1: Sweep across range to find approximate peak
Phase 2: Fine tune from best position found
"""
self.log(f"Starting autofocus (speed range: {speed_range})")
min_speed_idx, max_speed_idx = speed_range
s = self.settings # Shorthand
try:
# Phase 1: Sweep search
best_step, best_score = self._sweep_search(
min_speed_idx, max_speed_idx, s
)
if not self.running:
return
# Move to best position found
self._move_to_best_position(best_step, s)
if not self.running:
return
# Phase 2: Fine tuning
final_score = self._fine_tune(min_speed_idx, s)
self.log(f"Autofocus complete. Final: {final_score:.1f}")
except Exception as e:
self.log(f"Autofocus error: {e}")
finally:
self.running = False
def _sweep_search(self, min_speed_idx, max_speed_idx, s):
"""Phase 1: Sweep to find approximate best position"""
self.log("Phase 1: Sweep search")
# Use medium speed for sweep
sweep_speed = (min_speed_idx + max_speed_idx) // 2
self.motion.set_speed(sweep_speed)
time.sleep(0.1)
# Record starting position
time.sleep(s['sweep_settle_time'])
start_score = self.get_averaged_focus(samples=s['sweep_samples'])
self.update_focus_display(start_score)
sweep_data = [(0, start_score)]
self.log(f"Start position: {start_score:.1f}")
if not self.running:
return 0, start_score
# Sweep DOWN first (away from slide to avoid contact)
self.log(f"Sweeping down {s['sweep_steps']} steps...")
for i in range(1, s['sweep_steps'] + 1):
if not self.running:
return 0, start_score
self.motion.move_z_down()
time.sleep(s['sweep_move_time'])
self.motion.stop_z()
time.sleep(s['sweep_settle_time'])
score = self.get_averaged_focus(samples=s['sweep_samples'])
sweep_data.append((-i, score))
self.update_focus_display(score)
if i % 5 == 0:
self.log(f" Step -{i}: {score:.1f}")
if not self.running:
return 0, start_score
# Return to start
self.log("Returning to start...")
for _ in range(s['sweep_steps']):
if not self.running:
return 0, start_score
self.motion.move_z_up()
time.sleep(s['sweep_move_time'])
self.motion.stop_z()
time.sleep(0.1)
time.sleep(s['sweep_settle_time'])
if not self.running:
return 0, start_score
# Sweep UP
self.log(f"Sweeping up {s['sweep_steps']} steps...")
for i in range(1, s['sweep_steps'] + 1):
if not self.running:
return 0, start_score
self.motion.move_z_up()
time.sleep(s['sweep_move_time'])
self.motion.stop_z()
time.sleep(s['sweep_settle_time'])
score = self.get_averaged_focus(samples=s['sweep_samples'])
sweep_data.append((i, score))
self.update_focus_display(score)
if i % 5 == 0:
self.log(f" Step +{i}: {score:.1f}")
# Find best position
best_step, best_score = max(sweep_data, key=lambda x: x[1])
self.log(f"Best found at step {best_step}: {best_score:.1f}")
# Log sweep curve
sorted_data = sorted(sweep_data, key=lambda x: x[0])
curve_str = " ".join([f"{d[1]:.0f}" for d in sorted_data])
self.log(f"Sweep curve: {curve_str}")
return best_step, best_score
def _move_to_best_position(self, best_step, s):
"""Move from current position (+SWEEP_STEPS) to best_step"""
steps_to_move = s['sweep_steps'] - best_step
if steps_to_move > 0:
self.log(f"Moving down {steps_to_move} steps to best position")
move_func = self.motion.move_z_down
else:
self.log(f"Moving up {abs(steps_to_move)} steps to best position")
move_func = self.motion.move_z_up
steps_to_move = abs(steps_to_move)
for _ in range(steps_to_move):
if not self.running:
return
move_func()
time.sleep(s['sweep_move_time'])
self.motion.stop_z()
time.sleep(0.1)
time.sleep(s['sweep_settle_time'])
current_score = self.get_averaged_focus(samples=s['sweep_samples'])
self.log(f"At best position: {current_score:.1f}")
def _fine_tune(self, min_speed_idx, s):
"""Phase 2: Fine hill-climbing from best position"""
self.log("Phase 2: Fine tuning")
self.motion.set_speed(min_speed_idx)
time.sleep(0.1)
best_score = self.get_averaged_focus(samples=s['fine_samples'])
# Determine fine direction by testing both
fine_direction = self._determine_fine_direction(best_score, s)
if not self.running:
return best_score
# Fine search
best_score, best_position_offset = self._fine_search(
fine_direction, best_score, s
)
if not self.running:
return best_score
# Return to best position
if best_position_offset > 0:
self._return_to_best(fine_direction, best_position_offset, s)
# Final reading
time.sleep(s['fine_settle_time'])
final_score = self.get_averaged_focus(samples=s['fine_samples'])
self.update_focus_display(final_score)
return final_score
def _determine_fine_direction(self, best_score, s):
"""Test both directions to find which improves focus"""
# Try DOWN first
self.motion.move_z_down()
time.sleep(s['fine_move_time'])
self.motion.stop_z()
time.sleep(s['fine_settle_time'])
down_score = self.get_averaged_focus(samples=s['fine_samples'])
if down_score > best_score:
self.log(f"Fine direction: DOWN ({down_score:.1f})")
return 'down'
# Go back and try UP
self.motion.move_z_up()
time.sleep(s['fine_move_time'])
self.motion.stop_z()
time.sleep(s['fine_settle_time'])
self.motion.move_z_up()
time.sleep(s['fine_move_time'])
self.motion.stop_z()
time.sleep(s['fine_settle_time'])
up_score = self.get_averaged_focus(samples=s['fine_samples'])
if up_score > best_score:
self.log(f"Fine direction: UP ({up_score:.1f})")
return 'up'
# Already at peak
self.log("Already at peak, minor adjustment only")
self.motion.move_z_down()
time.sleep(s['fine_move_time'])
self.motion.stop_z()
time.sleep(s['fine_settle_time'])
return 'up'
def _fine_search(self, direction, best_score, s):
"""Search in given direction until no improvement"""
move_func = self.motion.move_z_up if direction == 'up' else self.motion.move_z_down
no_improvement_count = 0
best_position_offset = 0
while self.running and no_improvement_count < s['fine_max_no_improvement']:
move_func()
time.sleep(s['fine_move_time'])
self.motion.stop_z()
time.sleep(s['fine_settle_time'])
current_score = self.get_averaged_focus(samples=s['fine_samples'])
self.update_focus_display(current_score)
if current_score > best_score:
best_score = current_score
no_improvement_count = 0
best_position_offset = 0
self.log(f"Fine better: {current_score:.1f}")
else:
no_improvement_count += 1
best_position_offset += 1
return best_score, best_position_offset
def _return_to_best(self, direction, steps, s):
"""Return to best position by reversing direction"""
self.log(f"Returning {steps} steps")
reverse_func = self.motion.move_z_down if direction == 'up' else self.motion.move_z_up
for _ in range(steps):
reverse_func()
time.sleep(s['fine_move_time'])
self.motion.stop_z()
time.sleep(0.1)