private static String greetText(String name, int seen) {
switch (seen) {
case 0:
return String.format("Hello %s !", name);
case 1:
return String.format("Hello again %s !", name);
case 2:
return String.format("Third times the charm! %s!", name);
case 3:
return String.format("Happy to see you once again %s !", name);
default:
return String.format("Hello at the %d-th time %s", seen + 1, name);
}
package org.apache.flink.statefun.examples.greeter;
import org.apache.flink.statefun.examples.kafka.generated.GreetRequest;
import org.apache.flink.statefun.sdk.io.Router;
final class GreetRouter implements Router<GreetRequest> {
@Override
public void route(GreetRequest message, Downstream<GreetRequest> downstream) {
downstream.forward(GreetingConstants.GREETER_FUNCTION_TYPE, message.getWho(), message);
}
}
package org.apache.flink.statefun.examples.greeter;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.state.PersistedValue;
public final class GreetFunction implements StatefulFunction {
@Persisted
private final PersistedValue<Integer> count = PersistedValue.of("count", Integer.class);
@Override
public void invoke(Context context, Object input) {
GreetRequest greetMessage = (GreetRequest) input;
GreetResponse response = computePersonalizedGreeting(greetMessage);
context.send(GreetingConstants.GREETING_EGRESS_ID, response);
}
private GreetResponse computePersonalizedGreeting(GreetRequest greetMessage) {
final String name = greetMessage.getWho();
final int seen = count.getOrDefault(0);
count.set(seen + 1);
String greeting = greetText(name, seen);
return GreetResponse.newBuilder()
.setWho(name)
.setGreeting(greeting)
.build();
}
}