自适应并发控制线程池设计
遇到的问题:当拉取数据遇上慢接口
在公司系统中,我们需要批量校验数十万会员在第三方BMS系统中的状态。但BMS接口存在两个致命问题:
响应时间飘忽不定:正常300ms,慢接口可达10秒以上
10s以上的响应容易打挂对方系统。
传统固定线程池方案在此场景下如同"闭眼开车",如果接口响应变慢时,所有线程阻塞等待,新任务堆积,最终导致:
线程资源耗尽
内存溢出
级联故障波及整个系统
架构设计:自适应并发控制系统

核心机制深度解析
动态并发调整(Worker线程核心逻辑)
采用指数级降级:先降低并发(7-10秒),再熔断(>10秒)
状态隔离:降级与熔断使用独立计时器,避免互相干扰
原子操作:通过
AtomicInteger/AtomicBoolean保证线程安全
if (elapsed > 7000 && elapsed <= 10000 && concurrencyLevel.get() > MIN_CONCURRENCY) {
// 降级:响应变慢时降低并发
concurrencyLevel.set(MIN_CONCURRENCY);
demotionUntil = Instant.now().plus(Duration.ofMinutes(5));
} else if (elapsed > 10000) {
// 熔断:超时立即暂停所有请求
shouldPause.set(true);
pauseUntil = Instant.now().plus(Duration.ofMinutes(30));
}线程自旋检查退出机制
线程身份验证:通过workerId实现动态线程淘汰
双重熔断检查:任务开始前+请求完成后双重校验
优雅退出:捕获中断信号,快速释放资源
// 检查是否达到线程池数量上线
if (workerId >= concurrencyLevel.get()) {
LogUtil.info("Worker-{}被跳过(当前上限:{})", workerId, concurrencyLevel.get());
return;
}
// 检查是否熔断状态
if (shouldPause.get()) {
LogUtil.info("Worker-{}暂停中,退出...", workerId);
return;
}熔断自愈机制
独立线程定期检查:独立线程定期检查是否恢复
错峰恢复:线程重建时随机延迟2-5秒,避免瞬间流量冲击
状态机设计:明确熔断/降级/正常三种状态的流转条件
// 定期检查是否需要恢复(每分钟)
scheduler.scheduleAtFixedRate(this::checkResume, 1, 1, TimeUnit.MINUTES);
private void checkResume() {
// 熔断到期恢复
if (shouldPause.get() && Instant.now().isAfter(pauseUntil)) {
shouldPause.set(false);
concurrencyLevel.set(INITIAL_CONCURRENCY);
// 重建工作线程
}
// 降级到期恢复
else if (!shouldPause.get() && concurrencyLevel.get() == MIN_CONCURRENCY
&& Instant.now().isAfter(demotionUntil)) {
concurrencyLevel.set(INITIAL_CONCURRENCY);
// 仅补充缺失的线程
}
}延伸:与Java线程池的实现关联
这里简单分析一下Java线程池的整个流程(对整个逻辑进行了简化,方便理解):
如果当前运行的线程数小于核心线程数,那么就会新建一个线程来执行任务。
如果当前运行的线程数等于或大于核心线程数,但是小于最大线程数,那么就把该任务放入到任务队列里等待执行。
如果向任务队列投放任务失败(任务队列已经满了),但是当前运行的线程数是小于最大线程数的,就新建一个线程来执行任务。
如果当前运行的线程数已经等同于最大线程数了,新建线程将会使当前运行的线程超出最大线程数,那么当前任务会被拒绝,拒绝策略会调用
RejectedExecutionHandler.rejectedExecution()方法。
ThreadPoolExecutor 是 Java 中管理线程池的核心类,其工作流程可分为任务提交、线程管理、任务执行和资源回收四个阶段。以下是详细流程:
线程池初始化
参数配置:创建时需指定核心参数:
corePoolSize:核心线程数,默认长期存活。maximumPoolSize:最大线程数,限制线程池可扩展的上限。keepAliveTime:非核心线程的空闲存活时间。workQueue:任务队列(如LinkedBlockingQueue、ArrayBlockingQueue)。RejectedExecutionHandler:拒绝策略(如抛异常、丢弃任务等)。
任务提交与处理流程
当调用 execute(Runnable command) 提交任务时,按以下顺序处理:
优先创建核心线程
若当前工作线程数 <
corePoolSize,立即创建新线程执行任务,即使存在空闲线程。示例:
corePoolSize=5,已创建3个线程(均空闲),新任务会触发创建第4个线程。
任务入队
若线程数 ≥
corePoolSize,尝试将任务放入工作队列:队列未满:任务进入队列,等待空闲线程处理。
队列已满:进入下一步。
创建非核心线程
若队列已满且线程数 <
maximumPoolSize,创建新线程执行任务。示例:
maximumPoolSize=10,corePoolSize=5,队列满后创建第6~10个线程。
触发拒绝策略
若队列已满且线程数 ≥
maximumPoolSize,执行拒绝策略:AbortPolicy(默认):抛出
RejectedExecutionException。CallerRunsPolicy:由提交任务的线程直接执行任务。
DiscardOldestPolicy:丢弃队列最旧任务,重新提交当前任务。
DiscardPolicy:静默丢弃当前任务。
任务执行与线程复用
线程执行逻辑:
每个工作线程(Worker)循环从队列中获取任务:从队列中取任务(
poll()或take(),取决于队列类型)。执行任务
Runnable.run()。若队列为空,线程根据
keepAliveTime决定是否等待:非核心线程:超时未获取任务则终止。
核心线程:默认永久等待(除非配置
allowCoreThreadTimeOut=true)。
资源回收与线程终止
非核心线程回收:
若线程空闲时间超过keepAliveTime,且线程数 >corePoolSize,该线程被终止。核心线程保留:
默认核心线程永不超时,除非设置allowCoreThreadTimeOut=true。
关键点总结
模板代码
package com.example.adaptive.threadpool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import jakarta.annotation.PostConstruct;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 通用自适应并发控制线程池框架
* 适用于需要熔断、降级、自愈能力的第三方服务调用场景
*/
@Slf4j
@Component
public class AdaptiveThreadPoolFramework {
// 配置参数(建议提取到配置中心)
private static final int INITIAL_CONCURRENCY = 3; // 初始并发级别
private static final int MIN_CONCURRENCY = 1; // 最小并发级别
private static final long TIMEOUT_THRESHOLD_MS = 10_000; // 超时阈值
private static final long CIRCUIT_BREAK_DURATION_MINUTES = 30; // 熔断持续时间
private static final long DEGRADATION_DURATION_MINUTES = 5; // 降级持续时间
// 全局状态控制
private final AtomicInteger concurrencyLevel = new AtomicInteger(INITIAL_CONCURRENCY);
private final AtomicBoolean shouldPause = new AtomicBoolean(false);
private volatile Instant pauseUntil = Instant.MIN; // 熔断结束时间
private volatile Instant demotionUntil = Instant.MIN; // 降级结束时间
// 线程池组件
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private final ExecutorService workerPool = Executors.newCachedThreadPool();
private final LinkedBlockingQueue<AdaptiveTask> taskQueue = new LinkedBlockingQueue<>();
@PostConstruct
public void init() {
start();
}
public void start() {
log.info("自适应线程池启动,初始并发级别: {}", INITIAL_CONCURRENCY);
// 启动工作线程
for (int i = 0; i < concurrencyLevel.get(); i++) {
workerPool.submit(new Worker(i));
}
// 启动状态检查调度器
scheduler.scheduleAtFixedRate(this::checkRecovery, 1, 1, TimeUnit.MINUTES);
}
public void stop() {
log.info("自适应线程池停止...");
workerPool.shutdownNow();
scheduler.shutdownNow();
}
/**
* 任务恢复检查:处理熔断和降级状态的自动恢复
*/
private void checkRecovery() {
Instant now = Instant.now();
// 检查熔断状态是否到期
if (shouldPause.get() && now.isAfter(pauseUntil)) {
log.info("熔断结束,恢复请求处理");
shouldPause.set(false);
resetConcurrency();
}
// 检查降级状态是否到期
else if (!shouldPause.get() && concurrencyLevel.get() == MIN_CONCURRENCY && now.isAfter(demotionUntil)) {
log.info("降级结束,恢复初始并发级别");
resetConcurrency();
}
}
private void resetConcurrency() {
int currentLevel = concurrencyLevel.get();
concurrencyLevel.set(INITIAL_CONCURRENCY);
// 补充缺失的工作线程
for (int i = currentLevel; i < INITIAL_CONCURRENCY; i++) {
try {
Thread.sleep((long) (Math.random() * 3000)); // 错峰启动
workerPool.submit(new Worker(i));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
/**
* 工作线程:执行任务并监控执行质量
*/
private class Worker implements Runnable {
private final int workerId;
public Worker(int workerId) {
this.workerId = workerId;
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
// 状态检查:熔断中
if (shouldPause.get()) {
log.info("Worker-{} 因熔断暂停", workerId);
return;
}
// 身份检查:巧妙利用workerId检查是否超出当前并发限制
if (workerId >= concurrencyLevel.get()) {
log.info("Worker-{} 被降级终止 (当前并发: {})", workerId, concurrencyLevel.get());
return;
}
// 获取任务
AdaptiveTask task = taskQueue.poll();
if (task == null) {
log.info("Worker-{} 无任务可处理,退出", workerId);
return;
}
// 执行任务并监控
Instant start = Instant.now();
try {
boolean success = executeTask(task);
long elapsed = Duration.between(start, Instant.now()).toMillis();
handleTaskResult(task, success, elapsed);
} catch (Exception e) {
long elapsed = Duration.between(start, Instant.now()).toMillis();
handleTaskFailure(task, e, elapsed);
}
}
}
private boolean executeTask(AdaptiveTask task) throws Exception {
// 模板方法:子类实现具体业务逻辑
return task.execute();
}
private void handleTaskResult(AdaptiveTask task, boolean success, long elapsed) {
if (!success) {
handleFailure(elapsed);
return;
}
log.info("[Worker-{}] 任务成功,耗时: {}ms", workerId, elapsed);
// 自适应调整:基于响应时间
if (elapsed > 7000 && elapsed <= TIMEOUT_THRESHOLD_MS) {
applyDegradation();
} else if (elapsed > TIMEOUT_THRESHOLD_MS) {
applyCircuitBreak();
}
}
private void handleTaskFailure(AdaptiveTask task, Exception e, long elapsed) {
log.error("[Worker-{}] 任务失败,耗时: {}ms, 异常: {}", workerId, elapsed, e.getMessage());
handleFailure(elapsed);
}
private void handleFailure(long elapsed) {
if (elapsed > TIMEOUT_THRESHOLD_MS) {
applyCircuitBreak();
} else {
applyDegradation();
}
}
private void applyDegradation() {
if (concurrencyLevel.get() > MIN_CONCURRENCY) {
int oldLevel = concurrencyLevel.getAndSet(MIN_CONCURRENCY);
if (oldLevel != MIN_CONCURRENCY) {
demotionUntil = Instant.now().plus(Duration.ofMinutes(DEGRADATION_DURATION_MINUTES));
log.warn("Worker-{} 触发降级,并发级别降至 {}", workerId, MIN_CONCURRENCY);
}
}
}
private void applyCircuitBreak() {
if (!shouldPause.getAndSet(true)) {
pauseUntil = Instant.now().plus(Duration.ofMinutes(CIRCUIT_BREAK_DURATION_MINUTES));
log.error("Worker-{} 触发熔断,暂停 {} 分钟", workerId, CIRCUIT_BREAK_DURATION_MINUTES);
}
}
}
/**
* 任务接口:定义可适配的任务模板
*/
public interface AdaptiveTask {
boolean execute() throws Exception;
String getTaskId();
}
/**
* 提交任务到框架
*/
public boolean submitTask(AdaptiveTask task) {
if (shouldPause.get()) {
log.warn("系统处于熔断状态,拒绝任务: {}", task.getTaskId());
return false;
}
return taskQueue.offer(task);
}
/**
* 获取当前运行状态
*/
public String getStatus() {
return String.format(
"状态: %s, 并发级别: %d/%d, 任务队列: %d, 熔断剩余: %d分钟, 降级剩余: %d分钟",
shouldPause.get() ? "熔断中" : concurrencyLevel.get() == MIN_CONCURRENCY ? "降级中" : "正常",
concurrencyLevel.get(), INITIAL_CONCURRENCY,
taskQueue.size(),
shouldPause.get() ? Duration.between(Instant.now(), pauseUntil).toMinutes() : 0,
concurrencyLevel.get() == MIN_CONCURRENCY ? Duration.between(Instant.now(), demotionUntil).toMinutes() : 0
);
}
}