AWS MSK
Integrating AWS MSK with tio-boot
1. Preparation
Before connecting, confirm the following:
- The MSK cluster has been created
- The topic exists, or the application creates it automatically
- The application's runtime environment can reach MSK
- The AWS identity bound to the application has sufficient permissions
Per the AWS documentation, MSK Provisioned clusters are private-network only by default, so clients generally need to be in the same VPC. For IAM authentication, Java clients must use aws-msk-iam-auth and configure security.protocol=SASL_SSL, sasl.mechanism=AWS_MSK_IAM, the IAMLoginModule, and the IAMClientCallbackHandler. (AWS Documentation)
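The four client settings just mentioned can be collected into a plain java.util.Properties sketch; the broker addresses below are placeholders, and no Kafka dependency is needed for the sketch itself:

```java
import java.util.Properties;

public class MskIamProps {
    // Builds the four IAM-auth settings AWS requires for Java MSK clients,
    // plus the bootstrap servers entry.
    public static Properties iamAuthProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "AWS_MSK_IAM");
        props.put("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;");
        props.put("sasl.client.callback.handler.class",
                "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker addresses
        Properties props = iamAuthProps("b-1.example:9098,b-2.example:9098");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The same four entries are what the producer, consumer, and AdminClient configurations later in this article all share.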
2. Maven Dependencies
The AWS documentation provides the Maven coordinates for aws-msk-iam-auth; the example version in the official GitHub README is currently 2.3.5. (AWS Documentation)
<dependencies>
  <!-- Kafka Java Client -->
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.7.1</version>
  </dependency>
  <!-- AWS MSK IAM Authentication -->
  <dependency>
    <groupId>software.amazon.msk</groupId>
    <artifactId>aws-msk-iam-auth</artifactId>
    <version>2.3.5</version>
  </dependency>
</dependencies>
or
<dependencies>
  <!-- Kafka Java Client -->
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.3.2</version>
  </dependency>
  <!-- AWS MSK IAM Authentication -->
  <dependency>
    <groupId>software.amazon.msk</groupId>
    <artifactId>aws-msk-iam-auth</artifactId>
    <version>1.1.8</version>
  </dependency>
</dependencies>
Notes:
For kafka-clients, any stable version compatible with your project is fine. aws-msk-iam-auth is the key dependency for connecting a Java client to an IAM-authenticated MSK cluster. (GitHub)
3. app.properties Configuration
Based on the configuration you provided, it can be organized like this:
# MSK
aws.msk.bootstrap-servers=server1:9098,server2:9098
aws.msk.enable-producer=true
aws.msk.enable-consumer=false
aws.msk.producer.topic-name=jr_experience_verified_prod
aws.msk.consumer.topic-name=jr_experience_verified_prod
aws.msk.group-id=jr_experience_verified_group
# Producer
aws.msk.acks=all
aws.msk.retries=3
# Consumer
aws.msk.auto-offset-reset=earliest
aws.msk.enable-auto-commit=true
If you later want to split the producer and consumer onto different topics, you can also add:
aws.msk.producer-topic=jr_experience_verified_prod
aws.msk.consumer-topic=jr_experience_verified_prod
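The initialization gating implied by these properties (skip unless bootstrap servers are set and at least one of producer/consumer is enabled) can be exercised without tio-boot; this sketch uses plain java.util.Properties in place of EnvUtils:

```java
import java.util.Properties;

public class MskPropsReader {
    // Mirrors the gating logic of the configuration class: initialize only if
    // bootstrap servers are present and producer or consumer is enabled.
    public static boolean shouldInitialize(Properties props) {
        String bootstrap = props.getProperty("aws.msk.bootstrap-servers", "").trim();
        if (bootstrap.isEmpty()) {
            return false; // no brokers configured, nothing to do
        }
        // Both flags default to true, matching EnvUtils.getBoolean(key, true)
        boolean producer = Boolean.parseBoolean(props.getProperty("aws.msk.enable-producer", "true"));
        boolean consumer = Boolean.parseBoolean(props.getProperty("aws.msk.enable-consumer", "true"));
        return producer || consumer;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("aws.msk.bootstrap-servers", "server1:9098");
        props.setProperty("aws.msk.enable-consumer", "false");
        // Producer is still enabled by default, so initialization proceeds
        System.out.println(shouldInitialize(props));
    }
}
```

This is only a stand-in for illustration; the real configuration class below reads the same keys through tio-boot's EnvUtils.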
4. HelloApp Startup Class
import com.litongjava.annotation.AComponentScan;
import com.litongjava.tio.boot.TioApplication;

@AComponentScan
public class HelloApp {
  public static void main(String[] args) {
    long start = System.currentTimeMillis();
    TioApplication.run(HelloApp.class, args);
    long end = System.currentTimeMillis();
    System.out.println((end - start) + "ms");
  }
}
5. KafkaProducerUtils
This utility class holds the Producer and provides methods for sending messages.
package com.litongjava.kafka;

import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class KafkaProducerUtils {
  private static KafkaProducer<String, String> producer;
  private static String topic;

  public static void init(KafkaProducer<String, String> kafkaProducer, String topicName) {
    producer = kafkaProducer;
    topic = topicName;
  }

  public static KafkaProducer<String, String> getProducer() {
    return producer;
  }

  public static void send(String message) {
    send(topic, message);
  }

  public static void send(String topicName, String message) {
    try {
      ProducerRecord<String, String> record = new ProducerRecord<>(topicName, message);
      Future<RecordMetadata> future = producer.send(record);
      // future.get() blocks until the broker acknowledges, so this send is
      // synchronous; drop it (or use a send callback) for fire-and-forget.
      RecordMetadata metadata = future.get();
      log.info("Kafka message sent, topic:{}, partition:{}, offset:{}",
          metadata.topic(), metadata.partition(), metadata.offset());
    } catch (Exception e) {
      log.error("Kafka send message failed", e);
    }
  }
}
6. KafkaConsumerRunner
This class consumes messages. To keep the style close to your EMQX example, a single thread polls continuously.
package com.litongjava.kafka;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class KafkaConsumerRunner implements Runnable {
  private final KafkaConsumer<String, String> consumer;
  private final String topic;
  private final AtomicBoolean running = new AtomicBoolean(true);

  public KafkaConsumerRunner(KafkaConsumer<String, String> consumer, String topic) {
    this.consumer = consumer;
    this.topic = topic;
  }

  @Override
  public void run() {
    consumer.subscribe(Collections.singletonList(topic));
    log.info("Kafka consumer subscribed topic: {}", topic);
    try {
      while (running.get()) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
          log.info("Received message, topic:{}, partition:{}, offset:{}, key:{}, value:{}",
              record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
      }
    } catch (WakeupException e) {
      // Expected when shutdown() wakes a blocking poll(); nothing to do.
    } catch (Exception e) {
      log.error("Kafka consumer error", e);
    } finally {
      try {
        consumer.close();
      } catch (Exception e) {
        log.error("Kafka consumer close error", e);
      }
    }
  }

  public void shutdown() {
    running.set(false);
    // wakeup() is the only KafkaConsumer method safe to call from another
    // thread; it interrupts a blocking poll() so the loop exits promptly.
    consumer.wakeup();
  }
}
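The shutdown pattern above, an AtomicBoolean flag checked on every loop iteration, can be exercised without Kafka at all. This sketch replaces poll() with a short sleep so the pattern is testable standalone:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class StoppableLoop implements Runnable {
    private final AtomicBoolean running = new AtomicBoolean(true);
    volatile int iterations = 0;

    @Override
    public void run() {
        while (running.get()) {
            iterations++;            // stand-in for consumer.poll(...) + record handling
            try {
                Thread.sleep(10);    // stand-in for the poll timeout
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public void shutdown() {
        running.set(false);          // the next loop check exits cleanly
    }

    public static void main(String[] args) throws InterruptedException {
        StoppableLoop loop = new StoppableLoop();
        Thread t = new Thread(loop, "demo-consumer-thread");
        t.start();
        Thread.sleep(50);
        loop.shutdown();
        t.join(1000);
        System.out.println("stopped=" + !t.isAlive());
    }
}
```

In the real runner the flag alone is not enough, because poll() can block for up to its timeout; that is what consumer.wakeup() addresses.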
7. MSK Client Configuration Class
This is the core of the integration. It:
- reads app.properties
- creates the Producer
- creates the Consumer
- configures IAM authentication
- releases resources gracefully on application shutdown
The key IAM settings AWS specifies for Java clients are:
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
These are the key settings in the code below. (AWS Documentation)
package com.litongjava.tio.boot.admin.utils;

import com.litongjava.tio.utils.environment.EnvUtils;
import com.litongjava.tio.utils.hutool.StrUtil;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class AwsProfileUtils {

  public static String jaasConfig() {
    String awsProfile = firstNonBlank(EnvUtils.get("AWS_PROFILE"), EnvUtils.get("aws.profile"),
        EnvUtils.get("aws.msk.aws-profile"));
    log.info("MSK IAM auth mode: {}", describeAuthMode(awsProfile));
    return buildJaasConfig(awsProfile);
  }

  private static String buildJaasConfig(String awsProfile) {
    if (StrUtil.isNotBlank(awsProfile)) {
      log.info("AWS_PROFILE detected for MSK IAM auth: {}", awsProfile);
      return "software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName=\""
          + escapeJaasValue(awsProfile) + "\";";
    }
    log.info("AWS_PROFILE not found, using default AWS credentials provider chain for MSK IAM auth");
    return "software.amazon.msk.auth.iam.IAMLoginModule required;";
  }

  private static String firstNonBlank(String... values) {
    if (values == null) {
      return null;
    }
    for (String value : values) {
      if (StrUtil.isNotBlank(value)) {
        return value.trim();
      }
    }
    return null;
  }

  private static String escapeJaasValue(String value) {
    if (value == null) {
      return null;
    }
    // Backslashes and double quotes would break the quoted JAAS value, so escape them.
    return value.replace("\\", "\\\\").replace("\"", "\\\"");
  }

  private static String describeAuthMode(String awsProfile) {
    if (StrUtil.isNotBlank(awsProfile)) {
      return "AWS_PROFILE(" + awsProfile + ")";
    }
    return "DEFAULT_CREDENTIALS_CHAIN";
  }
}
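The profile-name escaping above matters because the value is embedded inside a quoted JAAS string. A standalone sketch of the same logic, without the tio-boot EnvUtils/StrUtil dependencies:

```java
public class JaasConfigBuilder {
    // Escapes backslashes and double quotes so the value is safe inside a JAAS string.
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    static String build(String awsProfile) {
        if (awsProfile == null || awsProfile.trim().isEmpty()) {
            // No profile: fall back to the default AWS credentials provider chain.
            return "software.amazon.msk.auth.iam.IAMLoginModule required;";
        }
        return "software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName=\""
                + escape(awsProfile.trim()) + "\";";
    }

    public static void main(String[] args) {
        System.out.println(build(null));
        System.out.println(build("my \"prod\" profile"));
    }
}
```

Without escaping, a profile name containing a quote would terminate the JAAS value early and produce a parse error at client startup.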
package com.litongjava.tio.boot.admin.config;

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import com.litongjava.hook.HookCan;
import com.litongjava.kafka.KafkaConsumerRunner;
import com.litongjava.kafka.KafkaProducerUtils;
import com.litongjava.tio.boot.admin.utils.AwsProfileUtils;
import com.litongjava.tio.utils.environment.EnvUtils;
import com.litongjava.tio.utils.hutool.StrUtil;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TioAdminKafkaMskClientConfig {
  private KafkaConsumerRunner consumerRunner;
  private Thread consumerThread;
  private KafkaProducer<String, String> producer;

  public void config() {
    String bootstrapServers = EnvUtils.get("aws.msk.bootstrap-servers");
    if (StrUtil.isBlank(bootstrapServers)) {
      log.info("aws.msk.bootstrap-servers is blank, skip MSK client initialization");
      return;
    }
    boolean enableProducer = EnvUtils.getBoolean("aws.msk.enable-producer", true);
    boolean enableConsumer = EnvUtils.getBoolean("aws.msk.enable-consumer", true);
    if (!enableProducer && !enableConsumer) {
      log.info("Both producer and consumer are disabled, skip MSK client initialization");
      return;
    }
    String producerTopicName = EnvUtils.get("aws.msk.producer.topic-name");
    String consumerTopicName = EnvUtils.get("aws.msk.consumer.topic-name");
    String groupId = EnvUtils.get("aws.msk.group-id");
    String acks = EnvUtils.get("aws.msk.acks", "all");
    int retries = EnvUtils.getInt("aws.msk.retries", 3);
    String autoOffsetReset = EnvUtils.get("aws.msk.auto-offset-reset", "earliest");
    boolean enableAutoCommit = EnvUtils.getBoolean("aws.msk.enable-auto-commit", true);
    String jaasConfig = AwsProfileUtils.jaasConfig();

    if (enableProducer) {
      initProducer(bootstrapServers, producerTopicName, acks, retries, jaasConfig);
    } else {
      log.info("Kafka producer disabled, config: aws.msk.enable-producer=false");
    }
    if (enableConsumer) {
      initConsumer(bootstrapServers, consumerTopicName, groupId, autoOffsetReset, enableAutoCommit, jaasConfig);
    } else {
      log.info("Kafka consumer disabled, config: aws.msk.enable-consumer=false");
    }

    HookCan.me().addDestroyMethod(() -> {
      try {
        log.info("Shutting down Kafka resources");
        if (consumerRunner != null) {
          consumerRunner.shutdown();
        }
        if (consumerThread != null) {
          consumerThread.interrupt();
        }
        if (producer != null) {
          producer.close();
        }
        log.info("Kafka resources shutdown completed");
      } catch (Exception e) {
        log.error("Shutdown Kafka resources failed", e);
      }
    });
  }

  private void initProducer(String bootstrapServers, String producerTopicName, String acks, int retries,
      String jaasConfig) {
    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.ACKS_CONFIG, acks);
    producerProps.put(ProducerConfig.RETRIES_CONFIG, retries);
    // The four IAM-auth settings AWS requires for Java clients
    producerProps.put("security.protocol", "SASL_SSL");
    producerProps.put("sasl.mechanism", "AWS_MSK_IAM");
    producerProps.put("sasl.jaas.config", jaasConfig);
    producerProps.put("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
    producer = new KafkaProducer<>(producerProps);
    if (StrUtil.isNotBlank(producerTopicName)) {
      KafkaProducerUtils.init(producer, producerTopicName);
      log.info("Kafka producer initialized, default topicName:{}", producerTopicName);
    } else {
      log.info("Kafka producer initialized without default topic, dynamic topic mode");
    }
  }

  private void initConsumer(String bootstrapServers, String consumerTopicName, String groupId, String autoOffsetReset,
      boolean enableAutoCommit, String jaasConfig) {
    if (StrUtil.isBlank(consumerTopicName)) {
      log.info("aws.msk.consumer.topic-name is blank, skip Kafka consumer initialization");
      return;
    }
    if (StrUtil.isBlank(groupId)) {
      log.info("aws.msk.group-id is blank, skip Kafka consumer initialization");
      return;
    }
    Properties consumerProps = new Properties();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
    // The four IAM-auth settings AWS requires for Java clients
    consumerProps.put("security.protocol", "SASL_SSL");
    consumerProps.put("sasl.mechanism", "AWS_MSK_IAM");
    consumerProps.put("sasl.jaas.config", jaasConfig);
    consumerProps.put("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
    consumerRunner = new KafkaConsumerRunner(consumer, consumerTopicName);
    consumerThread = new Thread(consumerRunner, "msk-consumer-thread");
    consumerThread.start();
    log.info("Kafka consumer started, groupId:{}, topicName:{}", groupId, consumerTopicName);
  }
}
8. IndexController
As in your original EMQX example, hitting an HTTP endpoint sends a Kafka message.
package com.jobright.study.voice.agent.controller;

import com.litongjava.annotation.RequestPath;
import com.litongjava.kafka.KafkaProducerUtils;

@RequestPath("/kafka/test")
public class KafkaTestController {

  @RequestPath
  public String index() {
    String content = "Hello AWS MSK";
    KafkaProducerUtils.send(content);
    return "Message sent to AWS MSK";
  }
}
If you want to send JSON:
@RequestPath("/send")
public String send() {
  String json = "{\"event\":\"experience_verified\",\"status\":\"success\"}";
  KafkaProducerUtils.send(json);
  return "ok";
}
9. How to Provide AWS Credentials
The code above never spells out an access key or secret key, because aws-msk-iam-auth resolves credentials through the AWS Java default credential provider chain. In practice there are a few common options:
1. An IAM role attached to the EC2 instance
The recommended approach.
2. A task role on ECS / EKS
Common for containers in the cloud.
3. A named AWS profile for local development
The AWS documentation notes that you can append awsProfileName="your profile name"; to the JAAS config to use a named profile. (AWS Documentation)
For example, during local debugging you can change the JAAS config to:
producerProps.put("sasl.jaas.config",
    "software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName=\"default\";");
The consumer is configured the same way.
10. IAM Permission Policy
With IAM authentication, client permissions must be granted explicitly. AWS states that:
- Access is denied by default and must be explicitly allowed
- Producing requires at least Connect, DescribeTopic, and WriteData
- Consuming requires at least Connect, DescribeTopic, DescribeGroup, AlterGroup, and ReadData
Also, authorization for IAM identities is governed entirely by IAM policies; Kafka ACLs have no effect on IAM identities. (AWS Documentation)
A common minimal example:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:Connect",
        "kafka-cluster:DescribeCluster"
      ],
      "Resource": [
        "arn:aws:kafka:us-west-2:123456789012:cluster/jrmskmvp/cluster-uuid"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:DescribeTopic",
        "kafka-cluster:WriteData",
        "kafka-cluster:ReadData"
      ],
      "Resource": [
        "arn:aws:kafka:us-west-2:123456789012:topic/jrmskmvp/cluster-uuid/jr_experience_verified_prod"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:DescribeGroup",
        "kafka-cluster:AlterGroup"
      ],
      "Resource": [
        "arn:aws:kafka:us-west-2:123456789012:group/jrmskmvp/cluster-uuid/jr_experience_verified_group"
      ]
    }
  ]
}
If your application needs to create topics automatically, also add:
"kafka-cluster:CreateTopic"
The AWS documentation also describes the ARN formats for topic and group resources. (AWS Documentation)
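For example, the extra statement could look like this (the account ID and cluster UUID are the same placeholders as in the policy above):

```json
{
  "Effect": "Allow",
  "Action": [
    "kafka-cluster:CreateTopic"
  ],
  "Resource": [
    "arn:aws:kafka:us-west-2:123456789012:topic/jrmskmvp/cluster-uuid/jr_experience_verified_prod"
  ]
}
```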
11. How to Obtain the Correct Bootstrap Servers
You already have:
aws.msk.bootstrap-servers=b-1....:9098,b-2....:9098
This works as-is. But following AWS's recommended practice, it is best to fetch the bootstrap brokers for your authentication method from the MSK console or CLI, and to include brokers from multiple AZs in the connection string for failover. (AWS Documentation)
Via the CLI:
aws kafka get-bootstrap-brokers \
--cluster-arn arn:aws:kafka:us-west-2:123456789012:cluster/jrmskmvp/cluster-uuid
12. Creating Topics Automatically
You can create the topic at startup using AdminClient.
package com.litongjava.kafka.admin;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class KafkaAdminUtils {

  public static void createTopicIfNotExists(String bootstrapServers, String topicName) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("security.protocol", "SASL_SSL");
    props.put("sasl.mechanism", "AWS_MSK_IAM");
    props.put("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;");
    props.put("sasl.client.callback.handler.class",
        "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
    try (AdminClient adminClient = AdminClient.create(props)) {
      // Check first so an existing topic does not fail with TopicExistsException
      if (!adminClient.listTopics().names().get().contains(topicName)) {
        NewTopic newTopic = new NewTopic(topicName, 3, (short) 2);
        adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
      }
    }
  }
}
Notes:
- Automatic topic creation requires CreateTopic in the IAM policy
- Choose partition and replication counts that match your cluster size
The AWS documentation lists the required permissions separately for creating topics, writing data, and reading data. (AWS Documentation)
13. Complete Send Example
String content = "{\"uid\":123,\"event\":\"experience_verified\"}";
KafkaProducerUtils.send(content);
With an explicit topic:
KafkaProducerUtils.send("jr_experience_verified_prod",
    "{\"uid\":123,\"event\":\"experience_verified\"}");
14. Troubleshooting Common Problems
1. TimeoutException / cannot reach the broker
Check the network first:
- Is the application in the same VPC as MSK?
- Do the security groups allow the traffic?
- Are the routes reachable?
MSK defaults to private-network access, so this is the most common class of problem. (AWS Documentation)
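Before digging into security groups, a quick TCP check from the application host can rule basic reachability in or out; this sketch uses only the standard library, and the host below is a placeholder for a real broker endpoint:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerReachability {
    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    // This only proves network reachability, not Kafka or IAM correctness.
    public static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder broker address; replace with a real MSK broker endpoint.
        System.out.println(isReachable("b-1.example.amazonaws.com", 9098, 3000));
    }
}
```

If this returns false from inside your VPC, the problem is networking (VPC, security group, or route), not the Kafka client configuration.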
2. SaslAuthenticationException
Check the IAM authentication configuration first:
- security.protocol=SASL_SSL
- sasl.mechanism=AWS_MSK_IAM
- the aws-msk-iam-auth dependency is on the classpath
- the runtime environment actually has AWS credentials
These settings are the Java IAM access configuration AWS requires. (AWS Documentation)
3. TopicAuthorizationException / GroupAuthorizationException
Most likely an incomplete IAM policy.
AWS is explicit that access is denied by default and must be granted explicitly; producing, consuming, and creating topics each require different permissions. (AWS Documentation)
4. Historical messages are not consumed
Check:
auto.offset.reset=earliest
Note that earliest only applies when the group has no committed offset, so confirm whether the consumer group is consuming this topic for the first time.
15. Minimal Integration Checklist for Your Current Configuration
Based on your current configuration:
aws.msk.experience-verified.topic-name=jr_experience_verified_prod
aws.msk.bootstrap-servers=b-1.jrmskmvp.kou1x9.c3.kafka.us-west-2.amazonaws.com:9098,b-2.jrmskmvp.kou1x9.c3.kafka.us-west-2.amazonaws.com:9098
The four essential steps are:
- Use your existing addresses for bootstrap.servers
- Add the aws-msk-iam-auth dependency to the Java project
- Add the four IAM settings to both the producer and the consumer
- Bind IAM permissions for this topic and group to the runtime environment
16. Suggested Project Layout
src/main/java
└── com.litongjava
    ├── HelloApp.java
    └── kafka
        ├── KafkaProducerUtils.java
        ├── KafkaConsumerRunner.java
        ├── admin
        │   └── KafkaAdminUtils.java
        ├── config
        │   └── MskClientConfig.java
        └── controller
            └── IndexController.java
17. Summary
The core of this integration:
- tio-boot handles application startup and lifecycle
- the Kafka Java client handles producing and consuming
- aws-msk-iam-auth handles IAM authentication
- IAM policies handle authorization
- the MSK private network handles connectivity
