Hystrix Source Code: Plugins
Published: 2019-06-25


HystrixPlugins

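  HystrixPlugins provides access to six kinds of plugins: the concurrency strategy (HystrixConcurrencyStrategy), the event notifier (HystrixEventNotifier), the metrics publisher (HystrixMetricsPublisher), the properties strategy (HystrixPropertiesStrategy), the HystrixCommand callback hook (HystrixCommandExecutionHook), and HystrixDynamicProperties.

Plugin lookup:

  HystrixPlugins first looks in the properties configuration (HystrixDynamicProperties) for an implementation class of the requested plugin type.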

private static <T> T getPluginImplementationViaProperties(Class<T> pluginClass, HystrixDynamicProperties dynamicProperties) {
    String classSimpleName = pluginClass.getSimpleName();
    // Check Archaius for plugin class.
    String propertyName = "hystrix.plugin." + classSimpleName + ".implementation";
    String implementingClass = dynamicProperties.getString(propertyName, null).get();
    ....

  If that lookup returns null, the implementation class is obtained through ServiceLoader instead.


private <T> T getPluginImplementation(Class<T> pluginClass) {
    T p = getPluginImplementationViaProperties(pluginClass, dynamicProperties);
    if (p != null) return p;
    return findService(pluginClass, classLoader);
}

private static <T> T findService(Class<T> spi, ClassLoader classLoader) throws ServiceConfigurationError {
    ServiceLoader<T> sl = ServiceLoader.load(spi, classLoader);
    for (T s : sl) {
        if (s != null) return s;
    }
    return null;
}
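
The two-stage lookup can be tried out without Hystrix on the classpath. The following is a minimal plain-JDK sketch, not Hystrix's own code: it reads the class name from a system property instead of HystrixDynamicProperties, and the names PluginLookupSketch, GreetingPlugin, and LoudGreeting are hypothetical and exist only for this illustration. Only the property key format is taken from the code above.

```java
import java.util.ServiceLoader;

/** Plain-JDK sketch of "properties first, ServiceLoader second"; not Hystrix's actual code. */
class PluginLookupSketch {

    /** Hypothetical plugin type used only for this illustration. */
    public interface GreetingPlugin {
        String greet();
    }

    /** Hypothetical implementation that the property can point at. */
    public static class LoudGreeting implements GreetingPlugin {
        public String greet() {
            return "HELLO";
        }
    }

    /** Builds the property key in the same shape as the article's code. */
    static String propertyName(Class<?> pluginClass) {
        return "hystrix.plugin." + pluginClass.getSimpleName() + ".implementation";
    }

    /** Stage 1: read a class name from system properties (an assumption of this sketch) and instantiate it. */
    static <T> T viaProperties(Class<T> pluginClass) {
        String implementingClass = System.getProperty(propertyName(pluginClass));
        if (implementingClass == null) {
            return null;
        }
        try {
            Class<?> cls = Class.forName(implementingClass);
            return pluginClass.cast(cls.getDeclaredConstructor().newInstance());
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalStateException("Cannot create " + implementingClass, e);
        }
    }

    /** Stage 2: return the first provider that ServiceLoader finds, or null if there is none. */
    static <T> T viaServiceLoader(Class<T> pluginClass) {
        for (T candidate : ServiceLoader.load(pluginClass)) {
            return candidate;
        }
        return null;
    }

    /** Combines both stages in the order described in the article. */
    static <T> T lookup(Class<T> pluginClass) {
        T fromProperties = viaProperties(pluginClass);
        return fromProperties != null ? fromProperties : viaServiceLoader(pluginClass);
    }

    public static void main(String[] args) {
        System.setProperty(propertyName(GreetingPlugin.class), LoudGreeting.class.getName());
        System.out.println(lookup(GreetingPlugin.class).greet());
    }
}
```

When neither the property nor a ServiceLoader provider is present, lookup returns null, which is the signal the caller uses to fall back to a default implementation.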


  Once an implementation has been obtained, it is stored in a field (an AtomicReference).


final AtomicReference<HystrixEventNotifier> notifier = new AtomicReference<HystrixEventNotifier>();

public HystrixEventNotifier getEventNotifier() {
    if (notifier.get() == null) {
        // check for an implementation from Archaius first
        Object impl = getPluginImplementation(HystrixEventNotifier.class);
        if (impl == null) {
            // nothing set via Archaius so initialize with default
            notifier.compareAndSet(null, HystrixEventNotifierDefault.getInstance());
            // we don't return from here but call get() again in case of thread-race so the winner will always get returned
        } else {
            // we received an implementation from Archaius so use it
            notifier.compareAndSet(null, (HystrixEventNotifier) impl);
        }
    }
    return notifier.get();
}
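
The getter above relies on compareAndSet so that, under a thread race, every caller ends up with the same winning instance. Here is a minimal generic sketch of that pattern using only the JDK. The class LazyPluginHolder and its two suppliers are hypothetical names introduced for this illustration and are not part of Hystrix.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Sketch of lock-free lazy initialization with a default fallback; not Hystrix's actual code. */
class LazyPluginHolder<T> {
    private final AtomicReference<T> ref = new AtomicReference<>();
    private final Supplier<T> configured; // may return null when nothing is configured
    private final Supplier<T> fallback;   // expected to return a non-null default

    LazyPluginHolder(Supplier<T> configured, Supplier<T> fallback) {
        this.configured = configured;
        this.fallback = fallback;
    }

    T get() {
        if (ref.get() == null) {
            T impl = configured.get();
            // Only the first thread to reach this point stores its value; later attempts are no-ops.
            ref.compareAndSet(null, impl != null ? impl : fallback.get());
        }
        // Read again instead of returning the local value, so losers of the race see the winner's instance.
        return ref.get();
    }

    public static void main(String[] args) {
        LazyPluginHolder<String> holder = new LazyPluginHolder<>(() -> null, () -> "default-notifier");
        System.out.println(holder.get());
    }
}
```

The trade-off of this design is that two racing threads may both construct a candidate, but only one is ever published, so no lock is needed on the read path.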


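  Resolving the HystrixDynamicProperties object works a little differently. First, the implementation class is looked up through the system-property-backed HystrixDynamicPropertiesSystemProperties. If nothing is found there, ServiceLoader is tried. If that also finds nothing, Archaius is used by default, and if Archaius support is not available either, HystrixDynamicPropertiesSystemProperties itself is used.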


private static HystrixDynamicProperties resolveDynamicProperties(ClassLoader classLoader, LoggerSupplier logSupplier) {
    HystrixDynamicProperties hp = getPluginImplementationViaProperties(HystrixDynamicProperties.class,
            HystrixDynamicPropertiesSystemProperties.getInstance());
    if (hp != null) {
        logSupplier.getLogger().debug(
                "Created HystrixDynamicProperties instance from System property named "
                + "\"hystrix.plugin.HystrixDynamicProperties.implementation\". Using class: {}",
                hp.getClass().getCanonicalName());
        return hp;
    }
    hp = findService(HystrixDynamicProperties.class, classLoader);
    if (hp != null) {
        logSupplier.getLogger()
                .debug("Created HystrixDynamicProperties instance by loading from ServiceLoader. Using class: {}",
                        hp.getClass().getCanonicalName());
        return hp;
    }
    hp = HystrixArchaiusHelper.createArchaiusDynamicProperties();
    if (hp != null) {
        logSupplier.getLogger().debug("Created HystrixDynamicProperties. Using class : {}",
                hp.getClass().getCanonicalName());
        return hp;
    }
    hp = HystrixDynamicPropertiesSystemProperties.getInstance();
    logSupplier.getLogger().info("Using System Properties for HystrixDynamicProperties! Using class: {}",
            hp.getClass().getCanonicalName());
    return hp;
}


  HystrixConcurrencyStrategy

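  The strategy class for concurrency-related behavior.

  getThreadPool obtains a thread pool; in practice it creates a ThreadPoolExecutor from the configuration.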


/** Gets the thread pool. */
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);

    final int dynamicCoreSize = corePoolSize.get();
    final int dynamicMaximumSize = maximumPoolSize.get();

    if (dynamicCoreSize > dynamicMaximumSize) {
        logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
                dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
                dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
        return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit, workQueue, threadFactory);
    } else {
        return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue, threadFactory);
    }
}

public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
    final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);

    final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
    final int dynamicCoreSize = threadPoolProperties.coreSize().get();
    final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
    final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
    final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);

    if (allowMaximumSizeToDivergeFromCoreSize) {
        final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
        if (dynamicCoreSize > dynamicMaximumSize) {
            logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
                    dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
                    dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        } else {
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        }
    } else {
        return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
    }
}

private static ThreadFactory getThreadFactory(final HystrixThreadPoolKey threadPoolKey) {
    if (!PlatformSpecific.isAppEngineStandardEnvironment()) {
        return new ThreadFactory() {
            private final AtomicInteger threadNumber = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());
                thread.setDaemon(true);
                return thread;
            }
        };
    } else {
        return PlatformSpecific.getAppEngineThreadFactory();
    }
}

public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
    if (maxQueueSize <= 0) {
        return new SynchronousQueue<Runnable>();
    } else {
        return new LinkedBlockingQueue<Runnable>(maxQueueSize);
    }
}
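
The sizing and queue-selection rules in the code above can be condensed into a small plain-JDK sketch. This is an illustration only: ThreadPoolSketch and its parameter list are hypothetical, plain ints and a boolean stand in for HystrixThreadPoolProperties, and the App Engine branch is left out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of the pool-creation rules described above; not Hystrix's actual code. */
class ThreadPoolSketch {

    /** A non-positive size means direct hand-off; otherwise a bounded queue of that capacity. */
    static BlockingQueue<Runnable> queueFor(int maxQueueSize) {
        if (maxQueueSize <= 0) {
            return new SynchronousQueue<Runnable>();
        }
        return new LinkedBlockingQueue<Runnable>(maxQueueSize);
    }

    static ThreadPoolExecutor create(final String name, int coreSize, int maximumSize,
                                     boolean allowMaximumSizeToDiverge, int keepAliveMinutes,
                                     int maxQueueSize) {
        // maximumSize is honoured only when divergence is allowed and it is not below coreSize;
        // in every other case the pool is fixed at coreSize.
        int effectiveMax = (allowMaximumSizeToDiverge && maximumSize >= coreSize) ? maximumSize : coreSize;

        final AtomicInteger threadNumber = new AtomicInteger(0);
        ThreadFactory factory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "hystrix-" + name + "-" + threadNumber.incrementAndGet());
                thread.setDaemon(true); // daemon threads do not block JVM shutdown
                return thread;
            }
        };
        return new ThreadPoolExecutor(coreSize, effectiveMax, keepAliveMinutes, TimeUnit.MINUTES,
                queueFor(maxQueueSize), factory);
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = create("demo", 4, 2, true, 1, -1);
        System.out.println(pool.getCorePoolSize() + " / " + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```

With a SynchronousQueue there is no buffering at all, so a task is rejected as soon as every thread is busy; a LinkedBlockingQueue trades that fast rejection for a bounded backlog.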


   wrapCallable gives users a chance to wrap the Callable before it is executed.

public <T> Callable<T> wrapCallable(Callable<T> callable) {
    return callable;
}
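
A common reason to override this method is to carry thread-local state from the calling thread over to the pool thread. The sketch below shows the idea with the JDK alone. ContextPropagationSketch and REQUEST_ID are hypothetical names, and the static wrapCallable here only mirrors the shape of the method above; in a real plugin the same logic would sit in a subclass of HystrixConcurrencyStrategy.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Sketch of context propagation through a Callable wrapper; not Hystrix's actual code. */
class ContextPropagationSketch {

    /** Hypothetical per-request context, standing in for something like a trace id. */
    static final ThreadLocal<String> REQUEST_ID = new ThreadLocal<>();

    /** Captures the caller's context at wrap time and restores it around the call on the worker thread. */
    static <T> Callable<T> wrapCallable(final Callable<T> callable) {
        final String captured = REQUEST_ID.get();
        return () -> {
            String previous = REQUEST_ID.get();
            REQUEST_ID.set(captured);
            try {
                return callable.call();
            } finally {
                // Put back whatever the worker thread had before, so pooled threads do not leak context.
                REQUEST_ID.set(previous);
            }
        };
    }

    /** Sets a request id on the calling thread, then reads it back from inside a pool thread. */
    static String runOnWorker(String requestId) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            REQUEST_ID.set(requestId);
            Callable<String> task = () -> REQUEST_ID.get();
            return pool.submit(wrapCallable(task)).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            REQUEST_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnWorker("req-42"));
    }
}
```

Without the wrapper, the task would read the worker thread's own (empty) ThreadLocal and return null.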

  getRequestVariable returns a HystrixRequestVariable.

public <T> HystrixRequestVariable<T> getRequestVariable(final HystrixRequestVariableLifecycle<T> rv) {
    return new HystrixLifecycleForwardingRequestVariable<T>(rv);
}

HystrixCommandExecutionHook

   Callback hooks invoked during Hystrix command execution. The following callbacks are provided.


/** Invoked before the command starts executing. */
public <T> void onStart(HystrixInvokable<T> commandInstance) {
    //do nothing by default
}

/** Invoked when a value is received (emitted). */
public <T> T onEmit(HystrixInvokable<T> commandInstance, T value) {
    return value; //by default, just pass through
}

/**
 * Invoked when the command fails with an exception.
 * @since 1.2
 */
public <T> Exception onError(HystrixInvokable<T> commandInstance, FailureType failureType, Exception e) {
    return e; //by default, just pass through
}

/**
 * Invoked after the command completes successfully.
 * @since 1.4
 */
public <T> void onSuccess(HystrixInvokable<T> commandInstance) {
    //do nothing by default
}

/** Invoked before the command is executed on the thread pool. */
public <T> void onThreadStart(HystrixInvokable<T> commandInstance) {
    //do nothing by default
}

/** Invoked after execution on the thread pool completes. */
public <T> void onThreadComplete(HystrixInvokable<T> commandInstance) {
    // do nothing by default
}

/** Invoked when the command's execution method starts. */
public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {
    //do nothing by default
}

/**
 * Invoked when the command's execution method emits a value.
 *
 * @param commandInstance The executing HystrixInvokable instance.
 * @param value value emitted
 *
 * @since 1.4
 */
public <T> T onExecutionEmit(HystrixInvokable<T> commandInstance, T value) {
    return value; //by default, just pass through
}

/** Invoked when the execution method throws an exception. */
public <T> Exception onExecutionError(HystrixInvokable<T> commandInstance, Exception e) {
    return e; //by default, just pass through
}

/**
 * Invoked when the command's execution method completes successfully.
 * @since 1.4
 */
public <T> void onExecutionSuccess(HystrixInvokable<T> commandInstance) {
    //do nothing by default
}

/**
 * Invoked when the fallback method in {@link HystrixInvokable} starts.
 *
 * @param commandInstance The executing HystrixInvokable instance.
 *
 * @since 1.2
 */
public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) {
    //do nothing by default
}

/**
 * Invoked when the fallback method in {@link HystrixInvokable} emits a value.
 *
 * @param commandInstance The executing HystrixInvokable instance.
 * @param value value emitted
 *
 * @since 1.4
 */
public <T> T onFallbackEmit(HystrixInvokable<T> commandInstance, T value) {
    return value; //by default, just pass through
}

/**
 * Invoked when the fallback method in {@link HystrixInvokable} fails with an Exception.
 *
 * @param commandInstance The executing HystrixInvokable instance.
 * @param e exception object
 *
 * @since 1.2
 */
public <T> Exception onFallbackError(HystrixInvokable<T> commandInstance, Exception e) {
    //by default, just pass through
    return e;
}

/**
 * Invoked when the fallback method in {@link HystrixInvokable} completes successfully.
 *
 * @param commandInstance The executing HystrixInvokable instance.
 *
 * @since 1.4
 */
public <T> void onFallbackSuccess(HystrixInvokable<T> commandInstance) {
    //do nothing by default
}

/** Invoked when the response is served from the cache. */
public <T> void onCacheHit(HystrixInvokable<T> commandInstance) {
    //do nothing by default
}

/**
 * Invoked when the command is unsubscribed.
 * @since 1.5.9
 */
public <T> void onUnsubscribe(HystrixInvokable<T> commandInstance) {
    //do nothing by default
}


 HystrixEventNotifier

  Receives notifications of the events that occur during Hystrix command execution.


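/**
 * Invoked each time an event is received. The following events are delivered:
 *   RESPONSE_FROM_CACHE   the command obtained its response from the cache
 *   CANCELLED             the command was unsubscribed before it completed
 *   EXCEPTION_THROWN      the command threw an exception
 *   EMIT                  the command emitted a value
 *   THREAD_POOL_REJECTED  the thread pool rejected the execution
 *   FAILURE               execution failed with an exception
 *   FALLBACK_EMIT         the fallback returned a value
 *   FALLBACK_SUCCESS      the fallback completed successfully
 *   FALLBACK_FAILURE      the fallback threw an exception
 *   FALLBACK_REJECTION    the fallback semaphore limit was exceeded, so the fallback was not executed
 *   FALLBACK_MISSING      no fallback implementation is available
 *   SEMAPHORE_REJECTED    execution was rejected under semaphore isolation
 *   SHORT_CIRCUITED       the circuit breaker short-circuited the command
 */
public void markEvent(HystrixEventType eventType, HystrixCommandKey key) {
    // do nothing
}

/**
 * Called after a command is executed using thread isolation.
 * Will not get called if a command is rejected, short-circuited etc.
 */
public void markCommandExecution(HystrixCommandKey key, ExecutionIsolationStrategy isolationStrategy, int duration, List<HystrixEventType> eventsDuringExecution) {
    // do nothing
}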


 HystrixMetricsPublisher

  HystrixMetricsPublisherFactory uses this plugin to create HystrixMetricsPublisherCommand, HystrixMetricsPublisherThreadPool, and HystrixMetricsPublisherCollapser instances and calls the initialize method on each of them. These classes publish Hystrix metrics to third-party systems.


private final ConcurrentHashMap<String, HystrixMetricsPublisherThreadPool> threadPoolPublishers = new ConcurrentHashMap<String, HystrixMetricsPublisherThreadPool>();

/* package */ HystrixMetricsPublisherThreadPool getPublisherForThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolMetrics metrics, HystrixThreadPoolProperties properties) {
    // attempt to retrieve from cache first
    HystrixMetricsPublisherThreadPool publisher = threadPoolPublishers.get(threadPoolKey.name());
    if (publisher != null) {
        return publisher;
    }
    // it doesn't exist so we need to create it
    publisher = HystrixPlugins.getInstance().getMetricsPublisher().getMetricsPublisherForThreadPool(threadPoolKey, metrics, properties);
    // attempt to store it (race other threads)
    HystrixMetricsPublisherThreadPool existing = threadPoolPublishers.putIfAbsent(threadPoolKey.name(), publisher);
    if (existing == null) {
        // we won the thread-race to store the instance we created so initialize it
        publisher.initialize();
        // done registering, return instance that got cached
        return publisher;
    } else {
        // we lost so return 'existing' and let the one we created be garbage collected
        // without calling initialize() on it
        return existing;
    }
}
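
The factory method above guarantees that initialize() runs once per key, on the instance that won the putIfAbsent race. A minimal plain-JDK sketch of the same pattern follows. PublisherCacheSketch and its nested Publisher class are hypothetical stand-ins for the Hystrix types, and the counter exists only to make the behavior observable.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of "create, putIfAbsent, initialize only the winner"; not Hystrix's actual code. */
class PublisherCacheSketch {

    /** Hypothetical publisher that counts how often initialize() was called. */
    static class Publisher {
        final AtomicInteger initCount = new AtomicInteger();

        void initialize() {
            initCount.incrementAndGet();
        }
    }

    private final ConcurrentHashMap<String, Publisher> publishers = new ConcurrentHashMap<>();

    Publisher getPublisher(String key) {
        // Fast path: most calls find an already registered publisher.
        Publisher cached = publishers.get(key);
        if (cached != null) {
            return cached;
        }
        Publisher created = new Publisher();
        Publisher existing = publishers.putIfAbsent(key, created);
        if (existing == null) {
            // This thread won the race, so its instance is the cached one and gets initialized.
            created.initialize();
            return created;
        }
        // Another thread won; its instance is returned and ours is discarded uninitialized.
        return existing;
    }

    public static void main(String[] args) {
        PublisherCacheSketch cache = new PublisherCacheSketch();
        Publisher first = cache.getPublisher("pool-a");
        Publisher second = cache.getPublisher("pool-a");
        System.out.println((first == second) + " " + first.initCount.get());
    }
}
```

One consequence of initializing after putIfAbsent is that a racing reader can briefly see the cached instance before initialize() has finished, which is acceptable when initialization only registers the publisher with an external system.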


  

 HystrixPropertiesStrategy

  Creates HystrixCommandProperties, HystrixThreadPoolProperties, HystrixCollapserProperties, and HystrixTimerThreadPoolProperties.


/** Creates HystrixCommandProperties. */
public HystrixCommandProperties getCommandProperties(HystrixCommandKey commandKey, HystrixCommandProperties.Setter builder) {
    return new HystrixPropertiesCommandDefault(commandKey, builder);
}

/** Creates HystrixThreadPoolProperties. */
public HystrixThreadPoolProperties getThreadPoolProperties(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter builder) {
    return new HystrixPropertiesThreadPoolDefault(threadPoolKey, builder);
}

/** Creates HystrixCollapserProperties. */
public HystrixCollapserProperties getCollapserProperties(HystrixCollapserKey collapserKey, HystrixCollapserProperties.Setter builder) {
    return new HystrixPropertiesCollapserDefault(collapserKey, builder);
}

/** Creates HystrixTimerThreadPoolProperties. */
public HystrixTimerThreadPoolProperties getTimerThreadPoolProperties() {
    return new HystrixPropertiesTimerThreadPoolDefault();
}


 

http://www.cnblogs.com/zhangwanhua/p/7878515.html

Reposted from: https://my.oschina.net/xiaominmin/blog/1590739
