kafka cluster on kubernetes
Both the ZooKeeper and Kafka images used in this article are custom-built.
The goal is to create a Kafka cluster and a ZooKeeper cluster:
kafka version: 2.8.0 (scala: 2.13)
zookeeper version: 3.7.0
Environment
| Role | OS | IP | Domain |
| --- | --- | --- | --- |
| master node | CentOS 8 | 192.168.218.44 | k8s-01-master-node-01 |
| worker node | CentOS 8 | 192.168.218.47 | k8s-01-worker-node-01 |
| worker node | CentOS 8 | 192.168.218.45 | k8s-01-worker-node-02 |
| worker node | CentOS 8 | 192.168.218.46 | k8s-01-worker-node-03 |
| nfs file server | CentOS 8 | 192.168.218.53 | |
| harbor (for testing) | CentOS 8 | 192.168.218.48 | |
zookeeper
• step 1. Build the ZooKeeper image
start-zookeeper.sh
${HOSTNAME##*-}: takes the ordinal from the StatefulSet pod hostname and uses it as the ZooKeeper server id
echo $ZOOKEEPER_SERVER_ID > $ZOOKEEPER_DATA_DIRS/myid: writes that id into the myid file
sh $ZOOKEEPER_HOME/bin/zkServer.sh start-foreground: starts ZooKeeper in the foreground
#!/bin/sh
export ZOOKEEPER_SERVER_ID=${HOSTNAME##*-}
export ZOOKEEPER_DATA_DIRS="$ZOOKEEPER_HOME/servers/zookeeper-$ZOOKEEPER_SERVER_ID/data"
export ZOOKEEPER_LOG_DIRS="$ZOOKEEPER_HOME/servers/zookeeper-$ZOOKEEPER_SERVER_ID/data/log"
echo "ZOOKEEPER_SERVER_ID = $ZOOKEEPER_SERVER_ID"
echo "ZOOKEEPER_DATA_DIRS = $ZOOKEEPER_DATA_DIRS"
echo "ZOOKEEPER_LOG_DIRS = $ZOOKEEPER_LOG_DIRS"
cp /config/* $ZOOKEEPER_HOME/conf/
mkdir -p "$ZOOKEEPER_DATA_DIRS"
mkdir -p "$ZOOKEEPER_LOG_DIRS"
echo dataDir=$ZOOKEEPER_DATA_DIRS >> $ZOOKEEPER_HOME/conf/zoo.cfg
echo dataLogDir=$ZOOKEEPER_LOG_DIRS >> $ZOOKEEPER_HOME/conf/zoo.cfg
echo $ZOOKEEPER_SERVER_ID > $ZOOKEEPER_DATA_DIRS/myid
sh $ZOOKEEPER_HOME/bin/zkServer.sh start-foreground
Dockerfile
FROM centos:centos8
ARG zookeeper_version=3.7.0
ENV ZOOKEEPER_VERSION=$zookeeper_version \
ZOOKEEPER_HOME=/opt/zookeeper
ENV PATH=${PATH}:${ZOOKEEPER_HOME}/bin/
ADD start-zookeeper.sh zookeeper-ready.sh /tmp/
RUN yum install -y wget java-1.8.0-openjdk nc
RUN wget -O /tmp/apache-zookeeper-${ZOOKEEPER_VERSION}-bin.tar.gz https://downloads.apache.org/zookeeper/zookeeper-${ZOOKEEPER_VERSION}/apache-zookeeper-${ZOOKEEPER_VERSION}-bin.tar.gz
RUN tar -zxvf /tmp/apache-zookeeper-${ZOOKEEPER_VERSION}-bin.tar.gz -C /tmp/ \
&& mv /tmp/apache-zookeeper-${ZOOKEEPER_VERSION}-bin /opt/zookeeper \
&& chmod a+x /tmp/*.sh \
&& mv /tmp/*.sh /usr/bin
ENTRYPOINT ["start-zookeeper.sh"]
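A minimal sketch of building and pushing this image to the test Harbor registry, and creating the registry-key pull secret referenced by the StatefulSets below. The project path, tag, and credentials are assumptions taken from the manifests in this article; adjust them to your registry:
docker build -t 192.168.218.48:5000/test/zookeeper-custom-image:0.0.30 .
docker push 192.168.218.48:5000/test/zookeeper-custom-image:0.0.30
# pull secret used by imagePullSecrets (username/password are placeholders)
kubectl create secret docker-registry registry-key \
  --docker-server=192.168.218.48:5000 \
  --docker-username=<user> --docker-password=<password>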
• step 2. Create the ZooKeeper headless service
zookeeper-service-headless.yaml
Every StatefulSet needs a headless service to give its pods stable network identities.
apiVersion: v1
kind: Service
metadata:
name: zookeeper-service-headless
labels:
app: zookeeper-service-headless
spec:
clusterIP: None
selector:
app: zookeeper
ports:
- port: 2181
  name: client
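The headless service is what gives each pod a stable DNS name of the form <pod>.<service>.<namespace>.svc.cluster.local, which the server.N entries in zoo.cfg below rely on. Once the StatefulSet from step 6 is running, the records can be spot-checked, for example (assuming getent is available in the image, as it is in centos:centos8):
kubectl exec zookeeper-cluster-0 -- getent hosts zookeeper-cluster-1.zookeeper-service-headless.default.svc.cluster.local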
• step 3. Create the ZooKeeper PVC
zookeeper-cluster-pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: zookeeper-cluster-pvc
namespace: default
spec:
accessModes: [ "ReadWriteMany" ]
storageClassName: "zookeeper-storage-class"
resources:
requests:
storage: 15Gi
• step 4. Create the ZooKeeper PV
zookeeper-pv.yaml
The PV is backed by NFS.
apiVersion: v1
kind: PersistentVolume
metadata:
name: zookeeper-pv
spec:
storageClassName: zookeeper-storage-class
capacity:
storage: 15Gi
accessModes:
- ReadWriteMany
nfs:
path: "/var/nfsshare/zookeeper"
server: 192.168.218.53
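Both the ZooKeeper and Kafka PVs assume the NFS server at 192.168.218.53 already exports the corresponding directories. A rough sketch of that server-side setup; the export options are an assumption, tune them to your environment:
# on 192.168.218.53
mkdir -p /var/nfsshare/zookeeper /var/nfsshare/kafka
echo '/var/nfsshare/zookeeper *(rw,sync,no_root_squash)' >> /etc/exports
echo '/var/nfsshare/kafka *(rw,sync,no_root_squash)' >> /etc/exports
exportfs -ra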
• step 5. Create the ZooKeeper ConfigMap
zookeeper-config-map.yaml
The ZooKeeper configuration parameters are kept in a ConfigMap.
apiVersion: v1
kind: ConfigMap
metadata:
name: zookeeper-config-map
namespace: default
data:
zoo.cfg: |
clientPort=2181
tickTime=2000
initLimit=10
syncLimit=5
maxClientCnxns=60
minSessionTimeout=4000
maxSessionTimeout=40000
autopurge.snapRetainCount=3
autopurge.purgeInterval=12
reconfigEnabled=true
standaloneEnabled=false
4lw.commands.whitelist=ruok
server.0=zookeeper-cluster-0.zookeeper-service-headless.default.svc.cluster.local:2888:3888
server.1=zookeeper-cluster-1.zookeeper-service-headless.default.svc.cluster.local:2888:3888
server.2=zookeeper-cluster-2.zookeeper-service-headless.default.svc.cluster.local:2888:3888
• step 6. Create the ZooKeeper StatefulSet
zookeeper-cluster-statefulset.yaml
zookeeper-cluster-pvc: mounts the ZooKeeper data directory
zookeeper-config-map: mounts the ZooKeeper configuration files
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: zookeeper-cluster
spec:
serviceName: zookeeper-service-headless
replicas: 3
selector:
matchLabels:
app: zookeeper
template:
metadata:
labels:
app: zookeeper
spec:
containers:
- name: zookeeper
image: 192.168.218.48:5000/test/zookeeper-custom-image:0.0.30
ports:
- containerPort: 2181
  name: client
volumeMounts:
- name: zookeeper-cluster-pvc
mountPath: /opt/zookeeper/servers
- name: zookeeper-config-map
mountPath: /config/
volumes:
- name: zookeeper-cluster-pvc
persistentVolumeClaim:
claimName: zookeeper-cluster-pvc
- name: zookeeper-config-map
configMap:
name: zookeeper-config-map
imagePullSecrets:
- name: registry-key
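Applying the manifests and checking the ensemble. The apply order and the checks below are just one way to verify: nc is installed in the image, ruok is whitelisted in zoo.cfg, and zkServer.sh is on the PATH:
kubectl apply -f zookeeper-pv.yaml -f zookeeper-cluster-pvc.yaml \
  -f zookeeper-config-map.yaml -f zookeeper-service-headless.yaml \
  -f zookeeper-cluster-statefulset.yaml
kubectl get pods -l app=zookeeper
# each node should answer "imok"; zkServer.sh status shows leader/follower
kubectl exec zookeeper-cluster-0 -- sh -c 'echo ruok | nc localhost 2181'
kubectl exec zookeeper-cluster-0 -- zkServer.sh status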
kafka
• step 1. Build the Kafka image
start-kafka.sh
#!/bin/sh
export KAFKA_BROKER_ID=${HOSTNAME##*-}
export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${POD_IP}:9092
export KAFKA_LOG_DIRS="$KAFKA_HOME/kafka-log/kafka-$KAFKA_BROKER_ID/logs"
echo "KAFKA_BROKER_ID = $KAFKA_BROKER_ID"
echo "KAFKA_ADVERTISED_LISTENERS = $KAFKA_ADVERTISED_LISTENERS"
echo "KAFKA_LOG_DIRS = $KAFKA_LOG_DIRS"
cp /config/* $KAFKA_HOME/config/
mkdir -p "$KAFKA_LOG_DIRS"
echo "log.dirs=$KAFKA_LOG_DIRS" >> "$KAFKA_HOME/config/server.properties"
sh "$KAFKA_HOME/bin/kafka-server-start.sh" "$KAFKA_HOME/config/server.properties"
Dockerfile
FROM centos:centos8
ARG kafka_version=2.8.0
ARG scala_version=2.13
ENV KAFKA_VERSION=$kafka_version \
SCALA_VERSION=$scala_version \
KAFKA_HOME=/opt/kafka
ENV PATH=${PATH}:${KAFKA_HOME}/bin/
ADD start-kafka.sh /tmp/
RUN yum install -y java-1.8.0-openjdk wget
RUN wget -O /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz https://downloads.apache.org/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz
RUN tar -zxvf /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz -C /tmp/ \
&& mv /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION} /opt/kafka \
&& chmod a+x /tmp/*.sh \
&& mv /tmp/start-kafka.sh /usr/bin
ENTRYPOINT ["start-kafka.sh"]
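The Kafka image is built and pushed the same way as the ZooKeeper image; the tag below matches what the StatefulSet in step 6 references (the registry path is the same assumption as above):
docker build -t 192.168.218.48:5000/test/kafka-custom-image:0.0.13 .
docker push 192.168.218.48:5000/test/kafka-custom-image:0.0.13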
• step 2. Create the Kafka headless service
kafka-service-headless.yaml
apiVersion: v1
kind: Service
metadata:
name: kafka-service-headless
labels:
app: kafka-service-headless
spec:
clusterIP: None
selector:
app: kafka
ports:
- port: 9092
name: server
• step 3. Create the Kafka PVC
kafka-cluster-pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: kafka-cluster-pvc
namespace: default
spec:
accessModes: [ "ReadWriteMany" ]
storageClassName: "kafka-storage-class"
resources:
requests:
storage: 15Gi
• step 4. Create the Kafka PV
kafka-pv.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: kafka-pv
spec:
storageClassName: kafka-storage-class
capacity:
storage: 15Gi
accessModes:
- ReadWriteMany
nfs:
path: "/var/nfsshare/kafka"
server: 192.168.218.53
• step 5. Create the Kafka ConfigMap
kafka-config-map.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-config-map
namespace: default
data:
server.properties: |
zookeeper.connect=zookeeper-cluster-0.zookeeper-service-headless.default.svc.cluster.local:2181,\
zookeeper-cluster-1.zookeeper-service-headless.default.svc.cluster.local:2181,\
zookeeper-cluster-2.zookeeper-service-headless.default.svc.cluster.local:2181
• step 6. Create the Kafka StatefulSet
kafka-broker.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: kafka-broker
spec:
serviceName: kafka-service-headless
replicas: 3
selector:
matchLabels:
app: kafka
template:
metadata:
labels:
app: kafka
spec:
containers:
- name: kafka
image: 192.168.218.48:5000/test/kafka-custom-image:0.0.13
ports:
- containerPort: 9092
name: server
env:
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
volumeMounts:
- name: kafka-cluster-pvc
mountPath: /opt/kafka/kafka-log
- name: kafka-config
mountPath: /config/
initContainers:
- name: copy-ro-scripts
image: busybox
command: ['sh', '-c', 'cp /etc/pre-install/* /config/']
volumeMounts:
- name: kafka-config
mountPath: /config/
- name: kafka-config-map
mountPath: /etc/pre-install
volumes:
- name: kafka-config
emptyDir: {}
- name: kafka-cluster-pvc
persistentVolumeClaim:
claimName: kafka-cluster-pvc
- name: kafka-config-map
configMap:
name: kafka-config-map
imagePullSecrets:
- name: registry-key
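After applying the Kafka manifests, one way to confirm the brokers came up and registered with ZooKeeper is to list /brokers/ids from a ZooKeeper pod (zkCli.sh is on the PATH in the custom image); the exact ids shown depend on the broker.id each pod wrote at startup:
kubectl apply -f kafka-pv.yaml -f kafka-cluster-pvc.yaml -f kafka-config-map.yaml \
  -f kafka-service-headless.yaml -f kafka-broker.yaml
kubectl get pods -l app=kafka
# expect one id per broker, e.g. [0, 1, 2]
kubectl exec zookeeper-cluster-0 -- zkCli.sh -server localhost:2181 ls /brokers/ids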
Results
Test 1
Create the test1 topic
kubectl exec -it kafka-broker-0 -- /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka-broker-1.kafka-service-headless.default.svc.cluster.local:9092 --topic test1 --create
Run the console producer from broker 0
kubectl exec -it kafka-broker-0 -- /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server kafka-broker-1.kafka-service-headless.default.svc.cluster.local:9092 --topic test1
Run the console consumer from broker 0
kubectl exec -it kafka-broker-0 -- /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka-broker-2.kafka-service-headless.default.svc.cluster.local:9092 --topic test1
Create topic test1
Describe topic test1
Produce data in the console producer
Consume data in the console consumer
Test 2
scale up kafka
kubectl scale sts kafka-broker --replicas=10
Create the test topic with:
replication-factor: 5
partitions: 3
kubectl exec -it kafka-broker-0 -- /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka-broker-1.kafka-service-headless.default.svc.cluster.local:9092 --topic test --create --partitions 3 --replication-factor 5
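After scaling, one way to confirm the new brokers are running and to see how the partitions and replicas were placed is the describe command (same bootstrap server as above):
kubectl get pods -l app=kafka
kubectl exec -it kafka-broker-0 -- /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka-broker-1.kafka-service-headless.default.svc.cluster.local:9092 --topic test --describe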
Create a scheduled test job that produces data to Kafka.
import java.util.UUID;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
@EnableScheduling
public class TestSchedule {
@Autowired
private TestProducer testProducer;
@Scheduled(fixedRate = 1000)
public void produceData() {
testProducer.send(UUID.randomUUID().toString(), "test");
}
}
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class ProducerConfiguration {
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-0.kafka-service-headless.default.svc.cluster.local:9092,kafka-broker-1.kafka-service-headless.default.svc.cluster.local:9092,kafka-broker-2.kafka-service-headless.default.svc.cluster.local:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class TestProducer {
private static final Logger LOG = LoggerFactory.getLogger(TestProducer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send(String message, String topic){
LOG.info("sending message='{}' to topic='{}'", message, topic);
kafkaTemplate.send(topic, message);
}
}
Create a consumer that consumes the test topic.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
@EnableKafka
public class ConsumerConfiguration {
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"kafka-broker-0.kafka-service-headless.default.svc.cluster.local:9092," +
"kafka-broker-1.kafka-service-headless.default.svc.cluster.local:9092," +
"kafka-broker-2.kafka-service-headless.default.svc.cluster.local:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class TestConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(TestConsumer.class);
@KafkaListener(topics = "test")
public void listen(@Payload String message) {
LOGGER.info("received message='{}'", message);
}
}
Deploy the producer and consumer above to Kubernetes.
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
app: kafka-data-producer-deployment
name: kafka-data-producer-deployment
namespace: default
spec:
replicas: 1
selector:
matchLabels:
app: kafka-data-producer-deployment
template:
metadata:
labels:
app: kafka-data-producer-deployment
spec:
containers:
- name: kafka-data-producer-service
image: 192.168.218.48:5000/test/kafka-producer:0.0.1
imagePullPolicy: Always
imagePullSecrets:
- name: registry-key
---
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
app: kafka-data-consumer-deployment
name: kafka-data-consumer-deployment
namespace: default
spec:
replicas: 1
selector:
matchLabels:
app: kafka-data-consumer-deployment
template:
metadata:
labels:
app: kafka-data-consumer-deployment
spec:
containers:
- name: kafka-data-consumer-service
image: 192.168.218.48:5000/test/kafka-consumer:0.0.1
imagePullPolicy: Always
imagePullSecrets:
- name: registry-key
Check the logs produced by the producer
kubectl logs -f kafka-data-producer-deployment-5dfcb5d7d7-4smhv
Check the logs produced by the consumer
kubectl logs -f kafka-data-consumer-deployment-7b86b4b7c6-rkq9x
Test 3
Delete one of the Kafka brokers
kubectl delete pod kafka-broker-0
The pod recovers automatically.
Data can still be produced and consumed normally.
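A quick way to watch the recovery and confirm that leadership for the affected partitions moved to the surviving brokers (describe shows the current leader and ISR for each partition):
kubectl get pods -l app=kafka -w
kubectl exec -it kafka-broker-1 -- /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka-broker-1.kafka-service-headless.default.svc.cluster.local:9092 --topic test --describe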