SQLAlchemy入门-上
说明
SQLAlchemy包含SQLAlchemy Core和SQLAlchemy ORM两部分, 这个系列只包含SQLAlchemy Core的内容。
由于内容较多,教程被分成了上,下两部分。 Select,Update,Delete本身内容较为丰富,放在第二步部门进行讨论。
准备
安装sqlalchemy
pip install SQLAlchemy
安装postgresql数据库
如果想运行文中的代码,请安装postgresql数据库并且建立相应的测试用户和测试数据库。
导入helper.py
为了方便代码实现的进行,我编写了helper.py,里面提供的函数功能如下
- reset_tables:在名为'test'的schema下重置users,addresses两张表的数据
- clear_tables:在名为'test'的schema下删除users和addresses两张表并返回两张表对应的object
- clear_schema:删除名为'test'的schema
- get_table:获得名为'test'的schema中指定表的数据(DataFrame形式)
- get_select:获得名为'test'的schema中指定查询语句得到数据(DataFrame形式)
- print_sql:print sqlalchemy object编译后对应的sql语句
读者暂时先不必关心这些函数是怎么实现的,等完成这份教程后自然有能力自己去实现同样的功能。
from helper import reset_tables,clear_tables,clear_schema,get_select,get_table,print_sql
导入其它代码实验要用到的库
from IPython.display import display
创建engine
SQLAlchemy通过engine和目标数据库建立连接,它是后面所有的数据库操作都需要使用的object。 我本机的使用的用户名,数据库名,密码都是'test',端口为'5432'。如果不一致请相应的对下面的代码做出修改。
from sqlalchemy import create_engine
user = 'test'
password = 'test'
port = '5432'
dbname = 'test'
engine = create_engine('postgresql://{user}:{password}@localhost:{port}/{dbname}'.format(**locals()))
测试数据
教程中用到的测试数据如下
from helper import users_data, addresses_data
users表和user是一一对应关系,它包含的测试数据是id为1,2的用户的name和fullname
display(users_data)
id name fullname 0 1 jack Jack Jones 1 2 wendy Wendy Williams
addresses表和user是一对多的关系,它包含的测试数据是id为1,2的用户的email_addresses
display(addresses_data)
user_id email_address 0 1 jack@yahoo.com 1 1 jack@msn.com 2 2 www@www.org 3 2 wendy@aol.com
SQLAlchemy Core初印象
SQLAlchemy Core提供了一套SQL Expression language,它提供了一套用Python construct(Python object)去表达SQL逻辑的体系。下面通过一些代码演示一下SQL Expression language的基本特征。这里读者只需要有大致的感觉即可,如果有一些细节不理解不用在意,后面都会有详细的解释。
传统的SQL语句是用文本方式编写的。
sql = '''
select
users.id, users.fullname
users join addresses
users.id = addresses.user_id
group by
users.id
having
count(addresses.email_address)>1
order by users.fullname
'''
在SQLAlchemy Core中是这样表达的
from sqlalchemy import select, func
users, addresses = reset_tables(engine)
s = (
select(
users.c.id,
users.c.fullname
).select_from(
users.join(
addresses,
users.c.id==addresses.c.user_id
).group_by(users.c.id)
.having(func.count(addresses.c.email_address)>1)
print_sql(engine, s)
***Compiled SQL***
SELECT test.users.id, test.users.fullname
FROM test.users JOIN test.addresses ON test.users.id = test.addresses.user_id GROUP BY test.users.id
HAVING count(test.addresses.email_address) > 1
>>>
上面的SQL逻辑可以看作是很多更基本的元件构成的,包括表,列,条件,join语句等等。整个Select逻辑和这些组成元件,对应的都是sqlalchemy object
l = [
users,
users.c.id,
users.c.id==addresses.c.user_id,
for obj in l:
print(type(obj))
<class 'sqlalchemy.sql.schema.Table'>
<class 'sqlalchemy.sql.schema.Column'>
<class 'sqlalchemy.sql.elements.BinaryExpression
'>
<class 'sqlalchemy.sql.selectable.Select'>
由于因此使用SQLAlchemy Core表达SQL逻辑的时候,是一个从代表基本SQL逻辑模块的object逐步组装成复杂object的过程。这样做有几个好处。
容易拆分
当SQL逻辑复杂的时候,可以分阶段的构造。先构造简单的SQL逻辑模块,再逐步组装成复杂的SQL逻辑。相比一次性构造完整的复杂SQL逻辑相比,头脑的负担更低,也不容易出错。
下面的例子里,我们可以把前面例子中的要选择的columns,join语句,having条件先构造好,然后再组装成完整的SQL逻辑。每一个SQL逻辑模块构造好后我们都可以观察一下对应的SQL语句是什么。
from sqlalchemy import select, func
columns_selection = select(
[users.c.id, users.c.fullname]
print_sql(engine,columns_selection)
join_clause = users.join(
addresses,
users.c.id==addresses.c.user_id
print_sql(engine,join_clause)
condition = func.count(addresses.c.email_address)>1
print_sql(engine,condition)
s = (
columns_selection
.select_from(join_clause)
.group_by(users.c.id)
.having(condition)
print_sql(engine,s)
***Compiled SQL***
SELECT test.users.id, test.users.fullname
FROM test.users
***Compiled SQL***
test.users JOIN test.addresses ON test.users.id = test.addresses.user_id
***Compiled SQL***
count(test.addresses.email_address) > 1
***Compiled SQL***
SELECT test.users.id, test.users.fullname
FROM test.users JOIN test.addresses ON test.users.id = test.addresses.user_id GROUP BY test.users.id
HAVING count(test.addresses.email_address) > 1
>>>
容易复用
由于使用SQLAlchemy Core去表达SQL,本质上是使用python语言写代码。 因此我们可以利用python提供的一切工具和手段将重复出现的SQL逻辑抽提成可复用的python代码。
例如我们在多个地方要根据fullname的长度,和首字母去筛选user。那么可以用一个函数生成这个条件,以后直接调用这个函数即可。
from sqlalchemy import Table, and_, func, bindparam
def email_condition(users, init, length):
return and_(
users.c.fullname.like('{}%'.format(init)),
func.len(users.c.fullname)==length
c = email_condition(users,'J',5)
print_sql(engine,c)
***Compiled SQL***
test.users.fullname LIKE 'J%' AND len(test.users.fullname) = 5
>>>
处理数据库差异
在用SQLAlchemy Core表达SQL逻辑的时候,只是表达了用户的意图,并未生成最终的SQL语句。
同样的SQL逻辑,在不同的database中语法可能会有变化,因此对应的SQL语句是不同的。 而SQLAlchemy Core会根据database的种类,编译出和这个database匹配的SQL语句。这样用户用SQLAlchemy Core组织一次SQL逻辑,就可以在多个数据库中复用。
当然每个database都有一些自己独有的功能,对于这部分差异SQLAlchemy是不能自动处理的。
SQLAlchemy Core使用详解
查看编译后的语句
使用SQLAlchemy Core一个基本的需求是查看sqlalchemy object编译后的SQL语句是什么样的。这个可以用object提供的compile方法实现。
condition = users.c.name == 'jack'
compiled = condition.compile()
print(compiled)
test.users.name = :name_1
默认情况下编译后的SQL语句是带参数的形式,并没有把'jack'代入name_1。可以通过调用params属性查看对应的数值是多少。
print(compiled.params)
{'name_1': 'jack'}
如果希望编译后的SQL语句是非参数化的形式,可以添加
compile_kwargs={"literal_binds": True}
选项。
compiled = condition.compile(compile_kwargs={"literal_binds": True})
print(compiled)
test.users.name = 'jack'
由于具体的SQL逻辑在不同的database对应的语法并不完全相同,所以建议传入一个指向特定数据库的engine,可以得到更准确的编译结果。(在这个例子里没有差别)
compiled = condition.compile(engine, compile_kwargs={"literal_binds": True})
print(compiled)
test.users.name = 'jack'
schema操作
创建schema
from sqlalchemy.schema import CreateSchema
clear_schema(engine)
schema_name = 'test'
obj = CreateSchema(schema_name)
engine.execute(obj)
print_sql(engine,obj,False)
***Compiled SQL***
CREATE SCHEMA test
>>>
如果创建已经存在的schema,会导致异常。例如,刚才已经创建了名为'test'的schema,如果再创建一遍的话,会提示schema "test" already exists
try:
engine.execute(obj)
except Exception as e:
print(type(e),e)
<class 'sqlalchemy.exc.ProgrammingError'> (psycopg2.ProgrammingError) schema "test" already exists
[SQL: 'CREATE SCHEMA test']
注意
有些sqlalchemy object,例如这个例子中的
CreateSchema(schema_name)
,结果为None。
type(obj.compile().params)
NoneType
对于这类object,
compile
的时候添加
compile_kwargs={"literal_binds": True}
会导致异常。
try:
obj.compile(compile_kwargs={"literal_binds": True})
except Exception as e:
print(type(e),e)
<class 'TypeError'> visit_create_schema() got an unexpected keyword argument 'literal_binds'
默认情况
print_sql
函数会添加
"literal_binds": True
, 可以将第三个参数设置成
False
关闭这个设置。
删除schema
和新建schema类似。不过如果这个schema下有一些依赖于这个schema存在的资源,比如tables,那么只有先删除了这些资源后才能删除这个schema,否则会异常。
这里有一个有用的参数cascade,设置成True的话会自动删除所有依赖于这个schema的资源。
from sqlalchemy.schema import DropSchema
schema_name = 'test'
obj = DropSchema(schema_name, cascade = True)
print_sql(engine,obj,False)
engine.execute(obj)
***Compiled SQL***
DROP SCHEMA test CASCADE
<sqlalchemy.engine.result.ResultProxy at 0x7f734fa1cd30>
同样, 如果删除已经不存在的schema,会报ProgrammingError
同样,如果删除并不存在的schema,会报异常,这个不演示了。
table操作
定义table
定义SQLAlchemy可理解的table数据结构,主要参数是table名,schema名以及相关的column的名称,类型,是否是primary_key等信息。
定义table是进行新建表,构建select语句等操作的基础。
from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey, Sequence
from sqlalchemy.schema import CreateTable
schema='test'
table_name = 'users'
metadata = MetaData()
table = Table(
table_name,
metadata,
Column('id',Integer,primary_key=True),
Column('name',String),
Column('fullname',String),
schema = schema
)
如果是数据库中已经存在的表,可以直接使用autoload功能从数据库中读取表的列信息,可以免去很多麻烦。下面reset_db确保test.users表存在后用autoload自动读取users表的信息。
reset_tables(engine);
metadats = MetaData()
users = Table('users', metadata, schema = 'test', autoload=True, autoload_with=engine)
可以看到users中自动包含了column的定义信息。
users.c.values()
[Column('id', Integer(), table=<users>, primary_key=True, nullable=False),
Column('name', String(), table=<users>),
Column('fullname', String(), table=<users>)]
注意
如果table中定义了foreign key信息,SQLAlchemy Core构建join语句的时候能够自动将foreign key作为join的条件。 但是autoload得到的table会失去这个便利,暂时没找到解决方法。(见join章节的演示)
新建table
再定义了table后,可以在数据库中新建这张表。
先清空数据库
clear_tables(engine)
新建表
obj = CreateTable(table)
print_sql(engine,obj,False)
engine.execute(obj);
***Compiled SQL***
CREATE TABLE test.users (
id SERIAL NOT NULL,
name VARCHAR,
fullname VARCHAR,
PRIMARY KEY (id)
>>>
SQLAlchemy会根据数据库的类型,将String等列类型信息转化成数据库中对应的信息,例如Oracle中的VARCHAR2。
注意,不同的数据库对于configs的要求会不同。例如,postgresql只需要写String,不需要指定长度;而Oracle在定义时,必须指定长度,得改成类似下面的设置才会生效。
Column('id',Integer,primary_key=True),
Column('name',String(20)),
Column('fullname',String(20)),
同样, 如果尝试新建已经存在的表,会出错,这个不演示了。
drop table
drop table的处理方法和create table类似。不过在定义
from sqlalchemy.schema import DropTable
reset_tables(engine)
metadata = MetaData()
table = Table(
'users',
metadata,
schema = 'test'
print_sql(engine,obj,False)
obj = DropTable(table)
***Compiled SQL***
CREATE TABLE test.users (
id SERIAL NOT NULL,
name VARCHAR,
fullname VARCHAR,
PRIMARY KEY (id)
>>>
不过运行的话会报错
users, addresses = reset_tables(engine)
try:
engine.execute(obj)
except Exception as e:
print(type(e),e)
<class 'sqlalchemy.exc.InternalError'> (psycopg2.InternalError) cannot drop table users because other objects depend on it
DETAIL: constraint addresses_user_id_fkey on table addresses depends on table users
HINT: Use DROP ... CASCADE to drop the dependent objects too.
[SQL: '\nDROP TABLE test.users']
这是由于在定义addresses表的时候,定义了addresses的user_id是users表的foreign key,因此foreign key依赖于users表,只有Drop时指定CASCADE选项才能顺利的删除这张表。(它会删除所有依赖于users表的foreign_key),遗憾的是,我并没有在sqlalchemy中找到相关的选项启动CASCADE。
不过SQLAlchemy的一个好处是,它完全可以接受原生的SQL语句去对数据库进行操作。我们在语句中加上CASCADE和IF EXISTS来进行drop table的操作。
table_name = 'users'
schema = 'test'
sql = "DROP TABLE IF EXISTS {schema}.{table_name} CASCADE".format(table_name = table_name, schema = schema)
engine.execute(sql)
<sqlalchemy.engine.result.ResultProxy at 0x7f7350290198>
提示
SQLAlchemy的优势更多的是体现在构造和复用复杂的SQL逻辑上。在删除table的这个例子里。SQLAlchemy Core实际上并不如原生的SQL语句好用。我们完全可以针对自己的场景,选择适合的工作去完成任务。
插入数据
插入单行数据
users, addresses = reset_tables(engine)
ins = users.insert().values(name='Junjie', fullname='Junjie Cai')
print_sql(engine, ins)
result = engine.execute(ins)
***Compiled SQL***
INSERT INTO test.users (name, fullname) VALUES ('Junjie', 'Junjie Cai') RETURNING test.users.id
>>>
可以用result.insered_primary_key很方便的找到插入记录的id
result.inserted_primary_key
[3]
验证一下插入数据后的结果
display(get_table(engine, users))
id name fullname 0 1 jack Jack Jones 1 2 wendy Wendy Williams 2 3 Junjie Junjie Cai
注意也可以在engine.execute中传入数据
users, addresses = reset_tables(engine)
ins = users.insert()
print_sql(engine, ins)
engine.execute(ins,name='jack', fullname='Jack Jones')
display(get_table(engine, users))
***Compiled SQL***
INSERT INTO test.users (id, name, fullname) VALUES (%(id)s, %(name)s, %(fullname)s)
>>>
id name fullname 0 1 jack Jack Jones 1 2 wendy Wendy Williams 2 3 jack Jack Jones
插入多行数据
如果是插入部分列的话,可以用list of dict的结构。
data = [
{'name':'Junjie','fullname':'CaiJunjie'},
{'name':'Xu','fullname':'ZhangXu'}
ins = users.insert().values(data)
engine.execute(ins)
display(get_table(engine,users))
id name fullname 0 1 jack Jack Jones 1 2 wendy Wendy Williams 2 3 jack Jack Jones 3 4 Junjie CaiJunjie 4 5 Xu ZhangXu
注意如果要插入dict list,sqlalchemy会以list中第一条记录的key为准
data = [
{'name':'Name1'},
{'name':'Name2','fullname':'FULLNAME2'}
ins = users.insert().values(data)
engine.execute(ins)
display(get_table(engine,users))
id name fullname 0 1 jack Jack Jones 1 2 wendy Wendy Williams 2 3 jack Jack Jones 3 4 Junjie CaiJunjie 4 5 Xu ZhangXu 5 6 Name1 None 6 7 Name2 None
如果第一行包含了所有的key,后面的记录key缺失的话,会直接报错。
try:
data = [
{'name':'Name3','fullname':'FULLNAME3'},
{'name':'Name4'},
ins = users.insert().values(data)
engine.execute(ins)
except Exception as e:
print(type(e),e)
<class 'sqlalchemy.exc.CompileError'> INSERT value for column users.fullname is explicitly rendered as a boundparameter in the VALUES clause; a Python-side value or SQL expression is required
如果插入数据时会使用所有的列,那么可以简化成直接用tuple list插入数据。但是这是就不能利用自动编号id,而是要传入id。
data = [
(8,'Cai','Junjie'),
(9,'Zhang','Xu')
ins = users.insert().values(data)
engine.execute(ins)
display(get_table(engine,users))
id name fullname 0 1 jack Jack Jones 1 2 wendy Wendy Williams 2 3 jack Jack Jones 3 4 Junjie CaiJunjie 4 5 Xu ZhangXu 5 6 Name1 None 6 7 Name2 None 7 8 Cai Junjie 8 9 Zhang Xu
但是用这种方式传入数据的话,自动id的状态并不会做出相应的调整,而是继续从上次终止的地方开始,不会跳过用上面方式插入的id。 如果再利用dict list插入数据,生成id就可能和以后的重复,导致异常。
例如下面的例子里,最后一次自动id是7,继续使用自动id的话,会从8开始。可以上面再用tuple list插入数据的时候已经把8占用了,于是导致异常。
ins = users.insert()
print_sql(engine, ins)
try:
engine.execute(ins,name='jack', fullname='Jack Jones')
except Exception as e:
print(type(e),e)
***Compiled SQL***
INSERT INTO test.users (id, name, fullname) VALUES (%(id)s, %(name)s, %(fullname)s)
<class 'sqlalchemy.exc.IntegrityError'> (psycopg2.IntegrityError) duplicate key value violates unique constraint "users_pkey"
DETAIL: Key (id)=(8) already exists.
[SQL: 'INSERT INTO test.users (name, fullname) VALUES (%(name)s, %(fullname)s) RETURNING test.users.id'] [parameters: {'fullname': 'Jack Jones', 'name': 'jack'}]
从DataFrame插入数据
pandas DataFrame是数据工作者经常使用的数据结构。
from pandas import DataFrame
df = DataFrame({'name':['Xu','Junjie'],'fullname':['ZhangXu','CaiJunjie']})
display(df)
fullname name 0 ZhangXu Xu 1 CaiJunjie Junjie
可以利用
to_dict()
方法很方便的把
dataframe
转成dict list
display(df.to_dict(orient = 'records'))
[{'fullname': 'ZhangXu', 'name': 'Xu'},
{'fullname': 'CaiJunjie', 'name': 'Junjie'}]
注意
尽管
list(df.to_records())
转成的结果看上去是tuple list
df = DataFrame(
{'id':[15,16],'name':['Xu','Junjie'],'fullname':['ZhangXu','CaiJunjie']}
,columns = ['id','name','fullname']
display(list(df.to_records(index = False)))
[(15, 'Xu', 'ZhangXu'), (16, 'Junjie', 'CaiJunjie')]
但是直接插入这个数据的话会导致异常
data = list(df.to_records(index = False))
try:
ins = users.insert().values(data)
engine.execute(ins)
except Exception as e:
print(type(e),e)
<class 'sqlalchemy.exc.ProgrammingError
'> (psycopg2.ProgrammingError) can't adapt type 'record' [SQL: 'INSERT INTO test.users (id, name) VALUES (%(id)s, %(name)s)'] [parameters: {'id': (15, 'Xu', 'ZhangXu'), 'name': (16, 'Junjie', 'CaiJunjie')}]
原因是list中的数据类型是numpy.record,不是tuple。
display(type(data[0]))
numpy.record
即使修复了这个问题
data = [tuple(r) for r in data]
display(type(data[0]))
tuple
也依然会因为数据结构类型不一致导致异常
try:
ins = users.insert().values(data)
engine.execute(ins)
except Exception as e:
print(type(e),e)
<class 'sqlalchemy.exc.ProgrammingError'> (psycopg2.ProgrammingError) can't adapt type 'numpy.int64' [SQL: 'INSERT INTO test.users (id, name, fullname) VALUES (%(id_0)s, %(name_0)s, %(fullname_0)s), (%(id_1)s, %(name_1)s, %(fullname_1)s)'] [parameters: {'name_1': 'Junjie', 'fullname_1': 'CaiJunjie', 'id_0': 15, 'name_0': 'Xu', 'fullname_0': 'ZhangXu', 'id_1': 16}]
data = df.to_dict(orient = 'record')
try:
ins = users.insert().values(data)
engine.execute(ins)
except Exception as e:
print(type(e),e)
因此建议直接使用to_dict(orient = 'record')方式转化数据。
Select, Update, Delete
这部门内容比较丰富,这里只演示最基本的应用。更详细的说明放在下一期的的文章讲解。
基本的select结构
from sqlalchemy import select
s = select(
users.c.id,
users.c.name
).select_from(
users
).where(
users.c.id==1
print_sql(engine,s)
display(get_select(engine,s))
***Compiled SQL***
SELECT test.users.id, test.users.name
FROM test.users
WHERE test.users.id = 1
>>>
id name 0 1 jack
其中select_from相当于SQL中的FROM。 如果不会产生歧义,select_from部分可以省略不写。SQLAlchemy会自动补齐相关的FROM语句。
from sqlalchemy import select
s = select(
users.c.id,
users.c.name
).where(
users.c.id==1
print_sql(engine,s)
display(get_select(engine,s))
***Compiled SQL***
SELECT test.users.id, test.users.name
FROM test.users
WHERE test.users.id = 1
>>>
id name 0 1 jack
带参数的SQL逻辑
如果希望生成的SQL逻辑支持参数,有两种实现方式。
函数生成方式
用函数生成SQL逻辑,用函数的参数去实现SQL逻辑参数可变的效果。例如我们构造一个针对user.id的条件。
def condition(user_id):
return users.c.id == user_id
print_sql(engine,condition(1))
print_sql(engine,condition(2))
***Compiled SQL***
test.users.id = 1
***Compiled SQL***
test.users.id = 2
>>>
上面这种方式每次运行函数的时候都会构建新的SQLAlchemy object。
用bindparam指定参数
另一种方式是构建SQLAlchemy object时,用bindparam指定参数部分。 然后用
.params
绑定数值。
from sqlalchemy.sql import bindparam
condition = (users.c.id == bindparam('id')).params({'id':1})
print_sql(engine,condition,False)
print_sql(engine,condition)
***Compiled SQL***
test.users.id = %(id)s
***Compiled SQL***
test.users.id = 1
>>>
实际上,在SQLAlchemy中使用常数的时候,只是把定义参数和绑定数据两步一起做而已。
from sqlalchemy.sql import bindparam
condition = users.c.id == 1
print_sql(engine,condition,False)
print_sql(engine,condition)
***Compiled SQL***
test.users.id = %(id_1)s
***Compiled SQL***
test.users.id = 1
>>>
如果定义了参数后没有通过params绑定数值,那么在execute阶段传入数值也是可以的。
s = users.select().where(users.c.id==bindparam('id'))
print_sql(engine,s,False)
display(engine.execute(s,id=1).fetchone())
***Compiled SQL***
SELECT test.users.id, test.users.name, test.users.fullname
FROM test.users
WHERE test.users.id = %(id)s
(1, 'jack', 'Jack Jones')
上面这种方式, obj生成一次后可以反复被利用,不必重复的生成object。
类型提示
有些场景下,需要指定变量类型,帮助sqlalchemy正确的编译语句。下面的例子里,即使后面绑定了string类型的数据,
+
依然没能正确的编译成字符串的连接符。应该是"||"。
from sqlalchemy import text
s = users.select(users.c.name.like(bindparam('username') + text("'%'")))
s = s.params({'username':'jack'})
print_sql(engine,s)
try:
display(get_select(engine, s))
except Exception as e:
print(type(e),e)
***Compiled SQL***
SELECT test.users.id, test.users.name, test.users.fullname
FROM test.users
WHERE test.users.name LIKE NULL + '%%'
<class 'sqlalchemy.exc.ProgrammingError'> (psycopg2.ProgrammingError) operator is not unique: unknown + unknown
LINE 3: WHERE test.users.name LIKE 'jack' + '%'
HINT: Could not choose a best candidate operator. You might need to add explicit type casts.
[SQL: "SELECT test.users.id, test.users.name, test.users.fullname \nFROM test.users \nWHERE test.users.name LIKE %(username)s + '%%'"] [parameters: {'username': 'jack'}]
这时候,需要主动在bindparam中通过type_指定数据类型,帮助SQLAlchemy正确的编译
from sqlalchemy import text,String
s = users.select(users.c.name.like(bindparam('username',type_=String) + text("'%'")))
s = s.params({'username':'jack'})
print_sql(engine,s,compile_kwargs={'literal_binds':True})
display(get_select(engine, s))
***Compiled SQL***
SELECT test.users.id, test.users.name, test.users.fullname
FROM test.users
WHERE test.users.name LIKE ('jack' || '%%')
>>>
id name fullname 0 1 jack Jack Jones 1 3 jack Jack Jones
用text定义sqlalchemy object
除了用纯粹的sqlalchemy object去定义SQL逻辑的各种组件,有时候我们希望将文本形式的sql直接转化成sqlalchemy object。例如下面两种场景。
- 已经存在现成的sql代码片段,不想用SQLAlchemy重写
- 遇到SQLAlchemy无法表达,只有原生的SQL能表达的场景
例如下面这样包含待定参数的SQL语句,
:id
是名为id的参数。在传入实际的数值前,这个语句是不完整的,如果直接传入engine.execute的话,会出错。
s = 'select users.id, users.name, users.fullname from test.users where users.id=:user_id'
try:
engine.execute(s).fetchall()
except Exception as e:
print(type(e),e)
<class 'sqlalchemy.exc.ProgrammingError'> (psycopg2.ProgrammingError) syntax error at or near ":"
LINE 1: ...name, users.fullname from test.users where users.id=:user_id
[SQL: 'select users.id, users.name, users.fullname from test.users where users.id=:user_id']
这时可以用text处理并且用bindparams函数绑定数据
s = 'select users.id, name, users.fullname from test.users where users.id=:user_id'
s = text(s).bindparams(user_id=1)
print_sql(engine,s)
print(engine.execute(s).fetchone())
***Compiled SQL***
select users.id, name, users.fullname from test.users where users.id=1
(1, 'jack', 'Jack Jones')
绑定参数调用的方法是bindparams,不是params,也不是bindparam! 注意区分!
也可以不绑定参数,而是在execute阶段传入数据
s = 'select users.id, users.name, users.fullname from test.users where users.id=:user_id'
s = text(s)
print_sql(engine,s,False)
print(engine.execute(s,user_id=1).fetchone())
***Compiled SQL***
select users.id, users.name, users.fullname from test.users where users.id=%(user_id)s
(1, 'jack', 'Jack Jones')
除了用文本定义大段的SQL逻辑外,也可以用文本SQL的片段去定义部分的SQL组件。
s = (
select(
text("users.fullname || ', ' || addresses.email_address AS title"),
).select_from(
text('test.users, test.addresses'),
).where(
text(
"users.id = addresses.user_id and "
"users.id = :user_id"
s = s.params({'user_id':1})
print_sql(engine, s)
engine.execute(s).fetchall()
***Compiled SQL***
SELECT users.fullname || ', ' || addresses.email_address AS title
FROM test.users, test.addresses
WHERE users.id = addresses.user_id and users.id = NULL
[('Jack Jones, jack@yahoo.com',), ('Jack Jones, jack@msn.com',)]
注意上面例子中s构造的时候,用到了text生成的带参数的SQL逻辑组件,但是本身的数据类型是sqlalchemy.sql.selectable.Select,因此绑定数据的时候调用的方法是params,而不是bindparam
print(type(s))
<class 'sqlalchemy.sql.selectable.Select'>
如果用文本定义的SQL片段是table,和column, 可以用literal_column, table代替text去处理文本SQL。
from sqlalchemy import literal_column, String,table,literal
users = table('users')
users.schema = 'test' #注意指定schema的方式
s = select(
literal_column('users.id').label('id'),
(literal('=<')+literal_column('users.fullname',type_ = String)+literal('>=')).label('name')
).select_from (
users
print_sql(engine,s)
***Compiled SQL***
SELECT users.id AS id, '=<' || users.fullname || '>=' AS name
FROM test.users
>>>
注意schema不能在构造table时以字符串传入,否则生成的语句执行时会错误。尽管构造出来的SQL看上去是完全正确的。
from sqlalchemy import literal_column, String,table,literal
users = table('test.users') #这样是不行的
s = select(
literal_column('users.id').label('id'),
(literal('=<')+literal_column('users.fullname',type_ = String)+literal('>=')).label('name')
).select_from (
users
print_sql(engine,s)
try:
display(get_select(engine,s))
except Exception as e:
print(type(e),e)
***Compiled SQL***
SELECT users.id AS id, '=<' || users.fullname || '>=' AS name
FROM "test.users"
<class 'sqlalchemy.exc.ProgrammingError'> (psycopg2.ProgrammingError) relation "test.users" does not exist
LINE 2: FROM "test.users"
[SQL: 'SELECT users.id AS id, %(param_1)s || users.fullname || %(param_2)s AS name \nFROM "test.users"'] [parameters: {'param_2': '>=', 'param_1': '=<'}]
用literal_column和table相比text,构造出的object能够更好的被SQLAlchemy支持。看下面的例子。
users, addresses = reset_tables(engine)
s1 = select(
users.c.id,
text('users.fullname AS name')
print_sql(engine,s1)
s2 = select(
users.c.id,
literal_column('users.fullname').label('name')
print_sql(engine,s2)
***Compiled SQL***
SELECT test.users.id, users.fullname AS name
FROM test.users
***Compiled SQL***