Spring MongoTemplate批量操作源码跟踪与最佳实践

MongoTemplate介绍

MongoTemplate是Spring-data-mongodb实现的接口,用于对mongodb数据库的操作。绝大部分操作都包含在内。本文使用的包版本为 spring-data-mongodb-2.1.5.RELEASE.jar ,其他版本实现核心逻辑大致不变

MongoDb批量操作

db.collection.insertMany()

给定一个文档数组, insertMany() 将数组中的每个文档插入集合中。默认情况下,按顺序插入文档。如果 ordered 设置为false,则文档将以无序格式插入,并且可以通过重新排序来提高性能。如果使用无序,则应用程序不应依赖于插入的顺序。每个组中的操作数不能超过 maxWriteBatchSize 数据库的值。从MongoDB 3.6开始,此值为 100,000 。该值显示在 isMaster.maxWriteBatchSize 字段中。当然,很多高级API会对insert内容进行分割。

db.collection.bulkWrite()

批量操作,可以批量执行一组语句,与redis事务类似,默认为有序。当ordered设置为true的时候,当执行语句遇到错误则中断后续执行,当ordered设置为false的时候遇到错误继续执行。需要注意的是bulkWrite()只支持以下操作

  • insertOne
  • updateOne
  • updateMany
  • deleteOne
  • deleteMany
  • replaceOne
  • MongoTemplate中的批量操作

    insertAll方法

    方法定义为:
    public <T> Collection<T> insertAll(Collection<? extends T> objectsToSave)
    用于插入默认文档,入参与返回均为集合类型。
    实现源码如下,部分代码已打注释

    protected <T> Collection<T> doInsertAll(Collection<? extends T> listToSave, MongoWriter<T> writer) {
          //创建以collection为key的插入内容的List 
          Map<String, List<T>> elementsByCollection = new HashMap();
           //创建用于保存结果的数组
           List<T> savedObjects = new ArrayList(listToSave.size());
           //开始遍历
           Iterator var5 = listToSave.iterator();
           while(var5.hasNext()) {
               T element = var5.next();
               if (element != null) {
                   MongoPersistentEntity<?> entity = (MongoPersistentEntity)this.mappingContext.getRequiredPersistentEntity(element.getClass());
                   String collection = entity.getCollection();
                   List<T> collectionElements = (List)elementsByCollection.get(collection);
                  //拿到以collection为key的插入内容的List ,如果为空则新建
                   if (null == collectionElements) {
                       collectionElements = new ArrayList();
                       elementsByCollection.put(collection, collectionElements);
                   //把插入内容逐条add到List
                   ((List)collectionElements).add(element);
            //遍历map,组个collection调用this.doInsertBatch方法批量插入
           var5 = elementsByCollection.entrySet().iterator();
           while(var5.hasNext()) {
               Entry<String, List<T>> entry = (Entry)var5.next();
               savedObjects.addAll(this.doInsertBatch((String)entry.getKey(), (Collection)entry.getValue(), this.mongoConverter));
           return savedObjects;
    

    从源码可以看出,insertAll核心逻辑为把方法入参的list按照collection分组,并按组执行this.doInsertBatch方法。这个方法为protected方法,只能内部及子类、同包调用。这个方法实现后续会说到

    insert方法

    MongoTemplate自带的insert方法也有批量插入的实现,其中的一个接口如下:

    <T> Collection<T> insert(Collection<? extends T> var1, String var2);
    

    入参为要插入mongodb的集合以及collection的name
    方法实现为:

    public <T> Collection<T> insert(Collection<? extends T> batchToSave, String collectionName) {
            Assert.notNull(batchToSave, "BatchToSave must not be null!");
            Assert.notNull(collectionName, "CollectionName must not be null!");
            return this.doInsertBatch(collectionName, batchToSave, this.mongoConverter);
    

    可以看出这个方法直接断言了入参不为空,然后调用了和insertAll分类后调用的同一个方法doInsertBatch();从这个角度看如果只插入一个collection,二者的实际底层实现是一样的

    doInsertBatch方法

    首先这个方法为protected 方法,不能从外部直接调用。这个方法为一个通用的批量插入方法。实现如下

        protected <T> Collection<T> doInsertBatch(String collectionName, Collection<? extends T> batchToSave, MongoWriter<T> writer) {
            Assert.notNull(writer, "MongoWriter must not be null!");
          //这个是用于插入的list
            List<Document> documentList = new ArrayList();
            List<T> initializedBatchToSave = new ArrayList(batchToSave.size());
            Iterator var6 = batchToSave.iterator();
            Object saved;
           //遍历要插入的集合,并做一系列的处理
            while(var6.hasNext()) {
                T uninitialized = var6.next();
                BeforeConvertEvent<T> event = new BeforeConvertEvent(uninitialized, collectionName);
                T toConvert = ((BeforeConvertEvent)this.maybeEmitEvent(event)).getSource();
                AdaptibleEntity<T> entity = this.operations.forEntity(toConvert, this.mongoConverter.getConversionService());
                entity.assertUpdateableIdIfNotSet();
                saved = entity.initializeVersionProperty();
                Document document = entity.toMappedDocument(writer).getDocument();
                this.maybeEmitEvent(new BeforeSaveEvent(saved, document, collectionName));
                documentList.add(document);
                initializedBatchToSave.add(saved);
           //这里是关键,处理完之后调用insertDocumentList执行批量插入操作,并返回_id主键列表
            List<Object> ids = this.insertDocumentList(collectionName, documentList);
            List<T> savedObjects = new ArrayList(documentList.size());
            int i = 0;
            for(Iterator var17 = initializedBatchToSave.iterator(); var17.hasNext(); ++i) {
                T obj = var17.next();
                if (i < ids.size()) {
                    saved = this.populateIdIfNecessary(obj, ids.get(i));
                    this.maybeEmitEvent(new AfterSaveEvent(saved, (Document)documentList.get(i), collectionName));
                    savedObjects.add(saved);
                } else {
                    savedObjects.add(obj);
            return savedObjects;
    这个是真正执行批量插入的方法
    protected List<Object> insertDocumentList(String collectionName, List<Document> documents) {
            if (documents.isEmpty()) {
                return Collections.emptyList();
            } else {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Inserting list of Documents containing {} items", documents.size());
                this.execute(collectionName, (collection) -> {
                    MongoAction mongoAction = new MongoAction(this.writeConcern, MongoActionOperation.INSERT_LIST, collectionName, (Class)null, (Document)null, (Document)null);
                    WriteConcern writeConcernToUse = this.prepareWriteConcern(mongoAction);
                    if (writeConcernToUse == null) {
                        collection.insertMany(documents);
                    } else {
                        collection.withWriteConcern(writeConcernToUse).insertMany(documents);
                    return null;
                return MappedDocument.toIds(documents);
    

    从源码中可以看出在doInsertBatch对集合进行处理之后调用了insertDocumentList来执行批量操作,并使用了MongoCollection的insertMany方法即java实现db.collection.insertMany()的接口。在这里执行批量插入后返回插入后主键 “_id”的集合。

    insert方法和insertAll

    可以看出外部可以调用的两个批量插入操作方法实际上底部逻辑是一样的,如果是针对某一MongoDB的集合进行批量操作,使用insert方法效率稍微高一些,免去了不必要的分类操作。这两个方法中核心的批量插入数据库方法底层都是调用的db.collection.insertMany()方法。

    bulkOps方法

    为批量操作方法,底层使用db.collection.bulkWrite();bulkOps方法定义之一如下

    public BulkOperations bulkOps(BulkMode bulkMode, String collectionName)
    

    其中BulkMode为批量模式,下方代码可以看到主要是有ordered和unordered两种模式,对应mongoDb的ordered和unordered,如果使用ordered,则进行顺序操作,如果使用unordered则不按顺序会根据情况重新排序。当使用ordered的时候如果前一条操作命令失败则终止,如果使用unordered模式,执行失败的语句会跳过,直至全部语句执行完毕。理论上来说unordered语句效率高于ordered语句

    public static enum BulkMode {
            ORDERED,
            UNORDERED;
            private BulkMode() {
    

    bulkOps()方法返回一个BulkOperations类型的对象,通过操作这个对象来添加批量语句。BulkOperations对象接口定义如下

    public interface BulkOperations {
        BulkOperations insert(Object var1);
        BulkOperations insert(List<? extends Object> var1);
        BulkOperations updateOne(Query var1, Update var2);
        BulkOperations updateOne(List<Pair<Query, Update>> var1);
        BulkOperations updateMulti(Query var1, Update var2);
        BulkOperations updateMulti(List<Pair<Query, Update>> var1);
        BulkOperations upsert(Query var1, Update var2);
        BulkOperations upsert(List<Pair<Query, Update>> var1);
        BulkOperations remove(Query var1);
        BulkOperations remove(List<Query> var1);
        BulkWriteResult execute();
        public static enum BulkMode {
            ORDERED,
            UNORDERED;
            private BulkMode() {
    

    mongoDb的db.collection.bulkWrite()批量操作的语句只支持以下命令 insertOne、 updateOne、updateMany、deleteOne、deleteMany、 replaceOne。可以看出BulkOperations 包含了所有的操作并且多出一个 insert(List<? extends Object> var1)方法,因为mongoDb的db.collection.bulkWrite()不支持insertMany,所以推断insert(List<? extends Object> var1)应该是java层做的封装。查看源码,在insert(List<? extends Object> var1)方法中使用了foreach直接把单行插入操作方法BulkOperations insert(Object var1)封装成多行。

    public BulkOperations insert(List<? extends Object> documents) {
            Assert.notNull(documents, "Documents must not be null!");
            documents.forEach(this::insert);
            return this;
    

    bulkWrite()方法与insert/insertAll批量插入方法的区别

  • bulkWrite()原理是一次性提交多行语句然后一次性执行,横向对比来说,类似redis的事务、mysql的source导入执行sql文件。
  • insert/insertAll原理是语句级别的多行插入。类似mysql的insetNSERT INTO 表名([列名],[列名]) VALUES
    ([列值],[列值])),([列值],[列值])),([列值],[列值])),........([列值],[列值]));
  • 单从批量插入效率来看,理论上insert/insertAll会快于bulkWrite()。后文将做测试来测试效率。
    从排序来看,因为Spring的MongoTemplate类中的insert以及insertAll方法底层使用了MongoCollection的void insertMany(List<? extends TDocument> var1);方法。其中的实现如下:

    public void insertMany(List<? extends TDocument> documents) {
            this.insertMany(documents, new InsertManyOptions());
    

    其中操作直接new了一个InsertManyOptions。而InsertManyOptions的默认ordered选项为true。

    public final class InsertManyOptions {
        private boolean ordered = true;
        private Boolean bypassDocumentValidation;
        public InsertManyOptions() {
    

    而insertMany方法最终会调用mongodb的db.collection.insertMany(document,writeConcern,ordered)方法,所以在spring-data-mongodb-2.1.5.RELEASE.jar这个版本的包使用insert/insertAll执行批量操作一定是有序操作。而使用bulkWrite()则可以选择有序也可以选择非有序。

    MongoTemplate语句使用demo

    //批量插入
     mongoTemplate.insert(new ArrayList<>(),"collectionDemoName");
    //批量插入
     mongoTemplate.insertAll(new ArrayList<>());
    //初始化BulkOperations 对象为ORDERED模式
    BulkOperations orderedOperations = mongoTemplate.bulkOps(BulkOperations.BulkMode.ORDERED, "collectionDemoName");
    orderedOperations.insert("demo1");
    orderedOperations.insert("demo2");
    orderedOperations.insert(new ArrayList<String>(Arrays.asList("demo3", "demo4")));
    //提交批量执行
    orderedOperations.execute();
    //初始化BulkOperations 对象为UNORDERED模式
    BulkOperations unorderedOperations = mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, "collectionDemoName");
    orderedOperations.insert("demo5");
    orderedOperations.insert("demo6");
    orderedOperations.insert(new ArrayList<String>(Arrays.asList("demo7", "demo8")));
    //提交批量执行
    orderedOperations.execute();
    

    批量插入效率测试

    mongoTemplate方法 mongoDb方法 bulkOps(BulkMode bulkMode, String collectionName).execute() db.collection.bulkWrite()