python日志输出全局脱敏

Posted by Shi Hai's Blog on March 30, 2024

一、背景介绍

考虑这个问题的初衷是一些token、password等可能被会被开发同学编码过程不注意输出出来,而且现在日志保存会保留一段时间,如果服务被攻击了,这些敏感信息可能就会被非法利用。为了解决这个问题,想从编程语言在对日志进行IO输出时就能默认对输出流进行一定的过滤。

  • Go语言:现在Go语言日志输出Log生态太多,没有顶层控制能力,所以需要结合Log生态考虑,比如:我们使用klog,我们就可以通过SetLogger()配置一些Filter进行日志输出脱敏;
  • Java语言:Java语言的Log生态还比较统一,主要是Logback和log4j。比如:我们使用的是Logback,那我们就可以直接在logbgack.xml中配置自定义的encoder就行。
  • Python语言:可以给Handlers注入各种自定义Filters,但各个三方库的日志输出是否能像Java Logback一样也会被Filter拦截过滤到,这个能力待确认打开。

二、典型场景分析

三方库打印敏感信息

有很多三方库会打印敏感信息,特别是走网络请求或者和中间件对接通信的一些三方库,例如:oslo.messaging。

oslo.messaging

这段代码是oslo.messaging的一段通知函数,这里如果payload涉及到一些敏感信息并且调用出错,这段payload就会输出到日志文件中 源码。 这段代码获取logger实例就是直接通过原生的logging模块执行_LOG = logging.getLogger(__name__)获取到的。

    def _notify(self, ctxt, event_type, payload, priority, publisher_id=None,
                retry=None):
        payload = self._serializer.serialize_entity(ctxt, payload)

        # NOTE(JayF): We must remove secure information from notification
        #             payloads, otherwise we risk sending sensitive creds
        #             to a notification bus.
        safe_ctxt = _sanitize_context(ctxt)
        ctxt = self._serializer.serialize_context(safe_ctxt)

        msg = dict(message_id=str(uuid.uuid4()),
                   publisher_id=publisher_id or self.publisher_id,
                   event_type=event_type,
                   priority=priority,
                   payload=payload,
                   timestamp=str(timeutils.utcnow()))

        def do_notify(ext):
            try:
                ext.obj.notify(ctxt, msg, priority, retry or self.retry)
            except Exception as e:
                _LOG.exception("Problem '%(e)s' attempting to send to "
                               "notification system. Payload=%(payload)s",
                               {'e': e, 'payload': payload})

        if self._driver_mgr.extensions:
            self._driver_mgr.map(do_notify)

通过_LOG = logging.getLogger(__name__)获取到的logger实际就是Logging.Logger类创建出来的实例 源码
另外,Logger实例的parentroot logger 源码

三、技术实现

通过直接注入filters方式对输出进行过滤

如果是单个模块调用,可以通过直接注入filters的方式对输出文件进行过滤,但是如果我们有大量的模块以及有依赖的三方库,就会导致我们需要多个模块逐一配置或者就无法被纳管住。

import logging
from random import choice

class ContextFilter(logging.Filter):
    """
    This is a filter which injects contextual information into the log.

    Rather than use actual contextual information, we just use random
    data in this demo.
    """

    USERS = ['jim', 'fred', 'sheila']
    IPS = ['123.231.231.123', '127.0.0.1', '192.168.0.1']

    def filter(self, record):

        record.ip = choice(ContextFilter.IPS)
        record.user = choice(ContextFilter.USERS)
        return True

if __name__ == '__main__':
    levels = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL)
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)-15s %(name)-5s %(levelname)-8s IP: %(ip)-15s User: %(user)-8s %(message)s')
    a1 = logging.getLogger('a.b.c')
    a2 = logging.getLogger('d.e.f')

    f = ContextFilter()
    a1.addFilter(f)
    a2.addFilter(f)
    a1.debug('A debug message')
    a1.info('An info message with %s', 'some parameters')

通过dictConfig()配置filters

通过调用dictConfig()函数,我们就可以从一个字典中获取到logging配置。这样比较简单高效,就是我们在dict中定义好handlers和filters,创建出来的Logger实例就直接有了这些默认配置,但这种方式可能会有两个问题:

import logging
import logging.config
import sys

class MyFilter(logging.Filter):
    def __init__(self, param=None):
        self.param = param

    def filter(self, record):
        if self.param is None:
            allow = True
        else:
            allow = self.param not in record.msg
        if allow:
            record.msg = 'changed: ' + record.msg
        return allow

LOGGING = {
    'version': 1,
    'filters': {
        'myfilter': {
            '()': MyFilter,
            'param': 'noshow',
        }
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'filters': ['myfilter']
        }
    },
    'root': {
        'level': 'DEBUG',
        'handlers': ['console']
    },
}

if __name__ == '__main__':
    logging.config.dictConfig(LOGGING)
    logging.debug('hello')
    logging.debug('hello - noshow')

通过root logger进行配置

Filters不像Levels和Handlers,通过顶层root logger配置好,子类就能继承下来。 另外,配置Filters有两种方式:一种就是直接对Logger实例调用setFilter()函数添加Filters;而另一种是给root logger添加Handlers,然后在Handler里面通过调用SetFilter()函数添加Filters。
Logger.handle()源码

# 这段代码是Logger.handle()里面的函数,里面实现了对日志记录的处理逻辑,先执行filter,然后再调用handler。
def handle(self, record):
    """
    Call the handlers for the specified record.

    This method is used for unpickled records received from a socket, as
    well as those created locally. Logger-level filtering is applied.
    """
    if self.disabled:
        return
    maybe_record = self.filter(record)
    if not maybe_record:
        return
    if isinstance(maybe_record, LogRecord):
        record = maybe_record
    self.callHandlers(record)

为什么Logger实例和Handler实例都可以添加Filters呢?那是因为两者都继承自Filiterer源码
因为所有的Logger实例的parent父类都是root logger实例,那我们是不是可以直接对root logger实例直接添加Handlers来实现输出脱敏呢?
实际从下面写的demo代码看是可行的。

class MyFilter(logging.Filter):

    def __init__(self, param):
        super().__init__()
        self._param = param

    def filter(self, record):
        #import pdb;pdb.set_trace()
        if self._param is None:
            allow = True
        else:
            allow = self._param not in record.msg

        if allow:
            record.msg = 'changed: ' + record.msg
        return allow


def config_root_logger():
    '''Config root logger'''
    myFilter = MyFilter(param='noshow')
    handlers = logging.root.handlers
    for h in handlers:
        h.addFilter(myFilter)


# 如果root已经有了handlers,则调用basicConfig()就不会对root logger的handlers做处理
# 指定filename时,实际在logging模块里面就会创建出FileHandler实例
logging.basicConfig(filename="myapp.log", level=logging.DEBUG)
# 对root logger中的所有handlers添加filter
config_root_logger()
logger = logging.getLogger(__name__)
logger.debug("debug: Hello World")
logger.debug("debug: Hello World - noshow")
logger.info("info: Hello World")
logger.error("error: Hello World")

三、参考文档