Skip to main content
This is a pre-production deployment of Warehouse. Changes made here affect the production instance of PyPI (
Help us improve Python packaging - Donate today!

A Python logging handler for Fluentd event collector

Project Description

WARNING: This is a fork of the project to work with asyncio.

Many web/mobile applications generate huge amount of event logs (c,f. login, logout, purchase, follow, etc). To analyze these event logs could be really valuable for improving the service. However, the challenge is collecting these logs easily and reliably.

Fluentd solves that problem by having: easy installation, small footprint, plugins, reliable buffering, log forwarding, etc.

aiofluent is a Python library, to record the events from Python application.


  • Python 3.5 or greater
  • msgpack-python


This library is distributed as ‘aiofluent’ python package. Please execute the following command to install it.

$ pip install aiofluent


Fluentd daemon must be launched with a tcp source configuration:

  type forward
  port 24224

To quickly test your setup, add a matcher that logs to the stdout:

<match app.**>
  type stdout


FluentSender Interface

sender.FluentSender is a structured event logger for Fluentd.

By default, the logger assumes fluentd daemon is launched locally. You can also specify remote logger by passing the options.

from aiofluent import sender

# for local fluent
logger = sender.FluentSender('app')

# for remote fluent
logger = sender.FluentSender('app', host='host', port=24224)

For sending event, call emit method with your event. Following example will send the event to fluentd, with tag ‘app.follow’ and the attributes ‘from’ and ‘to’.

# Use current time
logger.emit('follow', {'from': 'userA', 'to': 'userB'})

# Specify optional time
cur_time = int(time.time())
logger.emit_with_time('follow', cur_time, {'from': 'userA', 'to':'userB'})

You can detect an error via return value of emit. If an error happens in emit, emit returns False and get an error object using last_error method.

if not logger.emit('follow', {'from': 'userA', 'to': 'userB'}):
    logger.clear_last_error() # clear stored error after handled errors

If you want to shutdown the client, call close() method.


Event-Based Interface

This API is a wrapper for sender.FluentSender.

First, you need to call sender.setup() to create global sender.FluentSender logger instance. This call needs to be called only once, at the beginning of the application for example.

Initialization code of Event-Based API is below:

from aiofluent import sender

# for local fluent

# for remote fluent
sender.setup('app', host='host', port=24224)

Then, please create the events like this. This will send the event to fluentd, with tag ‘app.follow’ and the attributes ‘from’ and ‘to’.

from aiofluent import event

# send event to fluentd, with 'app.follow' tag
event.Event('follow', {
  'from': 'userA',
  'to':   'userB'

event.Event has one limitation which can’t return success/failure result.

Other methods for Event-Based Interface.

sender.get_global_sender # get instance of global sender
sender.close # Call FluentSender#close

Handler for buffer overflow

You can inject your own custom proc to handle buffer overflow in the event of connection failure. This will mitigate the loss of data instead of simply throwing data away.

import msgpack
from io import BytesIO

def handler(pendings):
    unpacker = msgpack.Unpacker(BytesIO(pendings))
    for unpacked in unpacker:

logger = sender.FluentSender('app', host='host', port=24224, buffer_overflow_handler=handler)

You should handle any exception in handler. aiofluent ignores exceptions from buffer_overflow_handler.

This handler is also called when pending events exist during close().

Python logging.Handler interface

This client-library also has FluentHandler class for Python logging module.

import logging
from aiofluent import handler

custom_format = {
  'host': '%(hostname)s',
  'where': '%(module)s.%(funcName)s',
  'type': '%(levelname)s',
  'stack_trace': '%(exc_text)s'

l = logging.getLogger('fluent.test')
h = handler.FluentHandler('app.follow', host='host', port=24224)
formatter = handler.FluentRecordFormatter(custom_format)
  'from': 'userA',
  'to': 'userB'
})'{"from": "userC", "to": "userD"}')"This log entry will be logged with the additional key: 'message'.")

You can also customize formatter via logging.config.dictConfig

import logging.config
import yaml

with open('logging.yaml') as fd:
    conf = yaml.load(fd)


A sample configuration logging.yaml would be:

    version: 1

        format: '%(message)s'
        format: '%(asctime)s %(levelname)-8s %(name)-15s %(message)s'
        datefmt: '%Y-%m-%d %H:%M:%S'
        '()': fluent.handler.FluentRecordFormatter
          level: '%(levelname)s'
          hostname: '%(hostname)s'
          where: '%(module)s.%(funcName)s'

            class : logging.StreamHandler
            level: DEBUG
            formatter: default
            stream: ext://sys.stdout
            class: fluent.handler.FluentHandler
            host: localhost
            port: 24224
            tag: test.logging
            formatter: fluent_fmt
            level: DEBUG
            class: logging.NullHandler

            handlers: [none]
            propagate: False
            handlers: [none]
            propagate: False
        '': # root logger
            handlers: [console, fluent]
            level: DEBUG
            propagate: False


Apache License, Version 2.0

1.1.3 (2018-02-15)

  • Lock calling the close method of sender [vangheem]
  • Increase default timeout [vangheem]

1.1.2 (2018-02-07)

  • lock the whole method [vangheem]

1.1.1 (2018-02-07)

  • Use lock on getting connection object [vangheem]

1.1.0 (2018-01-25)

  • Move to using asyncio connection infrastructure instead of sockets [vangheem]

1.0.8 (2018-01-04)

  • Always close out buffer data [vangheem]

1.0.7 (2018-01-04)

  • Handle errors processing log queue [vangheem]

1.0.6 (2017-11-14)

  • Prevent log queue from getting too large [vangheem]

1.0.5 (2017-10-17)

  • Fix release to include CHANGELOG.rst file [vangheem]

1.0.4 (2017-10-10)

  • Fix pushing initial record

1.0.3 (2017-10-10)

  • Handle Runtime error when logging done before event loop started [vangheem]

1.0.2 (2017-10-09)

  • Fix to make normal logging call async [vangheem]

1.0.1 (2017-07-03)

  • initial release

Release History

This version
History Node


History Node


History Node


History Node


History Node


History Node


History Node


History Node


History Node


History Node


History Node


History Node


History Node


Download Files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Filename, Size & Hash SHA256 Hash Help File Type Python Version Upload Date
(11.5 kB) Copy SHA256 Hash SHA256
Wheel py2.py3 Feb 15, 2018
(12.8 kB) Copy SHA256 Hash SHA256
Source None Feb 15, 2018

Supported By

Elastic Elastic Search Pingdom Pingdom Monitoring Dyn Dyn DNS Sentry Sentry Error Logging CloudAMQP CloudAMQP RabbitMQ Heroku Heroku PaaS Kabu Creative Kabu Creative UX & Design Fastly Fastly CDN DigiCert DigiCert EV Certificate Google Google Cloud Servers