Source code for pyacq.core.rpc.log.handler

# -*- coding: utf-8 -*-
# Copyright (c) 2016, French National Center for Scientific Research (CNRS)
# Distributed under the (new) BSD License. See LICENSE for more info.

import logging
import sys
import socket
import os
import threading
import time
import atexit
import traceback

from .remote import get_host_name, get_process_name, get_thread_name

    import colorama
    _ints = [colorama.Style.NORMAL, colorama.Style.BRIGHT, colorama.Style.DIM]
    _fcolors = [colorama.Fore.WHITE, colorama.Fore.GREEN, colorama.Fore.RED,
                colorama.Fore.CYAN, colorama.Fore.YELLOW, colorama.Fore.BLUE,
    _bcolors = [colorama.Back.WHITE, colorama.Back.GREEN, colorama.Back.RED,
                colorama.Back.CYAN, colorama.Back.YELLOW, colorama.Back.BLUE,
    _thread_color_list = [i+c for i in _ints for c in _fcolors[1:]]  # skip white
    _level_color_map = {
        0: colorama.Style.DIM + colorama.Fore.WHITE,
        logging.DEBUG: colorama.Style.DIM + colorama.Fore.WHITE,
        logging.INFO: colorama.Style.BRIGHT + colorama.Fore.WHITE,
        logging.WARNING: colorama.Style.BRIGHT + colorama.Fore.YELLOW,
        logging.ERROR: colorama.Style.BRIGHT + colorama.Fore.RED,
        logging.CRITICAL: colorama.Back.RED,
except ImportError:

[docs]class RPCLogHandler(logging.StreamHandler): """StreamHandler that sorts incoming log records by their creation time and writes to stderr. Messages are also colored by their log level and the host/process/thread that created the record. Credit: Parameters ---------- stream : file-like The stream to which messages should be sent. The default is sys.stderr. """ thread_headers = {} def __init__(self, stream=sys.stderr): if HAVE_COLORAMA: logging.StreamHandler.__init__(self, colorama.AnsiToWin32(stream).stream) else: logging.StreamHandler.__init__(self, stream) # Hold log records for 0.5 sec before printing them to allow sorting # by creation time. self.delay = 0.2 self.record_lock = threading.Lock() self.records = [] self.thread = threading.Thread(target=self.poll_records, daemon=True) self.thread.start() atexit.register(self.flush_records) @property def is_tty(self): isatty = getattr(, 'isatty', None) return isatty and isatty() def emit(self, record): # send record to sorting thread with self.record_lock: self.records.append(record) self.records.sort(key=lambda rec: rec.created) def poll_records(self): while True: # collect all records more than 0.5 sec old limit = time.time() - self.delay recs = [] with self.record_lock: while len(self.records) > 0 and self.records[0].created < limit: recs.append(self.records.pop(0)) # emit records or sleep if len(recs) > 0: for rec in recs: logging.StreamHandler.emit(self, rec) else: time.sleep(0.2) def format(self, record): header = self.get_thread_header(record) message = logging.StreamHandler.format(self, record) if HAVE_COLORAMA: ind = record.levelno // 10 * 10 # decrease to multiple of 10 message = _level_color_map[ind] + message + colorama.Style.RESET_ALL return header + ' ' + message def get_thread_header(self, record): hid = getattr(record, 'hostname', get_host_name()) pid = getattr(record, 'process_name', get_process_name()) tid = getattr(record, 'thread_name', get_thread_name(record.thread)) key = (hid, pid, tid) header = self.thread_headers.get(key, None) if header is None: header = '[%s:%s:%s]' % (hid, pid, tid) if HAVE_COLORAMA: color = _thread_color_list[len(self.thread_headers) % len(_thread_color_list)] header = color + header + colorama.Style.RESET_ALL self.thread_headers[key] = header return header def colorize(self, message, record): if not HAVE_COLORAMA: return message try: tid = threading.current_thread().ident color = RPCLogHandler.thread_colors.get(tid, None) if color is None: ind = len(RPCLogHandler.thread_colors) % len(_color_list) ind = ind//10*10 # decrease to multiple of 10 color = _color_list[ind] RPCLogHandler.thread_colors[tid] = color return (color + message + colorama.Style.RESET_ALL) except KeyError: return message def flush_records(self): with self.record_lock: recs = self.records[:] self.records = [] for rec in recs: logging.StreamHandler.emit(self, rec)
_sys_excepthook = None def _log_unhandled_exception(exc, val, tb): global _sys_excepthook exc_str = traceback.format_stack() exc_str += [" < exception caught here >\n"] exc_str += traceback.format_exception(exc, val, tb)[1:] exc_str = ''.join([' ' + line for line in exc_str]) logging.getLogger().warn("Unhandled exception:\n%s", exc_str) #_sys_excepthook(exc, val, tb)
[docs]def log_exceptions(): """Install a hook that creates log messages from unhandled exceptions. """ global _sys_excepthook if sys.excepthook is _log_unhandled_exception: return _sys_excepthook = sys.excepthook sys.excepthook = _log_unhandled_exception