Files
tkcrawl-client/src/main/java/com/yupi/springbootinit/service/impl/HostInfoServiceImpl.java

80 lines
3.0 KiB
Java
Raw Normal View History

2025-06-11 14:45:40 +08:00
package com.yupi.springbootinit.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Lists;
import com.yupi.springbootinit.mapper.NewHostsMapper;
import com.yupi.springbootinit.model.entity.NewHosts;
import com.yupi.springbootinit.service.HostInfoService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StopWatch;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/*
* @author: ziin
* @date: 2025/6/10 19:04
*/
@Service
@Slf4j
@Transactional(rollbackFor = Exception.class)
public class HostInfoServiceImpl extends ServiceImpl<NewHostsMapper, NewHosts> implements HostInfoService {
@Override
@Async("taskExecutor")
public CompletableFuture<Void> saveHostInfo(List<NewHosts> newHosts) {
try {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
saveBatch(newHosts);
stopWatch.stop();
long totalTimeMillis = stopWatch.getTotalTimeMillis();
log.info(" 存储花费: {}ms", totalTimeMillis);
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
// 将异常包装到Future使调用方能处理
return CompletableFuture.failedFuture(e); // Java9+
}
}
// public void processHosts(List<NewHosts> hosts) {
// List<CompletableFuture<Void>> futures = new ArrayList<>();
// // 分片提交(避免单批次过大)
// Lists.partition(hosts, 1500).forEach(batch -> {
// CompletableFuture<Void> future = this.saveHostInfo(batch);
// futures.add(future);
// });
// CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
// .whenComplete((result, ex) -> {
// if (ex != null) {
// log.error("部分批次处理失败", ex);
// } else {
// log.info("所有批次处理完成");
// }
// // 这里可以触发其他业务逻辑(如发送通知)
// });
// }
public CompletableFuture<Void> processHosts(List<NewHosts> hosts) {
List<CompletableFuture<Void>> futures = new ArrayList<>();
// 分片提交(避免单批次过大)
Lists.partition(hosts, 1500).forEach(batch -> {
CompletableFuture<Void> future = this.saveHostInfo(batch);
futures.add(future);
});
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("部分批次处理失败", ex);
} else {
log.info("所有批次处理完成");
}
});
}
}