Commit b0a6f36adef56bf5ed7b4a5e8d6f1e83dec81ac7

Authored by zhangdaiscott
1 parent ffe64fd4

依赖jar上传到maven官仓,简化使用

重写xxljob避免默认 9999端口冲突
jeecg-boot/jeecg-boot-base/jeecg-boot-base-core/pom.xml
... ... @@ -151,7 +151,7 @@
151 151 <version>${shiro.version}</version>
152 152 </dependency>
153 153 <dependency>
154   - <groupId>org.hibernate</groupId>
  154 + <groupId>org.jeecgframework.boot</groupId>
155 155 <artifactId>hibernate-re</artifactId>
156 156 <version>2.4.2</version>
157 157 </dependency>
... ...
jeecg-boot/jeecg-boot-module-system/pom.xml
... ... @@ -45,9 +45,9 @@
45 45 </dependency>
46 46 <!-- 积木报表 -->
47 47 <dependency>
48   - <groupId>com.jimureport</groupId>
  48 + <groupId>org.jeecgframework.jimureport</groupId>
49 49 <artifactId>spring-boot-starter-jimureport</artifactId>
50   - <version>1.1.09-beta</version>
  50 + <version>1.2.0</version>
51 51 <exclusions>
52 52 <exclusion>
53 53 <artifactId>autopoi-web</artifactId>
... ...
jeecg-boot/jeecg-boot-starter/jeecg-boot-starter-job/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java 0 → 100644
  1 +package com.xxl.job.core.executor;
  2 +
  3 +import com.xxl.job.core.biz.AdminBiz;
  4 +import com.xxl.job.core.biz.client.AdminBizClient;
  5 +import com.xxl.job.core.handler.IJobHandler;
  6 +import com.xxl.job.core.log.XxlJobFileAppender;
  7 +import com.xxl.job.core.server.EmbedServer;
  8 +import com.xxl.job.core.thread.JobLogFileCleanThread;
  9 +import com.xxl.job.core.thread.JobThread;
  10 +import com.xxl.job.core.thread.TriggerCallbackThread;
  11 +import com.xxl.job.core.util.IpUtil;
  12 +import com.xxl.job.core.util.NetUtil;
  13 +import org.slf4j.Logger;
  14 +import org.slf4j.LoggerFactory;
  15 +
  16 +import java.util.ArrayList;
  17 +import java.util.List;
  18 +import java.util.Map;
  19 +import java.util.concurrent.ConcurrentHashMap;
  20 +import java.util.concurrent.ConcurrentMap;
  21 +
  22 +/**
  23 + * 重写目的修改默认端口9999为10000避免和网关冲突
  24 + */
  25 +public class XxlJobExecutor {
  26 + private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class);
  27 +
  28 + // ---------------------- param ----------------------
  29 + private String adminAddresses;
  30 + private String accessToken;
  31 + private String appname;
  32 + private String address;
  33 + private String ip;
  34 + private int port;
  35 + private String logPath;
  36 + private int logRetentionDays;
  37 +
  38 + public void setAdminAddresses(String adminAddresses) {
  39 + this.adminAddresses = adminAddresses;
  40 + }
  41 +
  42 + public void setAccessToken(String accessToken) {
  43 + this.accessToken = accessToken;
  44 + }
  45 +
  46 + public void setAppname(String appname) {
  47 + this.appname = appname;
  48 + }
  49 +
  50 + public void setAddress(String address) {
  51 + this.address = address;
  52 + }
  53 +
  54 + public void setIp(String ip) {
  55 + this.ip = ip;
  56 + }
  57 +
  58 + public void setPort(int port) {
  59 + this.port = port;
  60 + }
  61 +
  62 + public void setLogPath(String logPath) {
  63 + this.logPath = logPath;
  64 + }
  65 +
  66 + public void setLogRetentionDays(int logRetentionDays) {
  67 + this.logRetentionDays = logRetentionDays;
  68 + }
  69 +
  70 +
  71 + // ---------------------- start + stop ----------------------
  72 + public void start() throws Exception {
  73 +
  74 + // init logpath
  75 + XxlJobFileAppender.initLogPath(logPath);
  76 +
  77 + // init invoker, admin-client
  78 + initAdminBizList(adminAddresses, accessToken);
  79 +
  80 +
  81 + // init JobLogFileCleanThread
  82 + JobLogFileCleanThread.getInstance().start(logRetentionDays);
  83 +
  84 + // init TriggerCallbackThread
  85 + TriggerCallbackThread.getInstance().start();
  86 +
  87 + // init executor-server
  88 + initEmbedServer(address, ip, port, appname, accessToken);
  89 + }
  90 +
  91 + public void destroy() {
  92 + // destory executor-server
  93 + stopEmbedServer();
  94 +
  95 + // destory jobThreadRepository
  96 + if (jobThreadRepository.size() > 0) {
  97 + for (Map.Entry<Integer, JobThread> item : jobThreadRepository.entrySet()) {
  98 + JobThread oldJobThread = removeJobThread(item.getKey(), "web container destroy and kill the job.");
  99 + // wait for job thread push result to callback queue
  100 + if (oldJobThread != null) {
  101 + try {
  102 + oldJobThread.join();
  103 + } catch (InterruptedException e) {
  104 + logger.error(">>>>>>>>>>> xxl-job, JobThread destroy(join) error, jobId:{}", item.getKey(), e);
  105 + }
  106 + }
  107 + }
  108 + jobThreadRepository.clear();
  109 + }
  110 + jobHandlerRepository.clear();
  111 +
  112 +
  113 + // destory JobLogFileCleanThread
  114 + JobLogFileCleanThread.getInstance().toStop();
  115 +
  116 + // destory TriggerCallbackThread
  117 + TriggerCallbackThread.getInstance().toStop();
  118 +
  119 + }
  120 +
  121 +
  122 + // ---------------------- admin-client (rpc invoker) ----------------------
  123 + private static List<AdminBiz> adminBizList;
  124 +
  125 + private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
  126 + if (adminAddresses != null && adminAddresses.trim().length() > 0) {
  127 + for (String address : adminAddresses.trim().split(",")) {
  128 + if (address != null && address.trim().length() > 0) {
  129 +
  130 + AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);
  131 +
  132 + if (adminBizList == null) {
  133 + adminBizList = new ArrayList<AdminBiz>();
  134 + }
  135 + adminBizList.add(adminBiz);
  136 + }
  137 + }
  138 + }
  139 + }
  140 +
  141 + public static List<AdminBiz> getAdminBizList() {
  142 + return adminBizList;
  143 + }
  144 +
  145 + // ---------------------- executor-server (rpc provider) ----------------------
  146 + private EmbedServer embedServer = null;
  147 +
  148 + private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
  149 +
  150 + // fill ip port 修改默认端口
  151 + port = port > 0 ? port : NetUtil.findAvailablePort(10000);
  152 + ip = (ip != null && ip.trim().length() > 0) ? ip : IpUtil.getIp();
  153 +
  154 + // generate address
  155 + if (address == null || address.trim().length() == 0) {
  156 + String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null
  157 + address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
  158 + }
  159 +
  160 + // accessToken
  161 + if (accessToken == null || accessToken.trim().length() == 0) {
  162 + logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
  163 + }
  164 +
  165 + // start
  166 + embedServer = new EmbedServer();
  167 + embedServer.start(address, port, appname, accessToken);
  168 + }
  169 +
  170 + private void stopEmbedServer() {
  171 + // stop provider factory
  172 + if (embedServer != null) {
  173 + try {
  174 + embedServer.stop();
  175 + } catch (Exception e) {
  176 + logger.error(e.getMessage(), e);
  177 + }
  178 + }
  179 + }
  180 +
  181 +
  182 + // ---------------------- job handler repository ----------------------
  183 + private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
  184 +
  185 + public static IJobHandler loadJobHandler(String name) {
  186 + return jobHandlerRepository.get(name);
  187 + }
  188 +
  189 + public static IJobHandler registJobHandler(String name, IJobHandler jobHandler) {
  190 + logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
  191 + return jobHandlerRepository.put(name, jobHandler);
  192 + }
  193 +
  194 +
  195 + // ---------------------- job thread repository ----------------------
  196 + private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();
  197 +
  198 + public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason) {
  199 + JobThread newJobThread = new JobThread(jobId, handler);
  200 + newJobThread.start();
  201 + logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});
  202 +
  203 + JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!!
  204 + if (oldJobThread != null) {
  205 + oldJobThread.toStop(removeOldReason);
  206 + oldJobThread.interrupt();
  207 + }
  208 +
  209 + return newJobThread;
  210 + }
  211 +
  212 + public static JobThread removeJobThread(int jobId, String removeOldReason) {
  213 + JobThread oldJobThread = jobThreadRepository.remove(jobId);
  214 + if (oldJobThread != null) {
  215 + oldJobThread.toStop(removeOldReason);
  216 + oldJobThread.interrupt();
  217 +
  218 + return oldJobThread;
  219 + }
  220 + return null;
  221 + }
  222 +
  223 + public static JobThread loadJobThread(int jobId) {
  224 + JobThread jobThread = jobThreadRepository.get(jobId);
  225 + return jobThread;
  226 + }
  227 +
  228 +}
... ...