API StreamFunction
StreamFunction is a stateful serverless function that handle chunked data from Zipper, and return a chunked data to Zipper.
func yomo.NewStreamFunction
yomo.NewStreamFunction(name, zipperAddr string, opts ...SfnOption) StreamFunction
Create a stream function instance.
name
: The name of the stream function.zipperAddr
: The endpoint of the Zipper to connect to.opts
: The SfnOption when create the stream function.
example:
sfn := yomo.NewStreamFunction(
"stream-llm-inf-response",
"localhost:9000",
yomo.WithCredential("token:123456abcdefg"),
)
type StreamFunction
sfn.SetObserveDataTags(tags ...Tag)
Set the data Tag list that will be observed from Zipper.
tags
: The data Tag list.
sfn.Init(fn)
This feature is introduced in version 1.14
Set the fn as initializer
for this serverless, It's Optional, if setted, the fn
will be invoked only once at the first time this serverless start.
fn
: The initializer handler withfunc () error
signature.
sfn := NewStreamFunction(
"test-sfn",
"localhost:9000",
)
defer sfn.Close()
// Init fn, load the 7b model to GPU momery when this serverless start.
err := sfn.Init(func() error {
return LoadModelToMemory('llama-2-7b-chat', 'tokenizer.model')
})
// handle every data to be predicted
sfn.SetHandler(...)
sfn.Connect()
sfn.SetHandler(fn AsyncHandler) error
Set the handler function in async mode, which accept the raw bytes data from Zipper, and return the raw bytes data to Zipper.
fn
: The handler function of AsyncHandler.
sfn := yomo.NewStreamFunction(
"my-fn",
"localhost:9000",
)
defer sfn.Close()
sfn.SetObserveDataTags(0x10)
sfn.SetHandler(func (ctx serverless.Context) {
data := ctx.Data()
log.Printf("✅ [my-fn] received <- %v", string(data))
})
err = sfn.Connect()
sfn.SetPipeHandler(fn PipeHandler) error
Set the handler function in blocking mode, which accept the raw bytes data from Zipper, and return the raw bytes data to Zipper.
fn
: The handler function of PipeHandler.
sfn.SetErrorHandler(fn func(err error))
Set the error handler function when server error occurs.
fn
: The error handler function.err
: The error.
sfn.Connect() error
Create a connection to Zipper, when data is received, the handler function will be called.
sfn.Write(tag Tag, data []byte) error
Write data to Zipper.
tag
: The data Tag.data
: The raw bytes data to be wrote.
sfn.Close() error
Close the connection to Zipper.
type SfnOption
func WithObserveDataTags(tags ...Tag) SfnOption
Set data tag list which observed by this stream function.
tags
: The Tag list.
func WithCredential(token string) SfnOption
Set the credential method when this Stream Function instance connect to Zipper.
token
: The token string.
func WithClientTLSConfig(tc *tls.Config) SfnOption
Set TLS config for this Stream Function instance.
tc
: The TLS config.
func WithClientQuicConfig(qc *quic.Config) SfnOption
Set QUIC config for this Stream Function instance.
qc
: The QUIC config (opens in a new tab).
WithLogger(logger *slog.Logger) SfnOption
Set the logger for this Source instance.
logger
: The logger.
WithSfnTracerProvider(tp *tracesdk.TracerProvider) SfnOption
Set the tracer provider for this Source instance.
tp
: The tracer provider.
tp, shutdown, err := trace.NewTracerProviderWithJaeger("yomo-sfn")
if err == nil {
log.Println("[sfn] 🛰 trace enabled")
}
defer shutdown(context.Background())
// stateful function
sfn := yomo.NewStreamFunction(
"sfn",
"localhost:9000",
yomo.WithSfnTracerProvider(tp), // use tracer provider
)
More about YoMo Observability.
type AsyncHandler
type AsyncHandler func(reqData []byte) (respTag Tag, respData []byte)
The request-response mode handler function, async mode.
reqData
: The raw bytes data received from Zipper.
Returns:
AsyncHandler is used to handle high concurrent requests, and the response data will be sent to Zipper after the handler function returns.
type PipeHandler
type PipeHandler func(in <-chan []byte, out chan<- *PayloadFrame)
The blocking mode handler function.
in
: The input channel of the raw bytes data received from Zipper.out
: The output channel of the PayloadFrame to be wrote to Zipper.
PipeHandler is used to handle chunked stream data, like video stream, audio stream, behavior sequence data, etc. Ingress data will be guarantee the order, and the egress data will be guarantee the order too. By this, developers can read video stream data continuously, then handle the frames by an AI model, and write the inference result back to user instantly.