Substation is a cloud native toolkit for building modular ingest, transform, and load (ITL) data pipelines

Substation

Substation is a cloud native data pipeline toolkit.

What is Substation?

Substation is a modular ingest, transform, load (ITL) application for moving data between distributed systems. Originally designed to collect, normalize, and enrich security event data, the application provides methods for achieving high-quality data through interconnected data pipelines.

Substation also provides Go packages for filtering and modifying JSON data.

Features

As an event-driven ITL application, Substation has these features:

  • real-time event filtering and processing
  • cross-dataset event correlation and enrichment
  • concurrent event routing to downstream systems
  • runs on containers, built for extensibility
    • support for new event filters and processors
    • support for new ingest sources and load destinations
    • supports creation of custom applications (e.g., multi-cloud)
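
The event-driven flow above can be pictured as a small ingest, transform, and load loop. The sketch below is not Substation's code or API; it is a minimal, self-contained Go illustration of the pattern, and the transform (uppercasing each event) is only a placeholder.

package main

import (
    "fmt"
    "strings"
    "sync"
)

// transform is a placeholder processor that uppercases each event.
func transform(event string) string {
    return strings.ToUpper(event)
}

func main() {
    ingest := make(chan string) // events entering the pipeline
    load := make(chan string)   // events leaving the pipeline

    var wg sync.WaitGroup

    // transform stage: reads from ingest, writes to load
    wg.Add(1)
    go func() {
        defer wg.Done()
        for event := range ingest {
            load <- transform(event)
        }
        close(load)
    }()

    // load stage: routes events to a downstream system (stdout here)
    wg.Add(1)
    go func() {
        defer wg.Done()
        for event := range load {
            fmt.Println(event)
        }
    }()

    // ingest stage: push sample events, then close the pipeline
    for _, e := range []string{`{"hello":"world"}`, `{"foo":"bar"}`} {
        ingest <- e
    }
    close(ingest)

    wg.Wait()
}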

As a package, Substation provides Go APIs for filtering (condition inspectors) and modifying (processors) JSON data, so the same logic can be embedded in other Go applications.

Use Cases

Substation was originally designed to support the mission of achieving high-quality data for threat hunting, threat detection, and incident response, but it can be used to move data between many distributed systems and services. Here are some example use cases:

  • data availability: sink data to an intermediary streaming service such as AWS Kinesis, then concurrently sink it to a data lake, data warehouse, and SIEM
  • data consistency: normalize data across every dataset using a permissive schema such as the Elastic Common Schema
  • data completeness: enrich data by integrating AWS Lambda functions and building self-populating AWS DynamoDB tables for low latency, real-time event context
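
As a small illustration of the data consistency use case, the sketch below renames vendor-specific fields to Elastic Common Schema style names using only the Go standard library. The field mappings are illustrative and this is not Substation's processor API; flat dotted keys are used for brevity, whereas real ECS documents nest these fields.

package main

import (
    "encoding/json"
    "fmt"
)

// fieldMap holds illustrative vendor-to-ECS field renames.
var fieldMap = map[string]string{
    "src_ip": "source.ip",
    "dst_ip": "destination.ip",
    "ts":     "@timestamp",
}

// normalize applies the renames to a single JSON event.
func normalize(raw []byte) ([]byte, error) {
    var event map[string]interface{}
    if err := json.Unmarshal(raw, &event); err != nil {
        return nil, err
    }

    for old, ecs := range fieldMap {
        if v, ok := event[old]; ok {
            event[ecs] = v
            delete(event, old)
        }
    }

    return json.Marshal(event)
}

func main() {
    out, err := normalize([]byte(`{"src_ip":"10.0.0.1","dst_ip":"10.0.0.2","ts":"2022-10-04T00:00:00Z"}`))
    if err != nil {
        panic(err)
    }
    fmt.Println(string(out))
}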

Example Data Pipelines

Simple

The simplest data pipeline is one with a single source (ingest), a single transform, and a single sink (load). The diagram below shows pipelines that ingest data from different sources and sink it unmodified to a data warehouse where it can be used for analysis.

graph TD
    sink(Data Warehouse)

    %% pipeline one
    source_a(HTTPS Source)
    processing_a[Transfer]

    %% flow
    subgraph pipeline X
    source_a ---|Push| processing_a
    end

    processing_a ---|Push| sink

    %% pipeline two
    source_b(Data Lake)
    processing_b[Transfer]

    %% flow
    subgraph pipeline Y
    source_b ---|Pull| processing_b
    end

    processing_b ---|Push| sink

Complex

The complexity of a data pipeline, including its features and how it connects with other pipelines, is up to the user. The diagram below shows two complex data pipelines that have these features:

  • both pipelines write unmodified data to intermediary streaming data storage (e.g., AWS Kinesis) to support concurrent consumers and downstream systems
  • both pipelines transform data by enriching it from their own intra-pipeline metadata lookup (e.g., AWS DynamoDB)
  • pipeline Y additionally transforms data by enriching it from pipeline X's metadata lookup

graph TD

    %% pipeline a
    source_a_http(HTTPS Source)
    sink_a_streaming(Streaming Data Storage)
    sink_a_metadata(Metadata Lookup)
    sink_a_persistent[Data Warehouse]
    processing_a_http[Transfer]
    processing_a_persistent[Transform]
    processing_a_metadata[Transform]

    %% flow
    subgraph pipeline Y
    source_a_http ---|Push| processing_a_http
    processing_a_http ---|Push| sink_a_streaming
    sink_a_streaming ---|Pull| processing_a_persistent
    sink_a_streaming ---|Pull| processing_a_metadata
    processing_a_persistent ---|Push| sink_a_persistent
    processing_a_persistent ---|Pull| sink_a_metadata
    processing_a_metadata ---|Push| sink_a_metadata
    end

    processing_a_persistent ---|Pull| sink_b_metadata

    %% pipeline b
    source_b_http(HTTPS Source)
    sink_b_streaming(Streaming Data Storage)
    sink_b_metadata(Metadata Lookup)
    sink_b_persistent(Data Warehouse)
    processing_b_http[Transfer]
    processing_b_persistent[Transform]
    processing_b_metadata[Transform]

    %% flow
    subgraph pipeline X
    source_b_http ---|Push| processing_b_http
    processing_b_http ---|Push| sink_b_streaming
    sink_b_streaming ---|Pull| processing_b_persistent
    sink_b_streaming ---|Pull| processing_b_metadata
    processing_b_persistent ---|Push| sink_b_persistent
    processing_b_persistent ---|Pull| sink_b_metadata
    processing_b_metadata ---|Push| sink_b_metadata
    end

As a toolkit, Substation makes no assumptions about how data pipelines are configured and connected. We encourage experimentation and outside-the-box thinking when it comes to pipeline design!

Quickstart

Use the steps below to test Substation's functionality. We recommend running them in a Docker container (we've included Visual Studio Code configurations for developing and testing Substation in .devcontainer/ and .vscode/).

Step 1: Compile the File Binary

From the project root, run the commands below to compile the Substation file app.

$ cd cmd/file/substation/
$ go build .
$ ./substation -h

Step 2: Compile the quickstart Configuration File

From the project root, run the commands below to compile the quickstart Jsonnet configuration files into a Substation JSON config.

$ sh build/config/compile.sh

Step 3: Test Substation

From the project root, run the commands below to create a sample events file and test Substation. After this, we recommend reviewing the config documentation and running more tests with other event processors to learn how the app works.

$ echo '{"hello":"world"}' >> /tmp/quickstart.json
$ ./cmd/file/substation/substation -input /tmp/quickstart.json -config config/quickstart/config.json

Step 4: Test Substation in AWS

Navigate to the build directory and review the terraform, container, and config documentation. build/terraform/aws/example_pipeline.tf is a fully-featured data pipeline that can be used as an example of how to deploy pipelines in AWS.

Additional Documentation

More documentation about Substation can be found across the project, including in the config, build, and examples directories.

Licensing

Substation and its associated code are released under the terms of the MIT License.

Comments
  • refactor!: Standardizing Use of io

    Description

    • Updates functions and methods that handle streaming data to use the io package
      • internal/aws/appconfig
      • internal/aws/s3manager
      • internal/file
      • internal/sink/s3
    • Adds a SetConfig method to the Substation app that accepts an io.Reader
    • Adds an internal package for detecting content (media) type of files
    • Adds an internal package that wraps bufio and Scanner
    • Moves the SUBSTATION_CONFIG handler from app.go to config.go
    • Changes and moves the SUBSTATION_SCAN_METHOD handler from app.go to internal/bufio (breaking)
    • Changes the Substation app to make the package private, except for the creation of a new app (breaking)
    • Changes all published apps (cmd/aws/lambda, cmd/file, examples/service) to support changes made in this PR
      • cmd/file now dynamically retrieves configs from local disk, HTTP(S), or S3
      • cmd/aws/lambda now dynamically retrieves configs from local disk, HTTP(S), S3, or AppConfig
    • Updated CONTRIBUTING.md with design pattern documentation for reading and writing streaming data

    Motivation and Context

    Currently the project uses bytes in places where the io package makes more sense, specifically in handling of files. This is most risky when downloading content from cloud buckets and the web -- with bytes all of the content is read into memory at once, but io is more memory efficient because it's streaming data instead of putting it all into memory. Since we're a serverless application and some serverless providers (like AWS Lambda) charge based on memory allocation, there's a tangible benefit to making the application more efficient on memory. More than that, switching to io decreases the likelihood that we'll need to do more refactors in the future.
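
    For illustration only (not the project's actual code), the difference looks roughly like this: the bytes-oriented version buffers an entire object in memory before writing it, while the io-oriented version streams it through a small fixed-size buffer.

    package main

    import (
        "io"
        "os"
        "strings"
    )

    // copyAll buffers the entire source in memory before writing it out;
    // memory use grows with the size of the object being copied.
    func copyAll(dst io.Writer, src io.Reader) error {
        b, err := io.ReadAll(src)
        if err != nil {
            return err
        }
        _, err = dst.Write(b)
        return err
    }

    // copyStream moves data through a small internal buffer, so memory use
    // stays flat regardless of object size.
    func copyStream(dst io.Writer, src io.Reader) error {
        _, err := io.Copy(dst, src)
        return err
    }

    func main() {
        src := strings.NewReader(`{"hello":"world"}` + "\n")
        if err := copyStream(os.Stdout, src); err != nil {
            panic(err)
        }
    }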

    While refactoring these methods and functions, a few other issues and opportunities came up:

    • having a standardized way to create and use a bufio scanner
    • having a standardized way to inspect files by media type
    • making the core application private
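
    A standardized scanner could look roughly like the sketch below; the function name, buffer sizes, and sample input are illustrative, not the internal package's actual API.

    package main

    import (
        "bufio"
        "fmt"
        "io"
        "strings"
    )

    // maxScanTokenSize is an illustrative cap on a single scanned event (1 MiB).
    const maxScanTokenSize = 1024 * 1024

    // newScanner returns a line-oriented bufio.Scanner with an enlarged buffer
    // so long events are not rejected with bufio.ErrTooLong.
    func newScanner(r io.Reader) *bufio.Scanner {
        s := bufio.NewScanner(r)
        s.Buffer(make([]byte, 0, 64*1024), maxScanTokenSize)
        s.Split(bufio.ScanLines)
        return s
    }

    func main() {
        s := newScanner(strings.NewReader("{\"hello\":\"world\"}\n{\"foo\":\"bar\"}\n"))
        for s.Scan() {
            fmt.Println(s.Text())
        }
    }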

    How Has This Been Tested?

    • All unit tests were converted from supporting bytes to io
    • Packages that cannot be easily unit tested are supported by example tests -- these serve the dual function of testing and showing how to use packages
    • Integration tested using the cmd/file app
    • End to end tested using examples/aws in a development AWS account

    Types of changes

    • [ ] Bug fix (non-breaking change which fixes an issue)
    • [ ] New feature (non-breaking change which adds functionality)
    • [x] Breaking change (fix or feature that would cause existing functionality to change)

    Checklist:

    • [x] My code follows the code style of this project.
    • [x] My change requires a change to the documentation.
    • [x] I have updated the documentation accordingly.
  • fix(linter): fix golangci-lint warnings across substation

    This updates the Go code linting to use the meta-linter golangci-lint and fixes related lints.

    Description

    Changes include:

    • fix: handle or explicitly ignore all errors
    • fix: remove redundant function calls (#31) and conversions
    • fix: rename predeclared caps identifier
    • chore: tools: gofmt -> gofumpt, and staticcheck -> golangci-lint
    • docs: update comments to remove grammar mistakes and follow Go comment conventions.

    Motivation and Context

    This closes #31 and improves error handling and consistency across the project.

    How Has This Been Tested?

    Local builds, unit tests, and benchmarks all pass.

    Types of changes

    • [x] Bug fix (non-breaking change which fixes an issue)
    • [ ] New feature (non-breaking change which adds functionality)
    • [ ] Breaking change (fix or feature that would cause existing functionality to change)

    Checklist:

    • [x] My code follows the code style of this project.
    • [ ] My change requires a change to the documentation.
    • [ ] I have updated the documentation accordingly.
  • refactor: Update Configs

    Description

    • Moves the Substation config so that it is public (can be imported by other Go apps)
      • config.go and Jsonnet functions are now in config/
      • example configurations are now in examples/
    • Adds runtime concurrency setting via SUBSTATION_CONCURRENCY environment variable
      • Default behavior is the same as the current behavior
    • Updated documentation

    Motivation and Context

    • Making the Substation config public was a requirement to make the condition inspectors and processors highly usable in other applications
    • Using SUBSTATION_CONCURRENCY is preferred over pinning concurrency to the number of CPUs -- there are some scenarios when less or more concurrency is required
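
    A minimal sketch of how such a setting might be read, assuming the SUBSTATION_CONCURRENCY variable described above and a CPU-count default; the parsing shown is illustrative, not the app's exact implementation.

    package main

    import (
        "fmt"
        "os"
        "runtime"
        "strconv"
    )

    // concurrency returns the number of concurrent goroutines to run: the
    // SUBSTATION_CONCURRENCY environment variable if it is a positive integer,
    // otherwise the number of CPUs (the previous default behavior).
    func concurrency() int {
        if v := os.Getenv("SUBSTATION_CONCURRENCY"); v != "" {
            if n, err := strconv.Atoi(v); err == nil && n > 0 {
                return n
            }
        }
        return runtime.NumCPU()
    }

    func main() {
        fmt.Println("concurrency:", concurrency())
    }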

    How Has This Been Tested?

    Types of changes

    • [ ] Bug fix (non-breaking change which fixes an issue)
    • [x] New feature (non-breaking change which adds functionality)
    • [ ] Breaking change (fix or feature that would cause existing functionality to change)

    Checklist:

    • [x] My code follows the code style of this project.
    • [x] My change requires a change to the documentation.
    • [x] I have updated the documentation accordingly.
  • chore(main): release 0.7.1

  • fix: Static gzip Media Signature

    Description

    • Adds a static media signature for application/x-gzip

    Motivation and Context

    There is an occasional media identification bug when retrieving compressed CloudTrail files from AWS S3 -- http.DetectContentType incorrectly identifies the media type as application/vnd.ms-fontobject. The bug is rare (at Brex we process more than 100k compressed CloudTrail files every day and we see the bug, at most, once every few months) but it needs to be fixed because it results in data loss.
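
    For illustration, a static check of this kind can test for the standard gzip magic bytes (0x1f 0x8b) before falling back to content sniffing; the helper below is a sketch, not the project's actual media package.

    package main

    import (
        "bytes"
        "fmt"
        "net/http"
    )

    // gzipMagic is the two-byte signature that begins every gzip stream.
    var gzipMagic = []byte{0x1f, 0x8b}

    // mediaType returns application/x-gzip when the buffer starts with the gzip
    // signature and otherwise falls back to http.DetectContentType, which
    // sniffs at most the first 512 bytes.
    func mediaType(buf []byte) string {
        if bytes.HasPrefix(buf, gzipMagic) {
            return "application/x-gzip"
        }
        return http.DetectContentType(buf)
    }

    func main() {
        fmt.Println(mediaType([]byte{0x1f, 0x8b, 0x08, 0x00})) // application/x-gzip
        fmt.Println(mediaType([]byte(`{"hello":"world"}`)))    // text/plain; charset=utf-8
    }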

    How Has This Been Tested?

    The fix was integration tested with a CloudTrail file known to trigger the false match.

    Types of changes

    • [x] Bug fix (non-breaking change which fixes an issue)
    • [ ] New feature (non-breaking change which adds functionality)
    • [ ] Breaking change (fix or feature that would cause existing functionality to change)

    Checklist:

    • [x] My code follows the code style of this project.
    • [ ] My change requires a change to the documentation.
    • [ ] I have updated the documentation accordingly.
  • chore(main): release 0.7.0

  • chore(main): release 0.6.1

  • chore(main): release 0.6.0

  • feat: Add gRPC Support

    Description

    • Adds initial Protobuf definitions to proto/
      • Adds build dependencies in all Dockerfiles, including the devcontainer
      • Adds build script in build/scripts/proto/compile.sh
    • Adds gRPC server and service package to internal/service
    • Adds gRPC sink to internal/sink
      • Adds internal/file to support loading a server certificate for the sink from one of many locations
    • Adds example application utilizing the gRPC Sink service
    • Updates devcontainer settings, including an install script

    Motivation and Context

    This update is the first step in the project becoming a true "distributed system" (in the architectural sense) and enables users to deploy the system in new ways (described as future work below). This is quite big from an impact point of view, so we should address any open questions before approving the PR.

    In the short term, this gives us the ability to support synchronous (sync) invocation in AWS Lambda. The problem we've had until now is that there was no way for the sink goroutine to send results back to the calling application -- this is a requirement in supporting sync invocations because the processed data must be returned by the Lambda handler. With this change we can send data from the sink to the caller by using a gRPC service for inter-process communication (IPC). This is shown in the diagram below where goroutines are represented by dotted lines and data flow is represented by solid arrows. (Using gRPC for IPC is described in more detail in the new example included in this PR.)

    graph TD
    handler -.- gRPC
    handler -.- transform
    handler -.- sink
    handler --> ingest
    ingest --> transform
    transform --> sink
    sink --> gRPC
    gRPC --> handler
    

    Not supporting sync invocation was a blocker to supporting some AWS services (like data transformation in Kinesis Firehose) and making the system behave like a data enrichment microservice -- it's not yet clear how we'll implement this, but these changes will make it possible to deploy Substation as an "enrichment node" within the context of a larger system that can be invoked using the existing Lambda processor.

    Turning to future work, the added benefit of gRPC is that, with little effort on our part, we can extend that functionality to systems beyond serverless AWS services. This PR adds a definition for a Sink service that mimics the internal Sink interface, but we could also add definitions that mimic other components of the system. For example, these definitions would turn every processor and inspector into configurable microservices:

    // Applicator mirrors the Applicator interface defined in process
    service Applicator {
      rpc Apply(Capsule) returns (Capsule) {}
    }
    
    // Inspector mirrors the Inspector interface defined in condition
    service Inspector {
      rpc Inspect(Capsule) returns (Decision) {}
    }
    

    Overall, defining Protobuf based on the system's structs and interfaces would be a relatively safe method for making components accessible to external services (including services not written in Go) and letting others build their own distributed data pipelines on non-serverless infrastructure. (I don't anticipate the team at Brex doing this any time soon since it would increase complexity and we're happy with AWS Lambda, but if others are interested, then it's easy to support).
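
    As a sketch of what that could look like, the server below implements the Applicator definition above, assuming hypothetical code generated by protoc-gen-go and protoc-gen-go-grpc in a package imported as pb; the import path and generated names are illustrative, not part of this PR.

    package main

    import (
        "context"
        "log"
        "net"

        "google.golang.org/grpc"

        pb "example.com/substation/proto" // hypothetical generated package
    )

    // applicator turns the Apply RPC into a remotely callable processor.
    type applicator struct {
        pb.UnimplementedApplicatorServer
    }

    // Apply receives a Capsule, would run processors against it, and returns it.
    func (a *applicator) Apply(ctx context.Context, c *pb.Capsule) (*pb.Capsule, error) {
        return c, nil
    }

    func main() {
        lis, err := net.Listen("tcp", "localhost:50051")
        if err != nil {
            log.Fatal(err)
        }

        srv := grpc.NewServer()
        pb.RegisterApplicatorServer(srv, &applicator{})
        if err := srv.Serve(lis); err != nil {
            log.Fatal(err)
        }
    }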

    How Has This Been Tested?

    The new example included in this PR acts as an integration test for all of the new features, including the proto, the internal gRPC service, the internal gRPC server, and the gRPC sink.

    Types of changes

    • [ ] Bug fix (non-breaking change which fixes an issue)
    • [x] New feature (non-breaking change which adds functionality)
    • [ ] Breaking change (fix or feature that would cause existing functionality to change)

    Checklist:

    • [x] My code follows the code style of this project.
    • [x] My change requires a change to the documentation.
    • [x] I have updated the documentation accordingly.
  • chore(main): release 0.5.0

    :robot: I have created a release beep boop

    0.5.0 (2022-10-04)

    ⚠ BREAKING CHANGES

    • Update App Concurrency Model (#30)
    • Add Forward Compatibility for SNS (#21)

    Features

    • Add Forward Compatibility for SNS (#21) (b93dc1e)
    • Add Initial Support for Application Metrics (#25) (30f103d)
    • AppConfig Script Updates (#28) (5261485)
    • Customizable Kinesis Data Stream Autoscaling (#27) (2dd7ea7)
    • Improvements to JSON Parsing (#29) (98cac69)
    • Improvements to Reading and Decoding Files (#24) (e310cb5)

    Bug Fixes

    • linter: fix golangci-lint warnings across substation (#32) (9b7e077)
    • streamname bug (#23) (da9de62)

    Code Refactoring


    This PR was generated with Release Please. See documentation.

  • feat!: Encapsulation

    Description

    • Makes encapsulation the default pattern for handling data in all Substation applications and public APIs (see changes in /config/)
      • Adds backwards compatible functions for handling non-encapsulated data
    • Adds AWS Firehose sink support
    • Adds AWS SQS source and sink support
    • Adds context support to the condition API
    • Updates code style of public APIs

    This is a non-breaking change for ITL applications and a breaking change for the public APIs.

    Motivation and Context

    The motivation for adding encapsulation was the need to let users simultaneously handle both data (structured or unstructured) and metadata. This has two advantages:

    • sources can add metadata when data is ingested
    • metadata can be tracked alongside unstructured (e.g., binary) data

    This change means that users can now access and interpret information only the source application knows; for example:

    • substation/file: the filename and file size of the source file that data is read from
    • substation/aws/kinesis: the Kinesis stream that delivered the data and the approximate arrival time of the data
    • substation/aws/s3: the S3 bucket and object that data is read from

    It also provides total separation of metadata from data, which is most useful in sink applications; for example:

    • http: storing HTTP headers as metadata instead of in the data object
    • s3: storing the S3 prefix as metadata instead of in the data object
    • sumologic: storing the category as metadata instead of in the data object

    This gives us some flexibility for growth in the future. For example, instead of relying on configured settings for Kinesis streams and S3 buckets, we can add features to let users dynamically apply these from metadata.
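
    A rough sketch of the idea; the type, field, and method names below are illustrative, not the public API:

    package main

    import "fmt"

    // capsule pairs raw event data with metadata that travels alongside it
    // but is never written into the data object itself.
    type capsule struct {
        data     []byte
        metadata map[string]interface{}
    }

    func newCapsule(data []byte) capsule {
        return capsule{data: data, metadata: make(map[string]interface{})}
    }

    func main() {
        // e.g., an S3 source recording where the data was read from
        c := newCapsule([]byte(`{"hello":"world"}`))
        c.metadata["bucket"] = "example-bucket"
        c.metadata["key"] = "2022/10/04/events.gz"

        fmt.Println(string(c.data), c.metadata)
    }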

    How Has This Been Tested?

    • All public APIs had their unit tests updated
    • Firehose and SQS features were tested in a development AWS account

    Types of changes

    • [ ] Bug fix (non-breaking change which fixes an issue)
    • [x] New feature (non-breaking change which adds functionality)
    • [x] Breaking change (fix or feature that would cause existing functionality to change)

    Checklist:

    • [x] My code follows the code style of this project.
    • [x] My change requires a change to the documentation.
    • [x] I have updated the documentation accordingly.
  • refactor!: Breaking Public APIs

    Description

    • Changes all processors, conditions, transform, and sinks to be private and only accessible via factory methods
    • Changes operators to all (formerly AND), any (formerly OR), none (formerly NAND)
    • Changes libsonnet function libraries to be more modular
    • Changes all AWS sinks and processors to start with aws_
    • Updated all docs, added pointers to https://substation.readme.io/
    • Upgraded all dependencies
    • Renames the concat processor to join
    • Changes the math processor to support floats
    • Adds raw array support to internal/json

    Motivation and Context

    This gets us closer to a maintainable 1.0 release by limiting how users interact with the project's public APIs. This PR makes it so that all interactions are via factory methods with heavy emphasis on using Jsonnet / configurations as code. Some components are also simplified (e.g., operators) and documentation is improved.
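
    The factory pattern described here, in miniature; the config fields, interface, and processor below are illustrative stand-ins rather than Substation's actual types:

    package main

    import (
        "fmt"
        "strings"
    )

    // Config mirrors configuration as code: a type selects the component and
    // settings customize it.
    type Config struct {
        Type     string
        Settings map[string]interface{}
    }

    // Processor is the only surface callers interact with.
    type Processor interface {
        Apply(data []byte) ([]byte, error)
    }

    // lower is private; it can only be constructed through the factory.
    type lower struct{}

    func (lower) Apply(data []byte) ([]byte, error) {
        return []byte(strings.ToLower(string(data))), nil
    }

    // New is the factory method that maps a config onto a concrete processor.
    func New(cfg Config) (Processor, error) {
        switch cfg.Type {
        case "lower":
            return lower{}, nil
        default:
            return nil, fmt.Errorf("invalid processor type: %s", cfg.Type)
        }
    }

    func main() {
        p, err := New(Config{Type: "lower"})
        if err != nil {
            panic(err)
        }

        out, _ := p.Apply([]byte("HELLO WORLD"))
        fmt.Println(string(out))
    }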

    This is intended to be the last breaking change for public APIs for a very long time, so we should review the entire public codebase to see if anything is missing.

    How Has This Been Tested?

    Unit tests are passing, local integration tests are passing, end to end tests in AWS are pending.

    Types of changes

    • [ ] Bug fix (non-breaking change which fixes an issue)
    • [ ] New feature (non-breaking change which adds functionality)
    • [x] Breaking change (fix or feature that would cause existing functionality to change)

    Checklist:

    • [x] My code follows the code style of this project.
    • [x] My change requires a change to the documentation.
    • [x] I have updated the documentation accordingly.
  • Tracking 2023 Roadmap

    Is your feature request related to a problem? Please describe. Depends, maybe not 🤷

    Describe the solution you'd like It would be super helpful if we could track the features mentioned in the 2023 Roadmap as "Issues" with tags such as "enhancement" 😄

    Describe alternatives you've considered ✍️ the issues/features etc down on 🧻 but that doesn't scale 🤔

    Additional context n/a
