Never been to DZone Snippets before?

Snippets is a public source code repository. Easily build up your personal collection of code snippets, categorize them with tags / keywords, and share them with the world

Thread Pool and Threaded Python decorators (See related posts)

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# vim:ts=4:sw=4:et
# (c) 2007 under terms of LGPL v 2.1
# by Vsevolod S. Balashov <vsevolod@balashov.name> 

from threading import Thread

def threaded(func):
    def proxy(*args, **kwargs):
        thread = Thread(target=func, args=args, kwargs=kwargs)
        thread.start()
        return thread
    return proxy

from Queue import Queue

class Pool(Queue):
    def __init__(self, maxsize):
        assert maxsize > 0, 'maxsize > 0 required for Pool class'
        Queue.__init__(self, maxsize)
        for i in range(maxsize):
            thread = Thread(target = self._worker)
            thread.setDaemon(True)
            thread.start()

    def _worker(self):
        while True:
            try:
                func, args, kwargs = self.get()
                func(*args, **kwargs)
            except:
                self.task_done()
                self.join()
                raise
            self.task_done()

    def addJob(self, func, *args, **kwargs):
        self.put((func, args, kwargs))

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_value, traceback):
        self.join()

def threadpool(pool):
    assert pool.__class__ == Pool, 'threadpool decorator require a Pool object'
    def decorator(func):
        def proxy(*args, **kwargs):
            pool.put((func, args, kwargs))
            return pool
        return proxy
    return decorator


example usage

from threadpool import *

from time import sleep
from random import random

pool = Pool(3)
    
@threadpool(pool)
def test_threadpool(i):
    print 'threadpool %i enter' % i
    sleep(random())
    print 'threadpool %i exit' % i

print 'threadpool example'
for i in range(6):
    test_threadpool(i)
pool.join()
print 'done'
print ''

@threaded
def test_threaded(i):
    print 'threaded %i enter' % i
    sleep(random())
    print 'threaded %i exit' % i

print 'threaded example'
for i in range(5):
    test_threaded(i)
print 'done'


result output

threadpool example
threadpool 0 enter
threadpool 1 enter
threadpool 2 enter
threadpool 0 exit
threadpool 3 enter
threadpool 1 exit
threadpool 4 enter
threadpool 2 exit
threadpool 5 enter
threadpool 3 exit
threadpool 5 exit
threadpool 4 exit
done

threaded example
done
threaded 0 enter
threaded 1 enter
threaded 2 enter
threaded 3 enter
threaded 4 enter
threaded 1 exit
threaded 2 exit
threaded 3 exit
threaded 0 exit
threaded 4 exit

Comments on this post

jumbo posts on Sep 10, 2007 at 16:49
What version of python? This is not working on python 2.4.3. Error pool:

pool.join()
AttributeError: Pool instance has no attribute 'join'

You need to create an account or log in to post comments to this site.


Click here to browse all 5140 code snippets

Related Posts