package org.apache.flink.statefun.docs;
import org.apache.flink.statefun.sdk.FunctionType;
/**
 * Holder for function type constants.
 *
 * <p>The class only carries static constants, so it is final and cannot be instantiated.
 */
public final class Identifiers {

  /** A function type that will be bound to {@link FnHelloWorld}. */
  public static final FunctionType HELLO_TYPE = new FunctionType("apache/flink", "hello");

  /** Not instantiable: this class exists only to hold constants. */
  private Identifiers() {}
}
package org.apache.flink.statefun.docs;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;
/**
 * A simple stateful function that, on every invocation, sends a new {@code MyUserMessage} to the
 * {@link Identifiers#HELLO_TYPE} function with id "user1".
 */
public class FnCaller implements StatefulFunction {

  /** Id of the function instance that the message is addressed to. */
  private static final String TARGET_ID = "user1";

  /**
   * Sends a fresh {@code MyUserMessage} to the target function; the incoming message itself is
   * not inspected.
   *
   * @param context the context used to send the outgoing message
   * @param input the incoming message (unused)
   */
  @Override
  public void invoke(Context context, Object input) {
    context.send(Identifiers.HELLO_TYPE, TARGET_ID, new MyUserMessage());
  }
}
package org.apache.flink.statefun.docs.delay;
import java.time.Duration;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;
/**
 * A stateful function that reacts to two kinds of input: for a {@code Message} it prints a
 * greeting and asks the context to send a {@code DelayedMessage} back to itself after one minute;
 * for a {@code DelayedMessage} it prints a follow-up line.
 */
public class FnDelayedMessage implements StatefulFunction {

  /**
   * Dispatches on the runtime type of the input.
   *
   * @param context the context used to schedule the delayed message
   * @param input the incoming message
   */
  @Override
  public void invoke(Context context, Object input) {
    // The two checks are deliberately independent (no else): each one is evaluated on every call.
    if (input instanceof Message) {
      greetAndScheduleFollowUp(context);
    }
    if (input instanceof DelayedMessage) {
      announceFollowUp();
    }
  }

  /** Prints the greeting and schedules a {@code DelayedMessage} addressed to this function. */
  private static void greetAndScheduleFollowUp(Context context) {
    System.out.println("Hello");
    Duration delay = Duration.ofMinutes(1);
    context.sendAfter(delay, context.self(), new DelayedMessage());
  }

  /** Prints the line shown when the delayed message is received. */
  private static void announceFollowUp() {
    System.out.println("Welcome to the future!");
  }
}
package org.apache.flink.statefun.docs.async;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.statefun.sdk.AsyncOperationResult;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;
/**
 * A stateful function that looks up enrichment data for incoming {@code User} messages through an
 * asynchronous {@code QueryService} call and prints the outcome when the corresponding {@link
 * AsyncOperationResult} is received.
 */
public class EnrichmentFunction implements StatefulFunction {

  /** Client used to start the asynchronous lookup. */
  private final QueryService client;

  /**
   * Creates the function.
   *
   * @param client the service queried for enrichment data
   */
  public EnrichmentFunction(QueryService client) {
    this.client = client;
  }

  /**
   * Dispatches on the runtime type of the input: {@code User} messages start a lookup, {@link
   * AsyncOperationResult} messages report its outcome. Any other input is ignored.
   *
   * @param context the context used to register the asynchronous operation
   * @param input the incoming message
   */
  @Override
  public void invoke(Context context, Object input) {
    if (input instanceof User) {
      onUser(context, (User) input);
    } else if (input instanceof AsyncOperationResult) {
      // The type arguments cannot be checked at runtime. The cast relies on onUser being the only
      // place this function registers async operations, always with a User as metadata and a
      // future of UserEnrichment. The suppression is scoped to this one statement so that other
      // unchecked warnings in the class stay visible.
      @SuppressWarnings("unchecked")
      AsyncOperationResult<User, UserEnrichment> result =
          (AsyncOperationResult<User, UserEnrichment>) input;
      onAsyncResult(result);
    }
  }

  /** Starts the lookup for the given user and registers the future, with the user as metadata. */
  private void onUser(Context context, User user) {
    CompletableFuture<UserEnrichment> future = client.getDataAsync(user.getUserId());
    context.registerAsyncOperation(user, future);
  }

  /** Prints a line describing whether the async operation succeeded, failed, or neither. */
  private void onAsyncResult(AsyncOperationResult<User, UserEnrichment> result) {
    if (result.successful()) {
      User metadata = result.metadata();
      UserEnrichment value = result.value();
      System.out.println(
          String.format("Successfully completed future: %s %s", metadata, value));
    } else if (result.failure()) {
      System.out.println(
          String.format("Something has gone terribly wrong %s", result.throwable()));
    } else {
      // Neither successful() nor failure() returned true, so the outcome is not known here.
      System.out.println("Not sure what happened, maybe retry");
    }
  }
}
package org.apache.flink.statefun.docs;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.state.PersistedValue;
/**
 * A stateful function that prints a greeting whose wording depends on how many times this
 * function instance has been invoked before, and keeps that count in a persisted value.
 */
public class FnUserGreeter implements StatefulFunction {

  /**
   * The function type of the greeter. Declared {@code final} so that the shared constant cannot be
   * reassigned by other code.
   */
  public static final FunctionType TYPE = new FunctionType("example", "greeter");

  /** Number of invocations seen so far; read with a default of 0 when no value is present. */
  @Persisted
  private final PersistedValue<Integer> count = PersistedValue.of("count", Integer.class);

  /**
   * Prints a greeting chosen by the current count and then stores the incremented count.
   *
   * @param context the context; its {@code self().id()} is used as the user id in the greeting
   * @param input the incoming message (not inspected)
   */
  public void invoke(Context context, Object input) {
    String userId = context.self().id();
    int seen = count.getOrDefault(0);
    switch (seen) {
      case 0:
        System.out.println(String.format("Hello %s!", userId));
        break;
      case 1:
        System.out.println("Hello Again!");
        break;
      case 2:
        System.out.println("Third time is the charm :)");
        break;
      default:
        // seen counts previous invocations, so the current one is number seen + 1.
        System.out.println(String.format("Hello for the %d-th time", seen + 1));
        break;
    }
    count.set(seen + 1);
  }
}
package org.apache.flink.statefun.docs;
import org.apache.flink.statefun.docs.dependency.ProductionDependency;
import org.apache.flink.statefun.docs.dependency.RuntimeDependency;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
/**
 * A {@link StatefulFunctionProvider} that hands out {@code FnWithDependency} instances wired with
 * a {@link ProductionDependency}.
 */
public class CustomProvider implements StatefulFunctionProvider {

  /**
   * Creates a new function backed by a freshly created production dependency.
   *
   * @param type the requested function type; this provider does not consult it
   * @return a new {@code FnWithDependency}
   */
  public StatefulFunction functionOfType(FunctionType type) {
    return new FnWithDependency(productionDependency());
  }

  /** Builds the dependency that is injected into the function. */
  private static RuntimeDependency productionDependency() {
    return new ProductionDependency();
  }
}
package org.apache.flink.statefun.docs;
import org.apache.flink.statefun.docs.dependency.RuntimeDependency;
import org.apache.flink.statefun.docs.dependency.TestDependency;
import org.junit.Assert;
import org.junit.Test;
/** Example test showing how a function can be constructed with a test dependency. */
public class FunctionTest {

  /** Builds the function with a {@link TestDependency} and runs a placeholder assertion. */
  @Test
  public void testFunctionWithCustomDependency() {
    RuntimeDependency dependency = new TestDependency();
    // Constructed only to demonstrate the injection; the example makes no assertion against it.
    FnWithDependency function = new FnWithDependency(dependency);

    // assertEquals takes (message, expected, actual): the literal is the expected value and the
    // computed expression is the actual one, so a failure message reads the right way round.
    Assert.assertEquals("It appears math is broken", 2, 1 + 1);
  }
}