syntax ="proto3";package example;// External request sent by a user who wants to be greetedmessage GreetRequest {// The name of the user to greet string name = 1;}// A customized response sent to the usermessage GreetResponse {// The name of the user being greeted string name = 1;// The users customized greeting string greeting = 2;}// An internal message used to store statemessage SeenCount {// The number of times a users has been seen so far int64 seen = 1;}
def compute_greeting(name, seen):
"""
Compute a personalized greeting, based on the number of times this @name had been seen before.
"""
templates = ["", "Welcome %s", "Nice to see you again %s", "Third time is a charm %s"]
if seen < len(templates):
greeting = templates[seen] % name
else:
greeting = "Nice to see you at the %d-nth time %s!" % (seen, name)
response = GreetResponse()
response.name = name
response.greeting = greeting
return response
from messages_pb2 importSeenCount, GreetRequest, GreetResponsefrom statefun import StatefulFunctionsfrom statefun import RequestReplyHandlerfrom statefun import kafka_egress_recordfunctions = StatefulFunctions()@functions.bind("example/greeter")def greet(context, greet_request: GreetRequest): state = context.state('seen_count').unpack(SeenCount) if not state: state = SeenCount() state.seen = 1 else: state.seen += 1 context.state('seen_count').pack(state) response = compute_greeting(greet_request.name, state.seen) egress_message = kafka_egress_record(topic="greetings", key=greet_request.name, value=response) context.pack_and_send_egress("example/greets", egress_message)def compute_greeting(name, seen): """ Compute a personalized greeting, based on the number of times this @name had been seen before. """ templates = ["", "Welcome %s", "Nice to see you again %s", "Third time is a charm %s"] if seen < len(templates): greeting = templates[seen] % name else: greeting = "Nice to see you at the %d-nth time %s!" % (seen, name) response = GreetResponse() response.name = name response.greeting = greeting return responsehandler = RequestReplyHandler(functions)## Serve the endpoint#from flask import requestfrom flask import make_responsefrom flask import Flaskapp = Flask(__name__)@app.route('/statefun', methods=['POST'])def handle(): response_data = handler(request.data) response = make_response(response_data) response.headers.set('Content-Type', 'application/octet-stream') return responseif __name__ == "__main__": app.run()