# Source code for aliyun.log.logger_hanlder

import logging
from .logclient import LogClient
from .logitem import LogItem
from .putlogsrequest import PutLogsRequest
from threading import Thread
import atexit
from time import time, sleep
from enum import Enum
from .version import LOGGING_HANDLER_USER_AGENT
try:
    from collections.abc import Callable
except ImportError:
    from collections import Callable
import six

if six.PY2:
    from Queue import Empty, Full, Queue
else:
    from queue import Empty, Full, Queue

import json
import re


class LogFields(Enum):
    """Fields of a ``logging.LogRecord`` that can be uploaded automatically.

    The member name (e.g. ``thread_name``) is the public name accepted in a
    handler's ``fields`` list; the member value (e.g. ``'threadName'``) is the
    ``LogRecord`` attribute that is read for that field.

    Possible fields: record_name, level, file_name, func_name, module,
    file_path, line_no, process_id, process_name, thread_id, thread_name,
    level_no, asc_time, created_timestamp, micro_second, relative_created
    """
    record_name = 'name'
    level = 'levelname'
    file_name = 'filename'
    func_name = 'funcName'
    module = 'module'
    file_path = 'pathname'
    line_no = 'lineno'
    process_id = 'process'
    process_name = 'processName'
    thread_id = 'thread'
    thread_name = 'threadName'
    level_no = 'levelno'
    asc_time = 'asctime'
    created_timestamp = 'created'
    micro_second = 'msecs'
    relative_created = 'relativeCreated'
# Fields uploaded when a handler is created without an explicit ``fields`` list.
DEFAULT_RECORD_LOG_FIELDS = set((LogFields.record_name, LogFields.level, LogFields.func_name,
                                 LogFields.module, LogFields.file_path, LogFields.line_no,
                                 LogFields.process_id, LogFields.process_name,
                                 LogFields.thread_id, LogFields.thread_name))

# LogRecord attributes that are never uploaded as individual fields; the
# formatted message is uploaded under the "message" field instead.
BLACK_FIELD_LIST = set(['exc_info', 'exc_text', 'stack_info', 'msg', 'args', 'message'])

# Every name treated as "built-in" (and therefore excluded from the generic
# extra-attribute pass): LogFields member names, the LogRecord attribute names
# they map to, and the blacklisted attributes above.
# ``__members__`` is used rather than ``dir(LogFields)`` because dir()'s output
# is not a stable API and may include non-member names across Python versions.
BUILTIN_LOG_FIELDS_NAMES = set(LogFields.__members__)
BUILTIN_LOG_FIELDS_NAMES.update(member.value for member in LogFields.__members__.values())
BUILTIN_LOG_FIELDS_NAMES.update(BLACK_FIELD_LIST)
class SimpleLogHandler(logging.Handler, object):
    """ SimpleLogHandler: uploads every record synchronously (the logging call
    blocks until the upload returns); intended for simple test purposes.

    :param end_point: log service endpoint
    :param access_key_id: access key id
    :param access_key: access key
    :param project: project name
    :param log_store: logstore name
    :param topic: topic, by default is empty
    :param fields: list of LogFields or list of names of LogFields, default is LogFields.record_name,
        LogFields.level, LogFields.func_name, LogFields.module, LogFields.file_path, LogFields.line_no,
        LogFields.process_id, LogFields.process_name, LogFields.thread_id, LogFields.thread_name.
        You could also just use the string name like 'thread_name' (or the LogRecord attribute name
        like 'threadName'). It's also possible to customize extra fields in this list by disabling
        extra and putting a white list here.
    :param buildin_fields_prefix: prefix of builtin fields, default is empty.
        suggest using "__" when extract json is True to prevent conflict.
    :param buildin_fields_suffix: suffix of builtin fields, default is empty.
        suggest using "__" when extract json is True to prevent conflict.
    :param extract_json: if extract json automatically, default is False
    :param extract_json_drop_message: if drop message fields if it's JSON and extract_json is True, default is False
    :param extract_json_prefix: prefix of fields extracted from json when extract_json is True. default is ""
    :param extract_json_suffix: suffix of fields extracted from json when extract_json is True. default is empty
    :param extract_kv: if extract kv like k1=v1 k2="v 2" automatically, default is False
    :param extract_kv_drop_message: if drop message fields if it's kv and extract_kv is True, default is False
    :param extract_kv_prefix: prefix of fields extracted from KV when extract_kv is True. default is ""
    :param extract_kv_suffix: suffix of fields extracted from KV when extract_kv is True. default is ""
    :param extract_kv_sep: separator for KV case, default is '=', e.g. k1=v1
    :param extra: if show extra info, default True to show all. default is True.
        Note: the extra field will also be handled with buildin_fields_prefix/suffix
    :param kwargs: other parameters passed to logging.Handler
    """

    def __init__(self, end_point, access_key_id, access_key, project, log_store, topic=None, fields=None,
                 buildin_fields_prefix=None, buildin_fields_suffix=None,
                 extract_json=None, extract_json_drop_message=None,
                 extract_json_prefix=None, extract_json_suffix=None,
                 extract_kv=None, extract_kv_drop_message=None,
                 extract_kv_prefix=None, extract_kv_suffix=None, extract_kv_sep=None,
                 extra=None, **kwargs):
        logging.Handler.__init__(self, **kwargs)
        self.end_point = end_point
        self.access_key_id = access_key_id
        self.access_key = access_key
        self.project = project
        self.log_store = log_store
        self.client = None  # created lazily by send()
        self.topic = topic
        self.fields = DEFAULT_RECORD_LOG_FIELDS if fields is None else set(fields)

        self.extract_json = False if extract_json is None else extract_json
        self.extract_json_prefix = "" if extract_json_prefix is None else extract_json_prefix
        self.extract_json_suffix = "" if extract_json_suffix is None else extract_json_suffix
        self.extract_json_drop_message = False if extract_json_drop_message is None else extract_json_drop_message
        self.buildin_fields_prefix = "" if buildin_fields_prefix is None else buildin_fields_prefix
        self.buildin_fields_suffix = "" if buildin_fields_suffix is None else buildin_fields_suffix

        self.extract_kv = False if extract_kv is None else extract_kv
        self.extract_kv_prefix = "" if extract_kv_prefix is None else extract_kv_prefix
        self.extract_kv_suffix = "" if extract_kv_suffix is None else extract_kv_suffix
        self.extract_kv_drop_message = False if extract_kv_drop_message is None else extract_kv_drop_message
        self.extract_kv_sep = "=" if extract_kv_sep is None else extract_kv_sep
        # compiled once here; depends on extract_kv_sep set just above
        self.extract_kv_ptn = self._get_extract_kv_ptn()
        self.extra = True if extra is None else extra

    def set_topic(self, topic):
        self.topic = topic

    def create_client(self):
        """Create the LogClient used for uploads and tag it with the handler's user agent."""
        self.client = LogClient(self.end_point, self.access_key_id, self.access_key)
        self.client.set_user_agent(LOGGING_HANDLER_USER_AGENT)

    def send(self, req):
        """Upload one PutLogsRequest, creating the client on first use."""
        if self.client is None:
            self.create_client()
        return self.client.put_logs(req)

    def set_fields(self, fields):
        self.fields = fields

    @staticmethod
    def _n(v):
        """Normalize a value to the native ``str`` type for upload.

        ``None`` becomes ""; dicts/lists/tuples are JSON-encoded when possible;
        text is encoded (PY2) / bytes decoded (PY3) as UTF-8, ignoring errors.
        """
        if v is None:
            return ""
        if isinstance(v, (dict, list, tuple)):
            try:
                v = json.dumps(v)
            except Exception:
                pass  # best effort: not JSON-serializable, fall back to str() below
        elif six.PY2 and isinstance(v, six.text_type):
            v = v.encode('utf8', "ignore")
        elif six.PY3 and isinstance(v, six.binary_type):
            v = v.decode('utf8', "ignore")
        return str(v)

    def extract_dict(self, message):
        """Return ``[(prefixed_key, value), ...]`` for the top-level items of a dict message."""
        data = []
        if isinstance(message, dict):
            for k, v in six.iteritems(message):
                data.append(("{0}{1}{2}".format(self.extract_json_prefix, self._n(k), self.extract_json_suffix),
                             self._n(v)))
        return data

    def _get_extract_kv_ptn(self):
        """Compile the KV regex: ``key<sep>value`` or ``key<sep>"quoted value"``.

        ``extract_kv_sep`` is inserted into the pattern unescaped, so it is
        interpreted as regex syntax.
        """
        sep = self.extract_kv_sep
        p1 = u'(?!{0})([\u4e00-\u9fa5\u0800-\u4e00\\w\\.\\-]+)\\s*{0}\\s*([\u4e00-\u9fa5\u0800-\u4e00\\w\\.\\-]+)'
        # "\\s" (not "\s"): "\s" is an invalid string escape that newer Pythons warn about
        p2 = u'(?!{0})([\u4e00-\u9fa5\u0800-\u4e00\\w\\.\\-]+)\\s*{0}\\s*"\\s*([^"]+?)\\s*"'
        ps = '|'.join([p1, p2]).format(sep)
        return re.compile(ps)

    def extract_kv_str(self, message):
        """Return ``[(prefixed_key, value), ...]`` for every KV pair found in a text message."""
        if isinstance(message, six.binary_type):
            message = message.decode('utf8', 'ignore')

        r = self.extract_kv_ptn.findall(message)

        data = []
        # each match fills either (k1, v1) from the plain form or (k2, v2) from the quoted form
        for k1, v1, k2, v2 in r:
            if k1:
                data.append(("{0}{1}{2}".format(self.extract_kv_prefix, self._n(k1), self.extract_kv_suffix),
                             self._n(v1)))
            elif k2:
                data.append(("{0}{1}{2}".format(self.extract_kv_prefix, self._n(k2), self.extract_kv_suffix),
                             self._n(v2)))

        return data

    def _add_record_fields(self, record, k, contents):
        """Append record attribute ``k`` to ``contents`` unless it is missing/None or callable."""
        v = getattr(record, k, None)
        if v is None or isinstance(v, Callable):
            return

        v = self._n(v)
        contents.append(("{0}{1}{2}".format(self.buildin_fields_prefix, k, self.buildin_fields_suffix), v))

    def make_request(self, record):
        """Build a single-item PutLogsRequest from a LogRecord.

        The formatted message (optionally with JSON/KV-extracted fields) comes
        first, then the configured builtin fields, then (if ``extra``) every
        other non-dunder, non-builtin record attribute.
        """
        contents = []
        message_field_name = "{0}message{1}".format(self.buildin_fields_prefix, self.buildin_fields_suffix)
        if isinstance(record.msg, dict) and self.extract_json:
            data = self.extract_dict(record.msg)
            contents.extend(data)

            if not self.extract_json_drop_message or not data:
                contents.append((message_field_name, self.format(record)))
        elif isinstance(record.msg, (six.text_type, six.binary_type)) and self.extract_kv:
            data = self.extract_kv_str(record.msg)
            contents.extend(data)

            if not self.extract_kv_drop_message or not data:  # if it's not KV
                contents.append((message_field_name, self.format(record)))
        else:
            contents = [(message_field_name, self.format(record))]

        # add builtin fields
        for x in self.fields:
            if isinstance(x, LogFields):
                x = x.value
            elif isinstance(x, (six.binary_type, six.text_type)):
                if x in BLACK_FIELD_LIST:
                    continue  # by pass for those reserved fields. make no sense to render them

                if x in LogFields.__members__:
                    # public field name such as 'thread_name' -> record attribute 'threadName'
                    x = LogFields[x].value
                elif x not in BUILTIN_LOG_FIELDS_NAMES and self.extra:
                    # custom attribute: the extra pass below will pick it up
                    continue
                # otherwise x is already a LogRecord attribute name (e.g. 'levelname')
                # or a white-listed custom attribute: read it as-is

            self._add_record_fields(record, x, contents)

        # handle extra
        if self.extra:
            for x in dir(record):
                if not x.startswith('__') and x not in BUILTIN_LOG_FIELDS_NAMES:
                    self._add_record_fields(record, x, contents)

        item = LogItem(contents=contents, timestamp=record.created)

        return PutLogsRequest(self.project, self.log_store, self.topic, logitems=[item, ])

    def emit(self, record):
        """Build and upload the request synchronously; failures go to handleError()."""
        try:
            req = self.make_request(record)
            self.send(req)
        except Exception:
            self.handleError(record)
class QueuedLogHandler(SimpleLogHandler):
    """ Queued Log Handler, tuned async log handler: each record is turned into a
    request in the logging thread, queued, and uploaded in batches by a
    background daemon thread.

    :param end_point: log service endpoint
    :param access_key_id: access key id
    :param access_key: access key
    :param project: project name
    :param log_store: logstore name
    :param topic: topic, default is empty
    :param fields: list of LogFields, default is LogFields.record_name, LogFields.level, LogFields.func_name,
        LogFields.module, LogFields.file_path, LogFields.line_no, LogFields.process_id,
        LogFields.process_name, LogFields.thread_id, LogFields.thread_name
    :param queue_size: queue size, default is 40960 logs, about 10MB ~ 40MB
    :param put_wait: maximum delay to send the logs, by default 2 seconds and wait double time for when Queue is full.
    :param close_wait: when program exit, it will try to send all logs in queue in this time period, by default 5 seconds
    :param batch_size: merge this count of logs and send them batch, by default min(1024, queue_size)
    :param buildin_fields_prefix: prefix of builtin fields, default is empty.
        suggest using "__" when extract json is True to prevent conflict.
    :param buildin_fields_suffix: suffix of builtin fields, default is empty.
        suggest using "__" when extract json is True to prevent conflict.
    :param extract_json: if extract json automatically, default is False
    :param extract_json_drop_message: if drop message fields if it's JSON and extract_json is True, default is False
    :param extract_json_prefix: prefix of fields extracted from json when extract_json is True. default is ""
    :param extract_json_suffix: suffix of fields extracted from json when extract_json is True. default is empty
    :param extract_kv: if extract kv like k1=v1 k2="v 2" automatically, default is False
    :param extract_kv_drop_message: if drop message fields if it's kv and extract_kv is True, default is False
    :param extract_kv_prefix: prefix of fields extracted from KV when extract_kv is True. default is ""
    :param extract_kv_suffix: suffix of fields extracted from KV when extract_kv is True. default is ""
    :param extract_kv_sep: separator for KV case, default is '=', e.g. k1=v1
    :param extra: if show extra info, default True to show all. default is True
    :param kwargs: other parameters passed to logging.Handler
    """

    def __init__(self, end_point, access_key_id, access_key, project, log_store, topic=None, fields=None,
                 queue_size=None, put_wait=None, close_wait=None, batch_size=None,
                 buildin_fields_prefix=None, buildin_fields_suffix=None,
                 extract_json=None, extract_json_drop_message=None,
                 extract_json_prefix=None, extract_json_suffix=None,
                 extract_kv=None, extract_kv_drop_message=None,
                 extract_kv_prefix=None, extract_kv_suffix=None, extract_kv_sep=None,
                 extra=None, **kwargs):
        super(QueuedLogHandler, self).__init__(end_point, access_key_id, access_key, project, log_store,
                                               topic=topic, fields=fields,
                                               extract_json=extract_json,
                                               extract_json_drop_message=extract_json_drop_message,
                                               extract_json_prefix=extract_json_prefix,
                                               extract_json_suffix=extract_json_suffix,
                                               buildin_fields_prefix=buildin_fields_prefix,
                                               buildin_fields_suffix=buildin_fields_suffix,
                                               extract_kv=extract_kv,
                                               extract_kv_drop_message=extract_kv_drop_message,
                                               extract_kv_prefix=extract_kv_prefix,
                                               extract_kv_suffix=extract_kv_suffix,
                                               extract_kv_sep=extract_kv_sep,
                                               extra=extra,
                                               **kwargs)
        self.stop_flag = False
        self.stop_time = None
        self.put_wait = put_wait or 2  # default is 2 seconds
        self.close_wait = close_wait or 5  # default is 5 seconds
        self.queue_size = queue_size or 40960  # default is 40960, about 10MB ~ 40MB
        self.batch_size = min(batch_size or 1024, self.queue_size)  # default is 1024 items

        self.init_worker()

    def init_worker(self):
        """Create the request queue, start the daemon uploader thread, and register stop() at exit."""
        self.worker = Thread(target=self._post)
        self.queue = Queue(self.queue_size)
        # the ``daemon`` attribute replaces Thread.setDaemon(), deprecated since Python 3.10
        self.worker.daemon = True
        self.worker.start()
        atexit.register(self.stop)

    def flush(self):
        """Stop the uploader (see stop()).

        NOTE: this ends the background worker; records emitted afterwards are
        queued but no longer uploaded.
        """
        self.stop()

    def stop(self):
        """Ask the worker to drain the queue for up to ``close_wait`` seconds, then wait for it."""
        # stop_time must be set before stop_flag: _post reads stop_time once it sees stop_flag
        self.stop_time = time()
        self.stop_flag = True
        self.worker.join(timeout=self.close_wait + 1)

    def emit(self, record):
        """Convert the record to a request and enqueue it.

        Conversion errors and a queue that stays full for ``2 * put_wait``
        seconds are reported through handleError() instead of propagating into
        the caller's logging call.
        """
        try:
            req = self.make_request(record)
        except Exception:
            self.handleError(record)
            return

        req.__record__ = record  # kept so upload failures can be reported against a record
        try:
            self.queue.put(req, timeout=self.put_wait * 2)
        except Full:
            self.handleError(record)

    def _get_batch_requests(self, timeout=None):
        """Try to get requests as fast as possible; once empty and the stop flag is
        set, or on time-out, return what was collected.

        :param timeout: seconds to spend collecting; ``None`` means ``self.put_wait``
        :return: a single request, or one merged PutLogsRequest for several
        :raises Empty: if nothing was collected
        """
        if timeout is None:
            timeout = self.put_wait

        reqs = []
        started = time()
        while len(reqs) < self.batch_size and (time() - started) < timeout:
            try:
                req = self.queue.get(block=False)
            except Empty:
                if self.stop_flag:
                    break
                sleep(0.1)
            else:
                self.queue.task_done()
                reqs.append(req)

        if not reqs:
            raise Empty

        if len(reqs) == 1:
            return reqs[0]

        logitems = []
        for req in reqs:
            logitems.extend(req.get_log_items())

        # NOTE(review): the merged request takes the topic of the last request;
        # requests queued under different topics (set_topic) end up merged together.
        last = reqs[-1]
        ret = PutLogsRequest(self.project, self.log_store, last.topic, logitems=logitems)
        ret.__record__ = last.__record__
        return ret

    def _post(self):
        """Worker loop: upload batches until stopped and ``close_wait`` has elapsed since stop()."""
        while not self.stop_flag or (time() - self.stop_time) <= self.close_wait:
            try:
                req = self._get_batch_requests(timeout=self.put_wait)
            except Empty:
                if self.stop_flag:
                    break
                continue

            try:
                self.send(req)
            except Exception:
                self.handleError(req.__record__)
class UwsgiQueuedLogHandler(QueuedLogHandler):
    """ Queued Log Handler for uWSGI, depends on library `uwsgidecorators`, which needs to be deployed separately.

    :param end_point: log service endpoint
    :param access_key_id: access key id
    :param access_key: access key
    :param project: project name
    :param log_store: logstore name
    :param topic: topic, default is empty
    :param fields: list of LogFields, default is LogFields.record_name, LogFields.level, LogFields.func_name,
        LogFields.module, LogFields.file_path, LogFields.line_no, LogFields.process_id,
        LogFields.process_name, LogFields.thread_id, LogFields.thread_name
    :param queue_size: queue size, default is 40960 logs, about 10MB ~ 40MB
    :param put_wait: maximum delay to send the logs, by default 2 seconds and wait double time for when Queue is full.
    :param close_wait: when program exit, it will try to send all logs in queue in this time period, by default 2 seconds
    :param batch_size: merge this count of logs and send them batch, by default min(1024, queue_size)
    :param buildin_fields_prefix: prefix of builtin fields, default is empty.
        suggest using "__" when extract json is True to prevent conflict.
    :param buildin_fields_suffix: suffix of builtin fields, default is empty.
        suggest using "__" when extract json is True to prevent conflict.
    :param extract_json: if extract json automatically, default is False
    :param extract_json_drop_message: if drop message fields if it's JSON and extract_json is True, default is False
    :param extract_json_prefix: prefix of fields extracted from json when extract_json is True. default is ""
    :param extract_json_suffix: suffix of fields extracted from json when extract_json is True. default is empty
    :param extract_kv: if extract kv like k1=v1 k2="v 2" automatically, default is False
    :param extract_kv_drop_message: if drop message fields if it's kv and extract_kv is True, default is False
    :param extract_kv_prefix: prefix of fields extracted from KV when extract_kv is True. default is ""
    :param extract_kv_suffix: suffix of fields extracted from KV when extract_kv is True. default is ""
    :param extract_kv_sep: separator for KV case, default is '=', e.g. k1=v1
    :param extra: if show extra info, default True to show all. default is True
    :param kwargs: other parameters passed to logging.Handler
    """

    def __init__(self, *args, **kwargs):
        # Change the close_wait default from 5 (QueuedLogHandler) to 2. close_wait is
        # the 10th positional parameter of QueuedLogHandler.__init__ (index 9).
        if len(args) >= 10:
            if args[9] is None:
                args = args[:9] + (2,) + args[10:]
        elif kwargs.get('close_wait') is None:
            # covers both close_wait=None and close_wait not passed at all
            kwargs['close_wait'] = 2

        super(UwsgiQueuedLogHandler, self).__init__(*args, **kwargs)

    def init_worker(self):
        """Create the queue and hand ``_post`` to uwsgidecorators instead of starting a Thread.

        No ``self.worker`` thread is created and no atexit hook is registered here.
        NOTE(review): presumably ``postfork(thread(...))`` runs ``_post`` in a
        thread of each forked uWSGI worker — confirm against uwsgidecorators docs.
        """
        self.queue = Queue(self.queue_size)

        from uwsgidecorators import postfork, thread
        self._post = postfork(thread(self._post))

    def stop(self):
        """Signal ``_post`` to drain and exit; there is no worker thread to join here."""
        self.stop_time = time()
        self.stop_flag = True