GoStorm is a Go library that implements the communications protocol required to write Storm spouts and bolts in Go that communicate with the Storm shells.

gostorm

GoStorm is a Go library that implements the communications protocol required for non-Java languages to communicate as part of a storm topology. In other words, gostorm lets you write Storm spouts and bolts in Go.

GoStorm also correctly handles the asynchronous behaviour of bolts with respect to task IDs, which might not be delivered directly after an emission.

Currently, the main purpose of GoStorm is to act as a library for Go spouts and bolts that wish to interact with a Storm topology. The topology definition itself should still be done in Java and a shell spout or bolt is still required for each Go spout or bolt. See the storm-starter WordCountTopology on how to do this.

For testing and evaluation purposes, the GoStorm release also contains its own splitsentence.go implementation that replaces the Python splitsentence.py implementation in the storm-starter WordCountTopology. GoStorm also contains mock output collector implementations that allow a developer to test her code before having to submit it as part of a Storm cluster.

GoStorm implements (and enforces) the Storm multilang protocol. Apart from implementing the multilang JSON protocol that is used by Storm shell components, GoStorm also implements a protocol buffer binary encoding scheme for improved performance. The protocol buffer encoding requires the Storm protoshell multilang serialiser and Storm 0.9.2 or later.

GoStorm itself requires Storm 0.10.0 or later.

Encoding

GoStorm implements various encoding schemes with varying levels of performance:

  1. jsonobject
  2. jsonencoded
  3. hybrid
  4. protobuf

The jsonobject scheme encodes all objects as JSON objects. This means that in the Storm shell component, Java serialisation is used by Kryo to serialise your objects. This scheme is pretty slow and should generally be avoided.

The jsonencoded scheme first marshals each object to JSON and then sends the result as a byte slice. In the Java shell component, Kryo is able to efficiently marshal the byte slices.
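The difference between the two JSON schemes can be illustrated with the standard library alone. The following is a minimal sketch, not GoStorm's implementation: the Word type and the helper names are hypothetical, the envelope only borrows the "command" and "tuple" keys of a multilang emit message, and it assumes byte slices travel as base64 strings, which is how Go's encoding/json writes a []byte.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Word is a hypothetical user type carried as a tuple field.
type Word struct {
	Text  string `json:"text"`
	Count int    `json:"count"`
}

// emitMsg approximates a multilang "emit" envelope; only the keys needed
// for this illustration are included.
type emitMsg struct {
	Command string        `json:"command"`
	Tuple   []interface{} `json:"tuple"`
}

// encodeAsObject embeds the field directly as a nested JSON object
// (the jsonobject style).
func encodeAsObject(w Word) ([]byte, error) {
	return json.Marshal(emitMsg{Command: "emit", Tuple: []interface{}{w}})
}

// encodeAsBytes marshals the field first and embeds the result as a byte
// slice (the jsonencoded style). encoding/json writes a []byte as a base64
// string.
func encodeAsBytes(w Word) ([]byte, error) {
	field, err := json.Marshal(w)
	if err != nil {
		return nil, err
	}
	return json.Marshal(emitMsg{Command: "emit", Tuple: []interface{}{field}})
}

// decodeBytesField reverses encodeAsBytes for a single-field tuple.
func decodeBytesField(msg []byte) (Word, error) {
	var env struct {
		Tuple [][]byte `json:"tuple"`
	}
	var w Word
	if err := json.Unmarshal(msg, &env); err != nil {
		return w, err
	}
	if len(env.Tuple) != 1 {
		return w, fmt.Errorf("expected 1 field, got %d", len(env.Tuple))
	}
	err := json.Unmarshal(env.Tuple[0], &w)
	return w, err
}

func main() {
	w := Word{Text: "storm", Count: 2}
	obj, _ := encodeAsObject(w)
	enc, _ := encodeAsBytes(w)
	fmt.Println(string(obj))
	fmt.Println(string(enc))
}
```

In the second form the shell component only ever sees an opaque byte array, so it does not need to understand the structure of the user object.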

The hybrid scheme also sends byte slices, but the user objects are expected to be protocol buffer objects. These protocol buffer byte slices are still sent in the Storm multilang JSON envelope, so the existing Storm shell components can be used. This scheme has the highest performance that is still compatible with the Storm shell components.

The protobuf scheme is a pure protocol buffer encoding and requires specialised Storm ProtoShell components. These ProtoShell components have already been implemented and I'll paste a link soon. The protobuf encoding is a binary encoding scheme that transmits varints followed by byte slices. It uses no text encodings or "end" strings, which makes it more compact.
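To give a feel for "varints followed by byte slices", here is a minimal sketch of length-prefixed framing using encoding/binary. It is an illustration of the general technique only; the exact wire layout used by GoStorm and the ProtoShell components is not shown here, and the function names are hypothetical.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// writeFrame writes a uvarint length prefix followed by the payload bytes.
func writeFrame(w io.Writer, payload []byte) error {
	var prefix [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(prefix[:], uint64(len(payload)))
	if _, err := w.Write(prefix[:n]); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readFrame reads one length-prefixed payload from r. A production reader
// would also cap the size before allocating.
func readFrame(r *bufio.Reader) ([]byte, error) {
	size, err := binary.ReadUvarint(r)
	if err != nil {
		return nil, err
	}
	payload := make([]byte, size)
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, err
	}
	return payload, nil
}

// roundTrip frames every message into one buffer and reads them back.
func roundTrip(msgs ...string) ([]string, error) {
	var buf bytes.Buffer
	for _, m := range msgs {
		if err := writeFrame(&buf, []byte(m)); err != nil {
			return nil, err
		}
	}
	r := bufio.NewReader(&buf)
	var out []string
	for range msgs {
		p, err := readFrame(r)
		if err != nil {
			return nil, err
		}
		out = append(out, string(p))
	}
	return out, nil
}

func main() {
	out, err := roundTrip("hello", "storm")
	fmt.Println(out, err)
}
```

Because the reader knows the length of each message up front, no terminator string has to be scanned for, which is where the compactness comes from.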

I would suggest starting with the jsonencoded scheme and benchmarking your application. If the throughput doesn't suit your needs, start converting your project to use protocol buffers. This allows for the hybrid scheme to be used, without requiring any changes to Storm. For best performance, the protobuf encoding can be used, but this requires some changes in the Storm cluster's configuration.

Bolts

This section will describe how to write bolts using the GoStorm library.

Creating a bolt

To create a bolt, you just have to create a Go "object" that implements the Bolt interface:

type Bolt interface {
    FieldsFactory
    Execute(meta stormmsg.BoltMsgMeta, fields ...interface{})
    Prepare(context *stormmsg.Context, collector OutputCollector)
    Cleanup()
}

type FieldsFactory interface {
    Fields() []interface{}
}

Prepare can be used to set up a bolt and will be called once, before a bolt receives any messages. Prepare supplies the bolt with the topology context and the output collector, which the bolt can use to emit messages.

A bolt receives messages with the Execute method. BoltMsgMeta contains information about the received message, namely: id, comp, stream, task. The fields are the tuple fields (objects) that were emitted by the input component.

Cleanup is called if the topology completes. This will only happen during testing, for finite input streams.

The fields factory declares the message types that the bolt expects to receive. In other words, these fields must match the field types passed to the Execute method. Specifically, GoStorm uses these empty objects to marshal received objects into.
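As a rough illustration of how the pieces fit together, the following sketch implements a sentence-splitting bolt. To stay self-contained it declares local stand-ins for stormmsg.BoltMsgMeta, stormmsg.Context and the output collector (described further on) instead of importing GoStorm; the struct fields, the recordingCollector and the runSplit helper are assumptions made for this example. It also assumes that fields arrive in Execute as the pointers returned by Fields.

```go
package main

import (
	"fmt"
	"strings"
)

// BoltMsgMeta, Context and OutputCollector are local stand-ins for the
// GoStorm types; the struct fields are assumptions.
type BoltMsgMeta struct {
	Id     string
	Comp   string
	Stream string
	Task   int64
}

type Context struct{}

type OutputCollector interface {
	SendAck(id string)
	SendFail(id string)
	Emit(anchors []string, stream string, fields ...interface{}) (taskIds []int32)
	EmitDirect(anchors []string, stream string, directTask int64, fields ...interface{})
}

// splitBolt splits a sentence into words and emits one tuple per word.
type splitBolt struct {
	collector OutputCollector
}

// Fields returns one empty object per expected input field.
func (b *splitBolt) Fields() []interface{} { return []interface{}{new(string)} }

// Prepare stores the collector for later emissions.
func (b *splitBolt) Prepare(context *Context, collector OutputCollector) {
	b.collector = collector
}

// Execute emits each word anchored to the input tuple, then acks the input.
func (b *splitBolt) Execute(meta BoltMsgMeta, fields ...interface{}) {
	if len(fields) != 1 {
		b.collector.SendFail(meta.Id)
		return
	}
	sentence, ok := fields[0].(*string)
	if !ok {
		b.collector.SendFail(meta.Id)
		return
	}
	for _, word := range strings.Fields(*sentence) {
		b.collector.Emit([]string{meta.Id}, "", word)
	}
	b.collector.SendAck(meta.Id)
}

func (b *splitBolt) Cleanup() {}

// recordingCollector is a hypothetical test double that records calls.
type recordingCollector struct {
	words  []string
	acked  []string
	failed []string
}

func (c *recordingCollector) SendAck(id string)  { c.acked = append(c.acked, id) }
func (c *recordingCollector) SendFail(id string) { c.failed = append(c.failed, id) }
func (c *recordingCollector) Emit(anchors []string, stream string, fields ...interface{}) []int32 {
	c.words = append(c.words, fmt.Sprint(fields...))
	return nil
}
func (c *recordingCollector) EmitDirect(anchors []string, stream string, directTask int64, fields ...interface{}) {
}

// runSplit drives the bolt once and returns what it emitted and acked.
func runSplit(sentence string) (words, acked []string) {
	c := &recordingCollector{}
	b := &splitBolt{}
	b.Prepare(&Context{}, c)
	in := b.Fields()[0].(*string)
	*in = sentence
	b.Execute(BoltMsgMeta{Id: "t1"}, in)
	b.Cleanup()
	return c.words, c.acked
}

func main() {
	words, acked := runSplit("the quick fox")
	fmt.Println(words, acked)
}
```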

To write a bolt, import the following:

import (
    "github.com/jsgilmore/gostorm"
    stormmsg "github.com/jsgilmore/gostorm/messages"
)

The gostorm import contains the spout and bolt interfaces and the messages import contains the required gostorm message types.

Running a bolt

All that remains is for you to write a main method for your bolt. The main method will run the bolt, specify the encoding that might be used and state whether destination task ids are required (more on that later). It will typically look something like this:

package main

import (
    "github.com/jsgilmore/gostorm"
    _ "github.com/jsgilmore/gostorm/encodings"
)

func main() {
    encoding := "jsonEncoded"
    needTaskIds := false
    myBolt := NewMyBolt()
    gostorm.RunBolt(myBolt, encoding, needTaskIds)
}

The gostorm import contains the RunBolt function. The encodings import imports all GoStorm encodings and allows any of them to be specified in the RunBolt method. This also allows you to use a Go flag and specify the encoding to use at runtime.
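Selecting the encoding at runtime could look roughly like the sketch below. The flag name "encoding" and the parseEncoding helper are choices made for this example, and the call to gostorm.RunBolt is left as a comment so that the snippet compiles without the library.

```go
package main

import (
	"flag"
	"fmt"
	"os"
)

// parseEncoding reads the encoding name from command-line style arguments.
// The default mirrors the value used in the main method above.
func parseEncoding(args []string) (string, error) {
	fs := flag.NewFlagSet("bolt", flag.ContinueOnError)
	encoding := fs.String("encoding", "jsonEncoded", "GoStorm encoding to use")
	if err := fs.Parse(args); err != nil {
		return "", err
	}
	return *encoding, nil
}

func main() {
	encoding, err := parseEncoding(os.Args[1:])
	if err != nil {
		os.Exit(2)
	}
	fmt.Fprintln(os.Stderr, "selected encoding:", encoding)
	// In a real bolt this value would be passed on, for example:
	// gostorm.RunBolt(NewMyBolt(), encoding, false)
}
```

The message is written to stderr rather than stdout because a shell component's stdout is used for the multilang protocol itself.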

Emitting tuples

To emit tuples (objects) to another bolt, the bolt output collector is used:

type OutputCollector interface {
    SendAck(id string)
    SendFail(id string)
    Emit(anchors []string, stream string, fields ...interface{}) (taskIds []int32)
    EmitDirect(anchors []string, stream string, directTask int64, fields ...interface{})
}

SendAck acks a received message. SendFail fails a received message.

Emit emits a tuple (set of fields). Emit returns the destination task IDs to which the message was emitted if this option was set in the RunBolt function; otherwise it returns nil.

The parameters required by the Emit function are:

  1. List of tuple IDs to anchor this emission to.
  2. The output stream to emit the tuple on.
  3. A list of objects that should be emitted.

Tuple emissions may be anchored to received tuples. This specifies that the current emission is a result of the earlier received tuple. An emission may be anchored to multiple received tuples (say you joined some tuples to create a compound tuple), which is why a list is required. An emission does not have to be anchored, in which case the parameter can be set to nil. Anchoring emissions to received tuples has the effect that the original emission at the spout will only be acked after all resulting emissions have been acked. If any resultant emission is failed, the spout will immediately receive a failure notification from Storm. If an emission fails to be acked within some timeout period (30s default), the spout originating the emission will also receive a failure notification.

If the bolt has a single output stream, the "default" or the empty ("") string can be used.

The EmitDirect function can be used to emit a tuple directly to a task.

Message unions

A union message type is always emitted (myBoltEvent). The union message contains pointers to all the message types that our bolt can emit. Whenever a message is emitted, it is first placed in the union message structure. This way, the receiver always knows what message type to cast to and can then check for a non-nil element in the union message.

The last two objects in the Emit call are the contents of the message that will be transferred to the receiving bolt in the form in which they are emitted here. It is possible to specify any number of objects. In the above example, we specified a key and a message. The first field is used to group tuples as part of a fieldsGrouping in the topology definition.
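A union-style emission along the lines described here might look like the following sketch. Apart from the name myBoltEvent, everything in it (the userCreated and userDeleted message types, the emitFunc type and the helpers) is hypothetical and only meant to show the pattern of emitting a key plus a union and checking for the non-nil member on the receiving side.

```go
package main

import "fmt"

// Hypothetical message types that a bolt might emit.
type userCreated struct{ Name string }
type userDeleted struct{ Name string }

// myBoltEvent is the union: exactly one pointer is expected to be non-nil.
type myBoltEvent struct {
	Created *userCreated
	Deleted *userDeleted
}

// describe inspects the union and reports which message it carries.
func describe(ev *myBoltEvent) string {
	switch {
	case ev.Created != nil:
		return "created " + ev.Created.Name
	case ev.Deleted != nil:
		return "deleted " + ev.Deleted.Name
	default:
		return "empty event"
	}
}

// emitFunc has the same shape as the Emit method of the output collector.
type emitFunc func(anchors []string, stream string, fields ...interface{}) []int32

// emitCreated wraps the message in the union and emits a key (used for
// grouping) followed by the union itself.
func emitCreated(emit emitFunc, anchor, name string) {
	emit([]string{anchor}, "", name, &myBoltEvent{Created: &userCreated{Name: name}})
}

func main() {
	var received []interface{}
	emit := func(anchors []string, stream string, fields ...interface{}) []int32 {
		received = fields
		return nil
	}
	emitCreated(emit, "t1", "alice")
	fmt.Println(received[0], describe(received[1].(*myBoltEvent)))
}
```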

To ensure the "at least once" processing semantics of Storm, every tuple that is received should be acknowledged, either by an Ack or a Fail. This is done by the SendAck and SendFail functions that are part of the boltConn interface. To enable Storm to build up its ack directed acyclic graph (DAG), no emission may be anchored to a tuple that has already been acked. The Storm topology will panic if this occurs.
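One way to catch this mistake during development is to wrap the collector in a small guard that remembers which tuples have been acked. The guardedCollector below is a hypothetical helper, not part of GoStorm; the OutputCollector interface is a local copy so that the sketch compiles on its own. Note that the acked set grows without bound, so this is only suitable for tests.

```go
package main

import "fmt"

// OutputCollector is a local copy of the interface described above.
type OutputCollector interface {
	SendAck(id string)
	SendFail(id string)
	Emit(anchors []string, stream string, fields ...interface{}) (taskIds []int32)
	EmitDirect(anchors []string, stream string, directTask int64, fields ...interface{})
}

// noopCollector discards everything; it stands in for a real collector.
type noopCollector struct{}

func (noopCollector) SendAck(id string)  {}
func (noopCollector) SendFail(id string) {}
func (noopCollector) Emit(anchors []string, stream string, fields ...interface{}) []int32 {
	return nil
}
func (noopCollector) EmitDirect(anchors []string, stream string, directTask int64, fields ...interface{}) {
}

// guardedCollector refuses to anchor an emission to an already acked tuple.
type guardedCollector struct {
	next  OutputCollector
	acked map[string]bool
}

func newGuardedCollector(next OutputCollector) *guardedCollector {
	return &guardedCollector{next: next, acked: make(map[string]bool)}
}

// checkAnchors returns an error if any anchor has already been acked.
func (g *guardedCollector) checkAnchors(anchors []string) error {
	for _, id := range anchors {
		if g.acked[id] {
			return fmt.Errorf("tuple %q was already acked", id)
		}
	}
	return nil
}

func (g *guardedCollector) SendAck(id string) {
	g.acked[id] = true
	g.next.SendAck(id)
}

func (g *guardedCollector) SendFail(id string) { g.next.SendFail(id) }

// Emit panics early, in the bolt, instead of letting the topology fail.
func (g *guardedCollector) Emit(anchors []string, stream string, fields ...interface{}) []int32 {
	if err := g.checkAnchors(anchors); err != nil {
		panic(err)
	}
	return g.next.Emit(anchors, stream, fields...)
}

func (g *guardedCollector) EmitDirect(anchors []string, stream string, directTask int64, fields ...interface{}) {
	if err := g.checkAnchors(anchors); err != nil {
		panic(err)
	}
	g.next.EmitDirect(anchors, stream, directTask, fields...)
}

func main() {
	g := newGuardedCollector(noopCollector{})
	g.Emit([]string{"t1"}, "", "ok") // allowed: t1 has not been acked yet
	g.SendAck("t1")
	fmt.Println(g.checkAnchors([]string{"t1"}))
}
```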

Spouts

This section will describe how to write spouts using the GoStorm library.

Creating spouts

Similarly to bolts in GoStorm, to create a spout, the Spout interface should be implemented:

type Spout interface {
	NextTuple()
	Acked(id string)
	Failed(id string)
	Exit()
	Open(context *stormmsg.Context, collector SpoutOutputCollector)
}

When a spout starts, GoStorm will call Open on it to provide it with the Storm context and a spout output collector.

Spouts in Storm are synchronous, which means a spout has to wait for NextTuple, Acked, or Failed to be called on it, before it may emit messages. GoStorm will never call any spout (or bolt) functions concurrently.

The Acked and Failed functions inform a spout that the tuple emitted with the specified ID was acked or failed respectively. When Acked or Failed is called on a spout, emissions are possible, but it should be kept in mind that always emitting tuples when Storm informs a spout of another tuple's state may lead to runaway behaviour.

A spout can emit tuples when NextTuple is called on it. It may emit any number of tuples, but the developer should keep in mind that emitting multiple tuples will increase message latency in the topology.

Running a spout

Very similarly to running bolts, a main method has to be created to run the spout, specify the encoding that might be used and state whether destination task ids are required. It will typically look something like this:

package main

import (
    "github.com/jsgilmore/gostorm"
    _ "github.com/jsgilmore/gostorm/encodings"
)

func main() {
    encoding := "jsonEncoded"
    needTaskIds := false
    mySpout := NewMySpout()
    gostorm.RunSpout(mySpout, encoding, needTaskIds)
}

Emitting tuples

type SpoutOutputCollector interface {
    Emit(id string, stream string, fields ...interface{}) (taskIds []int32)
    EmitDirect(id string, stream string, directTask int64, fields ...interface{})
}

A spout can emit tuples using the Emit or EmitDirect functions of the spout output collector.

The parameters required by the Emit function are:

  1. The id of the tuple to emit.
  2. The output stream to emit the tuple on.
  3. A list of objects that should be emitted.

The ID with which the tuple is emitted will be the ID provided in the Acked and Failed functions.

The output stream and the list of objects are the same as for bolt emissions.
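Putting the spout interface and the spout output collector together, a replaying spout could be sketched as follows. The Context and SpoutOutputCollector declarations are local stand-ins for the GoStorm types, and sentenceSpout and recordingCollector are hypothetical. The sketch emits at most one tuple per NextTuple call and re-queues failed tuples instead of emitting from Failed, in line with the advice above about runaway behaviour.

```go
package main

import (
	"fmt"
	"strconv"
)

// Context and SpoutOutputCollector are local stand-ins for the GoStorm types.
type Context struct{}

type SpoutOutputCollector interface {
	Emit(id string, stream string, fields ...interface{}) (taskIds []int32)
	EmitDirect(id string, stream string, directTask int64, fields ...interface{})
}

// sentenceSpout emits queued sentences and replays the ones that fail.
type sentenceSpout struct {
	collector SpoutOutputCollector
	queue     []string          // sentences waiting to be emitted
	pending   map[string]string // emitted but not yet acked, keyed by tuple ID
	nextID    int
}

func (s *sentenceSpout) Open(context *Context, collector SpoutOutputCollector) {
	s.collector = collector
	s.pending = make(map[string]string)
}

// NextTuple emits at most one tuple per call.
func (s *sentenceSpout) NextTuple() {
	if len(s.queue) == 0 {
		return
	}
	sentence := s.queue[0]
	s.queue = s.queue[1:]
	s.nextID++
	id := strconv.Itoa(s.nextID)
	s.pending[id] = sentence
	s.collector.Emit(id, "", sentence)
}

// Acked forgets the tuple; it has been fully processed.
func (s *sentenceSpout) Acked(id string) { delete(s.pending, id) }

// Failed re-queues the tuple so that a later NextTuple call replays it.
func (s *sentenceSpout) Failed(id string) {
	if sentence, ok := s.pending[id]; ok {
		delete(s.pending, id)
		s.queue = append(s.queue, sentence)
	}
}

func (s *sentenceSpout) Exit() {}

// recordingCollector is a hypothetical test double that records emitted IDs.
type recordingCollector struct{ ids []string }

func (c *recordingCollector) Emit(id string, stream string, fields ...interface{}) []int32 {
	c.ids = append(c.ids, id)
	return nil
}
func (c *recordingCollector) EmitDirect(id string, stream string, directTask int64, fields ...interface{}) {
}

func main() {
	c := &recordingCollector{}
	s := &sentenceSpout{queue: []string{"a b", "c d"}}
	s.Open(&Context{}, c)
	s.NextTuple()
	s.NextTuple()
	s.Failed("2") // the second sentence goes back on the queue
	s.NextTuple() // and is replayed under a new ID
	fmt.Println(c.ids)
}
```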

Testing without Storm

It's possible to link up GoStorm spouts and bolts using the mockOutputCollector implementations of GoStorm. This does not require a running Storm cluster or indeed anything other than the GoStorm library. Mock output collectors are a basic way of stringing some Storm components together, while manually calling Execute on a bolt to get the topology running. I am hopeful of obtaining a GoStorm local mode contribution within the next few months. The GoStorm local mode will allow spouts and bolts to be connected in a single process, with acks and fails also handled correctly.
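The idea of stringing components together can be sketched without the library. The forwardingCollector below is a hypothetical stand-in, not GoStorm's mockOutputCollector API: every tuple emitted by one bolt is handed straight to the Execute method of the next. The interface and BoltMsgMeta declarations are local copies with assumed fields, and fields are passed by reference rather than marshalled as a real encoding would do.

```go
package main

import (
	"fmt"
	"strings"
)

// BoltMsgMeta is a local stand-in with only the fields this sketch needs.
type BoltMsgMeta struct {
	Id     string
	Stream string
}

// OutputCollector is a local copy of the bolt output collector interface.
type OutputCollector interface {
	SendAck(id string)
	SendFail(id string)
	Emit(anchors []string, stream string, fields ...interface{}) (taskIds []int32)
	EmitDirect(anchors []string, stream string, directTask int64, fields ...interface{})
}

// executor is the part of the Bolt interface that this sketch relies on.
type executor interface {
	Execute(meta BoltMsgMeta, fields ...interface{})
}

// forwardingCollector hands every emitted tuple straight to the next bolt.
type forwardingCollector struct {
	next   executor
	nextID int
	acked  []string
}

func (c *forwardingCollector) SendAck(id string)  { c.acked = append(c.acked, id) }
func (c *forwardingCollector) SendFail(id string) {}
func (c *forwardingCollector) Emit(anchors []string, stream string, fields ...interface{}) []int32 {
	c.nextID++
	meta := BoltMsgMeta{Id: fmt.Sprintf("mock-%d", c.nextID), Stream: stream}
	c.next.Execute(meta, fields...)
	return nil
}
func (c *forwardingCollector) EmitDirect(anchors []string, stream string, directTask int64, fields ...interface{}) {
}

// upperBolt upper-cases its single string field and emits the result.
type upperBolt struct{ collector OutputCollector }

func (b *upperBolt) Execute(meta BoltMsgMeta, fields ...interface{}) {
	word, ok := fields[0].(string)
	if !ok {
		b.collector.SendFail(meta.Id)
		return
	}
	b.collector.Emit([]string{meta.Id}, "", strings.ToUpper(word))
	b.collector.SendAck(meta.Id)
}

// sinkBolt records everything it receives.
type sinkBolt struct{ seen []string }

func (b *sinkBolt) Execute(meta BoltMsgMeta, fields ...interface{}) {
	b.seen = append(b.seen, fmt.Sprint(fields...))
}

func main() {
	sink := &sinkBolt{}
	up := &upperBolt{collector: &forwardingCollector{next: sink}}
	up.Execute(BoltMsgMeta{Id: "t1"}, "storm") // manually start the chain
	fmt.Println(sink.seen)
}
```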

Because mock collectors do not connect to a real Storm topology and because the mock collector implementation in GoStorm is still fairly immature, there are some important differences (and shortcomings) between mock components and real components that should be taken into account when testing:

Deploying to Storm

For an example of how to build and deploy a jar containing a Go ShellBolt to a Storm cluster (using the above splitsentence implementation) see https://github.com/sixgill/gostorm-runner.

Comments
  • Small fix for marshaling complex ShellMsgs

    Resolves #9 by making use of a map[string]interface{} to allow for marshaling more complex objects (like nested escaped strings or other json objects).

    Ran into an issue (happy to file?) when trying to log a JSON object. The included test case demonstrates the problem.

  • SpoutConn Emit() should flush and then always readTaskIds

    I think the spout implementation is a bit broken right now.

    The multilang protocol says that non-direct Emits always immediately receive back the task Ids they're sent to, but currently the needsTaskIds property is a property of the struct.

    It seems like needsTaskIds should go away, and the connection should know that whenever a non-direct emit happens, you need to flush, read task ids, and then return those?

  • panic: Storm failed to initialise: EOF

    I'm trying to get a simple Spout running in a local storm installation. The spout is very simple and basically just implements stubs for the required interface:

    var Log = log.NewLogger(os.Stderr, "", 0) // just in case stdout collided with shell IO
    
    type EventSpout struct {}
    
    func (s *EventSpout) NextTuple() {
        Log.Println("Next Tuple")
    }
    
    func (s *EventSpout) Acked(id string) {
        Log.Printf("Acked %s\n\r", id) 
    }
    
    func (s *EventSpout) Failed(id string) {
        Log.Printf("Failed: %s\n\r", id) 
    }
    
    func (s *EventSpout) Exit() {
        Log.Println("Exit")
    }
    
    func (s *EventSpout) Open(context *stormmsg.Context, collector storm.SpoutOutputCollector) {
       Log.Println("Open")
    }
    

    I build the binary then submit to storm as follows:

    ./bin/storm shell resources/ ~/[path-to-bin]/simplespout 
    

    The log shows that the topology was successfully submitted:

    805  [main] INFO  backtype.storm.StormSubmitter - Uploading topology jar stormshell6100219.jar to assigned location: storm-local/nimbus/inbox/stormjar-fab538f7-68ae-41df-ab1a-5a780c6b45b1.jar
    814  [main] INFO  backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: storm-local/nimbus/inbox/stormjar-fab538f7-68ae-41df-ab1a-5a780c6b45b1.jar
    

    But, I immediately get the following from simplespout:

    panic: Storm failed to initialise: EOF
    

    Here's the stack trace:

    goroutine 1 [running]:
    github.com/jsgilmore/gostorm/core.(*stormConnImpl).Connect(0xc82009eff0)
        /Users/rcadena/go/src/github.com/jsgilmore/gostorm/core/conns.go:105 +0x188
    github.com/jsgilmore/gostorm.(*shellSpoutImpl).Initialise(0xc82009f020, 0xd60760, 0xc82009cfb0)
        /Users/rcadena/go/src/github.com/jsgilmore/gostorm/shellspout.go:45 +0x66
    github.com/jsgilmore/gostorm.RunSpout(0xd60680, 0x772880, 0x500590, 0xb)
        /Users/rcadena/go/src/github.com/jsgilmore/gostorm/storm.go:69 +0x19a
    main.startSpout()
        /Users/rcadena/simplespout/simplespout.go:15 +0x85
    main.run(0x74a2a0, 0xc8200ae550, 0x3, 0x5)
        /Users/rcadena/simplespout/simplespout.go:10 +0x1c
    main.main()
        /Users/rcadena/simplespout/simplespout.go:22 +0x23
    

    Storm: 0.9.6 gostorm: commit number 333313

    Thanks in advance. I'm new to storm. The topology only has a spout, no bolts. I wanted to get that part running at least.

  • Undefined: proto.ErrWrongType

    For my investigation I was using your "splitsentence.go" example with the "WordCountTopology.java" example. When I run this example (mvn compile exec:java -Dstorm.topology=storm.starter.WordCountTopology) I get the following error:

    This error can also be reproduced by just running the go file on it own.

    github.com/jsgilmore/gostorm/messages
    /Users//go/src/github.com/jsgilmore/gostorm/messages/messages.pb.go:266: undefined: proto.ErrWrongType
    /Users//go/src/github.com/jsgilmore/gostorm/messages/messages.pb.go:288: undefined: proto.ErrWrongType
    /Users//go/src/github.com/jsgilmore/gostorm/messages/messages.pb.go:349: undefined: proto.ErrWrongType
    /Users//go/src/github.com/jsgilmore/gostorm/messages/messages.pb.go:364: undefined: proto.ErrWrongType
    /Users//go/src/github.com/jsgilmore/gostorm/messages/messages.pb.go:426: undefined: proto.ErrWrongType
    /Users//go/src/github.com/jsgilmore/gostorm/messages/messages.pb.go:448: undefined: proto.ErrWrongType
    /Users//go/src/github.com/jsgilmore/gostorm/messages/messages.pb.go:509: undefined: proto.ErrWrongType
    /Users//go/src/github.com/jsgilmore/gostorm/messages/messages.pb.go:531: undefined: proto.ErrWrongType
    /Users//go/src/github.com/jsgilmore/gostorm/messages/messages.pb.go:558: undefined: proto.ErrWrongType
    /Users//go/src/github.com/jsgilmore/gostorm/messages/messages.pb.go:620: undefined: proto.ErrWrongType
    /Users//go/src/github.com/jsgilmore/gostorm/messages/messages.pb.go:620: too many errors

    I could not find any reference on "ErrWrongType" on google documentation.

    In the WordCountTopology.java I have just changed the shell that was using python to use go instead:

    public static class SplitSentence extends ShellBolt implements IRichBolt {

      public SplitSentence() {
        super("go", "run", "splitsentence.go");
      }

      @Override
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
      }

      @Override
      public Map<String, Object> getComponentConfiguration() {
        return null;
      }
    }

    I am not sure why I am getting the above error and I am not able to find proto.ErrWrongType either (not sure if it has been deprecated). I would very much appreciate it if you could quickly have a look.

  • Adds json marshaling of the need_task_ids field to json_object

    I thought that I broke this with e561305e0224e073b3521ebd32332bae39bb222f, but it doesn't look to me like ShellMsg ever included need_task_ids in the message sent, which explains why I was still seeing taskIds returned from spouts and bolts.

    Either way, this adds it to the message sent back to storm (for emits).

  • Confusing bug in splitsentence.go

    I think you need to send the address of "sentence" in here, right?

    https://github.com/jsgilmore/gostorm/blob/master/example/splitsentence/splitsentence.go#L72

    Was confusing me in trying to get the sample running.

    Cheers!

  • Fixed: do not flush the writer when using fail, ack or sync

    Previously, when using a bolt and sending a fail, ack or sync, gostorm did not flush the current writer. This caused heartbeat timeout issues with Storm, because Storm assumes the bolt is a zombie when it does not receive any messages.

  • Gostorm now correctly skips task_ids. Reverts the changes made in PR#5

    This change reverts the changes I made in #5.

    I submitted a fix to gostorm for handling the undocumented "need_task_ids" field of the multilang protocol, which removes the need for us to require task_ids for spouts/bolts.

    This will also allow us to re-enable the event_receiver bolt in the topology.

  • Implement a topology builder.

    Currently, the storm topology still has to be defined as a set of Java classes using the storm topology builder. Because Nimbus is a Thrift daemon, it is possible to define a storm topology in Go.

  • Implement bolt taskId returned from Emit asynchronously

    When you have a bolt with multiple threads it is possible that the storm framework will not reply with taskIds, but with another tuple instead. This means taskIds cannot be returned with the Emit function and should be handled asynchronously somewhere else.

  • Fix index out of range

    In case there is no tuple content, e.g. for heartbeats, iterating over the contentStructs will lead to an index out of range on contentList[i].

    I have not yet checked if the same problem occurs with other encoding types but I assume so.

  • Command activate is not yet implemented on master branch

    error message: panic: ShellSpout: Unknown command received from Storm: activate goroutine 1 [running]: github.com/jsgilmore/gostorm.(*shellSpoutImpl).Go(0xc0001dfb30)

    branch: master

    storm version: apache-storm-1.2.3

  • Need better example

    Hi,

    can anyone explain how I can use this library? A complete example would be nice. Your splitsentence example differs completely from the description. This is confusing.

    Thanks!

  • Bolts that accept multiple streams cannot differentiate expected fields

    Either Fields() needs to take a string that helps decide which interfaces get returned, or a new method, FieldsForStream(string), needs to be added to the interface so that the bolt can return fields depending on the stream ID passed.
