Flink内核原理与实现
上QQ阅读APP看书,第一时间看更新

5.1 DataStream类型系统

DataStream是面向开发者的比较偏底层的API,在Flink SQL推出之前是Flink主要的开发接口。开发者在开发DataStream应用的时候,就像在编写Java或者Scala的程序一样,在定义数据类型的时候,使用的是Java或者Scala的类型,如代码清单5-1所示。

代码清单5-1 WindowWordCount代码示例

上述代码中,Splitter#flatMap的输入是String,输出是Tuple2<String, Integer>,其中涉及了String、Integer、Tuple2这样的Java类型,也使用了泛型。对于Flink而言,在运行时需要将这些不同类型的数据序列化为二进制数据,在内存中保存或者在网络上传输。

5.1.1 物理类型

Flink中支持的物理类型如图5-2所示。为了方便开发者,对于大部分的常用类型,Flink都提供了内置的类型描述和序列化/反序列化方法,但是对于用户自定义类型,需要用户注册该类型,并自行实现序列化、反序列化的方法。

图5-2 Flink物理类型分类

如果开发者在编写Flink应用过程中使用了自定义类型,并且又没有提供类型的注册和序列化/反序列化方法,Flink就无法对该类型进行该自定义序列化/反序列化。此时为了Flink的正常运行,对于这一类的数据类型,无法识别的类型就会交给Kryo进行序列化。Kryo可以对任意类型的Java对象进行序列化,是一种Java中的通用序列化方式,缺点是序列化/反序列化效率相对较低。

5.1.2 逻辑类型

Flink应用开发者使用的是Java/Scala的原生类型,对于Flink而言,UDF的输入和返回类型在开发时都是物理类型。逻辑类型是物理类型的描述,Flink在运行时会根据逻辑类型进行数据的序列化和反序列化。

TypeInformation是Flink类型系统的核心类,所有的逻辑类型都继承自该类,Flink的逻辑类型分类如图5-3所示。

TypeInformation作为核心抽象类,是Java/Scala对象类型和Flink的二进制数据之间的桥梁,在其中定义了关键的序列化器的方法createSerializer (ExecutionConfig config),所有的逻辑类型都必须实现该方法,为该类型提供定制的序列化器Serializer,在作业执行时,序列化器将Java/Scala对象序列化成二进制数据,从二进制数据中心反序列化为Java/Scala对象。

为了降低逻辑类型系统给开发者带来的负担,Flink内置了大量类型的逻辑类型,对Java基本类型(如String、Integer等)、数组类型、对象类型(如Tuple、Crow、Pojo对象)、集合类型等都提供了默认实现,这些类型可以直接使用,Flink会自动将其识别为对应的逻辑类型。

图5-3 Flink逻辑类型分类

Flink作业在执行时被分发到不同的机器上执行,逻辑类型作为重要的类型信息,也需要分发到各个计算节点上,类型信息本身的序列化使用的是Java自带的序列化机制,实现了Serializable接口。

5.1.3 类型推断

在上文提到过,开发者使用的是物理类型,而Flink运行时需要的是逻辑类型,所以需要从物理类型包装为逻辑类型。那么如何从开发者编写的代码中提取Function的输入输出类型呢?

Flink使用了两种开发语言Java和Scala,两者的类型提取的实现方式不同。Java的Flink应用使用反射机制获取Function的输入和输出类型。Scala使用Scala Macro类提取类型。

1. 类型提取的时机

类型信息很重要,那么类型提取是在什么时候发生的呢?如代码清单5-2所示。

代码清单5-2 DataStream#map方法类型提取示例

从上述代码中可以看到,在使用DataStream#map接口的时候,就会触发类型的提取。

2. 自动类型推断

Flink首先会自动进行类型推断,但是对于一些带有泛型的类型,Java泛型的类型擦除机制会导致Flink在处理Lambda表达式的类型推断时不能保证一定能提取到类型。

Java泛型(Generic)的引入加强了参数类型的安全性,减少了类型的转换,但有一点需要注意:Java的泛型机制是在编译级别实现的。编译器生成的字节码在运行期间并不包含泛型的类型信息。

此时就需要为Flink的应用提供类型信息,TypeHint的用途即在于此,使用TypeHint的匿名类来获取泛型的类型信息,如代码清单5-3所示。

代码清单5-3 使用TypeHint在运行时进行类型提取

上述代码中使用匿名内部类来获取泛型信息,其中new TypeHint<Tuple3<String, String, Double>>(){}就是用来在类型擦除的情况下来获取泛型信息的。

3. Lambda函数的类型提取

Flink类型提取依赖于继承等机制,但Lambda函数比较特殊,其类型提取是匿名的,也没有与之相关的类,所以其类型信息较难获取。Eclipse的JDT编译器会把Lambda函数的泛型签名等信息写入编译后的字节码中,而对于javac等常见的其他编译器,则不会这样做,因而Flink就无法获取具体类型信息了。

所以在Flink中借鉴了Google GSON的TypeToken的实现,使用TypeHint的匿名类来保存类型信息。

(1)Java类型擦除的原因

1)避免JVM的重构。如果JVM将泛型类型延续到运行期,那么到运行期时JV就需要进行大量的重构工作,提高了运行期的效率。

2)版本兼容。在编译期擦除可以更好地支持原生类型(Raw Type)。

(2)Java泛型类型擦除规则

1)如果是继承基类而来的泛型,就用getGenericSuperclass(), 转型为ParameterizedType来获得实际类型。

2)如果是实现接口而来的泛型,就用getGenericInterfaces(), 针对其中的元素转型为ParameterizedType来获得实际类型。

3)Java泛型在字节码中会被擦除,并不总是擦除为Object类型,而是擦除到上限类型。

5.1.4 显式类型

一般情况下,Flink除了自动类型推断之外,还提供了显式的类型声明,可以手动创建TypeInfomation,为了简化使用使用,Flink提供了两层简化的类型使用方式。

1. 按照数据类型的快捷方式

例如,BasicTypeInfo这个类定义了基本类型的TypeInfomation的快捷声明,如String、Boolean、Byte、Short\Integer、Long、Float、Double、Char等。

2. 通用的类型快捷方式

使用上述类型声明方式已经比使用TypeInfomation方便快捷了,但是使用不同的类型还是要引入不同的类型声明类,所以Flink还提供了等价的Types类(org.apache.flink.api.common.typeinfo.Types),Types作为类型声明的统一入口,基本涵盖了常用类型。

注:Flink Table & SQL的Types类,在旧版本中是提供给SQL的类型声明,现在已经被标记为废弃。

5.1.5 类型系统存在的问题

Flink内置的类型系统虽然强大而灵活,但仍然有一些需要注意的点。

1. Lambda函数的类型提取

因为类型擦除导致Lambda函数的类型提取并不能总是有效的,有时候需要手动指定类型。

2. Kryo的JavaSerializer在Flink下存在Bug可能导致ClassNotFound异常

推荐使用org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer而非com.esot-ericsoftware.kryo.serializers.JavaSerializer,以防止与Flink不兼容。