401 lines
11 KiB
Python
401 lines
11 KiB
Python
###########################################################
|
|
# #
|
|
# OpenLaserTag remote control #
|
|
# OLT protocol #
|
|
# Tomas Krejci [Njord] #
|
|
# tk@tomaskrejci.com #
|
|
# #
|
|
# version 1.0 #
|
|
# 2024-06-22 #
|
|
# #
|
|
###########################################################
|
|
|
|
import machine, time, re
|
|
from sys import platform
|
|
from primitives import set_global_exception
|
|
import primitives
|
|
import uasyncio as asyncio
|
|
|
|
ESP32 = platform == "esp32"
|
|
RP2 = platform == "rp2"
|
|
PYBOARD = platform == "pyboard"
|
|
|
|
#### Other imports ###
|
|
from primitives.delay_ms import Delay_ms
|
|
import primitives.queue as queue
|
|
from primitives import Switch, Pushbutton
|
|
from rgb import RGB
|
|
|
|
#### onboard LED ####
|
|
if ESP32 or RP2:
|
|
from machine import Pin
|
|
else:
|
|
from pyb import Pin, LED
|
|
|
|
#### Device ID ###
|
|
device_id_str = ""
|
|
for b in machine.unique_id():
|
|
device_id_str += "{:02X}".format(b)
|
|
|
|
device_id_str = device_id_str[-4:] # Keep last 8 char
|
|
device_id = int(device_id_str, 16) # Convert to int from hex in string
|
|
|
|
### Piezo ###
|
|
piezo_pin = machine.Pin(15, machine.Pin.OUT, value=0)
|
|
|
|
|
|
async def piezo_beep(sec=0.3):
|
|
piezo_pin.value(1)
|
|
await asyncio.sleep(sec)
|
|
piezo_pin.value(0)
|
|
|
|
|
|
async def piezo_short_beep():
|
|
piezo_pin.value(1)
|
|
await asyncio.sleep(0.1)
|
|
piezo_pin.value(0)
|
|
|
|
|
|
#### Voltage monitor ###
|
|
from machine import ADC
|
|
from primitives import AADC
|
|
|
|
# Define ADC battery pin according to platform
|
|
if ESP32:
|
|
battery_ADC = ADC(28)
|
|
elif RP2:
|
|
battery_ADC = ADC(28)
|
|
else:
|
|
battery_ADC = ADC("X2")
|
|
|
|
aadc = AADC(battery_ADC)
|
|
|
|
|
|
async def battery_monitor(aadc):
|
|
while True:
|
|
aadc.sense(normal=False)
|
|
# value = await aadc(25_000, 39_000) # Wait until in range
|
|
# logger.debug("In range:", value)
|
|
# aadc.sense(normal=True)
|
|
# value = await aadc() # Wait until out of range
|
|
# logger.debug("Out of range:", value)
|
|
|
|
voltage = aadc.read_u16() * (
|
|
3.3 / 65535
|
|
) # reference voltage (3.3V) divided by the maximum ADC value (65535)
|
|
voltage = round(voltage, 2)
|
|
capacity = voltage_to_capacity(voltage / 2)
|
|
logger.info(f"Battery Voltage: {voltage:.2f}V, Capacity: {capacity:.0f}%")
|
|
uart.write(f"BATTERY: Voltage: {voltage:.2f}V, Capacity: {capacity:.0f}%\n")
|
|
await asyncio.sleep_ms(10000)
|
|
|
|
|
|
# simple voltage to capacity, model for standart one cell alkaline batteries
|
|
def voltage_to_capacity(voltage):
|
|
if voltage >= 1.6:
|
|
return 100
|
|
elif 1.5 <= voltage < 1.6:
|
|
return 90 + (voltage - 1.5) * 100
|
|
elif 1.4 <= voltage < 1.5:
|
|
return 70 + (voltage - 1.4) * 200
|
|
elif 1.3 <= voltage < 1.4:
|
|
return 40 + (voltage - 1.3) * 300
|
|
elif 1.2 <= voltage < 1.3:
|
|
return 10 + (voltage - 1.2) * 300
|
|
elif 1.0 <= voltage < 1.2:
|
|
return (voltage - 1.0) * 50
|
|
else:
|
|
return 0
|
|
|
|
|
|
#### BLE UART ###
|
|
import ble_uart
|
|
import bluetooth
|
|
import time
|
|
|
|
|
|
async def ble_rx_handler():
|
|
global rgb1
|
|
message = uart.read().decode().strip()
|
|
logger.debug(f"BLE: {message}")
|
|
|
|
try:
|
|
if "bomb" in message:
|
|
logger.info("BLE: Bomb Explode send to IR")
|
|
for i in range(5):
|
|
asyncio.create_task(
|
|
olt_sent_ir(ir_tx, COMMANDS[1][1], COMMANDS[1][2], COMMANDS[1][3])
|
|
)
|
|
await asyncio.sleep_ms(100)
|
|
asyncio.create_task(piezo_beep())
|
|
uart.write("BLE: Bomb Explode send to IR\n")
|
|
asyncio.create_task(rgb1.set_red())
|
|
await asyncio.sleep_ms(100)
|
|
while True:
|
|
time.sleep_ms(3000)
|
|
logger.info(
|
|
"BLE: Bomb Exploded, Device is LOCKED, please restart Device"
|
|
)
|
|
uart.write(
|
|
"BLE: Bomb Exploded, Device is LOCKED\n Please restart Device\n"
|
|
)
|
|
await piezo_short_beep()
|
|
|
|
elif "led_on" in message:
|
|
logger.info("BLE: LED ON")
|
|
asyncio.create_task(rgb1.set_red())
|
|
uart.write("BLE: LED ON\n")
|
|
|
|
elif "led_off" in message:
|
|
logger.info("BLE: LED OFF")
|
|
asyncio.create_task(rgb1.off())
|
|
uart.write("BLE: LED OFF\n")
|
|
|
|
else:
|
|
message_int = int(message, 16)
|
|
logger.debug(f"BLE: {message_int:08x}")
|
|
|
|
## if recive from BLE UART any hex number senend it as custom command to IR
|
|
tx1 = (message_int >> 16) & 0xFF
|
|
tx2 = (message_int >> 8) & 0xFF
|
|
tx3 = message_int & 0xFF
|
|
asyncio.create_task(olt_sent_ir(ir_tx, tx1, tx2, tx3))
|
|
|
|
## send info back to BLE
|
|
uart.write(f"BLE: uart recive 0x{message_int:08x}\n")
|
|
|
|
except TypeError:
|
|
logger.debug("BLE: TypeError")
|
|
pass
|
|
|
|
except ValueError:
|
|
logger.debug("BLE: ValueError")
|
|
pass
|
|
|
|
|
|
#### OLT IR ###
|
|
COMMAND_MASK = 0x800000
|
|
DEMAGE = [0, 1, 4, 5, 7, 10, 15, 17, 20, 25, 30, 35, 40, 50, 75, 100]
|
|
TEAM = ["RED", "BLUE", "YELLOW", "GREEN"]
|
|
|
|
|
|
#### OLT IR RX imports ###
|
|
import ustruct
|
|
from sys import platform
|
|
import time
|
|
import gc
|
|
from machine import Pin, freq
|
|
from olt_lib.ir_rx.print_error import print_error # Optional print of error codes
|
|
from olt_lib.ir_rx.olt import LT_24 as LT_24_RX # Import all implemented classes
|
|
|
|
# Define IR RX pin according to platform
|
|
if platform == "pyboard":
|
|
ir_rx_pin = Pin("X3", Pin.IN)
|
|
elif platform == "esp8266":
|
|
freq(160000000)
|
|
ir_rx_pin = Pin(13, Pin.IN)
|
|
elif platform == "esp32" or platform == "esp32_LoBo":
|
|
ir_rx_pin = Pin(23, Pin.IN)
|
|
elif platform == "rp2":
|
|
ir_rx_pin = Pin(16, Pin.IN, Pin.PULL_UP)
|
|
|
|
#### OLT IR TX imports ###
|
|
from sys import platform
|
|
|
|
ESP32 = platform == "esp32"
|
|
RP2 = platform == "rp2"
|
|
PYBOARD = platform == "pyboard"
|
|
if ESP32 or RP2:
|
|
from machine import Pin
|
|
else:
|
|
from pyb import Pin, LED
|
|
import uasyncio as asyncio
|
|
from primitives.switch import Switch
|
|
from primitives.delay_ms import Delay_ms
|
|
|
|
# Import all implemented classes
|
|
from olt_lib.ir_tx.olt import LT_24 as LT_24_TX
|
|
|
|
# Define IR TX pin according to platform
|
|
if ESP32: # Pins for IR LED gate
|
|
ir_tx_pin = Pin(23, Pin.OUT, value=0)
|
|
elif RP2:
|
|
ir_tx_pin = Pin(17, Pin.OUT, value=0)
|
|
else:
|
|
ir_tx_pin = Pin("X1")
|
|
|
|
|
|
#### OLT Commands ####
|
|
COMMAND_END = 0xE8
|
|
COMMANDS = [
|
|
["NewGame", 0x83, 0x05, COMMAND_END],
|
|
# ["AdminKill", 0x83, 0x00, COMMAND_END],
|
|
["Explode", 0x83, 0x0B, COMMAND_END],
|
|
["TestShot_ID8_RED_25", 0x08, 0x24, 0x00],
|
|
["SensorTest", 0x83, 0x15, COMMAND_END],
|
|
]
|
|
|
|
#### Logging ####
|
|
import logging
|
|
|
|
global logger
|
|
init_ticks = time.ticks_ms()
|
|
# change logging level here, logging.ERROR is default, logging.DEBUG for more info, and reset device
|
|
# log_level = logging.DEBUG
|
|
log_level = logging.INFO
|
|
# log_level = logging.ERROR
|
|
logging.basicConfig(level=log_level, format="%(name)s - %(levelname)s - %(message)s")
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
### Alive check ###
|
|
async def alive_check():
|
|
# loop for checked is alive
|
|
logger.debug("alive check start")
|
|
onboar_led = machine.Pin("LED", machine.Pin.OUT) # Pi Pico onboard LED
|
|
|
|
while True:
|
|
logger.debug(f"running ms: {time.ticks_ms() - init_ticks}")
|
|
onboar_led.on()
|
|
await asyncio.sleep_ms(100)
|
|
onboar_led.off()
|
|
await asyncio.sleep_ms(900)
|
|
|
|
|
|
### OLT Command ###
|
|
async def olt_sent_ir(ir_tx, tx1, tx2, tx3):
|
|
# IR TX send
|
|
logger.debug(f"Command IR TX transmit")
|
|
asyncio.create_task(ir_tx.transmit(tx1, tx2, tx3, False))
|
|
await asyncio.sleep_ms(10)
|
|
|
|
|
|
async def olt_command(cmd, btn, rgb, ir_tx, tx1, tx2, tx3):
|
|
logger.debug(f"Init OLT game command {cmd}")
|
|
await rgb.set_red()
|
|
btn.press_func(None)
|
|
btn.release_func(None)
|
|
|
|
while True:
|
|
btn.press.clear()
|
|
btn.release.clear()
|
|
await btn.press.wait()
|
|
logger.debug(f"Command {cmd} button pressed")
|
|
asyncio.create_task(piezo_short_beep())
|
|
await rgb.set_green()
|
|
# IR TX send
|
|
asyncio.create_task(olt_sent_ir(ir_tx, tx1, tx2, tx3))
|
|
# logger.debug(f"Command {cmd} IR TX transmit")
|
|
# asyncio.create_task(ir_tx.transmit(tx1, tx2, tx3, False))
|
|
# await asyncio.sleep_ms(50)
|
|
await btn.release.wait()
|
|
logger.debug(f"Command {cmd} button released")
|
|
# await rgb.set_red()
|
|
|
|
|
|
# IR RX callback
|
|
async def cb(byte1, byte2, byte3, packet):
|
|
# logger.debug(f"{(packet & COMMAND_MASK):0X}")
|
|
if packet & COMMAND_MASK:
|
|
logger.debug("Command packet")
|
|
print(
|
|
f"byte1 0x{byte1:02x} byte2 0x{byte2:02x} byte3 0x{byte3:02x} packet 0x{packet:06x}"
|
|
)
|
|
uart.write(f"IR RX Command 0x{packet:06x}\n")
|
|
else:
|
|
logger.debug("Shot packet")
|
|
print(
|
|
f"byte1 0x{byte1:02x} byte2 0x{byte2:02x} byte3 0x{byte3:02x} packet 0x{packet:06x}"
|
|
)
|
|
uart.write(f"IR RX Shot 0x{packet:06x}\n")
|
|
|
|
packet = packet >> 10
|
|
id = packet >> 6
|
|
team = (packet >> 4) & 0x3
|
|
demage = packet & 0xF
|
|
|
|
logger.debug(
|
|
f"Shot: player ID:{id}, team {TEAM[team]}, demage {DEMAGE[demage]}"
|
|
)
|
|
uart.write(
|
|
f"Shot: player ID:{id}, team {TEAM[team]}, demage {DEMAGE[demage]}\n"
|
|
)
|
|
|
|
await rgb1.set_blue()
|
|
await asyncio.sleep_ms(500)
|
|
await rgb1.set_red()
|
|
|
|
|
|
### main ###
|
|
async def main(proto):
|
|
set_global_exception() # enable a global exception handler to simplify debugging
|
|
logger.debug("main start")
|
|
|
|
print(f"Device ID: {device_id_str}")
|
|
|
|
### alive check ###
|
|
asyncio.create_task(alive_check())
|
|
|
|
### battery monitor ###
|
|
# aadc = AADC()
|
|
asyncio.create_task(battery_monitor(aadc))
|
|
|
|
### RGB LED ###
|
|
logger.debug("RGB init")
|
|
global rgb1
|
|
rgb1 = RGB()
|
|
|
|
### OpenLaserTag IR RX ###
|
|
logger.debug("OLT IR RX init")
|
|
|
|
ir_rx = LT_24_RX(ir_rx_pin, cb) # Instantiate receiver
|
|
ir_rx.error_function(print_error) # Show debug information
|
|
|
|
### OpenLaserTag IR TX ###
|
|
logger.debug("OLT IR TX init")
|
|
|
|
global ir_tx
|
|
ir_tx = LT_24_TX(ir_tx_pin, 56000) # My decoder chip is 56KHz
|
|
# Uncomment the following to print transmit timing
|
|
ir_tx.timeit = True
|
|
|
|
### BLUETOOTH UART ###
|
|
ble = bluetooth.BLE()
|
|
global uart
|
|
uart = ble_uart.Ble_uart(
|
|
ble, on_rx=ble_rx_handler, name="OpenLaserTag " + device_id_str
|
|
)
|
|
|
|
### Buttons ###
|
|
logger.debug("Button init")
|
|
PINS = ((COMMANDS[0], 18), (COMMANDS[1], 21), (COMMANDS[2], 20), (COMMANDS[3], 19))
|
|
for cmd, pin in PINS:
|
|
btn = Pushbutton(machine.Pin(pin, machine.Pin.IN, machine.Pin.PULL_UP))
|
|
command, tx1, tx2, tx3 = cmd
|
|
logger.debug(
|
|
f"New command task: {command}, IR 0x{tx1:02X} 0x{tx2:02X} 0x{tx3:02X}, btn: {btn._pin} "
|
|
)
|
|
asyncio.create_task(olt_command(command, btn, rgb1, ir_tx, tx1, tx2, tx3))
|
|
|
|
asyncio.create_task(piezo_short_beep()) # piezo_beep()
|
|
### Main loop ###
|
|
while True:
|
|
await asyncio.sleep(1)
|
|
|
|
|
|
def exception_handler(loop, context):
|
|
# log exception
|
|
print(context["exception"])
|
|
|
|
|
|
### asyncio start main ###
|
|
try:
|
|
asyncio.run(main(0))
|
|
|
|
except KeyboardInterrupt:
|
|
logger.exception("App Interrupted")
|
|
|
|
finally:
|
|
asyncio.new_event_loop()
|
|
logger.exception("End")
|