本快速入门演示如何使用 Python 代码连接到群集以及如何使用 SQL 语句创建表。 然后演示如何在数据库中插入、查询、更新和删除数据。 本文中的步骤假定你熟悉 Python 开发,但不熟悉 Azure Cosmos DB for PostgreSQL 的使用。

安装 PostgreSQL 库

本文中的代码示例需要 psycopg2 库。 你需要使用语言包管理器(例如 pip)安装 psycopg2。

进行连接,创建表,然后插入数据

下面的代码示例创建通向 Postgres 数据库的 连接池 。 然后,该代码使用 cursor.execute 函数以及 SQL CREATE TABLE 和 INSERT INTO 语句来创建表并插入数据。

下面的示例代码使用连接池来创建和管理与 PostgreSQL 的连接。 强烈建议使用应用程序端连接池,因为:

  • 它可确保应用程序不会生成太多通向数据库的连接,从而避免超过连接限制。
  • 这有助于大幅提高性能,包括延迟和吞吐量。 PostgreSQL 服务器进程必须创建分支来处理每个新连接,而重用连接可避免这项开销。
  • 在以下代码中,将 <cluster> 替换为群集名称,将 <password> 替换为管理员密码。

    此示例将在结束时关闭连接,因此,如果要在同一会话中运行文章中的其他示例,请在运行此示例时不要包括 # Clean up 部分。

    import psycopg2
    from psycopg2 import pool
    # NOTE: fill in these variables for your own cluster
    host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
    dbname = "citus"
    user = "citus"
    password = "<password>"
    sslmode = "require"
    # Build a connection string from the variables
    conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)
    postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
    if (postgreSQL_pool):
        print("Connection pool created successfully")
    # Use getconn() to get a connection from the connection pool
    conn = postgreSQL_pool.getconn()
    cursor = conn.cursor()
    # Drop previous table of same name if one exists
    cursor.execute("DROP TABLE IF EXISTS pharmacy;")
    print("Finished dropping table (if existed)")
    # Create a table
    cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
    print("Finished creating table")
    # Create a index
    cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
    print("Finished creating index")
    # Insert some data into the table
    cursor.execute("INSERT INTO pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
    cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
    print("Inserted 2 rows of data")
    # Clean up
    conn.commit()
    cursor.close()
    conn.close()
    

    如果代码成功运行,则会生成以下输出:

    Connection established
    Finished dropping table
    Finished creating table
    Finished creating index
    Inserted 2 rows of data
    

    Azure Cosmos DB for PostgreSQL 可为你提供跨多个节点分发表的强大功能,以实现可伸缩性。 可以使用以下命令来分配表。 可以在此处详细了解 create_distributed_table 和分布列。

    通过分发表,它们可在添加到群集的任何工作器节点之间增长。

    # Create distributed table
    cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
    print("Finished distributing the table")
    

    下面的代码示例使用以下 API 从数据库读取数据:

  • cursor.execute 和 SQL SELECT 语句来读取数据。
  • cursor.fetchall(),以接受查询并返回要循环访问的结果集。
  • # Fetch all rows from table
    cursor.execute("SELECT * FROM pharmacy;")
    rows = cursor.fetchall()
    # Print all rows
    for row in rows:
        print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))
    

    下面的代码示例使用 cursor.execute 和 SQL UPDATE 语句来更新数据。

    # Update a data row in the table
    cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
    print("Updated 1 row of data")
    

    下面的代码示例运行 cursor.execute 和 SQL DELETE 语句来删除数据。

    # Delete data row from table
    cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
    print("Deleted 1 row of data")
    

    用于快速引入的 COPY 命令

    在将数据引入 Azure Cosmos DB for PostgreSQL 时,COPY 命令可能会产生巨大的吞吐量。 COPY 命令可以引入文件中的数据,也可以使用内存中的微批数据进行实时引入。

    用于从文件加载数据的 COPY 命令

    以下代码将数据从 CSV 文件复制到数据库表。 该代码需要使用 pharmacies.csv 文件。

    with open('pharmacies.csv', 'r') as f:
        # Notice that we don't need the `csv` module.
        next(f) # Skip the header row.
        cursor.copy_from(f, 'pharmacy', sep=',')
        print("copying data completed")
    

    用于加载内存中数据的 COPY 命令

    以下代码将内存中数据复制到表。

    data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerows(data)
    buf.seek(0)
    with conn.cursor() as cur:
        cur.copy_from(buf, "pharmacy", sep=",")
    conn.commit()
    conn.close()
    

    针对数据库请求失败情况的应用重试

    有时,来自应用程序的数据库请求可能会失败。 此类问题可能在不同的场景下发生,例如应用和数据库之间的网络故障、密码错误等。有些问题可能是暂时的,并且在几秒到几分钟内自行解决。 可以在应用中配置重试逻辑以克服暂时性错误。

    在应用中配置重试逻辑有助于改善最终用户体验。 在故障情况下,用户只会等待应用程序处理请求的时间稍长,而不会遇到错误。

    下面的示例演示如何在应用中实现重试逻辑。 示例代码片段每 60 秒尝试一次数据库请求(最多 5 次),直到成功为止。 可以根据应用程序的需求配置重试次数和频率。

    在此代码中,将 <cluster> 替换为群集名称,将 <password> 替换为管理员密码。

    import psycopg2
    import time
    from psycopg2 import pool
    host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
    dbname = "citus"
    user = "citus"
    password = "<password>"
    sslmode = "require"
    conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
            host, user, dbname, password, sslmode)
    postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)
    def executeRetry(query, retryCount):
        for x in range(retryCount):
                if (postgreSQL_pool):
                    # Use getconn() to Get Connection from connection pool
                    conn = postgreSQL_pool.getconn()
                    cursor = conn.cursor()
                    cursor.execute(query)
                    return cursor.fetchall()
                break
            except Exception as err:
                print(err)
                postgreSQL_pool.putconn(conn)
                time.sleep(60)
        return None
    print(executeRetry("select 1", 5))
    
  • 了解 Azure Cosmos DB for PostgreSQL API 如何扩展 PostgreSQL,并尝试使用有用的诊断查询
  • 为工作负载选择最佳群集大小
  • 监视群集性能
  •