用Hibernate/JPA进行批量处理的方法

学习在hibernate中启用批处理,并 执行批量INSERT / UPDATE语句 ,以提高性能和内存利用率。

请注意, 在内部,Hibernate利用JDBC的批处理能力,将多个SQL语句作为 一个 PreparedStatement 进行批处理。

1.不启用批处理

默认情况下,Hibernate中的批处理是禁用的。因此,如果我们坚持10个新实体,那么将执行10个独立的SQL INSERT 语句。对于一百万条记录也是如此。在任何典型的设置中,任何应用程序的性能都会随着行数的增加而不断下降。

@Test
public void testWithoutPeriodicFlush() {
  doInTransaction(session -> {
    for (int i = 1; i <= 10; i++) {
      System.out.println("Statement Queued : " + i);
      session.persist(new Post.PostBuilder()
          .title("title" + i)
          .content("content" + i)
          .build());

注意控制台中的日志。很明显,hibernate首先在当前的持久化上下文中排队等待所有的语句。当事务被提交时,所有的语句都在方法的最后被执行

Statement Queued : 1
Statement Queued : 2
Statement Queued : 10
Hibernate: insert into Post (content, title, id) values (?, ?, ?)
Hibernate: insert into Post (content, title, id) values (?, ?, ?)
Hibernate: insert into Post (content, title, id) values (?, ?, ?)

2.记录批处理语句

如前所述,hibernate依靠底层的JDBC APIs来创建批量排队的语句,所以为了实现对这些语句的记录,我们必须在数据源层面拦截这些调用。

该库 net.ttddyy:datasource-proxy就是这样一个库,它有助于围绕应用程序使用的原始数据源创建一个代理。我建议在本地和预生产环境中使用它来测试应用程序。避免在生产中使用它。

<dependency>
    <groupId>net.ttddyy</groupId>
    <artifactId>datasource-proxy</artifactId>
    <version>1.7</version>
</dependency>

我们可以使用*@Autowired* DataSource实例或在单元测试中创建一个新的DataSource ,然后用ProxyDataSource包裹它。

private DataSource getDataSource() {
  // Autowire or Create a DataSource
  MysqlDataSource ds = new MysqlDataSource();
  ds.setURL("jdbc:mysql://localhost/testdb");
  ds.setUser("root");
  ds.setPassword("password");
  // Create ProxyDataSource
  ProxyDataSource dataSource = ProxyDataSourceBuilder.create(ds)
      .asJson()
      .countQuery()
      .logQueryToSysOut()
      .multiline()
      .build();
  return dataSource;

最后,在StandardServiceRegistry中使用这个代理数据源来引导SessionFactory

Map<String, Object> settings = new HashMap<>();
settings.put(Environment.DATASOURCE, getDataSource());
settings.put(Environment.DRIVER, "com.mysql.cj.jdbc.Driver");
settings.put(Environment.DIALECT, "org.hibernate.dialect.MySQL8Dialect");
StandardServiceRegistry standardRegistry
            = new StandardServiceRegistryBuilder()
            .applySettings(settings)
            .build();

在生产环境中,只是为了验证批处理是否有效,我们可以为org.hibernate.engine.jdbc.batch.internal.BatchingBatch 记录器启用DEBUG日志。

<logger name="org.hibernate.engine.jdbc.batch.internal.BatchingBatch"  level="DEBUG"/>

**如果你看到类似的日志,那么批处理是工作的。**它不提供任何关于批处理的有用信息,但它足以验证批处理功能在低级别的API上是有效的。

2022-05-18_16:50:06.302 DEBUG o.h.e.j.batch.internal.BatchingBatch - Executing batch size: 5

3.启用批处理

启用批处理,我们需要将hibernate.jdbc.batch_size 属性设置为大于0的数字。

hibernate.jdbc.batch_size = 5

如果我们使用Spring Boot,我们可以把它定义为一个应用程序属性。

spring.jpa.properties.hibernate.jdbc.batch_size = 5

配置会话特定的批处理大小,我们可以使用*setJdbcBatchSize()*方法。

//Using Session
session.setJdbcBatchSize(100);
//Using EntityManager
entityManager.unwrap(Session.class).setJdbcBatchSize(100);

在配置了数据源代理后,再次执行第一个测试,并检查日志。

@Test
public void testWithoutPeriodicFlush() {
  doInTransaction(session -> {
    for (int i = 1; i <= 10; i++) {
      System.out.println("Statement Queued : " + i);
      session.persist(new Post.PostBuilder()
          .title("title" + i)
          .content("content" + i)
          .build());
Statement Queued : 1
Statement Queued : 2
Statement Queued : 3
Statement Queued : 4
Statement Queued : 5
Statement Queued : 6
Statement Queued : 7
Statement Queued : 8
Statement Queued : 9
Statement Queued : 10
2022-05-20_00:47:58.178 DEBUG o.h.e.j.batch.internal.BatchingBatch - Executing batch size: 5
Name:, Connection:3, Time:0, Success:True
Type:Prepared, Batch:True, QuerySize:1, BatchSize:5
Query:["insert into Post (content, title, id) values (?, ?, ?)"]
Params:[(content_1,title_1,1802),
(content_2,title_2,1803),
(content_3,title_3,1804),
(content_4,title_4,1805),
(content_5,title_5,1806)]
2022-05-20_00:47:58.178 DEBUG o.h.e.j.batch.internal.BatchingBatch - Executing batch size: 5
Name:, Connection:3, Time:15, Success:True
Type:Prepared, Batch:True, QuerySize:1, BatchSize:5
Query:["insert into Post (content, title, id) values (?, ?, ?)"]
Params:[(content_6,title_6,1807),
(content_7,title_7,1808),
(content_8,title_8,1809),
(content_9,title_9,1810),
(content_10,title_10,1811)]

很明显,当我们提交事务时,批处理被启用并工作。Hibernate正在以5个批次的方式发送INSERT语句。

在JDBC层面,这些分批的事务被分组为一条INSERT语句。因此,每5个帖子,在数据库层面只有一条INSERT语句。

4.定期刷新和清除会话

上面展示的默认批处理还存在一个问题。它首先将所有的实体排入上下文,然后等待提交事务。

这可能是一个严重的问题,因为我们必须将成千上万的实体实例排入内存(会话级缓存),然后再将它们冲到数据库中。对于足够大的批次,它可能会导致OutOfMemoryError

为了克服这个问题,我们需要定期刷新和清除会话。

  • 会话的 ***flush()***方法触发了一个事务同步,将持久化实体中的所有变化发送到数据库。冲洗是将底层持久化存储与内存中持有的持久化状态同步的过程。
  • 会话的clear()清除会话。它从会话中驱逐所有加载的实例,并取消所有待定的保存、更新和删除。
  • 在给定的例子中,我们在每个批次(大小为5)之后冲刷和清除会话。所以现在,我们在会话中排队5个帖子,并使用*flush()*方法在一个批次语句中把这5个帖子插入数据库。我们在不改变整体批处理行为的情况下重复这样做。

    @Test
    public void testWithPeriodicFlush() {
      doInTransaction(session -> {
        for (int i = 1; i <= 10; i++) {
          System.out.println("Statement Queued : " + i);
          session.persist(new Post.PostBuilder()
              .title("title" + i)
              .content("content" + i)
              .build());
          if (i % 5 == 0) {
            session.flush();
            session.clear();
    

    请注意日志的内容。

    Statement Queued : 1
    Statement Queued : 2
    Statement Queued : 3
    Statement Queued : 4
    Statement Queued : 5
    2022-05-18_17:16:20.227 DEBUG o.h.e.j.batch.internal.BatchingBatch - Executing batch size: 5
    Name:, Connection:3, Time:0, Success:True
    Type:Prepared, Batch:True, QuerySize:1, BatchSize:5
    Query:["insert into Post (content, title, id) values (?, ?, ?)"]
    Params:[(content_1,title_1,1852),
    (content_2,title_2,1853),
    (content_3,title_3,1854),
    (content_4,title_4,1855),
    (content_5,title_5,1856)]
    Statement Queued : 6
    Statement Queued : 7
    Statement Queued : 8
    Statement Queued : 9
    Statement Queued : 10
    2022-05-18_17:16:20.231 DEBUG o.h.e.j.batch.internal.BatchingBatch - Executing batch size: 5
    Name:, Connection:3, Time:0, Success:True
    Type:Prepared, Batch:True, QuerySize:1, BatchSize:5
    Query:["insert into Post (content, title, id) values (?, ?, ?)"]
    Params:[(content_6,title_6,1857),
    (content_7,title_7,1858),
    (content_8,title_8,1859),
    (content_9,title_9,1860),
    (content_10,title_10,1861)]
    

    现在,这是更好的代码,提供了出色的内存和运行时性能。

    5.为多个实体的批处理插入排序

    hibernate批处理的一个限制是,它只允许在一个批次中使用一种类型的实体。对于不同的实体,将创建第二个批次。

    让我们通过一个例子来理解。让我们创建几个帖子和一些关于它们的评论。在下面的例子中,我们创建了10个帖子,并为每个帖子添加了4条评论。这使得总共有10个帖子和40条评论。

    @Test
    public void testInsertOrdering() {
      doInTransaction(session -> {
        for (int i = 1; i <= 10; i++) {
          List<Comment> comments = new ArrayList<>();
          for (int j = 1; j <= 4; j++) {
            Comment comment =
                new Comment.CommentBuilder().text("Comment - " + j).build();
            session.persist(comment);
            comments.add(comment);
          Post post = new Post.PostBuilder()
              .title("title" + i)
              .content("content" + i)
              .comments(comments)
              .build();
          session.persist(post);
    

    请注意日志的内容。所有的帖子都以单独的批次进入一个数据库。同样地,评论也是分10个批次进行的。所以在这个过程中,总共有20条SQL INSERT语句被执行。

    2022-05-20_00:47:58.553 DEBUG o.h.e.j.batch.internal.BatchingBatch - Executing batch size: 1
    Name:, Connection:3, Time:0, Success:True
    Type:Prepared, Batch:True, QuerySize:1, BatchSize:1
    Query:["insert into Post (content, title, id) values (?, ?, ?)"]
    Params:[(content1,title1,1902)]
    2022-05-20_00:47:58.553 DEBUG o.h.e.j.batch.internal.BatchingBatch - Executing batch size: 4
    Name:, Connection:3, Time:0, Success:True
    Type:Prepared, Batch:True, QuerySize:1, BatchSize:4
    Query:["insert into Comment (post_id, text, id) values (?, ?, ?)"]
    Params:[(NULL(BIGINT),Comment - 1,606),
    (NULL(BIGINT),Comment - 2,607),
    (NULL(BIGINT),Comment - 3,608),
    (NULL(BIGINT),Comment - 4,609)]
    

    为了提高性能,从逻辑上讲,所有10个帖子可以在一个SQL INSERT语句中进入数据库。当我们为所有10个帖子创建了帖子ID后,所有的40条评论应该在第二个INSERT语句中进入数据库。所以在整个过程中,应该只需要两条INSERT语句。

    Hibernate提供了hibernate.order_inserts 属性,可以用来强制Hibernate对插入进行排序,以允许更多的批处理。官方文档告诫说,这样做会影响性能,所以要对前后进行基准测试,看这对我们的应用是有帮助还是有伤害

    settings.put("hibernate.order_inserts", true);
    hibernate.order_inserts = true
    

    在Spring boot应用程序中,我们可以使用以下属性强制插入顺序。

    spring.jpa.properties.hibernate.order_inserts = true
    

    在配置后再次运行测试,注意所有的帖子和评论是如何在短短的2条INSERT语句中被创建的。请注意,我们已经将批处理量增加到50条,以便在一个批次中容纳更多的语句。

    2022-05-20_01:08:56.683 DEBUG o.h.e.j.batch.internal.BatchingBatch - Executing batch size: 10
    Name:, Connection:3, Time:0, Success:True
    Type:Prepared, Batch:True, QuerySize:1, BatchSize:10
    Query:["insert into Post (content, title, id) values (?, ?, ?)"]
    Params:[(content1,title1,2302),.....]
    2022-05-20_01:08:56.699 DEBUG o.h.e.j.batch.internal.BatchingBatch - Executing batch size: 40
    Name:, Connection:3, Time:15, Success:True
    Type:Prepared, Batch:True, QuerySize:1, BatchSize:40
    Query:["insert into Comment (post_id, text, id) values (?, ?, ?)"]
    Params:[(NULL(BIGINT),Comment - 1,702),....]
    

    6.批量更新的排序

    与插入排序类似,我们可以强制hibernate在类似上述的情况下对SQL UPDATE语句进行分组。

    settings.put("hibernate.order_updates", "true");
    settings.put("hibernate.batch_versioned_data", "true");
    

    而如果我们使用的是Spring Boot,我们需要将这些添加到application.properties中。

    spring.jpa.properties.hibernate.order_updates=true
    spring.jpa.properties.hibernate.batch_versioned_data=true
    

    7.常见问题

    7.1.配置了属性后,批处理仍然不工作

    最有可能的原因是你在使用GenerationType.IDENTITY 标识符生成器。

    重要的是要认识到,使用IDENTITY列会带来一个运行时行为,即实体行必须在标识符值被知道之前被实际插入。由于这个限制,Hibernate将无法为使用IDENTITY生成器的实体批量插入语句。

    在批量处理中,建议使用GenerationType.SEQUENCE

    7.2.在每个会话的基础上配置批量大小

    我们可以在全局层面上有一个合理的批处理大小,但有时我们需要覆盖全局批处理大小。使用session.setJdbcBatchSize()方法来配置会话特定的批处理大小。

    session.setJdbcBatchSize(100);
    

    在本教程中,我们学习了Hibernate的批处理功能以及如何正确使用它。我们学习了如何通过定期刷新持久化上下文,控制会话特定的批处理大小,以及最后为更好的批处理对INSERT和UPDATE语句进行排序来进一步提高性能。

    学习愉快!!