engine = create_engine('mysql://%s:%s@%s/%s?charset=%s' % (db_config['user'], db_config['passwd'], db_config['host'], db_config['db'], db_config['charset']), echo=True)
# start from 0 and get 10
searchResult = self.db.query(User).join(UserInfo, User.uid == UserInfo.uid).\
    filter(UserInfo.income > 5000).offset(0).limit(10)

filter中实现isnull(id,0)=0

这个的办法就是用 or_ 函数, 同时实现了result转json

SysNote = self.db.query(Note.note_id.label('msg_id'), Note.content).filter(Note.deleted == 0).\
    outerjoin(NoteBox, Note.note_id == NoteBox.note_id).\
    filter(or_(NoteBox.nb_id == None, NoteBox.deleted == 0)).\
    filter(or_(NoteBox.nb_id == None, NoteBox.uid == self.uid))
msgs = []
# print(dir(SysNote), ...)
for msg in SysNote:
    row = {}
    # msg is tuple
    for field in msg._fields:
        row[field] = eval('msg.'+field) #dynamicly get the field value
    msgs.append(row)
SysNote = json.dumps(msgs, check_circular=False)
return JsonResponse(self, 50000, msg='Success', data=msgs)

排除Noe值

session.query(employee).filter_by(employ.brand_id.isnot(None)) from sqlalchemy import not_ session.query(employee).filter_by(not_(employ.brand_id==None))
users = session.query(User.name.label("user_name"))                 # 结果集的列取别名
    for user in users:
        print("label test:", user.user_name)                            # 这里使用别名

join使用

sqlalchemy各个模块主要功能了解

1. 在sqlalchemy.schema包里有数据库关系的描述,列举几个最常用的:
   字段:Column
   索引:Index
   表:Table
1. 数据类型在sqlalchemy.types包,列举几个最常用的:
   二进制:BIGINT
   布尔:BOOLEAN
   字符:CHAR
   可变字符:VARCHAR
   日期:DATETIME
1. 操作方法在sqlalchemy.sql包里,列举几个最常用的:
   execute,update,insert,select,delete,join等

执行原生SQL,格式化参数

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
engine = create_engine('mysql://user:passwd@ip:port/db', echo=True)
Session = sessionmaker(bind=engine)
session = Session()
session.execute('show databases')

如何选取指定的列,使用别名

联表统计,聚合函数的使用,sum,rownumber, avg

如何使用group by, having

创建表结构

创建名为users的表,有四个字段:id,name,fullname,password. 
String在mysql里其实就是varchar
metadata = MetaData()
users_table = Table('users', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(50)),
    Column('fullname', String(50)),
    Column('password', String(100))
metadata.create_all(engine)

从数据表获取映射对象

看到这里我们没有使用Model对象
from sqlalchemy.orm import mapper
metadata = MetaData(engine)
users_table = Table('users', metadata, autoload=True)
print users_table.columns
# 利用映射对象执行数据插入和显示
insert =  users_table.insert()
insert.execute(name='leon', fullname='leon liang', password='leon123')
mapper(User, users_table)
ed_user=User('crackpot','Crackpot','password')
print 'username:', ed_user.name
print 'fullname:', ed_user.fullname
print 'password:', ed_user.password
print 'id:', str(ed_user.id)

ORM操作

#coding:utf8
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker
print(sqlalchemy.__version__)
engine = create_engine('sqlite:///dbyuan1.db', echo=True)
Base = declarative_base()#生成一个SQLORM基类
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    fullname = Column(String)
    password = Column(String)
    def __repr__(self):
       return "<User(name='%s', fullname='%s', password='%s')>" % (
                            self.name, self.fullname, self.password)
Base.metadata.create_all(engine)  #创建所有表结构
ed_user = User(name='xiaoyu', fullname='Xiaoyu Liu', password='123')
print(ed_user)
#这两行触发sessionmaker类下的__call__方法,return得到 Session实例,赋给变量session,所以session可以调用Session类下的add,add_all等方法
MySession = sessionmaker(bind=engine)
session = MySession()
session.add(ed_user)
# our_user = session.query(User).filter_by(name='ed').first()
# SELECT * FROM users WHERE name="ed" LIMIT 1;
# session.add_all([
#     User(name='alex', fullname='Alex Li', password='456'),
#     User(name='alex', fullname='Alex old', password='789'),
#     User(name='peiqi', fullname='Peiqi Wu', password='sxsxsx')])
session.commit()
#print(">>>",session.query(User).filter_by(name='ed').first())
#print(session.query(User).all())
# for row in session.query(User).order_by(User.id):
#      print(row)
# for row in session.query(User).filter(User.name.in_(['alex', 'wendy', 'jack'])):#这里的名字是完全匹配
#     print(row)
# for row in session.query(User).filter(~User.name.in_(['ed', 'wendy', 'jack'])):
#     print(row)
#print(session.query(User).filter(User.name == 'ed').count())
#from sqlalchemy import and_, or_
# for row in session.query(User).filter(and_(User.name == 'ed', User.fullname == 'Ed Jones')):
#     print(row)
# for row in session.query(User).filter(or_(User.name == 'ed', User.name == 'wendy')):
#     print(row)

一对多关联操作

http://www.cnblogs.com/yuanchenqi/articles/5638282.html

#coding:utf8
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker
print(sqlalchemy.__version__)
engine = create_engine('sqlite:///dbyuan1.db', echo=True)
Base = declarative_base()#生成一个SQLORM基类
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    fullname = Column(String)
    password = Column(String)
    def __repr__(self):
       return "<User(name='%s', fullname='%s', password='%s')>" % (
                            self.name, self.fullname, self.password)
Base.metadata.create_all(engine)  #创建所有表结构
ed_user = User(name='xiaoyu', fullname='Xiaoyu Liu', password='123')
print(ed_user)
#这两行触发sessionmaker类下的__call__方法,return得到 Session实例,赋给变量session,所以session可以调用Session类下的add,add_all等方法
MySession = sessionmaker(bind=engine)
session = MySession()
session.add(ed_user)
# our_user = session.query(User).filter_by(name='ed').first()
# SELECT * FROM users WHERE name="ed" LIMIT 1;
# session.add_all([
#     User(name='alex', fullname='Alex Li', password='456'),
#     User(name='alex', fullname='Alex old', password='789'),
#     User(name='peiqi', fullname='Peiqi Wu', password='sxsxsx')])
session.commit()
#print(">>>",session.query(User).filter_by(name='ed').first())
#print(session.query(User).all())
# for row in session.query(User).order_by(User.id):
#      print(row)
# for row in session.query(User).filter(User.name.in_(['alex', 'wendy', 'jack'])):#这里的名字是完全匹配
#     print(row)
# for row in session.query(User).filter(~User.name.in_(['ed', 'wendy', 'jack'])):
#     print(row)
#print(session.query(User).filter(User.name == 'ed').count())
#from sqlalchemy import and_, or_
# for row in session.query(User).filter(and_(User.name == 'ed', User.fullname == 'Ed Jones')):
#     print(row)
# for row in session.query(User).filter(or_(User.name == 'ed', User.name == 'wendy')):
#     print(row)
#coding:utf8
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker
print(sqlalchemy.__version__)
engine = create_engine('sqlite:///dbyuan1.db', echo=True)
Base = declarative_base()#生成一个SQLORM基类
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    fullname = Column(String)
    password = Column(String)
    def __repr__(self):
       return "<User(name='%s', fullname='%s', password='%s')>" % (
                            self.name, self.fullname, self.password)
Base.metadata.create_all(engine)  #创建所有表结构
ed_user = User(name='xiaoyu', fullname='Xiaoyu Liu', password='123')
print(ed_user)
#这两行触发sessionmaker类下的__call__方法,return得到 Session实例,赋给变量session,所以session可以调用Session类下的add,add_all等方法
MySession = sessionmaker(bind=engine)
session = MySession()
session.add(ed_user)
# our_user = session.query(User).filter_by(name='ed').first()
# SELECT * FROM users WHERE name="ed" LIMIT 1;
# 批量插入
# session.add_all([
#     User(name='alex', fullname='Alex Li', password='456'),
#     User(name='alex', fullname='Alex old', password='789'),
#     User(name='peiqi', fullname='Peiqi Wu', password='sxsxsx')])
session.commit()
#print(">>>",session.query(User).filter_by(name='ed').first())
#print(session.query(User).all())
# for row in session.query(User).order_by(User.id):
#      print(row)
# for row in session.query(User).filter(User.name.in_(['alex', 'wendy', 'jack'])):#这里的名字是完全匹配
#     print(row)
# for row in session.query(User).filter(~User.name.in_(['ed', 'wendy', 'jack'])):
#     print(row)
#print(session.query(User).filter(User.name == 'ed').count())
#from sqlalchemy import and_, or_
# for row in session.query(User).filter(and_(User.name == 'ed', User.fullname == 'Ed Jones')):
#     print(row)
# for row in session.query(User).filter(or_(User.name == 'ed', User.name == 'wendy')):
#     print(row)

ORM查询

http://www.cnblogs.com/aylin/p/5770888.html

#limit索引取出第一二行数据 session.query(Person).all()[1:3] #order by,按照id从大到小排列 session.query(Person).ordre_by(Person.id) #equal/like/in query = session.query(Person) query.filter(Person.id==1).all() query.filter(Person.id!=1).all() query.filter(Person.name.like('%ay%')).all() query.filter(Person.id.in_([1,2,3])).all() query.filter(~Person.id.in_([1,2,3])).all() query.filter(Person.name==None).all() #and or from sqlalchemy import and_ from sqlalchemy import or_ query.filter(and_(Person.id==1, Person.name=='张岩林')).all() query.filter(Person.id==1, Person.name=='张岩林').all() query.filter(Person.id==1).filter(Person.name=='张岩林').all() query.filter(or_(Person.id==1, Person.id==2)).all() # count计算个数 session.query(Person).count() # 修改update session.query(Person).filter(id > 2).update({'name' : '张岩林'}) ret = session.query(Users).order_by(Users.name.desc()).all() ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() from sqlalchemy.sql import func ret = session.query(Users).group_by(Users.extra).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all() ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all() ret = session.query(Person).join(Favor).all() ret = session.query(Person).join(Favor, isouter=True).all() q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union(q2).all() q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union_all(q2).all()

relationship和join使用

转载自:http://www.cnblogs.com/coder2012/p/4746941.html

  • relationship
  • #!/usr/bin/env python
    # encoding: utf-8
    from sqlalchemy import create_engine
    from sqlalchemy import Column
    from sqlalchemy import Integer
    from sqlalchemy import String
    from sqlalchemy import ForeignKey
    from sqlalchemy.orm import backref
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy.orm import relationship, backref
    from sqlalchemy.ext.declarative import declarative_base
    Base = declarative_base()
    class User(Base):
        __tablename__ = 'users'
        id = Column(Integer, primary_key=True)
        name = Column(String(32))
        addresses = relationship("Address", order_by="Address.id", backref="user")
    class Address(Base):
        __tablename__ = 'addresses'
        id = Column(Integer, primary_key=True)
        email_address = Column(String(32), nullable=False)
        user_id = Column(Integer, ForeignKey('users.id'))
        #user = relationship("User", backref=backref('addresses', order_by=id))
    engine  = create_engine('mysql://root:root@localhost:3306/test', echo=True)
    #Base.metadata.create_all(engine) 
    jack = User(name='jack')
    jack.addresses = [Address(email_address='test@test.com'), Address(email_address='test1@test1.com')]
    session.add(jack)
    session.commit()
    

    不使用join可以直接联表查询

    session.query(User.name, Address.email_address).\
        filter(User.id == Address.user_id).\
        filter(Address.email_address == 'test@test.com').all()
    # SELECT users.name AS users_name, addresses.email_address AS addresses_email_address
    # FROM users, addresses
    # WHERE users.id = addresses.user_id AND addresses.email_address = %s
    # [('jack', 'test@test.com')]
    

    在sqlalchemy中提供了Queqy.join()函数

    # 有外键
    session.query(User).join(Address).filter(Address.email_address=='test@test.com').first()
    session.query(User).join(Address).filter(Address.email_address=='test@test.com').first().name
    session.query(User).join(Address).filter(Address.email_address=='test@test.com').first().addresses
    # 无外键
    query.join(Address, User.id==Address.user_id)    # explicit condition
    query.join(User.addresses)                       # specify relationship from left to right
    query.join(Address, User.addresses)              # same, with explicit target
    query.join('addresses') 
    
    from sqlalchemy.orm import aliased
    adalias1 = aliased(Address)
    

    例如我们需要如下的查询

    SELECT users.*, adr_count.address_count
    FROM users
    LEFT JOIN
      (SELECT user_id, count(*) AS address_count
       FROM addresses
       GROUP BY user_id
       ) AS adr_count ON users.id=adr_count.user_id;
    
    # 生成子句,等同于(select user_id ... group_by user_id)
    sbq = session.query(Address.user_id, func.count('*').label('address_count')).\
        group_by(Address.user_id).subquery()
    # 联接子句,注意子句中需要使用c来调用字段内容
    session.query(User.name, sbq.c.address_count).\
        outerjoin(sbq, User.id == sbq.c.user_id).all()
    

    包含contains

    query.filter(User.addresses.contains(someaddress))
    

    sqlalchemy(一)基本操作 TODO

    http://www.cnblogs.com/coder2012/p/4741081.html

    #获取所有数据
    session.query(Person).all()
    #获取某一列数据,类似于django的get,如果返回数据为多个则报错
    session.query(Person).filter(Person.name=='jack').one()
    #获取返回数据的第一行
    session.query(Person).first()
    #过滤数据
    session.query(Person.name).filter(Person.id>1).all()
    #limit
    session.query(Person).all()[1:3]
    #order by
    session.query(Person).ordre_by(-Person.id)
    #equal/like/in
    query = session.query(Person)
    query.filter(Person.id==1).all()
    query.filter(Person.id!=1).all()
    query.filter(Person.name.like('%ac%')).all()
    query.filter(Person.id.in_([1,2,3])).all()
    query.filter(~Person.id.in_([1,2,3])).all()
    query.filter(Person.name==None).all()
    #and or
    from sqlalchemy import and_
    query.filter(and_(Person.id==1, Person.name=='jack')).all()
    query.filter(Person.id==1, Person.name=='jack').all()
    query.filter(Person.id==1).filter(Person.name=='jack').all()
    from sqlalchemy import or_
    query.filter(or_(Person.id==1, Person.id==2)).all()
    
    # 使用text
    from sqlalchemy import text
    query.filter(text("id>1")).all()
    query.filter(Person.id>1).all() #同上
    query.filter(text("id>:id")).params(id=1).all() #使用:,params来传参
    query.from_statement(
        text("select * from person where name=:name")).\
        params(name='jack').all()
    
    #计数 count
    query.filter(Person.id>1).count()
    
    session.query(func.count('*')).select_from(Person).scalar()
    session.query(func.count(Person.id)).scalar()
    

    Query

    resultProxy=db.execute("select * from users")
    resultProxy.close(), resultProxy 用完之后, 需要close
    resultProxy.scalar(), 可以返回一个标量查询的值
    ResultProxy 类是对Cursor类的封装(在文件sqlalchemy\engine\base.py),
    ResultProxy 类有个属性cursor即对应着原来的cursor.
    ResultProxy 类有很多方法对应着Cursor类的方法, 另外有扩展了一些属性/方法.
    resultProxy.fetchall()
    resultProxy.fetchmany()
    resultProxy.fetchone()
    resultProxy.first()
    resultProxy.scalar()
    resultProxy.returns_rows  #True if this ResultProxy returns rows.
    resultProxy.rowcount  #return rows affected by an UPDATE or DELETE statement. It is not intended to provide the number of rows present from a SELECT.
    

    **遍历ResultProxy时, 得到的每一个行都是RowProxy对象, 获取字段的方法非常灵活, 下标和字段名甚至属性都行. rowproxy[0] == rowproxy['id'] == rowproxy.id, 看得出 RowProxy 已经具备基本 POJO 类特性. **
    http://blog.csdn.net/mmx/article/details/48064109

    delete and remove

    session.delete(jack)
    session.query(User).filter_by(name='jack').count()
    session.query(User).filter_by(name='jack').remove()
    session.commit()