The Dangerous Hystrix Thread Pool


This article explains how the Hystrix thread pool works and how its parameters are configured, points out a pitfall in its behavior, and offers workarounds. Some familiarity with Hystrix is assumed.

The discussion below is based on hystrix 1.5.18:

    <dependency>
      <groupId>com.netflix.hystrix</groupId>
      <artifactId>hystrix-core</artifactId>
      <version>1.5.18</version>
    </dependency>

The relationship between thread pools and Hystrix commands

When a hystrix command's isolation strategy is configured as thread isolation, i.e. execution.isolation.strategy is set to THREAD, the code inside the command runs on a thread-pool thread, isolated from the thread that invoked the command. Quoting the official wiki:

execution.isolation.strategy

This property indicates which isolation strategy HystrixCommand.run() executes with, one of the following two choices:

THREAD — it executes on a separate thread and concurrent requests are limited by the number of threads in the thread-pool

SEMAPHORE — it executes on the calling thread and concurrent requests are limited by the semaphore count

An online service usually has many hystrix commands, each guarding a different external dependency. So how many hystrix thread pools exist, and what is the mapping between commands and thread pools — is it one-to-one?

The answer is: not necessarily. Commands and thread pools can be mapped one-to-one, but usually are not; the mapping is governed by the HystrixThreadPoolKey and HystrixCommandGroupKey settings.

HystrixThreadPoolKey is preferred as the thread-pool identifier; if no HystrixThreadPoolKey is configured, HystrixCommandGroupKey is used instead. The command-to-pool mapping therefore depends on how the three settings HystrixCommandKey, HystrixThreadPoolKey, and HystrixCommandGroupKey are configured.
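As a sketch of what this means in practice (the service, command, and pool names here are made up for illustration), two commands that share a group key and set no explicit thread-pool key end up on the same pool, while an explicit HystrixThreadPoolKey carves out a separate one:

```java
// No thread-pool key: the pool identifier falls back to the group key, "OrderService".
HystrixCommand.Setter getOrder = HystrixCommand.Setter
        .withGroupKey(HystrixCommandGroupKey.Factory.asKey("OrderService"))
        .andCommandKey(HystrixCommandKey.Factory.asKey("GetOrder"));

// Same group key, no thread-pool key: shares the "OrderService" pool with GetOrder.
HystrixCommand.Setter listOrders = HystrixCommand.Setter
        .withGroupKey(HystrixCommandGroupKey.Factory.asKey("OrderService"))
        .andCommandKey(HystrixCommandKey.Factory.asKey("ListOrders"));

// An explicit HystrixThreadPoolKey wins: this command runs on its own "OrderExport" pool.
HystrixCommand.Setter exportOrders = HystrixCommand.Setter
        .withGroupKey(HystrixCommandGroupKey.Factory.asKey("OrderService"))
        .andCommandKey(HystrixCommandKey.Factory.asKey("ExportOrders"))
        .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("OrderExport"));
```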

The code that derives the thread-pool identifier is shown below; it matches the description above:

    /*
     * ThreadPoolKey
     *
     * This defines which thread-pool this command should run on.
     *
     * It uses the HystrixThreadPoolKey if provided, then defaults to use HystrixCommandGroup.
     *
     * It can then be overridden by a property if defined so it can be changed at runtime.
     */
    private static HystrixThreadPoolKey initThreadPoolKey(HystrixThreadPoolKey threadPoolKey, HystrixCommandGroupKey groupKey, String threadPoolKeyOverride) {
        if (threadPoolKeyOverride == null) {
            // we don't have a property overriding the value so use either HystrixThreadPoolKey or HystrixCommandGroup
            if (threadPoolKey == null) {
                /* use HystrixCommandGroup if HystrixThreadPoolKey is null */
                return HystrixThreadPoolKey.Factory.asKey(groupKey.name());
            } else {
                return threadPoolKey;
            }
        } else {
            // we have a property defining the thread-pool so use it instead
            return HystrixThreadPoolKey.Factory.asKey(threadPoolKeyOverride);
        }
    }

Hystrix guarantees that only one thread pool is created per pool identifier:

    /*
     * Use the String from HystrixThreadPoolKey.name() instead of the HystrixThreadPoolKey instance as it's just an interface and we can't ensure the object
     * we receive implements hashcode/equals correctly and do not want the default hashcode/equals which would create a new threadpool for every object we get even if the name is the same
     */
    /* package */final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();

    /**
     * Get the {@link HystrixThreadPool} instance for a given {@link HystrixThreadPoolKey}.
     * <p>
     * This is thread-safe and ensures only 1 {@link HystrixThreadPool} per {@link HystrixThreadPoolKey}.
     *
     * @return {@link HystrixThreadPool} instance
     */
    /* package */static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
        // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
        String key = threadPoolKey.name();

        // this should find it for all but the first time
        HystrixThreadPool previouslyCached = threadPools.get(key);
        if (previouslyCached != null) {
            return previouslyCached;
        }

        // if we get here this is the first time so we need to initialize
        synchronized (HystrixThreadPool.class) {
            if (!threadPools.containsKey(key)) {
                threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
            }
        }
        return threadPools.get(key);
    }

Hystrix thread pool parameters at a glance

  • coreSize — the number of core threads.
  • maximumSize — the maximum number of threads.
  • allowMaximumSizeToDivergeFromCoreSize — allows maximumSize to exceed coreSize; maximumSize only takes effect when this is set.
  • keepAliveTimeMinutes — threads beyond coreSize that have been idle longer than this are reclaimed; only meaningful when maximumSize is greater than coreSize.
  • maxQueueSize — the maximum size of the task queue; when all pool threads are busy and no new thread can be created, new tasks wait in the queue.
  • queueSizeRejectionThreshold — when the number of queued tasks exceeds this value, the pool rejects new tasks. This is essentially the same thing as maxQueueSize, but because of how hystrix is implemented maxQueueSize cannot be changed dynamically, hence this extra setting.
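For reference, the parameters above map onto the HystrixThreadPoolProperties.Setter builder like this (the values are placeholders, not recommendations):

```java
HystrixThreadPoolProperties.Setter()
        .withCoreSize(10)                                // coreSize
        .withMaximumSize(20)                             // maximumSize
        .withAllowMaximumSizeToDivergeFromCoreSize(true) // required for maximumSize to matter
        .withKeepAliveTimeMinutes(1)                     // keepAliveTimeMinutes
        .withMaxQueueSize(100)                           // maxQueueSize, fixed at startup
        .withQueueSizeRejectionThreshold(100);           // the dynamically tunable queue bound
```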

Guessing the pool's behavior from a given configuration

As you can see, hystrix's pool parameters look a lot like those of the JDK's ThreadPoolExecutor, yet are not the same, and even a careful read of the documentation can leave you confused. No matter — let's first guess the behavior under a few configurations.

coreSize = 2; maxQueueSize = 10

The pool keeps 2 resident threads. A new task submitted to the pool runs immediately if a thread is idle, otherwise it enters the wait queue. When the queue holds 10 waiting tasks, new tasks are rejected.

coreSize = 2; maximumSize = 5; maxQueueSize = -1

The pool keeps 2 resident threads. A new task runs immediately if a thread is idle; when no thread is idle, a new thread is created to run it if the pool currently has fewer than 5 threads, otherwise the task is rejected.

coreSize = 2; maximumSize = 5; maxQueueSize = 10

Under this configuration the actual behavior cannot be deduced from the official documentation. Two guesses:

  • Guess one. The pool keeps 2 resident threads. A new task runs immediately if one of them is idle, otherwise it enters the wait queue. When both threads are busy and the queue holds 10 tasks, the pool starts creating threads for new tasks until it has 5, at which point it starts rejecting. This is friendly to resource-sensitive workloads, and it is how the JDK's ThreadPoolExecutor behaves.

  • Guess two. The pool keeps 2 resident threads. A new task runs immediately if a thread is idle; when none is idle, a new thread is created if the pool has fewer than 5 threads. Once there are 5 threads and all are busy, tasks enter the wait queue. When the queue holds 10 tasks, new tasks are rejected. This is friendly to latency-sensitive workloads.

Both are plausible, and the documentation does not settle which one is right.

The Hystrix thread pool's real behavior under concurrency

In this section we run a test to see how the pool actually behaves.

Still using this configuration:

coreSize = 2; maximumSize = 5; maxQueueSize = 10

We keep submitting tasks to the hystrix pool while a CountDownLatch inside each task holds its thread, simulating long-running work. The code:

public class HystrixThreadPoolTest {

  public static void main(String[] args) throws InterruptedException {
    final int coreSize = 2, maximumSize = 5, maxQueueSize = 10;
    final String commandName = "TestThreadPoolCommand";

    final HystrixCommand.Setter commandConfig = HystrixCommand.Setter
        .withGroupKey(HystrixCommandGroupKey.Factory.asKey(commandName))
        .andCommandKey(HystrixCommandKey.Factory.asKey(commandName))
        .andCommandPropertiesDefaults(
            HystrixCommandProperties.Setter()
                .withExecutionTimeoutEnabled(false))
        .andThreadPoolPropertiesDefaults(
            HystrixThreadPoolProperties.Setter()
                .withCoreSize(coreSize)
                .withMaximumSize(maximumSize)
                .withAllowMaximumSizeToDivergeFromCoreSize(true)
                .withMaxQueueSize(maxQueueSize)
                .withQueueSizeRejectionThreshold(maxQueueSize));

    // Run command once, so we can get metrics.
    HystrixCommand<Void> command = new HystrixCommand<Void>(commandConfig) {
      @Override protected Void run() throws Exception {
        return null;
      }
    };
    command.execute();
    Thread.sleep(400);

    final CountDownLatch stopLatch = new CountDownLatch(1);
    List<Thread> threads = new ArrayList<Thread>();

    for (int i = 0; i < coreSize + maximumSize + maxQueueSize; i++) {
      final int fi = i + 1;

      Thread thread = new Thread(new Runnable() {
        public void run() {
          try {
            HystrixCommand<Void> command = new HystrixCommand<Void>(commandConfig) {
              @Override protected Void run() throws Exception {
                stopLatch.await();
                return null;
              }
            };
            command.execute();
          } catch (HystrixRuntimeException e) {
            System.out.println("Started Jobs: " + fi);
            System.out.println("Job:" + fi + " got rejected.");
            printThreadPoolStatus();
            System.out.println();
          }
        }
      });
      threads.add(thread);
      thread.start();
      Thread.sleep(400);

      if(fi == coreSize || fi == coreSize + maximumSize || fi == coreSize + maxQueueSize ) {
        System.out.println("Started Jobs: " + fi);
        printThreadPoolStatus();
        System.out.println();
      }
    }

    stopLatch.countDown();

    for (Thread thread : threads) {
      thread.join();
    }

  }

  static void printThreadPoolStatus() {
    for (HystrixThreadPoolMetrics threadPoolMetrics : HystrixThreadPoolMetrics.getInstances()) {
      String name = threadPoolMetrics.getThreadPoolKey().name();
      Number poolSize = threadPoolMetrics.getCurrentPoolSize();
      Number queueSize = threadPoolMetrics.getCurrentQueueSize();
      System.out.println("ThreadPoolKey: " + name + ", PoolSize: " + poolSize + ", QueueSize: " + queueSize);
    }

  }

}

Running the code produces the following output:

// tasks = coreSize: coreSize threads are busy
Started Jobs: 2
ThreadPoolKey: TestThreadPoolCommand, PoolSize: 2, QueueSize: 0

// tasks > coreSize: still only coreSize threads; tasks beyond coreSize enter the wait queue, no new thread is created
Started Jobs: 7
ThreadPoolKey: TestThreadPoolCommand, PoolSize: 2, QueueSize: 5

// tasks = coreSize + maxQueueSize: still only coreSize threads; tasks beyond coreSize enter the wait queue, no new thread is created
Started Jobs: 12
ThreadPoolKey: TestThreadPoolCommand, PoolSize: 2, QueueSize: 10

// tasks > coreSize + maxQueueSize: still only coreSize threads; the queue is full, new tasks are rejected
Started Jobs: 13
Job:13 got rejected.
ThreadPoolKey: TestThreadPoolCommand, PoolSize: 2, QueueSize: 10

Started Jobs: 14
Job:14 got rejected.
ThreadPoolKey: TestThreadPoolCommand, PoolSize: 2, QueueSize: 10

Started Jobs: 15
Job:15 got rejected.
ThreadPoolKey: TestThreadPoolCommand, PoolSize: 2, QueueSize: 10

Started Jobs: 16
Job:16 got rejected.
ThreadPoolKey: TestThreadPoolCommand, PoolSize: 2, QueueSize: 10

Started Jobs: 17
Job:17 got rejected.
ThreadPoolKey: TestThreadPoolCommand, PoolSize: 2, QueueSize: 10

For the complete test code, see here.

As you can see, the Hystrix pool's actual behavior matches neither of the earlier guesses: it differs from the JDK pool, and it differs from the other reasonable guess as well. When maximumSize > coreSize && maxQueueSize != -1, the maximumSize parameter simply does not take effect: the number of threads never exceeds coreSize, excess tasks go into the queue, and once the queue is full, new tasks are rejected outright.

It has to be said: this is a confusing, dangerous, easy-to-misconfigure thread pool.

The JDK thread pool: ThreadPoolExecutor

Before digging further into why the Hystrix pool behaves this way, let's review the JDK thread pool.

Covering only the parameters relevant to this discussion:

  • corePoolSize, the number of core threads, and maximumPoolSize, the maximum number of threads. These two parameters mean the same as the hystrix pool's coreSize and maximumSize.
  • workQueue, the task wait queue. Unlike hystrix, the JDK pool does not take a queue size; the caller supplies a BlockingQueue.
  • handler, the handler invoked when the pool cannot accept a task. Hystrix rejects outright; the JDK pool lets you customize this.

As you can see, the JDK pool is more flexible to use, and its parameters have clear meanings — there is nothing like the hystrix pool's puzzling allowMaximumSizeToDivergeFromCoreSize and queueSizeRejectionThreshold.

For the JDK pool's parameter configuration, see the JDK source:


    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

So under the configuration corresponding to the hystrix pool's, how does the JDK pool behave?

corePoolSize = 2; maximumPoolSize = 5; workQueue = new ArrayBlockingQueue(10); handler = new ThreadPoolExecutor.DiscardPolicy()

No test needed here — the answer: the pool keeps 2 resident threads. A new task runs immediately if one of the 2 threads is idle, otherwise it enters the wait queue. When both threads are busy and the queue holds 10 tasks, the pool starts creating threads for new tasks until it reaches 5 threads, at which point it starts rejecting new tasks.
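The claimed behavior is easy to verify with a short JDK-only program (no Hystrix involved); the class and method names here are invented for the demo:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class JdkPoolDemo {

    // Submit n tasks that block on the latch, then report "poolSize/queueSize".
    static String fill(ThreadPoolExecutor pool, final CountDownLatch stop, int n) {
        for (int i = 0; i < n; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    try { stop.await(); } catch (InterruptedException ignored) { }
                }
            });
        }
        return pool.getPoolSize() + "/" + pool.getQueue().size();
    }

    public static void main(String[] args) {
        CountDownLatch stop = new CountDownLatch(1);
        // corePoolSize = 2, maximumPoolSize = 5, queue capacity = 10, as in the article.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES,
                new ArrayBlockingQueue<Runnable>(10), new ThreadPoolExecutor.DiscardPolicy());

        // 2 tasks occupy the core threads, the next 10 fill the queue.
        System.out.println(fill(pool, stop, 12));   // prints 2/10

        // The queue is now full, so offer() fails and a 3rd thread is created.
        System.out.println(fill(pool, stop, 1));    // prints 3/10

        stop.countDown();
        pool.shutdown();
    }
}
```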

The relevant source is below. Notably, the JDK pool does not judge whether the wait queue is full by counting waiting tasks; it simply calls workQueue.offer() — if the queue accepts the task, it waits there; otherwise the pool tries to add a thread, and rejects only if that fails.

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

As you can see, the hystrix pool's configuration parameters closely mirror the JDK pool's, in both name and meaning.

Why

In fact, the hystrix thread pool is implemented on top of the JDK thread pool. The relevant code:


    public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
        final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);

        final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
        final int dynamicCoreSize = threadPoolProperties.coreSize().get();
        final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
        final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
        final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);

        if (allowMaximumSizeToDivergeFromCoreSize) {
            final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
            if (dynamicCoreSize > dynamicMaximumSize) {
                logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
                        dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ".  Maximum size will be set to " +
                        dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
                return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
            } else {
                return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
            }
        } else {
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        }
    }

    public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
        /*
         * We are using SynchronousQueue if maxQueueSize <= 0 (meaning a queue is not wanted).
         * <p>
         * SynchronousQueue will do a handoff from calling thread to worker thread and not allow queuing which is what we want.
         * <p>
         * Queuing results in added latency and would only occur when the thread-pool is full at which point there are latency issues
         * and rejecting is the preferred solution.
         */
        if (maxQueueSize <= 0) {
            return new SynchronousQueue<Runnable>();
        } else {
            return new LinkedBlockingQueue<Runnable>(maxQueueSize);
        }
    }

Given that the hystrix pool is built on the JDK pool, why do the following two essentially identical configurations behave differently?

//hystrix
coreSize = 2; maximumSize = 5; maxQueueSize = 10

//jdk
corePoolSize = 2; maximumPoolSize = 5; workQueue = new ArrayBlockingQueue(10); handler = new ThreadPoolExecutor.DiscardPolicy()

Once the queue is full, the JDK pool creates threads for new tasks until the thread count reaches maximumPoolSize, whereas the hystrix pool rejects new tasks as soon as the queue is full, leaving the maximumSize setting a dead letter.

The reason is that hystrix decides on its own whether the queue is full and whether to reject a new task, instead of leaving that judgment to the JDK pool. See the hystrix source:

    public boolean isQueueSpaceAvailable() {
        if (queueSize <= 0) {
            // we don't have a queue so we won't look for space but instead
            // let the thread-pool reject or not
            return true;
        } else {
            return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
        }
    }

    public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
        if (threadPool != null) {
            if (!threadPool.isQueueSpaceAvailable()) {
                throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
            }
        }
        return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit);
    }

As you can see, once the queue has grown to maxQueueSize, hystrix never even submits the task to the underlying ThreadPoolExecutor. The executor therefore never sees workQueue.offer() fail, and never gets a chance to create threads beyond coreSize.
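The effect is easy to reproduce without Hystrix at all: put the same queue-size gate in front of a plain ThreadPoolExecutor, and since offer() is never allowed to fail, the executor never gets a reason to grow past corePoolSize. A minimal sketch (the gate mirrors isQueueSpaceAvailable with queueSizeRejectionThreshold = 10; names are invented for the demo):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueGateDemo {

    // Submit n blocking tasks through a Hystrix-style queue gate;
    // report "poolSize/queueSize/rejected".
    static String fill(ThreadPoolExecutor pool, final CountDownLatch stop, int n, int threshold) {
        int rejected = 0;
        for (int i = 0; i < n; i++) {
            // The pre-check: reject before the executor ever sees a full queue.
            if (pool.getQueue().size() >= threshold) {
                rejected++;
                continue;
            }
            pool.execute(new Runnable() {
                public void run() {
                    try { stop.await(); } catch (InterruptedException ignored) { }
                }
            });
        }
        return pool.getPoolSize() + "/" + pool.getQueue().size() + "/" + rejected;
    }

    public static void main(String[] args) {
        CountDownLatch stop = new CountDownLatch(1);
        // coreSize = 2, maximumSize = 5, maxQueueSize = 10, as in the article.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES,
                new LinkedBlockingQueue<Runnable>(10));

        // 17 tasks: 2 run, 10 queue, 5 are turned away by the gate --
        // offer() never fails, so the pool never grows past coreSize.
        System.out.println(fill(pool, stop, 17, 10));   // prints 2/10/5

        stop.countDown();
        pool.shutdown();
    }
}
```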

What to do

For anyone used to the JDK's ThreadPoolExecutor, hystrix is genuinely easy to get wrong — the author has seen the misconfiguration in the code of several important production services. Calling it a dangerous thread pool is no exaggeration.

So what should you do?

Avoid the problem at configuration time

Configuring maximumSize > coreSize together with maxQueueSize > 0, like the following, does not work.

coreSize = 2; maximumSize = 5; maxQueueSize = 10

Compromise: if latency matters more, configure maximumSize > coreSize with maxQueueSize = -1. Under load there is no wait queue; new threads are created to run tasks directly.

coreSize = 2; maximumSize = 5; maxQueueSize = -1

If resources matter more and you don't want too many threads, configure maximumSize = coreSize with maxQueueSize > 0. Under load, tasks wait in the queue until a thread frees up or the task times out.

coreSize = 2; maximumSize = 2; maxQueueSize = 10
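Expressed with the HystrixThreadPoolProperties.Setter seen earlier, the two safe shapes look like this (the sizes are placeholders):

```java
// Latency-sensitive: no queue; the pool grows to maximumSize under load.
HystrixThreadPoolProperties.Setter latencySensitive = HystrixThreadPoolProperties.Setter()
        .withCoreSize(2)
        .withMaximumSize(5)
        .withAllowMaximumSizeToDivergeFromCoreSize(true)
        .withMaxQueueSize(-1);

// Resource-sensitive: fixed pool size; excess tasks wait in a bounded queue.
HystrixThreadPoolProperties.Setter resourceSensitive = HystrixThreadPoolProperties.Setter()
        .withCoreSize(2)
        .withMaxQueueSize(10)
        .withQueueSizeRejectionThreshold(10);
```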

Fix the problem in hystrix

Technically feasible — there is more than one way to do it. But Netflix has announced that hystrix is no longer maintained, so this road is closed unless you maintain your own fork.

Reference

https://github.com/Netflix/Hystrix/wiki/Configuration

https://github.com/Netflix/Hystrix/issues/1589

https://github.com/Netflix/Hystrix/pull/1670