Enable streaming in Consul HTTP client #2295

Merged
adleong merged 2 commits into linkerd:master from zd/streaming-consul-client
Jul 17, 2019

Conversation

zaharidichev
Member

This work is a continuation of #2287. In essence, the change enables streaming for the Consul HTTP client, which makes large Consul responses easier to handle (i.e. they are no longer limited by the max size config). The PR exposes the fixedLengthStreamedAfterKB option which, as described in the docs:

configures fixedLengthStreamedAfter limit, which effectively turns on streaming (think withStreaming(true)). The fixedLengthStreamedAfter, however, disables streaming for sufficiently small messages of known fixed length.
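
For context, here is a minimal sketch of what this option presumably maps to on a Finagle HTTP client. It assumes the `Http.client.withStreaming(fixedLengthStreamedAfter: StorageUnit)` overload is available in the Finagle version in use. The object name and the 5 KB threshold are made up for illustration and are not the values used in this PR.

```scala
import com.twitter.finagle.Http
import com.twitter.util.StorageUnit

object StreamingClientSketch {
  // Hypothetical value; in linkerd this would be read from the
  // fixedLengthStreamedAfterKB config field.
  val fixedLengthStreamedAfterKB: Long = 5L

  val threshold: StorageUnit = StorageUnit.fromKilobytes(fixedLengthStreamedAfterKB)

  // Streaming is turned on, but sufficiently small messages of known
  // fixed length are not streamed.
  val client: Http.Client = Http.client.withStreaming(threshold)
}
```

With a client configured this way, callers have to be prepared for both chunked and non-chunked responses, which is why the parsing code in this PR branches on `rsp.isChunked`.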

To reproduce the initial issue and validate the fix, you can use the method described by @dadjeibaah in #2287. To see the initial issue, first run against this version of the code. After applying this PR, run the example again and confirm that the exception is no longer thrown, even though the message size configuration has not been changed.

Fixes #2288

Signed-off-by: Zahari Dichev zaharidichev@gmail.com

Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
@zaharidichev force-pushed the zd/streaming-consul-client branch from 5065bdd to 4f7b07b (July 13, 2019 15:19)
Member

@adleong adleong left a comment

This looks really awesome! I have a couple of small pieces of style feedback.

@dadjeibaah: I know you were testing this codepath when the issue was originally opened; do you have bandwidth to manually test this change with large streaming responses from Consul?

private def parse[T: Manifest](rsp: Response): Future[T] =
  for {
    data <- rsp match {
      case chunked if rsp.isChunked => chunked.chunkReader.accumulate.map({ case (buf, _) => Buf.ByteArray.coerce(buf) })

We don't need access to the trailers (indeed, we discard them here) so we can use the slightly more ergonomic .reader API instead of .chunkReader.

I think this could simply be Reader.readAll(chunked.reader)
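
A small sketch of that suggestion, using only util-core. The in-memory `Reader` stands in for `chunked.reader`, and the payload is made up. `BufReader.readAll` is used on the assumption that it is the available spelling of the helper; depending on the util version it may also be reachable as `Reader.readAll`, as written above.

```scala
import com.twitter.io.{Buf, BufReader, Reader}
import com.twitter.util.{Await, Future}

object ReadAllSketch {
  // Stand-in for the body of a chunked response (hypothetical payload).
  val body: Reader[Buf] = Reader.fromBuf(Buf.Utf8("""[{"ServiceName":"web"}]"""))

  // Drains the reader and concatenates every chunk into a single Buf.
  val whole: Future[Buf] = BufReader.readAll(body)

  // Blocking is acceptable in a sketch; production code would stay in Future.
  val Buf.Utf8(text) = Await.result(whole)
}
```

Since trailers are discarded anyway, this avoids the `{ case (buf, _) => ... }` step that `chunkReader.accumulate` requires.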

      case chunked if rsp.isChunked => chunked.chunkReader.accumulate.map({ case (buf, _) => Buf.ByteArray.coerce(buf) })
      case nonChunked => Future.value(Buf.ByteArray.coerce(nonChunked.content))
    }
    parsed <- data match {

My Scala destructuring is a bit rusty, but I think you might be able to simplify this as:

for {
  Buf.ByteArray.Owned(bytes, begin, end) <- rsp match {
    ...
  }
} yield mapper.readValue(bytes, begin, end - begin)

or even as

val content = if (rsp.isChunked) {
  ...
} else {
  ...
}
content.map {
  case Buf.ByteArray.Owned(bytes, begin, end) => mapper.readValue(...)
}
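
To make the second form concrete, here is a runnable sketch using only util-core. The `decode` helper is a hypothetical stand-in for `mapper.readValue(bytes, offset, length)`, and the payload is made up. The point is only to show how `Buf.ByteArray.Owned` exposes `begin` and `end` indices, so the length passed on is `end - begin`.

```scala
import com.twitter.io.Buf
import com.twitter.util.{Await, Future}
import java.nio.charset.StandardCharsets.UTF_8

object OwnedDestructuringSketch {
  // Hypothetical stand-in for mapper.readValue(bytes, offset, length).
  def decode(bytes: Array[Byte], offset: Int, length: Int): String =
    new String(bytes, offset, length, UTF_8)

  // A slice, so the payload may not start at index 0 of the backing array.
  val content: Future[Buf] = Future.value(Buf.Utf8("[1,2,3]").slice(1, 6))

  val parsed: Future[String] = content.map { buf =>
    Buf.ByteArray.coerce(buf) match {
      // Owned exposes the backing array plus begin/end indices; the third
      // argument to decode is a length, hence end - begin.
      case Buf.ByteArray.Owned(bytes, begin, end) => decode(bytes, begin, end - begin)
    }
  }

  // Blocking only for demonstration purposes.
  val result: String = Await.result(parsed)
}
```

Passing `end` instead of `end - begin` would read past the payload whenever `begin` is non-zero, which is the main thing to watch for in this pattern.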

Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
Member

@adleong adleong left a comment

This looks fantastic! Let's do some manual testing before merging.

@zaharidichev
Member Author

@adleong I already did manual testing and it works. I guess @dadjeibaah can also try it out manually to verify further.

@adleong
Member

adleong commented Jul 17, 2019

Great work, @zaharidichev!

@adleong adleong merged commit 43a1130 into linkerd:master Jul 17, 2019
Development

Successfully merging this pull request may close these issues.

Stream chunked responses from Consul API
2 participants