SpringCloud集群服务-3
大约 12 分钟
Hystrix 工作流程
1、
package com.netflix.hystrix;
public abstract class HystrixCommand<R> extends AbstractCommand<R>
implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
// 【J.U.C原子引用】定义了当前要执行的线程对象
private final AtomicReference<Thread> executionThread = new AtomicReference<Thread>();
// 【J.U.C原子布尔】定义一个优雅的线程停止操作(Java就业编程实战这本书是有讲解的)
private final AtomicBoolean interruptOnFutureCancel = new AtomicBoolean(false);
protected abstract R run() throws Exception; // 定义了Hystrix的具体执行
protected R getFallback() { // 获取失败回退的操作
throw new UnsupportedOperationException("No fallback available.");
}
@Override
final protected Observable<R> getExecutionObservable() { // 执行请求
return Observable.defer(new Func0<Observable<R>>() { 创建数据响应
@Override
public Observable<R> call() {
try {
return Observable.just(run());// 请求发送
} catch (Throwable ex) {
return Observable.error(ex);
}
}
}).doOnSubscribe(new Action0() { // 订阅处理
@Override
public void call() {
executionThread.set(Thread.currentThread());// 保存当前的执行线程
}
});
}
@Override
final protected Observable<R> getFallbackObservable() { // 失败回退的订阅
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
try {
return Observable.just(getFallback());// 调用失败回退
} catch (Throwable ex) {
return Observable.error(ex); // 响应错误状态
}
}
});
}
public R execute() { // 具体操作的执行
try {
return queue().get();// 按照队列的方式来执行,所有的调用为异步处理
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
public Future<R> queue() {} // 异步处理队列
@Override
protected String getFallbackMethodName() { // 获取失败回退的方法名称
return "getFallback";
}
@Override
protected boolean isFallbackUserDefined() { // 环境的判断
Boolean containsFromMap = commandContainsFallback.get(commandKey);
if (containsFromMap != null) {
return containsFromMap;
} else {
Boolean toInsertIntoMap;
try {
getClass().getDeclaredMethod("getFallback");
toInsertIntoMap = true;
} catch (NoSuchMethodException nsme) {
toInsertIntoMap = false;
}
commandContainsFallback.put(commandKey, toInsertIntoMap);
return toInsertIntoMap;
}
}
@Override
protected boolean commandIsScalar() {
return true;
}
}
2、
package com.netflix.hystrix.contrib.javanica.command;
import rx.Observable;
import java.util.concurrent.Future;
/**
 * Ways a command can be executed, selected from a Java type.
 */
public enum ExecutionType {

    /** Asynchronous handling (queue). */
    ASYNCHRONOUS,

    /** Synchronous handling (execute). */
    SYNCHRONOUS,

    /** Reactive handling (Observable). */
    OBSERVABLE;

    /**
     * Picks the execution type that matches the given class.
     *
     * @param type class to inspect
     * @return {@link #ASYNCHRONOUS} for {@code Future} types, {@link #OBSERVABLE} for
     *         {@code Observable} types, otherwise {@link #SYNCHRONOUS}
     */
    public static ExecutionType getExecutionType(Class<?> type) {
        if (Future.class.isAssignableFrom(type)) {
            return ASYNCHRONOUS;
        }
        if (Observable.class.isAssignableFrom(type)) {
            return OBSERVABLE;
        }
        return SYNCHRONOUS;
    }
}
3、
package com.netflix.hystrix.contrib.javanica.command;
/**
 * Runs a Hystrix invokable in the mode requested by an {@link ExecutionType}.
 */
public class CommandExecutor {

    /**
     * Executes the invokable synchronously, asynchronously or as an Observable.
     *
     * @param invokable     command to run; must not be null
     * @param executionType requested execution mode
     * @param metaHolder    metadata consulted for fallback type and observable mode; must not be null
     * @return the result, a Future, or an Observable, depending on {@code executionType}
     * @throws RuntimeException if the invokable does not implement the interface the mode needs,
     *                          or the execution type is not handled
     */
    public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
        Validate.notNull(invokable);
        Validate.notNull(metaHolder);

        switch (executionType) {
            case SYNCHRONOUS:
                return castToExecutable(invokable, executionType).execute();

            case ASYNCHRONOUS: {
                HystrixExecutable command = castToExecutable(invokable, executionType);
                boolean asyncFallback = metaHolder.hasFallbackMethodCommand()
                        && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType();
                if (!asyncFallback) {
                    return command.queue();
                }
                // the fallback is asynchronous as well, so the Future is wrapped in a decorator
                return new FutureDecorator(command.queue());
            }

            case OBSERVABLE: {
                HystrixObservable source = castToObservable(invokable);
                if (ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode()) {
                    return source.observe();
                }
                return source.toObservable();
            }

            default:
                throw new RuntimeException("unsupported execution type: " + executionType);
        }
    }

    /** Returns the invokable as a {@code HystrixExecutable} or fails with a descriptive error. */
    private static HystrixExecutable castToExecutable(HystrixInvokable invokable, ExecutionType executionType) {
        if (!(invokable instanceof HystrixExecutable)) {
            throw new RuntimeException("Command should implement " + HystrixExecutable.class.getCanonicalName() + " interface to execute in: " + executionType + " mode");
        }
        return (HystrixExecutable) invokable;
    }

    /** Returns the invokable as a {@code HystrixObservable} or fails with a descriptive error. */
    private static HystrixObservable castToObservable(HystrixInvokable invokable) {
        if (!(invokable instanceof HystrixObservable)) {
            throw new RuntimeException("Command should implement " + HystrixObservable.class.getCanonicalName() + " interface to execute in observable mode");
        }
        return (HystrixObservable) invokable;
    }
}
4、
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.hystrix.HystrixAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.ReactiveHystrixCircuitBreakerAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.security.HystrixSecurityAutoConfiguration
org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration
HystrixAutoConfiguration
1、
package org.springframework.cloud.netflix.hystrix;
import org.springframework.web.reactive.DispatcherHandler; // 响应式编程处理类
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ Hystrix.class, HealthIndicator.class,
        HealthContributorAutoConfiguration.class }) // health support must be on the classpath
@AutoConfigureAfter({ HealthContributorAutoConfiguration.class })
public class HystrixAutoConfiguration {

    /** Health indicator bean, registered when the "hystrix" health indicator is enabled. */
    @Bean
    @ConditionalOnEnabledHealthIndicator("hystrix")
    public HystrixHealthIndicator hystrixHealthIndicator() {
        return new HystrixHealthIndicator();
    }

    /** Metrics configuration; active unless the binder property is explicitly disabled. */
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnProperty(value = "management.metrics.binders.hystrix.enabled",
            matchIfMissing = true)
    @ConditionalOnClass({ HystrixMetricsBinder.class })
    protected static class HystrixMetricsConfiguration {

        /** Binder that collects call statistics. */
        @Bean
        public HystrixMetricsBinder hystrixMetricsBinder() {
            return new HystrixMetricsBinder();
        }

    }

    /** Stream endpoint for servlet-based web applications. */
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnWebApplication(type = SERVLET)
    @ConditionalOnBean(HystrixCommandAspect.class) // only install the stream if enabled
    @ConditionalOnClass({ HystrixMetricsStreamServlet.class })
    @EnableConfigurationProperties(HystrixProperties.class)
    protected static class HystrixServletAutoConfiguration {

        /**
         * Monitoring endpoint; per the original note it serves "hystrix.stream" data and
         * needs Actuator endpoint support.
         */
        @Bean
        @ConditionalOnAvailableEndpoint
        public HystrixStreamEndpoint hystrixStreamEndpoint(HystrixProperties properties) {
            return new HystrixStreamEndpoint(properties.getConfig());
        }

        @Bean
        public HasFeatures hystrixStreamFeature() {
            return HasFeatures.namedFeature("Hystrix Stream Servlet",
                    HystrixMetricsStreamServlet.class);
        }

    }

    /** Stream endpoint for reactive web applications. */
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnWebApplication(type = REACTIVE)
    @ConditionalOnBean(HystrixCommandAspect.class) // only install the stream if enabled
    @ConditionalOnClass({ DispatcherHandler.class }) // reactive programming model
    @EnableConfigurationProperties(HystrixProperties.class)
    protected static class HystrixWebfluxManagementContextConfiguration {

        /** Reactive endpoint publishing the dashboard stream as JSON strings. */
        @Bean
        @ConditionalOnAvailableEndpoint
        public HystrixWebfluxEndpoint hystrixWebfluxController() {
            Observable<String> jsonStream = HystrixDashboardStream
                    .getInstance().observe()
                    .concatMap(data -> Observable.from(SerialHystrixDashboardData
                            .toMultipleJsonStrings(data)));
            // convert the Rx Observable into a Reactive Streams publisher for the endpoint
            return new HystrixWebfluxEndpoint(RxReactiveStreams.toPublisher(jsonStream));
        }

        @Bean
        public HasFeatures hystrixStreamFeature() {
            return HasFeatures.namedFeature("Hystrix Stream Webflux",
                    HystrixWebfluxEndpoint.class);
        }

    }

}
HystrixCircuitBreakerAutoConfiguration
1、
package org.springframework.cloud.netflix.hystrix;
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ Hystrix.class })
@ConditionalOnProperty(name = "spring.cloud.circuitbreaker.hystrix.enabled",
        matchIfMissing = true) // enabled even when the property is not set
public class HystrixCircuitBreakerAutoConfiguration {

    /** Optional customizers applied to the factory; stays an empty list when none are injected. */
    @Autowired(required = false)
    private List<Customizer<HystrixCircuitBreakerFactory>> customizers = new ArrayList<>();

    /**
     * Creates the Hystrix circuit breaker factory unless another {@code CircuitBreakerFactory}
     * bean already exists, applying every customizer to it first.
     */
    @Bean
    @ConditionalOnMissingBean(CircuitBreakerFactory.class)
    public CircuitBreakerFactory hystrixCircuitBreakerFactory() {
        HystrixCircuitBreakerFactory created = new HystrixCircuitBreakerFactory();
        for (Customizer<HystrixCircuitBreakerFactory> each : customizers) {
            each.customize(created);
        }
        return created;
    }

}
2、
package com.netflix.hystrix;
/**
 * Excerpt of the per-command Hystrix property holder: the built-in default values, followed by
 * the HystrixProperty fields that carry the effective values. The code that initialises those
 * fields is not part of this excerpt.
 */
public abstract class HystrixCommandProperties {
private static final Logger logger =
LoggerFactory.getLogger(HystrixCommandProperties.class);
/* defaults */
/* package */ static final Integer default_metricsRollingStatisticalWindow = 10000;// default => statisticalWindow: 10000 = 10 seconds (and default of 10 buckets so each bucket is 1 second)
private static final Integer default_metricsRollingStatisticalWindowBuckets = 10;// default => statisticalWindowBuckets: 10 = 10 buckets in a 10 second window so each bucket is 1 second
private static final Integer default_circuitBreakerRequestVolumeThreshold = 20;// default => statisticalWindowVolumeThreshold: 20 requests in 10 seconds must occur before statistics matter
private static final Integer default_circuitBreakerSleepWindowInMilliseconds = 5000;// default => sleepWindow: 5000 = 5 seconds that we will sleep before trying again after tripping the circuit
private static final Integer default_circuitBreakerErrorThresholdPercentage = 50;// default => errorThresholdPercentage = 50 = if 50%+ of requests in 10 seconds are failures or latent then we will trip the circuit
private static final Boolean default_circuitBreakerForceOpen = false;// default => forceCircuitOpen = false (we want to allow traffic)
/* package */ static final Boolean default_circuitBreakerForceClosed = false;// default => ignoreErrors = false
private static final Integer default_executionTimeoutInMilliseconds = 1000; // default => executionTimeoutInMilliseconds: 1000 = 1 second
// The defaults below carry no upstream comment; each name states the setting it backs.
private static final Boolean default_executionTimeoutEnabled = true;
private static final ExecutionIsolationStrategy default_executionIsolationStrategy =
ExecutionIsolationStrategy.THREAD;
private static final Boolean default_executionIsolationThreadInterruptOnTimeout = true;
private static final Boolean default_executionIsolationThreadInterruptOnFutureCancel = false;
private static final Boolean default_metricsRollingPercentileEnabled = true;
private static final Boolean default_requestCacheEnabled = true;
private static final Integer default_fallbackIsolationSemaphoreMaxConcurrentRequests = 10;
private static final Boolean default_fallbackEnabled = true;
private static final Integer default_executionIsolationSemaphoreMaxConcurrentRequests = 10;
private static final Boolean default_requestLogEnabled = true;
private static final Boolean default_circuitBreakerEnabled = true;
private static final Integer default_metricsRollingPercentileWindow = 60000; // default to 1 minute for RollingPercentile
private static final Integer default_metricsRollingPercentileWindowBuckets = 6; // default to 6 buckets (10 seconds each in 60 second window)
private static final Integer default_metricsRollingPercentileBucketSize = 100; // default to 100 values max per bucket
private static final Integer default_metricsHealthSnapshotIntervalInMilliseconds = 500; // default to 500ms as max frequency between allowing snapshots of health (error percentage etc)
// Effective property values for one command key.
@SuppressWarnings("unused") private final HystrixCommandKey key;
private final HystrixProperty<Integer> circuitBreakerRequestVolumeThreshold; // number of requests that must be made within a statisticalWindow before open/close decisions are made using stats
private final HystrixProperty<Integer> circuitBreakerSleepWindowInMilliseconds; // milliseconds after tripping circuit before allowing retry
private final HystrixProperty<Boolean> circuitBreakerEnabled; // Whether circuit breaker should be enabled.
private final HystrixProperty<Integer> circuitBreakerErrorThresholdPercentage; // % of 'marks' that must be failed to trip the circuit
private final HystrixProperty<Boolean> circuitBreakerForceOpen; // a property to allow forcing the circuit open (stopping all requests)
private final HystrixProperty<Boolean> circuitBreakerForceClosed; // a property to allow ignoring errors and therefore never trip 'open' (ie. allow all traffic through)
private final HystrixProperty<ExecutionIsolationStrategy> executionIsolationStrategy; // Whether a command should be executed in a separate thread or not.
private final HystrixProperty<Integer> executionTimeoutInMilliseconds; // Timeout value in milliseconds for a command
private final HystrixProperty<Boolean> executionTimeoutEnabled; //Whether timeout should be triggered
private final HystrixProperty<String> executionIsolationThreadPoolKeyOverride; // What thread-pool this command should run in (if running on a separate thread).
private final HystrixProperty<Integer> executionIsolationSemaphoreMaxConcurrentRequests; // Number of permits for execution semaphore
private final HystrixProperty<Integer> fallbackIsolationSemaphoreMaxConcurrentRequests; // Number of permits for fallback semaphore
private final HystrixProperty<Boolean> fallbackEnabled; // Whether fallback should be attempted.
private final HystrixProperty<Boolean> executionIsolationThreadInterruptOnTimeout; // Whether an underlying Future/Thread (when runInSeparateThread == true) should be interrupted after a timeout
private final HystrixProperty<Boolean> executionIsolationThreadInterruptOnFutureCancel; // Whether canceling an underlying Future/Thread (when runInSeparateThread == true) should interrupt the execution thread
private final HystrixProperty<Integer> metricsRollingStatisticalWindowInMilliseconds; // milliseconds back that will be tracked
private final HystrixProperty<Integer> metricsRollingStatisticalWindowBuckets; // number of buckets in the statisticalWindow
private final HystrixProperty<Boolean> metricsRollingPercentileEnabled; // Whether monitoring should be enabled (SLA and Tracers).
private final HystrixProperty<Integer> metricsRollingPercentileWindowInMilliseconds; // number of milliseconds that will be tracked in RollingPercentile
private final HystrixProperty<Integer> metricsRollingPercentileWindowBuckets; // number of buckets percentileWindow will be divided into
private final HystrixProperty<Integer> metricsRollingPercentileBucketSize; // how many values will be stored in each percentileWindowBucket
private final HystrixProperty<Integer> metricsHealthSnapshotIntervalInMilliseconds; // time between health snapshots
private final HystrixProperty<Boolean> requestLogEnabled; // whether command request logging is enabled.
private final HystrixProperty<Boolean> requestCacheEnabled; // Whether request caching is enabled.
// Resource isolation strategies available to a command.
public static enum ExecutionIsolationStrategy {
THREAD, SEMAPHORE // thread-based resource isolation and semaphore-based resource isolation
}
}
HystrixCircuitBreakerConfiguration
1、
package org.springframework.cloud.netflix.hystrix;
@Configuration(proxyBeanMethods = false)
public class HystrixCircuitBreakerConfiguration {

    /** Registers the aspect that intercepts Hystrix command execution. */
    @Bean
    public HystrixCommandAspect hystrixCommandAspect() {
        return new HystrixCommandAspect();
    }

    /** Registers the hook that resets Hystrix when the bean is destroyed. */
    @Bean
    public HystrixShutdownHook hystrixShutdownHook() {
        return new HystrixShutdownHook();
    }

    /** Advertises the "Hystrix" feature, keyed by the command aspect class. */
    @Bean
    public HasFeatures hystrixFeature() {
        NamedFeature hystrix = new NamedFeature("Hystrix", HystrixCommandAspect.class);
        return HasFeatures.namedFeatures(hystrix);
    }

    /** Disposable bean whose destruction callback resets Hystrix. */
    private class HystrixShutdownHook implements DisposableBean {

        @Override
        public void destroy() throws Exception {
            // per the original note, this is what releases the Hystrix thread pools
            Hystrix.reset();
        }

    }

}
2、
package org.springframework.cloud.netflix.hystrix;
@Configuration(proxyBeanMethods = false)
public class HystrixCircuitBreakerConfiguration {

    /** Registers the aspect that intercepts Hystrix command execution. */
    @Bean
    public HystrixCommandAspect hystrixCommandAspect() {
        return new HystrixCommandAspect();
    }

    /** Registers the hook that resets Hystrix when the bean is destroyed. */
    @Bean
    public HystrixShutdownHook hystrixShutdownHook() {
        return new HystrixShutdownHook();
    }

    /** Advertises the "Hystrix" feature, keyed by the command aspect class. */
    @Bean
    public HasFeatures hystrixFeature() {
        NamedFeature hystrix = new NamedFeature("Hystrix", HystrixCommandAspect.class);
        return HasFeatures.namedFeatures(hystrix);
    }

    /** Disposable bean whose destruction callback resets Hystrix. */
    private class HystrixShutdownHook implements DisposableBean {

        @Override
        public void destroy() throws Exception {
            // per the original note, this is what releases the Hystrix thread pools
            Hystrix.reset();
        }

    }

}
package com.netflix.hystrix;
/**
 * Thread pool abstraction used to run Hystrix commands.
 */
public interface HystrixThreadPool {

    /** @return the executor backing this pool */
    ExecutorService getExecutor();

    /** @return a scheduler for running tasks on this pool */
    Scheduler getScheduler();

    /**
     * @param shouldInterruptThread callback yielding a Boolean; presumably decides whether a
     *                              running thread is interrupted (verify against the implementation)
     * @return a scheduler for running tasks on this pool
     */
    Scheduler getScheduler(Func0<Boolean> shouldInterruptThread);

    /** Marks that a thread is about to start executing. */
    void markThreadExecution();

    /** Marks that a thread has finished executing. */
    void markThreadCompletion();

    /** Marks that an execution was rejected. */
    void markThreadRejection();

    /** @return whether the queue still has space available */
    boolean isQueueSpaceAvailable();
}
3、
/**
 * Built-in default values for Hystrix thread pool settings.
 */
public abstract class HystrixThreadPoolProperties {

    /* defaults */

    /** Core size of the thread pool. */
    static int default_coreSize = 10;

    /** Maximum size of the thread pool. */
    static int default_maximumSize = 10;

    /** Minutes to keep a thread alive. */
    static int default_keepAliveTimeMinutes = 1;

    /**
     * Size of the queue. It cannot be changed dynamically, so 'queueSizeRejectionThreshold'
     * is used to artificially limit and reject. -1 turns the queue off and makes us use
     * SynchronousQueue.
     */
    static int default_maxQueueSize = -1;

    /**
     * Whether the maximumSize config value is read and used when configuring the thread pool.
     * Turning this on should be a conscious decision by the user, so it defaults to false.
     */
    static boolean default_allow_maximum_size_to_diverge_from_core_size = false;

    /** Number of items in the queue. */
    static int default_queueSizeRejectionThreshold = 5;

    /** Milliseconds covered by the rolling number. */
    static int default_threadPoolRollingNumberStatisticalWindow = 10000;

    /** Number of buckets in the rolling number (10 1-second buckets). */
    static int default_threadPoolRollingNumberStatisticalWindowBuckets = 10;
}
HystrixCircuitBreaker
1、
package com.netflix.hystrix;
/**
 * Circuit breaker contract (excerpt showing only the caching factory).
 */
public interface HystrixCircuitBreaker {

    /**
     * Creates and caches one circuit breaker per command key name.
     */
    public static class Factory {

        /** Cache of circuit breakers keyed by command key name. */
        private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();

        /**
         * Returns the circuit breaker cached for the command key, creating it on first use.
         *
         * @param key        command key whose name identifies the cache entry
         * @param group      command group passed to a newly created breaker
         * @param properties command properties passed to a newly created breaker
         * @param metrics    command metrics passed to a newly created breaker
         * @return the cached breaker, or the newly created one; never null
         */
        public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            // fast path: the breaker already exists for this command
            HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
            if (previouslyCached != null) {
                return previouslyCached;
            }

            HystrixCircuitBreaker created = new HystrixCircuitBreakerImpl(key, group, properties, metrics);
            HystrixCircuitBreaker raced = circuitBreakersByCommand.putIfAbsent(key.name(), created);
            if (raced == null) {
                // putIfAbsent returned null, so our instance was stored. Return it directly
                // rather than re-reading the map, which could yield null if reset() clears
                // the cache in between.
                return created;
            }
            // another thread stored its instance first; use that one and drop ours
            return raced;
        }

        /**
         * Looks up the circuit breaker cached for the command key.
         *
         * @param key command key whose name identifies the cache entry
         * @return the cached breaker, or null when none has been created yet
         */
        public static HystrixCircuitBreaker getInstance(HystrixCommandKey key) {
            return circuitBreakersByCommand.get(key.name());
        }

        /** Empties the cache. */
        static void reset() {
            circuitBreakersByCommand.clear();
        }
    }
}
2、
package com.netflix.hystrix;
// Excerpt of the circuit breaker interface showing only its default implementation.
public interface HystrixCircuitBreaker {
static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker { // circuit breaker implementation
private final HystrixCommandProperties properties; // command properties; source of the thresholds read below
private final HystrixCommandMetrics metrics; // command metrics; source of the health counts
private AtomicBoolean circuitOpen = new AtomicBoolean(false); // true while the circuit is open
private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();// time the circuit opened or a single test was last let through
// key and commandGroup are accepted but not stored by this constructor.
protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
this.properties = properties;
this.metrics = metrics;
}
// Records a successful call: an open circuit is closed, and the thread that wins the CAS resets the metrics stream.
public void markSuccess() {
if (circuitOpen.get()) { // only an open circuit needs closing
if (circuitOpen.compareAndSet(true, false)) { // CAS so that only one thread performs the reset
metrics.resetStream();// reset the metrics
}
}
}
// Decides whether a request may proceed.
@Override
public boolean allowRequest() {
if (properties.circuitBreakerForceOpen().get()) { // circuit forced open by configuration
return false; // reject
}
if (properties.circuitBreakerForceClosed().get()) { // circuit forced closed by configuration
// isOpen() is still evaluated for its state updates; its result is ignored here
isOpen();
return true; // always allow
}
return !isOpen() || allowSingleTest();// allow when closed, or when a single test request is due
}
// While the circuit is open, lets one trial request through once the sleep window has elapsed.
public boolean allowSingleTest() {
long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
// We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try.
// If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'.
if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
// if this returns true that means we set the time so we'll return true to allow the singleTest
// if it returned false it means another thread raced us and allowed the singleTest before we did
return true;
}
}
return false;
}
// Reports whether the circuit is open, tripping it when the health counts cross the configured thresholds.
@Override
public boolean isOpen() {
if (circuitOpen.get()) { // already open
// if we're open we immediately return true and don't bother attempting to 'close' ourself as that is left to allowSingleTest and a subsequent successful test to close
return true;
}
// we're closed, so let's see if errors have made us so we should trip the circuit open
HealthCounts health = metrics.getHealthCounts();// current health statistics
// check if we are past the statisticalWindowVolumeThreshold
if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
// we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything
return false;
}
if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
return false;
} else {
// our failure rate is too high, trip the circuit
if (circuitOpen.compareAndSet(false, true)) {
// if the previousValue was false then we want to set the currentTime
circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
return true;
} else {
// How could previousValue be true? If another thread was going through this code at the same time a race-condition could have
// caused another thread to set it to true already even though we were in the process of doing the same
// In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is open
return true;
}
}
}
}
}
AbstractCommand
1、
/**
 * Declaration only: HystrixCommand builds on AbstractCommand and exposes the executable,
 * invokable-info and observable interfaces.
 */
public abstract class HystrixCommand<R> extends AbstractCommand<R>
        implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
}
2、
// Collaborator fields (excerpt; the enclosing class declaration is not shown here).
protected final HystrixCircuitBreaker circuitBreaker; // Hystrix circuit breaker
protected final HystrixThreadPool threadPool; // Hystrix thread pool
protected final HystrixThreadPoolKey threadPoolKey; // key identifying the thread pool
protected final HystrixCommandProperties properties; // Hystrix command properties
protected final HystrixCommandMetrics metrics; // Hystrix command metrics
3、
/**
 * Applies the circuit breaker and execution semaphore around the command.
 * A closed-off circuit goes to the short-circuit fallback, a failed permit acquisition goes to
 * the semaphore-rejection fallback, and otherwise the command is executed with the permit
 * released exactly once on terminate or unsubscribe.
 */
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // notify the execution hook that the command is starting
    executionHook.onStart(_cmd);

    // circuit breaker refuses the request: go straight to the fallback
    if (!circuitBreaker.allowRequest()) {
        return handleShortCircuitViaFallback();
    }

    final TryableSemaphore executionSemaphore = getExecutionSemaphore();
    final AtomicBoolean released = new AtomicBoolean(false);

    // terminate and unsubscribe can both fire, so the CAS keeps the release to a single call
    final Action0 releaseOnce = new Action0() {
        @Override
        public void call() {
            if (released.compareAndSet(false, true)) {
                executionSemaphore.release();
            }
        }
    };

    // reports an EXCEPTION_THROWN event for any error on the chain
    final Action1<Throwable> notifyException = new Action1<Throwable>() {
        @Override
        public void call(Throwable t) {
            eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
        }
    };

    // no permit available: reject via fallback
    if (!executionSemaphore.tryAcquire()) {
        return handleSemaphoreRejectionViaFallback();
    }

    try {
        // record when the invocation started
        executionResult = executionResult
                .setInvocationStartTime(System.currentTimeMillis());
        return executeCommandAndObserve(_cmd)
                .doOnError(notifyException)
                .doOnTerminate(releaseOnce)
                .doOnUnsubscribe(releaseOnce);
    } catch (RuntimeException e) {
        return Observable.error(e);
    }
}
4、
// Builds the observable chain for one command execution: runs the command under the configured
// isolation (with the timeout operator when enabled), records emit/success events, routes errors
// to the matching fallback handler, and restores the request context on each notification.
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext =
HystrixRequestContext.getContextForCurrentThread();// request context of the calling thread
// Called for every emitted value.
final Action1<R> markEmits = new Action1<R>() {
@Override
public void call(R r) {
if (shouldOutputOnNextEvents()) {
executionResult = executionResult.addEvent(HystrixEventType.EMIT); // record the EMIT event on the result
eventNotifier.markEvent(HystrixEventType.EMIT, commandKey); // notify listeners of the EMIT event
}
// a scalar command is marked successful as soon as its value is emitted
if (commandIsScalar()) {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
circuitBreaker.markSuccess();// tell the circuit breaker the call succeeded
}
}
};
// Called when the stream completes.
final Action0 markOnCompleted = new Action0() {
@Override
public void call() {
// a non-scalar command is marked successful only on completion
if (!commandIsScalar()) {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
circuitBreaker.markSuccess();
}
}
};
// Maps an error to the fallback path that matches its type.
final Func1<Throwable, Observable<R>> handleFallback =
new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
Exception e = getExceptionFromThrowable(t); // convert the throwable to an Exception
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) { // rejected by the thread pool
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) { // Hystrix timeout
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) { // bad request
return handleBadRequestByEmittingError(e);
} else {
// t itself was not a bad-request exception, but the converted e may still be one
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
return handleFailureViaFallback(e);
}
}
};
// Re-applies the captured request context on every notification.
final Action1<Notification<? super R>> setRequestContext =
new Action1<Notification<? super R>>() {
@Override
public void call(Notification<? super R> rNotification) {
setRequestContextIfNeeded(currentRequestContext);
}
};
Observable<R> execution;
// add the timeout operator only when execution timeouts are enabled
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
5、
// Produces the observable that runs the user code under the configured isolation strategy:
// THREAD subscribes on the thread pool's scheduler and tracks thread state; any other strategy
// (semaphore) runs on the subscribing thread.
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
// the isolation strategy is either thread or semaphore; this branch handles thread isolation
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
// the command must move from OBSERVABLE_CHAIN_CREATED to USER_CODE_EXECUTED exactly once
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD); // record the command start in metrics
if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) { // the command timed out before run() could start
return Observable.error(new RuntimeException("timed out before executing run()"));
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD,
ThreadState.STARTED)) { // thread state transition NOT_USING_THREAD -> STARTED
HystrixCounters.incrementGlobalConcurrentThreads();// bump the global concurrent-thread counter
threadPool.markThreadExecution();// mark the thread execution on the pool
// register the command as executing on the current thread
endCurrentThreadExecutingCommand =
Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
executionResult = executionResult.setExecutedInThread();// flag the result as executed in a thread
try {
executionHook.onThreadStart(_cmd); // fire the execution hooks
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
} else { // command has already been unsubscribed, so return immediately
return Observable.error(
new RuntimeException("unsubscribed before executing run()"));
}
}
}).doOnTerminate(new Action0() { // when the observable terminates
@Override
public void call() {
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
handleThreadEnd(_cmd);
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
//if it was never started and received terminal, then no need to clean up (I don't think this is possible)
}
//if it was unsubscribed, then other cleanup handled it
}
}).doOnUnsubscribe(new Action0() { // when the subscriber unsubscribes
@Override
public void call() {
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
handleThreadEnd(_cmd);
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
//if it was never started and was cancelled, then no need to clean up
}
//if it was terminal, then other cleanup handled it
}
}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
// true only when interrupt-on-timeout is configured and this command has timed out
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
} else { // semaphore isolation
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
// the command must move from OBSERVABLE_CHAIN_CREATED to USER_CODE_EXECUTED exactly once
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
// semaphore isolated
// store the command that is being run
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
try {
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd); //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
} catch (Throwable ex) {
//If the above hooks throw, then use that as the result of the run method
return Observable.error(ex);
}
}
});
}
}
demo