更改ZooKeeper、Kafka配置文件
- 更改ZooKeeper配置文件
vi /usr/local/kafka/config/zookeeper.properties
# 存储 Zookeeper 快照的目录 dataDir=/tmp/zookeeper # 客户端连接的端口 clientPort=2181 # 禁用每个 IP 地址的连接数量限制,因为这是非生产环境配置 maxClientCnxns=0 # 默认禁用 adminserver 以避免端口冲突。 # 如果选择启用,可以将端口设置为非冲突的值 admin.enableServer=false # admin.serverPort=8080 # Kerberos 配置 # 使用 SASLAuthenticationProvider 进行 Kerberos 认证 authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider # JAAS 登录续期时间,单位为毫秒,这里设置为 3600000 毫秒(即 1 小时) jaasLoginRenew=3600000
- 更改Kafka配置文件
vi /usr/local/kafka/config/server.properties
# Broker 的唯一标识符,每个 broker 必须设置为一个唯一的整数 broker.id=0 # 监听客户端连接的地址和端口,使用 SASL_PLAINTEXT 协议 listeners=SASL_PLAINTEXT://:9092 # 给客户端的连接地址和端口,通常用于消费者连接 advertised.listeners=SASL_PLAINTEXT://jd-lavm:9092 # 将监听器名称映射到安全协议,默认为相同 #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL # 服务器用于接收网络请求和发送响应的线程数量 num.network.threads=3 # 服务器用于处理请求的线程数量,包括磁盘 I/O 操作 num.io.threads=8 # Socket 服务器使用的发送缓冲区大小(SO_SNDBUF) socket.send.buffer.bytes=102400 # Socket 服务器使用的接收缓冲区大小(SO_RCVBUF) socket.receive.buffer.bytes=102400 # Socket 服务器接受的请求最大大小(防止内存溢出) socket.request.max.bytes=104857600 # 存储日志文件的目录列表,以逗号分隔 log.dirs=/tmp/kafka-logs # 每个主题的默认分区数 num.partitions=1 # 每个数据目录的恢复线程数量 num.recovery.threads.per.data.dir=1 # 偏移量主题的复制因子 offsets.topic.replication.factor=1 # 事务状态日志的复制因子 transaction.state.log.replication.factor=1 # 事务状态日志的最小ISR(在同步副本中的副本数量) transaction.state.log.min.isr=1 # 日志文件的最小存活时间,以便根据时间进行删除 log.retention.hours=168 # 检查日志段是否可以根据保留策略删除的时间间隔 log.retention.check.interval.ms=300000 # Zookeeper 的根目录,用于存储所有 Kafka znode zookeeper.connect=localhost:2181 # 连接 Zookeeper的超时时间(以毫秒为单位) zookeeper.connection.timeout.ms=18000 # 初始再平衡延迟时间(以毫秒为单位) group.initial.rebalance.delay.ms=0 # 是否自动创建主题 auto.create.topics.enable=true # Broker之间的安全协议 security.inter.broker.protocol=SASL_PLAINTEXT # Broker 之间通信所用的 SASL 机制 sasl.mechanism.inter.broker.protocol=GSSAPI # 启用的SASL机制 sasl.enabled.mechanisms=GSSAPI # Kerberos服务的名称(需要与keytab文件定义的服务主体名称一致,在本配置中服务主体是kafka/[email protected]) sasl.kerberos.service.name=kafka
JAAS配置
- 创建kafka的jaas配置文件
cd /usr/local/kafka/config/kraft/keytab touch kafka-jaas.conf
KafkaServer { # 使用Kerberos登录模块进行身份验证 com.sun.security.auth.module.Krb5LoginModule required # 指定使用Keytab文件进行认证 useKeyTab=true # 指定Keytab文件的路径 keyTab="/usr/local/kafka/config/kraft/keytab/kafka.keytab" # 存储密钥以供后续使用 storeKey=true # 不使用票据缓存,直接使用Keytab进行认证 useTicketCache=false # 指定 Kafka服务的Kerberos主体 principal="kafka/[email protected]"; }; # 用于向ZooKeeper注册服务时的Kerberos身份验证 Client { # 使用Kerberos登录模块进行身份验证 com.sun.security.auth.module.Krb5LoginModule required # 指定使用Keytab文件进行认证 useKeyTab=true # 指定Zookeeper的Keytab文件路径 keyTab="/usr/local/kafka/config/kraft/keytab/zookeeper.keytab" # 存储密钥以供后续使用 storeKey=true # 不使用票据缓存,直接使用Keytab进行认证 useTicketCache=false # 指定Zookeeper服务的Kerberos主体 principal="zookeeper/[email protected]"; };
- 创建ZooKeeper的jaas配置文件
cd /usr/local/kafka/config/kraft/keytab touch zookeeper-jaas.conf
Server { # 使用Kerberos登录模块进行身份验证 com.sun.security.auth.module.Krb5LoginModule required # 指定使用Keytab文件进行认证 useKeyTab=true # 指定Zookeeper的Keytab文件路径 keyTab="/usr/local/kafka/config/kraft/keytab/zookeeper.keytab" # 存储密钥以供后续使用 storeKey=true # 不使用票据缓存,直接使用Keytab进行认证 useTicketCache=false # 指定Zookeeper服务的Kerberos主体 principal="zookeeper/[email protected]"; };
更改ZooKeeper、Kafka启动sh文件
-
为ZooKeeper启动文件添加环境变量(可以复制出来一份zookeeper-server-start.sh重命名一下)
#!/bin/bash if [ $# -lt 1 ]; then echo "USAGE: $0 [-daemon] zookeeper.properties" exit 1 fi base_dir=$(dirname $0) if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties" fi if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M" fi EXTRA_ARGS=${EXTRA_ARGS-'-name zookeeper -loggc'} COMMAND=$1 case $COMMAND in -daemon) EXTRA_ARGS="-daemon "$EXTRA_ARGS shift ;; *) ;; esac # 添加参数告知Java应用程序使用指定的JAAS配置文件进行身份验证 export JVMFLAGS="-Djava.security.auth.login.config=/usr/local/kafka/config/kraft/keytab/zookeeper-jaas.conf" exec $base_dir/kafka-run-class.sh $EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain "$@"
-
为kafka启动文件添加环境变量(可以复制出来一份kafka-server-start.sh重命名一下)
#!/bin/bash if [ $# -lt 1 ]; then echo "USAGE: $0 [-daemon] server.properties [--override property=value]*" exit 1 fi base_dir=$(dirname $0) if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties" fi if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" fi EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'} COMMAND=$1 case $COMMAND in -daemon) EXTRA_ARGS="-daemon "$EXTRA_ARGS shift ;; *) ;; esac # -Djava.security.auth.login.config= 用于指定 JAAS 配置文件的位置,文件中定义了ZooKeeper Kerberos身份验证所需的登录上下文 # -Djava.security.krb5.conf= 用于指定 Kerberos 配置文件的位置,该文件通常包含Kerberos领域、KDC(密钥分发中心)地址等信息 # -Djava.security.auth.login.config= 用于Kafka的JAAS配置,包含Kafka连接Zookeeper或其他服务的身份验证信息。 export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/kraft/keytab/zookeeper-jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/usr/local/kafka/config/kraft/keytab/kafka-jaas.conf" exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
启动ZooKeeper、Kafka,并使用Datagrip连接Kafka
- 启动ZooKeeper、Kafka
# 启动ZooKeeper并指定配置文件 nohup /usr/local/kafka/bin/zookeeper-server-sasl-start.sh /usr/local/kafka/config/zookeeper.properties & # 启动Kafka并指定配置文件 nohup /usr/local/kafka/bin/kafka-server-sasl-start.sh /usr/local/kafka/config/server.properties &
- 配置Datagrip
- 安装Big Data Tools、Kafka插件
- 连接配置
- 安装Big Data Tools、Kafka插件
- 需要注意Principal与keytab需要对应上,可以单独生成给客户端的Principal与keytab
- krb5.conf配置需要点击IDE Kerberos settings
- 此时的krb5.conf需要注意要注释或删除掉
includedir /etc/krb5.conf.d/
,否则会错误识别,导致Datagrip读取krb5.conf失败
- 查看Kafka Topics,创建生产者、消费者
- 查看Kafka Topics,创建生产者、消费者
Comments NOTHING