An image to describe post

按:ETL是目前比较典型的应用,应用面比较广,Exact-Once是其中的重要特性,但是实现起来并不容易。这篇文章是我的朋友Tenyu和朋友所写,以Pravega为例,详细介绍了Exact-Once的原理和实现细节,希望能为感兴趣的朋友提供帮助。Tenyu长期专注于分布式存储领域,目前就职于DellEMC。

作者 | 滕昱、周煜敏、赵凯皓

编辑 | Vincent

AI 前线导读: 所谓的流式处理其实就是对 Stream 的读取 - 处理 - 写入(ETL)操作,应用从 Stream 中读取数据,再对数据进行相应的处理分析,最后将结果写入另一个 Stream 中。其中仅一次语义保证了哪怕系统发生故障,每一个 ETL 操作也仅会被执行一次,不会产生数据的丢失或者重复。这样的可靠性保证对于一些交易、金融类的应用来说至关重要,这就需要 Pravega 作为流存储与流计算引擎共同努力来完成。

更多优质内容请关注微信公众号“AI 前线”(ID:ai-front)

通常来说,对于单独的消息系统而言,语义分为如下三种:

至多一次(At most once):不管 Writer 在等待 ACK 时是否发生超时或者得到错误异常,Writer 都不会重新发送 Event,因此会有数据丢失的风险。在具体的实现过程中,这一种语义无需做任何额外的控制,实现起来最为简单,因此也通常有着最优的性能。在某些特定的场景中,我们只希望追求极致的性能而不关心数据的丢失,可能会选用此方案。

至少一次(At least once):如果 Writer 在等待 ACK 时发生超时或者得到错误异常,Writer 将会重新发送消息,这样能保证每个 Event 至少被处理一次,保证了数据不会丢失,从而提高了系统的可靠性,但同时会带来数据重复的问题,例如,当 Writer 往 Stream 中成功写入一个 Event,但是当系统尝试给 Writer 返回 ACK 的时候出现网络异常,Writer 因没有收到 ACK 而判断为写入 Event 失败,因此 Writer 还是会重新发送此 Event,导致数据重复。

仅一次(Exactly once):在系统发生异常时,Writer 可以尝试多次重新发送 Event,同时能保证最终每个 Event 只被写入一次。一些对数据准确性要求非常高的系统需要保证 exactly-once 语义,譬如支付系统,当用户在移动端付款时,很有可能会因为网络原因导致延时较长甚至超时,用户可能会手动进行刷新操作,如果没有 exactly-once 的语义支持,很有可能会发生两次扣费,我们绝对不希望此类错误发生。

仅一次语义是实现流处理系统正确性 (correctness) 的基石,因此也是流存储 Pravega 自从设计之初就规划好的设计目标。但是,exactly-once 的实现也面临着诸多挑战,例如 Kafka 也直到 0.11 版本引入了 KIP-98 之后才完成了仅一次的支持。这种更强的语义不仅使编写应用程序更容易,而且使 Pravega 有了更为广泛的应用空间。这一篇文章我们将介绍 Pravega 实现这一特性的设计细节,以及和 Flink 社区合作开发的端到端 (end-to-end) 的 exactly-once 的实现。

Pravega 自身的 exactly-once 语义

从之前对三种语义的描述可以看出,要满足 exactly-once 的语义,需要对于可能发生的故障具有足够的恢复机制,来保证最终结果的每一条 Event 仅被写入一次。Pravega 通过实现了读写端的以下两个性质,增强了 Pravega 的读写语义,完成了这一目标:

  1. 数据的可恢复性

  2. Writer 的幂等性

其中数据的可恢复性保证了在 Pravega 读取数据过程中,发生故障之后数据进行正确重放的可能性,需要在读客户端优化。而幂等性保证了单一一条数据在数据写入的过程不会出现重复,需要在写客户端优化。

数据的可恢复性

发生故障时,数据的恢复需要有以下三个性质的保障,才能有数据恢复的前提。

持久性 Durability

持久性是指一旦写操作确认成功,即使系统在发生故障的情况下数据都不会丢失,保证了数据的可恢复性。Pravega 利用了 Bookkeeper 以及分层的存储实现了流式存储系统的持久性,帮助用户解决了数据可能会丢失的问题。用户只需要直接使用 Pravega,而不需要考虑额外的数据备份工作。

Pravega 的持久化详细内容请参阅本系列之前的文章:

https://www.infoq.cn/article/VXA51t57pKphQ3d*7ZST

一致性 Consistency

一致性是指,不管系统是否发生异常,所有的 Reader 读到的相同键值下的数据都是一致的、有序的。对 Pravega 来说,不管是 tail read 还是 catch up read 都满足一致性。

有序性 Ordering

有序性是指 Reader 读取 Event 的顺序要和 Event 被写入的顺序保持一致。对 Pravega 来说,数据与应用定义的路由键 (routing key) 一并写入 Stream,Pravega 根据路由键的哈希值将写入操作分配至不同的 Segment。Pravega 保证在路由键内部的有序性,即针对在同一个路由键的数据写入是保证有序的。Pravega 的有序性同样保证了读客户端在数据读取发生异常时,仍然能够进行有序的恢复重放。

有了以上三个特性,Pravega 得以进行有效的数据重放,保证了 at-least-once 语义。

Pravega 是如何防止数据重复的

Pravega 实现了 at-least-once 语义,但是为了要更进一步得满足 exactly-once 语义,我们还需要避免数据重复。所有的写入操作需要保证重复数据的消除,也称为幂等性 (idempotency)。由于分布式环境下,数据交互复杂导致故障的发生位置众多,实现需要考虑诸多细节,这也是 exactly-once 实现的难点。这需要从读写两方面进行控制。

Pravega 中的 Reader 每成功消费 Segment 中的一条数据后都会将数据的位置信息以 SegmentID+offset 的形式写入 State Synchronizer(可参阅 之前的文章)中进行持久化。这样的 SegmentID+offset 的对应关系构成了 Pravega 的 StreamCut(可参考http://pravega.io/docs/latest/streamcuts/),代表了当前 Stream 读取位置的状态信息。对于一个 ReaderGroup 中的所有读客户端而言,ReaderGroup 实现了 Checkpoint 机制,以 Map<Stream, StreamCut>的形式保存了 Reader 读取的所有 Stream 的一致性的状态信息。确定了一致的恢复位置,我们就保证了故障发生时,Segment 中的数据也仅被读取一次。

Pravega 的 Writer 内部自带一个 ID,在 Writer 与 Pravega 服务发生重新连接时可以通过 ID 定位到最近一次成功写入 Stream 中的 Event,Writer 会以 block 的形式将 Event 批量追加写入到 Stream 中,当 block 成功写入后,Writer 会发送一个 block end 指令,指令中包含 number of events writtenlast event number。当 Writer 与 Segment Store 之间发生断开重连时,Segment Store 通过 Writer ID 并通过握手将 last event number 传递给该 Writer,这样 Writer 就能知道应该从哪里开始发送 Event。这也是分布式系统中最常用的保证幂等性的方法之一。

然而 Pravega 中 Writer 的 ID 是无法保障永远不变的,一旦 Writer 发生异常崩溃了,新起的 Writer 将会生产一个新的 Writer ID,所以在考虑 Writer 发生故障的情况下,我们就需要将 Writer ID 与 Segment Store 解耦。

因此,Pravega 使用事务来保证数据写入时的 exactly-once。Event 将会被批量地写入事务中,这些 Event 要么同时被提交,要么同时被丢弃。

Pravega 的事务(Transaction)

接下来让我们一起来看一下 Pravega 中的事务。Pravega 提供了 Transaction API,支持事务性地数据写入,代码示例如下。

先回顾一下将普通 Event 写入 Pravega Stream 的操作:

// 创建一个 client factory, 一个 writer 然后 写入 event
try(ClientFactory clientFactory =
ClientFactory.withScope(scope, controllerURI) {
    EventStreamWriter<String> writer = clientFactory
    .createEventWriter(streamName,
    new JavaSerializer<String>(),
    EventWriterConfig.builder().build());
    writer.writeNext("Key 1", "Hello");
    writer.writeNext("Key 2", "World!");
}

接下来是将 Event 事务写入 Pravega Stream 的操作:

Transaction<String> txn = writer.beginTxn();
txn.writeEvent("Key 1", "Hello");
txn.writeEvent("Key 2", "World!");
txn.commit();

API 很简单,只需调用 beginTxn() 开启一个 Transaction,使用 commit() 方法进行事务提交即可。下面我们以一个例子进行说明其内部实现。

An image to describe post

Transaction 实现示例

如图,Stream 有 3 个 active 的 Segments,当在该 Stream 上创建 Transaction 时,该 Transaction 会从 Pravega 的元数据库 Zookeeper 中读取相应 Stream 的所有的*active* 的 Segment 集合(保证一致性),同样会分配 3 个对应的 Transaction Segment(也称为 Shadow Segment)。

当事件被写入到事务中时,它被路由并分配给与 Stream 相同的编号的 Segment(即分配给 Stream 中的 Segment 3 的事件将被分配给 Transaction 中的 Segment 3),并与 Stream 类似,事件本身将附加到事务的 Segment 的尾端。在事务处理的过程中,Transaction Segment 并不对读客户端可见,保证了隔离性。

提交事务后,所有 Transaction 的 Segment 将自动附加到 Stream 中对应的 Stream Segment,由于 Pravega 的持久化特性,数据也就被持久化了。如果事务终止,则其所有的 Transaction Segment 以及其中的数据都将从 Pravega 中删除,保证了事务操作的原子性。至此,分布式事务的 ACID 四大特性均得到满足。

熟悉 Kafka 的读者可能会发现,Pravega 的 Transaction 和 Kafka 的 Transaction 的实现方式不同,Pravega 通过副本的方式创建 Transaction Segment,直到 Transaction Segment 提交合并入真正的 Segment 中后,Reader 才能开始消费 Transaction 写入的数据,然而 Kafka 直接是将 Transaction 的 Event 写入 Topic 的 Partition 中,并且允许用户额外配置事务隔离级别满足不同需求。因此对比 Kafka,Pravega 存在以下优势:

  1. Kafka 中被终止 (aborted) 的 Transaction 会残留在 topic partition 中,这就导致了磁盘空间和 IO 带宽资源的浪费,而 Pravega 的 Transaction 使用了一个临时的 Segment,当 Transaction abort 之后,临时的 Segment 就将被回收。

  2. Kafka 中 Transaction 的 Event 是直接写入 partition 中的,当 Kafka 的隔离级别为 read-commit 时,Reader 在尝试读取一个 open Transaction 中的数据时会发生阻塞等待,直到 Transaction 完成 (committed 或者 aborted),而 Pravega 中,未提交的 Transaction 以一个临时的 Segment 表示,在提交成功前,Reader 端是无法感知到 Transaction Segment 的存在的,因此 Reader 不会被阻塞。

Pravega Stream 的弹性伸缩机制 会对事务产生影响,在 Writer 端负载偏高时,Segment 会相应做 split 操作,原先的 Segment 会被 seal,同时生产两个新的 Segment,细心的读者会发现这里会存在问题,倘若 Writer 正在往 Transaction 的 Transaction Segment 中写数据,如果此时 Segment 发生了 split,就会发生与 Transaction Segment 不一致的情况,当我们合并 Transaction Segment 的时候就会发现找不到相对应的 Segment。Pravega 进一步实现了 Rolling Transaction 的机制来将 Stream 的伸缩和 Transaction 解耦,让他们同时工作互不影响。有了 Rolling Transaction 的支持,用户既能享受 Pravega Stream 的弹性伸缩机制也同时能保证 exactly-once 的支持。

Pravega 的 Transaction 还包括以下 API:

API描述getTnxId()获取 Transaction UUIDflush()等待 write 操作成功ping()跟新 Transaction 的等待时间checkStatus()查询 Transaction 的状态 OpenCommittingCommittedAbortingAbortedcommit()提交abort()中断 Transaction 并且丢弃所有 Events

Pravega 与 Flink 的端到端 exactly-once 语义

设想文章最初提到的 ETL 场景,一个读客户端应用首先从 Pravega 中读取到数据 A,对数据进行处理 F(A),此时如果在执行 F(A) 时读客户端应用发生故障重启,因为 A 已经被成功消费过了,State Synchronizer 中的元信息已经更新,恢复之后将导致 A 数据的丢失。这一例子说明了 ETL 系统端到端的 exactly-once 仅靠流存储本身无法保证,需要配合处理端进行端到端的同步才能实现。

对于一般的端到端的 exactly-once 实现,ETL 的三个组件要分别达到如下的要求:

  1. 输入端要支持固定位置的重放

  2. 流处理系统的容错处理保证任务只产生 exactly-once 的输出

  3. 输出端要有事务性的支持

为了解决这个问题,Pravega 团队与 Apache Flink 展开了深入的合作,实现了 Pravega Flink Connector 连接器与 Flink 通信,使得用户能够在 Flink 中调用 API,对 Pravega 进行数据的读写,更多详细内容可以期待 Pravega 系列之后的一篇文章。在这篇文章中,我们将重点介绍:如何配合 Pravega 提供的 Checkpoint 和事务机制以及 Flink 提供的 Checkpoint,实现 Pravega->Flink->Pravega 的 exactly-once。

熟悉 Flink 的读者应该知道,Flink 在 1.4.0 版本之前通过 Flink 的 Chandy-Lamport 算法实现的 Checkpoint 机制,做到了 Flink 应用内部的 exactly-once 语义。然而,使用 Checkpoint 也意味着当时的 exactly-once 的应用是有条件的,只有每条消息对于 Flink 应用状态的影响有且只有一次才能满足 exactly-once,例如数据流过 Flink 直接写入到数据库中的无状态应用是无法保证 exactly-once 的。Flink 在 1.4.0 版本引入了 TwoPhaseCommitSinkFunction,通过两阶段提交(2PC)协议对接事务性写客户端解决了写入的幂等性问题,从而支持了 Kafka 0.11+ 以及 Pravega 作为输入输出端的端到端的 exactly-once 语义。

整体结构

从整体设计来看,Flink 和 Pravega 实现端到端 exactly-once 有以下四个步骤:

  1. 当 Pravega 作为 Source 时,Flink 的每个 Checkpoint Event 会触发 Pravega ReaderGroup 的 Checkpoint

  2. 当 Pravega 作为 Sink 时,每两个 Flink Checkpoint 之间,会创建一个 Pravega Transaction

  3. 当 Checkpoint 完成时,提交 Transaction 到 Pravega Stream

  4. Flink 异常恢复后,尝试重新提交 pending 状态的 Transaction 或是恢复到最新 Checkpoint

Pravega 的 PravegaFlinkWriter 并没有直接继承 Flink 提供的 TwoPhaseCommitSinkFunction,而是继承了其父类 RichSinkFunction,从而用统一的入口支持了 exactly-once 和其它的语义。 PravegaWriterMode 中提供了三种写入模式: BEST_EFFORT, ATLEAST_ONCE 以及 EXACTLY_ONCE。其中的 EXACTLY_ONCE 模式即使用本文描述的事务性写入。代码如下:

protected AbstractInternalWriter createInternalWriter() {
Preconditions.checkState(this.clientFactory != null, "clientFactory not initialized");
    if (this.writerMode == PravegaWriterMode.EXACTLY_ONCE) {
        return new TransactionalWriter(this.clientFactory);
    } else {
        ExecutorService executorService = createExecutorService();
    return new NonTransactionalWriter(this.clientFactory, executorService);
    }
}

EXACTLY_ONCE 模式的实现依然遵循 Flink Checkpoint 的两阶段提交协议,在 Flink 本地算子快照时提交本地事务(进行 pre-commit 一阶段提交),通过 Flink JobManager 协调完成投票,当一切正常时,通知各算子完成 Checkpoint,并最终提交事务。若存在失败则 Checkpoint 也将失败,视具体情况进行相应的处理,确保数据仅处理一次。

值得一提的是,投票处理的过程是异步进行的,不会影响正常的数据读写线程,对整体处理性能的影响较小。

故障恢复

针对其中的 4,由于异常同样可能在流处理进行的各阶段发生,接下来将具体介绍以下三种情况出现异常时的处理方法。

Flink 写入 Event 时发生异常

An image to describe post

Flink 写入 Event 时发生异常

上图中,Checkpoint-1 与 Checkpoint-2 均成功完成,但在 Checkpoint-2 之后的 Event 往 Transaction-3 中写的时候发生了异常。此时,Flink 将从 Checkpoint-2 恢复,这时候由于没有 Checkpoint-3 因此 Transaction-3 也不会被提交到 Pravega Stream 中,因此 Flink 程序就可以从 Checkpoint-2 恢复重新在新的 Transaction 内写 Event,不会发生数据重复,保证了从 Flink 到 Pravega 的 exactly-once

Flink 本地快照时发生异常

An image to describe post

Flink 本地快照时发生异常

上图中,Checkpoint-1 成功,当在 Checkpoint-2 的时候由于有些算子快照时失败。此时,程序从 Checkpoint-1 恢复,没有成功提交的 Transaction-2 和 Transaction-3 被丢弃,保证了 exactly-once。

Flink Checkpoint 成功,但是 Transaction 提交失败

An image to describe post

Transaction 提交发生异常

上图中,Checkpoint-2 成功,但在 Transaction-2 往 Pravege Stream 中提交的时候由于网络原因提交失败,这种情况下无需让 Flink 恢复到 Checkpoint-1,而是只需重新提交 Transaction-2 即可,如下图:

An image to describe post

重新提交恢复

实例展示

接下来我们通过一个实际的例子,展示并验证 Flink 写入 Pravega 的 exactly-once。

  1. 首先我们保证 Pravega 的运行。Pravega 可以选择部署 standalone 版本,具体步骤可以参考 之前的文章。Flink 可以按照官网推荐步骤进行部署,然后提交任务 Jar 包运行。也可以不进行部署,Flink 会自动创建本地的 ExecutionEnvironment 运行,本例选择后者。

  2. 下载并构建 pravega-sample:

    $ git clone https://github.com/pravega/pravega-samples
    $ cd pravega-samples
    $ ./gradlew installDist
    $ cd flink-connector-examples/build/install/pravega-flink-examples
    

该目录的 bin/ 下有两个程序, exactlyOnceWriterexactlyOnceChecker。

exactlyOnceWriter 是一个 Flink 应用,会生成一组整数的 Events(默认为 1~50)写入 Pravega,并且会在 26 位置处制造一个人为的模拟异常,同时会以 100 毫秒为周期进行 Checkpoint 操作,如果出现异常应用就从最近 Checkpoint 恢复。

exactlyOnceChecker 应用则是一个简单的 Pravega Reader 应用,检测写入 Pravega 中的数据否重复。

  1. 我们首先在一个命令行窗口中启动 exactlyOnceChecker

    $ bin/exactlyOnceChecker --scope examples --stream mystream --controller tcp://localhost:9090
    
  2. 然后在另一个窗口中运行 exactlyOnceWriter 开始往 Pravega 中写数据,我们先不开启 EXACTLY_ONCE 模式,将 --exactlyonce 参数设置为 false,此时默认为 ATLEAST_ONCE 模式。

    $ bin/exactlyOnceWriter --controller tcp://localhost:9090 --scope examples --stream mystream --exactlyonce false
    

观察 exactlyOnceWriter 的输出,会产生类似如下内容:

...
Start checkpointing at position 6
Complete checkpointing at position 6
Artificial failure at position 26
...
Restore from checkpoint at position 6
Start checkpointing at position 50
Complete checkpointing at position 50
...

我们发现第一次 Checkpoint 在 6 的位置成功,同时数据继续往 Pravega Stream 中写入,直到 26 的位置,我们模拟了一个 Flink Transaction 的异常,导致了应用从最近的 Checkpoint 点恢复,程序又开始从 6 位置继续往 Pravega 中写数据直到结束。由于没有开启 Pravega 的 EXACTLY_ONCE 模式,7~26 的数据就会重复写入 Pravega。

再观察 exactlyOnceChecker 的输出,的确监测到了数据重复

============== Checker starts ===============
Duplicate event: 8
Duplicate event: 7
...
Duplicate event: 23
Duplicate event: 24
Found duplicates
============== Checker ends ===============
  1. 现在以 EXACTLY_ONCE 的模式重新运行 exactlyOnceWriter

    $ bin/exactlyOnceWriter --controller tcp://localhost:9090 --scope examples --stream mystream --exactlyonce true
    

exactlyOnceChecker 的输出如下:

============== Checker starts ===============
No duplicate found. EXACTLY_ONCE!
============== Checker ends ===============

EXACTLY_ONCE 模式下,Event 1~6 是一个 Transaction,当 Flink 标记 Checkpoint 成功后 Event 1~6 才会被提交到 Pravega Stream 中,而在 26 处 Flink Transaction 发生了异常,这时 Transaction 中的 Event 7-26 并没有成功提交到 Pravega Stream 中,而是被丢弃了,因此在 exactlyOnceWriter 从 6 的 Checkpoint 处恢复后,重新从 7 开始写入数据,从而保证了端到端的 exactly-once。

Pravega 系列文章计划

Pravega 根据 Apache 2.0 许可证开源,0.5 版本将于近日发布。我们欢迎对流式存储感兴趣的大咖们加入 Pravega 社区,与 Pravega 共同成长。本篇文章为 Pravega 系列第七篇,系列文章标题如下(标题根据需求可能会有更新):

  1. 实时流处理 (Streaming) 统一批处理 (Batch) 的最后一块拼图:Pravega

  2. 开源 Pravega 架构解析:如何通过分层解决流存储的三大挑战?

  3. Pravega 应用实战:为什么云原生特性对流存储至关重要

  4. “ToB” 产品必备特性: Pravega 的动态弹性伸缩

  5. 取代 ZooKeeper!高并发下的分布式一致性开源组件 StateSynchronizer

  6. 分布式一致性解决方案 - 状态同步器 (StateSynchronizer) API 示例

  7. Pravega 的仅一次语义及事务支持

  8. 与 Apache Flink 集成使用

作者简介

滕昱,就职于 DellEMC 非结构化数据存储部门 (Unstructured Data Storage) 团队并担任软件开发总监。2007 年加入 DellEMC 以后一直专注于分布式存储领域。参加并领导了中国研发团队参与两代 DellEMC 对象存储产品的研发工作并取得商业上成功。从 2017 年开始,兼任 Streaming 存储和实时计算系统的设计开发与领导工作。

周煜敏,复旦大学计算机专业研究生,从本科起就参与 DellEMC 分布式对象存储的实习工作。现参与 Flink 相关领域研发工作。

赵凯皓,现就职于 DellEMC,从事流存储和云原生相关的设计与开发工作。

参考资料

http://pravega.io

http://blog.pravega.io

https://github.com/pravega/pravega

https://github.com/pravega/flink-connectors

https://github.com/pravega/pravega-samples