Spring-Kafka ErrorHandlingDeserializer Windows OS Apache Kafka

Spring 整合 Apache Kafka 處理事件流

吳沛芸 Peiiun Wu 2021/09/16 09:59:53
12626

 

前言

Apache Kafka 是一款用來高效處理事件串流、分析和整合工作的開源分散式事件流平台(open-source distributed event streaming platform),在具有高擴展性、快速、容錯性與安全性的方式下提供三項能力 ─ 發佈、訂閱事件流、儲存事件流以及處理事件流的能力。

Kafka 的高擴展性的設計關鍵,在於生產者與消費者是完全解耦、對彼此不可知,生產者不用等待消費者,而且 Kafka 本身有許多機制確保事件只會被消費一次。

 

 

概念與名詞解釋

kafka 事件流程

Event:事件,指的是當有什麼事情發生時所保留下的紀錄或訊息,當在對 Kafka 做讀寫時,就是以事件的形式來做這些事情。一個事件包含鍵(key)、值(value)、時間戳章(timestamp),或是可選的元資料頭( metadata headers )

Producers :生產者,指的是向 Kafka 發佈事件的應用程式。

Consumers :消費者,指的是向 Kafka 訂閱事件的應用程式。

Topic:主題,事件被組織化、持久的儲存在主題中。一個主題可以由零個到多個生產者和消費者去發佈或訂閱它。主題中的事件是隨時可以被讀取的,事件被消費後不被刪除。使用者可以透過配置設定每個主題中的事件可以保留多長的時間,而 Kafka 的性能相對於數據的大小是恆定的,因此長時間的儲存數據是完全沒問題的。

Partition:分區,主題分佈在位於不同 Kafka 代理的多個存儲空間中。數據的分佈式放置對於可擴展性非常重要,因為它讓客戶端應用程式同時向多個代理讀取和寫入數據。當一個新的事件發佈到主題中,它實際上被附加到某個主題的分區之一裡,Kafka 確保訂閱此分區的消費者在讀取的順序與事件寫入的順序是完全相同的。

Offset:偏移量,在分區中的每個訊息都有的、一個遞增的 id。

Broker & Cluster:broker 指運行 Kafka 的機器,而 cluster 則是指多個 broker 組成的叢集。

Replicas:備份,為了使數據有容錯性與高可用,每個主題都能夠備份,甚至可以跨越地理區域或數據中心備份。

Leader & ISR:Leader broker 主要負責對於特定分區的讀寫,而 in-sync-replicas (ISR) 表示同步中的 broker;若 Leader broker 無法運作,zookeeper 會指派其中一個 ISR 來替代作為 Leader broker。

 

舉例,假設叢集中有三台 Broker,創建了一個 Topic test-relicas 並指定有 2 個 Replicas、2 個 Partition,來看這個 Topic 的描述:

  • partition 0 的 Leader 是 broker 0,2個備份和同步中的 broker 分別為 broker 0  broker 1
  • partition 1 的 Leader 是 broker 2,2個備份和同步中的 broker 分別為 broker 2  broker 0

想像這個 Topic 在叢集中會長成這樣:

了解了 Kafka 基本的相關知識,相信在後續的實作中會比較容易理解。

 

 

啟動 Kafka 環境

下載 Apache Kafka ,這裡使用的版本為 kafka_2.13-2.8.0,執行環境需為 Java 8 以上版本。

※ Kafka 所有的執行檔放置於 bin 資料夾下,由於筆者環境是 Windows 系統,執行的是 bin/windows 路徑下的 bat 檔。

※ 使用 Windows Terminal (Preview) 開啟終端機,方便開多個分頁輸入指令。

 

Step 1:在一個 Terminal 啟動 Zookeeper,成功啟動看到 Zookeeper 的位址。

# Start the ZooKeeper service
> ./bin/windows/zookeeper-server-start.bat ./config/zookeeper.properties

 

Step 2:在另一個 Terminal 啟動 Kafka Server,成功啟動會看到 Kafka Server 的位址 localhost:9092、broker.id=0

# Start the Kafka server
./bin/windows/kafka-server-start.bat ./config/server.properties

 

這兩個步驟完成後,系統中就有 Kafka 在運行了。接下來簡單測試一下 Kafka 的運作方式:

新增 Topic test

# Create Topic [test]
./bin/windows/kafka-topics.bat --create --bootstrap-server localhost:9092 --topic test --replication-factor 1 --partitions 1

 

列出 Topic test 詳情

# Describe Topic [test]
./bin/windows/kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic test

 

測試發佈與訂閱

在 Terminal 分頁各別執行 Producer 和 Consumer 發佈和訂閱 Topic test,一切就緒後在 Producer 的 Terminal 輸入訊息送出,Consumer 的 Terminal 會自動出現該訊息。

# Publish to Topic [test]
./bin/windows/kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test
# Consumer Topic [test]
./bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test

 

 

Spring for Apache Kafka 實作

Spring for Apache Kafka 是 Spring 封裝 kafka-client 的項目,將事件流傳遞快速整合入Sping 項目的解決方案。

 

Producer Project

建立一個 Producer 的專案包負責發佈事件到 Kafka Server。

 

Dependencies

<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-web</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.kafka</groupId>
		<artifactId>spring-kafka</artifactId>
	</dependency>
</dependencies>

 

Configuration

package com.example.demo;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaConfig {

    public static final String JSON_TOPIC = "json";
    public static final String DEFAULT_SERVER = "127.0.0.1:9092";

    @Bean
    public ProducerFactory<String, UserVo> producerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, DEFAULT_SERVER);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, UserVo> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public NewTopic defaultTopic() {
        return TopicBuilder.name(JSON_TOPIC).build();
    }
}

● producerFactory:註冊一個 Kafka Producer 實體用來產生訊息,這裡設定了Kafka Server 的位址在本機端的 9092 port 、Key 和 Value 的序列化方式。

● kafkaTemplate:註冊一個 KafkaTemplate,他包裝了 Producer 實體並且提供我們一些便利的方法來發送訊息到 Topic。

● defaultTopic:Spring Boot 會自動註冊 KafkaAdmin,KafkaAdmin 會自動將我們設定的 NewTopic Bean 新增到 Kafka Server 的 Topic。

 

Controller

package com.example.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("kafka")
public class ProducerCtrl {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProducerCtrl.class);

    @Autowired
    private KafkaTemplate<String, UserVo> kafkaTemplate;

    @GetMapping("/publish/{dpt}/{id}")
    public String post(@PathVariable("dpt") final String dpt, @PathVariable("id") final String id) {

        UserVo userVo = new UserVo(id, dpt);

        kafkaTemplate.send(KafkaConfig.JSON_TOPIC, userVo);


        return "Published done";
    }
}

 

UserVo

package com.example.demo;

public class UserVo {

    private String id;
    private String dpt;

    public UserVo(String id, String dpt) {
        super();
        this.id = id;
        this.dpt = dpt;
    }

    public String getDpt() {
        return dpt;
    }

    public void setDpt(String dpt) {
        this.dpt = dpt;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "UserVo [id=" + id + ", dpt=" + dpt + "]";
    }

}

 

測試 Producer Project

當 Producer 專案啟動後,我們先檢查 Topic 是否有被自動註冊到 Kafka Server ─ 這裡顯示先前用 command line 建置的 Topic test 和專案建置的 Topic json。

# List all topics
./bin/windows/kafka-topics.bat --list --bootstrap-server localhost:9092

接著用 Terminal 開啟一個 Consumer 訂閱 Topic json。

# Consume topic [json]
./bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic json

呼叫 Publish API 後,可以看到 Terminal 的 Consumer 也收到了訊息,到這裡 Producer 的部分就完成了。

http://localhost:8081/kafka/publish/hr/h01

 

 

Consumer Project

建立一個 Consumer 的專案包用來訂閱 Kafka Server 的事件 。

 

Configuration

package com.example.demo;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@EnableKafka
@Configuration
public class KafkaConfig {

    public static final String TEST_TOPIC = "test";
    public static final String JSON_TOPIC = "json";
    public static final String DEFAULT_SERVER = "127.0.0.1:9092";
    public static final String GROUP_1 = "group_1";
    public static final String GROUP_2 = "group_2";
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConfig.class);


    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, DEFAULT_SERVER);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_1);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, UserVo> userConsumerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, DEFAULT_SERVER);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_2);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, UserVo> userKafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, UserVo> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(userConsumerFactory());
        return factory;
    }

}

● consumerFactory()、userConsumerFactory():註冊一個 Kafka Consumer 實體用來訂閱訊息,這裡設定了Kafka Server 的位址在本機端的 9092 port 、Key 和 Value 的反序列化方式。

● kafkaListenerContainerFactory()、userKafkaListenerFactory():註冊一個 Container 給有註解 @KafkaListener 的方法。

● @EnableKafka:必須在設定檔上加此註解,用來啟用偵測 @KafkaListener 的方法

 

Consumer

package com.example.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics = KafkaConfig.TEST_TOPIC, groupId = KafkaConfig.GROUP_1)
    public void consume(String message) {
        LOGGER.info("Consumed message: {} ", message);
    }


    @KafkaListener(topics = KafkaConfig.JSON_TOPIC, groupId = KafkaConfig.GROUP_2,
            containerFactory = "userKafkaListenerFactory")
    public void consumeJson(UserVo user) throws InterruptedException {
        LOGGER.info("Consumed JSON Message: {} ", user);
    }
}

這裡簡單定義了兩個 KafkaListener,分別是訂閱 Topic test 和 Topic json 。

 

UserVo

public class UserVo {

    private String id;
    private String dpt;

    public UserVo() {}

    public String getDpt() {
        return dpt;
    }

    public void setDpt(String dpt) {
        this.dpt = dpt;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "UserVo [id=" + id + ", dpt=" + dpt + "]";
    }

}

 

測試 Consumer Project

當 Consumer 專案啟動後,使用 Terminal 測試發送訊息到 Topic test,Consumer Project 成功接收。

再來使用先前建立的 publish API 測試發送訊息到 Topic json,Consumer Project 成功接收。

http://localhost:8081/kafka/publish/sa/s01

 

小結

到目前為止,我們使用 Terminal 啟動 Kafka、新增主題到發佈和訂閱主題,對 Kafka 的運作方式以及操作有初步的了解後,接著使用 Spring-Kafka 將 Kafka 整合進 Spring Boot 專案,完成了訊息發佈和接收。接著來看看在實作中遇到的議題 ...

 

 

 

毒藥訊息 Poison Pill

在實作的過程中,可以發現 Configuration 指定了 KEY 和 VALUE 的型態 ─ 發佈訊息時序列化以及訂閱訊息時反序列化。Producer 發佈訊息時 Java Object 轉換成 byte 陣列傳輸到 Kafka Server,Kafka Server 只負責儲存以及分送訊息到指定的主題分區, Consumer 訂閱時將 byte 陣列反序列化回 Java Object 。當 Producer 序列化型態與 Consumer 在反序列化型態無法匹配時,毒藥訊息的情境就產生了。

例如 Producer 將文字訊息傳送給訂閱 JSON 訊息的 Consumer 時, 會發生:

  1. Consumer 反序列化異常拋出錯誤。
  2. 由於 Consumer offset 沒有往前,訊息被阻塞。
  3. Consumer 會持續的嘗試反序列化毒藥訊息然後失敗,程式就像進入了一個無窮迴圈,不但無法繼續訂閱訊息且LOG 記錄被大量的、重複的錯誤訊息淹沒,最終導致儲存空間用盡。

 

使用 ErrorHandlingDeserializer

當在 Consumer 配置了 ErrorHandlingDeserializer,收到毒藥訊息時會回傳 null 並增加 DeserializationException 在 ConsumerRecord 的 header,ConsumerRecord 會呼叫 container 的 ErrorHandler 來處理,這筆毒藥訊息就不會過到 @KafkaListener 的方法,遏止死循環的產生。

以下改寫 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ,原本直接使用的 JsonDeserializer,現在再多包一層 ErrorHandlingDeserializer 當作 VALUE_DESERIALIZER。

Configuration

@Bean
public ConsumerFactory<String, UserVo> userConsumerFactory() {
    Map<String, Object> config = new HashMap<>();

    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, DEFAULT_SERVER);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_2); 

    // use ErrorHandlingDeserializer
    ErrorHandlingDeserializer<UserVo> errorHandlingDeserializer =
            new ErrorHandlingDeserializer<>(new JsonDeserializer<>(UserVo.class));
    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
            errorHandlingDeserializer);
}

另外這邊多配置 org.springframework.kafka.listener.LoggingErrorHandler 取代 container 預設的 SeekToCurrentErrorHandler 用來印錯誤訊息。

@Bean
public ConcurrentKafkaListenerContainerFactory<String, UserVo> userKafkaListenerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, UserVo> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(userConsumerFactory());
    // add ErrorHandler
    factory.setErrorHandler(errorHandler()); 
    return factory;
}

@Bean
public LoggingErrorHandler errorHandler() {
    return new LoggingErrorHandler();
}

 

測試發送毒藥訊息

加了 ErrorHandlingDeserializer 後重新執行 Consumer Project,這裡使用 Terminal 將文字送到訂閱 JSON訊息的 Topic json,可以看到 LOG 只印了一次錯誤訊息、不再陷入死循環。

以上完成配置 ErrorHandlingDeserializer 來確保毒藥訊息可以被處理、 Consumer Offset 可以繼續移動不受阻,確保 Consumer 可以持續訂閱後續的訊息並且搭配 LoggingErrorHandler 留下 LOG 紀錄

 

 

發佈訊息的後續處理

如果希望在 Producer 發佈訊息時多做一些處理,比如說寫進 Database 或是印自訂的 LOG 等等,可以接 send 方法回傳的 ListenableFuture<SendResult>,利用非同步的方式做後續處理。改寫 Producer 發佈訊息的方法為下:

package com.example.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("kafka")
public class ProducerCtrl {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProducerCtrl.class);

    @Autowired
    private KafkaTemplate<String, UserVo> kafkaTemplate;

    @GetMapping("/publish/{dpt}/{id}")
    public String post(@PathVariable("dpt") final String dpt, @PathVariable("id") final String id) {

        UserVo userVo = new UserVo(id, dpt);

        ListenableFuture<SendResult<String, UserVo>> future =
                kafkaTemplate.send(KafkaConfig.JSON_TOPIC, userVo);

        future.addCallback(new KafkaSendCallback<String, UserVo>() {

            @Override
            public void onSuccess(SendResult<String, UserVo> result) {
                LOGGER.info("success send message:{} with offset:{} ", userVo,
                        result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(KafkaProducerException ex) {
                LOGGER.error("fail send message! Do somthing....");
            }
        });

        return "Published done";
    }
}

 

測試 callBack method

http://localhost:8081/kafka/publish/sa/s01

訊息發送成功 LOG:

為了測試 onFailure 方法,這裡將 timeout 時間設定非常短來觸發 TimeoutException:

@Bean
public ProducerFactory<String, UserVo> producerFactory() {
    Map<String, Object> config = new HashMap<>();

    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, DEFAULT_SERVER);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    // for test onFailure method
    config.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 1);
    config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1);

    return new DefaultKafkaProducerFactory<>(config);
}

訊息發送失敗 LOG:

如此一來,Producer 發送訊息成功或失敗、要做什麼處理都可以很明確的表示,使程式增加更多彈性。

 

 

Retry 機制

假設 Consumer 訂閱訊息後需要呼叫其他下游系統,當下游系統正在忙碌中沒有回應,你又想要多重試個幾次呢?

Spring-Kafka 提供 AbstractKafkaListenerContainerFactory 來配置  spring-retry template,在 retry template 中可以設定要重試幾次以及每次重試間隔的時間多久。

Configuration

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());    
    factory.setRetryTemplate(kafkaRetry());
    factory.setRecoveryCallback(retryContext -> {
        ConsumerRecord<String, String> consumerRecord =
                (ConsumerRecord) retryContext.getAttribute("record");
        LOGGER.info("Recovery is called for message: {} ", consumerRecord.value());
        return Optional.empty();
    });
    return factory;
}

@Bean
publci RetryTemplate kafkaRetry() {
    RetryTemplate retryTemplate = new RetryTemplate();

    FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
    fixedBackOffPolicy.setBackOffPeriod(5 * 1000l);
    retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(3);
    retryTemplate.setRetryPolicy(retryPolicy);
    return retryTemplate;
}

這裡 retry template 配置了重試 3 次、每次間隔 5 秒。如果下游系統在 retry 的期間恢復運作,那訊息就可以繼續傳遞。

另外還有設置了 RecoveryCallback,當 retry 都結束了就會觸發,以便做後續處理。

 

何時觸發 Retry 與 Retry 期間訊息的狀態

當 Consumer 在處理訊息中拋出例外錯誤就會觸發 Retry 機制,在 Retry 期間 Consumer 的 thread 會暫停接收訊息直到 Retry 結束。為了方便測試,這裡設定如果文字訊息中含有「retry」這個字就拋出例外錯誤。

@KafkaListener(topics = KafkaConfig.TEST_TOPIC, groupId = KafkaConfig.GROUP_1)
public void consume(String message) {
    LOGGER.info("Consumed message: {} ", message);
    LOGGER.info("Call other API...... ");
    if (message.contains("retry")) {
        LOGGER.info("Incompatible message {} ", message);
        throw new RuntimeException("Incompatible message " + message);
    }
    LOGGER.info("Consumed done. ");
}

 

測試 Retry

使用 Terminal 發送三次訊息到 Topic test,可以看到 Console 中收到 「retry」時拋出例外錯誤觸發了 Retry 機制,直到 retry 結束後才繼續訂閱下個訊息。

./bin/windows/kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test

 

總結

以上我們從了解 Kafka 運作過程開始,接著將使用 Spring-Kafka 將 Kafka 加入專案使用,完成初步的 Producer 和 Consumer 功能,接著是針對一些配置上的調整,包括毒藥訊息、發佈訊息的後續處理以及Retry機制的建立,讓整個事件處理流程更加順暢。

 

 

References

Apache Kafka Tutorials

Apache Kafka Documentation

Spring-Kafka Docs

Intro to Apache Kafka with Spring

Spring for Apache Kafka – Beyond the Basics: Can Your Kafka Consumers Handle a Poison Pill?

Spring-kafka re-try with spring-retry

 

吳沛芸 Peiiun Wu