I would say that more than 80% of the code of a backend application is related to the database; it is the data that we care about (am I exaggerating too much?). And as you know, any data manipulation (create, update, and delete) must be transactional.
However, since many parts of the application are involved in a transaction, there is a concern: how can we separate the database interaction layer from the other parts of the application? For example, we don’t want to explicitly call DB commit methods inside the service layer of an MVC structure.
Java Spring already has a smart solution for this: the @Transactional annotation (which I came to know from reading this book). By wrapping a function with this decorator, we get cleaner and more decoupled code layers while adhering to the DRY principle.
However, as far as I know, there is currently no corresponding annotation feature in the FastAPI community, and I found only a few articles that could be used as references.
Hence, in this article I would like to review these two articles and explain how I approached this feature based on them.
Remark: the code examples here use SQLAlchemy’s asynchronous APIs, but the approach can be applied in almost the same manner to the synchronous APIs as well.
One thing to notice about this code is that it uses the contextvars module from Python's standard library. The author says it is for accessing the current session like a global variable.
However, there is a very important topic related to contextvars that the author does not discuss any further. Since any backend application handles requests concurrently, we should manage our session in a thread-safe way. According to the SQLAlchemy documentation, we should associate the current session with the current request, and the author’s code shows little consideration of this.
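To make the concurrency concern concrete, here is a minimal standard-library sketch of how a ContextVar behaves when several requests are handled at once. The names current_request_id and handle_request are made up for illustration and do not come from either referenced article; each asyncio task stands in for one incoming request.

```python
import asyncio
from contextvars import ContextVar
from typing import List, Optional, Tuple

# Hypothetical variable name, used only for this illustration.
current_request_id: ContextVar[Optional[int]] = ContextVar(
    "current_request_id", default=None
)


async def handle_request(request_id: int) -> Tuple[int, Optional[int]]:
    # Every asyncio task runs in its own copy of the context,
    # so this set() is not visible to the other tasks.
    current_request_id.set(request_id)
    await asyncio.sleep(0.01)  # yield control so that the tasks interleave
    return request_id, current_request_id.get()


async def main() -> List[Tuple[int, Optional[int]]]:
    # gather() wraps each coroutine in its own task.
    return await asyncio.gather(*(handle_request(i) for i in range(1, 4)))


if __name__ == "__main__":
    print(asyncio.run(main()))  # [(1, 1), (2, 2), (3, 3)]
```

Each task reads back the value it set itself, even though the tasks interleave. The variable is therefore "global" in how it is accessed, but scoped per task in what it holds.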
So we have the following question to resolve: how do we associate the current session with the incoming request? Since the documentation “strongly” recommends using the integration tools that the backend framework provides rather than the scoped_session API, we first need to look into how FastAPI manages a database session.
But this approach is not compatible with our decorator approach: we want to access a session as a “global” variable (global, but specific to each request). And because we don’t want to pass a session object around inside our domain logic either, the database session has no way to interact with the request explicitly.
There is another approach provided by the same documentation page: use a middleware (a somewhat older approach, but it suits our needs). Inside the middleware we can directly touch the request object. Here we apply the technique we learned from Konstantine Dvalishvili’s approach: use contextvars for a “global” object.
But wait, how could we map a request to a single database session? As the SQLAlchemy documentation points out, it would be best if FastAPI provided such a mechanism. However, there seems to be no such functionality available at the moment, so we have to provide it on our own.
Would we need a separate "global" Python dictionary object mapping these two, like this?
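A naive version of that idea could be sketched as below, using only the standard library. FakeSession, get_session and remove_session are hypothetical names made up for illustration; a real implementation would create actual SQLAlchemy sessions instead.

```python
from typing import Dict


class FakeSession:
    """Hypothetical stand-in for a real database session."""

    def __init__(self, owner: int) -> None:
        self.owner = owner
        self.closed = False

    def close(self) -> None:
        self.closed = True


# request id -> session; a module-level dictionary shared by all requests
_sessions: Dict[int, FakeSession] = {}


def get_session(request_id: int) -> FakeSession:
    # Lazily create one session per request id and reuse it afterwards.
    if request_id not in _sessions:
        _sessions[request_id] = FakeSession(owner=request_id)
    return _sessions[request_id]


def remove_session(request_id: int) -> None:
    # Close and forget the session once the request is finished.
    session = _sessions.pop(request_id, None)
    if session is not None:
        session.close()
```

The drawback is that every caller has to know the request id and pass it in explicitly, which is exactly the kind of coupling we are trying to avoid.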
Remark: this reference is written in Korean, although you could just read the code and see what the author tries to achieve.
So this approach basically combines the two elements I mentioned previously: accessing the current session using contextvars, and matching it with the current incoming request. If we use scoped_session, it calls the scopefunc passed by the user in order to map the current context to one of the database sessions. Under the hood, it is just a simple Python dictionary.
Thus, by passing the function get_session_context as the scopefunc parameter, we can smoothly map the current request to exactly one database session.
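The mechanism can be illustrated with a toy registry. It is a simplified stand-in written for this article, not SQLAlchemy's actual code, and the names ToyScopedRegistry, request_key and get_request_key are assumptions made for the example.

```python
from contextvars import ContextVar
from typing import Callable, Dict, Generic, Hashable, Optional, TypeVar

T = TypeVar("T")


class ToyScopedRegistry(Generic[T]):
    """Toy registry: one object per key returned by scopefunc."""

    def __init__(
        self, createfunc: Callable[[], T], scopefunc: Callable[[], Hashable]
    ) -> None:
        self._createfunc = createfunc
        self._scopefunc = scopefunc
        self._registry: Dict[Hashable, T] = {}  # just a plain dictionary

    def __call__(self) -> T:
        # Ask scopefunc which scope we are in, then create or reuse the object.
        key = self._scopefunc()
        if key not in self._registry:
            self._registry[key] = self._createfunc()
        return self._registry[key]

    def remove(self) -> None:
        # Drop the object that belongs to the current scope, if any.
        self._registry.pop(self._scopefunc(), None)


# Hypothetical per-request key stored in a context variable.
request_key: ContextVar[Optional[int]] = ContextVar("request_key", default=None)


def get_request_key() -> int:
    key = request_key.get()
    if key is None:
        raise ValueError("No request is currently being handled")
    return key


# object() stands in for a session factory in this sketch.
registry = ToyScopedRegistry(createfunc=object, scopefunc=get_request_key)
```

Calling registry() twice while request_key holds the same value returns the same object, and a different value yields a different object. No caller has to pass a request id around.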
The code below is a rewritten version of the author's original code:
Here the author uses the uuid4 function to generate the session ids, but since request objects are already hashable, we can simply use hash(request). This also helps with debugging, since we can identify the specific request when something goes wrong.
As in the first approach, we avoid nested transactions to keep the design simple.
We can simply wrap all those explicit commit, rollback, or close calls in a single context manager: with session.begin()

```python
# _session.py (module name inferred from the imports in the other modules)
from contextvars import ContextVar
from typing import Optional

from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_scoped_session,
    async_sessionmaker,
    create_async_engine,
)

from ..config import config

# some hints from: https://github.com/teamhide/fastapi-boilerplate/blob/master/core/db/session.py
db_session_context: ContextVar[Optional[int]] = ContextVar("db_session_context", default=None)
engine = create_async_engine(url=config.DB_URL)


def get_db_session_context() -> int:
    session_id = db_session_context.get()

    # Compare against None explicitly: a hash value of 0 is still a valid id.
    if session_id is None:
        raise ValueError("Currently no session is available")

    return session_id


def set_db_session_context(*, session_id: Optional[int]) -> None:
    db_session_context.set(session_id)


AsyncScopedSession = async_scoped_session(
    session_factory=async_sessionmaker(bind=engine, autoflush=False, autocommit=False),
    scopefunc=get_db_session_context,
)


def get_current_session() -> AsyncSession:
    return AsyncScopedSession()
```

```python
import functools
from typing import Any, Awaitable, Callable

from ..utils.logger import get_logger
from ._session import get_current_session, get_db_session_context

AsyncCallable = Callable[..., Awaitable]
logger = get_logger(filename=__file__)


def transactional(func: AsyncCallable) -> AsyncCallable:
    @functools.wraps(func)
    async def _wrapper(*args, **kwargs) -> Any:
        try:
            db_session = get_current_session()

            # Already inside a transaction: join it instead of nesting a new one.
            if db_session.in_transaction():
                return await func(*args, **kwargs)

            async with db_session.begin():
                # automatically committed / rolled back thanks to the context manager
                return_value = await func(*args, **kwargs)

            return return_value
        except Exception as error:
            logger.info(f"request hash: {get_db_session_context()}")
            logger.exception(error)
            raise

    return _wrapper
```

```python
from typing import Awaitable, Callable

from fastapi import Request, Response, status as HTTPStatus

from ._session import AsyncScopedSession, set_db_session_context


async def db_session_middleware_function(
    request: Request, call_next: Callable[[Request], Awaitable[Response]]
) -> Response:
    response = Response("Internal server error", status_code=HTTPStatus.HTTP_500_INTERNAL_SERVER_ERROR)

    try:
        # Use the hash of the request object as the session scope key.
        set_db_session_context(session_id=hash(request))
        response = await call_next(request)
    finally:
        await AsyncScopedSession.remove()  # this includes closing the session as well
        set_db_session_context(session_id=None)

    return response
```
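To see the "no nested transactions" rule in isolation, here is a small standard-library sketch. FakeSession is a hypothetical stand-in that only records begin, commit and rollback events, and create_user, register and failing are made-up service functions. The decorator mirrors the logic described above but is not wired to a real database.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable, List


class FakeSession:
    """Hypothetical stand-in that records transaction events."""

    def __init__(self) -> None:
        self.events: List[str] = []
        self._active = False

    def in_transaction(self) -> bool:
        return self._active

    def begin(self) -> "FakeSession":
        # The fake session acts as its own transaction context manager.
        return self

    async def __aenter__(self) -> "FakeSession":
        self._active = True
        self.events.append("begin")
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        self._active = False
        self.events.append("commit" if exc_type is None else "rollback")
        return False  # never swallow the exception


session = FakeSession()


def transactional(func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
    @functools.wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        # Join the outer transaction if one is already open.
        if session.in_transaction():
            return await func(*args, **kwargs)
        async with session.begin():
            return await func(*args, **kwargs)

    return wrapper


@transactional
async def create_user(name: str) -> str:
    return name


@transactional
async def register(name: str) -> str:
    # Calls another transactional function from inside a transaction.
    return await create_user(name)


@transactional
async def failing() -> None:
    raise RuntimeError("boom")
```

Running register("alice") records a single begin and a single commit even though two decorated functions are involved, while failing() records a begin followed by a rollback and re-raises the error.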
Accessing a request-specific database session using contextvars (STL) and scoped_session (SQLAlchemy)
Implementing a FastAPI middleware function in order to directly access the incoming request object
Avoiding nested transactions
Simpler transaction code using the context manager of session.begin()
Thank you for reading this long article. Please leave a comment if you have any ideas on this post. Have a nice day!
```python
async with db_session.begin():
    # automatically committed / rolled back thanks to the context manager
    return_value = await func(*args, **kwargs)
```
Thanks for the comment.
I rechecked the docs and the source code, and thanks to the context manager we don't have to call session.commit() explicitly.
From the information you gave alone, I am not sure what the cause of the problem is. The only thing I can think of is the dependency injection part where Depends() is called, since it caches the values.
Could you provide more context?