Skip to content
Documentation
API
Source

YoMo Source

Source generates data to Zipper.

func yomo.NewSource

yomo.NewSource(name string, zipperAddr string, opts ...SourceOption) Source

Create Source instance.

  • name: The name of this source instance.
  • zipperAddr: The endpoint of the Zipper to connect to.
  • opts: The SourceOption when create instance.

example:

source := yomo.NewSource(
	"websocket-source",
	"localhost:9000",
	yomo.WithCredential("token:123456abcdefg"),
)

type Source

source.Connect() error

Start connect to Zipper.

source.Close() error

Disconnect from Zipper.

source.Write(tag uint32, data []byte) error

Write data to direct Zipper with specified Tag.

The default transactionID is epoch time.

  • tag: The Tag of data.
  • data: The data to write.

source.Broadcast(tag uint32, data []byte) error

Write the data to all downstreams with specified Tag.

  • tag: The Tag of data.
  • data: The data to broadcast to all downstream Zippers.

source.SetErrorHandler(fn func(err error))

Set the error handler function when server error occurs.

  • fn: The error handler function.
    • err: The error.

source.SetReceiveHandler(fn func(tag Tag, data []byte))

Set the observe handler function.

  • fn: The observe handler function.
    • tag: The Tag of data.
    • data: The data.

type SourceOption

func WithObserveDataTags(tags ...Tag) SourceOption

Set data tag list which observed by this source.

  • tags: The Tag list.

func WithCredential(token string) SourceOption

Set the credential method when this Source instance connect to Zipper.

  • token: The token string.

func WithClientTLSConfig(tc *tls.Config) SourceOption

Set TLS config for this Source instance.

  • tc: The TLS config.

func WithClientQuicConfig(qc *quic.Config) SourceOption

Set QUIC config for this Source instance.

WithLogger(logger *slog.Logger) SourceOption

Set the logger for this Source instance.

  • logger: The logger.

WithTracerProvider(tp *tracesdk.TracerProvider) SourceOption

Set the tracer provider for this Source instance.

  • tp: The tracer provider.
// trace
tp, shutdown, err := trace.NewTracerProviderWithJaeger("yomo-source")
if err == nil {
	log.Println("[source] 🛰 trace enabled")
}
defer shutdown(context.Background())
 
// source
source := yomo.NewSource(
	"source",
	"localhost:9000",
	yomo.WithTracerProvider(tp),  // use trace provider
)

More about YoMo Observability.

Example

Followed the tutorial in yomo run, let's implement an application to generate the data. assume the Zipper Service is already listening on localhost:9000:

main.go
package main
 
import (
	"encoding/json"
	"fmt"
	"math/rand"
	"time"
 
	"github.com/yomorun/yomo"
)
 
type noiseData struct {
	Noise float32 `json:"noise"` // Noise level
	Time  int64   `json:"time"`  // Timestamp (ms)
	From  string  `json:"from"`  // Source identify
}
 
func main() {
	// connect to YoMo-Zipper.
	source := yomo.NewSource(
		"yomo-source",
		"localhost:9000",
	)
	err := source.Connect()
	if err != nil {
		panic(err)
	}
	defer source.Close()
 
	source.SetDataTag(0x33)
 
	for {
		// generate random data.
		data := noiseData{
			Noise: rand.New(rand.NewSource(time.Now().UnixNano())).Float32() * 200,
			Time:  time.Now().UnixNano() / int64(time.Millisecond),
			From:  "my-dev-mac",
		}
		sendingBuf, _ := json.Marshal(&data)
		fmt.Println("emit: ", data)
 
		// send data to Zipper via QUIC stream.
		source.Write(sendingBuf)
 
		time.Sleep(200 * time.Millisecond)
	}
}

Then we add go.mod by:

go mod init emitter
 
go: creating new go.mod: module emitter
go: to add module requirements and sums:
        go mod tidy
 
go mod tidy
 
go: finding module for package github.com/yomorun/yomo
go: found github.com/yomorun/yomo in github.com/yomorun/yomo v1.12.1
go: finding module for package github.com/kr/pretty
go: found github.com/kr/pretty in github.com/kr/pretty v0.3.1

Finally, we can run the application by:

go run main.go 
 
time=2023-04-16T23:04:54.603+08:00 level=INFO msg="use credential" component=Source client_id=a3zcJtlYwSbwU0aK5H2zK client_name=yomo-source credential_name=none
time=2023-04-16T23:04:54.608+08:00 level=INFO msg="connected to zipper" component=Source client_id=a3zcJtlYwSbwU0aK5H2zK client_name=yomo-source zipper_addr=localhost:9000
emit:  {13.991055 1681657494608 my-dev-mac}
emit:  {20.788294 1681657494810 my-dev-mac}
emit:  {194.06801 1681657495010 my-dev-mac}
emit:  {158.60637 1681657495211 my-dev-mac}
^C

Be sure your Zipper Service has started and listend on localhost:9000.