车辆数据推送-暂
This commit is contained in:
		| @ -1,9 +1,17 @@ | ||||
| package com.sczx.sync.config; | ||||
|  | ||||
| import lombok.extern.slf4j.Slf4j; | ||||
| import com.fasterxml.jackson.databind.ObjectMapper; | ||||
| import com.sczx.sync.dto.DeviceData; | ||||
| import org.springframework.beans.factory.annotation.Value; | ||||
| import org.springframework.kafka.annotation.KafkaListener; | ||||
| import org.springframework.stereotype.Component; | ||||
| import org.springframework.beans.factory.annotation.Autowired; | ||||
| import org.springframework.web.client.RestTemplate; | ||||
|  | ||||
| import java.time.LocalDateTime; | ||||
| import java.util.HashMap; | ||||
| import java.util.Map; | ||||
|  | ||||
| @Slf4j | ||||
| @Component | ||||
| @ -11,6 +19,8 @@ public class KafkaConsumer { | ||||
|  | ||||
|     @Autowired | ||||
|     private ThreadPoolConfig threadPoolConfig; | ||||
|     @Value("${dataPush.deviceDataUrl}") | ||||
|     private String deviceDataUrl; | ||||
|  | ||||
|     @KafkaListener(topics = "jt808_forward_prod") | ||||
|     public void listen(String message) { | ||||
| @ -24,6 +34,50 @@ public class KafkaConsumer { | ||||
|         // 实际的消息处理逻辑 | ||||
|          | ||||
|         log.info("Received message: " + message); | ||||
|         // 执行耗时操作... | ||||
|         try { | ||||
|             ObjectMapper objectMapper = new ObjectMapper(); | ||||
|             DeviceData data = objectMapper.readValue(message, DeviceData.class); | ||||
|  | ||||
|             // 检查时间戳是否在1分钟以内 | ||||
|             LocalDateTime messageTime = data.getTimestamp(); | ||||
|             LocalDateTime currentTime = LocalDateTime.now(); | ||||
|  | ||||
|             long minutesDiff = java.time.Duration.between(messageTime, currentTime).toMinutes(); | ||||
|  | ||||
|             if (minutesDiff > 1) { | ||||
|                 // 消息超过1分钟,跳过处理 | ||||
|                 log.info("消息超过1分钟,跳过处理: " + data.getTs()); | ||||
|                 return; | ||||
|             } | ||||
|  | ||||
|             // 处理1分钟以内的消息 | ||||
|             log.info("处理1分钟以内的消息: " + data.getClientId() + " at " + data.getTs()); | ||||
|  | ||||
|             // 调用接口处理逻辑 | ||||
|             callExternalApi(data); | ||||
|         } catch (Exception e) { | ||||
|             log.error("Failed to process message: " + message); | ||||
|             e.printStackTrace(); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|  | ||||
|     private void callExternalApi(DeviceData data) { | ||||
|         try { | ||||
|             // 构造请求参数 | ||||
|             Map<String, Object> requestData = new HashMap<>(); | ||||
|             requestData.put("clientId", data.getClientId()); | ||||
|             requestData.put("latitude", data.getLat()); | ||||
|             requestData.put("longitude", data.getLng()); | ||||
|             requestData.put("fuelStatus", data.getFuelStatus()); | ||||
|  | ||||
|             // 发送HTTP请求调用添加接口 | ||||
|             RestTemplate restTemplate = new RestTemplate(); | ||||
|             restTemplate.postForObject(deviceDataUrl, requestData, String.class); | ||||
|  | ||||
|         } catch (Exception e) { | ||||
|             log.error("Failed to call external API for device: " + data.getClientId()); | ||||
|             e.printStackTrace(); | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
							
								
								
									
										259
									
								
								src/main/java/com/sczx/sync/dto/DeviceData.java
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										259
									
								
								src/main/java/com/sczx/sync/dto/DeviceData.java
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,259 @@ | ||||
| package com.sczx.sync.dto; | ||||
|  | ||||
| import java.time.LocalDateTime; | ||||
| import java.time.format.DateTimeFormatter; | ||||
|  | ||||
| public class DeviceData { | ||||
|  | ||||
|     private String longitudeDirection; | ||||
|     private String armStatus; | ||||
|     private int altitude; | ||||
|     private String beidouUsed; | ||||
|     private int speed; | ||||
|     private String locationStatus; | ||||
|     private int providerId; | ||||
|     private String chargingStatus; | ||||
|     private String hallStatus; | ||||
|     private double lat; | ||||
|     private String deviceWorkStatus; | ||||
|     private int direction; | ||||
|     private int statusBit; | ||||
|     private int warnBit; | ||||
|     private String clientId; | ||||
|     private double lng; | ||||
|     private String alarms; | ||||
|     private String accStatus; | ||||
|     private String galileoUsed; | ||||
|     private String movingStatus; | ||||
|     private String latitudeDirection; | ||||
|     private String softAccStatus; | ||||
|     private String glonassUsed; | ||||
|     private String gpsUsed; | ||||
|     private String fuelStatus; | ||||
|     private String fakePowerStatus; | ||||
|     private String ts; | ||||
|  | ||||
|     // 时间格式化器 | ||||
|     private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); | ||||
|  | ||||
|     // 获取 LocalDateTime 对象 | ||||
|     public LocalDateTime getTimestamp() { | ||||
|         return LocalDateTime.parse(this.ts, FORMATTER); | ||||
|     } | ||||
|  | ||||
|     public String getLongitudeDirection() { | ||||
|         return longitudeDirection; | ||||
|     } | ||||
|  | ||||
|     public void setLongitudeDirection(String longitudeDirection) { | ||||
|         this.longitudeDirection = longitudeDirection; | ||||
|     } | ||||
|  | ||||
|     public String getArmStatus() { | ||||
|         return armStatus; | ||||
|     } | ||||
|  | ||||
|     public void setArmStatus(String armStatus) { | ||||
|         this.armStatus = armStatus; | ||||
|     } | ||||
|  | ||||
|     public int getAltitude() { | ||||
|         return altitude; | ||||
|     } | ||||
|  | ||||
|     public void setAltitude(int altitude) { | ||||
|         this.altitude = altitude; | ||||
|     } | ||||
|  | ||||
|     public String getBeidouUsed() { | ||||
|         return beidouUsed; | ||||
|     } | ||||
|  | ||||
|     public void setBeidouUsed(String beidouUsed) { | ||||
|         this.beidouUsed = beidouUsed; | ||||
|     } | ||||
|  | ||||
|     public int getSpeed() { | ||||
|         return speed; | ||||
|     } | ||||
|  | ||||
|     public void setSpeed(int speed) { | ||||
|         this.speed = speed; | ||||
|     } | ||||
|  | ||||
|     public String getLocationStatus() { | ||||
|         return locationStatus; | ||||
|     } | ||||
|  | ||||
|     public void setLocationStatus(String locationStatus) { | ||||
|         this.locationStatus = locationStatus; | ||||
|     } | ||||
|  | ||||
|     public int getProviderId() { | ||||
|         return providerId; | ||||
|     } | ||||
|  | ||||
|     public void setProviderId(int providerId) { | ||||
|         this.providerId = providerId; | ||||
|     } | ||||
|  | ||||
|     public String getChargingStatus() { | ||||
|         return chargingStatus; | ||||
|     } | ||||
|  | ||||
|     public void setChargingStatus(String chargingStatus) { | ||||
|         this.chargingStatus = chargingStatus; | ||||
|     } | ||||
|  | ||||
|     public String getHallStatus() { | ||||
|         return hallStatus; | ||||
|     } | ||||
|  | ||||
|     public void setHallStatus(String hallStatus) { | ||||
|         this.hallStatus = hallStatus; | ||||
|     } | ||||
|  | ||||
|     public double getLat() { | ||||
|         return lat; | ||||
|     } | ||||
|  | ||||
|     public void setLat(double lat) { | ||||
|         this.lat = lat; | ||||
|     } | ||||
|  | ||||
|     public String getDeviceWorkStatus() { | ||||
|         return deviceWorkStatus; | ||||
|     } | ||||
|  | ||||
|     public void setDeviceWorkStatus(String deviceWorkStatus) { | ||||
|         this.deviceWorkStatus = deviceWorkStatus; | ||||
|     } | ||||
|  | ||||
|     public int getDirection() { | ||||
|         return direction; | ||||
|     } | ||||
|  | ||||
|     public void setDirection(int direction) { | ||||
|         this.direction = direction; | ||||
|     } | ||||
|  | ||||
|     public int getStatusBit() { | ||||
|         return statusBit; | ||||
|     } | ||||
|  | ||||
|     public void setStatusBit(int statusBit) { | ||||
|         this.statusBit = statusBit; | ||||
|     } | ||||
|  | ||||
|     public int getWarnBit() { | ||||
|         return warnBit; | ||||
|     } | ||||
|  | ||||
|     public void setWarnBit(int warnBit) { | ||||
|         this.warnBit = warnBit; | ||||
|     } | ||||
|  | ||||
|     public String getClientId() { | ||||
|         return clientId; | ||||
|     } | ||||
|  | ||||
|     public void setClientId(String clientId) { | ||||
|         this.clientId = clientId; | ||||
|     } | ||||
|  | ||||
|     public double getLng() { | ||||
|         return lng; | ||||
|     } | ||||
|  | ||||
|     public void setLng(double lng) { | ||||
|         this.lng = lng; | ||||
|     } | ||||
|  | ||||
|     public String getAlarms() { | ||||
|         return alarms; | ||||
|     } | ||||
|  | ||||
|     public void setAlarms(String alarms) { | ||||
|         this.alarms = alarms; | ||||
|     } | ||||
|  | ||||
|     public String getAccStatus() { | ||||
|         return accStatus; | ||||
|     } | ||||
|  | ||||
|     public void setAccStatus(String accStatus) { | ||||
|         this.accStatus = accStatus; | ||||
|     } | ||||
|  | ||||
|     public String getGalileoUsed() { | ||||
|         return galileoUsed; | ||||
|     } | ||||
|  | ||||
|     public void setGalileoUsed(String galileoUsed) { | ||||
|         this.galileoUsed = galileoUsed; | ||||
|     } | ||||
|  | ||||
|     public String getMovingStatus() { | ||||
|         return movingStatus; | ||||
|     } | ||||
|  | ||||
|     public void setMovingStatus(String movingStatus) { | ||||
|         this.movingStatus = movingStatus; | ||||
|     } | ||||
|  | ||||
|     public String getLatitudeDirection() { | ||||
|         return latitudeDirection; | ||||
|     } | ||||
|  | ||||
|     public void setLatitudeDirection(String latitudeDirection) { | ||||
|         this.latitudeDirection = latitudeDirection; | ||||
|     } | ||||
|  | ||||
|     public String getSoftAccStatus() { | ||||
|         return softAccStatus; | ||||
|     } | ||||
|  | ||||
|     public void setSoftAccStatus(String softAccStatus) { | ||||
|         this.softAccStatus = softAccStatus; | ||||
|     } | ||||
|  | ||||
|     public String getGlonassUsed() { | ||||
|         return glonassUsed; | ||||
|     } | ||||
|  | ||||
|     public void setGlonassUsed(String glonassUsed) { | ||||
|         this.glonassUsed = glonassUsed; | ||||
|     } | ||||
|  | ||||
|     public String getGpsUsed() { | ||||
|         return gpsUsed; | ||||
|     } | ||||
|  | ||||
|     public void setGpsUsed(String gpsUsed) { | ||||
|         this.gpsUsed = gpsUsed; | ||||
|     } | ||||
|  | ||||
|     public String getFuelStatus() { | ||||
|         return fuelStatus; | ||||
|     } | ||||
|  | ||||
|     public void setFuelStatus(String fuelStatus) { | ||||
|         this.fuelStatus = fuelStatus; | ||||
|     } | ||||
|  | ||||
|     public String getFakePowerStatus() { | ||||
|         return fakePowerStatus; | ||||
|     } | ||||
|  | ||||
|     public void setFakePowerStatus(String fakePowerStatus) { | ||||
|         this.fakePowerStatus = fakePowerStatus; | ||||
|     } | ||||
|  | ||||
|     public String getTs() { | ||||
|         return ts; | ||||
|     } | ||||
|  | ||||
|     public void setTs(String ts) { | ||||
|         this.ts = ts; | ||||
|     } | ||||
| } | ||||
| @ -2,3 +2,6 @@ | ||||
| spring: | ||||
|   application: | ||||
|     name: sczx-sync # 微服务名称 | ||||
|  | ||||
| dataPush: | ||||
|   deviceDataUrl: http://127.0.0.1:8098/sczxWeb/fuleStatusApi/device-data | ||||
		Reference in New Issue
	
	Block a user
	 19173159168
					19173159168