5.1 DataStream类型系统
DataStream是面向开发者的比较偏底层的API,在Flink SQL推出之前是Flink主要的开发接口。开发者在开发DataStream应用的时候,就像在编写Java或者Scala的程序一样,在定义数据类型的时候,使用的是Java或者Scala的类型,如代码清单5-1所示。
代码清单5-1 WindowWordCount代码示例
上述代码中,Splitter#flatMap的输入是String,输出是Tuple2<String, Integer>,其中涉及了String、Integer、Tuple2这样的Java类型,也使用了泛型。对于Flink而言,在运行时需要将这些不同类型的数据序列化为二进制数据,在内存中保存或者在网络上传输。
5.1.1 物理类型
Flink中支持的物理类型如图5-2所示。为了方便开发者,对于大部分的常用类型,Flink都提供了内置的类型描述和序列化/反序列化方法,但是对于用户自定义类型,需要用户注册该类型,并自行实现序列化、反序列化的方法。
图5-2 Flink物理类型分类
如果开发者在编写Flink应用过程中使用了自定义类型,并且又没有提供类型的注册和序列化/反序列化方法,Flink就无法对该类型进行该自定义序列化/反序列化。此时为了Flink的正常运行,对于这一类的数据类型,无法识别的类型就会交给Kryo进行序列化。Kryo可以对任意类型的Java对象进行序列化,是一种Java中的通用序列化方式,缺点是序列化/反序列化效率相对较低。
5.1.2 逻辑类型
Flink应用开发者使用的是Java/Scala的原生类型,对于Flink而言,UDF的输入和返回类型在开发时都是物理类型。逻辑类型是物理类型的描述,Flink在运行时会根据逻辑类型进行数据的序列化和反序列化。
TypeInformation是Flink类型系统的核心类,所有的逻辑类型都继承自该类,Flink的逻辑类型分类如图5-3所示。
TypeInformation作为核心抽象类,是Java/Scala对象类型和Flink的二进制数据之间的桥梁,在其中定义了关键的序列化器的方法createSerializer (ExecutionConfig config),所有的逻辑类型都必须实现该方法,为该类型提供定制的序列化器Serializer,在作业执行时,序列化器将Java/Scala对象序列化成二进制数据,从二进制数据中心反序列化为Java/Scala对象。
为了降低逻辑类型系统给开发者带来的负担,Flink内置了大量类型的逻辑类型,对Java基本类型(如String、Integer等)、数组类型、对象类型(如Tuple、Crow、Pojo对象)、集合类型等都提供了默认实现,这些类型可以直接使用,Flink会自动将其识别为对应的逻辑类型。
图5-3 Flink逻辑类型分类
Flink作业在执行时被分发到不同的机器上执行,逻辑类型作为重要的类型信息,也需要分发到各个计算节点上,类型信息本身的序列化使用的是Java自带的序列化机制,实现了Serializable接口。
5.1.3 类型推断
在上文提到过,开发者使用的是物理类型,而Flink运行时需要的是逻辑类型,所以需要从物理类型包装为逻辑类型。那么如何从开发者编写的代码中提取Function的输入输出类型呢?
Flink使用了两种开发语言Java和Scala,两者的类型提取的实现方式不同。Java的Flink应用使用反射机制获取Function的输入和输出类型。Scala使用Scala Macro类提取类型。
1. 类型提取的时机
类型信息很重要,那么类型提取是在什么时候发生的呢?如代码清单5-2所示。
代码清单5-2 DataStream#map方法类型提取示例
从上述代码中可以看到,在使用DataStream#map接口的时候,就会触发类型的提取。
2. 自动类型推断
Flink首先会自动进行类型推断,但是对于一些带有泛型的类型,Java泛型的类型擦除机制会导致Flink在处理Lambda表达式的类型推断时不能保证一定能提取到类型。
Java泛型(Generic)的引入加强了参数类型的安全性,减少了类型的转换,但有一点需要注意:Java的泛型机制是在编译级别实现的。编译器生成的字节码在运行期间并不包含泛型的类型信息。
此时就需要为Flink的应用提供类型信息,TypeHint的用途即在于此,使用TypeHint的匿名类来获取泛型的类型信息,如代码清单5-3所示。
代码清单5-3 使用TypeHint在运行时进行类型提取
上述代码中使用匿名内部类来获取泛型信息,其中new TypeHint<Tuple3<String, String, Double>>(){}就是用来在类型擦除的情况下来获取泛型信息的。
3. Lambda函数的类型提取
Flink类型提取依赖于继承等机制,但Lambda函数比较特殊,其类型提取是匿名的,也没有与之相关的类,所以其类型信息较难获取。Eclipse的JDT编译器会把Lambda函数的泛型签名等信息写入编译后的字节码中,而对于javac等常见的其他编译器,则不会这样做,因而Flink就无法获取具体类型信息了。
所以在Flink中借鉴了Google GSON的TypeToken的实现,使用TypeHint的匿名类来保存类型信息。
(1)Java类型擦除的原因
1)避免JVM的重构。如果JVM将泛型类型延续到运行期,那么到运行期时JV就需要进行大量的重构工作,提高了运行期的效率。
2)版本兼容。在编译期擦除可以更好地支持原生类型(Raw Type)。
(2)Java泛型类型擦除规则
1)如果是继承基类而来的泛型,就用getGenericSuperclass(), 转型为ParameterizedType来获得实际类型。
2)如果是实现接口而来的泛型,就用getGenericInterfaces(), 针对其中的元素转型为ParameterizedType来获得实际类型。
3)Java泛型在字节码中会被擦除,并不总是擦除为Object类型,而是擦除到上限类型。
5.1.4 显式类型
一般情况下,Flink除了自动类型推断之外,还提供了显式的类型声明,可以手动创建TypeInfomation,为了简化使用使用,Flink提供了两层简化的类型使用方式。
1. 按照数据类型的快捷方式
例如,BasicTypeInfo这个类定义了基本类型的TypeInfomation的快捷声明,如String、Boolean、Byte、Short\Integer、Long、Float、Double、Char等。
2. 通用的类型快捷方式
使用上述类型声明方式已经比使用TypeInfomation方便快捷了,但是使用不同的类型还是要引入不同的类型声明类,所以Flink还提供了等价的Types类(org.apache.flink.api.common.typeinfo.Types),Types作为类型声明的统一入口,基本涵盖了常用类型。
注:Flink Table & SQL的Types类,在旧版本中是提供给SQL的类型声明,现在已经被标记为废弃。
5.1.5 类型系统存在的问题
Flink内置的类型系统虽然强大而灵活,但仍然有一些需要注意的点。
1. Lambda函数的类型提取
因为类型擦除导致Lambda函数的类型提取并不能总是有效的,有时候需要手动指定类型。
2. Kryo的JavaSerializer在Flink下存在Bug,可能导致ClassNotFound异常
推荐使用org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer而非com.esot-ericsoftware.kryo.serializers.JavaSerializer,以防止与Flink不兼容。