From 454ae20370de98aa66fcdf679569b31ac5fd6672 Mon Sep 17 00:00:00 2001 From: 19173159168 Date: Wed, 15 Oct 2025 00:18:51 +0800 Subject: [PATCH] =?UTF-8?q?=E8=BD=A6=E8=BE=86=E6=95=B0=E6=8D=AE=E6=8E=A8?= =?UTF-8?q?=E9=80=81-=E6=9A=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/sczx/sync/config/KafkaConsumer.java | 58 +++- .../java/com/sczx/sync/dto/DeviceData.java | 259 ++++++++++++++++++ src/main/resources/application.yml | 5 +- 3 files changed, 319 insertions(+), 3 deletions(-) create mode 100644 src/main/java/com/sczx/sync/dto/DeviceData.java diff --git a/src/main/java/com/sczx/sync/config/KafkaConsumer.java b/src/main/java/com/sczx/sync/config/KafkaConsumer.java index 4e5cf0a..196527e 100644 --- a/src/main/java/com/sczx/sync/config/KafkaConsumer.java +++ b/src/main/java/com/sczx/sync/config/KafkaConsumer.java @@ -1,9 +1,17 @@ package com.sczx.sync.config; import lombok.extern.slf4j.Slf4j; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.sczx.sync.dto.DeviceData; +import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.client.RestTemplate; + +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.Map; @Slf4j @Component @@ -11,6 +19,8 @@ public class KafkaConsumer { @Autowired private ThreadPoolConfig threadPoolConfig; + @Value("${dataPush.deviceDataUrl}") + private String deviceDataUrl; @KafkaListener(topics = "jt808_forward_prod") public void listen(String message) { @@ -22,8 +32,52 @@ public class KafkaConsumer { private void processMessage(String message) { // 实际的消息处理逻辑 - + log.info("Received message: " + message); - // 执行耗时操作... + try { + ObjectMapper objectMapper = new ObjectMapper(); + DeviceData data = objectMapper.readValue(message, DeviceData.class); + + // 检查时间戳是否在1分钟以内 + LocalDateTime messageTime = data.getTimestamp(); + LocalDateTime currentTime = LocalDateTime.now(); + + long minutesDiff = java.time.Duration.between(messageTime, currentTime).toMinutes(); + + if (minutesDiff > 1) { + // 消息超过1分钟,跳过处理 + log.info("消息超过1分钟,跳过处理: " + data.getTs()); + return; + } + + // 处理1分钟以内的消息 + log.info("处理1分钟以内的消息: " + data.getClientId() + " at " + data.getTs()); + + // 调用接口处理逻辑 + callExternalApi(data); + } catch (Exception e) { + log.error("Failed to process message: " + message); + e.printStackTrace(); + } + } + + + private void callExternalApi(DeviceData data) { + try { + // 构造请求参数 + Map requestData = new HashMap<>(); + requestData.put("clientId", data.getClientId()); + requestData.put("latitude", data.getLat()); + requestData.put("longitude", data.getLng()); + requestData.put("fuelStatus", data.getFuelStatus()); + + // 发送HTTP请求调用添加接口 + RestTemplate restTemplate = new RestTemplate(); + restTemplate.postForObject(deviceDataUrl, requestData, String.class); + + } catch (Exception e) { + log.error("Failed to call external API for device: " + data.getClientId()); + e.printStackTrace(); + } } } diff --git a/src/main/java/com/sczx/sync/dto/DeviceData.java b/src/main/java/com/sczx/sync/dto/DeviceData.java new file mode 100644 index 0000000..a18026b --- /dev/null +++ b/src/main/java/com/sczx/sync/dto/DeviceData.java @@ -0,0 +1,259 @@ +package com.sczx.sync.dto; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; + +public class DeviceData { + + private String longitudeDirection; + private String armStatus; + private int altitude; + private String beidouUsed; + private int speed; + private String locationStatus; + private int providerId; + private String chargingStatus; + private String hallStatus; + private double lat; + private String deviceWorkStatus; + private int direction; + private int statusBit; + private int warnBit; + private String clientId; + private double lng; + private String alarms; + private String accStatus; + private String galileoUsed; + private String movingStatus; + private String latitudeDirection; + private String softAccStatus; + private String glonassUsed; + private String gpsUsed; + private String fuelStatus; + private String fakePowerStatus; + private String ts; + + // 时间格式化器 + private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + // 获取 LocalDateTime 对象 + public LocalDateTime getTimestamp() { + return LocalDateTime.parse(this.ts, FORMATTER); + } + + public String getLongitudeDirection() { + return longitudeDirection; + } + + public void setLongitudeDirection(String longitudeDirection) { + this.longitudeDirection = longitudeDirection; + } + + public String getArmStatus() { + return armStatus; + } + + public void setArmStatus(String armStatus) { + this.armStatus = armStatus; + } + + public int getAltitude() { + return altitude; + } + + public void setAltitude(int altitude) { + this.altitude = altitude; + } + + public String getBeidouUsed() { + return beidouUsed; + } + + public void setBeidouUsed(String beidouUsed) { + this.beidouUsed = beidouUsed; + } + + public int getSpeed() { + return speed; + } + + public void setSpeed(int speed) { + this.speed = speed; + } + + public String getLocationStatus() { + return locationStatus; + } + + public void setLocationStatus(String locationStatus) { + this.locationStatus = locationStatus; + } + + public int getProviderId() { + return providerId; + } + + public void setProviderId(int providerId) { + this.providerId = providerId; + } + + public String getChargingStatus() { + return chargingStatus; + } + + public void setChargingStatus(String chargingStatus) { + this.chargingStatus = chargingStatus; + } + + public String getHallStatus() { + return hallStatus; + } + + public void setHallStatus(String hallStatus) { + this.hallStatus = hallStatus; + } + + public double getLat() { + return lat; + } + + public void setLat(double lat) { + this.lat = lat; + } + + public String getDeviceWorkStatus() { + return deviceWorkStatus; + } + + public void setDeviceWorkStatus(String deviceWorkStatus) { + this.deviceWorkStatus = deviceWorkStatus; + } + + public int getDirection() { + return direction; + } + + public void setDirection(int direction) { + this.direction = direction; + } + + public int getStatusBit() { + return statusBit; + } + + public void setStatusBit(int statusBit) { + this.statusBit = statusBit; + } + + public int getWarnBit() { + return warnBit; + } + + public void setWarnBit(int warnBit) { + this.warnBit = warnBit; + } + + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public double getLng() { + return lng; + } + + public void setLng(double lng) { + this.lng = lng; + } + + public String getAlarms() { + return alarms; + } + + public void setAlarms(String alarms) { + this.alarms = alarms; + } + + public String getAccStatus() { + return accStatus; + } + + public void setAccStatus(String accStatus) { + this.accStatus = accStatus; + } + + public String getGalileoUsed() { + return galileoUsed; + } + + public void setGalileoUsed(String galileoUsed) { + this.galileoUsed = galileoUsed; + } + + public String getMovingStatus() { + return movingStatus; + } + + public void setMovingStatus(String movingStatus) { + this.movingStatus = movingStatus; + } + + public String getLatitudeDirection() { + return latitudeDirection; + } + + public void setLatitudeDirection(String latitudeDirection) { + this.latitudeDirection = latitudeDirection; + } + + public String getSoftAccStatus() { + return softAccStatus; + } + + public void setSoftAccStatus(String softAccStatus) { + this.softAccStatus = softAccStatus; + } + + public String getGlonassUsed() { + return glonassUsed; + } + + public void setGlonassUsed(String glonassUsed) { + this.glonassUsed = glonassUsed; + } + + public String getGpsUsed() { + return gpsUsed; + } + + public void setGpsUsed(String gpsUsed) { + this.gpsUsed = gpsUsed; + } + + public String getFuelStatus() { + return fuelStatus; + } + + public void setFuelStatus(String fuelStatus) { + this.fuelStatus = fuelStatus; + } + + public String getFakePowerStatus() { + return fakePowerStatus; + } + + public void setFakePowerStatus(String fakePowerStatus) { + this.fakePowerStatus = fakePowerStatus; + } + + public String getTs() { + return ts; + } + + public void setTs(String ts) { + this.ts = ts; + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index f74bfb0..409b2a4 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,4 +1,7 @@ spring: application: - name: sczx-sync # 微服务名称 \ No newline at end of file + name: sczx-sync # 微服务名称 + +dataPush: + deviceDataUrl: http://127.0.0.1:8098/sczxWeb/fuleStatusApi/device-data \ No newline at end of file