Description of the change
PIPELINING!!!!!!!
This PR removes the previous requirement for Keysplitting to be synchronous! Now, you can communicate with your agent and continually send messages without having to wait for the ack responses to those messages every time.
This is a complicated PR. Removing the previous per-message round-trip (RTT) requirement has made it possible to get rid of a lot of our existing, confusing flows and replace them with simpler, more conventional ones.
New Layer Flows in Daemon
Previously, the Keysplitting code was tightly integrated with various datachannel functions; now it follows a true "side-car" design.
Action -> Datachannel Flow
Action -> Plugin -> Datachannel -> Keysplitting -> Datachannel -> Websocket
- The Plugin creates an outboxQueue chan ActionWrapper, which it passes to the Action on creation.
- The Datachannel creates two goroutines:
i. one listens to the Plugin's Outbox() <-chan ActionWrapper and passes each message to MrZAP's Inbox(a ActionWrapper) function
ii. the other listens to Keysplitting's Outbox() <-chan KeysplittingMessage and sends each message to the websocket
- When the action pushes anything to the outboxQueue, it is passed directly to the Keysplitting side-car, which processes it and puts the result in its own outboxQueue, from which the datachannel sends it.
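A minimal Go sketch of this wiring follows. ActionWrapper matches the struct described in this PR; sidecar, runDatachannel, the simplified KeysplittingMessage and the send callback (standing in for the websocket) are hypothetical, and the real side-car pipelines and signs messages rather than just copying fields.

```go
package main

import "fmt"

// ActionWrapper mirrors the struct described in this PR.
type ActionWrapper struct {
	Action        string
	ActionPayload interface{}
}

// KeysplittingMessage is a simplified, hypothetical stand-in for the real type.
type KeysplittingMessage struct {
	Action  string
	Payload interface{}
}

// sidecar is a hypothetical Keysplitting side-car: it accepts ActionWrappers
// through Inbox() and emits built messages on its own outbox channel.
type sidecar struct {
	outboxQueue chan KeysplittingMessage
}

func (k *sidecar) Inbox(a ActionWrapper) {
	// The real code pipelines and signs here; the sketch only copies fields.
	k.outboxQueue <- KeysplittingMessage{Action: a.Action, Payload: a.ActionPayload}
}

func (k *sidecar) Outbox() <-chan KeysplittingMessage { return k.outboxQueue }

// runDatachannel starts the two goroutines described above.
func runDatachannel(pluginOutbox <-chan ActionWrapper, ks *sidecar, send func(KeysplittingMessage)) {
	// i. Plugin outbox -> Keysplitting inbox.
	go func() {
		for a := range pluginOutbox {
			ks.Inbox(a)
		}
		close(ks.outboxQueue) // no more input, so goroutine ii can finish
	}()
	// ii. Keysplitting outbox -> websocket (here: the send callback).
	go func() {
		for m := range ks.Outbox() {
			send(m)
		}
	}()
}

func main() {
	pluginOutbox := make(chan ActionWrapper, 10) // created by the plugin, handed to the action
	ks := &sidecar{outboxQueue: make(chan KeysplittingMessage, 10)}
	sent := make(chan KeysplittingMessage, 10) // stands in for the websocket
	runDatachannel(pluginOutbox, ks, func(m KeysplittingMessage) { sent <- m })

	// The action pushes two messages back to back without waiting for acks.
	pluginOutbox <- ActionWrapper{Action: "shell/input", ActionPayload: "ls"}
	pluginOutbox <- ActionWrapper{Action: "shell/input", ActionPayload: "pwd"}
	close(pluginOutbox)

	for i := 0; i < 2; i++ {
		m := <-sent
		fmt.Println(m.Action, m.Payload)
	}
}
```

Because each hop is a single goroutine reading a FIFO channel, messages reach the websocket in the order the action produced them. The action name "shell/input" is made up for the example.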
Datachannel -> Action Flow
We still just call functions to get the message back to the action (Websocket -> Datachannel -> Plugin -> Action). Keysplitting's Validate(KeysplittingMessage) function is still called from the handleKeysplitting() function.
The major difference is that I've removed the ksInputChan from the daemon. Previously, this channel queued our keysplitting messages so that handling them wouldn't hold up the processing of incoming stream messages. Now that we no longer have to wait for a return message before returning from our keysplitting handler, we don't need this channel at all.
PIPELINING
Our key data structure is the pipelineMap. It is an OrderedMap where pipelineMap.Newest() is our most recently built Keysplitting message and pipelineMap.Oldest() is the oldest message still in the pipeline. It is keyed by the hash of each message: hash(message) -> message.
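To make the pipelineMap's behaviour concrete, here is a hand-rolled Go stand-in built on container/list plus a map. It is not the OrderedMap type the PR actually uses; orderedMap, hashOf and the string messages are assumptions for illustration, and SHA-256 is assumed as the hash.

```go
package main

import (
	"container/list"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// hashOf is a hypothetical helper: hex-encoded SHA-256 of a serialized message.
func hashOf(serialized []byte) string {
	sum := sha256.Sum256(serialized)
	return hex.EncodeToString(sum[:])
}

type entry struct {
	key   string
	value []byte
}

// orderedMap pairs a linked list (insertion order) with a map (lookup by key),
// so Set, Delete, Newest and Oldest are all O(1).
type orderedMap struct {
	order *list.List
	index map[string]*list.Element
}

func newOrderedMap() *orderedMap {
	return &orderedMap{order: list.New(), index: map[string]*list.Element{}}
}

func (m *orderedMap) Set(key string, value []byte) {
	if el, ok := m.index[key]; ok {
		el.Value.(*entry).value = value // update in place, keep position
		return
	}
	m.index[key] = m.order.PushBack(&entry{key: key, value: value})
}

func (m *orderedMap) Delete(key string) {
	if el, ok := m.index[key]; ok {
		m.order.Remove(el)
		delete(m.index, key)
	}
}

func (m *orderedMap) Len() int { return m.order.Len() }

// Newest returns the most recently inserted entry.
func (m *orderedMap) Newest() (string, []byte, bool) { return unwrap(m.order.Back()) }

// Oldest returns the least recently inserted entry still present.
func (m *orderedMap) Oldest() (string, []byte, bool) { return unwrap(m.order.Front()) }

func unwrap(el *list.Element) (string, []byte, bool) {
	if el == nil {
		return "", nil, false
	}
	e := el.Value.(*entry)
	return e.key, e.value, true
}

func main() {
	pipelineMap := newOrderedMap()
	for _, msg := range []string{"data-1", "data-2", "data-3"} {
		pipelineMap.Set(hashOf([]byte(msg)), []byte(msg))
	}
	_, oldest, _ := pipelineMap.Oldest()
	_, newest, _ := pipelineMap.Newest()
	fmt.Println(string(oldest), string(newest)) // data-1 data-3

	pipelineMap.Delete(hashOf([]byte("data-1"))) // e.g. once its ack is validated
	_, oldest, _ = pipelineMap.Oldest()
	fmt.Println(string(oldest)) // data-2
}
```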
NOTE: I have completely removed the hpointer and expectedHPointer variables from the daemon side. hpointer is now covered by our pipeline keys, and expectedHPointer is replaced by lastAck, which holds the most recently received ack message (either a syn/ack or a data/ack).
Basic Output Pipelining
NOTE: In order to get pipelining to work, I had to remove the Timestamp field from the data/ack message, because with that field we could never predict the exact ack message (and therefore its hash) before receiving it.
Plugin.outboxQueue -> Plugin.Outbox() -> Keysplitting.Inbox() -> Keysplitting.pipeline() -> Keysplitting.Outbox() -> Datachannel.send()
Here are the steps from the moment the Inbox() function is called until the message reaches the Keysplitting outbox.
- Keysplitting receives an ActionWrapper through the Inbox() call and tries to pipeline it.
NOTE: ActionPayload used to be []byte, which meant a lot of marshalling inside actions. Now we marshal only once, in BuildResponse() in our Keysplitting code.
```go
type ActionWrapper struct {
	Action        string
	ActionPayload interface{}
}
```
- Keysplitting checks whether there is a previous message we haven't yet received an ack for. If so, it predicts that ack from the most recently sent message and builds our response off the prediction; OTHERWISE it builds our new message off our most recent ack (lastAck).
- Build the response!
- Add it to our pipelineMap!
- Add it to our outboxQueue!
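The steps above can be sketched in Go roughly as follows. This is a heavily simplified model: message, hash (SHA-256 over the JSON encoding) and the "SynAck"/"Data"/"DataAck" type strings are hypothetical, real messages carry signatures and more fields, and the pipelineMap is approximated by a map plus an insertion-order slice. It shows the two ideas that matter: the payload is marshalled once in buildResponse, and a new message builds off a predicted ack whenever an earlier one is still unacknowledged.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

type ActionWrapper struct {
	Action        string
	ActionPayload interface{}
}

// message is a hypothetical, simplified Keysplitting message. Like the new
// data/ack, it has no Timestamp field, so its hash can be predicted.
type message struct {
	Type          string          // e.g. "SynAck", "Data", "DataAck" (illustrative)
	HPointer      string          // hash of the message this one builds on
	Action        string
	ActionPayload json.RawMessage // marshalled exactly once, in buildResponse
}

// hash is a sketch: SHA-256 over the JSON encoding, which is deterministic for structs.
func hash(m message) string {
	b, _ := json.Marshal(m)
	sum := sha256.Sum256(b)
	return hex.EncodeToString(sum[:])
}

type keysplitting struct {
	lastAck     message
	pipelineMap map[string]message // hash(message) -> message
	order       []string           // insertion order; the last entry is Newest()
	outboxQueue chan message
}

// buildResponse is the one place the payload gets marshalled.
func (k *keysplitting) buildResponse(prev message, a ActionWrapper) (message, error) {
	payload, err := json.Marshal(a.ActionPayload)
	if err != nil {
		return message{}, err
	}
	return message{Type: "Data", HPointer: hash(prev), Action: a.Action, ActionPayload: payload}, nil
}

func (k *keysplitting) pipeline(a ActionWrapper) error {
	prev := k.lastAck
	// Acks arrive in order, so if anything is pending, the newest message is.
	if len(k.pipelineMap) > 0 {
		newest := k.pipelineMap[k.order[len(k.order)-1]]
		prev = message{Type: "DataAck", HPointer: hash(newest)} // predicted ack
	}
	msg, err := k.buildResponse(prev, a)
	if err != nil {
		return err
	}
	h := hash(msg)
	k.pipelineMap[h] = msg // add it to our pipelineMap
	k.order = append(k.order, h)
	k.outboxQueue <- msg // add it to our outboxQueue
	return nil
}

func main() {
	k := &keysplitting{
		lastAck:     message{Type: "SynAck", HPointer: "syn-hash"},
		pipelineMap: map[string]message{},
		outboxQueue: make(chan message, 8),
	}
	// Two messages go out back to back without waiting for any ack.
	_ = k.pipeline(ActionWrapper{Action: "shell/input", ActionPayload: "ls"})
	_ = k.pipeline(ActionWrapper{Action: "shell/input", ActionPayload: "pwd"})
	first, second := <-k.outboxQueue, <-k.outboxQueue
	fmt.Println(first.HPointer == hash(k.lastAck))                                       // true
	fmt.Println(second.HPointer == hash(message{Type: "DataAck", HPointer: hash(first)})) // true
}
```

The second check only holds because the predicted ack is byte-for-byte reproducible; a Timestamp in the ack would break it, which is why that field was removed.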
Message Validation
This hasn't changed much, but I'll cover it since there are small changes. Keysplitting.Validate() is called from handleKeysplitting() whenever we receive a new message.
- Validate the signature on the message
- Check that it is a response to a message we've sent
- Set our lastAck to the message we received
- Delete the message it acknowledges from our pipelineMap
Error Recovery
We only recover IF all of the following hold.
From handleError() in the datachannel:
- We're not already recovering
- We haven't already used up the maximum number of recovery attempts
- It's a KeysplittingValidationError type message
From Recover() in Keysplitting:
- The hpointer field (hash of the message the error was thrown on) is not empty
- The error points to a message we sent
When we call Keysplitting.Recover(), we send a syn. Once we receive the syn/ack, we grab its nonce. If the nonce matches the hash of a message we've sent, we resend all messages after that message; OTHERWISE we resend all messages. This works because, after our initial syn and syn/ack exchange, the target responds to any new syn with a syn/ack whose nonce is the hash of the last message it received and successfully validated. Recovery therefore syncs the daemon's Keysplitting hash chain to the current state of that chain according to the agent. This was Sebby's idea. Sebby MVP.
New Plugin Creation and Destruction
Plugin Creation
There is no more Feed() flow and no more Food. The functions for creating new actions and plugins now take explicit arguments!
- Server starts up
- Server receives a request which results in some communication with the agent
- Server is responsible for (in this order):
i. Creating a plugin (explicit args)
ii. Passing that to a new datachannel
iii. Starting the desired action in the plugin
Plugin Destruction
Because the datachannel receives the plugin when it starts up, it can start listening for that plugin to die even before the action is started. All plugins now provide a Done() <-chan struct{} function, which the datachannel listens to so that it can die when signaled.
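The creation order and the Done() hook can be sketched as follows. plugin, newPlugin, startAction, kill and newDatachannel are hypothetical names, and the target and action name are made up; the action finishes immediately so the program terminates.

```go
package main

import (
	"fmt"
	"sync"
)

// plugin is a hypothetical minimal plugin; its constructor takes explicit args.
type plugin struct {
	target string
	done   chan struct{}
	once   sync.Once
}

func newPlugin(target string) *plugin {
	return &plugin{target: target, done: make(chan struct{})}
}

// Done mirrors the Done() <-chan struct{} function every plugin now provides.
func (p *plugin) Done() <-chan struct{} { return p.done }

// kill closes done exactly once, so repeated calls are safe.
func (p *plugin) kill() { p.once.Do(func() { close(p.done) }) }

// startAction is hypothetical: the action runs and then kills its plugin.
func (p *plugin) startAction(name string) {
	fmt.Println("starting", name, "against", p.target)
	go p.kill()
}

// newDatachannel begins watching the plugin immediately, before any action
// starts, and closes the returned channel once it has shut down.
func newDatachannel(p *plugin) <-chan struct{} {
	closed := make(chan struct{})
	go func() {
		<-p.Done()
		// The real datachannel would drain or receive remaining messages here.
		close(closed)
	}()
	return closed
}

func main() {
	p := newPlugin("localhost:5432") // i. create the plugin with explicit args
	dcClosed := newDatachannel(p)    // ii. pass it to a new datachannel
	p.startAction("db/dial")         // iii. start the action
	<-dcClosed
	fmt.Println("datachannel closed after the plugin died")
}
```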
After the plugin dies, the datachannel does one of two things, depending on where it runs:
- Agent: sends any messages still in its send queue and only really dies once that queue has been silent for 1 second.
- Daemon: keeps receiving messages until 2 seconds pass without a new one, waiting a maximum of 10 seconds in total.
Testing
This PR should be functionally indistinguishable from the regular agent. Here are some checks I like to run when testing plugin functionality:
Web
- Hitting our grafana dev instance
- espn.com
- Hit some illegitimate or misconfigured virtual target
DB
- Hit the psql db we have locally on our dev bzero-agent machines
- iperf
- Hit some illegitimate or misconfigured virtual target
Shell
- Connecting with a legitimate user
- Connecting with an illegitimate user
Kube
https://docs.google.com/document/d/1DkT4Bs10ZakzcBlRLmbHK_E6MXDoIl1g9UD_uE7-FGE
backend branch:
zli branch: pipelining
Ready to run system tests?
Relevant release note information
Release Notes:
Removes the previous requirement for MrZAP to be synchronous! Now, you can communicate with your agent and continually send messages without having to wait for the ack responses to those messages every time.
Related JIRA tickets
Relates to JIRA: CWC-1494, CWC-1644, CWC-1502, CWC-1831, CWC-1832
Have you considered the security impacts?
Does this PR have any security impact?
If yes, please explain: