MillWheel学习笔记

来源于Google在vldb2013发的流式计算框架论文MillWheel: Fault-Tolerant Stream Processing at Internet Scale
http://db.disi.unitn.eu/pages/VLDBProgram/pdf/industry/p734-akidau.pdf

Introduction

1. 实时计算一般可用于实时反作弊或者异常监控等场景,需要 fault tolerance & persistent state & scanlability
2. MillWheel 提供了良好的编程模型和流式计算框架,对用户屏蔽了分布式、容错等各方面 细节,保证数据的不重不丢。同时提供优雅的保存处理进度机制
3. 其他流式系统不能同时满足容错、通用性和扩展性特点。比如 S4 不能保证数据不丢,而 Streaming SQL System (比如Spark Streaming) 在应对复杂的业务场景下SQL语言表达能力不足

Motivation And  Requirements

1. 应用场景google内部的Zeitgeist系统,用来监控检索Query的流量是否突增或者突降。主要是通过模型预测值与实际值做对比。
2. Persistent Storage: 有短期的时间窗口存储需求和长期的数据存储需求
3. LowWatermarks: 用来追踪分布式环境中所有等待处理的events,可以区分数据是延迟还是真的缺失。同时也可以规避应用场景中需要按照数据的严格时间序来处理数据
4. Duplicate Prevention: 保证数据不会被重复处理 ,除了保证不会出现突增的误判外,在计费系统也是有同样的需求
5.  MillWheet 提供了数据的不重不丢机制,能够优雅处理out-of-order数据,同时提供可靠存储,系统内部计算递增的LowWatermark等特性

System Overview

1. MillWheet可以认为是一堆算子组成的DAG图。这些算子可以跑在任意数量的机器上,所以用户不需要关心load balance方面的事情.同时可以支持动态调整拓扑,而不需要重启整个集群服务
2. 数据模型是使用一个三元组表示(key,value ,timestamps) 。算子可以自身的业务逻辑来定义处理的key,value可以是任意的二进制串,timestamps用户可以填任何的数值,但是通常是事件发生的clock time, 系统内部会根据timestamps来计算LowWatermark

Core Concerts

1. Computations: 用户的逻辑运行在算子里,当数据到达后,用户的逻辑将会被触发。如果用户的逻辑需要跟外界系统交互,需要自己保证在这些系统上的操作是幂等的。应用层的处理逻辑不需要关心Key会被不同的节点并行处理。
2. Keys: Keys是系统中最主要抽象的概念,通过定义不同的key extraction function,不同的消费者可以从同一个输入流抽取出不同的key,比如在Zeitgeist系统中可以将Query作为key,但在反作弊系统可以用Cookie来作为Key
3. Streams:Streams是不同算子之间的交互机制,一个算子可订阅一个或者多个Stream,也可以发布一个或者多个Stream
4. Persistent State:   持久化存储的内容通常是byte string,用户可以通过Protobuf之类的来序列化或者反序列。后端的存储系统满足了高可用、多副本等特性(Bigtable or Spanner)等,一般的应用场景是一段时间窗口内的Counter或者不同数据流之间的Join操作
5. Low WaterMark: lwm是个时间戳,反映了数据到达算子A的时间界,而一个算子A的 lwm定义为
min(oldest work of A , low watermark of C : C outputs to A)
C是算子A的上游算子,如果A没有输入Stream, 则lwm与oldest work相同, 其中oldest work of A 是指还没A处理的内部数据的最老时间戳。
MillWheet的数据导入系统Injector在每次导入数据时会设置数据源的lwm,但是这只是个估算值。因此内部系统可能会收到一小部分时间戳较旧的数据,可以根据自身的业务逻辑选择丢失或者更新内部累积量。但这里需要注意到,即使出现了老数据,lwm仍然保证是递增。
lwm的作用在于系统可以告诉用户逻辑某个时间点之前的数据已经到齐, 为了保证lwm的准确性,在用户处理逻辑里,当每次生成一个新的record或者聚合多个record时,用户给这些records赋予的timestamp不能小于源record
6. Timers:  钩子函数,在特定的wall time 或者 lwm value被触发, 用户层可以自身的业务场景是wall time还是lwm, 比如监控系统可以使用wall time触发来在固定的时间发监控邮件,而Zeitgeist系统可以使用lwm来判断是否出现流量突降。此外用户可以选择不设置Timers, 比如判断突增时,数据延迟到达往往不会影响判断的准确性。Timer被设置后,会按照递增的timestamp顺序被触发,同时会持久化到可靠存储里

API

1. Computation API:  用户一般只是需要实现ProcessRecord和ProcessTimer两个函数, 同时框架还提供了SetTimer & ProduceRecord等函数供调用
2. Injector and Low WaterMark API:  lwm在算子框架内部会被自动计算(计算的pending work包括了in-progress and queued deliveries),用户逻辑一般不会直接访问lwm,而是通过设置record的timestamp间接影响;  Injector在G内部一般是通用的组件,通常会有不同的实例部署在不同的机器上,而Injector的lwm会将这些值给聚合起来,一般情况下会使用还没发布完的输入文件的最老创建时间作为一个实例的lwm

Fault Tolerance

1. Exactly-Once Delivery:  在算子收到一个输入时,框架层会执行以下几个操作: 1) 输入的去重 2) 业务逻辑的处理,可能会修改timers/states/productions  3) 将修改commit到后端的存储系统 4) 给上游返回ack  5) 将production发给下游
为了保证数据不丢,上游必须需要接收到Ack包后才会停止发送,不然会一直重试,因为可能会出现下游机器故障等情况。当下游接收到重复的数据包后,为了防止重复处理,系统会为所有的records在生成阶段赋予一个全局唯一的ID,在commit阶段会将已处理的id与其他状态一次原子更新到后端。此外,由于本地内存有限,在本地内存中会使用bloom-filter方式加快查询已处理过的集合。这些在后端存储系统中保存的ID集合,在确保没有发送端在重试发送后,会被系统自动清除,一般情况下是分钟级别。而由于外部系统Injector往往会发送延迟数据,所以这部分数据的ID集合清理回收会在推迟到小时级别
2. Strong-Production: 在给下游发送Production数据之前先写Production checkpoint,称之为Strong-Production,否则会出现发送不一致的风险。比如一个算子需要将一个时间窗口当前的累积量发给下游,如果在发送数据但还没收到ack包之前出现宕机,然后到重启后继续retry发送这段时间内窗口累积量发生变化,将会导致前后两次发送的数据不一致。当Production处理成功后,Production checkpoint将会被删除。
3. Weak Productions and Idempotency: 前面的Extract-Once Delivery 和 Strong-Product保证了数据和状态的不重不丢。但由于很多应用场景自身业务逻辑的操作符合幂等的性质,从资源消耗和延迟优化角度考虑,框架层可以跳过输入去重阶段而允许数据重复处理,同时允许写Production checkpoint在发送数据给下游后-框架层的处理逻辑跟之前有些差异,为了保证数据不丢,需要在接收到下游的Ack包后才会返回给上游Ack(这里自己的理解是如果能够成成功给上游返回Ack也不需要再写checkpoint了)
但对于 Weak Productions,需要注意到先接受下游Ack再返回上游Ack的机制在下游处理失败的异常场景下可能会增大整条流水线的处理延迟,因此可以在等待下游的Ack包一段时间后可以先写Production checkpoint,然后理解返回上游Ack,这样的优化机制能够取得延迟减少和总额外资源消耗减少的双重收益
4. State Manipulation: 系统中保存的状态包括了hard-state(在后端存储系统中) 和soft-state(本地内存,一般是本地cache数据或者聚合统计数据)。由于负载均衡或者机器故障等原因,有可能出现两个Worker同时处理同一份数据,然后出现重复写hard-State,造成数据不一致的风险,因此系统会为每次写操作都会绑定一个sequencer token(类似于抢占式机制,新Worker启动后会先令原有token失效), 保证对同一个key只会有一个写端。
而对于管理soft-state,同样需要保证只有一个写端(Single-Writer),考虑这样一种场景:Worker A在还没有Commit状态到后端存储前,新的Worker A-Prime启动从后端存储扫描数据同步到本地内存,然后Worker-A Commit成功然后返回上游Ack,但是A-Prime同步的之前的旧状态数据,导致出现丢失状态情况,最终会导致整个系统数据流处理混乱

System Implementation

1. MillWheel 建立在分布式环境上,每种算子可以跑在一个或者多个机器节点,Streams交互通过RPC方式
2.  由Replica Master来负责负载均衡操作,对于每种算子都将可处理的key集合划分为不同的连续区域,然后分配到不同的节点。同时会根据机器负载情况,将区域进行合并,拆分或者迁移等操作
3. 后端存储系统使用Bigtable或者Spanner,可以提供单行事务特性。对于一个key的不同的相关状态,比如Timers/Pending Productions/Persistent State都会保存到同一行里
4. 当新的worker重启后,需要从后端存储系统读取状态,然后构建本地内存的Timer Heap或者Production Checkpoint队列。为了保证状态的一致性,因此需要框架提供前面所提到的Single-Writer机制
5. 系统内部会有central authority子系统来跟踪所有的lwm并持久到可靠存储。每个进程会计算自身的lwm(包括Pending Production/Timer等数据,即是之前提到的oldest work),然后汇报给central authority。通常情况下由于本地有内存cache,进程在计算lwm时不需要访问Bigtable,因而计算效率够高。
由于每个进程处理的key的范围都是一段区域,所以进程的lwm也是先按自己的区域聚合后才汇报 到central authority,而central authority会判断是否所有的区域都齐全,若某段区域的数据没有收集则会使用上一次的计算值。最后central authority将各个进程的lwm广播到系统内部各个节点。
每个进程都会通过订阅方式获取到它上游算子的lwm数值,然后结合自身的lwm,计算出最后算子的lwm,再在适当的时候触发Timer. 同时为了保证lwm数据的一致性,每个区域的lwm更新同样会绑定一个sequencer token,类似于之前的Single-Write机制。

Evaluation

1. MillWheel在Google里用途广泛,比如生成广告主实时报表时需要Stream Join,也用于谷歌街景的图像处理等场景
2. 在应用场景中一般建议key要划分均匀防止出现了热点数据导致单台机器负载过重成为了系统整体瓶颈。如果数据出现大规模延迟,那么会造成依赖于lwm的Timer迟迟不能被触发,本地内存不能及时计算完后刷到后端存储系统(因为框架层为了性能优化,避免每一次都从后端存储系统读取数据,会在本地内存保存一段窗口内的数据用于统计),最终将会导致系统整体内存出现上涨风险。一般情况下都是限制injector再导入新的数据直到lwm恢复到正常。

 

One thought on “MillWheel学习笔记

  1. 动机中是不是包括高实时性?

    lwm的设计很妙啊!如果用户无意中违反了增加timestamp的原则,能被发现吗?会造成一个隐蔽的bug?

    算子是不是就是为了实现分布式计算而设计的?常见的算子都有什么呢?各种join是吗?

    Like

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s