diff --git a/lib/logging.py b/lib/logging.py new file mode 100644 index 0000000..d17e42c --- /dev/null +++ b/lib/logging.py @@ -0,0 +1,252 @@ +from micropython import const +import io +import sys +import time + +CRITICAL = const(50) +ERROR = const(40) +WARNING = const(30) +INFO = const(20) +DEBUG = const(10) +NOTSET = const(0) + +_DEFAULT_LEVEL = const(WARNING) + +_level_dict = { + CRITICAL: "CRITICAL", + ERROR: "ERROR", + WARNING: "WARNING", + INFO: "INFO", + DEBUG: "DEBUG", + NOTSET: "NOTSET", +} + +_loggers = {} +_stream = sys.stderr +_default_fmt = "%(levelname)s:%(name)s:%(message)s" +_default_datefmt = "%Y-%m-%d %H:%M:%S" + + +class LogRecord: + def set(self, name, level, message): + self.name = name + self.levelno = level + self.levelname = _level_dict[level] + self.message = message + self.ct = time.time() + self.msecs = int((self.ct - int(self.ct)) * 1000) + self.asctime = None + + +class Handler: + def __init__(self, level=NOTSET): + self.level = level + self.formatter = None + + def close(self): + pass + + def setLevel(self, level): + self.level = level + + def setFormatter(self, formatter): + self.formatter = formatter + + def format(self, record): + return self.formatter.format(record) + + +class StreamHandler(Handler): + def __init__(self, stream=None): + self.stream = _stream if stream is None else stream + self.terminator = "\n" + + def close(self): + if hasattr(self.stream, "flush"): + self.stream.flush() + + def emit(self, record): + if record.levelno >= self.level: + self.stream.write(self.format(record) + self.terminator) + + +class FileHandler(StreamHandler): + def __init__(self, filename, mode="a", encoding="UTF-8"): + super().__init__(stream=open(filename, mode=mode, encoding=encoding)) + + def close(self): + super().close() + self.stream.close() + + +class Formatter: + def __init__(self, fmt=None, datefmt=None): + self.fmt = _default_fmt if fmt is None else fmt + self.datefmt = _default_datefmt if datefmt is None else datefmt + + def usesTime(self): + return "asctime" in self.fmt + + def formatTime(self, datefmt, record): + if hasattr(time, "strftime"): + return time.strftime(datefmt, time.localtime(record.ct)) + return None + + def format(self, record): + if self.usesTime(): + record.asctime = self.formatTime(self.datefmt, record) + return self.fmt % { + "name": record.name, + "message": record.message, + "msecs": record.msecs, + "asctime": record.asctime, + "levelname": record.levelname, + } + + +class Logger: + def __init__(self, name, level=NOTSET): + self.name = name + self.level = level + self.handlers = [] + self.record = LogRecord() + + def setLevel(self, level): + self.level = level + + def isEnabledFor(self, level): + return level >= self.getEffectiveLevel() + + def getEffectiveLevel(self): + return self.level or getLogger().level or _DEFAULT_LEVEL + + def log(self, level, msg, *args): + if self.isEnabledFor(level): + if args: + if isinstance(args[0], dict): + args = args[0] + msg = msg % args + self.record.set(self.name, level, msg) + handlers = self.handlers + if not handlers: + handlers = getLogger().handlers + for h in handlers: + h.emit(self.record) + + def debug(self, msg, *args): + self.log(DEBUG, msg, *args) + + def info(self, msg, *args): + self.log(INFO, msg, *args) + + def warning(self, msg, *args): + self.log(WARNING, msg, *args) + + def error(self, msg, *args): + self.log(ERROR, msg, *args) + + def critical(self, msg, *args): + self.log(CRITICAL, msg, *args) + + def exception(self, msg, *args, exc_info=True): + self.log(ERROR, msg, *args) + tb = None + if isinstance(exc_info, BaseException): + tb = exc_info + elif hasattr(sys, "exc_info"): + tb = sys.exc_info()[1] + if tb: + buf = io.StringIO() + sys.print_exception(tb, buf) + self.log(ERROR, buf.getvalue()) + + def addHandler(self, handler): + self.handlers.append(handler) + + def hasHandlers(self): + return len(self.handlers) > 0 + + +def getLogger(name=None): + if name is None: + name = "root" + if name not in _loggers: + _loggers[name] = Logger(name) + if name == "root": + basicConfig() + return _loggers[name] + + +def log(level, msg, *args): + getLogger().log(level, msg, *args) + + +def debug(msg, *args): + getLogger().debug(msg, *args) + + +def info(msg, *args): + getLogger().info(msg, *args) + + +def warning(msg, *args): + getLogger().warning(msg, *args) + + +def error(msg, *args): + getLogger().error(msg, *args) + + +def critical(msg, *args): + getLogger().critical(msg, *args) + + +def exception(msg, *args): + getLogger().exception(msg, *args) + + +def shutdown(): + for k, logger in _loggers.items(): + for h in logger.handlers: + h.close() + _loggers.pop(logger, None) + + +def addLevelName(level, name): + _level_dict[level] = name + + +def basicConfig( + filename=None, + filemode="a", + format=None, + datefmt=None, + level=WARNING, + stream=None, + encoding="UTF-8", + force=False, +): + if "root" not in _loggers: + _loggers["root"] = Logger("root") + + logger = _loggers["root"] + + if force or not logger.handlers: + for h in logger.handlers: + h.close() + logger.handlers = [] + + if filename is None: + handler = StreamHandler(stream) + else: + handler = FileHandler(filename, filemode, encoding) + + handler.setLevel(level) + handler.setFormatter(Formatter(format, datefmt)) + + logger.setLevel(level) + logger.addHandler(handler) + + +if hasattr(sys, "atexit"): + sys.atexit(shutdown) diff --git a/main.py b/main.py index 02fd5f8..8a40c62 100644 --- a/main.py +++ b/main.py @@ -39,14 +39,24 @@ import primitives.queue as queue from primitives import Switch, Pushbutton from rgb import RGB +import logging +logger = logging.getLogger(__name__) +#logger = logging.getLogger() +logger.setLevel(level=logging.DEBUG) +#logger.setLevel(logging.DEBUG) + +#logging = logging.getLogger(__name__) + ### alive check ### async def alive_check(): # loop for checked is alive onboar_led = machine.Pin("LED", machine.Pin.OUT) # Pi Pico onboard LED init_ticks = time.ticks_ms() - while True: - print(f"main while running ms: {time.ticks_ms() - init_ticks}") + while True: + logger.warning(f"main while running ms: {time.ticks_ms() - init_ticks}") + logger.debug("Alive check") + #print(f"main while running ms: {time.ticks_ms() - init_ticks}") onboar_led.on() await asyncio.sleep_ms(100) onboar_led.off() @@ -70,6 +80,13 @@ async def start_game_comand(btn, rgb): btn.press_func(None) async def main(proto): + + ### Logging ### + global logger + + + + ### alive check ### asyncio.create_task(alive_check())