Commit f624ed8dafa130baa6a58afb4fbed19b1fa4f91a

Authored by 谭毅彬
1 parent 97ea51e6

修改异步方法使用线程池,解决线程不释放问题

Signed-off-by: TanYibin <5491541@qq.com>
huaheng-wms-core/src/main/java/org/jeecg/modules/wms/home/service/impl/HomePageViewServiceImpl.java
... ... @@ -40,6 +40,7 @@ import org.jeecg.modules.wms.task.taskHeader.service.impl.TaskHeaderServiceImpl;
40 40 import org.jeecg.utils.HuahengJwtUtil;
41 41 import org.jeecg.utils.constant.QuantityConstant;
42 42 import org.jeecg.utils.support.Convert;
  43 +import org.jeecg.utils.support.SystemSync;
43 44 import org.springframework.stereotype.Service;
44 45  
45 46 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
... ... @@ -100,6 +101,9 @@ public class HomePageViewServiceImpl implements HomePageViewService {
100 101  
101 102 @Resource
102 103 IInventoryTransactionService inventoryTransactionService;
  104 +
  105 + @Resource
  106 + SystemSync systemSync;
103 107  
104 108 @Override
105 109 public Result<String> deliveringAmount(HttpServletRequest request) throws ParseException, InterruptedException, ExecutionException, TimeoutException {
... ... @@ -112,9 +116,9 @@ public class HomePageViewServiceImpl implements HomePageViewService {
112 116 String startTime = simpleDateFormat.format(dateTime - (1000 * 60 * 60 * 24) * i);
113 117 String endTime = simpleDateFormat.format(DateUtil.offsetDay(simpleDateFormat.parse(startTime), 1));
114 118 shipmentQuantityMap.put(startTime,
115   - inventoryTransactionService.asyncCalculateInventory(startTime, endTime, QuantityConstant.INVENTORY_TRANSACTION_SHIPMENT));
  119 + systemSync.asyncCalculateInventory(startTime, endTime, QuantityConstant.INVENTORY_TRANSACTION_SHIPMENT));
116 120 receiptQuantityMap.put(startTime,
117   - inventoryTransactionService.asyncCalculateInventory(startTime, endTime, QuantityConstant.INVENTORY_TRANSACTION_RECEIPT));
  121 + systemSync.asyncCalculateInventory(startTime, endTime, QuantityConstant.INVENTORY_TRANSACTION_RECEIPT));
118 122 }
119 123 Option option = new Option();
120 124 option.tooltip().trigger(Trigger.axis).axisPointer().type(PointerType.cross);
... ...
huaheng-wms-core/src/main/java/org/jeecg/modules/wms/inventory/inventoryTransaction/service/IInventoryTransactionService.java
... ... @@ -18,6 +18,4 @@ public interface IInventoryTransactionService extends IService&lt;InventoryTransact
18 18  
19 19 boolean batchSave(List<InventoryTransaction> inventoryTransactionList);
20 20  
21   - Future<BigDecimal> asyncCalculateInventory(String startTime, String endTime, Integer type) throws ParseException;
22   -
23 21 }
... ...
huaheng-wms-core/src/main/java/org/jeecg/modules/wms/inventory/inventoryTransaction/service/impl/InventoryTransactionServiceImpl.java
... ... @@ -29,15 +29,4 @@ public class InventoryTransactionServiceImpl extends ServiceImpl&lt;InventoryTransa
29 29 public boolean batchSave(List<InventoryTransaction> inventoryTransactionList) {
30 30 return this.saveBatch(inventoryTransactionList);
31 31 }
32   -
33   - @Async
34   - @Override
35   - public Future<BigDecimal> asyncCalculateInventory(String startTime, String endTime, Integer type) {
36   - LambdaQueryWrapper<InventoryTransaction> inventoryTransactionLambdaQueryWrapper = Wrappers.lambdaQuery();
37   - inventoryTransactionLambdaQueryWrapper.select(InventoryTransaction::getCreateTime, InventoryTransaction::getQty, InventoryTransaction::getType)
38   - .ge(InventoryTransaction::getCreateTime, startTime).lt(InventoryTransaction::getCreateTime, endTime).eq(InventoryTransaction::getType, type);
39   - List<InventoryTransaction> inventoryTransactions = list(inventoryTransactionLambdaQueryWrapper);
40   - BigDecimal shipmentResult = inventoryTransactions.stream().map(InventoryTransaction::getQty).reduce(BigDecimal.ZERO, BigDecimal::add);
41   - return new AsyncResult<>(shipmentResult);
42   - }
43 32 }
... ...
huaheng-wms-core/src/main/java/org/jeecg/modules/wms/task/taskHeader/service/impl/TaskHeaderServiceImpl.java
... ... @@ -831,10 +831,10 @@ public class TaskHeaderServiceImpl extends ServiceImpl&lt;TaskHeaderMapper, TaskHea
831 831 public Result completeTaskByWMS(Integer taskId) {
832 832 TaskHeader taskHeader = getById(taskId);
833 833 if (taskHeader == null) {
834   - return Result.error("任务" + taskId + "未找到,执行中止");
  834 + return Result.error("任务 " + taskId + " 未找到,执行中止");
835 835 }
836 836 if (taskHeader.getStatus().equals(QuantityConstant.TASK_STATUS_COMPLETED)) {
837   - return Result.ok("任务(" + taskId + ")任务已经是完成的!");
  837 + return Result.ok("任务 " + taskId + " 任务已完成!");
838 838 }
839 839 int taskType = taskHeader.getTaskType().intValue();
840 840 if (taskType != QuantityConstant.TASK_TYPE_OVER_STATION) {
... ...
huaheng-wms-core/src/main/java/org/jeecg/utils/aspect/ApiLoggerAspect.java
... ... @@ -23,17 +23,13 @@ import org.aspectj.lang.reflect.MethodSignature;
23 23 import org.jeecg.JeecgSystemApplication;
24 24 import org.jeecg.common.api.vo.Result;
25 25 import org.jeecg.modules.wms.config.address.entity.Address;
26   -import org.jeecg.modules.wms.config.address.service.IAddressService;
  26 +import org.jeecg.modules.wms.config.address.service.impl.AddressServiceImpl;
27 27 import org.jeecg.modules.wms.monitor.apiLog.entity.ApiLog;
28   -import org.jeecg.modules.wms.monitor.apiLog.service.IApiLogService;
29 28 import org.jeecg.utils.ServletUtils;
30 29 import org.jeecg.utils.SpringUtils;
31 30 import org.jeecg.utils.StringUtils;
32   -import org.jeecg.utils.config.ApplicationConfig;
33   -import org.jeecg.utils.constant.QuantityConstant;
34 31 import org.jeecg.utils.support.ApiLogger;
35   -import org.springframework.beans.factory.annotation.Autowired;
36   -import org.springframework.scheduling.annotation.Async;
  32 +import org.jeecg.utils.support.SystemSync;
37 33 import org.springframework.scheduling.annotation.EnableAsync;
38 34 import org.springframework.stereotype.Component;
39 35  
... ... @@ -55,23 +51,6 @@ import okhttp3.Response;
55 51 public class ApiLoggerAspect {
56 52  
57 53 public static final String HUAHENG_SYSTEM_NAME = "HUAHENG_WMS4";
58   -
59   - private static IApiLogService apiLogService;
60   -
61   - private static IAddressService addressService;
62   -
63   - @Autowired
64   - private ApplicationConfig applicationConfig;
65   -
66   - @Autowired
67   - public void setApiLogService(IApiLogService apiLogService) {
68   - ApiLoggerAspect.apiLogService = apiLogService;
69   - }
70   -
71   - @Autowired
72   - public void setAddressService(IAddressService addressService) {
73   - ApiLoggerAspect.addressService = addressService;
74   - }
75 54  
76 55 // 配置织入点
77 56 @Pointcut("@annotation(org.jeecg.utils.support.ApiLogger)")
... ... @@ -100,151 +79,10 @@ public class ApiLoggerAspect {
100 79 return ret;
101 80 }
102 81  
103   -// /** 记录WMS调用第三方接口的日志 **/
104   -// private Object aroundWms2XXX(ProceedingJoinPoint point, ApiLogger apiLogger) {
105   -// Object ret = null;
106   -// ApiLog log = new ApiLog();
107   -//
108   -// HttpURLConnection connection = null;
109   -// String body = null;
110   -//
111   -// try {
112   -// connection = (HttpURLConnection)point.getArgs()[0];
113   -// body = (String)point.getArgs()[1];
114   -// initApiLog(connection, body);
115   -// ret = point.proceed();
116   -// } catch (Throwable e) {
117   -// setApiLogThrowable(log, e);
118   -// ret = Result.error(e.getMessage());
119   -// } finally {
120   -// if (ret != null) {
121   -// finishApiLog(log, connection, ret.toString());
122   -// }
123   -// }
124   -// return ret;
125   -// }
126   -
127   -// /**
128   -// * 记录响应头信息,保存日志到数据库
129   -// */
130   -// public static void finishApiLog(ApiLog log, HttpURLConnection connection, String body) {
131   -// try {
132   -// log.setResponseBody(body);
133   -// log.setResponseTime(new Date());
134   -// Long duration = log.getResponseTime().getTime() - log.getRequestTime().getTime();
135   -// log.setDuration(duration.intValue());
136   -// log.setHttpCode(connection.getResponseCode());
137   -//
138   -// // 响应头
139   -// Set<String> keyset = connection.getHeaderFields().keySet();
140   -// ArrayList<String> headerList = new ArrayList<>();
141   -// Iterator<String> it = keyset.iterator();
142   -// while (it.hasNext()) {
143   -// String name = it.next();
144   -// String header = connection.getHeaderField(name);
145   -// if (name == null || "".equals(name)) {
146   -// // 第一行没有name
147   -// // HTTP/1.1 200 OK
148   -// headerList.add(header);
149   -// } else {
150   -// headerList.add(name + ": " + header);
151   -// }
152   -// }
153   -// log.setResponseHeader(org.apache.commons.lang3.StringUtils.join(headerList, "\n"));
154   -// Result json = JSON.parseObject(body, Result.class);
155   -// log.setRetCode(json.getCode());
156   -// } catch (Exception e) {
157   -// e.printStackTrace();
158   -// } finally {
159   -// SpringUtils.getBean(ApiLogAspect.class).saveApiLog(log);
160   -// }
161   -// }
162   -
163   -// /**
164   -// * 根据url,从address表中判断调用的去向
165   -// */
166   -// public static void parseUrl(ApiLog log, URL url, String warehouseCode) {
167   -// try {
168   -// String[] spList = url.toString().split("/");
169   -// String apiName = spList[spList.length - 1];
170   -// String ip = JeecgSystemApplication.getLocalHostExactAddress().getHostAddress();
171   -// Address address = addressService.getAddressByUrl(url.toString(), warehouseCode);
172   -// log.setApiName(apiName);
173   -// log.setRequestFrom("WMS");
174   -// log.setIp(ip);
175   -// log.setResponseBy(address.getParam().toUpperCase());
176   -// } catch (Exception e) {
177   -// e.printStackTrace();
178   -// }
179   -// }
180   -
181   -// /**
182   -// * 记录WMS调用第三方系统接口的请求信息
183   -// * 在HttpUtils.body POST方法中直接调用本类static方法
184   -// **/
185   -// public static ApiLog initApiLog(String Method, String urlStr, String body, HttpHeaders headers, String warehouseCode) {
186   -// ApiLog log = new ApiLog();
187   -// try {
188   -// URL url = new URL(urlStr);
189   -// log.setApiMethod(Method);
190   -// log.setUrl(urlStr);
191   -// log.setRequestTime(new Date());
192   -// parseUrl(log, url, warehouseCode);
193   -//
194   -// // 请求头
195   -// Set<String> keySet = headers.keySet();
196   -// ArrayList<String> headerList = new ArrayList<>();
197   -// Iterator<String> it = keySet.iterator();
198   -// while (it.hasNext()) {
199   -// String name = it.next();
200   -// String header = String.valueOf(headers.getContentType());
201   -// headerList.add(name + ": " + header);
202   -// }
203   -//
204   -// log.setRequestHeader(org.apache.commons.lang3.StringUtils.join(headerList, "\n"));
205   -// log.setRequestBody(body);
206   -// } catch (Exception e) {
207   -// e.printStackTrace();
208   -// }
209   -//
210   -// return log;
211   -// }
212   -
213   -// /**
214   -// * 记录WMS调用第三方系统接口的请求信息
215   -// * 在HttpUtils.body POST方法中直接调用本类static方法
216   -// **/
217   -// public static ApiLog initApiLog(HttpURLConnection connection, String body) {
218   -// ApiLog log = new ApiLog();
219   -// try {
220   -// log.setApiMethod(connection.getRequestMethod());
221   -// log.setUrl(connection.getURL().toString());
222   -// log.setRequestTime(new Date());
223   -// parseUrl(log, connection.getURL());
224   -//
225   -// // 请求头
226   -// Set<String> keySet = connection.getRequestProperties().keySet();
227   -// ArrayList<String> headerList = new ArrayList<>();
228   -// Iterator<String> it = keySet.iterator();
229   -// while (it.hasNext()) {
230   -// String name = it.next();
231   -// String header = connection.getRequestProperty(name);
232   -// headerList.add(name + ":" + header);
233   -// }
234   -//
235   -// log.setRequestHeader(org.apache.commons.lang3.StringUtils.join(headerList, "\n"));
236   -// log.setRequestBody(body);
237   -// } catch (Exception e) {
238   -// e.printStackTrace();
239   -// }
240   -//
241   -// return log;
242   -// }
243   -
244 82 /**
245 83 * 记录WMS调用第三方系统接口的请求信息
246 84 * 在OKHttpUtils.bodypost方法中直接调用本类static方法
247   - * @param apiLog
  85 + * @param apiLog
248 86 **/
249 87 public static void initApiLog(ApiLog apiLog, Request request, String body) {
250 88 try {
... ... @@ -289,7 +127,7 @@ public class ApiLoggerAspect {
289 127 } catch (Exception e) {
290 128 e.printStackTrace();
291 129 }
292   - SpringUtils.getBean(ApiLoggerAspect.class).saveApiLog(log);
  130 + SpringUtils.getBean(SystemSync.class).saveApiLog(log);
293 131 }
294 132 }
295 133  
... ... @@ -301,7 +139,7 @@ public class ApiLoggerAspect {
301 139 apiLog.setApiName(apiName);
302 140 apiLog.setRequestFrom(HUAHENG_SYSTEM_NAME);
303 141 apiLog.setIp(JeecgSystemApplication.getLocalHostExactAddress().getHostAddress());
304   - Address address = addressService.getAddressByUrl(url.toString());
  142 + Address address = SpringUtils.getBean(AddressServiceImpl.class).getAddressByUrl(url.toString());
305 143 if (address != null) {
306 144 apiLog.setApiName(address.getRemark());
307 145 apiLog.setResponseBy(address.getParam().toUpperCase());
... ... @@ -311,41 +149,6 @@ public class ApiLoggerAspect {
311 149 }
312 150 }
313 151  
314   -// /**
315   -// * 记录响应头信息
316   -// **/
317   -// public static void finishApiLog(ApiLog log, HttpHeaders headers, String body) {
318   -// try {
319   -// log.setResponseBody(body);
320   -// log.setResponseTime(new Date());
321   -// Long duration = log.getResponseTime().getTime() - log.getRequestTime().getTime();
322   -// log.setDuration(duration.intValue());
323   -// log.setHttpCode(200);
324   -//
325   -// // 响应头
326   -// Set<String> keyset = headers.keySet();
327   -// ArrayList<String> headerList = new ArrayList<>();
328   -// Iterator<String> it = keyset.iterator();
329   -// while (it.hasNext()) {
330   -// String name = it.next();
331   -// String header = String.valueOf(headers.getContentType());
332   -// if (name == null || "".equals(name))
333   -// // 第一行没有name
334   -// // HTTP/1.1 200 OK
335   -// headerList.add(header);
336   -// else
337   -// headerList.add(name + ": " + header);
338   -// }
339   -// log.setResponseHeader(org.apache.commons.lang3.StringUtils.join(headerList, "\n"));
340   -// Result json = JSON.parseObject(body, Result.class);
341   -// log.setRetCode(json.getCode());
342   -// } catch (Exception e) {
343   -// e.printStackTrace();
344   -// } finally {
345   -// SpringUtils.getBean(ApiLogAspect.class).saveApiLog(log);
346   -// }
347   -// }
348   -
349 152 private ApiLog initApiLog(ApiLogger apiLogger, ProceedingJoinPoint point) {
350 153 ApiLog log = new ApiLog();
351 154 try {
... ... @@ -398,7 +201,7 @@ public class ApiLoggerAspect {
398 201 } catch (Exception e) {
399 202 e.printStackTrace();
400 203 } finally {
401   - saveApiLog(log);
  204 + SpringUtils.getBean(SystemSync.class).saveApiLog(log);
402 205 }
403 206 }
404 207  
... ... @@ -474,7 +277,6 @@ public class ApiLoggerAspect {
474 277 Object[] args = point.getArgs();
475 278 for (int i = 0; i < m.getParameterNames().length; i++) {
476 279 String name = m.getParameterNames()[i];
477   -// Class type = m.getParameterTypes()[i];
478 280 if (args[i] instanceof HttpServletRequest) {
479 281 continue;
480 282 }
... ... @@ -522,15 +324,6 @@ public class ApiLoggerAspect {
522 324 }
523 325 }
524 326  
525   - @Async
526   - public void saveApiLog(ApiLog log) {
527   - try {
528   - apiLogService.saveOrUpdate(log);
529   - } catch (Exception e) {
530   - e.printStackTrace();
531   - }
532   - }
533   -
534 327 public static String getIpAddr(HttpServletRequest request) {
535 328 String ipAddress = request.getHeader("x-forwarded-for");
536 329 if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
... ...
huaheng-wms-core/src/main/java/org/jeecg/utils/support/SyncConfiguration.java 0 → 100644
  1 +package org.jeecg.utils.support;
  2 +
  3 +import java.util.concurrent.ThreadPoolExecutor;
  4 +
  5 +import org.springframework.context.annotation.Bean;
  6 +import org.springframework.context.annotation.Configuration;
  7 +import org.springframework.scheduling.annotation.EnableAsync;
  8 +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  9 +
  10 +@Configuration
  11 +@EnableAsync
  12 +public class SyncConfiguration {
  13 + @Bean(name = "asyncPoolTaskExecutor")
  14 + public ThreadPoolTaskExecutor executor() {
  15 + ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
  16 + // 核心线程数
  17 + taskExecutor.setCorePoolSize(10);
  18 + // 线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程
  19 + taskExecutor.setMaxPoolSize(100);
  20 + // 缓存队列
  21 + taskExecutor.setQueueCapacity(50);
  22 + // 许的空闲时间,当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
  23 + taskExecutor.setKeepAliveSeconds(200);
  24 + // 异步方法内部线程名称
  25 + taskExecutor.setThreadNamePrefix("async-");
  26 + /**
  27 + * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
  28 + * 通常有以下四种策略:
  29 + * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
  30 + * ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
  31 + * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
  32 + * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
  33 + */
  34 + taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  35 + taskExecutor.initialize();
  36 + return taskExecutor;
  37 + }
  38 +}
0 39 \ No newline at end of file
... ...
huaheng-wms-core/src/main/java/org/jeecg/utils/support/SystemSync.java 0 → 100644
  1 +package org.jeecg.utils.support;
  2 +
  3 +import java.math.BigDecimal;
  4 +import java.util.List;
  5 +import java.util.concurrent.Future;
  6 +
  7 +import org.jeecg.modules.wms.inventory.inventoryTransaction.entity.InventoryTransaction;
  8 +import org.jeecg.modules.wms.inventory.inventoryTransaction.service.IInventoryTransactionService;
  9 +import org.jeecg.modules.wms.monitor.apiLog.entity.ApiLog;
  10 +import org.jeecg.modules.wms.monitor.apiLog.service.IApiLogService;
  11 +import org.springframework.beans.factory.annotation.Autowired;
  12 +import org.springframework.scheduling.annotation.Async;
  13 +import org.springframework.scheduling.annotation.AsyncResult;
  14 +import org.springframework.stereotype.Component;
  15 +
  16 +import com.alibaba.fastjson.JSON;
  17 +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  18 +import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  19 +
  20 +import lombok.extern.slf4j.Slf4j;
  21 +
  22 +@Slf4j
  23 +@Component
  24 +public class SystemSync {
  25 +
  26 + @Autowired
  27 + private IApiLogService apiLogService;
  28 +
  29 + @Autowired
  30 + private IInventoryTransactionService inventoryTransactionService;
  31 +
  32 + /**
  33 + * 保存API调用日志
  34 + * @author TanYibin
  35 + * @createDate 2023年5月9日
  36 + * @param apiLog
  37 + */
  38 + @Async("asyncPoolTaskExecutor")
  39 + public void saveApiLog(ApiLog apiLog) {
  40 + try {
  41 + apiLogService.saveOrUpdate(apiLog);
  42 + } catch (Exception e) {
  43 + log.error("saveApiLog 执行异常,apiLog:{}", JSON.toJSONString(apiLog), e);
  44 + }
  45 + }
  46 +
  47 + /**
  48 + * 异步库存信息查询
  49 + * @author TanYibin
  50 + * @createDate 2023年5月9日
  51 + * @param startTime
  52 + * @param endTime
  53 + * @param type
  54 + * @return
  55 + */
  56 + @Async("asyncPoolTaskExecutor")
  57 + public Future<BigDecimal> asyncCalculateInventory(String startTime, String endTime, Integer type) {
  58 + LambdaQueryWrapper<InventoryTransaction> inventoryTransactionLambdaQueryWrapper = Wrappers.lambdaQuery();
  59 + inventoryTransactionLambdaQueryWrapper.select(InventoryTransaction::getCreateTime, InventoryTransaction::getQty, InventoryTransaction::getType)
  60 + .ge(InventoryTransaction::getCreateTime, startTime).lt(InventoryTransaction::getCreateTime, endTime).eq(InventoryTransaction::getType, type);
  61 + List<InventoryTransaction> inventoryTransactions = inventoryTransactionService.list(inventoryTransactionLambdaQueryWrapper);
  62 + BigDecimal shipmentResult = inventoryTransactions.stream().map(InventoryTransaction::getQty).reduce(BigDecimal.ZERO, BigDecimal::add);
  63 + return new AsyncResult<>(shipmentResult);
  64 + }
  65 +}
... ...