导读:
每个数据科学专业人员都必须从不同的数据源中提取、转换和加载(Extract-Transform-Load,ETL)数据。
本文将讨论如何使用Python为选定的流行数据库实现数据的ETL。对于关系数据库,选择MySQL,并将Elasticsearch作为文档数据库的例子展开。对于图形数据库,选择Neo4j。对于NoSQL,可参考此前文章中介绍的
MongoDB
。
作者:萨扬·穆霍帕迪亚(Sayan Mukhopadhyay)
如需转载请联系大数据(ID:hzdashuju)
-
ElasticSearch是一个基于Lucene的搜索服务器。
它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。
Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。
-
Neo4j是一个高性能的,NOSQL图形数据库,它将结构化数据存储在网络上(从数学角度叫做图)而不是表中,是一个嵌入式的、基于磁盘的、具备完全的事务特性的Java持久化引擎。
01 MySQL
MySQLdb是在MySQL C接口上面开发的Python API。
1. 如何安装MySQLdb
首先,需要在计算机上安装Python MySQLdb模块。然后运行以下脚本:
import MySQLdb
如果出现导入错误,则表示模块未正确安装。
以下是安装MySQL Python模块的说明:
$gunzip MySQL-python-1.2.2.tar.gz
$tar –xvf MySQL-python-1.2.2.tar
$cd MySQL-python-1.2.2
$python setup.py build
$python setup.py install
2. 数据库连接
在连接到MySQL数据库之前,请确保有以下内容。
3. INSERT操作
以下代码执行SQL INSERT语句,以便在STUDENT表中创建记录:
import MySQLdb
db = MySQLdb.connect("localhost","user","passwd","TEST" )
cursor = db.cursor()
sql = """INSERT INTO STUDENT(NAME,
SUR_NAME, ROLL_NO)
VALUES ('Sayan', 'Mukhopadhyay', 1)"""
try:
cursor.execute(sql)
db.commit()
except:
db.rollback()
db.close()
4. READ操作
以下代码从STUDENT表中提取数据并打印出来:
import MySQLdb
db = MySQLdb.connect("localhost","user","passwd","TEST" )
cursor = db.cursor()
sql = "SELECT * FROM STUDENT "
try:
cursor.execute(sql)
results = cursor.fetchall()
for row in results:
fname = row[0]
lname = row[1]
id = row[2]
print "name=%s,surname=%s,id=%d" % \
(fname, lname, id )
except:
print "Error: unable to fecth data"
db.close()
5. DELETE操作
以下代码从TEST中删除id=1的一行数据:
import MySQLdb
db = MySQLdb.connect("localhost","test","passwd","TEST")
cursor = db.cursor()
sql="DELETE FROM STUDENT WHERE ROLL_NO=1"
try:
cursor.execute(sql)
db.commit()
except:
db.rollback()
db.close()
6. UPDATE操作
以下代码将lastname为Mukhopadhyay的记录更改为Mukherjee:
import MySQLdb
db = MySQLdb.connect("localhost","user","passwd","TEST" )
cursor() method cursor = db.cursor()
sql = "UPDATE STUDENT SET SUR_NAME="Mukherjee"
WHERE SUR_NAME="Mukhopadhyay""
try:
cursor.execute(sql)
db.commit()
except:
db.rollback()
db.close()
7. COMMIT操作
提交操作提供对数据库完成修改命令,并且在此操作之后,无法将其还原。
8. ROLL-BACK操作
如果不能确认对数据的修改同时想要撤回操作,可以使用roll-back()方法。
以下是通过Python访问MySQL数据的完整示例。它将提供将数据存储为CSV文件或MySQL数据库中的数据的完整描述。
import MySQLdb
import sys
out = open('Config1.txt','w')
print "Enter the Data Source Type:"
print "1. MySql"
print "2. Text"
print "3. Exit"
while(1):
data1 = sys.stdin.readline().strip()
if(int(data1) == 1):
out.write("source begin"+"\n"+"type=mysql\n")
print "Enter the ip:"
ip = sys.stdin.readline().strip()
out.write("host=" + ip + "\n")
print "Enter the database name:"
db = sys.stdin.readline().strip()
out.write("database=" + db + "\n")
print "Enter the user name:"
usr = sys.stdin.readline().strip()
out.write("user=" + usr + "\n")
print "Enter the password:"
passwd = sys.stdin.readline().strip()
out.write("password=" + passwd + "\n")
connection = MySQLdb.connect(ip, usr, passwd, db)
cursor = connection.cursor()
query = "show tables"
cursor.execute(query)
data = cursor.fetchall()
tables = []
for row in data:
for field in row:
tables.append(field.strip())
for i in range(len(tables)):
print i, tables[i]
tb = tables[int(sys.stdin.readline().strip())]
out.write("table=" + tb + "\n")
query = "describe " + tb
cursor.execute(query)
data = cursor.fetchall()
columns = []
for row in data:
columns.append(row[0].strip())
for i in range(len(columns)):
print columns[i]
print "Not index choose the exact column names seperated by coma"
cols = sys.stdin.readline().strip()
out.write("columns=" + cols + "\n")
cursor.close()
connection.close()
out.write("source end"+"\n")
print "Enter the Data Source Type:"
print "1. MySql"
print "2. Text"
print "3. Exit"
if(int(data1) == 2):
print "path of text file:"
path = sys.stdin.readline().strip()
file = open(path)
count = 0
for line in file:
print line
count = count + 1
if count > 3:
break
file.close()
out.write("source begin"+"\n"+"type=text\n")
out.write("path=" + path + "\n")
print "enter delimeter:"
dlm = sys.stdin.readline().strip()
out.write("dlm=" + dlm + "\n")
print "enter column indexes seperated by comma:"
cols = sys.stdin.readline().strip()
out.write("columns=" + cols + "\n")
out.write("source end"+"\n")
print "Enter the Data Source Type:"
print "1. MySql"
print "2. Text"
print "3. Exit"
if(int(data1) == 3):
out.close()
sys.exit()
02 Elasticsearch
Elasticsearch(ES)低级客户端提供从Python到ES REST端点的直接映射。Elasticsearch的一大优势是为数据分析提供了全栈解决方案。Elasticsearch作为数据库,有可配置前端Kibana、数据收集工具Logstash以及企业安全工具Shield。
下例具有称为cat、cluster、indices、ingest、nodes、snapshot和tasks的特征,根据任务分别转换为CatClient、ClusterClient、IndicesClient、IngestClient、NodesClient、SnapshotClient和TasksClient实例。这些实例是访问这些类及其方法的唯一方式。
你可以指定自己的连接类,可以通过提供的connection_class参数来使用。
Es1=Elasticsearch(connection_class=ThriftConnection)
如果你想打开sniffing,那么有几个选择:
Es1=Elasticsearch(
['esnode1', 'esnode2'],
sniff_on_start=True,
sniff_on_connection_fail=True,
sniffer_timeout=30
)
不同的主机可以有不同的参数,你可以为每个节点使用一个字典来指定它们。
Es1=Elasticsearch([
{'host':'localhost'},
{'host':'othernode','port':443,'url_prefix':'es','use_ssl':True},
])
还支持SSL客户端身份验证(有关选项的详细说明,请参阅Urllib3
HttpConnection)。
Es1=Elasticsearch(
['localhost:443','other_host:443'],
use_ssl=True,
verify_certs=True,
ca_certs='path to CA_certs',
client_cert='path to clientcert.pem',
client_key='path to clientkey.pem'
)
许多类负责处理Elasticsearch集群。这里可以通过将参数传递给Elasticsearch类来忽略正在使用的默认子类。属于客户端的每个参数都将添加到Transport、ConnectionPool和Connection上。
例如,如果你要使用定制的ConnectionSelector类,只需传入selector_class参数即可。
整个API以很高的精确度包装了原始REST API,其中包括区分调用必需参数和可选参数。这意味着代码区分了按排位的参数和关键字参数。建议读者使用关键字参数来保证所有调用的一致性和安全性。
如果Elasticsearch返回2XX,则API调用成功(并将返回响应)。否则,将引发TransportError(或更具体的子类)的实例。你可以在异常中查看其他异常和错误状态。如果你不希望引发异常,可以通过传入ignore参数忽略状态代码或状态代码列表。
from elasticsearch import Elasticsearch
es=Elasticsearch()
es.indices.create(index='test-index',ignore=400)
es.indices.delete(index='test-index',ignore=[400,404])
03 Neo4j Python驱动
Neo4j支持Neo4j Python驱动,并通过二进制协议与数据库连接。它试图保持简约及Python的惯用方式。
pip install neo4j-driver
from neo4j.v1 import GraphDatabase, basic_auth
driver11 = GraphDatabase.driver("bolt://localhost", auth=basic_auth("neo4j", "neo4j"))
session11 = driver11.session()
session11.run("CREATE (a:Person {name:'Sayan',title:'Mukhopadhyay'})")
result11= session11.run("MATCH (a:Person) WHERE a.name ='Sayan' RETURN a.name AS name, a.title AS title")
for recordi n result11:
print("%s %s"% (record["title"], record["name"]))
session11.close()
04 neo4j-rest-client
neo4j-rest-client的主要目标是确保已经使用本地Neo4j的Python程序员通过python-embedded的方式也能够访问Neo4j REST服务器。因此,neo4j-rest-client API的结构与python-embedded完全同步。但是引入了一种新的结构,以达到更加Python化的风格,并通过Neo4j团队引入的新特性来增强API。
05 内存数据库
另一个重要的数据库类是内存数据库。它在RAM中存储和处理数据。因此,对数据库的操作非常快,并且数据是灵活的。SQLite是内存数据库的一个流行范例。在Python中,需要使用sqlalchemy库来操作SQLite。在第1章的Flask和Falcon示例中,展示了如何从SQLite中选择数据。以下将展示如何在SQLite中存储Pandas数据框架:
from sqlalchemy import create_engine
import sqlite3
conn = sqlite3.connect('multiplier.db')
conn.execute('''CREATE TABLE if not exists multiplier
(domain CHAR(50),
low REAL,
high REAL);''')
conn.close()
db_name = "sqlite:///" + prop + "_" + domain + str(i) + ".db"
disk_engine = create_engine(db_name)
df.to_sql('scores', disk_engine, if_exists='replace')
06 Python版本MongoDB
这部分内容请见此前的文章
数据处理入门干货:
MongoDB和pandas极简教程
。
关于作者:
Sayan Mukhopadhyay拥有超过13年的行业经验,并与瑞信、PayPal、CA Technologies、CSC和Mphasis等公司建立了联系。他对投资银行、在线支付、在线广告、IT架构和零售等领域的数据分析应用有着深刻的理解。他的专业领域是在分布式和数据驱动的环境(如实时分析、高频交易等)中,实现高性能计算。
本文摘编自《
Python高级数据分析:机器学习、深度学习和NLP实例
》,经出版方授权发布。
延伸阅读《
Python高级数据分析
》
点击上图了解及购买
转载请联系微信:DoctorData
推荐语:
本书介绍高级数据分析概念的广泛基础,以及最近的数据库革命,如Neo4j、弹性搜索和MongoDB。本书讨论了如何实现包括局部爬取在内的ETL技术,并应用于高频算法交易和目标导向的对话系统等领域。还有一些机器学习概念的例子,如半监督学习、深度学习和NLP。本书还涵盖了重要的传统数据分析技术,如时间序列和主成分分析等。
你想免费读这本《Python高级数据分析:机器学习、深度学习和NLP实例》吗?
「大数据」内容合伙人之「鉴书小分队」上线啦!
最近,你都在读什么书?
有哪些心得体会想要跟大家分享?
数据叔最近搞了个大事——联合优质图书出版商机械工业出版社华章公司发起鉴书活动。
简单说就是:
你可以免费读新书,你可以免费读新书的同时,顺手码一篇读书笔记就行。
详情请在大数据公众号后台对话框回复
合伙人
查看。
京东图书大促开始啦!
每满100-50!
满200-100!满300-150!
长按二维码进入唯一领券入口,领取大数据粉丝专属“
200减35
”叠加满减券。
是的!你没看错!相当于
165块买400块
的书!!!(打了多少折你敲敲旁边的计算器……)
长按下方二维码
或点击
阅读原文
发现更多好书
PPT
|
报告
|
读书
|
书单
|
干货
大数据
|
揭秘
|
Python
|
可视化
人工智能
|
机器学习
|
深度学习
|
神经网络
AI
|
1024
|
段子
|
区块链
|
数学
Q:
你常用哪些数据库?
欢迎留言并赢取福利
觉得不错,请把这篇文章分享给你的朋友
转载 / 投稿请联系:baiyu@hzbook.com
更多精彩,请在后台点击“历史文章”查看
点击
阅读原文
,了解更多
导读:每个数据科学专业人员都必须从不同的数据源中提取、转换和加载(Extract-Transform-Load,ETL)数据。本文将讨论如何使用Python为选定的流行数据库实现数据的ETL。对于关系数据库,选择MySQL,并将Elasticsearch作为文档数据库的例子展开。对于图形数据库,选择Neo4j。对于NoSQL,可参考此前文章中介绍的MongoDB。作者:萨扬...
import sqlite3
import pymysql
from py
etl
import Task, DatabaseReader, DatabaseWriter, ElasticsearchWriter, FileWriter
src = sqlite3.connect("file.db")
reader = DatabaseReader(src, table_name="source_table")
#
数据库
之间
数据
同步,表到表传输
dst = pymysql.connect(host="localhost", user="your_user", password="your_password", db="tes
import pandas as pd
import time
data = pd.read_excel('
ETL
_
数据
清洗挑战.xlsx','测试
数据
',dtype=str)#读取
数据
data_dict = data.to_dict(orient = 'dict')#将
数据
转换
为字典
#print(data['CHECK_POINT'])
listDate = []#创建列表并初始化
for cell in data_dict['CHECK_POINT'].values():#遍
ETL
的考虑
做
数据
仓库
系统
,
ETL
是关键的一环。说大了,
ETL
是
数据
整合解决方案,说小了,就是倒
数据
的工具。回忆一下工作这么些年来,处理
数据
迁移、
转换
的工作倒 还真的不少。但是那些工作基本上是一次性工作或者很小
数据
量,使用access、DTS或是自己编个小程序搞定。可是在
数据
仓库
系统
中,
ETL
上升到了一 定的理论高度,和原来小打小闹的工具使用不同了。究竟什么不同,从名字上就可以看到,人家已经将倒
数据
的过程分成3个步骤,E、T、L分别代表
抽取
、
转换
和装载。
其 实
ETL
过程就是
数据
流动的过程,从不同的
数据
源流向不同的目标
数据
。但在
数据
仓库中,
ETL
有几个特点,一是数
ETL
(Extract, Transform, Load)是
数据
仓库中常用的一种
数据
处理流程。在
Python
中,有许多工具和库可以用于
ETL
任务。
最常用的是使用
Python
的pandas库来进行
数据
提取
、
转换
和
加载
。下面是一个简单的示例代码:
```
python
import pandas as pd
# 从
数据
源
提取
数据
data = pd.read_csv('input.csv')
#
数据
转换
data['new_column'] = data['old_column'] * 2
#
加载
数据
到目标位置
data.to_csv('output.csv', index=False)
在这个例子中,我们使用了pandas库来读取一个CSV文件(
数据
提取
),对
数据
进行简单的
转换
(
数据
转换
),然后将处理后的
数据
保存到另一个CSV文件中(
数据
加载
)。
除了pandas,
Python
还有其他一些常用的
ETL
工具和库,例如:
- Apache Airflow:用于编排和调度
ETL
任务的开源工具。
- Apache Spark:用于大规模
数据
处理和分析的快速通用引擎,可以通过PySpark(
Python
API)进行
ETL
操作。
- SQLAlchemy:用于
数据库
访问和操作的
Python
ORM(对象关系映射)工具。
这只是一些常见的工具和库,实际上还有许多其他选择,具体取决于你的需求和项目要求。