"""Utils tools for logging and progress bar."""
import os
import sys
from pathlib import Path
from typing import Iterable, Literal, Optional, Sequence, Tuple, Union
import loguru
from loguru import logger
from rich import get_console
from rich.console import Console
from rich.progress import (
BarColumn,
MofNCompleteColumn,
Progress,
ProgressColumn,
ProgressType,
Task,
TaskProgressColumn,
TextColumn,
TimeElapsedColumn,
TimeRemainingColumn,
)
from rich.text import Text
from ..data_structure.constants import PathLike
__all__ = ['setup_logger']
class LoggerWrapper:
"""A wrapper for logger tools."""
is_setup = False
logger_record = set()
# ^8 means center align, 8 characters
logger_format = '{time:YYYY-MM-DD HH:mm:ss} | ' + '{level:^8} | ' + '{message}'
@staticmethod
def find_logger_file() -> str:
"""Find the logger file."""
from loguru._file_sink import FileSink
for _, _handler in logger._core.handlers.items():
_sink = _handler._sink
if isinstance(_sink, FileSink):
return _sink._path
@classmethod
def filter_unique(cls, record: 'loguru.Record', level_name: str = 'WARNING') -> bool:
"""Filter unique warning massage.
Args:
record (loguru.Record): Loguru record.
level_name (str, optional): logging level. Defaults to "WARNING".
Returns:
bool: _description_
"""
msg = record['message']
level = record['level'].name
if level != level_name:
return True
msg = f'{level}: {msg}'
if msg not in cls.logger_record:
cls.logger_record.add(msg)
return True
return False
@classmethod
def setup_logging(
cls,
level: Literal['RPC', 'TRACE', 'DEBUG', 'INFO', 'SUCCESS', 'WARNING', 'ERROR', 'CRITICAL'] = 'INFO',
log_path: 'Optional[PathLike]' = None,
log_path_level: Literal['RPC', 'TRACE', 'DEBUG', 'INFO', 'SUCCESS', 'WARNING', 'ERROR', 'CRITICAL'] = 'DEBUG',
replace: bool = True,
) -> 'loguru.Logger':
"""Setup logging to file and console.
Args:
level (Literal['RPC', 'TRACE', 'DEBUG', 'INFO', 'SUCCESS', 'WARNING', 'ERROR', 'CRITICAL'], optional):
logging level. Defaults to "INFO", find more in https://loguru.readthedocs.io/en/stable/api/logger.html.
log_path (Path, optional): path to save the log file. Defaults to None.
log_path_level (Literal['RPC', 'TRACE', 'DEBUG', 'INFO', 'SUCCESS', 'WARNING', 'ERROR', 'CRITICAL'], optional):
logging level for the log file. Defaults to 'DEBUG'.
replace (bool, optional): replace the log file if exists. Defaults to True.
"""
if cls.is_setup:
return logger
cls.setup_encoding()
# add custom level called RPC, which is the minimum level
logger.level('RPC', no=1, color='<white>', icon='📢')
logger.remove() # remove default logger
# logger.add(sink=lambda msg: rprint(msg, end=''), level=level, format=cls.logger_format)
# c = Console(
# width=sys.maxsize, # disable wrapping
# log_time=False,
# log_path=False,
# log_time_format='',
# )
logger.add(sink=lambda msg: get_console().print(msg, end=''), level=level, format=cls.logger_format)
if log_path:
# add file logger
log_path = Path(log_path).resolve()
log_path.parent.mkdir(parents=True, exist_ok=True)
if replace and log_path.exists():
log_path.unlink(missing_ok=True)
logger.add(
log_path, level=log_path_level, filter=cls.filter_unique, format=cls.logger_format, encoding='utf-8'
)
logger.info(f'Python Logging to "{log_path.as_posix()}"')
cls.is_setup = True
return logger
@staticmethod
def setup_encoding(
encoding: Optional[str] = None,
errorhandler: Literal['ignore', 'replace', 'backslashreplace', 'xmlcharrefreplace'] = 'backslashreplace',
):
"""Modify `PYTHONIOENCODING` to prevent suppress UnicodeEncodeError caused by logging of emojis.
It will affect the default behavior of `sys.stdin`, `sys.stdout` and `sys.stderr`.
Ref: https://docs.python.org/3/using/cmdline.html#envvar-PYTHONIOENCODING
Args:
errorhandler (Literal['ignore', 'replace', 'backslashreplace', 'xmlcharrefreplace'], optional):
specify which error handler to handle unsupported characters. 'strict' is forbidden. Defaults to 'replace'.
Ref to https://docs.python.org/3/library/stdtypes.html#str.encode
"""
encodingname, _, handler = os.environ.get('PYTHONIOENCODING', '').lower().partition(':')
encoding = encodingname if encodingname else encoding
# Set an errorhandler except for "strict"
if handler in ('ignore', 'replace', 'backslashreplace', 'xmlcharrefreplace'):
errorhandler = handler
# elif handler in ('', 'strict'):
# print(
# f'PYTHONIOENCODING is going to use "strict" errorhandler, which could raise errors during logging. Reset to "{errorhandler}"',
# file=sys.stderr,
# )
# else:
# print(
# f'PYTHONIOENCODING is set with invalid errorhandler "{handler}". Reset to "{errorhandler}"',
# file=sys.stderr,
# )
# if not encoding.lower().startswith("utf"):
os.environ['PYTHONIOENCODING'] = f"{encoding or ''}:{errorhandler}"
[docs]
def setup_logger(
level: Literal['RPC', 'TRACE', 'DEBUG', 'INFO', 'SUCCESS', 'WARNING', 'ERROR', 'CRITICAL'] = 'INFO',
log_path: 'Optional[PathLike]' = None,
log_path_level: Literal['RPC', 'TRACE', 'DEBUG', 'INFO', 'SUCCESS', 'WARNING', 'ERROR', 'CRITICAL'] = 'DEBUG',
replace: bool = True,
) -> 'loguru.Logger':
"""Setup logging to file and console.
Args:
level (Literal['TRACE', 'DEBUG', 'INFO', 'SUCCESS', 'WARNING', 'ERROR', 'CRITICAL'], optional): logging level.
Defaults to 'INFO', find more in https://loguru.readthedocs.io/en/stable/api/logger.html.
The order of the levels is:
- 'RPC' (custom level): logging RPC messages which are sent by RPC protocols.
- 'TRACE': logging engine output like console output of blender.
- 'DEBUG': logging debug messages.
- 'INFO': logging info messages.
- ...
log_path (Path, optional): path to save the log file. Defaults to None.
log_path_level (Literal['RPC', 'TRACE', 'DEBUG', 'INFO', 'SUCCESS', 'WARNING', 'ERROR', 'CRITICAL'], optional):
logging level for the log file. Defaults to 'DEBUG'.
replace (bool, optional): replace the log file if exists. Defaults to True.
"""
try:
return LoggerWrapper.setup_logging(level, log_path, log_path_level, replace)
except Exception as e:
import traceback
print(repr(e))
print(traceback.format_exc())
raise e
#### (rich) progress bar ####
class SpeedColumn(ProgressColumn):
"""Renders the speed of a task."""
def render(self, task: 'Task') -> Text:
speed = task.speed
if speed is None:
return Text('N/A', style='progress.data.speed')
return Text(f'{1/speed:.1f} s/step', style='progress.data.speed')
class TimeProgress(Progress):
@classmethod
def get_default_columns(cls) -> Tuple[Union[ProgressColumn, str], ...]:
return (
TextColumn('{task.description}'),
MofNCompleteColumn(),
BarColumn(),
TaskProgressColumn(show_speed=True),
'• Remaining',
TimeRemainingColumn(),
'• Elapsed',
TimeElapsedColumn(),
'•',
SpeedColumn(),
)
def track(
sequence: Union[Sequence[ProgressType], Iterable[ProgressType]],
description: str = 'Working...',
total: Optional[float] = None,
disable: bool = False,
) -> Iterable[ProgressType]:
"""Custom Track progress by iterating over a sequence."""
progress = TimeProgress(
speed_estimate_period=24 * 60 * 60,
refresh_per_second=1,
disable=disable,
) # 1 day as speed estimate period for long task
with progress:
yield from progress.track(
sequence,
total=total,
description=description,
)