Enable streaming in Consul HTTP client #2295
Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
5065bdd to 4f7b07b
This looks really awesome! I have a couple of small pieces of style feedback.
@dadjeibaah: I know you were testing this codepath when the issue was originally opened; do you have bandwidth to manually test this change with large streaming responses from Consul?
private def parse[T: Manifest](rsp: Response): Future[T] =
  for {
    data <- rsp match {
      case chunked if rsp.isChunked => chunked.chunkReader.accumulate.map({ case (buf, _) => Buf.ByteArray.coerce(buf) })
We don't need access to the trailers (indeed, we discard them here), so we can use the slightly more ergonomic .reader API instead of .chunkReader.
I think this could simply be Reader.readAll(chunked.reader)
      case chunked if rsp.isChunked => chunked.chunkReader.accumulate.map({ case (buf, _) => Buf.ByteArray.coerce(buf) })
      case nonChunked => Future.value(Buf.ByteArray.coerce(nonChunked.content))
    }
    parsed <- data match {
My Scala destructuring is a bit rusty, but I think you might be able to simplify this as:
for {
  Buf.ByteArray.Owned(bytes, begin, end) <- rsp match {
    ...
  }
} yield mapper.readValue(bytes, begin, end - begin)
or even as
val content = if (rsp.isChunked) {
  ...
} else {
  ...
}
content.map {
  case Buf.ByteArray.Owned(bytes, begin, end) => mapper.readValue(...)
}
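For illustration only, here is a dependency-free sketch of that second shape: compute the content first, then destructure it in a single map. It is not the PR's code. Owned, Buffered, Chunked and decode are hypothetical stand-ins for Buf.ByteArray.Owned, the non-chunked and chunked Response cases, and mapper.readValue, and it uses scala.concurrent.Future rather than com.twitter.util.Future so that it runs without Finagle on the classpath.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParseSketch {
  // Hypothetical stand-in for Buf.ByteArray.Owned: a backing array plus the
  // [begin, end) bounds of the valid region inside it.
  final case class Owned(bytes: Array[Byte], begin: Int, end: Int)

  // Hypothetical stand-ins for the two response shapes discussed above.
  sealed trait Body
  final case class Buffered(content: Owned) extends Body
  final case class Chunked(chunks: Seq[Array[Byte]]) extends Body

  // Hypothetical stand-in for mapper.readValue(bytes, offset, length).
  // Note that the third argument is a length, not an end index.
  def decode(bytes: Array[Byte], offset: Int, length: Int): String =
    new String(bytes, offset, length, "UTF-8")

  def parse(body: Body): Future[String] = {
    // Step 1: normalise both cases to a single Future of an owned byte slice.
    val content: Future[Owned] = body match {
      case Chunked(chunks) =>
        Future {
          // Accumulate every chunk into one contiguous array.
          val all = Array.concat(chunks: _*)
          Owned(all, 0, all.length)
        }
      case Buffered(owned) =>
        Future.successful(owned)
    }
    // Step 2: destructure once and decode only the valid region.
    content.map {
      case Owned(bytes, begin, end) => decode(bytes, begin, end - begin)
    }
  }
}
```

Calling Await.result(ParseSketch.parse(ParseSketch.Chunked(chunks)), 2.seconds) on a sequence of byte arrays decodes their concatenation. The real change would keep the Twitter Future and Buf types throughout.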
Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
This looks fantastic! Let's do some manual testing before merging.
@adleong I already did manual testing and it works. I guess @dadjeibaah can also try it out manually to verify further.
Great work, @zaharidichev!
This work is a continuation of #2287. In essence, the change enables streaming for the Consul HTTP client, which makes handling large Consul responses easier (i.e. they are no longer limited by the max size config). The PR exposes the fixedLengthStreamedAfterKB setting, which is described in the docs.
In order to reproduce the initial issue and validate the solution, you can use the method described by @dadjeibaah in #2287. Note that if you want to see the initial issue, you need to first run against this version of the code. After this PR is applied, you can run the example again and confirm that the exception is not thrown even though the message size configuration is not tweaked.
Fixes #2288
Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>