tools for working with streams of data

streamtools

4/1/2015 Development for streamtools has waned as our attention has turned towards developing a language paradigm that embraces blocking, types, and more reasonable semantics. Stay tuned.

Streamtools is a graphical toolkit for dealing with streams of data. Streamtools makes it easy to explore, analyse, modify and learn from streams of data.

--

Owner: nytlabs (The New York Times Research & Development group)
Comments
  • streamdrill block

    This is the first working version of the streamdrill block, including an example and a README. It requires the demo from streamdrill.com (which runs independently). A startup script is included to run streamdrill with enough resources.

  • architecture to allow for megablocks, agnostic logic

    Consider an example flow:

    1. Import from NSQ
    2. Filter
    3. Synchronize
    4. Export to Web Sockets

    The import from NSQ takes a stream from a non-local NSQ and puts it on the local NSQ. The Filter then reads from the local NSQ and publishes back to the local NSQ. The Synchronizer reads from the local NSQ and publishes to the local NSQ, and finally, the Export reads from the local NSQ.

    This would work fine for smaller streams, but putting every message on and off a local NSQ adds a lot of redundant load. All of the filtering/synchronizing/export logic is still super useful; the only problem is that it is locked away in binaries that include NSQ readers/publishers, so we can't chain it directly, which would speed up the architecture drastically.

    I propose an architecture for the design of megablocks, something like this: /streamtools would contain just the structs we use to deal with Go chan messages, and /blocks would contain basically everything that is in the root right now (with the NSQ stuff).

    A file in /streamtools would look something like this:

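    package streamtools

    import "bytes"

    // Filter reads messages from in and forwards the ones that match pattern to out.
    type Filter struct {
        in      chan []byte
        out     chan []byte
        pattern string
    }

    // NewFilter wires a Filter to the given channels and starts it in its own goroutine.
    func NewFilter(in chan []byte, out chan []byte, pattern string) *Filter {
        f := &Filter{
            in:      in,
            out:     out,
            pattern: pattern,
        }
        go f.run()
        return f
    }

    func (f *Filter) run() {
        // Loop until in is closed, then close out so downstream blocks stop too.
        for msg := range f.in {
            // do filter stuff here; a substring match is used as a placeholder
            if bytes.Contains(msg, []byte(f.pattern)) {
                f.out <- msg
            }
        }
        close(f.out)
    }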
    

    This way, all of the logic in streamtools becomes agnostic about how it is deployed. You could use it as part of the streamtools suite, or, if you are just handling Go messages in your own application, you could import from /streamtools and use it without NSQ. Or you could chain blocks together to make megablocks.

    /blocks would be full of NSQ-ready binaries whose really simple code is basically an NSQ wrapper around the streamtools logic. A filter would look something like this:

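    package main

    import "github.com/nytlabs/stream_tools"

    func main() {
        // params and pattern are placeholders for the NSQ settings and filter rule
        a := make(chan []byte)
        b := make(chan []byte)
        streamtools.NewNSQReader(params, a)
        streamtools.NewFilter(a, b, pattern)
        streamtools.NewNSQPublisher(params, b)
        select {} // block forever; the goroutines do the work
    }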
    

    This also means you could do something like this:

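    package main

    import "github.com/nytlabs/stream_tools"

    func main() {
        // params and pattern are placeholders for the NSQ settings and filter rule
        a := make(chan []byte)
        b := make(chan []byte)
        c := make(chan []byte)
        streamtools.NewNSQReader(params, a)
        streamtools.NewFilter(a, b, pattern)
        streamtools.NewSynchronizer(b, c)
        streamtools.NewNSQPublisher(params, c)
        select {} // block forever; the goroutines do the work
    }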
    

    Basically, it allows the streamtools core to be a library that we use to build the NSQ-based block binaries. It's also good for when we want to start sharing util functions, like flatten, map, etc.

    eh?
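    A minimal sketch of the megablock idea, assuming each block routine is simply a function over `chan []byte` that closes its output when its input closes. The names `filter`, `upper`, and `megablock` are hypothetical (`upper` only stands in for a second stage such as a Synchronizer) and are not part of the streamtools API. Both stages run in one process and hand messages over Go channels, with no local NSQ hop between them:

```go
package main

import (
	"bytes"
	"fmt"
)

// filter is a hypothetical block routine: it forwards only messages that
// contain pattern and closes its output once its input is closed.
func filter(in <-chan []byte, pattern []byte) <-chan []byte {
	out := make(chan []byte)
	go func() {
		defer close(out)
		for msg := range in {
			if bytes.Contains(msg, pattern) {
				out <- msg
			}
		}
	}()
	return out
}

// upper is a hypothetical second stage that upper-cases each message.
func upper(in <-chan []byte) <-chan []byte {
	out := make(chan []byte)
	go func() {
		defer close(out)
		for msg := range in {
			out <- bytes.ToUpper(msg)
		}
	}()
	return out
}

// megablock chains the stages directly over Go channels, so messages
// never leave the process between steps.
func megablock(in <-chan []byte, pattern []byte) <-chan []byte {
	return upper(filter(in, pattern))
}

func main() {
	src := make(chan []byte)
	go func() {
		defer close(src)
		for _, s := range []string{"train 1 late", "bus 7 ok", "train 2 ok"} {
			src <- []byte(s)
		}
	}()
	for msg := range megablock(src, []byte("train")) {
		fmt.Println(string(msg))
	}
}
```

    Because each stage closes its output when its input closes, closing the source drains and stops the whole chain.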

  • proposed standards for blocks

    Some ideas for the standardization of blocks, heavily inspired by #34:

    terms

    Block Routine: A Block Routine is the func that is called as a goroutine and contains only logic. It has a simplejson.Json in chan, a simplejson.Json out chan, or both. Block Routines are wrapped in Blocks and reside in the /streamtools library.

    Block Function: Block Functions are dependencies of Block Routines. They are not run as goroutines and are located in the /streamtools library. It may be a good idea to divorce Block Routines from Block Functions within /streamtools.

    Blocks: Blocks provide an NSQ wrapper for Block Routines. Blocks take care of wiring up Go channels to NSQ and are responsible for initializing and running Block Routines. Typically, there will be 2-3 goroutines per block: an NSQ reader, an NSQ publisher, and a Block Routine.

    A block should act as a standalone executable and be able to interface to a standard mode of execution and introspection.

    principles

    one Block Routine per Block: All logic for a block should be contained in a single function (the Block Routine). All Block Routine state should be maintained within that single goroutine, and all messages in and out of the block should be managed by it. A Block Routine should not share state with any other goroutines except through a channel.

    Block Routines allow introspection: Block Routines should have a chan of some kind that reports on what is currently happening in the routine. A health chan may also be nice, to divorce technical stats (in flight, backed-up queues, processing time, number of messages processed) from stats that come from the block's logic (distribution of X, etc.).

    online setting of rules: We have yet to standardize how a Block Routine is initialized with the rules that govern its logic. I propose that Block Routines have a rules chan that initializes the logic and signals it to start processing. This would help us avoid flag soup and potentially allow run-time rule changes.
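    A minimal sketch of a Block Routine that follows these principles, assuming `Msg` (a plain map) stands in for simplejson.Json and that the rules and introspection channels look roughly as proposed; `filterRoutine`, `Rule`, and `Stats` are hypothetical names. All state lives in one goroutine, the first rule both configures the logic and starts processing, and a later rule replaces it at run time:

```go
package main

import "fmt"

// Msg stands in for simplejson.Json so the sketch has no dependencies (assumption).
type Msg map[string]interface{}

// Rule is a hypothetical rule set: forward messages whose Key equals Value.
// Value should be a comparable type such as a string or number.
type Rule struct {
	Key   string
	Value interface{}
}

// Stats is what the hypothetical introspection (query) chan reports.
type Stats struct {
	Seen, Emitted int
	Rule          Rule
}

// filterRoutine is one Block Routine: its rule and counters live only in this
// goroutine and are reached only through channels. Nothing is processed until
// the first rule arrives on rules; later rules replace it at run time.
func filterRoutine(in <-chan Msg, out chan<- Msg, rules <-chan Rule, query <-chan chan Stats) {
	defer close(out)
	rule, ok := <-rules
	if !ok {
		return // never initialized
	}
	var stats Stats
	for {
		select {
		case r, ok := <-rules:
			if ok {
				rule = r // run-time rule change
			} else {
				rules = nil // a nil chan is never selected again
			}
		case reply := <-query:
			stats.Rule = rule
			reply <- stats
		case m, ok := <-in:
			if !ok {
				return // input closed: out is closed by the defer
			}
			stats.Seen++
			if m[rule.Key] == rule.Value {
				stats.Emitted++
				out <- m
			}
		}
	}
}

func main() {
	in := make(chan Msg)
	out := make(chan Msg)
	rules := make(chan Rule)
	query := make(chan chan Stats)
	go filterRoutine(in, out, rules, query)

	rules <- Rule{Key: "line", Value: "A"} // initialize and start processing
	in <- Msg{"line": "C", "id": 2}        // dropped by the rule
	in <- Msg{"line": "A", "id": 1}        // forwarded
	fmt.Println((<-out)["id"])

	reply := make(chan Stats)
	query <- reply
	fmt.Printf("%+v\n", <-reply)
	close(in)
}
```

    One trade-off of keeping everything in a single goroutine: while it is blocked sending on `out`, it cannot answer on `query` either, so a slow consumer also stalls introspection.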

  • Block categorization

    List the blocks in the GUI as described below.

    Network I/O:

    • fromEmail
    • fromFile
    • fromHTTPGetRequest
    • fromHTTPStream
    • toEmail
    • toFile
    • toHTTPGetRequest
    • webrequest

    Data Stores:

    • redis
    • mongo
    • elasticsearch

    Queues:

    • nsq
    • amqp
    • sqs
    • beanstalk

    Parsers:

    • xml
    • csv

    Stats:

    • count
    • histogram
    • timeseries
    • zipf
    • poisson
    • fft
    • gaussian
    • kullbackLeibler
    • learn
    • linearModel
    • logisticModel
    • movingAverage
    • categorical

    Core:

    • map
    • mask
    • javascript
    • filter
    • cache
    • set
    • join
    • bang
    • ticker
    • queue
    • priority queue
    • toggle
    • tolog
    • unpack
    • pack*

    Embedded Systems:

    • analogpin
    • digitalpin
  • undefined: Asset

    Hi, I have just pulled the latest version from GitHub and tried to compile it:

    ~/Documents/Source/go/src/streamtools$ make
    go get github.com/jteeuwen/go-bindata/...
    go-bindata -pkg=server -o st/server/static_bindata.go gui/...
    cd st/library && go get .
    cd st/server && go get .
    go build -o build/st ./st
    # github.com/nytlabs/streamtools/st/server
    ../github.com/nytlabs/streamtools/st/server/api.go:56: undefined: Asset
    ../github.com/nytlabs/streamtools/st/server/api.go:63: undefined: Asset
    make: *** [build/st] Error 2
    

    OSX Mavericks, go version go1.2.1 darwin/amd64. GOPATH is set to go, and PATH contains $GOPATH/bin.

    Any ideas?

  • crash

    panic: runtime error: invalid memory address or nil pointer dereference
    [signal 0xb code=0x1 addr=0x58 pc=0x72ed0]

    goroutine 51 [running]:
    runtime.panic(0x554ba0, 0xcf5919)
        /usr/local/go/src/pkg/runtime/panic.c:266 +0xb6
    github.com/nytlabs/streamtools/st/server.func·001()
        /Users/202362/go/src/github.com/nytlabs/streamtools/st/server/api.go:228 +0x130
    github.com/nytlabs/streamtools/st/server.func·002(0xc2100922a0, 0xc2101463c0, 0xc2100d6ca8, 0x2, 0xc2100b5b68, ...)
        /Users/202362/go/src/github.com/nytlabs/streamtools/st/server/api.go:246 +0x138
    created by github.com/nytlabs/streamtools/st/server.(*Server).websocketHandler
        /Users/202362/go/src/github.com/nytlabs/streamtools/st/server/api.go:246 +0x588

  • streamtools splash page with logo to be pushed to our gh-pages branch

    A simple splash page, with a logo and maybe an example video, should be put up on our github.io page. The explanation should be brief, and all documentation/how-to/installation material should point to our wiki (for now). Perhaps a link to our binaries as well.

  • Refreshing page with open/busy websocket causes ST to hang

    While developing an MTA map, I built a page that uses a websocket connection to a block to listen for train locations and map them. If I refresh this page, that block's websocket stops reporting and the UI also hangs. The only remedy is to kill and restart the ST binary.

    Steps to reproduce:

    • Create a pattern where a block emits ~150 messages in bursts
    • Create a page that listens to that block's websocket
    • Refresh that page once the initial connections have been created and tested

    ST pattern UI will not update, and refreshing will hang on "Waiting for localhost." Log will not indicate any errors or warnings, and the only remedy will be to kill and restart the binary. Note that other blocks and their sockets may continue to work even in this hung state.

    Version: 0.2.5

  • new endpoints for cache block

    this adds the following to cache:

    • keys
    • values
    • dump (key/value pairs)

    I renamed the variable 'values' in the cache block to 'cache' so I could use 'values' for the, um, values endpoint.
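    A minimal sketch of the logic behind the three endpoints, assuming the block's state is a single `map[string]interface{}` field named `cache` (matching the rename described above). The method names and the choice to sort keys so responses are deterministic are assumptions, and the HTTP wiring is left out:

```go
package main

import (
	"fmt"
	"sort"
)

// cacheBlock is a hypothetical stand-in for the cache block's state.
type cacheBlock struct {
	cache map[string]interface{}
}

// keys returns every key, sorted so repeated requests give the same order.
func (c *cacheBlock) keys() []string {
	ks := make([]string, 0, len(c.cache))
	for k := range c.cache {
		ks = append(ks, k)
	}
	sort.Strings(ks)
	return ks
}

// values returns the values in the same order as keys().
func (c *cacheBlock) values() []interface{} {
	vs := make([]interface{}, 0, len(c.cache))
	for _, k := range c.keys() {
		vs = append(vs, c.cache[k])
	}
	return vs
}

// dump returns a copy of the key/value pairs, so callers can't mutate the cache.
func (c *cacheBlock) dump() map[string]interface{} {
	d := make(map[string]interface{}, len(c.cache))
	for k, v := range c.cache {
		d[k] = v
	}
	return d
}

func main() {
	c := &cacheBlock{cache: map[string]interface{}{"b": 2, "a": 1}}
	fmt.Println(c.keys())   // [a b]
	fmt.Println(c.values()) // [1 2]
	fmt.Println(c.dump())   // map[a:1 b:2]
}
```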

    I have an open pull request that includes the tutorial commit.

    :8ball:

  • labels labels labels

    So, hearing a colleague try to explain her pattern to her boss sounded painful. We need to add labels:

    • labels to blocks, that should also be in modals associated with that block
    • labels to connections, that could live near the rate label
    • floating labels that just live as elements on the page. These seem easiest and quickest to do...
  • fromFile block now uses a poll endpoint to emit a line at a time

    resolves #456

    Previously this block was being a little dumb and emitted every line as soon as you set the rule, giving you no control over how frequently these messages were sent. Now it works more like count, in that you have to poll it to get the next line(s).

  • 'Correct' Dockerfile

    Hi!

    I took the liberty of modifying the existing Dockerfile so that it builds and runs ;)

    Hope it helps.

    Build:

        docker build -t streamtools .

    Run (foreground, for testing):

        docker run --rm -ti -p 7070:7070 streamtools

    Run (daemonized):

        docker run -d -p 7070:7070 streamtools

  • fromhttpstream not saving Endpoint value

    I add a fromhttpstream block in the UI and enter an Endpoint value. Within a few seconds the value disappears and is not saved. It doesn't matter whether I hit the Update button or close the options popup. I've tried multiple endpoint values, but none of them is saved, which makes logging impossible.

    I'm using Mac OSX Yosemite and have tried both Safari and Chrome.

  • Denying load of chrome-extension

    On Chrome Version 42.0.2311.60 beta (64-bit) on OSX I get:

    Denying load of chrome-extension://gkojfkhlekighikafcpjkiklfbnlmeio/js/jquery.min.map. Resources must be listed in the web_accessible_resources manifest key in order to be loaded by pages outside the extension.

  • unpack bizarrely throwing error when importing pattern

    When I import any pattern that involves an unpack block, st barfs on it:

    Oct 25 15:42:53 [ 24 ][ ERROR ] "Key was not in rule"

    The pattern is totally valid, so I'm not sure why it's throwing this error.

DataFrames for Go: For statistics, machine-learning, and data manipulation/exploration

Dataframes are used for statistics, machine-learning, and data manipulation/exploration. You can think of a Dataframe as an excel spreadsheet. This pa

Dec 31, 2022
Package goraph implements graph data structure and algorithms.

goraph Package goraph implements graph data structure and algorithms. go get -v gopkg.in/gyuho/goraph.v2; I have tutorials and visualizations of grap

Dec 20, 2022
siusiu (suite-suite harmonics) a suite used to manage the suite, designed to free penetration testing engineers from learning and using various security tools, reducing the time and effort spent by penetration testing engineers on installing tools, remembering how to use tools.

Dec 12, 2022
Go iter tools (for iterating, mapping, filtering, and reducing streams represented as channels)

Jan 1, 2023
Go package providing tools for working with Library of Congress data.

go-libraryofcongress Go package providing tools for working with Library of Congress data. Documentation Tools $> make cli go build -mod vendor -o bin

Jan 3, 2023
Probabilistic data structures for processing continuous, unbounded streams.

Boom Filters Boom Filters are probabilistic data structures for processing continuous, unbounded streams. This includes Stable Bloom Filters, Scalable

Dec 30, 2022
Sig - Statistics in Go - CLI tool for quick statistical analysis of data streams

Statistics in Go - CLI tool for quick statistical analysis of data streams

May 16, 2022
Utilities for working with discrete probability distributions and other tools useful for doing NLP work

GNLP A few structures for doing NLP analysis / experiments. Basics counter.Counter A map-like data structure for representing discrete probability dis

Nov 28, 2022
A collection of basic tools that make working with polynomials easier in Go

PolyGo A collection of basic tools that make working with polynomials easier in

Dec 8, 2022
Go package providing opinionated tools and methods for working with the `aws-sdk-go/service/cloudfront` package.

go-aws-cloudfront Go package providing opinionated tools and methods for working with the aws-sdk-go/service/cloudfront package. Documentation Tools $

Feb 2, 2022
gNXI Tools - gRPC Network Management/Operations Interface Tools

gNxI Tools gNMI - gRPC Network Management Interface gNOI - gRPC Network Operations Interface A collection of tools for Network Management that use the

Dec 15, 2022
Chanify is a safe and simple notification tool. This repository contains the command-line tools for Chanify.

Chanify is a safe and simple notification tool. Developers, system administrators, and everyone else can push notifications with its API.

Dec 29, 2022
Tools - This subrepository holds the source for various packages and tools that support

Go Tools This subrepository holds the source for various packages and tools that

Jan 12, 2022
Package flac provides access to FLAC (Free Lossless Audio Codec) streams.

flac This package provides access to FLAC (Free Lossless Audio Codec) streams. Documentation Documentation provided by GoDoc. flac: provides access to

Jan 5, 2023
Go package capable of generating waveform images from audio streams. MIT Licensed.

waveform Go package capable of generating waveform images from audio streams. MIT Licensed. This library supports any audio streams which the azul3d/e

Nov 17, 2022
Lightweight, fault-tolerant message streams.

Liftbridge provides lightweight, fault-tolerant message streams by implementing a durable stream augmentation for the NATS messaging system. It extend

Jan 2, 2023
Go client to reliable queues based on Redis Cluster Streams

Ami Go client to reliable queues based on Redis Cluster Streams. Consume/produce performance Performance is dependent from: Redis Cluster nodes count;

Dec 12, 2022
redisqueue provides a producer and consumer of a queue that uses Redis streams

redisqueue redisqueue provides a producer and consumer of a queue that uses Redis streams. Features A Producer struct to make enqueuing messages easy.

Dec 29, 2022
Parse and demux MPEG Transport Streams (.ts) natively in GO

This is a Golang library to natively parse and demux MPEG Transport Streams (ts) in GO. WARNING: this library is not yet production ready. Use at your

Jan 9, 2023
Simple example for using Turbo Streams in Go with the Gorilla WebSocket toolkit.

Go Example for TurboStreams over WebSockets. Simple example for using Turbo Streams in Go with the Gorilla WebSocket toolkit.

Dec 22, 2022