一个线上问题看各家线程池拒绝策略

又到了烧高香的季节,业务旺季来了。

之前线上项目偶发出现线程池耗尽的问题,最近终于有空能好好研究一把,问题实际并不复杂,也得益于Dubbo的线程池拒绝策略才能很快找到大致的原因。

通过这个问题,也有些好奇各家使用的线程池拒绝策略是怎样的,刨刨坑、挖挖土,一起来看看吧~

问题背景

之前线上偶发出现线程池耗尽问题,现象如下:

在调用下游Dubbo接口时,提示Server端的线程池耗尽。

最开始以为是有突发流量,但是监控显示流量稳定,并且扩容后发现问题依然存在,渐渐意识到问题并不简单。

问题分析

既然有异常日志和堆栈,先看看到底什么场景下会出现这个异常。在Dubbo源码中,我们可以找到这一段提示出现在AbortPolicyWithReport中。

1
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy

AbortPolicyWithReport继承自java.util.concurrent.ThreadPoolExecutor.AbortPolicy,是一种线程池拒绝策略,当线程池中的缓冲任务队列满,且线程数量达到最大时,就会触发拒绝策略,调用拒绝策略的rejectedExecution()方法进行处理。

那么,有哪些不同的拒绝策略呢?

JDK线程池拒绝策略

java.util.concurrent.ThreadPoolExecutor,我们可以找到JDK预设置的四种拒绝策略:

  • CallerRunsPolicy - 调用者线程处理

该策略下,如果线程池未关闭,则交由当前调用者线程进行处理,否则直接丢弃任务。

1
2
3
4
5
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        r.run();
    }
}
  • AbortPolicy - 抛出异常

如果不配置拒绝策略的话,线程池会默认使用该策略,直接抛出rejectedExecution,交由上层业务处理。

1
2
3
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("...");
}
  • DiscardPolicy - 丢弃当前任务

最简单的处理方法,直接丢弃。

1
2
3
//实际方法体就是空的,即该场景下不处理,直接丢弃
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
  • DiscardOldestPolicy - 丢弃最老的任务

该策略是丢弃队列中最老的任务(其实就是下一个要执行的任务),并尝试执行当前任务。

1
2
3
4
5
6
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        e.getQueue().poll();
        e.execute(r);
    }
}

Dubbo线程池拒绝策略

那么Dubbo的拒绝策略是怎样的呢?

其实从名字就能看出来,AbortPolicyWithReport

1
2
3
4
5
6
7
8
9
10
11
12
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    String msg = String.format("Thread pool is EXHAUSTED!" + ...);
    logger.warn(msg);
    dumpJStack();
    dispatchThreadPoolExhaustedEvent(msg);
    throw new RejectedExecutionException(msg);
}

}

Dubbo的拒绝策略是抛出异常RejectedExecutionException,同时还会做一件事情 - dumpJStack(),记录下当时的JVM线程堆栈。

dumpJStack

先来看看源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void dumpJStack() {

   //一些dump时间间隔和并发控制
   ...

    //新建单线程池用于dump堆栈
    ExecutorService pool = Executors.newSingleThreadExecutor();
    pool.execute(() -> {
        ...
		  try (FileOutputStream jStackStream = new FileOutputStream(
        new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) {
            JVMUtil.jstack(jStackStream);
        } catch (Throwable t) {
            logger.error("dump jStack error", t);
        }
        ...
    });
    ...
}

做法其实很简单,最终调用JVMUtil.jstack把当前JVM的线程堆栈dump下来,而这样做有一个很大的好处,就是能知道当时其他线程到底在做什么,帮助分析线程池溢出的原因。

原因分析

有线程堆栈就好办了,看看当时线程都在做什么。

Dubbo底层使用Netty实现网络通信,涉及的线程池包括IO线程池(boss、worker)和业务线程池(处理业务事件)。通过之前的日志可以看到是Server端的业务线程池,即DubboServerHandler耗尽,那么统计一把,看看线程都在做什么。

很明显,大量的线程都阻塞在获取DB连接上,DB连接池里连接数不够了。那接下来就好办了,可以看看同时间段是不是有慢查询长时间占住了连接,或者是真的连接池小了,线程池和连接池配比不对,分析至此就不继续展开(并不是讨论重点哈哈)。

不同的拒绝策略

可以看到,Dubbo通过重写了拒绝了策略,来帮助异常场景下进行问题定位,带来了很大的帮助。

那么其他主流组件是怎么做的呢?

RocketMQ

Broker为例,其中包含了非常多的线程池用于处理不同的消息处理场景,包含send、put、pull、query等等。

在线程池的使用上,RocketMQ通过BrokerFixedThreadPoolExecutor继承封装了一层ThreadPoolExecutor,上层可以自行传入参数,其中也包含了可配置的RejectedExecutionHandler

实际在Broker创建消息处理的不同线程池时,并没有指定特殊的拒绝策略,所以使用的是默认的AbortPolicy,即丢弃消息。

1
2
3
4
5
6
7
8
9
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
    this.brokerConfig.getSendMessageThreadPoolNums(),
    this.brokerConfig.getSendMessageThreadPoolNums(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.sendThreadPoolQueue,
    new ThreadFactoryImpl("SendMessageThread_")
    //并没有设置拒绝策略
);

同时为了避免任务溢出,为每个线程池默认设置了较大的任务队列大小。

1
2
3
4
5
6
7
8
9
private int sendThreadPoolQueueCapacity = 10000;
private int putThreadPoolQueueCapacity = 10000;
private int pullThreadPoolQueueCapacity = 100000;
private int replyThreadPoolQueueCapacity = 10000;
private int queryThreadPoolQueueCapacity = 20000;
private int clientManagerThreadPoolQueueCapacity = 1000000;
private int consumerManagerThreadPoolQueueCapacity = 1000000;
private int heartbeatThreadPoolQueueCapacity = 50000;
private int endTransactionPoolQueueCapacity = 100000;

综上,RocketMQ的拒绝策略使用了AbortPolicy,即抛出异常,同时为了避免任务队列溢出,设置了较大的任务队列。

Netty

EventLoopGroup为例,线程池的拒绝策略默认使用RejectedExecutionHandlers,通过单例模式提供Handler进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public final class RejectedExecutionHandlers {
    private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
        @Override
        public void rejected(Runnable task, SingleThreadEventExecutor executor) {
            throw new RejectedExecutionException();
        }
    };

    private RejectedExecutionHandlers() { }

    public static RejectedExecutionHandler reject() {
        return REJECT;
    }

    ...
}

可以看出,Netty的拒绝策略默认也是抛出异常,与RocketMQ对比的不同的点在于,任务队列的大小会取max(16, maxPendingTasks)io.netty.eventLoop.maxPengdingTasks可通过环境变量进行配置。

Doris

团队内一直在用Doris,属于计算存储分离、MPP架构的分析型存储组件,看了一眼FE中的拒绝策略,官方实现了两种:

LogDiscardPolicy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static class LogDiscardPolicy implements RejectedExecutionHandler {

    private static final Logger LOG = LogManager.getLogger(LogDiscardPolicy.class);

    private String threadPoolName;

    public LogDiscardPolicy(String threadPoolName) {
        this.threadPoolName = threadPoolName;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
       LOG.warn("Task " + r.toString() + " rejected from " + threadPoolName + " " + executor.toString());
    }
}

可以理解就是DiscardPolicy,丢弃任务,同时记录warn日志。

BlockedPolicy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static class BlockedPolicy implements RejectedExecutionHandler {
    private String threadPoolName;
    private int timeoutSeconds;

    public BlockedPolicy(String threadPoolName, int timeoutSeconds) {
        this.threadPoolName = threadPoolName;
        this.timeoutSeconds = timeoutSeconds;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            executor.getQueue().offer(r, timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            LOG.warn("Task " + r.toString() + " wait to enqueue in " + threadPoolName + " " + executor.toString() + " failed");
        }
    }
}

这种策略会特殊一些,它会阻塞住当前线程,尽最大努力尝试将任务放入队列中。如果超过指定的阻塞时间timeoutSeconds(默认60s),仍然无法将任务放入队列中,则记录warn日志,并丢弃任务。

这两种策略在Doris中都有实际使用到,同时线程池的任务队列大小默认设置为10。

ElasticSearch

ES的拒绝策略相对复杂一些,其自定义实现了两种拒绝策略。

  • EsAbortPolicy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class EsAbortPolicy extends EsRejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r instanceof AbstractRunnable) {
            if (((AbstractRunnable) r).isForceExecution()) {
                BlockingQueue<Runnable> queue = executor.getQueue();
                if ((queue instanceof SizeBlockingQueue) == false) {
                    throw new IllegalStateException("forced execution, but expected a size queue");
                }
                try {
                    ((SizeBlockingQueue<Runnable>) queue).forcePut(r);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("forced execution, but got interrupted", e);
                }
                return;
            }
        }
        incrementRejections();
        throw newRejectedException(r, executor, executor.isShutdown());
    }
}

其实本质上就是AbortPolicy,但是会进行一些特殊处理,包括forceExecution强制执行的判断、任务拒绝次数统计,最终抛出异常。

ES中,线程池的forceExecution是指什么?

在满足条件的情况下,即使用了ES自定义的AbstractRunnable进行任务封装、SizeBlockingQueue作为任务队列时,可以根据任务配置判断是否强制放入任务队列。对于一些比较重要的任务,不能丢弃时,可以将forceExecution 设置为true。

强制放入任务队列带来的效果取决于SizeBlockingQueue中封装的队列类型,如果封装的是ArrayBlockingQueue,则会阻塞等待队列有空余;如果封装的是LinkedTransferQueue,由于队列大小无限,且put使用的是ASYNC模式,所以会立刻放入队列并返回。

  • ForceQueuePolicy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
static class ForceQueuePolicy extends EsRejectedExecutionHandler {

        ...

        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
            if (rejectAfterShutdown) {
                if (executor.isShutdown()) {
                    reject(executor, task);
                } else {
                    put(executor, task);
                    if (executor.isShutdown() && executor.remove(task)) {
                        reject(executor, task);
                    }
                }
            } else {
                put(executor, task);
            }
        }

        private static void put(ThreadPoolExecutor executor, Runnable task) {
            final BlockingQueue<Runnable> queue = executor.getQueue();
            // force queue policy should only be used with a scaling queue
            assert queue instanceof ExecutorScalingQueue;
            try {
                queue.put(task);
            } catch (final InterruptedException e) {
                assert false : "a scaling queue never blocks so a put to it can never be interrupted";
                throw new AssertionError(e);
            }
        }

        private void reject(ThreadPoolExecutor executor, Runnable task) {
            incrementRejections();
            throw newRejectedException(task, executor, true);
        }
    }

}

该策略在线程池未关闭,且使用了ES自定义的ExecutorScalingQueue的任务队列时,会强制将任务放入线程池队列中。其中,ExecutorScalingQueue也是继承自LinkedTransferQueue,最终调用put方法以ASYNC模式放入任务队列中。

看上去也是forceExecution,而且最终都是使用LinkedTransferQueue的put方法以ASYNC模式非阻塞入队列。那么EsAbortPolicyForceQueuePolicy有什么不同呢?

两者有很多相似点,都有forceExecution的判断,而且拒绝时都是抛出RejectedExecutionException

不同点在于,ForceQueuePolicy默认采用强制执行模式,且在线程池关闭时依然可能往队列放入任务。

其他

在GitHub上随意翻了一下,也有看到用策略链的方式,实现也很简单,可以随意组合配置不同的策略。

1
2
3
4
5
6
7
8
9
10
11
public class RejectedExecutionChainPolicy implements RejectedExecutionHandler {

    private final RejectedExecutionHandler[] handlerChain;

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        for (RejectedExecutionHandler handler : handlerChain) {
            handler.rejectedExecution(r, executor);
        }
    }
}

总结

拒绝策略主要应用在线程池出现资源溢出的情况下,除了常见的由JDK提供的四种拒绝策略外,不同的组件也会尝试使用不同的拒绝策略来应用。

JDK提供的拒绝策略

类型 说明
CallerRunsPolicy JDK提供,调用者线程处理
AbortPolicy JDK线程池默认使用,抛出RejectedExecutionException异常
DiscardPolicy JDK提供,丢弃当前任务
DiscardOldestPolicy JDK提供,丢弃下一个要执行的任务

自定义拒绝策略

组件 类型 说明
RocketMQ AbortPolicy 使用的线程池默认拒绝的略,即AbortPolicy
Dubbo AbortPolicyWithReport 抛出RejectedExecutionException异常,并报告溢出,记录下JVM线程堆栈
Netty RejectedExecutionHandlers 逻辑与AbortPolicy一致,抛出异常,封装为单例Handler使用
Doris LogDiscardPolicy 逻辑与DiscardPolicy一致,丢弃任务,并记录warn日志
Doris BlockedPolicy 尽最大努力尝试将任务放入队列执行,最多等待60s,超时后记录warn日志,并丢弃任务
Elastic EsAbortPolicy 正常情况下与AbortPolicy一致,如果线程标记强制执行,则强制执行或放入任务队列,实际入队列的表现取决于队列类型,可能阻塞或立即返回
Elastic ForceQueuePolicy 默认强制执行任务或放入任务队列,异步非阻塞
其他 PolicyChain 策略链,包含多种拒绝策略,根据条件与节点处理结果决定最终表现