Flink序列化框架分析

作者 : 开心源码 本文共8825个字,预计阅读时间需要23分钟 发布时间: 2022-05-12 共204人阅读

1.Flink的TypeInformation类

TypeInformation是flink中所有类型的基类,其作为生产序列化器和比较的一个工具。它包括了类型的少量基本属性,并可以通过它来生产序列化器(serializer),特殊情况下还可以生成类型比较器。(Flink中的比较器不仅仅是定义大小顺序,更是解决keys的基本辅助工具)

  • 基本类型:所有Java基本数据类型和对应的装箱类型,加上void,String,Date,BigDecimal和BigInteger
  • 基本数组和对象数组
  • 复合类型:
    • Flink Java Tuples (Flink Java API的一部分): 最多25个成员,不支持null成员
    • Scala case 类 (包括 Scala tuples): 最多25个成员, 不支持null成员
    • Row: 包含任意多个字段的元组并且支持null成员
    • POJOs: 遵循类bean模式的类
  • 辅助类型 (Option, Either, Lists, Maps, …)
  • 泛型: Flink自身不会序列化泛型,而是借助Kryo进行序列化.

POJO类非常有意思,由于POJO类可以支持复杂类型的创立,并且在定义keys时可以使用成员的名字:dataSet.join(another).where(“name”).equalTo(“personName”)。同时,POJO类对于运行时(runtime)是透明的,这使得Flink可以非常高效地解决它们。

1.1 POJO类型的规则

在满足如下条件时,Flink会将这种数据类型识别成POJO类型(并允许以成员名引用字段):

  • 该类是public的并且是独立的(即没有非静态的内部类)
  • 该类有一个public的无参构造方法
  • 该类(及该类的父类)的所有成员要么是public的,要么是拥有按照标准java bean命名规则命名的public getter和 public setter方法。

1.2 创立一个TypeInformation对象或者序列化器###

创立一个TypeInformation对象时如下:

在Scala中,Flink使用在编译时运行的宏,在宏可供调用时去捕获所有泛型信息。

// 重要: 为了能够访问'createTypeInformation' 的宏方法,这个import是必需的import org.apache.flink.streaming.api.scala._val stringInfo: TypeInformation[String] = createTypeInformation[String]val tupleInfo: TypeInformation[(String, Double)] = createTypeInformation[(String, Double)]

你也可以在Java使用相同的方法作为备选。

为了创立一个序列化器(TypeSerializer),只要要在TypeInformation 对象上调用typeInfo.createSerializer(config)方法。

config参数的类型是ExecutionConfig,它保留了程序的注册的自己设置序列化器的相关信息。在可能用到TypeSerializer的地方,尽量传入程序的ExecutionConfig,你可以调用DataStream 或者 DataSet的 getExecutionConfig()方法获取ExecutionConfig。少量内部方法(如:MapFunction)中,你可以通过将该方法变成一个Rich Function,而后调用getRuntimeContext().getExecutionConfig()获取ExecutionConfig.

2 基本类型实现示例

以String为例:

//BasicTypeInfo.javapublic static final BasicTypeInfo<String> STRING_TYPE_INFO = new BasicTypeInfo<>(String.class, new Class<?>[]{}, StringSerializer.INSTANCE, StringComparator.class);

StringSerializer如下

//StringSerializer.javapublic final class StringSerializer extends TypeSerializerSingleton<String> {    private static final long serialVersionUID = 1L;        public static final StringSerializer INSTANCE = new StringSerializer();        private static final String EMPTY = "";    @Override    public boolean isImmutableType() {        return true;    }    @Override    public String createInstance() {        return EMPTY;    }    @Override    public String copy(String from) {        return from;    }        @Override    public String copy(String from, String reuse) {        return from;    }    @Override    public int getLength() {        return -1;    }    @Override    public void serialize(String record, DataOutputView target) throws IOException {        StringValue.writeString(record, target);    }    @Override    public String deserialize(DataInputView source) throws IOException {        return StringValue.readString(source);    }        @Override    public String deserialize(String record, DataInputView source) throws IOException {        return deserialize(source);    }    @Override    public void copy(DataInputView source, DataOutputView target) throws IOException {        StringValue.copyString(source, target);    }    @Override    public boolean canEqual(Object obj) {        return obj instanceof StringSerializer;    }    @Override    protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {        return super.isCompatibleSerializationFormatIdentifier(identifier)                || identifier.equals(StringValue.class.getCanonicalName());    }}

上面代码中出现的StringValue是真正进行input以及output序列化过程操作,基本类型都有相应的方法,后面会单独说明下多字段Record序列化形式。
StringComparator如下

public final class StringComparator extends BasicTypeComparator<String> {    private static final long serialVersionUID = 1L;        private static final int HIGH_BIT = 0x1 << 7;        private static final int HIGH_BIT2 = 0x1 << 13;        private static final int HIGH_BIT2_MASK = 0x3 << 6;            public StringComparator(boolean ascending) {        super(ascending);    }    @Override    public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {        String s1 = StringValue.readString(firstSource);        String s2 = StringValue.readString(secondSource);        int comp = s1.compareTo(s2);         return ascendingComparison ? comp : -comp;    }    @Override    public boolean supportsNormalizedKey() {        return true;    }    @Override    public boolean supportsSerializationWithKeyNormalization() {        return false;    }    @Override    public int getNormalizeKeyLen() {        return Integer.MAX_VALUE;    }    @Override    public boolean isNormalizedKeyPrefixOnly(int keyBytes) {        return true;    }    @Override    public void putNormalizedKey(String record, MemorySegment target, int offset, int len) {;        final int limit = offset + len;        final int end = record.length();        int pos = 0;                while (pos < end && offset < limit) {            char c = record.charAt(pos++);            if (c < HIGH_BIT) {                target.put(offset++, (byte) c);            }            else if (c < HIGH_BIT2) {                target.put(offset++, (byte) ((c >>> 7) | HIGH_BIT));                if (offset < limit) {                    target.put(offset++, (byte) c);                }            }            else {                target.put(offset++, (byte) ((c >>> 10) | HIGH_BIT2_MASK));                if (offset < limit) {                    target.put(offset++, (byte) (c >>> 2));                }                if (offset < limit) {                    target.put(offset++, (byte) c);                }            }        }        while (offset < limit) {            target.put(offset++, (byte) 0);        }    }    @Override    public StringComparator duplicate() {        return new StringComparator(ascendingComparison);    }}

3 多字段Record示例

在开始这部分原理分析之前可以先看个示例代码

//RecordTest.javapublic void testAddField() {  try {    // Add a value to an empty record    Record record = new Record();    assertTrue(record.getNumFields() == 0);    record.addField(this.origVal1);    assertTrue(record.getNumFields() == 1);    assertTrue(origVal1.getValue().equals(record.getField(0, StringValue.class).getValue()));        // Add 100 random integers to the record    record = new Record();    for (int i = 0; i < 100; i++) {      IntValue orig = new IntValue(this.rand.nextInt());      record.addField(orig);      IntValue rec = record.getField(i, IntValue.class);            assertTrue(record.getNumFields() == i + 1);      assertTrue(orig.getValue() == rec.getValue());    }        // Add 3 values of different type to the record    record = new Record(this.origVal1, this.origVal2);    record.addField(this.origVal3);        assertTrue(record.getNumFields() == 3);        StringValue recVal1 = record.getField(0, StringValue.class);    DoubleValue recVal2 = record.getField(1, DoubleValue.class);    IntValue recVal3 = record.getField(2, IntValue.class);        assertTrue("The value of the first field has changed", recVal1.equals(this.origVal1));    assertTrue("The value of the second field changed", recVal2.equals(this.origVal2));    assertTrue("The value of the third field has changed", recVal3.equals(this.origVal3));  } catch (Throwable t) {    Assert.fail("Test failed due to an exception: " + t.getMessage());  }}

Record代表多个数值的记录,其可以包含多个字段(可空并不表现在该记录中),内部有一个bitmap标记字段能否被赋值。为了数据交换方便,Record中的数据都以bytes方式存储,字段在访问时才被进行反序列化。当字段被修改时首先是放在cache中,并在下次序列化时合入或者者显式调用updateBinaryRepresenation()方法。
Notes:

  • 该record必需是一个可变的对象,这样才可以被多个自己设置方法使用来提升性能(后面单独分析)。该record是一个比较中的对象,为了减少对每个字段的序列化、反序列化操作,其保存了比较大的状态,需要有多个指针以及数组,从而要占用相比照较大的内存空间,在64位的JVM中要占用超过200bytes。
  • 该类是非线程安全的

4 存放Record的数据结构

针对上面提出的存放数据结构的疑问,这里继续深入分析下。

  • 将record放在一个迭代器中,当前存在一个叫BlockResettableMutableObjectIterator,其包含如下少量方法,读写都是在这个迭代器中进行。
    Record迭代器.png

其中以无参数next()方法为示例走读存储或者者读取流程,代码如下:

public T next() throws IOException {        // check for the left over element        if (this.readPhase) {            return getNextRecord();        } else {            // writing phase. check for leftover first            T result = null;            if (this.leftOverReturned) {                // get next record                if ((result = this.input.next()) != null) {                    if (writeNextRecord(result)) {                        return result;                    } else {                        // did not fit into memory, keep as leftover                        this.leftOverRecord = this.serializer.copy(result);                        this.leftOverReturned = false;                        this.fullWriteBuffer = true;                        return null;                    }                } else {                    this.noMoreBlocks = true;                    return null;                }            } else if (this.fullWriteBuffer) {                return null;            } else {                this.leftOverReturned = true;                return this.leftOverRecord;            }        }    }

通过源码可以看出,在方法执行时根据标记判断是读取还是写入流程,同时方法对应getNextRecord和writeNextRecord两个方法,都在笼统类AbstractBlockResettableIterator中,两个方法源码如下:

protected T getNextRecord() throws IOException {        if (this.numRecordsReturned < this.numRecordsInBuffer) {            this.numRecordsReturned++;            return this.serializer.deserialize(this.readView);        } else {            return null;        }    }
protected boolean writeNextRecord(T record) throws IOException {        try {            this.serializer.serialize(record, this.collectingView);            this.numRecordsInBuffer++;            return true;        } catch (EOFException eofex) {            return false;        }    }

其中存放数据是基于Flink内存管理部分进行申请以及维护大小等,相关初始化源码如下:

 memoryManager.allocatePages(ownerTask, emptySegments, numPages);         this.collectingView = new SimpleCollectingOutputView(this.fullSegments,                         new ListMemorySegmentSource(this.emptySegments), memoryManager.getPageSize()); this.readView = new RandomAccessInputView(this.fullSegments, memoryManager.getPageSize());

5 Flink 如何直接操作二进制数据

Flink 提供了如 group、sort、join 等操作,这些操作都需要访问海量数据。这里,我们以sort为例,这是一个在 Flink 中使用非常频繁的操作。
首先,Flink 会从 MemoryManager 中申请一批 MemorySegment,我们把这批 MemorySegment 称作 sort buffer,用来存放排序的数据。

sort示例.png

我们会把 sort buffer 分成两块区域。一个区域是用来存放所有对象完整的二进制数据。另一个区域用来存放指向完整二进制数据的指针以及定长的序列化后的key(key+pointer)。假如需要序列化的key是个变长类型,如String,则会取其前缀序列化。如上图所示,当一个对象要加到 sort buffer 中时,它的二进制数据会被加到第一个区域,指针(可能还有key)会被加到第二个区域。

将实际的数据和指针加定长key分开存放有两个目的。第一,交换定长块(key+pointer)更高效,不用交换真实的数据也不用移动其余key和pointer。第二,这样做是缓存友好的,由于key都是连续存储在内存中的,可以大大减少 cache miss(后面会详细解释)。

排序的关键是比大小和交换。Flink 中,会先用 key 比大小,这样即可以直接用二进制的key比较而不需要反序列化出整个对象。由于key是定长的,所以假如key相同(或者者没有提供二进制key),那就必需将真实的二进制数据反序列化出来,而后再做比较。之后,只要要交换key+pointer即可以达到排序的效果,真实的数据不用移动。

sort指针.png

最后,访问排序后的数据,可以沿着排好序的key+pointer区域顺序访问,通过pointer找到对应的真实数据,并写到内存或者外部(更多细节可以看这篇文章 Joins in Flink)。

5.1 缓存友好的数据结构和算法

随着磁盘IO和网络IO越来越快,CPU逐步成为了大数据领域的瓶颈。从 L1/L2/L3 缓存读取数据的速度比从主内存读取数据的速度快好几个量级。通过性能分析可以发现,CPU时间中的很大一部分都是白费在等待数据从主内存过来上。假如这些数据可以从 L1/L2/L3 缓存过来,那么这些等待时间可以极大地降低,并且所有的算法会因而而受益。

在上面探讨中我们谈到的,Flink 通过定制的序列化框架将算法中需要操作的数据(如sort中的key)连续存储,而完整数据存储在其余地方。由于对于完整的数据来说,key+pointer更容易装进缓存,这大大提高了缓存命中率,从而提高了基础算法的效率。这对于上层应用是完全透明的,可以充分享受缓存友好带来的性能提升。

References

  • 1.Data Types & Serialization

说明
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是摆设,本站源码仅提供给会员学习使用!
7. 如遇到加密压缩包,请使用360解压,如遇到无法解压的请联系管理员
开心源码网 » Flink序列化框架分析

发表回复