252
阿里云
技术社区[云栖]
小规模的流处理框架.Part 1: thread pools
(译者:强力推荐这篇文章,作者设计了一个用于小流量的流式数据处理框架,并详细给出了每一个需要注意的设计细节,对比了不同设计方案的优缺点,能够让你对流处理过程,某些设计模式和设计原则以及指标度量工具有一个更深刻的认识!)
在GeeCON 2016上我为我的公司准备了一个编程竞赛,这次的任务是设计并实现一个能够满足以下要求的系统:
系统能够每秒处理1000个任务,每一个Event至少有2个属性:
- clientId-我们希望每一秒有多个任务是在同一个客户端下处理的(译者:不同的clientId对应不同的ClientProjection,即对应不同的一系列操作)
- UUID-全局唯一的
消费一个任务要花费10毫秒,为这样的流设计一个消费者:
- 能够实时的处理任务
- 和同一个客户端有关的任务应该被有序地处理,例如你不能对拥有同一个clientId的任务序列使用并行处理
- 如果10秒内出现了重复的UUID,丢弃它。假设10秒后不会重复
有几个关于以上要求的重要细节:
- 1000events/s的任务量,消耗一个event要10ms,1s内能消耗100个event,那么为了保证实时性,就需要10个并发的消费者。
- events拥有聚集的ID(clientId),在1s内我们希望多个event能够被指定到同一个给定的client上,并且我们不能够并发地或无序地处理这些event。
- 我们必须以某种方式忽略重复的信息,最可能的方法就是记住最近10s内所有的ID,这就需要暂时保存一万个UUID。
在这篇文章中,我会引导你们使用一些成功的方案并做一些小小的突破,你将要学习如何使用精确地有针对性的度量器来解决问题。
Naive sequential processing
我们可以在迭代器中处理这个问题,首先我们可以对API做一些假设,想象一下它会是这个样子:
01 |
interface EventStream {
|
03 |
void consume(EventConsumer consumer);
|
08 |
interface EventConsumer {
|
09 |
Event consume(Event event);
|
15 |
private final Instant created = Instant.now();
|
16 |
private final int clientId;
|
17 |
private final UUID uuid;
|
一个典型的推送式API,和JMS很像。需要注意的是EventConsumer是阻塞的,这就意味着它不会返回新的Event,除非前一个已经被处理完毕了。这仅仅是我做出的一个假设,而且它没有太大的违反之前的要求,这也是JMS中消息侦听者的工作机制。下面是一个简单的实现,这个实现只是简单的添加了一个工作间隔为10ms的侦听器:
1 |
class ClientProjection implements EventConsumer {
|
4 |
public Event consume(Event event) {
|
5 |
Sleeper.randSleep( 10 , 1 ); //译者:这里只是用睡眠来代替实际编程中一些耗时的操作
|
当然在现实生活中这个consumer可能会在数据库中做一些存储操作,或者进行远程调用等等。我在睡眠时间的分布上添加了一些随机性,目的是使得手动测试更加贴近实际情况(译者:实际情况中耗时操作的用时不尽相同,所以要随机化):
03 |
private static final Random RANDOM = new Random();
|
05 |
static void randSleep( double mean, double stdDev) {
|
06 |
final double micros = 1_000 * (mean + RANDOM.nextGaussian() * stdDev);
|
08 |
TimeUnit.MICROSECONDS.sleep(( long ) micros);
|
09 |
} catch (InterruptedException e) {
|
10 |
throw new RuntimeException(e);
|
18 |
EventStream es = new EventStream(); //some real implementation here
|
19 |
es.consume( new ClientProjection());
|
以上的代码能够编译并运行,但为了满足设计要求我们必须要插入一些度量器。最重要的度量器就是有关于信息消费的潜伏期,这个潜伏期指的是从信息的产生到开始处理的这段时间。我们使用 Dropwizard Metrics来实现这个潜伏期的度量:
01 |
class ClientProjection implements EventConsumer {
|
03 |
private final ProjectionMetrics metrics;
|
05 |
ClientProjection(ProjectionMetrics metrics) {
|
06 |
this .metrics = metrics;
|
10 |
public Event consume(Event event) {
|
11 |
metrics.latency(Duration.between(event.getCreated(), Instant.now()));
|
12 |
Sleeper.randSleep( 10 , 1 );
|
ProjectionMetrics类的功能如下(主要就是将event的潜伏期用柱状图的形式表现出来):
01 |
import com.codahale.metrics.Histogram;
|
02 |
import com.codahale.metrics.MetricRegistry;
|
03 |
import com.codahale.metrics.Slf4jReporter;
|
04 |
import lombok.extern.slf4j.Slf4j;
|
06 |
import java.time.Duration;
|
07 |
import java.util.concurrent.TimeUnit;
|
10 |
class ProjectionMetrics {
|
12 |
private final Histogram latencyHist;
|
14 |
ProjectionMetrics(MetricRegistry metricRegistry) {
|
15 |
final Slf4jReporter reporter = Slf4jReporter.forRegistry(metricRegistry)
|
17 |
.convertRatesTo(TimeUnit.SECONDS)
|
18 |
.convertDurationsTo(TimeUnit.MILLISECONDS)
|
20 |
reporter.start( 1 , TimeUnit.SECONDS);
|
21 |
latencyHist = metricRegistry.histogram(MetricRegistry.name(ProjectionMetrics. class , "latency" ));
|
24 |
void latency(Duration duration) {
|
25 |
latencyHist.update(duration.toMillis());
|
现在当你运行这个解决方案时,你很快就会发现潜伏期的中值和第99.9%的值(分别指的是第count/2个值和第99.9%*count个值)都在无限增长:
1 |
type=HISTOGRAM, [...] count= 84 , min= 0 , max= 795 , mean= 404.88540608274104 , [...]
|
2 |
median= 414.0 , p75= 602.0 , p95= 753.0 , p98= 783.0 , p99= 795.0 , p999= 795.0
|
3 |
type=HISTOGRAM, [...] count= 182 , min= 0 , max= 1688 , mean= 861.1706371990878 , [...]
|
4 |
median= 869.0 , p75= 1285.0 , p95= 1614.0 , p98= 1659.0 , p99= 1678.0 , p999= 1688.0
|
6 |
[... 30 seconds later...]
|
8 |
type=HISTOGRAM, [...] count= 2947 , min= 14 , max= 26945 , mean= 15308.138585757424 , [...]
|
9 |
median= 16150.0 , p75= 21915.0 , p95= 25978.0 , p98= 26556.0 , p99= 26670.0 , p999= 26945.0
|
在运行了30s之后我们的应用程序处理event会出现平均15s的延迟,因此它并不具备完整的实时性,显然缺少并发才是原因所在。我们的ClientProjection事件消费者会花费10ms去完成事件处理,所以它每秒最多可以处理100个event,然而我们需要更多的处理量。我们必须要增强ClientProjection同时不违反其他的设计要求!
Naive thread pool
最显而易见的解决方法是对EventConsumer使用多线程技术,最简单的实现途径就是利用ExecutorService:
01 |
import java.util.concurrent.ExecutorService;
|
02 |
import java.util.concurrent.Executors;
|
04 |
class NaivePool implements EventConsumer, Closeable {
|
06 |
private final EventConsumer downstream;
|
07 |
private final ExecutorService executorService;
|
09 |
NaivePool( int size, EventConsumer downstream) {
|
10 |
this .executorService = Executors.newFixedThreadPool(size);
|
11 |
this .downstream = downstream;
|
15 |
public Event consume(Event event) {
|
16 |
executorService.submit(() -> downstream.consume(event));
|
21 |
public void close() throws IOException {
|
22 |
executorService.shutdown();
|
这里我们使用了装饰者模式。最初的ClientProjection实现EventConsumer是可行的,但我们利用加入了并发的另一个EventConsumer实现对ClientProjection进行包装。这就允许我们能够将更复杂的行为组合起来而不用更改ClientProjection本身,这种设计可以:
- 解耦:不同的EventConsumer互不影响,但它们却可以自由地组合在一起,在同一个线程池中工作
- 单一职责:每个EventConsumer只做一项工作,并将自己委托给下一个组件即线程池
-
开放/关闭原则:我们可以改变系统的行为却不用修改现有实现
开放/关闭原则通常可以通过注入策略模式和模板方法模式来实现,这很简单。整体的代码如下:
01 |
MetricRegistry metricRegistry = |
03 |
ProjectionMetrics metrics = |
04 |
new ProjectionMetrics(metricRegistry);
|
05 |
ClientProjection clientProjection = |
06 |
new ClientProjection(metrics);
|
08 |
new NaivePool( 10 , clientProjection);
|
09 |
EventStream es = new EventStream();
|
10 |
es.consume(naivePool); |
我们写的度量器显示这种改良的方案确实表现的更好:
1 |
type=HISToOGRAM, count= 838 , min= 1 , max= 422 , mean= 38.80768197277468 , [...]
|
2 |
median= 37.0 , p75= 45.0 , p95= 51.0 , p98= 52.0 , p99= 52.0 , p999= 422.0
|
3 |
type=HISTOGRAM, count= 1814 , min= 1 , max= 281 , mean= 47.82642776789085 , [...]
|
4 |
median= 51.0 , p75= 57.0 , p95= 61.0 , p98= 62.0 , p99= 63.0 , p999= 65.0
|
6 |
[... 30 seconds later...]
|
8 |
type=HISTOGRAM, count= 30564 , min= 5 , max= 3838 , mean= 364.2904915942238 , [...]
|
9 |
median= 352.0 , p75= 496.0 , p95= 568.0 , p98= 574.0 , p99= 1251.0 , p999= 3531.0
|
我们可以看到延迟虽然也在增长但规模却小得多,30s后潜伏期达到了364ms。这种潜伏期增长是系统问题,我们需要更多的度量器。注意到NaivePool(你会明白为什么这里是naive-初级的)会开启10条线程,这应该足以处理1000个event,每个要花费10ms。在实际情况下,我们需要一点额外的处理容量来避免因垃圾回收或小规模峰值负荷所带来的问题。为了证明线程池才是我们的瓶颈,我们要监控它内部的队列,这需要一点小小的工作量:
01 |
class NaivePool implements EventConsumer, Closeable {
|
03 |
private final EventConsumer downstream;
|
04 |
private final ExecutorService executorService;
|
06 |
NaivePool( int size, EventConsumer downstream, MetricRegistry metricRegistry) {
|
07 |
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
|
08 |
String name = MetricRegistry.name(ProjectionMetrics. class , "queue" );
|
09 |
Gauge<Integer> gauge = queue::size;
|
10 |
metricRegistry.register(name, gauge);
|
11 |
this .executorService =
|
12 |
new ThreadPoolExecutor(
|
13 |
size, size, 0L, TimeUnit.MILLISECONDS, queue);
|
14 |
this .downstream = downstream;
|
18 |
public Event consume(Event event) {
|
19 |
executorService.submit(() -> downstream.consume(event));
|
24 |
public void close() throws IOException {
|
25 |
executorService.shutdown();
|
这里使用ThreadPoolExecutor的目的是为了能够提供自定义的LinkedBlockingQueue实例,接下来就可以监控队列的长度(see:ExecutorService – 10 tips and tricks)。Gauge会周期性地调用queue::size,你需要的时候就会提供队列的长度。度量器显示线程池的大小确实是一个问题:
1 |
type=GAUGE, name=[...].queue, value= 35
|
2 |
type=GAUGE, name=[...].queue, value= 52
|
4 |
[... 30 seconds later...]
|
6 |
type=GAUGE, name=[...].queue, value= 601
|
不断增长的队列长度进一步加剧了队列内正在等待着的task的潜伏期,将线程池的大小增加到10到20之间,最终队列的长度显示合理并且没有失控。然而我们仍然没有解决重复ID问题,并且也没有解决同一个clientId可能会对它的events进行并发处理的问题。
Obscure locking
让我们从避免对拥有相同clientId的events使用并行处理开始。如果两个有相同clientId的event一个接一个地来,相继进入线程池队列,那么NaivePool会几乎同时将它们取出队列实现并行处理。开始的时候我们可能会想到对每一个clientId加一个Lock:
02 |
class FailOnConcurrentModification implements EventConsumer {
|
04 |
private final ConcurrentMap<Integer, Lock> clientLocks = new ConcurrentHashMap<>();
|
05 |
private final EventConsumer downstream;
|
07 |
FailOnConcurrentModification(EventConsumer downstream) {
|
08 |
this .downstream = downstream;
|
12 |
public Event consume(Event event) {
|
13 |
Lock lock = findClientLock(event);
|
16 |
downstream.consume(event);
|
21 |
log.error( "Client {} already being modified by another thread" , event.getClientId());
|
26 |
private Lock findClientLock(Event event) {
|
27 |
return clientLocks.computeIfAbsent(
|
29 |
clientId -> new ReentrantLock());
|
以上的代码完全搞错方向了,这种设计太过于复杂,但运行代码至少会发现一个问题。events的处理过程就像下面这样,由一个装饰者包裹着另一个:
1 |
ClientProjection clientProjection = |
2 |
new ClientProjection( new ProjectionMetrics(metricRegistry));
|
3 |
FailOnConcurrentModification failOnConcurrentModification = |
4 |
new FailOnConcurrentModification(clientProjection);
|
6 |
new NaivePool( 10 , failOnConcurrentModification, metricRegistry);
|
7 |
EventStream es = new EventStream();
|
一旦运行过一会儿错误信息就会弹出来,告诉我们在其他线程中已经在处理拥有相同clientId的event。我们为每一个clientId都绑定了一个Lock,这样做的目的是为了弄清楚如果其他的线程没有处理的时候client的状态。这种丑陋的方法让我们的方案变得惨不忍睹,与其因获取不到Lock而抛出错误信息,还不如等待一下,等待Lock被释放:
02 |
class WaitOnConcurrentModification implements EventConsumer {
|
04 |
private final ConcurrentMap<Integer, Lock> clientLocks = new ConcurrentHashMap<>();
|
05 |
private final EventConsumer downstream;
|
06 |
private final Timer lockWait;
|
08 |
WaitOnConcurrentModification(EventConsumer downstream, MetricRegistry metricRegistry) {
|
09 |
this .downstream = downstream;
|
10 |
lockWait = metricRegistry.timer(MetricRegistry.name(WaitOnConcurrentModification. class , "lockWait" ));
|
14 |
public Event consume(Event event) {
|
16 |
final Lock lock = findClientLock(event);
|
17 |
final Timer.Context time = lockWait.time();
|
19 |
final boolean locked = lock.tryLock( 1 , TimeUnit.SECONDS);
|
22 |
downstream.consume(event);
|
27 |
} catch (InterruptedException e) {
|
28 |
log.warn( "Interrupted" , e);
|
33 |
private Lock findClientLock(Event event) {
|
34 |
return clientLocks.computeIfAbsent(
|
36 |
clientId -> new ReentrantLock());
|
这次的设计和之前的很像,但不同的是tryLock()会持续1s的时间以等待指定client的Lock被释放。如果两个有相同clientId的event相继出现,其中一个会获取到Lock进行处理,而另一个会一直阻塞直到unlock()被调用。
这段代码不仅复杂,而且在某些微妙的情况下可能会发生不可预知的错误。例如,如果两个有相同clientId的event几乎在同一时刻出现,那么谁将会是第一个?两个event会在同一时刻请求Lock,这时我们并不能保证哪一个event会第一个得到非公平锁,处理event的顺序可能就会发生混乱。肯定会有更好的方法…
Dedicated threads
让我们退一步,深吸一口气。你会怎样确保事情不会并行发生?仅仅使用一个线程就行了!事实上这是我们最开始的做法,但它的处理流量并不理想。我们不用关心不同clientIds的并发情况,我们只需要确保有相同clientId的events由一个专有线程处理就行。
你可能会想到使用一个map将clientId映射到Thread,当然这太简单了。我们可能会创造上千个线程,而它们大多数的时候可能都处于空闲状态(对于给定的clientId每秒可能只处理少数几个event)。一个很好的折中是使用固定大小的线程池,每个线程负责指定的一些clientId。在这种方法中,两个不同的clientId可能会在同一个线程中完成处理,但相同的clientId总是在同一个线程中处理。如果两个有相同clientId的event出现了,它们都会被送去同一个线程,因此为了避免并发处理,以下实现相当简单:
01 |
class SmartPool implements EventConsumer, Closeable {
|
03 |
private final List<ExecutorService> threadPools;
|
04 |
private final EventConsumer downstream;
|
06 |
SmartPool( int size, EventConsumer downstream, MetricRegistry metricRegistry) {
|
07 |
this .downstream = downstream;
|
08 |
List<ExecutorService> list = IntStream
|
10 |
.mapToObj(i -> Executors.newSingleThreadExecutor())
|
11 |
.collect(Collectors.toList());
|
12 |
//译者:这里使用CopyOnWriteArrayList是为了保证访问threadPools里面元素时是线程安全的
|
13 |
this .threadPools = new CopyOnWriteArrayList<>(list);
|
17 |
public void close() throws IOException {
|
18 |
threadPools.forEach(ExecutorService::shutdown);
|
22 |
public Event consume(Event event) {
|
23 |
final int threadIdx = event.getClientId() % threadPools.size();
|
24 |
final ExecutorService executor = threadPools.get(threadIdx);
|
25 |
executor.submit(() -> downstream.consume(event));
|
关键点是最后的那部分:
1 |
int threadIdx = event.getClientId() % threadPools.size();
|
2 |
ExecutorService executor = threadPools.get(threadIdx); |
这个简单的算法总是为相同的clientId使用同一个ExecutorService单线程,不同的ID可能会在同一个线程内处理,例如当threadPools的大小为20时,Id为7, 27, 47的client都会在索引为7的线程内处理。虽然一个线程会对应多个clientId,但只要一个clientId在同一个线程内处理就行了。基于这点,锁就不需要了,顺序调用也就得到了保障。边注:一个clientId对应一个线程可能产生无法预估的后果,但一个actor对应一个clientId(例如在Akka里面就是如此)就简单许多。
顺便为了保证安全,我为每一个线程池都插入了度量器以监控它们的队列长度,实现如下:
01 |
class SmartPool implements EventConsumer, Closeable {
|
03 |
private final List<LinkedBlockingQueue<Runnable>> queues;
|
04 |
private final List<ExecutorService> threadPools;
|
05 |
private final EventConsumer downstream;
|
07 |
SmartPool( int size, EventConsumer downstream, MetricRegistry metricRegistry) {
|
08 |
this .downstream = downstream;
|
09 |
this .queues = IntStream
|
11 |
.mapToObj(i -> new LinkedBlockingQueue<Runnable>())
|
12 |
.collect(Collectors.toList());
|
13 |
List<ThreadPoolExecutor> list = queues
|
15 |
.map(q -> new ThreadPoolExecutor( 1 , 1 , 0L, TimeUnit.MILLISECONDS, q))
|
16 |
.collect(Collectors.toList());
|
17 |
this .threadPools = new CopyOnWriteArrayList<>(list);
|
18 |
metricRegistry.register(MetricRegistry.name(ProjectionMetrics. class , "queue" ), (Gauge<Double>) this ::averageQueueLength);
|
最后更新:2017-05-19 11:01:38