相关文章推荐
完美的牛肉面  ·  北京市广播电视局·  3 月前    · 
爱看球的显示器  ·  黎书华教授主页·  4 月前    · 
干练的滑板  ·  我还想用 Kindle ...·  4 月前    · 
犯傻的水龙头  ·  openwrt 还原设置-掘金·  8 月前    · 
注册/登录

如何使用Python将MySQL表数据迁移到MongoDB集合

译文
云计算 MongoDB
本文详细的讲解了如何在需要的地方获取MySQL 数据。

[[410477]]

【51CTO.com快译】 介绍

MySQL 是一个 RDBMS 平台,它以规范化的方式以表格格式存储数据,而 MongoDB 是一个 NoSQL 数据库,它以无模式的方式将信息存储为按集合分组的文档。数据的表示方式完全不同,因此将 MySQL 表数据迁移到 MongoDB 集合听起来可能是一项艰巨的任务。但是,Python 凭借其强大的连接性和数据处理能力让这一切变得轻而易举。

在本文中,将详细的讲解如何使用简单的 Python 脚本将 MySQL 表数据迁移到 MongoDB 集合所需的步骤。这些脚本是在 Windows 上使用 Python 3.9.5 开发的。但是,它应该适用于任何平台上的任何 Python 3+ 版本一起使用。

步骤 1:安装所需的模块

第一步是安装连接MySQL 和 MongoDB 数据库实例所需的模块。我们将使用 mysql.connector 连接到 MySQL 数据库。对于 MongoDB,使用 pymongo ,这是从 Python 连接到 MongoDB 的推荐模块。

如果所需模块尚未安装,请运行以下PIP命令来安装它们。

  1. pip 安装 mysql 连接器 pip 安装 pymongo 

PIP 是 Python 包或模块的包管理器。

步骤2:从MySQL表中读取数据

第一步是从源 MySQL 表中读取数据,并以可用于将数据加载到目标 MongoDB 数据库中的格式进行准备。MongoDB 是一个 NoSQL 数据库,它将数据存储为 JSON 文档,因此最好以 JSON 格式生成源数据。值得一提的是,Python 具有强大的数据处理能力,可以轻松地将数据转换为 JSON 格式。

  1. import mysql.connector  
  2. mysqldb = mysql.connector.connect(    host="localhost",    database="employees",    user="root",    password="" ) 
  3. mycursor = mysqldb.cursor(dictionary=True) mycursor.execute("SELECT * from categories;") myresult = mycursor.fetchall()  
  4. print(myresult) 

当脚本在没有任何错误的情况下完成时,输出的结果如下:

  1.    { 
  2.       "id":4
  3.       "name":"Medicine"
  4.       "description":"<p>Medicine<br></p>"
  5.       "created_at":""
  6.       "updated_at":"" 
  7.    }, 
  8.    { 
  9.       "id":6
  10.       "name":"Food"
  11.       "description":"<p>Food</p>"
  12.       "created_at":""
  13.       "updated_at":"" 
  14.    }, 
  15.    { 
  16.       "id":8
  17.       "name":"Groceries"
  18.       "description":"<p>Groceries<br></p>"
  19.       "created_at":""
  20.       "updated_at":"" 
  21.    }, 
  22.    { 
  23.       "id":9
  24.       "name":"Cakes & Bakes"
  25.       "description":"<p>Cakes & Bakes<br></p>"
  26.       "created_at":d""
  27.       "updated_at":"" 
  28.    } 

请注意,输出的是一个 JSON 数组,因为我们将 dictionary=True 参数传递给了游标。否则,结果将采用列表格式。现在有了 JSON 格式的源数据,就可以迁移到 MongoDB 集合。

步骤3:写入 MongoDB 集合

获得 JSON 格式的源数据后,下一步是将数据插入到 MongoDB 集合中。集合是一组文档,相当于 RDBMS 中表(或关系)。我可以通过调用 insert_many() 集合类的方法来实现,该方法返回插入文档的对象 ID 列表。请注意,当作为参数传递空列表时,此方法将引发异常,因此在方法调用之前进行长度检查。

  1. import pymongo 
  2.  
  3. mongodb_host = "mongodb://localhost:27017/" 
  4. mongodb_dbname = "mymongodb" 
  5.  
  6. myclient = pymongo.MongoClient(mongodb_host) 
  7. mydb = myclient[mongodb_dbname] 
  8. mycol = mydb["categories"
  9.  
  10. if len(myresult) > 0
  11.         x = mycol.insert_many(myresult) #myresult comes from mysql cursor 
  12.         print(len(x.inserted_ids)) 

完成此步骤后,检查一下 MongoDB 实例,以验证数据库和集合是否已创建,文档是否已插入。注意MongoDB 是无模式的,这就意味着不必定义模式来插入文档,模式是动态推断并自动创建的。MongoDB 还可以创建代码中引用的数据库和集合(如果它们还不存在的话)。

步骤4:把数据放在一起

下面是从 MySQL 中读取表并将其插入到 MongoDB 中集合的完整脚本。

  1. import mysql.connector 
  2. import pymongo 
  3.  
  4. delete_existing_documents = True 
  5. mysql_host="localhost" 
  6. mysql_database="mydatabase" 
  7. mysql_schema = "myschema" 
  8. mysql_user="myuser" 
  9. mysql_password="********" 
  10.  
  11. mongodb_host = "mongodb://localhost:27017/" 
  12. mongodb_dbname = "mymongodb" 
  13.  
  14. mysqldb = mysql.connector.connect( 
  15.     host=mysql_host, 
  16.     database=mysql_database, 
  17.     user=mysql_user, 
  18.     password=mysql_password 
  19.  
  20. mycursor = mysqldb.cursor(dictionary=True
  21. mycursor.execute("SELECT * from categories;"
  22. myresult = mycursor.fetchall() 
  23.  
  24. myclient = pymongo.MongoClient(mongodb_host) 
  25. mydb = myclient[mongodb_dbname] 
  26. mycol = mydb["categories"
  27.  
  28. if len(myresult) > 0
  29.         x = mycol.insert_many(myresult) #myresult comes from mysql cursor 
  30.         print(len(x.inserted_ids)) 

步骤 5:增强脚本以加载 MySQL 架构中的所有表

该脚本从 MySQL 中读取一个表,并将结果加载到 MongoDB 集合中。然后,下一步是遍历源数据库中所有表的列表,并将结果加载到新的 MySQL 集合中。我们可以通过查询information_schema.tables元数据表来实现这一点,该表提供给定模式中的表列表。然后可以遍历结果并调用上面的脚本来迁移每个表的数据。

  1. #Iterate through the list of tables in the schema 
  2. table_list_cursor = mysqldb.cursor() 
  3. table_list_cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = %s ORDER BY table_name;", (mysql_schema,)) 
  4. tables = table_list_cursor.fetchall() 
  5.  
  6. for table in tables: 
  7.     #Execute the migration script for 'table' 

也可以通过将迁移逻辑抽象为一个函数来实现这一点。

  1. #Function migrate_table  
  2. def migrate_table(db, col_name): 
  3.     mycursor = db.cursor(dictionary=True
  4.     mycursor.execute("SELECT * FROM " + col_name + ";"
  5.     myresult = mycursor.fetchall() 
  6.  
  7.     mycol = mydb[col_name] 
  8.      
  9.     if delete_existing_documents: 
  10.         #delete all documents in the collection 
  11.         mycol.delete_many({}) 
  12.  
  13.     #insert the documents 
  14.     if len(myresult) > 0
  15.         x = mycol.insert_many(myresult) 
  16.         return len(x.inserted_ids) 
  17.     else
  18.         return 0 

步骤6:输出脚本进度并使其可读

脚本的进度是通过使用print 语句来传达的。通过颜色编码使输出易于阅读。例如,以将成功语句打印为绿色,将失败语句打印为红色。

  1. class bcolors: 
  2.     HEADER = '\033[95m' 
  3.     OKBLUE = '\033[94m' 
  4.     OKCYAN = '\033[96m' 
  5.     OKGREEN = '\033[92m' 
  6.     WARNING = '\033[93m' 
  7.     FAIL = '\033[91m' 
  8.     ENDC = '\033[0m' 
  9.     BOLD = '\033[1m' 
  10.     UNDERLINE = '\033[4m' 
  11.  
  12. print(f"{bcolors.HEADER}This is a header{bcolors.ENDC}"
  13. print(f"{bcolors.OKBLUE}This prints in blue{bcolors.ENDC}"
  14. print(f"{bcolors.OKGREEN}This message is green{bcolors.ENDC}"
  • 源 MySQL 数据库
  • 迁移后的目标 MongoDB 数据库
  • 在VSCode 中的 Python 脚本和输出
  • 完整的脚本

    1. import mysql.connector 
    2. import pymongo 
    3. import datetime 
    4.  
    5. class bcolors: 
    6.     HEADER = '\033[95m' 
    7.     OKBLUE = '\033[94m' 
    8.     OKCYAN = '\033[96m' 
    9.     OKGREEN = '\033[92m' 
    10.     WARNING = '\033[93m' 
    11.     FAIL = '\033[91m' 
    12.     ENDC = '\033[0m' 
    13.     BOLD = '\033[1m' 
    14.     UNDERLINE = '\033[4m' 
    15.  
    16. begin_time = datetime.datetime.now() 
    17. print(f"{bcolors.HEADER}Script started at: {begin_time} {bcolors.ENDC}"
    18.  
    19. delete_existing_documents = True
    20. mysql_host="localhost" 
    21. mysql_database="mydatabase" 
    22. mysql_schema = "myschhema" 
    23. mysql_user="root" 
    24. mysql_password="" 
    25.  
    26. mongodb_host = "mongodb://localhost:27017/" 
    27. mongodb_dbname = "mymongodb" 
    28.  
    29. print(f"{bcolors.HEADER}Initializing database connections...{bcolors.ENDC}"
    30. print("") 
    31.  
    32. #MySQL connection 
    33. print(f"{bcolors.HEADER}Connecting to MySQL server...{bcolors.ENDC}"
    34. mysqldb = mysql.connector.connect( 
    35.     host=mysql_host, 
    36.     database=mysql_database, 
    37.     user=mysql_user, 
    38.     password=mysql_password 
    39. print(f"{bcolors.HEADER}Connection to MySQL Server succeeded.{bcolors.ENDC}"
    40.  
    41. #MongoDB connection 
    42. print(f"{bcolors.HEADER}Connecting to MongoDB server...{bcolors.ENDC}"
    43. myclient = pymongo.MongoClient(mongodb_host) 
    44. mydb = myclient[mongodb_dbname] 
    45. print(f"{bcolors.HEADER}Connection to MongoDB Server succeeded.{bcolors.ENDC}"
    46.  
    47. print(f"{bcolors.HEADER}Database connections initialized successfully.{bcolors.ENDC}"
    48.  
    49. #Start migration 
    50. print(f"{bcolors.HEADER}Migration started...{bcolors.ENDC}"
    51. dblist = myclient.list_database_names() 
    52. if mongodb_dbname in dblist: 
    53.     print(f"{bcolors.OKBLUE}The database exists.{bcolors.ENDC}"
    54. else
    55.     print(f"{bcolors.WARNING}The database does not exist, it is being created.{bcolors.ENDC}"
    56.  
    57. #Function migrate_table  
    58. def migrate_table(db, col_name): 
    59.     mycursor = db.cursor(dictionary=True
    60.     mycursor.execute("SELECT * FROM " + col_name + ";"
    61.     myresult = mycursor.fetchall() 
    62.  
    63.     mycol = mydb[col_name] 
    64.      
    65.     if delete_existing_documents: 
    66.         #delete all documents in the collection 
    67.         mycol.delete_many({}) 
    68.  
    69.     #insert the documents 
    70.     if len(myresult) > 0
    71.         x = mycol.insert_many(myresult) 
    72.         return len(x.inserted_ids) 
    73.     else
    74.         return 0 
    75.  
    76. #Iterate through the list of tables in the schema 
    77. table_list_cursor = mysqldb.cursor() 
    78. table_list_cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = %s ORDER BY table_name LIMIT 15;", (mysql_schema,)) 
    79. tables = table_list_cursor.fetchall() 
    80.  
    81. total_count = len(tables) 
    82. success_count = 0 
    83. fail_count = 0 
    84.  
    85. for table in tables: 
    86.     try
    87.         print(f"{bcolors.OKCYAN}Processing table: {table[0]}...{bcolors.ENDC}"
    88.         inserted_count = migrate_table(mysqldb, table[0]) 
    89.         print(f"{bcolors.OKGREEN}Processing table: {table[0]} completed. {inserted_count} documents inserted.{bcolors.ENDC}"
    90.         success_count += 1 
    91.     except Exception as e: 
    92.         print(f"{bcolors.FAIL} {e} {bcolors.ENDC}"
    93.         fail_count += 1 
    94.          
    95. print("") 
    96. print("Migration completed."
    97. print(f"{bcolors.OKGREEN}{success_count} of {total_count} tables migrated successfully.{bcolors.ENDC}"
    98. if fail_count > 0
    99.     print(f"{bcolors.FAIL}Migration of {fail_count} tables failed. See errors above.{bcolors.ENDC}"
    100.  
    101. end_time = datetime.datetime.now() 
    102. print(f"{bcolors.HEADER}Script completed at: {end_time} {bcolors.ENDC}"
    103. print(f"{bcolors.HEADER}Total execution time: {end_time-begin_time} {bcolors.ENDC}"

    该脚本适用于中小型 MySQL 数据库,它有几百个表,每个表有几千行。对于具有数百万行的大型数据库,性能可能会受到影响。在开始实际迁移之前,请在表列表查询和实际表select查询上使用 LIMIT 关键字对有限行进行检测。

    带点击此处从 GitHub 下载整个脚本:
    https://github.com/zshameel/MySQL2MongoDB

    【51CTO译稿,合作站点转载请注明原文译者和出处为51CTO.com】

    责任编辑:梁菲 DZone
    点赞
    收藏