YoMo Source
A Source generates data and sends it to the Zipper.
func yomo.NewSource
yomo.NewSource(name string, zipperAddr string, opts ...SourceOption) Source
Create a Source instance.
name: The name of this Source instance.
zipperAddr: The endpoint of the Zipper to connect to.
opts: The SourceOption values to apply when creating the instance.
Example:
source := yomo.NewSource(
	"websocket-source",
	"localhost:9000",
	yomo.WithCredential("token:123456abcdefg"),
)
type Source
source.Connect() error
Start connecting to the Zipper.
source.Close() error
Disconnect from Zipper.
source.Write(tag uint32, data []byte) error
Write data with the specified Tag to the Zipper this Source is connected to.
The default transactionID is the epoch time.
tag: The Tag of the data.
data: The data to write.
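As a minimal sketch of preparing a payload for Write, the code below encodes a struct as JSON and hands it to anything that has the Write(tag uint32, data []byte) error signature documented above. The tagWriter interface, the recordingWriter type, the reading struct and the tag value 0x33 are illustrative assumptions, not part of the YoMo API; a connected Source would take the place of recordingWriter.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// tagWriter is a hypothetical stand-in that mirrors the documented Write
// signature, so the sketch compiles without a running Zipper.
type tagWriter interface {
	Write(tag uint32, data []byte) error
}

// recordingWriter is a hypothetical test double that remembers the last write.
type recordingWriter struct {
	tag  uint32
	data []byte
}

func (w *recordingWriter) Write(tag uint32, data []byte) error {
	w.tag, w.data = tag, data
	return nil
}

// reading is an illustrative payload type.
type reading struct {
	Noise float32 `json:"noise"`
	From  string  `json:"from"`
}

// emit encodes r as JSON and writes it under the given tag.
func emit(w tagWriter, tag uint32, r reading) error {
	buf, err := json.Marshal(r)
	if err != nil {
		return fmt.Errorf("encode reading: %w", err)
	}
	return w.Write(tag, buf)
}

func main() {
	w := &recordingWriter{}
	if err := emit(w, 0x33, reading{Noise: 12.5, From: "my-dev-mac"}); err != nil {
		fmt.Println("write failed:", err)
		return
	}
	fmt.Printf("tag=%#x data=%s\n", w.tag, w.data)
}
```

Keeping the encoding step in a small function that depends only on the Write signature makes it possible to check the payload format without a Zipper running.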
source.Broadcast(tag uint32, data []byte) error
Write the data to all downstreams with the specified Tag.
source.SetErrorHandler(fn func(err error))
Set the handler function that is called when a server error occurs.
fn: The error handler function.
err: The error.
source.SetReceiveHandler(fn func(tag Tag, data []byte))
Set the handler function for data this Source observes.
fn: The observe handler function.
tag: The Tag of the data.
data: The data.
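A receive handler could look like the following sketch, which decodes a JSON payload and reports the tag it arrived with. The local Tag alias (assumed here to be a 32-bit unsigned integer, like the tag accepted by Write), the reply struct and the tag value 0x34 are assumptions made for illustration only.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Tag is a local stand-in for YoMo's Tag type; this sketch assumes it is a
// 32-bit unsigned integer, matching the uint32 tag accepted by Write.
type Tag = uint32

// reply is a hypothetical payload shape.
type reply struct {
	Status string `json:"status"`
}

// describe formats one received message. It returns a string so the logic
// can be checked without a connection.
func describe(tag Tag, data []byte) string {
	var r reply
	if err := json.Unmarshal(data, &r); err != nil {
		return fmt.Sprintf("tag=%#x: undecodable payload (%d bytes)", tag, len(data))
	}
	return fmt.Sprintf("tag=%#x status=%s", tag, r.Status)
}

// onReceive has the shape expected by SetReceiveHandler.
func onReceive(tag Tag, data []byte) {
	fmt.Println(describe(tag, data))
}

func main() {
	// With a real Source this would be: source.SetReceiveHandler(onReceive)
	onReceive(0x34, []byte(`{"status":"ok"}`))
	onReceive(0x34, []byte(`not json`))
}
```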
type SourceOption
func WithObserveDataTags(tags ...Tag) SourceOption
Set the list of data tags observed by this Source.
tags: The Tag list.
func WithCredential(token string) SourceOption
Set the credential used when this Source instance connects to the Zipper.
token: The token string.
func WithClientTLSConfig(tc *tls.Config) SourceOption
Set TLS config for this Source instance.
tc: The TLS config.
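One way to build a value for tc with the standard library is sketched below. The helper name newClientTLSConfig and the host name zipper.example.com are hypothetical. Whether YoMo needs further fields (for example ALPN protocol names in NextProtos) is not covered by this page, so treat the field choices as assumptions.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
)

// newClientTLSConfig builds a client-side TLS config that trusts only the
// CA certificates found in caPEM.
func newClientTLSConfig(caPEM []byte, serverName string) (*tls.Config, error) {
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no valid CA certificate found in PEM data")
	}
	return &tls.Config{
		RootCAs:    pool,
		ServerName: serverName,
		MinVersion: tls.VersionTLS13, // QUIC is specified on top of TLS 1.3
	}, nil
}

func main() {
	// Deliberately invalid input, to show the error path.
	if _, err := newClientTLSConfig([]byte("not a certificate"), "zipper.example.com"); err != nil {
		fmt.Println("tls config error:", err)
	}
	// With a valid config tc, it would be passed as an option:
	//   yomo.NewSource("yomo-source", "localhost:9000", yomo.WithClientTLSConfig(tc))
}
```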
func WithClientQuicConfig(qc *quic.Config) SourceOption
Set QUIC config for this Source instance.
qc: The QUIC config.
func WithLogger(logger *slog.Logger) SourceOption
Set the logger for this Source instance.
logger: The logger.
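The logger argument is a standard log/slog logger, so it can be built with the standard library alone. The sketch below creates a JSON logger at debug level that attaches a "source" attribute to every record. The helper names newSourceLogger and hasSourceAttr and the attribute key are assumptions for illustration, not YoMo conventions.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"log/slog"
	"os"
	"strings"
)

// newSourceLogger returns a JSON logger that writes to w at debug level and
// adds a "source" attribute to every record.
func newSourceLogger(w io.Writer, sourceName string) *slog.Logger {
	h := slog.NewJSONHandler(w, &slog.HandlerOptions{Level: slog.LevelDebug})
	return slog.New(h).With("source", sourceName)
}

// hasSourceAttr logs one record to an in-memory buffer and reports whether
// the "source" attribute was attached. It exists only to make the sketch
// checkable.
func hasSourceAttr(sourceName string) bool {
	var buf bytes.Buffer
	newSourceLogger(&buf, sourceName).Info("probe")
	return strings.Contains(buf.String(), `"source":"`+sourceName+`"`)
}

func main() {
	logger := newSourceLogger(os.Stderr, "yomo-source")
	logger.Debug("logger ready", "zipper_addr", "localhost:9000")
	fmt.Println("source attribute attached:", hasSourceAttr("yomo-source"))
	// With a real Source, the logger would be passed as an option:
	//   yomo.NewSource("yomo-source", "localhost:9000", yomo.WithLogger(logger))
}
```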
func WithTracerProvider(tp *tracesdk.TracerProvider) SourceOption
Set the tracer provider for this Source instance.
tp: The tracer provider.
// trace
tp, shutdown, err := trace.NewTracerProviderWithJaeger("yomo-source")
if err == nil {
	log.Println("[source] 🛰 trace enabled")
}
defer shutdown(context.Background())
// source
source := yomo.NewSource(
	"source",
	"localhost:9000",
	yomo.WithTracerProvider(tp), // use trace provider
)
More about YoMo Observability.
Example
Following the tutorial in yomo run, let's implement an application that generates data. Assume the Zipper Service is already listening on localhost:9000:
package main

import (
	"encoding/json"
	"fmt"
	"math/rand"
	"time"

	"github.com/yomorun/yomo"
)

type noiseData struct {
	Noise float32 `json:"noise"` // Noise level
	Time  int64   `json:"time"`  // Timestamp (ms)
	From  string  `json:"from"`  // Source identifier
}

func main() {
	// Connect to the Zipper.
	source := yomo.NewSource(
		"yomo-source",
		"localhost:9000",
	)
	err := source.Connect()
	if err != nil {
		panic(err)
	}
	defer source.Close()

	// Create the random generator once instead of on every iteration.
	rnd := rand.New(rand.NewSource(time.Now().UnixNano()))

	for {
		// Generate random data.
		data := noiseData{
			Noise: rnd.Float32() * 200,
			Time:  time.Now().UnixNano() / int64(time.Millisecond),
			From:  "my-dev-mac",
		}
		sendingBuf, _ := json.Marshal(&data)
		fmt.Println("emit: ", data)
		// Send data to the Zipper via QUIC stream, using tag 0x33 as
		// required by the Write(tag, data) signature.
		if err := source.Write(0x33, sendingBuf); err != nil {
			fmt.Println("write error:", err)
		}
		time.Sleep(200 * time.Millisecond)
	}
}
Then create go.mod by running:
ᐅ go mod init emitter
go: creating new go.mod: module emitter
go: to add module requirements and sums:
go mod tidy
ᐅ go mod tidy
go: finding module for package github.com/yomorun/yomo
go: found github.com/yomorun/yomo in github.com/yomorun/yomo v1.12.1
go: finding module for package github.com/kr/pretty
go: found github.com/kr/pretty in github.com/kr/pretty v0.3.1
Finally, we can run the application by:
ᐅ go run main.go
time=2023-04-16T23:04:54.603+08:00 level=INFO msg="use credential" component=Source client_id=a3zcJtlYwSbwU0aK5H2zK client_name=yomo-source credential_name=none
time=2023-04-16T23:04:54.608+08:00 level=INFO msg="connected to zipper" component=Source client_id=a3zcJtlYwSbwU0aK5H2zK client_name=yomo-source zipper_addr=localhost:9000
emit: {13.991055 1681657494608 my-dev-mac}
emit: {20.788294 1681657494810 my-dev-mac}
emit: {194.06801 1681657495010 my-dev-mac}
emit: {158.60637 1681657495211 my-dev-mac}
^C
Make sure your Zipper Service has started and is listening on localhost:9000.