diff --git a/pom.xml b/pom.xml index c9e525b..8944a80 100644 --- a/pom.xml +++ b/pom.xml @@ -13,7 +13,7 @@ 1.8 - 2.3.12.RELEASE + 2.7.18 Hoxton.SR12 2.2.9.RELEASE @@ -179,6 +179,13 @@ fastjson 1.2.83 + + + + + org.springframework.kafka + spring-kafka + diff --git a/src/main/java/com/sczx/sync/Application.java b/src/main/java/com/sczx/sync/Application.java index ae10340..7047eaa 100644 --- a/src/main/java/com/sczx/sync/Application.java +++ b/src/main/java/com/sczx/sync/Application.java @@ -7,7 +7,6 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.core.env.Environment; import org.springframework.retry.annotation.EnableRetry; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.transaction.annotation.EnableTransactionManagement; @@ -25,7 +24,5 @@ public class Application { public static void main(String[] args) throws IOException { ConfigurableApplicationContext context = SpringApplication.run(Application.class, args); - Environment environment = context.getBean(Environment.class); - log.info("启动成功,后端服务API地址:http://{}:{}/swagger-ui.html", ComputerInfo.getIpAddr(), environment.getProperty("server.port")); } } diff --git a/src/main/java/com/sczx/sync/Task/DataResendTask.java b/src/main/java/com/sczx/sync/Task/DataResendTask.java index 3ceea81..45df074 100644 --- a/src/main/java/com/sczx/sync/Task/DataResendTask.java +++ b/src/main/java/com/sczx/sync/Task/DataResendTask.java @@ -1,38 +1,38 @@ -package com.sczx.sync.Task; - -import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; -import com.sczx.sync.mapper.DataReceiveRecordMapper; -import com.sczx.sync.mapper.OrderBatteryInfoMapper; -import com.sczx.sync.po.DataReceivePo; -import com.sczx.sync.service.impl.SendDataServiceImpl; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -import java.util.List; - -// 定时任务类 -@Component -public class DataResendTask { - - @Autowired - private SendDataServiceImpl sendDataService; - - @Autowired - private DataReceiveRecordMapper dataReceiveRecordMapper; - - @Autowired - private OrderBatteryInfoMapper orderBatteryInfoMapper; - - @Scheduled(fixedRate = 15000) - public void resendFailedData() { - // 查询所有发送失败的记录 - List failedRecords = dataReceiveRecordMapper.selectList(new QueryWrapper().eq("status", 3).eq("data_type", "batteryorder")); - for (DataReceivePo record : failedRecords) { - String status = orderBatteryInfoMapper.selectOrderStatus(record.getCid()); - if (status.equals("RENT_ING")) { - sendDataService.retryForward(record.getId()); - } - } - } -} +//package com.sczx.sync.Task; +// +//import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +//import com.sczx.sync.mapper.DataReceiveRecordMapper; +//import com.sczx.sync.mapper.OrderBatteryInfoMapper; +//import com.sczx.sync.po.DataReceivePo; +//import com.sczx.sync.service.impl.SendDataServiceImpl; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.scheduling.annotation.Scheduled; +//import org.springframework.stereotype.Component; +// +//import java.util.List; +// +//// 定时任务类 +//@Component +//public class DataResendTask { +// +// @Autowired +// private SendDataServiceImpl sendDataService; +// +// @Autowired +// private DataReceiveRecordMapper dataReceiveRecordMapper; +// +// @Autowired +// private OrderBatteryInfoMapper orderBatteryInfoMapper; +// +// @Scheduled(fixedRate = 15000) +// public void resendFailedData() { +// // 查询所有发送失败的记录 +// List failedRecords = dataReceiveRecordMapper.selectList(new QueryWrapper().eq("status", 3).eq("data_type", "batteryorder")); +// for (DataReceivePo record : failedRecords) { +// String status = orderBatteryInfoMapper.selectOrderStatus(record.getCid()); +// if (status.equals("RENT_ING")) { +// sendDataService.retryForward(record.getId()); +// } +// } +// } +//} diff --git a/src/main/java/com/sczx/sync/config/KafkaConsumer.java b/src/main/java/com/sczx/sync/config/KafkaConsumer.java new file mode 100644 index 0000000..399f71c --- /dev/null +++ b/src/main/java/com/sczx/sync/config/KafkaConsumer.java @@ -0,0 +1,26 @@ +package com.sczx.sync.config; + +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; +import org.springframework.beans.factory.annotation.Autowired; + +@Component +public class KafkaConsumer { + + @Autowired + private ThreadPoolConfig threadPoolConfig; + + @KafkaListener(topics = "jt808_forward_prod") + public void listen(String message) { + // 将消息处理任务提交到线程池 + threadPoolConfig.kafkaMessageExecutor().execute(() -> { + processMessage(message); + }); + } + + private void processMessage(String message) { + // 实际的消息处理逻辑 + System.out.println("Processing message: " + message); + // 执行耗时操作... + } +} diff --git a/src/main/java/com/sczx/sync/config/KafkaConsumerConfig.java b/src/main/java/com/sczx/sync/config/KafkaConsumerConfig.java new file mode 100644 index 0000000..9ba3072 --- /dev/null +++ b/src/main/java/com/sczx/sync/config/KafkaConsumerConfig.java @@ -0,0 +1,102 @@ +package com.sczx.sync.config; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + + +@Configuration +@EnableKafka +public class KafkaConsumerConfig { + + @Value("${kafka.bootstrap-servers}") + private String bootstrapServers; + + @Value("${kafka.consumer.group-id}") + private String groupId; + + + @Value("${kafka.properties.sasl.jaas.config}") + private String kafkaPropertiesConfig; + + @Bean + public DefaultKafkaConsumerFactory consumerFactory() throws IOException { + Map props = new HashMap<>(); + + // 设置接入点 + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + + // 设置SSL信任库 + InputStream truststoreStream = KafkaConsumerConfig.class.getClassLoader().getResourceAsStream("mix.4096.client.truststore.jks"); + if (truststoreStream != null) { + Path tempFile = Files.createTempFile("truststore", ".jks"); + Files.copy(truststoreStream, tempFile, StandardCopyOption.REPLACE_EXISTING); + props.put("ssl.truststore.location", tempFile.toString()); + } else { + System.err.println("Truststore not found in classpath"); + } + + props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient"); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); + // SASL 配置 + String saslMechanism = "PLAIN"; + props.put(SaslConfigs.SASL_MECHANISM, saslMechanism); + props.put("sasl.jaas.config",kafkaPropertiesConfig); + + + + props.put(SaslConfigs.SASL_MECHANISM, saslMechanism); + props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); + props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000); + props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000); + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30); + + // 明确指定反序列化器 + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); + + // 修复点:显式指定泛型类型 + return new DefaultKafkaConsumerFactory<>(props); + } + + + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + + try { + factory.setConsumerFactory(consumerFactory()); + } catch (IOException e) { + throw new RuntimeException("Failed to create Kafka consumer factory", e); + } + + return factory; + } + +} diff --git a/src/main/java/com/sczx/sync/config/ThreadPoolConfig.java b/src/main/java/com/sczx/sync/config/ThreadPoolConfig.java new file mode 100644 index 0000000..25b470e --- /dev/null +++ b/src/main/java/com/sczx/sync/config/ThreadPoolConfig.java @@ -0,0 +1,22 @@ +package com.sczx.sync.config; + +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; + +@Configuration +@EnableAsync +public class ThreadPoolConfig { + + public Executor kafkaMessageExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(10); + executor.setMaxPoolSize(20); + executor.setQueueCapacity(200); + executor.setThreadNamePrefix("kafka-message-processor-"); + executor.initialize(); + return executor; + } +} diff --git a/src/main/resources/mix.4096.client.truststore.jks b/src/main/resources/mix.4096.client.truststore.jks new file mode 100644 index 0000000..d26f7cd Binary files /dev/null and b/src/main/resources/mix.4096.client.truststore.jks differ