Apache Calcite JDBC查询流程详解

之前写了一篇文章 使用Apache Calcite进行JDBC多数据源关联 里面使用Calcite的JDBC的Adapter对后端数据库进行SQL查询,代码例子如下,整个实现过程很简单,与常规的JDBC使用没有任何区别。但是Calcite在内部整个流程中做了很多有趣的工作,比如sql解析,生成关系表达式,优化关系表达式生成执行计划等。这篇文章会详细分析Calcite JDBC查询的整个流程实现,理解了这整个流程就对Calcite的核心功能和Calcite能做什么都会有深入的了解。

@Test
public void singleSourceTest() throws SQLException {
    Properties config = new Properties();
    config.put("model", TestUtil.resourcePath("singleSource.json"));
    config.put("lex", "MYSQL");
    String sql = "select s.name,c.name from student as s join colleage as c on s.cid = c.id";
    try (Connection con = DriverManager.getConnection("jdbc:calcite:", config)) {
        try (Statement stmt = con.createStatement()) {
            try (ResultSet rs = stmt.executeQuery(sql)) {
                printRs(rs);
}

Calcite JDBC的实现类


Calcite不仅实现了它的核心功能,也为开发者提供了一个称为Avatica的构建JDBC和ODBC Driver的框架,如果希望实现自己的JDBC Driver可以详细学习一下 这个框架 。 从上图可知,Calcite core模块中的JDBC的类实现都是基于Avatica框架。我们在使用Calcite进行JDBC查询时Connection,Statement和ResultSet实例其实就是CalciteJdbc41Connection,CalciteJdbc41Statement,CalciteResultSet。

Calcite的JDBC Driver

在使用Calcite的JDBC Driver时,我们无需手动注册。如下图所示,Calcite核心模块会利用 SPI 方式注册。

Avatica框架在创建JDBC的接口实例的时候要使用一个工场类来创建对应的实例。Calcite的Driver实现中通过 getFactoryClassName 方法获得这个工场类。另外,在这个方法中也可以看出Calcite JDBC实现只支持JDBC4.1版本。

@Override protected String getFactoryClassName(JdbcVersion jdbcVersion) {
    switch (jdbcVersion) {
    case JDBC_30:
    case JDBC_40:
      throw new IllegalArgumentException("JDBC version not supported: "
          + jdbcVersion);
    case JDBC_41:
    default:
      return "org.apache.calcite.jdbc.CalciteJdbc41Factory";
  }

如下图所示,通过CalciteJdbc41Factory提供的方法,可以获得Connection, Statement, ResultSet等实例对象。


获取Calcite的JDBC Conection

当我们通过DriverManager获取Calcite的Connection的时候,大致过程如下:

DriverManager会调用Calcite的Driver的 connect 方法来创建Calcite Connection实例:

public Connection connect(String url, Properties info) throws SQLException {
    if (!acceptsURL(url)) {
      return null;
    final String prefix = getConnectStringPrefix();
    assert url.startsWith(prefix);
    final String urlSuffix = url.substring(prefix.length());
    final Properties info2 = ConnectStringParser.parse(urlSuffix, info);
    final AvaticaConnection connection =
        factory.newConnection(this, factory, url, info2);
    handler.onConnectionInit(connection);
    return connection;
  }

上面的connect方法实现中,Calcite的Connection是通过前面说的CalciteJdbc41Factory创建的,Connection实例的类是CalciteJdbc41Connection。 另外在返回Connection之前,Calcite还会根据配置的model来解析model格式(json或yaml)生成JdbcSchema。

{
    "defaultSchema": "db1",
    "schemas": [
            "factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",
            "name": "db1",
            "operand": {
                "jdbcDriver": "com.mysql.cj.jdbc.Driver",
                "jdbcPassword": "changeme",
                "jdbcUrl": "jdbc:mysql://localhost:3306/test",
                "jdbcUser": "root"
            "type": "custom"
    "version": "1.0"
}

JdbcSchema在创建的过程主要是根据model中jdbc的信息建立与后端数据库的连接池。

创建Statement


当我们调用connection的 createStatement 方法创建Statement实例的时候,Calcite的执行流程如上图。整个过程也很简单,Caclite的connection对象也是委托CalciteJdbc41Factory工场类来创建Statement对象,这个Statement是CalciteJdbc41Statement实例。

执行executeQuery方法创建ResultSet

当我们获得Statement对象后,调用 executeQuery(sql) 方法时。Calcite在这一步的时候,内部会做很多事情。这里也是整个Calcite JDBC调用工作最核心的地方。 在开始分析这一步的工作之前,假设后端数据库是mysql,要查询的sql如下:

SELECT "ENAME", "EMPNO"
FROM "SCOTT"."EMP"
ORDER BY "EMPNO" NULLS LAST

在获得ResultSet对象之前,executeQuery方法内部的执行过程大致分为下面几个阶段:

- sql解析 :calcite首先会使用它实现的SqlParser解析sql字符串,生成SqlNode为节点的AST(抽象语法树)。 - 生成逻辑关系表达式 :得到SqlNode的AST后,Calcite会使用SqlToRelConverter将SqlNode转换为RelNode组成的逻辑关系表达式树。 - 优化 :Calcite会使用planner(优化器)优化逻辑关系表达式,生成最优的物理关系表达式树(带有具体的算法的算子)。 - 生成Linq4j表达式 :在获得物理关系表达式后,Calcite的EnumerableRelImplentor可以通过物理关系表达式树转换为Linq4j表达式树,通过Linq4j表达式树可以生成一个Java Class。这个Java Class的实例知道如何与后端数据库进行JDBC连接,并且如果是多数据源查询并且需要join、sort、filter等,它也知道这些操作要如何处理。 - 编译 :前面生成的Java Class会通过运行时编译(Calcite使用 Janino编译器 ),编译后生成对象实例,这个实例实现了Iterator接口。会保存在要返回的ResultSet中,后面ResultSet调用相应的方法取数据时,最终是调用这个实例对象获取数据。

下图较详细的显示了整个executeQuery的流程,如果对某一部分详细的实现代码感兴趣可以查看源码下相关方法。


Sql解析

final CalciteConnectionConfig config = context.config();
      final SqlParser.ConfigBuilder parserConfig = createParserConfig()
          .setQuotedCasing(config.quotedCasing())
          .setUnquotedCasing(config.unquotedCasing())
          .setQuoting(config.quoting())
          .setConformance(config.conformance())
          .setCaseSensitive(config.caseSensitive());
      final SqlParserImplFactory parserFactory =
          config.parserFactory(SqlParserImplFactory.class, null);
      if (parserFactory != null) {
        parserConfig.setParserFactory(parserFactory);
      SqlParser parser = createParser(query.sql,  parserConfig);
      SqlNode sqlNode;
      try {
        sqlNode = parser.parseStmt();
        statementType = getStatementType(sqlNode.getKind());
      } catch (SqlParseException e) {
        throw new RuntimeException(
            "parse failed: " + e.getMessage(), e);
/** Factory method for SQL parser with a given configuration. */
  protected SqlParser createParser(String sql,
      SqlParser.ConfigBuilder parserConfig) {
    return SqlParser.create(sql, parserConfig.build());
  /** Factory method for SQL parser configuration. */
  protected SqlParser.ConfigBuilder createParserConfig() {
    return SqlParser.configBuilder();
  }

上面的代码片段显示了Calcite的JDBC如何创建SqlParser,并且通过SqlParser将sql转换成SqlNode的AST。Calcite的Parser是通过JavaCC实现的,要如何扩展Parser可以参考 官方文档 。如果需要单独使用Calcite的SqlParser可以按照上面的方式获得。当例子的sql经过解析后,变为如下结构:


SqlNode转换为RelNode

final SqlToRelConverter.ConfigBuilder builder =
        SqlToRelConverter.configBuilder()
            .withTrimUnusedFields(true)
            .withExpand(THREAD_EXPAND.get())
            .withExplain(sqlQuery.getKind() == SqlKind.EXPLAIN);
    final SqlToRelConverter sqlToRelConverter =
        getSqlToRelConverter(validator, catalogReader, builder.build());
    . . . . . . .
    RelRoot root =
        sqlToRelConverter.convertQuery(sqlQuery, needsValidation, true);

Calcite在获得SqlNode之后要将它转换为RelNode树,用关系代数来表示查询过程。在转换之前,还会使用SqlValidator验证SQL中的标识对象是否合法,比如表是否存在,字段是否存在,数据类型等是否正确。SqlVlidator验证需要的数据是通过CalciteCatalogReader去后端数据库获取的。

private SqlValidator createSqlValidator(Context context,
      CalciteCatalogReader catalogReader) {
    final SqlOperatorTable opTab0 =
        context.config().fun(SqlOperatorTable.class,
            SqlStdOperatorTable.instance());
    final SqlOperatorTable opTab =
        ChainedSqlOperatorTable.of(opTab0, catalogReader);
    final JavaTypeFactory typeFactory = context.getTypeFactory();
    final CalciteConnectionConfig connectionConfig = context.config();
    final SqlValidator.Config config = SqlValidator.Config.DEFAULT
        .withLenientOperatorLookup(connectionConfig.lenientOperatorLookup())
        .withSqlConformance(connectionConfig.conformance())
        .withDefaultNullCollation(connectionConfig.defaultNullCollation())
        .withIdentifierExpansion(true);
    return new CalciteSqlValidator(opTab, catalogReader, typeFactory,
        config);
  }

经过SqlToRelConverter转换后,生成下面形式的RelNode树。目前生成的RelNode树中都是逻辑表达式节点,它们只用于表示SQL的关系表达式形式而不能真正的执行,因为它们都没有具体算子的实现算法。


优化


在优化阶段Calcite会创建优化器对前面生成的逻辑RelNode树进行优化。目前Calcite实现了两个优化器: HepPlanner VolcanoPlanner 。HepPlanner是基于规则的启发式优化器,而Volcano是基于代价的优化器。在JDBC查询过程中,Calcite使用的是VolcanoPlanner。关于Calcite优化器的优化过程,可以参考 Apache Calcite VolcanoPlanner优化过程解析 。 通过优化器优化后RelNode树变成下面形式。JdbcToEnumerableConverter可以通过它的子节点生成要发送给后端数据库的SQL字符串,这里生成的字符串与前面例子的SQL字符串是一样的。

如果是多个数据源join查询时,比如下面的例子。

SELECT s.name,c.name 
FROM db1.student AS s join db2.colleage AS c on s.cid = c.id

优化后的RelNode树如下所示,有两个JdbcToEnumerableConverter分别生成对应数据源的查询sql,返回的数据通过EnumerableHashJoin算子的算法join起来,形成例子中单条sql查询语句的结果。


生成Java Plan

通过优化获得了Jdbc的物理表达式后,要将这些表达式生成Java代码后才能真正将执行计划跑起来。在生成Java代码之前,将Jdbc的物理表达式的根节点变为EnumerableCalc,JdbcToEnumerableConverter变为它的子节点。

由上图可知,EnumerableCalc和JdbcToEnumerableConverter都实现了EnumerableRel接口,所以它们可以连接起来,并且都实现了 implement 方法。 Calcaite如何通过这些物理关系表达式算子生成可执行的Java代码?在EnumerableRel的每个算子的implement方法中会将这个算子要实现的算法写成Linq4j的表达式,然后通过这些Linq4j表达式生成Java Class。

public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
        final BlockBuilder builder0 = new BlockBuilder(false);
        final JdbcRel child = (JdbcRel) getInput();
        . . . . . .
        final JdbcConvention jdbcConvention =
            (JdbcConvention) child.getConvention();
        SqlString sqlString = generateSql(jdbcConvention.dialect);
        String sql = sqlString.getSql();
        . . . . . .
        final Expression sql_ =
            builder0.append("sql", Expressions.constant(sql));
        . . . . .. 
  }

比如上面的代码快是JdbcToEnumerableConverter的 implement 方法的一小块,implememnt中都是像上面的代码一样生成linq4j的表达式。上面的代码片段主要是什么意思? BlockBuilder 会生成 BlockStatement 这代表了程序顺序流程,相对的还有 ConditionStatement WhileStatement 等分支流程。而最后的sql Express那行代码表示在程序中定义一个字符串变量名为 sql ,值是常量,翻译成java代码如下:

String sql = "SELECT ENAME, EMPNO FROM SCOTT.EMP ORDER BY EMPNO NULLS LAST";

linq4j有很多不同的表达式代表一个java程序的不同元素,正确的生成这些元素并且拼装起来可以生成正确的java代码,关于Calcite的linq4j后面会专文出一篇文章详细讲解。 所以上一小节生成的物理表达式树的每个节点的implemnt被调用后就会生成linq4j表达式树,根据linq4j能生成可执行的java代码:

public static Bindable toBindable(Map<String, Object> parameters,
      CalcitePrepare.SparkHandler spark, EnumerableRel rel,
      EnumerableRel.Prefer prefer) {
    EnumerableRelImplementor relImplementor =
        new EnumerableRelImplementor(rel.getCluster().getRexBuilder(),
            parameters);
    final ClassDeclaration expr = relImplementor.implementRoot(rel, prefer);
    String s = Expressions.toString(expr.memberDeclarations, "\n", false);
    ......
  }

上面的代码是 EnumerableInterpretable toBindable 方法的片段,jdbc流程中在获得上一小节讲解的物理表达式树后,经过上面的代码生成一个 EnumerableRelImplementor 对象,通过调用它的 implementRoot 方法会调用物理表达式树根节点(这里是EnumerableCalc)的implement方法,父节点又会迭代调用子节点的implement方法。最终返会获得生成的linq4j表达式树。通过上面代码片段的 Expressions.toString 方法可以生成java代码字符串,比如如下就是生成的Java类:

public class Baz implements org.apache.calcite.runtime.Bindable {
    public org.apache.calcite.linq4j.Enumerable bind(final org.apache.calcite.DataContext root) {
        //of方法调用ResultSetEnumerable的构造方法
        //ResultSetEnumerable对象保存of的参数:后端数据库连接池,sql查询字符串,行映射方法
        final org.apache.calcite.runtime.ResultSetEnumerable enumerable =
                org.apache.calcite.runtime.ResultSetEnumerable.of(
                        (javax.sql.DataSource) root.getRootSchema().getSubSchema("SCOTT")
                                .unwrap(javax.sql.DataSource.class),
                        "SELECT \"ENAME\", \"EMPNO\"\nFROM \"SCOTT\".\"EMP\"\nORDER BY \"EMPNO\" NULLS LAST",
                        new org.apache.calcite.linq4j.function.Function1() {
                            public org.apache.calcite.linq4j.function.Function0 apply(
                                    final java.sql.ResultSet resultSet) {
                                return new org.apache.calcite.linq4j.function.Function0() {
                                    public Object apply() {
                                        try {
                                            final Object[] values = new Object[2];
                                            values[0] = resultSet.getObject(1);
                                            values[1] = resultSet.getShort(2);
                                            if (resultSet.wasNull()) {
                                                values[1] = null;
                                            return values;
                                        } catch (java.sql.SQLException e) {
                                            throw new RuntimeException(e);
                            public Object apply(final Object resultSet) {
                                return apply((java.sql.ResultSet) resultSet);
        //设置jdbc查询超时时间
        enumerable.setTimeout(root);
        return new org.apache.calcite.linq4j.AbstractEnumerable() {
            public org.apache.calcite.linq4j.Enumerator<String> enumerator() {
                return new org.apache.calcite.linq4j.Enumerator<String>() {
                    //内部使用上面生成的ResultSetEnumerable生成的枚举器
                    public final org.apache.calcite.linq4j.Enumerator<Object[]> inputEnumerator =
                            enumerable.enumerator();
                    public void reset() {
                        inputEnumerator.reset();
                    public boolean moveNext() {
                        return inputEnumerator.moveNext();
                    public void close() {
                        inputEnumerator.close();
                    public Object current() {
                        final Object[] current = (Object[]) inputEnumerator.current();
                        return current[0] == null ? (String) null : current[0].toString();
    public Class getElementType() {
        return java.lang.String.class;
}

得到生成的java代码后,Calcite会调用Janino编译器动态编译这个java类,并且实例化这个类的一个对象。 后面在创建CalciteResultSet的时候会调用这个对象的 bind 方法,这个方法返回一个 Eumerable 对象,通过这个对象可以迭代JDBC查询的结果。这个 Eumerable 对象的实际工作是委托给 ResultSetEnumerable enumerator() 方法生成的枚举器实现的。

public Enumerator<T> enumerator() {
    if (preparedStatementEnricher == null) {
      return enumeratorBasedOnStatement();
    } else {
      return enumeratorBasedOnPreparedStatement();
  private Enumerator<T> enumeratorBasedOnStatement() {
    Connection connection = null;
    Statement statement = null;
    try {
      connection = dataSource.getConnection(); //获取实际数据库的jdbc连接
      statement = connection.createStatement(); //生成jdbc statement
      setTimeoutIfPossible(statement);
      if (statement.execute(sql)) { 
        final ResultSet resultSet = statement.getResultSet(); //执行sql获得实际的ResultSet
        statement = null;
        connection = null;
        return new ResultSetEnumerator<>(resultSet, rowBuilderFactory); //用ResultSetEnumerator对象包装ResultSet
      } else {
        Integer updateCount = statement.getUpdateCount();
        return Linq4j.singletonEnumerator((T) updateCount);
    } catch (SQLException e) {
      throw Static.RESOURCE.exceptionWhilePerformingQueryOnJdbcSubSchema(sql)
          .ex(e);
    } finally {
      closeIfPossible(connection, statement);
  }

通过上面 ResultSetEnumerable enumerator() 方法实现代码可以知道,在生成枚举器的时候就开始执行真正的数据库查询,获得实际的ResultSet,用ResultSetEnumerator包装起来,通过ResultSetEnumerator就能操作ResultSet。 在 executeSql 方法的最后,会返回CalciteResultSet,它与实际的ResultSet关系如下:


使用ResultSet

经过执行statement的 executeSql 方法获得CalciteResultSet后,就可以使用next方法获取查询数据。CalciteResultSet的next方法实现如下:

public boolean next() throws SQLException {
    // TODO: for timeout, see IteratorResultSet.next
    checkOpen();
    if (null != statement && statement.cancelFlag.get()) {
      throw AvaticaConnection.HELPER.createException("Statement canceled");
    if (cursor.next()) { //调用cursor的next方法,cursor是IteraterCursor
      ++row;
      beforeFirst = false;
      return true;