From f624ed8dafa130baa6a58afb4fbed19b1fa4f91a Mon Sep 17 00:00:00 2001 From: TanYibin <5491541@qq.com> Date: Wed, 10 May 2023 15:44:22 +0800 Subject: [PATCH] 修改异步方法使用线程池,解决线程不释放问题 --- huaheng-wms-core/src/main/java/org/jeecg/modules/wms/home/service/impl/HomePageViewServiceImpl.java | 8 ++++++-- huaheng-wms-core/src/main/java/org/jeecg/modules/wms/inventory/inventoryTransaction/service/IInventoryTransactionService.java | 2 -- huaheng-wms-core/src/main/java/org/jeecg/modules/wms/inventory/inventoryTransaction/service/impl/InventoryTransactionServiceImpl.java | 11 ----------- huaheng-wms-core/src/main/java/org/jeecg/modules/wms/task/taskHeader/service/impl/TaskHeaderServiceImpl.java | 4 ++-- huaheng-wms-core/src/main/java/org/jeecg/utils/aspect/ApiLoggerAspect.java | 219 ++++++--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- huaheng-wms-core/src/main/java/org/jeecg/utils/support/SyncConfiguration.java | 38 ++++++++++++++++++++++++++++++++++++++ huaheng-wms-core/src/main/java/org/jeecg/utils/support/SystemSync.java | 65 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 117 insertions(+), 230 deletions(-) create mode 100644 huaheng-wms-core/src/main/java/org/jeecg/utils/support/SyncConfiguration.java create mode 100644 huaheng-wms-core/src/main/java/org/jeecg/utils/support/SystemSync.java diff --git a/huaheng-wms-core/src/main/java/org/jeecg/modules/wms/home/service/impl/HomePageViewServiceImpl.java b/huaheng-wms-core/src/main/java/org/jeecg/modules/wms/home/service/impl/HomePageViewServiceImpl.java index dd1e6a8..94e1653 100644 --- a/huaheng-wms-core/src/main/java/org/jeecg/modules/wms/home/service/impl/HomePageViewServiceImpl.java +++ b/huaheng-wms-core/src/main/java/org/jeecg/modules/wms/home/service/impl/HomePageViewServiceImpl.java @@ -40,6 +40,7 @@ import org.jeecg.modules.wms.task.taskHeader.service.impl.TaskHeaderServiceImpl; import org.jeecg.utils.HuahengJwtUtil; import org.jeecg.utils.constant.QuantityConstant; import org.jeecg.utils.support.Convert; +import org.jeecg.utils.support.SystemSync; import org.springframework.stereotype.Service; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; @@ -100,6 +101,9 @@ public class HomePageViewServiceImpl implements HomePageViewService { @Resource IInventoryTransactionService inventoryTransactionService; + + @Resource + SystemSync systemSync; @Override public Result<String> deliveringAmount(HttpServletRequest request) throws ParseException, InterruptedException, ExecutionException, TimeoutException { @@ -112,9 +116,9 @@ public class HomePageViewServiceImpl implements HomePageViewService { String startTime = simpleDateFormat.format(dateTime - (1000 * 60 * 60 * 24) * i); String endTime = simpleDateFormat.format(DateUtil.offsetDay(simpleDateFormat.parse(startTime), 1)); shipmentQuantityMap.put(startTime, - inventoryTransactionService.asyncCalculateInventory(startTime, endTime, QuantityConstant.INVENTORY_TRANSACTION_SHIPMENT)); + systemSync.asyncCalculateInventory(startTime, endTime, QuantityConstant.INVENTORY_TRANSACTION_SHIPMENT)); receiptQuantityMap.put(startTime, - inventoryTransactionService.asyncCalculateInventory(startTime, endTime, QuantityConstant.INVENTORY_TRANSACTION_RECEIPT)); + systemSync.asyncCalculateInventory(startTime, endTime, QuantityConstant.INVENTORY_TRANSACTION_RECEIPT)); } Option option = new Option(); option.tooltip().trigger(Trigger.axis).axisPointer().type(PointerType.cross); diff --git a/huaheng-wms-core/src/main/java/org/jeecg/modules/wms/inventory/inventoryTransaction/service/IInventoryTransactionService.java b/huaheng-wms-core/src/main/java/org/jeecg/modules/wms/inventory/inventoryTransaction/service/IInventoryTransactionService.java index 3803a6e..dcf393b 100644 --- a/huaheng-wms-core/src/main/java/org/jeecg/modules/wms/inventory/inventoryTransaction/service/IInventoryTransactionService.java +++ b/huaheng-wms-core/src/main/java/org/jeecg/modules/wms/inventory/inventoryTransaction/service/IInventoryTransactionService.java @@ -18,6 +18,4 @@ public interface IInventoryTransactionService extends IService<InventoryTransact boolean batchSave(List<InventoryTransaction> inventoryTransactionList); - Future<BigDecimal> asyncCalculateInventory(String startTime, String endTime, Integer type) throws ParseException; - } diff --git a/huaheng-wms-core/src/main/java/org/jeecg/modules/wms/inventory/inventoryTransaction/service/impl/InventoryTransactionServiceImpl.java b/huaheng-wms-core/src/main/java/org/jeecg/modules/wms/inventory/inventoryTransaction/service/impl/InventoryTransactionServiceImpl.java index 172a76d..835528a 100644 --- a/huaheng-wms-core/src/main/java/org/jeecg/modules/wms/inventory/inventoryTransaction/service/impl/InventoryTransactionServiceImpl.java +++ b/huaheng-wms-core/src/main/java/org/jeecg/modules/wms/inventory/inventoryTransaction/service/impl/InventoryTransactionServiceImpl.java @@ -29,15 +29,4 @@ public class InventoryTransactionServiceImpl extends ServiceImpl<InventoryTransa public boolean batchSave(List<InventoryTransaction> inventoryTransactionList) { return this.saveBatch(inventoryTransactionList); } - - @Async - @Override - public Future<BigDecimal> asyncCalculateInventory(String startTime, String endTime, Integer type) { - LambdaQueryWrapper<InventoryTransaction> inventoryTransactionLambdaQueryWrapper = Wrappers.lambdaQuery(); - inventoryTransactionLambdaQueryWrapper.select(InventoryTransaction::getCreateTime, InventoryTransaction::getQty, InventoryTransaction::getType) - .ge(InventoryTransaction::getCreateTime, startTime).lt(InventoryTransaction::getCreateTime, endTime).eq(InventoryTransaction::getType, type); - List<InventoryTransaction> inventoryTransactions = list(inventoryTransactionLambdaQueryWrapper); - BigDecimal shipmentResult = inventoryTransactions.stream().map(InventoryTransaction::getQty).reduce(BigDecimal.ZERO, BigDecimal::add); - return new AsyncResult<>(shipmentResult); - } } diff --git a/huaheng-wms-core/src/main/java/org/jeecg/modules/wms/task/taskHeader/service/impl/TaskHeaderServiceImpl.java b/huaheng-wms-core/src/main/java/org/jeecg/modules/wms/task/taskHeader/service/impl/TaskHeaderServiceImpl.java index 4efe245..4a32923 100644 --- a/huaheng-wms-core/src/main/java/org/jeecg/modules/wms/task/taskHeader/service/impl/TaskHeaderServiceImpl.java +++ b/huaheng-wms-core/src/main/java/org/jeecg/modules/wms/task/taskHeader/service/impl/TaskHeaderServiceImpl.java @@ -831,10 +831,10 @@ public class TaskHeaderServiceImpl extends ServiceImpl<TaskHeaderMapper, TaskHea public Result completeTaskByWMS(Integer taskId) { TaskHeader taskHeader = getById(taskId); if (taskHeader == null) { - return Result.error("任务" + taskId + "未找到,执行中止"); + return Result.error("任务 " + taskId + " 未找到,执行中止"); } if (taskHeader.getStatus().equals(QuantityConstant.TASK_STATUS_COMPLETED)) { - return Result.ok("任务(" + taskId + ")任务已经是完成的!"); + return Result.ok("任务 " + taskId + " 任务已完成!"); } int taskType = taskHeader.getTaskType().intValue(); if (taskType != QuantityConstant.TASK_TYPE_OVER_STATION) { diff --git a/huaheng-wms-core/src/main/java/org/jeecg/utils/aspect/ApiLoggerAspect.java b/huaheng-wms-core/src/main/java/org/jeecg/utils/aspect/ApiLoggerAspect.java index 4248cd5..aae68cb 100644 --- a/huaheng-wms-core/src/main/java/org/jeecg/utils/aspect/ApiLoggerAspect.java +++ b/huaheng-wms-core/src/main/java/org/jeecg/utils/aspect/ApiLoggerAspect.java @@ -23,17 +23,13 @@ import org.aspectj.lang.reflect.MethodSignature; import org.jeecg.JeecgSystemApplication; import org.jeecg.common.api.vo.Result; import org.jeecg.modules.wms.config.address.entity.Address; -import org.jeecg.modules.wms.config.address.service.IAddressService; +import org.jeecg.modules.wms.config.address.service.impl.AddressServiceImpl; import org.jeecg.modules.wms.monitor.apiLog.entity.ApiLog; -import org.jeecg.modules.wms.monitor.apiLog.service.IApiLogService; import org.jeecg.utils.ServletUtils; import org.jeecg.utils.SpringUtils; import org.jeecg.utils.StringUtils; -import org.jeecg.utils.config.ApplicationConfig; -import org.jeecg.utils.constant.QuantityConstant; import org.jeecg.utils.support.ApiLogger; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Async; +import org.jeecg.utils.support.SystemSync; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.stereotype.Component; @@ -55,23 +51,6 @@ import okhttp3.Response; public class ApiLoggerAspect { public static final String HUAHENG_SYSTEM_NAME = "HUAHENG_WMS4"; - - private static IApiLogService apiLogService; - - private static IAddressService addressService; - - @Autowired - private ApplicationConfig applicationConfig; - - @Autowired - public void setApiLogService(IApiLogService apiLogService) { - ApiLoggerAspect.apiLogService = apiLogService; - } - - @Autowired - public void setAddressService(IAddressService addressService) { - ApiLoggerAspect.addressService = addressService; - } // 配置织入点 @Pointcut("@annotation(org.jeecg.utils.support.ApiLogger)") @@ -100,151 +79,10 @@ public class ApiLoggerAspect { return ret; } -// /** 记录WMS调用第三方接口的日志 **/ -// private Object aroundWms2XXX(ProceedingJoinPoint point, ApiLogger apiLogger) { -// Object ret = null; -// ApiLog log = new ApiLog(); -// -// HttpURLConnection connection = null; -// String body = null; -// -// try { -// connection = (HttpURLConnection)point.getArgs()[0]; -// body = (String)point.getArgs()[1]; -// initApiLog(connection, body); -// ret = point.proceed(); -// } catch (Throwable e) { -// setApiLogThrowable(log, e); -// ret = Result.error(e.getMessage()); -// } finally { -// if (ret != null) { -// finishApiLog(log, connection, ret.toString()); -// } -// } -// return ret; -// } - -// /** -// * 记录响应头信息,保存日志到数据库 -// */ -// public static void finishApiLog(ApiLog log, HttpURLConnection connection, String body) { -// try { -// log.setResponseBody(body); -// log.setResponseTime(new Date()); -// Long duration = log.getResponseTime().getTime() - log.getRequestTime().getTime(); -// log.setDuration(duration.intValue()); -// log.setHttpCode(connection.getResponseCode()); -// -// // 响应头 -// Set<String> keyset = connection.getHeaderFields().keySet(); -// ArrayList<String> headerList = new ArrayList<>(); -// Iterator<String> it = keyset.iterator(); -// while (it.hasNext()) { -// String name = it.next(); -// String header = connection.getHeaderField(name); -// if (name == null || "".equals(name)) { -// // 第一行没有name -// // HTTP/1.1 200 OK -// headerList.add(header); -// } else { -// headerList.add(name + ": " + header); -// } -// } -// log.setResponseHeader(org.apache.commons.lang3.StringUtils.join(headerList, "\n")); -// Result json = JSON.parseObject(body, Result.class); -// log.setRetCode(json.getCode()); -// } catch (Exception e) { -// e.printStackTrace(); -// } finally { -// SpringUtils.getBean(ApiLogAspect.class).saveApiLog(log); -// } -// } - -// /** -// * 根据url,从address表中判断调用的去向 -// */ -// public static void parseUrl(ApiLog log, URL url, String warehouseCode) { -// try { -// String[] spList = url.toString().split("/"); -// String apiName = spList[spList.length - 1]; -// String ip = JeecgSystemApplication.getLocalHostExactAddress().getHostAddress(); -// Address address = addressService.getAddressByUrl(url.toString(), warehouseCode); -// log.setApiName(apiName); -// log.setRequestFrom("WMS"); -// log.setIp(ip); -// log.setResponseBy(address.getParam().toUpperCase()); -// } catch (Exception e) { -// e.printStackTrace(); -// } -// } - -// /** -// * 记录WMS调用第三方系统接口的请求信息 -// * 在HttpUtils.body POST方法中直接调用本类static方法 -// **/ -// public static ApiLog initApiLog(String Method, String urlStr, String body, HttpHeaders headers, String warehouseCode) { -// ApiLog log = new ApiLog(); -// try { -// URL url = new URL(urlStr); -// log.setApiMethod(Method); -// log.setUrl(urlStr); -// log.setRequestTime(new Date()); -// parseUrl(log, url, warehouseCode); -// -// // 请求头 -// Set<String> keySet = headers.keySet(); -// ArrayList<String> headerList = new ArrayList<>(); -// Iterator<String> it = keySet.iterator(); -// while (it.hasNext()) { -// String name = it.next(); -// String header = String.valueOf(headers.getContentType()); -// headerList.add(name + ": " + header); -// } -// -// log.setRequestHeader(org.apache.commons.lang3.StringUtils.join(headerList, "\n")); -// log.setRequestBody(body); -// } catch (Exception e) { -// e.printStackTrace(); -// } -// -// return log; -// } - -// /** -// * 记录WMS调用第三方系统接口的请求信息 -// * 在HttpUtils.body POST方法中直接调用本类static方法 -// **/ -// public static ApiLog initApiLog(HttpURLConnection connection, String body) { -// ApiLog log = new ApiLog(); -// try { -// log.setApiMethod(connection.getRequestMethod()); -// log.setUrl(connection.getURL().toString()); -// log.setRequestTime(new Date()); -// parseUrl(log, connection.getURL()); -// -// // 请求头 -// Set<String> keySet = connection.getRequestProperties().keySet(); -// ArrayList<String> headerList = new ArrayList<>(); -// Iterator<String> it = keySet.iterator(); -// while (it.hasNext()) { -// String name = it.next(); -// String header = connection.getRequestProperty(name); -// headerList.add(name + ":" + header); -// } -// -// log.setRequestHeader(org.apache.commons.lang3.StringUtils.join(headerList, "\n")); -// log.setRequestBody(body); -// } catch (Exception e) { -// e.printStackTrace(); -// } -// -// return log; -// } - /** * 记录WMS调用第三方系统接口的请求信息 * 在OKHttpUtils.bodypost方法中直接调用本类static方法 - * @param apiLog + * @param apiLog **/ public static void initApiLog(ApiLog apiLog, Request request, String body) { try { @@ -289,7 +127,7 @@ public class ApiLoggerAspect { } catch (Exception e) { e.printStackTrace(); } - SpringUtils.getBean(ApiLoggerAspect.class).saveApiLog(log); + SpringUtils.getBean(SystemSync.class).saveApiLog(log); } } @@ -301,7 +139,7 @@ public class ApiLoggerAspect { apiLog.setApiName(apiName); apiLog.setRequestFrom(HUAHENG_SYSTEM_NAME); apiLog.setIp(JeecgSystemApplication.getLocalHostExactAddress().getHostAddress()); - Address address = addressService.getAddressByUrl(url.toString()); + Address address = SpringUtils.getBean(AddressServiceImpl.class).getAddressByUrl(url.toString()); if (address != null) { apiLog.setApiName(address.getRemark()); apiLog.setResponseBy(address.getParam().toUpperCase()); @@ -311,41 +149,6 @@ public class ApiLoggerAspect { } } -// /** -// * 记录响应头信息 -// **/ -// public static void finishApiLog(ApiLog log, HttpHeaders headers, String body) { -// try { -// log.setResponseBody(body); -// log.setResponseTime(new Date()); -// Long duration = log.getResponseTime().getTime() - log.getRequestTime().getTime(); -// log.setDuration(duration.intValue()); -// log.setHttpCode(200); -// -// // 响应头 -// Set<String> keyset = headers.keySet(); -// ArrayList<String> headerList = new ArrayList<>(); -// Iterator<String> it = keyset.iterator(); -// while (it.hasNext()) { -// String name = it.next(); -// String header = String.valueOf(headers.getContentType()); -// if (name == null || "".equals(name)) -// // 第一行没有name -// // HTTP/1.1 200 OK -// headerList.add(header); -// else -// headerList.add(name + ": " + header); -// } -// log.setResponseHeader(org.apache.commons.lang3.StringUtils.join(headerList, "\n")); -// Result json = JSON.parseObject(body, Result.class); -// log.setRetCode(json.getCode()); -// } catch (Exception e) { -// e.printStackTrace(); -// } finally { -// SpringUtils.getBean(ApiLogAspect.class).saveApiLog(log); -// } -// } - private ApiLog initApiLog(ApiLogger apiLogger, ProceedingJoinPoint point) { ApiLog log = new ApiLog(); try { @@ -398,7 +201,7 @@ public class ApiLoggerAspect { } catch (Exception e) { e.printStackTrace(); } finally { - saveApiLog(log); + SpringUtils.getBean(SystemSync.class).saveApiLog(log); } } @@ -474,7 +277,6 @@ public class ApiLoggerAspect { Object[] args = point.getArgs(); for (int i = 0; i < m.getParameterNames().length; i++) { String name = m.getParameterNames()[i]; -// Class type = m.getParameterTypes()[i]; if (args[i] instanceof HttpServletRequest) { continue; } @@ -522,15 +324,6 @@ public class ApiLoggerAspect { } } - @Async - public void saveApiLog(ApiLog log) { - try { - apiLogService.saveOrUpdate(log); - } catch (Exception e) { - e.printStackTrace(); - } - } - public static String getIpAddr(HttpServletRequest request) { String ipAddress = request.getHeader("x-forwarded-for"); if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) { diff --git a/huaheng-wms-core/src/main/java/org/jeecg/utils/support/SyncConfiguration.java b/huaheng-wms-core/src/main/java/org/jeecg/utils/support/SyncConfiguration.java new file mode 100644 index 0000000..3fd6f82 --- /dev/null +++ b/huaheng-wms-core/src/main/java/org/jeecg/utils/support/SyncConfiguration.java @@ -0,0 +1,38 @@ +package org.jeecg.utils.support; + +import java.util.concurrent.ThreadPoolExecutor; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +@Configuration +@EnableAsync +public class SyncConfiguration { + @Bean(name = "asyncPoolTaskExecutor") + public ThreadPoolTaskExecutor executor() { + ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); + // 核心线程数 + taskExecutor.setCorePoolSize(10); + // 线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程 + taskExecutor.setMaxPoolSize(100); + // 缓存队列 + taskExecutor.setQueueCapacity(50); + // 许的空闲时间,当超过了核心线程出之外的线程在空闲时间到达之后会被销毁 + taskExecutor.setKeepAliveSeconds(200); + // 异步方法内部线程名称 + taskExecutor.setThreadNamePrefix("async-"); + /** + * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略 + * 通常有以下四种策略: + * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 + * ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 + * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) + * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功 + */ + taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + taskExecutor.initialize(); + return taskExecutor; + } +} \ No newline at end of file diff --git a/huaheng-wms-core/src/main/java/org/jeecg/utils/support/SystemSync.java b/huaheng-wms-core/src/main/java/org/jeecg/utils/support/SystemSync.java new file mode 100644 index 0000000..73039f4 --- /dev/null +++ b/huaheng-wms-core/src/main/java/org/jeecg/utils/support/SystemSync.java @@ -0,0 +1,65 @@ +package org.jeecg.utils.support; + +import java.math.BigDecimal; +import java.util.List; +import java.util.concurrent.Future; + +import org.jeecg.modules.wms.inventory.inventoryTransaction.entity.InventoryTransaction; +import org.jeecg.modules.wms.inventory.inventoryTransaction.service.IInventoryTransactionService; +import org.jeecg.modules.wms.monitor.apiLog.entity.ApiLog; +import org.jeecg.modules.wms.monitor.apiLog.service.IApiLogService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.AsyncResult; +import org.springframework.stereotype.Component; + +import com.alibaba.fastjson.JSON; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +public class SystemSync { + + @Autowired + private IApiLogService apiLogService; + + @Autowired + private IInventoryTransactionService inventoryTransactionService; + + /** + * 保存API调用日志 + * @author TanYibin + * @createDate 2023年5月9日 + * @param apiLog + */ + @Async("asyncPoolTaskExecutor") + public void saveApiLog(ApiLog apiLog) { + try { + apiLogService.saveOrUpdate(apiLog); + } catch (Exception e) { + log.error("saveApiLog 执行异常,apiLog:{}", JSON.toJSONString(apiLog), e); + } + } + + /** + * 异步库存信息查询 + * @author TanYibin + * @createDate 2023年5月9日 + * @param startTime + * @param endTime + * @param type + * @return + */ + @Async("asyncPoolTaskExecutor") + public Future<BigDecimal> asyncCalculateInventory(String startTime, String endTime, Integer type) { + LambdaQueryWrapper<InventoryTransaction> inventoryTransactionLambdaQueryWrapper = Wrappers.lambdaQuery(); + inventoryTransactionLambdaQueryWrapper.select(InventoryTransaction::getCreateTime, InventoryTransaction::getQty, InventoryTransaction::getType) + .ge(InventoryTransaction::getCreateTime, startTime).lt(InventoryTransaction::getCreateTime, endTime).eq(InventoryTransaction::getType, type); + List<InventoryTransaction> inventoryTransactions = inventoryTransactionService.list(inventoryTransactionLambdaQueryWrapper); + BigDecimal shipmentResult = inventoryTransactions.stream().map(InventoryTransaction::getQty).reduce(BigDecimal.ZERO, BigDecimal::add); + return new AsyncResult<>(shipmentResult); + } +} -- libgit2 0.22.2