hwt 8 ماه پیش
کامیت
aa1c4a4b97
56فایلهای تغییر یافته به همراه3286 افزوده شده و 0 حذف شده
  1. 33 0
      .gitignore
  2. BIN
      .mvn/wrapper/maven-wrapper.jar
  3. 2 0
      .mvn/wrapper/maven-wrapper.properties
  4. 308 0
      mvnw
  5. 205 0
      mvnw.cmd
  6. 91 0
      pom.xml
  7. 13 0
      src/main/java/com/xyhy/chargpilemqttcluster/ChargPileMqttClusterApplication.java
  8. 49 0
      src/main/java/com/xyhy/chargpilemqttcluster/ServerRun.java
  9. 46 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/Basic.java
  10. 28 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/Common.java
  11. 53 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/disruptor/DisruptorCommon.java
  12. 24 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/disruptor/LongEvent.java
  13. 11 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/disruptor/LongEventFactory.java
  14. 28 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/disruptor/LongEventHandler.java
  15. 35 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/disruptor/LongEventProducer.java
  16. 36 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/mqtt/MqttGetMessage.java
  17. 133 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/mqtt/MqttServer.java
  18. 52 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/RabbitUtils.java
  19. 50 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/listening/Listen_addOrder.java
  20. 49 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/listening/Listen_getChargInfo.java
  21. 49 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/listening/Listen_settlement.java
  22. 49 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/listening/Listen_startCp.java
  23. 50 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/listening/Listen_stopCp.java
  24. 49 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/listening/Listen_syn_settlement.java
  25. 27 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_addOrder.java
  26. 27 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_addvinno.java
  27. 27 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_backStartCp.java
  28. 26 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_backStopCp.java
  29. 26 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_chargingstatus.java
  30. 63 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_common.java
  31. 27 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_exceptOrder.java
  32. 26 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_getChargInfo.java
  33. 26 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_save.java
  34. 27 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_saveRealTime.java
  35. 27 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_saveRealTime_LowBattery.java
  36. 27 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_saveRealTime_Tricycle.java
  37. 26 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_saveWarning.java
  38. 27 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_settlement.java
  39. 27 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_syn_settlement.java
  40. 56 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/redis/JedisPoolUtils.java
  41. 40 0
      src/main/java/com/xyhy/chargpilemqttcluster/common/redis/RedisHelp.java
  42. 71 0
      src/main/java/com/xyhy/chargpilemqttcluster/workThread/Job/Job_startCheck.java
  43. 69 0
      src/main/java/com/xyhy/chargpilemqttcluster/workThread/Job/Job_stopCheck.java
  44. 41 0
      src/main/java/com/xyhy/chargpilemqttcluster/workThread/WorkThread.java
  45. 42 0
      src/main/java/com/xyhy/chargpilemqttcluster/workThread/WorkThreadGroupCommon.java
  46. 132 0
      src/main/java/com/xyhy/chargpilemqttcluster/workThread/message/MessageBus.java
  47. 313 0
      src/main/java/com/xyhy/chargpilemqttcluster/workThread/message/MessageCommon.java
  48. 42 0
      src/main/java/com/xyhy/chargpilemqttcluster/workThread/message/back/MessageBackThreadGroupCommon.java
  49. 112 0
      src/main/java/com/xyhy/chargpilemqttcluster/workThread/message/back/Message_back_addOrder.java
  50. 76 0
      src/main/java/com/xyhy/chargpilemqttcluster/workThread/message/back/Message_back_getChargInfo.java
  51. 75 0
      src/main/java/com/xyhy/chargpilemqttcluster/workThread/message/back/Message_back_settlement.java
  52. 70 0
      src/main/java/com/xyhy/chargpilemqttcluster/workThread/message/back/Message_back_syn_settlement.java
  53. 63 0
      src/main/java/com/xyhy/chargpilemqttcluster/workThread/message/back/Message_startCP.java
  54. 51 0
      src/main/java/com/xyhy/chargpilemqttcluster/workThread/message/back/Message_stopCP.java
  55. 60 0
      src/main/resources/application.properties
  56. 94 0
      src/main/resources/logback-spring.xml

+ 33 - 0
.gitignore

@@ -0,0 +1,33 @@
+HELP.md
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/

BIN
.mvn/wrapper/maven-wrapper.jar


+ 2 - 0
.mvn/wrapper/maven-wrapper.properties

@@ -0,0 +1,2 @@
+distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.5/apache-maven-3.9.5-bin.zip
+wrapperUrl=https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar

+ 308 - 0
mvnw

@@ -0,0 +1,308 @@
+#!/bin/sh
+# ----------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#    https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# ----------------------------------------------------------------------------
+
+# ----------------------------------------------------------------------------
+# Apache Maven Wrapper startup batch script, version 3.2.0
+#
+# Required ENV vars:
+# ------------------
+#   JAVA_HOME - location of a JDK home dir
+#
+# Optional ENV vars
+# -----------------
+#   MAVEN_OPTS - parameters passed to the Java VM when running Maven
+#     e.g. to debug Maven itself, use
+#       set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+#   MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+# ----------------------------------------------------------------------------
+
+if [ -z "$MAVEN_SKIP_RC" ] ; then
+
+  if [ -f /usr/local/etc/mavenrc ] ; then
+    . /usr/local/etc/mavenrc
+  fi
+
+  if [ -f /etc/mavenrc ] ; then
+    . /etc/mavenrc
+  fi
+
+  if [ -f "$HOME/.mavenrc" ] ; then
+    . "$HOME/.mavenrc"
+  fi
+
+fi
+
+# OS specific support.  $var _must_ be set to either true or false.
+cygwin=false;
+darwin=false;
+mingw=false
+case "$(uname)" in
+  CYGWIN*) cygwin=true ;;
+  MINGW*) mingw=true;;
+  Darwin*) darwin=true
+    # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
+    # See https://developer.apple.com/library/mac/qa/qa1170/_index.html
+    if [ -z "$JAVA_HOME" ]; then
+      if [ -x "/usr/libexec/java_home" ]; then
+        JAVA_HOME="$(/usr/libexec/java_home)"; export JAVA_HOME
+      else
+        JAVA_HOME="/Library/Java/Home"; export JAVA_HOME
+      fi
+    fi
+    ;;
+esac
+
+if [ -z "$JAVA_HOME" ] ; then
+  if [ -r /etc/gentoo-release ] ; then
+    JAVA_HOME=$(java-config --jre-home)
+  fi
+fi
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin ; then
+  [ -n "$JAVA_HOME" ] &&
+    JAVA_HOME=$(cygpath --unix "$JAVA_HOME")
+  [ -n "$CLASSPATH" ] &&
+    CLASSPATH=$(cygpath --path --unix "$CLASSPATH")
+fi
+
+# For Mingw, ensure paths are in UNIX format before anything is touched
+if $mingw ; then
+  [ -n "$JAVA_HOME" ] && [ -d "$JAVA_HOME" ] &&
+    JAVA_HOME="$(cd "$JAVA_HOME" || (echo "cannot cd into $JAVA_HOME."; exit 1); pwd)"
+fi
+
+if [ -z "$JAVA_HOME" ]; then
+  javaExecutable="$(which javac)"
+  if [ -n "$javaExecutable" ] && ! [ "$(expr "\"$javaExecutable\"" : '\([^ ]*\)')" = "no" ]; then
+    # readlink(1) is not available as standard on Solaris 10.
+    readLink=$(which readlink)
+    if [ ! "$(expr "$readLink" : '\([^ ]*\)')" = "no" ]; then
+      if $darwin ; then
+        javaHome="$(dirname "\"$javaExecutable\"")"
+        javaExecutable="$(cd "\"$javaHome\"" && pwd -P)/javac"
+      else
+        javaExecutable="$(readlink -f "\"$javaExecutable\"")"
+      fi
+      javaHome="$(dirname "\"$javaExecutable\"")"
+      javaHome=$(expr "$javaHome" : '\(.*\)/bin')
+      JAVA_HOME="$javaHome"
+      export JAVA_HOME
+    fi
+  fi
+fi
+
+if [ -z "$JAVACMD" ] ; then
+  if [ -n "$JAVA_HOME"  ] ; then
+    if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+      # IBM's JDK on AIX uses strange locations for the executables
+      JAVACMD="$JAVA_HOME/jre/sh/java"
+    else
+      JAVACMD="$JAVA_HOME/bin/java"
+    fi
+  else
+    JAVACMD="$(\unset -f command 2>/dev/null; \command -v java)"
+  fi
+fi
+
+if [ ! -x "$JAVACMD" ] ; then
+  echo "Error: JAVA_HOME is not defined correctly." >&2
+  echo "  We cannot execute $JAVACMD" >&2
+  exit 1
+fi
+
+if [ -z "$JAVA_HOME" ] ; then
+  echo "Warning: JAVA_HOME environment variable is not set."
+fi
+
+# traverses directory structure from process work directory to filesystem root
+# first directory with .mvn subdirectory is considered project base directory
+find_maven_basedir() {
+  if [ -z "$1" ]
+  then
+    echo "Path not specified to find_maven_basedir"
+    return 1
+  fi
+
+  basedir="$1"
+  wdir="$1"
+  while [ "$wdir" != '/' ] ; do
+    if [ -d "$wdir"/.mvn ] ; then
+      basedir=$wdir
+      break
+    fi
+    # workaround for JBEAP-8937 (on Solaris 10/Sparc)
+    if [ -d "${wdir}" ]; then
+      wdir=$(cd "$wdir/.." || exit 1; pwd)
+    fi
+    # end of workaround
+  done
+  printf '%s' "$(cd "$basedir" || exit 1; pwd)"
+}
+
+# concatenates all lines of a file
+concat_lines() {
+  if [ -f "$1" ]; then
+    # Remove \r in case we run on Windows within Git Bash
+    # and check out the repository with auto CRLF management
+    # enabled. Otherwise, we may read lines that are delimited with
+    # \r\n and produce $'-Xarg\r' rather than -Xarg due to word
+    # splitting rules.
+    tr -s '\r\n' ' ' < "$1"
+  fi
+}
+
+log() {
+  if [ "$MVNW_VERBOSE" = true ]; then
+    printf '%s\n' "$1"
+  fi
+}
+
+BASE_DIR=$(find_maven_basedir "$(dirname "$0")")
+if [ -z "$BASE_DIR" ]; then
+  exit 1;
+fi
+
+MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}; export MAVEN_PROJECTBASEDIR
+log "$MAVEN_PROJECTBASEDIR"
+
+##########################################################################################
+# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
+# This allows using the maven wrapper in projects that prohibit checking in binary data.
+##########################################################################################
+wrapperJarPath="$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar"
+if [ -r "$wrapperJarPath" ]; then
+    log "Found $wrapperJarPath"
+else
+    log "Couldn't find $wrapperJarPath, downloading it ..."
+
+    if [ -n "$MVNW_REPOURL" ]; then
+      wrapperUrl="$MVNW_REPOURL/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar"
+    else
+      wrapperUrl="https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar"
+    fi
+    while IFS="=" read -r key value; do
+      # Remove '\r' from value to allow usage on windows as IFS does not consider '\r' as a separator ( considers space, tab, new line ('\n'), and custom '=' )
+      safeValue=$(echo "$value" | tr -d '\r')
+      case "$key" in (wrapperUrl) wrapperUrl="$safeValue"; break ;;
+      esac
+    done < "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.properties"
+    log "Downloading from: $wrapperUrl"
+
+    if $cygwin; then
+      wrapperJarPath=$(cygpath --path --windows "$wrapperJarPath")
+    fi
+
+    if command -v wget > /dev/null; then
+        log "Found wget ... using wget"
+        [ "$MVNW_VERBOSE" = true ] && QUIET="" || QUIET="--quiet"
+        if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
+            wget $QUIET "$wrapperUrl" -O "$wrapperJarPath" || rm -f "$wrapperJarPath"
+        else
+            wget $QUIET --http-user="$MVNW_USERNAME" --http-password="$MVNW_PASSWORD" "$wrapperUrl" -O "$wrapperJarPath" || rm -f "$wrapperJarPath"
+        fi
+    elif command -v curl > /dev/null; then
+        log "Found curl ... using curl"
+        [ "$MVNW_VERBOSE" = true ] && QUIET="" || QUIET="--silent"
+        if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
+            curl $QUIET -o "$wrapperJarPath" "$wrapperUrl" -f -L || rm -f "$wrapperJarPath"
+        else
+            curl $QUIET --user "$MVNW_USERNAME:$MVNW_PASSWORD" -o "$wrapperJarPath" "$wrapperUrl" -f -L || rm -f "$wrapperJarPath"
+        fi
+    else
+        log "Falling back to using Java to download"
+        javaSource="$MAVEN_PROJECTBASEDIR/.mvn/wrapper/MavenWrapperDownloader.java"
+        javaClass="$MAVEN_PROJECTBASEDIR/.mvn/wrapper/MavenWrapperDownloader.class"
+        # For Cygwin, switch paths to Windows format before running javac
+        if $cygwin; then
+          javaSource=$(cygpath --path --windows "$javaSource")
+          javaClass=$(cygpath --path --windows "$javaClass")
+        fi
+        if [ -e "$javaSource" ]; then
+            if [ ! -e "$javaClass" ]; then
+                log " - Compiling MavenWrapperDownloader.java ..."
+                ("$JAVA_HOME/bin/javac" "$javaSource")
+            fi
+            if [ -e "$javaClass" ]; then
+                log " - Running MavenWrapperDownloader.java ..."
+                ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$wrapperUrl" "$wrapperJarPath") || rm -f "$wrapperJarPath"
+            fi
+        fi
+    fi
+fi
+##########################################################################################
+# End of extension
+##########################################################################################
+
+# If specified, validate the SHA-256 sum of the Maven wrapper jar file
+wrapperSha256Sum=""
+while IFS="=" read -r key value; do
+  case "$key" in (wrapperSha256Sum) wrapperSha256Sum=$value; break ;;
+  esac
+done < "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.properties"
+if [ -n "$wrapperSha256Sum" ]; then
+  wrapperSha256Result=false
+  if command -v sha256sum > /dev/null; then
+    if echo "$wrapperSha256Sum  $wrapperJarPath" | sha256sum -c > /dev/null 2>&1; then
+      wrapperSha256Result=true
+    fi
+  elif command -v shasum > /dev/null; then
+    if echo "$wrapperSha256Sum  $wrapperJarPath" | shasum -a 256 -c > /dev/null 2>&1; then
+      wrapperSha256Result=true
+    fi
+  else
+    echo "Checksum validation was requested but neither 'sha256sum' or 'shasum' are available."
+    echo "Please install either command, or disable validation by removing 'wrapperSha256Sum' from your maven-wrapper.properties."
+    exit 1
+  fi
+  if [ $wrapperSha256Result = false ]; then
+    echo "Error: Failed to validate Maven wrapper SHA-256, your Maven wrapper might be compromised." >&2
+    echo "Investigate or delete $wrapperJarPath to attempt a clean download." >&2
+    echo "If you updated your Maven version, you need to update the specified wrapperSha256Sum property." >&2
+    exit 1
+  fi
+fi
+
+MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin; then
+  [ -n "$JAVA_HOME" ] &&
+    JAVA_HOME=$(cygpath --path --windows "$JAVA_HOME")
+  [ -n "$CLASSPATH" ] &&
+    CLASSPATH=$(cygpath --path --windows "$CLASSPATH")
+  [ -n "$MAVEN_PROJECTBASEDIR" ] &&
+    MAVEN_PROJECTBASEDIR=$(cygpath --path --windows "$MAVEN_PROJECTBASEDIR")
+fi
+
+# Provide a "standardized" way to retrieve the CLI args that will
+# work with both Windows and non-Windows executions.
+MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $*"
+export MAVEN_CMD_LINE_ARGS
+
+WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+# shellcheck disable=SC2086 # safe args
+exec "$JAVACMD" \
+  $MAVEN_OPTS \
+  $MAVEN_DEBUG_OPTS \
+  -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
+  "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
+  ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"

+ 205 - 0
mvnw.cmd

@@ -0,0 +1,205 @@
+@REM ----------------------------------------------------------------------------
+@REM Licensed to the Apache Software Foundation (ASF) under one
+@REM or more contributor license agreements.  See the NOTICE file
+@REM distributed with this work for additional information
+@REM regarding copyright ownership.  The ASF licenses this file
+@REM to you under the Apache License, Version 2.0 (the
+@REM "License"); you may not use this file except in compliance
+@REM with the License.  You may obtain a copy of the License at
+@REM
+@REM    https://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing,
+@REM software distributed under the License is distributed on an
+@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+@REM KIND, either express or implied.  See the License for the
+@REM specific language governing permissions and limitations
+@REM under the License.
+@REM ----------------------------------------------------------------------------
+
+@REM ----------------------------------------------------------------------------
+@REM Apache Maven Wrapper startup batch script, version 3.2.0
+@REM
+@REM Required ENV vars:
+@REM JAVA_HOME - location of a JDK home dir
+@REM
+@REM Optional ENV vars
+@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
+@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending
+@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
+@REM     e.g. to debug Maven itself, use
+@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+@REM ----------------------------------------------------------------------------
+
+@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
+@echo off
+@REM set title of command window
+title %0
+@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on'
+@if "%MAVEN_BATCH_ECHO%" == "on"  echo %MAVEN_BATCH_ECHO%
+
+@REM set %HOME% to equivalent of $HOME
+if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
+
+@REM Execute a user defined script before this one
+if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
+@REM check for pre script, once with legacy .bat ending and once with .cmd ending
+if exist "%USERPROFILE%\mavenrc_pre.bat" call "%USERPROFILE%\mavenrc_pre.bat" %*
+if exist "%USERPROFILE%\mavenrc_pre.cmd" call "%USERPROFILE%\mavenrc_pre.cmd" %*
+:skipRcPre
+
+@setlocal
+
+set ERROR_CODE=0
+
+@REM To isolate internal variables from possible post scripts, we use another setlocal
+@setlocal
+
+@REM ==== START VALIDATION ====
+if not "%JAVA_HOME%" == "" goto OkJHome
+
+echo.
+echo Error: JAVA_HOME not found in your environment. >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+:OkJHome
+if exist "%JAVA_HOME%\bin\java.exe" goto init
+
+echo.
+echo Error: JAVA_HOME is set to an invalid directory. >&2
+echo JAVA_HOME = "%JAVA_HOME%" >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+@REM ==== END VALIDATION ====
+
+:init
+
+@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
+@REM Fallback to current working directory if not found.
+
+set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
+IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
+
+set EXEC_DIR=%CD%
+set WDIR=%EXEC_DIR%
+:findBaseDir
+IF EXIST "%WDIR%"\.mvn goto baseDirFound
+cd ..
+IF "%WDIR%"=="%CD%" goto baseDirNotFound
+set WDIR=%CD%
+goto findBaseDir
+
+:baseDirFound
+set MAVEN_PROJECTBASEDIR=%WDIR%
+cd "%EXEC_DIR%"
+goto endDetectBaseDir
+
+:baseDirNotFound
+set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
+cd "%EXEC_DIR%"
+
+:endDetectBaseDir
+
+IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
+
+@setlocal EnableExtensions EnableDelayedExpansion
+for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
+@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
+
+:endReadAdditionalConfig
+
+SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
+set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
+set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+set WRAPPER_URL="https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar"
+
+FOR /F "usebackq tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO (
+    IF "%%A"=="wrapperUrl" SET WRAPPER_URL=%%B
+)
+
+@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
+@REM This allows using the maven wrapper in projects that prohibit checking in binary data.
+if exist %WRAPPER_JAR% (
+    if "%MVNW_VERBOSE%" == "true" (
+        echo Found %WRAPPER_JAR%
+    )
+) else (
+    if not "%MVNW_REPOURL%" == "" (
+        SET WRAPPER_URL="%MVNW_REPOURL%/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar"
+    )
+    if "%MVNW_VERBOSE%" == "true" (
+        echo Couldn't find %WRAPPER_JAR%, downloading it ...
+        echo Downloading from: %WRAPPER_URL%
+    )
+
+    powershell -Command "&{"^
+		"$webclient = new-object System.Net.WebClient;"^
+		"if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^
+		"$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^
+		"}"^
+		"[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%WRAPPER_URL%', '%WRAPPER_JAR%')"^
+		"}"
+    if "%MVNW_VERBOSE%" == "true" (
+        echo Finished downloading %WRAPPER_JAR%
+    )
+)
+@REM End of extension
+
+@REM If specified, validate the SHA-256 sum of the Maven wrapper jar file
+SET WRAPPER_SHA_256_SUM=""
+FOR /F "usebackq tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO (
+    IF "%%A"=="wrapperSha256Sum" SET WRAPPER_SHA_256_SUM=%%B
+)
+IF NOT %WRAPPER_SHA_256_SUM%=="" (
+    powershell -Command "&{"^
+       "$hash = (Get-FileHash \"%WRAPPER_JAR%\" -Algorithm SHA256).Hash.ToLower();"^
+       "If('%WRAPPER_SHA_256_SUM%' -ne $hash){"^
+       "  Write-Output 'Error: Failed to validate Maven wrapper SHA-256, your Maven wrapper might be compromised.';"^
+       "  Write-Output 'Investigate or delete %WRAPPER_JAR% to attempt a clean download.';"^
+       "  Write-Output 'If you updated your Maven version, you need to update the specified wrapperSha256Sum property.';"^
+       "  exit 1;"^
+       "}"^
+       "}"
+    if ERRORLEVEL 1 goto error
+)
+
+@REM Provide a "standardized" way to retrieve the CLI args that will
+@REM work with both Windows and non-Windows executions.
+set MAVEN_CMD_LINE_ARGS=%*
+
+%MAVEN_JAVA_EXE% ^
+  %JVM_CONFIG_MAVEN_PROPS% ^
+  %MAVEN_OPTS% ^
+  %MAVEN_DEBUG_OPTS% ^
+  -classpath %WRAPPER_JAR% ^
+  "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" ^
+  %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
+if ERRORLEVEL 1 goto error
+goto end
+
+:error
+set ERROR_CODE=1
+
+:end
+@endlocal & set ERROR_CODE=%ERROR_CODE%
+
+if not "%MAVEN_SKIP_RC%"=="" goto skipRcPost
+@REM check for post script, once with legacy .bat ending and once with .cmd ending
+if exist "%USERPROFILE%\mavenrc_post.bat" call "%USERPROFILE%\mavenrc_post.bat"
+if exist "%USERPROFILE%\mavenrc_post.cmd" call "%USERPROFILE%\mavenrc_post.cmd"
+:skipRcPost
+
+@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
+if "%MAVEN_BATCH_PAUSE%"=="on" pause
+
+if "%MAVEN_TERMINATE_CMD%"=="on" exit %ERROR_CODE%
+
+cmd /C exit /B %ERROR_CODE%

+ 91 - 0
pom.xml

@@ -0,0 +1,91 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-starter-parent</artifactId>
+        <version>2.1.1.RELEASE</version>
+        <relativePath/> <!-- lookup parent from repository -->
+    </parent>
+    <groupId>com.xyhy</groupId>
+    <artifactId>chargPileMQTTCluster</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <name>chargPileMQTTCluster</name>
+    <description>chargPileMQTTCluster</description>
+
+
+    <properties>
+        <java.version>1.8</java.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.eclipse.paho</groupId>
+            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+            <version>1.2.5</version>
+        </dependency>
+
+
+
+        <dependency>
+            <groupId>com.rabbitmq</groupId>
+            <artifactId>amqp-client</artifactId>
+            <version>4.10.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.lmax</groupId>
+            <artifactId>disruptor</artifactId>
+            <version>3.4.2</version>
+        </dependency>
+
+        <dependency>
+            <groupId>redis.clients</groupId>
+            <artifactId>jedis</artifactId>
+            <version>2.9.0</version>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.49</version>
+        </dependency>
+
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

+ 13 - 0
src/main/java/com/xyhy/chargpilemqttcluster/ChargPileMqttClusterApplication.java

@@ -0,0 +1,13 @@
+package com.xyhy.chargpilemqttcluster;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class ChargPileMqttClusterApplication {
+
+    public static void main(String[] args) {
+        SpringApplication.run(ChargPileMqttClusterApplication.class, args);
+    }
+
+}

+ 49 - 0
src/main/java/com/xyhy/chargpilemqttcluster/ServerRun.java

@@ -0,0 +1,49 @@
+package com.xyhy.chargpilemqttcluster;
+
+import com.xyhy.chargpilemqttcluster.common.Common;
+import com.xyhy.chargpilemqttcluster.common.disruptor.DisruptorCommon;
+import com.xyhy.chargpilemqttcluster.common.mqtt.MqttServer;
+import com.xyhy.chargpilemqttcluster.common.rabbitmq.listening.Listen_addOrder;
+import com.xyhy.chargpilemqttcluster.common.rabbitmq.listening.Listen_settlement;
+import com.xyhy.chargpilemqttcluster.common.rabbitmq.listening.Listen_startCp;
+import com.xyhy.chargpilemqttcluster.common.rabbitmq.listening.Listen_stopCp;
+import com.xyhy.chargpilemqttcluster.common.redis.JedisPoolUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.stereotype.Component;
+import redis.clients.jedis.Jedis;
+
+@Component
+public class ServerRun extends Common implements CommandLineRunner {
+
+    @Override
+    public void run(String... args) throws Exception {
+
+        //redis启动监听
+        Jedis jds = JedisPoolUtils.getInstance().getConnection() ;
+        if (jds!=null){
+            log.info("Redis 连接预热成功!");
+        }
+        jds.close();
+
+        //业务队列监听启动
+        DisruptorCommon.getDisruptorCommon().Start();
+
+        Listen_addOrder la = new Listen_addOrder();
+        la.start();
+
+        Listen_settlement ls = new Listen_settlement();
+        ls.start();
+
+        Listen_startCp ls_start = new Listen_startCp();
+        ls_start.start();
+
+        Listen_stopCp ls_stop = new Listen_stopCp();
+        ls_stop.start();
+
+
+        //MQTT server 启动
+        MqttServer.getInstance().connect();
+
+    }
+}

+ 46 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/Basic.java

@@ -0,0 +1,46 @@
+package com.xyhy.chargpilemqttcluster.common;
+
+public class Basic {
+
+
+    public static int FUNCTION_STARTCP=101;
+    public static int FUNCTION_STARTCP_CALLBACK=102;
+    public static int FUNCTION_STOPCP=201;
+    public static int FUNCTION_STOPCP_CALLBACK=202;
+    public static int FUNCTION_GETORDERNUM=301;
+    public static int FUNCTION_GETORDERNUM_CALLBACK=302;
+    public static int FUNCTION_GETORDERINFO=401;
+    public static int FUNCTION_GETORDERINFO_CALLBACK=402;
+    public static int FUNCTION_GETCPINFO=501;
+    public static int FUNCTION_GETCPINFO_CALLBACK=502;
+    public static int FUNCTION_CHARGINGSTATUS=601;
+    public static int FUNCTION_ADDVINNO=701;
+    public static int FUNCTION_SETTLEMENT=801;
+    public static int FUNCTION_SETTLEMENT_CALLBACK=802;
+    public static int FUNCTION_SYSSETTLEMENT=901;
+    public static int FUNCTION_SYSSETTLEMENT_CALLBACK=902;
+    public static int FUNCTION_ADDORDER=1001;
+    public static int FUNCTION_ADDORDER_CALLBACK=1002;
+    public static int FUNCTION_SAVE=1011;
+    public static int FUNCTION_SAVEWARNING=1021;
+    public static int FUNCTION_EXCEPTORORDER=1031;
+
+    public static int FUNCTION_REALTIMEDATA_CAR=1041; //汽车桩子 交流直流
+    public static int FUNCTION_REALTIMEDATA_CHARGTABLE=1042; //换电柜
+    public static int FUNCTION_REALTIMEDATA_LOWBATTERY=1043; //慢车电池
+    public static int FUNCTION_REALTIMEDATA_TRICYCLE=1044; //三轮车
+    public static int FUNCTION_REALTIMEDATA_SLOWCARCHARGINGPILE=1045; //慢车充电桩
+
+    public static int FUNCTION_XCX_STARTCP=1051;
+    public static int FUNCTION_XCX_STARTCPBACK=1052;
+
+    public static int FUNCTION_XCX_STOPCP=1061;
+    public static int FUNCTION_XCX_STOPCPBACK=1062;
+
+    public static int FUNCTION_TIMELINE_CALLBACK=1071;
+
+
+    public static String ONLINE = "1";
+    public static String OFFLINE = "0";
+
+}

+ 28 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/Common.java

@@ -0,0 +1,28 @@
+package com.xyhy.chargpilemqttcluster.common;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.ClassPathResource;
+
+import java.io.InputStream;
+import java.util.Properties;
+
+public class Common extends Basic{
+
+    public final Logger log =  LoggerFactory.getLogger(this.getClass());
+
+    public static String getConfigByName(String Key)  {
+        try {
+            ClassPathResource classPathResource = new ClassPathResource("application.properties");
+            InputStream inputStream =classPathResource.getInputStream();
+            Properties p = new Properties();
+            p.load(inputStream);
+            return p.getProperty(Key);
+        }catch (Exception o)
+        {
+            o.printStackTrace();
+        }
+        return null;
+    }
+
+}

+ 53 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/disruptor/DisruptorCommon.java

@@ -0,0 +1,53 @@
+package com.xyhy.chargpilemqttcluster.common.disruptor;
+
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.util.DaemonThreadFactory;
+import com.xyhy.chargpilemqttcluster.common.Common;
+
+public class DisruptorCommon extends Common {
+
+    private static DisruptorCommon _dis;
+
+    public static DisruptorCommon getDisruptorCommon()
+    {
+        if (_dis == null)
+        {
+            _dis = new DisruptorCommon();
+        }
+        return _dis;
+    }
+
+    private Disruptor<LongEvent> _disruptor;
+
+    public Disruptor<LongEvent> getDisruptor()
+    {
+        if (_disruptor==null)
+        {
+            Start();
+
+        }
+        return _disruptor;
+    }
+
+
+    public void Start() {
+        LongEventFactory factory = new LongEventFactory();
+
+        // Specify the size of the ring buffer, must be power of 2.
+        int bufferSize = 1024;
+
+        // Construct the Disruptor
+        _disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);
+
+        // Connect the handler
+        _disruptor.handleEventsWith(new LongEventHandler());
+
+        // Start the Disruptor, starts all threads running
+        _disruptor.start();
+
+        log.info("业务队列启动成功............");
+
+
+    }
+
+}

+ 24 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/disruptor/LongEvent.java

@@ -0,0 +1,24 @@
+package com.xyhy.chargpilemqttcluster.common.disruptor;
+
+public class LongEvent {
+
+    private byte[] _value;
+
+    private String _topic;
+
+    public void set(byte[] value,String ctx)
+    {
+        this._value = value;
+        this._topic = ctx;
+    }
+
+    public byte[] getValue()
+    {
+        return _value;
+    }
+
+    public String getTopic()
+    {
+        return _topic;
+    }
+}

+ 11 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/disruptor/LongEventFactory.java

@@ -0,0 +1,11 @@
+package com.xyhy.chargpilemqttcluster.common.disruptor;
+
+import com.lmax.disruptor.EventFactory;
+
+public class LongEventFactory implements EventFactory {
+
+    @Override
+    public Object newInstance() {
+        return new LongEvent();
+    }
+}

+ 28 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/disruptor/LongEventHandler.java

@@ -0,0 +1,28 @@
+package com.xyhy.chargpilemqttcluster.common.disruptor;
+
+import com.lmax.disruptor.EventHandler;
+import com.xyhy.chargpilemqttcluster.common.Common;
+import com.xyhy.chargpilemqttcluster.workThread.WorkThread;
+import com.xyhy.chargpilemqttcluster.workThread.WorkThreadGroupCommon;
+
+
+public class LongEventHandler extends Common implements EventHandler<LongEvent> {
+
+    public LongEventHandler()
+    {
+
+    }
+
+    @Override
+    public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
+        String topic = longEvent.getTopic();
+        byte[] cmd = longEvent.getValue();
+
+        WorkThread work = new WorkThread();
+        work.set_topic(topic);
+        work.set_value(cmd);
+        WorkThreadGroupCommon.getThreadGroupCom().getWorkThreadPool().execute(work);
+
+
+    }
+}

+ 35 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/disruptor/LongEventProducer.java

@@ -0,0 +1,35 @@
+package com.xyhy.chargpilemqttcluster.common.disruptor;
+
+import com.lmax.disruptor.RingBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LongEventProducer {
+
+    public final Logger log =  LoggerFactory.getLogger(this.getClass());
+
+    private RingBuffer<LongEvent> ringBuffer;
+
+    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
+        this.ringBuffer = ringBuffer;
+    }
+
+    public void onData(byte[] byteBuffer, String topic) {
+        // 获取事件队列的下表位置
+        long sequence = ringBuffer.next();
+        try {
+            // 取出空队列
+            LongEvent longEvent = ringBuffer.get(sequence);
+            // 给空队列赋值
+            longEvent.set(byteBuffer,topic);
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+
+            ringBuffer.publish(sequence);
+        }
+
+    }
+
+
+}

+ 36 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/mqtt/MqttGetMessage.java

@@ -0,0 +1,36 @@
+package com.xyhy.chargpilemqttcluster.common.mqtt;
+
+import com.lmax.disruptor.RingBuffer;
+import com.xyhy.chargpilemqttcluster.common.Common;
+import com.xyhy.chargpilemqttcluster.common.disruptor.DisruptorCommon;
+import com.xyhy.chargpilemqttcluster.common.disruptor.LongEvent;
+import com.xyhy.chargpilemqttcluster.common.disruptor.LongEventProducer;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.springframework.stereotype.Component;
+
+@Component
+public class MqttGetMessage extends Common implements MqttCallback {
+
+    @Override
+    public void connectionLost(Throwable throwable) {
+        log.error("MQTT连接已经断开!");
+        //MqttServer.getInstance().reConnect();
+    }
+
+    @Override
+    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
+        RingBuffer<LongEvent> ringBuffer = DisruptorCommon.getDisruptorCommon().getDisruptor().getRingBuffer();
+
+        // 8.创建生产者
+        LongEventProducer longEventProducer = new LongEventProducer(ringBuffer);
+
+        longEventProducer.onData(mqttMessage.getPayload(),topic);
+    }
+
+    @Override
+    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+
+    }
+}

+ 133 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/mqtt/MqttServer.java

@@ -0,0 +1,133 @@
+package com.xyhy.chargpilemqttcluster.common.mqtt;
+
+import com.xyhy.chargpilemqttcluster.common.Common;
+import org.eclipse.paho.client.mqttv3.*;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+import java.util.List;
+
+
+public class MqttServer extends Common {
+
+
+    private static MqttServer _mqttServer;
+
+    private MqttClient _client;
+
+    private MqttGetMessage pushCallback;
+
+    private String host = Common.getConfigByName("MQTT.server.hostUrl");
+
+    private String clientId = Common.getConfigByName("project.name");
+
+    private String username = Common.getConfigByName("MQTT.server.hostName");
+
+    private String password = Common.getConfigByName("MQTT.server.password");
+
+    private int timeout = Integer.parseInt(Common.getConfigByName("MQTT.server.timeout"));
+
+    private int keepAlive = Integer.parseInt(Common.getConfigByName("MQTT.server.keepAlive"));
+
+    //@Value("#{'${MQTT.server.Subscribe}'.split(',')}")
+    private String[] subscribe;
+
+    public static MqttServer getInstance(){
+        if (_mqttServer == null){
+            _mqttServer = new MqttServer();
+        }
+        return _mqttServer;
+    }
+
+    public void connect(){
+        _mqttServer.connect(host, clientId, username, password, timeout,keepAlive);
+    }
+
+
+    /**
+     * 客户端连接
+     *
+     * @param host      ip+端口
+     * @param clientID  客户端Id
+     * @param username  用户名
+     * @param password  密码
+     * @param timeout   超时时间
+     * @param keeplive 保留数
+     */
+    public void connect(String host,String clientID,String username,String password,int timeout,int keeplive){
+        MqttClient client;
+        try {
+            client=new MqttClient(host,clientID,new MemoryPersistence());
+            MqttConnectOptions options=new MqttConnectOptions();
+            options.setCleanSession(true);
+            options.setUserName(username);
+            options.setPassword(password.toCharArray());
+            options.setConnectionTimeout(timeout);
+            options.setKeepAliveInterval(keeplive);
+            _client = client;
+            try {
+                client.setCallback(new MqttGetMessage());
+                client.connect(options);
+                log.info("MQTT 服务连接成功!");
+                configSubscribe();
+            }catch (Exception e){
+                e.printStackTrace();
+                log.error("MQTT 服务连接失败!",e);
+            }
+        }catch (Exception e){
+            e.printStackTrace();
+            log.error("MQTT 服务连接失败!",e);
+        }
+    }
+
+    void configSubscribe(){
+
+        subscribe = Common.getConfigByName("MQTT.server.Subscribe").split(",");
+        for (String topic : subscribe){
+            addSub(topic);
+        }
+    }
+
+    public void addSub(String topic) {
+        try {
+            _client.subscribe(topic,1);
+            log.info("MQTT 添加订阅主题成功!TOPIC:"+topic);
+        } catch (MqttException e) {
+            e.printStackTrace();
+            log.error("MQTT 添加订阅主题失败!TOPIC:"+topic,e);
+        }
+    }
+
+    /**
+     * 发布
+     * @param topic       主题
+     * @param pushMessage 消息体
+     */
+    public void pushlish(String topic,String pushMessage){
+        log.info("发送消息:"+ pushMessage);
+        MqttMessage message=new MqttMessage();
+        message.setQos(1);
+        message.setRetained(false);
+        message.setPayload(pushMessage.getBytes());
+        MqttTopic mqttTopic= _client.getTopic(topic);
+        if(null== mqttTopic){
+            log.error("topic not exist");
+        }
+        MqttDeliveryToken token;
+        try {
+            token=mqttTopic.publish(message);
+            token.waitForCompletion();
+        }catch (MqttPersistenceException e){
+            e.printStackTrace();
+        }catch (MqttException e){
+            e.printStackTrace();
+        }
+    }
+
+    public void reConnect(){
+        try {
+            _client.reconnect();
+        }catch (MqttException e) {
+            e.printStackTrace();
+        }
+    }
+}

+ 52 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/RabbitUtils.java

@@ -0,0 +1,52 @@
+package com.xyhy.chargpilemqttcluster.common.rabbitmq;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.xyhy.chargpilemqttcluster.common.Common;
+
+public class RabbitUtils extends Common {
+
+    private static final ConnectionFactory CONNECTION_FACTORY = new ConnectionFactory();
+
+    static {
+        CONNECTION_FACTORY.setHost(Common.getConfigByName("rabbitMq.server.ip"));
+        CONNECTION_FACTORY.setPort(Integer.parseInt(Common.getConfigByName("rabbitMq.server.port")));
+        CONNECTION_FACTORY.setUsername(Common.getConfigByName("rabbitMq.server.userName"));
+        CONNECTION_FACTORY.setPassword(Common.getConfigByName("rabbitMq.server.passWord"));
+        CONNECTION_FACTORY.setVirtualHost(Common.getConfigByName("rabbitMq.server.vHost"));
+    }
+
+    //获取连接方法
+    public static Connection getConnection() {
+        try {
+            return CONNECTION_FACTORY.newConnection();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    //获取通道
+    public static Channel getChannel() {
+        try {
+            return RabbitUtils.getConnection().createChannel();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    //关闭通道和关闭连接工具方法
+    public static void closeConnectionAndChannel(Channel channel, Connection connection) {
+        try {
+            if (channel != null) {
+                channel.close();
+            }
+
+            if (connection != null) {
+                connection.close();
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}

+ 50 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/listening/Listen_addOrder.java

@@ -0,0 +1,50 @@
+package com.xyhy.chargpilemqttcluster.common.rabbitmq.listening;
+
+import com.alibaba.fastjson.JSON;
+import com.rabbitmq.client.*;
+import com.xyhy.chargpilemqttcluster.common.Common;
+import com.xyhy.chargpilemqttcluster.common.rabbitmq.RabbitUtils;
+import com.xyhy.chargpilemqttcluster.workThread.message.back.MessageBackThreadGroupCommon;
+import com.xyhy.chargpilemqttcluster.workThread.message.back.Message_back_addOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class Listen_addOrder {
+
+    public final Logger log =  LoggerFactory.getLogger(this.getClass());
+
+    public void start()
+    {
+
+        String exchangeName = Common.getConfigByName("rabbitMq.server.TOMQTTSERVEREXCHANGE");
+        String queueName = "mqtt."+ Common.FUNCTION_ADDORDER_CALLBACK;
+        String Topic = queueName;
+
+        try {
+            Channel channel =  RabbitUtils.getChannel();
+            channel.exchangeDeclare(exchangeName, "topic",true);
+            channel.queueDeclare(queueName,true,false,false,null);
+            channel.queueBind(queueName,exchangeName,Topic);
+
+            Consumer consumer = new DefaultConsumer(channel) {
+                @Override
+                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
+                                           byte[] body) throws IOException {
+
+                    String message = new String(body, "UTF-8");
+                    Message_back_addOrder work = new Message_back_addOrder();
+                    work.setOb(JSON.parseObject(message));
+                    MessageBackThreadGroupCommon.getThreadGroupCom().getWorkThreadPool().execute(work);
+                }
+            };
+            // channel绑定队列、消费者,autoAck为true表示一旦收到消息则自动回复确认消息
+            channel.basicConsume(queueName, true, consumer);
+            log.info("返回订单开始监听.....");
+        } catch (IOException e) {
+            log.error("返回订单开始监听",e);
+        }
+    }
+
+}

+ 49 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/listening/Listen_getChargInfo.java

@@ -0,0 +1,49 @@
+package com.xyhy.chargpilemqttcluster.common.rabbitmq.listening;
+
+import com.alibaba.fastjson.JSON;
+import com.rabbitmq.client.*;
+import com.xyhy.chargpilemqttcluster.common.Common;
+import com.xyhy.chargpilemqttcluster.common.rabbitmq.RabbitUtils;
+import com.xyhy.chargpilemqttcluster.workThread.message.back.MessageBackThreadGroupCommon;
+import com.xyhy.chargpilemqttcluster.workThread.message.back.Message_startCP;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class Listen_getChargInfo {
+
+    public final Logger log =  LoggerFactory.getLogger(this.getClass());
+
+    public void start()
+    {
+
+        String exchangeName = Common.getConfigByName("rabbitMq.server.TOMQTTSERVEREXCHANGE");
+        String queueName = "mqtt."+Common.FUNCTION_GETCPINFO_CALLBACK;
+        String Topic = queueName;
+
+        try {
+            Channel channel =  RabbitUtils.getChannel();
+            channel.exchangeDeclare(exchangeName, "topic",true);
+            channel.queueDeclare(queueName,true,false,false,null);
+            channel.queueBind(queueName,exchangeName,Topic);
+
+            Consumer consumer = new DefaultConsumer(channel) {
+                @Override
+                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
+                                           byte[] body) throws IOException {
+                    String message = new String(body, "UTF-8");
+                    Message_startCP work = new Message_startCP();
+                    work.setOb(JSON.parseObject(message));
+                    MessageBackThreadGroupCommon.getThreadGroupCom().getWorkThreadPool().execute(work);
+                }
+            };
+            // channel绑定队列、消费者,autoAck为true表示一旦收到消息则自动回复确认消息
+            channel.basicConsume(queueName, true, consumer);
+            log.info("返回充电桩信息开始监听.....");
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+}

+ 49 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/listening/Listen_settlement.java

@@ -0,0 +1,49 @@
+package com.xyhy.chargpilemqttcluster.common.rabbitmq.listening;
+
+import com.alibaba.fastjson.JSON;
+import com.rabbitmq.client.*;
+import com.xyhy.chargpilemqttcluster.common.Common;
+import com.xyhy.chargpilemqttcluster.common.rabbitmq.RabbitUtils;
+import com.xyhy.chargpilemqttcluster.workThread.message.back.MessageBackThreadGroupCommon;
+import com.xyhy.chargpilemqttcluster.workThread.message.back.Message_back_settlement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class Listen_settlement {
+
+    public final Logger log =  LoggerFactory.getLogger(this.getClass());
+
+    public void start()
+    {
+
+        String exchangeName = Common.getConfigByName("rabbitMq.server.TOMQTTSERVEREXCHANGE");
+        String queueName = "mqtt."+Common.FUNCTION_SETTLEMENT_CALLBACK;
+        String Topic = queueName;
+
+        try {
+            Channel channel =  RabbitUtils.getChannel();
+            channel.exchangeDeclare(exchangeName, "topic",true);
+            channel.queueDeclare(queueName,true,false,false,null);
+            channel.queueBind(queueName,exchangeName,Topic);
+
+            Consumer consumer = new DefaultConsumer(channel) {
+                @Override
+                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
+                                           byte[] body) throws IOException {
+                    String message = new String(body, "UTF-8");
+                    Message_back_settlement work = new Message_back_settlement();
+                    work.setOb(JSON.parseObject(message));
+                    MessageBackThreadGroupCommon.getThreadGroupCom().getWorkThreadPool().execute(work);
+                }
+            };
+            // channel绑定队列、消费者,autoAck为true表示一旦收到消息则自动回复确认消息
+            channel.basicConsume(queueName, true, consumer);
+            log.info("返回结算信息开始监听.....");
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+}

+ 49 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/listening/Listen_startCp.java

@@ -0,0 +1,49 @@
+package com.xyhy.chargpilemqttcluster.common.rabbitmq.listening;
+
+import com.alibaba.fastjson.JSON;
+import com.rabbitmq.client.*;
+import com.xyhy.chargpilemqttcluster.common.Common;
+import com.xyhy.chargpilemqttcluster.common.rabbitmq.RabbitUtils;
+import com.xyhy.chargpilemqttcluster.workThread.message.back.MessageBackThreadGroupCommon;
+import com.xyhy.chargpilemqttcluster.workThread.message.back.Message_startCP;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class Listen_startCp {
+
+    public final Logger log =  LoggerFactory.getLogger(this.getClass());
+
+    public void start()
+    {
+
+        String exchangeName = Common.getConfigByName("rabbitMq.server.TOMQTTSERVEREXCHANGE");
+        String queueName = "mqtt."+Common.FUNCTION_STARTCP;
+        String Topic = queueName;
+
+        try {
+            Channel channel =  RabbitUtils.getChannel();
+            channel.exchangeDeclare(exchangeName, "topic",true);
+            channel.queueDeclare(queueName,true,false,false,null);
+            channel.queueBind(queueName,exchangeName,Topic);
+
+            Consumer consumer = new DefaultConsumer(channel) {
+                @Override
+                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
+                                           byte[] body) throws IOException {
+                    String message = new String(body, "UTF-8");
+                    Message_startCP work = new Message_startCP();
+                    work.setOb(JSON.parseObject(message));
+                    MessageBackThreadGroupCommon.getThreadGroupCom().getWorkThreadPool().execute(work);
+                }
+            };
+            // channel绑定队列、消费者,autoAck为true表示一旦收到消息则自动回复确认消息
+            channel.basicConsume(queueName, true, consumer);
+            log.info("启动充电桩开始监听.....");
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+}

+ 50 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/listening/Listen_stopCp.java

@@ -0,0 +1,50 @@
+package com.xyhy.chargpilemqttcluster.common.rabbitmq.listening;
+
+import com.alibaba.fastjson.JSON;
+import com.rabbitmq.client.*;
+import com.xyhy.chargpilemqttcluster.common.Common;
+import com.xyhy.chargpilemqttcluster.common.rabbitmq.RabbitUtils;
+import com.xyhy.chargpilemqttcluster.workThread.message.back.MessageBackThreadGroupCommon;
+import com.xyhy.chargpilemqttcluster.workThread.message.back.Message_startCP;
+import com.xyhy.chargpilemqttcluster.workThread.message.back.Message_stopCP;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class Listen_stopCp {
+
+    public final Logger log =  LoggerFactory.getLogger(this.getClass());
+
+    public void start()
+    {
+
+        String exchangeName = Common.getConfigByName("rabbitMq.server.TOMQTTSERVEREXCHANGE");
+        String queueName = "mqtt."+Common.FUNCTION_STOPCP;
+        String Topic = queueName;
+
+        try {
+            Channel channel =  RabbitUtils.getChannel();
+            channel.exchangeDeclare(exchangeName, "topic",true);
+            channel.queueDeclare(queueName,true,false,false,null);
+            channel.queueBind(queueName,exchangeName,Topic);
+
+            Consumer consumer = new DefaultConsumer(channel) {
+                @Override
+                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
+                                           byte[] body) throws IOException {
+                    String message = new String(body, "UTF-8");
+                    Message_stopCP work = new Message_stopCP();
+                    work.setOb(JSON.parseObject(message));
+                    MessageBackThreadGroupCommon.getThreadGroupCom().getWorkThreadPool().execute(work);
+                }
+            };
+            // channel绑定队列、消费者,autoAck为true表示一旦收到消息则自动回复确认消息
+            channel.basicConsume(queueName, true, consumer);
+            log.info("返回停止充电桩开始监听.....");
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+}

+ 49 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/listening/Listen_syn_settlement.java

@@ -0,0 +1,49 @@
+package com.xyhy.chargpilemqttcluster.common.rabbitmq.listening;
+
+import com.alibaba.fastjson.JSON;
+import com.rabbitmq.client.*;
+import com.xyhy.chargpilemqttcluster.common.Common;
+import com.xyhy.chargpilemqttcluster.common.rabbitmq.RabbitUtils;
+import com.xyhy.chargpilemqttcluster.workThread.message.back.MessageBackThreadGroupCommon;
+import com.xyhy.chargpilemqttcluster.workThread.message.back.Message_back_syn_settlement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class Listen_syn_settlement {
+
+    public final Logger log =  LoggerFactory.getLogger(this.getClass());
+
+    public void start()
+    {
+
+        String exchangeName = Common.getConfigByName("rabbitMq.server.TONETTYSERVEREXCHANGE");
+        String queueName = "mqtt."+Common.FUNCTION_SYSSETTLEMENT_CALLBACK;
+        String Topic = queueName;
+
+        try {
+            Channel channel =  RabbitUtils.getChannel();
+            channel.exchangeDeclare(exchangeName, "topic",true);
+            channel.queueDeclare(queueName,true,false,false,null);
+            channel.queueBind(queueName,exchangeName,Topic);
+
+            Consumer consumer = new DefaultConsumer(channel) {
+                @Override
+                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
+                                           byte[] body) throws IOException {
+                    String message = new String(body, "UTF-8");
+                    Message_back_syn_settlement work = new Message_back_syn_settlement();
+                    work.setOb(JSON.parseObject(message));
+                    MessageBackThreadGroupCommon.getThreadGroupCom().getWorkThreadPool().execute(work);
+                }
+            };
+            // channel绑定队列、消费者,autoAck为true表示一旦收到消息则自动回复确认消息
+            channel.basicConsume(queueName, true, consumer);
+            log.info("返回同步结算订单开始监听.....");
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+}

+ 27 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_addOrder.java

@@ -0,0 +1,27 @@
+package com.xyhy.chargpilemqttcluster.common.rabbitmq.publish;
+
+
+import com.xyhy.chargpilemqttcluster.common.Common;
+
+public class Publish_addOrder extends Publish_common{
+
+    private static Publish_addOrder _publish_addOrder;
+
+    public Publish_addOrder()
+    {
+        _exchangeName = Common.getConfigByName("rabbitMq.server.TOIMPLSERVEREXCHANGE");
+        _queueName = Common.getConfigByName("rabbitMq.server.queue.toImplServer")+"."+Common.FUNCTION_ADDORDER;
+        _topic = _queueName;
+        getChannel(_exchangeName,_queueName,_topic);
+    }
+
+    public static Publish_addOrder getInstence()
+    {
+        if (_publish_addOrder == null)
+        {
+            _publish_addOrder = new Publish_addOrder();
+        }
+        return _publish_addOrder;
+    }
+
+}

+ 27 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_addvinno.java

@@ -0,0 +1,27 @@
+package com.xyhy.chargpilemqttcluster.common.rabbitmq.publish;
+
+
+import com.xyhy.chargpilemqttcluster.common.Common;
+
+public class Publish_addvinno extends Publish_common{
+
+    private static Publish_addvinno _publish_addOrder;
+
+    public Publish_addvinno()
+    {
+        _exchangeName = Common.getConfigByName("rabbitMq.server.TOIMPLSERVEREXCHANGE");
+        _queueName = Common.getConfigByName("rabbitMq.server.queue.toImplServer")+"."+Common.FUNCTION_ADDVINNO;
+        _topic = _queueName;
+        getChannel(_exchangeName,_queueName,_topic);
+    }
+
+    public static Publish_addvinno getInstence()
+    {
+        if (_publish_addOrder == null)
+        {
+            _publish_addOrder = new Publish_addvinno();
+        }
+        return _publish_addOrder;
+    }
+
+}

+ 27 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_backStartCp.java

@@ -0,0 +1,27 @@
+package com.xyhy.chargpilemqttcluster.common.rabbitmq.publish;
+
+
+import com.xyhy.chargpilemqttcluster.common.Common;
+
+public class Publish_backStartCp extends Publish_common{
+
+    private static Publish_backStartCp _publish_backStartCp;
+
+    public Publish_backStartCp()
+    {
+        _exchangeName = Common.getConfigByName("rabbitMq.server.TOIMPLSERVEREXCHANGE");
+        _queueName = Common.getConfigByName("rabbitMq.server.queue.toImplServer")+"."+Common.FUNCTION_STARTCP_CALLBACK;
+        _topic = _queueName;
+        getChannel(_exchangeName,_queueName,_topic);
+    }
+
+    public static Publish_backStartCp getInstence()
+    {
+        if (_publish_backStartCp == null)
+        {
+            _publish_backStartCp = new Publish_backStartCp();
+        }
+        return _publish_backStartCp;
+    }
+
+}

+ 26 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_backStopCp.java

@@ -0,0 +1,26 @@
+package com.xyhy.chargpilemqttcluster.common.rabbitmq.publish;
+
+import com.xyhy.chargpilemqttcluster.common.Common;
+
+public class Publish_backStopCp extends Publish_common{
+
+    private static Publish_backStopCp _publish_backStartCp;
+
+    public Publish_backStopCp()
+    {
+        _exchangeName = Common.getConfigByName("rabbitMq.server.TOIMPLSERVEREXCHANGE");
+        _queueName = Common.getConfigByName("rabbitMq.server.queue.toImplServer")+"."+Common.FUNCTION_STOPCP_CALLBACK;
+        _topic = _queueName;
+        getChannel(_exchangeName,_queueName,_topic);
+    }
+
+    public static Publish_backStopCp getInstence()
+    {
+        if (_publish_backStartCp == null)
+        {
+            _publish_backStartCp = new Publish_backStopCp();
+        }
+        return _publish_backStartCp;
+    }
+
+}

+ 26 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_chargingstatus.java

@@ -0,0 +1,26 @@
+package com.xyhy.chargpilemqttcluster.common.rabbitmq.publish;
+
+import com.xyhy.chargpilemqttcluster.common.Common;
+
+public class Publish_chargingstatus extends Publish_common{
+
+    private static Publish_chargingstatus _publish_backStartCp;
+
+    public Publish_chargingstatus()
+    {
+        _exchangeName = Common.getConfigByName("rabbitMq.server.TOIMPLSERVEREXCHANGE");
+        _queueName = Common.getConfigByName("rabbitMq.server.queue.toImplServer")+"."+Common.FUNCTION_CHARGINGSTATUS;
+        _topic = _queueName;
+        getChannel(_exchangeName,_queueName,_topic);
+    }
+
+    public static Publish_chargingstatus getInstence()
+    {
+        if (_publish_backStartCp == null)
+        {
+            _publish_backStartCp = new Publish_chargingstatus();
+        }
+        return _publish_backStartCp;
+    }
+
+}

+ 63 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_common.java

@@ -0,0 +1,63 @@
+package com.xyhy.chargpilemqttcluster.common.rabbitmq.publish;
+
+import com.rabbitmq.client.Channel;
+import com.xyhy.chargpilemqttcluster.common.Common;
+import com.xyhy.chargpilemqttcluster.common.rabbitmq.RabbitUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class Publish_common extends Common {
+
+    public Channel _channel;
+
+    public String _exchangeName;
+    public String _queueName;
+    public String _topic;
+
+    public final Logger log =  LoggerFactory.getLogger(this.getClass());
+
+    public Channel getChannel(String exchangeName,String queueName,String topic)
+    {
+        return getChannelByTTL(exchangeName,queueName,topic,0);
+    }
+
+    public Channel getChannelByTTL(String exchangeName,String queueName,String topic,int TTL)
+    {
+
+        if (_channel == null)
+        {
+            Channel channel =  RabbitUtils.getChannel();
+            try {
+                channel.exchangeDeclare(exchangeName, "topic");
+                if (TTL>0)
+                {
+                    Map<String,Object> arguments = new HashMap<String, Object>();
+                    arguments.put("x-message-ttl",TTL);
+                    channel.queueDeclare(queueName,true,false,false,arguments);
+                }else{
+                    channel.queueDeclare(queueName,true,false,false,null);
+                }
+                channel.queueBind(queueName,exchangeName,topic);
+                _channel = channel;
+            } catch (IOException e) {
+                e.printStackTrace();
+                return null;
+            }
+        }
+        return _channel;
+    }
+
+    public void sendMessage(String message)
+    {
+        log.info("SendToServer ExchangeName:"+_exchangeName+" Topic:"+_topic+" Body:"+message);
+        try {
+            _channel.basicPublish(_exchangeName, _topic, null,message.getBytes("UTF-8"));
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+}

+ 27 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_exceptOrder.java

@@ -0,0 +1,27 @@
+package com.xyhy.chargpilemqttcluster.common.rabbitmq.publish;
+
+
+import com.xyhy.chargpilemqttcluster.common.Common;
+
+public class Publish_exceptOrder extends Publish_common{
+
+    private static Publish_exceptOrder _publish_backStartCp;
+
+    public Publish_exceptOrder()
+    {
+        _exchangeName = Common.getConfigByName("rabbitMq.server.TOIMPLSERVEREXCHANGE");
+        _queueName = Common.getConfigByName("rabbitMq.server.queue.toImplServer")+"."+Common.FUNCTION_EXCEPTORORDER;
+        _topic = _queueName;
+        getChannel(_exchangeName,_queueName,_topic);
+    }
+
+    public static Publish_exceptOrder getInstence()
+    {
+        if (_publish_backStartCp == null)
+        {
+            _publish_backStartCp = new Publish_exceptOrder();
+        }
+        return _publish_backStartCp;
+    }
+
+}

+ 26 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_getChargInfo.java

@@ -0,0 +1,26 @@
+package com.xyhy.chargpilemqttcluster.common.rabbitmq.publish;
+
+import com.xyhy.chargpilemqttcluster.common.Common;
+
+public class Publish_getChargInfo extends Publish_common{
+
+    private static Publish_getChargInfo _publish_backStartCp;
+
+    public Publish_getChargInfo()
+    {
+        _exchangeName = Common.getConfigByName("rabbitMq.server.TOIMPLSERVEREXCHANGE");
+        _queueName = Common.getConfigByName("rabbitMq.server.queue.toImplServer")+"."+Common.FUNCTION_GETCPINFO;
+        _topic = _queueName;
+        getChannel(_exchangeName,_queueName,_topic);
+    }
+
+    public static Publish_getChargInfo getInstence()
+    {
+        if (_publish_backStartCp == null)
+        {
+            _publish_backStartCp = new Publish_getChargInfo();
+        }
+        return _publish_backStartCp;
+    }
+
+}

+ 26 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_save.java

@@ -0,0 +1,26 @@
+package com.xyhy.chargpilemqttcluster.common.rabbitmq.publish;
+
+import com.xyhy.chargpilemqttcluster.common.Common;
+
+public class Publish_save extends Publish_common{
+
+    private static Publish_save _publish_addOrder;
+
+    public Publish_save()
+    {
+        _exchangeName = Common.getConfigByName("rabbitMq.server.TOIMPLSERVEREXCHANGE");
+        _queueName = Common.getConfigByName("rabbitMq.server.queue.toImplServer")+"."+Common.FUNCTION_SAVE;
+        _topic = _queueName;
+        getChannel(_exchangeName,_queueName,_topic);
+    }
+
+    public static Publish_save getInstence()
+    {
+        if (_publish_addOrder == null)
+        {
+            _publish_addOrder = new Publish_save();
+        }
+        return _publish_addOrder;
+    }
+
+}

+ 27 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_saveRealTime.java

@@ -0,0 +1,27 @@
+package com.xyhy.chargpilemqttcluster.common.rabbitmq.publish;
+
+
+import com.xyhy.chargpilemqttcluster.common.Common;
+
+public class Publish_saveRealTime extends Publish_common{
+
+    private static Publish_saveRealTime _publish_saveRealTime;
+
+    public Publish_saveRealTime()
+    {
+        _exchangeName = Common.getConfigByName("rabbitMq.server.TOREALTIMEDATASERVEREXCHANGE");
+        _queueName = Common.getConfigByName("rabbitMq.server.queue.toRealTimeServer")+"."+Common.FUNCTION_REALTIMEDATA_CAR;
+        _topic = _queueName;
+        getChannelByTTL(_exchangeName,_queueName,_topic,60000);
+    }
+
+    public static Publish_saveRealTime getInstence()
+    {
+        if (_publish_saveRealTime == null)
+        {
+            _publish_saveRealTime = new Publish_saveRealTime();
+        }
+        return _publish_saveRealTime;
+    }
+
+}

+ 27 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_saveRealTime_LowBattery.java

@@ -0,0 +1,27 @@
+package com.xyhy.chargpilemqttcluster.common.rabbitmq.publish;
+
+import com.xyhy.chargpilemqttcluster.common.Common;
+
+public class Publish_saveRealTime_LowBattery extends Publish_common{
+
+    private static Publish_saveRealTime_LowBattery _publish_saveRealTime_LowBattery;
+
+    public Publish_saveRealTime_LowBattery()
+    {
+        _exchangeName = Common.getConfigByName("rabbitMq.server.TOIMPLSERVEREXCHANGE");
+        _queueName = Common.getConfigByName("rabbitMq.server.queue.toImplServer")+"."+Common.FUNCTION_REALTIMEDATA_LOWBATTERY;
+        _topic = _queueName;
+        //getChannelByTTL(_exchangeName,_queueName,_topic,60000);
+        getChannel(_exchangeName,_queueName,_topic);
+    }
+
+    public static Publish_saveRealTime_LowBattery getInstence()
+    {
+        if (_publish_saveRealTime_LowBattery == null)
+        {
+            _publish_saveRealTime_LowBattery = new Publish_saveRealTime_LowBattery();
+        }
+        return _publish_saveRealTime_LowBattery;
+    }
+
+}

+ 27 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_saveRealTime_Tricycle.java

@@ -0,0 +1,27 @@
+package com.xyhy.chargpilemqttcluster.common.rabbitmq.publish;
+
+import com.xyhy.chargpilemqttcluster.common.Common;
+
+public class Publish_saveRealTime_Tricycle extends Publish_common{
+
+    private static Publish_saveRealTime_Tricycle _publish_saveRealTime_Tricycle;
+
+    public Publish_saveRealTime_Tricycle()
+    {
+        _exchangeName = Common.getConfigByName("rabbitMq.server.TOIMPLSERVEREXCHANGE");
+        _queueName = Common.getConfigByName("rabbitMq.server.queue.toImplServer")+"."+Common.FUNCTION_REALTIMEDATA_TRICYCLE;
+        _topic = _queueName;
+        //getChannelByTTL(_exchangeName,_queueName,_topic,60000);
+        getChannel(_exchangeName,_queueName,_topic);
+    }
+
+    public static Publish_saveRealTime_Tricycle getInstence()
+    {
+        if (_publish_saveRealTime_Tricycle == null)
+        {
+            _publish_saveRealTime_Tricycle = new Publish_saveRealTime_Tricycle();
+        }
+        return _publish_saveRealTime_Tricycle;
+    }
+
+}

+ 26 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_saveWarning.java

@@ -0,0 +1,26 @@
+package com.xyhy.chargpilemqttcluster.common.rabbitmq.publish;
+
+import com.xyhy.chargpilemqttcluster.common.Common;
+
+public class Publish_saveWarning extends Publish_common{
+
+    private static Publish_saveWarning _publish_addOrder;
+
+    public Publish_saveWarning()
+    {
+        _exchangeName = Common.getConfigByName("rabbitMq.server.TOIMPLSERVEREXCHANGE");
+        _queueName = Common.getConfigByName("rabbitMq.server.queue.toImplServer")+"."+Common.FUNCTION_SAVEWARNING;
+        _topic = _queueName;
+        getChannel(_exchangeName,_queueName,_topic);
+    }
+
+    public static Publish_saveWarning getInstence()
+    {
+        if (_publish_addOrder == null)
+        {
+            _publish_addOrder = new Publish_saveWarning();
+        }
+        return _publish_addOrder;
+    }
+
+}

+ 27 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_settlement.java

@@ -0,0 +1,27 @@
+package com.xyhy.chargpilemqttcluster.common.rabbitmq.publish;
+
+
+import com.xyhy.chargpilemqttcluster.common.Common;
+
+public class Publish_settlement extends Publish_common{
+
+    private static Publish_settlement _publish_addOrder;
+
+    public Publish_settlement()
+    {
+        _exchangeName = Common.getConfigByName("rabbitMq.server.TOIMPLSERVEREXCHANGE");
+        _queueName = Common.getConfigByName("rabbitMq.server.queue.toImplServer")+"."+Common.FUNCTION_SETTLEMENT;
+        _topic = _queueName;
+        getChannel(_exchangeName,_queueName,_topic);
+    }
+
+    public static Publish_settlement getInstence()
+    {
+        if (_publish_addOrder == null)
+        {
+            _publish_addOrder = new Publish_settlement();
+        }
+        return _publish_addOrder;
+    }
+
+}

+ 27 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/rabbitmq/publish/Publish_syn_settlement.java

@@ -0,0 +1,27 @@
+package com.xyhy.chargpilemqttcluster.common.rabbitmq.publish;
+
+
+import com.xyhy.chargpilemqttcluster.common.Common;
+
+public class Publish_syn_settlement extends Publish_common{
+
+    private static Publish_syn_settlement _publish_addOrder;
+
+    public Publish_syn_settlement()
+    {
+        _exchangeName = Common.getConfigByName("rabbitMq.server.TOIMPLSERVEREXCHANGE");
+        _queueName = Common.getConfigByName("rabbitMq.server.queue.toImplServer")+"."+Common.FUNCTION_SYSSETTLEMENT;
+        _topic = _queueName;
+        getChannel(_exchangeName,_queueName,_topic);
+    }
+
+    public static Publish_syn_settlement getInstence()
+    {
+        if (_publish_addOrder == null)
+        {
+            _publish_addOrder = new Publish_syn_settlement();
+        }
+        return _publish_addOrder;
+    }
+
+}

+ 56 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/redis/JedisPoolUtils.java

@@ -0,0 +1,56 @@
+package com.xyhy.chargpilemqttcluster.common.redis;
+
+import com.xyhy.chargpilemqttcluster.common.Common;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
+
+public class JedisPoolUtils {
+
+
+    public  static JedisPoolUtils util = null;
+
+    private static JedisPool jedisPool = null;
+
+    static {
+        //1.创建连接池配置对象
+        JedisPoolConfig config = new JedisPoolConfig();
+        //2.配置连接池参数
+        //2.1忙时最大连接(最大连接数)
+        config.setMaxTotal(Integer.parseInt(Common.getConfigByName("reids.server.MaxTotal")));//连接池的最大连接数。使用负数表示无极限。
+        //2.2闲时最大连接(最小连接数)
+        config.setMaxIdle(Integer.parseInt(Common.getConfigByName("reids.server.MaxIdle")));//连接池的最大空闲连接数。如果值很小的话会频繁的创建销毁连接导致性能开销增加。
+        //2.*闲时最小连接数
+        config.setMinIdle(Integer.parseInt(Common.getConfigByName("reids.server.MinIdle")));//连接池的最小空闲连接数。确保池满足要求在空闲对象回收运行期间的最小实例数。minIdle配置值大于配置值对于maxIdle,则将使用maxIdle的值。
+        //2.3超时时间
+        config.setMaxWaitMillis(Integer.parseInt(Common.getConfigByName("reids.server.MaxWaitMillis")));
+        //2.4获取连接池检查时机
+        config.setTestOnBorrow(Boolean.parseBoolean(Common.getConfigByName("reids.server.TestOnBorrow")));
+        //3.通过连接池配置对象,设置参数
+        jedisPool = new JedisPool(config, Common.getConfigByName("redis.server.ip"),
+                Integer.parseInt(Common.getConfigByName("redis.server.port")),
+                Integer.parseInt((Common.getConfigByName("reids.server.Timeout"))),
+                Common.getConfigByName("redis.server.pw"),
+                Integer.parseInt(Common.getConfigByName("redis.server.db")));
+    }
+
+    public static JedisPoolUtils getInstance(){
+        if (util ==null){
+            util = new JedisPoolUtils();
+        }
+        return util;
+    }
+
+
+    /**
+     *  获取连接
+     **/
+    public Jedis getConnection(){
+        if (jedisPool != null) {
+            return jedisPool.getResource();
+        }else {
+            return null;
+        }
+    }
+
+}

+ 40 - 0
src/main/java/com/xyhy/chargpilemqttcluster/common/redis/RedisHelp.java

@@ -0,0 +1,40 @@
+package com.xyhy.chargpilemqttcluster.common.redis;
+
+import redis.clients.jedis.Jedis;
+
+
+public class RedisHelp {
+
+    public static void setDeviceVulue(String key,String value)
+    {
+        Jedis jedis = JedisPoolUtils.getInstance().getConnection();
+        jedis.set("Device:"+key,value);
+        jedis.close();
+    }
+
+    public static String getDeviceVulue(String key)
+    {
+        Jedis jedis = JedisPoolUtils.getInstance().getConnection();
+        String value = jedis.get("Device:"+key);
+        jedis.close();
+        return value;
+    }
+
+    public static void setOrderCheckVulue(String key,String value)
+    {
+        Jedis jedis = JedisPoolUtils.getInstance().getConnection();
+        jedis.set("OrderCheck:"+key,value);
+        jedis.close();
+    }
+
+    public static String getOrderCheckVulue(String key)
+    {
+        Jedis jedis = JedisPoolUtils.getInstance().getConnection();
+        String value = jedis.get("OrderCheck:"+key);
+        jedis.close();
+        return value;
+    }
+
+
+
+}

+ 71 - 0
src/main/java/com/xyhy/chargpilemqttcluster/workThread/Job/Job_startCheck.java

@@ -0,0 +1,71 @@
+package com.xyhy.chargpilemqttcluster.workThread.Job;
+
+import com.alibaba.fastjson.JSONObject;
+import com.xyhy.chargpilemqttcluster.common.Common;
+import com.xyhy.chargpilemqttcluster.common.rabbitmq.publish.Publish_backStartCp;
+import com.xyhy.chargpilemqttcluster.common.rabbitmq.publish.Publish_chargingstatus;
+import com.xyhy.chargpilemqttcluster.common.redis.RedisHelp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Job_startCheck extends Thread {
+
+    public String cdzid;
+
+    public String cdjkid;
+
+    public String orderId;
+
+    public final Logger log = LoggerFactory.getLogger(this.getClass());
+
+    public void run() {
+
+        int i = 0;
+        while (i < 60) {
+            //System.out.println("睡眠:"+i);
+
+            String obj = RedisHelp.getOrderCheckVulue(cdzid+"-"+cdjkid+"-"+orderId);
+            JSONObject returnOBj = JSONObject.parseObject(obj);
+            String getResponse = returnOBj.getString("step");
+
+            if (getResponse.equals("starting")) {
+
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+
+            } else if (getResponse.equals("success")) {
+
+
+                JSONObject ob2 = new JSONObject();
+                ob2.put("pileLogBean",returnOBj.getJSONObject("pileLogBean"));
+                ob2.put("orderId",orderId);
+                ob2.put("cp_id",cdzid+""+cdjkid);
+                ob2.put("resultCode",200);
+
+                Publish_backStartCp.getInstence().sendMessage(ob2.toJSONString());
+
+                return;
+            } else if (getResponse.equals("error")) {
+                return;
+            }
+            i++;
+        }
+
+        /*
+        String Str_js2;
+        JSONObject ob2 = new JSONObject();
+        ob2.put("orderId", orderId);
+        log.error("开始充电失败(超时,充电桩未响应) 订单id:" + orderId);
+        ob2.put("code", "11");
+        ob2.put("message", "开始充电失败(超时,充电桩未响应) 订单id:" + orderId);
+
+        Publish_chargingstatus.getInstence().sendMessage(ob2.toJSONString());
+
+         */
+
+
+    }
+}

+ 69 - 0
src/main/java/com/xyhy/chargpilemqttcluster/workThread/Job/Job_stopCheck.java

@@ -0,0 +1,69 @@
+package com.xyhy.chargpilemqttcluster.workThread.Job;
+
+import com.alibaba.fastjson.JSONObject;
+import com.xyhy.chargpilemqttcluster.common.rabbitmq.publish.Publish_backStartCp;
+import com.xyhy.chargpilemqttcluster.common.rabbitmq.publish.Publish_backStopCp;
+import com.xyhy.chargpilemqttcluster.common.rabbitmq.publish.Publish_chargingstatus;
+import com.xyhy.chargpilemqttcluster.common.redis.RedisHelp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Job_stopCheck extends Thread{
+    public String cdzid;
+
+    public String cdjkid;
+
+    public String orderId;
+
+    public final Logger log = LoggerFactory.getLogger(this.getClass());
+
+    public void run() {
+
+        int i = 0;
+        while (i < 60) {
+            //System.out.println("睡眠:"+i);
+
+            String obj = RedisHelp.getOrderCheckVulue(cdzid+"-"+cdjkid+"-"+orderId);
+            JSONObject returnOBj = JSONObject.parseObject(obj);
+            String getResponse = returnOBj.getString("step");
+
+            if (getResponse.equals("stoping")) {
+
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+
+            } else if (getResponse.equals("success")) {
+
+
+                JSONObject ob2 = new JSONObject();
+                ob2.put("orderId",orderId);
+                ob2.put("cp_id",cdzid+""+cdjkid);
+                ob2.put("resultCode",200);
+
+                Publish_backStopCp.getInstence().sendMessage(ob2.toJSONString());
+
+                return;
+            } else if (getResponse.equals("error")) {
+                return;
+            }
+            i++;
+        }
+
+        /*
+        String Str_js2;
+        JSONObject ob2 = new JSONObject();
+        ob2.put("orderId", orderId);
+        log.error("开始充电失败(超时,充电桩未响应) 订单id:" + orderId);
+        ob2.put("code", "11");
+        ob2.put("message", "开始充电失败(超时,充电桩未响应) 订单id:" + orderId);
+
+        Publish_chargingstatus.getInstence().sendMessage(ob2.toJSONString());
+
+         */
+
+
+    }
+}

+ 41 - 0
src/main/java/com/xyhy/chargpilemqttcluster/workThread/WorkThread.java

@@ -0,0 +1,41 @@
+package com.xyhy.chargpilemqttcluster.workThread;
+
+import com.xyhy.chargpilemqttcluster.workThread.message.MessageBus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+public class WorkThread implements Runnable {
+
+
+    public final Logger log =  LoggerFactory.getLogger(this.getClass());
+
+    private String _topic;
+    private byte[] _value;
+
+    public String get_topic() {
+        return _topic;
+    }
+
+    public void set_topic(String _topic) {
+        this._topic = _topic;
+    }
+
+    public byte[] get_value() {
+        return _value;
+    }
+
+    public void set_value(byte[] _value) {
+        this._value = _value;
+    }
+
+    public WorkThread() {
+
+    }
+
+    @Override
+    public void run() {
+        new MessageBus().getMessageAdupter(_topic,_value);
+    }
+}

+ 42 - 0
src/main/java/com/xyhy/chargpilemqttcluster/workThread/WorkThreadGroupCommon.java

@@ -0,0 +1,42 @@
+package com.xyhy.chargpilemqttcluster.workThread;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class WorkThreadGroupCommon {
+
+    private static WorkThreadGroupCommon _ThreadGroupCommon;
+
+    private ThreadPoolExecutor _threadPoolExecutor;
+
+
+    public static WorkThreadGroupCommon getThreadGroupCom()
+    {
+        if(_ThreadGroupCommon==null)
+        {
+            _ThreadGroupCommon = new WorkThreadGroupCommon(1000);
+        }
+        return _ThreadGroupCommon;
+    }
+
+    public WorkThreadGroupCommon(int threadsNum)
+    {
+        _threadPoolExecutor =  new ThreadPoolExecutor(threadsNum, threadsNum,
+                0L, TimeUnit.MILLISECONDS,
+                new LinkedBlockingQueue<Runnable>());
+    }
+
+    public ThreadPoolExecutor getWorkThreadPool()
+    {
+        return _threadPoolExecutor;
+    }
+
+
+
+
+
+
+
+
+}

+ 132 - 0
src/main/java/com/xyhy/chargpilemqttcluster/workThread/message/MessageBus.java

@@ -0,0 +1,132 @@
+package com.xyhy.chargpilemqttcluster.workThread.message;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.xyhy.chargpilemqttcluster.common.mqtt.MqttServer;
+import com.xyhy.chargpilemqttcluster.common.rabbitmq.publish.*;
+import com.xyhy.chargpilemqttcluster.common.redis.RedisHelp;
+
+import java.text.SimpleDateFormat;
+
+public class MessageBus extends MessageCommon{
+
+    public void Message_101(JSONObject cmd){
+        String devId = cmd.getString("devId");
+        String devType = cmd.getString("devType");
+        RedisHelp.setDeviceVulue(devId,ONLINE);
+        String sendtopic = "toclient/"+devType+"/"+devId;
+        JSONObject returnObj = getSUCCESSResult(102);
+        MqttServer.getInstance().pushlish(sendtopic,returnObj.toJSONString());
+    }   
+
+    public void Message_103(JSONObject cmd){
+        log.info(cmd.toJSONString());
+    }
+
+
+    public void Message_201(JSONObject cmd){
+        JSONObject obj =null;
+        int devType = cmd.getInteger("devType");
+        switch (devType){
+            case 4:
+                obj = getRTDevType3(cmd);
+                //Publish_saveRealTime_Tricycle.getInstence().sendMessage(obj.toJSONString());
+                break;
+        }
+
+        JSONArray arr_Bettey = getRTBettey(cmd);
+        if (arr_Bettey!=null && arr_Bettey.size()>0){
+            Publish_saveRealTime_LowBattery.getInstence().sendMessage(arr_Bettey.toJSONString());
+        }
+
+    }
+
+    public void Message_302(JSONObject cmd){
+
+       String devId = cmd.getString("devId");
+       String intId = cmd.getString("intId");
+       String orderId = cmd.getString("orderId");
+       int resultCode = cmd.getIntValue("resultCode");
+
+       if (resultCode == 1){
+
+           String obj = RedisHelp.getOrderCheckVulue(devId+"-"+intId+"-"+orderId);
+           JSONObject returnOBj = JSONObject.parseObject(obj);
+           returnOBj.put("step","success");
+           RedisHelp.setOrderCheckVulue(devId+"-"+intId+"-"+orderId,returnOBj.toJSONString());
+
+       }else{
+
+       }
+    }
+
+    public void Message_303(JSONObject cmd){
+
+
+
+
+    }
+
+    public void Message_305(JSONObject cmd){
+
+        String devId = cmd.getString("devId");
+        String intId = cmd.getString("intId");
+        String orderId = cmd.getString("orderId");
+        int resultCode = cmd.getIntValue("resultCode");
+
+        if (resultCode == 1){
+
+            String obj = RedisHelp.getOrderCheckVulue(devId+"-"+intId+"-"+orderId);
+            JSONObject returnOBj = JSONObject.parseObject(obj);
+            returnOBj.put("step","success");
+            RedisHelp.setOrderCheckVulue(devId+"-"+intId+"-"+orderId,returnOBj.toJSONString());
+
+        }else{
+
+        }
+    }
+
+
+    public void Message_309(JSONObject cmd){
+
+        JSONObject body = new JSONObject();
+        body.put("chargPileId",cmd.getString("devId")+""+cmd.getString("intId"));
+        body.put("chargnumber",cmd.getString("startdeviceid"));
+        Publish_addOrder.getInstence().sendMessage(body.toJSONString());
+
+    }
+
+    public void Message_401(JSONObject cmd){
+
+        JSONObject obj = new JSONObject();
+        if (cmd.getString("js002").equals("01"))//账号
+        {
+            obj.put("userId",cmd.getString("js003"));
+        }else{ //卡
+            obj.put("chargnumber",cmd.getString("js003"));
+        }
+
+        obj.put("userType",cmd.getString("js002"));
+
+        obj.put("chargPileId",cmd.getString("devId")+cmd.getString("js000"));
+        obj.put("orderid",cmd.getString("js001"));
+
+        obj.put("chargstarttime",cmd.getString("js006"));
+        obj.put("chargendtime",cmd.getString("js007"));
+        obj.put("sharpChargPower",cmd.getDoubleValue("js008"));
+        obj.put("peakChargPower",cmd.getDoubleValue("js010"));
+        obj.put("flatChargPower",cmd.getDoubleValue("js012"));
+        obj.put("valleyChargPower",cmd.getDoubleValue("js014"));
+        obj.put("orderType",cmd.getString("js004"));
+        obj.put("electricMeterBeginNum",cmd.getDoubleValue("js019"));
+        obj.put("electricMeterEndNum",cmd.getDoubleValue("js020"));
+        obj.put("chargpilelogCause",cmd.getString("js021"));
+        obj.put("vinno",cmd.getString("js023"));
+        obj.put("socstart",cmd.getDoubleValue("js024"));
+        obj.put("socend",cmd.getDoubleValue("js025"));
+
+        Publish_settlement.getInstence().sendMessage(obj.toJSONString());
+
+    }
+
+}

+ 313 - 0
src/main/java/com/xyhy/chargpilemqttcluster/workThread/message/MessageCommon.java

@@ -0,0 +1,313 @@
+package com.xyhy.chargpilemqttcluster.workThread.message;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.xyhy.chargpilemqttcluster.common.Common;
+
+import java.lang.reflect.Method;
+import java.util.Calendar;
+import java.util.Date;
+
+
+public class MessageCommon extends Common {
+
+
+    public String _topic;
+
+    public String getTopic() {
+        return _topic;
+    }
+
+    public void setTopic(String topic) {
+        this._topic = topic;
+    }
+
+    public MessageCommon()
+    {
+
+    }
+
+    public String getToClientTopic(JSONObject obj,String type){
+        String topic ="toclient";
+        topic = topic +"/" + type+"/" + obj.getString("cp_id");
+        return topic;
+    }
+
+    //接受命令总线
+    public void getMessageAdupter(String topic,byte[] cmd)
+    {
+        int cmdindex = topic.split("/").length;
+
+        int cmd_type_index = Integer.parseInt(topic.split("/")[cmdindex-1]);
+
+        String functionName = "Message_"+cmd_type_index;
+        try {
+            this._topic = topic;
+            Method m = this.getClass().getDeclaredMethod(functionName,JSONObject.class);
+            JSONObject js_cmd = JSONObject.parseObject(new String(cmd));
+            log.info("收到设备消息:"+js_cmd.toJSONString());
+            m.invoke(this,js_cmd);
+        }catch (Exception ex)
+        {
+            log.error("没有找到响应的处理单元!",ex);
+        }
+
+    }
+
+
+    //cp56time2a(电力专用时间)转Date
+    public Date Convert_cp56time2a_to_date(byte[] cp56time2a)
+    {
+        byte seconds0 = cp56time2a[0];
+        byte seconds1 = cp56time2a[1];
+        byte min = cp56time2a[2];
+        byte hours = cp56time2a[3];
+        byte week = cp56time2a[4];
+        byte day = cp56time2a[4];
+        byte months = cp56time2a[5];
+        byte year = cp56time2a[6];
+
+        if (year==0xff || months==0xff || day==0xff || hours==0xff || min==0xff || seconds1==0xff)
+        {
+            Calendar cal = Calendar.getInstance();
+            cal.set(1970, 0, 01);
+            return cal.getTime();
+        }
+
+        String bit_seconds0 = ""+(byte)((seconds0 >> 7) & 0x1)+
+                              (byte)((seconds0 >> 6) & 0x1)+
+                              (byte)((seconds0 >> 5) & 0x1)+
+                              (byte)((seconds0 >> 4) & 0x1)+
+                              (byte)((seconds0 >> 3) & 0x1)+
+                              (byte)((seconds0 >> 2) & 0x1)+
+                              (byte)((seconds0 >> 1) & 0x1)+
+                              (byte)((seconds0 >> 0) & 0x1);
+
+        String bit_seconds1 = ""+(byte)((seconds1 >> 7) & 0x1)+
+                (byte)((seconds1 >> 6) & 0x1)+
+                (byte)((seconds1 >> 5) & 0x1)+
+                (byte)((seconds1 >> 4) & 0x1)+
+                (byte)((seconds1 >> 3) & 0x1)+
+                (byte)((seconds1 >> 2) & 0x1)+
+                (byte)((seconds1 >> 1) & 0x1)+
+                (byte)((seconds1 >> 0) & 0x1);
+
+        String bit_min = ""+(byte)((min >> 5) & 0x1)+
+                (byte)((min >> 4) & 0x1)
+                +(byte)((min >> 3) & 0x1)+
+                (byte)((min >> 2) & 0x1) +
+                (byte)((min >> 1) & 0x1)+
+                (byte)((min >>0) & 0x1);
+
+        String bit_hours = ""+(byte)((hours >> 4) & 0x1)
+                +(byte)((hours >> 3) & 0x1)+
+                (byte)((hours >> 2) & 0x1) +
+                (byte)((hours >> 1) & 0x1)+
+                (byte)((hours >> 0) & 0x1);
+
+        String bit_week = ""+(byte)((week >> 7) & 0x1)+
+                (byte)((week >> 6) & 0x1)+
+                (byte)((week >> 5) & 0x1);
+
+        String bit_day = ""+(byte)((day >> 4) & 0x1)
+                +(byte)((day >> 3) & 0x1)+
+                (byte)((day >> 2) & 0x1) +
+                (byte)((day >> 1) & 0x1)+
+                (byte)((day >> 0) & 0x1);
+
+        String bit_months = ""+(byte)((months >> 3) & 0x1)+
+                (byte)((months >> 2) & 0x1) +
+                (byte)((months >> 1) & 0x1)+
+                (byte)((months >> 0) & 0x1);
+
+        String bit_year = ""+(byte)((year >> 6) & 0x1)+
+                (byte)((year >> 5) & 0x1)+
+                (byte)((year >> 4) & 0x1)+
+                (byte)((year >> 3) & 0x1)+
+                (byte)((year >> 2) & 0x1)+
+                (byte)((year >> 1) & 0x1)+
+                (byte)((year >> 0) & 0x1);
+
+
+
+        int seccound = Integer.parseInt(bit_seconds1+""+bit_seconds0,2);
+        int seccound_main = seccound/1000;
+        int seccound_millis = seccound%1000;
+
+
+        Calendar c_new = Calendar.getInstance();
+
+        c_new.set(2000+(Integer.parseInt(bit_year,2)),
+                Integer.parseInt(bit_months,2)-1,
+                Integer.parseInt(bit_day,2),
+                Integer.parseInt(bit_hours,2),
+                Integer.parseInt(bit_min,2));
+
+        c_new.set(Calendar.SECOND,seccound_main);
+        c_new.set(Calendar.MILLISECOND,seccound_millis);
+
+        //System.out.println(c_new.get(Calendar.YEAR)+"年"+(c_new.get(Calendar.MONTH)+1)+"月"+c_new.get(Calendar.DAY_OF_MONTH)+"日"+
+          //      c_new.get(Calendar.HOUR)+"时"+c_new.get(Calendar.MINUTE)+"分"+c_new.get(Calendar.SECOND)+"秒"+c_new.get(Calendar.MILLISECOND)+"毫秒");
+
+        return c_new.getTime();
+
+    }
+
+    //date转cp56time2a字节时间
+    public byte[] Convert_date_to_cp56time2a(Date date)
+    {
+        Calendar c = Calendar.getInstance();
+        c.setTime(date);
+
+        int int_year = c.get(Calendar.YEAR);
+        String s_year = get2JZString(int_year-2000,7);
+        byte b_year = bitStringToByte("0"+s_year);
+
+        int int_month = c.get(Calendar.MONTH)+1;
+        String s_month = get2JZString(int_month,5);
+        byte b_month = bitStringToByte("000"+s_month);
+
+        int int_week = c.get(Calendar.DAY_OF_WEEK);
+        String s_week = get2JZString(int_week,3);
+
+        int int_day = c.get(Calendar.DAY_OF_MONTH);
+        String s_day = get2JZString(int_day,5);
+        byte b_day = bitStringToByte(s_week+""+s_day);
+
+
+        int int_hour = c.get(Calendar.HOUR_OF_DAY);
+        String s_hour = get2JZString(int_hour,5);
+        byte b_hour = bitStringToByte("100"+s_hour);
+
+        int int_minute = c.get(Calendar.MINUTE);
+        String s_minute = get2JZString(int_minute,6);
+        byte b_minute = bitStringToByte("00"+s_minute);
+
+        int int_secound = c.get(Calendar.SECOND);
+        int int_millisecound = c.get(Calendar.MILLISECOND);
+
+        int allmillsecound = int_secound*1000+int_millisecound;
+        String s_allmillsecound = get2JZString(allmillsecound,16);
+
+        byte millsecound0 = bitStringToByte(s_allmillsecound.substring(0,8));
+        byte millsecound1 = bitStringToByte(s_allmillsecound.substring(8,16));
+
+
+        byte[] cp56Time2a = new byte[7];
+        cp56Time2a[0]=millsecound1;
+        cp56Time2a[1]=millsecound0;
+        cp56Time2a[2]=b_minute;
+        cp56Time2a[3]=b_hour;
+        cp56Time2a[4]=b_day;
+        cp56Time2a[5]=b_month;
+        cp56Time2a[6]=b_year;
+
+        return cp56Time2a;
+    }
+
+    //10进制转2进制,带补位
+    public String get2JZString(int count,int bw)
+    {
+        String s_count = Integer.toBinaryString(count);
+        if (bw > s_count.length()){
+            String s_bw="";
+           for(int i = 0 ; i < bw-s_count.length();i++){
+               s_bw=s_bw+"0";
+           }
+            s_count = s_bw+s_count;
+        }
+
+        return s_count;
+    }
+
+
+    //2进制bit转byte
+    public byte bitStringToByte(String str) {
+        if(null == str){
+            throw new RuntimeException("when bit string convert to byte, Object can not be null!");
+        }
+        if (8 != str.length()){
+            throw new RuntimeException("bit string'length must be 8");
+        }
+        try{
+            //判断最高位,决定正负
+            if(str.charAt(0) == '0'){
+                return (byte) Integer.parseInt(str,2);
+            }else if(str.charAt(0) == '1'){
+                return (byte) (Integer.parseInt(str,2) - 256);
+            }
+        }catch (NumberFormatException e){
+            throw new RuntimeException("bit string convert to byte failed, byte String must only include 0 and 1!");
+        }
+
+        return 0;
+    }
+
+
+
+
+    public JSONObject getRTDevType3(JSONObject cmd) {
+        JSONObject obj = new JSONObject();
+
+        obj.put("devId",cmd.getString("devId"));
+        obj.put("devType",cmd.getInteger("devType"));
+
+        JSONArray devices = cmd.getJSONArray("rt000");
+        if (devices!=null && devices.size()>0){
+            JSONObject deviceItem = devices.getJSONObject(0);
+            obj.put("rt004",deviceItem.getInteger("rt004"));
+            obj.put("rt025",deviceItem.getDoubleValue("rt025"));
+            obj.put("rt026",deviceItem.getDoubleValue("rt026"));
+            obj.put("rt027",deviceItem.getString("rt027"));
+            obj.put("rt028",deviceItem.getString("rt028"));
+            obj.put("rt029",deviceItem.getInteger("rt029"));
+            obj.put("rt030",deviceItem.getInteger("rt030"));
+            obj.put("rt031",deviceItem.getInteger("rt031"));
+            obj.put("rt032",deviceItem.getInteger("rt032"));
+            obj.put("rt033",deviceItem.getLongValue("rt033"));
+            obj.put("rt034",deviceItem.getLongValue("rt034"));
+            obj.put("rt035",deviceItem.getLongValue("rt035"));
+
+        }
+
+
+        return obj;
+    }
+
+    public JSONArray getRTBettey(JSONObject cmd) {
+
+        JSONArray arr_js_betty = (JSONArray)cmd.getJSONArray("rt036").clone();
+        arr_js_betty.stream().forEach(item -> {
+            JSONObject jsitem= (JSONObject)item;
+            jsitem.put("devId",cmd.getString("devId"));
+            jsitem.put("devType",cmd.getInteger("devType"));
+        });
+        return arr_js_betty;
+    }
+
+
+    public JSONObject getSend(int controlCode){
+        JSONObject returnObj = new JSONObject();
+        returnObj.put("msgType","JSON");
+        returnObj.put("controlCode",controlCode);
+        return returnObj;
+    }
+
+
+    public JSONObject getSUCCESSResult(int controlCode){
+        return getSUCCESSResult(controlCode,true,"Success");
+    }
+
+    public JSONObject getSUCCESSResult(int controlCode,boolean ResultCode,String resultMessage){
+        JSONObject returnObj = new JSONObject();
+        returnObj.put("msgType","JSON");
+        returnObj.put("resultCode",ResultCode);
+        returnObj.put("controlCode",controlCode);
+        returnObj.put("message",resultMessage);
+        return returnObj;
+    }
+
+
+
+}

+ 42 - 0
src/main/java/com/xyhy/chargpilemqttcluster/workThread/message/back/MessageBackThreadGroupCommon.java

@@ -0,0 +1,42 @@
+package com.xyhy.chargpilemqttcluster.workThread.message.back;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class MessageBackThreadGroupCommon {
+
+    private static MessageBackThreadGroupCommon _ThreadGroupCommon;
+
+    private ThreadPoolExecutor _threadPoolExecutor;
+
+
+    public static MessageBackThreadGroupCommon getThreadGroupCom()
+    {
+        if(_ThreadGroupCommon==null)
+        {
+            _ThreadGroupCommon = new MessageBackThreadGroupCommon(1000);
+        }
+        return _ThreadGroupCommon;
+    }
+
+    public MessageBackThreadGroupCommon(int threadsNum)
+    {
+        _threadPoolExecutor =  new ThreadPoolExecutor(threadsNum, threadsNum,
+                0L, TimeUnit.MILLISECONDS,
+                new LinkedBlockingQueue<Runnable>());
+    }
+
+    public ThreadPoolExecutor getWorkThreadPool()
+    {
+        return _threadPoolExecutor;
+    }
+
+
+
+
+
+
+
+
+}

+ 112 - 0
src/main/java/com/xyhy/chargpilemqttcluster/workThread/message/back/Message_back_addOrder.java

@@ -0,0 +1,112 @@
+package com.xyhy.chargpilemqttcluster.workThread.message.back;
+
+import com.alibaba.fastjson.JSONObject;
+import com.xyhy.chargpilemqttcluster.common.Common;
+import com.xyhy.chargpilemqttcluster.workThread.message.MessageCommon;
+
+
+public class Message_back_addOrder extends MessageCommon implements Runnable {
+
+
+    private JSONObject ob;
+
+    public JSONObject getOb() {
+        return ob;
+    }
+
+    public void setOb(JSONObject ob) {
+        this.ob = ob;
+    }
+
+    @Override
+    public void run() {
+        log.info("监听到队列返回:Listen_addOrder "+ob.toJSONString());
+        /*
+        JSONObject result = ob.getJSONObject("result");
+        JSONObject resultfanal = result.getJSONObject("result");
+        String chargPileId = resultfanal.getString("chargPileId");
+        String cId = chargPileId.substring(0,16);
+        String cjkId = chargPileId.substring(16,18);
+        String card = resultfanal.getString("chargnumber");
+
+        MessageCommon mc = new MessageCommon();
+        sessionMap sm =  sessionMap.getSessionMap();
+        mc.setCtx( sm.getMessage(cId));
+
+
+        int code = result.getInteger("code");
+
+        if (code == 200)
+        {
+
+            double d_balance = Double.parseDouble(resultfanal.get("balance")+"");
+
+            byte[] hender = mc.getSendBodyHender(NO_MESSAGE_HANDER+8+1+16+8+1+4,133,10);
+            hender = mc.CmdLinked(hender,mc.common_getBCDByte(cId));
+            hender = mc.CmdLinked(hender,mc.hexStringToBytes(cjkId));
+            hender = mc.CmdLinked(hender,mc.common_getBCDByte(resultfanal.getString("orderid")));
+            hender = mc.CmdLinked(hender,mc.common_getBCDByte(card));
+            hender = mc.CmdLinked(hender,new byte[]{0x00});
+            hender = mc.CmdLinked(hender,mc.common_getDoubleToByte(d_balance,100,4));
+
+            mc.SendCommend(hender);
+
+            Timmer_orderCheck tk = new Timmer_orderCheck();
+            tk.cdzid = cId;
+            tk.cdjkid = cjkId;
+            tk.orderId = resultfanal.getString("orderid");
+            tk.start();
+
+        }else{
+
+            if (code==501){
+                double d_balance =0.00;
+                byte[] hender = mc.getSendBodyHender(NO_MESSAGE_HANDER+8+1+16+8+1+4,133,10);
+                hender = mc.CmdLinked(hender,mc.common_getBCDByte(cId));
+                hender = mc.CmdLinked(hender,mc.hexStringToBytes(cjkId));
+                hender = mc.CmdLinked(hender,mc.common_getBCDByte("00000000000000000000000000000000"));
+                hender = mc.CmdLinked(hender,mc.common_getBCDByte(card));
+                hender = mc.CmdLinked(hender,new byte[]{0x00});
+                hender = mc.CmdLinked(hender,mc.common_getDoubleToByte(d_balance,100,4));
+
+                mc.SendCommend(hender);
+            }else if(code==500){
+                double d_balance =0.00;
+                byte[] hender = mc.getSendBodyHender(NO_MESSAGE_HANDER+8+1+16+8+1+4,133,10);
+                hender = mc.CmdLinked(hender,mc.common_getBCDByte(cId));
+                hender = mc.CmdLinked(hender,mc.hexStringToBytes(cjkId));
+                hender = mc.CmdLinked(hender,mc.common_getBCDByte("00000000000000000000000000000000"));
+                hender = mc.CmdLinked(hender,mc.common_getBCDByte(card));
+                hender = mc.CmdLinked(hender,new byte[]{0x03});
+                hender = mc.CmdLinked(hender,mc.common_getDoubleToByte(d_balance,100,4));
+
+                mc.SendCommend(hender);
+            }else if(code==502){
+                double d_balance =0.00;
+                byte[] hender = mc.getSendBodyHender(NO_MESSAGE_HANDER+8+1+16+8+1+4,133,10);
+                hender = mc.CmdLinked(hender,mc.common_getBCDByte(cId));
+                hender = mc.CmdLinked(hender,mc.hexStringToBytes(cjkId));
+                hender = mc.CmdLinked(hender,mc.common_getBCDByte("00000000000000000000000000000000"));
+                hender = mc.CmdLinked(hender,mc.common_getBCDByte(card));
+                hender = mc.CmdLinked(hender,new byte[]{0x01});
+                hender = mc.CmdLinked(hender,mc.common_getDoubleToByte(d_balance,100,4));
+
+                mc.SendCommend(hender);
+            }else if(code==503){
+                double d_balance =0.00;
+                byte[] hender = mc.getSendBodyHender(NO_MESSAGE_HANDER+8+1+16+8+1+4,133,10);
+                hender = mc.CmdLinked(hender,mc.common_getBCDByte(cId));
+                hender = mc.CmdLinked(hender,mc.hexStringToBytes(cjkId));
+                hender = mc.CmdLinked(hender,mc.common_getBCDByte("00000000000000000000000000000000"));
+                hender = mc.CmdLinked(hender,mc.common_getBCDByte(card));
+                hender = mc.CmdLinked(hender,new byte[]{0x02});
+                hender = mc.CmdLinked(hender,mc.common_getDoubleToByte(d_balance,100,4));
+
+                mc.SendCommend(hender);
+            }
+
+         */
+
+
+    }
+}

+ 76 - 0
src/main/java/com/xyhy/chargpilemqttcluster/workThread/message/back/Message_back_getChargInfo.java

@@ -0,0 +1,76 @@
+package com.xyhy.chargpilemqttcluster.workThread.message.back;
+
+import com.alibaba.fastjson.JSONObject;
+import com.xyhy.chargpilemqttcluster.common.Common;
+import com.xyhy.chargpilemqttcluster.workThread.message.MessageCommon;
+
+
+public class Message_back_getChargInfo extends MessageCommon implements Runnable {
+
+
+    private JSONObject ob;
+
+    public JSONObject getOb() {
+        return ob;
+    }
+
+    public void setOb(JSONObject ob) {
+        this.ob = ob;
+    }
+
+    @Override
+    public void run() {
+
+        /*
+        String chargPileId = ob.getString("chargPileId");
+        String cId = chargPileId.substring(0,16);
+        String cjkId = chargPileId.substring(16,18);
+        String card = ob.getString("chargnumber");
+
+        MessageCommon mc = new MessageCommon();
+        sessionMap sm =  sessionMap.getSessionMap();
+        mc.setCtx( sm.getMessage(cId));
+
+        JSONObject jsonObject= ob;
+
+
+        byte[] hender = mc.getSendBodyHender(NO_MESSAGE_HANDER+56,133,1);
+
+        hender = mc.CmdLinked(hender,mc.common_getBCDByte(cId));
+        hender = mc.CmdLinked(hender,mc.hexStringToBytes(cjkId));
+        hender = mc.CmdLinked(hender,mc.hexStringToBytes("1234567891023456"));
+        Calendar ca = Calendar.getInstance();
+        ca.add(Calendar.MONTH,-4);
+        Date date_now = ca.getTime();
+        hender = mc.CmdLinked(hender,mc.Convert_date_to_cp56time2a(date_now));
+        ca.add(Calendar.MONTH,10);
+        date_now = ca.getTime();
+        hender = mc.CmdLinked(hender,mc.Convert_date_to_cp56time2a(date_now));
+
+        hender = mc.CmdLinked(hender,new byte[]{0x01});
+        hender = mc.CmdLinked(hender,new byte[]{0x00,0x00,0x00,0x00});
+
+        int sharp = (int)(jsonObject.getDouble("sharpChargPrice")*100);
+        int peak = (int)(jsonObject.getDouble("peakChargPrice")*100);
+        int flat = (int)(jsonObject.getDouble("flatChargPrice")*100);
+        int valley = (int)(jsonObject.getDouble("valleyChargPrice")*100);
+
+        int flatService = (int)(jsonObject.getDouble("flatServicePrice")*100);
+
+        hender = mc.CmdLinked(hender,mc.common_getDoubleToByte(sharp,1,4));
+        hender = mc.CmdLinked(hender,mc.common_getDoubleToByte(peak,1,4));
+        hender = mc.CmdLinked(hender,mc.common_getDoubleToByte(flat,1,4));
+        hender = mc.CmdLinked(hender,mc.common_getDoubleToByte(valley,1,4));
+        hender = mc.CmdLinked(hender,mc.common_getDoubleToByte(flatService,1,4));
+
+        hender[3] = mc.hexStringToBytes(Integer.toHexString(hender.length-3) )[0];
+
+
+        mc.SendCommend(hender);
+
+
+         */
+    }
+
+
+}

+ 75 - 0
src/main/java/com/xyhy/chargpilemqttcluster/workThread/message/back/Message_back_settlement.java

@@ -0,0 +1,75 @@
+package com.xyhy.chargpilemqttcluster.workThread.message.back;
+
+import com.alibaba.fastjson.JSONObject;
+import com.xyhy.chargpilemqttcluster.common.Common;
+import com.xyhy.chargpilemqttcluster.workThread.message.MessageCommon;
+
+
+public class Message_back_settlement extends MessageCommon implements Runnable {
+
+
+    private JSONObject ob;
+
+    public JSONObject getOb() {
+        return ob;
+    }
+
+    public void setOb(JSONObject ob) {
+        this.ob = ob;
+    }
+
+    @Override
+    public void run() {
+        log.info("监听到队列返回:Listen_settlement "+ob.toJSONString());
+        /*
+        JSONObject result = ob.getJSONObject("result");
+
+
+        String chargPileId = result.getString("chargPileId");
+        String cId = chargPileId.substring(0,16);
+        String cjkId = chargPileId.substring(16,18);
+        String card = ob.getString("chargnumber");
+        String orderid = result.getString("orderid");
+
+        MessageCommon mc = new MessageCommon();
+        sessionMap sm =  sessionMap.getSessionMap();
+        mc.setCtx( sm.getMessage(cId));
+
+        try {
+
+
+            JSONObject js_return  = ob;
+            int code = js_return.getInteger("code");
+
+            if (code == 200)
+            {
+
+                byte[] hender = mc.getSendBodyHender(NO_MESSAGE_HANDER+8+1+16+1,133,9);
+                hender =mc.CmdLinked(hender,mc.common_getBCDByte(cId));
+                hender =mc.CmdLinked(hender,mc.hexStringToBytes(cjkId));
+                hender =mc.CmdLinked(hender,mc.common_getBCDByte(orderid));
+                hender =mc.CmdLinked(hender,new byte[]{0x01});
+
+
+                mc.SendCommend(hender);
+                return;
+            }
+
+        }catch (Exception ex)
+        {
+            log.error(ex.toString());
+        }
+
+        byte[] hender = mc.getSendBodyHender(NO_MESSAGE_HANDER+8+1+16+1,133,9);
+        hender =mc.CmdLinked(hender,mc.common_getBCDByte(cId));
+        hender =mc.CmdLinked(hender,mc.hexStringToBytes(cjkId));
+        hender =mc.CmdLinked(hender,mc.common_getBCDByte(orderid));
+        hender =mc.CmdLinked(hender,new byte[]{0x02});
+
+        mc.SendCommend(hender);
+
+
+         */
+        
+    }
+}

+ 70 - 0
src/main/java/com/xyhy/chargpilemqttcluster/workThread/message/back/Message_back_syn_settlement.java

@@ -0,0 +1,70 @@
+package com.xyhy.chargpilemqttcluster.workThread.message.back;
+
+import com.alibaba.fastjson.JSONObject;
+import com.xyhy.chargpilemqttcluster.common.Common;
+import com.xyhy.chargpilemqttcluster.workThread.message.MessageCommon;
+
+
+public class Message_back_syn_settlement extends MessageCommon implements Runnable {
+
+
+    private JSONObject ob;
+
+    public JSONObject getOb() {
+        return ob;
+    }
+
+    public void setOb(JSONObject ob) {
+        this.ob = ob;
+    }
+
+    @Override
+    public void run() {
+
+        /*
+        String chargPileId = ob.getString("chargPileId");
+        String cId = chargPileId.substring(0,16);
+        String cjkId = chargPileId.substring(16,18);
+        String card = ob.getString("chargnumber");
+        String orderid = ob.getString("orderid");
+        int searchIndex = ob.getIntValue("searchIndex");
+
+        MessageCommon mc = new MessageCommon();
+        sessionMap sm =  sessionMap.getSessionMap();
+        mc.setCtx( sm.getMessage(cId));
+
+
+
+        try {
+
+
+            JSONObject js_return  = ob;
+            int code = js_return.getInteger("code");
+
+            if (code == 200)
+            {
+                byte[] hender = mc.getSendBodyHender(NO_MESSAGE_HANDER+4+16,133,13);
+
+                hender =mc.CmdLinked(hender,mc.common_getIntToByte(searchIndex,4));
+                hender =mc.CmdLinked(hender,mc.common_getBCDByte(orderid));
+                mc.SendCommend(hender);
+
+            }
+
+        }catch (Exception ex)
+        {
+            log.error(ex.toString());
+
+            byte[] hender = mc.getSendBodyHender(NO_MESSAGE_HANDER+4+16,133,13);
+            hender =mc.CmdLinked(hender,mc.common_getIntToByte(searchIndex,4));
+            hender =mc.CmdLinked(hender,mc.common_getBCDByte(orderid));
+
+            mc.SendCommend(hender);
+
+        }
+
+
+         */
+
+    }
+}

+ 63 - 0
src/main/java/com/xyhy/chargpilemqttcluster/workThread/message/back/Message_startCP.java

@@ -0,0 +1,63 @@
+package com.xyhy.chargpilemqttcluster.workThread.message.back;
+
+import com.alibaba.fastjson.JSONObject;
+import com.xyhy.chargpilemqttcluster.common.mqtt.MqttServer;
+import com.xyhy.chargpilemqttcluster.common.redis.RedisHelp;
+import com.xyhy.chargpilemqttcluster.workThread.Job.Job_startCheck;
+import com.xyhy.chargpilemqttcluster.workThread.message.MessageCommon;
+
+
+public class Message_startCP extends MessageCommon implements Runnable {
+
+    private JSONObject ob;
+
+    public JSONObject getOb() {
+        return ob;
+    }
+
+    public void setOb(JSONObject ob) {
+        this.ob = ob;
+    }
+
+    @Override
+    public void run() {
+
+        log.info("后台启动充电桩:"+ob.toJSONString());
+
+
+        String sendtopic = getToClientTopic(ob,"03");
+        JSONObject returnObj = getSend(301);
+        String cp_id = ob.getString("cp_id");
+        String cp_index = ob.getString("cp_index");
+        String orderId = ob.getString("order_id");
+        returnObj.put("devId",cp_id);
+        returnObj.put("devType",ob.getString("03"));
+        returnObj.put("intId",cp_index);
+        returnObj.put("orderId",orderId);
+        returnObj.put("phone",ob.getString("iPhoneNo"));
+        int chargMethod = ob.getIntValue("chargMethod");
+        returnObj.put("chargMethod",chargMethod);
+        switch (chargMethod){
+            case 1:
+                returnObj.put("preChargValue",ob.getDoubleValue("preChargValue"));
+                break;
+            case 2:
+                returnObj.put("preChargTime",ob.getIntValue("preChargTime"));
+                break;
+            case 3:
+                returnObj.put("preChargPrice",ob.getDoubleValue("preChargPrice"));
+                break;
+        }
+
+        ob.put("step","starting");
+        RedisHelp.setOrderCheckVulue(cp_id+"-"+cp_index+"-"+orderId,ob.toJSONString());
+
+        MqttServer.getInstance().pushlish(sendtopic,returnObj.toJSONString());
+        Job_startCheck job = new Job_startCheck();
+        job.cdzid = cp_id;
+        job.cdjkid = cp_index;
+        job.orderId = orderId;
+        job.start();
+
+    }
+}

+ 51 - 0
src/main/java/com/xyhy/chargpilemqttcluster/workThread/message/back/Message_stopCP.java

@@ -0,0 +1,51 @@
+package com.xyhy.chargpilemqttcluster.workThread.message.back;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.xyhy.chargpilemqttcluster.common.Common;
+import com.xyhy.chargpilemqttcluster.common.mqtt.MqttServer;
+import com.xyhy.chargpilemqttcluster.common.redis.RedisHelp;
+import com.xyhy.chargpilemqttcluster.workThread.Job.Job_startCheck;
+import com.xyhy.chargpilemqttcluster.workThread.Job.Job_stopCheck;
+import com.xyhy.chargpilemqttcluster.workThread.message.MessageCommon;
+
+
+public class Message_stopCP extends MessageCommon implements Runnable {
+
+    private JSONObject ob;
+
+    public JSONObject getOb() {
+        return ob;
+    }
+
+    public void setOb(JSONObject ob) {
+        this.ob = ob;
+    }
+
+    @Override
+    public void run() {
+
+        log.info("后台停止充电桩:"+ob.toJSONString());
+
+        String sendtopic = getToClientTopic(ob,"03");
+        JSONObject returnObj = getSend(304);
+        String cp_id = ob.getString("cp_id");
+        String cp_index = ob.getString("cp_index");
+        String orderId = ob.getString("order_id");
+        returnObj.put("devId",cp_id);
+        returnObj.put("devType",ob.getString("03"));
+        returnObj.put("intId",cp_index);
+        returnObj.put("orderId",orderId);
+
+        ob.put("step","stoping");
+        RedisHelp.setOrderCheckVulue(cp_id+"-"+cp_index+"-"+orderId,ob.toJSONString());
+
+        MqttServer.getInstance().pushlish(sendtopic,returnObj.toJSONString());
+        Job_stopCheck job = new Job_stopCheck();
+        job.cdzid = cp_id;
+        job.cdjkid = cp_index;
+        job.orderId = orderId;
+        job.start();
+
+    }
+}

+ 60 - 0
src/main/resources/application.properties

@@ -0,0 +1,60 @@
+
+#######MQTT SERVER##########
+MQTT.server.port=10014
+MQTT.server.hostUrl=tcp://39.98.37.180:32449
+MQTT.server.hostName=mqtt_server
+MQTT.server.password=Root_1234@01max
+MQTT.server.timeout=10
+MQTT.server.keepAlive=60
+
+
+
+#######MQTT Subscribe##########
+MQTT.server.Subscribe=$queue/toServer/mqtt/101,$queue/toServer/mqtt/103,$queue/toServer/mqtt/105,$queue/toServer/mqtt/201,$queue/toServer/mqtt/302,$queue/toServer/mqtt/303,$queue/toServer/mqtt/305,$queue/toServer/mqtt/306,$queue/toServer/mqtt/309,$queue/toServer/mqtt/401,$queue/toServer/mqtt/501,$queue/toServer/mqtt/602,$queue/toServer/mqtt/603,$queue/toServer/mqtt/604
+
+
+#######WEB SERVER###########
+server.port=20001
+
+
+#######REDIS SERVER#########
+redis.server.ip=39.98.37.180
+redis.server.port=6397
+redis.server.db=11
+redis.server.pw=boxun91#Rs
+reids.server.MaxTotal=50
+reids.server.MaxIdle=20
+reids.server.MinIdle=10
+reids.server.MaxWaitMillis=3000
+reids.server.TestOnBorrow=true
+reids.server.Timeout=3000
+
+
+
+#######Charging interface service###########
+charging.interface.api.port=9999
+charging.interface.api.realtime=/realtime/restapi/realtime/
+charging.interface.api.pileLog=/web/restapi/pileLog/
+charging.interface.api.url.produce=https://cdglyy.pjnes.com/
+charging.interface.api.url.test=https://jqcs.pjnes.com/
+charging.interface.api.url.status=debug
+
+
+#logging.level.root=error
+
+#######rabbitMQ###########
+rabbitMq.server.ip=39.98.37.180
+rabbitMq.server.port=5673
+rabbitMq.server.vHost=/
+rabbitMq.server.userName=rabbit
+rabbitMq.server.passWord=rabbit@blhld
+rabbitMq.server.queue.toImplServer=TOIMPLSERVERQUEUE
+rabbitMq.server.queue.toMQTTServer=TOMQTTSERVERQUEUE
+rabbitMq.server.queue.toRealTimeServer=TOREALTIMESERVEQUEUE
+
+rabbitMq.server.TOMQTTSERVEREXCHANGE=TOMQTTSERVEREXCHANGE
+rabbitMq.server.TOIMPLSERVEREXCHANGE=TOIMPLSERVEREXCHANGE
+rabbitMq.server.TOREALTIMEDATASERVEREXCHANGE=TOREALTIMEDATASERVEREXCHANGE
+
+#######log projectName###########
+project.name=chargpileMqtt

+ 94 - 0
src/main/resources/logback-spring.xml

@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+    <springProperty scope="context" name="logName" source="project.name" defaultValue="default"/>
+    <!-- 日志存放路径 -->
+    <property name="log.path" value="/var/log/${logName}/logs" />
+    <!-- 日志输出格式 -->
+    <property name="log.pattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{20} - [%method,%line] - %msg%n" />
+
+    <!-- 控制台输出 -->
+    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>${log.pattern}</pattern>
+        </encoder>
+    </appender>
+
+    <!-- 系统日志输出 -->
+    <appender name="file_info" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${log.path}/sys-info.log</file>
+        <!-- 循环政策:基于时间创建日志文件 -->
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <!-- 日志文件名格式 -->
+            <fileNamePattern>${log.path}/sys-info.%d{yyyy-MM-dd}.log</fileNamePattern>
+            <!-- 日志最大的历史 60天 -->
+            <maxHistory>60</maxHistory>
+        </rollingPolicy>
+        <encoder>
+            <pattern>${log.pattern}</pattern>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.LevelFilter">
+            <!-- 过滤的级别 -->
+            <level>INFO</level>
+            <!-- 匹配时的操作:接收(记录) -->
+            <onMatch>ACCEPT</onMatch>
+            <!-- 不匹配时的操作:拒绝(不记录) -->
+            <onMismatch>DENY</onMismatch>
+        </filter>
+    </appender>
+
+    <appender name="file_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${log.path}/sys-error.log</file>
+        <!-- 循环政策:基于时间创建日志文件 -->
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <!-- 日志文件名格式 -->
+            <fileNamePattern>${log.path}/sys-error.%d{yyyy-MM-dd}.log</fileNamePattern>
+            <!-- 日志最大的历史 60天 -->
+            <maxHistory>60</maxHistory>
+        </rollingPolicy>
+        <encoder>
+            <pattern>${log.pattern}</pattern>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.LevelFilter">
+            <!-- 过滤的级别 -->
+            <level>ERROR</level>
+            <!-- 匹配时的操作:接收(记录) -->
+            <onMatch>ACCEPT</onMatch>
+            <!-- 不匹配时的操作:拒绝(不记录) -->
+            <onMismatch>DENY</onMismatch>
+        </filter>
+    </appender>
+
+    <!-- 用户访问日志输出  -->
+    <appender name="sys-user" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${log.path}/sys-user.log</file>
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <!-- 按天回滚 daily -->
+            <fileNamePattern>${log.path}/sys-user.%d{yyyy-MM-dd}.log</fileNamePattern>
+            <!-- 日志最大的历史 60天 -->
+            <maxHistory>60</maxHistory>
+        </rollingPolicy>
+        <encoder>
+            <pattern>${log.pattern}</pattern>
+        </encoder>
+    </appender>
+
+    <!-- 系统模块日志级别控制  -->
+    <logger name="com.xyhy" level="info" />
+    <!-- Spring日志级别控制  -->
+    <logger name="org.springframework" level="warn" />
+
+    <root level="info">
+        <appender-ref ref="console" />
+    </root>
+
+    <!--系统操作日志-->
+    <root level="info">
+        <appender-ref ref="file_info" />
+        <appender-ref ref="file_error" />
+    </root>
+
+    <!--系统用户操作日志-->
+    <logger name="sys-user" level="info">
+        <appender-ref ref="sys-user"/>
+    </logger>
+</configuration>