轮询请求并动态刷新列表数据

对于耗时比较久的操作,需要前端页面实时请求更新已处理的结果。

一个用于启动异步处理的接口,另一个用于查询处理状态和结果的接口。实现异步处理和轮询查询的功能。

原始逻辑

@ApiOperation(value = "鉴别器类型列表", notes = "鉴别器类型列表,根据鉴别器类型名模糊查询")
@GetMapping("/accountCheck")
public ResponseBean accountCheck(@RequestHeader(value = "teamId") String teamId) {
    List<Account> AccountList = AccountService.findAll(teamId);
    List<AccountEdit> accountEdits = new LinkedList<>();
    for (Account account : AccountList) {
        AccountEdit accountEdit = AccountService.edit(account.getId());
        ResponseBean checkRes = AccountCheckUtils.checkAccount(accountEdit);
        if (checkRes.isError()) {
            accountEdit.setAuthParams(checkRes.getMsg());
            accountEdits.add(accountEdit);
        }
    }
    String info = "鉴别器类型校验完成,共" + accountEdits.size() + "个账号,其中有" + accountEdits.size() + "个账号校验失败!";
    return ResponseBean.success(info, accountEdits);
}

异步轮询

我们可以将这个过程分为两个部分:一个用于启动异步处理的接口,另一个用于查询处理状态和结果的接口。这样可以实现异步处理和轮询查询的功能。以下是优化后的代码:

@Autowired
private AccountService AccountService;
@Autowired
@Qualifier("applicationTaskExecutor")
private AsyncTaskExecutor asyncTaskExecutor;
private final ConcurrentMap<String, AccountCheckTask> taskMap = new ConcurrentHashMap<>();

@ApiOperation(value = "开始鉴别器类型检查", notes = "异步开始鉴别器类型检查")
@GetMapping("/startAccountCheck")
public ResponseBean startAccountCheck(@RequestHeader(value = "teamId") String teamId) {
    String taskId = ActiveInfo.userName()+teamId;
    List<Account> AccountList = AccountService.findAll(teamId);

    AccountCheckTask task = new AccountCheckTask(AccountList.size());
    taskMap.put(taskId, task);

    CompletableFuture.runAsync(() -> processAccounts(AccountList, task), asyncTaskExecutor)
            .thenRun(() -> task.setCompleted(true));

    Map<String, Object> response = new HashMap<>();
    response.put("taskId", taskId);
    response.put("totalAccounts", AccountList.size());
    return ResponseBean.successMap("",response);
}

@ApiOperation(value = "获取鉴别器类型检查进度和结果", notes = "获取异步鉴别器类型检查的进度和结果")
@GetMapping("/getAccountCheckResult")
public ResponseBean getAccountCheckResult(@RequestHeader(value = "teamId") String teamId) {
    String taskId = ActiveInfo.userName()+teamId;
    AccountCheckTask task = taskMap.get(taskId);
    if (task == null) {
        return ResponseBean.error("Task not found");
    }

    Map<String, Object> result = new HashMap<>();
    result.put("totalAccounts", task.getTotalAccounts());
    result.put("processedAccounts", task.getProcessedAccounts());
    result.put("remainingAccounts", task.getRemainingAccounts());
    result.put("completed", task.isCompleted());

    String statusInfo = "进行中";
    if (task.isCompleted()) {
        statusInfo = "已完成";
    }
    List<AccountEdit> failedAccounts = task.getFailedAccounts();
    String info = String.format("账号校验%s(共 %d 个账号):已检测 %d 个账号,其中有 %d 个账号校验失败!",statusInfo,
            task.getTotalAccounts(), task.getProcessedAccounts().get(), failedAccounts.size());
    result.put("info", info);
    result.put("failedAccounts", failedAccounts);

    return ResponseBean.successMap(info,result);
}

private void processAccounts(List<Account> accounts, AccountCheckTask task) {
    accounts.stream()
            .map(account -> AccountService.edit(account.getId()))
            .forEach(accountEdit -> {
                try {
                    ResponseBean checkRes = AccountCheckUtils.checkAccount(accountEdit);
                    if (checkRes.isError()) {
                        accountEdit.setAuthParams(checkRes.getMsg());
                        task.addFailedAccount(accountEdit);
                    }
                } catch (Exception e) {
                    log.error("Error checking account: " + accountEdit.getId(), e);
                    accountEdit.setAuthParams("检查过程中发生错误: " + e.getMessage());
                    task.addFailedAccount(accountEdit);
                } finally {
                    task.incrementProcessedAccounts();
                }
            });
}

其中

@Data
public class AccountCheckTask {
    private final int totalAccounts;
    private final AtomicInteger processedAccounts = new AtomicInteger(0);
    private final List<AccountEdit> failedAccounts = new CopyOnWriteArrayList<>();
    private volatile boolean completed = false;

    public AccountCheckTask(int totalAccounts) {
        this.totalAccounts = totalAccounts;
    }

    public void incrementProcessedAccounts() {
        processedAccounts.incrementAndGet();
    }

    public int getRemainingAccounts() {
        return totalAccounts - processedAccounts.get();
    }

    public void addFailedAccount(AccountEdit account) {
        failedAccounts.add(account);
    }
}

前端轮询

const [checkStatus, setCheckStatus] = useState(false);
const [accountList, setAccountList] = useState([]);
const [checkInfo, setCheckInfo] = useState('');

const startAccountList = async () => {
    const { data } = await axios
    .get('/AccountCheck/startAccountCheck');

    let intervalId = null;
    const fetchData = async () => {
        const { data } = await axios
        .get('/AccountCheck/getAccountCheckResult');
        setAccountList(data.map.failedAccounts.map(convertToListBean));
        setCheckInfo(data.map.info)
        if(data.map.completed){
            setCheckStatus(false)
            clearInterval(intervalId);
        } else {
            setCheckStatus(true)
        }
    };
    fetchData();// 立即执行一次
    // 然后设置定时器
    intervalId = setInterval(fetchData, 5000);
    return () => {
        if (intervalId) {
            clearInterval(intervalId);
        }
    };
};

异常处理

错误说明

***************************
APPLICATION FAILED TO START
***************************

Description:

Field asyncTaskExecutor in xxx...Controller required a single bean, but 2 were found:
	- applicationTaskExecutor: defined by method 'applicationTaskExecutor' in class path resource [org/springframework/boot/autoconfigure/task/TaskExecutionAutoConfiguration.class]
	- taskScheduler: defined by method 'taskScheduler' in class path resource [org/springframework/boot/autoconfigure/task/TaskSchedulingAutoConfiguration.class]

解决方案

这个错误表明 Spring 容器中存在多个 AsyncTaskExecutor 的 bean 定义,导致注入冲突。解决这个问题有几种方法:

使用 @Qualifier 注解

使用 @Qualifier 注解指定要注入的具体 bean:

在你的 AccountCheckController 类中,将:

@Autowired
private AsyncTaskExecutor asyncTaskExecutor;

改为:

@Autowired
@Qualifier("applicationTaskExecutor")
private AsyncTaskExecutor asyncTaskExecutor;
定义 AsyncTaskExecutor

在你的配置类中明确定义一个 AsyncTaskExecutor bean:

@Configuration
public class AsyncConfig {
    @Bean
    public AsyncTaskExecutor asyncTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);
        executor.setThreadNamePrefix("MyAsync-");
        executor.initialize();
        return executor;
    }
}

然后在控制器中使用 @Qualifier 注入这个 bean:

@Autowired
@Qualifier("asyncTaskExecutor")
private AsyncTaskExecutor asyncTaskExecutor;
禁用自动配置

如果你不需要自定义 AsyncTaskExecutor,可以在 application.properties 或 application.yml 中禁用自动配置:

spring.task.execution.pool.enabled=false
spring.task.scheduling.pool.enabled=false
使用 @Primary 注解

使用 @Primary 注解标记首选的 bean:

在你的配置类中:

@Configuration
public class AsyncConfig {
    @Bean
    @Primary
    public AsyncTaskExecutor asyncTaskExecutor() {
        // ... 配置和返回 AsyncTaskExecutor
    }
}
使用不同的名称

如果你确实需要多个 AsyncTaskExecutor,可以考虑使用不同的名称:

@Configuration
public class AsyncConfig {
    @Bean
    public AsyncTaskExecutor myAsyncTaskExecutor() {
        // ... 配置和返回 AsyncTaskExecutor
    }
}

然后在控制器中:

@Autowired
@Qualifier("myAsyncTaskExecutor")
private AsyncTaskExecutor asyncTaskExecutor;

选择最适合你的项目结构和需求的方法。

通常,使用 @Qualifier 或定义一个明确的 bean 是最直接的解决方案。

如果你的项目中确实需要多个 AsyncTaskExecutor,确保它们有不同的名称,并在注入时使用 @Qualifier 指定正确的 bean。