导读: 每个数据科学专业人员都必须从不同的数据源中提取、转换和加载(Extract-Transform-Load,ETL)数据。

本文将讨论如何使用Python为选定的流行数据库实现数据的ETL。对于关系数据库,选择MySQL,并将Elasticsearch作为文档数据库的例子展开。对于图形数据库,选择Neo4j。对于NoSQL,可参考此前文章中介绍的 MongoDB

作者:萨扬·穆霍帕迪亚(Sayan Mukhopadhyay)

如需转载请联系大数据(ID:hzdashuju)

  • ElasticSearch是一个基于Lucene的搜索服务器。 它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。 Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。

  • Neo4j是一个高性能的,NOSQL图形数据库,它将结构化数据存储在网络上(从数学角度叫做图)而不是表中,是一个嵌入式的、基于磁盘的、具备完全的事务特性的Java持久化引擎。

01 MySQL

MySQLdb是在MySQL C接口上面开发的Python API。

1. 如何安装MySQLdb

首先,需要在计算机上安装Python MySQLdb模块。然后运行以下脚本:

#!/usr/bin/python
import MySQLdb

如果出现导入错误,则表示模块未正确安装。

以下是安装MySQL Python模块的说明:

$gunzip MySQL-python-1.2.2.tar.gz
$tar –xvf MySQL-python-1.2.2.tar
$cd MySQL-python-1.2.2
$python setup.py build
$python setup.py install

2. 数据库连接

在连接到MySQL数据库之前,请确保有以下内容。

  • 有一个名为TEST的数据库。

  • 在TEST数据库中有一个表STUDENT。

  • STUDENT表有三个字段:NAME、SUR_NAME和ROLL_NO。

  • 用户对TEST数据库具有完全访问权限。

3. INSERT操作

以下代码执行SQL INSERT语句,以便在STUDENT表中创建记录:

#!/usr/bin/python
import MySQLdb
# Open database connection
db = MySQLdb.connect("localhost","user","passwd","TEST" )
# prepare a cursor object using cursor() method
cursor = db.cursor()
# Prepare SQL query to INSERT a record into the database.
sql = """INSERT INTO STUDENT(NAME,
         SUR_NAME, ROLL_NO)
         VALUES ('Sayan', 'Mukhopadhyay', 1)"""

try:
   # Execute the SQL command
cursor.execute(sql)
   # Commit your changes in the database
   db.commit()
except:
   # Rollback in case there is any error
   db.rollback()
# disconnect from server
db.close()

4. READ操作

以下代码从STUDENT表中提取数据并打印出来:

#!/usr/bin/python
import MySQLdb
# Open database connection
db = MySQLdb.connect("localhost","user","passwd","TEST" )
# prepare a cursor object using cursor() method
cursor = db.cursor()
# Prepare SQL query to INSERT a record into the database.
sql = "SELECT * FROM STUDENT "
try:
   # Execute the SQL command
cursor.execute(sql)
   # Fetch all the rows in a list of lists.
results = cursor.fetchall()
for row in results:
fname = row[0]
lname = row[1]
id = row[2]
      # Now print fetched result
print "name=%s,surname=%s,id=%d" % \
             (fname, lname, id )
except:
print "Error: unable to fecth data"
# disconnect from server
db.close()

5. DELETE操作

以下代码从TEST中删除id=1的一行数据:

#!/usr/bin/python
import MySQLdb
# Open database connection
db = MySQLdb.connect("localhost","test","passwd","TEST")

#prepare a cursor object using cursor() method
cursor = db.cursor()

# PrepareSQL query to DELETE required records
sql="DELETE FROM STUDENT WHERE ROLL_NO=1"
try:
#Execute the SQL command 
cursor.execute(sql)
#Commit your changes in the database
db.commit()
except:
#Roll back in case there is any error
db.rollback()

#disconnect from server 
db.close()

6. UPDATE操作

以下代码将lastname为Mukhopadhyay的记录更改为Mukherjee:

#!/usr/bin/python
import MySQLdb
# Open database connection
db = MySQLdb.connect("localhost","user","passwd","TEST" )
# prepare a cursor object using
cursor() method cursor = db.cursor()
# Prepare SQL query to UPDATE required records
sql = "UPDATE STUDENT SET SUR_NAME="Mukherjee"
                          WHERE SUR_NAME="
Mukhopadhyay""
try:
   # Execute the SQL command
cursor.execute(sql)
   # Commit your changes in the database
db.commit()
except:
   # Rollback in case there is any error
db.rollback()
# disconnect from server
db.close()

7. COMMIT操作

提交操作提供对数据库完成修改命令,并且在此操作之后,无法将其还原。

8. ROLL-BACK操作

如果不能确认对数据的修改同时想要撤回操作,可以使用roll-back()方法。

以下是通过Python访问MySQL数据的完整示例。它将提供将数据存储为CSV文件或MySQL数据库中的数据的完整描述。

import MySQLdb
import sys

out = open('Config1.txt','w')
print "Enter the Data Source Type:"
print "1. MySql"
print "2. Text"
print "3. Exit"

while(1):
       data1 = sys.stdin.readline().strip()
       if(int(data1) == 1):
             out.write("source begin"+"\n"+"type=mysql\n")
             print "Enter the ip:"
             ip = sys.stdin.readline().strip()
             out.write("host=" + ip + "\n")
             print "Enter the database name:"
             db = sys.stdin.readline().strip()
             out.write("database=" + db + "\n")
             print "Enter the user name:"
             usr = sys.stdin.readline().strip()
             out.write("user=" + usr + "\n")
             print "Enter the password:"
             passwd = sys.stdin.readline().strip()
             out.write("password=" + passwd + "\n")
             connection = MySQLdb.connect(ip, usr, passwd, db)
             cursor = connection.cursor()
             query = "show tables"
             cursor.execute(query)
             data = cursor.fetchall()
             tables = []
             for row in data:
                    for field in row:
                           tables.append(field.strip())
             for i in range(len(tables)):
                    print i, tables[i]
             tb = tables[int(sys.stdin.readline().strip())]
             out.write("table=" + tb + "\n")
             query = "describe " + tb
             cursor.execute(query)
             data = cursor.fetchall()
             columns = []
             for row in data:
                    columns.append(row[0].strip())
             for i in range(len(columns)):
                    print columns[i] 
             print "Not index choose the exact column names seperated by coma"
             cols = sys.stdin.readline().strip()
             out.write("columns=" + cols + "\n")

             cursor.close()
             connection.close()
             out.write("source end"+"\n")

             print "Enter the Data Source Type:"
             print "1. MySql"
             print "2. Text"
             print "3. Exit"

       if(int(data1) == 2):
             print "path of text file:"
             path = sys.stdin.readline().strip()
             file = open(path)
             count = 0
             for line in file:
                    print line
                    count = count + 1
                    if count > 3:
                          break
             file.close()
             out.write("source begin"+"\n"+"type=text\n")
             out.write("path=" + path + "\n")
             print "enter delimeter:"
             dlm = sys.stdin.readline().strip()
             out.write("dlm=" + dlm + "\n")
             print "enter column indexes seperated by comma:"
             cols = sys.stdin.readline().strip()
             out.write("columns=" + cols + "\n")
             out.write("source end"+"\n")

             print "Enter the Data Source Type:"
             print "1. MySql"
             print "2. Text"
             print "3. Exit"

       if(int(data1) == 3):
             out.close()
             sys.exit()

02 Elasticsearch

Elasticsearch(ES)低级客户端提供从Python到ES REST端点的直接映射。Elasticsearch的一大优势是为数据分析提供了全栈解决方案。Elasticsearch作为数据库,有可配置前端Kibana、数据收集工具Logstash以及企业安全工具Shield。

下例具有称为cat、cluster、indices、ingest、nodes、snapshot和tasks的特征,根据任务分别转换为CatClient、ClusterClient、IndicesClient、IngestClient、NodesClient、SnapshotClient和TasksClient实例。这些实例是访问这些类及其方法的唯一方式。

你可以指定自己的连接类,可以通过提供的connection_class参数来使用。

# create connection to local host using the ThriftConnection
Es1=Elasticsearch(connection_class=ThriftConnection)

如果你想打开sniffing,那么有几个选择:

# create connection that will automatically inspect the cluster to get
# the list of active nodes. Start with nodes running on 'esnode1' and
# 'esnode2'
Es1=Elasticsearch(
    ['esnode1''esnode2'],
# sniff before doing anything
sniff_on_start=True,
# refresh nodes after a node fails to respond
sniff_on_connection_fail=True,
# and also every 30 seconds
sniffer_timeout=30
)

不同的主机可以有不同的参数,你可以为每个节点使用一个字典来指定它们。

# connect to localhost directly and another node using SSL on port 443
# and an url_prefix. Note that ``port`` needs to be an int.
Es1=Elasticsearch([
{'host':'localhost'},
{'host':'othernode','port':443,'url_prefix':'es','use_ssl':True},
])

还支持SSL客户端身份验证(有关选项的详细说明,请参阅Urllib3 HttpConnection)。

Es1=Elasticsearch(
['localhost:443','other_host:443'],
# turn on SSL
use_ssl=True,
# make sure we verify SSL certificates (off by default)
verify_certs=True,
# provide a path to CA certs on disk
ca_certs='path to CA_certs',
# PEM formatted SSL client certificate
client_cert='path to clientcert.pem',
# PEM formatted SSL client key
client_key='path to clientkey.pem'
)
  • 连接层API

许多类负责处理Elasticsearch集群。这里可以通过将参数传递给Elasticsearch类来忽略正在使用的默认子类。属于客户端的每个参数都将添加到Transport、ConnectionPool和Connection上。

例如,如果你要使用定制的ConnectionSelector类,只需传入selector_class参数即可。

整个API以很高的精确度包装了原始REST API,其中包括区分调用必需参数和可选参数。这意味着代码区分了按排位的参数和关键字参数。建议读者使用关键字参数来保证所有调用的一致性和安全性。

如果Elasticsearch返回2XX,则API调用成功(并将返回响应)。否则,将引发TransportError(或更具体的子类)的实例。你可以在异常中查看其他异常和错误状态。如果你不希望引发异常,可以通过传入ignore参数忽略状态代码或状态代码列表。

from elasticsearch import Elasticsearch
es=Elasticsearch()
# ignore 400 cause by IndexAlreadyExistsException when creating an index
es.indices.create(index='test-index',ignore=400)
# ignore 404 and 400
es.indices.delete(index='test-index',ignore=[400,404])

03 Neo4j Python驱动

Neo4j支持Neo4j Python驱动,并通过二进制协议与数据库连接。它试图保持简约及Python的惯用方式。

pip install neo4j-driver
from neo4j.v1 import GraphDatabase, basic_auth
driver11 = GraphDatabase.driver("bolt://localhost", auth=basic_auth("neo4j""neo4j"))
session11 = driver11.session()
session11.run("CREATE (a:Person {name:'Sayan',title:'Mukhopadhyay'})")
result11= session11.run("MATCH (a:Person) WHERE a.name ='Sayan' RETURN a.name AS name, a.title AS title")
for recordi n result11:
print("%s %s"% (record["title"], record["name"]))
session11.close()

04 neo4j-rest-client

neo4j-rest-client的主要目标是确保已经使用本地Neo4j的Python程序员通过python-embedded的方式也能够访问Neo4j REST服务器。因此,neo4j-rest-client API的结构与python-embedded完全同步。但是引入了一种新的结构,以达到更加Python化的风格,并通过Neo4j团队引入的新特性来增强API。

05 内存数据库

另一个重要的数据库类是内存数据库。它在RAM中存储和处理数据。因此,对数据库的操作非常快,并且数据是灵活的。SQLite是内存数据库的一个流行范例。在Python中,需要使用sqlalchemy库来操作SQLite。在第1章的Flask和Falcon示例中,展示了如何从SQLite中选择数据。以下将展示如何在SQLite中存储Pandas数据框架:

from sqlalchemy import create_engine
import sqlite3
conn = sqlite3.connect('multiplier.db')
conn.execute('''CREATE TABLE if not exists multiplier
       (domain        CHAR(50),
        low        REAL,
        high        REAL);'''
)
conn.close()
db_name = "sqlite:///" + prop + "_" + domain + str(i) + ".db"
disk_engine = create_engine(db_name)
df.to_sql('scores', disk_engine, if_exists='replace')

06 Python版本MongoDB

这部分内容请见此前的文章 数据处理入门干货: MongoDB和pandas极简教程

关于作者: Sayan Mukhopadhyay拥有超过13年的行业经验,并与瑞信、PayPal、CA Technologies、CSC和Mphasis等公司建立了联系。他对投资银行、在线支付、在线广告、IT架构和零售等领域的数据分析应用有着深刻的理解。他的专业领域是在分布式和数据驱动的环境(如实时分析、高频交易等)中,实现高性能计算。

本文摘编自《 Python高级数据分析:机器学习、深度学习和NLP实例 》,经出版方授权发布。

延伸阅读《 Python高级数据分析

点击上图了解及购买

转载请联系微信:DoctorData

推荐语: 本书介绍高级数据分析概念的广泛基础,以及最近的数据库革命,如Neo4j、弹性搜索和MongoDB。本书讨论了如何实现包括局部爬取在内的ETL技术,并应用于高频算法交易和目标导向的对话系统等领域。还有一些机器学习概念的例子,如半监督学习、深度学习和NLP。本书还涵盖了重要的传统数据分析技术,如时间序列和主成分分析等。

你想免费读这本《Python高级数据分析:机器学习、深度学习和NLP实例》吗?

「大数据」内容合伙人之「鉴书小分队」上线啦!

最近,你都在读什么书? 有哪些心得体会想要跟大家分享?

数据叔最近搞了个大事——联合优质图书出版商机械工业出版社华章公司发起鉴书活动。

简单说就是: 你可以免费读新书,你可以免费读新书的同时,顺手码一篇读书笔记就行。 详情请在大数据公众号后台对话框回复 合伙人 查看。

京东图书大促开始啦!

每满100-50! 满200-100!满300-150!

长按二维码进入唯一领券入口,领取大数据粉丝专属“ 200减35 ”叠加满减券。

是的!你没看错!相当于 165块买400块 的书!!!(打了多少折你敲敲旁边的计算器……)

长按下方二维码 或点击 阅读原文

发现更多好书

PPT | 报告 | 读书 | 书单 | 干货

大数据 | 揭秘 | Python | 可视化

人工智能 | 机器学习 | 深度学习 | 神经网络

AI | 1024 | 段子 | 区块链 | 数学

Q: 你常用哪些数据库?

欢迎留言并赢取福利

觉得不错,请把这篇文章分享给你的朋友

转载 / 投稿请联系:baiyu@hzbook.com

更多精彩,请在后台点击“历史文章”查看

640?wx_fmt=gif 点击 阅读原文 ,了解更多

导读:每个数据科学专业人员都必须从不同的数据源中提取、转换和加载(Extract-Transform-Load,ETL)数据。本文将讨论如何使用Python为选定的流行数据库实现数据的ETL。对于关系数据库,选择MySQL,并将Elasticsearch作为文档数据库的例子展开。对于图形数据库,选择Neo4j。对于NoSQL,可参考此前文章中介绍的MongoDB。作者:萨扬... import sqlite3 import pymysql from py etl import Task, DatabaseReader, DatabaseWriter, ElasticsearchWriter, FileWriter src = sqlite3.connect("file.db") reader = DatabaseReader(src, table_name="source_table") # 数据库 之间 数据 同步,表到表传输 dst = pymysql.connect(host="localhost", user="your_user", password="your_password", db="tes import pandas as pd import time data = pd.read_excel(' ETL _ 数据 清洗挑战.xlsx','测试 数据 ',dtype=str)#读取 数据 data_dict = data.to_dict(orient = 'dict')#将 数据 转换 为字典 #print(data['CHECK_POINT']) listDate = []#创建列表并初始化 for cell in data_dict['CHECK_POINT'].values():#遍
ETL 的考虑       做 数据 仓库 系统 ETL 是关键的一环。说大了, ETL 数据 整合解决方案,说小了,就是倒 数据 的工具。回忆一下工作这么些年来,处理 数据 迁移、 转换 的工作倒 还真的不少。但是那些工作基本上是一次性工作或者很小 数据 量,使用access、DTS或是自己编个小程序搞定。可是在 数据 仓库 系统 中, ETL 上升到了一 定的理论高度,和原来小打小闹的工具使用不同了。究竟什么不同,从名字上就可以看到,人家已经将倒 数据 的过程分成3个步骤,E、T、L分别代表 抽取 转换 和装载。     其 实 ETL 过程就是 数据 流动的过程,从不同的 数据 源流向不同的目标 数据 。但在 数据 仓库中, ETL 有几个特点,一是数
ETL (Extract, Transform, Load)是 数据 仓库中常用的一种 数据 处理流程。在 Python 中,有许多工具和库可以用于 ETL 任务。 最常用的是使用 Python 的pandas库来进行 数据 提取 转换 加载 。下面是一个简单的示例代码: ``` python import pandas as pd # 从 数据 提取 数据 data = pd.read_csv('input.csv') # 数据 转换 data['new_column'] = data['old_column'] * 2 # 加载 数据 到目标位置 data.to_csv('output.csv', index=False) 在这个例子中,我们使用了pandas库来读取一个CSV文件( 数据 提取 ),对 数据 进行简单的 转换 数据 转换 ),然后将处理后的 数据 保存到另一个CSV文件中( 数据 加载 )。 除了pandas, Python 还有其他一些常用的 ETL 工具和库,例如: - Apache Airflow:用于编排和调度 ETL 任务的开源工具。 - Apache Spark:用于大规模 数据 处理和分析的快速通用引擎,可以通过PySpark( Python API)进行 ETL 操作。 - SQLAlchemy:用于 数据库 访问和操作的 Python ORM(对象关系映射)工具。 这只是一些常见的工具和库,实际上还有许多其他选择,具体取决于你的需求和项目要求。