Photon学习笔记

来源于Google在sigmod2013发表的一篇Photon: Fault-tolerant and Scalable Joining of Continuous Data Streams http://www.mpi-sws.org/~areznich/files/photon-sigmod13.pdf
主要是描述了广告系统中展现流和点击流如何实时Join的问题

Introduction

1.  对于Google Adwords这类的商业系统而言,能够近实时的反馈广告主的预算消费等多维度的统计数据,能够使广告主及时知道调整投放策略后的效果以及允许Google内部系统更有效地优化budget-Control策略
2.  Photon会从不同数据中心的GFS读入Query Log 和 Click Log后,根据query id Join后生成新的log写到不同数据中心的GFS上
3. 需要注意到单独的展现流或者点击流是不能包含报表统计所需要的所有数据的。一种解决方式是将展现信息带到点击串中,但这样会存在不少问题:一来点击串带上额外过多的数据会增大响应延迟和降低用户体验,二来点击URL是有长度限制的,限制了我们所能够带的数据量的大小

Problem Statement

1.  解决的问题:有两条持续增长的数据流,在primary log里每条日志有唯一的identifier,而foreign log里也带有同样的identifier用来与primary log做Join操作
2.  在传统数据库里,上述场景类似于foreign table 根据外键与primary table做inner Join,点击流可以类似于foreign table,展现流类似于primary table,query_id作为Join Key

System Challenges

1. 数据的不重不丢:重复数据将会导致广告主重复计费,丢失数据将会使Google收入减少。Photon保证99.99999%的数据会在秒级别被Join到输出,100%的数据会在小时级别后被Join到。因此Proton需要满足:在任意时候最多只处理一次的语义、准实时的处理一次语义以及最终的不重不丢语义
2. 数据中心级别的自动容错机制:数据中心服务有时候会中断服务,有计划内也有计划外,比如软件升级或者机房异常断电,这个中断服务的时间有可能是分钟级别,也有可能达到了数周的级别。因此在出现故障后,很难决定是在其他数据中心迁移服务还是等待原服务恢复正常,这会极大降低系统的可用性。GFS这类系统在小规模机器故障时能够运行良好,但是在设计时候没有考虑数据中心级别的容错,因此需要应用层用户考虑更多的事情。考虑到Proton的可用性是会直接影响到公司的收入,因此Proton需要数据中心级别的自动容错机制
3. 高可扩展性:Proton不只是要能够分钟数百万的数据,还需要能够应对未来流量的持续增长
4. 低延迟:Proton的输出用于为广告主生成报表,更低的延迟能够便于广告主更有效的优化预算投放等操作
5. 非严格时序的数据流: 展现数据流是根据query timestamp基本有序到达的,但点击数据有天然延迟,不一定是按照query timestamp有序到达,因此使用Window Join的方式比较困难
6. 展现数据流的延迟:虽然展现行为是早于点击行为,但由于展现数据量比点击数据量大同时是分布在不同数据中心等各种原因,展现log不一定先于点击log到达Proton。因此需要处理这种延迟情况

Paxos-Based Id Registry 

1. 在分布式系统中,提供可靠性最直接的方式就是增加副本。类似于线上检索系统,为了提供数据中心级别的可靠性,往往会在不同的数据中心部署同样的服务。Proton也会在不同的数据中心部署相同的worker,尝试去Join同样的输入,但是会保证每个输入input event只会被处理一次。不同worker之间会共享存储在Id Registry上的全局状态:最近N天内已处理过click_id的集合,N的选择取决于资源消耗和丢失数据影响面的trade-off。需要注意到当系统遇到延迟到达N天之前的数据,由于没有办法去判断这批数据是否已经被Join过,为了防止重复计费,所以需要选择丢弃不处理
2. 在输出Join后的数据前,每个worker需要先验证对应的click id是否已经在Id Registry,如果已在则丢弃数据,否则需要尝试去往Id Registry写入click id。只有当写成功后,才能够输出Join后的日志。Id Registry需要保证click id一旦被写入,后续的写同一个click id的操作都会失败。
3.  Id Registry需要提供数据中心级别的容错,多个副本的数据在不同的数据中心实时同步,可以容忍可配置的N个数据中心级别的服务停止。同时要保证写操作能够成功当且仅当对应的click id在Id Registry不存在。

Id-Registry Sever Architecture

1. Id Registry是建立在PaxosDB基础上: 一个内存型的key-value存储,通过Paxos协议来保证多副本之间的一致性,之外还提供了read-modify-write transaction来保证同样的key只有一次写入会成功
2.  PaxosDB保证副本是按照同样的顺序去处理submitted values,  同时在任意时刻只有一个副本能够成为master,也只有master能提交更新到Paxos. 如果master出现故障,PaxosDB会自动选举出一个新的Master
3.  Id Registry Server会有RPC Handler Thread接收client端请求,然后放到内存队列中,同时会有另外background 线程从队列中取出请求,往PaxosDB执行事务操作,并返回结果给Client

Scalable Id-Registry

1. Id Registry提供了数据中心级别的容错,一个commit操作需要经过分布在不同数据中心的replica agree后才能提交,而由于跨数据中心的网络交互延迟在100ms左右,因此会限制 Id Registry的吞吐只能是10个请求每秒,  这是远小于系统的预期。虽然可以通过client批量提交请求来缓解,但由于client端太多,这种机制不能彻底的解决问题。Id Registry使用了Server-Side Batch 和 shading的方式来解决扩展性问题
2. Sever-Side Batch: 系统内部处理的延迟其实主要来自于跨数据中心拓扑带来的交互延迟,而不是在于数据量的大小。因此Id Registry的background线程可以从队列里取出多个请求并作为一个事务提交(类似于Group Commit)。由于一批请求里可能会有对同一个key的写操作,因此Id Registry也提供了多行事务的机制,保证第一个写入才会返回Client成功,其他写入会返回失败。由于一个click id不会超过100个字节,因此background线程一次可以batch上千个请求来提交
3. Sharding: 由于Click id之间的处理是独立的,因此不同的click id可以由不同的Server来处理。当存储数据出现增长时,Id Registry需要支持动态扩容, 只是简单扩shard的方式会使得取模分库的方式出现错误,因为同一个click id会在扩容前后分布在不同的库上,导致有可能重复输出。因此需要一个确定性的mapping机制,在保证能够动态扩容的同时又可以向后兼容来保证数据的一致性。Id Registry使用了time stamp-based shading Configuration机制来解决这个问题。
每一个click id都会被赋予一个时间戳,在扩容期间,给定一个时间间隔S(需要保证当时处处理的click id之间的时间戳之差不会超过S),如果当前时间是T,那么时间戳小于T+S的数据mod旧的分库数,而时间戳大于T+S的数据mod新的分库数。这个sharding配置将会村存储在PaxosDB, 被所有的Id Registry Server和Client端共享。另外,缩容的时候可以采用一样的方式来处理
4. Deleting Old Keys: Id Registry只保留N天的数据,N称为garbage collection threshold。为了删除过期数据,每个库的master实例会有回收线程周期性扫描过期的数据并删除,同时为了保证master切换时的时间同步问题,garbage collection boundary times tamp 会存储在Id Registry里,同时每个实例会有另外一个线程利用TrueTime机制周期性更新times tamp ,保证不会出现时间戳回退现象。如果Client尝试查询或者删除过期数据,Server会返回一个特殊的错误码,让Client跳过当前数据的处理。

Single Datacenter Pipeline

1. Photon在每个数据中心的Pipeline大致会分为以下几个部分:dispatcher负责读取click log并分发请求到Joiner,EventStore提供高效查询Query log机制,Joiner负责从EventStore度读取query log与当前的click log做Join,并利用Id Registry去重和输出Join后的结果。整体处理流程会分为以下几步:
1)  Dispatcher从GFS持续读取日志,并查询Id Registry是否对应的click id已被处理过。如果被处理过,则跳过当前的点击日志
2) 如果没有被处理过,则异步地分发到Joiner并等待结果。如果Joiner处理失败(比如网络或者对应的Query log不存在) ,Dispatcher会在一定周期后再分发到其他Joiner
3) Joiner会从click log抽取出Query Id,并在EventStore查找对应的Query log
4) 如果查找不到,Joiner会返回失败给Dispatcher. 如果查找成功,则Joiner尝试往Id Registry插入click id
5) 如果click id如果已经存在于Id Registry,则Joiner认为对应的click log已经被Join。如果能够成功写入,则会Join完query log和click log后写出到GFS

Unique Event Id Generation

1. 由于会有成千上万的线上Server部署在全世界,因此能够快速生成一个全局唯一的ID对系统的处理延迟是非常重要的, 而每个Server独立生成ID的机制是满足这个特点
2.  Event Id是个三元组: Server IP + Process IP + Timestamp。每个Server会基于当前的时间为每个event生成递增的Times tamp。由于Timestamp的生成会收到时钟频率的影响,为了保证正确性,还需要有个后台线程周期性调用TrueTime API同步时钟

 Dispatch: Ensuring At-Least-Once Semantics

1. Dispatch会周期性扫描日志目录里的成千上万的日志文件,监控是否有新的日志产生。为了提供并发度,Dispatch会有多个worker并行去处理不同的日志文件,同时将各自的处理进度保存在GFS。在发送log event到Joiner之前,会先在Id Registry查询click id验证对应的日志event是否已经处理过,这个优化技巧可以极大的提高系统处理性能
2.  当Dispatch接收不到Joiner的返回处理成功的消息,会将click日志保存在GFS并稍后尝试。为了减少由于连续处理失败(比如网络拥塞)引起的重试比例,Dispatch会使用类似于指数退避的算法。如果Joiner重试了多次仍然失败,并且click log的时间老于某个阈值,Joiner会将日志表示unjoinable,并返回成功给Dispatch
3.  当一个数据中心停止服务后,这个不会影响Photon服务的可用性,因为至少有另外一套在其他数据中心并行运行.当数据中心服务恢复后,Dispatch会读取进度状态重新开始发送日志,由于大多数日志在其他Pipeline已经被处理过,所以Dispatch会在Id Registry查询到这批日志已经被处理过,将可以跳过这些日志并会很快追上当前进度。Dispatch将会采取流控策略保证不会给Id Registry带来过大压力,同时能够保证足够快的恢复速度。如果进度文件已经损坏,可以人工清空状态并从最新文件开始处理,这需要其他Pipeline保证在此之间一直没有停止服务

Joiner

1. Joiner在EventStore查找不到Query log或者自身压力过大时会返回处理失败给Dispatch。当能够查找到Query log时,Joiner会将展现日志和点击日志传递给一个日志处理的lib库Adaptor,Adaptor会做过滤、拼接或者用户自定义操作等,将业务逻辑从Joiner分离能够极大提高Joiner架构的灵活性
2. 当Joiner往 Id Registry插入click id失败时(由于之前有click id插入成功),则Joiner会丢弃这条日志,这种情况被称为wasted Join,  由于至少有两个数据中心在同时运行,因此同一条日志在系统内部至少被处理两次,为了减少wasted Join的比例,Dispatch在下发数据前会先有查询操作。与Dispatch不同,Joiner没有任何状态,因此Dispatch可以采取均衡策略将请求分发到任意的Joiner
3. Joiner在写Id Registry和写日志数据到GFS两个操作之间的非事务性特性会导致数据出现丢失风险。一般可以分为两种情况
1) 其中一种情况是Joiner超时或者没有由于网络原因没有接收到Id Registry的返回结果。当重新发送请求时,由于click id已经存在将会导致Id Registry返回处理失败,Joiner将会丢失这条日志。为了解决这个问题,当Joiner commit一个click id到Id Registry时,同时发送一个全局唯一的token(包括ip + 进程号 + 时间戳)到Id Registry。Id Registry会同时将click id和token一起保存。当Joiner接收不到Id Registry的返回结果重试时,会带上同样的token,当Id Registry检测到token一致时将会返回成功给Joiner
2) 另外一种情况是写Id Registry成功,但在写日志数据到GFS之前Joiner出现宕机。这种情况将会导致click log不能再被其他Joiner Commit,因为click log所对应的token已经不属于任何一个Joiner。为了减少这种情况的影响面,每个Joiner会限制当前要被处理的请求个数,一旦达到上限,将会返回失败给Dispatch。
4. 丢失数据的场景还会有logs Storage丢失数据或者软件bug之类,Proton提供了一个校验系统来作为系统的数据一致性保证。检验系统会读取原始输入click log,并检查是否在Join后的输出文件里,如果发现不在输出文件,但是对应的click id在Id Registry,校验系统将读取tokon中的数据来判断对应的Joiner是否已经宕机或者重启,如果发现Joiner已经宕机或者重启,检验系统将会在Id Registry删除对应的click id并将原始的click log重注入到Dispatch。由于检验系统只是需要读取丢失数据的token,所以可以在Id Registry里删除已经输出的数据的token,通过这样的方式可以减少系统所需要保存的数据量。

EventStore

1.  EventStore主要是提供了根据query id返回query log的服务,在Photon系统中提供了两种EventStore的实现,一种是CacheEventStore,主要利用event的时间局部性(大部分点击行为都是在展现后短时间内发生),另外一种是LogEventsStore,主要利用query log在日志文件中是近似按照timestamp有序保存
2. CacheEventStore是一个内存型cache,与分布式memcache相似,key是query id,value是query log,sharding规则是根据query id做一致性hash,cache的过期时间是分钟级别。其中CacheEventStore极大优化了读取延迟以及减少在LogEventsStore的随机IO读写
3.  LogEventsStore会通过读取保存在BigTable里的log file map来获得对应的query log在原始日志文件里的大致位置,从近似位置开始store会顺序读取一小部分数据来找到确切的数据。log file map里面保存了query id与文件路径及偏移的映射关系,EventReader会按照固定的间隔,比如每隔W秒或者M Bytes数据就会往log file map添加新的记录,EventReader同时是会往CacheEventStore填充数据。Bigtable提供了Range Query机制,因此Store会在Bigtable里扫描key在ServerIp:ProcessId:Timestamp-W到ServerIp:ProcessId:Timestamp之间的数据,只要reader按照固定周期更新log file map,就可以保证总可以通过一个记录找到实际的log

Performance Result

1. IdRegistry部署在三个地域的五个数据中心,每个地域最多有两个副本。由于只要有3/5的副本投票成功便可以commit操作,因此可以容忍一个地域的服务出现故障
2.  Dispatcher之类的服务会部署美国的东西海岸,同时会与log datacenters物理上很接近
3.  IdRegistry会有上千个分库在每个数据中心,同时在每个数据中心,Photon系统会有上千个Dispatcher&Joiner服务。Photon每天能够T级别的点击数据和几十个T级别的展现数据
4. 从读取原始日志到生成Join后日志,90%的处理延迟小于7s,之外还有cache命中率高、较好的数据中心级别的自动容错机制、吞吐能力强等特性

Design Lessons

1. 基于Paxos建立一个分布式系统,需要减少保存在Paxos上的元数据, 因为在不同数据中心之间同步数据的消耗是非常昂贵的
2. 为IdRegistry提供动态扩容机制能够极大系统的可扩展性
3. 通过RPC交互方式相比于数据通过磁盘交互能够极大降低延迟,但是应用层需要处理RPC失败或者重试等场景。同时为了避免系统的过载,让Dispatcher来负责整体的重试机制,能够使得其他worker通过RPC交互极其灵活
4.  模块的单一职责能够使的系统更具有可扩展性,在旧系统里Joiner和LogEventsStore的功能合并在同一个Worker,同时还CacheEventStore,这极大限制了系统的处理能力
5. 其他类似的系统是将多个event作为一个batch来处理,但是这些系统运行稳定要依赖于在一个batch里的所有event都能够同时处理成功。由于在Photon里的应用场景里,展现有可能延迟于点击,因此除非所有query log都已经到达,否则一个batch是不能commit的。因此在设计时需要根据具体的场景来考虑是否需要batch

Future Work

1. 将Phonton扩展成能够处理多流的Join,以及进一步降低处理的延迟:通过server直接发送query event和click event到Joiner,而不需要Dispatcher等日志拷贝到数据中心,这能够使大多数的数据得到最快的处理,剩下的missing event则交给原有的logs-base系统来处理

MillWheel学习笔记

来源于Google在vldb2013发的流式计算框架论文MillWheel: Fault-Tolerant Stream Processing at Internet Scale
http://db.disi.unitn.eu/pages/VLDBProgram/pdf/industry/p734-akidau.pdf

Introduction

1. 实时计算一般可用于实时反作弊或者异常监控等场景,需要 fault tolerance & persistent state & scanlability
2. MillWheel 提供了良好的编程模型和流式计算框架,对用户屏蔽了分布式、容错等各方面 细节,保证数据的不重不丢。同时提供优雅的保存处理进度机制
3. 其他流式系统不能同时满足容错、通用性和扩展性特点。比如 S4 不能保证数据不丢,而 Streaming SQL System (比如Spark Streaming) 在应对复杂的业务场景下SQL语言表达能力不足

Motivation And  Requirements

1. 应用场景google内部的Zeitgeist系统,用来监控检索Query的流量是否突增或者突降。主要是通过模型预测值与实际值做对比。
2. Persistent Storage: 有短期的时间窗口存储需求和长期的数据存储需求
3. LowWatermarks: 用来追踪分布式环境中所有等待处理的events,可以区分数据是延迟还是真的缺失。同时也可以规避应用场景中需要按照数据的严格时间序来处理数据
4. Duplicate Prevention: 保证数据不会被重复处理 ,除了保证不会出现突增的误判外,在计费系统也是有同样的需求
5.  MillWheet 提供了数据的不重不丢机制,能够优雅处理out-of-order数据,同时提供可靠存储,系统内部计算递增的LowWatermark等特性

System Overview

1. MillWheet可以认为是一堆算子组成的DAG图。这些算子可以跑在任意数量的机器上,所以用户不需要关心load balance方面的事情.同时可以支持动态调整拓扑,而不需要重启整个集群服务
2. 数据模型是使用一个三元组表示(key,value ,timestamps) 。算子可以自身的业务逻辑来定义处理的key,value可以是任意的二进制串,timestamps用户可以填任何的数值,但是通常是事件发生的clock time, 系统内部会根据timestamps来计算LowWatermark

Core Concerts

1. Computations: 用户的逻辑运行在算子里,当数据到达后,用户的逻辑将会被触发。如果用户的逻辑需要跟外界系统交互,需要自己保证在这些系统上的操作是幂等的。应用层的处理逻辑不需要关心Key会被不同的节点并行处理。
2. Keys: Keys是系统中最主要抽象的概念,通过定义不同的key extraction function,不同的消费者可以从同一个输入流抽取出不同的key,比如在Zeitgeist系统中可以将Query作为key,但在反作弊系统可以用Cookie来作为Key
3. Streams:Streams是不同算子之间的交互机制,一个算子可订阅一个或者多个Stream,也可以发布一个或者多个Stream
4. Persistent State:   持久化存储的内容通常是byte string,用户可以通过Protobuf之类的来序列化或者反序列。后端的存储系统满足了高可用、多副本等特性(Bigtable or Spanner)等,一般的应用场景是一段时间窗口内的Counter或者不同数据流之间的Join操作
5. Low WaterMark: lwm是个时间戳,反映了数据到达算子A的时间界,而一个算子A的 lwm定义为
min(oldest work of A , low watermark of C : C outputs to A)
C是算子A的上游算子,如果A没有输入Stream, 则lwm与oldest work相同, 其中oldest work of A 是指还没A处理的内部数据的最老时间戳。
MillWheet的数据导入系统Injector在每次导入数据时会设置数据源的lwm,但是这只是个估算值。因此内部系统可能会收到一小部分时间戳较旧的数据,可以根据自身的业务逻辑选择丢失或者更新内部累积量。但这里需要注意到,即使出现了老数据,lwm仍然保证是递增。
lwm的作用在于系统可以告诉用户逻辑某个时间点之前的数据已经到齐, 为了保证lwm的准确性,在用户处理逻辑里,当每次生成一个新的record或者聚合多个record时,用户给这些records赋予的timestamp不能小于源record
6. Timers:  钩子函数,在特定的wall time 或者 lwm value被触发, 用户层可以自身的业务场景是wall time还是lwm, 比如监控系统可以使用wall time触发来在固定的时间发监控邮件,而Zeitgeist系统可以使用lwm来判断是否出现流量突降。此外用户可以选择不设置Timers, 比如判断突增时,数据延迟到达往往不会影响判断的准确性。Timer被设置后,会按照递增的timestamp顺序被触发,同时会持久化到可靠存储里

API

1. Computation API:  用户一般只是需要实现ProcessRecord和ProcessTimer两个函数, 同时框架还提供了SetTimer & ProduceRecord等函数供调用
2. Injector and Low WaterMark API:  lwm在算子框架内部会被自动计算(计算的pending work包括了in-progress and queued deliveries),用户逻辑一般不会直接访问lwm,而是通过设置record的timestamp间接影响;  Injector在G内部一般是通用的组件,通常会有不同的实例部署在不同的机器上,而Injector的lwm会将这些值给聚合起来,一般情况下会使用还没发布完的输入文件的最老创建时间作为一个实例的lwm

Fault Tolerance

1. Exactly-Once Delivery:  在算子收到一个输入时,框架层会执行以下几个操作: 1) 输入的去重 2) 业务逻辑的处理,可能会修改timers/states/productions  3) 将修改commit到后端的存储系统 4) 给上游返回ack  5) 将production发给下游
为了保证数据不丢,上游必须需要接收到Ack包后才会停止发送,不然会一直重试,因为可能会出现下游机器故障等情况。当下游接收到重复的数据包后,为了防止重复处理,系统会为所有的records在生成阶段赋予一个全局唯一的ID,在commit阶段会将已处理的id与其他状态一次原子更新到后端。此外,由于本地内存有限,在本地内存中会使用bloom-filter方式加快查询已处理过的集合。这些在后端存储系统中保存的ID集合,在确保没有发送端在重试发送后,会被系统自动清除,一般情况下是分钟级别。而由于外部系统Injector往往会发送延迟数据,所以这部分数据的ID集合清理回收会在推迟到小时级别
2. Strong-Production: 在给下游发送Production数据之前先写Production checkpoint,称之为Strong-Production,否则会出现发送不一致的风险。比如一个算子需要将一个时间窗口当前的累积量发给下游,如果在发送数据但还没收到ack包之前出现宕机,然后到重启后继续retry发送这段时间内窗口累积量发生变化,将会导致前后两次发送的数据不一致。当Production处理成功后,Production checkpoint将会被删除。
3. Weak Productions and Idempotency: 前面的Extract-Once Delivery 和 Strong-Product保证了数据和状态的不重不丢。但由于很多应用场景自身业务逻辑的操作符合幂等的性质,从资源消耗和延迟优化角度考虑,框架层可以跳过输入去重阶段而允许数据重复处理,同时允许写Production checkpoint在发送数据给下游后-框架层的处理逻辑跟之前有些差异,为了保证数据不丢,需要在接收到下游的Ack包后才会返回给上游Ack(这里自己的理解是如果能够成成功给上游返回Ack也不需要再写checkpoint了)
但对于 Weak Productions,需要注意到先接受下游Ack再返回上游Ack的机制在下游处理失败的异常场景下可能会增大整条流水线的处理延迟,因此可以在等待下游的Ack包一段时间后可以先写Production checkpoint,然后理解返回上游Ack,这样的优化机制能够取得延迟减少和总额外资源消耗减少的双重收益
4. State Manipulation: 系统中保存的状态包括了hard-state(在后端存储系统中) 和soft-state(本地内存,一般是本地cache数据或者聚合统计数据)。由于负载均衡或者机器故障等原因,有可能出现两个Worker同时处理同一份数据,然后出现重复写hard-State,造成数据不一致的风险,因此系统会为每次写操作都会绑定一个sequencer token(类似于抢占式机制,新Worker启动后会先令原有token失效), 保证对同一个key只会有一个写端。
而对于管理soft-state,同样需要保证只有一个写端(Single-Writer),考虑这样一种场景:Worker A在还没有Commit状态到后端存储前,新的Worker A-Prime启动从后端存储扫描数据同步到本地内存,然后Worker-A Commit成功然后返回上游Ack,但是A-Prime同步的之前的旧状态数据,导致出现丢失状态情况,最终会导致整个系统数据流处理混乱

System Implementation

1. MillWheel 建立在分布式环境上,每种算子可以跑在一个或者多个机器节点,Streams交互通过RPC方式
2.  由Replica Master来负责负载均衡操作,对于每种算子都将可处理的key集合划分为不同的连续区域,然后分配到不同的节点。同时会根据机器负载情况,将区域进行合并,拆分或者迁移等操作
3. 后端存储系统使用Bigtable或者Spanner,可以提供单行事务特性。对于一个key的不同的相关状态,比如Timers/Pending Productions/Persistent State都会保存到同一行里
4. 当新的worker重启后,需要从后端存储系统读取状态,然后构建本地内存的Timer Heap或者Production Checkpoint队列。为了保证状态的一致性,因此需要框架提供前面所提到的Single-Writer机制
5. 系统内部会有central authority子系统来跟踪所有的lwm并持久到可靠存储。每个进程会计算自身的lwm(包括Pending Production/Timer等数据,即是之前提到的oldest work),然后汇报给central authority。通常情况下由于本地有内存cache,进程在计算lwm时不需要访问Bigtable,因而计算效率够高。
由于每个进程处理的key的范围都是一段区域,所以进程的lwm也是先按自己的区域聚合后才汇报 到central authority,而central authority会判断是否所有的区域都齐全,若某段区域的数据没有收集则会使用上一次的计算值。最后central authority将各个进程的lwm广播到系统内部各个节点。
每个进程都会通过订阅方式获取到它上游算子的lwm数值,然后结合自身的lwm,计算出最后算子的lwm,再在适当的时候触发Timer. 同时为了保证lwm数据的一致性,每个区域的lwm更新同样会绑定一个sequencer token,类似于之前的Single-Write机制。

Evaluation

1. MillWheel在Google里用途广泛,比如生成广告主实时报表时需要Stream Join,也用于谷歌街景的图像处理等场景
2. 在应用场景中一般建议key要划分均匀防止出现了热点数据导致单台机器负载过重成为了系统整体瓶颈。如果数据出现大规模延迟,那么会造成依赖于lwm的Timer迟迟不能被触发,本地内存不能及时计算完后刷到后端存储系统(因为框架层为了性能优化,避免每一次都从后端存储系统读取数据,会在本地内存保存一段窗口内的数据用于统计),最终将会导致系统整体内存出现上涨风险。一般情况下都是限制injector再导入新的数据直到lwm恢复到正常。