Reactive Extensions for the Go language.

RxGo

ReactiveX

ReactiveX, or Rx for short, is an API for programming with Observable streams. This is the official ReactiveX API for the Go language.

ReactiveX is an alternative approach to asynchronous programming, alongside callbacks, promises, and deferreds. It is about processing streams of events or items, where an event is any occurrence or change within the system. A stream of events is called an Observable.

An operator is a function that defines an Observable and how and when it should emit data. The operators covered are listed below.

RxGo

The RxGo implementation is based on the concept of pipelines. A pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function.

Let's see a concrete example with each box being an operator:

  • We create a static Observable based on a fixed list of items using the Just operator.
  • We define a transformation function (convert a circle into a square) using the Map operator.
  • We filter each yellow square using the Filter operator.

In this example, the final items are sent in a channel, available to a consumer. There are many ways to consume or to produce data using RxGo. Publishing the results in a channel is only one of them.
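To make the pipeline idea concrete, here is a minimal sketch written with plain channels and goroutines rather than RxGo itself. The shape type and the just, mapStage, filterStage, toSquare, and isYellow helpers are hypothetical names chosen for illustration, and the sketch assumes that the filter keeps the yellow squares:

```go
package main

import "fmt"

// shape is a hypothetical item type used only for this illustration.
type shape struct {
	kind  string // "circle" or "square"
	color string
}

// just emits a fixed list of items on a channel and then closes it.
func just(items ...shape) <-chan shape {
	out := make(chan shape)
	go func() {
		defer close(out)
		for _, it := range items {
			out <- it
		}
	}()
	return out
}

// mapStage applies fn to every item it receives and forwards the result.
func mapStage(in <-chan shape, fn func(shape) shape) <-chan shape {
	out := make(chan shape)
	go func() {
		defer close(out)
		for it := range in {
			out <- fn(it)
		}
	}()
	return out
}

// filterStage forwards only the items for which keep returns true.
func filterStage(in <-chan shape, keep func(shape) bool) <-chan shape {
	out := make(chan shape)
	go func() {
		defer close(out)
		for it := range in {
			if keep(it) {
				out <- it
			}
		}
	}()
	return out
}

// toSquare converts any shape into a square of the same color.
func toSquare(s shape) shape {
	s.kind = "square"
	return s
}

// isYellow reports whether a shape is yellow.
func isYellow(s shape) bool { return s.color == "yellow" }

func main() {
	source := just(
		shape{"circle", "red"},
		shape{"circle", "yellow"},
		shape{"circle", "blue"},
	)
	// Each stage is one goroutine; the stages are connected by channels.
	for s := range filterStage(mapStage(source, toSquare), isYellow) {
		fmt.Println(s.kind, s.color)
	}
}
```

Each stage owns its output channel and closes it when its input is exhausted, which is what lets the final range loop terminate.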

Each operator is a transformation stage. By default, everything is sequential. Yet, we can leverage modern CPU architectures by defining multiple instances of the same operator, each instance being a goroutine connected to a common channel.

The philosophy of RxGo is to implement the ReactiveX concepts and leverage the main Go primitives (channels, goroutines, etc.) so that the integration between the two worlds is as smooth as possible.

Installation of RxGo v2

go get -u github.com/reactivex/rxgo/v2

Getting Started

Hello World

Let's create our first Observable and consume an item:

observable := rxgo.Just("Hello, World!")()
ch := observable.Observe()
item := <-ch
fmt.Println(item.V)

The Just operator creates an Observable from a static list of items. Of(value) creates an item from a given value. To create an item from an error, we have to use Error(err). This differs from v1, which accepted a value or an error directly without wrapping it. The rationale for this change is to prepare RxGo for the generics feature coming (hopefully) in Go 2.

By the way, the Just operator uses currying as syntactic sugar. This way, it accepts multiple items in the first parameter list and multiple options in the second parameter list. We'll see below how to specify options.

Once the Observable is created, we can observe it using Observe(). By default, an Observable is lazy in the sense that it emits items only once a subscription is made. Observe() returns a <-chan rxgo.Item.

We consumed an item from this channel and printed its value using item.V.

An item is a wrapper on top of a value or an error. We may want to check the type first like this:

item := <-ch
if item.Error() {
    return item.E
}
fmt.Println(item.V)

item.Error() returns a boolean indicating whether an item contains an error. Then, we use either item.E to get the error or item.V to get the value.

By default, an Observable is stopped once an error is produced. However, there are special operators to deal with errors (e.g., OnError, Retry, etc.)

It is also possible to consume items using callbacks:

observable.ForEach(func(v interface{}) {
    fmt.Printf("received: %v\n", v)
}, func(err error) {
    fmt.Printf("error: %v\n", err)
}, func() {
    fmt.Println("observable is closed")
})

In this example, we passed three functions:

  • A NextFunc triggered when a value item is emitted.
  • An ErrFunc triggered when an error item is emitted.
  • A CompletedFunc triggered once the Observable is completed.

ForEach is non-blocking. Yet, it returns a notification channel that will be closed once the Observable completes. Hence, to make the previous code blocking, we simply need to use <-:

<-observable.ForEach(...)
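The non-blocking call that returns a notification channel is a common Go pattern. Here is a minimal sketch of it using only the standard library; the forEach function below is a hypothetical helper, not RxGo's implementation, and it omits the error callback for brevity:

```go
package main

import "fmt"

// forEach consumes in on a separate goroutine and returns immediately.
// The returned channel is closed once in is drained and completed has run.
func forEach(in <-chan int, next func(int), completed func()) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for v := range in {
			next(v)
		}
		completed()
	}()
	return done
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 0; i < 3; i++ {
			in <- i
		}
	}()

	done := forEach(in,
		func(v int) { fmt.Println("received:", v) },
		func() { fmt.Println("completed") },
	)
	// Receiving from done blocks until the stream is fully consumed.
	<-done
}
```

Because a receive on a closed channel returns immediately, any number of callers can wait on the same notification channel.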

Real-World Example

Let's say we want to implement a stream that consumes the following Customer structure:

type Customer struct {
	ID             int
	Name, LastName string
	Age            int
	TaxNumber      string
}

We create a producer that will emit Customers to a given chan rxgo.Item and create an Observable from it:

// Create the input channel
ch := make(chan rxgo.Item)
// Data producer
go producer(ch)

// Create an Observable
observable := rxgo.FromChannel(ch)

Then, we need to perform the two following operations:

  • Keep only the customers older than 18.
  • Enrich each customer with a tax number. Retrieving a tax number is done, for example, by an IO-bound function doing an external REST call.

As the enriching step is IO-bound, it might be interesting to parallelize it within a given pool of goroutines. Yet, let's imagine that all the Customer items need to be produced sequentially, ordered by ID.

observable.
	Filter(func(item interface{}) bool {
		// Filter operation
		customer := item.(Customer)
		return customer.Age > 18
	}).
	Map(func(_ context.Context, item interface{}) (interface{}, error) {
		// Enrich operation
		customer := item.(Customer)
		taxNumber, err := getTaxNumber(customer)
		if err != nil {
			return nil, err
		}
		customer.TaxNumber = taxNumber
		return customer, nil
	},
		// Create multiple instances of the map operator
		rxgo.WithPool(pool),
		// Serialize the items emitted by their Customer.ID
		rxgo.Serialize(func(item interface{}) int {
			customer := item.(Customer)
			return customer.ID
		}), rxgo.WithBufferedChannel(1))

In the end, we consume the items using ForEach() or Observe() for example. Observe() returns a <-chan Item:

for customer := range observable.Observe() {
	if customer.Error() {
		// The error is carried by the item itself
		return customer.E
	}
	fmt.Println(customer.V)
}
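Restoring order after a parallel stage can be sketched with a small reordering buffer. This is a standard-library illustration of the idea, not RxGo's Serialize implementation; the customer type, the serialize function, and its from parameter are hypothetical, and the sketch assumes IDs are unique and contiguous starting at from:

```go
package main

import "fmt"

// customer mirrors only the fields needed for this illustration.
type customer struct {
	ID   int
	Name string
}

// serialize re-emits items in ascending ID order, starting at from.
// Items that arrive early are parked in a map until their turn comes.
func serialize(in <-chan customer, from int) <-chan customer {
	out := make(chan customer)
	go func() {
		defer close(out)
		pending := make(map[int]customer)
		next := from
		for c := range in {
			pending[c.ID] = c
			// Flush every item that is now in sequence.
			for {
				ready, ok := pending[next]
				if !ok {
					break
				}
				out <- ready
				delete(pending, next)
				next++
			}
		}
	}()
	return out
}

func main() {
	in := make(chan customer)
	go func() {
		defer close(in)
		// Simulate a worker pool finishing out of order.
		for _, id := range []int{2, 0, 1, 3} {
			in <- customer{ID: id, Name: fmt.Sprintf("customer-%d", id)}
		}
	}()
	for c := range serialize(in, 0) {
		fmt.Println(c.ID, c.Name)
	}
}
```

The trade-off is memory: a slow item with a low ID forces every later item to wait in the pending map.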

Observable Types

Hot vs. Cold Observables

In the Rx world, there is a distinction between cold and hot Observables. When the data is produced by the Observable itself, it is a cold Observable. When the data is produced outside the Observable, it is a hot Observable. Usually, when we don't want to create a producer over and over again, we favour a hot Observable.

In RxGo, there is a similar concept.

First, let's create a hot Observable using the FromChannel operator and see the implications:

ch := make(chan rxgo.Item)
go func() {
    for i := 0; i < 3; i++ {
        ch <- rxgo.Of(i)
    }
    close(ch)
}()
observable := rxgo.FromChannel(ch)

// First Observer
for item := range observable.Observe() {
    fmt.Println(item.V)
}

// Second Observer
for item := range observable.Observe() {
    fmt.Println(item.V)
}

The result of this execution is:

0
1
2

This means the first Observer already consumed all the items, leaving nothing for the others.
This behavior can be altered with Connectable Observables, though.
The main point here is that the items were produced by the goroutine, outside the Observable.

On the other hand, let's create a cold Observable using the Defer operator:

observable := rxgo.Defer([]rxgo.Producer{func(_ context.Context, ch chan<- rxgo.Item) {
    for i := 0; i < 3; i++ {
        ch <- rxgo.Of(i)
    }
}})

// First Observer
for item := range observable.Observe() {
    fmt.Println(item.V)
}

// Second Observer
for item := range observable.Observe() {
    fmt.Println(item.V)
}

Now, the result is:

0
1
2
0
1
2

In the case of a cold observable, the stream was created independently for every Observer.

Again, the distinction between hot and cold Observables is not about how you consume items but about where the data is produced.
A good example of a hot Observable is price ticks from a trading exchange.
And if you teach an Observable to fetch products from a database and then yield them one by one, you create a cold Observable.
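The same distinction can be reproduced with plain channels. In this sketch, which does not use RxGo, drain and newProducer are hypothetical helpers: a hot source is a single shared channel, while a cold source is a factory that builds a fresh channel for every observer.

```go
package main

import "fmt"

// drain reads a channel until it is closed and returns what it received.
func drain(ch <-chan int) []int {
	var got []int
	for v := range ch {
		got = append(got, v)
	}
	return got
}

// newProducer returns a fresh channel that emits 0, 1, 2 and then closes.
func newProducer() <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for i := 0; i < 3; i++ {
			ch <- i
		}
	}()
	return ch
}

func main() {
	// Hot: one shared channel, produced once, outside of the observers.
	hot := newProducer()
	fmt.Println("hot, first observer: ", drain(hot))
	fmt.Println("hot, second observer:", drain(hot))

	// Cold: every observer calls the factory and gets its own stream.
	cold := newProducer
	fmt.Println("cold, first observer: ", drain(cold()))
	fmt.Println("cold, second observer:", drain(cold()))
}
```

The second drain of the hot channel receives nothing because the channel is already closed and empty, whereas both cold observers see all three items.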

Backpressure

There is another operator called FromEventSource that creates an Observable from a channel. The difference from the FromChannel operator is that as soon as the Observable is created, it starts to emit items regardless of whether there is an Observer. Hence, the items emitted by an Observable without Observer(s) are lost (while they are buffered with the FromChannel operator).

A use case for the FromEventSource operator is, for example, telemetry. We may not be interested in all the data produced from the very beginning of a stream, only the data since we started to observe it.

Once we start observing an Observable created with FromEventSource, we can configure the backpressure strategy. By default, it is blocking (there is a guaranteed delivery for the items emitted after we observe it). We can override this strategy this way:

observable := rxgo.FromEventSource(input, rxgo.WithBackPressureStrategy(rxgo.Drop))

The Drop strategy means that if the pipeline after FromEventSource is not ready to consume an item, this item is dropped.
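In plain Go, a drop-on-busy send is usually written as a select with a default case. The sketch below illustrates that idea only; trySend is a hypothetical helper, and the buffered channel stands in for a downstream pipeline that is not keeping up:

```go
package main

import "fmt"

// trySend delivers v if the channel can accept it right now and reports
// whether it did. When the channel is not ready, the item is dropped.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	// A buffer of 2 with no active reader stands in for a slow pipeline.
	downstream := make(chan int, 2)
	dropped := 0
	for i := 0; i < 5; i++ {
		if !trySend(downstream, i) {
			dropped++
		}
	}
	close(downstream)
	for v := range downstream {
		fmt.Println("delivered:", v)
	}
	fmt.Println("dropped:", dropped)
}
```

A blocking strategy would simply use ch <- v without the default case, which makes the producer wait instead of losing items.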

By default, a channel connecting operators is non-buffered. We can override this behaviour like this:

observable.Map(transform, rxgo.WithBufferedChannel(42))

Each operator has an opts ...Option parameter that accepts such options.

Lazy vs. Eager Observation

The default observation strategy is lazy. It means an operator processes the items emitted by an Observable once we start observing it. We can change this behaviour this way:

observable := rxgo.FromChannel(ch).Map(transform, rxgo.WithObservationStrategy(rxgo.Eager))

In this case, the Map operator is triggered whenever an item is produced, even without any Observer.
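The difference between the two strategies can be sketched with a sync.Once that guards the start of the processing goroutine. This is an illustration under assumptions, not RxGo's implementation: the stage type, newStage, run, and observe are hypothetical names, and the output buffer of 16 is an arbitrary choice so that eager results are retained until someone observes.

```go
package main

import (
	"fmt"
	"sync"
)

// stage doubles every input item. The processing goroutine is started
// at most once, either at construction (eager) or on first observe (lazy).
type stage struct {
	in    <-chan int
	out   chan int
	start sync.Once
}

func newStage(in <-chan int, eager bool) *stage {
	s := &stage{in: in, out: make(chan int, 16)}
	if eager {
		s.run()
	}
	return s
}

// run starts the processing goroutine if it is not running yet.
func (s *stage) run() {
	s.start.Do(func() {
		go func() {
			defer close(s.out)
			for v := range s.in {
				s.out <- v * 2
			}
		}()
	})
}

// observe triggers processing (if needed) and returns the output channel.
func (s *stage) observe() <-chan int {
	s.run()
	return s.out
}

func main() {
	// Lazy: the buffered input keeps its item until we observe.
	lazyIn := make(chan int, 1)
	lazyIn <- 21
	close(lazyIn)
	lazy := newStage(lazyIn, false)
	fmt.Println("items still waiting before observe:", len(lazyIn))
	fmt.Println("lazy result:", <-lazy.observe())

	// Eager: the send on an unbuffered input succeeds without any
	// observer, because the stage is already receiving.
	eagerIn := make(chan int)
	eager := newStage(eagerIn, true)
	eagerIn <- 21
	close(eagerIn)
	fmt.Println("eager result:", <-eager.observe())
}
```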

Sequential vs. Parallel Operators

By default, each operator is sequential: one operator is one goroutine instance. We can override it using the following option:

observable.Map(transform, rxgo.WithPool(32))

In this example, we create a pool of 32 goroutines that consume items concurrently from the same channel. If the operation is CPU-bound, we can use the WithCPUPool() option that creates a pool based on the number of logical CPUs.
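A pool of goroutines reading from one channel can be sketched with the standard library alone. The parallelMap function below is a hypothetical helper written for illustration, not the code behind WithPool:

```go
package main

import (
	"fmt"
	"sync"
)

// parallelMap runs fn on `workers` goroutines that all read from in.
// The order of the results is not guaranteed.
func parallelMap(in <-chan int, workers int, fn func(int) int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for v := range in {
				out <- fn(v)
			}
		}()
	}
	// Close out once every worker has finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 10; i++ {
			in <- i
		}
	}()

	sum := 0
	for v := range parallelMap(in, 4, func(v int) int { return v * v }) {
		sum += v
	}
	fmt.Println("sum of squares:", sum)
}
```

For CPU-bound work, one plausible choice for the workers argument in this sketch is runtime.NumCPU(), which returns the number of logical CPUs.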

Connectable Observable

A Connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when its Connect() method is called. In this way, you can wait for all intended Subscribers to subscribe to the Observable before the Observable begins emitting items.

Let's create a Connectable Observable using rxgo.WithPublishStrategy:

ch := make(chan rxgo.Item)
go func() {
	ch <- rxgo.Of(1)
	ch <- rxgo.Of(2)
	ch <- rxgo.Of(3)
	close(ch)
}()
observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy())

Then, we create two Observers:

observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
	return i.(int) + 1, nil
}).DoOnNext(func(i interface{}) {
	fmt.Printf("First observer: %d\n", i)
})

observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
	return i.(int) * 2, nil
}).DoOnNext(func(i interface{}) {
	fmt.Printf("Second observer: %d\n", i)
})

If observable were not a Connectable Observable, the source Observable would have begun emitting items, because DoOnNext creates an Observer. Yet, in the case of a Connectable Observable, we have to call Connect():

observable.Connect()

Once Connect() is called, the Connectable Observable begins to emit items.

There is another important difference from a regular Observable: a Connectable Observable publishes its items. It means all the Observers receive a copy of the items.

Here is an example with a regular Observable:

ch := make(chan rxgo.Item)
go func() {
	ch <- rxgo.Of(1)
	ch <- rxgo.Of(2)
	ch <- rxgo.Of(3)
	close(ch)
}()
// Create a regular Observable
observable := rxgo.FromChannel(ch)

// Create the first Observer
observable.DoOnNext(func(i interface{}) {
	fmt.Printf("First observer: %d\n", i)
})

// Create the second Observer
observable.DoOnNext(func(i interface{}) {
	fmt.Printf("Second observer: %d\n", i)
})

The result of this execution is:

First observer: 1
First observer: 2
First observer: 3

Now, with a Connectable Observable:

ch := make(chan rxgo.Item)
go func() {
	ch <- rxgo.Of(1)
	ch <- rxgo.Of(2)
	ch <- rxgo.Of(3)
	close(ch)
}()
// Create a Connectable Observable
observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy())

// Create the first Observer
observable.DoOnNext(func(i interface{}) {
	fmt.Printf("First observer: %d\n", i)
})

// Create the second Observer
observable.DoOnNext(func(i interface{}) {
	fmt.Printf("Second observer: %d\n", i)
})

disposed, cancel := observable.Connect()
go func() {
	// Do something
	time.Sleep(time.Second)
	// Then cancel the subscription
	cancel()
}()
// Wait for the subscription to be disposed
<-disposed

The result of this execution is:

Second observer: 1
First observer: 1
First observer: 2
First observer: 3
Second observer: 2
Second observer: 3
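The publish behaviour, where every Observer gets its own copy and nothing flows before the connection is made, can be sketched as a small fan-out. The publisher type and its subscribe and connect methods are hypothetical and much simpler than RxGo's Connectable Observable (there is no cancellation, and subscribers must register before connect):

```go
package main

import (
	"fmt"
	"sync"
)

// publisher copies every item from source to all subscribers, but only
// after connect is called.
type publisher struct {
	source <-chan int
	subs   []chan int
}

// subscribe registers a new observer; it must be called before connect.
func (p *publisher) subscribe() <-chan int {
	ch := make(chan int)
	p.subs = append(p.subs, ch)
	return ch
}

// connect starts forwarding items and closes every subscriber channel
// once the source is exhausted.
func (p *publisher) connect() {
	go func() {
		for v := range p.source {
			for _, sub := range p.subs {
				sub <- v
			}
		}
		for _, sub := range p.subs {
			close(sub)
		}
	}()
}

func main() {
	source := make(chan int)
	go func() {
		defer close(source)
		for i := 1; i <= 3; i++ {
			source <- i
		}
	}()

	p := &publisher{source: source}
	var wg sync.WaitGroup
	for _, name := range []string{"First", "Second"} {
		sub := p.subscribe()
		wg.Add(1)
		go func(name string, sub <-chan int) {
			defer wg.Done()
			for v := range sub {
				fmt.Printf("%s observer: %d\n", name, v)
			}
		}(name, sub)
	}

	// Nothing is emitted until this call.
	p.connect()
	wg.Wait()
}
```

Both observers print all three items, and the interleaving between them can vary from run to run.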

Observable, Single, and Optional Single

An Iterable is an object that can be observed using Observe(opts ...Option) <-chan Item.

An Iterable can be either:

  • An Observable: emits 0 or more items
  • A Single: emits 1 item
  • An Optional Single: emits 0 or 1 item

Documentation

Package documentation: https://pkg.go.dev/github.com/reactivex/rxgo/v2

Assert API

How to use the assert API to write unit tests while using RxGo.

Operator Options

Operator options

Creating Observables

  • Create — create an Observable from scratch by calling Observer methods programmatically
  • Defer — do not create the Observable until the Observer subscribes, and create a fresh Observable for each Observer
  • Empty/Never/Thrown — create Observables that have very precise and limited behaviour
  • FromChannel — create an Observable based on a lazy channel
  • FromEventSource — create an Observable based on an eager channel
  • Interval — create an Observable that emits a sequence of integers spaced by a particular time interval
  • Just — convert a set of objects into an Observable that emits that or those objects
  • JustItem — convert one object into a Single that emits this object
  • Range — create an Observable that emits a range of sequential integers
  • Repeat — create an Observable that emits a particular item or sequence of items repeatedly
  • Start — create an Observable that emits the return value of a function
  • Timer — create an Observable that completes after a specified delay

Transforming Observables

  • Buffer — periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time
  • FlatMap — transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable
  • GroupBy — divide an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key
  • GroupByDynamic — divide an Observable into a dynamic set of Observables that each emit GroupedObservables from the original Observable, organized by key
  • Map — transform the items emitted by an Observable by applying a function to each item
  • Marshal — transform the items emitted by an Observable by applying a marshalling function to each item
  • Scan — apply a function to each item emitted by an Observable, sequentially, and emit each successive value
  • Unmarshal — transform the items emitted by an Observable by applying an unmarshalling function to each item
  • Window — periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time

Filtering Observables

  • Debounce — only emit an item from an Observable if a particular timespan has passed without it emitting another item
  • Distinct/DistinctUntilChanged — suppress duplicate items emitted by an Observable
  • ElementAt — emit only item n emitted by an Observable
  • Filter — emit only those items from an Observable that pass a predicate test
  • Find — emit the first item passing a predicate, then complete
  • First/FirstOrDefault — emit only the first item or the first item that meets a condition from an Observable
  • IgnoreElements — do not emit any items from an Observable but mirror its termination notification
  • Last/LastOrDefault — emit only the last item emitted by an Observable
  • Sample — emit the most recent item emitted by an Observable within periodic time intervals
  • Skip — suppress the first n items emitted by an Observable
  • SkipLast — suppress the last n items emitted by an Observable
  • Take — emit only the first n items emitted by an Observable
  • TakeLast — emit only the last n items emitted by an Observable

Combining Observables

  • CombineLatest — when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function
  • Join — combine items emitted by two Observables whenever an item from one Observable is emitted during a time window defined according to an item emitted by the other Observable
  • Merge — combine multiple Observables into one by merging their emissions
  • StartWithIterable — emit a specified sequence of items before beginning to emit the items from the source Iterable
  • ZipFromIterable — combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function

Error Handling Operators

  • Catch — recover from an onError notification by continuing the sequence without error
  • Retry/BackOffRetry — if a source Observable sends an onError notification, resubscribe to it in the hopes that it will complete without error

Observable Utility Operators

  • Do - register an action to take upon a variety of Observable lifecycle events
  • Run — create an Observer without consuming the emitted items
  • Send — send the Observable items in a specific channel
  • Serialize — force an Observable to make serialized calls and to be well-behaved
  • TimeInterval — convert an Observable that emits items into one that emits indications of the amount of time elapsed between those emissions
  • Timestamp — attach a timestamp to each item emitted by an Observable

Conditional and Boolean Operators

  • All — determine whether all items emitted by an Observable meet some criteria
  • Amb — given two or more source Observables, emit all of the items from only the first of these Observables to emit an item
  • Contains — determine whether an Observable emits a particular item or not
  • DefaultIfEmpty — emit items from the source Observable, or a default item if the source Observable emits nothing
  • SequenceEqual — determine whether two Observables emit the same sequence of items
  • SkipWhile — discard items emitted by an Observable until a specified condition becomes false
  • TakeUntil — discard items emitted by an Observable after a second Observable emits an item or terminates
  • TakeWhile — discard items emitted by an Observable after a specified condition becomes false

Mathematical and Aggregate Operators

  • Average — calculates the average of numbers emitted by an Observable and emits this average
  • Concat — emit the emissions from two or more Observables without interleaving them
  • Count — count the number of items emitted by the source Observable and emit only this value
  • Max — determine, and emit, the maximum-valued item emitted by an Observable
  • Min — determine, and emit, the minimum-valued item emitted by an Observable
  • Reduce — apply a function to each item emitted by an Observable, sequentially, and emit the final value
  • Sum — calculate the sum of numbers emitted by an Observable and emit this sum

Operators to Convert Observables

Contributing

All contributions are very welcome! Be sure you check out the contributing guidelines first. Newcomers can take a look at ongoing issues and check for the help needed label.

Also, if you publish a post about RxGo, please let us know. We would be glad to include it in the External Resources section.

Thanks to all the people who already contributed to RxGo!

External Resources

Comments
  • How to filter on subscribers?

    Observables can do many things on the items/messages they handle and then notify all subscribers.

    However, how can we build an Observable that notifies only a subset of its subscribers, based on some condition(s)?

    For example, how to notify only "2D" subscribers that were hit by a mouse-click?

  • RxGo v2

    This issue is a placeholder to discuss the future v2 release.

    I know for example migrating Observable, Observer etc. should be part of the v1 according to this card: https://github.com/ReactiveX/RxGo/projects/1#card-6433015

    Yet, if we do that, it will no longer be backward compatible with the existing API. For example, a user can no longer wait for the end of an Observable by doing:

    <-myObservable
    

    Regarding the v2, I've started to implement a few things, which are part of this PR: https://github.com/ReactiveX/RxGo/pull/95. In a nutshell, it contains a migration of the main types into interfaces, the creation of new types (Optional, Single, etc.), and new operators. Is this PR going in the right direction for the v2?

    What else should we include?

    Furthermore, what are the plans for the v2 in terms of scheduling? Shall we release something before or after Go 2?

  • Observable emits items only to first Observer reading from the wrapped channel

    Due to Observable being only a simple <-chan interface{} wrapper, items sent to the channel will only be received by one of the subscribed Observers.

    The expected behavior would be for every Observer to receive the same items sent to the underlying channel. This is how it works in other ReactiveX implementations.

  • Blocking connect operator discrepancy

    As we know, the connectable.From method accepts an iter object. But if this iter object is a channel, nothing happens until the channel is closed, as shown in the code below:

    ch := make(chan interface{}, 10)
    // var ch = []interface{}{1, 2, 3, 4} works
    go func() {
        for {
            ch <- time.Now().Unix()
            time.Sleep(1 * time.Second)
        }
    }()
    
    iter, _ := iterable.New(ch)
    
    co := connectable.From(iter)
    
    co = co.Do(func(item interface{}) {
        // will never be printed
        fmt.Println("svc1", item)
    })
    
    <-co.Connect()
    

    I just implemented a fix in my fork (it passes the test cases, but has many other flaws). Can we find a better solution? https://github.com/dalianzhu/RxGo/blob/master/connectable/connectable.go

  • Implement the defer() function as described in Rx

    We need a way to create an Observable that 'lazily' emits items only when subscribed to.

    Implement the defer() function as described in Rx: http://reactivex.io/documentation/operators/defer.html

    (Is there any way the current code base can emit items only when subscribed to?)

  • Feature request: Flatmap

    As per contribution guidelines, I'm opening an issue for discussion regarding the implementation of Flatmap.

    Per Documentation:

    The FlatMap operator transforms an Observable by applying a function that you specify to each item emitted by the source Observable, where that function returns an Observable that itself emits items. FlatMap then merges the emissions of these resulting Observables, emitting these merged results as its own sequence.

    A naive implementation could look like

    func (o Observable) FlatMap(apply fx.FlatMappableFunc) Observable {
    	out := make(chan interface{})
    	go func() {
    		for item := range o {
                            go func(sub Observable) {
                                    handler := observer.Observer{
                                            NextHandler: func(i interface{}) {
                                                    out <- i
                                            },
                                            ErrHandler: func(err error) {
                                                    out <- err
                                            },
                                    }
                                    s := sub.Subscribe(handler)
                                    <-s
                            }(apply(item))
    		}           
    		close(out)
    	}()
    	return Observable(out)
    }
    

    I see that the dev branch has a Merge function; however, I'm not sure how to exploit the underlying reflect.SelectCase with a dynamic number of Observables. Creating a subscriber (and a goroutine) for each element seems suboptimal, but off the top of my head I'm not aware of a different solution.

    I'd be happy to submit a PR once agreement on the implementation is reached.

  • Add Observable variant `Single`

    RxJava and others have implemented a variant called Single: http://reactivex.io/documentation/single.html

    I believe that, since golang is heavily used in developing RESTful APIs, the Single pattern will be really effective in developing microservices where, to satisfy a single REST call, the server has to coordinate with multiple backends. In this scenario, the Single pattern allows for asynchronous handling of REST requests, making the code a lot more declarative.

  • Sample operator

    Hi, I've implemented the Sample operator + tests. I've chosen the second variant described in the documentation (that is, an Observable as an argument), because it is more general and the operator can be used in a broader range of cases.

  • Hot vs cold observable

    In the v2 branch we started to make a clear distinction between hot and cold Observables. This issue is a placeholder to discuss the ongoing implementation, the API, the impacts, etc.

  • improve test coverage and fix on error behavior

    I created this pull request to improve the test coverage of the create operator, which I added to ease example writing. Besides that, I corrected the onerror behavior of the create operator (onerror signals the terminal state) and the example.

    Thanks @avelino for your help.

  • Panic handling

    Hi, first of all, great initiative. Rx is something that should definitely be implemented in Go. There should maybe be some kind of panic handling in handlers. For example, a basic implementation would be something like:

    // OnNext applies Observer's NextHandler to an Item
    func (ob Observer) OnNext(item interface{}) {
    	defer func() {
    		if recover() != nil {
    			ob.OnError(errors.New(7, "Panicked!"))
    		}
    	}()
    	switch item := item.(type) {
    	case error:
    		return
    	default:
    		if ob.NextHandler != nil {
    			ob.NextHandler(item)
    		}
    	}
    }
    

    A better approach would be an optional panic handler, as defer is not very performant.

  • Add `Subject` to rxgo

    I miss the Subject from RxJS.

    It is a bidirectional stream that can emit new items via the next() method: https://rxjs.dev/guide/subject. With that, I can share the subject object with other methods to emit items or to observe it.

    I tried to simulate this, but it is inevitable that one more channel is created:

    type Subject interface {
    	rxgo.Observable
    	Next(interface{})
    }
    
    type SubjectImpl struct {
    	rxgo.Observable
    	channel chan rxgo.Item
    }
    
    func (subject *SubjectImpl) Next(i interface{}) {
    	rxgo.Of(i).SendBlocking(subject.channel)
    }
    
    func NewSubject(opts ...rxgo.Option) Subject {
    	ch := make(chan rxgo.Item)
    	obs := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
    		defer close(ch)
    		for {
    			select {
    			case item := <-ch:
    				item.SendBlocking(next)
    			case <-ctx.Done():
    				return
    			}
    		}
    	}}, opts...)
    
    	return &SubjectImpl{
    		channel:    ch,
    		Observable: obs,
    	}
    }
    
  • Bump actions/checkout from 2.4.0 to 3.2.0

    Bumps actions/checkout from 2.4.0 to 3.2.0.

    v3.0.0

    • Updated to the node16 runtime by default
      • This requires a minimum Actions Runner version of v2.285.0 to run, which is by default available in GHES 3.4 or later.

  • Bump actions/setup-go from 3.2.0 to 3.5.0

    Bumps actions/setup-go from 3.2.0 to 3.5.0.

    Release notes

    Sourced from actions/setup-go's releases.

    Add support for stable and oldstable aliases

    In scope of this release we introduce aliases for the go-version input. The stable alias installs the latest stable version of Go. The oldstable alias installs the previous latest minor release (the stable is 1.19.x -> the oldstable is 1.18.x).

    Stable

    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-go@v3
        with:
          go-version: 'stable'
      - run: go run hello.go
    

    OldStable

    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-go@v3
        with:
          go-version: 'oldstable'
      - run: go run hello.go
    

    Add support for go.work and pass the token input through on GHES

    In scope of this release we added support for go.work file to pass it in go-version-file input.

    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-go@v3
        with:
          go-version-file: go.work
      - run: go run hello.go
    

    Besides, we added support to pass the token input through on GHES.

    Fix cache issues and update dependencies

    In this release we fixed an issue with generating the correct cache key when the go-version-file input is set (actions/setup-go#267). We also fixed an issue where the cache folder was not found, and updated actions/core to version 1.10.0 (actions/setup-go#273).

    Support architecture input and fix Expand-Archive issue

    This release introduces support for the architecture input for the setup-go action (#253). It also adds support for the arm32 architecture on self-hosted runners. If architecture is not provided, the action uses the default runner architecture. Example of usage:

    steps:
    - uses: actions/checkout@v3
    - uses: actions/setup-go@v3
    

    ... (truncated)

    Commits
    • 6edd440 fix log for stable aliases (#303)
    • 38dbe75 Add stable and oldstable aliases (#300)
    • 30c39bf Merge pull request #301 from jongwooo/chore/use-cache-in-check-dist
    • 8377b69 Use cache in check-dist.yml
    • d0a58c1 Merge pull request #294 from JamesMGreene/patch-1
    • 3dcd9d6 Update to latest actions/publish-action
    • e983b65 Merge pull request #283 from koba1t/add_support_gowork_for_go-version-file
    • 27b43e1 Pass the token input through on GHES (#277)
    • 7678c83 add support gowork for go-version-file
    • c4a742c fix(): cache resolve version input (#267)
    • Additional commits viewable in compare view

  • Bump github.com/cenkalti/backoff/v4 from 4.1.1 to 4.2.0

    Bumps github.com/cenkalti/backoff/v4 from 4.1.1 to 4.2.0.

    Commits
    • e5c9822 Remove travis
    • 911f539 Added GitHub Actions
    • d815f46 Added variants of Retry* methods that return data in addition to an error
    • 6b0e4ad make sure no randomness is used when randomizationFactor is 0 (#118)
    • See full diff in compare view

  • Bump golangci/golangci-lint-action from 2.5.2 to 3.3.1

    Bumps golangci/golangci-lint-action from 2.5.2 to 3.3.1.

    Release notes

    Sourced from golangci/golangci-lint-action's releases.

    v3.3.1

    What's Changed

    Full Changelog: https://github.com/golangci/golangci-lint-action/compare/v3...v3.3.1

    v3.3.0

    What's Changed

    ... (truncated)

    Commits
    • 0ad9a09 build(deps-dev): bump @typescript-eslint/parser from 5.41.0 to 5.42.0 (#599)
    • 235ea57 build(deps-dev): bump eslint from 8.26.0 to 8.27.0 (#598)
    • a6ed001 build(deps-dev): bump @typescript-eslint/eslint-plugin from 5.41.0 to 5.42.0 ...
    • 3a7156a build(deps-dev): bump @typescript-eslint/parser from 5.40.1 to 5.41.0 (#596)
    • 481f8ba build(deps): bump @types/semver from 7.3.12 to 7.3.13 (#595)
    • 06edb37 build(deps-dev): bump @typescript-eslint/eslint-plugin from 5.40.1 to 5.41.0 ...
    • c2f79a7 build(deps): bump @actions/cache from 3.0.5 to 3.0.6 (#593)
    • d6eac69 build(deps-dev): bump @typescript-eslint/eslint-plugin from 5.40.0 to 5.40.1 ...
    • 7268434 build(deps-dev): bump eslint from 8.25.0 to 8.26.0 (#591)
    • a926e2b build(deps-dev): bump @typescript-eslint/parser from 5.40.0 to 5.40.1 (#590)
    • Additional commits viewable in compare view

  • Counting items of a Connectable Observable

    Hello, I am trying to count events produced by a Connectable Observable in 2 ways:

    • The total number of events and
    • The number of events that pass a filter.

    Here is some example code:

    package main
    
    import (
    	"context"
    	"fmt"
    
    	"github.com/reactivex/rxgo/v2"
    )
    
    func main() {
    	events := rxgo.Create([]rxgo.Producer{func(_ context.Context, next chan<- rxgo.Item) {
    		next <- rxgo.Of(expensiveReadFromDisk(0))
    		next <- rxgo.Of(expensiveReadFromDisk(1))
    		next <- rxgo.Of(expensiveReadFromDisk(2))
    	}}, rxgo.WithPublishStrategy())
    
    	total := events.Count()
    	filtered := events.Filter(func(i interface{}) bool {
    		return i.(int) > 0
    	}).Count()
    
    	events.Connect(context.Background())
    
    	t, _ := total.Get()
    	fmt.Printf("   Total: %d\n", t.V)
    	f, _ := filtered.Get()
    	fmt.Printf("Filtered: %d\n", f.V)
    }
    
    func expensiveReadFromDisk(e int) int {
    	fmt.Printf("Reading event: %d\n", e)
    	return e
    }
    

    I expected the code to output

    Reading event: 0
    Reading event: 1
    Reading event: 2
       Total: 3
    Filtered: 2
    

    Instead, however, the code outputs only this:

    Reading event: 0
    Reading event: 1
    Reading event: 2
       Total: 3
    

    Then it blocks on the following line and gets stuck forever.

    	f, _ := filtered.Get()
    

    Is this the intended behavior? If yes, what would be the correct way of achieving the intended result? Thank you very much!
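
The goal described above (one pass over the events, two counts) can be sketched without relying on how RxGo treats subscribers that attach after Connect. The following is a minimal illustration using only the standard library; `countBoth` is a hypothetical helper, not part of the RxGo API, and it neither explains nor fixes the blocking reported above. The assumption is simply that both consumers are registered before the first event is forwarded, so neither can miss an item.

```go
package main

import (
	"fmt"
	"sync"
)

// countBoth reads every event from src exactly once and returns the total
// number of events and the number accepted by keep.
// Hypothetical helper for illustration; it is not part of RxGo.
func countBoth(src <-chan int, keep func(int) bool) (total, filtered int) {
	// Both subscriber channels exist before any event is forwarded,
	// so neither counter can miss an item.
	all := make(chan int)
	kept := make(chan int)

	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for range all {
			total++
		}
	}()
	go func() {
		defer wg.Done()
		for e := range kept {
			if keep(e) {
				filtered++
			}
		}
	}()

	// Fan each event out to both subscribers, then close the channels
	// so the counting goroutines terminate.
	for e := range src {
		all <- e
		kept <- e
	}
	close(all)
	close(kept)
	wg.Wait()
	return total, filtered
}

// expensiveReadFromDisk mirrors the function in the question.
func expensiveReadFromDisk(e int) int {
	fmt.Printf("Reading event: %d\n", e)
	return e
}

func main() {
	src := make(chan int)
	go func() {
		defer close(src)
		for e := 0; e < 3; e++ {
			src <- expensiveReadFromDisk(e)
		}
	}()

	total, filtered := countBoth(src, func(e int) bool { return e > 0 })
	fmt.Printf("   Total: %d\n", total)
	fmt.Printf("Filtered: %d\n", filtered)
}
```

Each event is read from disk once and delivered to both counters, which is the behaviour the question expected from the connectable Observable.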

Kita is a declarative, reactive GUI toolkit for build cross platform apps with web technology with single codebase

Kita is a declarative, reactive GUI toolkit for build cross platform apps with web technology with single codebase. Inspired by Flutter, React. S

Apr 18, 2022
UIKit - A declarative, reactive GUI toolkit for build cross platform apps with web technology with single codebase

UIKit - A declarative, reactive GUI toolkit for build cross platform apps with web technology with single codebase

Apr 18, 2022
AVL tree with some useful extensions written in Go

An AVL tree (Adel'son-Vel'skii & Landis) is a binary search tree in which the heights of the left and right subtrees of the root differ by at

Mar 23, 2022
timeutil - useful extensions (Timedelta, Strftime, ...) to the golang's time package

timeutil - useful extensions to the golang's time package timeutil provides useful extensions (Timedelta, Strftime, ...) to the golang's time package.

Dec 22, 2022
The X Go Binding is a low-level API to communicate with the X server. It is modeled on XCB and supports many X extensions.

Note that this project is largely unmaintained as I don't have the time to do or support more development. Please consider using this fork instead: ht

Dec 29, 2022
Interact with Chromium-based browsers' debug port to view open tabs, installed extensions, and cookies

WhiteChocolateMacademiaNut Description Interacts with Chromium-based browsers' debug port to view open tabs, installed extensions, and cookies. Tested

Nov 2, 2022
sqlx is a library which provides a set of extensions on go's standard database/sql library

sqlx is a library which provides a set of extensions on go's standard database/sql library. The sqlx versions of sql.DB, sql.TX, sql.Stmt, et al. all leave the underlying interfaces untouched, so that their interfaces are a superset on the standard ones. This makes it relatively painless to integrate existing codebases using database/sql with sqlx.

Jan 7, 2023
Package cae implements PHP-like Compression and Archive Extensions.

Compression and Archive Extensions (Chinese documentation) Package cae implements PHP-like Compression and Archive Extensions. But this package has some modifications de

Jun 16, 2022
Simple SQL extensions for Go

go-sx provides some extensions to the standard library database/sql package. It is designed for those who wish to use the full power of SQL without a heavy abstraction layer.

Aug 31, 2022
Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9

Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9 (and later).

Dec 28, 2022
Bundle k6 with extensions as fast and easily as possible

xk6bundler xk6bundler is a CLI tool and GitHub Action that makes bundling k6 with extensions as fast and easy as possible. Features Build for multiple targ

Nov 29, 2022
Take a list of domains and scan for endpoints, secrets, api keys, file extensions, tokens and more...

Take a list of domains and scan for endpoints, secrets, api keys, file extensions, tokens and more... Coded with ?? by edoardottt. Share on Twitter! P

Dec 25, 2022
ipx provides general purpose extensions to golang's IP functions in net package

ipx ipx is a library which provides a set of extensions on go's standard IP functions in net package. compatibility with net package ipx is fully compat

May 24, 2021
Kitex byte-dance internal Golang microservice RPC framework with high performance and strong scalability, customized extensions for byte internal.

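Kitex is ByteDance's internal Golang microservice RPC framework, featuring high performance and strong extensibility, with customized extensions for ByteDance's internal use.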

Jan 9, 2023
A tool for moving files into directories by file extensions

The tool for moving files into directories by file extensions Example before moving structure: moving into same extension dir result: moving into diff

Dec 6, 2021
create temporary Firefox profile, install user.js and extensions, launch Firefox

tmpfox tmpfox is a Firefox wrapper that: Creates a temporary Firefox profile Installs user.js configuration file from Arkenfox for increased privacy a

Jul 27, 2022
PolarDB Cluster Manager is the cluster management component of PolarDB for PostgreSQL, responsible for topology management, high availability, configuration management, and plugin extensions.

What is PolarDB Cluster Manager PolarDB Cluster Manager is the cluster management component of PolarDB for PostgreSQL, responsible for topology manage

Nov 9, 2022
Extensions for the melatonin test framework

melatonin-ext - Extensions for the melatonin test framework These packages extend melatonin to provide additional test contexts for testing various 3r

Nov 27, 2021
alog is a dependency free, zero/minimum memory allocation JSON logger with extensions

Alog (c) 2020-2021 Gon Y Yi. https://gonyyi.com. MIT License Version 1.0.0 Intro Alog was built with a very simple goal in mind: Support Tagging (and

Dec 13, 2021
This package includes various utilities and extensions for your Go code.

Go utilities This package includes various utilities and extensions for your Go code. Inspired by lodash Install go get github.com/murat/go-utils@mast

May 11, 2022