AWS Kinesis
有状态功能提供了一个AWS Kinesis I/O模块,用于读取和写入Kinesis流。它基于Apache Flink的Kinesis连接器。Kinesis I/O模块可以用Yaml或Java配置。
依赖项
要在Java中使用Kinesis I / O模块,请在pom中添加以下依赖项。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>statefun-kinesis-io</artifactId>
<version>2.0.0</version>
<scope>provided</scope>
</dependency>Kinesis入口规范
KinesisIngressSpec声明要从Kinesis流中消费的入口规范。
它接受以下参数:
AWS区域
AWS凭证提供者
KinesisIngressDeserializer,用于从Kinesis反序列化数据(仅限Java)
流开始位置
Kinesis客户端的属性
要消费的流的名称
入口还接受使用KinesisIngressBuilder#withClientConfigurationProperty()来直接配置Kinesis客户端的属性。请参阅Kinesis 客户端配置文档以获取可用属性的完整列表。请注意,使用命名方法传递的配置将具有更高的优先级,并在提供的属性中覆盖它们各自的设置。
启动位置
入口允许将启动位置配置为以下之一:
最新-Latest (default)
最早-Earlist
从尽可能早的位置开始消费。
Date
从摄取时间大于或等于指定日期的偏移量开始。
Kinesis 反序列化器
Kinesis 入口需要知道如何将Kinesis中的二进制数据转换为Java对象。KinesisIngressDeserializer允许用户指定这样的模式。为每个Kinesis记录调用T deserialize(IngressRecord IngressRecord)方法,传递来自Kinesis的二进制数据和元数据。
Kinesis出口规格
KinesisEgressBuilder声明用于将数据写出到Kinesis流的出口规范。
它接受以下参数:
与此出口关联的出口标识符
AWS凭证提供者
用于将数据序列化到Kinesis中的
KinesisEgressSerializer(仅限Java)AWS区域
Kinesis客户端的属性
施加背压前的最大未完成记录数
请参阅Kinesis Producer默认配置属性文档以获取可用属性的完整列表。
Kinesis序列化器
Kinesis出口需要知道如何将Java对象转换为二进制数据。KinesisEgressSerializer允许用户指定这样的一个模式。为每条消息调用EgressRecord serialize(T value)方法,从而允许用户设置值和其他元数据。
AWS 区域
Kinesis入口和出口都可以配置到特定的AWS区域。
默认提供商链 (default)
咨询AWS的默认提供商链,以确定AWS区域。
指定
使用区域的唯一ID指定一个AWS区域。
自定义端点
通过非标准的AWS服务终端节点连接到AWS区域。通常仅用于开发和测试目的。
AWS 凭证
Kinesis入口和出口都可以使用标准AWS凭证提供程序进行配置。
默认提供者链(默认)
请咨询AWS的默认提供商链,以确定AWS凭证。
Basic
直接使用提供的访问密钥ID和密钥字符串指定AWS凭证。
Profile
使用AWS配置配置文件以及配置文件的配置路径来指定AWS凭证。
Last updated
Was this helpful?