Java
有状态功能是应用程序的基础。它们是隔离,分布和持久性的原子单位。作为对象,它们封装单个实体(例如,特定的用户,设备或会话)的状态并编码其行为。有状态的功能可以通过消息传递相互之间以及与外部系统进行交互。支持Java SDK作为Embedded_module。
首先,需要在你的应用程序中添加以下依赖项
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>statefun-sdk</artifactId>
<version>2.0.0</version>
</dependency>定义有状态函数
有状态函数是实现StatefulFunction接口的任何类。以下是一个简单的hello world函数的示例。
package org.apache.flink.statefun.docs;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;
public class FnHelloWorld implements StatefulFunction {
@Override
public void invoke(Context context, Object input) {
System.out.println("Hello " + input.toString());
}
}函数通过其invoke方法处理每个传入消息。输入是非类型化的,并以java.lang.Object的形式在系统中传递,因此一个函数可以潜在地处理多种类型的消息。
.上下文提供关于当前消息和函数的元数据,以及如何调用其他函数或外部系统。函数是基于函数类型和唯一标识符来调用的。
有状态匹配函数
有状态功能为处理事件和状态提供了强大的抽象,允许开发人员构建可以对任何类型的消息做出反应的组件。通常,函数仅需要处理一组已知的消息类型,并且StatefulMatchFunction接口提供了针对该问题的有效解决方案。
匹配简单函数
有状态匹配函数是针对这种模式的有状态函数的一种自定义变体。开发人员概述了预期的类型,可选的谓词和类型明确的业务逻辑,然后让系统将每个输入分配给正确的操作。 变量绑定在一个configure方法内部,该方法在首次加载实例时执行。
使你的函数完整
与第一个示例类似,默认情况下,match函数是部分函数,它将在与任何分支都不匹配的任何输入上抛出IllegalStateException。可以通过提供一个otherwise子句来完成所有操作,以完成不匹配的输入,并将其视为Java switch语句中的默认子句,从而使它们变得完整。otherwise操作将其消息视为非类型化的java.lang.Object消息,允许你处理任何意外消息。
Action Resolution Order
匹配函数将始终使用以下解析规则匹配从最特定到最不特定的操作。
首先,找到一个匹配类型和谓词的操作。如果两个谓词对特定输入返回true,则首先在绑定器中注册的谓词将获胜。接下来,搜索与类型匹配但没有关联谓词的操作。最后,如果一个catch-all存在,它将被执行或者抛出一个IllegalStateException。
函数类型和消息传递
在Java中,函数类型定义为包含名称空间和名称的字符串类型引用。该类型绑定到模块定义中的实现类。下面是hello world函数的示例函数类型。
然后可以从其他函数中引用此类型,以创建地址并向特定实例发送消息。
发送延迟的消息
函数能够延迟发送消息,以便它们会在一段时间后到达。函数甚至可以向自己发送延迟的消息,这些消息可以用作回调。延迟的消息是非阻塞的,因此功能将在发送和接收延迟的消息之间继续处理记录。
完成异步请求
在与外部系统(如数据库或API)进行交互时,需要注意与外部系统的通信延迟不会主导应用程序的全部工作。有状态函数允许注册一个java CompletableFuture,它将在将来的某个时候解析为一个值。Future和一个元数据对象一起注册,该元数据对象提供关于调用者的附加上下文。
当将来完成时,无论是成功完成还是异常完成,调用方函数类型和id都将使用AsyncOperationResult调用。异步结果可以在以下三种状态之一完成:
成功
异步操作已成功完成,可以通过AsyncOperationResult#value获得结果。
失败
异步操作失败,可以通过AsyncOperationResult#throwable找到原因。
未知
有状态功能在CompletableFuture完成之前已重新启动(可能在其他计算机上),因此未知异步操作的状态是什么。
持久化
有状态函数将状态视为头等公民,因此所有有状态函数都可以轻松定义状态,并在运行时自动将其设置为容错状态。通过仅定义一个或多个持久字段,所有有状态函数都可以包含状态。
最简单的入门方法是使用PersistedValue,它由其名称和所存储类型的类定义。数据始终限于特定的函数类型和标识符。以下是一个有状态函数,可根据用户被访问的次数向其打招呼。
持久化值附带了正确的基本方法来构建强大的有状态应用程序。调用PersistedValue#get将返回存储在state中的对象的当前值,如果未设置任何值,则返回null。相反,PersistedValue#set将更新state中的值,PersistedValue#clear将从state中删除该值。
集合类型
除了PersistedValue之外,Java SDK还支持两种持久化的集合类型。PersistedTable是键和值的集合,PersistedAppendingBuffer是一个只追加的缓冲区。
这些类型在功能上分别等同于PersistedValue和PersistedValue,但在某些情况下可能提供更好的性能。
函数提供者和依赖注入
有状态功能是在节点的分布式群集中创建的。 StatefulFunctionProvider是工厂类,用于在首次激活时创建有状态函数的新实例。
在每个并行工作程序中,每种类型的提供程序都调用一次,而不是每个id调用一次。如果有状态函数需要自定义配置,则可以在提供程序内部定义它们并将其传递给函数的构造函数。这也是创建共享物理资源(如数据库连接)的地方,这些资源可由任意数量的虚拟函数使用。现在,测试可以快速提供模拟或测试依赖项,而不需要复杂的依赖项注入框架。
Last updated
Was this helpful?