diff --git a/ble_main.py b/ble_main.py deleted file mode 100644 index 6608db2..0000000 --- a/ble_main.py +++ /dev/null @@ -1,68 +0,0 @@ -import sys - -sys.path.append("") - -from micropython import const - -import uasyncio as asyncio -import aioble -import bluetooth - -import random -import struct - -# org.bluetooth.service.environmental_sensing -_ENV_SENSE_UUID = bluetooth.UUID(0x181A) -# org.bluetooth.characteristic.temperature -_ENV_SENSE_TEMP_UUID = bluetooth.UUID(0x2A6E) -# org.bluetooth.characteristic.gap.appearance.xml -_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768) - -# How frequently to send advertising beacons. -_ADV_INTERVAL_MS = 250_000 - - -# Register GATT server. -temp_service = aioble.Service(_ENV_SENSE_UUID) -temp_characteristic = aioble.Characteristic( - temp_service, _ENV_SENSE_TEMP_UUID, read=True, notify=True -) -aioble.register_services(temp_service) - - -# Helper to encode the temperature characteristic encoding (sint16, hundredths of a degree). -def _encode_temperature(temp_deg_c): - return struct.pack(" 0 - - def _advertise(self, interval_us=500000): - print("Starting advertising") - self._ble.gap_advertise(interval_us, adv_data=self._payload) - - def on_write(self, callback): - self._write_callback = callback - - -def demo(): - ble = bluetooth.BLE() - p = BLESimplePeripheral(ble) - - def on_rx(v): - print("RX", v) - - p.on_write(on_rx) - - i = 0 - while True: - if p.is_connected(): - # Short burst of queued notifications. - for _ in range(3): - data = str(i) + "_" - print("TX", data) - p.send(data) - i += 1 - time.sleep_ms(100) - - -if __name__ == "__main__": - demo() \ No newline at end of file diff --git a/lib/ble_uart/__init__.py b/lib/ble_uart/__init__.py new file mode 100644 index 0000000..a895251 --- /dev/null +++ b/lib/ble_uart/__init__.py @@ -0,0 +1,92 @@ +import bluetooth +from ble_uart.ble_advertising import advertising_payload + +from micropython import const + +_IRQ_CENTRAL_CONNECT = const(1) +_IRQ_CENTRAL_DISCONNECT = const(2) +_IRQ_GATTS_WRITE = const(3) + +_FLAG_WRITE = const(0x0008) +_FLAG_NOTIFY = const(0x0010) + +_UART_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E") +_UART_TX = ( + bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E"), + _FLAG_NOTIFY, +) +_UART_RX = ( + bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E"), + _FLAG_WRITE, +) +_UART_SERVICE = ( + _UART_UUID, + (_UART_TX, _UART_RX), +) + +# org.bluetooth.characteristic.gap.appearance.xml +_ADV_APPEARANCE_GENERIC_COMPUTER = const(128) + + +class Ble_uart: + def __init__(self, ble, on_rx=None, name="mpy-uart", rxbuf=100): + self._ble = ble + self._ble.active(True) + self._ble.irq(self._irq) + ((self._tx_handle, self._rx_handle),) = self._ble.gatts_register_services( + (_UART_SERVICE,) + ) + # Increase the size of the rx buffer and enable append mode. + self._ble.gatts_set_buffer(self._rx_handle, rxbuf, True) + self._connections = set() + self._rx_buffer = bytearray() + self._handler = None + # Optionally add services=[_UART_UUID], but this is likely to make the payload too large. + self._payload = advertising_payload( + name=name, appearance=_ADV_APPEARANCE_GENERIC_COMPUTER + ) + self._advertise() + self.irq(handler=on_rx) + + def irq(self, handler): + self._handler = handler + + def _irq(self, event, data): + # Track connections so we can send notifications. + if event == _IRQ_CENTRAL_CONNECT: + conn_handle, _, _ = data + self._connections.add(conn_handle) + elif event == _IRQ_CENTRAL_DISCONNECT: + conn_handle, _, _ = data + if conn_handle in self._connections: + self._connections.remove(conn_handle) + # Start advertising again to allow a new connection. + self._advertise() + elif event == _IRQ_GATTS_WRITE: + conn_handle, value_handle = data + if conn_handle in self._connections and value_handle == self._rx_handle: + self._rx_buffer += self._ble.gatts_read(self._rx_handle) + if self._handler: + self._handler() + + def any(self): + return len(self._rx_buffer) + + def read(self, sz=None): + if not sz: + sz = len(self._rx_buffer) + result = self._rx_buffer[0:sz] + self._rx_buffer = self._rx_buffer[sz:] + return result + + def write(self, data): + for conn_handle in self._connections: + self._ble.gatts_notify(conn_handle, self._tx_handle, data) + + def close(self): + for conn_handle in self._connections: + self._ble.gap_disconnect(conn_handle) + self._connections.clear() + + def _advertise(self, interval_us=500000): + self._ble.gap_advertise(interval_us, adv_data=self._payload) diff --git a/ble_advertising.py b/lib/ble_uart/ble_advertising.py similarity index 87% rename from ble_advertising.py rename to lib/ble_uart/ble_advertising.py index d745711..eed527f 100644 --- a/ble_advertising.py +++ b/lib/ble_uart/ble_advertising.py @@ -1,9 +1,5 @@ # Helpers for generating BLE advertising payloads. -# A more fully-featured (and easier to use) version of this is implemented in -# aioble. This code is provided just as a basic example. See -# https://github.com/micropython/micropython-lib/tree/master/micropython/bluetooth/aioble - from micropython import const import struct import bluetooth @@ -23,8 +19,6 @@ _ADV_TYPE_UUID32_MORE = const(0x4) _ADV_TYPE_UUID128_MORE = const(0x6) _ADV_TYPE_APPEARANCE = const(0x19) -_ADV_MAX_PAYLOAD = const(31) - # Generate a payload to be passed to gap_advertise(adv_data=...). def advertising_payload(limited_disc=False, br_edr=False, name=None, services=None, appearance=0): @@ -56,9 +50,6 @@ def advertising_payload(limited_disc=False, br_edr=False, name=None, services=No if appearance: _append(_ADV_TYPE_APPEARANCE, struct.pack(" _ADV_MAX_PAYLOAD: - raise ValueError("advertising payload too large") - return payload @@ -99,4 +90,4 @@ def demo(): if __name__ == "__main__": - demo() \ No newline at end of file + demo() diff --git a/lib/ble_uart/test.py b/lib/ble_uart/test.py new file mode 100644 index 0000000..5b4a93a --- /dev/null +++ b/lib/ble_uart/test.py @@ -0,0 +1,14 @@ +import ble_uart +import bluetooth +import time + +def print_handler(): + print(uart.read().decode().strip()) + +ble = bluetooth.BLE() +uart = ble_uart.Ble_uart(ble, on_rx=print_handler, name="bens app") + + +while True: + uart.write("hello\n") + time.sleep(1) diff --git a/remote.py b/remote.py deleted file mode 100644 index 80ac074..0000000 --- a/remote.py +++ /dev/null @@ -1,247 +0,0 @@ -# Kevin McAleer -# 2023-06-28 -# Bluetooth cores specification versio 5.4 (0x0D) - -import sys - -import aioble -import bluetooth -import gc -import machine -import uasyncio as asyncio -from micropython import const, mem_info -#from pico_lcd_1_14 import LCD_1inch14 -from machine import Pin,SPI,PWM -import framebuf - - - -def uid(): - """ Return the unique id of the device as a string """ - return "{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}".format( - *machine.unique_id()) - -MANUFACTURER_ID = const(0x02A29) -MODEL_NUMBER_ID = const(0x2A24) -SERIAL_NUMBER_ID = const(0x2A25) -HARDWARE_REVISION_ID = const(0x2A26) -BLE_VERSION_ID = const(0x2A28) - -led = machine.Pin("LED", machine.Pin.OUT) - -# Begin set up of LCD -BL = 13 -DC = 8 -RST = 12 -MOSI = 11 -SCK = 10 -CS = 9 - -pwm = PWM(Pin(BL)) -pwm.freq(1000) -pwm.duty_u16(32768)#max 65535 - -#LCD = LCD_1inch14() -#color BRG -"""" -LCD.fill(LCD.white) - -LCD.show() -LCD.text("Raspberry Pi Pico",90,40,LCD.red) -LCD.text("PicoGo",90,60,LCD.green) -LCD.text("Pico-LCD-1.14",90,80,LCD.blue) - -LCD.hline(10,10,220,LCD.blue) -LCD.hline(10,125,220,LCD.blue) -LCD.vline(10,10,115,LCD.blue) -LCD.vline(230,10,115,LCD.blue) - -LCD.show() -""" - -keyA = Pin(15,Pin.IN,Pin.PULL_UP) -keyB = Pin(17,Pin.IN,Pin.PULL_UP) - -key2 = Pin(2 ,Pin.IN,Pin.PULL_UP) -key3 = Pin(3 ,Pin.IN,Pin.PULL_UP) -key4 = Pin(16 ,Pin.IN,Pin.PULL_UP) -key5 = Pin(18 ,Pin.IN,Pin.PULL_UP) -key6 = Pin(20 ,Pin.IN,Pin.PULL_UP) - -# End of LCD setup - - -# _ENV_SENSE_TEMP_UUID = bluetooth.UUID(0x1800) -_GENERIC = bluetooth.UUID(0x1848) -_BUTTON_UUID = bluetooth.UUID(0x2A6E) -_ROBOT = bluetooth.UUID(0x180A) - -_BLE_APPEARANCE_GENERIC_REMOTE_CONTROL = const(384) - -# Advertising frequency -ADV_INTERVAL_MS = 250_000 - -device_info = aioble.Service(_ROBOT) - -connection = None - -# Create characteristics for device info -aioble.Characteristic(device_info, bluetooth.UUID(MANUFACTURER_ID), read=True, initial="KevsRobotsRemote") -aioble.Characteristic(device_info, bluetooth.UUID(MODEL_NUMBER_ID), read=True, initial="1.0") -aioble.Characteristic(device_info, bluetooth.UUID(SERIAL_NUMBER_ID), read=True, initial=uid()) -aioble.Characteristic(device_info, bluetooth.UUID(HARDWARE_REVISION_ID), read=True, initial=sys.version) -aioble.Characteristic(device_info, bluetooth.UUID(BLE_VERSION_ID), read=True, initial="1.0") - -remote_service = aioble.Service(_GENERIC) - -button_characteristic = aioble.Characteristic( - remote_service, _BUTTON_UUID, read=True, notify=True -) - -print('registering services') -aioble.register_services(remote_service, device_info) - -connected = False - -async def remote_task(): - """ Send the event to the connected device """ - while True: - if not connected: - print('not connected') - await asyncio.sleep_ms(1000) - continue - - if (keyA.value() == 0): - button_characteristic.write(b"a") - button_characteristic.notify(connection, b"a") - - elif (keyB.value() == 0): - button_characteristic.write(b"b") - button_characteristic.notify(connection, b"b") - - elif (key2.value() == 0): - button_characteristic.write(b"u") - button_characteristic.notify(connection, b"u") - - elif (key3.value() == 0): - button_characteristic.write(b"c") - button_characteristic.notify(connection, b"c") - - elif (key4.value() == 0): - button_characteristic.write(b"l") - button_characteristic.notify(connection, b"l") - - elif (key5.value() == 0): - button_characteristic.write(b"d") - button_characteristic.notify(connection, b"d") - - elif (key6.value() == 0): - button_characteristic.write(b"r") - button_characteristic.notify(connection, b"r") - - else: - button_characteristic.notify(connection, b"!") - - # Show button pushed on LCD - if(keyA.value() == 0): -# LCD.fill_rect(208,12,20,20,LCD.red) - print("A") -# else : -# LCD.fill_rect(208,12,20,20,LCD.white) -# LCD.rect(208,12,20,20,LCD.red) - - if(keyB.value() == 0): - print("B") - else : - pass -# LCD.fill_rect(208,103,20,20,LCD.white) -# LCD.rect(208,103,20,20,LCD.red) - - if(key2.value() == 0): - # LCD.fill_rect(37,35,20,20,LCD.red) - print("UP") - else : - pass - # LCD.fill_rect(37,35,20,20,LCD.white) - # LCD.rect(37,35,20,20,LCD.red) - - if(key3.value() == 0): - # LCD.fill_rect(37,60,20,20,LCD.red) - print("CTRL") - else : - pass - # LCD.fill_rect(37,60,20,20,LCD.white) - # LCD.rect(37,60,20,20,LCD.red) - - if(key4.value() == 0): - # LCD.fill_rect(12,60,20,20,LCD.red) - print("LEFT") - else : - pass - # LCD.fill_rect(12,60,20,20,LCD.white) - # LCD.rect(12,60,20,20,LCD.red) - - if(key5.value() == 0): - # LCD.fill_rect(37,85,20,20,LCD.red) - print("DOWN") - else : - pass - # LCD.fill_rect(37,85,20,20,LCD.white) - # LCD.rect(37,85,20,20,LCD.red) - - if(key6.value() == 0): - # LCD.fill_rect(62,60,20,20,LCD.red) - print("RIGHT") - else : - pass - # LCD.fill_rect(62,60,20,20,LCD.white) - # LCD.rect(62,60,20,20,LCD.red) - - # LCD.show() - - await asyncio.sleep_ms(10) - - -# Serially wait for connections. -# Don't advertise while a central is connected -async def peripheral_task(): - print('peripheral task started') - global connected, connection - while True: - connected = False - async with await aioble.advertise( - ADV_INTERVAL_MS, - name="KevsRobots", - appearance=_BLE_APPEARANCE_GENERIC_REMOTE_CONTROL, - services=[_GENERIC] - ) as connection: - print("Connection from", connection.device) - connected = True - print(f"connected: {connected}") - await connection.disconnected(timeout_ms=None) - print(f'disconnected') - - -async def blink_task(): - print('blink task started') - toggle = True - while True: - led.value(toggle) - toggle = not toggle - blink = 1000 - if connected: - blink = 1000 - else: - blink = 250 - await asyncio.sleep_ms(blink) - -async def main(): - tasks = [ - asyncio.create_task(peripheral_task()), - asyncio.create_task(blink_task()), - asyncio.create_task(remote_task()), - ] - await asyncio.gather(*tasks) - - -asyncio.run(main()) \ No newline at end of file diff --git a/test01.py b/test01.py deleted file mode 100644 index dc02123..0000000 --- a/test01.py +++ /dev/null @@ -1,9 +0,0 @@ -from machine import Pin -from time import sleep - -led = Pin('LED', Pin.OUT) -print('Blinking LED Example') - -while True: - led.value(not led.value()) - sleep(0.5) \ No newline at end of file diff --git a/test_olt_ir.py b/test_olt_ir.py deleted file mode 100644 index 5998bd7..0000000 --- a/test_olt_ir.py +++ /dev/null @@ -1,57 +0,0 @@ -## Test IR TX/RX for OpenLaserTag -## Tomas Krejci [Njord] - -# Run this to characterise a remote. -import ustruct -from sys import platform -import time -import gc -from machine import Pin, freq -from olt_lib.ir_rx.print_error import print_error # Optional print of error codes - -# Import all implemented classes -from olt_lib.ir_rx.olt import LT_24, SONY_12, SONY_15, SONY_20 - -# Define pin according to platform -if platform == "pyboard": - p = Pin("X3", Pin.IN) -elif platform == "esp8266": - freq(160000000) - p = Pin(13, Pin.IN) -elif platform == "esp32" or platform == "esp32_LoBo": - p = Pin(23, Pin.IN) -elif platform == "rp2": - p = Pin(16, Pin.IN) - - - -# User callback -def cb(byte1, byte2, byte3, packet): - print(f"byte1 0x{byte1:02x} byte2 0x{byte2:02x} byte3 0x{byte3:02x} packet 0x{packet:06x}") - -def test(proto=0): - classes = (LT_24, SONY_12, SONY_15, SONY_20) - ir = classes[proto](p, cb) # Instantiate receiver - ir.error_function(print_error) # Show debug information - # ir.verbose = True - # A real application would do something here... - try: - while True: - print("running") - time.sleep(5) - gc.collect() - except KeyboardInterrupt: - ir.close() - - -# **** DISPLAY GREETING **** -s = """Test for IR receiver. Run: -from ir_rx.test import test -test() for LT 24 bit protocol, -test(1) for Sony SIRC 12 bit, -test(2) for Sony SIRC 15 bit, -test(3) for Sony SIRC 20 bit, - -Hit ctrl-c to stop, then ctrl-d to soft reset.""" - -print(s) \ No newline at end of file