Never been to DZone Snippets before?

Snippets is a public source code repository. Easily build up your personal collection of code snippets, categorize them with tags/keywords, and share them with the world.

About this user

Vsevolod Balashov

« Newer Snippets
Older Snippets »
Showing 1-1 of 1 total  RSS 

Thread Pool and Threaded Python decorators

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# vim:ts=4:sw=4:et
# (c) 2007 under terms of LGPL v 2.1
# by Vsevolod S. Balashov <vsevolod@balashov.name> 

import traceback
from functools import wraps
from threading import Thread

def threaded(func):
    """Decorator that runs every call to *func* in a new thread.

    Calling the decorated function starts a fresh ``Thread`` executing
    ``func(*args, **kwargs)`` and returns immediately.

    Returns:
        The started ``Thread`` object (not the result of *func*); call
        ``join()`` on it to wait for the call to finish.
    """
    def proxy(*args, **kwargs):
        thread = Thread(target=func, args=args, kwargs=kwargs)
        thread.start()
        return thread
    try:
        # Carry func's __name__/__doc__ over so the decorated function
        # still identifies itself correctly.
        return wraps(func)(proxy)
    except AttributeError:
        # Python 2's wraps() rejects callables lacking __name__ (e.g.
        # partial objects); fall back to the plain wrapper for those.
        return proxy

from Queue import Queue

class Pool(Queue):
    """Fixed-size pool of daemon worker threads fed through a bounded queue.

    The pool *is* the job queue: ``maxsize`` is both the number of worker
    threads and the queue capacity, so adding a job blocks while the queue
    is full. Each queued item is a ``(func, args, kwargs)`` tuple.
    ``join()`` (inherited from ``Queue``) blocks until every queued job has
    been processed. The pool can be used as a context manager; leaving the
    ``with`` block waits for all queued jobs.
    """

    def __init__(self, maxsize):
        """Create the queue and start ``maxsize`` daemon worker threads.

        ``maxsize`` must be positive; this is checked with an ``assert``,
        so a violation raises ``AssertionError``.
        """
        assert maxsize > 0, 'maxsize > 0 required for Pool class'
        Queue.__init__(self, maxsize)
        for _ in range(maxsize):
            worker = Thread(target=self._worker)
            # Daemon threads do not keep the interpreter alive at exit.
            worker.daemon = True
            worker.start()

    def _worker(self):
        """Worker loop: take jobs off the queue and run them forever.

        A job that raises has its traceback printed to stderr and the
        worker carries on with the next job, so one failing job cannot
        shrink the pool or leave ``join()`` waiting on dead workers.
        """
        while True:
            job = self.get()
            try:
                func, args, kwargs = job
                func(*args, **kwargs)
            except Exception:
                traceback.print_exc()
            finally:
                # Exactly one task_done() per get(), success or failure,
                # otherwise join() would never return.
                self.task_done()

    def addJob(self, func, *args, **kwargs):
        """Queue ``func(*args, **kwargs)`` for execution by a worker.

        Blocks while the queue is full.
        """
        self.put((func, args, kwargs))

    def __enter__(self):
        """Return the pool itself so ``with Pool(n) as pool:`` works."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Wait for all queued jobs; exceptions from the body propagate."""
        self.join()

def threadpool(pool):
    """Decorator factory that routes calls of a function through *pool*.

    Calling the decorated function does not run it directly: the call is
    queued on *pool* as a ``(func, args, kwargs)`` job and executed later
    by one of the pool's workers.

    Args:
        pool: a ``Pool`` instance (subclasses are accepted). Checked with
            an ``assert``, so a wrong type raises ``AssertionError``.

    Returns:
        A decorator. The decorated function returns *pool* (not the
        result of the call), so callers can ``join()`` it.
    """
    # isinstance rather than an exact class comparison, so Pool subclasses
    # are accepted as well.
    assert isinstance(pool, Pool), 'threadpool decorator require a Pool object'
    def decorator(func):
        def proxy(*args, **kwargs):
            pool.put((func, args, kwargs))
            return pool
        try:
            # Carry func's __name__/__doc__ over to the wrapper.
            return wraps(func)(proxy)
        except AttributeError:
            # Python 2's wraps() rejects callables lacking __name__ (e.g.
            # partial objects); fall back to the plain wrapper for those.
            return proxy
    return decorator


example usage

# Demonstration of both decorators; expects the module above to be
# importable as ``threadpool``.
from random import random
from time import sleep

from threadpool import Pool, threaded, threadpool

pool = Pool(3)


@threadpool(pool)
def test_threadpool(i):
    """Pool job: report entry, sleep for a random sub-second time, report exit."""
    print('threadpool %i enter' % i)
    sleep(random())
    print('threadpool %i exit' % i)


# Queue six jobs on the three-worker pool and wait for all of them.
print('threadpool example')
for job_number in range(6):
    test_threadpool(job_number)
pool.join()
print('done')
print('')


@threaded
def test_threaded(i):
    """Thread job: report entry, sleep for a random sub-second time, report exit."""
    print('threaded %i enter' % i)
    sleep(random())
    print('threaded %i exit' % i)


# Start five independent threads; nothing joins them here, so 'done' is
# printed without waiting for the threads to finish.
print('threaded example')
for call_number in range(5):
    test_threaded(call_number)
print('done')


result output

threadpool example
threadpool 0 enter
threadpool 1 enter
threadpool 2 enter
threadpool 0 exit
threadpool 3 enter
threadpool 1 exit
threadpool 4 enter
threadpool 2 exit
threadpool 5 enter
threadpool 3 exit
threadpool 5 exit
threadpool 4 exit
done

threaded example
done
threaded 0 enter
threaded 1 enter
threaded 2 enter
threaded 3 enter
threaded 4 enter
threaded 1 exit
threaded 2 exit
threaded 3 exit
threaded 0 exit
threaded 4 exit
« Newer Snippets
Older Snippets »
Showing 1-1 of 1 total  RSS