Apache Kafka

有状态功能提供了一个Apache Kafka I/O模块,用于读取和写入Kafka Topic。它基于Apache Flink的通用Kafka连接器,arrow-up-right并提供一次精确的处理语义。Kafka I/O模块可以用Yaml或Java配置。

依赖项

要在Java中使用Kafka I/O模块,请在pom中加入以下依赖项。

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>statefun-kafka-io</artifactId>
	<version>2.0.0</version>
	<scope>provided</scope>
</dependency>

Kafka入口规范

KafkaIngressSpec声明了使用Kafka集群的入口规范。

它接受以下参数:

  1. 与该入口相关的入口标识符

  2. Topic名称/Topic名称列表

  3. 引导服务器的地址

  4. 要使用的消费者组ID

  5. 用于从Kafka反序列化数据的KafkaIngressDeserializer(仅限Java)

  6. 开始消费的位置

入口还接受Propertie,使用KafkaIngressBuilder与属性(properties)直接配置Kafka客户端。有关可用属性的完整列表,请参阅Kafka消费者配置文档。请注意,使用命名方法(如KafkaIngressBuilder#withConsumerGroupId(String))传递的配置将具有更高的优先级,并覆盖所提供属性中各自的设置。

启动位置

入口允许将启动位置配置为以下之一:

组偏移量(默认)

从指定消费者组提交给Kafka的偏移量开始。

Earlist

从最早的偏移量开始

Latest

从最新的偏移量开始。

指定偏移量

从特定的偏移量开始,该偏移量定义为分区到目标起始偏移量的映射。

Date

从提取时间大于或等于指定日期的偏移量开始。

启动时,如果分区的指定启动偏移量超出范围或不存在(如果将入口配置为从组偏移量,特定偏移量或日期开始,则可能是这种情况)将回退到使用所配置的位置KafkaIngressBuilder#withAutoOffsetResetPosition(KafkaIngressAutoResetPosition)。默认情况下,此位置设置为最新位置。

Kafka 反序列化器

使用Java api时,Kafka入口需要知道如何将Kafka中的二进制数据转换为Java对象。在KafkaIngressDeserializer允许用户指定这样的一个模式。每条Kafka消息都会调用T deserialize(ConsumerRecord<byte[], byte[]> record)方法,并从Kafka传递键,值和元数据。

Kafka出口规范

KafkaEgressBuilder声明用于将数据写出到Kafka集群的出口规范。

它接受以下参数:

  1. 与此出口关联的出口标识符

  2. 引导服务器的地址

  3. KafkaEgressSerializer序列化数据到Kafka(仅适用于Java的)

  4. 容错语义

  5. Kafka Producer的属性

请参阅Kafka Producer配置arrow-up-right文档以获取可用属性的完整列表。

Kafka出口和容错

启用容错功能后,Kafka出口可以提供准确的一次交付保证。可以选择三种不同的操作模式。

None

没有任何保证,产生的记录可能会丢失或重复。

至少一次

有状态函数将确保不会丢失任何记录,但可以重复记录。

恰好一次

有状态函数使用Kafka事务提供一次精确的语义。

Kafka序列化器

使用Java api时,Kafka出口需要知道如何将Java对象转换为二进制数据。在KafkaEgressSerializer允许用户指定这样的一个模式。ProducerRecord<byte[], byte[]> serialize(T out)为每条消息调用该方法,从而允许用户设置键,值和其他元数据。

Last updated

Was this helpful?