Psycopg是Python用于操作PostgreSQL的库。Hologres兼容PostgreSQL 11,因此您可以通过psycopg访问Hologres。本文将指导您使用pyscopg2访问Hologres,示例使用的操作环境为基于CentOS 7系统的Python 3.8版本。

安装Python3.8

您可以基于Miniconda、Anaconda安装Python 3.8环境。如下内容以CentOS 7系统为例,安装Python 3.8版本。

  1. 安装Python 3.8。
    您可以下载对应版本的Python,执行如下命令进行安装。
    # yum install centos-release-scl
    # yum install rh-python38
    # scl enable rh-python38 bash
    # python --version
    Python 3.8.6
  2. 安装 psycopg2 模块。
    执行如下命令安装psycopg2模块。
     # pip install psycopg2-binary

连接Hologres

Python3.8环境和psycopg2安装完成之后,您可以执行如下操作并连接Hologres。

  1. 加载psycopg2。
    如果需要使用psycopg2,您可以执行命令 import psycopg2 加载安装的psycopg2。
  2. 创建数据库连接。
    您可以使用 psycopg2.connect() 函数连接Hologres,具体语法和参数说明如下所示。
    conn = psycopg2.connect(host="<Endpoint>", port=<Port>, dbname="<database>", user="<Access ID>", password="<Access Key>", application_name="<Application Name>")
    参数 描述
    host Hologres实例的网络地址。

    进入 Hologres管理控制台 的实例详情页,从 实例配置 获取网络地址。

    port Hologres的实例端口。

    进入 Hologres管理控制台 的实例详情页,从 实例配置 获取端口。

    dbname Hologres创建的数据库名称。
    user 当前阿里云账号的AccessKey ID。

    您可以单击 AccessKey 管理 ,获取AccessKey ID。

    password 当前阿里云账号的AccessKey Secret。

    您可以单击 AccessKey 管理 ,获取AccessKey Secret。

    application_name 应用名,可选。

    用于记录查询日志时识别SQL代表的应用含义。

使用Hologres

当您成功连接Hologres数据库之后,即可通过psycopg2进行数据开发操作。如下内容将指导您创建表、插入数据、查询和释放资源等操作。如果需要使用Fixed Plan能力实现更高性能的读写操作,需要配置相关GUC参数,请参见 Fixed Plan加速SQL执行

  1. 创建游标。
    在进行数据开发之前,您需要执行命令 cur = conn.cursor() 来创建连接的游标。
  2. 数据开发。
    1. 创建表
      您可以执行如下命令,创建一个表 holo_test 并定义表的数据类型为integer。您也可以根据业务需求定义表名称和数据类型。
      cur.execute("CREATE TABLE holo_test (num integer);")
    2. 插入数据
      您可以执行如下命令,为创建的表 holo_test 插入数据1~1000。
      cur.execute("INSERT INTO holo_test SELECT generate_series(%s, %s)", (1, 1000))
    3. 查询数据
      cur.execute("SELECT sum(num) FROM holo_test;")
      cur.fetchone()
  3. 提交事务。
    在查询数据的命令之后,您需要执行命令 conn.commit() 提交事务,此操作可以确保操作已经提交。也可以把 autocommit 参数设置为true,实现SQL命令的自动提交。
  4. 释放资源。
    为避免影响后续的操作,当操作执行完成后,您需要执行如下命令关闭游标并断开数据库连接。
    cur.close()
    conn.close()

Pandas DataFrame快速写入Hologres最佳实践

使用Python时,经常会使用Pandas将数据转换为DataFrame,并对DataFrame进行处理,最终将DataFrame导入Hologres,此时希望将DataFrame快速导入Hologres。导入时候常用 to_sql 函数,详情请参见 Pandas

需要Pandas为V1.4.2及以上版本,您可以执行如下命令强制安装V1.5.1版本的Pandas库。
# pip install Pandas==1.5.1
推荐使用 to_sql 函数的callable方式,使用copy方式导入数据,样例的Python代码如下。
# 加载依赖
import pandas as pd
import psycopg2
# 生成连接字符串
host="hgpostcn-cn-xxxxxx-cn-hangzhou.hologres.aliyuncs.com"
port=80
dbname="demo"
user="LTAI5xxxxx"
password="fa8Kdgxxxxx"
application_name="Python Test"
conn = "postgresql+psycopg2://{}:{}@{}:{}/{}?application_name={}".format(user, password, host, port, dbname,application_name)
print(conn)
# 生成dataframe
data = [('1','1','1'),('2','2','2')]
cols = ('col1','col2','col3')
pd_data = pd.DataFrame(data, columns=cols)
# 定义callable函数
import csv
from io import StringIO
def psql_insert_copy(table, conn, keys, data_iter):
    Execute SQL statement inserting data
    Parameters
    ----------
    table : pandas.io.sql.SQLTable
    conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
    keys : list of str
        Column names
    data_iter : Iterable that iterates the values to be inserted
    # gets a DBAPI connection that can provide a cursor
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()
        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        s_buf.seek(0)
        columns = ', '.join('"{}"'.format(k) for k in keys)
        if table.schema:
            table_name = '{}.{}'.format(table.schema, table.name)
        else:
            table_name = table.name
        sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
            table_name, columns)
        cur.copy_expert(sql=sql, file=s_buf)
# 导入数据
pd_data.to_sql(
    name="pd_data",
    con=conn,
    if_exists="replace",
    index=False,
    method=psql_insert_copy
)
查看历史查询,验证已经使用COPY方式写入数据至Hologres。 历史慢Query