add BLE UART, test work
This commit is contained in:
parent
1bea9dfa0b
commit
bc983bdd39
68
ble_main.py
68
ble_main.py
@ -1,68 +0,0 @@
|
||||
import sys
|
||||
|
||||
sys.path.append("")
|
||||
|
||||
from micropython import const
|
||||
|
||||
import uasyncio as asyncio
|
||||
import aioble
|
||||
import bluetooth
|
||||
|
||||
import random
|
||||
import struct
|
||||
|
||||
# org.bluetooth.service.environmental_sensing
|
||||
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
|
||||
# org.bluetooth.characteristic.temperature
|
||||
_ENV_SENSE_TEMP_UUID = bluetooth.UUID(0x2A6E)
|
||||
# org.bluetooth.characteristic.gap.appearance.xml
|
||||
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)
|
||||
|
||||
# How frequently to send advertising beacons.
|
||||
_ADV_INTERVAL_MS = 250_000
|
||||
|
||||
|
||||
# Register GATT server.
|
||||
temp_service = aioble.Service(_ENV_SENSE_UUID)
|
||||
temp_characteristic = aioble.Characteristic(
|
||||
temp_service, _ENV_SENSE_TEMP_UUID, read=True, notify=True
|
||||
)
|
||||
aioble.register_services(temp_service)
|
||||
|
||||
|
||||
# Helper to encode the temperature characteristic encoding (sint16, hundredths of a degree).
|
||||
def _encode_temperature(temp_deg_c):
|
||||
return struct.pack("<h", int(temp_deg_c * 100))
|
||||
|
||||
|
||||
# This would be periodically polling a hardware sensor.
|
||||
async def sensor_task():
|
||||
t = 24.5
|
||||
while True:
|
||||
temp_characteristic.write(_encode_temperature(t))
|
||||
t += random.uniform(-0.5, 0.5)
|
||||
await asyncio.sleep_ms(1000)
|
||||
|
||||
|
||||
# Serially wait for connections. Don't advertise while a central is
|
||||
# connected.
|
||||
async def peripheral_task():
|
||||
while True:
|
||||
async with await aioble.advertise(
|
||||
_ADV_INTERVAL_MS,
|
||||
name="mpy-temp",
|
||||
services=[_ENV_SENSE_UUID],
|
||||
appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER,
|
||||
) as connection:
|
||||
print("Connection from", connection.device)
|
||||
await connection.disconnected()
|
||||
|
||||
|
||||
# Run both tasks.
|
||||
async def main():
|
||||
t1 = asyncio.create_task(sensor_task())
|
||||
t2 = asyncio.create_task(peripheral_task())
|
||||
await asyncio.gather(t1, t2)
|
||||
|
||||
|
||||
asyncio.run(main())
|
@ -1,80 +0,0 @@
|
||||
#import time
|
||||
from machine import Pin, ADC
|
||||
import aioble
|
||||
import uasyncio as asyncio
|
||||
import bluetooth
|
||||
from micropython import const
|
||||
|
||||
_ADVERTISING_INTERVAL_US = const(200_000)
|
||||
_APPEARANCE = const(0x0552) # Multi-sensor
|
||||
_TEMP_MEASUREMENT_INTERVAL_MS = const(15_000)
|
||||
_TEMP_MEASUREMENT_FREQUENCY = const(15_000)
|
||||
|
||||
connected=False
|
||||
|
||||
async def task_peripheral():
|
||||
""" Task to handle advertising and connections """
|
||||
global connected
|
||||
while True:
|
||||
connected = False
|
||||
async with await aioble.advertise(
|
||||
_ADVERTISING_INTERVAL_US,
|
||||
appearance=_APPEARANCE,
|
||||
name='RP2-SENSOR',
|
||||
services=[_SVC_DEVICE_INFO]
|
||||
) as connection:
|
||||
print("Connected from ", connection.device)
|
||||
connected = True
|
||||
|
||||
async def task_flash_led():
|
||||
""" Blink the on-board LED, faster if disconnected and slower if connected """
|
||||
BLINK_DELAY_MS_FAST = const(100)
|
||||
BLINK_DELAY_MS_SLOW = const(500)
|
||||
while True:
|
||||
led.toggle()
|
||||
if connected:
|
||||
await asyncio.sleep_ms(BLINK_DELAY_MS_SLOW)
|
||||
else:
|
||||
await asyncio.sleep_ms(BLINK_DELAY_MS_FAST)
|
||||
|
||||
async def task_sensor():
|
||||
""" Task to handle sensor measures """
|
||||
while True:
|
||||
temperature = 27.0 - ((adc.read_u16() * 3.3 / 65535) - 0.706) / 0.001721
|
||||
print("T: {}°C".format(temperature))
|
||||
await asyncio.sleep_ms(_TEMP_MEASUREMENT_FREQUENCY)
|
||||
#time.sleep_ms(_TEMP_MEASUREMENT_INTERVAL_MS)
|
||||
|
||||
led = Pin('LED', Pin.OUT)
|
||||
adc = ADC(4)
|
||||
|
||||
# Constants for the device information service
|
||||
_SVC_DEVICE_INFO = bluetooth.UUID(0x180A)
|
||||
svc_dev_info = aioble.Service(_SVC_DEVICE_INFO)
|
||||
|
||||
_CHAR_MANUFACTURER_NAME_STR = bluetooth.UUID(0x2A29)
|
||||
_CHAR_MODEL_NUMBER_STR = bluetooth.UUID(0x2A24)
|
||||
_CHAR_SERIAL_NUMBER_STR = bluetooth.UUID(0x2A25)
|
||||
_CHAR_FIRMWARE_REV_STR = bluetooth.UUID(0x2A26)
|
||||
_CHAR_HARDWARE_REV_STR = bluetooth.UUID(0x2A27)
|
||||
|
||||
# Create characteristics for device info
|
||||
aioble.Characteristic(svc_dev_info, _CHAR_MANUFACTURER_NAME_STR, read=True, initial='Njord')
|
||||
aioble.Characteristic(svc_dev_info, _CHAR_MODEL_NUMBER_STR, read=True, initial='J-0001')
|
||||
aioble.Characteristic(svc_dev_info, _CHAR_SERIAL_NUMBER_STR, read=True, initial='J-0001-0000')
|
||||
aioble.Characteristic(svc_dev_info, _CHAR_FIRMWARE_REV_STR, read=True, initial='0.0.1')
|
||||
aioble.Characteristic(svc_dev_info, _CHAR_HARDWARE_REV_STR, read=True, initial='0.0.1')
|
||||
|
||||
aioble.register_services(svc_dev_info)
|
||||
|
||||
|
||||
async def main():
|
||||
""" Create all the tasks """
|
||||
tasks = [
|
||||
asyncio.create_task(task_peripheral()),
|
||||
asyncio.create_task(task_flash_led()),
|
||||
asyncio.create_task(task_sensor()),
|
||||
]
|
||||
asyncio.gather(*tasks)
|
||||
|
||||
asyncio.run(main())
|
247
ble_main_3.py
247
ble_main_3.py
@ -1,247 +0,0 @@
|
||||
# Kevin McAleer
|
||||
# 2023-06-28
|
||||
# Bluetooth cores specification versio 5.4 (0x0D)
|
||||
|
||||
import sys
|
||||
|
||||
import aioble
|
||||
import bluetooth
|
||||
import gc
|
||||
import machine
|
||||
import uasyncio as asyncio
|
||||
from micropython import const, mem_info
|
||||
#from pico_lcd_1_14 import LCD_1inch14
|
||||
from machine import Pin,SPI,PWM
|
||||
import framebuf
|
||||
|
||||
|
||||
|
||||
def uid():
|
||||
""" Return the unique id of the device as a string """
|
||||
return "{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}".format(
|
||||
*machine.unique_id())
|
||||
|
||||
MANUFACTURER_ID = const(0x02A29)
|
||||
MODEL_NUMBER_ID = const(0x2A24)
|
||||
SERIAL_NUMBER_ID = const(0x2A25)
|
||||
HARDWARE_REVISION_ID = const(0x2A26)
|
||||
BLE_VERSION_ID = const(0x2A28)
|
||||
|
||||
led = machine.Pin("LED", machine.Pin.OUT)
|
||||
|
||||
# Begin set up of LCD
|
||||
BL = 13
|
||||
DC = 8
|
||||
RST = 12
|
||||
MOSI = 11
|
||||
SCK = 10
|
||||
CS = 9
|
||||
|
||||
pwm = PWM(Pin(BL))
|
||||
pwm.freq(1000)
|
||||
pwm.duty_u16(32768)#max 65535
|
||||
|
||||
#LCD = LCD_1inch14()
|
||||
#color BRG
|
||||
""""
|
||||
LCD.fill(LCD.white)
|
||||
|
||||
LCD.show()
|
||||
LCD.text("Raspberry Pi Pico",90,40,LCD.red)
|
||||
LCD.text("PicoGo",90,60,LCD.green)
|
||||
LCD.text("Pico-LCD-1.14",90,80,LCD.blue)
|
||||
|
||||
LCD.hline(10,10,220,LCD.blue)
|
||||
LCD.hline(10,125,220,LCD.blue)
|
||||
LCD.vline(10,10,115,LCD.blue)
|
||||
LCD.vline(230,10,115,LCD.blue)
|
||||
|
||||
LCD.show()
|
||||
"""
|
||||
|
||||
keyA = Pin(15,Pin.IN,Pin.PULL_UP)
|
||||
keyB = Pin(17,Pin.IN,Pin.PULL_UP)
|
||||
|
||||
key2 = Pin(2 ,Pin.IN,Pin.PULL_UP)
|
||||
key3 = Pin(3 ,Pin.IN,Pin.PULL_UP)
|
||||
key4 = Pin(16 ,Pin.IN,Pin.PULL_UP)
|
||||
key5 = Pin(18 ,Pin.IN,Pin.PULL_UP)
|
||||
key6 = Pin(20 ,Pin.IN,Pin.PULL_UP)
|
||||
|
||||
# End of LCD setup
|
||||
|
||||
|
||||
# _ENV_SENSE_TEMP_UUID = bluetooth.UUID(0x1800)
|
||||
_GENERIC = bluetooth.UUID(0x1848)
|
||||
_BUTTON_UUID = bluetooth.UUID(0x2A6E)
|
||||
_ROBOT = bluetooth.UUID(0x180A)
|
||||
|
||||
_BLE_APPEARANCE_GENERIC_REMOTE_CONTROL = const(384)
|
||||
|
||||
# Advertising frequency
|
||||
ADV_INTERVAL_MS = 250_000
|
||||
|
||||
device_info = aioble.Service(_ROBOT)
|
||||
|
||||
connection = None
|
||||
|
||||
# Create characteristics for device info
|
||||
aioble.Characteristic(device_info, bluetooth.UUID(MANUFACTURER_ID), read=True, initial="KevsRobotsRemote")
|
||||
aioble.Characteristic(device_info, bluetooth.UUID(MODEL_NUMBER_ID), read=True, initial="1.0")
|
||||
aioble.Characteristic(device_info, bluetooth.UUID(SERIAL_NUMBER_ID), read=True, initial=uid())
|
||||
aioble.Characteristic(device_info, bluetooth.UUID(HARDWARE_REVISION_ID), read=True, initial=sys.version)
|
||||
aioble.Characteristic(device_info, bluetooth.UUID(BLE_VERSION_ID), read=True, initial="1.0")
|
||||
|
||||
remote_service = aioble.Service(_GENERIC)
|
||||
|
||||
button_characteristic = aioble.Characteristic(
|
||||
remote_service, _BUTTON_UUID, read=True, notify=True
|
||||
)
|
||||
|
||||
print('registering services')
|
||||
aioble.register_services(remote_service, device_info)
|
||||
|
||||
connected = False
|
||||
|
||||
async def remote_task():
|
||||
""" Send the event to the connected device """
|
||||
while True:
|
||||
if not connected:
|
||||
print('not connected')
|
||||
await asyncio.sleep_ms(1000)
|
||||
continue
|
||||
|
||||
if (keyA.value() == 0):
|
||||
button_characteristic.write(b"a")
|
||||
button_characteristic.notify(connection, b"a")
|
||||
|
||||
elif (keyB.value() == 0):
|
||||
button_characteristic.write(b"b")
|
||||
button_characteristic.notify(connection, b"b")
|
||||
|
||||
elif (key2.value() == 0):
|
||||
button_characteristic.write(b"u")
|
||||
button_characteristic.notify(connection, b"u")
|
||||
|
||||
elif (key3.value() == 0):
|
||||
button_characteristic.write(b"c")
|
||||
button_characteristic.notify(connection, b"c")
|
||||
|
||||
elif (key4.value() == 0):
|
||||
button_characteristic.write(b"l")
|
||||
button_characteristic.notify(connection, b"l")
|
||||
|
||||
elif (key5.value() == 0):
|
||||
button_characteristic.write(b"d")
|
||||
button_characteristic.notify(connection, b"d")
|
||||
|
||||
elif (key6.value() == 0):
|
||||
button_characteristic.write(b"r")
|
||||
button_characteristic.notify(connection, b"r")
|
||||
|
||||
else:
|
||||
button_characteristic.notify(connection, b"!")
|
||||
|
||||
# Show button pushed on LCD
|
||||
if(keyA.value() == 0):
|
||||
# LCD.fill_rect(208,12,20,20,LCD.red)
|
||||
print("A")
|
||||
# else :
|
||||
# LCD.fill_rect(208,12,20,20,LCD.white)
|
||||
# LCD.rect(208,12,20,20,LCD.red)
|
||||
|
||||
if(keyB.value() == 0):
|
||||
print("B")
|
||||
else :
|
||||
pass
|
||||
# LCD.fill_rect(208,103,20,20,LCD.white)
|
||||
# LCD.rect(208,103,20,20,LCD.red)
|
||||
|
||||
if(key2.value() == 0):
|
||||
# LCD.fill_rect(37,35,20,20,LCD.red)
|
||||
print("UP")
|
||||
else :
|
||||
pass
|
||||
# LCD.fill_rect(37,35,20,20,LCD.white)
|
||||
# LCD.rect(37,35,20,20,LCD.red)
|
||||
|
||||
if(key3.value() == 0):
|
||||
# LCD.fill_rect(37,60,20,20,LCD.red)
|
||||
print("CTRL")
|
||||
else :
|
||||
pass
|
||||
# LCD.fill_rect(37,60,20,20,LCD.white)
|
||||
# LCD.rect(37,60,20,20,LCD.red)
|
||||
|
||||
if(key4.value() == 0):
|
||||
# LCD.fill_rect(12,60,20,20,LCD.red)
|
||||
print("LEFT")
|
||||
else :
|
||||
pass
|
||||
# LCD.fill_rect(12,60,20,20,LCD.white)
|
||||
# LCD.rect(12,60,20,20,LCD.red)
|
||||
|
||||
if(key5.value() == 0):
|
||||
# LCD.fill_rect(37,85,20,20,LCD.red)
|
||||
print("DOWN")
|
||||
else :
|
||||
pass
|
||||
# LCD.fill_rect(37,85,20,20,LCD.white)
|
||||
# LCD.rect(37,85,20,20,LCD.red)
|
||||
|
||||
if(key6.value() == 0):
|
||||
# LCD.fill_rect(62,60,20,20,LCD.red)
|
||||
print("RIGHT")
|
||||
else :
|
||||
pass
|
||||
# LCD.fill_rect(62,60,20,20,LCD.white)
|
||||
# LCD.rect(62,60,20,20,LCD.red)
|
||||
|
||||
# LCD.show()
|
||||
|
||||
await asyncio.sleep_ms(10)
|
||||
|
||||
|
||||
# Serially wait for connections.
|
||||
# Don't advertise while a central is connected
|
||||
async def peripheral_task():
|
||||
print('peripheral task started')
|
||||
global connected, connection
|
||||
while True:
|
||||
connected = False
|
||||
async with await aioble.advertise(
|
||||
ADV_INTERVAL_MS,
|
||||
name="KevsRobots",
|
||||
appearance=_BLE_APPEARANCE_GENERIC_REMOTE_CONTROL,
|
||||
services=[_GENERIC]
|
||||
) as connection:
|
||||
print("Connection from", connection.device)
|
||||
connected = True
|
||||
print(f"connected: {connected}")
|
||||
await connection.disconnected(timeout_ms=None)
|
||||
print(f'disconnected')
|
||||
|
||||
|
||||
async def blink_task():
|
||||
print('blink task started')
|
||||
toggle = True
|
||||
while True:
|
||||
led.value(toggle)
|
||||
toggle = not toggle
|
||||
blink = 1000
|
||||
if connected:
|
||||
blink = 1000
|
||||
else:
|
||||
blink = 250
|
||||
await asyncio.sleep_ms(blink)
|
||||
|
||||
async def main():
|
||||
tasks = [
|
||||
asyncio.create_task(peripheral_task()),
|
||||
asyncio.create_task(blink_task()),
|
||||
asyncio.create_task(remote_task()),
|
||||
]
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
|
||||
asyncio.run(main())
|
@ -1,106 +0,0 @@
|
||||
# This example demonstrates a UART periperhal.
|
||||
|
||||
# This example demonstrates the low-level bluetooth module. For most
|
||||
# applications, we recommend using the higher-level aioble library which takes
|
||||
# care of all IRQ handling and connection management. See
|
||||
# https://github.com/micropython/micropython-lib/tree/master/micropython/bluetooth/aioble
|
||||
|
||||
import bluetooth
|
||||
import random
|
||||
import struct
|
||||
import time
|
||||
from ble_advertising import advertising_payload
|
||||
|
||||
from micropython import const
|
||||
|
||||
_IRQ_CENTRAL_CONNECT = const(1)
|
||||
_IRQ_CENTRAL_DISCONNECT = const(2)
|
||||
_IRQ_GATTS_WRITE = const(3)
|
||||
|
||||
_FLAG_READ = const(0x0002)
|
||||
_FLAG_WRITE_NO_RESPONSE = const(0x0004)
|
||||
_FLAG_WRITE = const(0x0008)
|
||||
_FLAG_NOTIFY = const(0x0010)
|
||||
|
||||
_UART_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
|
||||
_UART_TX = (
|
||||
bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E"),
|
||||
_FLAG_READ | _FLAG_NOTIFY,
|
||||
)
|
||||
_UART_RX = (
|
||||
bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E"),
|
||||
_FLAG_WRITE | _FLAG_WRITE_NO_RESPONSE,
|
||||
)
|
||||
_UART_SERVICE = (
|
||||
_UART_UUID,
|
||||
(_UART_TX, _UART_RX),
|
||||
)
|
||||
|
||||
|
||||
class BLESimplePeripheral:
|
||||
def __init__(self, ble, name="mpy-uart"):
|
||||
self._ble = ble
|
||||
self._ble.active(True)
|
||||
self._ble.irq(self._irq)
|
||||
((self._handle_tx, self._handle_rx),) = self._ble.gatts_register_services((_UART_SERVICE,))
|
||||
self._connections = set()
|
||||
self._write_callback = None
|
||||
self._payload = advertising_payload(name=name, services=[_UART_UUID])
|
||||
self._advertise()
|
||||
|
||||
def _irq(self, event, data):
|
||||
# Track connections so we can send notifications.
|
||||
if event == _IRQ_CENTRAL_CONNECT:
|
||||
conn_handle, _, _ = data
|
||||
print("New connection", conn_handle)
|
||||
self._connections.add(conn_handle)
|
||||
elif event == _IRQ_CENTRAL_DISCONNECT:
|
||||
conn_handle, _, _ = data
|
||||
print("Disconnected", conn_handle)
|
||||
self._connections.remove(conn_handle)
|
||||
# Start advertising again to allow a new connection.
|
||||
self._advertise()
|
||||
elif event == _IRQ_GATTS_WRITE:
|
||||
conn_handle, value_handle = data
|
||||
value = self._ble.gatts_read(value_handle)
|
||||
if value_handle == self._handle_rx and self._write_callback:
|
||||
self._write_callback(value)
|
||||
|
||||
def send(self, data):
|
||||
for conn_handle in self._connections:
|
||||
self._ble.gatts_notify(conn_handle, self._handle_tx, data)
|
||||
|
||||
def is_connected(self):
|
||||
return len(self._connections) > 0
|
||||
|
||||
def _advertise(self, interval_us=500000):
|
||||
print("Starting advertising")
|
||||
self._ble.gap_advertise(interval_us, adv_data=self._payload)
|
||||
|
||||
def on_write(self, callback):
|
||||
self._write_callback = callback
|
||||
|
||||
|
||||
def demo():
|
||||
ble = bluetooth.BLE()
|
||||
p = BLESimplePeripheral(ble)
|
||||
|
||||
def on_rx(v):
|
||||
print("RX", v)
|
||||
|
||||
p.on_write(on_rx)
|
||||
|
||||
i = 0
|
||||
while True:
|
||||
if p.is_connected():
|
||||
# Short burst of queued notifications.
|
||||
for _ in range(3):
|
||||
data = str(i) + "_"
|
||||
print("TX", data)
|
||||
p.send(data)
|
||||
i += 1
|
||||
time.sleep_ms(100)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
demo()
|
92
lib/ble_uart/__init__.py
Normal file
92
lib/ble_uart/__init__.py
Normal file
@ -0,0 +1,92 @@
|
||||
import bluetooth
|
||||
from ble_uart.ble_advertising import advertising_payload
|
||||
|
||||
from micropython import const
|
||||
|
||||
_IRQ_CENTRAL_CONNECT = const(1)
|
||||
_IRQ_CENTRAL_DISCONNECT = const(2)
|
||||
_IRQ_GATTS_WRITE = const(3)
|
||||
|
||||
_FLAG_WRITE = const(0x0008)
|
||||
_FLAG_NOTIFY = const(0x0010)
|
||||
|
||||
_UART_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
|
||||
_UART_TX = (
|
||||
bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E"),
|
||||
_FLAG_NOTIFY,
|
||||
)
|
||||
_UART_RX = (
|
||||
bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E"),
|
||||
_FLAG_WRITE,
|
||||
)
|
||||
_UART_SERVICE = (
|
||||
_UART_UUID,
|
||||
(_UART_TX, _UART_RX),
|
||||
)
|
||||
|
||||
# org.bluetooth.characteristic.gap.appearance.xml
|
||||
_ADV_APPEARANCE_GENERIC_COMPUTER = const(128)
|
||||
|
||||
|
||||
class Ble_uart:
|
||||
def __init__(self, ble, on_rx=None, name="mpy-uart", rxbuf=100):
|
||||
self._ble = ble
|
||||
self._ble.active(True)
|
||||
self._ble.irq(self._irq)
|
||||
((self._tx_handle, self._rx_handle),) = self._ble.gatts_register_services(
|
||||
(_UART_SERVICE,)
|
||||
)
|
||||
# Increase the size of the rx buffer and enable append mode.
|
||||
self._ble.gatts_set_buffer(self._rx_handle, rxbuf, True)
|
||||
self._connections = set()
|
||||
self._rx_buffer = bytearray()
|
||||
self._handler = None
|
||||
# Optionally add services=[_UART_UUID], but this is likely to make the payload too large.
|
||||
self._payload = advertising_payload(
|
||||
name=name, appearance=_ADV_APPEARANCE_GENERIC_COMPUTER
|
||||
)
|
||||
self._advertise()
|
||||
self.irq(handler=on_rx)
|
||||
|
||||
def irq(self, handler):
|
||||
self._handler = handler
|
||||
|
||||
def _irq(self, event, data):
|
||||
# Track connections so we can send notifications.
|
||||
if event == _IRQ_CENTRAL_CONNECT:
|
||||
conn_handle, _, _ = data
|
||||
self._connections.add(conn_handle)
|
||||
elif event == _IRQ_CENTRAL_DISCONNECT:
|
||||
conn_handle, _, _ = data
|
||||
if conn_handle in self._connections:
|
||||
self._connections.remove(conn_handle)
|
||||
# Start advertising again to allow a new connection.
|
||||
self._advertise()
|
||||
elif event == _IRQ_GATTS_WRITE:
|
||||
conn_handle, value_handle = data
|
||||
if conn_handle in self._connections and value_handle == self._rx_handle:
|
||||
self._rx_buffer += self._ble.gatts_read(self._rx_handle)
|
||||
if self._handler:
|
||||
self._handler()
|
||||
|
||||
def any(self):
|
||||
return len(self._rx_buffer)
|
||||
|
||||
def read(self, sz=None):
|
||||
if not sz:
|
||||
sz = len(self._rx_buffer)
|
||||
result = self._rx_buffer[0:sz]
|
||||
self._rx_buffer = self._rx_buffer[sz:]
|
||||
return result
|
||||
|
||||
def write(self, data):
|
||||
for conn_handle in self._connections:
|
||||
self._ble.gatts_notify(conn_handle, self._tx_handle, data)
|
||||
|
||||
def close(self):
|
||||
for conn_handle in self._connections:
|
||||
self._ble.gap_disconnect(conn_handle)
|
||||
self._connections.clear()
|
||||
|
||||
def _advertise(self, interval_us=500000):
|
||||
self._ble.gap_advertise(interval_us, adv_data=self._payload)
|
@ -1,9 +1,5 @@
|
||||
# Helpers for generating BLE advertising payloads.
|
||||
|
||||
# A more fully-featured (and easier to use) version of this is implemented in
|
||||
# aioble. This code is provided just as a basic example. See
|
||||
# https://github.com/micropython/micropython-lib/tree/master/micropython/bluetooth/aioble
|
||||
|
||||
from micropython import const
|
||||
import struct
|
||||
import bluetooth
|
||||
@ -23,8 +19,6 @@ _ADV_TYPE_UUID32_MORE = const(0x4)
|
||||
_ADV_TYPE_UUID128_MORE = const(0x6)
|
||||
_ADV_TYPE_APPEARANCE = const(0x19)
|
||||
|
||||
_ADV_MAX_PAYLOAD = const(31)
|
||||
|
||||
|
||||
# Generate a payload to be passed to gap_advertise(adv_data=...).
|
||||
def advertising_payload(limited_disc=False, br_edr=False, name=None, services=None, appearance=0):
|
||||
@ -56,9 +50,6 @@ def advertising_payload(limited_disc=False, br_edr=False, name=None, services=No
|
||||
if appearance:
|
||||
_append(_ADV_TYPE_APPEARANCE, struct.pack("<h", appearance))
|
||||
|
||||
if len(payload) > _ADV_MAX_PAYLOAD:
|
||||
raise ValueError("advertising payload too large")
|
||||
|
||||
return payload
|
||||
|
||||
|
||||
@ -99,4 +90,4 @@ def demo():
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
demo()
|
||||
demo()
|
14
lib/ble_uart/test.py
Normal file
14
lib/ble_uart/test.py
Normal file
@ -0,0 +1,14 @@
|
||||
import ble_uart
|
||||
import bluetooth
|
||||
import time
|
||||
|
||||
def print_handler():
|
||||
print(uart.read().decode().strip())
|
||||
|
||||
ble = bluetooth.BLE()
|
||||
uart = ble_uart.Ble_uart(ble, on_rx=print_handler, name="bens app")
|
||||
|
||||
|
||||
while True:
|
||||
uart.write("hello\n")
|
||||
time.sleep(1)
|
247
remote.py
247
remote.py
@ -1,247 +0,0 @@
|
||||
# Kevin McAleer
|
||||
# 2023-06-28
|
||||
# Bluetooth cores specification versio 5.4 (0x0D)
|
||||
|
||||
import sys
|
||||
|
||||
import aioble
|
||||
import bluetooth
|
||||
import gc
|
||||
import machine
|
||||
import uasyncio as asyncio
|
||||
from micropython import const, mem_info
|
||||
#from pico_lcd_1_14 import LCD_1inch14
|
||||
from machine import Pin,SPI,PWM
|
||||
import framebuf
|
||||
|
||||
|
||||
|
||||
def uid():
|
||||
""" Return the unique id of the device as a string """
|
||||
return "{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}".format(
|
||||
*machine.unique_id())
|
||||
|
||||
MANUFACTURER_ID = const(0x02A29)
|
||||
MODEL_NUMBER_ID = const(0x2A24)
|
||||
SERIAL_NUMBER_ID = const(0x2A25)
|
||||
HARDWARE_REVISION_ID = const(0x2A26)
|
||||
BLE_VERSION_ID = const(0x2A28)
|
||||
|
||||
led = machine.Pin("LED", machine.Pin.OUT)
|
||||
|
||||
# Begin set up of LCD
|
||||
BL = 13
|
||||
DC = 8
|
||||
RST = 12
|
||||
MOSI = 11
|
||||
SCK = 10
|
||||
CS = 9
|
||||
|
||||
pwm = PWM(Pin(BL))
|
||||
pwm.freq(1000)
|
||||
pwm.duty_u16(32768)#max 65535
|
||||
|
||||
#LCD = LCD_1inch14()
|
||||
#color BRG
|
||||
""""
|
||||
LCD.fill(LCD.white)
|
||||
|
||||
LCD.show()
|
||||
LCD.text("Raspberry Pi Pico",90,40,LCD.red)
|
||||
LCD.text("PicoGo",90,60,LCD.green)
|
||||
LCD.text("Pico-LCD-1.14",90,80,LCD.blue)
|
||||
|
||||
LCD.hline(10,10,220,LCD.blue)
|
||||
LCD.hline(10,125,220,LCD.blue)
|
||||
LCD.vline(10,10,115,LCD.blue)
|
||||
LCD.vline(230,10,115,LCD.blue)
|
||||
|
||||
LCD.show()
|
||||
"""
|
||||
|
||||
keyA = Pin(15,Pin.IN,Pin.PULL_UP)
|
||||
keyB = Pin(17,Pin.IN,Pin.PULL_UP)
|
||||
|
||||
key2 = Pin(2 ,Pin.IN,Pin.PULL_UP)
|
||||
key3 = Pin(3 ,Pin.IN,Pin.PULL_UP)
|
||||
key4 = Pin(16 ,Pin.IN,Pin.PULL_UP)
|
||||
key5 = Pin(18 ,Pin.IN,Pin.PULL_UP)
|
||||
key6 = Pin(20 ,Pin.IN,Pin.PULL_UP)
|
||||
|
||||
# End of LCD setup
|
||||
|
||||
|
||||
# _ENV_SENSE_TEMP_UUID = bluetooth.UUID(0x1800)
|
||||
_GENERIC = bluetooth.UUID(0x1848)
|
||||
_BUTTON_UUID = bluetooth.UUID(0x2A6E)
|
||||
_ROBOT = bluetooth.UUID(0x180A)
|
||||
|
||||
_BLE_APPEARANCE_GENERIC_REMOTE_CONTROL = const(384)
|
||||
|
||||
# Advertising frequency
|
||||
ADV_INTERVAL_MS = 250_000
|
||||
|
||||
device_info = aioble.Service(_ROBOT)
|
||||
|
||||
connection = None
|
||||
|
||||
# Create characteristics for device info
|
||||
aioble.Characteristic(device_info, bluetooth.UUID(MANUFACTURER_ID), read=True, initial="KevsRobotsRemote")
|
||||
aioble.Characteristic(device_info, bluetooth.UUID(MODEL_NUMBER_ID), read=True, initial="1.0")
|
||||
aioble.Characteristic(device_info, bluetooth.UUID(SERIAL_NUMBER_ID), read=True, initial=uid())
|
||||
aioble.Characteristic(device_info, bluetooth.UUID(HARDWARE_REVISION_ID), read=True, initial=sys.version)
|
||||
aioble.Characteristic(device_info, bluetooth.UUID(BLE_VERSION_ID), read=True, initial="1.0")
|
||||
|
||||
remote_service = aioble.Service(_GENERIC)
|
||||
|
||||
button_characteristic = aioble.Characteristic(
|
||||
remote_service, _BUTTON_UUID, read=True, notify=True
|
||||
)
|
||||
|
||||
print('registering services')
|
||||
aioble.register_services(remote_service, device_info)
|
||||
|
||||
connected = False
|
||||
|
||||
async def remote_task():
|
||||
""" Send the event to the connected device """
|
||||
while True:
|
||||
if not connected:
|
||||
print('not connected')
|
||||
await asyncio.sleep_ms(1000)
|
||||
continue
|
||||
|
||||
if (keyA.value() == 0):
|
||||
button_characteristic.write(b"a")
|
||||
button_characteristic.notify(connection, b"a")
|
||||
|
||||
elif (keyB.value() == 0):
|
||||
button_characteristic.write(b"b")
|
||||
button_characteristic.notify(connection, b"b")
|
||||
|
||||
elif (key2.value() == 0):
|
||||
button_characteristic.write(b"u")
|
||||
button_characteristic.notify(connection, b"u")
|
||||
|
||||
elif (key3.value() == 0):
|
||||
button_characteristic.write(b"c")
|
||||
button_characteristic.notify(connection, b"c")
|
||||
|
||||
elif (key4.value() == 0):
|
||||
button_characteristic.write(b"l")
|
||||
button_characteristic.notify(connection, b"l")
|
||||
|
||||
elif (key5.value() == 0):
|
||||
button_characteristic.write(b"d")
|
||||
button_characteristic.notify(connection, b"d")
|
||||
|
||||
elif (key6.value() == 0):
|
||||
button_characteristic.write(b"r")
|
||||
button_characteristic.notify(connection, b"r")
|
||||
|
||||
else:
|
||||
button_characteristic.notify(connection, b"!")
|
||||
|
||||
# Show button pushed on LCD
|
||||
if(keyA.value() == 0):
|
||||
# LCD.fill_rect(208,12,20,20,LCD.red)
|
||||
print("A")
|
||||
# else :
|
||||
# LCD.fill_rect(208,12,20,20,LCD.white)
|
||||
# LCD.rect(208,12,20,20,LCD.red)
|
||||
|
||||
if(keyB.value() == 0):
|
||||
print("B")
|
||||
else :
|
||||
pass
|
||||
# LCD.fill_rect(208,103,20,20,LCD.white)
|
||||
# LCD.rect(208,103,20,20,LCD.red)
|
||||
|
||||
if(key2.value() == 0):
|
||||
# LCD.fill_rect(37,35,20,20,LCD.red)
|
||||
print("UP")
|
||||
else :
|
||||
pass
|
||||
# LCD.fill_rect(37,35,20,20,LCD.white)
|
||||
# LCD.rect(37,35,20,20,LCD.red)
|
||||
|
||||
if(key3.value() == 0):
|
||||
# LCD.fill_rect(37,60,20,20,LCD.red)
|
||||
print("CTRL")
|
||||
else :
|
||||
pass
|
||||
# LCD.fill_rect(37,60,20,20,LCD.white)
|
||||
# LCD.rect(37,60,20,20,LCD.red)
|
||||
|
||||
if(key4.value() == 0):
|
||||
# LCD.fill_rect(12,60,20,20,LCD.red)
|
||||
print("LEFT")
|
||||
else :
|
||||
pass
|
||||
# LCD.fill_rect(12,60,20,20,LCD.white)
|
||||
# LCD.rect(12,60,20,20,LCD.red)
|
||||
|
||||
if(key5.value() == 0):
|
||||
# LCD.fill_rect(37,85,20,20,LCD.red)
|
||||
print("DOWN")
|
||||
else :
|
||||
pass
|
||||
# LCD.fill_rect(37,85,20,20,LCD.white)
|
||||
# LCD.rect(37,85,20,20,LCD.red)
|
||||
|
||||
if(key6.value() == 0):
|
||||
# LCD.fill_rect(62,60,20,20,LCD.red)
|
||||
print("RIGHT")
|
||||
else :
|
||||
pass
|
||||
# LCD.fill_rect(62,60,20,20,LCD.white)
|
||||
# LCD.rect(62,60,20,20,LCD.red)
|
||||
|
||||
# LCD.show()
|
||||
|
||||
await asyncio.sleep_ms(10)
|
||||
|
||||
|
||||
# Serially wait for connections.
|
||||
# Don't advertise while a central is connected
|
||||
async def peripheral_task():
|
||||
print('peripheral task started')
|
||||
global connected, connection
|
||||
while True:
|
||||
connected = False
|
||||
async with await aioble.advertise(
|
||||
ADV_INTERVAL_MS,
|
||||
name="KevsRobots",
|
||||
appearance=_BLE_APPEARANCE_GENERIC_REMOTE_CONTROL,
|
||||
services=[_GENERIC]
|
||||
) as connection:
|
||||
print("Connection from", connection.device)
|
||||
connected = True
|
||||
print(f"connected: {connected}")
|
||||
await connection.disconnected(timeout_ms=None)
|
||||
print(f'disconnected')
|
||||
|
||||
|
||||
async def blink_task():
|
||||
print('blink task started')
|
||||
toggle = True
|
||||
while True:
|
||||
led.value(toggle)
|
||||
toggle = not toggle
|
||||
blink = 1000
|
||||
if connected:
|
||||
blink = 1000
|
||||
else:
|
||||
blink = 250
|
||||
await asyncio.sleep_ms(blink)
|
||||
|
||||
async def main():
|
||||
tasks = [
|
||||
asyncio.create_task(peripheral_task()),
|
||||
asyncio.create_task(blink_task()),
|
||||
asyncio.create_task(remote_task()),
|
||||
]
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
|
||||
asyncio.run(main())
|
@ -1,9 +0,0 @@
|
||||
from machine import Pin
|
||||
from time import sleep
|
||||
|
||||
led = Pin('LED', Pin.OUT)
|
||||
print('Blinking LED Example')
|
||||
|
||||
while True:
|
||||
led.value(not led.value())
|
||||
sleep(0.5)
|
@ -1,57 +0,0 @@
|
||||
## Test IR TX/RX for OpenLaserTag
|
||||
## Tomas Krejci [Njord]
|
||||
|
||||
# Run this to characterise a remote.
|
||||
import ustruct
|
||||
from sys import platform
|
||||
import time
|
||||
import gc
|
||||
from machine import Pin, freq
|
||||
from olt_lib.ir_rx.print_error import print_error # Optional print of error codes
|
||||
|
||||
# Import all implemented classes
|
||||
from olt_lib.ir_rx.olt import LT_24, SONY_12, SONY_15, SONY_20
|
||||
|
||||
# Define pin according to platform
|
||||
if platform == "pyboard":
|
||||
p = Pin("X3", Pin.IN)
|
||||
elif platform == "esp8266":
|
||||
freq(160000000)
|
||||
p = Pin(13, Pin.IN)
|
||||
elif platform == "esp32" or platform == "esp32_LoBo":
|
||||
p = Pin(23, Pin.IN)
|
||||
elif platform == "rp2":
|
||||
p = Pin(16, Pin.IN)
|
||||
|
||||
|
||||
|
||||
# User callback
|
||||
def cb(byte1, byte2, byte3, packet):
|
||||
print(f"byte1 0x{byte1:02x} byte2 0x{byte2:02x} byte3 0x{byte3:02x} packet 0x{packet:06x}")
|
||||
|
||||
def test(proto=0):
|
||||
classes = (LT_24, SONY_12, SONY_15, SONY_20)
|
||||
ir = classes[proto](p, cb) # Instantiate receiver
|
||||
ir.error_function(print_error) # Show debug information
|
||||
# ir.verbose = True
|
||||
# A real application would do something here...
|
||||
try:
|
||||
while True:
|
||||
print("running")
|
||||
time.sleep(5)
|
||||
gc.collect()
|
||||
except KeyboardInterrupt:
|
||||
ir.close()
|
||||
|
||||
|
||||
# **** DISPLAY GREETING ****
|
||||
s = """Test for IR receiver. Run:
|
||||
from ir_rx.test import test
|
||||
test() for LT 24 bit protocol,
|
||||
test(1) for Sony SIRC 12 bit,
|
||||
test(2) for Sony SIRC 15 bit,
|
||||
test(3) for Sony SIRC 20 bit,
|
||||
|
||||
Hit ctrl-c to stop, then ctrl-d to soft reset."""
|
||||
|
||||
print(s)
|
Loading…
Reference in New Issue
Block a user