状态引导
通常情况下,应用程序需要文件、数据库或其他系统中的历史数据提供的初始状态。因为状态是由Apache Flink的快照机制管理的,对于有状态函数应用程序,这意味着将初始状态写入一个可用于启动作业的保存点。用户可以使用Flink的 State Processor API和StatefulFunctionSavepointCreator来引导有状态函数应用程序的初始状态。
首先,在你的应用程序中需要包含以下库:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>statefun-flink-state-processor</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-state-processor-api_2.11</artifactId>
<version>1.10.0</version>
</dependency>
状态引导函数
StateBootstrapFunction定义如何使用给定输入引导StatefulFunction实例的状态。
每个引导函数实例直接对应于一个StatefulFunction
类型。同样,每个实例都由一个地址唯一标识,该地址由正在引导的函数的类型和ID表示。引导功能实例保留的任何状态将对StatefulFunction
具有相同地址的相应活动实例可用。
例如,考虑以下状态引导程序功能:
public class MyStateBootstrapFunction implements StateBootstrapFunction {
@Persisted
private PersistedValue<MyState> state = PersistedValue.of("my-state", MyState.class);
@Override
public void bootstrap(Context context, Object input) {
state.set(extractStateFromInput(input));
}
}
假设这个引导函数是为函数类型MyFunctionType提供的,并且引导函数实例的id是id-13。该函数使用给定的引导数据写入名称my-state的持久状态。在从使用这个引导函数生成的保存点恢复有状态函数应用程序之后,带有address (MyFunctionType, id-13)的有状态函数实例将已经在状态名my-state下具有可用的状态值。
创建一个保存点
通过定义某些元数据(例如最大并行度和状态后端)来创建保存点。默认状态后端是RocksDB。
int maxParallelism = 128;
StatefulFunctionsSavepointCreator newSavepoint = new StatefulFunctionsSavepointCreator(maxParallelism);
每个输入数据集都通过路由器在保存点创建器中注册,该路由器将每个记录路由到零个或多个功能实例。然后,可以向保存点创建者注册任意数量的函数类型,类似于在有状态函数模块中注册函数的方式。最后,为生成的保存点指定输出位置。
// Read data from a file, database, or other location
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
final DataSet<Tuple2<String, Integer>> userSeenCounts = env.fromElements(
Tuple2.of("foo", 4), Tuple2.of("bar", 3), Tuple2.of("joe", 2));
// Register the dataset with a router
newSavepoint.withBootstrapData(userSeenCounts, MyStateBootstrapFunctionRouter::new);
// Register a bootstrap function to process the records
newSavepoint.withStateBootstrapFunctionProvider(
new FunctionType("apache", "my-function"),
ignored -> new MyStateBootstrapFunction());
newSavepoint.write("file:///savepoint/path/");
env.execute();
有关如何使用Flink DataSet
API的完整详细信息,请查看官方文档。
部署方式
创建新的savpepoint之后,可以使用它来为有状态功能应用程序提供初始状态。
在基于镜像进行部署时,请将-s
命令传递给Flink JobMaster镜像。
version: "2.1"
services:
master:
image: my-statefun-application-image
command: -s file:///savepoint/path
Last updated
Was this helpful?