Python多線程與Queue隊列多線程在感官上類似於同時執行多個程序,雖然由於GIL的存在,在Python中無法實現線程的真正并行,但是對於某些場景,多線程仍不失為一個有效的處理方法:
1,不緊急的,無需阻塞主線程的任務,此時可以利用多線程在後台慢慢處理;
2,IO密集型操作,比如文件讀寫、用戶輸入和網絡請求等,此時多線程可以近似達到甚至優於多進程的表現;
多線程的基本使用不再贅述,以下語法便可輕鬆實現:
1 def task(args1, args2):
2 pass
3
4 Thread(
5 target=task,
6 args=(args1, args2)
7 ).start()
這裏我們重點關注線程通信。
假設有這麼一種場景:有一批源數據,指定一個操作係數N,需要分別對其進行與N的加減乘除操作,並將結果匯總。
當然這裏的加減乘除只是一種簡單處理,在實際的生產環境中,它其實代表了一步較為複雜的業務操作,並包含了較多的IO處理。
自然我們想到可以開啟多線程處理,那麼緊接着的問題便是:如何劃分線程,是根據處理步驟劃分,還是根據源數據劃分?
對於前者,我們把涉及的業務操作單獨劃分位一個線程,即有4個線程分別進行加減乘除的操作,顯然上一個線程的結果是下一個線程的輸入,這類似於流水線操作;
而後者則是把源數據分為若干份,每份啟動一個線程進行處理,最終把結果匯總。一般來說,我們推薦第一種方式。因為在一個線程中完成所有的操作不如每步一個線程清晰明了,
尤其是在一些複雜的場景下,會加大單個線程的出錯概率和測試難度。
那麼我們將開闢4個線程,分別執行加減乘除操作。最後一個除法線程結束則任務完成:
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from Queue import Queue
5 from threading import Thread
6
7
8 class NumberHandler(object):
9 def __init__(self, n):
10 self.n = n
11
12 def add(self, num):
13 return num + self.n
14
15 def subtract(self, num):
16 return num - self.n
17
18 def multiply(self, num):
19 return num * self.n * self.n
20
21 def divide(self, num):
22 return num / self.n
23
24
25 class ClosableQueue(Queue):
26 SENTINEL = object()
27
28 def close(self):
29 self.put(self.SENTINEL)
30
31 def __iter__(self):
32 while True:
33 item = self.get()
34 try:
35 if item is self.SENTINEL:
36 return
37 yield item
38 finally:
39 self.task_done()
40
41
42 class StoppableWorker(Thread):
43 def __init__(self, func, in_queue, out_queue):
44 super(StoppableWorker, self).__init__()
45 self.in_queue = in_queue
46 self.out_queue = out_queue
47 self.func = func
48
49 def run(self):
50 for item in self.in_queue:
51 result = self.func(item)
52 self.out_queue.put(result)
53 print self.func
54
55
56 if __name__ == '__main__':
57 source_queue = ClosableQueue()
58 add_queue = ClosableQueue()
59 subtract_queue = ClosableQueue()
60 multiply_queue = ClosableQueue()
61 divide_queue = ClosableQueue()
62 result_queue = ClosableQueue()
63
64 number_handler = NumberHandler(5)
65
66 threads = [
67 StoppableWorker(number_handler.add, add_queue, subtract_queue),
68 StoppableWorker(number_handler.subtract, subtract_queue, multiply_queue),
69 StoppableWorker(number_handler.multiply, multiply_queue, divide_queue),
70 StoppableWorker(number_handler.divide, divide_queue, result_queue),
71 ]
72
73 for _thread in threads:
74 _thread.start()
75
76 for i in range(10):
77 add_queue.put(i)
78
79 add_queue.close()
80 add_queue.join()
81 print 'add job done...'
82 subtract_queue.close()
83 subtract_queue.join()
84 print 'subtract job done...'
85 multiply_queue.close()
86 multiply_queue.join()
87 print 'multiply job done...'
88 divide_queue.close()
89 divide_queue.join()
90 print 'divide job done...'
91 result_queue.close()
92
93 print "%s items finished, result: %s" % (result_queue.qsize(), result_queue)
94
95 for i in result_queue:
96 print i
運行結果:
線程執行日誌:
總的結果:
可見線程交叉運行,但是任務卻是順序結束,這符合我們的預期。
值得注意的是,我們在ClosableQueue定義了一個close()方法,通過放入一個特殊的類變量SENTINEL告訴隊列應該關閉。此外,由於直接加減乘除結果不變,因此我特意乘了兩次來便於我們判斷結果。
總結:
1. Queue是一種高效的任務處理方式,它可以把任務處理流程劃分為若干階段,並使用多條python線程來同時執行這些子任務;
2. Queue類具備阻塞式的隊列操作、能夠指定緩衝區尺寸,而且還支 持join方法,這使得開發者可以構建出健壯的流水線。
本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】
※帶您來了解什麼是 USB CONNECTOR ?
※自行創業 缺乏曝光? 下一步”網站設計“幫您第一時間規劃公司的門面形象
※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!
※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化
※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益
※試算大陸海運運費!