add local primitives

This commit is contained in:
Tomas Krejci 2024-05-16 23:59:51 +02:00
parent 4f0d983b55
commit 3e099d288e
13 changed files with 1261 additions and 0 deletions

View File

@ -0,0 +1,68 @@
# __init__.py Common functions for uasyncio primitives
# Copyright (c) 2018-2022 Peter Hinch
# Released under the MIT License (MIT) - see LICENSE file
import asyncio
async def _g():
pass
type_coro = type(_g())
# If a callback is passed, run it and return.
# If a coro is passed initiate it and return.
# coros are passed by name i.e. not using function call syntax.
def launch(func, tup_args):
res = func(*tup_args)
if isinstance(res, type_coro):
res = asyncio.create_task(res)
return res
def set_global_exception():
def _handle_exception(loop, context):
import sys
sys.print_exception(context["exception"])
sys.exit()
loop = asyncio.get_event_loop()
loop.set_exception_handler(_handle_exception)
_attrs = {
"AADC": "aadc",
"Barrier": "barrier",
"Condition": "condition",
"Delay_ms": "delay_ms",
"Encoder": "encoder",
"Pushbutton": "pushbutton",
"ESP32Touch": "pushbutton",
"Queue": "queue",
"Semaphore": "semaphore",
"BoundedSemaphore": "semaphore",
"Switch": "switch",
"WaitAll": "events",
"WaitAny": "events",
"ELO": "events",
"ESwitch": "events",
"EButton": "events",
"RingbufQueue": "ringbuf_queue",
"Keyboard": "sw_array",
"SwArray": "sw_array",
}
# Copied from uasyncio.__init__.py
# Lazy loader, effectively does:
# global attr
# from .mod import attr
def __getattr__(attr):
mod = _attrs.get(attr, None)
if mod is None:
raise AttributeError(attr)
value = getattr(__import__(mod, None, None, True, 1), attr)
globals()[attr] = value
return value

68
lib/primitives/aadc.py Normal file
View File

@ -0,0 +1,68 @@
# aadc.py AADC (asynchronous ADC) class
# Copyright (c) 2020 Peter Hinch
# Released under the MIT License (MIT) - see LICENSE file
import asyncio
import io
MP_STREAM_POLL_RD = const(1)
MP_STREAM_POLL = const(3)
MP_STREAM_ERROR = const(-1)
class AADC(io.IOBase):
def __init__(self, adc):
self._adc = adc
self._lower = 0
self._upper = 65535
self._pol = True
self._last = None
self._sreader = asyncio.StreamReader(self)
def __iter__(self):
b = yield from self._sreader.read(2)
return int.from_bytes(b, "little")
def _adcread(self):
self._last = self._adc.read_u16()
return self._last
def read(self, n): # For use by StreamReader only
return int.to_bytes(self._last, 2, "little")
def ioctl(self, req, arg):
ret = MP_STREAM_ERROR
if req == MP_STREAM_POLL:
ret = 0
if arg & MP_STREAM_POLL_RD:
if self._pol ^ (self._lower <= self._adcread() <= self._upper):
ret |= MP_STREAM_POLL_RD
return ret
# *** API ***
# If normal will pause until ADC value is in range
# Otherwise will pause until value is out of range
def sense(self, normal):
self._pol = normal
def read_u16(self, last=False):
if last:
return self._last
return self._adcread()
# Call syntax: set limits for trigger
# lower is None: leave limits unchanged.
# upper is None: treat lower as relative to current value.
# both have values: treat as absolute limits.
def __call__(self, lower=None, upper=None):
if lower is not None:
if upper is None: # Relative limit
r = self._adcread() if self._last is None else self._last
self._lower = r - lower
self._upper = r + lower
else: # Absolute limits
self._lower = lower
self._upper = upper
return self

51
lib/primitives/barrier.py Normal file
View File

@ -0,0 +1,51 @@
# barrier.py
# Copyright (c) 2018-2020 Peter Hinch
# Released under the MIT License (MIT) - see LICENSE file
# Now uses Event rather than polling.
import asyncio
from . import launch
# A Barrier synchronises N coros. Each issues await barrier.
# Execution pauses until all other participant coros are waiting on it.
# At that point the callback is executed. Then the barrier is 'opened' and
# execution of all participants resumes.
class Barrier:
def __init__(self, participants, func=None, args=()):
self._participants = participants
self._count = participants
self._func = func
self._args = args
self._res = None
self._evt = asyncio.Event()
def __await__(self):
if self.trigger():
return # Other tasks have already reached barrier
await self._evt.wait() # Wait until last task reaches it
__iter__ = __await__
def result(self):
return self._res
def trigger(self):
self._count -= 1
if self._count < 0:
raise ValueError("Too many tasks accessing Barrier")
if self._count > 0:
return False # At least 1 other task has not reached barrier
# All other tasks are waiting
if self._func is not None:
self._res = launch(self._func, self._args)
self._count = self._participants
self._evt.set() # Release others
self._evt.clear()
return True
def busy(self):
return self._count < self._participants

View File

@ -0,0 +1,66 @@
# condition.py
# Copyright (c) 2018-2020 Peter Hinch
# Released under the MIT License (MIT) - see LICENSE file
import asyncio
# Condition class
# from primitives.condition import Condition
class Condition:
def __init__(self, lock=None):
self.lock = asyncio.Lock() if lock is None else lock
self.events = []
async def acquire(self):
await self.lock.acquire()
# enable this syntax:
# with await condition [as cond]:
def __await__(self):
await self.lock.acquire()
return self
__iter__ = __await__
def __enter__(self):
return self
def __exit__(self, *_):
self.lock.release()
def locked(self):
return self.lock.locked()
def release(self):
self.lock.release() # Will raise RuntimeError if not locked
def notify(self, n=1): # Caller controls lock
if not self.lock.locked():
raise RuntimeError("Condition notify with lock not acquired.")
for _ in range(min(n, len(self.events))):
ev = self.events.pop()
ev.set()
def notify_all(self):
self.notify(len(self.events))
async def wait(self):
if not self.lock.locked():
raise RuntimeError("Condition wait with lock not acquired.")
ev = asyncio.Event()
self.events.append(ev)
self.lock.release()
await ev.wait()
await self.lock.acquire()
assert ev not in self.events, "condition wait assertion fail"
return True # CPython compatibility
async def wait_for(self, predicate):
result = predicate()
while not result:
await self.wait()
result = predicate()
return result

View File

@ -0,0 +1,82 @@
# delay_ms.py Now uses ThreadSafeFlag and has extra .wait() API
# Usage:
# from primitives import Delay_ms
# Copyright (c) 2018-2022 Peter Hinch
# Released under the MIT License (MIT) - see LICENSE file
import asyncio
from utime import ticks_add, ticks_diff, ticks_ms
from . import launch
class Delay_ms:
class DummyTimer: # Stand-in for the timer class. Can be cancelled.
def cancel(self):
pass
_fake = DummyTimer()
def __init__(self, func=None, args=(), duration=1000):
self._func = func
self._args = args
self._durn = duration # Default duration
self._retn = None # Return value of launched callable
self._tend = None # Stop time (absolute ms).
self._busy = False
self._trig = asyncio.ThreadSafeFlag()
self._tout = asyncio.Event() # Timeout event
self.wait = self._tout.wait # Allow: await wait_ms.wait()
self.clear = self._tout.clear
self.set = self._tout.set
self._ttask = self._fake # Timer task
self._mtask = asyncio.create_task(self._run()) # Main task
async def _run(self):
while True:
await self._trig.wait() # Await a trigger
self._ttask.cancel() # Cancel and replace
await asyncio.sleep_ms(0)
dt = max(ticks_diff(self._tend, ticks_ms()), 0) # Beware already elapsed.
self._ttask = asyncio.create_task(self._timer(dt))
async def _timer(self, dt):
await asyncio.sleep_ms(dt)
self._tout.set() # Only gets here if not cancelled.
self._busy = False
if self._func is not None:
self._retn = launch(self._func, self._args)
# API
# trigger may be called from hard ISR.
def trigger(self, duration=0): # Update absolute end time, 0-> ctor default
if self._mtask is None:
raise RuntimeError("Delay_ms.deinit() has run.")
self._tend = ticks_add(ticks_ms(), duration if duration > 0 else self._durn)
self._retn = None # Default in case cancelled.
self._busy = True
self._trig.set()
def stop(self):
self._ttask.cancel()
self._ttask = self._fake
self._busy = False
self._tout.clear()
def __call__(self): # Current running status
return self._busy
running = __call__
def rvalue(self):
return self._retn
def callback(self, func=None, args=()):
self._func = func
self._args = args
def deinit(self):
if self._mtask is not None: # https://github.com/peterhinch/micropython-async/issues/98
self.stop()
self._mtask.cancel()
self._mtask = None

124
lib/primitives/encoder.py Normal file
View File

@ -0,0 +1,124 @@
# encoder.py Asynchronous driver for incremental quadrature encoder.
# Copyright (c) 2021-2023 Peter Hinch
# Released under the MIT License (MIT) - see LICENSE file
# For an explanation of the design please see
# [ENCODERS.md](https://github.com/peterhinch/micropython-samples/blob/master/encoders/ENCODERS.md)
# Thanks are due to the following collaborators:
# @ilium007 for identifying the issue of tracking detents,
# https://github.com/peterhinch/micropython-async/issues/82.
# Mike Teachman (@miketeachman) for design discussions and testing
# against a state table design
# https://github.com/miketeachman/micropython-rotary/blob/master/rotary.py
# Raul Kompaß (@rkompass) for suggesting a bugfix here
# https://forum.micropython.org/viewtopic.php?f=15&t=9929&p=66175#p66156
import asyncio
from machine import Pin
from select import poll, POLLIN
def ready(tsf, poller):
r = (tsf, POLLIN)
poller.register(*r)
def is_rdy():
return r in poller.ipoll(0)
return is_rdy
class Encoder:
def __init__(
self,
pin_x,
pin_y,
v=0,
div=1,
vmin=None,
vmax=None,
mod=None,
callback=lambda a, b: None,
args=(),
delay=100,
):
self._pin_x = pin_x
self._pin_y = pin_y
self._x = pin_x()
self._y = pin_y()
self._v = v * div # Initialise hardware value
self._cv = v # Current (divided) value
self.delay = delay # Pause (ms) for motion to stop/limit callback frequency
self._trig = asyncio.Event()
if ((vmin is not None) and v < vmin) or ((vmax is not None) and v > vmax):
raise ValueError("Incompatible args: must have vmin <= v <= vmax")
self._tsf = asyncio.ThreadSafeFlag()
self._tsf_ready = ready(self._tsf, poll()) # Create a ready function
trig = Pin.IRQ_RISING | Pin.IRQ_FALLING
try:
xirq = pin_x.irq(trigger=trig, handler=self._x_cb, hard=True)
yirq = pin_y.irq(trigger=trig, handler=self._y_cb, hard=True)
except TypeError: # hard arg is unsupported on some hosts
xirq = pin_x.irq(trigger=trig, handler=self._x_cb)
yirq = pin_y.irq(trigger=trig, handler=self._y_cb)
asyncio.create_task(self._run(vmin, vmax, div, mod, callback, args))
# Hardware IRQ's. Duration 36μs on Pyboard 1 ~50μs on ESP32.
# IRQ latency: 2nd edge may have occured by the time ISR runs, in
# which case there is no movement.
def _x_cb(self, pin_x):
if (x := pin_x()) != self._x:
self._x = x
self._v += 1 if x ^ self._pin_y() else -1
self._tsf.set()
def _y_cb(self, pin_y):
if (y := pin_y()) != self._y:
self._y = y
self._v -= 1 if y ^ self._pin_x() else -1
self._tsf.set()
async def _run(self, vmin, vmax, div, mod, cb, args):
pv = self._v # Prior hardware value
pcv = self._cv # Prior divided value passed to callback
lcv = pcv # Current value after limits applied
plcv = pcv # Previous value after limits applied
delay = self.delay
while True:
if delay > 0 and self._tsf_ready(): # Ensure ThreadSafeFlag is clear
await self._tsf.wait()
await self._tsf.wait()
await asyncio.sleep_ms(delay) # Wait for motion to stop.
hv = self._v # Sample hardware (atomic read).
if hv == pv: # A change happened but was negated before
continue # this got scheduled. Nothing to do.
pv = hv
cv = round(hv / div) # cv is divided value.
if not (dv := cv - pcv): # dv is change in divided value.
continue # No change
lcv += dv # lcv: divided value with limits/mod applied
lcv = lcv if vmax is None else min(vmax, lcv)
lcv = lcv if vmin is None else max(vmin, lcv)
lcv = lcv if mod is None else lcv % mod
self._cv = lcv # update ._cv for .value() before CB.
if lcv != plcv:
cb(lcv, lcv - plcv, *args) # Run user CB in uasyncio context
self._trig.set() # Enable async iterator
pcv = cv
plcv = lcv
def __aiter__(self):
return self
def __anext__(self):
await self._trig.wait()
self._trig.clear()
return self._cv
def value(self):
return self._cv

221
lib/primitives/events.py Normal file
View File

@ -0,0 +1,221 @@
# events.py Event based primitives
# Copyright (c) 2022-2024 Peter Hinch
# Released under the MIT License (MIT) - see LICENSE file
import asyncio
from . import Delay_ms
from . import RingbufQueue
# An Event-like class that can wait on an iterable of Event-like instances.
# .wait pauses until any passed event is set.
class WaitAny:
def __init__(self, events):
self.events = events
self.trig_event = None
self.evt = asyncio.Event()
async def wait(self):
tasks = [asyncio.create_task(self.wt(event)) for event in self.events]
try:
await self.evt.wait()
finally:
self.evt.clear()
for task in tasks:
task.cancel()
return self.trig_event
async def wt(self, event):
await event.wait()
self.evt.set()
self.trig_event = event
def event(self):
return self.trig_event
def clear(self):
for evt in (x for x in self.events if hasattr(x, "clear")):
evt.clear()
# An Event-like class that can wait on an iterable of Event-like instances,
# .wait pauses until all passed events have been set.
class WaitAll:
def __init__(self, events):
self.events = events
async def wait(self):
async def wt(event):
await event.wait()
tasks = (asyncio.create_task(wt(event)) for event in self.events)
try:
await asyncio.gather(*tasks)
finally: # May be subject to timeout or cancellation
for task in tasks:
task.cancel()
def clear(self):
for evt in (x for x in self.events if hasattr(x, "clear")):
evt.clear()
# Convert to an event-like object: either a running task or a coro with args.
# Motivated by a suggestion from @sandyscott iss #116
class ELO_x:
def __init__(self, coro, *args, **kwargs):
self._coro = coro
self._args = args
self._kwargs = kwargs
self._task = None # Current running task (or exception)
async def wait(self):
cr = self._coro
istask = isinstance(cr, asyncio.Task) # Instantiated with a Task
if istask and isinstance(self._task, asyncio.CancelledError):
return # Previously awaited and was cancelled/timed out
self._task = cr if istask else asyncio.create_task(cr(*self._args, **self._kwargs))
try:
await self._task
except asyncio.CancelledError as e:
self._task = e # Let WaitAll or WaitAny complete
# User can retrieve task/coro results by awaiting .task() (even if task had
# run to completion). If task was cancelled CancelledError is returned.
# If .task() is called before .wait() returns None or result of prior .wait()
# Caller issues isinstance(task, CancelledError)
def task(self):
return self._task
# Convert to an event-like object: either a running task or a coro with args.
# Motivated by a suggestion from @sandyscott iss #116
class ELO:
def __init__(self, coro, *args, **kwargs):
tsk = isinstance(coro, asyncio.Task) # Instantiated with a Task
self._task = coro if tsk else asyncio.create_task(coro(*args, **kwargs))
async def wait(self):
try:
await self._task
except asyncio.CancelledError as e:
self._task = e # Let WaitAll or WaitAny complete
# User can retrieve task/coro results by awaiting elo() (even if task had
# run to completion). If task was cancelled CancelledError is returned.
# If .task() is called before .wait() returns None or result of prior .wait()
# Caller issues isinstance(task, CancelledError)
def __call__(self):
return self._task
# Minimal switch class having an Event based interface
class ESwitch:
debounce_ms = 50
def __init__(self, pin, lopen=1): # Default is n/o switch returned to gnd
self._pin = pin # Should be initialised for input with pullup
self._lopen = lopen # Logic level in "open" state
self.open = asyncio.Event()
self.close = asyncio.Event()
self._state = self._pin() ^ self._lopen # Get initial state
asyncio.create_task(self._poll(ESwitch.debounce_ms))
async def _poll(self, dt): # Poll the button
while True:
if (s := self._pin() ^ self._lopen) != self._state: # 15μs
self._state = s
self._cf() if s else self._of()
await asyncio.sleep_ms(dt) # Wait out bounce
def _of(self):
self.open.set()
def _cf(self):
self.close.set()
# ***** API *****
# Return current state of switch (0 = pressed)
def __call__(self):
return self._state
def deinit(self):
self._poll.cancel()
self.open.clear()
self.close.clear()
# Minimal pushbutton class having an Event based interface
class EButton:
debounce_ms = 50 # Attributes can be varied by user
long_press_ms = 1000
double_click_ms = 400
def __init__(self, pin, suppress=False, sense=None):
self._pin = pin # Initialise for input
self._supp = suppress
self._sense = pin() if sense is None else sense
self._state = self.rawstate() # Initial logical state
self._ltim = Delay_ms(duration=EButton.long_press_ms)
self._dtim = Delay_ms(duration=EButton.double_click_ms)
self.press = asyncio.Event() # *** API ***
self.double = asyncio.Event()
self.long = asyncio.Event()
self.release = asyncio.Event() # *** END API ***
# Tasks run forever. Poll contacts
self._tasks = [asyncio.create_task(self._poll(EButton.debounce_ms))]
self._tasks.append(asyncio.create_task(self._ltf())) # Handle long press
if suppress:
self._tasks.append(asyncio.create_task(self._dtf())) # Double timer
async def _poll(self, dt): # Poll the button
while True:
if (s := self.rawstate()) != self._state:
self._state = s
self._pf() if s else self._rf()
await asyncio.sleep_ms(dt) # Wait out bounce
def _pf(self): # Button press
if not self._supp:
self.press.set() # User event
if self._dtim(): # Press occurred while _dtim is running
self.double.set() # User event
self._dtim.stop() # _dtim's Event is only used if suppress
else: # Single press or 1st of a double pair.
self._dtim.trigger()
self._ltim.trigger() # Trigger long timer on 1st press of a double pair
def _rf(self): # Button release
self._ltim.stop()
if not self._supp or not self._dtim(): # If dtim running postpone release otherwise it
self.release.set() # is set before press
async def _ltf(self): # Long timeout
while True:
await self._ltim.wait()
self._ltim.clear() # Clear the event
self.long.set() # User event
# Runs if suppress set. Delay response to single press until sure it is a single short pulse.
async def _dtf(self):
while True:
await self._dtim.wait() # Double click has timed out
self._dtim.clear() # Clear the event
if not self._ltim(): # Button was released: not a long press.
self.press.set() # User events
self.release.set()
# ****** API ******
# Current non-debounced logical button state: True == pressed
def rawstate(self):
return bool(self._pin() ^ self._sense)
# Current debounced state of button (True == pressed)
def __call__(self):
return self._state
def deinit(self):
for task in self._tasks:
task.cancel()
for evt in (self.press, self.double, self.long, self.release):
evt.clear()

View File

@ -0,0 +1,159 @@
# pushbutton.py
# Copyright (c) 2018-2023 Peter Hinch
# Released under the MIT License (MIT) - see LICENSE file
import asyncio
import utime as time
from . import launch, Delay_ms
try:
from machine import TouchPad
except ImportError:
pass
class Pushbutton:
debounce_ms = 50
long_press_ms = 1000
double_click_ms = 400
def __init__(self, pin, suppress=False, sense=None):
self._pin = pin # Initialise for input
self._supp = suppress
self._dblpend = False # Doubleclick waiting for 2nd click
self._dblran = False # Doubleclick executed user function
self._tf = False
self._ff = False
self._df = False
self._ld = False # Delay_ms instance for long press
self._dd = False # Ditto for doubleclick
# Convert from electrical to logical value
self._sense = pin.value() if sense is None else sense
self._state = self.rawstate() # Initial state
self._run = asyncio.create_task(self._go()) # Thread runs forever
async def _go(self):
while True:
self._check(self.rawstate())
# Ignore state changes until switch has settled. Also avoid hogging CPU.
# See https://github.com/peterhinch/micropython-async/issues/69
await asyncio.sleep_ms(Pushbutton.debounce_ms)
def _check(self, state):
if state == self._state:
return
# State has changed: act on it now.
self._state = state
if state: # Button pressed: launch pressed func
if self._tf:
launch(self._tf, self._ta)
# If there's a long func: start long press delay if no double click running
# (case where a short click is rapidly followed by a long one, iss 101).
if self._ld and not (self._df and self._dd()):
self._ld.trigger(Pushbutton.long_press_ms)
if self._df:
if self._dd(): # Second click: timer running
self._dd.stop()
self._dblpend = False
self._dblran = True # Prevent suppressed launch on release
launch(self._df, self._da)
else:
# First click: start doubleclick timer
self._dd.trigger(Pushbutton.double_click_ms)
self._dblpend = True # Prevent suppressed launch on release
else: # Button release. Is there a release func?
if self._ff:
if self._supp:
d = self._ld
# If long delay exists, is running and doubleclick status is OK
if not self._dblpend and not self._dblran:
if (d and d()) or not d:
launch(self._ff, self._fa)
else:
launch(self._ff, self._fa)
if self._ld:
self._ld.stop() # Avoid interpreting a second click as a long push
self._dblran = False
def _ddto(self): # Doubleclick timeout: no doubleclick occurred
self._dblpend = False
if self._ff and self._supp and not self._state:
if not self._ld or (self._ld and not self._ld()):
launch(self._ff, self._fa)
# ****** API ******
def press_func(self, func=False, args=()):
if func is None:
self.press = asyncio.Event()
self._tf = self.press.set if func is None else func
self._ta = args
def release_func(self, func=False, args=()):
if func is None:
self.release = asyncio.Event()
self._ff = self.release.set if func is None else func
self._fa = args
def double_func(self, func=False, args=()):
if func is None:
self.double = asyncio.Event()
func = self.double.set
self._df = func
self._da = args
if func: # If double timer already in place, leave it
if not self._dd:
self._dd = Delay_ms(self._ddto)
else:
self._dd = False # Clearing down double func
def long_func(self, func=False, args=()):
if func is None:
self.long = asyncio.Event()
func = self.long.set
if func:
if self._ld:
self._ld.callback(func, args)
else:
self._ld = Delay_ms(func, args)
else:
self._ld = False
# Current non-debounced logical button state: True == pressed
def rawstate(self):
return bool(self._pin() ^ self._sense)
# Current debounced state of button (True == pressed)
def __call__(self):
return self._state
def deinit(self):
self._run.cancel()
class ESP32Touch(Pushbutton):
thresh = (80 << 8) // 100
@classmethod
def threshold(cls, val):
if not (isinstance(val, int) and 0 < val < 100):
raise ValueError("Threshold must be in range 1-99")
cls.thresh = (val << 8) // 100
def __init__(self, pin, suppress=False):
self._thresh = 0 # Detection threshold
self._rawval = 0
try:
self._pad = TouchPad(pin)
except ValueError:
raise ValueError(pin) # Let's have a bit of information :)
super().__init__(pin, suppress, False)
# Current logical button state: True == touched
def rawstate(self):
rv = self._pad.read() # ~220μs
if rv > self._rawval: # Either initialisation or pad was touched
self._rawval = rv # when initialised and has now been released
self._thresh = (rv * ESP32Touch.thresh) >> 8
return False # Untouched
return rv < self._thresh

91
lib/primitives/queue.py Normal file
View File

@ -0,0 +1,91 @@
# queue.py: adapted from uasyncio V2
# Copyright (c) 2018-2020 Peter Hinch
# Released under the MIT License (MIT) - see LICENSE file
# Code is based on Paul Sokolovsky's work.
# This is a temporary solution until uasyncio V3 gets an efficient official version
import asyncio
# Exception raised by get_nowait().
class QueueEmpty(Exception):
pass
# Exception raised by put_nowait().
class QueueFull(Exception):
pass
class Queue:
def __init__(self, maxsize=0):
self.maxsize = maxsize
self._queue = []
self._evput = asyncio.Event() # Triggered by put, tested by get
self._evget = asyncio.Event() # Triggered by get, tested by put
self._jncnt = 0
self._jnevt = asyncio.Event()
self._upd_jnevt(0) # update join event
def _get(self):
self._evget.set() # Schedule all tasks waiting on get
self._evget.clear()
return self._queue.pop(0)
async def get(self): # Usage: item = await queue.get()
while self.empty(): # May be multiple tasks waiting on get()
# Queue is empty, suspend task until a put occurs
# 1st of N tasks gets, the rest loop again
await self._evput.wait()
return self._get()
def get_nowait(self): # Remove and return an item from the queue.
# Return an item if one is immediately available, else raise QueueEmpty.
if self.empty():
raise QueueEmpty()
return self._get()
def _put(self, val):
self._upd_jnevt(1) # update join event
self._evput.set() # Schedule tasks waiting on put
self._evput.clear()
self._queue.append(val)
async def put(self, val): # Usage: await queue.put(item)
while self.full():
# Queue full
await self._evget.wait()
# Task(s) waiting to get from queue, schedule first Task
self._put(val)
def put_nowait(self, val): # Put an item into the queue without blocking.
if self.full():
raise QueueFull()
self._put(val)
def qsize(self): # Number of items in the queue.
return len(self._queue)
def empty(self): # Return True if the queue is empty, False otherwise.
return len(self._queue) == 0
def full(self): # Return True if there are maxsize items in the queue.
# Note: if the Queue was initialized with maxsize=0 (the default) or
# any negative number, then full() is never True.
return self.maxsize > 0 and self.qsize() >= self.maxsize
def _upd_jnevt(self, inc: int): # #Update join count and join event
self._jncnt += inc
if self._jncnt <= 0:
self._jnevt.set()
else:
self._jnevt.clear()
def task_done(self): # Task Done decrements counter
self._upd_jnevt(-1)
async def join(self): # Wait for join event
await self._jnevt.wait()

View File

@ -0,0 +1,76 @@
# ringbuf_queue.py Provides RingbufQueue class
# Copyright (c) 2022-2023 Peter Hinch
# Released under the MIT License (MIT) - see LICENSE file
# API differs from CPython
# Uses pre-allocated ring buffer: can use list or array
# Asynchronous iterator allowing consumer to use async for
# put_nowait QueueFull exception can be ignored allowing oldest data to be discarded.
import asyncio
class RingbufQueue: # MicroPython optimised
def __init__(self, buf):
self._q = [0 for _ in range(buf)] if isinstance(buf, int) else buf
self._size = len(self._q)
self._wi = 0
self._ri = 0
self._evput = asyncio.Event() # Triggered by put, tested by get
self._evget = asyncio.Event() # Triggered by get, tested by put
def full(self):
return ((self._wi + 1) % self._size) == self._ri
def empty(self):
return self._ri == self._wi
def qsize(self):
return (self._wi - self._ri) % self._size
def get_nowait(self): # Remove and return an item from the queue.
# Return an item if one is immediately available, else raise QueueEmpty.
if self.empty():
raise IndexError
r = self._q[self._ri]
self._ri = (self._ri + 1) % self._size
self._evget.set() # Schedule all tasks waiting on ._evget
self._evget.clear()
return r
def peek(self): # Return oldest item from the queue without removing it.
# Return an item if one is immediately available, else raise QueueEmpty.
if self.empty():
raise IndexError
return self._q[self._ri]
def put_nowait(self, v):
self._q[self._wi] = v
self._evput.set() # Schedule any tasks waiting on get
self._evput.clear()
self._wi = (self._wi + 1) % self._size
if self._wi == self._ri: # Would indicate empty
self._ri = (self._ri + 1) % self._size # Discard a message
raise IndexError # Caller can ignore if overwrites are OK
async def put(self, val): # Usage: await queue.put(item)
while self.full(): # Queue full
await self._evget.wait() # May be >1 task waiting on ._evget
# Task(s) waiting to get from queue, schedule first Task
self.put_nowait(val)
def __aiter__(self):
return self
async def __anext__(self):
return await self.get()
async def get(self):
while self.empty(): # Empty. May be more than one task waiting on ._evput
await self._evput.wait()
r = self._q[self._ri]
self._ri = (self._ri + 1) % self._size
self._evget.set() # Schedule all tasks waiting on ._evget
self._evget.clear()
return r

View File

@ -0,0 +1,46 @@
# semaphore.py
# Copyright (c) 2018-2020 Peter Hinch
# Released under the MIT License (MIT) - see LICENSE file
import asyncio
# A Semaphore is typically used to limit the number of coros running a
# particular piece of code at once. The number is defined in the constructor.
class Semaphore:
def __init__(self, value=1):
self._count = value
self._event = asyncio.Event()
async def __aenter__(self):
await self.acquire()
return self
async def __aexit__(self, *args):
self.release()
await asyncio.sleep(0)
async def acquire(self):
self._event.clear()
while self._count == 0: # Multiple tasks may be waiting for
await self._event.wait() # a release
self._event.clear()
# When we yield, another task may succeed. In this case
await asyncio.sleep(0) # the loop repeats
self._count -= 1
def release(self):
self._event.set()
self._count += 1
class BoundedSemaphore(Semaphore):
def __init__(self, value=1):
super().__init__(value)
self._initial_value = value
def release(self):
if self._count < self._initial_value:
super().release()
else:
raise ValueError("Semaphore released more than acquired")

158
lib/primitives/sw_array.py Normal file
View File

@ -0,0 +1,158 @@
# sw_array.py A crosspoint array of pushbuttons
# Copyright (c) 2023 Peter Hinch
# Released under the MIT License (MIT) - see LICENSE file
import asyncio
from . import RingbufQueue
from time import ticks_ms, ticks_diff
# A crosspoint array of pushbuttons
# Tuples/lists of pins. Rows are OUT, cols are IN
class Keyboard(RingbufQueue):
def __init__(self, rowpins, colpins, *, bufsize=10, db_delay=50):
super().__init__(bytearray(bufsize) if isinstance(bufsize, int) else bufsize)
self.rowpins = rowpins
self.colpins = colpins
self._state = 0 # State of all keys as bitmap
for opin in self.rowpins: # Initialise output pins
opin(1)
self._run = asyncio.create_task(self.scan(len(rowpins) * len(colpins), db_delay))
def __getitem__(self, scan_code):
return bool(self._state & (1 << scan_code))
async def scan(self, nkeys, db_delay):
while True:
cur = 0 # Current bitmap of logical key states
for opin in self.rowpins:
opin(0) # Assert output
for ipin in self.colpins:
cur <<= 1
cur |= ipin() ^ 1 # Convert physical to logical
opin(1)
if pressed := (cur & ~self._state): # 1's are newly pressed button(s)
for sc in range(nkeys):
if pressed & 1:
try:
self.put_nowait(sc)
except IndexError: # q full. Overwrite oldest
pass
pressed >>= 1
changed = cur ^ self._state # Any new press or release
self._state = cur
await asyncio.sleep_ms(db_delay if changed else 0) # Wait out bounce
def deinit(self):
self._run.cancel()
CLOSE = const(1) # cfg comprises the OR of these constants
OPEN = const(2)
LONG = const(4)
DOUBLE = const(8)
SUPPRESS = const(16) # Disambiguate: see docs.
# Entries in queue are (scan_code, event) where event is an OR of above constants.
# rowpins/colpins are tuples/lists of pins. Rows are OUT, cols are IN.
# cfg is a logical OR of above constants. If a bit is 0 that state will never be reported.
class SwArray(RingbufQueue):
debounce_ms = 50 # Attributes can be varied by user
long_press_ms = 1000
double_click_ms = 400
def __init__(self, rowpins, colpins, cfg, *, bufsize=10):
super().__init__(bufsize)
self._rowpins = rowpins
self._colpins = colpins
self._cfg = cfg
self._state = 0 # State of all buttons as bitmap
self._flags = 0 # Busy bitmap
self._basic = not bool(cfg & (SUPPRESS | LONG | DOUBLE)) # Basic mode
self._suppress = bool(cfg & SUPPRESS)
for opin in self._rowpins: # Initialise output pins
opin(1) # open circuit
self._run = asyncio.create_task(self._scan(len(rowpins) * len(colpins)))
def __getitem__(self, scan_code):
return bool(self._state & (1 << scan_code))
def _put(self, sc, evt):
if evt & self._cfg: # Only if user has requested it
try:
self.put_nowait((sc, evt))
except IndexError: # q full. Overwrite oldest
pass
def _timeout(self, ts, condition):
t = SwArray.long_press_ms if condition == LONG else SwArray.double_click_ms
return ticks_diff(ticks_ms(), ts) > t
def _busy(self, sc, v):
of = self._flags # Return prior state
if v:
self._flags |= 1 << sc
else:
self._flags &= ~(1 << sc)
return (of >> sc) & 1
async def _finish(self, sc): # Tidy up. If necessary await a contact open
while self[sc]:
await asyncio.sleep_ms(0)
self._put(sc, OPEN)
self._busy(sc, False)
def keymap(self): # Return a bitmap of debounced state of all buttons/switches
return self._state
# Handle long, double. Switch has closed.
async def _defer(self, sc):
# Wait for contact closure to be registered: let calling loop complete
await asyncio.sleep_ms(0)
ts = ticks_ms()
if not self._suppress:
self._put(sc, CLOSE)
while self[sc]: # Pressed
await asyncio.sleep_ms(0)
if self._timeout(ts, LONG):
self._put(sc, LONG)
await self._finish(sc)
return
if not self._suppress:
self._put(sc, OPEN)
while not self[sc]:
await asyncio.sleep_ms(0)
if self._timeout(ts, DOUBLE): # No second closure
self._put(sc, CLOSE) # Single press. Report CLOSE
await self._finish(sc) # then OPEN
return
self._put(sc, DOUBLE)
await self._finish(sc)
async def _scan(self, nkeys):
db_delay = SwArray.debounce_ms
while True:
cur = 0 # Current bitmap of logical button states (1 == pressed)
for opin in self._rowpins:
opin(0) # Assert output
for ipin in self._colpins:
cur <<= 1
cur |= ipin() ^ 1 # Convert physical to logical
opin(1)
curb = cur # Copy current bitmap
if changed := (cur ^ self._state): # 1's are newly canged button(s)
for sc in range(nkeys):
if changed & 1: # Current button has changed state
if self._basic: # No timed behaviour
self._put(sc, CLOSE if cur & 1 else OPEN)
elif cur & 1: # Closed
if not self._busy(sc, True): # Currently not busy
asyncio.create_task(self._defer(sc)) # Q is handled asynchronously
changed >>= 1
cur >>= 1
changed = curb ^ self._state # Any new press or release
self._state = curb
await asyncio.sleep_ms(db_delay if changed else 0) # Wait out bounce
def deinit(self):
self._run.cancel()

51
lib/primitives/switch.py Normal file
View File

@ -0,0 +1,51 @@
# switch.py
# Copyright (c) 2018-2022 Peter Hinch
# Released under the MIT License (MIT) - see LICENSE file
import asyncio
import utime as time
from . import launch
class Switch:
debounce_ms = 50
def __init__(self, pin):
self.pin = pin # Should be initialised for input with pullup
self._open_func = False
self._close_func = False
self.switchstate = self.pin.value() # Get initial state
self._run = asyncio.create_task(self.switchcheck()) # Thread runs forever
def open_func(self, func, args=()):
if func is None:
self.open = asyncio.Event()
self._open_func = self.open.set if func is None else func
self._open_args = args
def close_func(self, func, args=()):
if func is None:
self.close = asyncio.Event()
self._close_func = self.close.set if func is None else func
self._close_args = args
# Return current state of switch (0 = pressed)
def __call__(self):
return self.switchstate
async def switchcheck(self):
while True:
state = self.pin.value()
if state != self.switchstate:
# State has changed: act on it now.
self.switchstate = state
if state == 0 and self._close_func:
launch(self._close_func, self._close_args)
elif state == 1 and self._open_func:
launch(self._open_func, self._open_args)
# Ignore further state changes until switch has settled
await asyncio.sleep_ms(Switch.debounce_ms)
def deinit(self):
self._run.cancel()