remote red led only setup
This commit is contained in:
parent
bf810ad31b
commit
74009d5180
222
lib/audio.py
222
lib/audio.py
@ -1,222 +0,0 @@
|
||||
## audio.py test for audio, I2S, MAX98357
|
||||
## Tomas Krejci [Njord]
|
||||
|
||||
## MAX98357A
|
||||
# Pico 3V3 to breakout VIN
|
||||
# Pico GND to breakout GND
|
||||
# Pico GP0 to breakout BCLK
|
||||
# Pico GP1 to breakout LRC
|
||||
# Pico GP2 to breakout DIN
|
||||
# Speaker + to screw terminal +
|
||||
# Speaker - to screw terminal -
|
||||
|
||||
# The MIT License (MIT)
|
||||
# Copyright (c) 2022 Mike Teachman
|
||||
# https://opensource.org/licenses/MIT
|
||||
|
||||
# Purpose: Play a WAV audio file out of a speaker or headphones
|
||||
#
|
||||
# - read audio samples from a WAV file on SD Card
|
||||
# - write audio samples to an I2S amplifier or DAC module
|
||||
# - the WAV file will play continuously in a loop until
|
||||
# a keyboard interrupt is detected or the board is reset
|
||||
#
|
||||
# non-blocking version
|
||||
# - the write() method is non-blocking.
|
||||
# - a callback function is called when all sample data has been written to the I2S interface
|
||||
# - a callback() method sets the callback function
|
||||
|
||||
import os
|
||||
import time
|
||||
import micropython
|
||||
from machine import I2S
|
||||
from machine import Pin
|
||||
|
||||
if os.uname().machine.count("PYBv1"):
|
||||
# ======= I2S CONFIGURATION =======
|
||||
SCK_PIN = "Y6"
|
||||
WS_PIN = "Y5"
|
||||
SD_PIN = "Y8"
|
||||
I2S_ID = 2
|
||||
BUFFER_LENGTH_IN_BYTES = 40000
|
||||
# ======= I2S CONFIGURATION =======
|
||||
|
||||
elif os.uname().machine.count("PYBD"):
|
||||
import pyb
|
||||
|
||||
# pyb.Pin("EN_3V3").on() # provide 3.3V on 3V3 output pin
|
||||
# os.mount(pyb.SDCard(), "/sd")
|
||||
|
||||
# ======= I2S CONFIGURATION =======
|
||||
SCK_PIN = "Y6"
|
||||
WS_PIN = "Y5"
|
||||
SD_PIN = "Y8"
|
||||
I2S_ID = 2
|
||||
BUFFER_LENGTH_IN_BYTES = 40000
|
||||
# ======= I2S CONFIGURATION =======
|
||||
|
||||
elif os.uname().machine.count("ESP32"):
|
||||
# from machine import SDCard
|
||||
|
||||
# sd = SDCard(slot=2) # sck=18, mosi=23, miso=19, cs=5
|
||||
# os.mount(sd, "/sd")
|
||||
|
||||
# ======= I2S CONFIGURATION =======
|
||||
SCK_PIN = 32
|
||||
WS_PIN = 25
|
||||
SD_PIN = 33
|
||||
I2S_ID = 0
|
||||
BUFFER_LENGTH_IN_BYTES = 40000
|
||||
# ======= I2S CONFIGURATION =======
|
||||
|
||||
elif os.uname().machine.count("Raspberry"):
|
||||
# from sdcard import SDCard
|
||||
# from machine import SPI
|
||||
|
||||
# cs = Pin(13, machine.Pin.OUT)
|
||||
# spi = SPI(
|
||||
# 1,
|
||||
# baudrate=1_000_000, # this has no effect on spi bus speed to SD Card
|
||||
# polarity=0,
|
||||
# phase=0,
|
||||
# bits=8,
|
||||
# firstbit=machine.SPI.MSB,
|
||||
# sck=Pin(14),
|
||||
# mosi=Pin(15),
|
||||
# miso=Pin(12),
|
||||
# )
|
||||
#
|
||||
# sd = SDCard(spi, cs)
|
||||
# sd.init_spi(25_000_000) # increase SPI bus speed to SD card
|
||||
# os.mount(sd, "/sd")
|
||||
|
||||
# ======= I2S CONFIGURATION =======
|
||||
SCK_PIN = 16
|
||||
WS_PIN = 17
|
||||
SD_PIN = 18
|
||||
I2S_ID = 0
|
||||
BUFFER_LENGTH_IN_BYTES = 40000
|
||||
# ======= I2S CONFIGURATION =======
|
||||
|
||||
elif os.uname().machine.count("MIMXRT"):
|
||||
from machine import SDCard
|
||||
|
||||
# sd = SDCard(1) # Teensy 4.1: sck=45, mosi=43, miso=42, cs=44
|
||||
# os.mount(sd, "/sd")
|
||||
|
||||
# ======= I2S CONFIGURATION =======
|
||||
SCK_PIN = 4
|
||||
WS_PIN = 3
|
||||
SD_PIN = 2
|
||||
I2S_ID = 2
|
||||
BUFFER_LENGTH_IN_BYTES = 40000
|
||||
# ======= I2S CONFIGURATION =======
|
||||
|
||||
else:
|
||||
print("Warning: program not tested with this board")
|
||||
|
||||
# ======= AUDIO CONFIGURATION =======
|
||||
WAV_FILE = "test.wav"
|
||||
WAV_SAMPLE_SIZE_IN_BITS = 16
|
||||
FORMAT = I2S.MONO
|
||||
SAMPLE_RATE_IN_HZ = 16000
|
||||
# ======= AUDIO CONFIGURATION =======
|
||||
|
||||
PLAY = 0
|
||||
PAUSE = 1
|
||||
RESUME = 2
|
||||
STOP = 3
|
||||
|
||||
|
||||
def eof_callback(arg):
|
||||
global state
|
||||
print("end of audio file")
|
||||
# state = STOP # uncomment to stop looping playback
|
||||
|
||||
|
||||
def i2s_callback(arg):
|
||||
global state
|
||||
if state == PLAY:
|
||||
num_read = wav.readinto(wav_samples_mv)
|
||||
# end of WAV file?
|
||||
if num_read == 0:
|
||||
# end-of-file, advance to first byte of Data section
|
||||
pos = wav.seek(44)
|
||||
_ = audio_out.write(silence)
|
||||
micropython.schedule(eof_callback, None)
|
||||
else:
|
||||
_ = audio_out.write(wav_samples_mv[:num_read])
|
||||
elif state == RESUME:
|
||||
state = PLAY
|
||||
_ = audio_out.write(silence)
|
||||
elif state == PAUSE:
|
||||
_ = audio_out.write(silence)
|
||||
elif state == STOP:
|
||||
# cleanup
|
||||
wav.close()
|
||||
# if os.uname().machine.count("PYBD"):
|
||||
# os.umount("/sd")
|
||||
# elif os.uname().machine.count("ESP32"):
|
||||
# os.umount("/sd")
|
||||
# sd.deinit()
|
||||
# elif os.uname().machine.count("Raspberry"):
|
||||
# os.umount("/sd")
|
||||
# spi.deinit()
|
||||
# elif os.uname().machine.count("MIMXRT"):
|
||||
# os.umount("/sd")
|
||||
# sd.deinit()
|
||||
audio_out.deinit()
|
||||
print("Done")
|
||||
else:
|
||||
print("Not a valid state. State ignored")
|
||||
|
||||
|
||||
audio_out = I2S(
|
||||
I2S_ID,
|
||||
sck=Pin(SCK_PIN),
|
||||
ws=Pin(WS_PIN),
|
||||
sd=Pin(SD_PIN),
|
||||
mode=I2S.TX,
|
||||
bits=WAV_SAMPLE_SIZE_IN_BITS,
|
||||
format=FORMAT,
|
||||
rate=SAMPLE_RATE_IN_HZ,
|
||||
ibuf=BUFFER_LENGTH_IN_BYTES,
|
||||
)
|
||||
|
||||
audio_out.irq(i2s_callback)
|
||||
state = PAUSE
|
||||
|
||||
wav = open("/lib/audio_files/{}".format(WAV_FILE), "rb")
|
||||
_ = wav.seek(44) # advance to first byte of Data section in WAV file
|
||||
|
||||
# allocate a small array of blank samples
|
||||
silence = bytearray(1000)
|
||||
|
||||
# allocate sample array buffer
|
||||
wav_samples = bytearray(10000)
|
||||
wav_samples_mv = memoryview(wav_samples)
|
||||
|
||||
_ = audio_out.write(silence)
|
||||
|
||||
# add runtime code here ....
|
||||
# changing 'state' will affect playback of audio file
|
||||
|
||||
print("starting playback for 10s")
|
||||
state = PLAY
|
||||
time.sleep(10)
|
||||
print("pausing playback for 10s")
|
||||
state = PAUSE
|
||||
time.sleep(10)
|
||||
print("resuming playback for 15s")
|
||||
state = RESUME
|
||||
time.sleep(15)
|
||||
print("stopping playback")
|
||||
state = STOP
|
||||
|
||||
|
||||
def demo():
|
||||
print("Audio demo")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
demo()
|
Binary file not shown.
30
lib/rgb.py
30
lib/rgb.py
@ -7,17 +7,17 @@ import machine
|
||||
import uasyncio as asyncio
|
||||
from machine import Pin, PWM
|
||||
|
||||
|
||||
class RGB:
|
||||
#rgb_pwr = 1.0
|
||||
# rgb_pwr = 1.0
|
||||
def __init__(self):
|
||||
global rgb_pwr
|
||||
rgb_pwr = 0.05 # 0.0 - 1.0
|
||||
rgb_pwr = 1.0 # 0.0 - 1.0
|
||||
|
||||
self.led_r = PWM(Pin(22), 100)
|
||||
self.led_g = PWM(Pin(26), 100)
|
||||
self.led_b = PWM(Pin(27), 100)
|
||||
|
||||
|
||||
async def set(self, r, g, b):
|
||||
global rgb_pwr
|
||||
self.led_r.duty_u16(int(65535 * r * rgb_pwr))
|
||||
@ -33,13 +33,12 @@ class RGB:
|
||||
|
||||
async def set_pwr(self, pwr):
|
||||
global rgb_pwr
|
||||
rgb_pwr = pwr
|
||||
rgb_pwr = pwr
|
||||
|
||||
async def get_pwr(self):
|
||||
global rgb_pwr
|
||||
return rgb_pwr
|
||||
|
||||
|
||||
async def set_pwr_up(self):
|
||||
await self.set_pwr(await self.get_pwr() * 5.0)
|
||||
|
||||
@ -56,7 +55,7 @@ class RGB:
|
||||
await self.set(0, 0, 1)
|
||||
|
||||
async def set_yellow(self):
|
||||
await self.set(1, 1, 0)
|
||||
await self.set(1, 1, 0)
|
||||
|
||||
async def set_magenta(self):
|
||||
await self.set(1, 0, 1)
|
||||
@ -65,14 +64,11 @@ class RGB:
|
||||
await self.set(0, 1, 1)
|
||||
|
||||
async def set_white(self):
|
||||
await self.set(0.8, 0.8, 0.8)
|
||||
await self.set(0.8, 0.8, 0.8)
|
||||
|
||||
async def set_orange(self):
|
||||
await self.set(1, 0.5, 0)
|
||||
|
||||
|
||||
|
||||
|
||||
async def test(self):
|
||||
sec = 1
|
||||
print("RGB demo")
|
||||
@ -80,7 +76,7 @@ class RGB:
|
||||
await uasyncio.sleep(sec)
|
||||
print(f"RGB power: {await self.get_pwr()}")
|
||||
print("RGB initializing...")
|
||||
|
||||
|
||||
await uasyncio.sleep(sec)
|
||||
while True:
|
||||
print("RGB off")
|
||||
@ -110,11 +106,9 @@ class RGB:
|
||||
await self.set_pwr_up()
|
||||
await self.set_red()
|
||||
print("RGB power:", await self.get_pwr())
|
||||
await uasyncio.sleep(sec)
|
||||
await uasyncio.sleep(sec)
|
||||
|
||||
|
||||
# test
|
||||
#rgb = RGB()
|
||||
#asyncio.run(rgb.test())
|
||||
|
||||
|
||||
|
||||
|
||||
# rgb = RGB()
|
||||
# asyncio.run(rgb.test())
|
||||
|
59
main.py
59
main.py
@ -2,6 +2,7 @@
|
||||
# OLT protocol.
|
||||
# Tomas Krejci [Njord]
|
||||
|
||||
|
||||
import machine, time, re
|
||||
from sys import platform
|
||||
from primitives import set_global_exception
|
||||
@ -12,6 +13,12 @@ ESP32 = platform == "esp32"
|
||||
RP2 = platform == "rp2"
|
||||
PYBOARD = platform == "pyboard"
|
||||
|
||||
#### Other imports ###
|
||||
from primitives.delay_ms import Delay_ms
|
||||
import primitives.queue as queue
|
||||
from primitives import Switch, Pushbutton
|
||||
from rgb import RGB
|
||||
|
||||
#### onboard LED ####
|
||||
if ESP32 or RP2:
|
||||
from machine import Pin
|
||||
@ -85,16 +92,22 @@ import time
|
||||
|
||||
|
||||
async def ble_rx_handler():
|
||||
global rgb1
|
||||
message = uart.read().decode().strip()
|
||||
logger.debug(f"BLE: {message}")
|
||||
|
||||
try:
|
||||
if "bomb" in message:
|
||||
logger.info("BLE: Bomb Explode send to IR")
|
||||
asyncio.create_task(
|
||||
olt_sent_ir(ir_tx, COMMANDS[1][1], COMMANDS[1][2], COMMANDS[1][3])
|
||||
)
|
||||
for i in range(3):
|
||||
asyncio.create_task(
|
||||
olt_sent_ir(ir_tx, COMMANDS[1][1], COMMANDS[1][2], COMMANDS[1][3])
|
||||
)
|
||||
await asyncio.sleep_ms(100)
|
||||
|
||||
uart.write("BLE: Bomb Explode send to IR\n")
|
||||
asyncio.create_task(rgb1.set_red())
|
||||
await asyncio.sleep_ms(100)
|
||||
while True:
|
||||
time.sleep_ms(2000)
|
||||
logger.info(
|
||||
@ -104,17 +117,28 @@ async def ble_rx_handler():
|
||||
"BLE: Bomb Exploded, Device is LOCKED\n Please restart Device\n"
|
||||
)
|
||||
|
||||
message_int = int(message, 16)
|
||||
logger.debug(f"BLE: {message_int:08x}")
|
||||
elif "led_on" in message:
|
||||
logger.info("BLE: LED ON")
|
||||
asyncio.create_task(rgb1.set_red())
|
||||
uart.write("BLE: LED ON\n")
|
||||
|
||||
## if recive from BLE UART any hex number senend it as custom command to IR
|
||||
tx1 = (message_int >> 16) & 0xFF
|
||||
tx2 = (message_int >> 8) & 0xFF
|
||||
tx3 = message_int & 0xFF
|
||||
asyncio.create_task(olt_sent_ir(ir_tx, tx1, tx2, tx3))
|
||||
elif "led_off" in message:
|
||||
logger.info("BLE: LED OFF")
|
||||
asyncio.create_task(rgb1.off())
|
||||
uart.write("BLE: LED OFF\n")
|
||||
|
||||
## send info back to BLE
|
||||
uart.write(f"BLE: uart recive 0x{message_int:08x}\n")
|
||||
else:
|
||||
message_int = int(message, 16)
|
||||
logger.debug(f"BLE: {message_int:08x}")
|
||||
|
||||
## if recive from BLE UART any hex number senend it as custom command to IR
|
||||
tx1 = (message_int >> 16) & 0xFF
|
||||
tx2 = (message_int >> 8) & 0xFF
|
||||
tx3 = message_int & 0xFF
|
||||
asyncio.create_task(olt_sent_ir(ir_tx, tx1, tx2, tx3))
|
||||
|
||||
## send info back to BLE
|
||||
uart.write(f"BLE: uart recive 0x{message_int:08x}\n")
|
||||
|
||||
except TypeError:
|
||||
logger.debug("BLE: TypeError")
|
||||
@ -176,11 +200,6 @@ elif RP2:
|
||||
else:
|
||||
ir_tx_pin = Pin("X1")
|
||||
|
||||
#### Other imports ###
|
||||
from primitives.delay_ms import Delay_ms
|
||||
import primitives.queue as queue
|
||||
from primitives import Switch, Pushbutton
|
||||
from rgb import RGB
|
||||
|
||||
#### OLT Commands ####
|
||||
COMMAND_END = 0xE8
|
||||
@ -245,7 +264,7 @@ async def olt_command(cmd, btn, rgb, ir_tx, tx1, tx2, tx3):
|
||||
# await asyncio.sleep_ms(50)
|
||||
await btn.release.wait()
|
||||
logger.debug(f"Command {cmd} button released")
|
||||
await rgb.set_red()
|
||||
# await rgb.set_red()
|
||||
|
||||
|
||||
# IR RX callback
|
||||
@ -277,7 +296,7 @@ async def cb(byte1, byte2, byte3, packet):
|
||||
)
|
||||
|
||||
await rgb1.set_blue()
|
||||
await asyncio.sleep_ms(200)
|
||||
await asyncio.sleep_ms(500)
|
||||
await rgb1.set_red()
|
||||
|
||||
|
||||
@ -323,7 +342,7 @@ async def main(proto):
|
||||
|
||||
### Buttons ###
|
||||
logger.debug("Button init")
|
||||
PINS = ((COMMANDS[0], 20), (COMMANDS[1], 21), (COMMANDS[2], 18), (COMMANDS[3], 19))
|
||||
PINS = ((COMMANDS[0], 18), (COMMANDS[1], 21), (COMMANDS[2], 20), (COMMANDS[3], 19))
|
||||
for cmd, pin in PINS:
|
||||
btn = Pushbutton(machine.Pin(pin, machine.Pin.IN, machine.Pin.PULL_UP))
|
||||
command, tx1, tx2, tx3 = cmd
|
||||
|
Loading…
Reference in New Issue
Block a user