lasertag_remote_control/main.py

401 lines
11 KiB
Python

###########################################################
# #
# OpenLaserTag remote control #
# OLT protocol #
# Tomas Krejci [Njord] #
# tk@tomaskrejci.com #
# #
# version 1.0 #
# 2024-06-22 #
# #
###########################################################
import machine, time, re
from sys import platform
from primitives import set_global_exception
import primitives
import uasyncio as asyncio
ESP32 = platform == "esp32"
RP2 = platform == "rp2"
PYBOARD = platform == "pyboard"
#### Other imports ###
from primitives.delay_ms import Delay_ms
import primitives.queue as queue
from primitives import Switch, Pushbutton
from rgb import RGB
#### onboard LED ####
if ESP32 or RP2:
from machine import Pin
else:
from pyb import Pin, LED
#### Device ID ###
device_id_str = ""
for b in machine.unique_id():
device_id_str += "{:02X}".format(b)
device_id_str = device_id_str[-4:] # Keep last 8 char
device_id = int(device_id_str, 16) # Convert to int from hex in string
### Piezo ###
piezo_pin = machine.Pin(15, machine.Pin.OUT, value=0)
async def piezo_beep(sec=0.3):
piezo_pin.value(1)
await asyncio.sleep(sec)
piezo_pin.value(0)
async def piezo_short_beep():
piezo_pin.value(1)
await asyncio.sleep(0.1)
piezo_pin.value(0)
#### Voltage monitor ###
from machine import ADC
from primitives import AADC
# Define ADC battery pin according to platform
if ESP32:
battery_ADC = ADC(28)
elif RP2:
battery_ADC = ADC(28)
else:
battery_ADC = ADC("X2")
aadc = AADC(battery_ADC)
async def battery_monitor(aadc):
while True:
aadc.sense(normal=False)
# value = await aadc(25_000, 39_000) # Wait until in range
# logger.debug("In range:", value)
# aadc.sense(normal=True)
# value = await aadc() # Wait until out of range
# logger.debug("Out of range:", value)
voltage = aadc.read_u16() * (
3.3 / 65535
) # reference voltage (3.3V) divided by the maximum ADC value (65535)
voltage = round(voltage, 2)
capacity = voltage_to_capacity(voltage / 2)
logger.info(f"Battery Voltage: {voltage:.2f}V, Capacity: {capacity:.0f}%")
uart.write(f"BATTERY: Voltage: {voltage:.2f}V, Capacity: {capacity:.0f}%\n")
await asyncio.sleep_ms(10000)
# simple voltage to capacity, model for standart one cell alkaline batteries
def voltage_to_capacity(voltage):
if voltage >= 1.6:
return 100
elif 1.5 <= voltage < 1.6:
return 90 + (voltage - 1.5) * 100
elif 1.4 <= voltage < 1.5:
return 70 + (voltage - 1.4) * 200
elif 1.3 <= voltage < 1.4:
return 40 + (voltage - 1.3) * 300
elif 1.2 <= voltage < 1.3:
return 10 + (voltage - 1.2) * 300
elif 1.0 <= voltage < 1.2:
return (voltage - 1.0) * 50
else:
return 0
#### BLE UART ###
import ble_uart
import bluetooth
import time
async def ble_rx_handler():
global rgb1
message = uart.read().decode().strip()
logger.debug(f"BLE: {message}")
try:
if "bomb" in message:
logger.info("BLE: Bomb Explode send to IR")
for i in range(5):
asyncio.create_task(
olt_sent_ir(ir_tx, COMMANDS[1][1], COMMANDS[1][2], COMMANDS[1][3])
)
await asyncio.sleep_ms(100)
asyncio.create_task(piezo_beep())
uart.write("BLE: Bomb Explode send to IR\n")
asyncio.create_task(rgb1.set_red())
await asyncio.sleep_ms(100)
while True:
time.sleep_ms(3000)
logger.info(
"BLE: Bomb Exploded, Device is LOCKED, please restart Device"
)
uart.write(
"BLE: Bomb Exploded, Device is LOCKED\n Please restart Device\n"
)
await piezo_short_beep()
elif "led_on" in message:
logger.info("BLE: LED ON")
asyncio.create_task(rgb1.set_red())
uart.write("BLE: LED ON\n")
elif "led_off" in message:
logger.info("BLE: LED OFF")
asyncio.create_task(rgb1.off())
uart.write("BLE: LED OFF\n")
else:
message_int = int(message, 16)
logger.debug(f"BLE: {message_int:08x}")
## if recive from BLE UART any hex number senend it as custom command to IR
tx1 = (message_int >> 16) & 0xFF
tx2 = (message_int >> 8) & 0xFF
tx3 = message_int & 0xFF
asyncio.create_task(olt_sent_ir(ir_tx, tx1, tx2, tx3))
## send info back to BLE
uart.write(f"BLE: uart recive 0x{message_int:08x}\n")
except TypeError:
logger.debug("BLE: TypeError")
pass
except ValueError:
logger.debug("BLE: ValueError")
pass
#### OLT IR ###
COMMAND_MASK = 0x800000
DEMAGE = [0, 1, 4, 5, 7, 10, 15, 17, 20, 25, 30, 35, 40, 50, 75, 100]
TEAM = ["RED", "BLUE", "YELLOW", "GREEN"]
#### OLT IR RX imports ###
import ustruct
from sys import platform
import time
import gc
from machine import Pin, freq
from olt_lib.ir_rx.print_error import print_error # Optional print of error codes
from olt_lib.ir_rx.olt import LT_24 as LT_24_RX # Import all implemented classes
# Define IR RX pin according to platform
if platform == "pyboard":
ir_rx_pin = Pin("X3", Pin.IN)
elif platform == "esp8266":
freq(160000000)
ir_rx_pin = Pin(13, Pin.IN)
elif platform == "esp32" or platform == "esp32_LoBo":
ir_rx_pin = Pin(23, Pin.IN)
elif platform == "rp2":
ir_rx_pin = Pin(16, Pin.IN, Pin.PULL_UP)
#### OLT IR TX imports ###
from sys import platform
ESP32 = platform == "esp32"
RP2 = platform == "rp2"
PYBOARD = platform == "pyboard"
if ESP32 or RP2:
from machine import Pin
else:
from pyb import Pin, LED
import uasyncio as asyncio
from primitives.switch import Switch
from primitives.delay_ms import Delay_ms
# Import all implemented classes
from olt_lib.ir_tx.olt import LT_24 as LT_24_TX
# Define IR TX pin according to platform
if ESP32: # Pins for IR LED gate
ir_tx_pin = Pin(23, Pin.OUT, value=0)
elif RP2:
ir_tx_pin = Pin(17, Pin.OUT, value=0)
else:
ir_tx_pin = Pin("X1")
#### OLT Commands ####
COMMAND_END = 0xE8
COMMANDS = [
["NewGame", 0x83, 0x05, COMMAND_END],
# ["AdminKill", 0x83, 0x00, COMMAND_END],
["Explode", 0x83, 0x0B, COMMAND_END],
["TestShot_ID8_RED_25", 0x08, 0x24, 0x00],
["SensorTest", 0x83, 0x15, COMMAND_END],
]
#### Logging ####
import logging
global logger
init_ticks = time.ticks_ms()
# change logging level here, logging.ERROR is default, logging.DEBUG for more info, and reset device
# log_level = logging.DEBUG
log_level = logging.INFO
# log_level = logging.ERROR
logging.basicConfig(level=log_level, format="%(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
### Alive check ###
async def alive_check():
# loop for checked is alive
logger.debug("alive check start")
onboar_led = machine.Pin("LED", machine.Pin.OUT) # Pi Pico onboard LED
while True:
logger.debug(f"running ms: {time.ticks_ms() - init_ticks}")
onboar_led.on()
await asyncio.sleep_ms(100)
onboar_led.off()
await asyncio.sleep_ms(900)
### OLT Command ###
async def olt_sent_ir(ir_tx, tx1, tx2, tx3):
# IR TX send
logger.debug(f"Command IR TX transmit")
asyncio.create_task(ir_tx.transmit(tx1, tx2, tx3, False))
await asyncio.sleep_ms(10)
async def olt_command(cmd, btn, rgb, ir_tx, tx1, tx2, tx3):
logger.debug(f"Init OLT game command {cmd}")
await rgb.set_red()
btn.press_func(None)
btn.release_func(None)
while True:
btn.press.clear()
btn.release.clear()
await btn.press.wait()
logger.debug(f"Command {cmd} button pressed")
asyncio.create_task(piezo_short_beep())
await rgb.set_green()
# IR TX send
asyncio.create_task(olt_sent_ir(ir_tx, tx1, tx2, tx3))
# logger.debug(f"Command {cmd} IR TX transmit")
# asyncio.create_task(ir_tx.transmit(tx1, tx2, tx3, False))
# await asyncio.sleep_ms(50)
await btn.release.wait()
logger.debug(f"Command {cmd} button released")
# await rgb.set_red()
# IR RX callback
async def cb(byte1, byte2, byte3, packet):
# logger.debug(f"{(packet & COMMAND_MASK):0X}")
if packet & COMMAND_MASK:
logger.debug("Command packet")
print(
f"byte1 0x{byte1:02x} byte2 0x{byte2:02x} byte3 0x{byte3:02x} packet 0x{packet:06x}"
)
uart.write(f"IR RX Command 0x{packet:06x}\n")
else:
logger.debug("Shot packet")
print(
f"byte1 0x{byte1:02x} byte2 0x{byte2:02x} byte3 0x{byte3:02x} packet 0x{packet:06x}"
)
uart.write(f"IR RX Shot 0x{packet:06x}\n")
packet = packet >> 10
id = packet >> 6
team = (packet >> 4) & 0x3
demage = packet & 0xF
logger.debug(
f"Shot: player ID:{id}, team {TEAM[team]}, demage {DEMAGE[demage]}"
)
uart.write(
f"Shot: player ID:{id}, team {TEAM[team]}, demage {DEMAGE[demage]}\n"
)
await rgb1.set_blue()
await asyncio.sleep_ms(500)
await rgb1.set_red()
### main ###
async def main(proto):
set_global_exception() # enable a global exception handler to simplify debugging
logger.debug("main start")
print(f"Device ID: {device_id_str}")
### alive check ###
asyncio.create_task(alive_check())
### battery monitor ###
# aadc = AADC()
asyncio.create_task(battery_monitor(aadc))
### RGB LED ###
logger.debug("RGB init")
global rgb1
rgb1 = RGB()
### OpenLaserTag IR RX ###
logger.debug("OLT IR RX init")
ir_rx = LT_24_RX(ir_rx_pin, cb) # Instantiate receiver
ir_rx.error_function(print_error) # Show debug information
### OpenLaserTag IR TX ###
logger.debug("OLT IR TX init")
global ir_tx
ir_tx = LT_24_TX(ir_tx_pin, 56000) # My decoder chip is 56KHz
# Uncomment the following to print transmit timing
ir_tx.timeit = True
### BLUETOOTH UART ###
ble = bluetooth.BLE()
global uart
uart = ble_uart.Ble_uart(
ble, on_rx=ble_rx_handler, name="OpenLaserTag " + device_id_str
)
### Buttons ###
logger.debug("Button init")
PINS = ((COMMANDS[0], 18), (COMMANDS[1], 21), (COMMANDS[2], 20), (COMMANDS[3], 19))
for cmd, pin in PINS:
btn = Pushbutton(machine.Pin(pin, machine.Pin.IN, machine.Pin.PULL_UP))
command, tx1, tx2, tx3 = cmd
logger.debug(
f"New command task: {command}, IR 0x{tx1:02X} 0x{tx2:02X} 0x{tx3:02X}, btn: {btn._pin} "
)
asyncio.create_task(olt_command(command, btn, rgb1, ir_tx, tx1, tx2, tx3))
asyncio.create_task(piezo_short_beep()) # piezo_beep()
### Main loop ###
while True:
await asyncio.sleep(1)
def exception_handler(loop, context):
# log exception
print(context["exception"])
### asyncio start main ###
try:
asyncio.run(main(0))
except KeyboardInterrupt:
logger.exception("App Interrupted")
finally:
asyncio.new_event_loop()
logger.exception("End")