對現今的應用程式來說,
multi-thread 是不可或缺的部份。
那麼在 python 中要怎麼 implement multi-thread 呢?
這裡介紹兩個 python 所提供的 module -- thread and threading
thread module
使用 thread module 很像在 win32 的 CreateThread,
只是 CreateThread 換成了 thread.start_new_thread(function, args[, kwargs])
第一個參數就是 thread function,第二個參數是我們要傳進去的資料(tuples),
我們看一個簡單的例子:
#-*- coding: utf-8 -*-
#!/usr/bin/python
import thread
import time
def Threadfun(string, sleeptime, *args):
while(True):
print '{0}_{1}\n'.format(string, sleeptime)
time.sleep(sleeptime)
if __name__ == "__main__":
for i in range(1,5):
thread.start_new_thread(Threadfun, ("ThreadFun", i))
while(True):
print 'MainThread {0}'.format(thread.get_ident())
time.sleep(1)
以上就是很基本的建立多個執行緒的方法。
如果需要同步呢?
我們可以用 Lock Object。
Lock Object 可以藉由呼叫 allocate_lock() 得到。
先看看沒有同步會是什麼情況。
#-*- coding: utf-8 -*-
#!/usr/bin/python
import thread
import time
def Threadfun(string, sleeptime, lock, *args):
while(True):
print 'Enter_{0}\r\n'.format(string)
time.sleep(sleeptime)
print 'Leave_{0}\r\n'.format(string)
if __name__ == "__main__":
lock = thread.allocate_lock()
thread.start_new_thread(Threadfun, ("ThreadFun1", 2, lock))
thread.start_new_thread(Threadfun, ("ThreadFun2", 2, lock))
while (True):
pass
執行的結果如下:
看出來了嗎?一個都還沒離開,另一個就進去了。
使用 Lock Object 來保護:
#-*- coding: utf-8 -*-
#!/usr/bin/python
import thread
import time
def Threadfun(string, sleeptime, lock, *args):
while(True):
lock.acquire()
print 'Enter_{0}\r\n'.format(string)
time.sleep(sleeptime)
print 'Leave_{0}\r\n'.format(string)
lock.release()
if __name__ == "__main__":
lock = thread.allocate_lock()
thread.start_new_thread(Threadfun, ("ThreadFun1", 2, lock))
thread.start_new_thread(Threadfun, ("ThreadFun2", 2, lock))
while (True):
pass
執行結果如下:
這樣才是我們要的結果喔!
p.s. 根據 python 官方文件,thread module在 python 3.0 已經改為 __thread,
可是還是建議使用高階的 threading module
threading module
threading module 裡面包含了以下 components
- Lock object
- RLock object
- Semaphore Object
- Condition Object
- Event Object
- Thread Object
Thread Object wrapper 了 start_new_thread() 這個 function。
當我們呼叫 start() 時,就會自動去呼叫 run() ,
所以我們只能夠 override __init()__ 以及 run() 這兩個 method,
絕對不可以 override start()。
#-*- coding: utf-8 -*-
#!/usr/bin/python
from threading import Thread
import time
class MyThread(Thread):
def __init__(self, string, sleeptime):
Thread.__init__(self)
self.sleeptime = sleeptime
self.setName(str(sleeptime))
def run(self):
while(True):
print 'Threadfun_{0}\r\n'.format(self.getName())
time.sleep(self.sleeptime)
if __name__ == "__main__":
thrList = [MyThread('ThreadFun', i) for i in range(1,5)]
# thrList[0]~thrList[3]
for i in range(0,4):
thrList[i].start()
# another way
#for i in range(1,5):
# MyThread('ThreadFun', i).start()
另外一種作法是把 thread function 當參數,在建構物件時傳入,
那麼一樣在呼叫 start() 是,這個函式就會被執行。
threading.Condition
Condition 也是一種鎖,
也就是說它也提供 acquire/release 方法。
除此之外,
它還可以wait/notify/notifyAll,
但是要注意的一點是,
wait/notify/notifyAll 一定要跟 acquire/release 一起使用,
否則就會拋出 RuntimeError 異常。
我們可以用一個簡單的 productor/consumer 模型來當作範例。
#!/usr/bin/python # -*- coding:utf-8 -*- from threading import * import time class itemX: def __init__(self): self.cnt = 0 def produce(self, num=1): self.cnt += 1 def consume(self, num=1): if self.cnt: self.cnt -= 1 else: print 'WARNING***********************WARNING' def isEmpty(self): return not self.cnt def getCount(self): return self.cnt class Producer(Thread): def __init__(self, condition, item, sleeptime=2): Thread.__init__(self) self.con = condition self.item = item self.sleeptime = sleeptime def run(self): while (True): time.sleep(self.sleeptime) self.con.acquire() self.item.produce() print 'produce 1 product\r\n' print self.item.getCount() self.con.notifyAll() self.con.release() class Consumer(Thread): def __init__(self, condition, item, sleeptime=2): Thread.__init__(self) self.con = condition self.item = item self.sleeptime = sleeptime def run(self): while (True): time.sleep(self.sleeptime) self.con.acquire() print '({0})enter'.format(self.getName()) while self.item.isEmpty(): print '({0})wait'.format(self.getName()) self.con.wait() self.item.consume() print '({0})consume 1 product\r\n'.format(self.getName()) print self.item.getCount() self.con.release() if __name__ == "__main__": X = itemX() cond = Condition() Producer(cond, X).start() Consumer(cond, X).start() Consumer(cond, X).start() while (True): pass
threading.Event
event 是很常用的同步機制,
我們先用 event 改寫上一個例子,
再來討論它有什麼要注意的地方。
#!/usr/bin/python
# -*- coding:utf-8 -*-
from threading import *
import time
class itemX:
def __init__(self):
self.cnt = 0
def produce(self, num=1):
self.cnt += 1
def consume(self, num=1):
if self.cnt:
self.cnt -= 1
else:
print 'WARNING***********************WARNING'
def isEmpty(self):
return not self.cnt
def getCount(self):
return self.cnt
class Producer(Thread):
def __init__(self, condition, event, item, sleeptime=1):
Thread.__init__(self)
self.con = condition
self.event = event
self.item = item
self.sleeptime = sleeptime
def run(self):
while (True):
time.sleep(self.sleeptime)
self.con.acquire()
self.item.produce()
print 'produce 1 product, remain({0})\r\n'.format(self.item.getCount())
self.event.set()
self.con.release()
class Consumer(Thread):
def __init__(self, condition, event, item, sleeptime=1):
Thread.__init__(self)
self.con = condition
self.event = event
self.item = item
self.sleeptime = sleeptime
def run(self):
while (True):
time.sleep(self.sleeptime)
self.con.acquire()
print '({0})enter\r\n'.format(self.getName())
#while self.item.isEmpty():
while (True):
print '({0})wait'.format(self.getName())
self.event.wait()
break
self.item.consume()
self.event.clear()
print '({0})consume 1 product, remain({1})\r\n'.format(self.getName(), self.item.getCount())
self.con.release()
if __name__ == "__main__":
X = itemX()
cond_Con = Condition()
cond_Pro = Condition()
event = Event()
Producer(cond_Pro, event, X).start()
Consumer(cond_Con, event, X).start()
Consumer(cond_Con, event, X).start()
while (True):
pass
首先,
event 的 block 機制跟 condition 是很不一樣的,
對condition來說,notify 是一個通知,
有人在等,通知才有意義,
如果沒有人等,通知是沒有意義的。
notify 和 wait 是有序的,
就是說,假設通知先發生了,
這時候才有人開始等,
那麼是絕對等不到已經發生的那一個通知。
可是 event 不一樣,event 是一種狀態。
當 event 被設定了,
它就處於激發狀態,
只要狀態沒有改變,
任何時候都可以等得到。
為了強迫 consumer 一定要等到 event 才能 consume,
我們把上一個範例的 "while self.item.isEmpty():" 改成 "while (True):"
當然 Lock(Critical Section) 的保護也是必要的,
才不會發生多個 consumers 同時 consume 的情況(race condition)。
producer 的 Lock 在這個範例可以不要,
但如果有多個 producers 就一定要加,
特別要注意的一點是,
producer 用的 Lock 和 consumer 的必須要不一樣,
不然會出現 deadlock。
還有就是這邊的 event 是 manual reset,
也就是激發後必須手動呼叫 clear() 方法使其回到未激發狀態。
threading.Timer
Timer 也是 threading 的一個元件,
可以在指定的時間間隔後,執行某一個動作(函式),
例如:
#-*- coding: utf-8 -*-
#!/usr/bin/python
from threading import Time
def hello(msg):
print msg
t = Timer(3, hello, ['Hello world'])
t.start()
Pooling Threads
最後想要討論一個有趣的東西。
不同的作業系統對可執行的 thread 限制都不一樣,
有時候我們擔心一次建立太多 thread 會造成系統(或程式)效能變差,
比如說我們只想建立兩個 threads,可是我們有十件工作要做,
那麼我們就可以用排程的概念來實做。
在程式的一開始先把兩個 thread 建好 (thread pool),
然後利用 python 的 Queue module,
把十件工作的資料都 put 進 Queue。
在 thread function 裡面會去 get Queue 的資料,
只要 get 到,thread 就會開始工作。
#!/usr/bin/python
#-*- coding:utf-8 -*-
from threading import *
import Queue
import time
class MyThread(Thread):
def __init__(self, condition):
Thread.__init__(self)
self.cond = condition
def run(self):
print '{0} start\r\n'.format(self.getName())
global cnt
while (True):
id = threadPool.get()
if id != None:
self.cond.acquire()
print '{0}_{1}'.format(self.getName(), id)
for x in xrange(101):
cnt += x
time.sleep(2)
print 'cnt = {0}\r\n'.format(cnt)
cnt = 0
self.cond.release()
threadPool.task_done()
threadPool = Queue.Queue(0)
condition = Condition()
cnt = 0
for i in xrange(2):
MyThread(condition).start()
for i in xrange(10):
threadPool.put(i)
threadPool.join()
print 'done'
在 thread function 最後呼叫 task_done() 是為了讓 Queue 知道這一個工作已經完成,
是給 Queue.join() 作為參考。
如果這些工作會存取到相同的資源,
還是記得要用 Lock 保護。
Queue module 是 thread safe,所以這樣的應用是沒有問題的。
沒有留言:
張貼留言