Explorar el Código

监听mq第三方推送

ZhangNing hace 9 meses
padre
commit
5581aa19ce

+ 3 - 3
pom.xml

@@ -9,10 +9,10 @@
         <relativePath/> <!-- lookup parent from repository -->
     </parent>
     <groupId>com.xyhy</groupId>
-    <artifactId>chargPileMQTTCluster</artifactId>
+    <artifactId>jiaranMQTTCluster</artifactId>
     <version>0.0.1-SNAPSHOT</version>
-    <name>chargPileMQTTCluster</name>
-    <description>chargPileMQTTCluster</description>
+    <name>jiaranMQTTCluster</name>
+    <description>jiaranMQTTCluster</description>
 
 
     <properties>

+ 3 - 0
src/main/java/com/xyhy/jiaranmqttcluster/ServerRun.java

@@ -3,6 +3,7 @@ package com.xyhy.jiaranmqttcluster;
 
 import com.xyhy.jiaranmqttcluster.common.Common;
 import com.xyhy.jiaranmqttcluster.common.mqtt.MqttServer;
+import com.xyhy.jiaranmqttcluster.common.rabbitmq.listening.Listen_thirdPartyPush_jiaran;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.stereotype.Component;
 
@@ -15,5 +16,7 @@ public class ServerRun extends Common implements CommandLineRunner {
         //MQTT server 启动
         MqttServer.getInstance().connect();
 
+        Listen_thirdPartyPush_jiaran lt = new Listen_thirdPartyPush_jiaran();
+        lt.start();
     }
 }

+ 35 - 0
src/main/java/com/xyhy/jiaranmqttcluster/common/message/back/MessageBackThreadGroupCommon.java

@@ -0,0 +1,35 @@
+package com.xyhy.jiaranmqttcluster.common.message.back;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class MessageBackThreadGroupCommon {
+
+    private static MessageBackThreadGroupCommon _ThreadGroupCommon;
+
+    private ThreadPoolExecutor _threadPoolExecutor;
+
+
+    public static MessageBackThreadGroupCommon getThreadGroupCom()
+    {
+        if(_ThreadGroupCommon==null)
+        {
+            _ThreadGroupCommon = new MessageBackThreadGroupCommon(1000);
+        }
+        return _ThreadGroupCommon;
+    }
+
+    public MessageBackThreadGroupCommon(int threadsNum)
+    {
+        _threadPoolExecutor =  new ThreadPoolExecutor(threadsNum, threadsNum,
+                0L, TimeUnit.MILLISECONDS,
+                new LinkedBlockingQueue<Runnable>());
+    }
+
+    public ThreadPoolExecutor getWorkThreadPool()
+    {
+        return _threadPoolExecutor;
+    }
+
+}

+ 22 - 0
src/main/java/com/xyhy/jiaranmqttcluster/common/message/back/Message_back_thirdPartyPush_jaran.java

@@ -0,0 +1,22 @@
+package com.xyhy.jiaranmqttcluster.common.message.back;
+
+import com.alibaba.fastjson.JSONObject;
+
+public class Message_back_thirdPartyPush_jaran implements Runnable {
+
+
+    private JSONObject ob;
+
+    public JSONObject getOb() {
+        return ob;
+    }
+
+    public void setOb(JSONObject ob) {
+        this.ob = ob;
+    }
+
+    @Override
+    public void run() {
+
+    }
+}

+ 53 - 0
src/main/java/com/xyhy/jiaranmqttcluster/common/rabbitmq/listening/Listen_thirdPartyPush_jiaran.java

@@ -0,0 +1,53 @@
+package com.xyhy.jiaranmqttcluster.common.rabbitmq.listening;
+
+import com.alibaba.fastjson.JSON;
+import com.rabbitmq.client.*;
+import com.xyhy.jiaranmqttcluster.common.Basic;
+import com.xyhy.jiaranmqttcluster.common.Common;
+import com.xyhy.jiaranmqttcluster.common.message.back.MessageBackThreadGroupCommon;
+import com.xyhy.jiaranmqttcluster.common.message.back.Message_back_thirdPartyPush_jaran;
+import com.xyhy.jiaranmqttcluster.common.rabbitmq.RabbitUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class Listen_thirdPartyPush_jiaran {
+
+    public final Logger log =  LoggerFactory.getLogger(this.getClass());
+
+    public void start()
+    {
+
+        String exchangeName = Common.getConfigByName("rabbitMq.server.TOMQTTSERVEREXCHANGE");
+//        String queueName = "mqtt."+ Common.FUNCTION_ADDORDER_CALLBACK;
+        String queueName = Common.getConfigByName("rabbitMq.server.ip")+"."+ Basic.FUNCTION_GETCPINFO_CALLBACK;
+        String Topic = queueName;
+
+        try {
+            Channel channel =  RabbitUtils.getChannel();
+            channel.exchangeDeclare(exchangeName, "topic",true);
+            channel.queueDeclare(queueName,true,false,false,null);
+            channel.queueBind(queueName,exchangeName,Topic);
+
+            Consumer consumer = new DefaultConsumer(channel) {
+                @Override
+                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
+                                           byte[] body) throws IOException {
+
+                    String message = new String(body, "UTF-8");
+                    log.info(message);
+//                    Message_back_thirdPartyPush_jaran work = new Message_back_thirdPartyPush_jaran();
+//                    work.setOb(JSON.parseObject(message));
+//                    MessageBackThreadGroupCommon.getThreadGroupCom().getWorkThreadPool().execute(work);
+                }
+            };
+            // channel绑定队列、消费者,autoAck为true表示一旦收到消息则自动回复确认消息
+            channel.basicConsume(queueName, true, consumer);
+            log.info("第三方推送开始监听.....");
+        } catch (IOException e) {
+            log.error("第三方推送开始监听",e);
+        }
+    }
+
+}

+ 1 - 1
src/main/resources/application.properties

@@ -59,4 +59,4 @@ rabbitMq.server.TOIMPLSERVEREXCHANGE=TOIMPLSERVEREXCHANGE
 rabbitMq.server.TOREALTIMEDATASERVEREXCHANGE=TOREALTIMEDATASERVEREXCHANGE
 
 #######log projectName###########
-project.name=myClient12312AAA312312311231
+project.name=jiaranMQTTCluster

+ 94 - 0
src/main/resources/logback-spring.xml

@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+    <springProperty scope="context" name="logName" source="project.name" defaultValue="default"/>
+    <!-- 日志存放路径 -->
+    <property name="log.path" value="/var/log/${logName}/logs" />
+    <!-- 日志输出格式 -->
+    <property name="log.pattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{20} - [%method,%line] - %msg%n" />
+
+    <!-- 控制台输出 -->
+    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>${log.pattern}</pattern>
+        </encoder>
+    </appender>
+
+    <!-- 系统日志输出 -->
+    <appender name="file_info" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${log.path}/sys-info.log</file>
+        <!-- 循环政策:基于时间创建日志文件 -->
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <!-- 日志文件名格式 -->
+            <fileNamePattern>${log.path}/sys-info.%d{yyyy-MM-dd}.log</fileNamePattern>
+            <!-- 日志最大的历史 60天 -->
+            <maxHistory>60</maxHistory>
+        </rollingPolicy>
+        <encoder>
+            <pattern>${log.pattern}</pattern>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.LevelFilter">
+            <!-- 过滤的级别 -->
+            <level>INFO</level>
+            <!-- 匹配时的操作:接收(记录) -->
+            <onMatch>ACCEPT</onMatch>
+            <!-- 不匹配时的操作:拒绝(不记录) -->
+            <onMismatch>DENY</onMismatch>
+        </filter>
+    </appender>
+
+    <appender name="file_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${log.path}/sys-error.log</file>
+        <!-- 循环政策:基于时间创建日志文件 -->
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <!-- 日志文件名格式 -->
+            <fileNamePattern>${log.path}/sys-error.%d{yyyy-MM-dd}.log</fileNamePattern>
+            <!-- 日志最大的历史 60天 -->
+            <maxHistory>60</maxHistory>
+        </rollingPolicy>
+        <encoder>
+            <pattern>${log.pattern}</pattern>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.LevelFilter">
+            <!-- 过滤的级别 -->
+            <level>ERROR</level>
+            <!-- 匹配时的操作:接收(记录) -->
+            <onMatch>ACCEPT</onMatch>
+            <!-- 不匹配时的操作:拒绝(不记录) -->
+            <onMismatch>DENY</onMismatch>
+        </filter>
+    </appender>
+
+    <!-- 用户访问日志输出  -->
+    <appender name="sys-user" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${log.path}/sys-user.log</file>
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <!-- 按天回滚 daily -->
+            <fileNamePattern>${log.path}/sys-user.%d{yyyy-MM-dd}.log</fileNamePattern>
+            <!-- 日志最大的历史 60天 -->
+            <maxHistory>60</maxHistory>
+        </rollingPolicy>
+        <encoder>
+            <pattern>${log.pattern}</pattern>
+        </encoder>
+    </appender>
+
+    <!-- 系统模块日志级别控制  -->
+    <logger name="com.xyhy" level="info" />
+    <!-- Spring日志级别控制  -->
+    <logger name="org.springframework" level="warn" />
+
+    <root level="info">
+        <appender-ref ref="console" />
+    </root>
+
+    <!--系统操作日志-->
+    <root level="info">
+        <appender-ref ref="file_info" />
+        <appender-ref ref="file_error" />
+    </root>
+
+    <!--系统用户操作日志-->
+    <logger name="sys-user" level="info">
+        <appender-ref ref="sys-user"/>
+    </logger>
+</configuration>