来源于Google在sigmod2013发表的一篇Photon: Fault-tolerant and Scalable Joining of Continuous Data Streams http://www.mpi-sws.org/~areznich/files/photon-sigmod13.pdf
主要是描述了广告系统中展现流和点击流如何实时Join的问题
Introduction
1. 对于Google Adwords这类的商业系统而言,能够近实时的反馈广告主的预算消费等多维度的统计数据,能够使广告主及时知道调整投放策略后的效果以及允许Google内部系统更有效地优化budget-Control策略
2. Photon会从不同数据中心的GFS读入Query Log 和 Click Log后,根据query id Join后生成新的log写到不同数据中心的GFS上
3. 需要注意到单独的展现流或者点击流是不能包含报表统计所需要的所有数据的。一种解决方式是将展现信息带到点击串中,但这样会存在不少问题:一来点击串带上额外过多的数据会增大响应延迟和降低用户体验,二来点击URL是有长度限制的,限制了我们所能够带的数据量的大小
Problem Statement
1. 解决的问题:有两条持续增长的数据流,在primary log里每条日志有唯一的identifier,而foreign log里也带有同样的identifier用来与primary log做Join操作
2. 在传统数据库里,上述场景类似于foreign table 根据外键与primary table做inner Join,点击流可以类似于foreign table,展现流类似于primary table,query_id作为Join Key
System Challenges
1. 数据的不重不丢:重复数据将会导致广告主重复计费,丢失数据将会使Google收入减少。Photon保证99.99999%的数据会在秒级别被Join到输出,100%的数据会在小时级别后被Join到。因此Proton需要满足:在任意时候最多只处理一次的语义、准实时的处理一次语义以及最终的不重不丢语义
2. 数据中心级别的自动容错机制:数据中心服务有时候会中断服务,有计划内也有计划外,比如软件升级或者机房异常断电,这个中断服务的时间有可能是分钟级别,也有可能达到了数周的级别。因此在出现故障后,很难决定是在其他数据中心迁移服务还是等待原服务恢复正常,这会极大降低系统的可用性。GFS这类系统在小规模机器故障时能够运行良好,但是在设计时候没有考虑数据中心级别的容错,因此需要应用层用户考虑更多的事情。考虑到Proton的可用性是会直接影响到公司的收入,因此Proton需要数据中心级别的自动容错机制
3. 高可扩展性:Proton不只是要能够分钟数百万的数据,还需要能够应对未来流量的持续增长
4. 低延迟:Proton的输出用于为广告主生成报表,更低的延迟能够便于广告主更有效的优化预算投放等操作
5. 非严格时序的数据流: 展现数据流是根据query timestamp基本有序到达的,但点击数据有天然延迟,不一定是按照query timestamp有序到达,因此使用Window Join的方式比较困难
6. 展现数据流的延迟:虽然展现行为是早于点击行为,但由于展现数据量比点击数据量大同时是分布在不同数据中心等各种原因,展现log不一定先于点击log到达Proton。因此需要处理这种延迟情况
Paxos-Based Id Registry
1. 在分布式系统中,提供可靠性最直接的方式就是增加副本。类似于线上检索系统,为了提供数据中心级别的可靠性,往往会在不同的数据中心部署同样的服务。Proton也会在不同的数据中心部署相同的worker,尝试去Join同样的输入,但是会保证每个输入input event只会被处理一次。不同worker之间会共享存储在Id Registry上的全局状态:最近N天内已处理过click_id的集合,N的选择取决于资源消耗和丢失数据影响面的trade-off。需要注意到当系统遇到延迟到达N天之前的数据,由于没有办法去判断这批数据是否已经被Join过,为了防止重复计费,所以需要选择丢弃不处理
2. 在输出Join后的数据前,每个worker需要先验证对应的click id是否已经在Id Registry,如果已在则丢弃数据,否则需要尝试去往Id Registry写入click id。只有当写成功后,才能够输出Join后的日志。Id Registry需要保证click id一旦被写入,后续的写同一个click id的操作都会失败。
3. Id Registry需要提供数据中心级别的容错,多个副本的数据在不同的数据中心实时同步,可以容忍可配置的N个数据中心级别的服务停止。同时要保证写操作能够成功当且仅当对应的click id在Id Registry不存在。
Id-Registry Sever Architecture
1. Id Registry是建立在PaxosDB基础上: 一个内存型的key-value存储,通过Paxos协议来保证多副本之间的一致性,之外还提供了read-modify-write transaction来保证同样的key只有一次写入会成功
2. PaxosDB保证副本是按照同样的顺序去处理submitted values, 同时在任意时刻只有一个副本能够成为master,也只有master能提交更新到Paxos. 如果master出现故障,PaxosDB会自动选举出一个新的Master
3. Id Registry Server会有RPC Handler Thread接收client端请求,然后放到内存队列中,同时会有另外background 线程从队列中取出请求,往PaxosDB执行事务操作,并返回结果给Client
Scalable Id-Registry
1. Id Registry提供了数据中心级别的容错,一个commit操作需要经过分布在不同数据中心的replica agree后才能提交,而由于跨数据中心的网络交互延迟在100ms左右,因此会限制 Id Registry的吞吐只能是10个请求每秒, 这是远小于系统的预期。虽然可以通过client批量提交请求来缓解,但由于client端太多,这种机制不能彻底的解决问题。Id Registry使用了Server-Side Batch 和 shading的方式来解决扩展性问题
2. Sever-Side Batch: 系统内部处理的延迟其实主要来自于跨数据中心拓扑带来的交互延迟,而不是在于数据量的大小。因此Id Registry的background线程可以从队列里取出多个请求并作为一个事务提交(类似于Group Commit)。由于一批请求里可能会有对同一个key的写操作,因此Id Registry也提供了多行事务的机制,保证第一个写入才会返回Client成功,其他写入会返回失败。由于一个click id不会超过100个字节,因此background线程一次可以batch上千个请求来提交
3. Sharding: 由于Click id之间的处理是独立的,因此不同的click id可以由不同的Server来处理。当存储数据出现增长时,Id Registry需要支持动态扩容, 只是简单扩shard的方式会使得取模分库的方式出现错误,因为同一个click id会在扩容前后分布在不同的库上,导致有可能重复输出。因此需要一个确定性的mapping机制,在保证能够动态扩容的同时又可以向后兼容来保证数据的一致性。Id Registry使用了time stamp-based shading Configuration机制来解决这个问题。
每一个click id都会被赋予一个时间戳,在扩容期间,给定一个时间间隔S(需要保证当时处处理的click id之间的时间戳之差不会超过S),如果当前时间是T,那么时间戳小于T+S的数据mod旧的分库数,而时间戳大于T+S的数据mod新的分库数。这个sharding配置将会村存储在PaxosDB, 被所有的Id Registry Server和Client端共享。另外,缩容的时候可以采用一样的方式来处理。
4. Deleting Old Keys: Id Registry只保留N天的数据,N称为garbage collection threshold。为了删除过期数据,每个库的master实例会有回收线程周期性扫描过期的数据并删除,同时为了保证master切换时的时间同步问题,garbage collection boundary times tamp 会存储在Id Registry里,同时每个实例会有另外一个线程利用TrueTime机制周期性更新times tamp ,保证不会出现时间戳回退现象。如果Client尝试查询或者删除过期数据,Server会返回一个特殊的错误码,让Client跳过当前数据的处理。
Single Datacenter Pipeline
1. Photon在每个数据中心的Pipeline大致会分为以下几个部分:dispatcher负责读取click log并分发请求到Joiner,EventStore提供高效查询Query log机制,Joiner负责从EventStore度读取query log与当前的click log做Join,并利用Id Registry去重和输出Join后的结果。整体处理流程会分为以下几步:
1) Dispatcher从GFS持续读取日志,并查询Id Registry是否对应的click id已被处理过。如果被处理过,则跳过当前的点击日志
2) 如果没有被处理过,则异步地分发到Joiner并等待结果。如果Joiner处理失败(比如网络或者对应的Query log不存在) ,Dispatcher会在一定周期后再分发到其他Joiner
3) Joiner会从click log抽取出Query Id,并在EventStore查找对应的Query log
4) 如果查找不到,Joiner会返回失败给Dispatcher. 如果查找成功,则Joiner尝试往Id Registry插入click id
5) 如果click id如果已经存在于Id Registry,则Joiner认为对应的click log已经被Join。如果能够成功写入,则会Join完query log和click log后写出到GFS
Unique Event Id Generation
1. 由于会有成千上万的线上Server部署在全世界,因此能够快速生成一个全局唯一的ID对系统的处理延迟是非常重要的, 而每个Server独立生成ID的机制是满足这个特点
2. Event Id是个三元组: Server IP + Process IP + Timestamp。每个Server会基于当前的时间为每个event生成递增的Times tamp。由于Timestamp的生成会收到时钟频率的影响,为了保证正确性,还需要有个后台线程周期性调用TrueTime API同步时钟
Dispatch: Ensuring At-Least-Once Semantics
1. Dispatch会周期性扫描日志目录里的成千上万的日志文件,监控是否有新的日志产生。为了提供并发度,Dispatch会有多个worker并行去处理不同的日志文件,同时将各自的处理进度保存在GFS。在发送log event到Joiner之前,会先在Id Registry查询click id验证对应的日志event是否已经处理过,这个优化技巧可以极大的提高系统处理性能
2. 当Dispatch接收不到Joiner的返回处理成功的消息,会将click日志保存在GFS并稍后尝试。为了减少由于连续处理失败(比如网络拥塞)引起的重试比例,Dispatch会使用类似于指数退避的算法。如果Joiner重试了多次仍然失败,并且click log的时间老于某个阈值,Joiner会将日志表示unjoinable,并返回成功给Dispatch
3. 当一个数据中心停止服务后,这个不会影响Photon服务的可用性,因为至少有另外一套在其他数据中心并行运行.当数据中心服务恢复后,Dispatch会读取进度状态重新开始发送日志,由于大多数日志在其他Pipeline已经被处理过,所以Dispatch会在Id Registry查询到这批日志已经被处理过,将可以跳过这些日志并会很快追上当前进度。Dispatch将会采取流控策略保证不会给Id Registry带来过大压力,同时能够保证足够快的恢复速度。如果进度文件已经损坏,可以人工清空状态并从最新文件开始处理,这需要其他Pipeline保证在此之间一直没有停止服务
Joiner
1. Joiner在EventStore查找不到Query log或者自身压力过大时会返回处理失败给Dispatch。当能够查找到Query log时,Joiner会将展现日志和点击日志传递给一个日志处理的lib库Adaptor,Adaptor会做过滤、拼接或者用户自定义操作等,将业务逻辑从Joiner分离能够极大提高Joiner架构的灵活性
2. 当Joiner往 Id Registry插入click id失败时(由于之前有click id插入成功),则Joiner会丢弃这条日志,这种情况被称为wasted Join, 由于至少有两个数据中心在同时运行,因此同一条日志在系统内部至少被处理两次,为了减少wasted Join的比例,Dispatch在下发数据前会先有查询操作。与Dispatch不同,Joiner没有任何状态,因此Dispatch可以采取均衡策略将请求分发到任意的Joiner
3. Joiner在写Id Registry和写日志数据到GFS两个操作之间的非事务性特性会导致数据出现丢失风险。一般可以分为两种情况
1) 其中一种情况是Joiner超时或者没有由于网络原因没有接收到Id Registry的返回结果。当重新发送请求时,由于click id已经存在将会导致Id Registry返回处理失败,Joiner将会丢失这条日志。为了解决这个问题,当Joiner commit一个click id到Id Registry时,同时发送一个全局唯一的token(包括ip + 进程号 + 时间戳)到Id Registry。Id Registry会同时将click id和token一起保存。当Joiner接收不到Id Registry的返回结果重试时,会带上同样的token,当Id Registry检测到token一致时将会返回成功给Joiner
2) 另外一种情况是写Id Registry成功,但在写日志数据到GFS之前Joiner出现宕机。这种情况将会导致click log不能再被其他Joiner Commit,因为click log所对应的token已经不属于任何一个Joiner。为了减少这种情况的影响面,每个Joiner会限制当前要被处理的请求个数,一旦达到上限,将会返回失败给Dispatch。
4. 丢失数据的场景还会有logs Storage丢失数据或者软件bug之类,Proton提供了一个校验系统来作为系统的数据一致性保证。检验系统会读取原始输入click log,并检查是否在Join后的输出文件里,如果发现不在输出文件,但是对应的click id在Id Registry,校验系统将读取tokon中的数据来判断对应的Joiner是否已经宕机或者重启,如果发现Joiner已经宕机或者重启,检验系统将会在Id Registry删除对应的click id并将原始的click log重注入到Dispatch。由于检验系统只是需要读取丢失数据的token,所以可以在Id Registry里删除已经输出的数据的token,通过这样的方式可以减少系统所需要保存的数据量。
EventStore
1. EventStore主要是提供了根据query id返回query log的服务,在Photon系统中提供了两种EventStore的实现,一种是CacheEventStore,主要利用event的时间局部性(大部分点击行为都是在展现后短时间内发生),另外一种是LogEventsStore,主要利用query log在日志文件中是近似按照timestamp有序保存
2. CacheEventStore是一个内存型cache,与分布式memcache相似,key是query id,value是query log,sharding规则是根据query id做一致性hash,cache的过期时间是分钟级别。其中CacheEventStore极大优化了读取延迟以及减少在LogEventsStore的随机IO读写
3. LogEventsStore会通过读取保存在BigTable里的log file map来获得对应的query log在原始日志文件里的大致位置,从近似位置开始store会顺序读取一小部分数据来找到确切的数据。log file map里面保存了query id与文件路径及偏移的映射关系,EventReader会按照固定的间隔,比如每隔W秒或者M Bytes数据就会往log file map添加新的记录,EventReader同时是会往CacheEventStore填充数据。Bigtable提供了Range Query机制,因此Store会在Bigtable里扫描key在ServerIp:ProcessId:Timestamp-W到ServerIp:ProcessId:Timestamp之间的数据,只要reader按照固定周期更新log file map,就可以保证总可以通过一个记录找到实际的log
Performance Result
1. IdRegistry部署在三个地域的五个数据中心,每个地域最多有两个副本。由于只要有3/5的副本投票成功便可以commit操作,因此可以容忍一个地域的服务出现故障
2. Dispatcher之类的服务会部署美国的东西海岸,同时会与log datacenters物理上很接近
3. IdRegistry会有上千个分库在每个数据中心,同时在每个数据中心,Photon系统会有上千个Dispatcher&Joiner服务。Photon每天能够T级别的点击数据和几十个T级别的展现数据
4. 从读取原始日志到生成Join后日志,90%的处理延迟小于7s,之外还有cache命中率高、较好的数据中心级别的自动容错机制、吞吐能力强等特性
Design Lessons
1. 基于Paxos建立一个分布式系统,需要减少保存在Paxos上的元数据, 因为在不同数据中心之间同步数据的消耗是非常昂贵的
2. 为IdRegistry提供动态扩容机制能够极大系统的可扩展性
3. 通过RPC交互方式相比于数据通过磁盘交互能够极大降低延迟,但是应用层需要处理RPC失败或者重试等场景。同时为了避免系统的过载,让Dispatcher来负责整体的重试机制,能够使得其他worker通过RPC交互极其灵活
4. 模块的单一职责能够使的系统更具有可扩展性,在旧系统里Joiner和LogEventsStore的功能合并在同一个Worker,同时还CacheEventStore,这极大限制了系统的处理能力
5. 其他类似的系统是将多个event作为一个batch来处理,但是这些系统运行稳定要依赖于在一个batch里的所有event都能够同时处理成功。由于在Photon里的应用场景里,展现有可能延迟于点击,因此除非所有query log都已经到达,否则一个batch是不能commit的。因此在设计时需要根据具体的场景来考虑是否需要batch
Future Work
1. 将Phonton扩展成能够处理多流的Join,以及进一步降低处理的延迟:通过server直接发送query event和click event到Joiner,而不需要Dispatcher等日志拷贝到数据中心,这能够使大多数的数据得到最快的处理,剩下的missing event则交给原有的logs-base系统来处理