wayne
wayne
发布于 2026-01-18 / 8 阅读
0
0

自适应并发控制线程池设计:从对付慢接口讲起

自适应并发控制线程池设计

遇到的问题:当拉取数据遇上慢接口

在公司系统中,我们需要批量校验数十万会员在第三方BMS系统中的状态。但BMS接口存在两个致命问题:

  1. 响应时间飘忽不定:正常300ms,慢接口可达10秒以上

  2. 10s以上的响应容易打挂对方系统。

传统固定线程池方案在此场景下如同"闭眼开车",如果接口响应变慢时,所有线程阻塞等待,新任务堆积,最终导致:

  • 线程资源耗尽

  • 内存溢出

  • 级联故障波及整个系统

架构设计:自适应并发控制系统

自适应线程池-架构

核心机制深度解析

动态并发调整(Worker线程核心逻辑)

  • 采用指数级降级:先降低并发(7-10秒),再熔断(>10秒)

  • 状态隔离:降级与熔断使用独立计时器,避免互相干扰

  • 原子操作:通过AtomicInteger/AtomicBoolean保证线程安全

if (elapsed > 7000 && elapsed <= 10000 && concurrencyLevel.get() > MIN_CONCURRENCY) {
    // 降级:响应变慢时降低并发
    concurrencyLevel.set(MIN_CONCURRENCY);
    demotionUntil = Instant.now().plus(Duration.ofMinutes(5));
} else if (elapsed > 10000) {
    // 熔断:超时立即暂停所有请求
    shouldPause.set(true);
    pauseUntil = Instant.now().plus(Duration.ofMinutes(30));
}

线程自旋检查退出机制

  • 线程身份验证:通过workerId实现动态线程淘汰

  • 双重熔断检查:任务开始前+请求完成后双重校验

  • 优雅退出:捕获中断信号,快速释放资源

// 检查是否达到线程池数量上线
if (workerId >= concurrencyLevel.get()) {
    LogUtil.info("Worker-{}被跳过(当前上限:{})", workerId, concurrencyLevel.get());
    return;
}

// 检查是否熔断状态
if (shouldPause.get()) {
    LogUtil.info("Worker-{}暂停中,退出...", workerId);
    return;
}

熔断自愈机制

  • 独立线程定期检查:独立线程定期检查是否恢复

  • 错峰恢复:线程重建时随机延迟2-5秒,避免瞬间流量冲击

  • 状态机设计:明确熔断/降级/正常三种状态的流转条件

// 定期检查是否需要恢复(每分钟)
scheduler.scheduleAtFixedRate(this::checkResume, 1, 1, TimeUnit.MINUTES);

private void checkResume() {
    // 熔断到期恢复
    if (shouldPause.get() && Instant.now().isAfter(pauseUntil)) {
        shouldPause.set(false);
        concurrencyLevel.set(INITIAL_CONCURRENCY);
        // 重建工作线程
    } 
    // 降级到期恢复
    else if (!shouldPause.get() && concurrencyLevel.get() == MIN_CONCURRENCY 
             && Instant.now().isAfter(demotionUntil)) {
        concurrencyLevel.set(INITIAL_CONCURRENCY);
        // 仅补充缺失的线程
    }
}

延伸:与Java线程池的实现关联

这里简单分析一下Java线程池的整个流程(对整个逻辑进行了简化,方便理解):

  1. 如果当前运行的线程数小于核心线程数,那么就会新建一个线程来执行任务。

  2. 如果当前运行的线程数等于或大于核心线程数,但是小于最大线程数,那么就把该任务放入到任务队列里等待执行。

  3. 如果向任务队列投放任务失败(任务队列已经满了),但是当前运行的线程数是小于最大线程数的,就新建一个线程来执行任务。

  4. 如果当前运行的线程数已经等同于最大线程数了,新建线程将会使当前运行的线程超出最大线程数,那么当前任务会被拒绝,拒绝策略会调用RejectedExecutionHandler.rejectedExecution()方法。

     

ThreadPoolExecutor 是 Java 中管理线程池的核心类,其工作流程可分为任务提交、线程管理、任务执行和资源回收四个阶段。以下是详细流程:

线程池初始化

  • 参数配置:创建时需指定核心参数:

    • corePoolSize:核心线程数,默认长期存活。

    • maximumPoolSize:最大线程数,限制线程池可扩展的上限。

    • keepAliveTime:非核心线程的空闲存活时间。

    • workQueue:任务队列(如 LinkedBlockingQueueArrayBlockingQueue)。

    • RejectedExecutionHandler:拒绝策略(如抛异常、丢弃任务等)。

任务提交与处理流程

当调用 execute(Runnable command) 提交任务时,按以下顺序处理:

  1. 优先创建核心线程

    • 若当前工作线程数 < corePoolSize立即创建新线程执行任务,即使存在空闲线程。

    • 示例:corePoolSize=5,已创建3个线程(均空闲),新任务会触发创建第4个线程。

  2. 任务入队

    • 若线程数 ≥ corePoolSize,尝试将任务放入工作队列:

      • 队列未满:任务进入队列,等待空闲线程处理。

      • 队列已满:进入下一步。

  3. 创建非核心线程

    • 若队列已满且线程数 < maximumPoolSize创建新线程执行任务

    • 示例:maximumPoolSize=10corePoolSize=5,队列满后创建第6~10个线程。

  4. 触发拒绝策略

    • 若队列已满且线程数 ≥ maximumPoolSize,执行拒绝策略:

      • AbortPolicy(默认):抛出 RejectedExecutionException

      • CallerRunsPolicy:由提交任务的线程直接执行任务。

      • DiscardOldestPolicy:丢弃队列最旧任务,重新提交当前任务。

      • DiscardPolicy:静默丢弃当前任务。

任务执行与线程复用

  • 线程执行逻辑
    每个工作线程(Worker)循环从队列中获取任务:

    1. 从队列中取任务(poll()take(),取决于队列类型)。

    2. 执行任务 Runnable.run()

    3. 若队列为空,线程根据 keepAliveTime 决定是否等待:

      • 非核心线程:超时未获取任务则终止。

      • 核心线程:默认永久等待(除非配置 allowCoreThreadTimeOut=true)。

资源回收与线程终止

  • 非核心线程回收
    若线程空闲时间超过 keepAliveTime,且线程数 > corePoolSize,该线程被终止。

  • 核心线程保留
    默认核心线程永不超时,除非设置 allowCoreThreadTimeOut=true

关键点总结

阶段

行为

任务提交

优先创建线程至 corePoolSize,其次入队,最后扩展至 maximumPoolSize

队列选择

无界队列(如 LinkedBlockingQueue)会导致 maximumPoolSize 失效。

线程复用

核心线程常驻,非核心线程超时回收。

拒绝策略

队列和线程满载后,按策略处理新任务。

模板代码

package com.example.adaptive.threadpool;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import jakarta.annotation.PostConstruct;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 通用自适应并发控制线程池框架
 * 适用于需要熔断、降级、自愈能力的第三方服务调用场景
 */
@Slf4j
@Component
public class AdaptiveThreadPoolFramework {

    // 配置参数(建议提取到配置中心)
    private static final int INITIAL_CONCURRENCY = 3;    // 初始并发级别
    private static final int MIN_CONCURRENCY = 1;        // 最小并发级别
    private static final long TIMEOUT_THRESHOLD_MS = 10_000;  // 超时阈值
    private static final long CIRCUIT_BREAK_DURATION_MINUTES = 30; // 熔断持续时间
    private static final long DEGRADATION_DURATION_MINUTES = 5;   // 降级持续时间

    // 全局状态控制
    private final AtomicInteger concurrencyLevel = new AtomicInteger(INITIAL_CONCURRENCY);
    private final AtomicBoolean shouldPause = new AtomicBoolean(false);
    private volatile Instant pauseUntil = Instant.MIN;      // 熔断结束时间
    private volatile Instant demotionUntil = Instant.MIN;   // 降级结束时间

    // 线程池组件
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private final ExecutorService workerPool = Executors.newCachedThreadPool();
    private final LinkedBlockingQueue<AdaptiveTask> taskQueue = new LinkedBlockingQueue<>();

    @PostConstruct
    public void init() {
        start();
    }

    public void start() {
        log.info("自适应线程池启动,初始并发级别: {}", INITIAL_CONCURRENCY);
        
        // 启动工作线程
        for (int i = 0; i < concurrencyLevel.get(); i++) {
            workerPool.submit(new Worker(i));
        }
        
        // 启动状态检查调度器
        scheduler.scheduleAtFixedRate(this::checkRecovery, 1, 1, TimeUnit.MINUTES);
    }

    public void stop() {
        log.info("自适应线程池停止...");
        workerPool.shutdownNow();
        scheduler.shutdownNow();
    }

    /**
     * 任务恢复检查:处理熔断和降级状态的自动恢复
     */
    private void checkRecovery() {
        Instant now = Instant.now();
        
        // 检查熔断状态是否到期
        if (shouldPause.get() && now.isAfter(pauseUntil)) {
            log.info("熔断结束,恢复请求处理");
            shouldPause.set(false);
            resetConcurrency();
        }
        // 检查降级状态是否到期
        else if (!shouldPause.get() && concurrencyLevel.get() == MIN_CONCURRENCY && now.isAfter(demotionUntil)) {
            log.info("降级结束,恢复初始并发级别");
            resetConcurrency();
        }
    }

    private void resetConcurrency() {
        int currentLevel = concurrencyLevel.get();
        concurrencyLevel.set(INITIAL_CONCURRENCY);
        
        // 补充缺失的工作线程
        for (int i = currentLevel; i < INITIAL_CONCURRENCY; i++) {
            try {
                Thread.sleep((long) (Math.random() * 3000)); // 错峰启动
                workerPool.submit(new Worker(i));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /**
     * 工作线程:执行任务并监控执行质量
     */
    private class Worker implements Runnable {
        private final int workerId;

        public Worker(int workerId) {
            this.workerId = workerId;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                // 状态检查:熔断中
                if (shouldPause.get()) {
                    log.info("Worker-{} 因熔断暂停", workerId);
                    return;
                }
                
                // 身份检查:巧妙利用workerId检查是否超出当前并发限制
                if (workerId >= concurrencyLevel.get()) {
                    log.info("Worker-{} 被降级终止 (当前并发: {})", workerId, concurrencyLevel.get());
                    return;
                }

                // 获取任务
                AdaptiveTask task = taskQueue.poll();
                if (task == null) {
                    log.info("Worker-{} 无任务可处理,退出", workerId);
                    return;
                }

                // 执行任务并监控
                Instant start = Instant.now();
                try {
                    boolean success = executeTask(task);
                    long elapsed = Duration.between(start, Instant.now()).toMillis();
                    
                    handleTaskResult(task, success, elapsed);
                    
                } catch (Exception e) {
                    long elapsed = Duration.between(start, Instant.now()).toMillis();
                    handleTaskFailure(task, e, elapsed);
                }
            }
        }

        private boolean executeTask(AdaptiveTask task) throws Exception {
            // 模板方法:子类实现具体业务逻辑
            return task.execute();
        }

        private void handleTaskResult(AdaptiveTask task, boolean success, long elapsed) {
            if (!success) {
                handleFailure(elapsed);
                return;
            }
            
            log.info("[Worker-{}] 任务成功,耗时: {}ms", workerId, elapsed);
            
            // 自适应调整:基于响应时间
            if (elapsed > 7000 && elapsed <= TIMEOUT_THRESHOLD_MS) {
                applyDegradation();
            } else if (elapsed > TIMEOUT_THRESHOLD_MS) {
                applyCircuitBreak();
            }
        }

        private void handleTaskFailure(AdaptiveTask task, Exception e, long elapsed) {
            log.error("[Worker-{}] 任务失败,耗时: {}ms, 异常: {}", workerId, elapsed, e.getMessage());
            handleFailure(elapsed);
        }

        private void handleFailure(long elapsed) {
            if (elapsed > TIMEOUT_THRESHOLD_MS) {
                applyCircuitBreak();
            } else {
                applyDegradation();
            }
        }

        private void applyDegradation() {
            if (concurrencyLevel.get() > MIN_CONCURRENCY) {
                int oldLevel = concurrencyLevel.getAndSet(MIN_CONCURRENCY);
                if (oldLevel != MIN_CONCURRENCY) {
                    demotionUntil = Instant.now().plus(Duration.ofMinutes(DEGRADATION_DURATION_MINUTES));
                    log.warn("Worker-{} 触发降级,并发级别降至 {}", workerId, MIN_CONCURRENCY);
                }
            }
        }

        private void applyCircuitBreak() {
            if (!shouldPause.getAndSet(true)) {
                pauseUntil = Instant.now().plus(Duration.ofMinutes(CIRCUIT_BREAK_DURATION_MINUTES));
                log.error("Worker-{} 触发熔断,暂停 {} 分钟", workerId, CIRCUIT_BREAK_DURATION_MINUTES);
            }
        }
    }

    /**
     * 任务接口:定义可适配的任务模板
     */
    public interface AdaptiveTask {
        boolean execute() throws Exception;
        String getTaskId();
    }

    /**
     * 提交任务到框架
     */
    public boolean submitTask(AdaptiveTask task) {
        if (shouldPause.get()) {
            log.warn("系统处于熔断状态,拒绝任务: {}", task.getTaskId());
            return false;
        }
        return taskQueue.offer(task);
    }

    /**
     * 获取当前运行状态
     */
    public String getStatus() {
        return String.format(
            "状态: %s, 并发级别: %d/%d, 任务队列: %d, 熔断剩余: %d分钟, 降级剩余: %d分钟",
            shouldPause.get() ? "熔断中" : concurrencyLevel.get() == MIN_CONCURRENCY ? "降级中" : "正常",
            concurrencyLevel.get(), INITIAL_CONCURRENCY,
            taskQueue.size(),
            shouldPause.get() ? Duration.between(Instant.now(), pauseUntil).toMinutes() : 0,
            concurrencyLevel.get() == MIN_CONCURRENCY ? Duration.between(Instant.now(), demotionUntil).toMinutes() : 0
        );
    }
}


评论