<aside> 📌

[과제명], [학교], [이름]을 본인 정보로 바꿔 주세요. 과제 태그는 선생님 설명을 듣고 넣어 주세요.

</aside>

3️⃣ 웹 AI모델로 피코 제어하기

# 통합된 BLE 서보모터 및 LED 제어 시스템
# ble_advertising.py, ble_simple_peripheral.py, main.py를 하나로 합친 버전

from machine import Pin, PWM
from micropython import const
import bluetooth
import struct
import time
import random

# ========== BLE Advertising 관련 코드 ==========
# Advertising payloads are repeated packets of the following form:
#   1 byte data length (N + 1)
#   1 byte type (see constants below)
#   N bytes type-specific data

# Advertising-data type codes (the "type" byte of each payload entry),
# listed in ascending numeric order.
_ADV_TYPE_FLAGS = const(0x01)
_ADV_TYPE_UUID16_MORE = const(0x02)
_ADV_TYPE_UUID16_COMPLETE = const(0x03)
_ADV_TYPE_UUID32_MORE = const(0x04)
_ADV_TYPE_UUID32_COMPLETE = const(0x05)
_ADV_TYPE_UUID128_MORE = const(0x06)
_ADV_TYPE_UUID128_COMPLETE = const(0x07)
_ADV_TYPE_NAME = const(0x09)
_ADV_TYPE_APPEARANCE = const(0x19)

def advertising_payload(limited_disc=False, br_edr=False, name=None, services=None, appearance=0):
    """Build a BLE advertising payload.

    The payload is a sequence of entries, each laid out as:
    one length byte (data length + 1), one type byte, then the data bytes.

    Args:
        limited_disc: Use the limited-discoverable flag (0x01) instead of
            the general-discoverable flag (0x02).
        br_edr: Selects 0x18 instead of 0x04 for the second half of the
            flags byte.
        name: Device name as ``str`` or bytes-like; skipped when falsy.
            A ``str`` is encoded as UTF-8 first so the length byte counts
            bytes, not characters.
        services: Iterable of UUID objects convertible with ``bytes()``.
            Only 2-, 4- and 16-byte UUIDs are emitted; others are ignored.
        appearance: 16-bit appearance value; skipped when 0.

    Returns:
        A ``bytearray`` holding the assembled payload.
    """
    payload = bytearray()

    def _append(adv_type, value):
        nonlocal payload
        payload += struct.pack("BB", len(value) + 1, adv_type) + value

    _append(
        _ADV_TYPE_FLAGS,
        struct.pack("B", (0x01 if limited_disc else 0x02) + (0x18 if br_edr else 0x04)),
    )

    if name:
        # Encode text names up front: len() of a str counts characters, which
        # under-reports the entry length for multi-byte UTF-8 names.
        if isinstance(name, str):
            name = name.encode("utf-8")
        _append(_ADV_TYPE_NAME, name)

    if services:
        for uuid in services:
            b = bytes(uuid)
            if len(b) == 2:
                _append(_ADV_TYPE_UUID16_COMPLETE, b)
            elif len(b) == 4:
                _append(_ADV_TYPE_UUID32_COMPLETE, b)
            elif len(b) == 16:
                _append(_ADV_TYPE_UUID128_COMPLETE, b)

    # See org.bluetooth.characteristic.gap.appearance.xml
    if appearance:
        # Unsigned 16-bit: a signed "<h" would reject values >= 0x8000.
        _append(_ADV_TYPE_APPEARANCE, struct.pack("<H", appearance))

    return payload

def decode_field(payload, adv_type):
    """Return the data of every entry in ``payload`` whose type is ``adv_type``.

    The payload is walked entry by entry: the first byte of an entry is its
    length (type byte + data), the second byte is its type.

    Returns:
        A list of slices of ``payload`` (one per matching entry), in order.
    """
    fields = []
    pos = 0
    total = len(payload)
    while pos + 1 < total:
        entry_len = payload[pos]
        if payload[pos + 1] == adv_type:
            fields.append(payload[pos + 2 : pos + entry_len + 1])
        # Skip the length byte itself plus the entry it describes.
        pos += entry_len + 1
    return fields

def decode_name(payload):
    """Return the first name entry of ``payload`` decoded as UTF-8, or ""."""
    names = decode_field(payload, _ADV_TYPE_NAME)
    if not names:
        return ""
    return str(names[0], "utf-8")

def decode_services(payload):
    """Return the service UUIDs advertised in ``payload``.

    Collects the "complete" 16-bit, 32-bit and 128-bit UUID entries (in that
    order) and wraps each in ``bluetooth.UUID``.

    Returns:
        A list of ``bluetooth.UUID`` objects.
    """
    services = []
    for u in decode_field(payload, _ADV_TYPE_UUID16_COMPLETE):
        # Unsigned 16-bit: signed "<h" turns UUIDs >= 0x8000 into negatives.
        services.append(bluetooth.UUID(struct.unpack("<H", u)[0]))
    for u in decode_field(payload, _ADV_TYPE_UUID32_COMPLETE):
        # Unsigned 32-bit: "<d" is an 8-byte double and cannot read 4 bytes.
        # NOTE(review): confirm bluetooth.UUID accepts 32-bit integers on the
        # target port.
        services.append(bluetooth.UUID(struct.unpack("<I", u)[0]))
    for u in decode_field(payload, _ADV_TYPE_UUID128_COMPLETE):
        services.append(bluetooth.UUID(u))
    return services

# ========== BLE Simple Peripheral 관련 코드 ==========
# Event codes compared against the `event` argument of the BLE IRQ handler.
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)

# Characteristic property flags.
_FLAG_READ = const(0x02)
_FLAG_WRITE_NO_RESPONSE = const(0x04)
_FLAG_WRITE = const(0x08)
_FLAG_NOTIFY = const(0x10)

# UART-style GATT service: TX is readable/notifiable, RX is writable
# (with or without response).
_UART_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_TX = (bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E"), _FLAG_NOTIFY | _FLAG_READ)
_UART_RX = (bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E"), _FLAG_WRITE_NO_RESPONSE | _FLAG_WRITE)
_UART_SERVICE = (_UART_UUID, (_UART_TX, _UART_RX))

class BLESimplePeripheral:
    """BLE peripheral exposing the UART-style service defined in this file.

    Tracks connected centrals, forwards writes on the RX characteristic to a
    user callback, and can notify connected centrals on the TX characteristic.
    """

    def __init__(self, ble, name="SJM"):
        """Activate ``ble``, register the UART service and start advertising.

        Args:
            ble: A ``bluetooth.BLE`` instance.
            name: Device name placed in the advertising payload.
        """
        # Initialise all state the IRQ handler reads *before* registering the
        # handler, so an event can never hit a half-constructed object.
        self._connections = set()
        self._write_callback = None
        self._handle_tx = None
        self._handle_rx = None
        self._ble = ble
        self._ble.active(True)
        self._ble.irq(self._irq)
        ((self._handle_tx, self._handle_rx),) = self._ble.gatts_register_services((_UART_SERVICE,))
        self._payload = advertising_payload(name=name, services=[_UART_UUID])
        self._advertise()

    def _irq(self, event, data):
        """Handle BLE events: connect, disconnect and characteristic writes."""
        # Track connections so we can send notifications.
        if event == _IRQ_CENTRAL_CONNECT:
            conn_handle, _, _ = data
            print("New connection", conn_handle)
            self._connections.add(conn_handle)
        elif event == _IRQ_CENTRAL_DISCONNECT:
            conn_handle, _, _ = data
            print("Disconnected", conn_handle)
            # discard() instead of remove(): a disconnect for a handle that
            # is not tracked must not raise inside the IRQ handler.
            self._connections.discard(conn_handle)
            # Start advertising again to allow a new connection.
            self._advertise()
        elif event == _IRQ_GATTS_WRITE:
            conn_handle, value_handle = data
            value = self._ble.gatts_read(value_handle)
            # Only writes to the RX characteristic are forwarded.
            if value_handle == self._handle_rx and self._write_callback:
                self._write_callback(value)

    def send(self, data):
        """Notify every connected central with ``data`` on the TX characteristic."""
        for conn_handle in self._connections:
            self._ble.gatts_notify(conn_handle, self._handle_tx, data)

    def is_connected(self):
        """Return True when at least one central is connected."""
        return len(self._connections) > 0

    def _advertise(self, interval_us=500000):
        """Start advertising the stored payload at ``interval_us`` microseconds."""
        print("Starting advertising")
        self._ble.gap_advertise(interval_us, adv_data=self._payload)

    def on_write(self, callback):
        """Set the callable invoked with the value of each RX write."""
        self._write_callback = callback

# ========== 간단한 큐 클래스 ==========
class SimpleQueue:
    """Bounded first-in/first-out queue backed by a plain list."""

    def __init__(self, maxsize):
        """Create an empty queue that holds at most ``maxsize`` items."""
        self.maxsize = maxsize
        self.queue = []

    def put(self, item):
        """Append ``item``; raise OverflowError when the queue is full."""
        if self.full():
            raise OverflowError("Queue is full")
        self.queue.append(item)

    def get(self):
        """Remove and return the oldest item; raise IndexError when empty."""
        if self.empty():
            raise IndexError("Queue is empty")
        return self.queue.pop(0)

    def empty(self):
        """Return True when the queue holds no items."""
        return not self.queue

    def full(self):
        """Return True when the queue has reached ``maxsize`` items."""
        return not len(self.queue) < self.maxsize

# ========== 제어 명령 처리 함수들 (위쪽 배치) ==========
def handle_servo_left():
    """Command 'a': step servo 1 by -10 degrees, never going below 0."""
    global servo1_angle
    stepped = servo1_angle - 10
    servo1_angle = stepped if stepped > 0 else 0
    servo1.duty_u16(angle_to_duty(servo1_angle))
    print(f"Servo1 left: {servo1_angle}°")

def handle_servo_right():
    """Command 'd': step servo 1 by +10 degrees, never going above 180."""
    global servo1_angle
    stepped = servo1_angle + 10
    servo1_angle = stepped if stepped < 180 else 180
    servo1.duty_u16(angle_to_duty(servo1_angle))
    print(f"Servo1 right: {servo1_angle}°")

def handle_servo2_up():
    """Command 'w': step servo 2 by +10 degrees, never going above 180."""
    global servo2_angle
    stepped = servo2_angle + 10
    servo2_angle = stepped if stepped < 180 else 180
    servo2.duty_u16(angle_to_duty(servo2_angle))
    print(f"Servo2 up: {servo2_angle}°")

def handle_servo2_down():
    """Command 's': step servo 2 by -10 degrees, never going below 0."""
    global servo2_angle
    stepped = servo2_angle - 10
    servo2_angle = stepped if stepped > 0 else 0
    servo2.duty_u16(angle_to_duty(servo2_angle))
    print(f"Servo2 down: {servo2_angle}°")

def handle_key_1():
    """Command '1': drive the on-board LED high (on)."""
    onboard_led.value(1)
    print("Onboard LED ON")

def handle_key_2():
    """Command '2': drive the on-board LED low (off)."""
    onboard_led.value(0)
    print("Onboard LED OFF")

def handle_key_3():
    """Command '3': toggle blinking of the external LED.

    When blinking is switched off the LED is also turned off immediately.
    """
    global led_blink_active
    if led_blink_active:
        led_blink_active = False
        external_led.off()
    else:
        led_blink_active = True
    print(f"External LED blinking: {'ON' if led_blink_active else 'OFF'}")

def handle_key_4():
    """Command '4': set the RGB LED to red (red full duty, others zero)."""
    for channel, level in ((rgb_red, 65535), (rgb_green, 0), (rgb_blue, 0)):
        channel.duty_u16(level)
    print("RGB LED: Red")

def handle_key_5():
    """Command '5': set the RGB LED to green (green full duty, others zero)."""
    for channel, level in ((rgb_red, 0), (rgb_green, 65535), (rgb_blue, 0)):
        channel.duty_u16(level)
    print("RGB LED: Green")

def handle_key_6():
    """Command '6': set the RGB LED to blue (blue full duty, others zero)."""
    for channel, level in ((rgb_red, 0), (rgb_green, 0), (rgb_blue, 65535)):
        channel.duty_u16(level)
    print("RGB LED: Blue")

def handle_key_7():
    """Command '7': toggle automatic cycling of the RGB LED colours."""
    global rgb_cycle_active
    rgb_cycle_active = False if rgb_cycle_active else True
    print(f"RGB LED cycling: {'ON' if rgb_cycle_active else 'OFF'}")

def handle_key_8():
    """Command '8': reserved; currently does nothing."""
    return None

def handle_key_9():
    """Command '9': reserved; currently does nothing."""
    return None

def handle_reset_all():
    """Command 'x': stop all activity and return every output to its default.

    Disables LED blinking and RGB cycling, moves both servos to 90 degrees,
    and switches off the on-board LED, the external LED and the RGB LED.
    """
    global servo1_angle, servo2_angle, led_blink_active, rgb_cycle_active

    # Stop the periodic activities first.
    led_blink_active = rgb_cycle_active = False

    # Centre both servos at 90 degrees.
    servo1_angle = servo2_angle = 90
    for servo, angle in ((servo1, servo1_angle), (servo2, servo2_angle)):
        servo.duty_u16(angle_to_duty(angle))

    # Switch every LED off.
    for led in (onboard_led, external_led):
        led.off()
    for channel in (rgb_red, rgb_green, rgb_blue):
        channel.duty_u16(0)

    print("All systems reset")

# ========== 메인 애플리케이션 코드 ==========
# BLE setup: constructing BLESimplePeripheral activates the radio, registers
# the UART service and starts advertising.
ble = bluetooth.BLE()
sp = BLESimplePeripheral(ble)

# Pin setup
# Servo motors (pins changed from an earlier version), driven by 50 Hz PWM.
servo1 = PWM(Pin(15))  # servo 1
servo2 = PWM(Pin(16))  # servo 2
servo1.freq(50)
servo2.freq(50)

# LED pins
onboard_led = Pin("LED", Pin.OUT)  # on-board LED (pin name "LED", as on the Pico W)
external_led = Pin(1, Pin.OUT)    # external LED

# RGB LED pins (pins changed from an earlier version), driven by 1 kHz PWM.
rgb_red = PWM(Pin(19))
rgb_green = PWM(Pin(20))
rgb_blue = PWM(Pin(21))
rgb_red.freq(1000)
rgb_green.freq(1000)
rgb_blue.freq(1000)

# Current servo angles in degrees; the handlers keep these within 0..180.
servo1_angle = 90
servo2_angle = 90

# Control state
led_blink_active = False  # external LED blinking enabled
rgb_cycle_active = False  # RGB colour cycling enabled
led_blink_state = False   # last level written to the external LED while blinking
rgb_color_index = 0       # index into rgb_colors of the next colour to show

# Timing state: time.ticks_ms() stamps of the last blink / colour change.
last_led_blink = 0
last_rgb_change = 0

# Queue of received BLE writes waiting to be processed by the main loop.
data_queue = SimpleQueue(maxsize=10)

# RGB colour table: (red, green, blue) duty_u16 values.
rgb_colors = [
    (65535, 0, 0),      # red
    (0, 65535, 0),      # green
    (0, 0, 65535),      # blue
    (65535, 65535, 0),  # yellow
    (65535, 0, 65535),  # magenta
    (0, 65535, 65535),  # cyan
    (32768, 32768, 32768), # white (half duty on every channel)
]

# Convert a servo angle to a 16-bit PWM duty value.
# With the 50 Hz servo PWM (20 ms period), 3277/65535 is roughly a 1.0 ms
# pulse and 6553/65535 roughly a 2.0 ms pulse.
_SERVO_DUTY_MIN = 3277  # duty_u16 at 0 degrees
_SERVO_DUTY_MAX = 6553  # duty_u16 at 180 degrees

def angle_to_duty(angle):
    """Map ``angle`` (degrees) linearly onto the servo duty range.

    0 degrees maps to 3277 and 180 degrees to 6553. The previous formula
    divided by 90 instead of 180, so 180 degrees produced 9831 (about a 3 ms
    pulse), which contradicted the documented end point.

    Args:
        angle: Target angle in degrees; values outside 0..180 are clamped.

    Returns:
        The ``duty_u16`` value as an ``int``.
    """
    angle = max(0, min(180, angle))
    return int(_SERVO_DUTY_MIN + angle * (_SERVO_DUTY_MAX - _SERVO_DUTY_MIN) / 180)

# Periodic work driven from the main loop.
def handle_periodic_tasks():
    """Advance the LED blink and RGB colour cycle when their intervals elapse.

    The external LED is flipped once more than 500 ms have passed since the
    last flip (while blinking is enabled); the RGB LED moves to the next entry
    of ``rgb_colors`` once more than 1000 ms have passed (while cycling is
    enabled).
    """
    global led_blink_state, last_led_blink
    global rgb_color_index, last_rgb_change

    now = time.ticks_ms()

    # External LED blink.
    if led_blink_active:
        if time.ticks_diff(now, last_led_blink) > 500:
            led_blink_state = not led_blink_state
            external_led.value(led_blink_state)
            last_led_blink = now

    # RGB colour cycle.
    if rgb_cycle_active:
        if time.ticks_diff(now, last_rgb_change) > 1000:
            red, green, blue = rgb_colors[rgb_color_index]
            rgb_red.duty_u16(red)
            rgb_green.duty_u16(green)
            rgb_blue.duty_u16(blue)
            rgb_color_index = (rgb_color_index + 1) % len(rgb_colors)
            last_rgb_change = now

# Process received data.
def process_data():
    """Drain ``data_queue`` and run the handler for each one-byte command.

    Items that are not exactly one byte long, or whose character has no
    handler, are ignored. Any exception raised while handling an item is
    printed and processing continues with the next item.
    """
    # Command character -> handler.
    dispatch = {
        # Servo control
        'a': handle_servo_left,
        'd': handle_servo_right,
        'w': handle_servo2_up,
        's': handle_servo2_down,
        # Number keys 1-9
        '1': handle_key_1,
        '2': handle_key_2,
        '3': handle_key_3,
        '4': handle_key_4,
        '5': handle_key_5,
        '6': handle_key_6,
        '7': handle_key_7,
        '8': handle_key_8,
        '9': handle_key_9,
        # Reset
        'x': handle_reset_all,
    }

    while not data_queue.empty():
        try:
            packet = data_queue.get()
            print("Processing data:", packet)

            # Only single-character commands are recognised.
            if len(packet) != 1:
                continue

            handler = dispatch.get(chr(packet[0]))
            if handler is not None:
                handler()

        except Exception as err:
            print("Error processing data:", err)

# BLE receive handler.
def on_rx(data):
    """Queue ``data`` for the main loop, dropping it when the queue is full."""
    try:
        print("Received:", data)
        if data_queue.full():
            print("Data queue is full, dropping data")
            return
        # Store the data; process_data() consumes it later.
        data_queue.put(data)
    except Exception as err:
        print("Error receiving data:", err)

# Initialisation: put every output into its reset state.
handle_reset_all()

# Startup banner, one entry per printed line.
for _banner_line in (
    "BLE Controller Started",
    "=== Control Commands ===",
    "Servo Control:",
    "  a: Servo1 Left (-10°)",
    "  d: Servo1 Right (+10°)",
    "  w: Servo2 Up (+10°)",
    "  s: Servo2 Down (-10°)",
    "LED Control:",
    "  1: Onboard LED ON",
    "  2: Onboard LED OFF",
    "  3: External LED Blink Toggle",
    "RGB LED Control:",
    "  4: RGB Red",
    "  5: RGB Green",
    "  6: RGB Blue",
    "  7: RGB Cycle Toggle",
    "Reserved:",
    "  8: (Reserved)",
    "  9: (Reserved)",
    "System:",
    "  x: Reset All",
    "Waiting for connections...",
):
    print(_banner_line)

# Main loop.
# Register the receive callback once, before any central can connect.
# Previously it was re-registered on every loop pass and only after the first
# pass that saw a connection, so a write arriving before that pass had no
# callback and was dropped.
sp.on_write(on_rx)

while True:
    if sp.is_connected():
        process_data()           # handle the commands queued by on_rx
        handle_periodic_tasks()  # LED blink / RGB cycle timing
    else:
        time.sleep(0.1)          # idle while no central is connected

2️⃣ 미디어파이프 웹에서 불러오기

image.png

https://codepen.io/kfuevrck-the-reactor/full/yyNjVoX

image.png

image.png

image.png

1️⃣ 코드펜으로 나만의 웹사이트 제작하기

image.png

image.png