diff --git a/ble_advertising.py b/ble_advertising.py new file mode 100644 index 0000000..d745711 --- /dev/null +++ b/ble_advertising.py @@ -0,0 +1,102 @@ +# Helpers for generating BLE advertising payloads. + +# A more fully-featured (and easier to use) version of this is implemented in +# aioble. This code is provided just as a basic example. See +# https://github.com/micropython/micropython-lib/tree/master/micropython/bluetooth/aioble + +from micropython import const +import struct +import bluetooth + +# Advertising payloads are repeated packets of the following form: +# 1 byte data length (N + 1) +# 1 byte type (see constants below) +# N bytes type-specific data + +_ADV_TYPE_FLAGS = const(0x01) +_ADV_TYPE_NAME = const(0x09) +_ADV_TYPE_UUID16_COMPLETE = const(0x3) +_ADV_TYPE_UUID32_COMPLETE = const(0x5) +_ADV_TYPE_UUID128_COMPLETE = const(0x7) +_ADV_TYPE_UUID16_MORE = const(0x2) +_ADV_TYPE_UUID32_MORE = const(0x4) +_ADV_TYPE_UUID128_MORE = const(0x6) +_ADV_TYPE_APPEARANCE = const(0x19) + +_ADV_MAX_PAYLOAD = const(31) + + +# Generate a payload to be passed to gap_advertise(adv_data=...). +def advertising_payload(limited_disc=False, br_edr=False, name=None, services=None, appearance=0): + payload = bytearray() + + def _append(adv_type, value): + nonlocal payload + payload += struct.pack("BB", len(value) + 1, adv_type) + value + + _append( + _ADV_TYPE_FLAGS, + struct.pack("B", (0x01 if limited_disc else 0x02) + (0x18 if br_edr else 0x04)), + ) + + if name: + _append(_ADV_TYPE_NAME, name) + + if services: + for uuid in services: + b = bytes(uuid) + if len(b) == 2: + _append(_ADV_TYPE_UUID16_COMPLETE, b) + elif len(b) == 4: + _append(_ADV_TYPE_UUID32_COMPLETE, b) + elif len(b) == 16: + _append(_ADV_TYPE_UUID128_COMPLETE, b) + + # See org.bluetooth.characteristic.gap.appearance.xml + if appearance: + _append(_ADV_TYPE_APPEARANCE, struct.pack(" _ADV_MAX_PAYLOAD: + raise ValueError("advertising payload too large") + + return payload + + +def decode_field(payload, adv_type): + i = 0 + result = [] + while i + 1 < len(payload): + if payload[i + 1] == adv_type: + result.append(payload[i + 2 : i + payload[i] + 1]) + i += 1 + payload[i] + return result + + +def decode_name(payload): + n = decode_field(payload, _ADV_TYPE_NAME) + return str(n[0], "utf-8") if n else "" + + +def decode_services(payload): + services = [] + for u in decode_field(payload, _ADV_TYPE_UUID16_COMPLETE): + services.append(bluetooth.UUID(struct.unpack(" 0 + + def _advertise(self, interval_us=500000): + print("Starting advertising") + self._ble.gap_advertise(interval_us, adv_data=self._payload) + + def on_write(self, callback): + self._write_callback = callback + + +def demo(): + ble = bluetooth.BLE() + p = BLESimplePeripheral(ble) + + def on_rx(v): + print("RX", v) + + p.on_write(on_rx) + + i = 0 + while True: + if p.is_connected(): + # Short burst of queued notifications. + for _ in range(3): + data = str(i) + "_" + print("TX", data) + p.send(data) + i += 1 + time.sleep_ms(100) + + +if __name__ == "__main__": + demo() \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..d0eb59f --- /dev/null +++ b/main.py @@ -0,0 +1,118 @@ +import aioble +import bluetooth +import machine +import uasyncio as asyncio + + +_OBD_UUID_SERVICE = bluetooth.UUID(0xfff0) +_OBD_UUID_CHARACTERISTIC = bluetooth.UUID(0xfff1) + +led = machine.Pin("LED", machine.Pin.OUT) +connected = False +alive = False + + +async def find_remote(): + async with aioble.scan(duration_ms=5000, interval_us=30000, window_us=30000, active=True) as scanner: + async for result in scanner: + if (result.name() == "OBDBLE"): + print("Found OBDBLE") + for item in result.services(): + print(item) + if _OBD_UUID_SERVICE in result.services(): + print("Found OBDBLE service") + return result.device + return None + + +async def blink_task(): + print("Blink task started") + toggle = True + + while True and alive: + blink = 250 + led.value(toggle) + toggle = not toggle + + if connected: + blink = 1000 + else: + blink = 250 + await asyncio.sleep_ms(blink) + + print("Blink task stopped") + + +async def peripheral_task(): + + global connected, alive + connected = False + device = await find_remote() + + if not device: + print("OBD device not found") + return + + try: + print("Connecting to", device) + connection = await device.connect() + + except asyncio.TimeoutError: + print("Timeout during connection") + return + + async with connection: + print("Connected") + alive = True + connected = True + + + try: + + obd_service = await connection.service(_OBD_UUID_SERVICE) + print(obd_service) + obd_characteristic = await obd_service.characteristic(_OBD_UUID_CHARACTERISTIC) + print(obd_characteristic) + + except asyncio.TimeoutError: + print("Timeout discovering services/characteristics") + return + + + try: + if obd_service == None: + print("Obd Disconnected") + alive = False + connected = False + + if obd_characteristic == None: + print("No Control") + alive = False + connected = False + + except asyncio.TimeoutError: + print("Timeout during discovery/service/characteristic") + alive = False + + print("Sending ATZ") + obd_characteristic.write(b"ATZ",response=True) + obd_characteristic.notified() + data = await obd_characteristic.read() + print(data) + + + await connection.disconnected() + print("Disconnected") + + +async def main(): + tasks = [] + tasks = [ + asyncio.create_task(blink_task()), + asyncio.create_task(peripheral_task()) + ] + + await asyncio.gather(*tasks) + +while True: + asyncio.run(main()) \ No newline at end of file