Flink Stateful Functions中文文档
  • Home
  • Getting Started
    • 项目设置
    • Python演练
    • Java演练
  • 概念
    • 应用程序构建块
    • 逻辑函数
    • 分布式架构
  • 开发包
    • Java
    • Python
    • 模块(Module)
  • I/O模块
    • 总览
    • Apache Kafka
    • AWS Kinesis
    • Flink连接器
  • 部署与运营
    • 总览
    • 打包部署
    • 配置
    • 指标
    • 状态引导
Powered by GitBook
On this page
  • 依赖项
  • Source规范
  • Sink规范

Was this helpful?

  1. I/O模块

Flink连接器

PreviousAWS KinesisNext部署与运营

Last updated 5 years ago

Was this helpful?

Source-Sink I/O模块允许插入尚未集成到专用I/O模块中的现有或自定义Flink连接器。有关如何构建自定义连接器的详细信息,请参见正式的。

依赖项

要使用自定义Flink连接器,请在pom中添加以下依赖项。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>statefun-flink-io</artifactId>
    <version>2.0.0</version>
    <scope>provided</scope>
</dependency>

Source规范

源函数规范从Flink源函数创建一个入口。

package org.apache.flink.statefun.docs.io.flink;

import java.util.Map;
import org.apache.flink.statefun.docs.models.User;
import org.apache.flink.statefun.flink.io.datastream.SourceFunctionSpec;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.io.IngressSpec;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;

public class ModuleWithSourceSpec implements StatefulFunctionModule {

    @Override
    public void configure(Map<String, String> globalConfiguration, Binder binder) {
        IngressIdentifier<User> id = new IngressIdentifier<>(User.class, "example", "users");
        IngressSpec<User> spec = new SourceFunctionSpec<>(id, new FlinkSource<>());
        binder.bindIngress(spec);
    }
}

Sink规范

Sink函数规范从Flink Sink函数创建出口。

package org.apache.flink.statefun.docs.io.flink;

import java.util.Map;
import org.apache.flink.statefun.docs.models.User;
import org.apache.flink.statefun.flink.io.datastream.SinkFunctionSpec;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.io.EgressSpec;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;

public class ModuleWithSinkSpec implements StatefulFunctionModule {

    @Override
    public void configure(Map<String, String> globalConfiguration, Binder binder) {
        EgressIdentifier<User> id = new EgressIdentifier<>("example", "user", User.class);
        EgressSpec<User> spec = new SinkFunctionSpec<>(id, new FlinkSink<>());
        binder.bindEgress(spec);
    }
}
Apache Flink文档