设计背景
数据迁移涉及数万个数据Job多线程处理。
程序代码
// 被Spring管理的线程池 @Resource(name = "unionCollectOrderPool") ThreadPoolTaskExecutor threadPool; // 用于发送 coc请求的参数 private static ThreadLocal<UpdateVoucherReq> updateVoucherReqThreadLocal = ThreadLocal.withInitial(UpdateVoucherReq::new); // 创建DubboUploadRequest对象放置于Thread中 private static ThreadLocal<DubboUploadRequest> uploadRequestThreadLocal = ThreadLocal.withInitial(DubboUploadRequest::new); /** * 具体更新任务的细节 * * @param getListFunction 需要处理数据的函数 * @param updateAndValidFunction 调用接口(更新数据库)并获得函数是否成功 */ public void detailJob(Supplier<List<Map>> getListFunction, BiPredicate<String, Map> updateAndValidFunction) { // 1.通过Supplier 获取需要处理的数据 List<Map> listMap = getListFunction.get(); if (listMap == null) return; // 2.执行具体业务 下载、上传、更新数据库Url // 2.1.添加程序计数 LongAdder validAdder = new LongAdder(); LongAdder successAdder = new LongAdder(); LongAdder failAdder = new LongAdder(); listMap.forEach(fileUrlAndId -> { // 1.下载-首先判断是否有日志 ,使用指定的线程池 CompletableFuture<byte[]> downLoadStage = CompletableFuture.supplyAsync(() -> { byte[] fileByte; try { fileByte = hfsClient.getFile(fileUrlAndId.get("url").toString()); // 判断文件是否存在,若无抛出异常 Assert.notNull(fileByte, "文件不存在"); validAdder.increment(); } catch (Exception e) { log.error("Stage1-下载文件时异常。"); throw new Exception(); } return fileByte; }, threadPool); // 2.上传 CompletableFuture<String> uploadStage = downLoadStage.thenApplyAsync(downLoadResult -> { // 分割文件名[Cp8L4FxqSwaAP7qTAAEq3Ys0gxc以及后缀名[.jpg] String oldUrl = fileUrlAndId.get("url").toString(); int start = oldUrl.lastIndexOf("/"); int end = oldUrl.lastIndexOf("."); String fileName = oldUrl.substring(start 1, end); String fileSuffix = oldUrl.substring(end); // Supplier中获取 dubboUploadRequest(已设置的部分属性) DubboUploadRequest request = uploadRequestThreadLocal.get(); request.setFileName(fileName); request.setFileSuffix(fileSuffix); request.setFileBytes(downLoadResult); request.setFileLength(String.valueOf(downLoadResult.length)); // 请求FSG Dubbo接口 获得响应值Url UploadResponse response; try { response = uploadFacadeRT.upload(request); Assert.notNull(response.getUrl(), "新的Url为Null"); } catch (Exception e) { log.error("Stage2-上传文件时异常。"); throw new Exception(); } return response.getUrl(); }, threadPool); // 3.调用接口更新-异常处理 CompletableFuture<Boolean> updateUrlStage = uploadStage.thenApplyAsync(uploadResult -> { // BiFunction 获取具体处理的业务代码 boolean isSuccess = updateAndValidFunction.test(uploadResult, fileUrlAndId); Assert.isTrue(isSuccess, "Stage3-数据更新异常。"); successAdder.increment(); return true; }, threadPool).handleAsync((step3, throwable) -> { if (throwable != null) { // 失败次数 1,打印日志 failAdder.increment(); log.info("异常小结,Msg:{},ID:{},URL:{};Trace:{}", fileUrlAndId.get("id"), fileUrlAndId.get("url"), throwable.getMessage(), throwable.getStackTrace()); return false; } return true; }, threadPool); // 最长等待3s获取 最后一步结果 // [downLoadStage -> uploadStage -> updateUrlStage] try { updateUrlStage.get(60, TimeUnit.SECONDS); } catch (InterruptedException e) { log.error("任务中断," e.getMessage(), e); } catch (ExecutionException e) { log.error("检索任务异常,ID:" fileUrlAndId.get("id") e.getMessage(), e); e.printStackTrace(); } catch (TimeoutException e) { log.error("执行任务超时,ID:" fileUrlAndId.get("id")); } }); // 2.3.输出日志 log.info("detailJob任务总结:{}条有效数据,成功{}条,失败{}次。", validAdder, successAdder, failAdder); }
具体使用方法
public void rnCocJob() {
log.info("COC处理开始");
detailJob(() -> {
List<Map> fileUrlAndIdList = new ArrayList<>();
int count = jobOfService.cocNeedUpdateCount();
if (count == 0) {
log.info("t_coc_voucher表内没有需要更新的数据");
return null;
}
// 分页获取t_coc_voucher的数据
int currentPage = 0;
while (count > 0) {
count = count - 1000;
fileUrlAndIdList.addAll(jobOfService.selectCocNeedUpdate(currentPage));
currentPage++;
}
return fileUrlAndIdList;
},
(uploadResult, fileUrlAndId) -> {
// 替换成具体的接口
UpdateVoucherReq req = updateVoucherReqThreadLocal.get();
req.setNewUploadUrl(uploadResult);
req.setOldUploadUrl(fileUrlAndId.get("url").toString());
req.setVoucherId((Long) fileUrlAndId.get("id"));
// FacadeUtils.assertFacadeIsSuccess 保证调用成功,一定是成功了
cocFacadeService.updateVoucher(req);
return true;
}
);
log.info("COC处理结束");
}
ThreadLocal的引入不必有多少条数据处理New 多少个Req对象
此类提供线程局部变量。这些变量不同于它们的正常对应变量,因为每个访问一个(通过它的get或set方法)的线程都有它自己的、独立初始化的变量副本。 ThreadLocal实例通常是希望将状态与线程相关联的类中的私有静态字段(例如,用户 ID 或事务 ID)。 例如,下面的类生成每个线程本地的唯一标识符。线程的 id 在第一次调用ThreadId.get()时被分配,并且在后续调用中保持不变。
CompletableFuture 还使用以下策略实现接口CompletionStage,then***方法 可将一个任务产分成多份。代码中。将下载-》上传-》更新 拆分成了三部分。方便日志的记录
CompletionStage可能是异步计算的一个阶段,它在另一个 CompletionStage 完成时执行一个操作或计算一个值。一个阶段在其计算终止时完成,但这可能反过来触发其他相关阶段。此接口中定义的功能仅采用几种基本形式,可扩展为更大的方法集以捕获一系列使用风格: 阶段执行的计算可以表示为 Function、Consumer 或 Runnable(分别使用名称包括apply 、 accept或run的方法),具体取决于它是否需要参数和/或产生结果。例如, stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println()) 。另一种形式( compose )应用阶段本身的功能,而不是它们的结果。 一个阶段的执行可以由单个阶段的完成触发,也可以由两个阶段的完成触发,或者两个阶段中的任一个触发。使用带有前缀then的方法排列单个阶段的依赖关系。由两个阶段完成触发的那些可以使用相应命名的方法组合它们的结果或效果。由两个阶段中的任何一个触发的那些不保证哪些结果或效果用于从属阶段的计算。 阶段之间的依赖关系控制计算的触发,但不保证任何特定的顺序。此外,新阶段计算的执行可以按以下三种方式中的任何一种安排:默认执行、默认异步执行(使用带有后缀async的方法,该方法采用阶段的默认异步执行工具)或自定义(通过提供的Executor )。默认和异步模式的执行属性由 CompletionStage 实现指定,而不是此接口。具有显式 Executor 参数的方法可能具有任意执行属性,甚至可能不支持并发执行,但以适应异步的方式安排处理。 两种方法形式支持处理触发阶段是正常完成还是异常完成:方法whenComplete允许注入动作而不管结果如何,否则在完成时保留结果。方法handle还允许阶段计算替换结果,该结果可以允许其他相关阶段进行进一步处理。在所有其他情况下,如果某个阶段的计算因(未经检查的)异常或错误而突然终止,则所有需要其完成的相关阶段也会异常完成,并且CompletionException将异常作为其原因。如果一个阶段依赖于两个阶段,并且都异常完成,那么 CompletionException 可能对应于这些异常中的任何一个。如果一个阶段依赖于其他两个阶段中的任何一个,并且其中只有一个异常完成,则无法保证依赖阶段是正常完成还是异常完成。在方法whenComplete的情况下,当提供的操作本身遇到异常时,如果尚未异常完成,则阶段异常完成此异常。 所有方法都遵循上述触发、执行和异常完成规范(在单个方法规范中不再重复)。此外,虽然用于为接受它们的方法传递完成结果(即,对于T类型的参数)的参数可能为 null,但为任何其他参数传递 null 值将导致引发NullPointerException 。 该接口未定义初始创建、正常或异常强制完成、探测完成状态或结果或等待阶段完成的方法。 CompletionStage 的实现可以酌情提供实现这种效果的方法。方法toCompletableFuture通过提供一个通用的转换类型来实现此接口的不同实现之间的互操作性。
使用了CompletionStage<U> handleAsync()方法对以上三步进行异常总结
为了记录成功、失败、异常的次数使用了atomic 下的 LongAdder保证线程安全