new file: micropython_ir
new file: olt_lib/*
This commit is contained in:
parent
d725e709a2
commit
2d6fe96072
1
micropython_ir
Submodule
1
micropython_ir
Submodule
@ -0,0 +1 @@
|
||||
Subproject commit b6697bc2127840b100037da3853bd3d3dd7751b2
|
73
olt_lib/rx/__init__.py
Normal file
73
olt_lib/rx/__init__.py
Normal file
@ -0,0 +1,73 @@
|
||||
# ir_rx __init__.py Decoder for IR remote control using synchronous code
|
||||
# IR_RX abstract base class for IR receivers.
|
||||
|
||||
# Author: Peter Hinch
|
||||
# Copyright Peter Hinch 2020-2024 Released under the MIT license
|
||||
|
||||
# Thanks are due to @Pax-IT for diagnosing a problem with ESP32C3.
|
||||
|
||||
from machine import Timer, Pin
|
||||
from array import array
|
||||
from utime import ticks_us
|
||||
|
||||
# from micropython import alloc_emergency_exception_buf
|
||||
# alloc_emergency_exception_buf(100)
|
||||
|
||||
|
||||
# On 1st edge start a block timer. While the timer is running, record the time
|
||||
# of each edge. When the timer times out decode the data. Duration must exceed
|
||||
# the worst case block transmission time, but be less than the interval between
|
||||
# a block start and a repeat code start (~108ms depending on protocol)
|
||||
|
||||
|
||||
class IR_RX:
|
||||
Timer_id = -1 # Software timer but enable override
|
||||
# Result/error codes
|
||||
# Repeat button code
|
||||
REPEAT = -1
|
||||
# Error codes
|
||||
BADSTART = -2
|
||||
BADBLOCK = -3
|
||||
BADREP = -4
|
||||
OVERRUN = -5
|
||||
BADDATA = -6
|
||||
BADADDR = -7
|
||||
|
||||
def __init__(self, pin, nedges, tblock, callback, *args): # Optional args for callback
|
||||
self._pin = pin
|
||||
self._nedges = nedges
|
||||
self._tblock = tblock
|
||||
self.callback = callback
|
||||
self.args = args
|
||||
self._errf = lambda _: None
|
||||
self.verbose = False
|
||||
|
||||
self._times = array("i", (0 for _ in range(nedges + 1))) # +1 for overrun
|
||||
pin.irq(handler=self._cb_pin, trigger=(Pin.IRQ_FALLING | Pin.IRQ_RISING))
|
||||
self.edge = 0
|
||||
self.tim = Timer(self.Timer_id) # Defaul is sofware timer
|
||||
self.cb = self.decode
|
||||
|
||||
# Pin interrupt. Save time of each edge for later decode.
|
||||
def _cb_pin(self, line):
|
||||
t = ticks_us()
|
||||
# On overrun ignore pulses until software timer times out
|
||||
if self.edge <= self._nedges: # Allow 1 extra pulse to record overrun
|
||||
if not self.edge: # First edge received
|
||||
self.tim.init(period=self._tblock, mode=Timer.ONE_SHOT, callback=self.cb)
|
||||
self._times[self.edge] = t
|
||||
self.edge += 1
|
||||
|
||||
def do_callback(self, cmd, addr, ext, thresh=0):
|
||||
self.edge = 0
|
||||
if cmd >= thresh:
|
||||
self.callback(cmd, addr, ext, *self.args)
|
||||
else:
|
||||
self._errf(cmd)
|
||||
|
||||
def error_function(self, func):
|
||||
self._errf = func
|
||||
|
||||
def close(self):
|
||||
self._pin.irq(handler=None)
|
||||
self.tim.deinit()
|
108
olt_lib/rx/acquire.py
Normal file
108
olt_lib/rx/acquire.py
Normal file
@ -0,0 +1,108 @@
|
||||
# acquire.py Acquire a pulse train from an IR remote
|
||||
# Supports NEC protocol.
|
||||
# For a remote using NEC see https://www.adafruit.com/products/389
|
||||
|
||||
# Author: Peter Hinch
|
||||
# Copyright Peter Hinch 2020 Released under the MIT license
|
||||
|
||||
from machine import Pin, freq
|
||||
from sys import platform
|
||||
|
||||
from utime import sleep_ms, ticks_us, ticks_diff
|
||||
from ir_rx import IR_RX
|
||||
|
||||
|
||||
class IR_GET(IR_RX):
|
||||
def __init__(self, pin, nedges=100, twait=100, display=True):
|
||||
self.display = display
|
||||
super().__init__(pin, nedges, twait, lambda *_ : None)
|
||||
self.data = None
|
||||
|
||||
def decode(self, _):
|
||||
def near(v, target):
|
||||
return target * 0.8 < v < target * 1.2
|
||||
lb = self.edge - 1 # Possible length of burst
|
||||
if lb < 3:
|
||||
return # Noise
|
||||
burst = []
|
||||
for x in range(lb):
|
||||
dt = ticks_diff(self._times[x + 1], self._times[x])
|
||||
if x > 0 and dt > 10000: # Reached gap between repeats
|
||||
break
|
||||
burst.append(dt)
|
||||
lb = len(burst) # Actual length
|
||||
# Duration of pulse train 24892 for RC-5 22205 for RC-6
|
||||
duration = ticks_diff(self._times[lb - 1], self._times[0])
|
||||
|
||||
if self.display:
|
||||
for x, e in enumerate(burst):
|
||||
print('{:03d} {:5d}'.format(x, e))
|
||||
print()
|
||||
# Attempt to determine protocol
|
||||
ok = False # Protocol not yet found
|
||||
if near(burst[0], 9000) and lb == 67:
|
||||
print('NEC')
|
||||
ok = True
|
||||
|
||||
if not ok and near(burst[0], 2400) and near(burst[1], 600): # Maybe Sony
|
||||
try:
|
||||
nbits = {25:12, 31:15, 41:20}[lb]
|
||||
except KeyError:
|
||||
pass
|
||||
else:
|
||||
ok = True
|
||||
print('Sony {}bit'.format(nbits))
|
||||
|
||||
if not ok and near(burst[0], 889): # Maybe RC-5
|
||||
if near(duration, 24892) and near(max(burst), 1778):
|
||||
print('Philps RC-5')
|
||||
ok = True
|
||||
|
||||
if not ok and near(burst[0], 2666) and near(burst[1], 889): # RC-6?
|
||||
if near(duration, 22205) and near(burst[1], 889) and near(burst[2], 444):
|
||||
print('Philips RC-6 mode 0')
|
||||
ok = True
|
||||
|
||||
if not ok and near(burst[0], 2000) and near(burst[1], 1000):
|
||||
if near(duration, 19000):
|
||||
print('Microsoft MCE edition protocol.')
|
||||
# Constant duration, variable burst length, presumably bi-phase
|
||||
print('Protocol start {} {} Burst length {} duration {}'.format(burst[0], burst[1], lb, duration))
|
||||
ok = True
|
||||
|
||||
if not ok and near(burst[0], 4500) and near(burst[1], 4500) and lb == 67: # Samsung
|
||||
print('Samsung')
|
||||
ok = True
|
||||
|
||||
if not ok and near(burst[0], 3500) and near(burst[1], 1680): # Panasonic?
|
||||
print('Unsupported protocol. Panasonic?')
|
||||
ok = True
|
||||
|
||||
if not ok:
|
||||
print('Unknown protocol start {} {} Burst length {} duration {}'.format(burst[0], burst[1], lb, duration))
|
||||
|
||||
print()
|
||||
self.data = burst
|
||||
# Set up for new data burst. Run null callback
|
||||
self.do_callback(0, 0, 0)
|
||||
|
||||
def acquire(self):
|
||||
while self.data is None:
|
||||
sleep_ms(5)
|
||||
self.close()
|
||||
return self.data
|
||||
|
||||
def test():
|
||||
# Define pin according to platform
|
||||
if platform == 'pyboard':
|
||||
pin = Pin('X3', Pin.IN)
|
||||
elif platform == 'esp8266':
|
||||
freq(160000000)
|
||||
pin = Pin(13, Pin.IN)
|
||||
elif platform == 'esp32' or platform == 'esp32_LoBo':
|
||||
pin = Pin(23, Pin.IN)
|
||||
elif platform == 'rp2':
|
||||
pin = Pin(16, Pin.IN)
|
||||
irg = IR_GET(pin)
|
||||
print('Waiting for IR data...')
|
||||
return irg.acquire()
|
19
olt_lib/rx/print_error.py
Normal file
19
olt_lib/rx/print_error.py
Normal file
@ -0,0 +1,19 @@
|
||||
# print_error.py Error print for IR receiver
|
||||
|
||||
# Author: Peter Hinch
|
||||
# Copyright Peter Hinch 2020 Released under the MIT license
|
||||
|
||||
from ir_rx import IR_RX
|
||||
|
||||
_errors = {IR_RX.BADSTART : 'Invalid start pulse',
|
||||
IR_RX.BADBLOCK : 'Error: bad block',
|
||||
IR_RX.BADREP : 'Error: repeat',
|
||||
IR_RX.OVERRUN : 'Error: overrun',
|
||||
IR_RX.BADDATA : 'Error: invalid data',
|
||||
IR_RX.BADADDR : 'Error: invalid address'}
|
||||
|
||||
def print_error(data):
|
||||
if data in _errors:
|
||||
print(_errors[data])
|
||||
else:
|
||||
print('Unknown error code:', data)
|
70
olt_lib/rx/sony.py
Normal file
70
olt_lib/rx/sony.py
Normal file
@ -0,0 +1,70 @@
|
||||
# sony.py Decoder for IR remote control using synchronous code
|
||||
# Sony SIRC protocol.
|
||||
|
||||
# Author: Peter Hinch
|
||||
# Copyright Peter Hinch 2020 Released under the MIT license
|
||||
|
||||
from utime import ticks_us, ticks_diff
|
||||
from ir_rx import IR_RX
|
||||
|
||||
class SONY_ABC(IR_RX): # Abstract base class
|
||||
def __init__(self, pin, bits, callback, *args):
|
||||
# 20 bit block has 42 edges and lasts <= 39ms nominal. Add 4ms to time
|
||||
# for tolerances except in 20 bit case where timing is tight with a
|
||||
# repeat period of 45ms.
|
||||
t = int(3 + bits * 1.8) + (1 if bits == 20 else 4)
|
||||
super().__init__(pin, 2 + bits * 2, t, callback, *args)
|
||||
self._addr = 0
|
||||
self._bits = 20
|
||||
|
||||
def decode(self, _):
|
||||
try:
|
||||
nedges = self.edge # No. of edges detected
|
||||
self.verbose and print('nedges', nedges)
|
||||
if nedges > 42:
|
||||
raise RuntimeError(self.OVERRUN)
|
||||
bits = (nedges - 2) // 2
|
||||
if nedges not in (26, 32, 42) or bits > self._bits:
|
||||
raise RuntimeError(self.BADBLOCK)
|
||||
self.verbose and print('SIRC {}bit'.format(bits))
|
||||
width = ticks_diff(self._times[1], self._times[0])
|
||||
if not 1800 < width < 3000: # 2.4ms leading mark for all valid data
|
||||
raise RuntimeError(self.BADSTART)
|
||||
width = ticks_diff(self._times[2], self._times[1])
|
||||
if not 350 < width < 1000: # 600μs space
|
||||
raise RuntimeError(self.BADSTART)
|
||||
|
||||
val = 0 # Data received, LSB 1st
|
||||
x = 2
|
||||
bit = 1
|
||||
while x <= nedges - 2:
|
||||
if ticks_diff(self._times[x + 1], self._times[x]) > 900:
|
||||
val |= bit
|
||||
bit <<= 1
|
||||
x += 2
|
||||
cmd = val & 0x7f # 7 bit command
|
||||
val >>= 7
|
||||
if nedges < 42:
|
||||
addr = val & 0xff # 5 or 8 bit addr
|
||||
val = 0
|
||||
else:
|
||||
addr = val & 0x1f # 5 bit addr
|
||||
val >>= 5 # 8 bit extended
|
||||
except RuntimeError as e:
|
||||
cmd = e.args[0]
|
||||
addr = 0
|
||||
val = 0
|
||||
self.do_callback(cmd, addr, val)
|
||||
|
||||
class SONY_12(SONY_ABC):
|
||||
def __init__(self, pin, callback, *args):
|
||||
super().__init__(pin, 12, callback, *args)
|
||||
|
||||
class SONY_15(SONY_ABC):
|
||||
def __init__(self, pin, callback, *args):
|
||||
super().__init__(pin, 15, callback, *args)
|
||||
|
||||
class SONY_20(SONY_ABC):
|
||||
def __init__(self, pin, callback, *args):
|
||||
super().__init__(pin, 20, callback, *args)
|
||||
|
70
olt_lib/rx/test.py
Normal file
70
olt_lib/rx/test.py
Normal file
@ -0,0 +1,70 @@
|
||||
# test.py Test program for IR remote control decoder
|
||||
# Supports Pyboard, ESP32 and ESP8266
|
||||
|
||||
# Author: Peter Hinch
|
||||
# Copyright Peter Hinch 2020-2022 Released under the MIT license
|
||||
|
||||
# Run this to characterise a remote.
|
||||
|
||||
from sys import platform
|
||||
import time
|
||||
import gc
|
||||
from machine import Pin, freq
|
||||
from ir_rx.print_error import print_error # Optional print of error codes
|
||||
|
||||
# Import all implemented classes
|
||||
from ir_rx.nec import NEC_8, NEC_16, SAMSUNG
|
||||
from ir_rx.sony import SONY_12, SONY_15, SONY_20
|
||||
from ir_rx.philips import RC5_IR, RC6_M0
|
||||
from ir_rx.mce import MCE
|
||||
|
||||
# Define pin according to platform
|
||||
if platform == "pyboard":
|
||||
p = Pin("X3", Pin.IN)
|
||||
elif platform == "esp8266":
|
||||
freq(160000000)
|
||||
p = Pin(13, Pin.IN)
|
||||
elif platform == "esp32" or platform == "esp32_LoBo":
|
||||
p = Pin(23, Pin.IN)
|
||||
elif platform == "rp2":
|
||||
p = Pin(16, Pin.IN)
|
||||
|
||||
# User callback
|
||||
def cb(data, addr, ctrl):
|
||||
if data < 0: # NEC protocol sends repeat codes.
|
||||
print("Repeat code.")
|
||||
else:
|
||||
print(f"Data 0x{data:02x} Addr 0x{addr:04x} Ctrl 0x{ctrl:02x}")
|
||||
|
||||
|
||||
def test(proto=0):
|
||||
classes = (NEC_8, NEC_16, SONY_12, SONY_15, SONY_20, RC5_IR, RC6_M0, MCE, SAMSUNG)
|
||||
ir = classes[proto](p, cb) # Instantiate receiver
|
||||
ir.error_function(print_error) # Show debug information
|
||||
# ir.verbose = True
|
||||
# A real application would do something here...
|
||||
try:
|
||||
while True:
|
||||
print("running")
|
||||
time.sleep(5)
|
||||
gc.collect()
|
||||
except KeyboardInterrupt:
|
||||
ir.close()
|
||||
|
||||
|
||||
# **** DISPLAY GREETING ****
|
||||
s = """Test for IR receiver. Run:
|
||||
from ir_rx.test import test
|
||||
test() for NEC 8 bit protocol,
|
||||
test(1) for NEC 16 bit,
|
||||
test(2) for Sony SIRC 12 bit,
|
||||
test(3) for Sony SIRC 15 bit,
|
||||
test(4) for Sony SIRC 20 bit,
|
||||
test(5) for Philips RC-5 protocol,
|
||||
test(6) for RC6 mode 0.
|
||||
test(7) for Microsoft Vista MCE.
|
||||
test(8) for Samsung.
|
||||
|
||||
Hit ctrl-c to stop, then ctrl-d to soft reset."""
|
||||
|
||||
print(s)
|
146
olt_lib/tx/__init__.py
Normal file
146
olt_lib/tx/__init__.py
Normal file
@ -0,0 +1,146 @@
|
||||
# __init__.py Nonblocking IR blaster
|
||||
# Runs on Pyboard D or Pyboard 1.x (not Pyboard Lite), ESP32 and RP2
|
||||
|
||||
# Released under the MIT License (MIT). See LICENSE.
|
||||
|
||||
# Copyright (c) 2020-2021 Peter Hinch
|
||||
from sys import platform
|
||||
ESP32 = platform == 'esp32' # Loboris not supported owing to RMT
|
||||
RP2 = platform == 'rp2'
|
||||
if ESP32:
|
||||
from machine import Pin, PWM
|
||||
from esp32 import RMT
|
||||
elif RP2:
|
||||
from .rp2_rmt import RP2_RMT
|
||||
else:
|
||||
from pyb import Pin, Timer # Pyboard does not support machine.PWM
|
||||
|
||||
from micropython import const
|
||||
from array import array
|
||||
from time import ticks_us, ticks_diff, sleep_ms
|
||||
# import micropython
|
||||
# micropython.alloc_emergency_exception_buf(100)
|
||||
|
||||
|
||||
# Shared by NEC
|
||||
STOP = const(0) # End of data
|
||||
|
||||
# IR abstract base class. Array holds periods in μs between toggling 36/38KHz
|
||||
# carrier on or off. Physical transmission occurs in an ISR context controlled
|
||||
# by timer 2 and timer 5. See TRANSMITTER.md for details of operation.
|
||||
class IR:
|
||||
_active_high = True # Hardware turns IRLED on if pin goes high.
|
||||
_space = 0 # Duty ratio that causes IRLED to be off
|
||||
timeit = False # Print timing info
|
||||
|
||||
@classmethod
|
||||
def active_low(cls):
|
||||
if ESP32:
|
||||
raise ValueError('Cannot set active low on ESP32')
|
||||
cls._active_high = False
|
||||
cls._space = 100
|
||||
|
||||
def __init__(self, pin, cfreq, asize, duty, verbose):
|
||||
if ESP32:
|
||||
self._rmt = RMT(0, pin=pin, clock_div=80, tx_carrier = (cfreq, duty, 1))
|
||||
# 1μs resolution
|
||||
elif RP2: # PIO-based RMT-like device
|
||||
self._rmt = RP2_RMT(pin_pulse=None, carrier=(pin, cfreq, duty)) # 1μs resolution
|
||||
asize += 1 # Allow for possible extra space pulse
|
||||
else: # Pyboard
|
||||
if not IR._active_high:
|
||||
duty = 100 - duty
|
||||
tim = Timer(2, freq=cfreq) # Timer 2/pin produces 36/38/40KHz carrier
|
||||
self._ch = tim.channel(1, Timer.PWM, pin=pin)
|
||||
self._ch.pulse_width_percent(self._space) # Turn off IR LED
|
||||
# Pyboard: 0 <= pulse_width_percent <= 100
|
||||
self._duty = duty
|
||||
self._tim = Timer(5) # Timer 5 controls carrier on/off times
|
||||
self._tcb = self._cb # Pre-allocate
|
||||
self._arr = array('H', 0 for _ in range(asize)) # on/off times (μs)
|
||||
self._mva = memoryview(self._arr)
|
||||
# Subclass interface
|
||||
self.verbose = verbose
|
||||
self.carrier = False # Notional carrier state while encoding biphase
|
||||
self.aptr = 0 # Index into array
|
||||
self._busy = False
|
||||
|
||||
def _cb(self, t): # T5 callback, generate a carrier mark or space
|
||||
self._busy = True
|
||||
t.deinit()
|
||||
p = self.aptr
|
||||
v = self._arr[p]
|
||||
if v == STOP:
|
||||
self._ch.pulse_width_percent(self._space) # Turn off IR LED.
|
||||
self._busy = False
|
||||
return
|
||||
self._ch.pulse_width_percent(self._space if p & 1 else self._duty)
|
||||
self._tim.init(prescaler=84, period=v, callback=self._tcb)
|
||||
self.aptr += 1
|
||||
|
||||
def busy(self):
|
||||
if ESP32:
|
||||
return not self._rmt.wait_done()
|
||||
if RP2:
|
||||
return self._rmt.busy()
|
||||
return self._busy
|
||||
|
||||
# Public interface
|
||||
# Before populating array, zero pointer, set notional carrier state (off).
|
||||
def transmit(self, addr, data, toggle=0, validate=False): # NEC: toggle is unused
|
||||
while self.busy():
|
||||
pass
|
||||
t = ticks_us()
|
||||
if validate:
|
||||
if addr > self.valid[0] or addr < 0:
|
||||
raise ValueError('Address out of range', addr)
|
||||
if data > self.valid[1] or data < 0:
|
||||
raise ValueError('Data out of range', data)
|
||||
if toggle > self.valid[2] or toggle < 0:
|
||||
raise ValueError('Toggle out of range', toggle)
|
||||
self.aptr = 0 # Inital conditions for tx: index into array
|
||||
self.carrier = False
|
||||
self.tx(addr, data, toggle) # Subclass populates ._arr
|
||||
self.trigger() # Initiate transmission
|
||||
if self.timeit:
|
||||
dt = ticks_diff(ticks_us(), t)
|
||||
print('Time = {}μs'.format(dt))
|
||||
sleep_ms(1) # Ensure ._busy is set prior to return
|
||||
|
||||
# Subclass interface
|
||||
def trigger(self): # Used by NEC to initiate a repeat frame
|
||||
if ESP32:
|
||||
self._rmt.write_pulses(tuple(self._mva[0 : self.aptr]))
|
||||
elif RP2:
|
||||
self.append(STOP)
|
||||
self._rmt.send(self._arr)
|
||||
else:
|
||||
self.append(STOP)
|
||||
self.aptr = 0 # Reset pointer
|
||||
self._cb(self._tim) # Initiate physical transmission.
|
||||
|
||||
def append(self, *times): # Append one or more time peiods to ._arr
|
||||
for t in times:
|
||||
self._arr[self.aptr] = t
|
||||
self.aptr += 1
|
||||
self.carrier = not self.carrier # Keep track of carrier state
|
||||
self.verbose and print('append', t, 'carrier', self.carrier)
|
||||
|
||||
def add(self, t): # Increase last time value (for biphase)
|
||||
assert t > 0
|
||||
self.verbose and print('add', t)
|
||||
# .carrier unaffected
|
||||
self._arr[self.aptr - 1] += t
|
||||
|
||||
|
||||
# Given an iterable (e.g. list or tuple) of times, emit it as an IR stream.
|
||||
class Player(IR):
|
||||
|
||||
def __init__(self, pin, freq=38000, verbose=False, asize=68): # NEC specifies 38KHz
|
||||
super().__init__(pin, freq, asize, 33, verbose) # Measured duty ratio 33%
|
||||
|
||||
def play(self, lst):
|
||||
for x, t in enumerate(lst):
|
||||
self._arr[x] = t
|
||||
self.aptr = x + 1
|
||||
self.trigger()
|
121
olt_lib/tx/rp2_rmt.py
Normal file
121
olt_lib/tx/rp2_rmt.py
Normal file
@ -0,0 +1,121 @@
|
||||
# rp2_rmt.py A RMT-like class for the RP2.
|
||||
|
||||
# Released under the MIT License (MIT). See LICENSE.
|
||||
|
||||
# Copyright (c) 2021-2024 Peter Hinch
|
||||
|
||||
# There are two asm_pio programs, pulsetrain and irqtrain. Only the second is used.
|
||||
# Both operate on a FIFO containing times in μs. The first toggles a pin when a
|
||||
# time elapses and also generates an interrupt. The ISR feeds the FIFO. This was
|
||||
# retained in case it finds application as a general RMT replacement. It is not
|
||||
# invoked by the IR constructor and is inaccessible by the IR subclasses.
|
||||
|
||||
# The irqtrain script only generates the interrupt when the period elapses. The ISR
|
||||
# switches the duty ratio of a PWM running at the carrier frequency. It also feeds
|
||||
# the FIFO. See RP2_RMT.md in the repository root.
|
||||
|
||||
from machine import Pin, PWM
|
||||
import rp2
|
||||
|
||||
# See above: this function is unused by the IR class.
|
||||
@rp2.asm_pio(set_init=rp2.PIO.OUT_LOW, autopull=True, pull_thresh=32)
|
||||
def pulsetrain():
|
||||
wrap_target()
|
||||
out(x, 32) # No of 1MHz ticks. Block if FIFO MT at end.
|
||||
irq(rel(0))
|
||||
set(pins, 1) # Set pin high
|
||||
label("loop")
|
||||
jmp(x_dec, "loop")
|
||||
irq(rel(0))
|
||||
set(pins, 0) # Set pin low
|
||||
out(y, 32) # Low time.
|
||||
label("loop_lo")
|
||||
jmp(y_dec, "loop_lo")
|
||||
wrap()
|
||||
|
||||
|
||||
@rp2.asm_pio(autopull=True, pull_thresh=32)
|
||||
def irqtrain():
|
||||
wrap_target()
|
||||
out(x, 32) # No of 1MHz ticks. Block if FIFO MT at end.
|
||||
irq(rel(0))
|
||||
label("loop")
|
||||
jmp(x_dec, "loop")
|
||||
wrap()
|
||||
|
||||
|
||||
class DummyPWM:
|
||||
def duty_u16(self, _):
|
||||
pass
|
||||
|
||||
|
||||
class RP2_RMT:
|
||||
def __init__(self, pin_pulse=None, carrier=None, sm_no=0, sm_freq=1_000_000):
|
||||
if carrier is None:
|
||||
self.pwm = DummyPWM()
|
||||
self.duty = (0, 0)
|
||||
else:
|
||||
pin_car, freq, duty = carrier
|
||||
self.pwm = PWM(pin_car) # Set up PWM with carrier off.
|
||||
self.pwm.freq(freq)
|
||||
self.pwm.duty_u16(0)
|
||||
self.duty = (int(0xFFFF * duty // 100), 0)
|
||||
if pin_pulse is None:
|
||||
self.sm = rp2.StateMachine(sm_no, irqtrain, freq=sm_freq)
|
||||
else:
|
||||
self.sm = rp2.StateMachine(sm_no, pulsetrain, freq=sm_freq, set_base=pin_pulse)
|
||||
self.apt = 0 # Array index
|
||||
self.arr = None # Array
|
||||
self.ict = None # Current IRQ count
|
||||
self.icm = 0 # End IRQ count
|
||||
self.reps = 0 # 0 == forever n == no. of reps
|
||||
rp2.PIO(0).irq(handler=self._cb, trigger=1 << (sm_no + 8), hard=True)
|
||||
|
||||
# IRQ callback. Because of FIFO IRQ's keep arriving after STOP.
|
||||
def _cb(self, pio):
|
||||
if self.ict is not None: # Occasionally a spurious call occurs in testing
|
||||
self.pwm.duty_u16(self.duty[self.ict & 1])
|
||||
self.ict += 1
|
||||
if d := self.arr[self.apt]: # If data available feed FIFO
|
||||
self.sm.put(d)
|
||||
self.apt += 1
|
||||
else:
|
||||
if r := self.reps != 1: # All done if reps == 1
|
||||
if r: # 0 == run forever
|
||||
self.reps -= 1
|
||||
self.sm.put(self.arr[0])
|
||||
self.apt = 1 # Set pointer and count to state
|
||||
self.ict = 1 # after 1st IRQ
|
||||
|
||||
# Arg is an array of times in μs terminated by 0.
|
||||
def send(self, ar, reps=1, check=True):
|
||||
self.sm.active(0)
|
||||
self.reps = reps
|
||||
ar[-1] = 0 # Ensure at least one STOP
|
||||
for x, d in enumerate(ar): # Find 1st STOP
|
||||
if d == 0:
|
||||
break
|
||||
if check:
|
||||
# Pulse train must end with a space otherwise we leave carrier on.
|
||||
# So, if it ends with a mark, append a space. Note __init__.py
|
||||
# ensures that there is room in array.
|
||||
if x & 1:
|
||||
ar[x] = 1 # space. Duration doesn't matter.
|
||||
x += 1
|
||||
ar[x] = 0 # STOP
|
||||
self.icm = x # index of 1st STOP
|
||||
mv = memoryview(ar)
|
||||
n = min(x, 4) # Fill FIFO if there are enough data points.
|
||||
self.sm.put(mv[0:n])
|
||||
self.arr = ar # Initial conditions for ISR
|
||||
self.apt = n # Point to next data value
|
||||
self.ict = 0 # IRQ count
|
||||
self.sm.active(1)
|
||||
|
||||
def busy(self):
|
||||
if self.ict is None:
|
||||
return False # Just instantiated
|
||||
return self.ict < self.icm
|
||||
|
||||
def cancel(self):
|
||||
self.reps = 1
|
52
olt_lib/tx/sony.py
Normal file
52
olt_lib/tx/sony.py
Normal file
@ -0,0 +1,52 @@
|
||||
# sony.py Encoder for IR remote control using synchronous code
|
||||
# Sony SIRC protocol.
|
||||
|
||||
# Author: Peter Hinch
|
||||
# Copyright Peter Hinch 2020 Released under the MIT license
|
||||
|
||||
from micropython import const
|
||||
from ir_tx import IR
|
||||
|
||||
class SONY_ABC(IR):
|
||||
|
||||
def __init__(self, pin, bits, freq, verbose):
|
||||
super().__init__(pin, freq, 3 + bits * 2, 30, verbose)
|
||||
if bits not in (12, 15, 20, 24):
|
||||
raise ValueError('bits must be 12, 15, 20 or 24.')
|
||||
self.bits = bits
|
||||
|
||||
def tx(self, addr, data, ext):
|
||||
self.append(2400, 600)
|
||||
bits = self.bits
|
||||
v = data & 0x7f
|
||||
if bits == 12:
|
||||
v |= (addr & 0x1f) << 7
|
||||
elif bits == 15:
|
||||
v |= (addr & 0xff) << 7
|
||||
else:
|
||||
v |= (addr & 0x1f) << 7
|
||||
v |= (ext & 0xff) << 12
|
||||
for _ in range(bits):
|
||||
self.append(1200 if v & 1 else 600, 600)
|
||||
v >>= 1
|
||||
|
||||
# Sony LT specifies 56KHz
|
||||
class SONY_12(SONY_ABC):
|
||||
valid = (0x1f, 0x7f, 0) # Max addr, data, toggle
|
||||
def __init__(self, pin, freq=56000, verbose=False):
|
||||
super().__init__(pin, 12, freq, verbose)
|
||||
|
||||
class SONY_15(SONY_ABC):
|
||||
valid = (0xff, 0x7f, 0) # Max addr, data, toggle
|
||||
def __init__(self, pin, freq=56000, verbose=False):
|
||||
super().__init__(pin, 15, freq, verbose)
|
||||
|
||||
class SONY_20(SONY_ABC):
|
||||
valid = (0x1f, 0x7f, 0xff) # Max addr, data, toggle
|
||||
def __init__(self, pin, freq=56000, verbose=False):
|
||||
super().__init__(pin, 20, freq, verbose)
|
||||
|
||||
class LT_24(SONY_ABC):
|
||||
valid = (0x1f, 0x7f, 0xff) # Max addr, data, toggle
|
||||
def __init__(self, pin, freq=56000, verbose=False):
|
||||
super().__init__(pin, 24, freq, verbose)
|
131
olt_lib/tx/test.py
Normal file
131
olt_lib/tx/test.py
Normal file
@ -0,0 +1,131 @@
|
||||
# ir_tx.test Test for nonblocking NEC/SONY/RC-5/RC-6 mode 0 IR blaster.
|
||||
|
||||
# Released under the MIT License (MIT). See LICENSE.
|
||||
|
||||
# Copyright (c) 2020 Peter Hinch
|
||||
|
||||
# Implements a 2-button remote control on a Pyboard with auto repeat.
|
||||
from sys import platform
|
||||
ESP32 = platform == 'esp32'
|
||||
RP2 = platform == 'rp2'
|
||||
PYBOARD = platform == 'pyboard'
|
||||
if ESP32 or RP2:
|
||||
from machine import Pin
|
||||
else:
|
||||
from pyb import Pin, LED
|
||||
import uasyncio as asyncio
|
||||
from primitives.switch import Switch
|
||||
from primitives.delay_ms import Delay_ms
|
||||
# Import all implemented classes
|
||||
from ir_tx.sony import SONY_12, SONY_15, SONY_20, LT_24
|
||||
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
# If button is held down normal behaviour is to retransmit
|
||||
# but most NEC models send a REPEAT code
|
||||
class Rbutton:
|
||||
toggle = 1 # toggle is ignored in NEC mode
|
||||
def __init__(self, irb, pin, addr, data, proto):
|
||||
self.irb = irb
|
||||
self.sw = Switch(pin)
|
||||
self.addr = addr
|
||||
self.data = data
|
||||
self.proto = proto
|
||||
|
||||
self.sw.close_func(self.cfunc)
|
||||
self.sw.open_func(self.ofunc)
|
||||
self.tim = Delay_ms(self.repeat)
|
||||
|
||||
def cfunc(self): # Button push: send data
|
||||
tog = 0 if self.proto < 3 else Rbutton.toggle # NEC, sony 12, 15: toggle==0
|
||||
self.irb.transmit(self.addr, self.data, tog, True) # Test validation
|
||||
# Auto repeat. The Sony protocol specifies 45ms but this is tight.
|
||||
# In 20 bit mode a data burst can be upto 39ms long.
|
||||
self.tim.trigger(108)
|
||||
|
||||
def ofunc(self): # Button release: cancel repeat timer
|
||||
self.tim.stop()
|
||||
Rbutton.toggle ^= 1 # Toggle control
|
||||
|
||||
async def repeat(self):
|
||||
await asyncio.sleep(0) # Let timer stop before retriggering
|
||||
if not self.sw(): # Button is still pressed: retrigger
|
||||
self.tim.trigger(108)
|
||||
if self.proto == 0:
|
||||
self.irb.repeat() # NEC special case: send REPEAT code
|
||||
else:
|
||||
tog = 0 if self.proto < 3 else Rbutton.toggle # NEC, sony 12, 15: toggle==0
|
||||
self.irb.transmit(self.addr, self.data, tog, True) # Test validation
|
||||
|
||||
async def main(proto):
|
||||
# Test uses a 56KHz carrier.
|
||||
if ESP32: # Pins for IR LED gate
|
||||
pin = Pin(23, Pin.OUT, value = 0)
|
||||
elif RP2:
|
||||
pin = Pin(17, Pin.OUT, value = 0)
|
||||
else:
|
||||
pin = Pin('X1')
|
||||
classes = (SONY_12, SONY_15, SONY_20, LT_24)
|
||||
irb = classes[proto](pin, 56000) # My decoder chip is 56KHz
|
||||
# Uncomment the following to print transmit timing
|
||||
irb.timeit = True
|
||||
|
||||
b = [] # Rbutton instances
|
||||
px3 = Pin('X3', Pin.IN, Pin.PULL_UP) if PYBOARD else Pin(18, Pin.IN, Pin.PULL_UP)
|
||||
px4 = Pin('X4', Pin.IN, Pin.PULL_UP) if PYBOARD else Pin(19, Pin.IN, Pin.PULL_UP)
|
||||
b.append(Rbutton(irb, px3, 0x1, 0x7, proto))
|
||||
b.append(Rbutton(irb, px4, 0x10, 0xb, proto))
|
||||
if ESP32:
|
||||
while True:
|
||||
print('Running')
|
||||
await asyncio.sleep(5)
|
||||
elif RP2:
|
||||
led = Pin(25, Pin.OUT)
|
||||
while True:
|
||||
await asyncio.sleep_ms(500) # Obligatory flashing LED.
|
||||
led(not led())
|
||||
else:
|
||||
led = LED(1)
|
||||
while True:
|
||||
await asyncio.sleep_ms(500) # Obligatory flashing LED.
|
||||
led.toggle()
|
||||
|
||||
# Greeting strings. Common:
|
||||
s = '''Test for IR transmitter. Run:
|
||||
from ir_tx.test import test
|
||||
test() for NEC protocol
|
||||
test(1) for Sony SIRC 12 bit
|
||||
test(2) for Sony SIRC 15 bit
|
||||
test(3) for Sony SIRC 20 bit
|
||||
test(4) for Philips RC-5 protocol
|
||||
test(5) for Philips RC-6 mode 0.
|
||||
'''
|
||||
|
||||
# Pyboard:
|
||||
spb = '''
|
||||
IR LED on pin X1
|
||||
Ground pin X3 to send addr 1 data 7
|
||||
Ground pin X4 to send addr 0x10 data 0x0b.'''
|
||||
|
||||
# ESP32
|
||||
sesp = '''
|
||||
IR LED gate on pin 23
|
||||
Ground pin 18 to send addr 1 data 7
|
||||
Ground pin 19 to send addr 0x10 data 0x0b.'''
|
||||
|
||||
# RP2
|
||||
srp2 = '''
|
||||
IR LED gate on pin 17
|
||||
Ground pin 18 to send addr 1 data 7
|
||||
Ground pin 19 to send addr 0x10 data 0x0b.'''
|
||||
|
||||
if ESP32:
|
||||
print(''.join((s, sesp)))
|
||||
elif RP2:
|
||||
print(''.join((s, srp2)))
|
||||
else:
|
||||
print(''.join((s, spb)))
|
||||
|
||||
def test(proto=0):
|
||||
loop.run_until_complete(main(proto))
|
Loading…
Reference in New Issue
Block a user