How to Convert Between Flink, Iceberg, and Hive Data Types — DEEPNOVA Developer Community
Author: 闻乃松
Flink, Iceberg, and Hive each provide a rich set of SQL data types, but in practice there are differences and incompatibilities among them. When the three engines are used together, you constantly have to convert types from one engine to another. The questions, then, are whether these three independent type systems can be converted into one another, and how the complexity of those conversions can be reduced.
To answer that, we first need a careful survey of the data types each engine supports. The types supported by the three engines, and how they correspond, are listed below:
| Flink | Iceberg | Hive |
| --- | --- | --- |
| CHAR | | CHAR |
| VARCHAR | | VARCHAR |
| STRING | string | STRING |
| BOOLEAN | boolean | BOOLEAN |
| BINARY | fixed(L) | |
| VARBINARY | binary | BINARY |
| BYTES | | |
| DECIMAL | decimal(P,S) | DECIMAL |
| TINYINT | | TINYINT |
| SMALLINT | | SMALLINT |
| INTEGER | int | INT |
| BIGINT | long | BIGINT |
| FLOAT | float | FLOAT |
| DOUBLE | double | DOUBLE |
| DATE | date | DATE |
| TIME | time | |
| TIMESTAMP | timestamp | TIMESTAMP |
| TIMESTAMP_LTZ | timestamptz | |
| INTERVAL | | INTERVAL |
| ARRAY | list<E> | array |
| MULTISET | | |
| MAP | map<K, V> | map |
| ROW | struct<...> | struct |
| RAW | | |
| | | union |
| Structured types | struct<...> | |
All of these data types can be converted among the three engines, but the results are ultimately stored and persisted in the Hive metastore, so let's start with Hive's data types.
Hive divides its data types into two broad groups:
- Complex types: map, array, struct, union
- Primitive types: everything other than the complex types, such as int, float, double, etc.
The Hive serde library defines the following type categories:

```java
// ObjectInspector.class
public static enum Category {
    PRIMITIVE, LIST, MAP, STRUCT, UNION
};
```
The syntax for the complex data types is:

```
ARRAY<data_type>
MAP<primitive_type, data_type>
STRUCT<col_name : data_type [COMMENT col_comment], ...>
UNIONTYPE<data_type, data_type, ...>
```
For example, a table definition using a union:

```sql
CREATE TABLE union_test(foo UNIONTYPE<int, double, array<string>, struct<a:int,b:string>>);
```
Every data type has to be serialized and deserialized when it travels over the network, so each type has a display name, i.e. a string representation. For example:
- INT's display name is int
- CHAR's display name is char(length)
- VARCHAR's display name is varchar(length)
- ARRAY's display name is array<data_type>
- MAP's display name is map<primitive_type, data_type>
- STRUCT's display name is struct<col_name:data_type,...>
- UNION's display name is uniontype<data_type, data_type, ...>
The Hive serde library provides methods to serialize and deserialize these types:

```java
// TypeInfoUtils.class
public static ArrayList<TypeInfo> getTypeInfosFromTypeString(String typeString);
public static List<String> getTypeStringsFromTypeInfo(List<TypeInfo> typeInfos);
```
For example, parsing the string representation of a map type into its TypeInfo object representation:

```java
ArrayList<TypeInfo> typeInfo = TypeInfoUtils.getTypeInfosFromTypeString("map<boolean,boolean>");
```
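Going the other way, each TypeInfo renders its own display name via getTypeName(), which makes a simple round trip possible. A minimal sketch, assuming the Hive serde library is on the classpath (it reuses the union type from the DDL example above):

```java
import java.util.ArrayList;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class TypeStringRoundTrip {
    public static void main(String[] args) {
        // Parse a map type string and print its display name back out.
        ArrayList<TypeInfo> mapInfo =
                TypeInfoUtils.getTypeInfosFromTypeString("map<boolean,boolean>");
        System.out.println(mapInfo.get(0).getTypeName());   // map<boolean,boolean>

        // The same works for a nested union type.
        ArrayList<TypeInfo> unionInfo = TypeInfoUtils.getTypeInfosFromTypeString(
                "uniontype<int,double,array<string>,struct<a:int,b:string>>");
        System.out.println(unionInfo.get(0).getCategory());  // UNION
        System.out.println(unionInfo.get(0).getTypeName());
    }
}
```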
Here, TypeInfo is Hive's object interface for representing type information. Hive implements the following TypeInfo subclasses:
- CharTypeInfo
- VarcharTypeInfo
- DecimalTypeInfo
- ListTypeInfo
- MapTypeInfo
- StructTypeInfo
- PrimitiveTypeInfo
- UnionTypeInfo
Note that DecimalTypeInfo, despite being a distinct subclass, is still counted among the primitive types, whose categories Hive enumerates as follows:
```java
// PrimitiveObjectInspector.class
public static enum PrimitiveCategory {
    VOID,
    BOOLEAN,
    BYTE,
    SHORT,
    INT,
    LONG,
    FLOAT,
    DOUBLE,
    STRING,
    DATE,
    TIMESTAMP,
    TIMESTAMPLOCALTZ,
    BINARY,
    DECIMAL,
    VARCHAR,
    CHAR,
    INTERVAL_YEAR_MONTH,
    INTERVAL_DAY_TIME,
    UNKNOWN;
}
```
In the PrimitiveCategory definition, DECIMAL, VARCHAR, and CHAR are classified as primitive types, yet each of them also has its own TypeInfo subclass. So Hive is not entirely consistent about where these types belong.
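A minimal sketch that makes this inconsistency concrete, using only the serde APIs shown above:

```java
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;

public class PrimitiveInconsistency {
    public static void main(String[] args) {
        TypeInfo varchar = TypeInfoUtils.getTypeInfosFromTypeString("varchar(20)").get(0);
        // Reported as PRIMITIVE, yet backed by its own TypeInfo subclass.
        System.out.println(varchar.getCategory());               // PRIMITIVE
        System.out.println(varchar.getTypeName());               // varchar(20)
        System.out.println(varchar instanceof VarcharTypeInfo);  // true
    }
}
```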
Having covered Hive's data types, let's look at Flink's type definitions:
- CharType
- VarCharType
- BinaryType
- VarBinaryType
- DecimalType
- ArrayType
- MapType
- StructuredType
- TinyIntType
- SmallIntType
- IntType
- BigIntType
- BooleanType
- DoubleType
- FloatType
- TimestampType
- TimeType
- DateType
- DayTimeIntervalType
- YearMonthIntervalType
- ZonedTimestampType
- LocalZonedTimestampType
- NullType
- RawType
- RowType
- MultisetType
- SymbolType
- UserDefinedType
- UnresolvedUserDefinedType
Compared with Hive, Flink defines a much richer set of types, and it also gives each primitive type its own class. The Flink community provides bidirectional conversion between Flink and Hive types in the Flink Hive Connector package:
```java
// HiveTypeUtil.class
// Convert a Hive data type to a Flink data type
public static DataType toFlinkType(TypeInfo hiveType) {
    Preconditions.checkNotNull(hiveType, "hiveType cannot be null");
    switch (hiveType.getCategory()) {
        case PRIMITIVE:
            return toFlinkPrimitiveType((PrimitiveTypeInfo) hiveType);
        case LIST:
            ListTypeInfo listTypeInfo = (ListTypeInfo) hiveType;
            return DataTypes.ARRAY(toFlinkType(listTypeInfo.getListElementTypeInfo()));
        case MAP:
            MapTypeInfo mapTypeInfo = (MapTypeInfo) hiveType;
            return DataTypes.MAP(
                    toFlinkType(mapTypeInfo.getMapKeyTypeInfo()),
                    toFlinkType(mapTypeInfo.getMapValueTypeInfo()));
        case STRUCT:
            StructTypeInfo structTypeInfo = (StructTypeInfo) hiveType;
            List<String> names = structTypeInfo.getAllStructFieldNames();
            List<TypeInfo> typeInfos = structTypeInfo.getAllStructFieldTypeInfos();
            Field[] fields = new Field[names.size()];
            for (int i = 0; i < fields.length; ++i) {
                fields[i] = DataTypes.FIELD(names.get(i), toFlinkType(typeInfos.get(i)));
            }
            return DataTypes.ROW(fields);
        default:
            throw new UnsupportedOperationException(
                    String.format("Flink doesn't support Hive data type %s yet.", hiveType));
    }
}

private static DataType toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) {
    Preconditions.checkNotNull(hiveType, "hiveType cannot be null");
    switch (hiveType.getPrimitiveCategory()) {
        case CHAR:
            return DataTypes.CHAR(((CharTypeInfo) hiveType).getLength());
        case VARCHAR:
            return DataTypes.VARCHAR(((VarcharTypeInfo) hiveType).getLength());
        case STRING:
            return DataTypes.STRING();
        case BOOLEAN:
            return DataTypes.BOOLEAN();
        case BYTE:
            return DataTypes.TINYINT();
        case SHORT:
            return DataTypes.SMALLINT();
        case INT:
            return DataTypes.INT();
        case LONG:
            return DataTypes.BIGINT();
        case FLOAT:
            return DataTypes.FLOAT();
        case DOUBLE:
            return DataTypes.DOUBLE();
        case DATE:
            return DataTypes.DATE();
        case TIMESTAMP:
            return DataTypes.TIMESTAMP(9);
        case BINARY:
            return DataTypes.BYTES();
        case DECIMAL:
            DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) hiveType;
            return DataTypes.DECIMAL(decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale());
        default:
            throw new UnsupportedOperationException(
                    String.format("Flink doesn't support Hive primitive type %s yet", hiveType));
    }
}

// Convert a Flink data type to a Hive data type
public static TypeInfo toHiveTypeInfo(DataType dataType, boolean checkPrecision) {
    Preconditions.checkNotNull(dataType, "type cannot be null");
    LogicalType logicalType = dataType.getLogicalType();
    return (TypeInfo) logicalType.accept(new HiveTypeUtil.TypeInfoLogicalTypeVisitor(dataType, checkPrecision));
}
```
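A hedged usage sketch of the two directions (HiveTypeUtil lives in the flink-connector-hive module; the map type string here is illustrative):

```java
// Parse a Hive type string, hop to Flink, and hop back to Hive.
TypeInfo hiveMap = TypeInfoUtils.getTypeInfosFromTypeString("map<string,int>").get(0);
DataType flinkType = HiveTypeUtil.toFlinkType(hiveMap);        // MAP<STRING, INT>
TypeInfo roundTripped = HiveTypeUtil.toHiveTypeInfo(flinkType, true);
System.out.println(roundTripped.getTypeName());                // map<string,int>
```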
Now for Iceberg's type definitions:
- StringType
- FixedType
- BinaryType
- DecimalType
- ListType
- MapType
- StructType
- IntegerType
- LongType
- BooleanType
- DoubleType
- FloatType
- TimestampType
- TimeType
- DateType
- UUIDType
Iceberg likewise defines its complex and primitive types separately, but its type set is smaller than Flink's. Since Iceberg types act as the intermediate representation, Iceberg also ships conversions to and from both Flink and Hive types:
```java
// HiveSchemaUtil.class
// Convert an Iceberg type to a Hive type
public static TypeInfo convert(Type type) {
    return TypeInfoUtils.getTypeInfoFromTypeString(convertToTypeString(type));
}

// Convert a Hive type to an Iceberg type
public static Type convert(TypeInfo typeInfo) {
    return HiveSchemaConverter.convert(typeInfo, false);
}
```
Iceberg-to-Hive conversion works by first rendering the Iceberg type object to its string representation, and then handing that string to Hive's TypeInfo parser to produce the corresponding Hive type, as the sketch below illustrates.
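A minimal round-trip sketch (Types and HiveSchemaUtil are the Iceberg classes shown above; the field ids 1 and 2 are arbitrary):

```java
// Build an Iceberg map type, convert it to a Hive TypeInfo, and convert it back.
Type icebergType = Types.MapType.ofOptional(1, 2,
        Types.StringType.get(), Types.IntegerType.get());
TypeInfo hiveType = HiveSchemaUtil.convert(icebergType);
System.out.println(hiveType.getTypeName());    // map<string,int>
Type back = HiveSchemaUtil.convert(hiveType);  // an equivalent map type (ids reassigned)
```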
Converting Hive types to Iceberg types, by contrast, still uses the traditional case-by-case matching approach:
```java
// HiveSchemaConverter.class
Type convertType(TypeInfo typeInfo) {
    switch (typeInfo.getCategory()) {
        case PRIMITIVE:
            switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
                case FLOAT:
                    return FloatType.get();
                case DOUBLE:
                    return DoubleType.get();
                case BOOLEAN:
                    return BooleanType.get();
                case BYTE:
                case SHORT:
                    Preconditions.checkArgument(this.autoConvert,
                            "Unsupported Hive type: %s, use integer instead",
                            ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory());
                    LOG.debug("Using auto conversion from SHORT/BYTE to INTEGER");
                    return IntegerType.get();
                case INT:
                    return IntegerType.get();
                case LONG:
                    return LongType.get();
                case BINARY:
                    return BinaryType.get();
                case CHAR:
                case VARCHAR:
                    Preconditions.checkArgument(this.autoConvert,
                            "Unsupported Hive type: %s, use string instead",
                            ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory());
                    LOG.debug("Using auto conversion from CHAR/VARCHAR to STRING");
                    return StringType.get();
                case STRING:
                    return StringType.get();
                case TIMESTAMP:
                    return TimestampType.withoutZone();
                case DATE:
                    return DateType.get();
                case DECIMAL:
                    DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
                    return DecimalType.of(decimalTypeInfo.precision(), decimalTypeInfo.scale());
                case INTERVAL_YEAR_MONTH:
                case INTERVAL_DAY_TIME:
                default:
                    // TIMESTAMPLOCALTZ is matched by name rather than by enum constant
                    if ("TIMESTAMPLOCALTZ".equalsIgnoreCase(
                            ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory().name())) {
                        return TimestampType.withZone();
                    }
                    throw new IllegalArgumentException("Unsupported Hive type ("
                            + ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()
                            + ") for Iceberg tables.");
            }
        case STRUCT:
            StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
            List<NestedField> fields = this.convertInternal(
                    structTypeInfo.getAllStructFieldNames(),
                    structTypeInfo.getAllStructFieldTypeInfos(),
                    Collections.emptyList());
            return StructType.of(fields);
        case MAP:
            MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
            Type keyType = this.convertType(mapTypeInfo.getMapKeyTypeInfo());
            Type valueType = this.convertType(mapTypeInfo.getMapValueTypeInfo());
            int keyId = this.id++;
            int valueId = this.id++;
            return MapType.ofOptional(keyId, valueId, keyType, valueType);
        case LIST:
            ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
            Type listType = this.convertType(listTypeInfo.getListElementTypeInfo());
            return ListType.ofOptional(this.id++, listType);
        case UNION:
        default:
            throw new IllegalArgumentException("Unknown type " + typeInfo.getCategory());
    }
}
```
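Note the autoConvert flag: since HiveSchemaUtil.convert(TypeInfo) passes false (see above), Hive types without an exact Iceberg counterpart are rejected rather than silently widened. A hedged sketch:

```java
// With autoConvert = false, CHAR/VARCHAR (and BYTE/SHORT) are rejected.
TypeInfo charInfo = TypeInfoUtils.getTypeInfosFromTypeString("char(10)").get(0);
try {
    HiveSchemaUtil.convert(charInfo);
} catch (IllegalArgumentException e) {
    System.out.println(e.getMessage());  // "Unsupported Hive type: CHAR, use string instead"
}
```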
Flink-to-Iceberg conversion uses the visitor design pattern, and the supported conversions can be read directly off the implementation:
```java
// FlinkTypeToType.class
public Type visit(CharType charType) {
    return StringType.get();
}

public Type visit(VarCharType varCharType) {
    return StringType.get();
}

public Type visit(BooleanType booleanType) {
    return org.apache.iceberg.types.Types.BooleanType.get();
}

public Type visit(BinaryType binaryType) {
    return FixedType.ofLength(binaryType.getLength());
}

public Type visit(VarBinaryType varBinaryType) {
    return org.apache.iceberg.types.Types.BinaryType.get();
}

public Type visit(DecimalType decimalType) {
    return org.apache.iceberg.types.Types.DecimalType.of(decimalType.getPrecision(), decimalType.getScale());
}

public Type visit(TinyIntType tinyIntType) {
    return IntegerType.get();
}

public Type visit(SmallIntType smallIntType) {
    return IntegerType.get();
}

public Type visit(IntType intType) {
    return IntegerType.get();
}

public Type visit(BigIntType bigIntType) {
    return LongType.get();
}

public Type visit(FloatType floatType) {
    return org.apache.iceberg.types.Types.FloatType.get();
}

public Type visit(DoubleType doubleType) {
    return org.apache.iceberg.types.Types.DoubleType.get();
}

public Type visit(DateType dateType) {
    return org.apache.iceberg.types.Types.DateType.get();
}

public Type visit(TimeType timeType) {
    return org.apache.iceberg.types.Types.TimeType.get();
}

public Type visit(TimestampType timestampType) {
    return org.apache.iceberg.types.Types.TimestampType.withoutZone();
}

public Type visit(LocalZonedTimestampType localZonedTimestampType) {
    return org.apache.iceberg.types.Types.TimestampType.withZone();
}

public Type visit(ArrayType arrayType) {
    Type elementType = arrayType.getElementType().accept(this);
    return arrayType.getElementType().isNullable()
            ? ListType.ofOptional(this.getNextId(), elementType)
            : ListType.ofRequired(this.getNextId(), elementType);
}

public Type visit(MultisetType multisetType) {
    // A multiset becomes a map from element to its count
    Type elementType = multisetType.getElementType().accept(this);
    return MapType.ofRequired(this.getNextId(), this.getNextId(), elementType, IntegerType.get());
}

public Type visit(org.apache.flink.table.types.logical.MapType mapType) {
    Type keyType = mapType.getKeyType().accept(this);
    Type valueType = mapType.getValueType().accept(this);
    return mapType.getValueType().isNullable()
            ? MapType.ofOptional(this.getNextId(), this.getNextId(), keyType, valueType)
            : MapType.ofRequired(this.getNextId(), this.getNextId(), keyType, valueType);
}

public Type visit(RowType rowType) {
    List<NestedField> newFields = Lists.newArrayListWithExpectedSize(rowType.getFieldCount());
    boolean isRoot = this.root == rowType;
    List<Type> types = rowType.getFields().stream()
            .map(f -> f.getType().accept(this))
            .collect(Collectors.toList());
    for (int i = 0; i < rowType.getFieldCount(); ++i) {
        int id = isRoot ? i : this.getNextId();
        RowField field = rowType.getFields().get(i);
        String name = field.getName();
        String comment = field.getDescription().orElse(null);
        if (field.getType().isNullable()) {
            newFields.add(NestedField.optional(id, name, types.get(i), comment));
        } else {
            newFields.add(NestedField.required(id, name, types.get(i), comment));
        }
    }
    return StructType.of(newFields);
}
```
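In practice you don't call the visitor directly; FlinkSchemaUtil drives it over a whole schema. A hedged sketch (FlinkSchemaUtil.convert(TableSchema) is the public entry point in the iceberg-flink module):

```java
// Convert a Flink TableSchema to an Iceberg Schema.
TableSchema flinkSchema = TableSchema.builder()
        .field("id", DataTypes.BIGINT().notNull())
        .field("tags", DataTypes.ARRAY(DataTypes.STRING()))
        .build();
Schema icebergSchema = FlinkSchemaUtil.convert(flinkSchema);
// id   -> required long
// tags -> optional list<string>
System.out.println(icebergSchema);
```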
Likewise, TypeToFlinkType implements the Iceberg-to-Flink direction:
```java
// TypeToFlinkType.class
public LogicalType struct(StructType struct, List<LogicalType> fieldResults) {
    List<NestedField> fields = struct.fields();
    List<RowField> flinkFields = Lists.newArrayListWithExpectedSize(fieldResults.size());
    for (int i = 0; i < fields.size(); ++i) {
        NestedField field = fields.get(i);
        LogicalType type = fieldResults.get(i);
        RowField flinkField = new RowField(field.name(), type.copy(field.isOptional()), field.doc());
        flinkFields.add(flinkField);
    }
    return new RowType(flinkFields);
}

public LogicalType field(NestedField field, LogicalType fieldResult) {
    return fieldResult;
}

public LogicalType list(ListType list, LogicalType elementResult) {
    return new ArrayType(elementResult.copy(list.isElementOptional()));
}

public LogicalType map(MapType map, LogicalType keyResult, LogicalType valueResult) {
    // Iceberg map keys are always required
    return new org.apache.flink.table.types.logical.MapType(
            keyResult.copy(false), valueResult.copy(map.isValueOptional()));
}

public LogicalType primitive(PrimitiveType primitive) {
    switch (primitive.typeId()) {
        case BOOLEAN:
            return new BooleanType();
        case INTEGER:
            return new IntType();
        case LONG:
            return new BigIntType();
        case FLOAT:
            return new FloatType();
        case DOUBLE:
            return new DoubleType();
        case DATE:
            return new DateType();
        case TIME:
            return new TimeType();
        case TIMESTAMP:
            TimestampType timestamp = (TimestampType) primitive;
            if (timestamp.shouldAdjustToUTC()) {
                return new LocalZonedTimestampType(6);
            }
            return new org.apache.flink.table.types.logical.TimestampType(6);
        case STRING:
            return new VarCharType(2147483647);
        case UUID:
            return new BinaryType(16);
        case FIXED:
            FixedType fixedType = (FixedType) primitive;
            return new BinaryType(fixedType.length());
        case BINARY:
            return new VarBinaryType(2147483647);
        case DECIMAL:
            // The original article is cut off at this case; the remainder follows
            // the same pattern: map Iceberg decimal(P,S) to Flink DECIMAL(P,S),
            // and reject any type id not handled above.
            DecimalType decimal = (DecimalType) primitive;
            return new org.apache.flink.table.types.logical.DecimalType(
                    decimal.precision(), decimal.scale());
        default:
            throw new UnsupportedOperationException(
                    "Cannot convert unknown type to Flink: " + primitive);
    }
}
```
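To close the loop, a hedged sketch of the Iceberg-to-Flink direction (FlinkSchemaUtil.toSchema(Schema) is the public wrapper that runs TypeToFlinkType; the field ids and names are illustrative):

```java
// Convert an Iceberg Schema back to a Flink TableSchema.
Schema icebergSchema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "ts", Types.TimestampType.withZone()));
TableSchema flinkSchema = FlinkSchemaUtil.toSchema(icebergSchema);
// id -> BIGINT NOT NULL, ts -> TIMESTAMP_LTZ(6)
System.out.println(flinkSchema);
```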