kafka消费车辆实时数据

This commit is contained in:
2025-10-14 22:18:20 +08:00
parent 78be6bad44
commit 5257539727
7 changed files with 196 additions and 42 deletions

View File

@ -13,7 +13,7 @@
<properties> <properties>
<java.version>1.8</java.version> <java.version>1.8</java.version>
<spring-boot.version>2.3.12.RELEASE</spring-boot.version> <spring-boot.version>2.7.18</spring-boot.version>
<spring-cloud.version>Hoxton.SR12</spring-cloud.version> <spring-cloud.version>Hoxton.SR12</spring-cloud.version>
<spring-cloud-alibaba.version>2.2.9.RELEASE</spring-cloud-alibaba.version> <spring-cloud-alibaba.version>2.2.9.RELEASE</spring-cloud-alibaba.version>
</properties> </properties>
@ -179,6 +179,13 @@
<artifactId>fastjson</artifactId> <artifactId>fastjson</artifactId>
<version>1.2.83</version> <version>1.2.83</version>
</dependency> </dependency>
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies> </dependencies>
<!-- Build Configuration --> <!-- Build Configuration -->

View File

@ -7,7 +7,6 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.Environment;
import org.springframework.retry.annotation.EnableRetry; import org.springframework.retry.annotation.EnableRetry;
import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement; import org.springframework.transaction.annotation.EnableTransactionManagement;
@ -25,7 +24,5 @@ public class Application {
public static void main(String[] args) throws IOException { public static void main(String[] args) throws IOException {
ConfigurableApplicationContext context = SpringApplication.run(Application.class, args); ConfigurableApplicationContext context = SpringApplication.run(Application.class, args);
Environment environment = context.getBean(Environment.class);
log.info("启动成功后端服务API地址http://{}:{}/swagger-ui.html", ComputerInfo.getIpAddr(), environment.getProperty("server.port"));
} }
} }

View File

@ -1,38 +1,38 @@
package com.sczx.sync.Task; //package com.sczx.sync.Task;
//
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; //import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.sczx.sync.mapper.DataReceiveRecordMapper; //import com.sczx.sync.mapper.DataReceiveRecordMapper;
import com.sczx.sync.mapper.OrderBatteryInfoMapper; //import com.sczx.sync.mapper.OrderBatteryInfoMapper;
import com.sczx.sync.po.DataReceivePo; //import com.sczx.sync.po.DataReceivePo;
import com.sczx.sync.service.impl.SendDataServiceImpl; //import com.sczx.sync.service.impl.SendDataServiceImpl;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled; //import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; //import org.springframework.stereotype.Component;
//
import java.util.List; //import java.util.List;
//
// 定时任务类 //// 定时任务类
@Component //@Component
public class DataResendTask { //public class DataResendTask {
//
@Autowired // @Autowired
private SendDataServiceImpl sendDataService; // private SendDataServiceImpl sendDataService;
//
@Autowired // @Autowired
private DataReceiveRecordMapper dataReceiveRecordMapper; // private DataReceiveRecordMapper dataReceiveRecordMapper;
//
@Autowired // @Autowired
private OrderBatteryInfoMapper orderBatteryInfoMapper; // private OrderBatteryInfoMapper orderBatteryInfoMapper;
//
@Scheduled(fixedRate = 15000) // @Scheduled(fixedRate = 15000)
public void resendFailedData() { // public void resendFailedData() {
// 查询所有发送失败的记录 // // 查询所有发送失败的记录
List<DataReceivePo> failedRecords = dataReceiveRecordMapper.selectList(new QueryWrapper<DataReceivePo>().eq("status", 3).eq("data_type", "batteryorder")); // List<DataReceivePo> failedRecords = dataReceiveRecordMapper.selectList(new QueryWrapper<DataReceivePo>().eq("status", 3).eq("data_type", "batteryorder"));
for (DataReceivePo record : failedRecords) { // for (DataReceivePo record : failedRecords) {
String status = orderBatteryInfoMapper.selectOrderStatus(record.getCid()); // String status = orderBatteryInfoMapper.selectOrderStatus(record.getCid());
if (status.equals("RENT_ING")) { // if (status.equals("RENT_ING")) {
sendDataService.retryForward(record.getId()); // sendDataService.retryForward(record.getId());
} // }
} // }
} // }
} //}

View File

@ -0,0 +1,26 @@
package com.sczx.sync.config;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.beans.factory.annotation.Autowired;

import javax.annotation.PostConstruct;
import java.util.concurrent.Executor;

/**
 * Kafka listener for real-time vehicle data; hands each message off to a
 * dedicated thread pool so the listener (poll) thread is never blocked by
 * slow processing.
 */
@Component
public class KafkaConsumer {

    @Autowired
    private ThreadPoolConfig threadPoolConfig;

    // Resolved ONCE at startup. The original code called
    // threadPoolConfig.kafkaMessageExecutor() inside listen(), which — because
    // that method is not a proxied @Bean — constructed and leaked a brand-new
    // 10-thread pool for every single message.
    private Executor messageExecutor;

    @PostConstruct
    private void init() {
        this.messageExecutor = threadPoolConfig.kafkaMessageExecutor();
    }

    @KafkaListener(topics = "jt808_forward_prod")
    public void listen(String message) {
        // Submit the processing task to the shared pool.
        messageExecutor.execute(() -> processMessage(message));
    }

    private void processMessage(String message) {
        // Actual message-handling logic (placeholder for now).
        System.out.println("Processing message: " + message);
        // Long-running work goes here...
    }
}

View File

@ -0,0 +1,102 @@
package com.sczx.sync.config;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;

/**
 * Consumer configuration for a SASL_SSL (PLAIN) secured Kafka cluster
 * (e.g. an Aliyun-hosted public endpoint with a vendor-supplied truststore).
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.consumer.group-id}")
    private String groupId;

    // Complete JAAS line, e.g.
    // org.apache.kafka.common.security.plain.PlainLoginModule required username="..." password="...";
    @Value("${kafka.properties.sasl.jaas.config}")
    private String kafkaPropertiesConfig;

    /**
     * Builds the consumer factory.
     *
     * @return factory producing String/String consumers
     * @throws IOException if the bundled truststore cannot be copied to disk
     */
    @Bean
    public DefaultKafkaConsumerFactory<String, String> consumerFactory() throws IOException {
        Map<String, Object> props = new HashMap<>();
        // Broker endpoint — use the consumer-side constant (the original used
        // ProducerConfig; same key string, but misleading in a consumer config).
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        // The truststore ships inside the jar, but the Kafka client needs a real
        // file path, so copy it to a temp file. Close the stream and schedule the
        // temp file for deletion on JVM exit (the original leaked both).
        try (InputStream truststoreStream = KafkaConsumerConfig.class
                .getClassLoader().getResourceAsStream("mix.4096.client.truststore.jks")) {
            if (truststoreStream != null) {
                Path tempFile = Files.createTempFile("truststore", ".jks");
                tempFile.toFile().deleteOnExit();
                Files.copy(truststoreStream, tempFile, StandardCopyOption.REPLACE_EXISTING);
                props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, tempFile.toString());
            } else {
                System.err.println("Truststore not found in classpath");
            }
        }
        // NOTE(review): "KafkaOnsClient" appears to be the fixed password of the
        // vendor-provided client truststore (not a user secret) — confirm.
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");

        // SASL/PLAIN credentials come from configuration (was duplicated before).
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, kafkaPropertiesConfig);

        // Session and fetch tuning.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);

        // Explicit String deserializers for both key and value.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // Disable hostname verification (endpoint certs do not match hostnames).
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Listener container factory used by @KafkaListener methods.
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        try {
            factory.setConsumerFactory(consumerFactory());
        } catch (IOException e) {
            // Preserve the cause — a missing/broken truststore is fatal at startup.
            throw new RuntimeException("Failed to create Kafka consumer factory", e);
        }
        return factory;
    }
}

View File

@ -0,0 +1,22 @@
package com.sczx.sync.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Shared thread-pool configuration for asynchronous Kafka message processing.
 */
@Configuration
@EnableAsync
public class ThreadPoolConfig {

    /**
     * Thread pool used to process Kafka messages off the listener thread.
     *
     * Registered as a @Bean: the original method had no annotation, so the pool
     * was never a managed singleton and every direct call built (and leaked) a
     * brand-new executor. With @Bean on a @Configuration class, direct calls to
     * this method are proxied and always return the same instance.
     */
    @Bean("kafkaMessageExecutor")
    public Executor kafkaMessageExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(200);
        executor.setThreadNamePrefix("kafka-message-processor-");
        // When the queue is full, run the task in the submitting (listener)
        // thread instead of throwing RejectedExecutionException and losing the
        // message; this also applies natural back-pressure to the consumer.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

Binary file not shown.