歡迎光臨
每天分享高質量文章

想用Python爬小姐姐圖片?那你得先搞定分佈式行程

導讀:分佈式行程指的是將Process行程分佈到多台機器上,充分利用多台機器的性能完成複雜的任務。我們可以將這一點應用到分佈式爬蟲的開發中。

 

 

作者:範傳輝

如需轉載請聯繫大資料(ID:hzdashuju)

 

分佈式行程在Python中依然要用到multiprocessing模塊。multiprocessing模塊不但支持多行程,其中managers子模塊還支持把多行程分佈到多台機器上。可以寫一個服務行程作為調度者,將任務分佈到其他多個行程中,依靠網絡通信進行管理。

舉個例子:在做爬蟲程式時,常常會遇到這樣的場景,我們想抓取某個網站的所有圖片,如果使用多行程的話,一般是一個行程負責抓取圖片的鏈接地址,將鏈接地址存放到Queue中,另外的行程負責從Queue中讀取鏈接地址進行下載和儲存到本地。

現在把這個過程做成分佈式,一臺機器上的行程負責抓取鏈接,其他機器上的行程負責下載儲存。那麼遇到的主要問題是將Queue暴露到網絡中,讓其他機器行程都可以訪問,分佈式行程就是將這一個過程進行了封裝,我們可以將這個過程稱為本地佇列的網絡化。整體過程如圖1-24所示。

 

▲圖1-24 分佈式行程

 

要實現上面例子的功能,創建分佈式行程需要分為六個步驟:

 

  1. 建立佇列Queue,用來進行行程間的通信。服務行程創建任務佇列task_queue,用來作為傳遞任務給任務行程的通道;服務行程創建結果佇列result_queue,作為任務行程完成任務後回覆服務行程的通道。在分佈式多行程環境下,必須通過由Queuemanager獲得的Queue接口來添加任務。

  2. 把第一步中建立的佇列在網絡上註冊,暴露給其他行程(主機),註冊後獲得網絡佇列,相當於本地佇列的映像。

  3. 建立一個物件(Queuemanager(BaseManager))實體manager,系結端口和驗證口令。

  4. 啟動第三步中建立的實體,即啟動管理manager,監管信息通道。

  5. 通過管理實體的方法獲得通過網絡訪問的Queue物件,即再把網絡佇列物體化成可以使用的本地佇列。

  6. 創建任務到“本地”佇列中,自動上傳任務到網絡佇列中,分配給任務行程進行處理。

 

接下來通過程式實現上面的例子(Linux版),首先編寫的是服務行程(taskManager.py),代碼如下:

 

import random,time,Queue
from multiprocessing.managers import BaseManager
# 第一步:建立task_queue和result_queue,用來存放任務和結果
task_queue=Queue.Queue()
result_queue=Queue.Queue()

class Queuemanager(BaseManager):
    pass
# 第二步:把創建的兩個佇列註冊在網絡上,利用register方法,callable引數關聯了Queue物件,
# 將Queue物件在網絡中暴露
Queuemanager.register('get_task_queue',callable=lambda:task_queue)
Queuemanager.register('get_result_queue',callable=lambda:result_queue)

# 第三步:系結端口8001,設置驗證口令‘qiye’。這個相當於物件的初始化
manager=Queuemanager(address=('',8001),authkey='qiye')

# 第四步:啟動管理,監聽信息通道
manager.start()

# 第五步:通過管理實體的方法獲得通過網絡訪問的Queue物件
task=manager.get_task_queue()
result=manager.get_result_queue()

# 第六步:添加任務
for url in ["ImageUrl_"+i for i in range(10)]:
    print 'put task %s ...' %url
    task.put(url)
# 獲取傳回結果
print 'try get result...'
for i in range(10):
    print 'result is %s' %result.get(timeout=10)
# 關閉管理
manager.shutdown()

任務行程已經編寫完成,接下來編寫任務行程(taskWorker.py),創建任務行程的步驟相對較少,需要四個步驟:

 

  1. 使用QueueManager註冊用於獲取Queue的方法名稱,任務行程只能通過名稱來在網絡上獲取Queue。

  2. 連接服務器,端口和驗證口令註意保持與服務行程中完全一致。

  3. 從網絡上獲取Queue,進行本地化。

  4. 從task佇列獲取任務,並把結果寫入result佇列。

 

程式taskWorker.py代碼(win/linux版)如下:

 

# coding:utf-8
import time
from multiprocessing.managers import BaseManager
# 創建類似的QueueManager:
class QueueManager(BaseManager):
    pass
# 第一步:使用QueueManager註冊用於獲取Queue的方法名稱
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 第二步:連接到服務器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和驗證口令註意保持與服務行程完全一致:
m = QueueManager(address=(server_addr, 8001), authkey='qiye')
# 從網絡連接:
m.connect()
# 第三步:獲取Queue的物件:
task = m.get_task_queue()
result = m.get_result_queue()
# 第四步:從task佇列獲取任務,並把結果寫入result佇列:
while(not task.empty()):
    image_url = task.get(True,timeout=5)
    print('run task download %s...' % image_url)
    time.sleep(1)
    result.put('%s--->success'%image_url)

# 處理結束:
print('worker exit.')

 

最後開始運行程式,先啟動服務行程taskManager.py,運行結果如下:

 

put task ImageUrl_0 ...
put task ImageUrl_1 ...
put task ImageUrl_2 ...
put task ImageUrl_3 ...
put task ImageUrl_4 ...
put task ImageUrl_5 ...
put task ImageUrl_6 ...
put task ImageUrl_7 ...
put task ImageUrl_8 ...
put task ImageUrl_9 ...
try get result...

 

接著再啟動任務行程taskWorker.py,運行結果如下:

 

Connect to server 127.0.0.1...
run task download ImageUrl_0...
run task download ImageUrl_1...
run task download ImageUrl_2...
run task download ImageUrl_3...
run task download ImageUrl_4...
run task download ImageUrl_5...
run task download ImageUrl_6...
run task download ImageUrl_7...
run task download ImageUrl_8...
run task download ImageUrl_9...
worker exit.

 

當任務行程運行結束後,服務行程運行結果如下:

 

result is ImageUrl_0--->success
result is ImageUrl_1--->success
result is ImageUrl_2--->success
result is ImageUrl_3--->success
result is ImageUrl_4--->success
result is ImageUrl_5--->success
result is ImageUrl_6--->success
result is ImageUrl_7--->success
result is ImageUrl_8--->success
result is ImageUrl_9--->success

 

其實這就是一個簡單但真正的分佈式計算,把代碼稍加改造,啟動多個worker,就可以把任務分佈到幾台甚至幾十臺機器上,實現大規模的分佈式爬蟲。

 

註:由於平臺的特性,創建服務行程的代碼在Linux和Windows上有一些不同,創建工作行程的代碼是一致的。

 

taskManager.py程式在Windows版下的代碼如下:

 

# coding:utf-8
# taskManager.py for windows
import Queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support
# 任務個數
task_number = 10
# 定義收發佇列
task_queue = Queue.Queue(task_number);
result_queue = Queue.Queue(task_number);
def get_task():
    return task_queue
def get_result():
    return result_queue
# 創建類似的QueueManager:
class QueueManager(BaseManager):
    pass
def win_run():
    # Windows下系結呼叫接口不能使用lambda,所以只能先定義函式再系結
    QueueManager.register('get_task_queue',callable = get_task)
    QueueManager.register('get_result_queue',callable = get_result)
    # 系結端口並設置驗證口令,Windows下需要填寫IP地址,Linux下不填預設為本地
    manager = QueueManager(address = ('127.0.0.1',8001),authkey = 'qiye')
    # 啟動
    manager.start()
    try:
        # 通過網絡獲取任務佇列和結果佇列
        task = manager.get_task_queue()
        result = manager.get_result_queue()
        # 添加任務
        for url in ["ImageUrl_"+str(i) for i in range(10)]:
            print 'put task %s ...' %url
            task.put(url)
        print 'try get result...'
        for i in range(10):
            print 'result is %s' %result.get(timeout=10)
    except:
        print('Manager error')
    finally:
        # 一定要關閉,否則會報管道未關閉的錯誤
        manager.shutdown()

if __name__ == '__main__':
    # Windows下多行程可能會有問題,添加這句可以緩解
    freeze_support()
    win_run()

關於作者:範傳輝,資深網蟲,Python開發者,參與開發了多項網絡應用,在實際開發中積累了豐富的實戰經驗,並善於總結,貢獻了多篇技術文章廣受好評。研究興趣是網絡安全、爬蟲技術、資料分析、驅動開發等技術。

本文摘編自《Python爬蟲開發與專案實戰》,經出版方授權發佈。

延伸閱讀《Python爬蟲開發與專案實戰

點擊上圖瞭解及購買

轉載請聯繫微信:DoctorData

推薦語:零基礎學習爬蟲技術,從Python和Web前端基礎開始講起,由淺入深,包含大量案例,實用性強。

赞(0)

分享創造快樂