问题描述
最近在项目中因为涉及到
多表数据同步
,需要原子性操作多个表,所以使用了MongoDB Transaction。
在使用的过程中,Transaction中表的写操作,遇到了
WriteConflict
报错,具体如下。
WriteCommandError({
"errorLabels" : [
"TransientTransactionError"
"operationTime" : Timestamp(1596785629, 1),
"ok" : 0,
"errmsg" : "WriteConflict error: this operation conflicted with another operation. Please retry your operation or multi-document transaction.",
"code" : 112,
"codeName" : "WriteConflict",
根据报错,查了相关资料。原因是 由于MongoDB Server在处理并发请求时采用了 乐观锁
,不同于MySql中的的悲观锁
(如果有并发修改,会卡住后来请求等待锁的释放),乐观锁
基于版本控制的思路,不加锁,先修改,如果在修改过程中,有并发写操作,版本号对不上,则再报错。对应MongoDB Server侧抛出WriteConflict
,写冲突异常。可以结合官方文档查看。
In-progress Transactions and Write Conflicts
If a transaction is in progress and a write outside the transaction modifies a document that an operation in the transaction later tries to modify, the transaction aborts because of a write conflict.
If a transaction is in progress and has taken a lock to modify a document, when a write outside the transaction tries to modify the same document, the write waits until the transaction ends.
第一段:两个transction操作中同时修改一个document情况,后修改的transcation中操作会被MongoDB Server抛出WriteConflict
, 如下图
官方已给出答案,就是在Mongo client端(也是driver端),catch error,然后重试。
贴一段官方代码:
async function commitWithRetry(session) {
try {
await session.commitTransaction();
console.log('Transaction committed.');
} catch (error) {
if (error.hasErrorLabel('UnknownTransactionCommitResult')) {
console.log('UnknownTransactionCommitResult, retrying commit operation ...');
await commitWithRetry(session);
} else {
console.log('Error during commit ...');
throw error;
async function runTransactionWithRetry(txnFunc, client, session) {
try {
await txnFunc(client, session);
} catch (error) {
console.log('Transaction aborted. Caught exception during transaction.');
// If transient error, retry the whole transaction
if (error.hasErrorLabel('TransientTransactionError')) {
console.log('TransientTransactionError, retrying transaction ...');
await runTransactionWithRetry(txnFunc, client, session);
} else {
throw error;
async function updateEmployeeInfo(client, session) {
session.startTransaction({
readConcern: { level: 'snapshot' },
writeConcern: { w: 'majority' },
readPreference: 'primary'
const employeesCollection = client.db('hr').collection('employees');
const eventsCollection = client.db('reporting').collection('events');
await employeesCollection.updateOne(
{ employee: 3 },
{ $set: { status: 'Inactive' } },
{ session }
await eventsCollection.insertOne(
employee: 3,
status: { new: 'Inactive', old: 'Active' }
{ session }
try {
await commitWithRetry(session);
} catch (error) {
await session.abortTransaction();
throw error;
return client.withSession(session =>
runTransactionWithRetry(updateEmployeeInfo, client, session)
针对transaction有几点实用的调优方式,如下
a) 避免使用transaction
可以通过一些写法避免transaction, 这里有个思路,适合数据量比较小的场景。
try {
await session.withTransaction(async() => {
await db.collection('branches').updateOne({
_id: fromBranch
$inc: {
balance: -1 * dollars
session
await db.collection('branches').
updateOne({
_id: toBranch
$inc: {
balance: dollars
session
}, transactionOptions);
} catch (error) {
console.log(error.message);
如果branch数量比较小,可以考虑合并成一个document,类似:
mongo > db.embeddedBranches.findOne(); {
"_id": 1,
"branchTotals": [{
"branchId": 0,
"balance": 101208675
"branchId": 1,
"balance": 98409758
"branchId": 2,
"balance": 99407654
"branchId": 3,
"balance": 98807890
则可以不用transaction,
try {
let updateString =
`{"$inc":{
"branchTotals.` + fromBranch + `.balance":` + dollars + `,
"branchTotals.` + toBranch + `.balance":` + dollars + `}
let updateClause = JSON.parse(updateString);
await db.collection('embeddedBranches').updateOne({
_id: 1
}, updateClause);
} catch (error) {
console.log(error.message);
b) 移动transaction中op顺序,减少临界区
拿一个转账交易场景举例, 3个op, op1为全局交易计数,op2为某个账户余额减少,op3为某个账户余额增加。
1 await session.withTransaction(async () => {
2 await db.collection('txnTotals').
3 updateOne({ _id: 1 },
4 { $inc: { counter: 1 } },
5 { session });
6 await db.collection('accounts').
7 updateOne({ _id: fromAcc },
8 { $inc: { balance: -1*dollars } },
9 { session });
10 await db.collection('accounts').
11 updateOne({ _id: toAcc },
12 { $inc: { balance: dollars } },
13 { session });
14 }, transactionOptions);;
对于每一笔交易,此时临界区从第2行就开始了,而如果把op1移到尾部(原op3的位置),临界区则从第10行才开始,临界区范围降低,可减少碰撞几率,减少写冲突。
c) 切分热点数据(Partitioning Hot Documents)
还是以这个全局计数举例,假设每个交易的transaction中都有这个op,则txnTotals无疑是一个热点数据,
await db.collection('txnTotals').updateOne(
{ _id: 1 },
{ $inc: {counter: 1} },
{ session }
则我们可以 将其切分(Partitioning)降低热度,如下
let id = Math.floor(Math.random() * 10);
await db.collection('txnTotals').updateOne(
{ _id: id },
{ $inc: {counter: 1}},
{ session }