【51CTO.com快译】 介绍
MySQL 是一个 RDBMS 平台,它以规范化的方式以表格格式存储数据,而 MongoDB 是一个 NoSQL 数据库,它以无模式的方式将信息存储为按集合分组的文档。数据的表示方式完全不同,因此将 MySQL 表数据迁移到 MongoDB 集合听起来可能是一项艰巨的任务。但是,Python 凭借其强大的连接性和数据处理能力让这一切变得轻而易举。
在本文中,将详细的讲解如何使用简单的 Python 脚本将 MySQL 表数据迁移到 MongoDB 集合所需的步骤。这些脚本是在 Windows 上使用 Python 3.9.5 开发的。但是,它应该适用于任何平台上的任何 Python 3+ 版本一起使用。
步骤 1:安装所需的模块
第一步是安装连接MySQL 和 MongoDB 数据库实例所需的模块。我们将使用 mysql.connector 连接到 MySQL 数据库。对于 MongoDB,使用 pymongo ,这是从 Python 连接到 MongoDB 的推荐模块。
如果所需模块尚未安装,请运行以下PIP命令来安装它们。
- pip 安装 mysql 连接器 pip 安装 pymongo
PIP 是 Python 包或模块的包管理器。
步骤2:从MySQL表中读取数据
第一步是从源 MySQL 表中读取数据,并以可用于将数据加载到目标 MongoDB 数据库中的格式进行准备。MongoDB 是一个 NoSQL 数据库,它将数据存储为 JSON 文档,因此最好以 JSON 格式生成源数据。值得一提的是,Python 具有强大的数据处理能力,可以轻松地将数据转换为 JSON 格式。
- import mysql.connector
- mysqldb = mysql.connector.connect( host="localhost", database="employees", user="root", password="" )
- mycursor = mysqldb.cursor(dictionary=True) mycursor.execute("SELECT * from categories;") myresult = mycursor.fetchall()
- print(myresult)
当脚本在没有任何错误的情况下完成时,输出的结果如下:
- [
- {
- "id":4,
- "name":"Medicine",
- "description":"<p>Medicine<br></p>",
- "created_at":"",
- "updated_at":""
- },
- {
- "id":6,
- "name":"Food",
- "description":"<p>Food</p>",
- "created_at":"",
- "updated_at":""
- },
- {
- "id":8,
- "name":"Groceries",
- "description":"<p>Groceries<br></p>",
- "created_at":"",
- "updated_at":""
- },
- {
- "id":9,
- "name":"Cakes & Bakes",
- "description":"<p>Cakes & Bakes<br></p>",
- "created_at":d"",
- "updated_at":""
- }
- ]
请注意,输出的是一个 JSON 数组,因为我们将 dictionary=True 参数传递给了游标。否则,结果将采用列表格式。现在有了 JSON 格式的源数据,就可以迁移到 MongoDB 集合。
步骤3:写入 MongoDB 集合
获得 JSON 格式的源数据后,下一步是将数据插入到 MongoDB 集合中。集合是一组文档,相当于 RDBMS 中表(或关系)。我可以通过调用 insert_many() 集合类的方法来实现,该方法返回插入文档的对象 ID 列表。请注意,当作为参数传递空列表时,此方法将引发异常,因此在方法调用之前进行长度检查。
- import pymongo
- mongodb_host = "mongodb://localhost:27017/"
- mongodb_dbname = "mymongodb"
- myclient = pymongo.MongoClient(mongodb_host)
- mydb = myclient[mongodb_dbname]
- mycol = mydb["categories"]
- if len(myresult) > 0:
- x = mycol.insert_many(myresult) #myresult comes from mysql cursor
- print(len(x.inserted_ids))
完成此步骤后,检查一下 MongoDB 实例,以验证数据库和集合是否已创建,文档是否已插入。注意MongoDB 是无模式的,这就意味着不必定义模式来插入文档,模式是动态推断并自动创建的。MongoDB 还可以创建代码中引用的数据库和集合(如果它们还不存在的话)。
步骤4:把数据放在一起
下面是从 MySQL 中读取表并将其插入到 MongoDB 中集合的完整脚本。
- import mysql.connector
- import pymongo
- delete_existing_documents = True
- mysql_host="localhost"
- mysql_database="mydatabase"
- mysql_schema = "myschema"
- mysql_user="myuser"
- mysql_password="********"
- mongodb_host = "mongodb://localhost:27017/"
- mongodb_dbname = "mymongodb"
- mysqldb = mysql.connector.connect(
- host=mysql_host,
- database=mysql_database,
- user=mysql_user,
- password=mysql_password
- )
- mycursor = mysqldb.cursor(dictionary=True)
- mycursor.execute("SELECT * from categories;")
- myresult = mycursor.fetchall()
- myclient = pymongo.MongoClient(mongodb_host)
- mydb = myclient[mongodb_dbname]
- mycol = mydb["categories"]
- if len(myresult) > 0:
- x = mycol.insert_many(myresult) #myresult comes from mysql cursor
- print(len(x.inserted_ids))
步骤 5:增强脚本以加载 MySQL 架构中的所有表
该脚本从 MySQL 中读取一个表,并将结果加载到 MongoDB 集合中。然后,下一步是遍历源数据库中所有表的列表,并将结果加载到新的 MySQL 集合中。我们可以通过查询information_schema.tables元数据表来实现这一点,该表提供给定模式中的表列表。然后可以遍历结果并调用上面的脚本来迁移每个表的数据。
- #Iterate through the list of tables in the schema
- table_list_cursor = mysqldb.cursor()
- table_list_cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = %s ORDER BY table_name;", (mysql_schema,))
- tables = table_list_cursor.fetchall()
- for table in tables:
- #Execute the migration script for 'table'
也可以通过将迁移逻辑抽象为一个函数来实现这一点。
- #Function migrate_table
- def migrate_table(db, col_name):
- mycursor = db.cursor(dictionary=True)
- mycursor.execute("SELECT * FROM " + col_name + ";")
- myresult = mycursor.fetchall()
- mycol = mydb[col_name]
- if delete_existing_documents:
- #delete all documents in the collection
- mycol.delete_many({})
- #insert the documents
- if len(myresult) > 0:
- x = mycol.insert_many(myresult)
- return len(x.inserted_ids)
- else:
- return 0
步骤6:输出脚本进度并使其可读
脚本的进度是通过使用print 语句来传达的。通过颜色编码使输出易于阅读。例如,以将成功语句打印为绿色,将失败语句打印为红色。
- class bcolors:
- HEADER = '\033[95m'
- OKBLUE = '\033[94m'
- OKCYAN = '\033[96m'
- OKGREEN = '\033[92m'
- WARNING = '\033[93m'
- FAIL = '\033[91m'
- ENDC = '\033[0m'
- BOLD = '\033[1m'
- UNDERLINE = '\033[4m'
- print(f"{bcolors.HEADER}This is a header{bcolors.ENDC}")
- print(f"{bcolors.OKBLUE}This prints in blue{bcolors.ENDC}")
- print(f"{bcolors.OKGREEN}This message is green{bcolors.ENDC}")
完整的脚本
- import mysql.connector
- import pymongo
- import datetime
- class bcolors:
- HEADER = '\033[95m'
- OKBLUE = '\033[94m'
- OKCYAN = '\033[96m'
- OKGREEN = '\033[92m'
- WARNING = '\033[93m'
- FAIL = '\033[91m'
- ENDC = '\033[0m'
- BOLD = '\033[1m'
- UNDERLINE = '\033[4m'
- begin_time = datetime.datetime.now()
- print(f"{bcolors.HEADER}Script started at: {begin_time} {bcolors.ENDC}")
- delete_existing_documents = True;
- mysql_host="localhost"
- mysql_database="mydatabase"
- mysql_schema = "myschhema"
- mysql_user="root"
- mysql_password=""
- mongodb_host = "mongodb://localhost:27017/"
- mongodb_dbname = "mymongodb"
- print(f"{bcolors.HEADER}Initializing database connections...{bcolors.ENDC}")
- print("")
- #MySQL connection
- print(f"{bcolors.HEADER}Connecting to MySQL server...{bcolors.ENDC}")
- mysqldb = mysql.connector.connect(
- host=mysql_host,
- database=mysql_database,
- user=mysql_user,
- password=mysql_password
- )
- print(f"{bcolors.HEADER}Connection to MySQL Server succeeded.{bcolors.ENDC}")
- #MongoDB connection
- print(f"{bcolors.HEADER}Connecting to MongoDB server...{bcolors.ENDC}")
- myclient = pymongo.MongoClient(mongodb_host)
- mydb = myclient[mongodb_dbname]
- print(f"{bcolors.HEADER}Connection to MongoDB Server succeeded.{bcolors.ENDC}")
- print(f"{bcolors.HEADER}Database connections initialized successfully.{bcolors.ENDC}")
- #Start migration
- print(f"{bcolors.HEADER}Migration started...{bcolors.ENDC}")
- dblist = myclient.list_database_names()
- if mongodb_dbname in dblist:
- print(f"{bcolors.OKBLUE}The database exists.{bcolors.ENDC}")
- else:
- print(f"{bcolors.WARNING}The database does not exist, it is being created.{bcolors.ENDC}")
- #Function migrate_table
- def migrate_table(db, col_name):
- mycursor = db.cursor(dictionary=True)
- mycursor.execute("SELECT * FROM " + col_name + ";")
- myresult = mycursor.fetchall()
- mycol = mydb[col_name]
- if delete_existing_documents:
- #delete all documents in the collection
- mycol.delete_many({})
- #insert the documents
- if len(myresult) > 0:
- x = mycol.insert_many(myresult)
- return len(x.inserted_ids)
- else:
- return 0
- #Iterate through the list of tables in the schema
- table_list_cursor = mysqldb.cursor()
- table_list_cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = %s ORDER BY table_name LIMIT 15;", (mysql_schema,))
- tables = table_list_cursor.fetchall()
- total_count = len(tables)
- success_count = 0
- fail_count = 0
- for table in tables:
- try:
- print(f"{bcolors.OKCYAN}Processing table: {table[0]}...{bcolors.ENDC}")
- inserted_count = migrate_table(mysqldb, table[0])
- print(f"{bcolors.OKGREEN}Processing table: {table[0]} completed. {inserted_count} documents inserted.{bcolors.ENDC}")
- success_count += 1
- except Exception as e:
- print(f"{bcolors.FAIL} {e} {bcolors.ENDC}")
- fail_count += 1
- print("")
- print("Migration completed.")
- print(f"{bcolors.OKGREEN}{success_count} of {total_count} tables migrated successfully.{bcolors.ENDC}")
- if fail_count > 0:
- print(f"{bcolors.FAIL}Migration of {fail_count} tables failed. See errors above.{bcolors.ENDC}")
- end_time = datetime.datetime.now()
- print(f"{bcolors.HEADER}Script completed at: {end_time} {bcolors.ENDC}")
- print(f"{bcolors.HEADER}Total execution time: {end_time-begin_time} {bcolors.ENDC}")
该脚本适用于中小型 MySQL 数据库,它有几百个表,每个表有几千行。对于具有数百万行的大型数据库,性能可能会受到影响。在开始实际迁移之前,请在表列表查询和实际表select查询上使用 LIMIT 关键字对有限行进行检测。
带点击此处从 GitHub 下载整个脚本:
https://github.com/zshameel/MySQL2MongoDB
【51CTO译稿,合作站点转载请注明原文译者和出处为51CTO.com】