So I am going through the docs on how to add an insert + update hook on a specific model and I have a few questions
statement = update(Model).where(Model.id == id).values(name="hi")
res = (await session.execute(statement)).scalar()
To get that to work the docs point me to this section.
Instead I think I need to do:
model = Model(name="hi", id=id)
await session.merge(model)
await session.flush()
It feels like a gotcha that, depending on how I insert my data, the hook logic has to live in different places. I have been mixing and matching the two patterns above. Is there one hook that could apply to both?
When applying the hook that can be used to capture session.execute events, it seems I must apply it to the session instead of a per model basis. Is there really no way to capture it at model level?
If I am stuck applying it to the session level, how do I best go about doing that?
My understanding is you create the session from the engine at the beginning of your api endpoint and then you do your operations and then at the end of the api endpoint you close the connection. This makes sure you do not have any dangling db connections. Essentially like this: https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#sqlalchemy.ext.asyncio.async_sessionmaker
But the do_orm_execute event wants an instance of the session?? So how do I get access to it if the session is a run-time concept?
Right now I am doing:
# boot.py
async_session_maker = async_sessionmaker(engine)
session = async_session_maker()
session.sync_session
event.listen(session.sync_session, "do_orm_execute", hook_fn)

async def api_endpoint(async_session_maker):
    session = async_session_maker()
    await session....
But it feels antithetical to how a session is supposed to be used and, more importantly, the hook is not getting applied. So I think I am missing something.
So I am going through the docs on how to add an insert + update hook on a specific model and I have a few questions
1. I want to call an async lib in the hook callback. It seems like all the [ORM events](https://docs.sqlalchemy.org/en/20/orm/events.html) are sync though?
the events are run in "sync style" but are still calling into the async driver. the section at https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#using-events-with-the-asyncio-extension gets into this in some detail
Is there any way to await in the callback or do I have to turn my async function to sync? If I have to turn my async func to sync would this be the preferred method result = asyncio.run(my_async_function()) ?
you can leave your functions as async in the event hooks, but you have to write some adapter logic for them; see the example at https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#using-awaitable-only-driver-methods-in-connection-pool-and-other-events . this is new stuff, I'm not sure offhand why that "lambda" step is in there...
2. It looks like the after_insert/after_update hooks are supported [here](https://docs.sqlalchemy.org/en/20/orm/events.html#sqlalchemy.orm.MapperEvents.after_insert). I was having trouble getting this to work, but then I noticed the note that "this event only applies to the session flush operation and does not apply to the ORM DML operations".
it's a very new world because everyone is coming across this now, and really nothing has changed for 15 years with this, it's just that the standalone insert() / update() / delete() integration with the ORM is now very prominent in 2.0's documentation, whereas previously they weren't, and before 1.4 the insert() function wasn't ORM integrated at all. So now everyone is coming to us with this new bit of confusion, which is based around the fact that the ORM Session has this classic "unit of work" way of operating, which is what we've had for years. the "bulk insert" thing is new, and is intended to be very WYSIWYG about things. It seems to be popular. But the after_insert() event has nothing to do with it, nor could it. Where we probably will go here is changing the names of the unit of work events to be less misleading in this regard.
I have been doing:
statement = update(Model).where(Model.id == id).values(name="hi")
res = (await session.execute(statement)).scalar()
To get that to work the docs point me to this section.
It feels like a gotcha that, depending on how I insert my data, the hook logic has to live in different places. I have been mixing and matching the two patterns above. Is there one hook that could apply to both?
A statement like update(Model) might be running a single, small UPDATE statement that on the database server might affect 25000 rows. The way that operation proceeds is:
1. create the SQL string
2. send the SQL string over the network
3. wait for the database to say "Done", with maybe a single number of rows affected returned.

That's a few hundred bytes of network overhead.
To fulfill the contract of after_update(), how would this be proposed? It would mean the above operation happens, but in addition to that operation, we would have to:
4. retrieve each of those 25000 rows over the network (all columns)
5. construct a complete Model() object from each one
6. present them each one by one to the after_update() event in your Python code.

We'd have a big crowd with pitchforks at our door if we did that, since it would pretty much ruin any usefulness of being able to run a simple UPDATE statement.
It turns out you can in fact get back your Model objects from the update(), you would just need to be explicit and use update().returning() .
However, this UPDATE statement is something you're running, right there, using session.execute(), so you can get the objects back right there. The after_update() and similar events are not really needed in that case; the reason events like after_update() exist is to allow visibility into the internals of the unit of work flush process, which isn't used here. Again, I think the reason people are coming to us with some confusion is the naming.
Instead I think I need to do:
model = Model(name="hi", id=id)
await session.merge(model)
await session.flush()
maybe. You might want to note that that merge() is running a SELECT statement. If you are doing this for hundreds of rows at a time you might want to SELECT those objects up front and then modify them directly.
3. When applying the hook that can be used to capture `session.execute` events, it seems I must apply it to the session instead of a per model basis. Is there really no way to capture it at model level?
There might be other ways to achieve what you want to do without using the persistence events. depends on what the use case is.
My understanding is you create the session from the engine at the beginning of your api endpoint and then you do your operations and then at the end of the api endpoint you close the connection. This makes sure you do not have any dangling db connections. Essentially like this: https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#sqlalchemy.ext.asyncio.async_sessionmaker
that's the classic pattern, yup
But the do_orm_execute event wants an instance of the session?? So how do I get access to it if the session is a run-time concept?
Right now I am doing:
# boot.py
async_session_maker = async_sessionmaker(engine)
session = async_session_maker()
session.sync_session
event.listen(session.sync_session, "do_orm_execute", hook_fn)
this might improve later on but the pattern is at https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#examples-of-event-listeners-with-async-engines-sessions-sessionmakers where you put an explicit sessionmaker inside the async sessionmaker:
import asyncio

from sqlalchemy import event
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import sessionmaker

sync_maker = sessionmaker()
maker = async_sessionmaker(sync_session_class=sync_maker)


@event.listens_for(sync_maker, "before_commit")
def before_commit(session):
    print("before commit")
wow we made it. This is new stuff, thanks for playing along.
Okay it sounds like i need to do this one:
https://docs.sqlalchemy.org/en/20/orm/events.html#sqlalchemy.orm.SessionEvents.after_commit
But then how do i add the hook to the session given the session is a runtime object?
engine = create_async_engine(
    settings.db.URL,
    echo=settings.db.ECHO,
    echo_pool=settings.db.ECHO_POOL,
    json_serializer=msgspec.json.Encoder(enc_hook=_default),
    max_overflow=settings.db.POOL_MAX_OVERFLOW,
    pool_size=settings.db.POOL_SIZE,
    pool_timeout=settings.db.POOL_TIMEOUT,
    poolclass=NullPool if settings.db.POOL_DISABLE else None,
)
"""Configure via [DatabaseSettings][starlite_saqpg.settings.DatabaseSettings].

Overrides default JSON
serializer to use `orjson`. See [`create_async_engine()`][sqlalchemy.ext.asyncio.create_async_engine]
for detailed instructions.
"""
async_session_factory = async_sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
"""Database session factory.

See [`async_sessionmaker()`][sqlalchemy.ext.asyncio.async_sessionmaker].
"""
and then at my endpoint I call async_session_factory()
edit:
Based on your comments it seems this is what i want
# https://docs.sqlalchemy.org/en/20/orm/events.html#sqlalchemy.orm.MapperEvents.after_update
def update_redis(_1, connection, target: User) -> None:
    auth_dto = AuthSessionSchema.from_orm(target)
    print(connection)
    connection.info["update_user"] = {"auth_dto": auth_dto, "target": target}
    # keep existing TTL and only set if already exists in redis
    # sync_redis.set(auth_namespace(target.email), auth_dto.json(), xx=True, keepttl=True)
    return


def update_test(session) -> None:
    print(session)
    user_info = session.info.get("update_user")
    if user_info is not None:
        target = user_info["target"]
        auth_dto = user_info["auth_dto"]
        sync_redis.set(auth_namespace(target.email), auth_dto.json(), xx=True, keepttl=True)
    return


event.listen(User, "after_insert", update_redis)
event.listen(User, "after_update", update_redis)
event.listen(engine.sync_engine, "after_commit", update_test)
only issue is the code doesn't compile and it gives me this error:
app_dev | Process SpawnProcess-58:
app_dev | Traceback (most recent call last):
app_dev | File "/usr/local/lib/python3.11/site-packages/sqlalchemy/event/base.py", line 162, in __getattr__
app_dev | ls = self._empty_listeners[name]
app_dev | ~~~~~~~~~~~~~~~~~~~~~^^^^^^
app_dev | KeyError: 'after_commit'
app_dev |
app_dev | During handling of the above exception, another exception occurred:
app_dev | File "/app/app/domain/users.py", line 86, in <module>
app_dev | event.listen(engine.sync_engine, "after_commit", update_test)
app_dev | File "/usr/local/lib/python3.11/site-packages/sqlalchemy/event/api.py", line 124, in listen
app_dev | _event_key(target, identifier, fn).listen(*args, **kw)
app_dev | File "/usr/local/lib/python3.11/site-packages/sqlalchemy/event/registry.py", line 291, in listen
app_dev | dispatch_collection = getattr(target.dispatch, identifier)
app_dev | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
app_dev | File "/usr/local/lib/python3.11/site-packages/sqlalchemy/event/base.py", line 164, in __getattr__
app_dev | raise AttributeError(name)
app_dev | AttributeError: after_commit
edit2:
Based on above i also tried:
sync_maker = sessionmaker()
async_session_factory = async_sessionmaker(
    engine, expire_on_commit=False, class_=AsyncSession, sync_session_class=sync_maker
)
event.listen(sync_maker, "after_commit", update_test)
And it compiles but then update_test is never invoked.
after_commit is also a session event, so you want to associate it with the async_session / session, not the engine:
https://docs.sqlalchemy.org/en/20/orm/events.html#sqlalchemy.orm.SessionEvents.after_commit
https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#examples-of-event-listeners-with-async-engines-sessions-sessionmakers
Yeah i actually tried this example as well:
When i do that code, in the after commit hook session.info is always an empty dict. I correctly modify session.info in my after_update hook. I can log that value just fine. It is just in the after commit hook that it seems to lose the info.
Where my code differs from the example is that I use async_sessionmaker to create a factory and create a new session per endpoint, as mentioned before, whereas the example uses the same session the whole time: session = AsyncSession(engine).
The docs mention the hook is run on all session instances but it seems like it is on a different session.
Is that a bug (me losing session.info)? or is there another way i should create my after_commit hook?
edit:
Previously I said that this method:
event.listen(sync_maker, "after_commit", update_test)
never invoked update_test. That was incorrect: it is invoked, but session.info is an empty dict ({}).
FYI it would be super helpful if you used textual code examples inside of a block rather than screenshots, so I can cite / rework the code given
When i do that code, in the after commit hook session.info is always an empty dict. I correctly modify session.info in my after_update hook. I can log that value just fine. It is just in the after commit hook that it seems to lose the info.
if you have an asyncsession, do some work on it, and commit, everything occurs with the same underlying session and the event hook will get the same session in every case.
the "after_update" event hook is a mapper hook, so in that case, it is not directly passed the session, you need to get it from the object that's passed, using inspect(object).session.
try out this demo that's using the same calling form you illustrated
from __future__ import annotations

import asyncio
import datetime
from typing import Optional

from sqlalchemy import event
from sqlalchemy import func
from sqlalchemy import inspect
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import Session

Base = declarative_base()


class A(Base):
    __tablename__ = "a"

    id: Mapped[int] = mapped_column(primary_key=True)
    data: Mapped[Optional[str]]
    create_date: Mapped[datetime.datetime] = mapped_column(
        server_default=func.now()
    )


async def async_main():
    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/test",
        echo=True,
    )

    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    session = AsyncSession(engine)

    @event.listens_for(session.sync_session, "before_commit")
    def bc(session):
        session.info["before_commit_hook"] = "yup"

    @event.listens_for(Session, "after_commit")
    def ac(session):
        print(
            f"before_commit hook: {session.info['before_commit_hook']} "
            f"after update hook: {session.info['after_update_hook']}"
        )

    @event.listens_for(A, "after_update")
    def au(mapper, connection, target):
        session = inspect(target).session
        session.info["after_update_hook"] = "yup"

    async with session.begin():
        a1 = A(data="data")
        session.add(a1)
        await session.flush()

        # make sure there's an update
        a1.data = "new data"

        # commit will be called outside the block


asyncio.run(async_main())
output:
2023-03-28 08:50:16,819 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-03-28 08:50:16,820 INFO sqlalchemy.engine.Engine INSERT INTO a (data) VALUES ($1::VARCHAR) RETURNING a.id, a.create_date
2023-03-28 08:50:16,821 INFO sqlalchemy.engine.Engine [generated in 0.00013s] ('data',)
2023-03-28 08:50:16,823 INFO sqlalchemy.engine.Engine UPDATE a SET data=$1::VARCHAR WHERE a.id = $2::INTEGER
2023-03-28 08:50:16,823 INFO sqlalchemy.engine.Engine [generated in 0.00019s] ('new data', 1)
2023-03-28 08:50:16,825 INFO sqlalchemy.engine.Engine COMMIT
before_commit hook: yup after update hook: yup