How to Convert Data Types Between Flink, Iceberg and Hive (DEEPNOVA Developer Community)

Author: 闻乃松 (Wen Naisong)

Flink, Iceberg and Hive each provide a rich set of SQL data types, but in practice there are differences and incompatibilities between them. When the three engines are used together, you frequently have to convert values back and forth between their type systems. The question is whether these independent type systems can actually be converted into one another, and how the complexity of doing so can be reduced.

To answer that, we first need a careful survey of the data types each engine supports. The types supported by the three engines, and how they correspond, are listed below:

Flink            | Iceberg      | Hive
-----------------|--------------|----------
CHAR             |              | CHAR
VARCHAR          |              | VARCHAR
STRING           | string       | STRING
BOOLEAN          | boolean      | BOOLEAN
BINARY           | fixed(L)     |
VARBINARY        | binary       | BINARY
BYTES            |              |
DECIMAL          | decimal(P,S) | DECIMAL
TINYINT          |              | TINYINT
SMALLINT         |              | SMALLINT
INTEGER          | int          | INT
BIGINT           | long         | BIGINT
FLOAT            | float        | FLOAT
DOUBLE           | double       | DOUBLE
DATE             | date         | DATE
TIME             | time         |
TIMESTAMP        | timestamp    | TIMESTAMP
TIMESTAMP_LTZ    | timestamptz  |
INTERVAL         |              | INTERVAL
ARRAY            | list<E>      | array
MULTISET         |              |
MAP              | map<K, V>    | map
ROW              | struct<...>  | struct
RAW              |              |
                 |              | union
Structured types | struct<...>  |


Each of these types can be converted among the three engines, but the result is ultimately stored and persisted in the Hive metastore, so let's start with Hive's data types.
Hive divides data types into two broad categories:
Complex types: map, array, struct, union
Primitive types: everything else, such as int, float and double
The Hive serde library defines these type categories as follows:

//ObjectInspector.class
public static enum Category {
    PRIMITIVE, LIST, MAP, STRUCT, UNION
};


The syntax for each of the Complex types is:
ARRAY<data_type>
MAP<primitive_type, data_type>
STRUCT<col_name : data_type [COMMENT col_comment], ...>
UNIONTYPE<data_type, data_type, ...>

An example CREATE TABLE statement:
CREATE TABLE union_test(foo UNIONTYPE<int, double, array<string>, struct<a:int,b:string>>);

Every data type has to be serialized and deserialized when transferred over the network, so each type has a display name, i.e. a string representation. For example:
INT's display name is int
CHAR's display name is char
VARCHAR's display name is varchar(length)
ARRAY's display name is array<data_type>
MAP's display name is map<primitive_type, data_type>
STRUCT's display name is struct<col_name:data_type,...>
UNION's display name is uniontype<data_type, data_type, ...>

The Hive serde library provides methods for serializing and deserializing these types:

//TypeInfoUtils.class
public static ArrayList<TypeInfo> getTypeInfosFromTypeString(String typeString);
public static List<String> getTypeStringsFromTypeInfo(List<TypeInfo> typeInfos);


For example, parsing the string representation of a map type into its TypeInfo object representation:

ArrayList<TypeInfo> typeInfo = TypeInfoUtils.getTypeInfosFromTypeString("map<boolean,boolean>");
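Since getTypeName() returns a TypeInfo's display name, parsing round-trips cleanly. A minimal sketch (assuming the hive-serde jar is on the classpath; the wrapper class name here is invented for illustration):

```java
import java.util.List;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

// Hypothetical demo class: parse a type string and read its display name back.
public class TypeStringRoundTrip {
    static String roundTrip(String typeString) {
        // Parse the string representation into TypeInfo objects
        List<TypeInfo> infos = TypeInfoUtils.getTypeInfosFromTypeString(typeString);
        // getTypeName() renders the display name again
        return infos.get(0).getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("struct<a:int,b:map<string,double>>"));
    }
}
```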


Here TypeInfo is Hive's object-level representation of type information. Hive implements the following TypeInfo subclasses:

CharTypeInfo
VarcharTypeInfo
DecimalTypeInfo
ListTypeInfo
MapTypeInfo
StructTypeInfo
PrimitiveTypeInfo
UnionTypeInfo

Among these, CharTypeInfo, VarcharTypeInfo and DecimalTypeInfo all extend PrimitiveTypeInfo, which in turn covers the whole family of primitive data types enumerated by PrimitiveCategory:

public static enum PrimitiveCategory {
    VOID,
    BOOLEAN,
    BYTE,
    SHORT,
    INT,
    LONG,
    FLOAT,
    DOUBLE,
    STRING,
    DATE,
    TIMESTAMP,
    TIMESTAMPLOCALTZ,
    BINARY,
    DECIMAL,
    VARCHAR,
    CHAR,
    INTERVAL_YEAR_MONTH,
    INTERVAL_DAY_TIME,
    UNKNOWN;
}


In the PrimitiveCategory definition, DECIMAL, VARCHAR and CHAR are classified as primitive types, yet each also gets its own TypeInfo subclass. Evidently Hive's type taxonomy is not entirely consistent.

With Hive's data types covered, let's look at Flink's type definitions:

CharType
VarCharType
BinaryType
VarBinaryType
DecimalType
ArrayType
MapType
StructuredType
TinyIntType
SmallIntType
IntType
BigIntType
BooleanType
DoubleType
FloatType
TimestampType
TimeType
DateType
DayTimeIntervalType
YearMonthIntervalType
ZonedTimestampType
LocalZonedTimestampType
NullType
RawType
RowType
MultisetType
SymbolType
UserDefinedType
UnresolvedUserDefinedType

Compared with Hive, Flink defines a much richer set of data types, and each primitive type gets its own class. The Flink community's Flink Hive Connector module provides conversion in both directions between Flink and Hive data types:

//HiveTypeUtil.class
// Convert a Hive data type to a Flink data type
public static DataType toFlinkType(TypeInfo hiveType) {
    Preconditions.checkNotNull(hiveType, "hiveType cannot be null");
    switch (hiveType.getCategory()) {
        case PRIMITIVE:
            return toFlinkPrimitiveType((PrimitiveTypeInfo) hiveType);
        case LIST:
            ListTypeInfo listTypeInfo = (ListTypeInfo) hiveType;
            return DataTypes.ARRAY(toFlinkType(listTypeInfo.getListElementTypeInfo()));
        case MAP:
            MapTypeInfo mapTypeInfo = (MapTypeInfo) hiveType;
            return DataTypes.MAP(
                    toFlinkType(mapTypeInfo.getMapKeyTypeInfo()),
                    toFlinkType(mapTypeInfo.getMapValueTypeInfo()));
        case STRUCT:
            StructTypeInfo structTypeInfo = (StructTypeInfo) hiveType;
            List<String> names = structTypeInfo.getAllStructFieldNames();
            List<TypeInfo> typeInfos = structTypeInfo.getAllStructFieldTypeInfos();
            Field[] fields = new Field[names.size()];
            for (int i = 0; i < fields.length; ++i) {
                fields[i] = DataTypes.FIELD(names.get(i), toFlinkType(typeInfos.get(i)));
            }
            return DataTypes.ROW(fields);
        default:
            throw new UnsupportedOperationException(
                    String.format("Flink doesn't support Hive data type %s yet.", hiveType));
    }
}

private static DataType toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) {
    Preconditions.checkNotNull(hiveType, "hiveType cannot be null");
    switch (hiveType.getPrimitiveCategory()) {
        case CHAR:
            return DataTypes.CHAR(((CharTypeInfo) hiveType).getLength());
        case VARCHAR:
            return DataTypes.VARCHAR(((VarcharTypeInfo) hiveType).getLength());
        case STRING:
            return DataTypes.STRING();
        case BOOLEAN:
            return DataTypes.BOOLEAN();
        case BYTE:
            return DataTypes.TINYINT();
        case SHORT:
            return DataTypes.SMALLINT();
        case INT:
            return DataTypes.INT();
        case LONG:
            return DataTypes.BIGINT();
        case FLOAT:
            return DataTypes.FLOAT();
        case DOUBLE:
            return DataTypes.DOUBLE();
        case DATE:
            return DataTypes.DATE();
        case TIMESTAMP:
            return DataTypes.TIMESTAMP(9);
        case BINARY:
            return DataTypes.BYTES();
        case DECIMAL:
            DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) hiveType;
            return DataTypes.DECIMAL(decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale());
        default:
            throw new UnsupportedOperationException(
                    String.format("Flink doesn't support Hive primitive type %s yet", hiveType));
    }
}

// Convert a Flink data type to a Hive data type
public static TypeInfo toHiveTypeInfo(DataType dataType, boolean checkPrecision) {
    Preconditions.checkNotNull(dataType, "type cannot be null");
    LogicalType logicalType = dataType.getLogicalType();
    return (TypeInfo) logicalType.accept(
            new HiveTypeUtil.TypeInfoLogicalTypeVisitor(dataType, checkPrecision));
}
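A quick way to exercise both directions is a sketch like the following (assuming flink-connector-hive and hive-serde are on the classpath; the demo class and its helper method names are made up here):

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
import org.apache.flink.table.types.DataType;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

// Hypothetical demo class driving HiveTypeUtil in both directions.
public class FlinkHiveTypeDemo {
    // Hive type string -> Hive TypeInfo -> Flink DataType
    static DataType hiveToFlink(String hiveTypeString) {
        TypeInfo hiveType = TypeInfoUtils.getTypeInfosFromTypeString(hiveTypeString).get(0);
        return HiveTypeUtil.toFlinkType(hiveType);
    }

    // Flink DataType -> Hive TypeInfo -> display name
    static String flinkToHiveName(DataType flinkType) {
        return HiveTypeUtil.toHiveTypeInfo(flinkType, true).getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(hiveToFlink("map<string,bigint>"));
        System.out.println(flinkToHiveName(DataTypes.DECIMAL(10, 2)));
    }
}
```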


Now let's look at Iceberg's type definitions:

StringType
FixedType
BinaryType
DecimalType
ListType
MapType
StructType
IntegerType
LongType
BooleanType
DoubleType
FloatType
TimestampType
TimeType
DateType
UUIDType

Iceberg likewise defines its complex and primitive data types separately, though its type set is smaller than Flink's. Since Iceberg types serve as the intermediate representation, Iceberg also provides conversions to and from both Flink and Hive types:

//HiveSchemaUtil.class
// Convert an Iceberg type to a Hive type
public static TypeInfo convert(Type type) {
    return TypeInfoUtils.getTypeInfoFromTypeString(convertToTypeString(type));
}

// Convert a Hive type to an Iceberg type
public static Type convert(TypeInfo typeInfo) {
    return HiveSchemaConverter.convert(typeInfo, false);
}
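This pair of overloads can be tried out with a small sketch (assuming the iceberg-hive-metastore and hive-serde jars are on the classpath; the demo class and method names below are invented):

```java
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

// Hypothetical demo class: send a single Iceberg type through HiveSchemaUtil.
public class IcebergHiveTypeDemo {
    static String icebergToHiveName(Type icebergType) {
        // Iceberg object type -> type string -> Hive TypeInfo
        TypeInfo hiveType = HiveSchemaUtil.convert(icebergType);
        return hiveType.getTypeName();
    }

    public static void main(String[] args) {
        Type mapType = Types.MapType.ofOptional(1, 2,
                Types.StringType.get(), Types.LongType.get());
        // Iceberg's map<string, long> surfaces in Hive as map<string,bigint>
        System.out.println(icebergToHiveName(mapType));
    }
}
```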

To convert an Iceberg type to a Hive type, the Iceberg object representation is first rendered as an intermediate type string, which Hive's TypeInfo parser then parses into a supported Hive data type.

Converting a Hive type to an Iceberg type, by contrast, still uses the traditional case-by-case matching approach:

//HiveSchemaConverter.class
Type convertType(TypeInfo typeInfo) {
    switch (typeInfo.getCategory()) {
        case PRIMITIVE:
            switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
                case FLOAT:
                    return FloatType.get();
                case DOUBLE:
                    return DoubleType.get();
                case BOOLEAN:
                    return BooleanType.get();
                case BYTE:
                case SHORT:
                    Preconditions.checkArgument(this.autoConvert,
                            "Unsupported Hive type: %s, use integer instead",
                            ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory());
                    LOG.debug("Using auto conversion from SHORT/BYTE to INTEGER");
                    return IntegerType.get();
                case INT:
                    return IntegerType.get();
                case LONG:
                    return LongType.get();
                case BINARY:
                    return BinaryType.get();
                case CHAR:
                case VARCHAR:
                    Preconditions.checkArgument(this.autoConvert,
                            "Unsupported Hive type: %s, use string instead",
                            ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory());
                    LOG.debug("Using auto conversion from CHAR/VARCHAR to STRING");
                    return StringType.get();
                case STRING:
                    return StringType.get();
                case TIMESTAMP:
                    return TimestampType.withoutZone();
                case DATE:
                    return DateType.get();
                case DECIMAL:
                    DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
                    return DecimalType.of(decimalTypeInfo.precision(), decimalTypeInfo.scale());
                case INTERVAL_YEAR_MONTH:
                case INTERVAL_DAY_TIME:
                default:
                    if ("TIMESTAMPLOCALTZ".equalsIgnoreCase(
                            ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory().name())) {
                        return TimestampType.withZone();
                    }
                    throw new IllegalArgumentException("Unsupported Hive type ("
                            + ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()
                            + ") for Iceberg tables.");
            }
        case STRUCT:
            StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
            List<NestedField> fields = this.convertInternal(
                    structTypeInfo.getAllStructFieldNames(),
                    structTypeInfo.getAllStructFieldTypeInfos(),
                    Collections.emptyList());
            return StructType.of(fields);
        case MAP:
            MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
            Type keyType = this.convertType(mapTypeInfo.getMapKeyTypeInfo());
            Type valueType = this.convertType(mapTypeInfo.getMapValueTypeInfo());
            int keyId = this.id++;
            int valueId = this.id++;
            return MapType.ofOptional(keyId, valueId, keyType, valueType);
        case LIST:
            ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
            Type listType = this.convertType(listTypeInfo.getListElementTypeInfo());
            return ListType.ofOptional(this.id++, listType);
        case UNION:
        default:
            throw new IllegalArgumentException("Unknown type " + typeInfo.getCategory());
    }
}


Flink-to-Iceberg conversion uses the visitor design pattern, so the supported types can be read directly off the implementation:

//FlinkTypeToType.class
public Type visit(CharType charType) {
    return StringType.get();
}

public Type visit(VarCharType varCharType) {
    return StringType.get();
}

public Type visit(BooleanType booleanType) {
    return org.apache.iceberg.types.Types.BooleanType.get();
}

public Type visit(BinaryType binaryType) {
    return FixedType.ofLength(binaryType.getLength());
}

public Type visit(VarBinaryType varBinaryType) {
    return org.apache.iceberg.types.Types.BinaryType.get();
}

public Type visit(DecimalType decimalType) {
    return org.apache.iceberg.types.Types.DecimalType.of(
            decimalType.getPrecision(), decimalType.getScale());
}

public Type visit(TinyIntType tinyIntType) {
    return IntegerType.get();
}

public Type visit(SmallIntType smallIntType) {
    return IntegerType.get();
}

public Type visit(IntType intType) {
    return IntegerType.get();
}

public Type visit(BigIntType bigIntType) {
    return LongType.get();
}

public Type visit(FloatType floatType) {
    return org.apache.iceberg.types.Types.FloatType.get();
}

public Type visit(DoubleType doubleType) {
    return org.apache.iceberg.types.Types.DoubleType.get();
}

public Type visit(DateType dateType) {
    return org.apache.iceberg.types.Types.DateType.get();
}

public Type visit(TimeType timeType) {
    return org.apache.iceberg.types.Types.TimeType.get();
}

public Type visit(TimestampType timestampType) {
    return org.apache.iceberg.types.Types.TimestampType.withoutZone();
}

public Type visit(LocalZonedTimestampType localZonedTimestampType) {
    return org.apache.iceberg.types.Types.TimestampType.withZone();
}

public Type visit(ArrayType arrayType) {
    Type elementType = (Type) arrayType.getElementType().accept(this);
    return arrayType.getElementType().isNullable()
            ? ListType.ofOptional(this.getNextId(), elementType)
            : ListType.ofRequired(this.getNextId(), elementType);
}

public Type visit(MultisetType multisetType) {
    Type elementType = (Type) multisetType.getElementType().accept(this);
    return MapType.ofRequired(this.getNextId(), this.getNextId(), elementType, IntegerType.get());
}

public Type visit(org.apache.flink.table.types.logical.MapType mapType) {
    Type keyType = (Type) mapType.getKeyType().accept(this);
    Type valueType = (Type) mapType.getValueType().accept(this);
    return mapType.getValueType().isNullable()
            ? MapType.ofOptional(this.getNextId(), this.getNextId(), keyType, valueType)
            : MapType.ofRequired(this.getNextId(), this.getNextId(), keyType, valueType);
}

public Type visit(RowType rowType) {
    List<NestedField> newFields = Lists.newArrayListWithExpectedSize(rowType.getFieldCount());
    boolean isRoot = this.root == rowType;
    List<Type> types = rowType.getFields().stream()
            .map((f) -> (Type) f.getType().accept(this))
            .collect(Collectors.toList());
    for (int i = 0; i < rowType.getFieldCount(); ++i) {
        int id = isRoot ? i : this.getNextId();
        RowField field = rowType.getFields().get(i);
        String name = field.getName();
        String comment = field.getDescription().orElse(null);
        if (field.getType().isNullable()) {
            newFields.add(NestedField.optional(id, name, types.get(i), comment));
        } else {
            newFields.add(NestedField.required(id, name, types.get(i), comment));
        }
    }
    return StructType.of(newFields);
}
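In practice this visitor is usually driven through Iceberg's FlinkSchemaUtil, which converts a whole schema at once. A minimal sketch (assuming the iceberg-flink runtime jar is on the classpath; the demo class name is invented, and TableSchema is the schema class of the Flink version contemporary with this article):

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkSchemaUtil;

// Hypothetical demo class: convert a Flink TableSchema to an Iceberg Schema.
public class FlinkIcebergSchemaDemo {
    static Schema toIceberg(TableSchema flinkSchema) {
        // Internally walks every column type via the FlinkTypeToType visitor
        return FlinkSchemaUtil.convert(flinkSchema);
    }

    public static void main(String[] args) {
        TableSchema flinkSchema = TableSchema.builder()
                .field("id", DataTypes.BIGINT().notNull())
                .field("tags", DataTypes.ARRAY(DataTypes.STRING()))
                .build();
        System.out.println(toIceberg(flinkSchema));
    }
}
```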


Symmetrically, TypeToFlinkType implements the Iceberg-to-Flink conversion:

//TypeToFlinkType.class
public LogicalType struct(StructType struct, List<LogicalType> fieldResults) {
    List<NestedField> fields = struct.fields();
    List<RowField> flinkFields = Lists.newArrayListWithExpectedSize(fieldResults.size());
    for (int i = 0; i < fields.size(); ++i) {
        NestedField field = fields.get(i);
        LogicalType type = fieldResults.get(i);
        RowField flinkField = new RowField(field.name(), type.copy(field.isOptional()), field.doc());
        flinkFields.add(flinkField);
    }
    return new RowType(flinkFields);
}

public LogicalType field(NestedField field, LogicalType fieldResult) {
    return fieldResult;
}

public LogicalType list(ListType list, LogicalType elementResult) {
    return new ArrayType(elementResult.copy(list.isElementOptional()));
}

public LogicalType map(MapType map, LogicalType keyResult, LogicalType valueResult) {
    return new org.apache.flink.table.types.logical.MapType(
            keyResult.copy(false), valueResult.copy(map.isValueOptional()));
}

public LogicalType primitive(PrimitiveType primitive) {
    switch (primitive.typeId()) {
        case BOOLEAN:
            return new BooleanType();
        case INTEGER:
            return new IntType();
        case LONG:
            return new BigIntType();
        case FLOAT:
            return new FloatType();
        case DOUBLE:
            return new DoubleType();
        case DATE:
            return new DateType();
        case TIME:
            return new TimeType();
        case TIMESTAMP:
            TimestampType timestamp = (TimestampType) primitive;
            if (timestamp.shouldAdjustToUTC()) {
                return new LocalZonedTimestampType(6);
            }
            return new org.apache.flink.table.types.logical.TimestampType(6);
        case STRING:
            return new VarCharType(2147483647);
        case UUID:
            return new BinaryType(16);
        case FIXED:
            FixedType fixedType = (FixedType) primitive;
            return new BinaryType(fixedType.length());
        case BINARY:
            return new VarBinaryType(2147483647);
        case DECIMAL: