Ticket #1237: pool.py

File pool.py, 5.6 KB (added by junzhang.jn@…, 19 years ago)
Line 
1# -*- coding: gb2312 -*-
2#
3# author: junzhang.jn@gmail.com
4
5import threading,thread
6import time
7import sys,traceback
8
class DBPoolWithThread:
    """Pool of reusable objects (typically DB connections) with idle expiry.

    Returned objects are filed under the id of the thread that returned
    them.  borrow_object() first looks in the calling thread's own list and,
    when ``threadsafety`` is greater than 1, falls back to objects returned
    by other threads.  A background thread wakes up every ``freetime``
    seconds and destroys objects that have been idle for longer than
    ``freetime`` seconds.

    Pooled objects are used as dictionary keys, so they must be hashable.

    The background thread keeps a reference to the pool, so the pool is not
    garbage collected while that thread runs: call close() when the pool is
    no longer needed.  __del__ only acts as a last-resort fallback.

    @param factory       PooledObjectFactory-like object used to create,
                         validate and destroy the pooled objects
    @param freetime      integer, idle time in seconds after which the
                         background thread destroys an object; 0 or None
                         disables expiry
    @param threadsafety  integer, like the DB-API 2.0 ``threadsafety`` value:
                         0 = Threads may not share the module.
                         1 = Threads may share the module, but not connections.
                         2 = Threads may share the module and connections.
                         3 = Threads may share the module, connections and
                             cursors.
    @raise RuntimeError  when threadsafety is 0 (pooling is impossible)
    """

    # Number of pooled objects borrow_object() validates before it stops
    # looking in the pool and asks the factory for a brand-new object.
    MAX_VALIDATE_ATTEMPTS = 5

    def __init__(self, factory, freetime=30, threadsafety=1):
        # Reject unusable drivers before any state or thread is created.
        if threadsafety == 0:
            raise RuntimeError('threadsafety is %d , you cannot use connectionpool for this db driver' % threadsafety)
        self.factory = factory
        self.threadsafety = threadsafety
        self.idlepool = {}  # { threadid: [obj, ...] }
        self.lock = threading.Lock()  # guards idlepool, idleTime and deleted
        self.freetime = freetime
        self.idleTime = {}  # { obj: (threadid, time the object was returned) }
        self.deleted = False  # set to True by close()
        self.exitEvent = threading.Event()
        self.thread = threading.Thread(target=DBPoolWithThread.check_thread, args=(self,))
        # Daemon thread: a pool that was never closed must not keep the
        # interpreter from exiting.
        self.thread.daemon = True
        self.thread.start()

    def __container_get_obj(self):
        """Remove and return one idle object, or None when none is usable.

        The calling thread's own list is tried first; lists belonging to
        other threads are only used when threadsafety > 1.
        """
        threadid = thread.get_ident()
        with self.lock:
            obj = None
            own = self.idlepool.get(threadid)
            if own is not None:
                if own:
                    obj = own.pop(0)
                if not own:
                    # never leave an empty list behind
                    del self.idlepool[threadid]

            if obj is None and self.threadsafety > 1:
                # copy of the items: entries are deleted while scanning
                for key, cons in list(self.idlepool.items()):
                    if cons:
                        obj = cons.pop(0)
                    if not cons:
                        del self.idlepool[key]
                    if obj is not None:
                        break

            if obj is not None:
                # the object is in use again: stop its idle timer
                self.idleTime.pop(obj, None)
            return obj

    def __container_put_obj(self, obj):
        """File obj under the calling thread and start its idle timer.

        Returns True when the object was pooled and False when the pool has
        already been closed (the caller then has to dispose of the object).
        """
        threadid = thread.get_ident()
        with self.lock:
            if self.deleted:
                return False
            self.idlepool.setdefault(threadid, []).append(obj)
            self.idleTime[obj] = (threadid, time.time())
            return True

    def _destroy_quietly(self, obj):
        """Destroy obj through the factory, reporting (not raising) failures."""
        try:
            self.factory.destroy_object(obj)
        except Exception:
            # One broken object must not stop the clean-up of the others.
            traceback.print_exc()

    def _destroy_expired(self):
        """Destroy every pooled object that has been idle longer than freetime."""
        limit = self.freetime
        now = time.time()
        expired = []
        with self.lock:
            # copy of the items: entries are deleted while scanning
            for obj, (threadid, since) in list(self.idleTime.items()):
                if now - since <= limit:
                    continue
                del self.idleTime[obj]
                cons = self.idlepool.get(threadid)
                if cons is not None:
                    if obj in cons:
                        cons.remove(obj)
                    if not cons:
                        del self.idlepool[threadid]
                expired.append(obj)
        # The objects are already out of the pool, so they can be destroyed
        # without holding the lock.
        for obj in expired:
            self._destroy_quietly(obj)

    def borrow_object(self):
        """Return a valid object, reusing a pooled one when possible.

        Pooled objects that fail factory.validate_object() are destroyed.
        When the pool has nothing usable, or MAX_VALIDATE_ATTEMPTS pooled
        objects in a row were invalid, a new object is created by the
        factory instead.
        """
        for _ in range(self.MAX_VALIDATE_ATTEMPTS):
            obj = self.__container_get_obj()
            if obj is None:
                break
            if self.factory.validate_object(obj):
                return obj
            # not valid: destroy it and try the next pooled object
            self.factory.destroy_object(obj)
        return self.factory.create_object()

    def return_object(self, obj):
        """Give obj back to the pool; None is ignored.

        When the pool has already been closed the object is destroyed
        instead, because nothing would ever expire it.
        """
        if obj is None:
            return
        if not self.__container_put_obj(obj):
            self._destroy_quietly(obj)

    def close(self):
        """Stop the background thread and destroy every idle object.

        Safe to call more than once, and safe on an instance whose __init__
        raised before the pool was set up.
        """
        exit_event = getattr(self, 'exitEvent', None)
        if exit_event is None:
            # __init__ did not complete: nothing was created
            return
        exit_event.set()
        with self.lock:
            self.deleted = True
            leftovers = [obj for cons in self.idlepool.values() for obj in cons]
            self.idlepool.clear()
            self.idleTime.clear()
        for obj in leftovers:
            self._destroy_quietly(obj)

    def __del__(self):
        self.close()

    @staticmethod
    def check_thread(this):
        """Body of the background thread: expire idle objects until closed."""
        while True:
            # Wait one interval.  A freetime of 0/None disables expiry, so
            # block until close() instead of spinning on a zero timeout.
            this.exitEvent.wait(this.freetime or None)
            if this.exitEvent.is_set():
                break
            if this.freetime:
                this._destroy_expired()
136
# Factory interface: creates, destroys and validates pooled objects.
class PooledObjectFactory:
    """Abstract factory interface; every operation must be overridden.

    DBPoolWithThread calls these three methods on the factory it is given.
    """

    def __init__(self):
        """The interface itself keeps no state."""

    def create_object(self):
        """Build and return a new object for the pool."""
        raise NotImplementedError()

    def destroy_object(self, obj):
        """Release whatever obj holds; called for objects being discarded."""
        raise NotImplementedError()

    def validate_object(self, obj):
        """Tell whether obj is still usable (the pool treats a true result as valid)."""
        raise NotImplementedError()
Back to Top