Hystrix Command执行以及熔断机制整理

作者 : 开心源码 本文共5269个字,预计阅读时间需要14分钟 发布时间: 2022-05-12 共167人阅读

这几天在看Hystrix的少量实现,里面大量运使用了rxjava的原理,将代码简化到了极致,对于有rxjava基础的同学,相信看懂Hystrix代码并不是一件难事。我这篇文章主要是针对Hystrix Command执行之后的一个数据流向以及熔断机制做了一个梳理和总结,后续还会出对于Hystrix组装command、超时机制、隔离机制等源代码实现进行一个梳理和总结。这篇文章权当做hystrix梳理的第一步,尽管感觉从代码执行顺序上这篇文章不应该是第一个,但是从这一块开始梳理我感觉可以更好的了解Hystrix中的消息流的一个概念(不得不感叹,有的人写代码就是再创造,而我写代码可能就是为了工作吧。。)

首先我们先理解一下Hystrix熔断的一个基本原理:

Hystrix熔断机制流程图(转自Spring MicroServices in Action)

当出现问题时,Hystrix会检查一个肯定时长(图中为10s)的一个时间窗(window),在这个时间窗内能否有足够多的请求,假如有足够多的请求,能否错误率已经达到阈值,假如达到则启动断路器熔断机制,这时再有请求过来就会直接到fallback路径。在断路器打开之后,会有一个sleep window(图中为5s),每经过一个sleep window,当有请求过来的时候,断路器会放掉一个请求给remote 服务,让它去试探下游服务能否已经恢复,假如成功,断路器会恢复到正常状态,让后续请求重新请求到remote 服务,否则,保持熔断状态。

这里我们就会考虑,我们应该怎样去实现这样一个window机制呢?通过ScheduledExecutorService和并发List以及累加器?我的确没想到比较好的方法,当我看了Hystrix的文档和实现之后,恍然大悟。它通过一个叫 metrics.rollingStats.numBuckets的属性,标明我们的window需要被拆分到多少个bucket(桶)中,对于我们上图的例子10s的window,我们设置5个桶的话,每个桶就是2s的时长。我们针对每个桶统计桶内的请求的一个情况(成功or失败),而后对于10s的一个时长window,我们只需组合连续的5个bucket就能得到一个window内的统计数据,就能做一个判断,当有新的bucket来的时候,我们在我们的短路器中只要要抛弃最老的bucket,把最新的bucket加进来,形成一个LinkedList,就能重新做统计了,这样也形成了一个滚动统计的计算模式。这样的样例很适合通过基于流和响应式的reactiveX框架来做,因而Hystrix也采使用了RxJava来实现。

在理解了Hystrix熔断的原理之后,我们上一个流程图(跟上面这图简直没法比,丑的想哭):

command执行完成后数据流向

这是我自己整理的一个流程图,途中黄色箭头方向为数据流向方向,其中椭圆框有包含关系是由于这几个类存在继承关系,即HealthCountsStream继承自BucketRollingCountersStream,而BucketRollingCounterStream继承自BucketCountersStream。这个图比较清晰的展示了在一个command执行完成之后,整个数据的流向。下面我将针对每一步的源代码,做下说明。

1. 首先AbstractCommand(HystrixCommand的父类)会发起一个handleCommandEnd方法调使用:

在方法执行完成之后调使用terminateCommandCleanUp调使用handleCommandEnd

2. 在调使用handleCommandEnd方法中,首先会取消掉timoutTimer,避免不必要的timout操作,接着形成一个executionResult,这个result可以看做包含了整个command运行周期所有信息,接着调使用HystrixCommandMetrics的markCommandDone操作

handleCommand内容

3. HystrixCommandMetrics这个类本身的使用途是作为所有Command的指标衡量的一个工具,在markCommandDone方法中直接调使用了HystrixThreadEventStream的executionDone方法

markCommandDone方法

4.在HystrixThreadEventStream中我们需要注意几个东西:首先这个getInstance是从threadLocal中获取一个HystrixThreadEventStream实例,因而我们知道对于一个线程都会有自己独立的HystrixThreadEventStream实例以及它自己的成员变量。

threadLocal公告getInstance方法

而后在executionDone方法中,会根据executionResult生成一个HystrixCommandCompletion事件,而后将它传递给一个叫wirteOnlyCommandCompletionSubject的实例成员变量。

在这个subject中通过doOnNext的callback最终把消息传递给了HystrixCommandCompletionStream

5. 这个HystrixCommandCompletionStream最终在哪里使用到找起来比较麻烦,但是我通过find usage还是找到了他的使用处:

在HealthCountsStream中用了它作为构造函数参数传入到super中,一路跟踪,我们找到了HealthCountsStream的源头:

在BucketedCounterStream(HealthCountersStream终极父类)中,对这个inputStream做了少量列变换,首先通过window操作将HystrixCommandCompletionStream中所得到的事件以bucketSizeInMs的时长分割成了多个子块,而后通过flatMap操作把他们reduce成了一个bucket的summary信息,并通过startWith操作将一段empty的summary列表作为了初始的消息流。

接着我们在BucketCounterStream的子类,也是HealthCounterStream的父类:BucketedRollingCounterStream中,我们看到了进一步的操作:

首先通过window操作将上一步解决好的以bucket为单位的消息流分割成以numBuckets为window长度,1个bucket为步长间隔的消息流(这里可能说的比较拗口,后面会通过图表进行解释),再通过flatMap将每个消息转换成以一个numBuckets长度的window内的summary信息。而flapMap中的reduceWindowToSummary函数,就是HealthCountsStream在构造的时候传入给父类的,因而,在这一步flatMap之后,得到的消息流就已经是HealthCountStream所指定的HealthCounts数据结构了。最终再通过observe方法把这个sourceStream暴露出去(中间有几步不涉及数据变化,所有就不开展了)

6. 最终我们可以看到在HystrixCircuitBreak的默认实现中:

我们可以看到断路器subscribe到了我们刚刚看到的HealthCountsStream,并且在onNext中针对每次发出的消息,通过判断window中的请求数量和错误比例来控制了断路器能否断开的逻辑。整个熔断机制分析也就到了数据的终点。

下面我们通过图表对之前的两个消息流的变换做下说明:

BucketCounterStream原理图

BucketRollingCountStream原理图

还有短路器断开情况下的恢复机制,这里可以略微说明下:

在之前的代码中我们可以看到:在断路器判定错误过多需要短路的时候会做以下操作:

短路操作

首先会将状态设置到OPEN, 而后在circuitOpened中设置当前的时间戳,这个变量在后面sleep window试探操作时会使用到。

在一个command执行之前,我们会通过下列代码进行执行前操作(command的组装预解决执行过程我会在后面的文章中讲解):

command执行前判断

我们进入这个方法看:

attemptExecution实现

我们可以看到,在这个方法中,我们经过少量列的判断,最终查看现在距离上次断路器断开时间能否已经过去了一个sleepWindow,假如是,则将status改为HALF_OPEN,即试探状态,并且通过cas操作保证了只有第一个request能够通过,后续request继续回到fallback执行路径下。

随着command的执行,我们在它complete或者者出错的时候会有下列操作:

命令执行后逻辑

可以看到,当命令正常完成,会调使用断路器的markSuccess操作,而异常到了fallback中则会执行markNonSuccess

标记操作

这里面的代码就比较简单了,当成功并且状态为HALF_OPEN的时候,重置stream,并且恢复断路器状态和circuitOpened到-1,假如失败则设置当前时间到circuitOpened,方便下一次sleepWindow操作。

下面是为了补充整个执行顺序、数据以及线程相关情况,我在少量关键节点上加了打印当前线程和数据的操作,并执行Hystrix core中本身的少量test例子,打出来的结果如下:

执行结果记录

从上图我们可以看到,本身Command的发起和HealthCounterStream的注册以及Timeout的发起都在main线程中进行,而command的run,直到BucketedCounterStream的一系列消息传递都发生在Hystrix为这个Command开拓的线程池中进行。而因为window操作符的关系,BucketedRollingCounterStream、HealthCountsStream和HystrixCircuitBreaker都在RxJava本身的computation线程池中进行。

一点点自己的总结:

Hystrix乃至于netflix团队对于RxJava这种响应式编程范式可以说已经运使用的出神入化,对于rxjava与目前传统编程范式所涉及到中间的migrate问题,Hystrix中都有比较好的实践,像操作符的组合,Subject的应使用等等,我也不禁觉得想学好rxjava,还是动手最重要。不过rxjava因为自身操作符lift的关系,在调试的时候可能会比较麻烦,特别是在单步的时候会发现有很长的call stack出现,这也是对于以后开发和运维是一个比较大的挑战。我看下来的感觉是尽量使用rxjava自己的api做到一个闭环,把问题域集中在rxjava内部,不要引入外部的机制,是比较好的实践方式。另外,对于我现在的工作,在少量场景中引入响应式编程可能也会让较复杂的场景简单化。由于响应式编程的目的是要将问题和逻辑都集中在信息流中,通过订阅与转换甚至是切换执行线程来完成业务逻辑和状态迁移,在逻辑上保持一个比较简单清晰的模型,并且很容易将业务进行分stage解决。

回到Hystrix本身,其实熔断的整个机制通过stream解决已经划分的比较清晰了,每个阶段会有自己的解决逻辑和转换方式,通过window来实现滚动统计也是非常巧妙。有一点我比较关心是最终写入是通过一个全局的SerializableStream做的write操作。而熟习rxjava的同学都知道,这种序列化的stream是通过emitter-loop来保证的串行访问,大致原理就是当有多个线程进入时,假如都没有开始emit,那先到的那个会取得权力emit,但这并不算完,后到的thread就把自己的event放置到同一个queue里面,即可以撤了,而先到的那个在emit完自己的event之后,需要检查当前queue能否有多的,假如有那你就把多的都emit了吧,谁让你先来的?直到发现现在queue也空了,你即可以走了。在这个期间假如也有thread进来一样也是放到queue里面。这样可以保证尽量不阻塞thread的同时保证串行,但是我一直都在思索,不会出现一个线程一直发而无法退出执行自己后面任务的情况么?不过我后来也在想,就算出现也没什么,由于在这个command自己的thread中,总需要有个thread来做这个事情。只需自己保证执行了onNext,解决了自己的请求,后面的就无所谓了。不知道这样的想法能否正确,可能需要做肯定的测试来进行佐证。

第一篇关于Hystrix的大致就是这样。欢迎有兴趣的同学一起探讨,中间有什么问题也希望大家能够指正,多多交流,共同进步!

说明
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是摆设,本站源码仅提供给会员学习使用!
7. 如遇到加密压缩包,请使用360解压,如遇到无法解压的请联系管理员
开心源码网 » Hystrix Command执行以及熔断机制整理

发表回复