关于mongoDB插入去重以及高并发问题

最近在项目中碰到过向mongoDB插入数据去重问题。一开始我的想法直接用upsert,我的项目部分代码如下:

        //使用Upsert进行插入,如果存在就更新,不存在则插入
        //根据报告时间和code进行筛选去重
        Query query = new Query();
        query.addCriteria(Criteria.where("reporttimeStamp").is(meteorologicalData.getReporttimeStamp()));
        query.addCriteria(Criteria.where("adcode").is(meteorologicalData.getAdcode()));
        //建立更新update类
        Update update=new Update();
        //下面步骤可以替换成想要的对象。这里我用的反射让每个属性值赋值在update集合中。
        Field[] declaredFields = MeteorologicalData.class.getDeclaredFields();
        for (Field declaredField : declaredFields) {
            declaredField.setAccessible(true);
            update.set(declaredField.getName(),declaredField.get(meteorologicalData));
            declaredField.setAccessible(false);
        //插入对应集合
        mongoTemplate.upsert(query,update,MeteorologicalData.class);

虽然解决了插入去重问题,但是发现在高并发情况下,依然会出现重复插入问题,研究一番之后,把问题锁定在了多进程同时更新数据库时产生的upsert问题。这里讨论一个简化模型来阐明此问题并且论述解决方法。

为了简化讨论,这里讨论一个投票程序。某城市人民选举该市市长。每个市民选择一个人为他/她投票。由于该选举为广泛选举,在选举之前并没有候选人集合,而是每个市民可以把票投给任何一个人。为简化问题,假设城市里没有人重名,因此每个人的名字可以作为其唯一代号使用。实际情况中,可以使用其他唯一表示,比如身份证号等等。

这里的数据集非常简单:

Collection: db.election
{'name': 'Alice', 'votes': 10}
{'name': 'Bob', 'votes': 21}

作为一个计票的进程,主要任务就是拿过一张选票,查看其name属性,在数据库中给名字为name的文档的票数加1。注意,这里name不一定已经存在于数据库中。如果此名字不存在,则应新建一条文档。

在只有一个进程运行的情况下,这段代码虽然速度并不快,但会给出正确的计票结果。如果我们使用多进程,创建几个worker,分别收集选票,给指定的被选人计票,会怎么样呢?

多进程下的写入矛盾

当简单地把上面的upsert交给几个进程来处理的时候,我们会发现运行结果出了这样的问题:每个被选人的票数似乎少了很多,而被选人的数量增加了。仔细检查会发现,其实是同一个候选人被创建了多个文档。为什么会导致这样的写入错误呢?不难想象到,这是多个进程同时试图更新一个文档的时候导致的。

需要注意的是,MongoDB本身是有文档级的写入锁的。也就是说,当一个进程开始修改一个文档时,该文档被锁定,其他文档不可以再对其进行写入甚至读取。这个写入锁的存在本身就是为了防止不同程序更新文档时产生的写入冲突。然而,update其实分为两步。首先是搜索文档位置,然后是文档更新。当两个程序同时试图更新一个不存在的文档的时候,假设程序A先发现文档不存在,然后程序B发现文档不存在。此时A还没来得及对文档进行写入,因此文档锁并没有挂起。或者说,由于文档不存在,讨论文档锁也就失去了意义。这个时候,两个进程就会分别创建文档并给其votes加1。于是就出现了不必要的重复。

如何解决?

解决方法其实很简单:unique index。 上文提到,name属性是唯一的。如果我们给它加一个唯一索引,不就可以从根本上避免一个人有多个不同的文档了吗?
这个时候,即使两个进程经过搜索都得到了某个文档不存在的结果,假设A先一步创建了该文档,那么当B创建文档时,由于含有相同name的文档已经被A进程抢先创建,MongoDB就会拒绝B进程创建。pymongo对此类错误应该是有应对机制的,这是B进程会稍等片刻,重新尝试更新文档。这个时候,A进程已经完成计票并且释放了写入锁,文档被成功创建,而进程B再尝试时,也会检索到这个被新创建的文档,直接在上面把票数加1,而不是创建新文档。这样一个小的时间差,就解决了写入矛盾。

同时,我们还得到了额外的奖励:当name上创建了unique index之后,找到特定候选人的速度就会快很多。这个优势在计票初期,候选人数量不多时并没有显示,但当后期候选人数变多时,一方面再有新的候选人被加入的概率会变得很小(该被加的差不多都被加进来了),因此修改索引的几率越来越少;另一方面,在候选人基数变得很大的时候,相比于没有索引的情况,有唯一索引的情况下程序的速度优势会越发明显。这两个方面综合在一起,结果就是,添加唯一索引之后程序在后期速度优势会越来越明显。在我自己的程序中,运行初期多线程比单线程只快了三四倍,但在数据量较大时,多线程(加上唯一索引)会比单线程快10到20倍。这多处来的速度,就是唯一索引导致的。

虽然这样能解决高并发产生的重复插入问题,但是我们发现db进行upsert的速度越来越慢,以前两个小时就能消费完队列里的数据,现在需要四五个小时,并且消耗时间是呈现不断上升的趋势。所以我觉得应该是和upsert这个操作有关。

  • 问题定位:

由于是写多读少的场景,所以我们并没有对集合加入索引。并且经查阅资料发现,mongodb索引的存储机制和mysql不同,mysql的索引是存储在硬盘中,需要时会调用部分到内存中。而mongodb的索引则是直接存储在内存和临时文件中,并且和内存大小限制有很直接的关系,如果超过内存限制,则从硬盘加载索引。所以mongodb索引的使用,在大数据集合面前,会面临内存耗尽的风险。

下面这个链接是官方对索引使用限制的说明:
https://docs.mongodb.com/manual/reference/limits/#index-limitations
官方介绍

另外upsert操作会先在集合中进行数据查找,如果数据已经存在,则更新,否则才插入。数据的查找那就势必会使用索引,mongo索引用的是B树,时间复杂度为Olog(n),而没有索引的情况下则时间复杂度是O(n),差别见下图:时间复杂度曲线
问题显而易见了,随着数据日益增多,upsert性能是线性下滑的,所以后来的想法就是,如果有高并发的情况下,还是先进行对数据库查询然后在进行insert,这个时候对这些操作进行加锁操作,当然会对速度很有影响,但是至少规避了后期内存耗尽风险。如果是大公司有钱那就用upsert+唯一索引来解决吧,内存不够就扩充内存,钞能力可以解决任何问题。

参考文档:https://blog.csdn.net/jeffrey11223/article/details/80366368

关于mongoDB插入去重以及高并发问题最近在项目中碰到过向mongoDB插入数据去重问题。一开始我的想法直接用upsert,我的项目部分代码如下: //使用Upsert进行插入,如果存在就更新,不存在则插入 //根据报告时间和code进行筛选去重 Query query = new Query(); query.addCriteria(Criteria.where("reporttimeStamp").is(meteorologicalDa BSON 是一种类似 JSON 的二进制形式的存储格式,是 Binary JSON 的简称。 插入文档 MongoDB 使用 insert() 或 save() 方法向集合中插入文档,语法如下: db.COLLECTION_NAME.insert(document) 以下文档可以存储在 MongoDB 的 runoob 数据库 的 col 集合中: >db.col.insert({title: 'Mo
MongoDB的并发 线上环境遇到MongoDB的性能瓶颈,为了解决性能瓶颈学习了一下MongoDB中的并发机制,记录如下。下文中主要是对比了MongoDB 2.2和3.0.7这两个版本的并发机制。 1. MongoDB锁的类型 在2.2版本中MongoDB用的是读写锁,允许并行的读但是只能互斥的写,当一个读锁存在的时候可以有多个读操作共享这个锁,但是当一个写锁存在的时候只能有一个写操作获得这个
MongoDB允许多个客户端读取和写入相同的数据。 为了确保一致性,它使用lock和其他并发控制措施来防止多个客户端同时修改同一条数据。 总之,这些机制保证对单个文档同时只可能被一个客户端写入,并且客户端永远不会看到数据的不一致视图。 MongoDB使用什么类型的锁? MongoDB使用多粒度锁,允许操作锁定全局,数据库或集合级别,并允许各个存储引擎在集合级别下实现自己的并发控制(例如,在W...
MongoDB是一个文档数据库(以JSON为数据模型),由C++语言编写。MongoDB的数据是存储在硬盘上的,只不过需要操作的数据会被加载到内存中提高效率,所以MongoDB本身很吃内存。(本文章使用4.x版本,自带分布式事务) MongoDB基于灵活的JSON文档模型,非常适合敏捷式快速开发。与此同时,其与生俱来的高可用、高水平扩展能力使它在处理海量、高并发数据应用时颇具优势。如何考虑是否选择MongoDB? 没有某个业务场景必须要使用MongoDB才能解决,但使用MongoDB通常能让你以更低的成本
以下摘自pymongo文档: update_one(filter,update,upsert=False) update_many(filter,update,upsert=False) filter: A query that matches the document to update. update: The modifications to apply.
高并发操作MongoDB性能优化 一,代码层级优化 采用批量数据方式操作MongoDB。将数据信息存放在消息队列中进行缓存,然后定时定量的去获取队列消息,触发连接MongoDB获取查询结果。 所谓定时定量是指每一段时间或消息队列达到某一数值。示例:每10秒或队列消息数达到200时提交一次统计事件。 优点:减少网络传输的IO,同时减少SQL语句解析的次数。降低MongoDB日志刷盘的数据量和频率...
方法一:去重 blackListIp.setIp = "192.168.0.1"; blackListIpRepository.save(blackListIp.getIp);//存入一条数据库中已经有ip=192.168.0.1的记录。现在这个ip的记录有两条。 blackListIpRepository.delete
MongoDB 中,可以使用 `distinct()` 方法来进行去重。这个方法可以应用在一个集合中的某个字段,返回不同的值。例如,假设我们有一个集合叫做 `users`,其中有一个字段叫做 `username`,我们可以使用如下的命令来获取所有不同的用户名: db.users.distinct("username") 这将返回一个包含所有不同用户名的数组。需要注意的是,`distinct()` 方法只能用于一个字段,如果你想要对多个字段进行去重,需要使用聚合框架中的 `$group` 操作符。