歡迎光臨
每天分享高質量文章

以5個資料庫為例,用Python實現資料的提取、轉換和加載(ETL)

導讀:每個資料科學專業人員都必須從不同的資料源中提取、轉換和加載(Extract-Transform-Load,ETL)資料。

本文將討論如何使用Python為選定的流行資料庫實現資料的ETL。對於關係資料庫,選擇MySQL,並將Elasticsearch作為文件資料庫的例子展開。對於圖形資料庫,選擇Neo4j。對於NoSQL,可參考此前文章中介紹的MongoDB

 

 

作者:薩揚·穆霍帕迪亞(Sayan Mukhopadhyay)

如需轉載請聯繫大資料(ID:hzdashuju)

 

  • ElasticSearch是一個基於Lucene的搜索服務器。它提供了一個分佈式多用戶能力的全文搜索引擎,基於RESTful web接口。Elasticsearch是用Java開發的,並作為Apache許可條款下的開放原始碼發佈,是當前流行的企業級搜索引擎。

  • Neo4j是一個高性能的,NOSQL圖形資料庫,它將結構化資料儲存在網絡上(從數學角度叫做圖)而不是表中,是一個嵌入式的、基於磁盤的、具備完全的事務特性的Java持久化引擎。

 

 

01 MySQL

 

MySQLdb是在MySQL C接口上面開發的Python API。

 

1. 如何安裝MySQLdb

 

首先,需要在計算機上安裝Python MySQLdb模塊。然後運行以下腳本:

 

#!/usr/bin/python
import MySQLdb

 

如果出現匯入錯誤,則表示模塊未正確安裝。

 

以下是安裝MySQL Python模塊的說明:

 

$gunzip MySQL-python-1.2.2.tar.gz
$tar –xvf MySQL-python-1.2.2.tar
$cd MySQL-python-1.2.2
$python setup.py build
$python setup.py install

 

2. 資料庫連接

 

在連接到MySQL資料庫之前,請確保有以下內容。

 

  • 有一個名為TEST的資料庫。

  • 在TEST資料庫中有一個表STUDENT。

  • STUDENT表有三個欄位:NAME、SUR_NAME和ROLL_NO。

  • 用戶對TEST資料庫具有完全訪問權限。

 

3. INSERT操作

 

以下代碼執行SQL INSERT陳述句,以便在STUDENT表中創建記錄:

 

#!/usr/bin/python
import MySQLdb
# Open database connection
db = MySQLdb.connect("localhost","user","passwd","TEST" )
# prepare a cursor object using cursor() method
cursor = db.cursor()
# Prepare SQL query to INSERT a record into the database.
sql = """INSERT INTO STUDENT(NAME,
         SUR_NAME, ROLL_NO)
         VALUES ('Sayan', 'Mukhopadhyay', 1)"""
try:
   # Execute the SQL command
cursor.execute(sql)
   # Commit your changes in the database
   db.commit()
except:
   # Rollback in case there is any error
   db.rollback()
# disconnect from server
db.close()

 

4. READ操作

 

以下代碼從STUDENT表中提取資料並打印出來:

 

#!/usr/bin/python
import MySQLdb
# Open database connection
db = MySQLdb.connect("localhost","user","passwd","TEST" )
# prepare a cursor object using cursor() method
cursor = db.cursor()
# Prepare SQL query to INSERT a record into the database.
sql = "SELECT * FROM STUDENT "
try:
   # Execute the SQL command
cursor.execute(sql)
   # Fetch all the rows in a list of lists.
results = cursor.fetchall()
for row in results:
fname = row[0]
lname = row[1]
id = row[2]
      # Now print fetched result
print "name=%s,surname=%s,id=%d" % \
             (fname, lname, id )
except:
print "Error: unable to fecth data"
# disconnect from server
db.close()

 

5. DELETE操作

 

以下代碼從TEST中刪除id=1的一行資料:

 

#!/usr/bin/python
import MySQLdb
# Open database connection
db = MySQLdb.connect("localhost","test","passwd","TEST")

#prepare a cursor object using cursor() method
cursor = db.cursor()

# PrepareSQL query to DELETE required records
sql="DELETE FROM STUDENT WHERE ROLL_NO=1"
try:
#Execute the SQL command 
cursor.execute(sql)
#Commit your changes in the database
db.commit()
except:
#Roll back in case there is any error
db.rollback()

#disconnect from server 
db.close()

 

6. UPDATE操作

 

以下代碼將lastname為Mukhopadhyay的記錄更改為Mukherjee:

 

#!/usr/bin/python
import MySQLdb
# Open database connection
db = MySQLdb.connect("localhost","user","passwd","TEST" )
# prepare a cursor object using
cursor() method cursor = db.cursor()
# Prepare SQL query to UPDATE required records
sql = "UPDATE STUDENT SET SUR_NAME="Mukherjee"
                          WHERE SUR_NAME="Mukhopadhyay""
try:
   # Execute the SQL command
cursor.execute(sql)
   # Commit your changes in the database
db.commit()
except:
   # Rollback in case there is any error
db.rollback()
# disconnect from server
db.close()

 

7. COMMIT操作

 

提交操作提供對資料庫完成修改命令,並且在此操作之後,無法將其還原。

 

8. ROLL-BACK操作

 

如果不能確認對資料的修改同時想要撤回操作,可以使用roll-back()方法。

 

以下是通過Python訪問MySQL資料的完整示例。它將提供將資料儲存為CSV檔案或MySQL資料庫中的資料的完整描述。

 

import MySQLdb
import sys

out = open('Config1.txt','w')
print "Enter the Data Source Type:"
print "1. MySql"
print "2. Text"
print "3. Exit"

while(1):
       data1 = sys.stdin.readline().strip()
       if(int(data1) == 1):
             out.write("source begin"+"\n"+"type=mysql\n")
             print "Enter the ip:"
             ip = sys.stdin.readline().strip()
             out.write("host=" + ip + "\n")
             print "Enter the database name:"
             db = sys.stdin.readline().strip()
             out.write("database=" + db + "\n")
             print "Enter the user name:"
             usr = sys.stdin.readline().strip()
             out.write("user=" + usr + "\n")
             print "Enter the password:"
             passwd = sys.stdin.readline().strip()
             out.write("password=" + passwd + "\n")
             connection = MySQLdb.connect(ip, usr, passwd, db)
             cursor = connection.cursor()
             query = "show tables"
             cursor.execute(query)
             data = cursor.fetchall()
             tables = []
             for row in data:
                    for field in row:
                           tables.append(field.strip())
             for i in range(len(tables)):
                    print i, tables[i]
             tb = tables[int(sys.stdin.readline().strip())]
             out.write("table=" + tb + "\n")
             query = "describe " + tb
             cursor.execute(query)
             data = cursor.fetchall()
             columns = []
             for row in data:
                    columns.append(row[0].strip())
             for i in range(len(columns)):
                    print columns[i] 
             print "Not index choose the exact column names seperated by coma"
             cols = sys.stdin.readline().strip()
             out.write("columns=" + cols + "\n")

             cursor.close()
             connection.close()
             out.write("source end"+"\n")

             print "Enter the Data Source Type:"
             print "1. MySql"
             print "2. Text"
             print "3. Exit"

       if(int(data1) == 2):
             print "path of text file:"
             path = sys.stdin.readline().strip()
             file = open(path)
             count = 0
             for line in file:
                    print line
                    count = count + 1
                    if count > 3:
                          break
             file.close()
             out.write("source begin"+"\n"+"type=text\n")
             out.write("path=" + path + "\n")
             print "enter delimeter:"
             dlm = sys.stdin.readline().strip()
             out.write("dlm=" + dlm + "\n")
             print "enter column indexes seperated by comma:"
             cols = sys.stdin.readline().strip()
             out.write("columns=" + cols + "\n")
             out.write("source end"+"\n")

             print "Enter the Data Source Type:"
             print "1. MySql"
             print "2. Text"
             print "3. Exit"

       if(int(data1) == 3):
             out.close()
             sys.exit()

 

 

02 Elasticsearch

 

Elasticsearch(ES)低級客戶端提供從Python到ES REST端點的直接映射。Elasticsearch的一大優勢是為資料分析提供了全棧解決方案。Elasticsearch作為資料庫,有可配置前端Kibana、資料收集工具Logstash以及企業安全工具Shield。

 

下例具有稱為cat、cluster、indices、ingest、nodes、snapshot和tasks的特征,根據任務分別轉換為CatClient、ClusterClient、IndicesClient、IngestClient、NodesClient、SnapshotClient和TasksClient實體。這些實體是訪問這些類及其方法的唯一方式。

 

你可以指定自己的連接類,可以通過提供的connection_class引數來使用。

 

# create connection to local host using the ThriftConnection
Es1=Elasticsearch(connection_class=ThriftConnection)

 

如果你想打開sniffing,那麼有幾個選擇:

 

# create connection that will automatically inspect the cluster to get
# the list of active nodes. Start with nodes running on 'esnode1' and
# 'esnode2'
Es1=Elasticsearch(
    ['esnode1''esnode2'],
# sniff before doing anything
sniff_on_start=True,
# refresh nodes after a node fails to respond
sniff_on_connection_fail=True,
# and also every 30 seconds
sniffer_timeout=30
)

 

不同的主機可以有不同的引數,你可以為每個節點使用一個字典來指定它們。

 

# connect to localhost directly and another node using SSL on port 443
# and an url_prefix. Note that ``port`` needs to be an int.
Es1=Elasticsearch([
{'host':'localhost'},
{'host':'othernode','port':443,'url_prefix':'es','use_ssl':True},
])

 

還支持SSL客戶端身份驗證(有關選項的詳細說明,請參閱Urllib3HttpConnection)。

 

Es1=Elasticsearch(
['localhost:443','other_host:443'],
# turn on SSL
use_ssl=True,
# make sure we verify SSL certificates (off by default)
verify_certs=True,
# provide a path to CA certs on disk
ca_certs='path to CA_certs',
# PEM formatted SSL client certificate
client_cert='path to clientcert.pem',
# PEM formatted SSL client key
client_key='path to clientkey.pem'
)

 

  • 連接層API

 

許多類負責處理Elasticsearch集群。這裡可以通過將引數傳遞給Elasticsearch類來忽略正在使用的預設子類。屬於客戶端的每個引數都將添加到Transport、ConnectionPool和Connection上。

 

例如,如果你要使用定製的ConnectionSelector類,只需傳入selector_class引數即可。

 

整個API以很高的精確度包裝了原始REST API,其中包括區分呼叫必需引數和可選引數。這意味著代碼區分了按排位的引數和關鍵字引數。建議讀者使用關鍵字引數來保證所有呼叫的一致性和安全性。

 

如果Elasticsearch傳回2XX,則API呼叫成功(並將傳迴響應)。否則,將引發TransportError(或更具體的子類)的實體。你可以在異常中查看其他異常和錯誤狀態。如果你不希望引發異常,可以通過傳入ignore引數忽略狀態代碼或狀態代碼串列。

 

from elasticsearch import Elasticsearch
es=Elasticsearch()
# ignore 400 cause by IndexAlreadyExistsException when creating an index
es.indices.create(index='test-index',ignore=400)
# ignore 404 and 400
es.indices.delete(index='test-index',ignore=[400,404])

 

 

03 Neo4j Python驅動

 

Neo4j支持Neo4j Python驅動,並通過二進制協議與資料庫連接。它試圖保持簡約及Python的慣用方式。

 

pip install neo4j-driver
from neo4j.v1 import GraphDatabase, basic_auth
driver11 = GraphDatabase.driver("bolt://localhost", auth=basic_auth("neo4j""neo4j"))
session11 = driver11.session()
session11.run("CREATE (a:Person {name:'Sayan',title:'Mukhopadhyay'})")
result11= session11.run("MATCH (a:Person) WHERE a.name ='Sayan' RETURN a.name AS name, a.title AS title")
for recordi n result11:
print("%s %s"% (record["title"], record["name"]))
session11.close()

 

 

04 neo4j-rest-client

 

neo4j-rest-client的主要標的是確保已經使用本地Neo4j的Python程式員通過python-embedded的方式也能夠訪問Neo4j REST服務器。因此,neo4j-rest-client API的結構與python-embedded完全同步。但是引入了一種新的結構,以達到更加Python化的風格,並通過Neo4j團隊引入的新特性來增強API。

 

 

05 記憶體資料庫

 

另一個重要的資料庫類是記憶體資料庫。它在RAM中儲存和處理資料。因此,對資料庫的操作非常快,並且資料是靈活的。SQLite是記憶體資料庫的一個流行範例。在Python中,需要使用sqlalchemy庫來操作SQLite。在第1章的Flask和Falcon示例中,展示瞭如何從SQLite中選擇資料。以下將展示如何在SQLite中儲存Pandas資料框架:

 

from sqlalchemy import create_engine
import sqlite3
conn = sqlite3.connect('multiplier.db')
conn.execute('''CREATE TABLE if not exists multiplier
       (domain        CHAR(50),
        low        REAL,
        high        REAL);''')
conn.close()
db_name = "sqlite:///" + prop + "_" + domain + str(i) + ".db"
disk_engine = create_engine(db_name)
df.to_sql('scores', disk_engine, if_exists='replace')

 

 

06 Python版本MongoDB

 

這部分內容請見此前的文章資料處理入門乾貨:MongoDB和pandas極簡教程

 

關於作者:Sayan Mukhopadhyay擁有超過13年的行業經驗,並與瑞信、PayPal、CA Technologies、CSC和Mphasis等公司建立了聯繫。他對投資銀行、在線支付、在線廣告、IT架構和零售等領域的資料分析應用有著深刻的理解。他的專業領域是在分佈式和資料驅動的環境(如實時分析、高頻交易等)中,實現高性能計算。

本文摘編自《Python高級資料分析:機器學習、深度學習和NLP實體》,經出版方授權發佈。

延伸閱讀《Python高級資料分析

點擊上圖瞭解及購買

轉載請聯繫微信:DoctorData

推薦語:本書介紹高級資料分析概念的廣泛基礎,以及最近的資料庫革命,如Neo4j、彈性搜索和MongoDB。本書討論瞭如何實現包括區域性爬取在內的ETL技術,並應用於高頻演算法交易和標的導向的對話系統等領域。還有一些機器學習概念的例子,如半監督學習、深度學習和NLP。本書還涵蓋了重要的傳統資料分析技術,如時間序列和主成分分析等。

已同步到看一看
赞(0)

分享創造快樂