add logging

This commit is contained in:
Tomas Krejci 2024-05-24 01:02:39 +02:00
parent 5ee5ebc97f
commit cbcf91b9bb
2 changed files with 271 additions and 2 deletions

252
lib/logging.py Normal file
View File

@ -0,0 +1,252 @@
from micropython import const
import io
import sys
import time
CRITICAL = const(50)
ERROR = const(40)
WARNING = const(30)
INFO = const(20)
DEBUG = const(10)
NOTSET = const(0)
_DEFAULT_LEVEL = const(WARNING)
_level_dict = {
CRITICAL: "CRITICAL",
ERROR: "ERROR",
WARNING: "WARNING",
INFO: "INFO",
DEBUG: "DEBUG",
NOTSET: "NOTSET",
}
_loggers = {}
_stream = sys.stderr
_default_fmt = "%(levelname)s:%(name)s:%(message)s"
_default_datefmt = "%Y-%m-%d %H:%M:%S"
class LogRecord:
def set(self, name, level, message):
self.name = name
self.levelno = level
self.levelname = _level_dict[level]
self.message = message
self.ct = time.time()
self.msecs = int((self.ct - int(self.ct)) * 1000)
self.asctime = None
class Handler:
def __init__(self, level=NOTSET):
self.level = level
self.formatter = None
def close(self):
pass
def setLevel(self, level):
self.level = level
def setFormatter(self, formatter):
self.formatter = formatter
def format(self, record):
return self.formatter.format(record)
class StreamHandler(Handler):
def __init__(self, stream=None):
self.stream = _stream if stream is None else stream
self.terminator = "\n"
def close(self):
if hasattr(self.stream, "flush"):
self.stream.flush()
def emit(self, record):
if record.levelno >= self.level:
self.stream.write(self.format(record) + self.terminator)
class FileHandler(StreamHandler):
def __init__(self, filename, mode="a", encoding="UTF-8"):
super().__init__(stream=open(filename, mode=mode, encoding=encoding))
def close(self):
super().close()
self.stream.close()
class Formatter:
def __init__(self, fmt=None, datefmt=None):
self.fmt = _default_fmt if fmt is None else fmt
self.datefmt = _default_datefmt if datefmt is None else datefmt
def usesTime(self):
return "asctime" in self.fmt
def formatTime(self, datefmt, record):
if hasattr(time, "strftime"):
return time.strftime(datefmt, time.localtime(record.ct))
return None
def format(self, record):
if self.usesTime():
record.asctime = self.formatTime(self.datefmt, record)
return self.fmt % {
"name": record.name,
"message": record.message,
"msecs": record.msecs,
"asctime": record.asctime,
"levelname": record.levelname,
}
class Logger:
def __init__(self, name, level=NOTSET):
self.name = name
self.level = level
self.handlers = []
self.record = LogRecord()
def setLevel(self, level):
self.level = level
def isEnabledFor(self, level):
return level >= self.getEffectiveLevel()
def getEffectiveLevel(self):
return self.level or getLogger().level or _DEFAULT_LEVEL
def log(self, level, msg, *args):
if self.isEnabledFor(level):
if args:
if isinstance(args[0], dict):
args = args[0]
msg = msg % args
self.record.set(self.name, level, msg)
handlers = self.handlers
if not handlers:
handlers = getLogger().handlers
for h in handlers:
h.emit(self.record)
def debug(self, msg, *args):
self.log(DEBUG, msg, *args)
def info(self, msg, *args):
self.log(INFO, msg, *args)
def warning(self, msg, *args):
self.log(WARNING, msg, *args)
def error(self, msg, *args):
self.log(ERROR, msg, *args)
def critical(self, msg, *args):
self.log(CRITICAL, msg, *args)
def exception(self, msg, *args, exc_info=True):
self.log(ERROR, msg, *args)
tb = None
if isinstance(exc_info, BaseException):
tb = exc_info
elif hasattr(sys, "exc_info"):
tb = sys.exc_info()[1]
if tb:
buf = io.StringIO()
sys.print_exception(tb, buf)
self.log(ERROR, buf.getvalue())
def addHandler(self, handler):
self.handlers.append(handler)
def hasHandlers(self):
return len(self.handlers) > 0
def getLogger(name=None):
if name is None:
name = "root"
if name not in _loggers:
_loggers[name] = Logger(name)
if name == "root":
basicConfig()
return _loggers[name]
def log(level, msg, *args):
getLogger().log(level, msg, *args)
def debug(msg, *args):
getLogger().debug(msg, *args)
def info(msg, *args):
getLogger().info(msg, *args)
def warning(msg, *args):
getLogger().warning(msg, *args)
def error(msg, *args):
getLogger().error(msg, *args)
def critical(msg, *args):
getLogger().critical(msg, *args)
def exception(msg, *args):
getLogger().exception(msg, *args)
def shutdown():
for k, logger in _loggers.items():
for h in logger.handlers:
h.close()
_loggers.pop(logger, None)
def addLevelName(level, name):
_level_dict[level] = name
def basicConfig(
filename=None,
filemode="a",
format=None,
datefmt=None,
level=WARNING,
stream=None,
encoding="UTF-8",
force=False,
):
if "root" not in _loggers:
_loggers["root"] = Logger("root")
logger = _loggers["root"]
if force or not logger.handlers:
for h in logger.handlers:
h.close()
logger.handlers = []
if filename is None:
handler = StreamHandler(stream)
else:
handler = FileHandler(filename, filemode, encoding)
handler.setLevel(level)
handler.setFormatter(Formatter(format, datefmt))
logger.setLevel(level)
logger.addHandler(handler)
if hasattr(sys, "atexit"):
sys.atexit(shutdown)

21
main.py
View File

@ -39,14 +39,24 @@ import primitives.queue as queue
from primitives import Switch, Pushbutton
from rgb import RGB
import logging
logger = logging.getLogger(__name__)
#logger = logging.getLogger()
logger.setLevel(level=logging.DEBUG)
#logger.setLevel(logging.DEBUG)
#logging = logging.getLogger(__name__)
### alive check ###
async def alive_check():
# loop for checked is alive
onboar_led = machine.Pin("LED", machine.Pin.OUT) # Pi Pico onboard LED
init_ticks = time.ticks_ms()
while True:
print(f"main while running ms: {time.ticks_ms() - init_ticks}")
while True:
logger.warning(f"main while running ms: {time.ticks_ms() - init_ticks}")
logger.debug("Alive check")
#print(f"main while running ms: {time.ticks_ms() - init_ticks}")
onboar_led.on()
await asyncio.sleep_ms(100)
onboar_led.off()
@ -70,6 +80,13 @@ async def start_game_comand(btn, rgb):
btn.press_func(None)
async def main(proto):
### Logging ###
global logger
### alive check ###
asyncio.create_task(alive_check())