commit cf78f42c15a8a9cff4bf85b50c83026798897281 Author: Shaiv Kamat Date: Fri Dec 26 16:29:44 2025 -0800 Base working code with Autofocus diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b8a1259 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +venv +env \ No newline at end of file diff --git a/Workflow.txt b/Workflow.txt new file mode 100644 index 0000000..339078d --- /dev/null +++ b/Workflow.txt @@ -0,0 +1,5 @@ + cd Autoscope + source venv/bin/activate # Enter environment + pip install something # Installs to this project only + python main.py # Runs with this project's packages + deactivate # Exit environment diff --git a/config/settings.py b/config/settings.py new file mode 100644 index 0000000..e69de29 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..f747021 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +opencv-python>=4.5.0 +pyserial>=3.5 +numpy>=1.20.0 +Pillow>=1.0.0 \ No newline at end of file diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/__pycache__/arduino.cpython-39.pyc b/src/__pycache__/arduino.cpython-39.pyc new file mode 100644 index 0000000..7e6ec6e Binary files /dev/null and b/src/__pycache__/arduino.cpython-39.pyc differ diff --git a/src/__pycache__/camera.cpython-39.pyc b/src/__pycache__/camera.cpython-39.pyc new file mode 100644 index 0000000..3d2efd3 Binary files /dev/null and b/src/__pycache__/camera.cpython-39.pyc differ diff --git a/src/__pycache__/gui.cpython-39.pyc b/src/__pycache__/gui.cpython-39.pyc new file mode 100644 index 0000000..10adbef Binary files /dev/null and b/src/__pycache__/gui.cpython-39.pyc differ diff --git a/src/__pycache__/vision.cpython-39.pyc b/src/__pycache__/vision.cpython-39.pyc new file mode 100644 index 0000000..f7851d2 Binary files /dev/null and b/src/__pycache__/vision.cpython-39.pyc differ diff --git a/src/arduino.py b/src/arduino.py new file mode 100644 index 0000000..ebee2e1 --- /dev/null +++ b/src/arduino.py @@ -0,0 +1,120 @@ +import serial +import serial.tools.list_ports +import time + +class Arduino: + # Known Arduino USB vendor/product IDs + ARDUINO_IDS = [ + (0x2341, None), # Arduino SA + (0x2A03, None), # Arduino SA (alt) + (0x1A86, 0x7523), # CH340 (common clone chip) + (0x0403, 0x6001), # FTDI FT232 + (0x10C4, 0xEA60), # CP210x + (0x2341, 0x0043), # Arduino Uno + (0x2341, 0x0001), # Arduino Mega + (0x2341, 0x0042), # Arduino Mega 2560 + ] + + def __init__(self, baud=115200): + self.port = self.find_arduino() + + if not self.port: + raise RuntimeError("Could not find Arduino") + + print(f"Connecting to Arduino on {self.port}") + + try: + self.ser = serial.Serial(self.port, baud, timeout=1) + time.sleep(2) # Wait for Arduino reset + self.clear_buffer() + + except FileNotFoundError: + print("Shutting down...") + + @classmethod + def find_arduino(cls): + """ + Search available USB ports for an Arduino. + Returns port path or None if not found. + """ + print("Searching for Arduino...") + + ports = serial.tools.list_ports.comports() + + for port in ports: + print(f" Found: {port.device}") + print(f" VID:PID = {port.vid}:{port.pid}") + print(f" Description: {port.description}") + print(f" Manufacturer: {port.manufacturer}") + + # Check against known Arduino IDs + for vid, pid in cls.ARDUINO_IDS: + if port.vid == vid: + if pid is None or port.pid == pid: + print(f" → Arduino detected on {port.device}") + return port.device + + # Fallback: check description/manufacturer strings + desc = (port.description or "").lower() + mfr = (port.manufacturer or "").lower() + + if any(term in desc or term in mfr for term in + ["arduino", "ch340", "ftdi", "usb-serial", "cp210"]): + print(f" → Probable Arduino on {port.device}") + return port.device + + print(" No Arduino found") + return None + + def clear_buffer(self): + """Clear any pending data in serial buffer""" + self.ser.reset_input_buffer() + self.ser.reset_output_buffer() + + def send_command(self, cmd): + """Send command to Arduino""" + self.ser.write(cmd.encode()) + self.ser.flush() + + def read_response(self, timeout=1.0): + """Read line from Arduino""" + self.ser.timeout = timeout + return self.ser.readline().decode().strip() + + def move(self, axis, direction): + """ + axis: 'X', 'Y', or 'Z' + direction: '+' or '-' + """ + cmd = f"MOVE {axis}{direction}" + self.send_command(cmd) + + def stop(self, axis="ALL"): + """Stop movement. axis: 'X', 'Y', 'Z', or 'ALL'""" + self.send_command(f"STOP {axis}") + + def home_z(self): + """Home Z axis to max limit switch""" + self.send_command("HOME Z") + while True: + response = self.read_response(timeout=10.0) + if "LIMIT Z+" in response: + return True + if not response: + raise RuntimeError("Timeout waiting for Z home") + + def status(self): + """Query Arduino status""" + self.send_command("STATUS?") + return self.read_response() + + def close(self): + """Close serial connection""" + if self.ser and self.ser.is_open: + self.ser.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() \ No newline at end of file diff --git a/src/camera.py b/src/camera.py new file mode 100644 index 0000000..a2e9f30 --- /dev/null +++ b/src/camera.py @@ -0,0 +1,72 @@ +import cv2 + +class Camera: + def __init__(self, device_id=0): + self.cap = cv2.VideoCapture(device_id) + + if not self.cap.isOpened(): + raise RuntimeError("Could not open camera, stop program") + + # set resolution + # self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920) + # self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080) + + self.window_name = "AutoScope" + + def capture_frame(self): + ret, frame = self.cap.read() + if not ret: + raise RuntimeError("Failed to capture frame, stop program") + return frame + + def show_frame(self, frame): + """Display frame in window""" + cv2.imshow(self.window_name, frame) + + def wait_key(self, delay_ms=1): + """ + Wait for key press. Required for window to update. + Returns key code or -1 if no key pressed. + """ + return cv2.waitKey(delay_ms) & 0xFF + + def run_preview_frame(self): + frame = self.capture_frame() + self.show_frame(frame) + + key = self.wait_key(1) + + def close_window(self): + """Close the preview window""" + cv2.destroyWindow(self.window_name) + + def release(self): + """Release camera and close windows""" + self.cap.release() + cv2.destroyAllWindows() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.release() + + + # Unusued + def run_preview(self): + """ + Run live preview loop. + Press 'q' to quit. + """ + print("Starting preview. Press 'q' to quit.") + + while True: + frame = self.capture_frame() + self.show_frame(frame) + + key = self.wait_key(1) + if key == ord('q'): + break + + self.close_window() + \ No newline at end of file diff --git a/src/gui.py b/src/gui.py new file mode 100644 index 0000000..b1db253 --- /dev/null +++ b/src/gui.py @@ -0,0 +1,710 @@ +import tkinter as tk +from tkinter import ttk, scrolledtext +from PIL import Image, ImageTk +import cv2 +import threading +import queue + +class AppGUI: + def __init__(self, camera, arduino): + self.camera = camera + self.arduino = arduino + self.running = True + self._updating_slider = False + self.autofocus_running = False + self.autofocus_speed_range = (1, 6) # Speed indices 1-6 (values: 2, 5, 10, 30, 50, 70) + + # Track movement state for toggle buttons + self.axis_direction = {'X': 1, 'Y': 1, 'Z': 1} # 1 = positive, -1 = negative + self.axis_moving = {'X': False, 'Y': False, 'Z': False} + + # Queue for thread-safe serial log updates + self.serial_queue = queue.Queue() + + # Build the window + self.root = tk.Tk() + self.root.title("AutoScope") + self.root.geometry("1080x1080") + self.root.minsize(1080, 1080) + self.root.protocol("WM_DELETE_WINDOW", self.on_close) + + self._build_ui() + + # Start serial reader thread + self.serial_thread = threading.Thread(target=self._serial_reader, daemon=True) + self.serial_thread.start() + + def _build_ui(self): + # Main container + main_frame = ttk.Frame(self.root) + main_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) + + # Left: Camera view + self.camera_label = ttk.Label(main_frame) + self.camera_label.pack(side=tk.LEFT, padx=(0, 5)) + + # Right: Control panel + right_frame = ttk.Frame(main_frame, width=320) + right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True) + right_frame.pack_propagate(False) + + # === EMERGENCY STOP (prominent at top) === + emergency_frame = ttk.Frame(right_frame) + emergency_frame.pack(fill=tk.X, pady=(0, 10)) + + self.emergency_btn = tk.Button( + emergency_frame, + text="⚠ EMERGENCY STOP ⚠", + command=lambda: self.send_command("!"), + bg='red', fg='white', font=('Arial', 12, 'bold'), + height=2 + ) + self.emergency_btn.pack(fill=tk.X) + + # === SPEED CONTROL === + speed_frame = ttk.LabelFrame(right_frame, text="Speed") + speed_frame.pack(fill=tk.X, pady=(0, 10)) + + speed_btn_frame = ttk.Frame(speed_frame) + speed_btn_frame.pack(fill=tk.X, padx=5, pady=5) + + self.speed_var = tk.StringVar(value="Medium") + + ttk.Radiobutton(speed_btn_frame, text="Slow", variable=self.speed_var, + value="Slow", command=lambda: self.send_command("L")).pack(side=tk.LEFT, padx=5) + ttk.Radiobutton(speed_btn_frame, text="Medium", variable=self.speed_var, + value="Medium", command=lambda: self.send_command("M")).pack(side=tk.LEFT, padx=5) + ttk.Radiobutton(speed_btn_frame, text="Fast", variable=self.speed_var, + value="Fast", command=lambda: self.send_command("F")).pack(side=tk.LEFT, padx=5) + # === FINE SPEED CONTROL === + fine_speed_frame = ttk.LabelFrame(right_frame, text="Fine Speed Control") + fine_speed_frame.pack(fill=tk.X, pady=(0, 10)) + + # Speed values matching Arduino speedArr (index 0-5) plus 0 for stop + self.speed_values = [0, 2, 5, 10, 30, 50, 70, 100] + + fine_inner = ttk.Frame(fine_speed_frame) + fine_inner.pack(fill=tk.X, padx=5, pady=5) + + # Current speed display + self.fine_speed_label = ttk.Label(fine_inner, text="Speed: 50", font=('Arial', 10, 'bold')) + self.fine_speed_label.pack(anchor=tk.W) + + # Slider + self.speed_slider = ttk.Scale( + fine_inner, + from_=0, + to=6, + orient=tk.HORIZONTAL, + command=self._on_speed_slider_change + ) + self.speed_slider.set(5) # Default to middle + self.speed_slider.pack(fill=tk.X, pady=(5, 0)) + + # Number buttons for direct selection + btn_row = ttk.Frame(fine_inner) + btn_row.pack(fill=tk.X, pady=(5, 0)) + + for i in range(7): + btn = ttk.Button( + btn_row, + text=str(i), + width=2, + command=lambda x=i: self._set_fine_speed(x) + ) + btn.pack(side=tk.LEFT, padx=1) + # === MOVEMENT CONTROLS === + move_frame = ttk.LabelFrame(right_frame, text="Movement Control") + move_frame.pack(fill=tk.X, pady=(0, 10)) + + # Command mapping for each axis + self.axis_commands = { + 'X': {'pos': 'E', 'neg': 'W', 'stop': 'e'}, + 'Y': {'pos': 'N', 'neg': 'S', 'stop': 'n'}, + 'Z': {'pos': 'U', 'neg': 'D', 'stop': 'u'} + } + + self.dir_labels = {} + self.move_buttons = {} + + for axis in ["X", "Y", "Z"]: + row = ttk.Frame(move_frame) + row.pack(fill=tk.X, pady=3, padx=5) + + # Axis label + ttk.Label(row, text=f"{axis}:", width=3, font=('Arial', 10, 'bold')).pack(side=tk.LEFT) + + # Direction toggle button + dir_btn = ttk.Button(row, text="+ →", width=5, + command=lambda a=axis: self.toggle_direction(a)) + dir_btn.pack(side=tk.LEFT, padx=2) + self.dir_labels[axis] = dir_btn + + # Start/Stop movement toggle + move_btn = tk.Button(row, text="Move", width=8, bg='green', fg='white', + command=lambda a=axis: self.toggle_movement(a)) + move_btn.pack(side=tk.LEFT, padx=2) + self.move_buttons[axis] = move_btn + + # Quick stop for this axis + ttk.Button(row, text="Stop", width=6, + command=lambda a=axis: self.stop_axis(a)).pack(side=tk.LEFT, padx=2) + + # Stop All button + ttk.Button(move_frame, text="Stop All Axes", + command=lambda: self.send_command("X")).pack(fill=tk.X, padx=5, pady=5) + + # === AUTOFOCUS CONTROL === + af_frame = ttk.LabelFrame(right_frame, text="Autofocus") + af_frame.pack(fill=tk.X, pady=(0, 10)) + + af_inner = ttk.Frame(af_frame) + af_inner.pack(fill=tk.X, padx=5, pady=5) + + # Focus score display + self.focus_score_label = ttk.Label(af_inner, text="Focus: --", font=('Arial', 10)) + self.focus_score_label.pack(anchor=tk.W) + + # Autofocus buttons + af_btn_row = ttk.Frame(af_inner) + af_btn_row.pack(fill=tk.X, pady=(5, 0)) + + self.af_button = ttk.Button(af_btn_row, text="Autofocus", command=self.start_autofocus) + self.af_button.pack(side=tk.LEFT, padx=2) + + ttk.Button(af_btn_row, text="Coarse AF", command=lambda: self.start_autofocus(coarse=True)).pack(side=tk.LEFT, padx=2) + ttk.Button(af_btn_row, text="Fine AF", command=lambda: self.start_autofocus(fine=True)).pack(side=tk.LEFT, padx=2) + ttk.Button(af_btn_row, text="Stop AF", command=self.stop_autofocus).pack(side=tk.LEFT, padx=2) + + # Live focus monitoring toggle + self.live_focus_var = tk.BooleanVar(value=True) + ttk.Checkbutton(af_inner, text="Show live focus score", + variable=self.live_focus_var).pack(anchor=tk.W, pady=(5, 0)) + + + # === POSITION & STATUS === + pos_frame = ttk.LabelFrame(right_frame, text="Status") + pos_frame.pack(fill=tk.X, pady=(0, 10)) + + pos_btn_frame = ttk.Frame(pos_frame) + pos_btn_frame.pack(fill=tk.X, padx=5, pady=5) + + ttk.Button(pos_btn_frame, text="Status", command=lambda: self.send_command("?")).pack(side=tk.LEFT, padx=2) + ttk.Button(pos_btn_frame, text="Zero All", command=lambda: self.send_command("Z")).pack(side=tk.LEFT, padx=2) + ttk.Button(pos_btn_frame, text="Go Origin", command=lambda: self.send_command("G")).pack(side=tk.LEFT, padx=2) + + # === MODE CONTROL === + mode_frame = ttk.LabelFrame(right_frame, text="Control Mode") + mode_frame.pack(fill=tk.X, pady=(0, 10)) + + mode_btn_frame = ttk.Frame(mode_frame) + mode_btn_frame.pack(fill=tk.X, padx=5, pady=5) + + self.mode_label = ttk.Label(mode_btn_frame, text="Mode: Serial", font=('Arial', 9)) + self.mode_label.pack(side=tk.LEFT, padx=5) + + ttk.Button(mode_btn_frame, text="Toggle Mode", command=lambda: self.send_command("T")).pack(side=tk.RIGHT, padx=2) + + # === SERIAL LOG === + ttk.Label(right_frame, text="Serial Log:").pack(anchor=tk.W) + self.serial_log = scrolledtext.ScrolledText(right_frame, height=10, width=35, state=tk.DISABLED) + self.serial_log.pack(fill=tk.BOTH, expand=True, pady=(0, 10)) + + # === CUSTOM COMMAND === + ttk.Label(right_frame, text="Send Command:").pack(anchor=tk.W) + cmd_frame = ttk.Frame(right_frame) + cmd_frame.pack(fill=tk.X) + + self.cmd_entry = ttk.Entry(cmd_frame) + self.cmd_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) + self.cmd_entry.bind("", lambda e: self.send_custom_command()) + + ttk.Button(cmd_frame, text="Send", command=self.send_custom_command).pack(side=tk.RIGHT, padx=(5, 0)) + + def toggle_direction(self, axis): + """Toggle direction for an axis""" + self.axis_direction[axis] *= -1 + direction = "+" if self.axis_direction[axis] == 1 else "-" + arrow = "→" if self.axis_direction[axis] == 1 else "←" + self.dir_labels[axis].config(text=f"{direction} {arrow}") + + # If currently moving, restart in new direction + if self.axis_moving[axis]: + self.stop_axis(axis) + self.start_movement(axis) + + def toggle_movement(self, axis): + """Toggle movement on/off for an axis""" + if self.axis_moving[axis]: + self.stop_axis(axis) + else: + self.start_movement(axis) + + def start_movement(self, axis): + """Start continuous movement on an axis""" + commands = self.axis_commands[axis] + cmd = commands['pos'] if self.axis_direction[axis] == 1 else commands['neg'] + self.send_command(cmd) + self.axis_moving[axis] = True + self.move_buttons[axis].config(text="Moving", bg='orange') + + def stop_axis(self, axis): + """Stop movement on a specific axis""" + commands = self.axis_commands[axis] + self.send_command(commands['stop']) + self.axis_moving[axis] = False + self.move_buttons[axis].config(text="Move", bg='green') + + def send_command(self, cmd): + """Send a command and log it""" + self.log_message(f"> {cmd}") + if self.arduino: + self.arduino.send_command(cmd) + + def send_custom_command(self): + """Send command from entry box""" + cmd = self.cmd_entry.get().strip() + if cmd: + self.send_command(cmd) + self.cmd_entry.delete(0, tk.END) + + def _on_speed_slider_change(self, value): + """Handle slider movement""" + if self._updating_slider: + return + # Round to nearest integer + index = round(float(value)) + self._set_fine_speed(index) + + def _set_fine_speed(self, index): + """Set fine speed by index (0-10)""" + # Prevent recursion + self._updating_slider = True + # Update slider position + self.speed_slider.set(index) + + # Update label with actual speed value + speed_val = self.speed_values[index] + self.fine_speed_label.config(text=f"Speed: {speed_val}") + self._updating_slider = False + + # Send command - '0' for index 0, '1'-'9' for 1-9, 'a' for 10 + if index == 0: + self.send_command("0") + elif index <= 9: + self.send_command(str(index)) + else: + # For index 10 (speed 300), you'd need to add a handler on Arduino + # For now, send '9' as max or add a new command + self.send_command("9") + + + + + + + + + + def get_focus_score(self): + """Get current focus score from camera""" + try: + frame = self.camera.capture_frame() + from vision import calculate_focus_score_sobel + score = calculate_focus_score_sobel(frame) + return score + except Exception as e: + print(f"Focus score error: {e}") + return 0 + + def start_autofocus(self, coarse=False, fine=False): + """Start autofocus routine in background thread""" + if self.autofocus_running: + self.log_message("Autofocus already running") + return + + self.autofocus_running = True + self.af_button.config(state='disabled') + + # Determine speed range based on mode + if coarse: + speed_range = (4, 6) # Faster speeds: 30, 50, 70 + elif fine: + speed_range = (1, 3) # Slower speeds: 2, 5, 10 + else: + speed_range = (1, 6) # Full range + + af_thread = threading.Thread( + target=self._autofocus_routine, + args=(speed_range,), + daemon=True + ) + af_thread.start() + + def stop_autofocus(self): + """Stop autofocus routine""" + self.autofocus_running = False + self.send_command('u') # Stop Z axis + self.af_button.config(state='normal') + self.log_message("Autofocus stopped") + + def _get_averaged_focus(self, samples=5, delay_between=0.05): + """ + Get averaged focus score from multiple samples. + Reduces noise in focus measurements. + """ + import time + scores = [] + for _ in range(samples): + scores.append(self.get_focus_score()) + time.sleep(delay_between) + + # Remove outliers (optional - take middle 60%) + scores.sort() + trim = len(scores) // 5 # Remove top and bottom 20% + if trim > 0: + scores = scores[trim:-trim] + + return sum(scores) / len(scores) + + def _autofocus_routine(self, speed_range): + """ + Autofocus using sweep search + hill climbing. + Phase 1: Sweep across range to find approximate peak + Phase 2: Fine tune from best position found + """ + import time + + self.log_message(f"Starting autofocus (speed range: {speed_range})") + + min_speed_idx, max_speed_idx = speed_range + + # Timing settings + SWEEP_MOVE_TIME = 2 # Movement time per sweep step + SWEEP_SETTLE_TIME = 0.25 # Settle time during sweep + SWEEP_SAMPLES = 10 + + FINE_MOVE_TIME = 0.15 + FINE_SETTLE_TIME = 0.25 + FINE_SAMPLES = 10 + + # Number of steps in each direction for sweep + SWEEP_STEPS = 15 # Total sweep = 2 * SWEEP_STEPS positions + + # ======================================== + # Phase 1: Sweep search + # ======================================== + self.log_message("Phase 1: Sweep search") + + # Use medium speed for sweep + sweep_speed = (min_speed_idx + max_speed_idx) // 2 + self._set_fine_speed_no_send(sweep_speed) + self.send_command(str(sweep_speed)) + time.sleep(0.1) + + # Record starting position score + time.sleep(SWEEP_SETTLE_TIME) + start_score = self._get_averaged_focus(samples=SWEEP_SAMPLES) + self._update_focus_display(start_score) + + # Data structure: (steps_from_start, score) + # Positive = up from start, negative = down from start + sweep_data = [(0, start_score)] + self.log_message(f"Start position: {start_score:.1f}") + + if not self.autofocus_running: + return + + # Sweep DOWN first tyo avoid objective / slide contact + self.log_message(f"Sweeping down {SWEEP_STEPS} steps...") + for i in range(1, SWEEP_STEPS + 1): + if not self.autofocus_running: + return + + self.send_command('D') + time.sleep(SWEEP_MOVE_TIME) + self.send_command('d') + time.sleep(SWEEP_SETTLE_TIME) + + score = self._get_averaged_focus(samples=SWEEP_SAMPLES) + sweep_data.append((-i, score)) + self._update_focus_display(score) + + if i % 5 == 0: + self.log_message(f" Step -{i}: {score:.1f}") + + if not self.autofocus_running: + return + + # Return to start + self.log_message("Returning to start...") + for _ in range(SWEEP_STEPS): + if not self.autofocus_running: + return + self.send_command('U') + time.sleep(SWEEP_MOVE_TIME) + self.send_command('u') + time.sleep(0.1) # Faster return, no need to sample + + time.sleep(SWEEP_SETTLE_TIME) + + if not self.autofocus_running: + return + + # Sweep UP + self.log_message(f"Sweeping up {SWEEP_STEPS} steps...") + for i in range(1, SWEEP_STEPS + 1): + if not self.autofocus_running: + return + + self.send_command('U') + time.sleep(SWEEP_MOVE_TIME) + self.send_command('u') + time.sleep(SWEEP_SETTLE_TIME) + + score = self._get_averaged_focus(samples=SWEEP_SAMPLES) + sweep_data.append((i, score)) + self._update_focus_display(score) + + if i % 5 == 0: + self.log_message(f" Step +{i}: {score:.1f}") + + if not self.autofocus_running: + return + + # ======================================== + # Find best position from sweep + # ======================================== + best_step, best_score = max(sweep_data, key=lambda x: x[1]) + self.log_message(f"Best found at step {best_step}: {best_score:.1f}") + + # Log the full sweep curve for debugging + self.log_message("Sweep curve:") + sorted_data = sorted(sweep_data, key=lambda x: x[0]) + curve_str = " ".join([f"{s[1]:.0f}" for s in sorted_data]) + self.log_message(f" {curve_str}") + + # ======================================== + # Move to best position + # ======================================== + # Currently at +SWEEP_STEPS, need to go to best_step + steps_to_move = SWEEP_STEPS - best_step # Positive = move down + + if steps_to_move > 0: + self.log_message(f"Moving down {steps_to_move} steps to best position") + move_cmd, stop_cmd = 'D', 'd' + else: + self.log_message(f"Moving up {abs(steps_to_move)} steps to best position") + move_cmd, stop_cmd = 'U', 'u' + steps_to_move = abs(steps_to_move) + + for _ in range(steps_to_move): + if not self.autofocus_running: + return + self.send_command(move_cmd) + time.sleep(SWEEP_MOVE_TIME) + self.send_command(stop_cmd) + time.sleep(0.1) + + time.sleep(SWEEP_SETTLE_TIME) + current_score = self._get_averaged_focus(samples=SWEEP_SAMPLES) + self.log_message(f"At best position: {current_score:.1f}") + + if not self.autofocus_running: + return + + # ======================================== + # Phase 2: Fine hill-climbing from best position + # ======================================== + self.log_message("Phase 2: Fine tuning") + + fine_speed = min_speed_idx + self._set_fine_speed_no_send(fine_speed) + self.send_command(str(fine_speed)) + time.sleep(0.1) + + best_score = current_score + + # Try both directions to find which improves + # Try UP first + self.send_command('D') + time.sleep(FINE_MOVE_TIME) + self.send_command('d') + time.sleep(FINE_SETTLE_TIME) + + down_score = self._get_averaged_focus(samples=FINE_SAMPLES) + + if down_score > best_score: + fine_direction = 0 # DOWN + best_score = down_score + self.log_message(f"Fine direction: UP ({down_score:.1f})") + else: + # Go back down and try down + self.send_command('U') + time.sleep(FINE_MOVE_TIME) + self.send_command('u') + time.sleep(FINE_SETTLE_TIME) + + self.send_command('U') + time.sleep(FINE_MOVE_TIME) + self.send_command('u') + time.sleep(FINE_SETTLE_TIME) + + up_score = self._get_averaged_focus(samples=FINE_SAMPLES) + + if up_score > best_score: + fine_direction = 1 # UP + best_score = up_score + self.log_message(f"Fine direction: DOWN ({up_score:.1f})") + else: + # We're already at the peak + self.log_message("Already at peak, minor adjustment only") + fine_direction = 1 # Try up anyway + # Go back to center + self.send_command('U') + time.sleep(FINE_MOVE_TIME) + self.send_command('u') + time.sleep(FINE_SETTLE_TIME) + + if not self.autofocus_running: + return + + # Fine search + move_cmd = 'U' if fine_direction else 'D' + stop_cmd = 'u' if fine_direction else 'd' + reverse_cmd = 'D' if fine_direction else 'U' + reverse_stop = 'd' if fine_direction else 'u' + + no_improvement_count = 0 + max_fine_no_improvement = 5 + best_position_offset = 0 + + while self.autofocus_running and no_improvement_count < max_fine_no_improvement: + self.send_command(move_cmd) + time.sleep(FINE_MOVE_TIME) + self.send_command(stop_cmd) + time.sleep(FINE_SETTLE_TIME) + + current_score = self._get_averaged_focus(samples=FINE_SAMPLES) + self._update_focus_display(current_score) + + if current_score > best_score: + best_score = current_score + no_improvement_count = 0 + best_position_offset = 0 + self.log_message(f"Fine better: {current_score:.1f}") + else: + no_improvement_count += 1 + best_position_offset += 1 + + if not self.autofocus_running: + return + + # Return to best fine position + if best_position_offset > 0: + self.log_message(f"Returning {best_position_offset} steps") + for _ in range(best_position_offset): + self.send_command(reverse_cmd) + time.sleep(FINE_MOVE_TIME) + self.send_command(reverse_stop) + time.sleep(0.1) + + # Final reading + time.sleep(FINE_SETTLE_TIME) + final_score = self._get_averaged_focus(samples=FINE_SAMPLES) + self._update_focus_display(final_score) + + self.autofocus_running = False + self.log_message(f"Autofocus complete. Final: {final_score:.1f}") + + self.root.after(0, lambda: self.af_button.config(state='normal')) + + + + def _set_fine_speed_no_send(self, index): + """Update slider/label without sending command (for internal use)""" + self._updating_slider = True + self.speed_slider.set(index) + speed_val = self.speed_values[index] + self.fine_speed_label.config(text=f"Speed: {speed_val}") + self._updating_slider = False + + def _update_focus_display(self, score): + """Update focus score display (thread-safe)""" + self.root.after(0, lambda: self.focus_score_label.config(text=f"Focus: {score:.1f}")) + + def log_message(self, msg): + """Add message to serial log""" + if not hasattr(self, 'serial_log'): + return + self.serial_log.config(state=tk.NORMAL) + self.serial_log.insert(tk.END, msg + "\n") + self.serial_log.see(tk.END) + self.serial_log.config(state=tk.DISABLED) + print(f" Log: {msg}") + + # Check for mode changes in received messages + if msg.startswith("< MODE:"): + mode = msg.split(":")[-1] + self.mode_label.config(text=f"Mode: {mode.capitalize()}") + + def _serial_reader(self): + """Background thread to read serial data""" + while self.running: + if self.arduino and self.arduino.ser.in_waiting: + try: + line = self.arduino.ser.readline().decode().strip() + if line: + self.serial_queue.put(line) + except Exception as e: + self.serial_queue.put(f"[Error: {e}]") + threading.Event().wait(0.05) + + def _process_serial_queue(self): + """Process queued serial messages (call from main thread)""" + while not self.serial_queue.empty(): + msg = self.serial_queue.get_nowait() + self.log_message(f"< {msg}") + + def update_camera(self): + """Capture and display one frame""" + try: + frame = self.camera.capture_frame() + + # Show live focus score if enabled + if self.live_focus_var.get(): + score = self._get_averaged_focus() + self.focus_score_label.config(text=f"Focus: {score:.1f}") + + frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + + # Resize if needed + h, w = frame.shape[:2] + max_width = 640 + if w > max_width: + scale = max_width / w + frame = cv2.resize(frame, (int(w * scale), int(h * scale))) + + img = Image.fromarray(frame) + imgtk = ImageTk.PhotoImage(image=img) + + self.camera_label.imgtk = imgtk + self.camera_label.configure(image=imgtk) + except Exception as e: + print(f"Camera error: {e}") + + def run(self): + """Main loop""" + def update(): + if self.running: + self.update_camera() + self._process_serial_queue() + self.root.after(33, update) + + update() + self.root.mainloop() + + def on_close(self): + """Clean shutdown""" + self.running = False + self.root.destroy() \ No newline at end of file diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..82358c5 --- /dev/null +++ b/src/main.py @@ -0,0 +1,27 @@ +from camera import Camera +from arduino import Arduino +from gui import AppGUI + +def main(): + camera = Camera(device_id=0) + + # Arduino is optional - GUI still works without it + try: + arduino = Arduino() + except RuntimeError as e: + print(f"Arduino not found: {e}") + arduino = None + + try: + app = AppGUI(camera, arduino) + app.run() + + except KeyboardInterrupt: + print("Shutting down...") + finally: + if arduino: + arduino.close() + camera.release() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/scanner.py b/src/scanner.py new file mode 100644 index 0000000..e69de29 diff --git a/src/vision.py b/src/vision.py new file mode 100644 index 0000000..879b47e --- /dev/null +++ b/src/vision.py @@ -0,0 +1,21 @@ +import cv2 +import numpy as np + +def calculate_focus_score(frame): + """ + Calculate focus score using Laplacian variance. + """ + gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) + laplacian = cv2.Laplacian(gray, cv2.CV_64F) + score = laplacian.var() + return score + +def calculate_focus_score_sobel(frame): + """ + Focus score using Sobel gradients. + """ + gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) + sobel_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3) + sobel_y = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3) + score = np.mean(sobel_x**2 + sobel_y**2) + return score \ No newline at end of file