Stream processing stuff for Go

GoStream

Type-safe stream processing library inspired by the Java Streams API.

Requirements

  • Go 1.18.

This library makes intensive use of type parameters (generics), so it is not compatible with any Go version lower than 1.18.

Until Go 1.18 stable is officially released, you can download the development version of Go 1.18 using Gotip:

go install golang.org/dl/gotip@latest
gotip download
alias go=gotip

Usage examples

Example 1: basic creation, transformation and iteration

  1. Creates a literal stream containing all the integers from 1 to 11.
  2. From the Stream, selects all the integers that are prime.
  3. For each remaining integer, prints a message.
package main

import (
  "fmt"
  "github.com/mariomac/gostream/stream"
)

func main() {
  stream.Of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11).
    Filter(isPrime).
    ForEach(func(n int) {
      fmt.Printf("%d is a prime number\n", n)
    })
}

func isPrime(n int) bool {
  // 0 and 1 are not prime by definition.
  if n < 2 {
    return false
  }
  // Checking divisors up to the square root of n is sufficient.
  for i := 2; i*i <= n; i++ {
    if n%i == 0 {
      return false
    }
  }
  return true
}

Output:

2 is a prime number
3 is a prime number
5 is a prime number
7 is a prime number
11 is a prime number

Example 2: generation, map, limit and slice conversion

  1. Creates an infinite stream of random integers (no problem, streams are evaluated lazily!)
  2. Maps each random integer to a number between 1 and 6 (the remainder of dividing it by 6, plus 1).
  3. Limits the infinite stream to 5 elements.
  4. Collects the stream items as a slice.
rand.Seed(time.Now().UnixMilli())
fmt.Println("let me throw 5 times a dice for you")

results := stream.Generate(rand.Int).
    Map(func(n int) int {
        return n%6 + 1
    }).
    Limit(5).
    ToSlice()

fmt.Printf("results: %v\n", results)

Output:

let me throw 5 times a dice for you
results: [3 5 2 1 3]

Example 3: Generation from an iterator, Map to a different type

  1. Generates an infinite stream composed of 1, double(1), double(double(1)), etc., and cuts it to 6 elements.
  2. Maps the stream of numbers to a stream of strings. Because, at the moment, Go does not allow type parameters in methods, we need to invoke the stream.Map function instead of the numbers.Map method: the element type of the output stream (string) is different from the element type of the input stream (int).
  3. Converts the words stream to a slice and prints it.
func main() {
    numbers := stream.Iterate(1, double).Limit(6)
    words := stream.Map(numbers, asWord).ToSlice()
    fmt.Println(words)
}

func double(n int) int {
    return 2 * n
}

func asWord(n int) string {
    if n < 10 {
        return []string{"zero", "one", "two", "three", "four", "five",
            "six", "seven", "eight", "nine"}[n]
    } else {
        return "many"
    }
}

Output:

[one two four eight many many]

Example 4: deduplication of elements

The next example needs to compare the elements of the Stream, so the Stream needs to be converted to a ComparableStream[T], where T must be comparable (that is, it must define the == and != operators):

  1. Instantiates a Stream of comparable items.
  2. Passes it to the stream.Distinct function, which returns a copy of the original Stream without duplicates.
  3. Operates on the result as on any other stream.
words := stream.Distinct(
  stream.Of("hello", "hello", "!", "ho", "ho", "ho", "!"),
).ToSlice()

fmt.Printf("Deduplicated words: %v\n", words)

Output:

Deduplicated words: [hello ! ho]

Example 5: sorting from higher to lower

  1. Generate a stream of random uint32 numbers.
  2. Pick 5 elements.
  3. Sort them by the inverse natural order (from higher to lower).
    • It's important to limit the number of elements before sorting: invoking Sorted over an infinite stream would panic.
fmt.Println("picking up 5 random numbers from higher to lower")
stream.Generate(rand.Uint32).
    Limit(5).
    Sorted(order.Inverse(order.Natural[uint32])).
    ForEach(func(n uint32) {
        fmt.Println(n)
    })

Output:

picking up 5 random numbers from higher to lower
4039455774
2854263694
2596996162
1879968118
1823804162

Example 6: Reduce and helper functions

  1. Generate an infinite incremental Stream (1, 2, 3, 4...) using the stream.Iterate instantiator and the item.Increment helper function.
  2. Limit the generated Stream to 8 elements.
  3. Reduce all the elements by multiplying them with the item.Multiply helper function.
fac8, _ := stream.Iterate(1, item.Increment[int]).
    Limit(8).
    Reduce(item.Multiply[int])
fmt.Println("The factorial of 8 is", fac8)

Output:

The factorial of 8 is 40320

Limitations

Due to the initial limitations of Go generics, the API has the following limitations. We will work on overcoming them as new features are added to the Go type parameters specification.

  • You can use Map as a method as long as the output element has the same type as the input element. If you need to map to a different type, you need to use the stream.Map function.
  • There is no Distinct method. There is only a stream.Distinct function.
  • There is no ToMap method. There is only a stream.ToMap function.

Performance

Streams aren't the fastest option. They are aimed at complex workflows where you can sacrifice a few microseconds for the sake of code organization and readability. A disclaimer, too: functional streams are not always the most readable option.

The following results show the difference in performance between an imperative implementation of an arbitrary set of operations and the functional implementation using streams (see the stream/benchs_test.go file):

$ gotip test -bench=. -benchmem  ./...
goos: darwin
goarch: amd64
pkg: github.com/mariomac/gostream/stream
cpu: Intel(R) Core(TM) i5-5257U CPU @ 2.70GHz
BenchmarkImperative-4            2098518               550.6 ns/op          1016 B/op          7 allocs/op
BenchmarkFunctional-4             293095              3653 ns/op            2440 B/op         23 allocs/op

Completion status

  • Stream instantiation functions
    • Comparable
    • Concat
    • Empty
    • Generate
    • Iterate
    • Of
    • OfMap
    • OfSlice
    • OfChannel
  • Stream transformers
    • Distinct
    • Filter
    • FlatMap
    • Limit
    • Map
    • Peek
    • Skip
    • Sorted
  • Collectors/Terminals
    • ToMap
    • ToSlice
    • AllMatch
    • AnyMatch
    • Count
    • FindAny
    • FindFirst
    • ForEach
    • Join (for strings)
    • Max
    • Min
    • NoneMatch
    • Reduce
  • Auxiliary Functions
    • Add (for numbers)
    • Increment (for numbers)
    • IsZero
    • Multiply (for numbers)
    • Neg (for numbers)
    • Not (for bools)
  • Future
    • More operations inspired by the Kafka Streams API
    • Parallel streams
    • Allow users to implement their own Comparable or Ordered types

Extra credits

The Stream processing and aggregation functions are heavily inspired by the Java Stream Specification.

Stream code documentation also used Stream Javadoc as an essential reference and might contain citations from it.

Owner
Mario Macias
"Programación en Go (Marcombo ed.)" book author. Software engineer @RedHatOfficial. Before: @newrelic @UPC @midokura @BSCCNS @gameloft and others
Comments
  • Am I correct in observing that map can only map to the same type?

    For example []int -> []int, but not []string -> []int. I'm running into the same problem due to the method type parameter rules, so I can't use method chaining on my own Stream API. Curious to understand your abstract system and method/function equality better.

  • constraints package was moved in final go1.18

  • add dropwhile and takewhile

    You should probably add dropWhile and takeWhile to support ending infinite streams based on some sort of condition. It could also be a good idea to add support for "finite streams" - similar to Generate but lambda should return value and bool.
