歡迎光臨
每天分享高質量文章

想用Python爬小姐姐圖片?那你得先搞定分散式行程

導讀:分散式行程指的是將Process行程分佈到多臺機器上,充分利用多臺機器的效能完成複雜的任務。我們可以將這一點應用到分散式爬蟲的開發中。

 

 

作者:範傳輝

如需轉載請聯絡大資料(ID:hzdashuju)

 

分散式行程在Python中依然要用到multiprocessing模組。multiprocessing模組不但支援多行程,其中managers子模組還支援把多行程分佈到多臺機器上。可以寫一個服務行程作為排程者,將任務分佈到其他多個行程中,依靠網路通訊進行管理。

舉個例子:在做爬蟲程式時,常常會遇到這樣的場景,我們想抓取某個網站的所有圖片,如果使用多行程的話,一般是一個行程負責抓取圖片的連結地址,將連結地址存放到Queue中,另外的行程負責從Queue中讀取連結地址進行下載和儲存到本地。

現在把這個過程做成分散式,一臺機器上的行程負責抓取連結,其他機器上的行程負責下載儲存。那麼遇到的主要問題是將Queue暴露到網路中,讓其他機器行程都可以訪問,分散式行程就是將這一個過程進行了封裝,我們可以將這個過程稱為本地佇列的網路化。整體過程如圖1-24所示。

 

▲圖1-24 分散式行程

 

要實現上面例子的功能,建立分散式行程需要分為六個步驟:

 

  1. 建立佇列Queue,用來進行行程間的通訊。服務行程建立任務佇列task_queue,用來作為傳遞任務給任務行程的通道;服務行程建立結果佇列result_queue,作為任務行程完成任務後回覆服務行程的通道。在分散式多行程環境下,必須透過由Queuemanager獲得的Queue介面來新增任務。

  2. 把第一步中建立的佇列在網路上註冊,暴露給其他行程(主機),註冊後獲得網路佇列,相當於本地佇列的映像。

  3. 建立一個物件(Queuemanager(BaseManager))實體manager,系結埠和驗證口令。

  4. 啟動第三步中建立的實體,即啟動管理manager,監管資訊通道。

  5. 透過管理實體的方法獲得透過網路訪問的Queue物件,即再把網路佇列物體化成可以使用的本地佇列。

  6. 建立任務到“本地”佇列中,自動上傳任務到網路佇列中,分配給任務行程進行處理。

 

接下來透過程式實現上面的例子(Linux版),首先編寫的是服務行程(taskManager.py),程式碼如下:

 

import random,time,Queue
from multiprocessing.managers import BaseManager
# 第一步:建立task_queue和result_queue,用來存放任務和結果
task_queue=Queue.Queue()
result_queue=Queue.Queue()

class Queuemanager(BaseManager):
    pass
# 第二步:把建立的兩個佇列註冊在網路上,利用register方法,callable引數關聯了Queue物件,
# 將Queue物件在網路中暴露
Queuemanager.register('get_task_queue',callable=lambda:task_queue)
Queuemanager.register('get_result_queue',callable=lambda:result_queue)

# 第三步:系結埠8001,設定驗證口令‘qiye’。這個相當於物件的初始化
manager=Queuemanager(address=('',8001),authkey='qiye')

# 第四步:啟動管理,監聽資訊通道
manager.start()

# 第五步:透過管理實體的方法獲得透過網路訪問的Queue物件
task=manager.get_task_queue()
result=manager.get_result_queue()

# 第六步:新增任務
for url in ["ImageUrl_"+i for i in range(10)]:
    print 'put task %s ...' %url
    task.put(url)
# 獲取傳回結果
print 'try get result...'
for i in range(10):
    print 'result is %s' %result.get(timeout=10)
# 關閉管理
manager.shutdown()

任務行程已經編寫完成,接下來編寫任務行程(taskWorker.py),建立任務行程的步驟相對較少,需要四個步驟:

 

  1. 使用QueueManager註冊用於獲取Queue的方法名稱,任務行程只能透過名稱來在網路上獲取Queue。

  2. 連線伺服器,埠和驗證口令註意保持與服務行程中完全一致。

  3. 從網路上獲取Queue,進行本地化。

  4. 從task佇列獲取任務,並把結果寫入result佇列。

 

程式taskWorker.py程式碼(win/linux版)如下:

 

# coding:utf-8
import time
from multiprocessing.managers import BaseManager
# 建立類似的QueueManager:
class QueueManager(BaseManager):
    pass
# 第一步:使用QueueManager註冊用於獲取Queue的方法名稱
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 第二步:連線到伺服器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 埠和驗證口令註意保持與服務行程完全一致:
m = QueueManager(address=(server_addr, 8001), authkey='qiye')
# 從網路連線:
m.connect()
# 第三步:獲取Queue的物件:
task = m.get_task_queue()
result = m.get_result_queue()
# 第四步:從task佇列獲取任務,並把結果寫入result佇列:
while(not task.empty()):
    image_url = task.get(True,timeout=5)
    print('run task download %s...' % image_url)
    time.sleep(1)
    result.put('%s--->success'%image_url)

# 處理結束:
print('worker exit.')

 

最後開始執行程式,先啟動服務行程taskManager.py,執行結果如下:

 

put task ImageUrl_0 ...
put task ImageUrl_1 ...
put task ImageUrl_2 ...
put task ImageUrl_3 ...
put task ImageUrl_4 ...
put task ImageUrl_5 ...
put task ImageUrl_6 ...
put task ImageUrl_7 ...
put task ImageUrl_8 ...
put task ImageUrl_9 ...
try get result...

 

接著再啟動任務行程taskWorker.py,執行結果如下:

 

Connect to server 127.0.0.1...
run task download ImageUrl_0...
run task download ImageUrl_1...
run task download ImageUrl_2...
run task download ImageUrl_3...
run task download ImageUrl_4...
run task download ImageUrl_5...
run task download ImageUrl_6...
run task download ImageUrl_7...
run task download ImageUrl_8...
run task download ImageUrl_9...
worker exit.

 

當任務行程執行結束後,服務行程執行結果如下:

 

result is ImageUrl_0--->success
result is ImageUrl_1--->success
result is ImageUrl_2--->success
result is ImageUrl_3--->success
result is ImageUrl_4--->success
result is ImageUrl_5--->success
result is ImageUrl_6--->success
result is ImageUrl_7--->success
result is ImageUrl_8--->success
result is ImageUrl_9--->success

 

其實這就是一個簡單但真正的分散式計算,把程式碼稍加改造,啟動多個worker,就可以把任務分佈到幾臺甚至幾十臺機器上,實現大規模的分散式爬蟲。

 

註:由於平臺的特性,建立服務行程的程式碼在Linux和Windows上有一些不同,建立工作行程的程式碼是一致的。

 

taskManager.py程式在Windows版下的程式碼如下:

 

# coding:utf-8
# taskManager.py for windows
import Queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support
# 任務個數
task_number = 10
# 定義收發佇列
task_queue = Queue.Queue(task_number);
result_queue = Queue.Queue(task_number);
def get_task():
    return task_queue
def get_result():
    return result_queue
# 建立類似的QueueManager:
class QueueManager(BaseManager):
    pass
def win_run():
    # Windows下系結呼叫介面不能使用lambda,所以只能先定義函式再系結
    QueueManager.register('get_task_queue',callable = get_task)
    QueueManager.register('get_result_queue',callable = get_result)
    # 系結埠並設定驗證口令,Windows下需要填寫IP地址,Linux下不填預設為本地
    manager = QueueManager(address = ('127.0.0.1',8001),authkey = 'qiye')
    # 啟動
    manager.start()
    try:
        # 透過網路獲取任務佇列和結果佇列
        task = manager.get_task_queue()
        result = manager.get_result_queue()
        # 新增任務
        for url in ["ImageUrl_"+str(i) for i in range(10)]:
            print 'put task %s ...' %url
            task.put(url)
        print 'try get result...'
        for i in range(10):
            print 'result is %s' %result.get(timeout=10)
    except:
        print('Manager error')
    finally:
        # 一定要關閉,否則會報管道未關閉的錯誤
        manager.shutdown()

if __name__ == '__main__':
    # Windows下多行程可能會有問題,新增這句可以緩解
    freeze_support()
    win_run()

關於作者:範傳輝,資深網蟲,Python開發者,參與開發了多項網路應用,在實際開發中積累了豐富的實戰經驗,並善於總結,貢獻了多篇技術文章廣受好評。研究興趣是網路安全、爬蟲技術、資料分析、驅動開發等技術。

本文摘編自《Python爬蟲開發與專案實戰》,經出版方授權釋出。

延伸閱讀《Python爬蟲開發與專案實戰

點選上圖瞭解及購買

轉載請聯絡微信:DoctorData

推薦語:零基礎學習爬蟲技術,從Python和Web前端基礎開始講起,由淺入深,包含大量案例,實用性強。

贊(0)

分享創造快樂