// This schema is written against the proto3 language version.
syntax = "proto3";
// Every message declared in this file belongs to the `example` package.
package example;
// Request coming from an external user who asks to be greeted.
message GreetRequest {
  // Name of the user that should be greeted.
  string name = 1;
}
// Customized reply that is sent back to the user.
message GreetResponse {
  // Name of the user being greeted.
  string name = 1;
  // The user's customized greeting text.
  string greeting = 2;
}
// Internal message used to store state.
message SeenCount {
  // Number of times a user has been seen so far.
  int64 seen = 1;
}
def compute_greeting(name, seen):
    """
    Compute a personalized greeting, based on the number of times this @name had been seen before.

    :param name: the name of the user to greet; substituted into the greeting text.
    :param seen: 1-based count of how many times this name has been seen (1 on the first visit).
    :return: a GreetResponse whose `name` and `greeting` fields are populated.
    :raises ValueError: if `seen` is smaller than 1.
    """
    if seen < 1:
        # Index 0 of the template table is an empty placeholder without a "%s"
        # slot (formatting it raises an opaque TypeError), and a negative index
        # would silently wrap around to an unrelated template.
        raise ValueError("seen must be at least 1, got %r" % (seen,))
    # The leading "" pads the list so that templates[seen] matches the 1-based count.
    templates = ["", "Welcome %s", "Nice to see you again %s", "Third time is a charm %s"]
    if seen < len(templates):
        greeting = templates[seen] % name
    else:
        # Past the hand-written templates: fall back to a generic counted greeting.
        greeting = "Nice to see you at the %d-nth time %s!" % (seen, name)
    response = GreetResponse()
    response.name = name
    response.greeting = greeting
    return response
from messages_pb2 import SeenCount, GreetRequest, GreetResponse
from statefun import StatefulFunctions
from statefun import RequestReplyHandler
from statefun import kafka_egress_record
# Holds the stateful functions of this module; `greet` is attached to it via @functions.bind.
functions = StatefulFunctions()
@functions.bind("example/greeter")
def greet(context, greet_request: GreetRequest):
    """
    Count this visit in the 'seen_count' state and emit a personalized greeting.

    The stored SeenCount is incremented (or created with seen = 1 when the
    unpacked state is falsy), written back, and the resulting greeting is sent
    as a Kafka egress record to the "example/greets" egress.

    :param context: the invocation context, used for state access and egress.
    :param greet_request: the incoming GreetRequest; its `name` is greeted.
    """
    counter = context.state('seen_count').unpack(SeenCount)
    if counter:
        counter.seen += 1
    else:
        # Nothing stored yet: this is the first visit.
        counter = SeenCount()
        counter.seen = 1
    context.state('seen_count').pack(counter)

    reply = compute_greeting(greet_request.name, counter.seen)
    # The record is keyed by the user's name and published to the "greetings" topic.
    context.pack_and_send_egress(
        "example/greets",
        kafka_egress_record(topic="greetings", key=greet_request.name, value=reply),
    )
def compute_greeting(name, seen):
    """
    Compute a personalized greeting, based on the number of times this @name had been seen before.

    :param name: the name of the user to greet; substituted into the greeting text.
    :param seen: 1-based count of how many times this name has been seen (1 on the first visit).
    :return: a GreetResponse whose `name` and `greeting` fields are populated.
    :raises ValueError: if `seen` is smaller than 1.
    """
    if seen < 1:
        # Index 0 of the template table is an empty placeholder without a "%s"
        # slot (formatting it raises an opaque TypeError), and a negative index
        # would silently wrap around to an unrelated template.
        raise ValueError("seen must be at least 1, got %r" % (seen,))
    # The leading "" pads the list so that templates[seen] matches the 1-based count.
    templates = ["", "Welcome %s", "Nice to see you again %s", "Third time is a charm %s"]
    if seen < len(templates):
        greeting = templates[seen] % name
    else:
        # Past the hand-written templates: fall back to a generic counted greeting.
        greeting = "Nice to see you at the %d-nth time %s!" % (seen, name)
    response = GreetResponse()
    response.name = name
    response.greeting = greeting
    return response
# Request/reply handler built over the bound functions; the HTTP endpoint calls it with the raw request body.
handler = RequestReplyHandler(functions)
#
# Serve the endpoint
#
from flask import request
from flask import make_response
from flask import Flask
# Flask application on which the '/statefun' route is registered.
app = Flask(__name__)
@app.route('/statefun', methods=['POST'])
def handle():
    """
    Serve a POST to '/statefun'.

    The raw request body is passed to `handler`, and whatever it returns is
    sent back with the Content-Type header set to 'application/octet-stream'.

    :return: the Flask response object carrying the handler's output.
    """
    reply = make_response(handler(request.data))
    reply.headers.set('Content-Type', 'application/octet-stream')
    return reply
if __name__ == "__main__":
    # Executed as a script (not imported): start the Flask application.
    app.run()