@ItalyPaleAle (Contributor) commented Jul 14, 2022

Description

This PR is the first step toward supporting streaming for data that transits through Dapr runtimes, with an initial focus on sidecar-to-sidecar communication and on service invocation (currently limited to HTTP only).

To explain the issue, here's a demo of a service "client" calling into "server" through Dapr, making an HTTP request. The payload is "big" (about 70KB) and for demonstration purposes, both "client" and "server" are slow at producing or consuming the data.

[Video: before.mp4]

As you can see, the flow is:

  1. "client" makes a request to its sidecar
  2. The sidecar waits for the entire request to complete and loads all the data into memory, then serializes it (to protobuf)
  3. "client" app's sidecar then sends the data to "server" app's sidecar (I did not add any "slowdown" in this step, but it does take some time, especially with large payloads)
  4. "server" app's sidecar waits for the entire payload to be received (from "client" app's sidecar) and loaded into memory, then deserializes it
  5. "server" app's sidecar sends the data to the "server" app

The flow is then repeated exactly the same way (but with roles inverted) for the response.

Because the sidecars read the entire message into memory at every step, using Dapr service-to-service invocation with large payloads (for example, images) can be challenging: it adds significant latency, and because all the data is kept in memory in both serialized and deserialized forms, memory usage increases.

This PR changes the behavior, and now service-to-service invocation (same example) behaves this way:

[Video: after.mp4]

By leveraging streaming, this is much faster, especially when transferring large files or when the producer is slow. In particular, the TTFB (Time To First Byte) is much shorter.

What this PR includes, at a higher level:

  • Changed the way sidecars invoke one another to use streams. The new method ServiceInvocation.CallLocalStream is a variant of ServiceInvocation.CallLocal that uses streams rather than unary calls. Internally this uses a buffer of up to 4KB, so messages smaller than that (and received in a single chunk by the runtime) are transferred in a single block. As a result, there should be no performance impact for small payloads compared to unary RPCs.
  • In the HTTP API server, service invocation calls are now read as streams, so the runtime can begin invoking the other app as soon as the first bytes come in.
  • Likewise, when the sidecar invokes the apps using HTTP, the data is now sent to the app as a stream. Apps can begin receiving data as soon as the runtime gets it.

It also includes a lot of internal changes to support the points above.

For clients that invoke Dapr via regular HTTP calls and receive responses through a regular HTTP server, no change is needed: they get the benefits of streaming for free, without doing anything!

For clients that use the Dapr SDKs, we may need to update the SDKs to leverage streaming when possible and appropriate. That is a separate work item.

Lastly, this PR is not "complete", as it doesn't add support for streams everywhere we may want them. For example, we will want to support clients that use gRPC too. We will also want to leverage streaming for reading from and writing to state stores and bindings.

Talking to @artursouza, he recommended focusing on HTTP service invocation first and taking this as an opportunity to lay the groundwork that will ultimately be useful for supporting all other scenarios. In this regard, this PR provides a solid foundation for the remaining work.

Implementation details

This PR creates a new gRPC method that is used for sidecar-to-sidecar communication.

Currently, sidecar-to-sidecar communication uses the CallLocal RPC:

// Invokes a method of the specific service.
rpc CallLocal (InternalInvokeRequest) returns (InternalInvokeResponse) {}

This RPC is not removed and is retained for backwards compatibility (e.g. when Dapr 1.8 invokes Dapr vNext); however, it is not otherwise used.

A new RPC is added:

// Invokes a method of the specific service using a stream of data.
rpc CallLocalStream (stream InternalInvokeRequestStream) returns (stream InternalInvokeResponseStream) {}

The request and response objects are defined as:

// InternalInvokeRequestStream is a variant of InternalInvokeRequest used in streaming RPCs.
message InternalInvokeRequestStream {
  // Request details.
  // This does not contain any data in message.data.
  InternalInvokeRequest request = 1;

  // Chunk of data.
  StreamPayload payload = 2;
}

// InternalInvokeResponseStream is a variant of InternalInvokeResponse used in streaming RPCs.
message InternalInvokeResponseStream {
  // Response details.
  // This does not contain any data in message.data.
  InternalInvokeResponse response = 1;

  // Chunk of data.
  StreamPayload payload = 2;
}

// Chunk of data sent in a streaming request or response.
message StreamPayload {
  // Data sent in the chunk.
  google.protobuf.Any data = 1;

  // Set to true if this is the last chunk.
  bool complete = 2;
}

In the protos above, InternalInvokeRequest and InternalInvokeResponse are the existing protos used by the unary CallLocal RPC. In the streamed version, they are wrapped in a message together with a StreamPayload object.

Note that, although the RPC uses bi-directional streaming, it still behaves like a unary call, in the sense that once the connection has been established, first the sender sends the entire request, and then it reads the entire response from the receiver. We use streams to be able to send data in chunks, but the sidecars do not maintain a persistent stream that is used to exchange multiple messages or where communication is constantly bi-directional.

How it works:

  1. The caller sidecar invokes CallLocalStream on the target sidecar and establishes the gRPC connection and the streaming RPC.
  2. The caller sidecar sends the data in chunks (see below for details on how chunking works).
  3. The target sidecar can begin processing the message (i.e. send it to the target app) as soon as the first chunk is received.
  4. Once the full payload has been sent, the target sidecar waits for the response from the app (which is read as a stream).
  5. The target sidecar sends the response in chunks to the caller sidecar, with the first chunk sent as soon as it's available.
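
The message sequence produced in steps 2 and 5 can be sketched as follows. This is an illustrative Go sketch under simplified assumptions: `invokeRequest`, `streamPayload`, and `requestStreamMsg` are hypothetical stand-ins for the generated protobuf types, and the real code sends each message over the gRPC stream rather than collecting them in a slice.

```go
package main

import "fmt"

// Simplified stand-ins for the protobuf messages defined above.
type invokeRequest struct{ Method string }

type streamPayload struct {
	Data     []byte
	Complete bool
}

type requestStreamMsg struct {
	Request *invokeRequest // set only on the first message in the stream
	Payload *streamPayload
}

// sendRequestStream builds the sequence of messages the caller sidecar puts
// on the wire: the first message carries the request metadata (and may carry
// a payload), every later message carries a payload only, and the last one
// has Complete=true.
func sendRequestStream(req *invokeRequest, data []byte, chunkSize int) []requestStreamMsg {
	first := requestStreamMsg{Request: req}
	if len(data) <= chunkSize {
		// Small payload: metadata and data fit in a single message.
		first.Payload = &streamPayload{Data: data, Complete: true}
		return []requestStreamMsg{first}
	}
	msgs := []requestStreamMsg{first}
	for off := 0; off < len(data); off += chunkSize {
		end := off + chunkSize
		if end > len(data) {
			end = len(data)
		}
		msgs = append(msgs, requestStreamMsg{
			Payload: &streamPayload{Data: data[off:end], Complete: end == len(data)},
		})
	}
	return msgs
}

func main() {
	msgs := sendRequestStream(&invokeRequest{Method: "echo"}, make([]byte, 9000), 4096)
	fmt.Println(len(msgs)) // one metadata message followed by payload chunks
}
```

The receiver can start forwarding data to the target app after the first payload chunk arrives; it does not need to wait for Complete=true.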

Each chunk is a message of type InternalInvokeRequestStream or InternalInvokeResponseStream sent over the RPC, where:

  • The first message in the stream MUST contain an InternalInvokeRequest or InternalInvokeResponse with the required keys present.
  • The first message in the stream MAY contain a payload, but that is not required.
  • Subsequent messages (any message except the first in the stream) MUST contain a payload and MUST NOT contain any other property.
  • The last message in the stream MUST contain a payload with complete set to true. That message is assumed to be the last one from the sender and no more messages are to be sent in the stream after that.

The amount of data contained in payload.data is variable and is at the discretion of the sender. In this PR, chunks are at most 4KB in size, although senders may send smaller chunks if they wish. Receivers must not assume that messages will contain any specific number of bytes in the payload.

Note that it's possible for senders to send a single message in the stream. If the data is small and could fit in a single chunk, senders MAY choose to include a payload with complete=true in the first message. In this case, the entire message contains both an InternalInvokeRequest/InternalInvokeResponse with the required keys present AND a payload with data (if any) and complete=true. Receivers should assume that single message to be the entire communication from the sender.
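
A receiver that enforces the rules above could look like the following. This is an illustrative sketch, not the runtime's implementation: the struct types are hypothetical stand-ins for the generated protobuf messages, and a real receiver would read messages from the gRPC stream one at a time instead of from a slice.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// Illustrative stand-ins for the generated protobuf types.
type invokeRequest struct{ Method string }

type streamPayload struct {
	Data     []byte
	Complete bool
}

type requestStreamMsg struct {
	Request *invokeRequest
	Payload *streamPayload
}

// receiveRequestStream reassembles a stream of messages according to the
// rules above: the first message must carry the request details, later
// messages must carry a payload only, and the stream ends at Complete=true.
func receiveRequestStream(msgs []requestStreamMsg) (*invokeRequest, []byte, error) {
	if len(msgs) == 0 || msgs[0].Request == nil {
		return nil, nil, errors.New("first message must contain the request details")
	}
	req := msgs[0].Request
	var buf bytes.Buffer
	for i, m := range msgs {
		if i > 0 {
			if m.Request != nil {
				return nil, nil, errors.New("only the first message may contain request details")
			}
			if m.Payload == nil {
				return nil, nil, errors.New("subsequent messages must contain a payload")
			}
		}
		if m.Payload != nil {
			buf.Write(m.Payload.Data)
			if m.Payload.Complete {
				// Last message from the sender: stop reading the stream.
				return req, buf.Bytes(), nil
			}
		}
	}
	return nil, nil, errors.New("stream ended without a complete payload")
}

func main() {
	msgs := []requestStreamMsg{
		{Request: &invokeRequest{Method: "echo"}, Payload: &streamPayload{Data: []byte("he")}},
		{Payload: &streamPayload{Data: []byte("llo"), Complete: true}},
	}
	req, data, err := receiveRequestStream(msgs)
	fmt.Println(req.Method, string(data), err) // echo hello <nil>
}
```

Note how the single-message case falls out naturally: a first message carrying both the request details and a payload with Complete=true is the entire communication.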

To do

  • Figure out a way to ensure backwards compatibility when a runtime that supports streaming (e.g. daprd "vNext") tries to invoke a runtime that does not (e.g. daprd 1.8)
  • Update docs
  • Review what updates are needed in the SDKs

Issue reference

Fixes #3103
Also impacts other issues such as #4866

Checklist

Please make sure you've completed the relevant tasks for this PR, out of the following list:

@ItalyPaleAle requested review from a team as code owners on July 14, 2022, 02:08
@yaron2 (Member) commented Jul 14, 2022

Please make sure that this supports load balancing, otherwise we cannot accept this PR.

Meaning, if you call between app 1 and app 2 through Dapr and another instance pops up for app 2, it will start receiving requests.

@yaron2 (Member) commented Jul 14, 2022

Also, for the scenario mentioned of transferring a file between two sidecars, why can't users do that with gRPC streaming through Dapr? Dapr supports streaming naturally all the way through: from app to sidecar, from sidecar to remote sidecar, and then to the target app.

@ItalyPaleAle (Author) commented Jul 14, 2022

Please make sure that this supports load balancing, otherwise we cannot accept this PR.
Meaning, if you call between app 1 and app 2 through Dapr and another instance pops up for app 2, it will start receiving requests.

Yes, this should have no issue with load balancing. It creates a new stream for each invocation, so it can be load balanced. Internally, at the wire level, it behaves the same as a unary invocation.

Also, for the scenario mentioned of transferring a file between two sidecars, why can't users do that with gRPC streaming with Dapr?

This is based on HTTP, which supports streaming natively as a protocol; we just didn't support it in Dapr. HTTP is simpler and easier to integrate with existing services.

In fact, I started thinking about this problem about 2 months ago, when I made an attempt at developing a demo using Imaginary (which exposes an HTTP endpoint) through Dapr and noticed these issues.

One other thing to note is that this only supports a scenario where there’s a single request followed by a single response. The goal is to provide a stream-based transport for service invocation, transparently. End-users should continue to think of this as regular service invocation, and not think of this as a stream.

gRPC proxying remains the best solution when users need long-running streams and the ability to have two-way, alternating communication.

Lastly, as I wrote, this starts with HTTP but it also lays the foundation for supporting streams throughout the runtime. My goal is to support streams for other things including state stores (see #4804) and bindings.

@yaron2 (Member) commented Jul 14, 2022

Please make sure that this supports load balancing, otherwise we cannot accept this PR.

Meaning, if you call between app 1 and app 2 through Dapr and another instance pops up for app 2, it will start receiving requests.

Alternatively, you can keep existing mechanisms untouched while making streaming explicit and respecting HTTP semantics by introducing the Transfer-Encoding: chunked header.

@ItalyPaleAle (Author) commented:

I don’t think we need Transfer-Encoding: chunked: that header is meant for different purposes, such as delivering a sequence of messages (like server-sent events). This change is meant to make service invocation faster and more efficient by processing HTTP messages as a byte stream. It doesn’t require any change at the application level and will work out-of-the-box, even with regular curl requests.

@yaron2 (Member) commented Jul 14, 2022

Yes, this should have no issue with load balancing. It creates a new stream for each invocation, so it can be load balanced. Internally, at the wire level, it behaves the same as a unary invocation.

Ok, just please make sure to validate that either manually or with a test (better).

See my comment above about leaving current mechanism untouched and enabling streaming explicitly using a header.

@yaron2 (Member) commented Jul 14, 2022

I don’t think we need Transfer-Encoding: chunked: that header is meant for different purposes, such as delivering a sequence of messages (like server-sent events). This change is meant to make service invocation faster and more efficient by processing HTTP messages as a byte stream. It doesn’t require any change at the application level and will work out-of-the-box, even with regular curl requests.

Sounds good if there's no risk of behavioral breaking change like load balancing. Once this PR compiles I'll help verify.

@codecov bot commented Jul 14, 2022

Codecov Report

Merging #4903 (b386d7b) into master (efaca38) will decrease coverage by 1.03%.
The diff coverage is 54.29%.

@@            Coverage Diff             @@
##           master    #4903      +/-   ##
==========================================
- Coverage   65.66%   64.62%   -1.04%     
==========================================
  Files         124      127       +3     
  Lines       12991    13547     +556     
==========================================
+ Hits         8530     8755     +225     
- Misses       3858     4151     +293     
- Partials      603      641      +38     
Impacted Files                       Coverage Δ
pkg/actors/actors_mock.go            0.00% <0.00%> (ø)
pkg/messaging/direct_messaging.go    10.57% <0.00%> (-8.48%) ⬇️
pkg/runtime/cli.go                   1.65% <0.00%> (ø)
utils/host.go                        38.46% <ø> (ø)
utils/resolvconf.go                  75.86% <ø> (ø)
utils/streams/limitreadcloser.go     0.00% <0.00%> (ø)
utils/streams/teereadcloser.go       0.00% <0.00%> (ø)
pkg/grpc/api.go                      67.81% <38.77%> (-5.18%) ⬇️
pkg/actors/actors.go                 57.70% <52.43%> (-1.07%) ⬇️
pkg/http/server.go                   64.05% <60.00%> (+0.23%) ⬆️
... and 23 more


ItalyPaleAle added a commit to ItalyPaleAle/dapr that referenced this pull request Aug 19, 2022
Spin-off from dapr#4903

@ItalyPaleAle (Author) commented:

Note that I've not abandoned this PR. I just realized this is very big, and a lot of things won't work well until #4979 is merged.

I'm in the process of splitting it into multiple PRs, also to make it easier to review.

ItalyPaleAle added a commit to ItalyPaleAle/dapr that referenced this pull request Aug 19, 2022
Spin-off from dapr#4903

ItalyPaleAle added a commit to ItalyPaleAle/dapr that referenced this pull request Aug 19, 2022
Spin-off from dapr#4903

ItalyPaleAle added a commit to ItalyPaleAle/dapr that referenced this pull request Aug 19, 2022
spin-off from dapr#4903

@ItalyPaleAle force-pushed the invoke-stream branch 4 times, most recently from 7f4df0a to 10bdc44, on August 22, 2022, 19:23
@yaron2 (Member) commented Aug 23, 2022

Note that I've not abandoned this PR. I just realized this is very big, and a lot of things won't work well until #4979 is merged.

I'm in the process of splitting it into multiple PRs, also to make it easier to review.

Since #4979 will only be merged in 1.10 at the soonest, doesn't it make sense to close this PR for now, as this is at least 4-5 months away?

@ItalyPaleAle (Author) commented:

The “core” PR I’m working on does not involve the API server (app-to-dapr).

It only implements partial streaming support, by changing the messaging APIs (dapr-to-dapr) and the HTTP channel (dapr-to-app). That's not full streaming support, but it implements the core functionality and the required APIs to unblock things like blob stores and cryptography, which depend on core streaming support.

Updating the HTTP API server will indeed depend on #4979.

I will close this PR once the “core” one is ready, hopefully by end of next week.

artursouza pushed a commit that referenced this pull request Aug 23, 2022
* resiliency.PolicyDefined returns the policy object (spin-off from #4903)
* Tweaks
artursouza added a commit that referenced this pull request Aug 29, 2022
* Improvements to E2E test apps (spin-off from #4903)
* Fixed error after merge
* Fixed panic
ItalyPaleAle added a commit to ItalyPaleAle/dapr that referenced this pull request Sep 13, 2022
This is a slightly reduced version of dapr#4903 that implements streaming only in:

- Communications between dapr sidecars (via the gRPC local channel)
- HTTP app channel (from Dapr to apps)
- Responses from direct messages and actor invocations over HTTP APIs

It does not add full streaming support to service invocation, as a few things are missing:

- Nothing is changing on gRPC APIs for now, which lack streaming support
- HTTP API server does not read request bodies as streams, due to buggy support in fasthttp. This will require removing fasthttp per dapr#4979

Related issues: dapr#3103 (partial fix) and also dapr#4866

@ItalyPaleAle (Author) commented:

Closing this in favor of #5170

Successfully merging this pull request may close these issues: "Stream requests and not buffer on invoke service method."