Commit b9f516a51355ccde77f1f29feb03e7fbea0582c4
1 parent
ae917438
消息队列中报微服务异常 issues/I4977W
Showing
5 changed files
with
96 additions
and
10 deletions
jeecg-boot/jeecg-boot-base/jeecg-boot-base-tools/src/main/java/org/jeecg/common/config/mqtoken/TransmitUserTokenFilter.java
0 → 100644
1 | +package org.jeecg.common.config.mqtoken; | ||
2 | + | ||
3 | +import javax.servlet.*; | ||
4 | +import javax.servlet.http.HttpServletRequest; | ||
5 | +import java.io.IOException; | ||
6 | + | ||
7 | +/** | ||
8 | + * 存放token到上下文供队列调用feign使用 | ||
9 | + * @author zyf | ||
10 | + */ | ||
11 | +public class TransmitUserTokenFilter implements Filter { | ||
12 | + | ||
13 | + private static String X_ACCESS_TOKEN="X-Access-Token"; | ||
14 | + | ||
15 | + public TransmitUserTokenFilter() { | ||
16 | + } | ||
17 | + | ||
18 | + @Override | ||
19 | + public void init(FilterConfig filterConfig) throws ServletException { | ||
20 | + } | ||
21 | + | ||
22 | + @Override | ||
23 | + public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { | ||
24 | + this.initUserInfo((HttpServletRequest) request); | ||
25 | + chain.doFilter(request, response); | ||
26 | + } | ||
27 | + | ||
28 | + private void initUserInfo(HttpServletRequest request) { | ||
29 | + String token = request.getHeader(X_ACCESS_TOKEN); | ||
30 | + if (token!=null) { | ||
31 | + try { | ||
32 | + //将token放入上下文中 | ||
33 | + UserTokenContext.setToken(token); | ||
34 | + } catch (Exception e) { | ||
35 | + | ||
36 | + } | ||
37 | + } | ||
38 | + } | ||
39 | + | ||
40 | + @Override | ||
41 | + public void destroy() { | ||
42 | + } | ||
43 | +} | ||
0 | \ No newline at end of file | 44 | \ No newline at end of file |
jeecg-boot/jeecg-boot-base/jeecg-boot-base-tools/src/main/java/org/jeecg/common/config/mqtoken/UserTokenContext.java
0 → 100644
1 | +package org.jeecg.common.config.mqtoken; | ||
2 | + | ||
3 | + | ||
4 | +/** | ||
5 | + * 用户token上下文 | ||
6 | + * @author zyf | ||
7 | + */ | ||
8 | +public class UserTokenContext { | ||
9 | + | ||
10 | + private static ThreadLocal<String> userToken = new ThreadLocal<String>(); | ||
11 | + | ||
12 | + public UserTokenContext() { | ||
13 | + } | ||
14 | + | ||
15 | + public static String getToken(){ | ||
16 | + return userToken.get(); | ||
17 | + } | ||
18 | + | ||
19 | + public static void setToken(String token){ | ||
20 | + userToken.set(token); | ||
21 | + } | ||
22 | +} |
jeecg-boot/jeecg-boot-starter/jeecg-boot-starter-rabbitmq/src/main/java/org/jeecg/boot/starter/rabbitmq/client/RabbitMqClient.java
@@ -28,6 +28,7 @@ import java.text.SimpleDateFormat; | @@ -28,6 +28,7 @@ import java.text.SimpleDateFormat; | ||
28 | import java.util.Date; | 28 | import java.util.Date; |
29 | import java.util.HashMap; | 29 | import java.util.HashMap; |
30 | import java.util.Map; | 30 | import java.util.Map; |
31 | +import java.util.Properties; | ||
31 | 32 | ||
32 | /** | 33 | /** |
33 | * 消息队列客户端 | 34 | * 消息队列客户端 |
@@ -91,11 +92,16 @@ public class RabbitMqClient { | @@ -91,11 +92,16 @@ public class RabbitMqClient { | ||
91 | rabbitAdmin.declareExchange(directExchange); | 92 | rabbitAdmin.declareExchange(directExchange); |
92 | if (ObjectUtil.isNotEmpty(queues)) { | 93 | if (ObjectUtil.isNotEmpty(queues)) { |
93 | for (String queueName : queues) { | 94 | for (String queueName : queues) { |
94 | - Queue queue = new Queue(queueName); | ||
95 | - addQueue(queue); | ||
96 | - Binding binding = BindingBuilder.bind(queue).to(directExchange).with(queueName); | ||
97 | - rabbitAdmin.declareBinding(binding); | ||
98 | - log.info("队列创建成功:" + queueName); | 95 | + Properties result = rabbitAdmin.getQueueProperties(queueName); |
96 | + if (ObjectUtil.isEmpty(result)) { | ||
97 | + Queue queue = new Queue(queueName); | ||
98 | + addQueue(queue); | ||
99 | + Binding binding = BindingBuilder.bind(queue).to(directExchange).with(queueName); | ||
100 | + rabbitAdmin.declareBinding(binding); | ||
101 | + log.info("创建队列:" + queueName); | ||
102 | + }else{ | ||
103 | + log.info("已有队列:" + queueName); | ||
104 | + } | ||
99 | } | 105 | } |
100 | } | 106 | } |
101 | } | 107 | } |
jeecg-boot/jeecg-boot-starter/jeecg-boot-starter-rabbitmq/src/main/java/org/jeecg/boot/starter/rabbitmq/config/RabbitMqConfig.java
1 | package org.jeecg.boot.starter.rabbitmq.config; | 1 | package org.jeecg.boot.starter.rabbitmq.config; |
2 | 2 | ||
3 | 3 | ||
4 | +import java.util.UUID; | ||
5 | + | ||
4 | import org.jeecg.boot.starter.rabbitmq.event.JeecgRemoteApplicationEvent; | 6 | import org.jeecg.boot.starter.rabbitmq.event.JeecgRemoteApplicationEvent; |
5 | -import org.jeecg.boot.starter.rabbitmq.exchange.DelayExchangeBuilder; | 7 | +import org.jeecg.common.config.mqtoken.TransmitUserTokenFilter; |
6 | import org.springframework.amqp.core.AcknowledgeMode; | 8 | import org.springframework.amqp.core.AcknowledgeMode; |
7 | -import org.springframework.amqp.core.CustomExchange; | ||
8 | import org.springframework.amqp.rabbit.connection.ConnectionFactory; | 9 | import org.springframework.amqp.rabbit.connection.ConnectionFactory; |
9 | import org.springframework.amqp.rabbit.core.RabbitAdmin; | 10 | import org.springframework.amqp.rabbit.core.RabbitAdmin; |
10 | import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; | 11 | import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; |
@@ -13,8 +14,6 @@ import org.springframework.cloud.bus.jackson.RemoteApplicationEventScan; | @@ -13,8 +14,6 @@ import org.springframework.cloud.bus.jackson.RemoteApplicationEventScan; | ||
13 | import org.springframework.context.annotation.Bean; | 14 | import org.springframework.context.annotation.Bean; |
14 | import org.springframework.context.annotation.Configuration; | 15 | import org.springframework.context.annotation.Configuration; |
15 | 16 | ||
16 | -import java.util.UUID; | ||
17 | - | ||
18 | /** | 17 | /** |
19 | * 消息队列配置类 | 18 | * 消息队列配置类 |
20 | * | 19 | * |
@@ -33,7 +32,14 @@ public class RabbitMqConfig { | @@ -33,7 +32,14 @@ public class RabbitMqConfig { | ||
33 | return rabbitAdmin; | 32 | return rabbitAdmin; |
34 | } | 33 | } |
35 | 34 | ||
36 | - | 35 | + /** |
36 | + * 注入获取token过滤器 | ||
37 | + * @return | ||
38 | + */ | ||
39 | + @Bean | ||
40 | + public TransmitUserTokenFilter transmitUserInfoFromHttpHeader(){ | ||
41 | + return new TransmitUserTokenFilter(); | ||
42 | + } | ||
37 | 43 | ||
38 | @Bean | 44 | @Bean |
39 | public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) { | 45 | public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) { |
jeecg-boot/jeecg-boot-starter/jeecg-boot-starter-rabbitmq/src/main/java/org/jeecg/boot/starter/rabbitmq/core/BaseRabbiMqHandler.java
@@ -2,15 +2,24 @@ package org.jeecg.boot.starter.rabbitmq.core; | @@ -2,15 +2,24 @@ package org.jeecg.boot.starter.rabbitmq.core; | ||
2 | 2 | ||
3 | import com.rabbitmq.client.Channel; | 3 | import com.rabbitmq.client.Channel; |
4 | import lombok.extern.slf4j.Slf4j; | 4 | import lombok.extern.slf4j.Slf4j; |
5 | + | ||
5 | import org.jeecg.boot.starter.rabbitmq.listenter.MqListener; | 6 | import org.jeecg.boot.starter.rabbitmq.listenter.MqListener; |
7 | +import org.jeecg.common.config.mqtoken.UserTokenContext; | ||
6 | 8 | ||
7 | import java.io.IOException; | 9 | import java.io.IOException; |
8 | 10 | ||
11 | +/** | ||
12 | + * | ||
13 | + * @author zyf | ||
14 | + */ | ||
9 | @Slf4j | 15 | @Slf4j |
10 | public class BaseRabbiMqHandler<T> { | 16 | public class BaseRabbiMqHandler<T> { |
11 | 17 | ||
18 | + private String token= UserTokenContext.getToken(); | ||
19 | + | ||
12 | public void onMessage(T t, Long deliveryTag, Channel channel, MqListener mqListener) { | 20 | public void onMessage(T t, Long deliveryTag, Channel channel, MqListener mqListener) { |
13 | try { | 21 | try { |
22 | + UserTokenContext.setToken(token); | ||
14 | mqListener.handler(t, channel); | 23 | mqListener.handler(t, channel); |
15 | channel.basicAck(deliveryTag, false); | 24 | channel.basicAck(deliveryTag, false); |
16 | } catch (Exception e) { | 25 | } catch (Exception e) { |