状态引导

通常情况下,应用程序需要文件、数据库或其他系统中的历史数据提供的初始状态。因为状态是由Apache Flink的快照机制管理的,对于有状态函数应用程序,这意味着将初始状态写入一个可用于启动作业的保存点。用户可以使用Flink的 State Processor API和StatefulFunctionSavepointCreator来引导有状态函数应用程序的初始状态。

首先,在你的应用程序中需要包含以下库:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>statefun-flink-state-processor</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-state-processor-api_2.11</artifactId>
  <version>1.10.0</version>
</dependency>

注意:保存点创建器当前仅支持初始化Java模块的状态。

状态引导函数

StateBootstrapFunction定义如何使用给定输入引导StatefulFunction实例的状态。

每个引导函数实例直接对应于一个StatefulFunction类型。同样,每个实例都由一个地址唯一标识,该地址由正在引导的函数的类型和ID表示。引导功能实例保留的任何状态将对StatefulFunction具有相同地址的相应活动实例可用。

例如,考虑以下状态引导程序功能:

public class MyStateBootstrapFunction implements StateBootstrapFunction {

	@Persisted
	private PersistedValue<MyState> state = PersistedValue.of("my-state", MyState.class);

	@Override
	public void bootstrap(Context context, Object input) {
		state.set(extractStateFromInput(input));
	}
 }

假设这个引导函数是为函数类型MyFunctionType提供的,并且引导函数实例的id是id-13。该函数使用给定的引导数据写入名称my-state的持久状态。在从使用这个引导函数生成的保存点恢复有状态函数应用程序之后,带有address (MyFunctionType, id-13)的有状态函数实例将已经在状态名my-state下具有可用的状态值。

创建一个保存点

通过定义某些元数据(例如最大并行度和状态后端)来创建保存点。默认状态后端是RocksDB

int maxParallelism = 128;
StatefulFunctionsSavepointCreator newSavepoint = new StatefulFunctionsSavepointCreator(maxParallelism);

每个输入数据集都通过路由器在保存点创建中注册,该路由器将每个记录路由到零个或多个功能实例。然后,可以向保存点创建者注册任意数量的函数类型,类似于在有状态函数模块中注册函数的方式。最后,为生成的保存点指定输出位置。

// Read data from a file, database, or other location
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

final DataSet<Tuple2<String, Integer>> userSeenCounts = env.fromElements(
	Tuple2.of("foo", 4), Tuple2.of("bar", 3), Tuple2.of("joe", 2));

// Register the dataset with a router
newSavepoint.withBootstrapData(userSeenCounts, MyStateBootstrapFunctionRouter::new);

// Register a bootstrap function to process the records
newSavepoint.withStateBootstrapFunctionProvider(
		new FunctionType("apache", "my-function"),
		ignored -> new MyStateBootstrapFunction());

newSavepoint.write("file:///savepoint/path/");

env.execute();

有关如何使用Flink DataSetAPI的完整详细信息,请查看官方文档

部署方式

创建新的savpepoint之后,可以使用它来为有状态功能应用程序提供初始状态。

在基于镜像进行部署时,请将-s命令传递给Flink JobMaster镜像。

version: "2.1"
services:
  master:
    image: my-statefun-application-image
    command: -s file:///savepoint/path

Last updated