AWS Kinesis

有状态功能提供了一个AWS Kinesis I/O模块,用于读取和写入Kinesis流。它基于Apache Flink的Kinesis连接器。Kinesis I/O模块可以用Yaml或Java配置。

依赖项

要在Java中使用Kinesis I / O模块,请在pom中添加以下依赖项。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>statefun-kinesis-io</artifactId>
    <version>2.0.0</version>
    <scope>provided</scope>
</dependency>

Kinesis入口规范

KinesisIngressSpec声明要从Kinesis流中消费的入口规范。

它接受以下参数:

  1. AWS区域

  2. AWS凭证提供者

  3. KinesisIngressDeserializer,用于从Kinesis反序列化数据(仅限Java)

  4. 流开始位置

  5. Kinesis客户端的属性

  6. 要消费的流的名称

入口还接受使用KinesisIngressBuilder#withClientConfigurationProperty()来直接配置Kinesis客户端的属性。请参阅Kinesis 客户端配置文档以获取可用属性的完整列表。请注意,使用命名方法传递的配置将具有更高的优先级,并在提供的属性中覆盖它们各自的设置。

启动位置

入口允许将启动位置配置为以下之一:

最新-Latest (default)

最早-Earlist

从尽可能早的位置开始消费。

Date

从摄取时间大于或等于指定日期的偏移量开始。

Kinesis 反序列化器

Kinesis 入口需要知道如何将Kinesis中的二进制数据转换为Java对象。KinesisIngressDeserializer允许用户指定这样的模式。为每个Kinesis记录调用T deserialize(IngressRecord IngressRecord)方法,传递来自Kinesis的二进制数据和元数据。

Kinesis出口规格

KinesisEgressBuilder声明用于将数据写出到Kinesis流的出口规范。

它接受以下参数:

  1. 与此出口关联的出口标识符

  2. AWS凭证提供者

  3. 用于将数据序列化到Kinesis中的KinesisEgressSerializer(仅限Java)

  4. AWS区域

  5. Kinesis客户端的属性

  6. 施加背压前的最大未完成记录数

请参阅Kinesis Producer默认配置属性文档以获取可用属性的完整列表。

Kinesis序列化器

Kinesis出口需要知道如何将Java对象转换为二进制数据。KinesisEgressSerializer允许用户指定这样的一个模式。为每条消息调用EgressRecord serialize(T value)方法,从而允许用户设置值和其他元数据。

AWS 区域

Kinesis入口和出口都可以配置到特定的AWS区域。

默认提供商链 (default)

咨询AWS的默认提供商链,以确定AWS区域。

指定

使用区域的唯一ID指定一个AWS区域。

自定义端点

通过非标准的AWS服务终端节点连接到AWS区域。通常仅用于开发和测试目的。

AWS 凭证

Kinesis入口和出口都可以使用标准AWS凭证提供程序进行配置。

默认提供者链(默认)

请咨询AWS的默认提供商链,以确定AWS凭证。

Basic

直接使用提供的访问密钥ID和密钥字符串指定AWS凭证。

Profile

使用AWS配置配置文件以及配置文件的配置路径来指定AWS凭证。

Last updated

Was this helpful?