本文介绍如何通过Python连接池DBUtils连接Lindorm宽表引擎。

前提条件

  • 已安装Python环境,要求安装Python 3.8及以上版本。

  • 已获取Lindorm宽表SQL的连接地址,具体操作,请参见 访问实例

  • 已将客户端IP地址添加至Lindorm实例的白名单中,具体操作,请参见 设置白名单

  • 宽表引擎内核版本需为2.3.1及以上版本。如何升级版本,请参见 升级小版本

准备工作

安装phoenixdb 1.2.0版本和DBUtils 3.0.2版本,示例代码如下:

pip install phoenixdb==1.2.0
pip install DBUtils==3.0.2

使用示例

#!/usr/bin/python3
from dbutils.pooled_db import PooledDB
import importlib
class DBUtilsDemo:
    def __init__(self, url, user, password, database):
        config = {
            'url': url,
            'lindorm_user': user,
            'lindorm_password': password,
            'database': database,
            'autocommit': True
        db_creator = importlib.import_module("phoenixdb")
        # 基于DBUtils的连接池
        self.pooled = PooledDB(db_creator,
                               maxcached=10,  
                               # 连接池的最大空闲连接数,可以根据实际需要调整
                               maxconnections=50,  
                               # 连接池的最大连接数, 可以根据实际需要调整
                               blocking=True,  
                               # 如果连接池没有空闲的连接,是否等待。True:等待空闲连接;False:不等待并报错。
                               ping=1,  
                               # 检查服务端是否可用
                               **config)
    # 从连接池获取连接
    def _connect(self):
            r = self.pooled.connection()
            return r
        except Exception as e:
            print("Failed to connect:" + str(e))
    # 归还连接到连接池
    def _close(self, conn, stmt):
        if stmt:
            stmt.close()
        if conn:
            conn.close()
    # 查询单条记录
    def select_row(self, sql):
        connection = self._connect()
        statement = None
            statement = connection.cursor()
            statement.execute(sql)
            row = statement.fetchone()
            return row
        except Exception as e:
            print(e)
        finally:
            self._close(connection, statement)
    # 查询多条记录
    def select_rows(self, sql):
        connection = self._connect()
        statement = None
            statement = connection.cursor()
            print(sql)
            statement.execute(sql)
            rows = statement.fetchall()
            return rows
        except Exception as e:
            print(e)
        finally:
            self._close(connection, statement)
    # 更新与插入
    def upsert_data(self, sql_upsert):
        connection = self._connect()
        statement = None
            statement = connection.cursor()
            statement.execute(sql_upsert)
            connection.commit()
        except Exception as e:
            print(e)
        finally:
            self._close(connection, statement)
    # 更新与插入带参数
    def upsert_data_prams(self, sql_upsert, prams):
        connection = self._connect()
        statement = None
            statement = connection.cursor()
            statement.execute(sql_upsert, prams)
            connection.commit()
        except Exception as e:
            print(e)
        finally:
            self._close(connection, statement)
if __name__ == '__main__':
    # Lindorm宽表SQL的连接地址
    url = 'http://xxxx-proxy-lindorm.lindorm.rds.aliyuncs.com:30060'
    # 用户名,根据实际情况做替换。您可以通过Lindorm集群管理系统查看用户名和密码。
    user = 'root'
    # 密码,根据实际情况做替换
    password = 'root'
    # 连接的数据库名称,根据实际情况做替换
    database = 'test'
    poolUtils = DBUtilsDemo(url, user, password, database)
    rows = poolUtils.select_rows('SELECT * FROM dt')
    print(rows)
重要
  • DBUtils创建的连接支持使用ping方法探测,phoenixdb创建的连接不支持使用ping方法探测。

  • 在长时间不使用连接时(目前时间为10分钟),会出现NoSuchConnection异常,您需要重新从连接池获取连接。