diff --git a/jeecg-boot/jeecg-boot-base/jeecg-boot-base-core/pom.xml b/jeecg-boot/jeecg-boot-base/jeecg-boot-base-core/pom.xml index b8cfba6..e174ae0 100644 --- a/jeecg-boot/jeecg-boot-base/jeecg-boot-base-core/pom.xml +++ b/jeecg-boot/jeecg-boot-base/jeecg-boot-base-core/pom.xml @@ -151,7 +151,7 @@ <version>${shiro.version}</version> </dependency> <dependency> - <groupId>org.hibernate</groupId> + <groupId>org.jeecgframework.boot</groupId> <artifactId>hibernate-re</artifactId> <version>2.4.2</version> </dependency> diff --git a/jeecg-boot/jeecg-boot-module-system/pom.xml b/jeecg-boot/jeecg-boot-module-system/pom.xml index ae0c43b..e95c746 100644 --- a/jeecg-boot/jeecg-boot-module-system/pom.xml +++ b/jeecg-boot/jeecg-boot-module-system/pom.xml @@ -45,9 +45,9 @@ </dependency> <!-- 积木报表 --> <dependency> - <groupId>com.jimureport</groupId> + <groupId>org.jeecgframework.jimureport</groupId> <artifactId>spring-boot-starter-jimureport</artifactId> - <version>1.1.09-beta</version> + <version>1.2.0</version> <exclusions> <exclusion> <artifactId>autopoi-web</artifactId> diff --git a/jeecg-boot/jeecg-boot-starter/jeecg-boot-starter-job/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java b/jeecg-boot/jeecg-boot-starter/jeecg-boot-starter-job/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java new file mode 100644 index 0000000..d6cf990 --- /dev/null +++ b/jeecg-boot/jeecg-boot-starter/jeecg-boot-starter-job/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java @@ -0,0 +1,228 @@ +package com.xxl.job.core.executor; + +import com.xxl.job.core.biz.AdminBiz; +import com.xxl.job.core.biz.client.AdminBizClient; +import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.log.XxlJobFileAppender; +import com.xxl.job.core.server.EmbedServer; +import com.xxl.job.core.thread.JobLogFileCleanThread; +import com.xxl.job.core.thread.JobThread; +import com.xxl.job.core.thread.TriggerCallbackThread; +import com.xxl.job.core.util.IpUtil; +import com.xxl.job.core.util.NetUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * 重写目的修改默认端口9999为10000避免和网关冲突 + */ +public class XxlJobExecutor { + private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class); + + // ---------------------- param ---------------------- + private String adminAddresses; + private String accessToken; + private String appname; + private String address; + private String ip; + private int port; + private String logPath; + private int logRetentionDays; + + public void setAdminAddresses(String adminAddresses) { + this.adminAddresses = adminAddresses; + } + + public void setAccessToken(String accessToken) { + this.accessToken = accessToken; + } + + public void setAppname(String appname) { + this.appname = appname; + } + + public void setAddress(String address) { + this.address = address; + } + + public void setIp(String ip) { + this.ip = ip; + } + + public void setPort(int port) { + this.port = port; + } + + public void setLogPath(String logPath) { + this.logPath = logPath; + } + + public void setLogRetentionDays(int logRetentionDays) { + this.logRetentionDays = logRetentionDays; + } + + + // ---------------------- start + stop ---------------------- + public void start() throws Exception { + + // init logpath + XxlJobFileAppender.initLogPath(logPath); + + // init invoker, admin-client + initAdminBizList(adminAddresses, accessToken); + + + // init JobLogFileCleanThread + JobLogFileCleanThread.getInstance().start(logRetentionDays); + + // init TriggerCallbackThread + TriggerCallbackThread.getInstance().start(); + + // init executor-server + initEmbedServer(address, ip, port, appname, accessToken); + } + + public void destroy() { + // destory executor-server + stopEmbedServer(); + + // destory jobThreadRepository + if (jobThreadRepository.size() > 0) { + for (Map.Entry<Integer, JobThread> item : jobThreadRepository.entrySet()) { + JobThread oldJobThread = removeJobThread(item.getKey(), "web container destroy and kill the job."); + // wait for job thread push result to callback queue + if (oldJobThread != null) { + try { + oldJobThread.join(); + } catch (InterruptedException e) { + logger.error(">>>>>>>>>>> xxl-job, JobThread destroy(join) error, jobId:{}", item.getKey(), e); + } + } + } + jobThreadRepository.clear(); + } + jobHandlerRepository.clear(); + + + // destory JobLogFileCleanThread + JobLogFileCleanThread.getInstance().toStop(); + + // destory TriggerCallbackThread + TriggerCallbackThread.getInstance().toStop(); + + } + + + // ---------------------- admin-client (rpc invoker) ---------------------- + private static List<AdminBiz> adminBizList; + + private void initAdminBizList(String adminAddresses, String accessToken) throws Exception { + if (adminAddresses != null && adminAddresses.trim().length() > 0) { + for (String address : adminAddresses.trim().split(",")) { + if (address != null && address.trim().length() > 0) { + + AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken); + + if (adminBizList == null) { + adminBizList = new ArrayList<AdminBiz>(); + } + adminBizList.add(adminBiz); + } + } + } + } + + public static List<AdminBiz> getAdminBizList() { + return adminBizList; + } + + // ---------------------- executor-server (rpc provider) ---------------------- + private EmbedServer embedServer = null; + + private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception { + + // fill ip port 修改默认端口 + port = port > 0 ? port : NetUtil.findAvailablePort(10000); + ip = (ip != null && ip.trim().length() > 0) ? ip : IpUtil.getIp(); + + // generate address + if (address == null || address.trim().length() == 0) { + String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null + address = "http://{ip_port}/".replace("{ip_port}", ip_port_address); + } + + // accessToken + if (accessToken == null || accessToken.trim().length() == 0) { + logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken."); + } + + // start + embedServer = new EmbedServer(); + embedServer.start(address, port, appname, accessToken); + } + + private void stopEmbedServer() { + // stop provider factory + if (embedServer != null) { + try { + embedServer.stop(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } + } + + + // ---------------------- job handler repository ---------------------- + private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>(); + + public static IJobHandler loadJobHandler(String name) { + return jobHandlerRepository.get(name); + } + + public static IJobHandler registJobHandler(String name, IJobHandler jobHandler) { + logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler); + return jobHandlerRepository.put(name, jobHandler); + } + + + // ---------------------- job thread repository ---------------------- + private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>(); + + public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason) { + JobThread newJobThread = new JobThread(jobId, handler); + newJobThread.start(); + logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler}); + + JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!! + if (oldJobThread != null) { + oldJobThread.toStop(removeOldReason); + oldJobThread.interrupt(); + } + + return newJobThread; + } + + public static JobThread removeJobThread(int jobId, String removeOldReason) { + JobThread oldJobThread = jobThreadRepository.remove(jobId); + if (oldJobThread != null) { + oldJobThread.toStop(removeOldReason); + oldJobThread.interrupt(); + + return oldJobThread; + } + return null; + } + + public static JobThread loadJobThread(int jobId) { + JobThread jobThread = jobThreadRepository.get(jobId); + return jobThread; + } + +}