A Python logging handler for Fluentd event collector

Many web and mobile applications generate huge amounts of event logs (e.g. login, logout, purchase, follow). Analyzing these event logs can be really valuable for improving the service. However, the challenge is collecting these logs easily and reliably.

Fluentd solves that problem by offering easy installation, a small footprint, plugins, reliable buffering, log forwarding, and more.

fluent-logger-python is a Python library for recording events from Python applications.

NOTE: This is a fork of https://github.com/fluent/fluent-logger-python. It only adds TLS capability to the original package.

Requirements

  • Python 2.7 or 3.4+

  • msgpack-python

  • IMPORTANT: Version 0.8.0 is the last version supporting Python 2.6, 3.2 and 3.3

Installation

This library is distributed as the ‘mona-fluent-logger’ Python package. Run the following command to install it.

$ pip install mona-fluent-logger

Configuration

The Fluentd daemon must be launched with a TCP source configuration:

<source>
  type forward
  port 24224
</source>

To quickly test your setup, add a matcher that logs to stdout:

<match app.**>
  type stdout
</match>
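
Before sending events, it can help to confirm that the forward port is accepting TCP connections. The sketch below uses only the standard library. The helper name port_is_open and its default host and port are illustrative assumptions, and a successful connection only shows that something is listening on the port, not that it is Fluentd.

```python
import socket


def port_is_open(host="localhost", port=24224, timeout=1.0):
    """Return True if a TCP connection to (host, port) can be opened."""
    try:
        # create_connection resolves the host and connects within the timeout
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, host unreachable, or timed out
        return False
```

Calling port_is_open() with no arguments checks the local daemon on the port used in the source configuration above.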

Usage

FluentSender Interface

sender.FluentSender is a structured event logger for Fluentd.

By default, the logger assumes the fluentd daemon is running locally. You can also target a remote daemon by passing the host and port options.

from fluent import sender

# for local fluent
logger = sender.FluentSender('app')

# for remote fluent
logger = sender.FluentSender('app', host='host', port=24224)

To send an event, call the emit method with your event. The following example sends the event to fluentd with the tag ‘app.follow’ and the attributes ‘from’ and ‘to’.

import time

# Use current time
logger.emit('follow', {'from': 'userA', 'to': 'userB'})

# Specify optional time
cur_time = int(time.time())
logger.emit_with_time('follow', cur_time, {'from': 'userA', 'to': 'userB'})
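
As a rough mental model of the calls above, each event can be pictured as a tag, a timestamp, and a record, with the tag formed by joining the sender's prefix and the label. The sketch below is a plain-Python illustration, not the library's code: build_event is a hypothetical helper, and the real message is serialized with msgpack before it is sent.

```python
import time


def build_event(prefix, label, record, timestamp=None):
    """Return a (tag, time, record) triple for illustration only."""
    # Join prefix and label with a dot; use the prefix alone when no label is given
    tag = "{}.{}".format(prefix, label) if label else prefix
    # Default to the current time in whole seconds
    if timestamp is None:
        timestamp = int(time.time())
    return tag, timestamp, record


tag, ts, record = build_event("app", "follow", {"from": "userA", "to": "userB"}, 1700000000)
print(tag, ts, record)
```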

To send events with nanosecond-precision timestamps (Fluentd 0.14 and up), pass nanosecond_precision=True to FluentSender.

# Use nanosecond
logger = sender.FluentSender('app', nanosecond_precision=True)
logger.emit('follow', {'from': 'userA', 'to': 'userB'})
logger.emit_with_time('follow', time.time(), {'from': 'userA', 'to': 'userB'})
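
Nanosecond precision means the timestamp carries a seconds part and a nanoseconds part rather than a single integer. The sketch below splits a float timestamp that way and packs the two parts as big-endian 32-bit unsigned integers. That layout is an assumption based on how the Fluentd forward protocol describes its EventTime payload, and the helper names are hypothetical, so check the protocol specification before relying on it.

```python
import struct
import time


def split_timestamp(ts):
    """Split a float timestamp into (seconds, nanoseconds)."""
    seconds = int(ts)
    nanoseconds = int(ts % 1 * 10 ** 9)
    return seconds, nanoseconds


def pack_event_time(ts):
    """Pack seconds and nanoseconds as two big-endian uint32 values (8 bytes)."""
    seconds, nanoseconds = split_timestamp(ts)
    return struct.pack(">II", seconds, nanoseconds)


print(split_timestamp(1.5))               # (1, 500000000)
print(len(pack_event_time(time.time())))  # 8
```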

You can detect an error via the return value of emit. If an error happens, emit returns False, and you can retrieve the error object via last_error.

if not logger.emit('follow', {'from': 'userA', 'to': 'userB'}):
    print(logger.last_error)
    logger.clear_last_error() # clear stored error after handled errors
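
The check above can be wrapped in a small helper so that every call site handles failures the same way. The sketch below shows one possible shape. emit_checked and the StubSender stand-in are hypothetical names, used so the example runs without a Fluentd daemon; with the real library you would pass a sender.FluentSender instead.

```python
import logging

log = logging.getLogger("fluent.fallback")


def emit_checked(logger, label, record):
    """Emit an event; on failure, log the stored error and clear it."""
    if logger.emit(label, record):
        return True
    # last_error holds the error from the failed emit
    log.warning("emit failed for %r: %r", label, logger.last_error)
    logger.clear_last_error()
    return False


class StubSender:
    """Hypothetical stand-in that mimics the emit/last_error interface."""

    def __init__(self, fail=False):
        self.fail = fail
        self.last_error = None

    def emit(self, label, record):
        if self.fail:
            self.last_error = ConnectionError("fluentd unreachable")
            return False
        return True

    def clear_last_error(self):
        self.last_error = None


print(emit_checked(StubSender(), "follow", {"from": "userA", "to": "userB"}))
print(emit_checked(StubSender(fail=True), "follow", {"from": "userA", "to": "userB"}))
```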

To shut down the client, call the close() method.

logger.close()

Event-Based Interface

This API is a wrapper for sender.FluentSender.

First, call sender.setup() to create the global sender.FluentSender instance. This only needs to be done once, for example at the beginning of the application.

The initialization code for the Event-Based API is shown below:

from fluent import sender

# for local fluent
sender.setup('app')

# for remote fluent
sender.setup('app', host='host', port=24224)

Then create events like this. This sends the event to fluentd with the tag ‘app.follow’ and the attributes ‘from’ and ‘to’.

from fluent import event

# send event to fluentd, with 'app.follow' tag
event.Event('follow', {
  'from': 'userA',
  'to':   'userB'
})

event.Event has one limitation: it can’t return a success/failure result.

Other methods for the Event-Based Interface:

sender.get_global_sender()  # get the global sender instance
sender.close()              # call close() on the global sender

Handler for buffer overflow

You can inject your own custom function to handle buffer overflow in the event of a connection failure. This lets you mitigate data loss instead of simply discarding the pending data.

import msgpack
from io import BytesIO
from fluent import sender

def overflow_handler(pendings):
    # pendings holds the msgpack-encoded events that could not be sent
    unpacker = msgpack.Unpacker(BytesIO(pendings))
    for unpacked in unpacker:
        print(unpacked)

logger = sender.FluentSender('app', host='host', port=24224, buffer_overflow_handler=overflow_handler)

You should handle any exception inside the handler. fluent-logger ignores exceptions raised by buffer_overflow_handler.

This handler is also called when pending events exist during close().
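
Since exceptions raised by the handler are ignored, a handler that does real work should catch and report its own errors. The sketch below appends the pending bytes to a local spill file so they can be inspected or replayed later. The default file name and the make_spill_handler helper are illustrative assumptions, not part of the library.

```python
import sys


def make_spill_handler(path="fluent-overflow.msgpack"):
    """Return an overflow handler that appends pending bytes to `path`."""

    def overflow_handler(pendings):
        try:
            # Append the raw bytes; they can be decoded later with msgpack.Unpacker
            with open(path, "ab") as spill:
                spill.write(pendings)
        except OSError as exc:
            # Report the failure here, since the caller will not
            print("could not spill pending events: %s" % exc, file=sys.stderr)

    return overflow_handler
```

The returned function can be passed as buffer_overflow_handler in place of the printing handler shown earlier.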

Python logging.Handler interface

This client library also provides a FluentHandler class for the Python logging module.

import logging
from fluent import handler

custom_format = {
  'host': '%(hostname)s',
  'where': '%(module)s.%(funcName)s',
  'type': '%(levelname)s',
  'stack_trace': '%(exc_text)s'
}

logging.basicConfig(level=logging.INFO)
l = logging.getLogger('fluent.test')
h = handler.FluentHandler('app.follow', host='host', port=24224, buffer_overflow_handler=overflow_handler)
formatter = handler.FluentRecordFormatter(custom_format)
h.setFormatter(formatter)
l.addHandler(h)
l.info({
  'from': 'userA',
  'to': 'userB'
})
l.info('{"from": "userC", "to": "userD"}')
l.info("This log entry will be logged with the additional key: 'message'.")
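
The three l.info calls above suggest the message forms the formatter accepts: a dict, a JSON string, and a plain string that ends up under the key 'message'. The sketch below mimics that behavior in plain Python to make the rule concrete. structure_message is a hypothetical helper and a simplification, not the formatter's actual code.

```python
import json


def structure_message(msg):
    """Turn a log message into a dict of fields (simplified illustration)."""
    if isinstance(msg, dict):
        # Dicts are used as structured fields directly
        return dict(msg)
    if isinstance(msg, str):
        try:
            parsed = json.loads(msg)
            if isinstance(parsed, dict):
                # JSON objects are expanded into fields
                return parsed
        except ValueError:
            pass
    # Anything else is stored under the 'message' key
    return {"message": msg}


print(structure_message({"from": "userA", "to": "userB"}))
print(structure_message('{"from": "userC", "to": "userD"}'))
print(structure_message("plain text entry"))
```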

You can also customize the formatter via logging.config.dictConfig:

import logging.config
import yaml

with open('logging.yaml') as fd:
    # safe_load avoids constructing arbitrary Python objects from the YAML file
    conf = yaml.safe_load(fd)

logging.config.dictConfig(conf['logging'])

A sample configuration logging.yaml would be:

logging:
    version: 1

    formatters:
      brief:
        format: '%(message)s'
      default:
        format: '%(asctime)s %(levelname)-8s %(name)-15s %(message)s'
        datefmt: '%Y-%m-%d %H:%M:%S'
      fluent_fmt:
        '()': fluent.handler.FluentRecordFormatter
        format:
          level: '%(levelname)s'
          hostname: '%(hostname)s'
          where: '%(module)s.%(funcName)s'

    handlers:
        console:
            class : logging.StreamHandler
            level: DEBUG
            formatter: default
            stream: ext://sys.stdout
        fluent:
            class: fluent.handler.FluentHandler
            host: localhost
            port: 24224
            tag: test.logging
            buffer_overflow_handler: overflow_handler
            formatter: fluent_fmt
            level: DEBUG
        none:
            class: logging.NullHandler

    loggers:
        amqp:
            handlers: [none]
            propagate: False
        conf:
            handlers: [none]
            propagate: False
        '': # root logger
            handlers: [console, fluent]
            level: DEBUG
            propagate: False

Asynchronous Communication

Besides the regular interfaces - the event-based one provided by sender.FluentSender and the Python logging one provided by handler.FluentHandler - there are also corresponding asynchronous versions in asyncsender and asynchandler respectively. These versions use a separate thread to handle the communication with the remote fluentd server. This way the client of the library won’t be blocked while logging events, won’t risk timing out if the fluentd server becomes unreachable, and won’t be slowed down by network overhead.

The interfaces in asyncsender and asynchandler are exactly the same as those in sender and handler, so it’s just a matter of importing from a different module.

For instance, for the event-based interface:

from fluent import asyncsender as sender

# for local fluent
sender.setup('app')

# for remote fluent
sender.setup('app', host='host', port=24224)

# do your work
...

# IMPORTANT: before program termination, close the sender
sender.close()

or for the python logging interface:

import logging
from fluent import asynchandler as handler

custom_format = {
  'host': '%(hostname)s',
  'where': '%(module)s.%(funcName)s',
  'type': '%(levelname)s',
  'stack_trace': '%(exc_text)s'
}

logging.basicConfig(level=logging.INFO)
l = logging.getLogger('fluent.test')
h = handler.FluentHandler('app.follow', host='host', port=24224, buffer_overflow_handler=overflow_handler)
formatter = handler.FluentRecordFormatter(custom_format)
h.setFormatter(formatter)
l.addHandler(h)
l.info({
  'from': 'userA',
  'to': 'userB'
})
l.info('{"from": "userC", "to": "userD"}')
l.info("This log entry will be logged with the additional key: 'message'.")

...

# IMPORTANT: before program termination, close the handler
h.close()

NOTE: It is important to close the sender or the handler at program termination. This makes sure the communication thread terminates and is joined correctly. Otherwise the program won’t exit and will wait for the thread unless forcibly killed.
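
To see why close() matters, here is a minimal sketch of the general pattern: events go into a queue, a worker thread drains it, and close() signals the worker and joins it. This is a generic illustration with hypothetical names (BackgroundSender, send_func), not the implementation in asyncsender.

```python
import queue
import threading


class BackgroundSender:
    """Hand events to a worker thread so the caller does not wait on I/O."""

    _STOP = object()  # sentinel that tells the worker to exit

    def __init__(self, send_func):
        self._send = send_func
        self._queue = queue.Queue()
        self._thread = threading.Thread(target=self._run)
        self._thread.start()

    def emit(self, label, record):
        # Returns immediately; the worker performs the actual send
        self._queue.put((label, record))

    def _run(self):
        while True:
            item = self._queue.get()
            if item is self._STOP:
                break
            self._send(*item)

    def close(self):
        # Without this, the non-daemon worker would keep the process alive
        self._queue.put(self._STOP)
        self._thread.join()


sent = []
bg = BackgroundSender(lambda label, record: sent.append((label, record)))
bg.emit("follow", {"from": "userA", "to": "userB"})
bg.close()  # the worker drains queued events before it sees the sentinel
print(sent)
```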

Circular queue mode

In some applications it can be especially important to guarantee that the logging process won’t block under any circumstances, even when it’s logging faster than the sending thread can handle (backpressure). In this case you can enable the circular queue mode by passing True in the queue_circular parameter of asyncsender.FluentSender or asynchandler.FluentHandler. The logging thread then won’t block even when the queue is full: the new event is added to the queue by discarding the oldest one.

WARNING: setting queue_circular to True will cause loss of events if the queue fills up completely! Make sure that this doesn’t happen, or that it’s acceptable for your application.
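
The drop-oldest behavior can be pictured with a bounded deque from the standard library. This only illustrates the semantics described above; it is not how asyncsender implements its queue, and the queue size of 3 is arbitrary.

```python
from collections import deque

# A deque with maxlen discards the oldest item when a new one is appended to a full deque
events = deque(maxlen=3)

for i in range(5):
    events.append({"seq": i})

# Events 0 and 1 were dropped to make room for 3 and 4
print([e["seq"] for e in events])  # [2, 3, 4]
```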

Testing

Testing can be done using nose.

Release

Releasing requires the wheel package.

$ pip install wheel

After that, run the following command:

$ python setup.py clean sdist bdist_wheel upload

Contributors

Patches contributed by those people.

License

Apache License, Version 2.0
