A lightweight stream processing library for Go

go-streams

Build Status PkgGoDev Go Report Card codecov

A lightweight stream processing library for Go.
go-streams provides a simple and concise DSL to build data pipelines. pipeline-architecture-example

Wiki
In computing, a pipeline, also known as a data pipeline, is a set of data processing elements connected in series, where the output of one element is the input of the next one. The elements of a pipeline are often executed in parallel or in time-sliced fashion. Some amount of buffer storage is often inserted between elements.

Overview

Building blocks:

  • Source - A Source is a set of stream processing steps that has one open output.
  • Flow - A Flow is a set of stream processing steps that has one open input and one open output.
  • Sink - A Sink is a set of stream processing steps that has one open input. Can be used as a Subscriber.

Flow capabilities (flow package):

  • Map
  • FlatMap
  • Filter
  • PassThrough
  • Split
  • FanOut
  • Merge
  • Throttler
  • SlidingWindow
  • TumblingWindow

Supported Connectors:

Examples

Usage samples are available in the examples directory.

License

Licensed under the MIT License.

Comments
  • Adding support for nats-streaming servers.

    Adding support for nats-streaming servers.

    Motivation

    Modifications

    • Addition of a new package, nats

    Verify change

    • [ ] Make sure the change passes the CI checks.
  • Kafka-go Extension (Windows Compatibility)

    Kafka-go Extension (Windows Compatibility)

    This is probably not high priority for the vast majority of users, however the use of the confluent kafka client makes it so that go-streams is not windows compatible out of the box.

    The suggestion I'd like to make is to change the extension to use something like: https://github.com/segmentio/kafka-go

  • Support for loop/feedback in the graph

    Support for loop/feedback in the graph

    I wonder whether this library supports loop/feedback constructs. Skimming the API gave a hint it shall be possible but I wonder about real world implications.

    Any insights? Thanks!

  • Graceful shutdown

    Graceful shutdown

    In all examples the main go-routine either blocks forever or waits for timeout.

    Is there any way to gracefully exit main function once processing completes (but not aborting all currently running go-routines until they finish the job)? I can do it with ChanSink and using some sync primitive inside it, but it looks a bit ugly. Any nicer solution?

  • feat: new RoundRobin flow util

    feat: new RoundRobin flow util

    Motivation

    • Fixes #___

    Modifications

    • Introduce a new flow util similar to FanOut, but balances the work across go funcs. I feel like this could be useful for parallelizing work across cpu cores.

    Verify change

    • [x] Make sure the change passes the CI checks.
  • Solve the problem of memory leak

    Solve the problem of memory leak

    Solve the memory leak problem caused by the equality of sw.size and sw.slide

    Motivation

    • Fixes #___

    Modifications

    Verify change

    • [ ] Make sure the change passes the CI checks.
  • Fix memory leak caused by time.After()

    Fix memory leak caused by time.After()

    Motivation

    • Fix memory leak casused by time.After()

    Modifications

    • Use time.NewTicker() outside the loop

    Verify change

    • [ ] Make sure the change passes the CI checks.
  • the problem with fanout function

    the problem with fanout function

    I test the fanout function in flow_test.go, it seems not correct when I add another flow. var toReture = func(in interface{}) interface{} { msg := in.(string) return msg }

    func TestFlowUtil(t *testing.T) { in := make(chan interface{}) out := make(chan interface{}) source := ext.NewChanSource(in) flow1 := flow.NewMap(toUpper, 1) flow2 := flow.NewMap(toReture, 1) filter := flow.NewFilter(filterA, 1) sink := ext.NewChanSink(out) var _input = []string{"a", "b", "c"} var _expectedOutput = []string{"b", "b", "c", "c"} go ingest(_input, in) go deferClose(in, time.Second*10) go func() { fanOut := flow.FanOut(source.Via(filter).Via(flow1).Via(flow2, 2) fanOut[1].To(sink) }() var _output []string for e := range sink.Out { _output = append(_output, e.(string)) } sort.Strings(_output) assertEqual(t, _expectedOutput, _output) } The function 'toReture' does nothing ,but it return incorrect result.

  • [feature/zipwith] : implementing ZipWith functionnality and added a test

    [feature/zipwith] : implementing ZipWith functionnality and added a test

    Motivation

    • Added ZipWith functionality, e.g. taking elements from several upstreams in a round-robin fashion, bundling them to a single element, then sending it downstream.

    Modifications

    • Implementation is in utils.go, the test is in flow_test.go.

    Verify change

    • [V] Make sure the change passes the CI checks.
  • Kafka messages should be processed in parallel

    Kafka messages should be processed in parallel

    Hi,

    I was looking at row 133 of kafka_sarama.go source code and noticed that every consumed message are pushed to the same channel. If I'm correct, this have the effect to make the processing of the messages to be sequential, loosing the possibility of processing the partitions in parallel. Could it make sense to have a channel per partition in this case?

  • feat: add keyBy、connect, coFlatMap

    feat: add keyBy、connect, coFlatMap

    Motivation

    • feat: add keyBy、connect, coFlatMap for a stream join a stream

    Modifications

    Verify change

    • [ ] Make sure the change passes the CI checks.
  • Does buffer support?

    Does buffer support?

    Is there a way to support grouping data by time and length? like:

    	ch := make(chan any, 5)
    	go func() {
    		for i := 0; i < 10000; i++ {
    			time.Sleep(rndTime)
    			ch <- i
    		}
    	}()
    	source := extension.NewChanSource(ch)
    
    	m := flow.NewMap(func(i interface{}) interface{} {
    		logrus.Infof("%v", i)
    		return i
    	}, 1)
            // not supported now
    	th := flow.NewBufferWithTimeOrCount(time.Millisecond * 100, 50) // every 100 mil secs or 50 counts emits data
    
    	source.Via(th).Via(m).To(extension.NewIgnoreSink())
    

    expection print:

    [x,x,x,x...x,x,x] // 50 counts
    // if 100 mil secs pass
    [x,x,...x,x] // data counts less than 50
    
  • Is it possible?Materialize the intermediate state

    Is it possible?Materialize the intermediate state

    There are many instances running in Distributed architecture. Some status are shared between instances, like time windows. We could put it on Redis, or other storage.

A library to simplify writing applications using TCP sockets to stream protobuff messages

BuffStreams Streaming Protocol Buffers messages over TCP in Golang What is BuffStreams? BuffStreams is a set of abstraction over TCPConns for streamin

Dec 13, 2022
Totem - A Go library that can turn a single gRPC stream into bidirectional unary gRPC servers

Totem is a Go library that can turn a single gRPC stream into bidirectional unar

Jan 6, 2023
Library for directly interacting and controlling an Elgato Stream Deck on Linux.

Stream Deck Library for directly interacting and controlling an Elgato Stream Deck on Linux. This library is designed to take exclusive control over a

Dec 17, 2022
A toy project to stream from a Remarkable2

goMarkableStream I use this toy project to stream my remarkable 2 (firmware 2.5) on my laptop using the local wifi. video/demo here Quick start You ne

Dec 31, 2022
V3IO Frames ("Frames") is a Golang based remote data frames access (over gRPC or HTTP stream)

V3IO Frames ("Frames") is a multi-model open-source data-access library that provides a unified high-performance DataFrame API for working with different types of data sources (backends). The library was developed by Iguazio to simplify working with data in the Iguazio Data Science Platform ("the platform"), but it can be extended to support additional backend types.

Oct 1, 2022
Stream Camera based on TCP
Stream Camera based on TCP

streamera Term Project of Computer Networking streamera is a Stream Camera based on TCP, which contains client mode and server mode. Features Client M

Nov 11, 2022
Reflex stream client for redis streams

rredis A reflex stream client for a redis streams using the radix client implementation. It provides an API for inserting data into a stream and for c

Oct 5, 2021
A simple Go server that broadcasts any data/stream

broadcast A simple Go server that broadcasts any data/stream usage data You can POST data. curl -X POST --data-binary "@111.png" localhost:9222/test.p

Aug 12, 2022
Reads MAWS formatted data and converts it into JSON output stream.

maws2json Usage examples Over serial line (stdin pipe) Lets assume that Vaisala weather station is connected via RS232 to USB serial dongle in /dev/tt

Feb 6, 2022
Reads JSON object (stream) from file/stdin and routes it/them to GCP Pub/Sub topics.

json2pubsub Publish JSON object (stream) into GCP Pub/Sub topic based on a field value. Usage: json2pubsub --project=STRING <mapping> ... Reads JSON

Nov 3, 2022
Broadcast-server - A simple Go server that broadcasts any data/stream

broadcast A simple Go server that broadcasts any data/stream usage data You can

Oct 21, 2022
A go module supply Java-Like generic stream programming (while do type check at runtime)

gostream A go module supplying Java-Like generic stream programming (while do type check at runtime) Using Get a Stream To get a Stream, using SliceSt

Jan 16, 2022
Rabbitio - Rabbit stream cipher package RFC 4503 for Go

rabbitio rabbitio is a rabbit stream cipher packge based on RFC 4503 for golang

Dec 14, 2022
Sstreamcry - Shadowsocks stream bomb

ShadowStreamCry A Shadowsocks stream bomb. Credits DuckSoft Qv2ray/rc4md5cry v2f

Feb 24, 2022
Moviefetch: a simple program to search and download for movies from websites like 1337x and then stream them

MovieFetch Disclaimer I am NOT responisble for any legal issues or other you enc

Dec 2, 2022
Twitter-plugin - Falco Plugin for Twitter Stream

Twitter Plugin This repository contains the twittter plugin for Falco, which fol

Mar 17, 2022
Provides packet processing capabilities for Go

GoPacket This library provides packet decoding capabilities for Go. See godoc for more details. Minimum Go version required is 1.5 except for pcapgo/E

Dec 29, 2022
🚀Gev is a lightweight, fast non-blocking TCP network library based on Reactor mode. Support custom protocols to quickly and easily build high-performance servers.
🚀Gev is a lightweight, fast non-blocking TCP network library based on Reactor mode. Support custom protocols to quickly and easily build high-performance servers.

gev 中文 | English gev is a lightweight, fast non-blocking TCP network library based on Reactor mode. Support custom protocols to quickly and easily bui

Jan 6, 2023
A lightweight and simplistic Tor library for golang

gotor A lightweight and simplistic Tor library for golang go get github.com/ripmeep/gotor import "github.com/ripmeep/gotor" Usage t := tor.TorConnecti

Jul 2, 2022