Kafka KRaft production cluster deployment in practice (separate broker and controller nodes)
This is weihubeats; if you find the article useful you can follow my WeChat account 小奏技术, where articles are published first. No marketing fluff, no clickbait.
Kafka version
- kafka_2.13-3.5.0
Background
In the earlier post Linux Kafka 3.5 KRaft cluster deployment (weihubeats.blog.csdn.net/article/det…), each node acted as both broker and controller. That layout is only suitable for test environments; this time the goal is a production Kafka deployment, so the broker and controller roles are deployed separately.
Capacity estimation
- QPS: assume roughly 1000 QPS.
- Disk: assume roughly 10 million messages per day at an average size of about 200 bytes, i.e. around 2 GB of new data per day. Messages are retained for 14 days with 2 replicas, so the total is about 2 GB * 14 * 2 ≈ 56 GB. Reserving another ~10% for index files and other overhead brings it to roughly 60 GB, so 100 GB is enough for now, and disk is easy to expand later.
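As a quick sanity check, here is the same back-of-the-envelope calculation in shell; all of the inputs are the assumptions above, not measured values:

```bash
# Rough disk estimate; every input is an assumption from the text above
MSGS_PER_DAY=10000000   # ~10 million messages per day
BYTES_PER_MSG=200       # assumed average message size
DAYS=14                 # retention period
REPLICAS=2              # replication factor
echo "$((MSGS_PER_DAY * BYTES_PER_MSG * DAYS * REPLICAS / 1024 / 1024 / 1024)) GiB"
# -> 52 GiB; add ~10% for index files and other overhead
```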
Machines
For now we pick 5 machines:
- 3 to run KRaft controllers
- 2 to run brokers
Each machine has 4 cores, 16 GB of RAM, and a 100 GB disk.
Unlike the test-environment setup, the broker and controller roles here run on separate machines.
Assume the machine IPs are:

Machine | IP | Domain | Role
---|---|---|---
1 | 192.168.1.1 | kafka-controller-prd-001.com | controller
2 | 192.168.1.2 | kafka-controller-prd-002.com | controller
3 | 192.168.1.3 | kafka-controller-prd-003.com | controller
4 | 192.168.1.4 | kafka-prd-001.com | broker
5 | 192.168.1.5 | kafka-prd-002.com | broker
Make sure ports 9092 (broker), 9093 (controller), and the JMX port (9988 in the startup scripts below) are open on all machines.
For high availability, it is also best to place the broker and controller replicas in different availability zones.
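A quick way to verify connectivity between the machines, assuming nc (netcat) is installed; the hostnames are the ones from the table above:

```bash
# Check controller and broker ports from any machine in the cluster
for host in kafka-controller-prd-001.com kafka-controller-prd-002.com kafka-controller-prd-003.com; do
  nc -zv "$host" 9093   # controller quorum port
done
for host in kafka-prd-001.com kafka-prd-002.com; do
  nc -zv "$host" 9092   # broker client port
done
```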
Configuration files
controller
The configurations for controller2 and controller3 are essentially the same as controller1's: only node.id differs, and listeners must use each node's own domain.
Note that the id in node.id must match the corresponding number in controller.quorum.voters.
controller1
```properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This configuration file is intended for use in KRaft mode, where
# Apache ZooKeeper is not present.
#

############################# Server Basics #############################

# The role of this server. Setting this puts us in KRaft mode
process.roles=controller

# The node id associated with this instance's roles
node.id=1

# The connect string for the controller quorum
controller.quorum.voters=1@kafka-controller-prd-001.com:9093,2@kafka-controller-prd-002.com:9093,3@kafka-controller-prd-003.com:9093

############################# Socket Server Settings #############################

# The address the socket server listens on.
# Note that only the controller listeners are allowed here when `process.roles=controller`, and this listener should be consistent with `controller.quorum.voters` value.
# FORMAT:
#   listeners = listener_name://host_name:port
# EXAMPLE:
#   listeners = PLAINTEXT://your.host.name:9092
listeners=CONTROLLER://kafka-controller-prd-001.com:9093

# A comma-separated list of the names of the listeners used by the controller.
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kraft-controller-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=2

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=2

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
```
controller2
```properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This configuration file is intended for use in KRaft mode, where
# Apache ZooKeeper is not present.
#

############################# Server Basics #############################

# The role of this server. Setting this puts us in KRaft mode
process.roles=controller

# The node id associated with this instance's roles
node.id=2

# The connect string for the controller quorum
controller.quorum.voters=1@kafka-controller-prd-001.com:9093,2@kafka-controller-prd-002.com:9093,3@kafka-controller-prd-003.com:9093

############################# Socket Server Settings #############################

# The address the socket server listens on.
# Note that only the controller listeners are allowed here when `process.roles=controller`, and this listener should be consistent with `controller.quorum.voters` value.
# FORMAT:
#   listeners = listener_name://host_name:port
# EXAMPLE:
#   listeners = PLAINTEXT://your.host.name:9092
listeners=CONTROLLER://kafka-controller-prd-002.com:9093

# A comma-separated list of the names of the listeners used by the controller.
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kraft-controller-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=2

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=2

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
```
controller3
```properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This configuration file is intended for use in KRaft mode, where
# Apache ZooKeeper is not present.
#

############################# Server Basics #############################

# The role of this server. Setting this puts us in KRaft mode
process.roles=controller

# The node id associated with this instance's roles
node.id=3

# The connect string for the controller quorum
controller.quorum.voters=1@kafka-controller-prd-001.com:9093,2@kafka-controller-prd-002.com:9093,3@kafka-controller-prd-003.com:9093

############################# Socket Server Settings #############################

# The address the socket server listens on.
# Note that only the controller listeners are allowed here when `process.roles=controller`, and this listener should be consistent with `controller.quorum.voters` value.
# FORMAT:
#   listeners = listener_name://host_name:port
# EXAMPLE:
#   listeners = PLAINTEXT://your.host.name:9092
listeners=CONTROLLER://kafka-controller-prd-003.com:9093

# A comma-separated list of the names of the listeners used by the controller.
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kraft-controller-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=2

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=2

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
```
broker
For the brokers, again only node.id and listeners differ between nodes. Also note that a broker's node.id must not duplicate any controller's node.id; it has to be unique across the whole cluster.
broker1
```properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This configuration file is intended for use in KRaft mode, where
# Apache ZooKeeper is not present.
#

############################# Server Basics #############################

# The role of this server. Setting this puts us in KRaft mode
process.roles=broker

# The node id associated with this instance's roles
node.id=4

# The connect string for the controller quorum
controller.quorum.voters=1@kafka-controller-prd-001.com:9093,2@kafka-controller-prd-002.com:9093,3@kafka-controller-prd-003.com:9093

############################# Socket Server Settings #############################

# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
# FORMAT:
#   listeners = listener_name://host_name:port
# EXAMPLE:
#   listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://kafka-prd-001.com:9092

# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT

# A comma-separated list of the names of the listeners used by the controller.
# This is required if running in KRaft mode. On a node with `process.roles=broker`, only the first listed listener will be used by the broker.
controller.listener.names=CONTROLLER

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kraft-broker-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=2

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=2

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

auto.create.topics.enable=false
message.max.bytes=10485760
```
broker2
```properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This configuration file is intended for use in KRaft mode, where
# Apache ZooKeeper is not present.
#

############################# Server Basics #############################

# The role of this server. Setting this puts us in KRaft mode
process.roles=broker

# The node id associated with this instance's roles
node.id=5

# The connect string for the controller quorum
controller.quorum.voters=1@kafka-controller-prd-001.com:9093,2@kafka-controller-prd-002.com:9093,3@kafka-controller-prd-003.com:9093

############################# Socket Server Settings #############################

# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
# FORMAT:
#   listeners = listener_name://host_name:port
# EXAMPLE:
#   listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://kafka-prd-002.com:9092

# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT

# A comma-separated list of the names of the listeners used by the controller.
# This is required if running in KRaft mode. On a node with `process.roles=broker`, only the first listed listener will be used by the broker.
controller.listener.names=CONTROLLER

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kraft-broker-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=2

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=2

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

auto.create.topics.enable=false
message.max.bytes=10485760
```
Deployment
1. Install JDK 11

```bash
sudo apt update
sudo apt install openjdk-11-jdk -y
```

Run this on all 5 machines.
2. Download the Kafka binary package

```bash
wget http://mirrors.aliyun.com/apache/kafka/3.5.0/kafka_2.13-3.5.0.tgz
```

Run this on all 5 machines.
3. Unpack it

```bash
tar -xzf kafka_2.13-3.5.0.tgz
```

Run this on all 5 machines.
4. Generate a UUID for the cluster

```bash
kafka_2.13-3.5.0/bin/kafka-storage.sh random-uuid
```

This only needs to run on one machine.
The UUID I generated here is mzDehZx0RNmke27PRMpNkA.
5. Edit the configuration files
On machines 1, 2, and 3, edit the controller configuration, i.e. kafka_2.13-3.5.0/config/kraft/controller.properties.
On machines 4 and 5, edit kafka_2.13-3.5.0/config/kraft/broker.properties. Only node.id and listeners change per node; see the sed sketch below.
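A minimal sketch of those per-node edits, assuming the install path used in the startup commands below; the NODE_ID and DOMAIN values are examples for machine 2 and should be adjusted per the table above:

```bash
# Example for machine 2 (controller2); brokers patch broker.properties the same way
NODE_ID=2
DOMAIN=kafka-controller-prd-002.com
CONF=/home/ubuntu/kafka_2.13-3.5.0/config/kraft/controller.properties
sed -i "s/^node.id=.*/node.id=${NODE_ID}/" "$CONF"
sed -i "s#^listeners=.*#listeners=CONTROLLER://${DOMAIN}:9093#" "$CONF"
```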
6. Start the cluster
Start the controllers
Start the controllers in order: controller1, then controller2, then controller3.
On each controller, format the storage directory and then start the process:

```bash
sh /home/ubuntu/kafka_2.13-3.5.0/bin/kafka-storage.sh format -t mzDehZx0RNmke27PRMpNkA -c /home/ubuntu/kafka_2.13-3.5.0/config/kraft/controller.properties
export KAFKA_HEAP_OPTS="-Xmx11G -Xms11G" && JMX_PORT=9988 nohup sh /home/ubuntu/kafka_2.13-3.5.0/bin/kafka-server-start.sh /home/ubuntu/kafka_2.13-3.5.0/config/kraft/controller.properties &
```
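A quick way to confirm each controller came up is to check that it is listening on the quorum port (ss ships with iproute2 on most distributions):

```bash
# Should show the Kafka java process bound to port 9093
ss -tlnp | grep 9093
```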
Start the brokers
Start the brokers in order: broker1, then broker2. On each broker run:

```bash
sh /home/ubuntu/kafka_2.13-3.5.0/bin/kafka-storage.sh format -t mzDehZx0RNmke27PRMpNkA -c /home/ubuntu/kafka_2.13-3.5.0/config/kraft/broker.properties
export KAFKA_HEAP_OPTS="-Xmx11G -Xms11G" && JMX_PORT=9988 nohup sh /home/ubuntu/kafka_2.13-3.5.0/bin/kafka-server-start.sh /home/ubuntu/kafka_2.13-3.5.0/config/kraft/broker.properties &
```
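Once the brokers are up, the KRaft quorum can be inspected with the kafka-metadata-quorum.sh tool that ships with Kafka 3.3+; the path below assumes the same install directory as above:

```bash
# Describe the quorum via a broker; voters 1,2,3 and observers 4,5
# should match the node.id values configured earlier
sh /home/ubuntu/kafka_2.13-3.5.0/bin/kafka-metadata-quorum.sh \
  --bootstrap-server kafka-prd-001.com:9092 describe --status
```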
Bootstrap servers
Since we set up domain names earlier, the cluster's bootstrap address is:
kafka-prd-001.com:9092,kafka-prd-002.com:9092
Deleting the metadata
To wipe the generated data and metadata and redeploy from scratch:

```bash
rm -rf /tmp/kafka-logs /tmp/kraft-controller-logs /tmp/kraft-combined-logs
```
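Stop the Kafka process on each node before wiping the directories; the distribution ships a stop script for this:

```bash
# Run on each node before the rm above
sh /home/ubuntu/kafka_2.13-3.5.0/bin/kafka-server-stop.sh
```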
Testing
Testing works the same as before; see Linux Kafka 3.5 KRaft cluster deployment: weihubeats.blog.csdn.net/article/det…
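For a quick smoke test, the console tools bundled with Kafka also work; the topic name below is just an example, and since auto.create.topics.enable=false the topic must be created explicitly first:

```bash
BOOTSTRAP=kafka-prd-001.com:9092,kafka-prd-002.com:9092
BIN=/home/ubuntu/kafka_2.13-3.5.0/bin

# Create a test topic (required, because auto topic creation is disabled above)
sh $BIN/kafka-topics.sh --bootstrap-server "$BOOTSTRAP" --create \
  --topic smoke-test --partitions 2 --replication-factor 2

# Produce a few messages, then consume them from another shell
sh $BIN/kafka-console-producer.sh --bootstrap-server "$BOOTSTRAP" --topic smoke-test
sh $BIN/kafka-console-consumer.sh --bootstrap-server "$BOOTSTRAP" --topic smoke-test --from-beginning
```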