Consider an example flow:
- Import from NSQ
- Filter
- Synchronize
- Export to Web Sockets
The Import takes a stream from a non-local NSQ and puts it on the local NSQ. The Filter reads from the local NSQ and publishes back to the local NSQ. The Synchronizer does the same, and finally the Export reads from the local NSQ.
This would work fine for smaller streams, but every block puts messages on and takes them off the local NSQ, which adds a lot of redundant load. The filtering/synchronizing/export logic is still super useful; the only problem is that it is locked away in binaries that include NSQ readers/publishers, so we can't chain it in-process, which is what would speed up the architecture drastically.
I propose an architecture for building megablocks, something like this:
/streamtools just contains the structs that we use to deal with Go chan messages.
/blocks contains basically everything that is in the root right now (w/ NSQ stuff)
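A file in streamtools would look something like this:
package streamtools

// Filter reads messages from in and writes the ones that pass to out.
type Filter struct {
	in      chan []byte
	out     chan []byte
	pattern string
}

// NewFilter wires a Filter between in and out and starts it in a goroutine.
func NewFilter(in chan []byte, out chan []byte, pattern string) *Filter {
	f := &Filter{
		in:      in,
		out:     out,
		pattern: pattern,
	}
	go f.run()
	return f
}

// run handles messages until the program exits.
func (f *Filter) run() {
	for {
		select {
		case msg := <-f.in:
			// do filter stuff here; for now every message is passed through
			f.out <- msg
		}
	}
}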
This way, all of the logic in streamtools becomes agnostic about how messages get to it. You could use the blocks as part of the streamtools suite, or, if you are just passing messages over Go channels in your own application, you can import from /streamtools and use them without NSQ. Or you could chain them together to make megablocks.
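To make the "use them without NSQ" case concrete, here is a rough, self-contained sketch of two Filters chained over plain Go channels. Everything beyond the Filter/NewFilter shape above is an assumption for illustration: the substring matching, closing `out` when `in` is closed, and the `filterAll` helper are not part of the proposal.

```go
package main

import (
	"bytes"
	"fmt"
)

// Filter forwards only the messages that contain pattern.
// Substring matching is an assumption; the proposal leaves the filter logic open.
type Filter struct {
	in      <-chan []byte
	out     chan<- []byte
	pattern []byte
}

// NewFilter starts a Filter between in and out and returns it.
func NewFilter(in <-chan []byte, out chan<- []byte, pattern string) *Filter {
	f := &Filter{in: in, out: out, pattern: []byte(pattern)}
	go f.run()
	return f
}

// run drains in until it is closed, then closes out so the next block can stop too.
func (f *Filter) run() {
	defer close(f.out)
	for msg := range f.in {
		if bytes.Contains(msg, f.pattern) {
			f.out <- msg
		}
	}
}

// filterAll is a hypothetical helper: it chains two Filters in-process
// (a tiny "megablock") and collects whatever survives both.
func filterAll(msgs []string, first, second string) []string {
	a := make(chan []byte)
	b := make(chan []byte)
	c := make(chan []byte)
	NewFilter(a, b, first)
	NewFilter(b, c, second)

	// Feed the chain from a goroutine and close a when done.
	go func() {
		defer close(a)
		for _, m := range msgs {
			a <- []byte(m)
		}
	}()

	var kept []string
	for m := range c {
		kept = append(kept, string(m))
	}
	return kept
}

func main() {
	msgs := []string{"nyt politics", "nyt sports", "bbc politics"}
	fmt.Println(filterAll(msgs, "nyt", "politics")) // prints [nyt politics]
}
```

No message touches NSQ here; the only hops are channel sends between goroutines, which is the saving the megablock idea is after.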
/blocks would be full of NSQ-ready binaries, each with really simple code that is basically an NSQ wrapper around the streamtools logic. A filter would look something like this:
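import "github.com/nytlabs/stream_tools"

func main() {
	// params and pattern would come from command-line flags
	a := make(chan []byte)
	b := make(chan []byte)
	streamtools.NewNSQReader(params, a)
	streamtools.NewFilter(a, b, pattern)
	streamtools.NewNSQPublisher(params, b)
	select {} // block forever; the blocks run in their own goroutines
}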
And this also means you could do something like:
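import "github.com/nytlabs/stream_tools"

func main() {
	// params and pattern would come from command-line flags
	a := make(chan []byte)
	b := make(chan []byte)
	c := make(chan []byte)
	streamtools.NewNSQReader(params, a)
	streamtools.NewFilter(a, b, pattern)
	streamtools.NewSynchronizer(b, c)
	streamtools.NewNSQPublisher(params, c)
	select {} // block forever; the blocks run in their own goroutines
}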
Basically, it lets the streamtools core be a library that we use to build the NSQ-based block binaries. It's also a good home for util functions once we want to start sharing them, like flatten, map, etc.
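As an example of the kind of shared util that could live in the library, here is a rough sketch of a flatten helper. The name `Flatten`, its signature, and the dotted-key convention are all assumptions on my part; nothing like this is defined in the proposal yet.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Flatten collapses nested JSON objects into a single-level map with dotted keys.
// Hypothetical helper: name, signature and key format are assumptions.
func Flatten(in map[string]interface{}) map[string]interface{} {
	out := make(map[string]interface{})
	flattenInto(out, "", in)
	return out
}

// flattenInto walks in recursively, writing leaf values to out under prefix.
func flattenInto(out map[string]interface{}, prefix string, in map[string]interface{}) {
	for k, v := range in {
		key := k
		if prefix != "" {
			key = prefix + "." + k
		}
		if nested, ok := v.(map[string]interface{}); ok {
			flattenInto(out, key, nested)
			continue
		}
		out[key] = v
	}
}

func main() {
	raw := []byte(`{"user":{"name":"ada","geo":{"city":"NYC"}},"count":2}`)
	var msg map[string]interface{}
	if err := json.Unmarshal(raw, &msg); err != nil {
		panic(err)
	}
	flat := Flatten(msg)
	fmt.Println(flat["user.name"], flat["user.geo.city"], flat["count"])
}
```

Because it only deals with decoded messages, a helper like this could be called from any block without knowing whether the message arrived over NSQ or a plain channel.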
eh?