对照实验系统论文阅读笔记(二)

这一次我们阅读和翻译关于分层实验平台的来自Google的论文Overlapping experiment infrastructure: more, better, faster experimentation (没有找到特别官方的链接,学术搜索能搜到)。

 

Google也大量需要在线的对照实验来做决策,几乎所有的微小的改变,都需要通过实验决定。相比众多的实验,流量的数量已经成为了瓶颈。为了更高效、更准确地做更多的实验,这篇文章设计了分层实验平台。同一份流量可以同时做多个实验。

 

更多、更好、更快

 

实验平台不仅能同时运行多个实验,也要具有灵活性:不同的实验可以使用不同的配置、不同的流量大小。不正确的实验不应当被允许运行,正确但影响很差的实验应当被迅速发现并中止。实验的设置和进行应当容易并迅速,即使不是工程师也可以做(不需要写代码)。实验指标应当实时可用,这样实验就可以被迅速评估。简单的重复实验应当很容易做,最理想情况下,实验平台可以用做灰度发布(gradually ramping up)。

 

发展到这一步,不仅需要工程上的架构设计,也需要相应的工具和对使用流程的控制。

 

背景

 

这里说的分层实验平台只用在Web互联网服务上,不包括Google Apps、Chrome等(我们可以想象一下,实际上,非Web服务也可以做对照实验,但是如果要做重叠的实验则有一定的困难)。

 

与之前在(一)中提到的对照实验系统一样,做对照实验完全由参数来控制。使用参数使得实验容易创建,因为他们可以是易读的文字,不是代码。同样,数据文件比程序文件更容易快速部署。

 

在最初,Google使用了一种可以称作“单层实验平台”的架构。一个流量只能做一个实验。首先分配基于Cookie划分流量的实验,然后再划分基于随机分配流量的实验。这种架构不足以支持足够多的实验。

 

分层架构

 

layer

 

我们将参数划分为若干个子集,每个子集之内的参数是可能互相影响的,也就是说不能同时做实验的;子集之间的参数则是不会互相影响的(正交的)。这种划分可以来自于检查,也可以来自于过往实验的结果。用每一个子集对应一个抽象的“层”的概念。同时,把对流量的每一个划分抽取对应一个抽象的“领域”(domain)的概念。一个领域里可以有多层,一个层里也可以有多个领域。如图。而这个图也可以看做最宏观上Google Web的层和领域的划分。

 

虽然看起来有点复杂,但这个划分有很多好处。一个非重叠实验领域,可以方便地做影响非常大的实验(可能实验的参数跨遍各层,只能在这个领域做);它允许在两个领域中对相同的参数做不同的划分;我们也可以比较容易地把一个参数从一层中移到另一层中(如果发现这样更有利于流量的利用)。

 

注意到图(c)中增加了发布层Lanuch Layer。Lanuch Layer是一种特殊的层,它们必须运行着100%的流量(它们所在的领域称default domain,不能与其他domain纵向切分),同一个参数最多出现在一个Launch层中,同时最多出现在一个普通层中。Launch中参数的取值,可以被下面普通实验层中进行的实验的参数取值所覆盖。

 

发布层的典型应用是,每一个待发布的参数版本,新建一个发布层,而当它稳定后,删掉这个发布层。由于发布层在default domain中,因此它可以用来最终验证实验间的相互影响。

流量的划分不仅有基于cookie、random的,也会根据实验的要求有更灵活的其他方式,比如userid,query-string等。而不同的划分方式可能会在领域与层之间造成饥饿和有偏的问

题,为此,需要给每个领域和层指定流量划分方式,对已经被上层抽取造成的有偏的流量要加以标识,排除掉它们以避免有偏。

 

分层实验框架的工程架构基本就是如此,而这篇论文的后半部分还用了很大的篇幅讨论前面说到的”工具和使用流程控制”,我们在下一篇阅读笔记中讨论。

 

Photon学习笔记

来源于Google在sigmod2013发表的一篇Photon: Fault-tolerant and Scalable Joining of Continuous Data Streams http://www.mpi-sws.org/~areznich/files/photon-sigmod13.pdf
主要是描述了广告系统中展现流和点击流如何实时Join的问题

Introduction

1.  对于Google Adwords这类的商业系统而言,能够近实时的反馈广告主的预算消费等多维度的统计数据,能够使广告主及时知道调整投放策略后的效果以及允许Google内部系统更有效地优化budget-Control策略
2.  Photon会从不同数据中心的GFS读入Query Log 和 Click Log后,根据query id Join后生成新的log写到不同数据中心的GFS上
3. 需要注意到单独的展现流或者点击流是不能包含报表统计所需要的所有数据的。一种解决方式是将展现信息带到点击串中,但这样会存在不少问题:一来点击串带上额外过多的数据会增大响应延迟和降低用户体验,二来点击URL是有长度限制的,限制了我们所能够带的数据量的大小

Problem Statement

1.  解决的问题:有两条持续增长的数据流,在primary log里每条日志有唯一的identifier,而foreign log里也带有同样的identifier用来与primary log做Join操作
2.  在传统数据库里,上述场景类似于foreign table 根据外键与primary table做inner Join,点击流可以类似于foreign table,展现流类似于primary table,query_id作为Join Key

System Challenges

1. 数据的不重不丢:重复数据将会导致广告主重复计费,丢失数据将会使Google收入减少。Photon保证99.99999%的数据会在秒级别被Join到输出,100%的数据会在小时级别后被Join到。因此Proton需要满足:在任意时候最多只处理一次的语义、准实时的处理一次语义以及最终的不重不丢语义
2. 数据中心级别的自动容错机制:数据中心服务有时候会中断服务,有计划内也有计划外,比如软件升级或者机房异常断电,这个中断服务的时间有可能是分钟级别,也有可能达到了数周的级别。因此在出现故障后,很难决定是在其他数据中心迁移服务还是等待原服务恢复正常,这会极大降低系统的可用性。GFS这类系统在小规模机器故障时能够运行良好,但是在设计时候没有考虑数据中心级别的容错,因此需要应用层用户考虑更多的事情。考虑到Proton的可用性是会直接影响到公司的收入,因此Proton需要数据中心级别的自动容错机制
3. 高可扩展性:Proton不只是要能够分钟数百万的数据,还需要能够应对未来流量的持续增长
4. 低延迟:Proton的输出用于为广告主生成报表,更低的延迟能够便于广告主更有效的优化预算投放等操作
5. 非严格时序的数据流: 展现数据流是根据query timestamp基本有序到达的,但点击数据有天然延迟,不一定是按照query timestamp有序到达,因此使用Window Join的方式比较困难
6. 展现数据流的延迟:虽然展现行为是早于点击行为,但由于展现数据量比点击数据量大同时是分布在不同数据中心等各种原因,展现log不一定先于点击log到达Proton。因此需要处理这种延迟情况

Paxos-Based Id Registry 

1. 在分布式系统中,提供可靠性最直接的方式就是增加副本。类似于线上检索系统,为了提供数据中心级别的可靠性,往往会在不同的数据中心部署同样的服务。Proton也会在不同的数据中心部署相同的worker,尝试去Join同样的输入,但是会保证每个输入input event只会被处理一次。不同worker之间会共享存储在Id Registry上的全局状态:最近N天内已处理过click_id的集合,N的选择取决于资源消耗和丢失数据影响面的trade-off。需要注意到当系统遇到延迟到达N天之前的数据,由于没有办法去判断这批数据是否已经被Join过,为了防止重复计费,所以需要选择丢弃不处理
2. 在输出Join后的数据前,每个worker需要先验证对应的click id是否已经在Id Registry,如果已在则丢弃数据,否则需要尝试去往Id Registry写入click id。只有当写成功后,才能够输出Join后的日志。Id Registry需要保证click id一旦被写入,后续的写同一个click id的操作都会失败。
3.  Id Registry需要提供数据中心级别的容错,多个副本的数据在不同的数据中心实时同步,可以容忍可配置的N个数据中心级别的服务停止。同时要保证写操作能够成功当且仅当对应的click id在Id Registry不存在。

Id-Registry Sever Architecture

1. Id Registry是建立在PaxosDB基础上: 一个内存型的key-value存储,通过Paxos协议来保证多副本之间的一致性,之外还提供了read-modify-write transaction来保证同样的key只有一次写入会成功
2.  PaxosDB保证副本是按照同样的顺序去处理submitted values,  同时在任意时刻只有一个副本能够成为master,也只有master能提交更新到Paxos. 如果master出现故障,PaxosDB会自动选举出一个新的Master
3.  Id Registry Server会有RPC Handler Thread接收client端请求,然后放到内存队列中,同时会有另外background 线程从队列中取出请求,往PaxosDB执行事务操作,并返回结果给Client

Scalable Id-Registry

1. Id Registry提供了数据中心级别的容错,一个commit操作需要经过分布在不同数据中心的replica agree后才能提交,而由于跨数据中心的网络交互延迟在100ms左右,因此会限制 Id Registry的吞吐只能是10个请求每秒,  这是远小于系统的预期。虽然可以通过client批量提交请求来缓解,但由于client端太多,这种机制不能彻底的解决问题。Id Registry使用了Server-Side Batch 和 shading的方式来解决扩展性问题
2. Sever-Side Batch: 系统内部处理的延迟其实主要来自于跨数据中心拓扑带来的交互延迟,而不是在于数据量的大小。因此Id Registry的background线程可以从队列里取出多个请求并作为一个事务提交(类似于Group Commit)。由于一批请求里可能会有对同一个key的写操作,因此Id Registry也提供了多行事务的机制,保证第一个写入才会返回Client成功,其他写入会返回失败。由于一个click id不会超过100个字节,因此background线程一次可以batch上千个请求来提交
3. Sharding: 由于Click id之间的处理是独立的,因此不同的click id可以由不同的Server来处理。当存储数据出现增长时,Id Registry需要支持动态扩容, 只是简单扩shard的方式会使得取模分库的方式出现错误,因为同一个click id会在扩容前后分布在不同的库上,导致有可能重复输出。因此需要一个确定性的mapping机制,在保证能够动态扩容的同时又可以向后兼容来保证数据的一致性。Id Registry使用了time stamp-based shading Configuration机制来解决这个问题。
每一个click id都会被赋予一个时间戳,在扩容期间,给定一个时间间隔S(需要保证当时处处理的click id之间的时间戳之差不会超过S),如果当前时间是T,那么时间戳小于T+S的数据mod旧的分库数,而时间戳大于T+S的数据mod新的分库数。这个sharding配置将会村存储在PaxosDB, 被所有的Id Registry Server和Client端共享。另外,缩容的时候可以采用一样的方式来处理
4. Deleting Old Keys: Id Registry只保留N天的数据,N称为garbage collection threshold。为了删除过期数据,每个库的master实例会有回收线程周期性扫描过期的数据并删除,同时为了保证master切换时的时间同步问题,garbage collection boundary times tamp 会存储在Id Registry里,同时每个实例会有另外一个线程利用TrueTime机制周期性更新times tamp ,保证不会出现时间戳回退现象。如果Client尝试查询或者删除过期数据,Server会返回一个特殊的错误码,让Client跳过当前数据的处理。

Single Datacenter Pipeline

1. Photon在每个数据中心的Pipeline大致会分为以下几个部分:dispatcher负责读取click log并分发请求到Joiner,EventStore提供高效查询Query log机制,Joiner负责从EventStore度读取query log与当前的click log做Join,并利用Id Registry去重和输出Join后的结果。整体处理流程会分为以下几步:
1)  Dispatcher从GFS持续读取日志,并查询Id Registry是否对应的click id已被处理过。如果被处理过,则跳过当前的点击日志
2) 如果没有被处理过,则异步地分发到Joiner并等待结果。如果Joiner处理失败(比如网络或者对应的Query log不存在) ,Dispatcher会在一定周期后再分发到其他Joiner
3) Joiner会从click log抽取出Query Id,并在EventStore查找对应的Query log
4) 如果查找不到,Joiner会返回失败给Dispatcher. 如果查找成功,则Joiner尝试往Id Registry插入click id
5) 如果click id如果已经存在于Id Registry,则Joiner认为对应的click log已经被Join。如果能够成功写入,则会Join完query log和click log后写出到GFS

Unique Event Id Generation

1. 由于会有成千上万的线上Server部署在全世界,因此能够快速生成一个全局唯一的ID对系统的处理延迟是非常重要的, 而每个Server独立生成ID的机制是满足这个特点
2.  Event Id是个三元组: Server IP + Process IP + Timestamp。每个Server会基于当前的时间为每个event生成递增的Times tamp。由于Timestamp的生成会收到时钟频率的影响,为了保证正确性,还需要有个后台线程周期性调用TrueTime API同步时钟

 Dispatch: Ensuring At-Least-Once Semantics

1. Dispatch会周期性扫描日志目录里的成千上万的日志文件,监控是否有新的日志产生。为了提供并发度,Dispatch会有多个worker并行去处理不同的日志文件,同时将各自的处理进度保存在GFS。在发送log event到Joiner之前,会先在Id Registry查询click id验证对应的日志event是否已经处理过,这个优化技巧可以极大的提高系统处理性能
2.  当Dispatch接收不到Joiner的返回处理成功的消息,会将click日志保存在GFS并稍后尝试。为了减少由于连续处理失败(比如网络拥塞)引起的重试比例,Dispatch会使用类似于指数退避的算法。如果Joiner重试了多次仍然失败,并且click log的时间老于某个阈值,Joiner会将日志表示unjoinable,并返回成功给Dispatch
3.  当一个数据中心停止服务后,这个不会影响Photon服务的可用性,因为至少有另外一套在其他数据中心并行运行.当数据中心服务恢复后,Dispatch会读取进度状态重新开始发送日志,由于大多数日志在其他Pipeline已经被处理过,所以Dispatch会在Id Registry查询到这批日志已经被处理过,将可以跳过这些日志并会很快追上当前进度。Dispatch将会采取流控策略保证不会给Id Registry带来过大压力,同时能够保证足够快的恢复速度。如果进度文件已经损坏,可以人工清空状态并从最新文件开始处理,这需要其他Pipeline保证在此之间一直没有停止服务

Joiner

1. Joiner在EventStore查找不到Query log或者自身压力过大时会返回处理失败给Dispatch。当能够查找到Query log时,Joiner会将展现日志和点击日志传递给一个日志处理的lib库Adaptor,Adaptor会做过滤、拼接或者用户自定义操作等,将业务逻辑从Joiner分离能够极大提高Joiner架构的灵活性
2. 当Joiner往 Id Registry插入click id失败时(由于之前有click id插入成功),则Joiner会丢弃这条日志,这种情况被称为wasted Join,  由于至少有两个数据中心在同时运行,因此同一条日志在系统内部至少被处理两次,为了减少wasted Join的比例,Dispatch在下发数据前会先有查询操作。与Dispatch不同,Joiner没有任何状态,因此Dispatch可以采取均衡策略将请求分发到任意的Joiner
3. Joiner在写Id Registry和写日志数据到GFS两个操作之间的非事务性特性会导致数据出现丢失风险。一般可以分为两种情况
1) 其中一种情况是Joiner超时或者没有由于网络原因没有接收到Id Registry的返回结果。当重新发送请求时,由于click id已经存在将会导致Id Registry返回处理失败,Joiner将会丢失这条日志。为了解决这个问题,当Joiner commit一个click id到Id Registry时,同时发送一个全局唯一的token(包括ip + 进程号 + 时间戳)到Id Registry。Id Registry会同时将click id和token一起保存。当Joiner接收不到Id Registry的返回结果重试时,会带上同样的token,当Id Registry检测到token一致时将会返回成功给Joiner
2) 另外一种情况是写Id Registry成功,但在写日志数据到GFS之前Joiner出现宕机。这种情况将会导致click log不能再被其他Joiner Commit,因为click log所对应的token已经不属于任何一个Joiner。为了减少这种情况的影响面,每个Joiner会限制当前要被处理的请求个数,一旦达到上限,将会返回失败给Dispatch。
4. 丢失数据的场景还会有logs Storage丢失数据或者软件bug之类,Proton提供了一个校验系统来作为系统的数据一致性保证。检验系统会读取原始输入click log,并检查是否在Join后的输出文件里,如果发现不在输出文件,但是对应的click id在Id Registry,校验系统将读取tokon中的数据来判断对应的Joiner是否已经宕机或者重启,如果发现Joiner已经宕机或者重启,检验系统将会在Id Registry删除对应的click id并将原始的click log重注入到Dispatch。由于检验系统只是需要读取丢失数据的token,所以可以在Id Registry里删除已经输出的数据的token,通过这样的方式可以减少系统所需要保存的数据量。

EventStore

1.  EventStore主要是提供了根据query id返回query log的服务,在Photon系统中提供了两种EventStore的实现,一种是CacheEventStore,主要利用event的时间局部性(大部分点击行为都是在展现后短时间内发生),另外一种是LogEventsStore,主要利用query log在日志文件中是近似按照timestamp有序保存
2. CacheEventStore是一个内存型cache,与分布式memcache相似,key是query id,value是query log,sharding规则是根据query id做一致性hash,cache的过期时间是分钟级别。其中CacheEventStore极大优化了读取延迟以及减少在LogEventsStore的随机IO读写
3.  LogEventsStore会通过读取保存在BigTable里的log file map来获得对应的query log在原始日志文件里的大致位置,从近似位置开始store会顺序读取一小部分数据来找到确切的数据。log file map里面保存了query id与文件路径及偏移的映射关系,EventReader会按照固定的间隔,比如每隔W秒或者M Bytes数据就会往log file map添加新的记录,EventReader同时是会往CacheEventStore填充数据。Bigtable提供了Range Query机制,因此Store会在Bigtable里扫描key在ServerIp:ProcessId:Timestamp-W到ServerIp:ProcessId:Timestamp之间的数据,只要reader按照固定周期更新log file map,就可以保证总可以通过一个记录找到实际的log

Performance Result

1. IdRegistry部署在三个地域的五个数据中心,每个地域最多有两个副本。由于只要有3/5的副本投票成功便可以commit操作,因此可以容忍一个地域的服务出现故障
2.  Dispatcher之类的服务会部署美国的东西海岸,同时会与log datacenters物理上很接近
3.  IdRegistry会有上千个分库在每个数据中心,同时在每个数据中心,Photon系统会有上千个Dispatcher&Joiner服务。Photon每天能够T级别的点击数据和几十个T级别的展现数据
4. 从读取原始日志到生成Join后日志,90%的处理延迟小于7s,之外还有cache命中率高、较好的数据中心级别的自动容错机制、吞吐能力强等特性

Design Lessons

1. 基于Paxos建立一个分布式系统,需要减少保存在Paxos上的元数据, 因为在不同数据中心之间同步数据的消耗是非常昂贵的
2. 为IdRegistry提供动态扩容机制能够极大系统的可扩展性
3. 通过RPC交互方式相比于数据通过磁盘交互能够极大降低延迟,但是应用层需要处理RPC失败或者重试等场景。同时为了避免系统的过载,让Dispatcher来负责整体的重试机制,能够使得其他worker通过RPC交互极其灵活
4.  模块的单一职责能够使的系统更具有可扩展性,在旧系统里Joiner和LogEventsStore的功能合并在同一个Worker,同时还CacheEventStore,这极大限制了系统的处理能力
5. 其他类似的系统是将多个event作为一个batch来处理,但是这些系统运行稳定要依赖于在一个batch里的所有event都能够同时处理成功。由于在Photon里的应用场景里,展现有可能延迟于点击,因此除非所有query log都已经到达,否则一个batch是不能commit的。因此在设计时需要根据具体的场景来考虑是否需要batch

Future Work

1. 将Phonton扩展成能够处理多流的Join,以及进一步降低处理的延迟:通过server直接发送query event和click event到Joiner,而不需要Dispatcher等日志拷贝到数据中心,这能够使大多数的数据得到最快的处理,剩下的missing event则交给原有的logs-base系统来处理

MillWheel学习笔记

来源于Google在vldb2013发的流式计算框架论文MillWheel: Fault-Tolerant Stream Processing at Internet Scale
http://db.disi.unitn.eu/pages/VLDBProgram/pdf/industry/p734-akidau.pdf

Introduction

1. 实时计算一般可用于实时反作弊或者异常监控等场景,需要 fault tolerance & persistent state & scanlability
2. MillWheel 提供了良好的编程模型和流式计算框架,对用户屏蔽了分布式、容错等各方面 细节,保证数据的不重不丢。同时提供优雅的保存处理进度机制
3. 其他流式系统不能同时满足容错、通用性和扩展性特点。比如 S4 不能保证数据不丢,而 Streaming SQL System (比如Spark Streaming) 在应对复杂的业务场景下SQL语言表达能力不足

Motivation And  Requirements

1. 应用场景google内部的Zeitgeist系统,用来监控检索Query的流量是否突增或者突降。主要是通过模型预测值与实际值做对比。
2. Persistent Storage: 有短期的时间窗口存储需求和长期的数据存储需求
3. LowWatermarks: 用来追踪分布式环境中所有等待处理的events,可以区分数据是延迟还是真的缺失。同时也可以规避应用场景中需要按照数据的严格时间序来处理数据
4. Duplicate Prevention: 保证数据不会被重复处理 ,除了保证不会出现突增的误判外,在计费系统也是有同样的需求
5.  MillWheet 提供了数据的不重不丢机制,能够优雅处理out-of-order数据,同时提供可靠存储,系统内部计算递增的LowWatermark等特性

System Overview

1. MillWheet可以认为是一堆算子组成的DAG图。这些算子可以跑在任意数量的机器上,所以用户不需要关心load balance方面的事情.同时可以支持动态调整拓扑,而不需要重启整个集群服务
2. 数据模型是使用一个三元组表示(key,value ,timestamps) 。算子可以自身的业务逻辑来定义处理的key,value可以是任意的二进制串,timestamps用户可以填任何的数值,但是通常是事件发生的clock time, 系统内部会根据timestamps来计算LowWatermark

Core Concerts

1. Computations: 用户的逻辑运行在算子里,当数据到达后,用户的逻辑将会被触发。如果用户的逻辑需要跟外界系统交互,需要自己保证在这些系统上的操作是幂等的。应用层的处理逻辑不需要关心Key会被不同的节点并行处理。
2. Keys: Keys是系统中最主要抽象的概念,通过定义不同的key extraction function,不同的消费者可以从同一个输入流抽取出不同的key,比如在Zeitgeist系统中可以将Query作为key,但在反作弊系统可以用Cookie来作为Key
3. Streams:Streams是不同算子之间的交互机制,一个算子可订阅一个或者多个Stream,也可以发布一个或者多个Stream
4. Persistent State:   持久化存储的内容通常是byte string,用户可以通过Protobuf之类的来序列化或者反序列。后端的存储系统满足了高可用、多副本等特性(Bigtable or Spanner)等,一般的应用场景是一段时间窗口内的Counter或者不同数据流之间的Join操作
5. Low WaterMark: lwm是个时间戳,反映了数据到达算子A的时间界,而一个算子A的 lwm定义为
min(oldest work of A , low watermark of C : C outputs to A)
C是算子A的上游算子,如果A没有输入Stream, 则lwm与oldest work相同, 其中oldest work of A 是指还没A处理的内部数据的最老时间戳。
MillWheet的数据导入系统Injector在每次导入数据时会设置数据源的lwm,但是这只是个估算值。因此内部系统可能会收到一小部分时间戳较旧的数据,可以根据自身的业务逻辑选择丢失或者更新内部累积量。但这里需要注意到,即使出现了老数据,lwm仍然保证是递增。
lwm的作用在于系统可以告诉用户逻辑某个时间点之前的数据已经到齐, 为了保证lwm的准确性,在用户处理逻辑里,当每次生成一个新的record或者聚合多个record时,用户给这些records赋予的timestamp不能小于源record
6. Timers:  钩子函数,在特定的wall time 或者 lwm value被触发, 用户层可以自身的业务场景是wall time还是lwm, 比如监控系统可以使用wall time触发来在固定的时间发监控邮件,而Zeitgeist系统可以使用lwm来判断是否出现流量突降。此外用户可以选择不设置Timers, 比如判断突增时,数据延迟到达往往不会影响判断的准确性。Timer被设置后,会按照递增的timestamp顺序被触发,同时会持久化到可靠存储里

API

1. Computation API:  用户一般只是需要实现ProcessRecord和ProcessTimer两个函数, 同时框架还提供了SetTimer & ProduceRecord等函数供调用
2. Injector and Low WaterMark API:  lwm在算子框架内部会被自动计算(计算的pending work包括了in-progress and queued deliveries),用户逻辑一般不会直接访问lwm,而是通过设置record的timestamp间接影响;  Injector在G内部一般是通用的组件,通常会有不同的实例部署在不同的机器上,而Injector的lwm会将这些值给聚合起来,一般情况下会使用还没发布完的输入文件的最老创建时间作为一个实例的lwm

Fault Tolerance

1. Exactly-Once Delivery:  在算子收到一个输入时,框架层会执行以下几个操作: 1) 输入的去重 2) 业务逻辑的处理,可能会修改timers/states/productions  3) 将修改commit到后端的存储系统 4) 给上游返回ack  5) 将production发给下游
为了保证数据不丢,上游必须需要接收到Ack包后才会停止发送,不然会一直重试,因为可能会出现下游机器故障等情况。当下游接收到重复的数据包后,为了防止重复处理,系统会为所有的records在生成阶段赋予一个全局唯一的ID,在commit阶段会将已处理的id与其他状态一次原子更新到后端。此外,由于本地内存有限,在本地内存中会使用bloom-filter方式加快查询已处理过的集合。这些在后端存储系统中保存的ID集合,在确保没有发送端在重试发送后,会被系统自动清除,一般情况下是分钟级别。而由于外部系统Injector往往会发送延迟数据,所以这部分数据的ID集合清理回收会在推迟到小时级别
2. Strong-Production: 在给下游发送Production数据之前先写Production checkpoint,称之为Strong-Production,否则会出现发送不一致的风险。比如一个算子需要将一个时间窗口当前的累积量发给下游,如果在发送数据但还没收到ack包之前出现宕机,然后到重启后继续retry发送这段时间内窗口累积量发生变化,将会导致前后两次发送的数据不一致。当Production处理成功后,Production checkpoint将会被删除。
3. Weak Productions and Idempotency: 前面的Extract-Once Delivery 和 Strong-Product保证了数据和状态的不重不丢。但由于很多应用场景自身业务逻辑的操作符合幂等的性质,从资源消耗和延迟优化角度考虑,框架层可以跳过输入去重阶段而允许数据重复处理,同时允许写Production checkpoint在发送数据给下游后-框架层的处理逻辑跟之前有些差异,为了保证数据不丢,需要在接收到下游的Ack包后才会返回给上游Ack(这里自己的理解是如果能够成成功给上游返回Ack也不需要再写checkpoint了)
但对于 Weak Productions,需要注意到先接受下游Ack再返回上游Ack的机制在下游处理失败的异常场景下可能会增大整条流水线的处理延迟,因此可以在等待下游的Ack包一段时间后可以先写Production checkpoint,然后理解返回上游Ack,这样的优化机制能够取得延迟减少和总额外资源消耗减少的双重收益
4. State Manipulation: 系统中保存的状态包括了hard-state(在后端存储系统中) 和soft-state(本地内存,一般是本地cache数据或者聚合统计数据)。由于负载均衡或者机器故障等原因,有可能出现两个Worker同时处理同一份数据,然后出现重复写hard-State,造成数据不一致的风险,因此系统会为每次写操作都会绑定一个sequencer token(类似于抢占式机制,新Worker启动后会先令原有token失效), 保证对同一个key只会有一个写端。
而对于管理soft-state,同样需要保证只有一个写端(Single-Writer),考虑这样一种场景:Worker A在还没有Commit状态到后端存储前,新的Worker A-Prime启动从后端存储扫描数据同步到本地内存,然后Worker-A Commit成功然后返回上游Ack,但是A-Prime同步的之前的旧状态数据,导致出现丢失状态情况,最终会导致整个系统数据流处理混乱

System Implementation

1. MillWheel 建立在分布式环境上,每种算子可以跑在一个或者多个机器节点,Streams交互通过RPC方式
2.  由Replica Master来负责负载均衡操作,对于每种算子都将可处理的key集合划分为不同的连续区域,然后分配到不同的节点。同时会根据机器负载情况,将区域进行合并,拆分或者迁移等操作
3. 后端存储系统使用Bigtable或者Spanner,可以提供单行事务特性。对于一个key的不同的相关状态,比如Timers/Pending Productions/Persistent State都会保存到同一行里
4. 当新的worker重启后,需要从后端存储系统读取状态,然后构建本地内存的Timer Heap或者Production Checkpoint队列。为了保证状态的一致性,因此需要框架提供前面所提到的Single-Writer机制
5. 系统内部会有central authority子系统来跟踪所有的lwm并持久到可靠存储。每个进程会计算自身的lwm(包括Pending Production/Timer等数据,即是之前提到的oldest work),然后汇报给central authority。通常情况下由于本地有内存cache,进程在计算lwm时不需要访问Bigtable,因而计算效率够高。
由于每个进程处理的key的范围都是一段区域,所以进程的lwm也是先按自己的区域聚合后才汇报 到central authority,而central authority会判断是否所有的区域都齐全,若某段区域的数据没有收集则会使用上一次的计算值。最后central authority将各个进程的lwm广播到系统内部各个节点。
每个进程都会通过订阅方式获取到它上游算子的lwm数值,然后结合自身的lwm,计算出最后算子的lwm,再在适当的时候触发Timer. 同时为了保证lwm数据的一致性,每个区域的lwm更新同样会绑定一个sequencer token,类似于之前的Single-Write机制。

Evaluation

1. MillWheel在Google里用途广泛,比如生成广告主实时报表时需要Stream Join,也用于谷歌街景的图像处理等场景
2. 在应用场景中一般建议key要划分均匀防止出现了热点数据导致单台机器负载过重成为了系统整体瓶颈。如果数据出现大规模延迟,那么会造成依赖于lwm的Timer迟迟不能被触发,本地内存不能及时计算完后刷到后端存储系统(因为框架层为了性能优化,避免每一次都从后端存储系统读取数据,会在本地内存保存一段窗口内的数据用于统计),最终将会导致系统整体内存出现上涨风险。一般情况下都是限制injector再导入新的数据直到lwm恢复到正常。

 

对照实验系统论文阅读笔记(一)

对于互联网公司来说,使用对照实验系统(常被称作A/B Testing,Controlled Experiments,与科学研究中常用的对照实验的原理是完全一致的)来做很多决定都是非常有效和必要的。我们有理由相信大规模地、有效地应用对照实验,是Google公司和百度广告系统取得成功的重要原因之一。:)

这篇论文Practical Guide to Controlled Experiments on the Web是2007年Kohavi等人在KDD发表的论文,是比较早期的介绍在互联网系统中应用对照实验的文章,这次我们从这篇论文的翻译和学习笔记开始介绍对照实验系统。

优势

使用对照实验系统被称作“听用户的意见”,这与用户调查不同,用户自己也未必清楚他们的行为在指定的情况下会是什么样,或者他们的体验是好是坏。然而在互联网上面,他们在用鼠标投票,依次做决定比听从“专家”的意见要靠谱和快速。论文中把公司里的专家的意见称作“High Paid Person’s Opinion”,取了个简称叫HiPPO(河马)。-_-|| 事实的确如此,当使用对照实验系统的时候,互联网公司就从争执中解脱了出来,以用户的反馈为向导在可量化的道路上快速地优化创新~~ 可惜这只是理想情况,实践中对实验系统实践的好坏,可以造成巨大的差别。

对照实验

最简单的对照实验系统,是将用户分成50%对50%的两组,一组为实验组,一组为对照组。对指定的指标,比较两组的统计结果,得出实验组的效果。

实验系统的结果评价中存在一个非常重要的统计问题,随机划分用户之后,会产生抽样的误差。可以计算,对于不同的置信水平,要达到不同的实验精度,需要不同的总流量数。对此这篇论文讲得非常简单,我们在以后对其他论文的介绍中会再提到。这篇论文只简单地举个例子,如果要以实验指标相对变化5%为精度,在95%的置信水平下,需要500,000个流量(用户)。需要的流量数与实验精度是平方反比的关系,一般来讲一个改进能提高相对1%已经算是很不错了,所以说需要的流量通常还是相当大的,小流量的系统在这里会非常痛苦,因为这意味着更长的实验时间。

实验的评价标准应当在实验进行之前就确定,换句话说就是用哪些指标做判断应该先选,否则,如果有很多已有的标准,然后在实验过程中看哪个是正面的并且达到了“精度”就认为存在这个效果,就落入了著名的统计学错误“我早就知道了”。而在实践中,这项原则很多人都”不愿意”遵守。

扩展

实验组的流量比例可以逐步扩大,每一个阶段都可以运行相当长的时间,这样可以避免实验组的负面效果或者bug对整个系统造成过大的影响。而这个过程也可以自动进行。其实与这种方法类似的,就是所谓的灰度发布。

实验甚至可以全自动进行,考虑系统中的某些参数是需要不断随时间调整的情况,可以先指定标准,然后随时自动地根据实时实验结果调整参数。

对照实验可以用做程序迁移时的检查,也就是观察一次新的变更是否并不影响用户体验。对于某些公司来说,这可能才是对照实验最重要的用途。

不足

对照实验是有效的,可是却不能带来解释。

短期利益和长期利益不一致的问题。解决这个问题,应当制定更合理的综合性的标准,但是这件事情并不容易做。

先入为主或者新鲜感造成的影响。这是两个相反的效应,为了识别和避免,可以考虑用新用户做一组同样的实验进行对比。

要实验的功能必须先实现。

一致性问题,用户可能会发现他们在使用一个行为不一致的系统。即使按用户抽样也不一定能避免这个问题。

同时进行的实验的相互影响。这篇论文认为实际上这种情况很少发生,这方面的担心常常是过度的。另一方面,可以做自动的检测来判断实验间是否互相影响了。不过,说起来容易做起来难。

信息公开问题。对于较大的改动可能必须要先公布才能开始上线。然而公开的信息可能影响用户行为。

实现

论文中提及的是,使用足够强度的随机算法,将用户根据标签计算散列,并为每个散列准备对应的partition,在抽样时选择指定的partition就可以实现任意比例的抽样。然而这好像并不是一种非常好的实现方式。

对于流量的分派,有三种方法,第一是split,包括物理上的或者虚拟的。物理划分大概是最直接的一种方法,但是这种方法需要一个分布式的配置甚至程序的部署系统。第二种方法是在server端选择,server端程序在处理过程中计算随机函数,并且给出不同的行为。这种方法带来了额外的编程代价,通常变更实验时还需要改变代码。第三种方法是在client端选择,按论文所说,这种是通过JavaScript之类的方法实现的,感觉很受局限,似乎意义不大。

实践中学到的

作者这里引用了一句很经典的话:在现实中理论和现实的差距比理论上要大得多

对数据做分析:整体上指标的结果只是一个方面,实验实际上提供了丰富的信息可以做研究。不过这里要小心之前提到的”我早就知道了“错误。

性能很重要:不知不觉拖慢了网页的加载速度可能会影响很多指标,程度往往令人惊讶。当你发现实验指标很难解释的时候多想想是否是性能的问题。

是否一次只实验一件事?有些人认为一次应该只实验一件事,作者认为这个限制过于严格了。与之前提到的一样,大多数情况下实验并不会互相影响(真的是这样吗?),另一方面技术上让实验并行又是完全可行的,因此作者建议这样的做法:只对确实很可能互相影响的事情分开做实验。在以后我们也许会讨论实验的并行以及实验间相互影响的检测之类的话题。

持续运行A/A测试:与其他测试同时,持续地运行A/A测试。这可以用来检测用户是否被合理划分,数据是否被正确收集,以及设想的置信水平是否确实可靠。

自动的灰度发布和放弃:理论上这是可行的,而且实践中确实也有人已经这样做了。也许,自动调参会是更激进,也更有用的。

了解时间周期效应:在一天的不同时段,或者一周的不同日期,用户行为可能会有很大的差异。因此即使你有足够的流量,最好也要让实验运行数个整天,或者数个整周。

实验的评价标准一定要在实验之前就确定:呵呵,又一次见到这条,这件事确实非常值得强调,而且在很多地方很遗憾地没有被遵守。另外,作者在这里更多强调的是实验各方应当对此达成一致,也就是说在实验前就统一认识,确认哪些指标的变化是符合预期的。为了解决类似的问题,我们在以后的文章中可能会见到另一种方法,通过一个权威机构“实验委员会”来最终解释实验。

 

 

 

Scaling Memcache at Facebook学习笔记

基本情况:

  1. the largest memcached installation in the world
  2. provides a suite of configuration, aggregation and routing services to organize memcached instances into a distributed system
  3. 使用旁路型cache-demand-filled look-aside cache
  4. 在不同的Scale需要解决的问题重点不一样:performace efficiency fault-tolerance consistency

基本操作

  1. Query Cache:先从memcache读取,然后出现miss,再从DB读取,最后写入cache
  2. Update Cache:  先update DB 然后在cache中删除。采用delete cache操作,是因为delete是幂等操作,可以防止出现并发写造成的数据不一致现象

单集群场景

  1. 一个集群内部有thousands of server, 面临的主要问题有两个:读取cache的延迟以及cache miss造成的负载
  2. 解决延迟主要的方式主要在Client端的优化,Client主要负责了序列化、压缩等操作,同时会通过Configuration System更新下游可用的Cache Server
  3. 延迟优化方案-并行化和批量请求:会根据数据之间的依赖关系构造一个DAG,根据DAG最大化可以并行请求的数据数。平均一次请求会有24个key
  4. 延迟优化方案-不同场景分别使用UDP和TCP。对于读请求使用了UDP,前端每个Client的每个线程都直接与memcache通信。Client端在读取失败时会认为是cache miss,但是在这种场景下的cache miss并不会把从DB读取的结果再写入cache,这是为了避免给整个集群的网络带来额外负担。对于更新请求使用了TCP,Client端通过一个部署在同一机器上的mcrouter与memcache交互,引入中间层可以减少Client每个线程维护大量TCP连接带来的网络、内存和CPU消耗。按照论文而言,relying on UDP can lead to a 20% reduction in latency to server requests
  5. 延迟优化方案-使用滑动窗口的流控机制来减少incast congestion。当一个Client发出了大量读请求,如果这些请求的response同时到达,将会对机架和集群交换机的带宽造成了过载压力。滑动窗口过小会造成不必要的延迟,窗口过大会造成incast congestion,导致落到后端DB的压力变大,进一步延迟会变大
  6. 负载优化方案-采用Lease机制。Lease是一个64-bit的Token,由MemCache在读请求时分配给Client,可以用来解决两个问题:stale sets 和thundering herds。stale sets是指在Query Cache和Update Cache并发执行时,可能会导致Query Cache在Cache Miss的情况下会往cache中写入老数据,通过在写入时加Lease验证+Lease会在接收到delete操作时会失效的机制可以解决这个问题。thundering herds发生在一些特殊的key写操作频繁同时很多并发读请求。当大量读请求出现cache miss时就会导致后端压力过大。Cache可以在原有的lease机制做些小改进,并不是每个读请求都分配Lease,而是每个key 10s才分配一次。那么Client在没有获取到lease时 可以稍等一段时间再重新访问cache,此时cache中已有数据。通过这样的方式,可以使得这些特殊的key的DB读请求由17K/s减少到1.3K/s。此外还有个Stale values的tricky技巧,当key接收到删除操作时,value会放到一个最近删除列表,等待过一段短时间才会真正删除(类似于延迟删除)然后部分应用可以接受读取这些待删除的不一致的脏数据。
  7. 负载优化方案-拆分成Memcache Pools。如果不同的应用共享同一个Cache集群,那么使用cache的场景、内存和负载等都会对cache命中率有消极作用。因此为了适应这种差异性,将会把一个集群的cache server拆分成不同的Pool。比如会对一些访问频率低但是cache miss代价很高的key分配large pool。
  8. 负载优化方案-Memcache Pools的多副本机制。对于并发访问key多,数据集不超出一两台cache server的容量范围但是请求数超过了单机处理能力的Pools,会将cache数据作replication。而主要的问题就在于如何报纸多副本的数据一致性。
  9. 负载优化方案-容错处理。对于小规模的cache访问失败(比如由于网络原因或者server宕机),引入了Gutter Pool机制。Gutter是一个小规模cache集群,大致占集群的1%机器。在第一次正常访问cache失败后,会继续访问Gutter,如果再次访问失败,将会把从DB读取到的数据填入到Gutter。Gutter Pool里面的数据过期时间较短,因此没有delete操作,带来的问题就是会读取短期的脏数据。而直接把key rehash到其他剩余存活机器的方法相比于Gutter Pool方式,会存在雪崩的风险。因为一些热门key被rehash会会增加当前机器的负载,可能会导致过载。而Gutter Pool一般负载较低,可以降低这种风险。

多集群场景-In a region

  1. Split web and memcached servers into multiple frontend clusters. These clusters, along with a storage cluster that contain the databases, define a region. 主要考虑的问题是如何在同一个region内部多个集群之间备份数据
  2. Regional Invalidations-在一个region内部会让storage cluster负责cache数据失效的功能。每个DB上会部署一个mcsqueal,负责从commit log里面提取出删除操作,然后发送到前端。mcsqueal并不是直接把删除操作广播到所有Cache Server,而是先转发到一些特定机器上的mcrouter,由mcrouter根据每个key转发到合适的Cache Server上,这是为了减少集群网络包的个数以及网络压力。而相比于直接由web server直接广播删除操作的方式,一来是可以更好地packet delete操作,二来是通过log的方式可以更方便的回放操作,这可以解决delete操作发送失败以及发错到其它cache Server的问题。此外还存在一个优化手段,一个web server修改数据后会往本集群的cache发送delete操作,这是为了能够给客户端提供一个read-after-write语义(即修改后需要读取新数据)以及减少脏数据在本地存活的时间。
  3. Regional Pools-如果落在前端的请求是随机转发,那么所有cache集群保存的key都基本一致,这种数据副本过多会造成内存的极大浪费。为了减少不必要的数据副本数,可以将多个前端集群共享一批cache Server的方式,这个称之为Regional Pools。数据备份是用更多的机器来换取更少的带宽使用、更少的延迟和更好的容错。而Regional Pools是希牺牲了数据备份的优势,在Regional Pools内部跨集群的通信延迟会变大,因此决定哪些key需要放到Regional Pools需要考虑这些因素:访问频率,数据大小和独立用户访问数,一般访问频率低、数据量小的数据会放到Regional Pools。
  4. Cold Cluster Warm Up-新加入的cache集群由于cache命中率较低,为了避免cache miss对后端DB压力过大,可以允许从有正常命中率的cache集群(Warm Cluster)读取数据.需要注意可能会出现读到脏数据的场景,这是因为delete操作没有及时同步到Warm Cluster,会出现从Warm Cluster读取到旧数据然后再写入到Cold Cluster。可以采取延迟删除的机制来解决,对Cold Cluster的delete操作会延迟2s后才真正删除,在此之间任何add操作都会失败。虽然2s这个时间理论上仍然存在读取到脏数据的可能,但是实际上在大多数情况下是可行的。当Cold Cluster cache命中率正常后,将会关闭填充这种填充cache机制。

多集群场景-Across Region

  1. 多个数据中心情况下会建立多个regions. 其中一个region会放置主DB,而其他region提供只读的replica,主从之间通过mysql的同步机制来同步数据。在跨集群的场景下面临的主要问题是如何解决由于主从同步延迟造成的数据不一致问题。
  2. 主从同步延迟主要是可能Server A已经往主库写入新数据了,但Server B由于cache miss由从库中读到的还是老数据,那么填入cache的就是脏数据,可以使用remote marker机制来解决cache和DB数据不一致问题。在需要更新DB数据之前,先根据key生成一个remote marker r(key)并更新cache region中,这是用来标明从库的数据是脏数据。然后再按照正常的逻辑更新DB数据。而在读取数据时如果出现cache miss,那么会先检查是否存在r(key),如果有则从主库读取数据,否则由从库读取数据。当从库同步完数据会由mcsqueal在cache region中删除r(key)。
  3. 这种remote marker机制会存在一种风险,当多个Web Server出现并发写同一个key的时候可能会导致r(key)被过早删除。按照论文中注明,这种冲突场景极少出现

单机优化

  1. 性能优化-允许内部使用的hashtable自动扩张(rehash)、由原先多线程下的全局单一的锁改进为更细粒度的锁以及每个线程一个UDP来避免竞争
  2. 内存优化-改进Slab Allocator,允许不同Slab Class互换内存。当发现要删除的item对应的Slab Class最近使用的频率超过平均的20%,将会把最近最少使用的Slab Class的内存交互给需要的Slab Class使用。整体思路类似一个全局的LRU。
  3. Transient Item Cache-某些key的过期时间会设置的较短,如果这些key已经过期但是还没达到LRU尾部时,它所占用的内存在这段时间就会浪费。因此可以把这些key单独放在一个由链表组成的环形缓冲(即相当于一个循环数组,数组的每个元素是个链表,以过期时间作为数组的下标),每隔1s,会将缓冲头部的数据给删除。
  4. 软件升级-memcache的数据是保存在System V的共享内存区域,方便机器上的软件升级。