Monday 13 September 2010

QueueHandler and ZeroMQ support

A new handler has been added to logging for Python 3.2: QueueHandler . This allows you to send logging events to a queue of some description. For users of earlier versions of Python, here is the latest code from the Py3K branch:

class QueueHandler(logging.Handler): 
    """ 
    This handler sends events to a queue. Typically, it would be used together 
    with a multiprocessing Queue to centralise logging to file in one process 
    (in a multi-process application), so as to avoid file write contention 
    between processes. 
 
    This code is new in Python 3.2, but this class can be copy pasted into 
    user code for use with earlier Python versions. 
    """ 
 
    def __init__(self, queue): 
        """ 
        Initialise an instance, using the passed queue. 
        """ 
        logging.Handler.__init__(self) 
        self.queue = queue 
 
    def enqueue(self, record): 
        """ 
        Enqueue a record. 
 
        The base implementation uses put_nowait. You may want to override 
        this method if you want to use blocking, timeouts or custom queue 
        implementations. 
        """ 
        self.queue.put_nowait(record) 
 
    def emit(self, record): 
        """ 
        Emit a record. 
 
        Writes the LogRecord to the queue, preparing it for pickling first. 
        """ 
        try: 
            # The format operation gets traceback text into record.exc_text 
            # (if there's exception data), and also puts the message into 
            # record.message. We can then use this to replace the original 
            # msg + args, as these might be unpickleable. We also zap the 
            # exc_info attribute, as it's no longer needed and, if not None, 
            # will typically not be pickleable. 
            self.format(record) 
            record.msg = record.message 
            record.args = None 
            record.exc_info = None 
            self.enqueue(record) 
        except (KeyboardInterrupt, SystemExit): 
            raise 
        except: 
            self.handleError(record) 

This code is perfectly usable in earlier Python versions, including 2.x - just copy and paste it into your own code. In addition to usage with queues from the queue and multiprocessing modules (as described in this earlier post), the QueueHandler makes it easy to support other queue-like objects, such as ZeroMQ sockets.

In the example below, a PUBLISH socket is created separately and passed to the handler (as its ‘queue’):

import zmq # using pyzmq, the Python binding for ZeroMQ 
import json # for serializing records portably 
 
ctx = zmq.Context() 
sock = zmq.Socket(ctx, zmq.PUB) # or zmq.PUSH, or other suitable value 
sock.bind('tcp://*:5556') # or wherever 
 
class ZeroMQSocketHandler(QueueHandler): 
    def enqueue(self, record): 
        data = json.dumps(record.__dict__) 
        self.queue.send(data) 

handler = ZeroMQSocketHandler(sock)

Of course there are other ways of organizing this, for example passing in the data needed by the handler to create the socket:

class ZeroMQSocketHandler(QueueHandler): 
    def __init__(self, uri, socktype=zmq.PUB, ctx=None): 
        self.ctx = ctx or zmq.Context() 
        socket = zmq.Socket(self.ctx, socktype) 
        socket.bind(uri) 
        QueueHandler.__init__(self, socket) 
 
    def enqueue(self, record): 
        data = json.dumps(record.__dict__) 
        self.queue.send(data) 
 
    def close(self): 
        self.queue.close() 

To test this out, put this together into a little test script (imports not shown, but you can get the working script here):

def main(): 
    print('Enter messages to send:') 
    h = ZeroMQSocketHandler('tcp://*:5556') 
    logger = logging.getLogger() 
    logger.addHandler(h) 
    try: 
        while True: 
            s = raw_input('> ') 
            logger.warning(s) 
    finally: 
        logger.removeHandler(h) 
        h.close() 

For the receiving end, you can use a simple script like this:

import json 
import pprint 
import zmq 
 
URI = 'tcp://localhost:5556' 
 
def main(): 
    print('Receiving on a SUB socket: %s' % URI) 
    ctx = zmq.Context() 
    sock = zmq.Socket(ctx, zmq.SUB) 
    sock.setsockopt(zmq.SUBSCRIBE, '') 
    sock.connect(URI) 
    try: 
        while True: 
            msg = sock.recv() 
            data = json.loads(msg) 
            pprint.pprint(data) 
            print('-'*40) 
    finally: 
        sock.close() 
 
if __name__ == '__main__': 
    main() 

And the output would be something like:
Receiving on a SUB socket: tcp://localhost:5556
{u'args': None,
 u'created': 1284446528.9099669,
 u'exc_info': None,
 u'exc_text': None,
 u'filename': u'zmqlog.py',
 u'funcName': u'main',
 u'levelname': u'WARNING',
 u'levelno': 30,
 u'lineno': 78,
 u'message': u"It's easy to log to ZeroMQ!",
 u'module': u'zmqlog',
 u'msecs': 909.96694564819336,
 u'msg': u"It's easy to log to ZeroMQ!",
 u'name': u'root',
 u'pathname': u'zmqlog.py',
 u'process': 13647,
 u'processName': u'MainProcess',
 u'relativeCreated': 521204.14185523987,
 u'thread': -1215568192,
 u'threadName': u'MainThread'}
----------------------------------------

3 comments:

  1. This is great! I have written my own handler before for writing to a "multiprocessing" queue, but 0MQ has grabbed my attention as a better direction for putting together the pieces of my application.

    ReplyDelete
  2. HI, this is nice. Also, pyzmq ships with log handlers for the logging module:

    http://github.com/zeromq/pyzmq/blob/master/zmq/log/handlers.py

    Cheers

    ReplyDelete
  3. @Brian: Thanks, I'd missed that.

    ReplyDelete