###########################################################
#                                                         #
#               OpenLaserTag remote control               #
#                      OLT protocol                       #
#                  Tomas Krejci [Njord]                   #
#                   tk@tomaskrejci.com                    #
#                                                         #
#                       version 1.0                       #
#                       2024-06-22                        #
#                                                         #
###########################################################

import machine
import time
import re
from sys import platform
from primitives import set_global_exception
import primitives
import uasyncio as asyncio

ESP32 = platform == "esp32"
RP2 = platform == "rp2"
PYBOARD = platform == "pyboard"

#### Other imports ###
from primitives.delay_ms import Delay_ms
import primitives.queue as queue
from primitives import Switch, Pushbutton
from rgb import RGB

#### onboard LED ####
if ESP32 or RP2:
    from machine import Pin
else:
    from pyb import Pin, LED

#### Device ID ###
# Hex dump of the hardware unique ID; only the last 4 hex characters
# (the two lowest bytes) are kept as the short device ID.
device_id_str = "".join("{:02X}".format(b) for b in machine.unique_id())[-4:]
device_id = int(device_id_str, 16)  # Same ID as an integer

### Piezo ###
piezo_pin = machine.Pin(15, machine.Pin.OUT, value=0)


async def piezo_beep(sec=0.3):
    """Drive the piezo pin high for `sec` seconds, then low again."""
    piezo_pin.value(1)
    await asyncio.sleep(sec)
    piezo_pin.value(0)


async def piezo_short_beep():
    """Emit a 0.1 second beep."""
    await piezo_beep(0.1)


#### Voltage monitor ###
from machine import ADC
from primitives import AADC

# Define ADC battery pin according to platform
# NOTE(review): ESP32 uses the same ADC(28) as RP2 - confirm this is a valid
# ADC input on the ESP32 board in use.
if ESP32:
    battery_ADC = ADC(28)
elif RP2:
    battery_ADC = ADC(28)
else:
    battery_ADC = ADC("X2")

aadc = AADC(battery_ADC)


async def battery_monitor(aadc):
    """Report battery voltage and estimated capacity every 10 seconds.

    Reads the ADC through `aadc`, logs the result and sends it over the
    BLE UART (module global `uart`, created in `main`). Never returns.
    """
    # Reference voltage (3.3V) divided by the maximum ADC value (65535)
    volts_per_count = 3.3 / 65535
    while True:
        aadc.sense(normal=False)
        # Range-triggered alternative (disabled):
        # value = await aadc(25_000, 39_000)  # Wait until in range
        # logger.debug("In range:", value)
        # aadc.sense(normal=True)
        # value = await aadc()  # Wait until out of range
        # logger.debug("Out of range:", value)
        voltage = round(aadc.read_u16() * volts_per_count, 2)
        # The capacity model is per cell; the measured voltage is halved
        # (presumably a two-cell pack - confirm against the hardware).
        capacity = voltage_to_capacity(voltage / 2)
        logger.info(f"Battery Voltage: {voltage:.2f}V, Capacity: {capacity:.0f}%")
        uart.write(f"BATTERY: Voltage: {voltage:.2f}V, Capacity: {capacity:.0f}%\n")
        await asyncio.sleep_ms(10000)


def voltage_to_capacity(voltage):
    """Map a single-cell voltage to an estimated capacity in percent.

    Simple piecewise-linear model for a standard one-cell alkaline battery.
    Returns 100 at 1.6 V and above, 0 below 1.0 V.
    """
    if voltage >= 1.6:
        return 100
    if voltage >= 1.5:
        return 90 + (voltage - 1.5) * 100
    if voltage >= 1.4:
        return 70 + (voltage - 1.4) * 200
    if voltage >= 1.3:
        return 40 + (voltage - 1.3) * 300
    if voltage >= 1.2:
        return 10 + (voltage - 1.2) * 300
    if voltage >= 1.0:
        return (voltage - 1.0) * 50
    return 0


#### BLE UART ###
import ble_uart
import bluetooth
import time


async def ble_rx_handler():
    """Handle one message received over the BLE UART.

    Recognised messages:
      * contains "bomb"    - send the Explode command over IR five times,
                             then lock the device until it is restarted
      * contains "led_on"  - switch the RGB LED to red
      * contains "led_off" - switch the RGB LED off
      * anything else      - parsed as a hex number; the three low bytes are
                             sent as a custom IR command
    Messages that are not valid hex are logged at debug level and ignored.
    """
    message = uart.read().decode().strip()
    logger.debug(f"BLE: {message}")
    try:
        if "bomb" in message:
            logger.info("BLE: Bomb Explode send to IR")
            # COMMANDS[1] is the "Explode" entry
            _, tx1, tx2, tx3 = COMMANDS[1]
            # NOTE(review): loop body reconstructed as send + 100 ms pause
            for _ in range(5):
                asyncio.create_task(olt_sent_ir(ir_tx, tx1, tx2, tx3))
                await asyncio.sleep_ms(100)
            asyncio.create_task(piezo_beep())
            uart.write("BLE: Bomb Explode send to IR\n")
            asyncio.create_task(rgb1.set_red())
            await asyncio.sleep_ms(100)
            while True:
                # NOTE(review): blocking sleep stalls every other task; this
                # looks deliberate (device lock) so it is left unchanged.
                time.sleep_ms(3000)
                logger.info(
                    "BLE: Bomb Exploded, Device is LOCKED, please restart Device"
                )
                uart.write(
                    "BLE: Bomb Exploded, Device is LOCKED\n Please restart Device\n"
                )
                await piezo_short_beep()
        elif "led_on" in message:
            logger.info("BLE: LED ON")
            asyncio.create_task(rgb1.set_red())
            uart.write("BLE: LED ON\n")
        elif "led_off" in message:
            logger.info("BLE: LED OFF")
            asyncio.create_task(rgb1.off())
            uart.write("BLE: LED OFF\n")
        else:
            # Any hex number received from the BLE UART is sent as a custom
            # IR command (three low bytes).
            message_int = int(message, 16)
            logger.debug(f"BLE: {message_int:08x}")
            tx1 = (message_int >> 16) & 0xFF
            tx2 = (message_int >> 8) & 0xFF
            tx3 = message_int & 0xFF
            asyncio.create_task(olt_sent_ir(ir_tx, tx1, tx2, tx3))
            # Send info back to BLE
            uart.write(f"BLE: uart recive 0x{message_int:08x}\n")
    except TypeError:
        logger.debug("BLE: TypeError")
    except ValueError:
        logger.debug("BLE: ValueError")


#### OLT IR ###
COMMAND_MASK = 0x800000
DEMAGE = [0, 1, 4, 5, 7, 10, 15, 17, 20, 25, 30, 35, 40, 50, 75, 100]
TEAM = ["RED", "BLUE", "YELLOW", "GREEN"]

#### OLT IR RX imports ###
import ustruct
from sys import platform
import time
import gc
from machine import Pin, freq
from olt_lib.ir_rx.print_error import print_error  # Optional print of error codes
from olt_lib.ir_rx.olt import LT_24 as LT_24_RX  # Import all implemented classes

# Define IR RX pin according to platform
# NOTE(review): on ESP32 both IR RX and IR TX are assigned GPIO 23 - confirm.
if platform == "pyboard":
    ir_rx_pin = Pin("X3", Pin.IN)
elif platform == "esp8266":
    freq(160000000)
    ir_rx_pin = Pin(13, Pin.IN)
elif platform == "esp32" or platform == "esp32_LoBo":
    ir_rx_pin = Pin(23, Pin.IN)
elif platform == "rp2":
    ir_rx_pin = Pin(16, Pin.IN, Pin.PULL_UP)

#### OLT IR TX imports ###
from sys import platform

ESP32 = platform == "esp32"
RP2 = platform == "rp2"
PYBOARD = platform == "pyboard"
if ESP32 or RP2:
    from machine import Pin
else:
    from pyb import Pin, LED
import uasyncio as asyncio
from primitives.switch import Switch
from primitives.delay_ms import Delay_ms

# Import all implemented classes
from olt_lib.ir_tx.olt import LT_24 as LT_24_TX

# Define IR TX pin according to platform (pins for IR LED gate)
if ESP32:
    ir_tx_pin = Pin(23, Pin.OUT, value=0)
elif RP2:
    ir_tx_pin = Pin(17, Pin.OUT, value=0)
else:
    ir_tx_pin = Pin("X1")

#### OLT Commands ####
COMMAND_END = 0xE8
# Each entry: [name, byte1, byte2, byte3]
COMMANDS = [
    ["NewGame", 0x83, 0x05, COMMAND_END],
    # ["AdminKill", 0x83, 0x00, COMMAND_END],
    ["Explode", 0x83, 0x0B, COMMAND_END],
    ["TestShot_ID8_RED_25", 0x08, 0x24, 0x00],
    ["SensorTest", 0x83, 0x15, COMMAND_END],
]

#### Logging ####
import logging

init_ticks = time.ticks_ms()
# Change the logging level here and reset the device;
# logging.DEBUG gives more info.
# log_level = logging.DEBUG
log_level = logging.INFO
# log_level = logging.ERROR
logging.basicConfig(level=log_level, format="%(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)


### Alive check ###
async def alive_check():
    """Blink the onboard LED once per second and log the uptime.

    Never returns.
    """
    logger.debug("alive check start")
    onboard_led = machine.Pin("LED", machine.Pin.OUT)  # Pi Pico onboard LED
    while True:
        # ticks_diff handles wrap-around of the millisecond tick counter,
        # which plain subtraction does not.
        uptime_ms = time.ticks_diff(time.ticks_ms(), init_ticks)
        logger.debug(f"running ms: {uptime_ms}")
        onboard_led.on()
        await asyncio.sleep_ms(100)
        onboard_led.off()
        await asyncio.sleep_ms(900)


### OLT Command ###
async def olt_sent_ir(ir_tx, tx1, tx2, tx3):
    """Start an IR transmission of the three bytes, then pause 10 ms."""
    logger.debug("Command IR TX transmit")
    asyncio.create_task(ir_tx.transmit(tx1, tx2, tx3, False))
    await asyncio.sleep_ms(10)


async def olt_command(cmd, btn, rgb, ir_tx, tx1, tx2, tx3):
    """Bind a push button to one OLT command.

    Each press beeps, sets the RGB LED green and transmits the bytes
    `tx1`, `tx2`, `tx3` over IR; the task then waits for the button to be
    released before accepting the next press. Never returns.

    cmd  -- command name, used only in log messages
    btn  -- Pushbutton instance to watch
    rgb  -- RGB LED driver
    """
    logger.debug(f"Init OLT game command {cmd}")
    await rgb.set_red()
    # No callbacks: the press/release events are awaited directly below
    btn.press_func(None)
    btn.release_func(None)
    while True:
        btn.press.clear()
        btn.release.clear()
        await btn.press.wait()
        logger.debug(f"Command {cmd} button pressed")
        asyncio.create_task(piezo_short_beep())
        await rgb.set_green()
        # IR TX send
        asyncio.create_task(olt_sent_ir(ir_tx, tx1, tx2, tx3))
        await btn.release.wait()
        logger.debug(f"Command {cmd} button released")
        # await rgb.set_red()


# IR RX callback
async def cb(byte1, byte2, byte3, packet):
    """Report a received IR packet and flash the RGB LED blue.

    Packets with the COMMAND_MASK bit set are reported as commands. All
    others are decoded as shots (player ID, team, damage) and reported over
    the BLE UART.
    """
    raw_dump = (
        f"byte1 0x{byte1:02x} byte2 0x{byte2:02x} byte3 0x{byte3:02x} packet 0x{packet:06x}"
    )
    if packet & COMMAND_MASK:
        logger.debug("Command packet")
        print(raw_dump)
        uart.write(f"IR RX Command 0x{packet:06x}\n")
    else:
        logger.debug("Shot packet")
        print(raw_dump)
        uart.write(f"IR RX Shot 0x{packet:06x}\n")
        # Shot payload sits above the low 10 bits:
        # player ID | 2-bit team | 4-bit damage index
        payload = packet >> 10
        player_id = payload >> 6
        team = (payload >> 4) & 0x3
        demage = payload & 0xF
        shot_info = (
            f"Shot: player ID:{player_id}, team {TEAM[team]}, demage {DEMAGE[demage]}"
        )
        logger.debug(shot_info)
        uart.write(shot_info + "\n")
    # NOTE(review): LED flash reconstructed as running for every packet
    # (command and shot) - confirm against the original layout.
    await rgb1.set_blue()
    await asyncio.sleep_ms(500)
    await rgb1.set_red()


### main ###
async def main(proto):
    """Initialise all peripherals, start the background tasks and idle.

    `proto` is accepted for interface compatibility and is not used.
    Creates the module globals `rgb1`, `ir_tx` and `uart`.
    """
    global rgb1, ir_tx, uart

    set_global_exception()  # Enable a global exception handler to simplify debugging
    logger.debug("main start")
    print(f"Device ID: {device_id_str}")

    ### alive check ###
    asyncio.create_task(alive_check())

    ### battery monitor ###
    asyncio.create_task(battery_monitor(aadc))

    ### RGB LED ###
    logger.debug("RGB init")
    rgb1 = RGB()

    ### OpenLaserTag IR RX ###
    logger.debug("OLT IR RX init")
    ir_rx = LT_24_RX(ir_rx_pin, cb)  # Instantiate receiver
    ir_rx.error_function(print_error)  # Show debug information

    ### OpenLaserTag IR TX ###
    logger.debug("OLT IR TX init")
    ir_tx = LT_24_TX(ir_tx_pin, 56000)  # My decoder chip is 56KHz
    # Print transmit timing (comment out to disable)
    ir_tx.timeit = True

    ### BLUETOOTH UART ###
    ble = bluetooth.BLE()
    uart = ble_uart.Ble_uart(
        ble, on_rx=ble_rx_handler, name="OpenLaserTag " + device_id_str
    )

    ### Buttons ###
    logger.debug("Button init")
    # (command entry, GPIO number of its button)
    PINS = ((COMMANDS[0], 18), (COMMANDS[1], 21), (COMMANDS[2], 20), (COMMANDS[3], 19))
    for cmd, pin in PINS:
        btn = Pushbutton(machine.Pin(pin, machine.Pin.IN, machine.Pin.PULL_UP))
        command, tx1, tx2, tx3 = cmd
        logger.debug(
            f"New command task: {command}, IR 0x{tx1:02X} 0x{tx2:02X} 0x{tx3:02X}, btn: {btn._pin} "
        )
        asyncio.create_task(olt_command(command, btn, rgb1, ir_tx, tx1, tx2, tx3))

    # Startup beep
    asyncio.create_task(piezo_short_beep())

    ### Main loop ###
    while True:
        await asyncio.sleep(1)


def exception_handler(loop, context):
    """Print the exception carried by an event-loop error context.

    Falls back to the context message when no exception object is present.
    """
    exc = context.get("exception")
    print(exc if exc is not None else context.get("message"))


### asyncio start main ###
try:
    asyncio.run(main(0))
except KeyboardInterrupt:
    logger.exception("App Interrupted")
finally:
    asyncio.new_event_loop()  # Reset the event loop state for the next run
    # No exception is active on this path, so log a plain message
    logger.info("End")