From 8bed3511a9f6d5c8eccd759b772e9633f6123a73 Mon Sep 17 00:00:00 2001 From: Shaiv Kamat Date: Fri, 26 Dec 2025 17:54:29 -0800 Subject: [PATCH] Refactored GUI --- src/autofocus.py | 371 ++++++++++++++++++++ src/gui.py | 729 ++++++++++----------------------------- src/motion_controller.py | 124 +++++++ 3 files changed, 685 insertions(+), 539 deletions(-) create mode 100644 src/autofocus.py create mode 100644 src/motion_controller.py diff --git a/src/autofocus.py b/src/autofocus.py new file mode 100644 index 0000000..cbee136 --- /dev/null +++ b/src/autofocus.py @@ -0,0 +1,371 @@ +""" +Autofocus module - handles focus detection and autofocus routines. +""" + +import time +import threading +from vision import calculate_focus_score_sobel + + +class AutofocusController: + """Manages autofocus operations""" + + # Default timing settings + DEFAULT_SETTINGS = { + 'sweep_move_time': 2.0, + 'sweep_settle_time': 0.25, + 'sweep_samples': 10, + 'sweep_steps': 15, + 'fine_move_time': 0.15, + 'fine_settle_time': 0.25, + 'fine_samples': 10, + 'fine_max_no_improvement': 5, + } + + def __init__(self, camera, motion_controller, on_log=None, on_focus_update=None): + """ + Args: + camera: Camera instance for capturing frames + motion_controller: MotionController for Z axis control + on_log: Callback(message) for logging + on_focus_update: Callback(score) for updating focus display + """ + self.camera = camera + self.motion = motion_controller + self.on_log = on_log + self.on_focus_update = on_focus_update + + self.running = False + self.settings = self.DEFAULT_SETTINGS.copy() + self._thread = None + + def log(self, message): + """Log a message""" + if self.on_log: + self.on_log(message) + + def update_focus_display(self, score): + """Update focus display""" + if self.on_focus_update: + self.on_focus_update(score) + + # === Focus Measurement === + + def get_focus_score(self): + """Get current focus score from camera""" + try: + frame = self.camera.capture_frame() + return calculate_focus_score_sobel(frame) + except Exception as e: + self.log(f"Focus score error: {e}") + return 0 + + def get_averaged_focus(self, samples=5, delay_between=0.05): + """ + Get averaged focus score from multiple samples. + Removes outliers for more stable readings. + """ + scores = [] + for _ in range(samples): + scores.append(self.get_focus_score()) + time.sleep(delay_between) + + # Remove outliers (top and bottom 20%) + scores.sort() + trim = len(scores) // 5 + if trim > 0: + scores = scores[trim:-trim] + + return sum(scores) / len(scores) if scores else 0 + + # === Autofocus Control === + + def start(self, speed_range=(1, 5), coarse=False, fine=False): + """Start autofocus in background thread""" + if self.running: + self.log("Autofocus already running") + return False + + # Determine speed range based on mode + if coarse: + speed_range = (4, 5) + elif fine: + speed_range = (1, 3) + + self.running = True + self._thread = threading.Thread( + target=self._autofocus_routine, + args=(speed_range,), + daemon=True + ) + self._thread.start() + return True + + def stop(self): + """Stop autofocus routine""" + self.running = False + self.motion.stop_z() + self.log("Autofocus stopped") + + def is_running(self): + """Check if autofocus is currently running""" + return self.running + + # === Autofocus Algorithm === + + def _autofocus_routine(self, speed_range): + """ + Autofocus using sweep search + hill climbing. + Phase 1: Sweep across range to find approximate peak + Phase 2: Fine tune from best position found + """ + self.log(f"Starting autofocus (speed range: {speed_range})") + + min_speed_idx, max_speed_idx = speed_range + s = self.settings # Shorthand + + try: + # Phase 1: Sweep search + best_step, best_score = self._sweep_search( + min_speed_idx, max_speed_idx, s + ) + + if not self.running: + return + + # Move to best position found + self._move_to_best_position(best_step, s) + + if not self.running: + return + + # Phase 2: Fine tuning + final_score = self._fine_tune(min_speed_idx, s) + + self.log(f"Autofocus complete. Final: {final_score:.1f}") + + except Exception as e: + self.log(f"Autofocus error: {e}") + finally: + self.running = False + + def _sweep_search(self, min_speed_idx, max_speed_idx, s): + """Phase 1: Sweep to find approximate best position""" + self.log("Phase 1: Sweep search") + + # Use medium speed for sweep + sweep_speed = (min_speed_idx + max_speed_idx) // 2 + self.motion.set_speed(sweep_speed) + time.sleep(0.1) + + # Record starting position + time.sleep(s['sweep_settle_time']) + start_score = self.get_averaged_focus(samples=s['sweep_samples']) + self.update_focus_display(start_score) + + sweep_data = [(0, start_score)] + self.log(f"Start position: {start_score:.1f}") + + if not self.running: + return 0, start_score + + # Sweep DOWN first (away from slide to avoid contact) + self.log(f"Sweeping down {s['sweep_steps']} steps...") + for i in range(1, s['sweep_steps'] + 1): + if not self.running: + return 0, start_score + + self.motion.move_z_down() + time.sleep(s['sweep_move_time']) + self.motion.stop_z() + time.sleep(s['sweep_settle_time']) + + score = self.get_averaged_focus(samples=s['sweep_samples']) + sweep_data.append((-i, score)) + self.update_focus_display(score) + + if i % 5 == 0: + self.log(f" Step -{i}: {score:.1f}") + + if not self.running: + return 0, start_score + + # Return to start + self.log("Returning to start...") + for _ in range(s['sweep_steps']): + if not self.running: + return 0, start_score + self.motion.move_z_up() + time.sleep(s['sweep_move_time']) + self.motion.stop_z() + time.sleep(0.1) + + time.sleep(s['sweep_settle_time']) + + if not self.running: + return 0, start_score + + # Sweep UP + self.log(f"Sweeping up {s['sweep_steps']} steps...") + for i in range(1, s['sweep_steps'] + 1): + if not self.running: + return 0, start_score + + self.motion.move_z_up() + time.sleep(s['sweep_move_time']) + self.motion.stop_z() + time.sleep(s['sweep_settle_time']) + + score = self.get_averaged_focus(samples=s['sweep_samples']) + sweep_data.append((i, score)) + self.update_focus_display(score) + + if i % 5 == 0: + self.log(f" Step +{i}: {score:.1f}") + + # Find best position + best_step, best_score = max(sweep_data, key=lambda x: x[1]) + self.log(f"Best found at step {best_step}: {best_score:.1f}") + + # Log sweep curve + sorted_data = sorted(sweep_data, key=lambda x: x[0]) + curve_str = " ".join([f"{d[1]:.0f}" for d in sorted_data]) + self.log(f"Sweep curve: {curve_str}") + + return best_step, best_score + + def _move_to_best_position(self, best_step, s): + """Move from current position (+SWEEP_STEPS) to best_step""" + steps_to_move = s['sweep_steps'] - best_step + + if steps_to_move > 0: + self.log(f"Moving down {steps_to_move} steps to best position") + move_func = self.motion.move_z_down + else: + self.log(f"Moving up {abs(steps_to_move)} steps to best position") + move_func = self.motion.move_z_up + steps_to_move = abs(steps_to_move) + + for _ in range(steps_to_move): + if not self.running: + return + move_func() + time.sleep(s['sweep_move_time']) + self.motion.stop_z() + time.sleep(0.1) + + time.sleep(s['sweep_settle_time']) + current_score = self.get_averaged_focus(samples=s['sweep_samples']) + self.log(f"At best position: {current_score:.1f}") + + def _fine_tune(self, min_speed_idx, s): + """Phase 2: Fine hill-climbing from best position""" + self.log("Phase 2: Fine tuning") + + self.motion.set_speed(min_speed_idx) + time.sleep(0.1) + + best_score = self.get_averaged_focus(samples=s['fine_samples']) + + # Determine fine direction by testing both + fine_direction = self._determine_fine_direction(best_score, s) + + if not self.running: + return best_score + + # Fine search + best_score, best_position_offset = self._fine_search( + fine_direction, best_score, s + ) + + if not self.running: + return best_score + + # Return to best position + if best_position_offset > 0: + self._return_to_best(fine_direction, best_position_offset, s) + + # Final reading + time.sleep(s['fine_settle_time']) + final_score = self.get_averaged_focus(samples=s['fine_samples']) + self.update_focus_display(final_score) + + return final_score + + def _determine_fine_direction(self, best_score, s): + """Test both directions to find which improves focus""" + # Try DOWN first + self.motion.move_z_down() + time.sleep(s['fine_move_time']) + self.motion.stop_z() + time.sleep(s['fine_settle_time']) + + down_score = self.get_averaged_focus(samples=s['fine_samples']) + + if down_score > best_score: + self.log(f"Fine direction: DOWN ({down_score:.1f})") + return 'down' + + # Go back and try UP + self.motion.move_z_up() + time.sleep(s['fine_move_time']) + self.motion.stop_z() + time.sleep(s['fine_settle_time']) + + self.motion.move_z_up() + time.sleep(s['fine_move_time']) + self.motion.stop_z() + time.sleep(s['fine_settle_time']) + + up_score = self.get_averaged_focus(samples=s['fine_samples']) + + if up_score > best_score: + self.log(f"Fine direction: UP ({up_score:.1f})") + return 'up' + + # Already at peak + self.log("Already at peak, minor adjustment only") + self.motion.move_z_down() + time.sleep(s['fine_move_time']) + self.motion.stop_z() + time.sleep(s['fine_settle_time']) + + return 'up' + + def _fine_search(self, direction, best_score, s): + """Search in given direction until no improvement""" + move_func = self.motion.move_z_up if direction == 'up' else self.motion.move_z_down + + no_improvement_count = 0 + best_position_offset = 0 + + while self.running and no_improvement_count < s['fine_max_no_improvement']: + move_func() + time.sleep(s['fine_move_time']) + self.motion.stop_z() + time.sleep(s['fine_settle_time']) + + current_score = self.get_averaged_focus(samples=s['fine_samples']) + self.update_focus_display(current_score) + + if current_score > best_score: + best_score = current_score + no_improvement_count = 0 + best_position_offset = 0 + self.log(f"Fine better: {current_score:.1f}") + else: + no_improvement_count += 1 + best_position_offset += 1 + + return best_score, best_position_offset + + def _return_to_best(self, direction, steps, s): + """Return to best position by reversing direction""" + self.log(f"Returning {steps} steps") + reverse_func = self.motion.move_z_down if direction == 'up' else self.motion.move_z_up + + for _ in range(steps): + reverse_func() + time.sleep(s['fine_move_time']) + self.motion.stop_z() + time.sleep(0.1) \ No newline at end of file diff --git a/src/gui.py b/src/gui.py index b1db253..09d2e35 100644 --- a/src/gui.py +++ b/src/gui.py @@ -1,3 +1,8 @@ +""" +AutoScope GUI - Pure UI layer. +Delegates logic to controllers. +""" + import tkinter as tk from tkinter import ttk, scrolledtext from PIL import Image, ImageTk @@ -5,18 +10,25 @@ import cv2 import threading import queue +from motion_controller import MotionController +from autofocus import AutofocusController + + class AppGUI: def __init__(self, camera, arduino): self.camera = camera self.arduino = arduino self.running = True - self._updating_slider = False - self.autofocus_running = False - self.autofocus_speed_range = (1, 6) # Speed indices 1-6 (values: 2, 5, 10, 30, 50, 70) + self._updating_slider = False - # Track movement state for toggle buttons - self.axis_direction = {'X': 1, 'Y': 1, 'Z': 1} # 1 = positive, -1 = negative - self.axis_moving = {'X': False, 'Y': False, 'Z': False} + # Initialize controllers + self.motion = MotionController(arduino, on_command_sent=self._on_command_sent) + self.autofocus = AutofocusController( + camera, + self.motion, + on_log=self.log_message, + on_focus_update=self._update_focus_display_threadsafe + ) # Queue for thread-safe serial log updates self.serial_queue = queue.Queue() @@ -24,7 +36,7 @@ class AppGUI: # Build the window self.root = tk.Tk() self.root.title("AutoScope") - self.root.geometry("1080x1080") + self.root.geometry("1080x1080") self.root.minsize(1080, 1080) self.root.protocol("WM_DELETE_WINDOW", self.on_close) @@ -34,8 +46,17 @@ class AppGUI: self.serial_thread = threading.Thread(target=self._serial_reader, daemon=True) self.serial_thread.start() + def _on_command_sent(self, cmd): + """Callback when motion controller sends a command""" + self.log_message(f"> {cmd}") + + def _update_focus_display_threadsafe(self, score): + """Thread-safe focus display update""" + self.root.after(0, lambda: self.focus_score_label.config(text=f"Focus: {score:.1f}")) + + # === UI Building === + def _build_ui(self): - # Main container main_frame = ttk.Frame(self.root) main_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) @@ -48,608 +69,236 @@ class AppGUI: right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True) right_frame.pack_propagate(False) - # === EMERGENCY STOP (prominent at top) === - emergency_frame = ttk.Frame(right_frame) - emergency_frame.pack(fill=tk.X, pady=(0, 10)) + self._build_emergency_stop(right_frame) + self._build_speed_controls(right_frame) + self._build_fine_speed_controls(right_frame) + self._build_movement_controls(right_frame) + self._build_autofocus_controls(right_frame) + self._build_status_controls(right_frame) + self._build_mode_controls(right_frame) + self._build_serial_log(right_frame) + self._build_command_entry(right_frame) + + def _build_emergency_stop(self, parent): + frame = ttk.Frame(parent) + frame.pack(fill=tk.X, pady=(0, 10)) self.emergency_btn = tk.Button( - emergency_frame, - text="⚠ EMERGENCY STOP ⚠", - command=lambda: self.send_command("!"), + frame, + text="⚠ EMERGENCY STOP ⚠", + command=self.motion.stop_all, bg='red', fg='white', font=('Arial', 12, 'bold'), height=2 ) self.emergency_btn.pack(fill=tk.X) + + def _build_speed_controls(self, parent): + frame = ttk.LabelFrame(parent, text="Speed") + frame.pack(fill=tk.X, pady=(0, 10)) - # === SPEED CONTROL === - speed_frame = ttk.LabelFrame(right_frame, text="Speed") - speed_frame.pack(fill=tk.X, pady=(0, 10)) - - speed_btn_frame = ttk.Frame(speed_frame) - speed_btn_frame.pack(fill=tk.X, padx=5, pady=5) + btn_frame = ttk.Frame(frame) + btn_frame.pack(fill=tk.X, padx=5, pady=5) self.speed_var = tk.StringVar(value="Medium") - ttk.Radiobutton(speed_btn_frame, text="Slow", variable=self.speed_var, - value="Slow", command=lambda: self.send_command("L")).pack(side=tk.LEFT, padx=5) - ttk.Radiobutton(speed_btn_frame, text="Medium", variable=self.speed_var, - value="Medium", command=lambda: self.send_command("M")).pack(side=tk.LEFT, padx=5) - ttk.Radiobutton(speed_btn_frame, text="Fast", variable=self.speed_var, - value="Fast", command=lambda: self.send_command("F")).pack(side=tk.LEFT, padx=5) - # === FINE SPEED CONTROL === - fine_speed_frame = ttk.LabelFrame(right_frame, text="Fine Speed Control") - fine_speed_frame.pack(fill=tk.X, pady=(0, 10)) - - # Speed values matching Arduino speedArr (index 0-5) plus 0 for stop - self.speed_values = [0, 2, 5, 10, 30, 50, 70, 100] - - fine_inner = ttk.Frame(fine_speed_frame) - fine_inner.pack(fill=tk.X, padx=5, pady=5) - - # Current speed display - self.fine_speed_label = ttk.Label(fine_inner, text="Speed: 50", font=('Arial', 10, 'bold')) - self.fine_speed_label.pack(anchor=tk.W) - - # Slider - self.speed_slider = ttk.Scale( - fine_inner, - from_=0, - to=6, - orient=tk.HORIZONTAL, - command=self._on_speed_slider_change - ) - self.speed_slider.set(5) # Default to middle - self.speed_slider.pack(fill=tk.X, pady=(5, 0)) - - # Number buttons for direct selection - btn_row = ttk.Frame(fine_inner) - btn_row.pack(fill=tk.X, pady=(5, 0)) - - for i in range(7): - btn = ttk.Button( - btn_row, - text=str(i), - width=2, - command=lambda x=i: self._set_fine_speed(x) - ) - btn.pack(side=tk.LEFT, padx=1) - # === MOVEMENT CONTROLS === - move_frame = ttk.LabelFrame(right_frame, text="Movement Control") - move_frame.pack(fill=tk.X, pady=(0, 10)) + for text, value, preset in [("Slow", "Slow", "slow"), + ("Medium", "Medium", "medium"), + ("Fast", "Fast", "fast")]: + ttk.Radiobutton( + btn_frame, text=text, variable=self.speed_var, + value=value, command=lambda p=preset: self.motion.set_speed_preset(p) + ).pack(side=tk.LEFT, padx=5) + + def _build_fine_speed_controls(self, parent): + frame = ttk.LabelFrame(parent, text="Fine Speed Control") + frame.pack(fill=tk.X, pady=(0, 10)) - # Command mapping for each axis - self.axis_commands = { - 'X': {'pos': 'E', 'neg': 'W', 'stop': 'e'}, - 'Y': {'pos': 'N', 'neg': 'S', 'stop': 'n'}, - 'Z': {'pos': 'U', 'neg': 'D', 'stop': 'u'} - } + inner = ttk.Frame(frame) + inner.pack(fill=tk.X, padx=5, pady=5) + + self.fine_speed_label = ttk.Label(inner, text="Speed: 50", font=('Arial', 10, 'bold')) + self.fine_speed_label.pack(anchor=tk.W) + + self.speed_slider = ttk.Scale(inner, from_=0, to=6, orient=tk.HORIZONTAL) + self.speed_slider.set(5) + self.speed_slider.config(command=self._on_speed_slider_change) + self.speed_slider.pack(fill=tk.X, pady=(5, 0)) + + btn_row = ttk.Frame(inner) + btn_row.pack(fill=tk.X, pady=(5, 0)) + + for i in range(6): + ttk.Button( + btn_row, text=str(i), width=2, + command=lambda x=i: self._set_fine_speed(x) + ).pack(side=tk.LEFT, padx=1) + + def _build_movement_controls(self, parent): + frame = ttk.LabelFrame(parent, text="Movement Control") + frame.pack(fill=tk.X, pady=(0, 10)) self.dir_labels = {} self.move_buttons = {} for axis in ["X", "Y", "Z"]: - row = ttk.Frame(move_frame) + row = ttk.Frame(frame) row.pack(fill=tk.X, pady=3, padx=5) - # Axis label ttk.Label(row, text=f"{axis}:", width=3, font=('Arial', 10, 'bold')).pack(side=tk.LEFT) - # Direction toggle button - dir_btn = ttk.Button(row, text="+ →", width=5, - command=lambda a=axis: self.toggle_direction(a)) + dir_btn = ttk.Button(row, text="+ →", width=5, + command=lambda a=axis: self._toggle_direction(a)) dir_btn.pack(side=tk.LEFT, padx=2) self.dir_labels[axis] = dir_btn - # Start/Stop movement toggle move_btn = tk.Button(row, text="Move", width=8, bg='green', fg='white', - command=lambda a=axis: self.toggle_movement(a)) + command=lambda a=axis: self._toggle_movement(a)) move_btn.pack(side=tk.LEFT, padx=2) self.move_buttons[axis] = move_btn - # Quick stop for this axis - ttk.Button(row, text="Stop", width=6, - command=lambda a=axis: self.stop_axis(a)).pack(side=tk.LEFT, padx=2) + ttk.Button(row, text="Stop", width=6, + command=lambda a=axis: self._stop_axis(a)).pack(side=tk.LEFT, padx=2) - # Stop All button - ttk.Button(move_frame, text="Stop All Axes", - command=lambda: self.send_command("X")).pack(fill=tk.X, padx=5, pady=5) + ttk.Button(frame, text="Stop All Axes", + command=self.motion.stop_all).pack(fill=tk.X, padx=5, pady=5) + + def _build_autofocus_controls(self, parent): + frame = ttk.LabelFrame(parent, text="Autofocus") + frame.pack(fill=tk.X, pady=(0, 10)) - # === AUTOFOCUS CONTROL === - af_frame = ttk.LabelFrame(right_frame, text="Autofocus") - af_frame.pack(fill=tk.X, pady=(0, 10)) - - af_inner = ttk.Frame(af_frame) - af_inner.pack(fill=tk.X, padx=5, pady=5) - - # Focus score display - self.focus_score_label = ttk.Label(af_inner, text="Focus: --", font=('Arial', 10)) + inner = ttk.Frame(frame) + inner.pack(fill=tk.X, padx=5, pady=5) + + self.focus_score_label = ttk.Label(inner, text="Focus: --", font=('Arial', 10)) self.focus_score_label.pack(anchor=tk.W) - - # Autofocus buttons - af_btn_row = ttk.Frame(af_inner) - af_btn_row.pack(fill=tk.X, pady=(5, 0)) - - self.af_button = ttk.Button(af_btn_row, text="Autofocus", command=self.start_autofocus) + + btn_row = ttk.Frame(inner) + btn_row.pack(fill=tk.X, pady=(5, 0)) + + self.af_button = ttk.Button(btn_row, text="Autofocus", command=self._start_autofocus) self.af_button.pack(side=tk.LEFT, padx=2) - - ttk.Button(af_btn_row, text="Coarse AF", command=lambda: self.start_autofocus(coarse=True)).pack(side=tk.LEFT, padx=2) - ttk.Button(af_btn_row, text="Fine AF", command=lambda: self.start_autofocus(fine=True)).pack(side=tk.LEFT, padx=2) - ttk.Button(af_btn_row, text="Stop AF", command=self.stop_autofocus).pack(side=tk.LEFT, padx=2) - - # Live focus monitoring toggle + + ttk.Button(btn_row, text="Coarse AF", + command=lambda: self._start_autofocus(coarse=True)).pack(side=tk.LEFT, padx=2) + ttk.Button(btn_row, text="Fine AF", + command=lambda: self._start_autofocus(fine=True)).pack(side=tk.LEFT, padx=2) + ttk.Button(btn_row, text="Stop AF", + command=self._stop_autofocus).pack(side=tk.LEFT, padx=2) + self.live_focus_var = tk.BooleanVar(value=True) - ttk.Checkbutton(af_inner, text="Show live focus score", - variable=self.live_focus_var).pack(anchor=tk.W, pady=(5, 0)) - - - # === POSITION & STATUS === - pos_frame = ttk.LabelFrame(right_frame, text="Status") - pos_frame.pack(fill=tk.X, pady=(0, 10)) + ttk.Checkbutton(inner, text="Show live focus score", + variable=self.live_focus_var).pack(anchor=tk.W, pady=(5, 0)) + + def _build_status_controls(self, parent): + frame = ttk.LabelFrame(parent, text="Status") + frame.pack(fill=tk.X, pady=(0, 10)) - pos_btn_frame = ttk.Frame(pos_frame) - pos_btn_frame.pack(fill=tk.X, padx=5, pady=5) + btn_frame = ttk.Frame(frame) + btn_frame.pack(fill=tk.X, padx=5, pady=5) - ttk.Button(pos_btn_frame, text="Status", command=lambda: self.send_command("?")).pack(side=tk.LEFT, padx=2) - ttk.Button(pos_btn_frame, text="Zero All", command=lambda: self.send_command("Z")).pack(side=tk.LEFT, padx=2) - ttk.Button(pos_btn_frame, text="Go Origin", command=lambda: self.send_command("G")).pack(side=tk.LEFT, padx=2) + ttk.Button(btn_frame, text="Status", command=self.motion.request_status).pack(side=tk.LEFT, padx=2) + ttk.Button(btn_frame, text="Zero All", command=self.motion.zero_all).pack(side=tk.LEFT, padx=2) + ttk.Button(btn_frame, text="Go Origin", command=self.motion.go_origin).pack(side=tk.LEFT, padx=2) + + def _build_mode_controls(self, parent): + frame = ttk.LabelFrame(parent, text="Control Mode") + frame.pack(fill=tk.X, pady=(0, 10)) - # === MODE CONTROL === - mode_frame = ttk.LabelFrame(right_frame, text="Control Mode") - mode_frame.pack(fill=tk.X, pady=(0, 10)) + btn_frame = ttk.Frame(frame) + btn_frame.pack(fill=tk.X, padx=5, pady=5) - mode_btn_frame = ttk.Frame(mode_frame) - mode_btn_frame.pack(fill=tk.X, padx=5, pady=5) - - self.mode_label = ttk.Label(mode_btn_frame, text="Mode: Serial", font=('Arial', 9)) + self.mode_label = ttk.Label(btn_frame, text="Mode: Serial", font=('Arial', 9)) self.mode_label.pack(side=tk.LEFT, padx=5) - ttk.Button(mode_btn_frame, text="Toggle Mode", command=lambda: self.send_command("T")).pack(side=tk.RIGHT, padx=2) - - # === SERIAL LOG === - ttk.Label(right_frame, text="Serial Log:").pack(anchor=tk.W) - self.serial_log = scrolledtext.ScrolledText(right_frame, height=10, width=35, state=tk.DISABLED) + ttk.Button(btn_frame, text="Toggle Mode", command=self.motion.toggle_mode).pack(side=tk.RIGHT, padx=2) + + def _build_serial_log(self, parent): + ttk.Label(parent, text="Serial Log:").pack(anchor=tk.W) + self.serial_log = scrolledtext.ScrolledText(parent, height=10, width=35, state=tk.DISABLED) self.serial_log.pack(fill=tk.BOTH, expand=True, pady=(0, 10)) - - # === CUSTOM COMMAND === - ttk.Label(right_frame, text="Send Command:").pack(anchor=tk.W) - cmd_frame = ttk.Frame(right_frame) + + def _build_command_entry(self, parent): + ttk.Label(parent, text="Send Command:").pack(anchor=tk.W) + cmd_frame = ttk.Frame(parent) cmd_frame.pack(fill=tk.X) self.cmd_entry = ttk.Entry(cmd_frame) self.cmd_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) - self.cmd_entry.bind("", lambda e: self.send_custom_command()) + self.cmd_entry.bind("", lambda e: self._send_custom_command()) - ttk.Button(cmd_frame, text="Send", command=self.send_custom_command).pack(side=tk.RIGHT, padx=(5, 0)) + ttk.Button(cmd_frame, text="Send", command=self._send_custom_command).pack(side=tk.RIGHT, padx=(5, 0)) - def toggle_direction(self, axis): - """Toggle direction for an axis""" - self.axis_direction[axis] *= -1 - direction = "+" if self.axis_direction[axis] == 1 else "-" - arrow = "→" if self.axis_direction[axis] == 1 else "←" - self.dir_labels[axis].config(text=f"{direction} {arrow}") + # === UI Event Handlers === + + def _toggle_direction(self, axis): + direction = self.motion.toggle_direction(axis) + arrow = "→" if direction == 1 else "←" + sign = "+" if direction == 1 else "-" + self.dir_labels[axis].config(text=f"{sign} {arrow}") - # If currently moving, restart in new direction - if self.axis_moving[axis]: - self.stop_axis(axis) - self.start_movement(axis) + if self.motion.is_moving(axis): + self.motion.stop_axis(axis) + self.motion.start_movement(axis) + self.move_buttons[axis].config(text="Moving", bg='orange') - def toggle_movement(self, axis): - """Toggle movement on/off for an axis""" - if self.axis_moving[axis]: - self.stop_axis(axis) + def _toggle_movement(self, axis): + if self.motion.is_moving(axis): + self._stop_axis(axis) else: - self.start_movement(axis) + self.motion.start_movement(axis) + self.move_buttons[axis].config(text="Moving", bg='orange') - def start_movement(self, axis): - """Start continuous movement on an axis""" - commands = self.axis_commands[axis] - cmd = commands['pos'] if self.axis_direction[axis] == 1 else commands['neg'] - self.send_command(cmd) - self.axis_moving[axis] = True - self.move_buttons[axis].config(text="Moving", bg='orange') - - def stop_axis(self, axis): - """Stop movement on a specific axis""" - commands = self.axis_commands[axis] - self.send_command(commands['stop']) - self.axis_moving[axis] = False + def _stop_axis(self, axis): + self.motion.stop_axis(axis) self.move_buttons[axis].config(text="Move", bg='green') - def send_command(self, cmd): - """Send a command and log it""" - self.log_message(f"> {cmd}") - if self.arduino: - self.arduino.send_command(cmd) - - def send_custom_command(self): - """Send command from entry box""" - cmd = self.cmd_entry.get().strip() - if cmd: - self.send_command(cmd) - self.cmd_entry.delete(0, tk.END) - def _on_speed_slider_change(self, value): - """Handle slider movement""" if self._updating_slider: return - # Round to nearest integer index = round(float(value)) self._set_fine_speed(index) - + def _set_fine_speed(self, index): - """Set fine speed by index (0-10)""" - # Prevent recursion self._updating_slider = True - # Update slider position self.speed_slider.set(index) - - # Update label with actual speed value - speed_val = self.speed_values[index] + speed_val = self.motion.get_speed_value(index) self.fine_speed_label.config(text=f"Speed: {speed_val}") self._updating_slider = False - - # Send command - '0' for index 0, '1'-'9' for 1-9, 'a' for 10 - if index == 0: - self.send_command("0") - elif index <= 9: - self.send_command(str(index)) - else: - # For index 10 (speed 300), you'd need to add a handler on Arduino - # For now, send '9' as max or add a new command - self.send_command("9") - + self.motion.set_speed(index) - - - - - - - - def get_focus_score(self): - """Get current focus score from camera""" - try: - frame = self.camera.capture_frame() - from vision import calculate_focus_score_sobel - score = calculate_focus_score_sobel(frame) - return score - except Exception as e: - print(f"Focus score error: {e}") - return 0 - - def start_autofocus(self, coarse=False, fine=False): - """Start autofocus routine in background thread""" - if self.autofocus_running: - self.log_message("Autofocus already running") - return - - self.autofocus_running = True - self.af_button.config(state='disabled') - - # Determine speed range based on mode - if coarse: - speed_range = (4, 6) # Faster speeds: 30, 50, 70 - elif fine: - speed_range = (1, 3) # Slower speeds: 2, 5, 10 - else: - speed_range = (1, 6) # Full range - - af_thread = threading.Thread( - target=self._autofocus_routine, - args=(speed_range,), - daemon=True - ) - af_thread.start() - - def stop_autofocus(self): - """Stop autofocus routine""" - self.autofocus_running = False - self.send_command('u') # Stop Z axis + def _start_autofocus(self, coarse=False, fine=False): + if self.autofocus.start(coarse=coarse, fine=fine): + self.af_button.config(state='disabled') + + def _stop_autofocus(self): + self.autofocus.stop() self.af_button.config(state='normal') - self.log_message("Autofocus stopped") - - def _get_averaged_focus(self, samples=5, delay_between=0.05): - """ - Get averaged focus score from multiple samples. - Reduces noise in focus measurements. - """ - import time - scores = [] - for _ in range(samples): - scores.append(self.get_focus_score()) - time.sleep(delay_between) - - # Remove outliers (optional - take middle 60%) - scores.sort() - trim = len(scores) // 5 # Remove top and bottom 20% - if trim > 0: - scores = scores[trim:-trim] - - return sum(scores) / len(scores) - - def _autofocus_routine(self, speed_range): - """ - Autofocus using sweep search + hill climbing. - Phase 1: Sweep across range to find approximate peak - Phase 2: Fine tune from best position found - """ - import time - - self.log_message(f"Starting autofocus (speed range: {speed_range})") - - min_speed_idx, max_speed_idx = speed_range - - # Timing settings - SWEEP_MOVE_TIME = 2 # Movement time per sweep step - SWEEP_SETTLE_TIME = 0.25 # Settle time during sweep - SWEEP_SAMPLES = 10 - - FINE_MOVE_TIME = 0.15 - FINE_SETTLE_TIME = 0.25 - FINE_SAMPLES = 10 - - # Number of steps in each direction for sweep - SWEEP_STEPS = 15 # Total sweep = 2 * SWEEP_STEPS positions - - # ======================================== - # Phase 1: Sweep search - # ======================================== - self.log_message("Phase 1: Sweep search") - - # Use medium speed for sweep - sweep_speed = (min_speed_idx + max_speed_idx) // 2 - self._set_fine_speed_no_send(sweep_speed) - self.send_command(str(sweep_speed)) - time.sleep(0.1) - - # Record starting position score - time.sleep(SWEEP_SETTLE_TIME) - start_score = self._get_averaged_focus(samples=SWEEP_SAMPLES) - self._update_focus_display(start_score) - - # Data structure: (steps_from_start, score) - # Positive = up from start, negative = down from start - sweep_data = [(0, start_score)] - self.log_message(f"Start position: {start_score:.1f}") - - if not self.autofocus_running: - return - - # Sweep DOWN first tyo avoid objective / slide contact - self.log_message(f"Sweeping down {SWEEP_STEPS} steps...") - for i in range(1, SWEEP_STEPS + 1): - if not self.autofocus_running: - return - - self.send_command('D') - time.sleep(SWEEP_MOVE_TIME) - self.send_command('d') - time.sleep(SWEEP_SETTLE_TIME) - - score = self._get_averaged_focus(samples=SWEEP_SAMPLES) - sweep_data.append((-i, score)) - self._update_focus_display(score) - - if i % 5 == 0: - self.log_message(f" Step -{i}: {score:.1f}") - - if not self.autofocus_running: - return - - # Return to start - self.log_message("Returning to start...") - for _ in range(SWEEP_STEPS): - if not self.autofocus_running: - return - self.send_command('U') - time.sleep(SWEEP_MOVE_TIME) - self.send_command('u') - time.sleep(0.1) # Faster return, no need to sample - - time.sleep(SWEEP_SETTLE_TIME) - - if not self.autofocus_running: - return - - # Sweep UP - self.log_message(f"Sweeping up {SWEEP_STEPS} steps...") - for i in range(1, SWEEP_STEPS + 1): - if not self.autofocus_running: - return - - self.send_command('U') - time.sleep(SWEEP_MOVE_TIME) - self.send_command('u') - time.sleep(SWEEP_SETTLE_TIME) - - score = self._get_averaged_focus(samples=SWEEP_SAMPLES) - sweep_data.append((i, score)) - self._update_focus_display(score) - - if i % 5 == 0: - self.log_message(f" Step +{i}: {score:.1f}") - - if not self.autofocus_running: - return - - # ======================================== - # Find best position from sweep - # ======================================== - best_step, best_score = max(sweep_data, key=lambda x: x[1]) - self.log_message(f"Best found at step {best_step}: {best_score:.1f}") - - # Log the full sweep curve for debugging - self.log_message("Sweep curve:") - sorted_data = sorted(sweep_data, key=lambda x: x[0]) - curve_str = " ".join([f"{s[1]:.0f}" for s in sorted_data]) - self.log_message(f" {curve_str}") - - # ======================================== - # Move to best position - # ======================================== - # Currently at +SWEEP_STEPS, need to go to best_step - steps_to_move = SWEEP_STEPS - best_step # Positive = move down - - if steps_to_move > 0: - self.log_message(f"Moving down {steps_to_move} steps to best position") - move_cmd, stop_cmd = 'D', 'd' - else: - self.log_message(f"Moving up {abs(steps_to_move)} steps to best position") - move_cmd, stop_cmd = 'U', 'u' - steps_to_move = abs(steps_to_move) - - for _ in range(steps_to_move): - if not self.autofocus_running: - return - self.send_command(move_cmd) - time.sleep(SWEEP_MOVE_TIME) - self.send_command(stop_cmd) - time.sleep(0.1) - - time.sleep(SWEEP_SETTLE_TIME) - current_score = self._get_averaged_focus(samples=SWEEP_SAMPLES) - self.log_message(f"At best position: {current_score:.1f}") - - if not self.autofocus_running: - return - - # ======================================== - # Phase 2: Fine hill-climbing from best position - # ======================================== - self.log_message("Phase 2: Fine tuning") - - fine_speed = min_speed_idx - self._set_fine_speed_no_send(fine_speed) - self.send_command(str(fine_speed)) - time.sleep(0.1) - - best_score = current_score - - # Try both directions to find which improves - # Try UP first - self.send_command('D') - time.sleep(FINE_MOVE_TIME) - self.send_command('d') - time.sleep(FINE_SETTLE_TIME) - - down_score = self._get_averaged_focus(samples=FINE_SAMPLES) - - if down_score > best_score: - fine_direction = 0 # DOWN - best_score = down_score - self.log_message(f"Fine direction: UP ({down_score:.1f})") - else: - # Go back down and try down - self.send_command('U') - time.sleep(FINE_MOVE_TIME) - self.send_command('u') - time.sleep(FINE_SETTLE_TIME) - - self.send_command('U') - time.sleep(FINE_MOVE_TIME) - self.send_command('u') - time.sleep(FINE_SETTLE_TIME) - - up_score = self._get_averaged_focus(samples=FINE_SAMPLES) - - if up_score > best_score: - fine_direction = 1 # UP - best_score = up_score - self.log_message(f"Fine direction: DOWN ({up_score:.1f})") - else: - # We're already at the peak - self.log_message("Already at peak, minor adjustment only") - fine_direction = 1 # Try up anyway - # Go back to center - self.send_command('U') - time.sleep(FINE_MOVE_TIME) - self.send_command('u') - time.sleep(FINE_SETTLE_TIME) - - if not self.autofocus_running: - return - - # Fine search - move_cmd = 'U' if fine_direction else 'D' - stop_cmd = 'u' if fine_direction else 'd' - reverse_cmd = 'D' if fine_direction else 'U' - reverse_stop = 'd' if fine_direction else 'u' - - no_improvement_count = 0 - max_fine_no_improvement = 5 - best_position_offset = 0 - - while self.autofocus_running and no_improvement_count < max_fine_no_improvement: - self.send_command(move_cmd) - time.sleep(FINE_MOVE_TIME) - self.send_command(stop_cmd) - time.sleep(FINE_SETTLE_TIME) - - current_score = self._get_averaged_focus(samples=FINE_SAMPLES) - self._update_focus_display(current_score) - - if current_score > best_score: - best_score = current_score - no_improvement_count = 0 - best_position_offset = 0 - self.log_message(f"Fine better: {current_score:.1f}") - else: - no_improvement_count += 1 - best_position_offset += 1 - - if not self.autofocus_running: - return - - # Return to best fine position - if best_position_offset > 0: - self.log_message(f"Returning {best_position_offset} steps") - for _ in range(best_position_offset): - self.send_command(reverse_cmd) - time.sleep(FINE_MOVE_TIME) - self.send_command(reverse_stop) - time.sleep(0.1) - - # Final reading - time.sleep(FINE_SETTLE_TIME) - final_score = self._get_averaged_focus(samples=FINE_SAMPLES) - self._update_focus_display(final_score) - - self.autofocus_running = False - self.log_message(f"Autofocus complete. Final: {final_score:.1f}") - - self.root.after(0, lambda: self.af_button.config(state='normal')) - + def _send_custom_command(self): + cmd = self.cmd_entry.get().strip() + if cmd: + self.motion.send_command(cmd) + self.cmd_entry.delete(0, tk.END) + + # === Logging === - def _set_fine_speed_no_send(self, index): - """Update slider/label without sending command (for internal use)""" - self._updating_slider = True - self.speed_slider.set(index) - speed_val = self.speed_values[index] - self.fine_speed_label.config(text=f"Speed: {speed_val}") - self._updating_slider = False - - def _update_focus_display(self, score): - """Update focus score display (thread-safe)""" - self.root.after(0, lambda: self.focus_score_label.config(text=f"Focus: {score:.1f}")) - def log_message(self, msg): - """Add message to serial log""" if not hasattr(self, 'serial_log'): return + self.serial_log.config(state=tk.NORMAL) self.serial_log.insert(tk.END, msg + "\n") self.serial_log.see(tk.END) self.serial_log.config(state=tk.DISABLED) print(f" Log: {msg}") - # Check for mode changes in received messages if msg.startswith("< MODE:"): mode = msg.split(":")[-1] self.mode_label.config(text=f"Mode: {mode.capitalize()}") + # === Serial & Camera === + def _serial_reader(self): - """Background thread to read serial data""" while self.running: if self.arduino and self.arduino.ser.in_waiting: try: @@ -661,24 +310,20 @@ class AppGUI: threading.Event().wait(0.05) def _process_serial_queue(self): - """Process queued serial messages (call from main thread)""" while not self.serial_queue.empty(): msg = self.serial_queue.get_nowait() self.log_message(f"< {msg}") def update_camera(self): - """Capture and display one frame""" try: frame = self.camera.capture_frame() - # Show live focus score if enabled - if self.live_focus_var.get(): - score = self._get_averaged_focus() + if self.live_focus_var.get() and not self.autofocus.is_running(): + score = self.autofocus.get_focus_score() self.focus_score_label.config(text=f"Focus: {score:.1f}") frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) - # Resize if needed h, w = frame.shape[:2] max_width = 640 if w > max_width: @@ -693,18 +338,24 @@ class AppGUI: except Exception as e: print(f"Camera error: {e}") + # === Main Loop === + def run(self): - """Main loop""" def update(): if self.running: self.update_camera() self._process_serial_queue() + + # Re-enable AF button when done + if not self.autofocus.is_running(): + self.af_button.config(state='normal') + self.root.after(33, update) update() self.root.mainloop() def on_close(self): - """Clean shutdown""" self.running = False + self.autofocus.stop() self.root.destroy() \ No newline at end of file diff --git a/src/motion_controller.py b/src/motion_controller.py new file mode 100644 index 0000000..a979699 --- /dev/null +++ b/src/motion_controller.py @@ -0,0 +1,124 @@ +""" +Motion controller - manages axis movement state and commands. +Abstracts the command protocol from the GUI. +""" + +class MotionController: + # Command mapping for each axis + AXIS_COMMANDS = { + 'X': {'pos': 'E', 'neg': 'W', 'stop': 'e'}, + 'Y': {'pos': 'N', 'neg': 'S', 'stop': 'n'}, + 'Z': {'pos': 'U', 'neg': 'D', 'stop': 'u'} + } + + # Speed values matching Arduino speedArr + SPEED_VALUES = [0, 2, 5, 10, 30, 50, 70] + + def __init__(self, arduino, on_command_sent=None): + """ + Args: + arduino: Arduino instance for sending commands + on_command_sent: Optional callback(cmd) when command is sent + """ + self.arduino = arduino + self.on_command_sent = on_command_sent + + # Axis state + self.axis_direction = {'X': 1, 'Y': 1, 'Z': 1} + self.axis_moving = {'X': False, 'Y': False, 'Z': False} + + # Speed state + self.current_speed_index = 5 + + def send_command(self, cmd): + """Send command to Arduino and notify callback""" + if self.arduino: + self.arduino.send_command(cmd) + if self.on_command_sent: + self.on_command_sent(cmd) + + # === Axis Movement === + + def get_direction(self, axis): + """Get current direction for axis (1 = positive, -1 = negative)""" + return self.axis_direction[axis] + + def toggle_direction(self, axis): + """Toggle direction and return new direction""" + self.axis_direction[axis] *= -1 + return self.axis_direction[axis] + + def is_moving(self, axis): + """Check if axis is currently moving""" + return self.axis_moving[axis] + + def start_movement(self, axis): + """Start continuous movement on an axis""" + commands = self.AXIS_COMMANDS[axis] + cmd = commands['pos'] if self.axis_direction[axis] == 1 else commands['neg'] + self.send_command(cmd) + self.axis_moving[axis] = True + + def stop_axis(self, axis): + """Stop movement on a specific axis""" + commands = self.AXIS_COMMANDS[axis] + self.send_command(commands['stop']) + self.axis_moving[axis] = False + + def stop_all(self): + """Emergency stop all axes""" + self.send_command('!') + for axis in self.axis_moving: + self.axis_moving[axis] = False + + def move_z_up(self): + """Start Z axis moving up""" + self.send_command('U') + + def move_z_down(self): + """Start Z axis moving down""" + self.send_command('D') + + def stop_z(self): + """Stop Z axis""" + self.send_command('u') + + # === Speed Control === + + def get_speed_value(self, index): + """Get actual speed value for index""" + if 0 <= index < len(self.SPEED_VALUES): + return self.SPEED_VALUES[index] + return 0 + + def set_speed(self, index): + """Set speed by index (0-6)""" + self.current_speed_index = index + if index == 0: + self.send_command("0") + elif index <= 6: + self.send_command(str(index)) + + def set_speed_preset(self, preset): + """Set speed by preset name: 'slow', 'medium', 'fast'""" + presets = {'slow': 'L', 'medium': 'M', 'fast': 'F'} + if preset in presets: + self.send_command(presets[preset]) + + # === Status Commands === + + def request_status(self): + """Request status from Arduino""" + self.send_command('?') + + def zero_all(self): + """Zero all axis positions""" + self.send_command('Z') + + def go_origin(self): + """Move to origin position""" + self.send_command('G') + + def toggle_mode(self): + """Toggle between serial/joystick mode""" + self.send_command('T') \ No newline at end of file