Apache Kafka
有状态功能提供了一个Apache Kafka I/O模块,用于读取和写入Kafka Topic。它基于Apache Flink的通用Kafka连接器,并提供一次精确的处理语义。Kafka I/O模块可以用Yaml或Java配置。
依赖项
要在Java中使用Kafka I/O模块,请在pom中加入以下依赖项。
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>statefun-kafka-io</artifactId>
	<version>2.0.0</version>
	<scope>provided</scope>
</dependency>Kafka入口规范
 KafkaIngressSpec声明了使用Kafka集群的入口规范。
它接受以下参数:
- 与该入口相关的入口标识符 
- Topic名称/Topic名称列表 
- 引导服务器的地址 
- 要使用的消费者组ID 
- 用于从Kafka反序列化数据的 - KafkaIngressDeserializer(仅限Java)
- 开始消费的位置 
package org.apache.flink.statefun.docs.io.kafka;
import org.apache.flink.statefun.docs.models.User;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.io.IngressSpec;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressStartupPosition;
public class IngressSpecs {
  public static final IngressIdentifier<User> ID =
      new IngressIdentifier<>(User.class, "example", "input-ingress");
  public static final IngressSpec<User> kafkaIngress =
      KafkaIngressBuilder.forIdentifier(ID)
          .withKafkaAddress("localhost:9092")
          .withConsumerGroupId("greetings")
          .withTopic("my-topic")
          .withDeserializer(UserDeserializer.class)
          .withStartupPosition(KafkaIngressStartupPosition.fromLatest())
          .build();
}version: "1.0"
module:
    meta:
    type: remote
spec:
    ingresses:
    - ingress:
        meta:
            type: statefun.kafka.io/routable-protobuf-ingress
            id: example/user-ingress
        spec:
            address: kafka-broker:9092
            consumerGroupId: routable-kafka-e2e
            startupPosition:
                type: earliest
            topics:
              - topic: messages-1
                typeUrl: org.apache.flink.statefun.docs.models.User
                targets:
                  - example-namespace/my-function-1
                  - example-namespace/my-function-2入口还接受Propertie,使用KafkaIngressBuilder与属性(properties)直接配置Kafka客户端。有关可用属性的完整列表,请参阅Kafka消费者配置文档。请注意,使用命名方法(如KafkaIngressBuilder#withConsumerGroupId(String))传递的配置将具有更高的优先级,并覆盖所提供属性中各自的设置。
启动位置
入口允许将启动位置配置为以下之一:
组偏移量(默认)
从指定消费者组提交给Kafka的偏移量开始。
KafkaIngressStartupPosition#fromGroupOffsets();startupPosition:
    type: group-offsetsEarlist
从最早的偏移量开始
KafkaIngressStartupPosition#fromEarliest();startupPosition:
    type: earliestLatest
从最新的偏移量开始。
KafkaIngressStartupPosition#fromLatest();startupPosition:
    type: latest指定偏移量
从特定的偏移量开始,该偏移量定义为分区到目标起始偏移量的映射。
Map<TopicPartition, Long> offsets = new HashMap<>();
offsets.add(new TopicPartition("user-topic", 0), 91);
offsets.add(new TopicPartition("user-topic", 11), 11);
offsets.add(new TopicPartition("user-topic", 8), 8);
KafkaIngressStartupPosition#fromSpecificOffsets(offsets);startupPosition:
    type: specific-offsets
    offsets:
        - user-topic/0: 91
        - user-topic/1: 11
        - user-topic/2: 8Date
从提取时间大于或等于指定日期的偏移量开始。
KafkaIngressStartupPosition#fromDate(ZonedDateTime.now());startupPosition:
    type: date
    date: 2020-02-01 04:15:00.00 Z 启动时,如果分区的指定启动偏移量超出范围或不存在(如果将入口配置为从组偏移量,特定偏移量或日期开始,则可能是这种情况)将回退到使用所配置的位置KafkaIngressBuilder#withAutoOffsetResetPosition(KafkaIngressAutoResetPosition)。默认情况下,此位置设置为最新位置。
Kafka 反序列化器
 使用Java api时,Kafka入口需要知道如何将Kafka中的二进制数据转换为Java对象。在KafkaIngressDeserializer允许用户指定这样的一个模式。每条Kafka消息都会调用T deserialize(ConsumerRecord<byte[], byte[]> record)方法,并从Kafka传递键,值和元数据。
package org.apache.flink.statefun.docs.io.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import org.apache.flink.statefun.docs.models.User;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class UserDeserializer implements KafkaIngressDeserializer<User> {
	private static Logger LOG = LoggerFactory.getLogger(UserDeserializer.class);
	private final ObjectMapper mapper = new ObjectMapper();
	@Override
	public User deserialize(ConsumerRecord<byte[], byte[]> input) {
		try {
			return mapper.readValue(input.value(), User.class);
		} catch (IOException e) {
			LOG.debug("Failed to deserialize record", e);
			return null;
		}
	}
}Kafka出口规范
KafkaEgressBuilder声明用于将数据写出到Kafka集群的出口规范。
它接受以下参数:
- 与此出口关联的出口标识符 
- 引导服务器的地址 
- KafkaEgressSerializer序列化数据到Kafka(仅适用于Java的)
- 容错语义 
- Kafka Producer的属性 
package org.apache.flink.statefun.docs.io.kafka;
import org.apache.flink.statefun.docs.models.User;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.io.EgressSpec;
import org.apache.flink.statefun.sdk.kafka.KafkaEgressBuilder;
public class EgressSpecs {
  public static final EgressIdentifier<User> ID =
      new EgressIdentifier<>("example", "output-egress", User.class);
  public static final EgressSpec<User> kafkaEgress =
      KafkaEgressBuilder.forIdentifier(ID)
          .withKafkaAddress("localhost:9092")
          .withSerializer(UserSerializer.class)
          .build();
}version: "1.0"
module:
    meta:
    type: remote
spec:
    egresses:
      - egress:
          meta:
            type: statefun.kafka.io/generic-egress
            id: example/output-messages
          spec:
            address: kafka-broker:9092
            deliverySemantic:
              type: exactly-once
              transactionTimeoutMillis: 100000
            properties:
              - foo.config: bar请参阅Kafka Producer配置文档以获取可用属性的完整列表。
Kafka出口和容错
启用容错功能后,Kafka出口可以提供准确的一次交付保证。可以选择三种不同的操作模式。
None
没有任何保证,产生的记录可能会丢失或重复。
KafkaEgressBuilder#withNoProducerSemantics();deliverySemantic:
    type: none至少一次
有状态函数将确保不会丢失任何记录,但可以重复记录。
KafkaEgressBuilder#withAtLeastOnceProducerSemantics();deliverySemantic:
    type: at-least-once恰好一次
有状态函数使用Kafka事务提供一次精确的语义。
KafkaEgressBuilder#withExactlyOnceProducerSemantics(Duration.minutes(15));deliverySemantic:
    type: exactly-once
    transactionTimeoutMillis: 900000 # 15 minKafka序列化器
 使用Java api时,Kafka出口需要知道如何将Java对象转换为二进制数据。在KafkaEgressSerializer允许用户指定这样的一个模式。ProducerRecord<byte[], byte[]> serialize(T out)为每条消息调用该方法,从而允许用户设置键,值和其他元数据。
package org.apache.flink.statefun.docs.io.kafka;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.statefun.docs.models.User;
import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class UserSerializer implements KafkaEgressSerializer<User> {
  private static final Logger LOG = LoggerFactory.getLogger(UserSerializer.class);
  private static final String TOPIC = "user-topic";
  private final ObjectMapper mapper = new ObjectMapper();
  @Override
  public ProducerRecord<byte[], byte[]> serialize(User user) {
    try {
      byte[] key = user.getUserId().getBytes();
      byte[] value = mapper.writeValueAsBytes(user);
      return new ProducerRecord<>(TOPIC, key, value);
    } catch (JsonProcessingException e) {
      LOG.info("Failed to serializer user", e);
      return null;
    }
  }
}Last updated
Was this helpful?
