I am developing a gRPC service interface for the first time. I am reasonably familiar with `asio`, but definitely not an expert. Getting a basic server framework going from the examples is straightforward. What I'm trying to do now is put together a simple testing framework that will allow me to write unit tests for my service. It would integrate most easily into my existing tests if I could make it single-process instead of requiring a server and client to be launched separately, so I was trying to write a simple test wrapper that would instantiate the server and then perform asynchronous client requests to test the server's behavior.
With that said, I've edited one of the example programs to try to demonstrate what I want to do:
```cpp
#include "example/v1/example.grpc.pb.h"
#include "client_rpc.hpp"
#include "helper.hpp"
#include "server_shutdown_asio.hpp"

#include <agrpc/asio_grpc.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <grpcpp/create_channel.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>

#include <chrono>
#include <cstdint>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <variant>

namespace asio = boost::asio;

int main(int argc, const char** argv)
{
    const auto port = argc >= 2 ? argv[1] : "50051";
    const auto host = std::string("0.0.0.0:") + port;

    // start up Example server
    std::unique_ptr<grpc::Server> server;
    grpc::ServerBuilder builder;
    agrpc::GrpcContext grpc_context{builder.AddCompletionQueue()};
    builder.AddListeningPort(host, grpc::InsecureServerCredentials());
    example::v1::Example::AsyncService service;
    builder.RegisterService(&service);
    server = builder.BuildAndStart();
    abort_if_not(bool{server});

    // install a request handler for the bidirectional streaming endpoint
    agrpc::repeatedly_request(
        &example::v1::Example::AsyncService::RequestBidirectionalStreaming, service,
        asio::bind_executor(
            grpc_context,
            [&](::grpc::ServerContext& server_context,
                ::grpc::ServerAsyncReaderWriter<example::v1::Response, example::v1::Request>& reader_writer)
                -> asio::awaitable<void>
            {
                std::cout << "server: started request\n";

                // set up an alarm that is used to pace our responses to the client
                agrpc::Alarm alarm(grpc_context);
                int64_t duration_msec = 1000;

                while (true)
                {
                    using namespace asio::experimental::awaitable_operators;

                    // wait for the first of two events to happen:
                    //
                    // 1. we receive a new streaming message from the client
                    // 2. the timer expires
                    //
                    // when the timer expires, we send a message to the client.
                    std::cout << "server: reading/waiting\n";
                    example::v1::Request cmd;
                    auto result = co_await (
                        agrpc::read(reader_writer, cmd) ||
                        alarm.wait(std::chrono::system_clock::now() + std::chrono::milliseconds(duration_msec)));

                    if (result.index() == 0)
                    {
                        if (std::get<0>(result))
                        {
                            std::cout << "server: got streaming message\n";
                        }
                        else
                        {
                            std::cout << "server: read failed\n";
                            break;
                        }
                    }
                    else
                    {
                        std::cout << "server: alarm expired\n";
                        example::v1::Response resp;
                        co_await agrpc::write(reader_writer, resp);
                    }
                }

                co_await agrpc::finish(reader_writer, ::grpc::Status::OK);
            }));

    // bring in the server shutdown functionality
    example::ServerShutdown server_shutdown{*server, grpc_context};

    // set up a client stub for this service
    example::v1::Example::Stub stub(::grpc::CreateChannel(host, ::grpc::InsecureChannelCredentials()));

    // spawn a coroutine that will talk to the bidirectional streaming endpoint
    asio::co_spawn(
        grpc_context,
        [&]() -> asio::awaitable<void>
        {
            // create an RPC to the BidirectionalStreaming interface
            using RPC = example::AwaitableClientRPC<&example::v1::Example::Stub::PrepareAsyncBidirectionalStreaming>;
            RPC rpc(grpc_context);

            // start the RPC
            std::cout << "client: starting RPC\n";
            if (!co_await rpc.start(stub)) co_return;

            // send the first streaming message to the server
            std::cout << "client: writing streaming message\n";
            example::v1::Request cmd;
            auto write_ok = co_await rpc.write(cmd);

            // now, just wait forever for streaming responses back from the server until a read fails
            bool read_ok = true;
            while (write_ok && read_ok)
            {
                std::cout << "client: waiting for streaming message\n";
                example::v1::Response resp;
                read_ok = co_await rpc.read(resp);
            }
            std::cout << "client: done\n";
        },
        asio::detached);

    // run the gRPC context thread
    grpc_context.run();
    std::cout << "Shutdown completed\n";
}
```
My intended behavior when running the program would be:
- The server starts.
- The client coroutine initiates a `BidirectionalStreaming` RPC.
- The client coroutine sends a `Request` streaming message to the server.
- The server receives the `Request`.
- The server sends `Response` streaming messages periodically to the client (indefinitely in this simple example).
- At some point later, I can send `SIGINT` to the process and the `ServerShutdown` will shut down the server, the next read/write on the RPC fails for the server/client, and the coroutines exit cleanly.
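To make the intended sequence concrete without involving gRPC at all, here is a minimal sketch that models it with two plain threads and two in-memory queues. Everything in it is hypothetical and for illustration only: `Channel`, `ExchangeResult`, and `simulate_exchange` are made-up names, an atomic flag stands in for `SIGINT`, and closing a queue stands in for the RPC read failing after shutdown. It says nothing about how asio-grpc behaves.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical closable queue standing in for one direction of the stream.
template <class T>
class Channel {
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push(std::move(value));
        }
        cv_.notify_one();
    }

    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        cv_.notify_all();
    }

    // Blocks until an item is available. Returns std::nullopt once the
    // channel has been closed and drained (the "read failed" case).
    std::optional<T> pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return closed_ || !items_.empty(); });
        if (items_.empty()) return std::nullopt;
        T value = std::move(items_.front());
        items_.pop();
        return value;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<T> items_;
    bool closed_ = false;
};

struct ExchangeResult {
    bool server_got_request = false;
    int responses_received = 0;
    bool client_saw_end = false;
};

// Runs the whole exchange in one process and reports what each side saw.
ExchangeResult simulate_exchange(int responses_before_shutdown) {
    Channel<int> requests;              // client -> server
    Channel<int> responses;             // server -> client
    std::atomic<bool> shutdown{false};  // stand-in for SIGINT
    ExchangeResult result;

    std::thread server([&] {
        // The server receives the initial request...
        result.server_got_request = requests.pop().has_value();
        // ...then sends paced responses until shutdown is requested.
        while (!shutdown.load()) {
            responses.push(0);
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
        responses.close();  // the client's next read now fails
    });

    std::thread client([&] {
        requests.push(0);  // first streaming message
        while (responses.pop().has_value()) {
            if (++result.responses_received >= responses_before_shutdown)
                shutdown.store(true);
        }
        result.client_saw_end = true;  // read failed, exit cleanly
    });

    client.join();
    server.join();
    return result;
}
```

Calling `simulate_exchange(3)` should return with `server_got_request` and `client_saw_end` both true and at least three responses counted, which is the shape of outcome I would like to assert on in a unit test of the real service.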
I'm not seeing what I expected, likely because I'm missing something fundamental about how this should work. When I run the above example program, I get:
```
[user@host:~/git/asio-grpc/build]$ example/asio-grpc-example-streaming-server
client: starting RPC
client: writing streaming message
client: waiting for streaming message
server: started request
server: reading/waiting
server: got streaming message
server: reading/waiting
```
And nothing else. Specifically, the coroutine handling the `BidirectionalStreaming` RPC on the server side seems to get stuck in the `co_await` that should complete when either a new message is received or the alarm expires. The alarm expiration never seems to fire.
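The semantics I am after in that `co_await` ("whichever comes first: a message or the timer") can be sketched with standard-library primitives only. This is an analogy, not asio-grpc code: `Mailbox` and `pop_or_timeout` are hypothetical names, a condition variable with a timeout plays the role of the read racing the alarm, and an empty result corresponds to the "alarm expired" branch.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <string>
#include <utility>

// Hypothetical mailbox standing in for the incoming side of the stream.
class Mailbox {
public:
    void push(std::string message) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            messages_.push(std::move(message));
        }
        cv_.notify_one();
    }

    // Waits for the first of two events:
    //   1. a message arrives   -> returns the message
    //   2. the timeout elapses -> returns std::nullopt
    std::optional<std::string> pop_or_timeout(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        const bool has_message =
            cv_.wait_for(lock, timeout, [this] { return !messages_.empty(); });
        if (!has_message) return std::nullopt;
        std::string message = std::move(messages_.front());
        messages_.pop();
        return message;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<std::string> messages_;
};
```

With this model, a server loop would call `pop_or_timeout` repeatedly, handle the message when one is returned, and send a paced response when the result is empty. That is the behavior I expected from combining the read and the alarm wait.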
Furthermore, if I try to send `SIGINT` or `SIGTERM` to the process, it does not respond. If I take out the `co_spawn` for the client coroutine, then I can at least use the signals to shut down the server cleanly.
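For a single-process test, the shutdown signal would not have to come from outside the process. The sketch below, which is independent of how `example::ServerShutdown` is implemented, shows the standard-library mechanism a test could use: install a handler that only sets a flag, then trigger it from the test itself with `std::raise`. The names `install_shutdown_handlers` and `shutdown_requested` are hypothetical.

```cpp
#include <csignal>

namespace {
// Set from the signal handler; sig_atomic_t is safe to write there.
volatile std::sig_atomic_t g_shutdown_requested = 0;

void on_shutdown_signal(int /*signal_number*/) {
    g_shutdown_requested = 1;
}
}  // namespace

// Registers the same handler for both signals mentioned above.
void install_shutdown_handlers() {
    std::signal(SIGINT, on_shutdown_signal);
    std::signal(SIGTERM, on_shutdown_signal);
}

// Polled by the test (or the server loop) to decide when to stop.
bool shutdown_requested() {
    return g_shutdown_requested != 0;
}
```

A test would call `install_shutdown_handlers()`, run its scenario, call `std::raise(SIGINT)`, and then check `shutdown_requested()` before tearing the server down.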
Should this type of strategy work with asio-grpc?