Flink Stateful Functions中文文档
  • Home
  • Getting Started
    • 项目设置
    • Python演练
    • Java演练
  • 概念
    • 应用程序构建块
    • 逻辑函数
    • 分布式架构
  • 开发包
    • Java
    • Python
    • 模块(Module)
  • I/O模块
    • 总览
    • Apache Kafka
    • AWS Kinesis
    • Flink连接器
  • 部署与运营
    • 总览
    • 打包部署
    • 配置
    • 指标
    • 状态引导
Powered by GitBook
On this page
  • 状态引导函数
  • 创建一个保存点
  • 部署方式

Was this helpful?

  1. 部署与运营

状态引导

Previous指标

Last updated 5 years ago

Was this helpful?

通常情况下,应用程序需要文件、数据库或其他系统中的历史数据提供的初始状态。因为状态是由Apache Flink的快照机制管理的,对于有状态函数应用程序,这意味着将初始状态写入一个可用于启动作业的保存点。用户可以使用Flink的 和StatefulFunctionSavepointCreator来引导有状态函数应用程序的初始状态。

首先,在你的应用程序中需要包含以下库:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>statefun-flink-state-processor</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-state-processor-api_2.11</artifactId>
  <version>1.10.0</version>
</dependency>

注意:保存点创建器当前仅支持初始化Java模块的状态。

状态引导函数

StateBootstrapFunction定义如何使用给定输入引导StatefulFunction实例的状态。

每个引导函数实例直接对应于一个StatefulFunction类型。同样,每个实例都由一个地址唯一标识,该地址由正在引导的函数的类型和ID表示。引导功能实例保留的任何状态将对StatefulFunction具有相同地址的相应活动实例可用。

例如,考虑以下状态引导程序功能:

public class MyStateBootstrapFunction implements StateBootstrapFunction {

	@Persisted
	private PersistedValue<MyState> state = PersistedValue.of("my-state", MyState.class);

	@Override
	public void bootstrap(Context context, Object input) {
		state.set(extractStateFromInput(input));
	}
 }

假设这个引导函数是为函数类型MyFunctionType提供的,并且引导函数实例的id是id-13。该函数使用给定的引导数据写入名称my-state的持久状态。在从使用这个引导函数生成的保存点恢复有状态函数应用程序之后,带有address (MyFunctionType, id-13)的有状态函数实例将已经在状态名my-state下具有可用的状态值。

创建一个保存点

int maxParallelism = 128;
StatefulFunctionsSavepointCreator newSavepoint = new StatefulFunctionsSavepointCreator(maxParallelism);
// Read data from a file, database, or other location
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

final DataSet<Tuple2<String, Integer>> userSeenCounts = env.fromElements(
	Tuple2.of("foo", 4), Tuple2.of("bar", 3), Tuple2.of("joe", 2));

// Register the dataset with a router
newSavepoint.withBootstrapData(userSeenCounts, MyStateBootstrapFunctionRouter::new);

// Register a bootstrap function to process the records
newSavepoint.withStateBootstrapFunctionProvider(
		new FunctionType("apache", "my-function"),
		ignored -> new MyStateBootstrapFunction());

newSavepoint.write("file:///savepoint/path/");

env.execute();

部署方式

创建新的savpepoint之后,可以使用它来为有状态功能应用程序提供初始状态。

version: "2.1"
services:
  master:
    image: my-statefun-application-image
    command: -s file:///savepoint/path

部署到Flink会话群集时,请在Flink CLI中指定savepoint参数。

$ ./bin/flink run -s file:///savepoint/path stateful-functions-job.jar

通过定义某些元数据(例如最大并行度和状态后端)来创建保存点。默认状态后端是。

每个输入数据集都通过在保存点创建中注册,该将每个记录路由到零个或多个功能实例。然后,可以向保存点创建者注册任意数量的函数类型,类似于在有状态函数模块中注册函数的方式。最后,为生成的保存点指定输出位置。

有关如何使用Flink DataSetAPI的完整详细信息,请查看官方。

在基于镜像进行部署时,请将-s命令传递给Flink 镜像。

State Processor API
RocksDB
路由器
器
路由器
文档
JobMaster