跳至主要內容

SpringCloud集群服务-3

wangdx大约 12 分钟

Hystrix 工作流程

1、
package com.netflix.hystrix;
public abstract class HystrixCommand<R> extends AbstractCommand<R>
implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
// 【J.U.C原子引用】定义了当前要执行的线程对象
    private final AtomicReference<Thread> executionThread = new AtomicReference<Thread>();
// 【J.U.C原子布尔】定义一个优雅的线程停止操作(Java就业编程实战这本书是有讲解的)
    private final AtomicBoolean interruptOnFutureCancel = new AtomicBoolean(false);
    protected abstract R run() throws Exception; // 定义了Hystrix的具体执行
    protected R getFallback() { // 获取失败回退的操作
        throw new UnsupportedOperationException("No fallback available.");
    }
    @Override
    final protected Observable<R> getExecutionObservable() { // 执行请求
        return Observable.defer(new Func0<Observable<R>>() { 创建数据响应
            @Override
            public Observable<R> call() {
                try {
                    return Observable.just(run());// 请求发送
                } catch (Throwable ex) {
                    return Observable.error(ex);
                }
            }
        }).doOnSubscribe(new Action0() { // 订阅处理
            @Override
            public void call() {
                executionThread.set(Thread.currentThread());// 保存当前的执行线程
            }
        });
    }
    @Override
    final protected Observable<R> getFallbackObservable() { // 失败回退的订阅
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                try {
                    return Observable.just(getFallback());// 调用失败回退
                } catch (Throwable ex) {
                    return Observable.error(ex); // 响应错误状态
                }
            }
        });
    }
    public R execute() { // 具体操作的执行
        try {
            return queue().get();// 按照队列的方式来执行,所有的调用为异步处理
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }
    public Future<R> queue() {} // 异步处理队列
    @Override
    protected String getFallbackMethodName() { // 获取失败回退的方法名称
        return "getFallback";
    }
    @Override
    protected boolean isFallbackUserDefined() { // 环境的判断
        Boolean containsFromMap = commandContainsFallback.get(commandKey);
        if (containsFromMap != null) {
            return containsFromMap;
        } else {
            Boolean toInsertIntoMap;
            try {
                getClass().getDeclaredMethod("getFallback");
                toInsertIntoMap = true;
            } catch (NoSuchMethodException nsme) {
                toInsertIntoMap = false;
            }
            commandContainsFallback.put(commandKey, toInsertIntoMap);
            return toInsertIntoMap;
        }
    }
    @Override
    protected boolean commandIsScalar() {
        return true;
    }
}


2、
package com.netflix.hystrix.contrib.javanica.command;
import rx.Observable;
import java.util.concurrent.Future;
public enum ExecutionType {
    ASYNCHRONOUS,  // 异步处理(queue)
    SYNCHRONOUS,  // 同步处理(execute)
    OBSERVABLE;  // 响应式处理(observer)
    public static ExecutionType getExecutionType(Class<?> type) {
        if (Future.class.isAssignableFrom(type)) {
            return ExecutionType.ASYNCHRONOUS;
        } else if (Observable.class.isAssignableFrom(type)) {
            return ExecutionType.OBSERVABLE;
        } else {
            return ExecutionType.SYNCHRONOUS;
        }
    }
}


3、
package com.netflix.hystrix.contrib.javanica.command;
public class CommandExecutor {
    public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException { // 具体的命令执行
        Validate.notNull(invokable);
        Validate.notNull(metaHolder);
        switch (executionType) { // 判断当前的执行类型
            case SYNCHRONOUS: { // 同步的处理
                return castToExecutable(invokable, executionType).execute();
            }
            case ASYNCHRONOUS: { // 是否为异步处理
                HystrixExecutable executable = castToExecutable(invokable, executionType);
                if (metaHolder.hasFallbackMethodCommand()
                        && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                    return new FutureDecorator(executable.queue());
                }
                return executable.queue();
            }
            case OBSERVABLE: { // 是否为响应式的调用
                HystrixObservable observable = castToObservable(invokable);
                return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
            }
            default:
                throw new RuntimeException("unsupported execution type: " + executionType);
        }
    }
    private static HystrixExecutable castToExecutable(HystrixInvokable invokable, ExecutionType executionType) {
        if (invokable instanceof HystrixExecutable) {
            return (HystrixExecutable) invokable;
        }
        throw new RuntimeException("Command should implement " + HystrixExecutable.class.getCanonicalName() + " interface to execute in: " + executionType + " mode");
    }
    private static HystrixObservable castToObservable(HystrixInvokable invokable) {
        if (invokable instanceof HystrixObservable) {
            return (HystrixObservable) invokable;
        }
        throw new RuntimeException("Command should implement " + HystrixObservable.class.getCanonicalName() + " interface to execute in observable mode");
    }
}


4、
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.hystrix.HystrixAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.ReactiveHystrixCircuitBreakerAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.security.HystrixSecurityAutoConfiguration

org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration

HystrixAutoConfiguration

1、
package org.springframework.cloud.netflix.hystrix;
import org.springframework.web.reactive.DispatcherHandler; // 响应式编程处理类
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ Hystrix.class, HealthIndicator.class,
      HealthContributorAutoConfiguration.class }) // 微服务健康配置
@AutoConfigureAfter({ HealthContributorAutoConfiguration.class })
public class HystrixAutoConfiguration {
   @Bean
   @ConditionalOnEnabledHealthIndicator("hystrix")
   public HystrixHealthIndicator hystrixHealthIndicator() { // 微服务的统计数据
      return new HystrixHealthIndicator();
   }
   @Configuration(proxyBeanMethods = false)
   @ConditionalOnProperty(value = "management.metrics.binders.hystrix.enabled",
         matchIfMissing = true)
   @ConditionalOnClass({ HystrixMetricsBinder.class })
   protected static class HystrixMetricsConfiguration { // 微服务度量配置
      @Bean
      public HystrixMetricsBinder hystrixMetricsBinder() { // 接收整个的调用的统计信息
         return new HystrixMetricsBinder();
      }
   }
   @Configuration(proxyBeanMethods = false)
   @ConditionalOnWebApplication(type = SERVLET)
   @ConditionalOnBean(HystrixCommandAspect.class) // only install the stream if enabled
   @ConditionalOnClass({ HystrixMetricsStreamServlet.class })
   @EnableConfigurationProperties(HystrixProperties.class)
   protected static class HystrixServletAutoConfiguration {
      @Bean
      @ConditionalOnAvailableEndpoint
      public HystrixStreamEndpoint hystrixStreamEndpoint(HystrixProperties properties) {
// 在微服务里面可以通过“hystrix.stream”进行服务数据的监控(要提供有Actuator端点)
         return new HystrixStreamEndpoint(properties.getConfig()); // 监控端点
      }
      @Bean
      public HasFeatures hystrixStreamFeature() {
         return HasFeatures.namedFeature("Hystrix Stream Servlet",
               HystrixMetricsStreamServlet.class);
      }
   }
   @Configuration(proxyBeanMethods = false)
   @ConditionalOnWebApplication(type = REACTIVE)
   @ConditionalOnBean(HystrixCommandAspect.class) // only install the stream if enabled
   @ConditionalOnClass({ DispatcherHandler.class }) // 基于响应式的编程模型
   @EnableConfigurationProperties(HystrixProperties.class)
   protected static class HystrixWebfluxManagementContextConfiguration {
      @Bean
      @ConditionalOnAvailableEndpoint
      public HystrixWebfluxEndpoint hystrixWebfluxController() {
         Observable<String> serializedDashboardData = HystrixDashboardStream
               .getInstance().observe()
               .concatMap(dashboardData -> Observable.from(SerialHystrixDashboardData
                     .toMultipleJsonStrings(dashboardData)));
         Publisher<String> publisher = RxReactiveStreams
               .toPublisher(serializedDashboardData);
         return new HystrixWebfluxEndpoint(publisher); // 响应式的断点
      }
      @Bean
      public HasFeatures hystrixStreamFeature() {
         return HasFeatures.namedFeature("Hystrix Stream Webflux",
               HystrixWebfluxEndpoint.class);
      }
   }
}

HystrixCircuitBreakerAutoConfiguration

1、
package org.springframework.cloud.netflix.hystrix;
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ Hystrix.class })
@ConditionalOnProperty(name = "spring.cloud.circuitbreaker.hystrix.enabled",
      matchIfMissing = true) // 没有配置此项的时候也是启用的
public class HystrixCircuitBreakerAutoConfiguration {
   @Autowired(required = false) // Hystrix熔断工厂类集合
   private List<Customizer<HystrixCircuitBreakerFactory>> customizers = new ArrayList<>();
   @Bean
   @ConditionalOnMissingBean(CircuitBreakerFactory.class)
   public CircuitBreakerFactory hystrixCircuitBreakerFactory() { // 熔断工厂类创建
      HystrixCircuitBreakerFactory factory = new HystrixCircuitBreakerFactory();
      customizers.forEach(customizer -> customizer.customize(factory));
      return factory;
   }
}


2、
package com.netflix.hystrix;
public abstract class HystrixCommandProperties {
    private static final Logger logger =
			LoggerFactory.getLogger(HystrixCommandProperties.class);
    /* defaults */
    /* package */ static final Integer default_metricsRollingStatisticalWindow = 10000;// default => statisticalWindow: 10000 = 10 seconds (and default of 10 buckets so each bucket is 1 second)
    private static final Integer default_metricsRollingStatisticalWindowBuckets = 10;// default => statisticalWindowBuckets: 10 = 10 buckets in a 10 second window so each bucket is 1 second
    private static final Integer default_circuitBreakerRequestVolumeThreshold = 20;// default => statisticalWindowVolumeThreshold: 20 requests in 10 seconds must occur before statistics matter
    private static final Integer default_circuitBreakerSleepWindowInMilliseconds = 5000;// default => sleepWindow: 5000 = 5 seconds that we will sleep before trying again after tripping the circuit
    private static final Integer default_circuitBreakerErrorThresholdPercentage = 50;// default => errorThresholdPercentage = 50 = if 50%+ of requests in 10 seconds are failures or latent then we will trip the circuit
    private static final Boolean default_circuitBreakerForceOpen = false;// default => forceCircuitOpen = false (we want to allow traffic)
    /* package */ static final Boolean default_circuitBreakerForceClosed = false;// default => ignoreErrors = false
    private static final Integer default_executionTimeoutInMilliseconds = 1000; // default => executionTimeoutInMilliseconds: 1000 = 1 second
    private static final Boolean default_executionTimeoutEnabled = true;
    private static final ExecutionIsolationStrategy default_executionIsolationStrategy =
ExecutionIsolationStrategy.THREAD;
    private static final Boolean default_executionIsolationThreadInterruptOnTimeout = true;
    private static final Boolean default_executionIsolationThreadInterruptOnFutureCancel = false;
    private static final Boolean default_metricsRollingPercentileEnabled = true;
    private static final Boolean default_requestCacheEnabled = true;
    private static final Integer default_fallbackIsolationSemaphoreMaxConcurrentRequests = 10;
    private static final Boolean default_fallbackEnabled = true;
    private static final Integer default_executionIsolationSemaphoreMaxConcurrentRequests = 10;
    private static final Boolean default_requestLogEnabled = true;
    private static final Boolean default_circuitBreakerEnabled = true;
    private static final Integer default_metricsRollingPercentileWindow = 60000; // default to 1 minute for RollingPercentile
    private static final Integer default_metricsRollingPercentileWindowBuckets = 6; // default to 6 buckets (10 seconds each in 60 second window)
    private static final Integer default_metricsRollingPercentileBucketSize = 100; // default to 100 values max per bucket
    private static final Integer default_metricsHealthSnapshotIntervalInMilliseconds = 500; // default to 500ms as max frequency between allowing snapshots of health (error percentage etc)
    @SuppressWarnings("unused") private final HystrixCommandKey key;
    private final HystrixProperty<Integer> circuitBreakerRequestVolumeThreshold; // number of requests that must be made within a statisticalWindow before open/close decisions are made using stats
    private final HystrixProperty<Integer> circuitBreakerSleepWindowInMilliseconds; // milliseconds after tripping circuit before allowing retry
    private final HystrixProperty<Boolean> circuitBreakerEnabled; // Whether circuit breaker should be enabled.
    private final HystrixProperty<Integer> circuitBreakerErrorThresholdPercentage; // % of 'marks' that must be failed to trip the circuit
    private final HystrixProperty<Boolean> circuitBreakerForceOpen; // a property to allow forcing the circuit open (stopping all requests)
    private final HystrixProperty<Boolean> circuitBreakerForceClosed; // a property to allow ignoring errors and therefore never trip 'open' (ie. allow all traffic through)
    private final HystrixProperty<ExecutionIsolationStrategy> executionIsolationStrategy; // Whether a command should be executed in a separate thread or not.
    private final HystrixProperty<Integer> executionTimeoutInMilliseconds; // Timeout value in milliseconds for a command
    private final HystrixProperty<Boolean> executionTimeoutEnabled; //Whether timeout should be triggered
    private final HystrixProperty<String> executionIsolationThreadPoolKeyOverride; // What thread-pool this command should run in (if running on a separate thread).
    private final HystrixProperty<Integer> executionIsolationSemaphoreMaxConcurrentRequests; // Number of permits for execution semaphore
    private final HystrixProperty<Integer> fallbackIsolationSemaphoreMaxConcurrentRequests; // Number of permits for fallback semaphore
    private final HystrixProperty<Boolean> fallbackEnabled; // Whether fallback should be attempted.
    private final HystrixProperty<Boolean> executionIsolationThreadInterruptOnTimeout; // Whether an underlying Future/Thread (when runInSeparateThread == true) should be interrupted after a timeout
    private final HystrixProperty<Boolean> executionIsolationThreadInterruptOnFutureCancel; // Whether canceling an underlying Future/Thread (when runInSeparateThread == true) should interrupt the execution thread
    private final HystrixProperty<Integer> metricsRollingStatisticalWindowInMilliseconds; // milliseconds back that will be tracked
    private final HystrixProperty<Integer> metricsRollingStatisticalWindowBuckets; // number of buckets in the statisticalWindow
    private final HystrixProperty<Boolean> metricsRollingPercentileEnabled; // Whether monitoring should be enabled (SLA and Tracers).
    private final HystrixProperty<Integer> metricsRollingPercentileWindowInMilliseconds; // number of milliseconds that will be tracked in RollingPercentile
    private final HystrixProperty<Integer> metricsRollingPercentileWindowBuckets; // number of buckets percentileWindow will be divided into
    private final HystrixProperty<Integer> metricsRollingPercentileBucketSize; // how many values will be stored in each percentileWindowBucket
    private final HystrixProperty<Integer> metricsHealthSnapshotIntervalInMilliseconds; // time between health snapshots
    private final HystrixProperty<Boolean> requestLogEnabled; // whether command request logging is enabled.
    private final HystrixProperty<Boolean> requestCacheEnabled; // Whether request caching is enabled.
    public static enum ExecutionIsolationStrategy {
        THREAD, SEMAPHORE	// 线程的资源隔离以及信号量的资源隔离
     }
}

HystrixCircuitBreakerConfiguration

1、

package org.springframework.cloud.netflix.hystrix;
@Configuration(proxyBeanMethods = false)
public class HystrixCircuitBreakerConfiguration {
   @Bean
   public HystrixCommandAspect hystrixCommandAspect() {  // 此时提供了Hystrix执行切面定义
      return new HystrixCommandAspect();
   }
   @Bean
   public HystrixShutdownHook hystrixShutdownHook() { // Hystrix关闭钩子处理
      return new HystrixShutdownHook();
   }
   @Bean
   public HasFeatures hystrixFeature() {
      return HasFeatures
            .namedFeatures(new NamedFeature("Hystrix", HystrixCommandAspect.class));
   }
   private class HystrixShutdownHook implements DisposableBean { // 关闭处理
      @Override
      public void destroy() throws Exception {
         Hystrix.reset();// 进行了Hystrix线程池的释放
      }
   }
}

2、
package org.springframework.cloud.netflix.hystrix;
@Configuration(proxyBeanMethods = false)
public class HystrixCircuitBreakerConfiguration {
   @Bean
   public HystrixCommandAspect hystrixCommandAspect() {  // 此时提供了Hystrix执行切面定义
      return new HystrixCommandAspect();
   }
   @Bean
   public HystrixShutdownHook hystrixShutdownHook() { // Hystrix关闭钩子处理
      return new HystrixShutdownHook();
   }
   @Bean
   public HasFeatures hystrixFeature() {
      return HasFeatures
            .namedFeatures(new NamedFeature("Hystrix", HystrixCommandAspect.class));
   }
   private class HystrixShutdownHook implements DisposableBean { // 关闭处理
      @Override
      public void destroy() throws Exception {
         Hystrix.reset();// 进行了Hystrix线程池的释放
      }
   }
}
package com.netflix.hystrix;
public interface HystrixThreadPool {
    public ExecutorService getExecutor();// 线程池处理
    public Scheduler getScheduler();// 调度任务
    public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread);
    public void markThreadExecution();// 线程池开始执行前的标记
    public void markThreadCompletion();// 线程执行完毕的标记
    public void markThreadRejection();// 拒绝标记
    public boolean isQueueSpaceAvailable();// 是否有可用队列空间
}


3、
public abstract class HystrixThreadPoolProperties {
    /* defaults */
    static int default_coreSize = 10;            // core size of thread pool
    static int default_maximumSize = 10;         // maximum size of thread pool
    static int default_keepAliveTimeMinutes = 1; // minutes to keep a thread alive
    static int default_maxQueueSize = -1;        // size of queue (this can't be dynamically changed so we use 'queueSizeRejectionThreshold' to artificially limit and reject)
                                                 // -1 turns it off and makes us use SynchronousQueue
    static boolean default_allow_maximum_size_to_diverge_from_core_size = false; //should the maximumSize config value get read and used in configuring the threadPool
                                                                                 //turning this on should be a conscious decision by the user, so we default it to false
    static int default_queueSizeRejectionThreshold = 5; // number of items in queue
    static int default_threadPoolRollingNumberStatisticalWindow = 10000; // milliseconds for rolling number
    static int default_threadPoolRollingNumberStatisticalWindowBuckets = 10; // number of buckets in rolling number (10 1-second buckets)
}


HystrixCircuitBreaker

1、
package com.netflix.hystrix;
public interface HystrixCircuitBreaker {
    public static class Factory {
        private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>(); // 熔断器的缓存
        public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            // 判断是否不是第一次访问
            HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
            if (previouslyCached != null) { // 获取到了中断器
                return previouslyCached; // 直接返回当前的中断器即可
            }
            HystrixCircuitBreaker cbForCommand =
circuitBreakersByCommand.putIfAbsent(key.name(),
new HystrixCircuitBreakerImpl(key, group, properties, metrics));
            if (cbForCommand == null) { // 创建的中断器为空
                return circuitBreakersByCommand.get(key.name());// 返回集合中的数据
            } else {
                return cbForCommand; // 返回创建完成的中断器
            }
        }
        public static HystrixCircuitBreaker getInstance(HystrixCommandKey key) {
            return circuitBreakersByCommand.get(key.name());
        }
        static void reset() {
            circuitBreakersByCommand.clear();
        }
    }
}


2、
package com.netflix.hystrix;
public interface HystrixCircuitBreaker {
    static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker { // 中断器实现类
        private final HystrixCommandProperties properties; // 获取Hystrix属性,得到阈值
        private final HystrixCommandMetrics metrics; // Hystrix度量统计
        private AtomicBoolean circuitOpen = new AtomicBoolean(false); // 中断开关
        private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();// 半开的检查
        protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            this.properties = properties;
            this.metrics = metrics;
        }
        public void markSuccess() { // 调用成功的标记
            if (circuitOpen.get()) { // 判断中断器状态(开启状态)
                if (circuitOpen.compareAndSet(true, false)) { // CAS操作
                    metrics.resetStream();// 度量的重置
                }
            }
        }
        @Override
        public boolean allowRequest() { // 允许访问的控制
            if (properties.circuitBreakerForceOpen().get()) { // 强制性中断开启配置
                return false; // 不允许
            }
            if (properties.circuitBreakerForceClosed().get()) { // 强制性中断关闭配置
                isOpen();
                return true; // 不启用中断器
            }
            return !isOpen() || allowSingleTest();// 测试以及打开的控制
        }
        public boolean allowSingleTest() { // 根据度量的结果来测试是否允许关闭中断器(半开)
            long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
            if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
                // We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try.
                // If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'.
                if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
                    // if this returns true that means we set the time so we'll return true to allow the singleTest
                    // if it returned false it means another thread raced us and allowed the singleTest before we did
                    return true;
                }
            }
            return false;
        }
        @Override
        public boolean isOpen() { // 开启状态判断
            if (circuitOpen.get()) { // 中断器状态
                // if we're open we immediately return true and don't bother attempting to 'close' ourself as that is left to allowSingleTest and a subsequent successful test to close
                return true;
            }
            // we're closed, so let's see if errors have made us so we should trip the circuit open
            HealthCounts health = metrics.getHealthCounts();// 健康统计
            // check if we are past the statisticalWindowVolumeThreshold
            if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                // we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything
                return false;
            }
            if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                return false;
            } else {
                // our failure rate is too high, trip the circuit
                if (circuitOpen.compareAndSet(false, true)) {
                    // if the previousValue was false then we want to set the currentTime
                    circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
                    return true;
                } else {
                    // How could previousValue be true? If another thread was going through this code at the same time a race-condition could have
                    // caused another thread to set it to true already even though we were in the process of doing the same
                    // In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is open
                    return true;
                }
            }
        }
    }
}


AbstractCommand

1、
public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {}

2、
protected final HystrixCircuitBreaker circuitBreaker; // Hystrix中断器
protected final HystrixThreadPool threadPool; // Hystrix线程池
protected final HystrixThreadPoolKey threadPoolKey; // 线程池的KEY
protected final HystrixCommandProperties properties; // Hystrix属性
protected final HystrixCommandMetrics metrics; // Hystrix度量器


3、
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    executionHook.onStart(_cmd); // 调用钩子处理
    if (circuitBreaker.allowRequest()) { // 中断器的状态判断
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();// 信号量
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); // 释放标记
        final Action0 singleSemaphoreRelease = new Action0() { // 释放处理类
            @Override
            public void call() {
                if (semaphoreHasBeenReleased.compareAndSet(false, true)) { // 释放的配置
                    executionSemaphore.release(); // 释放处理
                }
            }
        };
        final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
            @Override
            public void call(Throwable t) { // 异常处理
                eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
            }
        };
        if (executionSemaphore.tryAcquire()) { // 获取信号量
            try {
                executionResult = executionResult
.setInvocationStartTime(System.currentTimeMillis());// 调用开始时间
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        return handleShortCircuitViaFallback();
    }
}


4、
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    final HystrixRequestContext currentRequestContext =
HystrixRequestContext.getContextForCurrentThread();// 当前请求上下文
    final Action1<R> markEmits = new Action1<R>() {
        @Override
        public void call(R r) {
            if (shouldOutputOnNextEvents()) {
                executionResult = executionResult.addEvent(HystrixEventType.EMIT); // 处理标记
                eventNotifier.markEvent(HystrixEventType.EMIT, commandKey); // 事件处理
            }
            if (commandIsScalar()) {
                long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                circuitBreaker.markSuccess();// 成功标记
            }
        }
    };
    final Action0 markOnCompleted = new Action0() { // 操作完成
        @Override
        public void call() {
            if (!commandIsScalar()) {
                long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                circuitBreaker.markSuccess();
            }
        }
    };
    final Func1<Throwable, Observable<R>> handleFallback =
new Func1<Throwable, Observable<R>>() { // 失败回退处理
        @Override
        public Observable<R> call(Throwable t) {
            Exception e = getExceptionFromThrowable(t); // 获取异常类型
            executionResult = executionResult.setExecutionException(e);
            if (e instanceof RejectedExecutionException) { // 线程池拒绝
                return handleThreadPoolRejectionViaFallback(e);
            } else if (t instanceof HystrixTimeoutException) { // Hystrix超时
                return handleTimeoutViaFallback();
            } else if (t instanceof HystrixBadRequestException) { // 请求失败
                return handleBadRequestByEmittingError(e);
            } else {
                 if (e instanceof HystrixBadRequestException) {
                    eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                    return Observable.error(e);
                }
                return handleFailureViaFallback(e);
            }
        }
    };

    final Action1<Notification<? super R>> setRequestContext =
new Action1<Notification<? super R>>() { // 设置请求上下文
        @Override
        public void call(Notification<? super R> rNotification) {
            setRequestContextIfNeeded(currentRequestContext);
        }
    };
    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }
    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}


5、
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        // 判断当前采用的资源隔离策略是线程池还是信号量,此时的判断是线程池的隔离
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                executionResult = executionResult.setExecutionOccurred();
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }
                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD); // 度量操作
                if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) { // 超时处理
                    return Observable.error(new RuntimeException("timed out before executing run()"));
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD,
ThreadState.STARTED)) { // 线程状态转换
                    HystrixCounters.incrementGlobalConcurrentThreads();// 中断器的计数
                    threadPool.markThreadExecution();// 线程执行
                    // 保存当前执行操作的Hystrix线程
                    endCurrentThreadExecutingCommand =
Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                    executionResult = executionResult.setExecutedInThread();// 执行结果
                    try {
                        executionHook.onThreadStart(_cmd); // 执行的钩子处理
                        executionHook.onRunStart(_cmd);
                        executionHook.onExecutionStart(_cmd);
                        return getUserExecutionObservable(_cmd);
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                } else { // command has already been unsubscribed, so return immediately
                    return Observable.error(
new RuntimeException("unsubscribed before executing run()"));
                }
            }
        }).doOnTerminate(new Action0() { // 执行中断
            @Override
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
                    //if it was never started and received terminal, then no need to clean up (I don't think this is possible)
                }
                //if it was unsubscribed, then other cleanup handled it
            }
        }).doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
                    //if it was never started and was cancelled, then no need to clean up
                }
                //if it was terminal, then other cleanup handled it
            }
        }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            @Override
            public Boolean call() {
                return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
            }
        }));
    } else { // 信号量处理
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                executionResult = executionResult.setExecutionOccurred();
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }
                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
                // semaphore isolated
                // store the command that is being run
                endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                try {
                    executionHook.onRunStart(_cmd);
                    executionHook.onExecutionStart(_cmd);
                    return getUserExecutionObservable(_cmd);  //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
                } catch (Throwable ex) {
                    //If the above hooks throw, then use that as the result of the run method
                    return Observable.error(ex);
                }
            }
        });
    }
}

demo


上次编辑于: