Java

有状态功能是应用程序的基础。它们是隔离,分布和持久性的原子单位。作为对象,它们封装单个实体(例如,特定的用户,设备或会话)的状态并编码其行为。有状态的功能可以通过消息传递相互之间以及与外部系统进行交互。支持Java SDK作为Embedded_module

首先,需要在你的应用程序中添加以下依赖项

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>statefun-sdk</artifactId>
	<version>2.0.0</version>
</dependency>

定义有状态函数

有状态函数是实现StatefulFunction接口的任何类。以下是一个简单的hello world函数的示例。

package org.apache.flink.statefun.docs;

import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;

public class FnHelloWorld implements StatefulFunction {

	@Override
	public void invoke(Context context, Object input) {
		System.out.println("Hello " + input.toString());
	}
}

函数通过其invoke方法处理每个传入消息。输入是非类型化的,并以java.lang.Object的形式在系统中传递,因此一个函数可以潜在地处理多种类型的消息。

.上下文提供关于当前消息和函数的元数据,以及如何调用其他函数或外部系统。函数是基于函数类型和唯一标识符来调用的。

有状态匹配函数

有状态功能为处理事件和状态提供了强大的抽象,允许开发人员构建可以对任何类型的消息做出反应的组件。通常,函数仅需要处理一组已知的消息类型,并且StatefulMatchFunction接口提供了针对该问题的有效解决方案。

匹配简单函数

有状态匹配函数是针对这种模式的有状态函数的一种自定义变体。开发人员概述了预期的类型,可选的谓词和类型明确的业务逻辑,然后让系统将每个输入分配给正确的操作。 变量绑定在一个configure方法内部,该方法在首次加载实例时执行。

使你的函数完整

与第一个示例类似,默认情况下,match函数是部分函数,​​它将在与任何分支都不匹配的任何输入上抛出IllegalStateException。可以通过提供一个otherwise子句来完成所有操作,以完成不匹配的输入,并将其视为Java switch语句中的默认子句,从而使它们变得完整。otherwise操作将其消息视为非类型化的java.lang.Object消息,允许你处理任何意外消息。

Action Resolution Order

匹配函数将始终使用以下解析规则匹配从最特定到最不特定的操作。

首先,找到一个匹配类型和谓词的操作。如果两个谓词对特定输入返回true,则首先在绑定器中注册的谓词将获胜。接下来,搜索与类型匹配但没有关联谓词的操作。最后,如果一个catch-all存在,它将被执行或者抛出一个IllegalStateException。

函数类型和消息传递

在Java中,函数类型定义为包含名称空间和名称的字符串类型引用。该类型绑定到模块定义中的实现类。下面是hello world函数的示例函数类型。

然后可以从其他函数中引用此类型,以创建地址并向特定实例发送消息。

发送延迟的消息

函数能够延迟发送消息,以便它们会在一段时间后到达。函数甚至可以向自己发送延迟的消息,这些消息可以用作回调。延迟的消息是非阻塞的,因此功能将在发送和接收延迟的消息之间继续处理记录。

完成异步请求

在与外部系统(如数据库或API)进行交互时,需要注意与外部系统的通信延迟不会主导应用程序的全部工作。有状态函数允许注册一个java CompletableFuture,它将在将来的某个时候解析为一个值。Future和一个元数据对象一起注册,该元数据对象提供关于调用者的附加上下文。

当将来完成时,无论是成功完成还是异常完成,调用方函数类型和id都将使用AsyncOperationResult调用。异步结果可以在以下三种状态之一完成:

成功

异步操作已成功完成,可以通过AsyncOperationResult#value获得结果。

失败

异步操作失败,可以通过AsyncOperationResult#throwable找到原因。

未知

有状态功能在CompletableFuture完成之前已重新启动(可能在其他计算机上),因此未知异步操作的状态是什么。

持久化

有状态函数将状态视为头等公民,因此所有有状态函数都可以轻松定义状态,并在运行时自动将其设置为容错状态。通过仅定义一个或多个持久字段,所有有状态函数都可以包含状态。

最简单的入门方法是使用PersistedValue,它由其名称和所存储类型的类定义。数据始终限于特定的函数类型和标识符。以下是一个有状态函数,可根据用户被访问的次数向其打招呼。

注意:必须将所有PersistedValuePersistedTablePersistedAppendingBuffer字段标记为@Persisted批注,否则运行时将不使它们成为容错的。

持久化值附带了正确的基本方法来构建强大的有状态应用程序。调用PersistedValue#get将返回存储在state中的对象的当前值,如果未设置任何值,则返回null。相反,PersistedValue#set将更新state中的值,PersistedValue#clear将从state中删除该值。

集合类型

除了PersistedValue之外,Java SDK还支持两种持久化的集合类型。PersistedTable是键和值的集合,PersistedAppendingBuffer是一个只追加的缓冲区。

这些类型在功能上分别等同于PersistedValuePersistedValue,但在某些情况下可能提供更好的性能。

函数提供者和依赖注入

有状态功能是在节点的分布式群集中创建的。 StatefulFunctionProvider是工厂类,用于在首次激活时创建有状态函数的新实例。

在每个并行工作程序中,每种类型的提供程序都调用一次,而不是每个id调用一次。如果有状态函数需要自定义配置,则可以在提供程序内部定义它们并将其传递给函数的构造函数。这也是创建共享物理资源(如数据库连接)的地方,这些资源可由任意数量的虚拟函数使用。现在,测试可以快速提供模拟或测试依赖项,而不需要复杂的依赖项注入框架。

Last updated

Was this helpful?