package org.apache.flink.statefun.docs.io.kafka;
import org.apache.flink.statefun.docs.models.User;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.io.IngressSpec;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressStartupPosition;
/**
 * Declares the Kafka ingress used by this example: the ingress identifier and the
 * {@link IngressSpec} built for that identifier.
 */
public class IngressSpecs {

  /**
   * Identifier for the ingress, typed to {@link User}. The two strings are presumably the
   * namespace and the name of the ingress -- confirm against {@link IngressIdentifier}.
   */
  public static final IngressIdentifier<User> ID =
      new IngressIdentifier<>(User.class, "example", "input-ingress");

  /** Kafka ingress spec registered under {@link #ID}. */
  public static final IngressSpec<User> kafkaIngress = createKafkaIngress();

  /**
   * Assembles the Kafka ingress spec for {@link #ID}.
   *
   * <p>Must only be invoked after {@link #ID} has been initialized, which the field
   * declaration order above guarantees.
   *
   * @return the spec produced by the builder
   */
  private static IngressSpec<User> createKafkaIngress() {
    return KafkaIngressBuilder.forIdentifier(ID)
        .withKafkaAddress("localhost:9092")
        .withConsumerGroupId("greetings")
        .withTopic("my-topic")
        .withDeserializer(UserDeserializer.class)
        .withStartupPosition(KafkaIngressStartupPosition.fromLatest())
        .build();
  }
}
package org.apache.flink.statefun.docs.io.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import org.apache.flink.statefun.docs.models.User;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Turns the value bytes of a Kafka record into a {@link User} by parsing them with a Jackson
 * {@link ObjectMapper}.
 *
 * <p>A record whose value is absent or cannot be parsed yields {@code null} rather than an
 * exception.
 */
public class UserDeserializer implements KafkaIngressDeserializer<User> {

  private static final Logger LOG = LoggerFactory.getLogger(UserDeserializer.class);

  private final ObjectMapper mapper = new ObjectMapper();

  /**
   * Parses the value of the given record as a {@link User}.
   *
   * @param input the consumed Kafka record; only its value is read
   * @return the parsed {@link User}, or {@code null} when the record has no value or parsing
   *     fails with an {@link IOException}
   */
  @Override
  public User deserialize(ConsumerRecord<byte[], byte[]> input) {
    final byte[] payload = input.value();
    if (payload == null) {
      // A record may carry no value (for example a tombstone). Passing null to the mapper
      // would raise an unchecked exception that the IOException handler below does not
      // cover, so treat it the same way as an unparseable record.
      // NOTE(review): confirm how the ingress treats a null result.
      LOG.debug("Record has no value, nothing to deserialize");
      return null;
    }

    try {
      return mapper.readValue(payload, User.class);
    } catch (IOException e) {
      LOG.debug("Failed to deserialize record", e);
      return null;
    }
  }
}
Kafka出口规范
KafkaEgressBuilder声明用于将数据写出到Kafka集群的出口规范。
它接受以下参数:
与此出口关联的出口标识符
引导服务器的地址
用于将数据序列化到Kafka的KafkaEgressSerializer(仅适用于Java)
容错语义
Kafka Producer的属性
package org.apache.flink.statefun.docs.io.kafka;
import org.apache.flink.statefun.docs.models.User;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.io.EgressSpec;
import org.apache.flink.statefun.sdk.kafka.KafkaEgressBuilder;
/**
 * Declares the Kafka egress used by this example: the egress identifier and the
 * {@link EgressSpec} built for that identifier.
 */
public class EgressSpecs {

  /**
   * Identifier for the egress, typed to {@link User}. The two strings are presumably the
   * namespace and the name of the egress -- confirm against {@link EgressIdentifier}.
   */
  public static final EgressIdentifier<User> ID =
      new EgressIdentifier<>("example", "output-egress", User.class);

  /** Kafka egress spec registered under {@link #ID}. */
  public static final EgressSpec<User> kafkaEgress = createKafkaEgress();

  /**
   * Assembles the Kafka egress spec for {@link #ID}.
   *
   * <p>Must only be invoked after {@link #ID} has been initialized, which the field
   * declaration order above guarantees.
   *
   * @return the spec produced by the builder
   */
  private static EgressSpec<User> createKafkaEgress() {
    return KafkaEgressBuilder.forIdentifier(ID)
        .withKafkaAddress("localhost:9092")
        .withSerializer(UserSerializer.class)
        .build();
  }
}