Apache Kafka
有状态功能提供了一个Apache Kafka I/O模块,用于读取和写入Kafka Topic。它基于Apache Flink的通用Kafka连接器,并提供一次精确的处理语义。Kafka I/O模块可以用Yaml或Java配置。
依赖项
要在Java中使用Kafka I/O模块,请在pom中加入以下依赖项。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>statefun-kafka-io</artifactId>
<version>2.0.0</version>
<scope>provided</scope>
</dependency>Kafka入口规范
KafkaIngressSpec声明了使用Kafka集群的入口规范。
它接受以下参数:
与该入口相关的入口标识符
Topic名称/Topic名称列表
引导服务器的地址
要使用的消费者组ID
用于从Kafka反序列化数据的
KafkaIngressDeserializer(仅限Java)开始消费的位置
入口还接受Propertie,使用KafkaIngressBuilder与属性(properties)直接配置Kafka客户端。有关可用属性的完整列表,请参阅Kafka消费者配置文档。请注意,使用命名方法(如KafkaIngressBuilder#withConsumerGroupId(String))传递的配置将具有更高的优先级,并覆盖所提供属性中各自的设置。
启动位置
入口允许将启动位置配置为以下之一:
组偏移量(默认)
从指定消费者组提交给Kafka的偏移量开始。
Earlist
从最早的偏移量开始
Latest
从最新的偏移量开始。
指定偏移量
从特定的偏移量开始,该偏移量定义为分区到目标起始偏移量的映射。
Date
从提取时间大于或等于指定日期的偏移量开始。
启动时,如果分区的指定启动偏移量超出范围或不存在(如果将入口配置为从组偏移量,特定偏移量或日期开始,则可能是这种情况)将回退到使用所配置的位置KafkaIngressBuilder#withAutoOffsetResetPosition(KafkaIngressAutoResetPosition)。默认情况下,此位置设置为最新位置。
Kafka 反序列化器
使用Java api时,Kafka入口需要知道如何将Kafka中的二进制数据转换为Java对象。在KafkaIngressDeserializer允许用户指定这样的一个模式。每条Kafka消息都会调用T deserialize(ConsumerRecord<byte[], byte[]> record)方法,并从Kafka传递键,值和元数据。
Kafka出口规范
KafkaEgressBuilder声明用于将数据写出到Kafka集群的出口规范。
它接受以下参数:
与此出口关联的出口标识符
引导服务器的地址
KafkaEgressSerializer序列化数据到Kafka(仅适用于Java的)容错语义
Kafka Producer的属性
请参阅Kafka Producer配置文档以获取可用属性的完整列表。
Kafka出口和容错
启用容错功能后,Kafka出口可以提供准确的一次交付保证。可以选择三种不同的操作模式。
None
没有任何保证,产生的记录可能会丢失或重复。
至少一次
有状态函数将确保不会丢失任何记录,但可以重复记录。
恰好一次
有状态函数使用Kafka事务提供一次精确的语义。
Kafka序列化器
使用Java api时,Kafka出口需要知道如何将Java对象转换为二进制数据。在KafkaEgressSerializer允许用户指定这样的一个模式。ProducerRecord<byte[], byte[]> serialize(T out)为每条消息调用该方法,从而允许用户设置键,值和其他元数据。
Last updated
Was this helpful?