Apache Calcite JDBC查询流程详解
之前写了一篇文章 使用Apache Calcite进行JDBC多数据源关联 里面使用Calcite的JDBC的Adapter对后端数据库进行SQL查询,代码例子如下,整个实现过程很简单,与常规的JDBC使用没有任何区别。但是Calcite在内部整个流程中做了很多有趣的工作,比如sql解析,生成关系表达式,优化关系表达式生成执行计划等。这篇文章会详细分析Calcite JDBC查询的整个流程实现,理解了这整个流程就对Calcite的核心功能和Calcite能做什么都会有深入的了解。
@Test
public void singleSourceTest() throws SQLException {
Properties config = new Properties();
config.put("model", TestUtil.resourcePath("singleSource.json"));
config.put("lex", "MYSQL");
String sql = "select s.name,c.name from student as s join colleage as c on s.cid = c.id";
try (Connection con = DriverManager.getConnection("jdbc:calcite:", config)) {
try (Statement stmt = con.createStatement()) {
try (ResultSet rs = stmt.executeQuery(sql)) {
printRs(rs);
}
Calcite JDBC的实现类
Calcite不仅实现了它的核心功能,也为开发者提供了一个称为Avatica的构建JDBC和ODBC Driver的框架,如果希望实现自己的JDBC Driver可以详细学习一下 这个框架 。 从上图可知,Calcite core模块中的JDBC的类实现都是基于Avatica框架。我们在使用Calcite进行JDBC查询时Connection,Statement和ResultSet实例其实就是CalciteJdbc41Connection,CalciteJdbc41Statement,CalciteResultSet。
Calcite的JDBC Driver
在使用Calcite的JDBC Driver时,我们无需手动注册。如下图所示,Calcite核心模块会利用 SPI 方式注册。
Avatica框架在创建JDBC的接口实例的时候要使用一个工场类来创建对应的实例。Calcite的Driver实现中通过
getFactoryClassName
方法获得这个工场类。另外,在这个方法中也可以看出Calcite JDBC实现只支持JDBC4.1版本。
@Override protected String getFactoryClassName(JdbcVersion jdbcVersion) {
switch (jdbcVersion) {
case JDBC_30:
case JDBC_40:
throw new IllegalArgumentException("JDBC version not supported: "
+ jdbcVersion);
case JDBC_41:
default:
return "org.apache.calcite.jdbc.CalciteJdbc41Factory";
}
如下图所示,通过CalciteJdbc41Factory提供的方法,可以获得Connection, Statement, ResultSet等实例对象。
获取Calcite的JDBC Conection
当我们通过DriverManager获取Calcite的Connection的时候,大致过程如下:
DriverManager会调用Calcite的Driver的
connect
方法来创建Calcite Connection实例:
public Connection connect(String url, Properties info) throws SQLException {
if (!acceptsURL(url)) {
return null;
final String prefix = getConnectStringPrefix();
assert url.startsWith(prefix);
final String urlSuffix = url.substring(prefix.length());
final Properties info2 = ConnectStringParser.parse(urlSuffix, info);
final AvaticaConnection connection =
factory.newConnection(this, factory, url, info2);
handler.onConnectionInit(connection);
return connection;
}
上面的connect方法实现中,Calcite的Connection是通过前面说的CalciteJdbc41Factory创建的,Connection实例的类是CalciteJdbc41Connection。 另外在返回Connection之前,Calcite还会根据配置的model来解析model格式(json或yaml)生成JdbcSchema。
{
"defaultSchema": "db1",
"schemas": [
"factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",
"name": "db1",
"operand": {
"jdbcDriver": "com.mysql.cj.jdbc.Driver",
"jdbcPassword": "changeme",
"jdbcUrl": "jdbc:mysql://localhost:3306/test",
"jdbcUser": "root"
"type": "custom"
"version": "1.0"
}
JdbcSchema在创建的过程主要是根据model中jdbc的信息建立与后端数据库的连接池。
创建Statement
当我们调用connection的
createStatement
方法创建Statement实例的时候,Calcite的执行流程如上图。整个过程也很简单,Caclite的connection对象也是委托CalciteJdbc41Factory工场类来创建Statement对象,这个Statement是CalciteJdbc41Statement实例。
执行executeQuery方法创建ResultSet
当我们获得Statement对象后,调用
executeQuery(sql)
方法时。Calcite在这一步的时候,内部会做很多事情。这里也是整个Calcite JDBC调用工作最核心的地方。 在开始分析这一步的工作之前,假设后端数据库是mysql,要查询的sql如下:
SELECT "ENAME", "EMPNO"
FROM "SCOTT"."EMP"
ORDER BY "EMPNO" NULLS LAST
在获得ResultSet对象之前,executeQuery方法内部的执行过程大致分为下面几个阶段:
- sql解析 :calcite首先会使用它实现的SqlParser解析sql字符串,生成SqlNode为节点的AST(抽象语法树)。 - 生成逻辑关系表达式 :得到SqlNode的AST后,Calcite会使用SqlToRelConverter将SqlNode转换为RelNode组成的逻辑关系表达式树。 - 优化 :Calcite会使用planner(优化器)优化逻辑关系表达式,生成最优的物理关系表达式树(带有具体的算法的算子)。 - 生成Linq4j表达式 :在获得物理关系表达式后,Calcite的EnumerableRelImplentor可以通过物理关系表达式树转换为Linq4j表达式树,通过Linq4j表达式树可以生成一个Java Class。这个Java Class的实例知道如何与后端数据库进行JDBC连接,并且如果是多数据源查询并且需要join、sort、filter等,它也知道这些操作要如何处理。 - 编译 :前面生成的Java Class会通过运行时编译(Calcite使用 Janino编译器 ),编译后生成对象实例,这个实例实现了Iterator接口。会保存在要返回的ResultSet中,后面ResultSet调用相应的方法取数据时,最终是调用这个实例对象获取数据。
下图较详细的显示了整个executeQuery的流程,如果对某一部分详细的实现代码感兴趣可以查看源码下相关方法。
Sql解析
final CalciteConnectionConfig config = context.config();
final SqlParser.ConfigBuilder parserConfig = createParserConfig()
.setQuotedCasing(config.quotedCasing())
.setUnquotedCasing(config.unquotedCasing())
.setQuoting(config.quoting())
.setConformance(config.conformance())
.setCaseSensitive(config.caseSensitive());
final SqlParserImplFactory parserFactory =
config.parserFactory(SqlParserImplFactory.class, null);
if (parserFactory != null) {
parserConfig.setParserFactory(parserFactory);
SqlParser parser = createParser(query.sql, parserConfig);
SqlNode sqlNode;
try {
sqlNode = parser.parseStmt();
statementType = getStatementType(sqlNode.getKind());
} catch (SqlParseException e) {
throw new RuntimeException(
"parse failed: " + e.getMessage(), e);
/** Factory method for SQL parser with a given configuration. */
protected SqlParser createParser(String sql,
SqlParser.ConfigBuilder parserConfig) {
return SqlParser.create(sql, parserConfig.build());
/** Factory method for SQL parser configuration. */
protected SqlParser.ConfigBuilder createParserConfig() {
return SqlParser.configBuilder();
}
上面的代码片段显示了Calcite的JDBC如何创建SqlParser,并且通过SqlParser将sql转换成SqlNode的AST。Calcite的Parser是通过JavaCC实现的,要如何扩展Parser可以参考 官方文档 。如果需要单独使用Calcite的SqlParser可以按照上面的方式获得。当例子的sql经过解析后,变为如下结构:
SqlNode转换为RelNode
final SqlToRelConverter.ConfigBuilder builder =
SqlToRelConverter.configBuilder()
.withTrimUnusedFields(true)
.withExpand(THREAD_EXPAND.get())
.withExplain(sqlQuery.getKind() == SqlKind.EXPLAIN);
final SqlToRelConverter sqlToRelConverter =
getSqlToRelConverter(validator, catalogReader, builder.build());
. . . . . . .
RelRoot root =
sqlToRelConverter.convertQuery(sqlQuery, needsValidation, true);
Calcite在获得SqlNode之后要将它转换为RelNode树,用关系代数来表示查询过程。在转换之前,还会使用SqlValidator验证SQL中的标识对象是否合法,比如表是否存在,字段是否存在,数据类型等是否正确。SqlVlidator验证需要的数据是通过CalciteCatalogReader去后端数据库获取的。
private SqlValidator createSqlValidator(Context context,
CalciteCatalogReader catalogReader) {
final SqlOperatorTable opTab0 =
context.config().fun(SqlOperatorTable.class,
SqlStdOperatorTable.instance());
final SqlOperatorTable opTab =
ChainedSqlOperatorTable.of(opTab0, catalogReader);
final JavaTypeFactory typeFactory = context.getTypeFactory();
final CalciteConnectionConfig connectionConfig = context.config();
final SqlValidator.Config config = SqlValidator.Config.DEFAULT
.withLenientOperatorLookup(connectionConfig.lenientOperatorLookup())
.withSqlConformance(connectionConfig.conformance())
.withDefaultNullCollation(connectionConfig.defaultNullCollation())
.withIdentifierExpansion(true);
return new CalciteSqlValidator(opTab, catalogReader, typeFactory,
config);
}
经过SqlToRelConverter转换后,生成下面形式的RelNode树。目前生成的RelNode树中都是逻辑表达式节点,它们只用于表示SQL的关系表达式形式而不能真正的执行,因为它们都没有具体算子的实现算法。
优化
在优化阶段Calcite会创建优化器对前面生成的逻辑RelNode树进行优化。目前Calcite实现了两个优化器:
HepPlanner
和
VolcanoPlanner
。HepPlanner是基于规则的启发式优化器,而Volcano是基于代价的优化器。在JDBC查询过程中,Calcite使用的是VolcanoPlanner。关于Calcite优化器的优化过程,可以参考
Apache Calcite VolcanoPlanner优化过程解析
。 通过优化器优化后RelNode树变成下面形式。JdbcToEnumerableConverter可以通过它的子节点生成要发送给后端数据库的SQL字符串,这里生成的字符串与前面例子的SQL字符串是一样的。
如果是多个数据源join查询时,比如下面的例子。
SELECT s.name,c.name
FROM db1.student AS s join db2.colleage AS c on s.cid = c.id
优化后的RelNode树如下所示,有两个JdbcToEnumerableConverter分别生成对应数据源的查询sql,返回的数据通过EnumerableHashJoin算子的算法join起来,形成例子中单条sql查询语句的结果。
生成Java Plan
通过优化获得了Jdbc的物理表达式后,要将这些表达式生成Java代码后才能真正将执行计划跑起来。在生成Java代码之前,将Jdbc的物理表达式的根节点变为EnumerableCalc,JdbcToEnumerableConverter变为它的子节点。
由上图可知,EnumerableCalc和JdbcToEnumerableConverter都实现了EnumerableRel接口,所以它们可以连接起来,并且都实现了
implement
方法。 Calcaite如何通过这些物理关系表达式算子生成可执行的Java代码?在EnumerableRel的每个算子的implement方法中会将这个算子要实现的算法写成Linq4j的表达式,然后通过这些Linq4j表达式生成Java Class。
public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
final BlockBuilder builder0 = new BlockBuilder(false);
final JdbcRel child = (JdbcRel) getInput();
. . . . . .
final JdbcConvention jdbcConvention =
(JdbcConvention) child.getConvention();
SqlString sqlString = generateSql(jdbcConvention.dialect);
String sql = sqlString.getSql();
. . . . . .
final Expression sql_ =
builder0.append("sql", Expressions.constant(sql));
. . . . ..
}
比如上面的代码快是JdbcToEnumerableConverter的
implement
方法的一小块,implememnt中都是像上面的代码一样生成linq4j的表达式。上面的代码片段主要是什么意思?
BlockBuilder
会生成
BlockStatement
这代表了程序顺序流程,相对的还有
ConditionStatement
,
WhileStatement
等分支流程。而最后的sql Express那行代码表示在程序中定义一个字符串变量名为
sql
,值是常量,翻译成java代码如下:
String sql = "SELECT ENAME, EMPNO FROM SCOTT.EMP ORDER BY EMPNO NULLS LAST";
linq4j有很多不同的表达式代表一个java程序的不同元素,正确的生成这些元素并且拼装起来可以生成正确的java代码,关于Calcite的linq4j后面会专文出一篇文章详细讲解。 所以上一小节生成的物理表达式树的每个节点的implemnt被调用后就会生成linq4j表达式树,根据linq4j能生成可执行的java代码:
public static Bindable toBindable(Map<String, Object> parameters,
CalcitePrepare.SparkHandler spark, EnumerableRel rel,
EnumerableRel.Prefer prefer) {
EnumerableRelImplementor relImplementor =
new EnumerableRelImplementor(rel.getCluster().getRexBuilder(),
parameters);
final ClassDeclaration expr = relImplementor.implementRoot(rel, prefer);
String s = Expressions.toString(expr.memberDeclarations, "\n", false);
......
}
上面的代码是
EnumerableInterpretable
的
toBindable
方法的片段,jdbc流程中在获得上一小节讲解的物理表达式树后,经过上面的代码生成一个
EnumerableRelImplementor
对象,通过调用它的
implementRoot
方法会调用物理表达式树根节点(这里是EnumerableCalc)的implement方法,父节点又会迭代调用子节点的implement方法。最终返会获得生成的linq4j表达式树。通过上面代码片段的
Expressions.toString
方法可以生成java代码字符串,比如如下就是生成的Java类:
public class Baz implements org.apache.calcite.runtime.Bindable {
public org.apache.calcite.linq4j.Enumerable bind(final org.apache.calcite.DataContext root) {
//of方法调用ResultSetEnumerable的构造方法
//ResultSetEnumerable对象保存of的参数:后端数据库连接池,sql查询字符串,行映射方法
final org.apache.calcite.runtime.ResultSetEnumerable enumerable =
org.apache.calcite.runtime.ResultSetEnumerable.of(
(javax.sql.DataSource) root.getRootSchema().getSubSchema("SCOTT")
.unwrap(javax.sql.DataSource.class),
"SELECT \"ENAME\", \"EMPNO\"\nFROM \"SCOTT\".\"EMP\"\nORDER BY \"EMPNO\" NULLS LAST",
new org.apache.calcite.linq4j.function.Function1() {
public org.apache.calcite.linq4j.function.Function0 apply(
final java.sql.ResultSet resultSet) {
return new org.apache.calcite.linq4j.function.Function0() {
public Object apply() {
try {
final Object[] values = new Object[2];
values[0] = resultSet.getObject(1);
values[1] = resultSet.getShort(2);
if (resultSet.wasNull()) {
values[1] = null;
return values;
} catch (java.sql.SQLException e) {
throw new RuntimeException(e);
public Object apply(final Object resultSet) {
return apply((java.sql.ResultSet) resultSet);
//设置jdbc查询超时时间
enumerable.setTimeout(root);
return new org.apache.calcite.linq4j.AbstractEnumerable() {
public org.apache.calcite.linq4j.Enumerator<String> enumerator() {
return new org.apache.calcite.linq4j.Enumerator<String>() {
//内部使用上面生成的ResultSetEnumerable生成的枚举器
public final org.apache.calcite.linq4j.Enumerator<Object[]> inputEnumerator =
enumerable.enumerator();
public void reset() {
inputEnumerator.reset();
public boolean moveNext() {
return inputEnumerator.moveNext();
public void close() {
inputEnumerator.close();
public Object current() {
final Object[] current = (Object[]) inputEnumerator.current();
return current[0] == null ? (String) null : current[0].toString();
public Class getElementType() {
return java.lang.String.class;
}
得到生成的java代码后,Calcite会调用Janino编译器动态编译这个java类,并且实例化这个类的一个对象。 后面在创建CalciteResultSet的时候会调用这个对象的
bind
方法,这个方法返回一个
Eumerable
对象,通过这个对象可以迭代JDBC查询的结果。这个
Eumerable
对象的实际工作是委托给
ResultSetEnumerable
的
enumerator()
方法生成的枚举器实现的。
public Enumerator<T> enumerator() {
if (preparedStatementEnricher == null) {
return enumeratorBasedOnStatement();
} else {
return enumeratorBasedOnPreparedStatement();
private Enumerator<T> enumeratorBasedOnStatement() {
Connection connection = null;
Statement statement = null;
try {
connection = dataSource.getConnection(); //获取实际数据库的jdbc连接
statement = connection.createStatement(); //生成jdbc statement
setTimeoutIfPossible(statement);
if (statement.execute(sql)) {
final ResultSet resultSet = statement.getResultSet(); //执行sql获得实际的ResultSet
statement = null;
connection = null;
return new ResultSetEnumerator<>(resultSet, rowBuilderFactory); //用ResultSetEnumerator对象包装ResultSet
} else {
Integer updateCount = statement.getUpdateCount();
return Linq4j.singletonEnumerator((T) updateCount);
} catch (SQLException e) {
throw Static.RESOURCE.exceptionWhilePerformingQueryOnJdbcSubSchema(sql)
.ex(e);
} finally {
closeIfPossible(connection, statement);
}
通过上面
ResultSetEnumerable
的
enumerator()
方法实现代码可以知道,在生成枚举器的时候就开始执行真正的数据库查询,获得实际的ResultSet,用ResultSetEnumerator包装起来,通过ResultSetEnumerator就能操作ResultSet。 在
executeSql
方法的最后,会返回CalciteResultSet,它与实际的ResultSet关系如下:
使用ResultSet
经过执行statement的
executeSql
方法获得CalciteResultSet后,就可以使用next方法获取查询数据。CalciteResultSet的next方法实现如下:
public boolean next() throws SQLException {
// TODO: for timeout, see IteratorResultSet.next
checkOpen();
if (null != statement && statement.cancelFlag.get()) {
throw AvaticaConnection.HELPER.createException("Statement canceled");
if (cursor.next()) { //调用cursor的next方法,cursor是IteraterCursor
++row;
beforeFirst = false;
return true;