From 52575397274979a60c24439ff4780b4b61633b26 Mon Sep 17 00:00:00 2001 From: eric <465889110@qq.com> Date: Tue, 14 Oct 2025 22:18:20 +0800 Subject: [PATCH] =?UTF-8?q?kafka=E6=B6=88=E8=B4=B9=E8=BD=A6=E8=BE=86?= =?UTF-8?q?=E5=AE=9E=E6=97=B6=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 9 +- src/main/java/com/sczx/sync/Application.java | 3 - .../com/sczx/sync/Task/DataResendTask.java | 76 ++++++------- .../com/sczx/sync/config/KafkaConsumer.java | 26 +++++ .../sczx/sync/config/KafkaConsumerConfig.java | 102 ++++++++++++++++++ .../sczx/sync/config/ThreadPoolConfig.java | 22 ++++ .../resources/mix.4096.client.truststore.jks | Bin 0 -> 2253 bytes 7 files changed, 196 insertions(+), 42 deletions(-) create mode 100644 src/main/java/com/sczx/sync/config/KafkaConsumer.java create mode 100644 src/main/java/com/sczx/sync/config/KafkaConsumerConfig.java create mode 100644 src/main/java/com/sczx/sync/config/ThreadPoolConfig.java create mode 100644 src/main/resources/mix.4096.client.truststore.jks diff --git a/pom.xml b/pom.xml index c9e525b..8944a80 100644 --- a/pom.xml +++ b/pom.xml @@ -13,7 +13,7 @@ 1.8 - 2.3.12.RELEASE + 2.7.18 Hoxton.SR12 2.2.9.RELEASE @@ -179,6 +179,13 @@ fastjson 1.2.83 + + + + + org.springframework.kafka + spring-kafka + diff --git a/src/main/java/com/sczx/sync/Application.java b/src/main/java/com/sczx/sync/Application.java index ae10340..7047eaa 100644 --- a/src/main/java/com/sczx/sync/Application.java +++ b/src/main/java/com/sczx/sync/Application.java @@ -7,7 +7,6 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.core.env.Environment; import org.springframework.retry.annotation.EnableRetry; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.transaction.annotation.EnableTransactionManagement; @@ -25,7 +24,5 @@ public class Application { public static void main(String[] args) throws IOException { ConfigurableApplicationContext context = SpringApplication.run(Application.class, args); - Environment environment = context.getBean(Environment.class); - log.info("启动成功,后端服务API地址:http://{}:{}/swagger-ui.html", ComputerInfo.getIpAddr(), environment.getProperty("server.port")); } } diff --git a/src/main/java/com/sczx/sync/Task/DataResendTask.java b/src/main/java/com/sczx/sync/Task/DataResendTask.java index 3ceea81..45df074 100644 --- a/src/main/java/com/sczx/sync/Task/DataResendTask.java +++ b/src/main/java/com/sczx/sync/Task/DataResendTask.java @@ -1,38 +1,38 @@ -package com.sczx.sync.Task; - -import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; -import com.sczx.sync.mapper.DataReceiveRecordMapper; -import com.sczx.sync.mapper.OrderBatteryInfoMapper; -import com.sczx.sync.po.DataReceivePo; -import com.sczx.sync.service.impl.SendDataServiceImpl; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -import java.util.List; - -// 定时任务类 -@Component -public class DataResendTask { - - @Autowired - private SendDataServiceImpl sendDataService; - - @Autowired - private DataReceiveRecordMapper dataReceiveRecordMapper; - - @Autowired - private OrderBatteryInfoMapper orderBatteryInfoMapper; - - @Scheduled(fixedRate = 15000) - public void resendFailedData() { - // 查询所有发送失败的记录 - List failedRecords = dataReceiveRecordMapper.selectList(new QueryWrapper().eq("status", 3).eq("data_type", "batteryorder")); - for (DataReceivePo record : failedRecords) { - String status = orderBatteryInfoMapper.selectOrderStatus(record.getCid()); - if (status.equals("RENT_ING")) { - sendDataService.retryForward(record.getId()); - } - } - } -} +//package com.sczx.sync.Task; +// +//import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +//import com.sczx.sync.mapper.DataReceiveRecordMapper; +//import com.sczx.sync.mapper.OrderBatteryInfoMapper; +//import com.sczx.sync.po.DataReceivePo; +//import com.sczx.sync.service.impl.SendDataServiceImpl; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.scheduling.annotation.Scheduled; +//import org.springframework.stereotype.Component; +// +//import java.util.List; +// +//// 定时任务类 +//@Component +//public class DataResendTask { +// +// @Autowired +// private SendDataServiceImpl sendDataService; +// +// @Autowired +// private DataReceiveRecordMapper dataReceiveRecordMapper; +// +// @Autowired +// private OrderBatteryInfoMapper orderBatteryInfoMapper; +// +// @Scheduled(fixedRate = 15000) +// public void resendFailedData() { +// // 查询所有发送失败的记录 +// List failedRecords = dataReceiveRecordMapper.selectList(new QueryWrapper().eq("status", 3).eq("data_type", "batteryorder")); +// for (DataReceivePo record : failedRecords) { +// String status = orderBatteryInfoMapper.selectOrderStatus(record.getCid()); +// if (status.equals("RENT_ING")) { +// sendDataService.retryForward(record.getId()); +// } +// } +// } +//} diff --git a/src/main/java/com/sczx/sync/config/KafkaConsumer.java b/src/main/java/com/sczx/sync/config/KafkaConsumer.java new file mode 100644 index 0000000..399f71c --- /dev/null +++ b/src/main/java/com/sczx/sync/config/KafkaConsumer.java @@ -0,0 +1,26 @@ +package com.sczx.sync.config; + +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; +import org.springframework.beans.factory.annotation.Autowired; + +@Component +public class KafkaConsumer { + + @Autowired + private ThreadPoolConfig threadPoolConfig; + + @KafkaListener(topics = "jt808_forward_prod") + public void listen(String message) { + // 将消息处理任务提交到线程池 + threadPoolConfig.kafkaMessageExecutor().execute(() -> { + processMessage(message); + }); + } + + private void processMessage(String message) { + // 实际的消息处理逻辑 + System.out.println("Processing message: " + message); + // 执行耗时操作... + } +} diff --git a/src/main/java/com/sczx/sync/config/KafkaConsumerConfig.java b/src/main/java/com/sczx/sync/config/KafkaConsumerConfig.java new file mode 100644 index 0000000..9ba3072 --- /dev/null +++ b/src/main/java/com/sczx/sync/config/KafkaConsumerConfig.java @@ -0,0 +1,102 @@ +package com.sczx.sync.config; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + + +@Configuration +@EnableKafka +public class KafkaConsumerConfig { + + @Value("${kafka.bootstrap-servers}") + private String bootstrapServers; + + @Value("${kafka.consumer.group-id}") + private String groupId; + + + @Value("${kafka.properties.sasl.jaas.config}") + private String kafkaPropertiesConfig; + + @Bean + public DefaultKafkaConsumerFactory consumerFactory() throws IOException { + Map props = new HashMap<>(); + + // 设置接入点 + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + + // 设置SSL信任库 + InputStream truststoreStream = KafkaConsumerConfig.class.getClassLoader().getResourceAsStream("mix.4096.client.truststore.jks"); + if (truststoreStream != null) { + Path tempFile = Files.createTempFile("truststore", ".jks"); + Files.copy(truststoreStream, tempFile, StandardCopyOption.REPLACE_EXISTING); + props.put("ssl.truststore.location", tempFile.toString()); + } else { + System.err.println("Truststore not found in classpath"); + } + + props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient"); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); + // SASL 配置 + String saslMechanism = "PLAIN"; + props.put(SaslConfigs.SASL_MECHANISM, saslMechanism); + props.put("sasl.jaas.config",kafkaPropertiesConfig); + + + + props.put(SaslConfigs.SASL_MECHANISM, saslMechanism); + props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); + props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000); + props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000); + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30); + + // 明确指定反序列化器 + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); + + // 修复点:显式指定泛型类型 + return new DefaultKafkaConsumerFactory<>(props); + } + + + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + + try { + factory.setConsumerFactory(consumerFactory()); + } catch (IOException e) { + throw new RuntimeException("Failed to create Kafka consumer factory", e); + } + + return factory; + } + +} diff --git a/src/main/java/com/sczx/sync/config/ThreadPoolConfig.java b/src/main/java/com/sczx/sync/config/ThreadPoolConfig.java new file mode 100644 index 0000000..25b470e --- /dev/null +++ b/src/main/java/com/sczx/sync/config/ThreadPoolConfig.java @@ -0,0 +1,22 @@ +package com.sczx.sync.config; + +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; + +@Configuration +@EnableAsync +public class ThreadPoolConfig { + + public Executor kafkaMessageExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(10); + executor.setMaxPoolSize(20); + executor.setQueueCapacity(200); + executor.setThreadNamePrefix("kafka-message-processor-"); + executor.initialize(); + return executor; + } +} diff --git a/src/main/resources/mix.4096.client.truststore.jks b/src/main/resources/mix.4096.client.truststore.jks new file mode 100644 index 0000000000000000000000000000000000000000..d26f7cd972598df268188d9b30a4381bd6d80ad5 GIT binary patch literal 2253 zcmc)Lc|4T+9tZGeW(;P=ma!9#q7d^uBcx;}jXT86Qb%25NlDhRB{I`u(AXI}*_U&L zdu-uYD$2g(bqxx+%Dx=3b#RAHz1-J1|K8X6MaznBn3YqVaRgr`oG{!TZqN~06>=KP(!R}2OlqYP>bZIAbC(IDG2s3qF{kcr$bCR-MlM2Ht zOYDZkQM&EyDZ~25x4BGecm3SEq~jE>ueEUtoSPIeUmx&&@h;b2$4CRsCB6psiT4lY z$`}+A^OqkkY5BibRI;D=QRvL`$&%+Wd?oYr>WIFl zd1W<(5o-O)5gAk0W!K-wk#@zL_Hk5N>xPh&tT-`#y=o+~r^0u!AS*P^qi5?WM6OM( zR?Ko-no=uluxTY1mK$A*UR^0s3W;k~kj@DgmdM?)^@H*-P$|tcb|lt-eX57?M0(t* ztLEg{%4RgqB|Bq%4lQ32Zdoc=*c@78E_J~!R`_2pe!BeS{x6OvQu*Q-&aJ;@2JTT^ z%ZHXfDO@u1^r#pK81|`Q?42dnkF*qWl(}d$z%7%4d|Mdt|FO9DMC&#A88Rb4=*Y0X zIvfE5V23+|bq!b@0s}Oy>{5h6dk3pwMg=dsORboTf0K%T>b~q(HX*y#TWZu)D3=j5 zMt@5dZf7!!_t*>cyCkbC_DsZ7Bq9c^;5oDEarl!`fePi8v!1J-)V^fxys9u4&+Pzc zf?8};=LBJ@OYgM_#oG3Xta*mozu~N_P4uG^;OCDh@jLVBm!RalgVS2atM#YTm6y<7 zvT~RwuJ$A1V0HA*ZH;6l+?%AU&!>8px|Xr@+R_HEU&b0Hzux&UTc2U8pU-%(QX$ui+iFAJC!SF>KQf>T4{Z3&X7J;e9AYdsAY{{{ zd>irjS;|84v#7E19goS2V#&X1Go)WPIB^a%>qxI97la=7_fPor6o5p#K17U(WMSmx zV^{)B8?K9=CJxx9@^`wP8=IUpv8|iL5|cW8uLaYrcl~6#Zy?&HBC?G6ihvN)s7#Z) zjWN8lLJ7j%IN6_u48m-JFZ-t8gnsvdX4)N?% zikrb3Yp&zYM5$>`w8gjNt=|{$h*^)nl=z$E(c;a$1lbR@y;ADj8Ojk;blEa7STzR! zW-n}T;{a5SubAQwG_8H#{#{dq4ogx^mLhps2sjMRBK4)KyH1CS_59&pLb6Ep`d?Ci z_mvY1H@^C9hU4%6jfLy#eRCFps?13 z(pP~HqwIpOb9 zY=C8|p)BXFOq({K^jyZpdFq)&qpgA;3m&!K4A4zw>v6zlKe?oYN(o+Gk@QuNe=nbHBWtW$$v$OsfXH-Yk!wmHYA9E!S)Upp4F5HR>l8r za{f$#4$8L3ufKVDOJ94J=pbU@eJq9ST;zKKA8og~aehVH?FY@>+H6sZWZH0|y3!Sb z|3fMEdfaDdqfKT!NR*JEh^PNDQ@yC_Zao(LNGrpgM0E)s$~(?alMQMaO}||;N4elG w@X}PRO%k2Qv(|Ky81c^EJ0#ddl-+{YEJfh`Y*AT|@tdXAsI;cmkDdAd1%9q#Q2+n{ literal 0 HcmV?d00001