Refactored GUI
This commit is contained in:
parent
730643ca86
commit
8bed3511a9
3 changed files with 685 additions and 539 deletions
371
src/autofocus.py
Normal file
371
src/autofocus.py
Normal file
|
|
@ -0,0 +1,371 @@
|
|||
"""
|
||||
Autofocus module - handles focus detection and autofocus routines.
|
||||
"""
|
||||
|
||||
import time
|
||||
import threading
|
||||
from vision import calculate_focus_score_sobel
|
||||
|
||||
|
||||
class AutofocusController:
|
||||
"""Manages autofocus operations"""
|
||||
|
||||
# Default timing settings
|
||||
DEFAULT_SETTINGS = {
|
||||
'sweep_move_time': 2.0,
|
||||
'sweep_settle_time': 0.25,
|
||||
'sweep_samples': 10,
|
||||
'sweep_steps': 15,
|
||||
'fine_move_time': 0.15,
|
||||
'fine_settle_time': 0.25,
|
||||
'fine_samples': 10,
|
||||
'fine_max_no_improvement': 5,
|
||||
}
|
||||
|
||||
def __init__(self, camera, motion_controller, on_log=None, on_focus_update=None):
|
||||
"""
|
||||
Args:
|
||||
camera: Camera instance for capturing frames
|
||||
motion_controller: MotionController for Z axis control
|
||||
on_log: Callback(message) for logging
|
||||
on_focus_update: Callback(score) for updating focus display
|
||||
"""
|
||||
self.camera = camera
|
||||
self.motion = motion_controller
|
||||
self.on_log = on_log
|
||||
self.on_focus_update = on_focus_update
|
||||
|
||||
self.running = False
|
||||
self.settings = self.DEFAULT_SETTINGS.copy()
|
||||
self._thread = None
|
||||
|
||||
def log(self, message):
|
||||
"""Log a message"""
|
||||
if self.on_log:
|
||||
self.on_log(message)
|
||||
|
||||
def update_focus_display(self, score):
|
||||
"""Update focus display"""
|
||||
if self.on_focus_update:
|
||||
self.on_focus_update(score)
|
||||
|
||||
# === Focus Measurement ===
|
||||
|
||||
def get_focus_score(self):
|
||||
"""Get current focus score from camera"""
|
||||
try:
|
||||
frame = self.camera.capture_frame()
|
||||
return calculate_focus_score_sobel(frame)
|
||||
except Exception as e:
|
||||
self.log(f"Focus score error: {e}")
|
||||
return 0
|
||||
|
||||
def get_averaged_focus(self, samples=5, delay_between=0.05):
|
||||
"""
|
||||
Get averaged focus score from multiple samples.
|
||||
Removes outliers for more stable readings.
|
||||
"""
|
||||
scores = []
|
||||
for _ in range(samples):
|
||||
scores.append(self.get_focus_score())
|
||||
time.sleep(delay_between)
|
||||
|
||||
# Remove outliers (top and bottom 20%)
|
||||
scores.sort()
|
||||
trim = len(scores) // 5
|
||||
if trim > 0:
|
||||
scores = scores[trim:-trim]
|
||||
|
||||
return sum(scores) / len(scores) if scores else 0
|
||||
|
||||
# === Autofocus Control ===
|
||||
|
||||
def start(self, speed_range=(1, 5), coarse=False, fine=False):
|
||||
"""Start autofocus in background thread"""
|
||||
if self.running:
|
||||
self.log("Autofocus already running")
|
||||
return False
|
||||
|
||||
# Determine speed range based on mode
|
||||
if coarse:
|
||||
speed_range = (4, 5)
|
||||
elif fine:
|
||||
speed_range = (1, 3)
|
||||
|
||||
self.running = True
|
||||
self._thread = threading.Thread(
|
||||
target=self._autofocus_routine,
|
||||
args=(speed_range,),
|
||||
daemon=True
|
||||
)
|
||||
self._thread.start()
|
||||
return True
|
||||
|
||||
def stop(self):
|
||||
"""Stop autofocus routine"""
|
||||
self.running = False
|
||||
self.motion.stop_z()
|
||||
self.log("Autofocus stopped")
|
||||
|
||||
def is_running(self):
|
||||
"""Check if autofocus is currently running"""
|
||||
return self.running
|
||||
|
||||
# === Autofocus Algorithm ===
|
||||
|
||||
def _autofocus_routine(self, speed_range):
|
||||
"""
|
||||
Autofocus using sweep search + hill climbing.
|
||||
Phase 1: Sweep across range to find approximate peak
|
||||
Phase 2: Fine tune from best position found
|
||||
"""
|
||||
self.log(f"Starting autofocus (speed range: {speed_range})")
|
||||
|
||||
min_speed_idx, max_speed_idx = speed_range
|
||||
s = self.settings # Shorthand
|
||||
|
||||
try:
|
||||
# Phase 1: Sweep search
|
||||
best_step, best_score = self._sweep_search(
|
||||
min_speed_idx, max_speed_idx, s
|
||||
)
|
||||
|
||||
if not self.running:
|
||||
return
|
||||
|
||||
# Move to best position found
|
||||
self._move_to_best_position(best_step, s)
|
||||
|
||||
if not self.running:
|
||||
return
|
||||
|
||||
# Phase 2: Fine tuning
|
||||
final_score = self._fine_tune(min_speed_idx, s)
|
||||
|
||||
self.log(f"Autofocus complete. Final: {final_score:.1f}")
|
||||
|
||||
except Exception as e:
|
||||
self.log(f"Autofocus error: {e}")
|
||||
finally:
|
||||
self.running = False
|
||||
|
||||
def _sweep_search(self, min_speed_idx, max_speed_idx, s):
|
||||
"""Phase 1: Sweep to find approximate best position"""
|
||||
self.log("Phase 1: Sweep search")
|
||||
|
||||
# Use medium speed for sweep
|
||||
sweep_speed = (min_speed_idx + max_speed_idx) // 2
|
||||
self.motion.set_speed(sweep_speed)
|
||||
time.sleep(0.1)
|
||||
|
||||
# Record starting position
|
||||
time.sleep(s['sweep_settle_time'])
|
||||
start_score = self.get_averaged_focus(samples=s['sweep_samples'])
|
||||
self.update_focus_display(start_score)
|
||||
|
||||
sweep_data = [(0, start_score)]
|
||||
self.log(f"Start position: {start_score:.1f}")
|
||||
|
||||
if not self.running:
|
||||
return 0, start_score
|
||||
|
||||
# Sweep DOWN first (away from slide to avoid contact)
|
||||
self.log(f"Sweeping down {s['sweep_steps']} steps...")
|
||||
for i in range(1, s['sweep_steps'] + 1):
|
||||
if not self.running:
|
||||
return 0, start_score
|
||||
|
||||
self.motion.move_z_down()
|
||||
time.sleep(s['sweep_move_time'])
|
||||
self.motion.stop_z()
|
||||
time.sleep(s['sweep_settle_time'])
|
||||
|
||||
score = self.get_averaged_focus(samples=s['sweep_samples'])
|
||||
sweep_data.append((-i, score))
|
||||
self.update_focus_display(score)
|
||||
|
||||
if i % 5 == 0:
|
||||
self.log(f" Step -{i}: {score:.1f}")
|
||||
|
||||
if not self.running:
|
||||
return 0, start_score
|
||||
|
||||
# Return to start
|
||||
self.log("Returning to start...")
|
||||
for _ in range(s['sweep_steps']):
|
||||
if not self.running:
|
||||
return 0, start_score
|
||||
self.motion.move_z_up()
|
||||
time.sleep(s['sweep_move_time'])
|
||||
self.motion.stop_z()
|
||||
time.sleep(0.1)
|
||||
|
||||
time.sleep(s['sweep_settle_time'])
|
||||
|
||||
if not self.running:
|
||||
return 0, start_score
|
||||
|
||||
# Sweep UP
|
||||
self.log(f"Sweeping up {s['sweep_steps']} steps...")
|
||||
for i in range(1, s['sweep_steps'] + 1):
|
||||
if not self.running:
|
||||
return 0, start_score
|
||||
|
||||
self.motion.move_z_up()
|
||||
time.sleep(s['sweep_move_time'])
|
||||
self.motion.stop_z()
|
||||
time.sleep(s['sweep_settle_time'])
|
||||
|
||||
score = self.get_averaged_focus(samples=s['sweep_samples'])
|
||||
sweep_data.append((i, score))
|
||||
self.update_focus_display(score)
|
||||
|
||||
if i % 5 == 0:
|
||||
self.log(f" Step +{i}: {score:.1f}")
|
||||
|
||||
# Find best position
|
||||
best_step, best_score = max(sweep_data, key=lambda x: x[1])
|
||||
self.log(f"Best found at step {best_step}: {best_score:.1f}")
|
||||
|
||||
# Log sweep curve
|
||||
sorted_data = sorted(sweep_data, key=lambda x: x[0])
|
||||
curve_str = " ".join([f"{d[1]:.0f}" for d in sorted_data])
|
||||
self.log(f"Sweep curve: {curve_str}")
|
||||
|
||||
return best_step, best_score
|
||||
|
||||
def _move_to_best_position(self, best_step, s):
|
||||
"""Move from current position (+SWEEP_STEPS) to best_step"""
|
||||
steps_to_move = s['sweep_steps'] - best_step
|
||||
|
||||
if steps_to_move > 0:
|
||||
self.log(f"Moving down {steps_to_move} steps to best position")
|
||||
move_func = self.motion.move_z_down
|
||||
else:
|
||||
self.log(f"Moving up {abs(steps_to_move)} steps to best position")
|
||||
move_func = self.motion.move_z_up
|
||||
steps_to_move = abs(steps_to_move)
|
||||
|
||||
for _ in range(steps_to_move):
|
||||
if not self.running:
|
||||
return
|
||||
move_func()
|
||||
time.sleep(s['sweep_move_time'])
|
||||
self.motion.stop_z()
|
||||
time.sleep(0.1)
|
||||
|
||||
time.sleep(s['sweep_settle_time'])
|
||||
current_score = self.get_averaged_focus(samples=s['sweep_samples'])
|
||||
self.log(f"At best position: {current_score:.1f}")
|
||||
|
||||
def _fine_tune(self, min_speed_idx, s):
|
||||
"""Phase 2: Fine hill-climbing from best position"""
|
||||
self.log("Phase 2: Fine tuning")
|
||||
|
||||
self.motion.set_speed(min_speed_idx)
|
||||
time.sleep(0.1)
|
||||
|
||||
best_score = self.get_averaged_focus(samples=s['fine_samples'])
|
||||
|
||||
# Determine fine direction by testing both
|
||||
fine_direction = self._determine_fine_direction(best_score, s)
|
||||
|
||||
if not self.running:
|
||||
return best_score
|
||||
|
||||
# Fine search
|
||||
best_score, best_position_offset = self._fine_search(
|
||||
fine_direction, best_score, s
|
||||
)
|
||||
|
||||
if not self.running:
|
||||
return best_score
|
||||
|
||||
# Return to best position
|
||||
if best_position_offset > 0:
|
||||
self._return_to_best(fine_direction, best_position_offset, s)
|
||||
|
||||
# Final reading
|
||||
time.sleep(s['fine_settle_time'])
|
||||
final_score = self.get_averaged_focus(samples=s['fine_samples'])
|
||||
self.update_focus_display(final_score)
|
||||
|
||||
return final_score
|
||||
|
||||
def _determine_fine_direction(self, best_score, s):
|
||||
"""Test both directions to find which improves focus"""
|
||||
# Try DOWN first
|
||||
self.motion.move_z_down()
|
||||
time.sleep(s['fine_move_time'])
|
||||
self.motion.stop_z()
|
||||
time.sleep(s['fine_settle_time'])
|
||||
|
||||
down_score = self.get_averaged_focus(samples=s['fine_samples'])
|
||||
|
||||
if down_score > best_score:
|
||||
self.log(f"Fine direction: DOWN ({down_score:.1f})")
|
||||
return 'down'
|
||||
|
||||
# Go back and try UP
|
||||
self.motion.move_z_up()
|
||||
time.sleep(s['fine_move_time'])
|
||||
self.motion.stop_z()
|
||||
time.sleep(s['fine_settle_time'])
|
||||
|
||||
self.motion.move_z_up()
|
||||
time.sleep(s['fine_move_time'])
|
||||
self.motion.stop_z()
|
||||
time.sleep(s['fine_settle_time'])
|
||||
|
||||
up_score = self.get_averaged_focus(samples=s['fine_samples'])
|
||||
|
||||
if up_score > best_score:
|
||||
self.log(f"Fine direction: UP ({up_score:.1f})")
|
||||
return 'up'
|
||||
|
||||
# Already at peak
|
||||
self.log("Already at peak, minor adjustment only")
|
||||
self.motion.move_z_down()
|
||||
time.sleep(s['fine_move_time'])
|
||||
self.motion.stop_z()
|
||||
time.sleep(s['fine_settle_time'])
|
||||
|
||||
return 'up'
|
||||
|
||||
def _fine_search(self, direction, best_score, s):
|
||||
"""Search in given direction until no improvement"""
|
||||
move_func = self.motion.move_z_up if direction == 'up' else self.motion.move_z_down
|
||||
|
||||
no_improvement_count = 0
|
||||
best_position_offset = 0
|
||||
|
||||
while self.running and no_improvement_count < s['fine_max_no_improvement']:
|
||||
move_func()
|
||||
time.sleep(s['fine_move_time'])
|
||||
self.motion.stop_z()
|
||||
time.sleep(s['fine_settle_time'])
|
||||
|
||||
current_score = self.get_averaged_focus(samples=s['fine_samples'])
|
||||
self.update_focus_display(current_score)
|
||||
|
||||
if current_score > best_score:
|
||||
best_score = current_score
|
||||
no_improvement_count = 0
|
||||
best_position_offset = 0
|
||||
self.log(f"Fine better: {current_score:.1f}")
|
||||
else:
|
||||
no_improvement_count += 1
|
||||
best_position_offset += 1
|
||||
|
||||
return best_score, best_position_offset
|
||||
|
||||
def _return_to_best(self, direction, steps, s):
|
||||
"""Return to best position by reversing direction"""
|
||||
self.log(f"Returning {steps} steps")
|
||||
reverse_func = self.motion.move_z_down if direction == 'up' else self.motion.move_z_up
|
||||
|
||||
for _ in range(steps):
|
||||
reverse_func()
|
||||
time.sleep(s['fine_move_time'])
|
||||
self.motion.stop_z()
|
||||
time.sleep(0.1)
|
||||
667
src/gui.py
667
src/gui.py
|
|
@ -1,3 +1,8 @@
|
|||
"""
|
||||
AutoScope GUI - Pure UI layer.
|
||||
Delegates logic to controllers.
|
||||
"""
|
||||
|
||||
import tkinter as tk
|
||||
from tkinter import ttk, scrolledtext
|
||||
from PIL import Image, ImageTk
|
||||
|
|
@ -5,18 +10,25 @@ import cv2
|
|||
import threading
|
||||
import queue
|
||||
|
||||
from motion_controller import MotionController
|
||||
from autofocus import AutofocusController
|
||||
|
||||
|
||||
class AppGUI:
|
||||
def __init__(self, camera, arduino):
|
||||
self.camera = camera
|
||||
self.arduino = arduino
|
||||
self.running = True
|
||||
self._updating_slider = False
|
||||
self.autofocus_running = False
|
||||
self.autofocus_speed_range = (1, 6) # Speed indices 1-6 (values: 2, 5, 10, 30, 50, 70)
|
||||
|
||||
# Track movement state for toggle buttons
|
||||
self.axis_direction = {'X': 1, 'Y': 1, 'Z': 1} # 1 = positive, -1 = negative
|
||||
self.axis_moving = {'X': False, 'Y': False, 'Z': False}
|
||||
# Initialize controllers
|
||||
self.motion = MotionController(arduino, on_command_sent=self._on_command_sent)
|
||||
self.autofocus = AutofocusController(
|
||||
camera,
|
||||
self.motion,
|
||||
on_log=self.log_message,
|
||||
on_focus_update=self._update_focus_display_threadsafe
|
||||
)
|
||||
|
||||
# Queue for thread-safe serial log updates
|
||||
self.serial_queue = queue.Queue()
|
||||
|
|
@ -34,8 +46,17 @@ class AppGUI:
|
|||
self.serial_thread = threading.Thread(target=self._serial_reader, daemon=True)
|
||||
self.serial_thread.start()
|
||||
|
||||
def _on_command_sent(self, cmd):
|
||||
"""Callback when motion controller sends a command"""
|
||||
self.log_message(f"> {cmd}")
|
||||
|
||||
def _update_focus_display_threadsafe(self, score):
|
||||
"""Thread-safe focus display update"""
|
||||
self.root.after(0, lambda: self.focus_score_label.config(text=f"Focus: {score:.1f}"))
|
||||
|
||||
# === UI Building ===
|
||||
|
||||
def _build_ui(self):
|
||||
# Main container
|
||||
main_frame = ttk.Frame(self.root)
|
||||
main_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
|
||||
|
||||
|
|
@ -48,608 +69,236 @@ class AppGUI:
|
|||
right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True)
|
||||
right_frame.pack_propagate(False)
|
||||
|
||||
# === EMERGENCY STOP (prominent at top) ===
|
||||
emergency_frame = ttk.Frame(right_frame)
|
||||
emergency_frame.pack(fill=tk.X, pady=(0, 10))
|
||||
self._build_emergency_stop(right_frame)
|
||||
self._build_speed_controls(right_frame)
|
||||
self._build_fine_speed_controls(right_frame)
|
||||
self._build_movement_controls(right_frame)
|
||||
self._build_autofocus_controls(right_frame)
|
||||
self._build_status_controls(right_frame)
|
||||
self._build_mode_controls(right_frame)
|
||||
self._build_serial_log(right_frame)
|
||||
self._build_command_entry(right_frame)
|
||||
|
||||
def _build_emergency_stop(self, parent):
|
||||
frame = ttk.Frame(parent)
|
||||
frame.pack(fill=tk.X, pady=(0, 10))
|
||||
|
||||
self.emergency_btn = tk.Button(
|
||||
emergency_frame,
|
||||
frame,
|
||||
text="⚠ EMERGENCY STOP ⚠",
|
||||
command=lambda: self.send_command("!"),
|
||||
command=self.motion.stop_all,
|
||||
bg='red', fg='white', font=('Arial', 12, 'bold'),
|
||||
height=2
|
||||
)
|
||||
self.emergency_btn.pack(fill=tk.X)
|
||||
|
||||
# === SPEED CONTROL ===
|
||||
speed_frame = ttk.LabelFrame(right_frame, text="Speed")
|
||||
speed_frame.pack(fill=tk.X, pady=(0, 10))
|
||||
def _build_speed_controls(self, parent):
|
||||
frame = ttk.LabelFrame(parent, text="Speed")
|
||||
frame.pack(fill=tk.X, pady=(0, 10))
|
||||
|
||||
speed_btn_frame = ttk.Frame(speed_frame)
|
||||
speed_btn_frame.pack(fill=tk.X, padx=5, pady=5)
|
||||
btn_frame = ttk.Frame(frame)
|
||||
btn_frame.pack(fill=tk.X, padx=5, pady=5)
|
||||
|
||||
self.speed_var = tk.StringVar(value="Medium")
|
||||
|
||||
ttk.Radiobutton(speed_btn_frame, text="Slow", variable=self.speed_var,
|
||||
value="Slow", command=lambda: self.send_command("L")).pack(side=tk.LEFT, padx=5)
|
||||
ttk.Radiobutton(speed_btn_frame, text="Medium", variable=self.speed_var,
|
||||
value="Medium", command=lambda: self.send_command("M")).pack(side=tk.LEFT, padx=5)
|
||||
ttk.Radiobutton(speed_btn_frame, text="Fast", variable=self.speed_var,
|
||||
value="Fast", command=lambda: self.send_command("F")).pack(side=tk.LEFT, padx=5)
|
||||
# === FINE SPEED CONTROL ===
|
||||
fine_speed_frame = ttk.LabelFrame(right_frame, text="Fine Speed Control")
|
||||
fine_speed_frame.pack(fill=tk.X, pady=(0, 10))
|
||||
for text, value, preset in [("Slow", "Slow", "slow"),
|
||||
("Medium", "Medium", "medium"),
|
||||
("Fast", "Fast", "fast")]:
|
||||
ttk.Radiobutton(
|
||||
btn_frame, text=text, variable=self.speed_var,
|
||||
value=value, command=lambda p=preset: self.motion.set_speed_preset(p)
|
||||
).pack(side=tk.LEFT, padx=5)
|
||||
|
||||
# Speed values matching Arduino speedArr (index 0-5) plus 0 for stop
|
||||
self.speed_values = [0, 2, 5, 10, 30, 50, 70, 100]
|
||||
def _build_fine_speed_controls(self, parent):
|
||||
frame = ttk.LabelFrame(parent, text="Fine Speed Control")
|
||||
frame.pack(fill=tk.X, pady=(0, 10))
|
||||
|
||||
fine_inner = ttk.Frame(fine_speed_frame)
|
||||
fine_inner.pack(fill=tk.X, padx=5, pady=5)
|
||||
inner = ttk.Frame(frame)
|
||||
inner.pack(fill=tk.X, padx=5, pady=5)
|
||||
|
||||
# Current speed display
|
||||
self.fine_speed_label = ttk.Label(fine_inner, text="Speed: 50", font=('Arial', 10, 'bold'))
|
||||
self.fine_speed_label = ttk.Label(inner, text="Speed: 50", font=('Arial', 10, 'bold'))
|
||||
self.fine_speed_label.pack(anchor=tk.W)
|
||||
|
||||
# Slider
|
||||
self.speed_slider = ttk.Scale(
|
||||
fine_inner,
|
||||
from_=0,
|
||||
to=6,
|
||||
orient=tk.HORIZONTAL,
|
||||
command=self._on_speed_slider_change
|
||||
)
|
||||
self.speed_slider.set(5) # Default to middle
|
||||
self.speed_slider = ttk.Scale(inner, from_=0, to=6, orient=tk.HORIZONTAL)
|
||||
self.speed_slider.set(5)
|
||||
self.speed_slider.config(command=self._on_speed_slider_change)
|
||||
self.speed_slider.pack(fill=tk.X, pady=(5, 0))
|
||||
|
||||
# Number buttons for direct selection
|
||||
btn_row = ttk.Frame(fine_inner)
|
||||
btn_row = ttk.Frame(inner)
|
||||
btn_row.pack(fill=tk.X, pady=(5, 0))
|
||||
|
||||
for i in range(7):
|
||||
btn = ttk.Button(
|
||||
btn_row,
|
||||
text=str(i),
|
||||
width=2,
|
||||
for i in range(6):
|
||||
ttk.Button(
|
||||
btn_row, text=str(i), width=2,
|
||||
command=lambda x=i: self._set_fine_speed(x)
|
||||
)
|
||||
btn.pack(side=tk.LEFT, padx=1)
|
||||
# === MOVEMENT CONTROLS ===
|
||||
move_frame = ttk.LabelFrame(right_frame, text="Movement Control")
|
||||
move_frame.pack(fill=tk.X, pady=(0, 10))
|
||||
).pack(side=tk.LEFT, padx=1)
|
||||
|
||||
# Command mapping for each axis
|
||||
self.axis_commands = {
|
||||
'X': {'pos': 'E', 'neg': 'W', 'stop': 'e'},
|
||||
'Y': {'pos': 'N', 'neg': 'S', 'stop': 'n'},
|
||||
'Z': {'pos': 'U', 'neg': 'D', 'stop': 'u'}
|
||||
}
|
||||
def _build_movement_controls(self, parent):
|
||||
frame = ttk.LabelFrame(parent, text="Movement Control")
|
||||
frame.pack(fill=tk.X, pady=(0, 10))
|
||||
|
||||
self.dir_labels = {}
|
||||
self.move_buttons = {}
|
||||
|
||||
for axis in ["X", "Y", "Z"]:
|
||||
row = ttk.Frame(move_frame)
|
||||
row = ttk.Frame(frame)
|
||||
row.pack(fill=tk.X, pady=3, padx=5)
|
||||
|
||||
# Axis label
|
||||
ttk.Label(row, text=f"{axis}:", width=3, font=('Arial', 10, 'bold')).pack(side=tk.LEFT)
|
||||
|
||||
# Direction toggle button
|
||||
dir_btn = ttk.Button(row, text="+ →", width=5,
|
||||
command=lambda a=axis: self.toggle_direction(a))
|
||||
command=lambda a=axis: self._toggle_direction(a))
|
||||
dir_btn.pack(side=tk.LEFT, padx=2)
|
||||
self.dir_labels[axis] = dir_btn
|
||||
|
||||
# Start/Stop movement toggle
|
||||
move_btn = tk.Button(row, text="Move", width=8, bg='green', fg='white',
|
||||
command=lambda a=axis: self.toggle_movement(a))
|
||||
command=lambda a=axis: self._toggle_movement(a))
|
||||
move_btn.pack(side=tk.LEFT, padx=2)
|
||||
self.move_buttons[axis] = move_btn
|
||||
|
||||
# Quick stop for this axis
|
||||
ttk.Button(row, text="Stop", width=6,
|
||||
command=lambda a=axis: self.stop_axis(a)).pack(side=tk.LEFT, padx=2)
|
||||
command=lambda a=axis: self._stop_axis(a)).pack(side=tk.LEFT, padx=2)
|
||||
|
||||
# Stop All button
|
||||
ttk.Button(move_frame, text="Stop All Axes",
|
||||
command=lambda: self.send_command("X")).pack(fill=tk.X, padx=5, pady=5)
|
||||
ttk.Button(frame, text="Stop All Axes",
|
||||
command=self.motion.stop_all).pack(fill=tk.X, padx=5, pady=5)
|
||||
|
||||
# === AUTOFOCUS CONTROL ===
|
||||
af_frame = ttk.LabelFrame(right_frame, text="Autofocus")
|
||||
af_frame.pack(fill=tk.X, pady=(0, 10))
|
||||
def _build_autofocus_controls(self, parent):
|
||||
frame = ttk.LabelFrame(parent, text="Autofocus")
|
||||
frame.pack(fill=tk.X, pady=(0, 10))
|
||||
|
||||
af_inner = ttk.Frame(af_frame)
|
||||
af_inner.pack(fill=tk.X, padx=5, pady=5)
|
||||
inner = ttk.Frame(frame)
|
||||
inner.pack(fill=tk.X, padx=5, pady=5)
|
||||
|
||||
# Focus score display
|
||||
self.focus_score_label = ttk.Label(af_inner, text="Focus: --", font=('Arial', 10))
|
||||
self.focus_score_label = ttk.Label(inner, text="Focus: --", font=('Arial', 10))
|
||||
self.focus_score_label.pack(anchor=tk.W)
|
||||
|
||||
# Autofocus buttons
|
||||
af_btn_row = ttk.Frame(af_inner)
|
||||
af_btn_row.pack(fill=tk.X, pady=(5, 0))
|
||||
btn_row = ttk.Frame(inner)
|
||||
btn_row.pack(fill=tk.X, pady=(5, 0))
|
||||
|
||||
self.af_button = ttk.Button(af_btn_row, text="Autofocus", command=self.start_autofocus)
|
||||
self.af_button = ttk.Button(btn_row, text="Autofocus", command=self._start_autofocus)
|
||||
self.af_button.pack(side=tk.LEFT, padx=2)
|
||||
|
||||
ttk.Button(af_btn_row, text="Coarse AF", command=lambda: self.start_autofocus(coarse=True)).pack(side=tk.LEFT, padx=2)
|
||||
ttk.Button(af_btn_row, text="Fine AF", command=lambda: self.start_autofocus(fine=True)).pack(side=tk.LEFT, padx=2)
|
||||
ttk.Button(af_btn_row, text="Stop AF", command=self.stop_autofocus).pack(side=tk.LEFT, padx=2)
|
||||
ttk.Button(btn_row, text="Coarse AF",
|
||||
command=lambda: self._start_autofocus(coarse=True)).pack(side=tk.LEFT, padx=2)
|
||||
ttk.Button(btn_row, text="Fine AF",
|
||||
command=lambda: self._start_autofocus(fine=True)).pack(side=tk.LEFT, padx=2)
|
||||
ttk.Button(btn_row, text="Stop AF",
|
||||
command=self._stop_autofocus).pack(side=tk.LEFT, padx=2)
|
||||
|
||||
# Live focus monitoring toggle
|
||||
self.live_focus_var = tk.BooleanVar(value=True)
|
||||
ttk.Checkbutton(af_inner, text="Show live focus score",
|
||||
ttk.Checkbutton(inner, text="Show live focus score",
|
||||
variable=self.live_focus_var).pack(anchor=tk.W, pady=(5, 0))
|
||||
|
||||
def _build_status_controls(self, parent):
|
||||
frame = ttk.LabelFrame(parent, text="Status")
|
||||
frame.pack(fill=tk.X, pady=(0, 10))
|
||||
|
||||
# === POSITION & STATUS ===
|
||||
pos_frame = ttk.LabelFrame(right_frame, text="Status")
|
||||
pos_frame.pack(fill=tk.X, pady=(0, 10))
|
||||
btn_frame = ttk.Frame(frame)
|
||||
btn_frame.pack(fill=tk.X, padx=5, pady=5)
|
||||
|
||||
pos_btn_frame = ttk.Frame(pos_frame)
|
||||
pos_btn_frame.pack(fill=tk.X, padx=5, pady=5)
|
||||
ttk.Button(btn_frame, text="Status", command=self.motion.request_status).pack(side=tk.LEFT, padx=2)
|
||||
ttk.Button(btn_frame, text="Zero All", command=self.motion.zero_all).pack(side=tk.LEFT, padx=2)
|
||||
ttk.Button(btn_frame, text="Go Origin", command=self.motion.go_origin).pack(side=tk.LEFT, padx=2)
|
||||
|
||||
ttk.Button(pos_btn_frame, text="Status", command=lambda: self.send_command("?")).pack(side=tk.LEFT, padx=2)
|
||||
ttk.Button(pos_btn_frame, text="Zero All", command=lambda: self.send_command("Z")).pack(side=tk.LEFT, padx=2)
|
||||
ttk.Button(pos_btn_frame, text="Go Origin", command=lambda: self.send_command("G")).pack(side=tk.LEFT, padx=2)
|
||||
def _build_mode_controls(self, parent):
|
||||
frame = ttk.LabelFrame(parent, text="Control Mode")
|
||||
frame.pack(fill=tk.X, pady=(0, 10))
|
||||
|
||||
# === MODE CONTROL ===
|
||||
mode_frame = ttk.LabelFrame(right_frame, text="Control Mode")
|
||||
mode_frame.pack(fill=tk.X, pady=(0, 10))
|
||||
btn_frame = ttk.Frame(frame)
|
||||
btn_frame.pack(fill=tk.X, padx=5, pady=5)
|
||||
|
||||
mode_btn_frame = ttk.Frame(mode_frame)
|
||||
mode_btn_frame.pack(fill=tk.X, padx=5, pady=5)
|
||||
|
||||
self.mode_label = ttk.Label(mode_btn_frame, text="Mode: Serial", font=('Arial', 9))
|
||||
self.mode_label = ttk.Label(btn_frame, text="Mode: Serial", font=('Arial', 9))
|
||||
self.mode_label.pack(side=tk.LEFT, padx=5)
|
||||
|
||||
ttk.Button(mode_btn_frame, text="Toggle Mode", command=lambda: self.send_command("T")).pack(side=tk.RIGHT, padx=2)
|
||||
ttk.Button(btn_frame, text="Toggle Mode", command=self.motion.toggle_mode).pack(side=tk.RIGHT, padx=2)
|
||||
|
||||
# === SERIAL LOG ===
|
||||
ttk.Label(right_frame, text="Serial Log:").pack(anchor=tk.W)
|
||||
self.serial_log = scrolledtext.ScrolledText(right_frame, height=10, width=35, state=tk.DISABLED)
|
||||
def _build_serial_log(self, parent):
|
||||
ttk.Label(parent, text="Serial Log:").pack(anchor=tk.W)
|
||||
self.serial_log = scrolledtext.ScrolledText(parent, height=10, width=35, state=tk.DISABLED)
|
||||
self.serial_log.pack(fill=tk.BOTH, expand=True, pady=(0, 10))
|
||||
|
||||
# === CUSTOM COMMAND ===
|
||||
ttk.Label(right_frame, text="Send Command:").pack(anchor=tk.W)
|
||||
cmd_frame = ttk.Frame(right_frame)
|
||||
def _build_command_entry(self, parent):
|
||||
ttk.Label(parent, text="Send Command:").pack(anchor=tk.W)
|
||||
cmd_frame = ttk.Frame(parent)
|
||||
cmd_frame.pack(fill=tk.X)
|
||||
|
||||
self.cmd_entry = ttk.Entry(cmd_frame)
|
||||
self.cmd_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
|
||||
self.cmd_entry.bind("<Return>", lambda e: self.send_custom_command())
|
||||
self.cmd_entry.bind("<Return>", lambda e: self._send_custom_command())
|
||||
|
||||
ttk.Button(cmd_frame, text="Send", command=self.send_custom_command).pack(side=tk.RIGHT, padx=(5, 0))
|
||||
ttk.Button(cmd_frame, text="Send", command=self._send_custom_command).pack(side=tk.RIGHT, padx=(5, 0))
|
||||
|
||||
def toggle_direction(self, axis):
|
||||
"""Toggle direction for an axis"""
|
||||
self.axis_direction[axis] *= -1
|
||||
direction = "+" if self.axis_direction[axis] == 1 else "-"
|
||||
arrow = "→" if self.axis_direction[axis] == 1 else "←"
|
||||
self.dir_labels[axis].config(text=f"{direction} {arrow}")
|
||||
# === UI Event Handlers ===
|
||||
|
||||
# If currently moving, restart in new direction
|
||||
if self.axis_moving[axis]:
|
||||
self.stop_axis(axis)
|
||||
self.start_movement(axis)
|
||||
def _toggle_direction(self, axis):
|
||||
direction = self.motion.toggle_direction(axis)
|
||||
arrow = "→" if direction == 1 else "←"
|
||||
sign = "+" if direction == 1 else "-"
|
||||
self.dir_labels[axis].config(text=f"{sign} {arrow}")
|
||||
|
||||
def toggle_movement(self, axis):
|
||||
"""Toggle movement on/off for an axis"""
|
||||
if self.axis_moving[axis]:
|
||||
self.stop_axis(axis)
|
||||
else:
|
||||
self.start_movement(axis)
|
||||
|
||||
def start_movement(self, axis):
|
||||
"""Start continuous movement on an axis"""
|
||||
commands = self.axis_commands[axis]
|
||||
cmd = commands['pos'] if self.axis_direction[axis] == 1 else commands['neg']
|
||||
self.send_command(cmd)
|
||||
self.axis_moving[axis] = True
|
||||
if self.motion.is_moving(axis):
|
||||
self.motion.stop_axis(axis)
|
||||
self.motion.start_movement(axis)
|
||||
self.move_buttons[axis].config(text="Moving", bg='orange')
|
||||
|
||||
def stop_axis(self, axis):
|
||||
"""Stop movement on a specific axis"""
|
||||
commands = self.axis_commands[axis]
|
||||
self.send_command(commands['stop'])
|
||||
self.axis_moving[axis] = False
|
||||
def _toggle_movement(self, axis):
|
||||
if self.motion.is_moving(axis):
|
||||
self._stop_axis(axis)
|
||||
else:
|
||||
self.motion.start_movement(axis)
|
||||
self.move_buttons[axis].config(text="Moving", bg='orange')
|
||||
|
||||
def _stop_axis(self, axis):
|
||||
self.motion.stop_axis(axis)
|
||||
self.move_buttons[axis].config(text="Move", bg='green')
|
||||
|
||||
def send_command(self, cmd):
|
||||
"""Send a command and log it"""
|
||||
self.log_message(f"> {cmd}")
|
||||
if self.arduino:
|
||||
self.arduino.send_command(cmd)
|
||||
|
||||
def send_custom_command(self):
|
||||
"""Send command from entry box"""
|
||||
cmd = self.cmd_entry.get().strip()
|
||||
if cmd:
|
||||
self.send_command(cmd)
|
||||
self.cmd_entry.delete(0, tk.END)
|
||||
|
||||
def _on_speed_slider_change(self, value):
|
||||
"""Handle slider movement"""
|
||||
if self._updating_slider:
|
||||
return
|
||||
# Round to nearest integer
|
||||
index = round(float(value))
|
||||
self._set_fine_speed(index)
|
||||
|
||||
def _set_fine_speed(self, index):
|
||||
"""Set fine speed by index (0-10)"""
|
||||
# Prevent recursion
|
||||
self._updating_slider = True
|
||||
# Update slider position
|
||||
self.speed_slider.set(index)
|
||||
|
||||
# Update label with actual speed value
|
||||
speed_val = self.speed_values[index]
|
||||
speed_val = self.motion.get_speed_value(index)
|
||||
self.fine_speed_label.config(text=f"Speed: {speed_val}")
|
||||
self._updating_slider = False
|
||||
self.motion.set_speed(index)
|
||||
|
||||
# Send command - '0' for index 0, '1'-'9' for 1-9, 'a' for 10
|
||||
if index == 0:
|
||||
self.send_command("0")
|
||||
elif index <= 9:
|
||||
self.send_command(str(index))
|
||||
else:
|
||||
# For index 10 (speed 300), you'd need to add a handler on Arduino
|
||||
# For now, send '9' as max or add a new command
|
||||
self.send_command("9")
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def get_focus_score(self):
|
||||
"""Get current focus score from camera"""
|
||||
try:
|
||||
frame = self.camera.capture_frame()
|
||||
from vision import calculate_focus_score_sobel
|
||||
score = calculate_focus_score_sobel(frame)
|
||||
return score
|
||||
except Exception as e:
|
||||
print(f"Focus score error: {e}")
|
||||
return 0
|
||||
|
||||
def start_autofocus(self, coarse=False, fine=False):
|
||||
"""Start autofocus routine in background thread"""
|
||||
if self.autofocus_running:
|
||||
self.log_message("Autofocus already running")
|
||||
return
|
||||
|
||||
self.autofocus_running = True
|
||||
def _start_autofocus(self, coarse=False, fine=False):
|
||||
if self.autofocus.start(coarse=coarse, fine=fine):
|
||||
self.af_button.config(state='disabled')
|
||||
|
||||
# Determine speed range based on mode
|
||||
if coarse:
|
||||
speed_range = (4, 6) # Faster speeds: 30, 50, 70
|
||||
elif fine:
|
||||
speed_range = (1, 3) # Slower speeds: 2, 5, 10
|
||||
else:
|
||||
speed_range = (1, 6) # Full range
|
||||
|
||||
af_thread = threading.Thread(
|
||||
target=self._autofocus_routine,
|
||||
args=(speed_range,),
|
||||
daemon=True
|
||||
)
|
||||
af_thread.start()
|
||||
|
||||
def stop_autofocus(self):
|
||||
"""Stop autofocus routine"""
|
||||
self.autofocus_running = False
|
||||
self.send_command('u') # Stop Z axis
|
||||
def _stop_autofocus(self):
|
||||
self.autofocus.stop()
|
||||
self.af_button.config(state='normal')
|
||||
self.log_message("Autofocus stopped")
|
||||
|
||||
def _get_averaged_focus(self, samples=5, delay_between=0.05):
|
||||
"""
|
||||
Get averaged focus score from multiple samples.
|
||||
Reduces noise in focus measurements.
|
||||
"""
|
||||
import time
|
||||
scores = []
|
||||
for _ in range(samples):
|
||||
scores.append(self.get_focus_score())
|
||||
time.sleep(delay_between)
|
||||
def _send_custom_command(self):
|
||||
cmd = self.cmd_entry.get().strip()
|
||||
if cmd:
|
||||
self.motion.send_command(cmd)
|
||||
self.cmd_entry.delete(0, tk.END)
|
||||
|
||||
# Remove outliers (optional - take middle 60%)
|
||||
scores.sort()
|
||||
trim = len(scores) // 5 # Remove top and bottom 20%
|
||||
if trim > 0:
|
||||
scores = scores[trim:-trim]
|
||||
|
||||
return sum(scores) / len(scores)
|
||||
|
||||
def _autofocus_routine(self, speed_range):
|
||||
"""
|
||||
Autofocus using sweep search + hill climbing.
|
||||
Phase 1: Sweep across range to find approximate peak
|
||||
Phase 2: Fine tune from best position found
|
||||
"""
|
||||
import time
|
||||
|
||||
self.log_message(f"Starting autofocus (speed range: {speed_range})")
|
||||
|
||||
min_speed_idx, max_speed_idx = speed_range
|
||||
|
||||
# Timing settings
|
||||
SWEEP_MOVE_TIME = 2 # Movement time per sweep step
|
||||
SWEEP_SETTLE_TIME = 0.25 # Settle time during sweep
|
||||
SWEEP_SAMPLES = 10
|
||||
|
||||
FINE_MOVE_TIME = 0.15
|
||||
FINE_SETTLE_TIME = 0.25
|
||||
FINE_SAMPLES = 10
|
||||
|
||||
# Number of steps in each direction for sweep
|
||||
SWEEP_STEPS = 15 # Total sweep = 2 * SWEEP_STEPS positions
|
||||
|
||||
# ========================================
|
||||
# Phase 1: Sweep search
|
||||
# ========================================
|
||||
self.log_message("Phase 1: Sweep search")
|
||||
|
||||
# Use medium speed for sweep
|
||||
sweep_speed = (min_speed_idx + max_speed_idx) // 2
|
||||
self._set_fine_speed_no_send(sweep_speed)
|
||||
self.send_command(str(sweep_speed))
|
||||
time.sleep(0.1)
|
||||
|
||||
# Record starting position score
|
||||
time.sleep(SWEEP_SETTLE_TIME)
|
||||
start_score = self._get_averaged_focus(samples=SWEEP_SAMPLES)
|
||||
self._update_focus_display(start_score)
|
||||
|
||||
# Data structure: (steps_from_start, score)
|
||||
# Positive = up from start, negative = down from start
|
||||
sweep_data = [(0, start_score)]
|
||||
self.log_message(f"Start position: {start_score:.1f}")
|
||||
|
||||
if not self.autofocus_running:
|
||||
return
|
||||
|
||||
# Sweep DOWN first tyo avoid objective / slide contact
|
||||
self.log_message(f"Sweeping down {SWEEP_STEPS} steps...")
|
||||
for i in range(1, SWEEP_STEPS + 1):
|
||||
if not self.autofocus_running:
|
||||
return
|
||||
|
||||
self.send_command('D')
|
||||
time.sleep(SWEEP_MOVE_TIME)
|
||||
self.send_command('d')
|
||||
time.sleep(SWEEP_SETTLE_TIME)
|
||||
|
||||
score = self._get_averaged_focus(samples=SWEEP_SAMPLES)
|
||||
sweep_data.append((-i, score))
|
||||
self._update_focus_display(score)
|
||||
|
||||
if i % 5 == 0:
|
||||
self.log_message(f" Step -{i}: {score:.1f}")
|
||||
|
||||
if not self.autofocus_running:
|
||||
return
|
||||
|
||||
# Return to start
|
||||
self.log_message("Returning to start...")
|
||||
for _ in range(SWEEP_STEPS):
|
||||
if not self.autofocus_running:
|
||||
return
|
||||
self.send_command('U')
|
||||
time.sleep(SWEEP_MOVE_TIME)
|
||||
self.send_command('u')
|
||||
time.sleep(0.1) # Faster return, no need to sample
|
||||
|
||||
time.sleep(SWEEP_SETTLE_TIME)
|
||||
|
||||
if not self.autofocus_running:
|
||||
return
|
||||
|
||||
# Sweep UP
|
||||
self.log_message(f"Sweeping up {SWEEP_STEPS} steps...")
|
||||
for i in range(1, SWEEP_STEPS + 1):
|
||||
if not self.autofocus_running:
|
||||
return
|
||||
|
||||
self.send_command('U')
|
||||
time.sleep(SWEEP_MOVE_TIME)
|
||||
self.send_command('u')
|
||||
time.sleep(SWEEP_SETTLE_TIME)
|
||||
|
||||
score = self._get_averaged_focus(samples=SWEEP_SAMPLES)
|
||||
sweep_data.append((i, score))
|
||||
self._update_focus_display(score)
|
||||
|
||||
if i % 5 == 0:
|
||||
self.log_message(f" Step +{i}: {score:.1f}")
|
||||
|
||||
if not self.autofocus_running:
|
||||
return
|
||||
|
||||
# ========================================
|
||||
# Find best position from sweep
|
||||
# ========================================
|
||||
best_step, best_score = max(sweep_data, key=lambda x: x[1])
|
||||
self.log_message(f"Best found at step {best_step}: {best_score:.1f}")
|
||||
|
||||
# Log the full sweep curve for debugging
|
||||
self.log_message("Sweep curve:")
|
||||
sorted_data = sorted(sweep_data, key=lambda x: x[0])
|
||||
curve_str = " ".join([f"{s[1]:.0f}" for s in sorted_data])
|
||||
self.log_message(f" {curve_str}")
|
||||
|
||||
# ========================================
|
||||
# Move to best position
|
||||
# ========================================
|
||||
# Currently at +SWEEP_STEPS, need to go to best_step
|
||||
steps_to_move = SWEEP_STEPS - best_step # Positive = move down
|
||||
|
||||
if steps_to_move > 0:
|
||||
self.log_message(f"Moving down {steps_to_move} steps to best position")
|
||||
move_cmd, stop_cmd = 'D', 'd'
|
||||
else:
|
||||
self.log_message(f"Moving up {abs(steps_to_move)} steps to best position")
|
||||
move_cmd, stop_cmd = 'U', 'u'
|
||||
steps_to_move = abs(steps_to_move)
|
||||
|
||||
for _ in range(steps_to_move):
|
||||
if not self.autofocus_running:
|
||||
return
|
||||
self.send_command(move_cmd)
|
||||
time.sleep(SWEEP_MOVE_TIME)
|
||||
self.send_command(stop_cmd)
|
||||
time.sleep(0.1)
|
||||
|
||||
time.sleep(SWEEP_SETTLE_TIME)
|
||||
current_score = self._get_averaged_focus(samples=SWEEP_SAMPLES)
|
||||
self.log_message(f"At best position: {current_score:.1f}")
|
||||
|
||||
if not self.autofocus_running:
|
||||
return
|
||||
|
||||
# ========================================
|
||||
# Phase 2: Fine hill-climbing from best position
|
||||
# ========================================
|
||||
self.log_message("Phase 2: Fine tuning")
|
||||
|
||||
fine_speed = min_speed_idx
|
||||
self._set_fine_speed_no_send(fine_speed)
|
||||
self.send_command(str(fine_speed))
|
||||
time.sleep(0.1)
|
||||
|
||||
best_score = current_score
|
||||
|
||||
# Try both directions to find which improves
|
||||
# Try UP first
|
||||
self.send_command('D')
|
||||
time.sleep(FINE_MOVE_TIME)
|
||||
self.send_command('d')
|
||||
time.sleep(FINE_SETTLE_TIME)
|
||||
|
||||
down_score = self._get_averaged_focus(samples=FINE_SAMPLES)
|
||||
|
||||
if down_score > best_score:
|
||||
fine_direction = 0 # DOWN
|
||||
best_score = down_score
|
||||
self.log_message(f"Fine direction: UP ({down_score:.1f})")
|
||||
else:
|
||||
# Go back down and try down
|
||||
self.send_command('U')
|
||||
time.sleep(FINE_MOVE_TIME)
|
||||
self.send_command('u')
|
||||
time.sleep(FINE_SETTLE_TIME)
|
||||
|
||||
self.send_command('U')
|
||||
time.sleep(FINE_MOVE_TIME)
|
||||
self.send_command('u')
|
||||
time.sleep(FINE_SETTLE_TIME)
|
||||
|
||||
up_score = self._get_averaged_focus(samples=FINE_SAMPLES)
|
||||
|
||||
if up_score > best_score:
|
||||
fine_direction = 1 # UP
|
||||
best_score = up_score
|
||||
self.log_message(f"Fine direction: DOWN ({up_score:.1f})")
|
||||
else:
|
||||
# We're already at the peak
|
||||
self.log_message("Already at peak, minor adjustment only")
|
||||
fine_direction = 1 # Try up anyway
|
||||
# Go back to center
|
||||
self.send_command('U')
|
||||
time.sleep(FINE_MOVE_TIME)
|
||||
self.send_command('u')
|
||||
time.sleep(FINE_SETTLE_TIME)
|
||||
|
||||
if not self.autofocus_running:
|
||||
return
|
||||
|
||||
# Fine search
|
||||
move_cmd = 'U' if fine_direction else 'D'
|
||||
stop_cmd = 'u' if fine_direction else 'd'
|
||||
reverse_cmd = 'D' if fine_direction else 'U'
|
||||
reverse_stop = 'd' if fine_direction else 'u'
|
||||
|
||||
no_improvement_count = 0
|
||||
max_fine_no_improvement = 5
|
||||
best_position_offset = 0
|
||||
|
||||
while self.autofocus_running and no_improvement_count < max_fine_no_improvement:
|
||||
self.send_command(move_cmd)
|
||||
time.sleep(FINE_MOVE_TIME)
|
||||
self.send_command(stop_cmd)
|
||||
time.sleep(FINE_SETTLE_TIME)
|
||||
|
||||
current_score = self._get_averaged_focus(samples=FINE_SAMPLES)
|
||||
self._update_focus_display(current_score)
|
||||
|
||||
if current_score > best_score:
|
||||
best_score = current_score
|
||||
no_improvement_count = 0
|
||||
best_position_offset = 0
|
||||
self.log_message(f"Fine better: {current_score:.1f}")
|
||||
else:
|
||||
no_improvement_count += 1
|
||||
best_position_offset += 1
|
||||
|
||||
if not self.autofocus_running:
|
||||
return
|
||||
|
||||
# Return to best fine position
|
||||
if best_position_offset > 0:
|
||||
self.log_message(f"Returning {best_position_offset} steps")
|
||||
for _ in range(best_position_offset):
|
||||
self.send_command(reverse_cmd)
|
||||
time.sleep(FINE_MOVE_TIME)
|
||||
self.send_command(reverse_stop)
|
||||
time.sleep(0.1)
|
||||
|
||||
# Final reading
|
||||
time.sleep(FINE_SETTLE_TIME)
|
||||
final_score = self._get_averaged_focus(samples=FINE_SAMPLES)
|
||||
self._update_focus_display(final_score)
|
||||
|
||||
self.autofocus_running = False
|
||||
self.log_message(f"Autofocus complete. Final: {final_score:.1f}")
|
||||
|
||||
self.root.after(0, lambda: self.af_button.config(state='normal'))
|
||||
|
||||
|
||||
|
||||
def _set_fine_speed_no_send(self, index):
|
||||
"""Update slider/label without sending command (for internal use)"""
|
||||
self._updating_slider = True
|
||||
self.speed_slider.set(index)
|
||||
speed_val = self.speed_values[index]
|
||||
self.fine_speed_label.config(text=f"Speed: {speed_val}")
|
||||
self._updating_slider = False
|
||||
|
||||
def _update_focus_display(self, score):
|
||||
"""Update focus score display (thread-safe)"""
|
||||
self.root.after(0, lambda: self.focus_score_label.config(text=f"Focus: {score:.1f}"))
|
||||
# === Logging ===
|
||||
|
||||
def log_message(self, msg):
|
||||
"""Add message to serial log"""
|
||||
if not hasattr(self, 'serial_log'):
|
||||
return
|
||||
|
||||
self.serial_log.config(state=tk.NORMAL)
|
||||
self.serial_log.insert(tk.END, msg + "\n")
|
||||
self.serial_log.see(tk.END)
|
||||
self.serial_log.config(state=tk.DISABLED)
|
||||
print(f" Log: {msg}")
|
||||
|
||||
# Check for mode changes in received messages
|
||||
if msg.startswith("< MODE:"):
|
||||
mode = msg.split(":")[-1]
|
||||
self.mode_label.config(text=f"Mode: {mode.capitalize()}")
|
||||
|
||||
# === Serial & Camera ===
|
||||
|
||||
def _serial_reader(self):
|
||||
"""Background thread to read serial data"""
|
||||
while self.running:
|
||||
if self.arduino and self.arduino.ser.in_waiting:
|
||||
try:
|
||||
|
|
@ -661,24 +310,20 @@ class AppGUI:
|
|||
threading.Event().wait(0.05)
|
||||
|
||||
def _process_serial_queue(self):
|
||||
"""Process queued serial messages (call from main thread)"""
|
||||
while not self.serial_queue.empty():
|
||||
msg = self.serial_queue.get_nowait()
|
||||
self.log_message(f"< {msg}")
|
||||
|
||||
def update_camera(self):
|
||||
"""Capture and display one frame"""
|
||||
try:
|
||||
frame = self.camera.capture_frame()
|
||||
|
||||
# Show live focus score if enabled
|
||||
if self.live_focus_var.get():
|
||||
score = self._get_averaged_focus()
|
||||
if self.live_focus_var.get() and not self.autofocus.is_running():
|
||||
score = self.autofocus.get_focus_score()
|
||||
self.focus_score_label.config(text=f"Focus: {score:.1f}")
|
||||
|
||||
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
||||
|
||||
# Resize if needed
|
||||
h, w = frame.shape[:2]
|
||||
max_width = 640
|
||||
if w > max_width:
|
||||
|
|
@ -693,18 +338,24 @@ class AppGUI:
|
|||
except Exception as e:
|
||||
print(f"Camera error: {e}")
|
||||
|
||||
# === Main Loop ===
|
||||
|
||||
def run(self):
|
||||
"""Main loop"""
|
||||
def update():
|
||||
if self.running:
|
||||
self.update_camera()
|
||||
self._process_serial_queue()
|
||||
|
||||
# Re-enable AF button when done
|
||||
if not self.autofocus.is_running():
|
||||
self.af_button.config(state='normal')
|
||||
|
||||
self.root.after(33, update)
|
||||
|
||||
update()
|
||||
self.root.mainloop()
|
||||
|
||||
def on_close(self):
|
||||
"""Clean shutdown"""
|
||||
self.running = False
|
||||
self.autofocus.stop()
|
||||
self.root.destroy()
|
||||
124
src/motion_controller.py
Normal file
124
src/motion_controller.py
Normal file
|
|
@ -0,0 +1,124 @@
|
|||
"""
|
||||
Motion controller - manages axis movement state and commands.
|
||||
Abstracts the command protocol from the GUI.
|
||||
"""
|
||||
|
||||
class MotionController:
|
||||
# Command mapping for each axis
|
||||
AXIS_COMMANDS = {
|
||||
'X': {'pos': 'E', 'neg': 'W', 'stop': 'e'},
|
||||
'Y': {'pos': 'N', 'neg': 'S', 'stop': 'n'},
|
||||
'Z': {'pos': 'U', 'neg': 'D', 'stop': 'u'}
|
||||
}
|
||||
|
||||
# Speed values matching Arduino speedArr
|
||||
SPEED_VALUES = [0, 2, 5, 10, 30, 50, 70]
|
||||
|
||||
def __init__(self, arduino, on_command_sent=None):
|
||||
"""
|
||||
Args:
|
||||
arduino: Arduino instance for sending commands
|
||||
on_command_sent: Optional callback(cmd) when command is sent
|
||||
"""
|
||||
self.arduino = arduino
|
||||
self.on_command_sent = on_command_sent
|
||||
|
||||
# Axis state
|
||||
self.axis_direction = {'X': 1, 'Y': 1, 'Z': 1}
|
||||
self.axis_moving = {'X': False, 'Y': False, 'Z': False}
|
||||
|
||||
# Speed state
|
||||
self.current_speed_index = 5
|
||||
|
||||
def send_command(self, cmd):
|
||||
"""Send command to Arduino and notify callback"""
|
||||
if self.arduino:
|
||||
self.arduino.send_command(cmd)
|
||||
if self.on_command_sent:
|
||||
self.on_command_sent(cmd)
|
||||
|
||||
# === Axis Movement ===
|
||||
|
||||
def get_direction(self, axis):
|
||||
"""Get current direction for axis (1 = positive, -1 = negative)"""
|
||||
return self.axis_direction[axis]
|
||||
|
||||
def toggle_direction(self, axis):
|
||||
"""Toggle direction and return new direction"""
|
||||
self.axis_direction[axis] *= -1
|
||||
return self.axis_direction[axis]
|
||||
|
||||
def is_moving(self, axis):
|
||||
"""Check if axis is currently moving"""
|
||||
return self.axis_moving[axis]
|
||||
|
||||
def start_movement(self, axis):
|
||||
"""Start continuous movement on an axis"""
|
||||
commands = self.AXIS_COMMANDS[axis]
|
||||
cmd = commands['pos'] if self.axis_direction[axis] == 1 else commands['neg']
|
||||
self.send_command(cmd)
|
||||
self.axis_moving[axis] = True
|
||||
|
||||
def stop_axis(self, axis):
|
||||
"""Stop movement on a specific axis"""
|
||||
commands = self.AXIS_COMMANDS[axis]
|
||||
self.send_command(commands['stop'])
|
||||
self.axis_moving[axis] = False
|
||||
|
||||
def stop_all(self):
|
||||
"""Emergency stop all axes"""
|
||||
self.send_command('!')
|
||||
for axis in self.axis_moving:
|
||||
self.axis_moving[axis] = False
|
||||
|
||||
def move_z_up(self):
|
||||
"""Start Z axis moving up"""
|
||||
self.send_command('U')
|
||||
|
||||
def move_z_down(self):
|
||||
"""Start Z axis moving down"""
|
||||
self.send_command('D')
|
||||
|
||||
def stop_z(self):
|
||||
"""Stop Z axis"""
|
||||
self.send_command('u')
|
||||
|
||||
# === Speed Control ===
|
||||
|
||||
def get_speed_value(self, index):
|
||||
"""Get actual speed value for index"""
|
||||
if 0 <= index < len(self.SPEED_VALUES):
|
||||
return self.SPEED_VALUES[index]
|
||||
return 0
|
||||
|
||||
def set_speed(self, index):
|
||||
"""Set speed by index (0-6)"""
|
||||
self.current_speed_index = index
|
||||
if index == 0:
|
||||
self.send_command("0")
|
||||
elif index <= 6:
|
||||
self.send_command(str(index))
|
||||
|
||||
def set_speed_preset(self, preset):
|
||||
"""Set speed by preset name: 'slow', 'medium', 'fast'"""
|
||||
presets = {'slow': 'L', 'medium': 'M', 'fast': 'F'}
|
||||
if preset in presets:
|
||||
self.send_command(presets[preset])
|
||||
|
||||
# === Status Commands ===
|
||||
|
||||
def request_status(self):
|
||||
"""Request status from Arduino"""
|
||||
self.send_command('?')
|
||||
|
||||
def zero_all(self):
|
||||
"""Zero all axis positions"""
|
||||
self.send_command('Z')
|
||||
|
||||
def go_origin(self):
|
||||
"""Move to origin position"""
|
||||
self.send_command('G')
|
||||
|
||||
def toggle_mode(self):
|
||||
"""Toggle between serial/joystick mode"""
|
||||
self.send_command('T')
|
||||
Loading…
Reference in a new issue