modified:   main.py
This commit is contained in:
“njord” 2024-05-14 00:04:46 +02:00
parent c7d0471ad5
commit e8b1d31ee6

154
main.py
View File

@ -1,118 +1,68 @@
import sys
sys.path.append("")
from micropython import const
import uasyncio as asyncio
import aioble
import bluetooth
import machine
import uasyncio as asyncio
import random
import struct
# org.bluetooth.service.environmental_sensing
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
# org.bluetooth.characteristic.temperature
_ENV_SENSE_TEMP_UUID = bluetooth.UUID(0x2A6E)
# org.bluetooth.characteristic.gap.appearance.xml
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)
# How frequently to send advertising beacons.
_ADV_INTERVAL_MS = 250_000
_OBD_UUID_SERVICE = bluetooth.UUID(0xfff0)
_OBD_UUID_CHARACTERISTIC = bluetooth.UUID(0xfff1)
led = machine.Pin("LED", machine.Pin.OUT)
connected = False
alive = False
# Register GATT server.
temp_service = aioble.Service(_ENV_SENSE_UUID)
temp_characteristic = aioble.Characteristic(
temp_service, _ENV_SENSE_TEMP_UUID, read=True, notify=True
)
aioble.register_services(temp_service)
async def find_remote():
async with aioble.scan(duration_ms=5000, interval_us=30000, window_us=30000, active=True) as scanner:
async for result in scanner:
if (result.name() == "OBDBLE"):
print("Found OBDBLE")
for item in result.services():
print(item)
if _OBD_UUID_SERVICE in result.services():
print("Found OBDBLE service")
return result.device
return None
# Helper to encode the temperature characteristic encoding (sint16, hundredths of a degree).
def _encode_temperature(temp_deg_c):
return struct.pack("<h", int(temp_deg_c * 100))
async def blink_task():
print("Blink task started")
toggle = True
while True and alive:
blink = 250
led.value(toggle)
toggle = not toggle
if connected:
blink = 1000
else:
blink = 250
await asyncio.sleep_ms(blink)
print("Blink task stopped")
# This would be periodically polling a hardware sensor.
async def sensor_task():
t = 24.5
while True:
temp_characteristic.write(_encode_temperature(t))
t += random.uniform(-0.5, 0.5)
await asyncio.sleep_ms(1000)
# Serially wait for connections. Don't advertise while a central is
# connected.
async def peripheral_task():
global connected, alive
connected = False
device = await find_remote()
if not device:
print("OBD device not found")
return
try:
print("Connecting to", device)
connection = await device.connect()
except asyncio.TimeoutError:
print("Timeout during connection")
return
async with connection:
print("Connected")
alive = True
connected = True
try:
obd_service = await connection.service(_OBD_UUID_SERVICE)
print(obd_service)
obd_characteristic = await obd_service.characteristic(_OBD_UUID_CHARACTERISTIC)
print(obd_characteristic)
except asyncio.TimeoutError:
print("Timeout discovering services/characteristics")
return
try:
if obd_service == None:
print("Obd Disconnected")
alive = False
connected = False
if obd_characteristic == None:
print("No Control")
alive = False
connected = False
except asyncio.TimeoutError:
print("Timeout during discovery/service/characteristic")
alive = False
print("Sending ATZ")
obd_characteristic.write(b"ATZ",response=True)
obd_characteristic.notified()
data = await obd_characteristic.read()
print(data)
await connection.disconnected()
print("Disconnected")
while True:
async with await aioble.advertise(
_ADV_INTERVAL_MS,
name="mpy-temp",
services=[_ENV_SENSE_UUID],
appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER,
) as connection:
print("Connection from", connection.device)
await connection.disconnected()
# Run both tasks.
async def main():
tasks = []
tasks = [
asyncio.create_task(blink_task()),
asyncio.create_task(peripheral_task())
]
t1 = asyncio.create_task(sensor_task())
t2 = asyncio.create_task(peripheral_task())
await asyncio.gather(t1, t2)
await asyncio.gather(*tasks)
while True:
asyncio.run(main())
asyncio.run(main())