作者:範傳輝
如需轉載請聯繫大資料(ID:hzdashuju)
分佈式行程在Python中依然要用到multiprocessing模塊。multiprocessing模塊不但支持多行程,其中managers子模塊還支持把多行程分佈到多台機器上。可以寫一個服務行程作為調度者,將任務分佈到其他多個行程中,依靠網絡通信進行管理。
舉個例子:在做爬蟲程式時,常常會遇到這樣的場景,我們想抓取某個網站的所有圖片,如果使用多行程的話,一般是一個行程負責抓取圖片的鏈接地址,將鏈接地址存放到Queue中,另外的行程負責從Queue中讀取鏈接地址進行下載和儲存到本地。
現在把這個過程做成分佈式,一臺機器上的行程負責抓取鏈接,其他機器上的行程負責下載儲存。那麼遇到的主要問題是將Queue暴露到網絡中,讓其他機器行程都可以訪問,分佈式行程就是將這一個過程進行了封裝,我們可以將這個過程稱為本地佇列的網絡化。整體過程如圖1-24所示。
▲圖1-24 分佈式行程
要實現上面例子的功能,創建分佈式行程需要分為六個步驟:
-
建立佇列Queue,用來進行行程間的通信。服務行程創建任務佇列task_queue,用來作為傳遞任務給任務行程的通道;服務行程創建結果佇列result_queue,作為任務行程完成任務後回覆服務行程的通道。在分佈式多行程環境下,必須通過由Queuemanager獲得的Queue接口來添加任務。
-
把第一步中建立的佇列在網絡上註冊,暴露給其他行程(主機),註冊後獲得網絡佇列,相當於本地佇列的映像。
-
建立一個物件(Queuemanager(BaseManager))實體manager,系結端口和驗證口令。
-
啟動第三步中建立的實體,即啟動管理manager,監管信息通道。
-
通過管理實體的方法獲得通過網絡訪問的Queue物件,即再把網絡佇列物體化成可以使用的本地佇列。
-
創建任務到“本地”佇列中,自動上傳任務到網絡佇列中,分配給任務行程進行處理。
接下來通過程式實現上面的例子(Linux版),首先編寫的是服務行程(taskManager.py),代碼如下:
import random,time,Queue
from multiprocessing.managers import BaseManager
# 第一步:建立task_queue和result_queue,用來存放任務和結果
task_queue=Queue.Queue()
result_queue=Queue.Queue()
class Queuemanager(BaseManager):
pass
# 第二步:把創建的兩個佇列註冊在網絡上,利用register方法,callable引數關聯了Queue物件,
# 將Queue物件在網絡中暴露
Queuemanager.register('get_task_queue',callable=lambda:task_queue)
Queuemanager.register('get_result_queue',callable=lambda:result_queue)
# 第三步:系結端口8001,設置驗證口令‘qiye’。這個相當於物件的初始化
manager=Queuemanager(address=('',8001),authkey='qiye')
# 第四步:啟動管理,監聽信息通道
manager.start()
# 第五步:通過管理實體的方法獲得通過網絡訪問的Queue物件
task=manager.get_task_queue()
result=manager.get_result_queue()
# 第六步:添加任務
for url in ["ImageUrl_"+i for i in range(10)]:
print 'put task %s ...' %url
task.put(url)
# 獲取傳回結果
print 'try get result...'
for i in range(10):
print 'result is %s' %result.get(timeout=10)
# 關閉管理
manager.shutdown()
任務行程已經編寫完成,接下來編寫任務行程(taskWorker.py),創建任務行程的步驟相對較少,需要四個步驟:
-
使用QueueManager註冊用於獲取Queue的方法名稱,任務行程只能通過名稱來在網絡上獲取Queue。
-
連接服務器,端口和驗證口令註意保持與服務行程中完全一致。
-
從網絡上獲取Queue,進行本地化。
-
從task佇列獲取任務,並把結果寫入result佇列。
程式taskWorker.py代碼(win/linux版)如下:
# coding:utf-8
import time
from multiprocessing.managers import BaseManager
# 創建類似的QueueManager:
class QueueManager(BaseManager):
pass
# 第一步:使用QueueManager註冊用於獲取Queue的方法名稱
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 第二步:連接到服務器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和驗證口令註意保持與服務行程完全一致:
m = QueueManager(address=(server_addr, 8001), authkey='qiye')
# 從網絡連接:
m.connect()
# 第三步:獲取Queue的物件:
task = m.get_task_queue()
result = m.get_result_queue()
# 第四步:從task佇列獲取任務,並把結果寫入result佇列:
while(not task.empty()):
image_url = task.get(True,timeout=5)
print('run task download %s...' % image_url)
time.sleep(1)
result.put('%s--->success'%image_url)
# 處理結束:
print('worker exit.')
最後開始運行程式,先啟動服務行程taskManager.py,運行結果如下:
put task ImageUrl_0 ...
put task ImageUrl_1 ...
put task ImageUrl_2 ...
put task ImageUrl_3 ...
put task ImageUrl_4 ...
put task ImageUrl_5 ...
put task ImageUrl_6 ...
put task ImageUrl_7 ...
put task ImageUrl_8 ...
put task ImageUrl_9 ...
try get result...
接著再啟動任務行程taskWorker.py,運行結果如下:
Connect to server 127.0.0.1...
run task download ImageUrl_0...
run task download ImageUrl_1...
run task download ImageUrl_2...
run task download ImageUrl_3...
run task download ImageUrl_4...
run task download ImageUrl_5...
run task download ImageUrl_6...
run task download ImageUrl_7...
run task download ImageUrl_8...
run task download ImageUrl_9...
worker exit.
當任務行程運行結束後,服務行程運行結果如下:
result is ImageUrl_0--->success
result is ImageUrl_1--->success
result is ImageUrl_2--->success
result is ImageUrl_3--->success
result is ImageUrl_4--->success
result is ImageUrl_5--->success
result is ImageUrl_6--->success
result is ImageUrl_7--->success
result is ImageUrl_8--->success
result is ImageUrl_9--->success
其實這就是一個簡單但真正的分佈式計算,把代碼稍加改造,啟動多個worker,就可以把任務分佈到幾台甚至幾十臺機器上,實現大規模的分佈式爬蟲。
註:由於平臺的特性,創建服務行程的代碼在Linux和Windows上有一些不同,創建工作行程的代碼是一致的。
taskManager.py程式在Windows版下的代碼如下:
# coding:utf-8
# taskManager.py for windows
import Queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support
# 任務個數
task_number = 10
# 定義收發佇列
task_queue = Queue.Queue(task_number);
result_queue = Queue.Queue(task_number);
def get_task():
return task_queue
def get_result():
return result_queue
# 創建類似的QueueManager:
class QueueManager(BaseManager):
pass
def win_run():
# Windows下系結呼叫接口不能使用lambda,所以只能先定義函式再系結
QueueManager.register('get_task_queue',callable = get_task)
QueueManager.register('get_result_queue',callable = get_result)
# 系結端口並設置驗證口令,Windows下需要填寫IP地址,Linux下不填預設為本地
manager = QueueManager(address = ('127.0.0.1',8001),authkey = 'qiye')
# 啟動
manager.start()
try:
# 通過網絡獲取任務佇列和結果佇列
task = manager.get_task_queue()
result = manager.get_result_queue()
# 添加任務
for url in ["ImageUrl_"+str(i) for i in range(10)]:
print 'put task %s ...' %url
task.put(url)
print 'try get result...'
for i in range(10):
print 'result is %s' %result.get(timeout=10)
except:
print('Manager error')
finally:
# 一定要關閉,否則會報管道未關閉的錯誤
manager.shutdown()
if __name__ == '__main__':
# Windows下多行程可能會有問題,添加這句可以緩解
freeze_support()
win_run()
關於作者:範傳輝,資深網蟲,Python開發者,參與開發了多項網絡應用,在實際開發中積累了豐富的實戰經驗,並善於總結,貢獻了多篇技術文章廣受好評。研究興趣是網絡安全、爬蟲技術、資料分析、驅動開發等技術。
本文摘編自《Python爬蟲開發與專案實戰》,經出版方授權發佈。
延伸閱讀《Python爬蟲開發與專案實戰》
點擊上圖瞭解及購買
轉載請聯繫微信:DoctorData
推薦語:零基礎學習爬蟲技術,從Python和Web前端基礎開始講起,由淺入深,包含大量案例,實用性強。
朋友會在“發現-看一看”看到你“在看”的內容