Skip to content

Using asynchronous gRPC server and client on same context #81

@otherjason

Description

@otherjason

I am developing a gRPC service interface for the first time. I am reasonably familiar with asio, but definitely not expert. Getting a basic server framework going from the examples is straightforward. What I'm trying to do now is put together a simple testing framework that will allow me to write unit tests for my service. It would integrate most easily into my existing tests if I could make it single-process instead of requiring a server and client to be launched separately, so I was trying to write a simple test wrapper that would instantiate the server, then perform asynchronous client requests to test the server's behavior.

With that said, I've edited one of the example programs to try to demonstrate what I want to do:

#include "example/v1/example.grpc.pb.h"
#include "client_rpc.hpp"
#include "helper.hpp"
#include "server_shutdown_asio.hpp"

#include <agrpc/asio_grpc.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <grpcpp/create_channel.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>

#include <iostream>
#include <thread>

namespace asio = boost::asio;

int main(int argc, const char** argv)
{
    const auto port = argc >= 2 ? argv[1] : "50051";
    const auto host = std::string("0.0.0.0:") + port;

    // start up Example server
    std::unique_ptr<grpc::Server> server;

    grpc::ServerBuilder builder;
    agrpc::GrpcContext grpc_context{builder.AddCompletionQueue()};
    builder.AddListeningPort(host, grpc::InsecureServerCredentials());
    example::v1::Example::AsyncService service;
    builder.RegisterService(&service);
    server = builder.BuildAndStart();
    abort_if_not(bool{server});

    // install a request handler for the bidirectional streaming endpoint
    agrpc::repeatedly_request(&example::v1::Example::AsyncService::RequestBidirectionalStreaming, service,
        asio::bind_executor(grpc_context, 
        [&](::grpc::ServerContext &server, ::grpc::ServerAsyncReaderWriter<example::v1::Response, example::v1::Request> &reader_writer) -> asio::awaitable<void>
        {
            std::cout << "server: started request\n";

            // set up an alarm that is used to pace our responses to the client
            agrpc::Alarm alarm(grpc_context);
            int64_t duration_msec = 1000;

            while (true)
            {
                using namespace asio::experimental::awaitable_operators;

                // wait for the first of two events to happen:
                // 
                // 1. we receive a new streaming message from the client
                // 2. the timer expires
                //
                // when the timer expires, we send a message to the client.
                std::cout << "server: reading/waiting\n";
                example::v1::Request cmd;
                auto result = co_await(agrpc::read(reader_writer, cmd) ||
                    alarm.wait(std::chrono::system_clock::now() + std::chrono::milliseconds(duration_msec)));
                if (result.index() == 0)
                {
                    if (std::get<0>(result))
                    {
                        std::cout << "server: got streaming message\n";
                    }
                    else
                    {
                        std::cout << "server: read failed\n";
                        break;
                    }
                }
                else
                {
                    std::cout << "server: alarm expired\n";
                    example::v1::Response resp;
                    co_await agrpc::write(reader_writer, resp);
                }
            }

            co_await agrpc::finish(reader_writer, ::grpc::Status::OK);
        }
    ));
    
    // bring in the server shutdown functionality
    example::ServerShutdown server_shutdown{*server, grpc_context};

    // set up a client stub for this service
    example::v1::Example::Stub stub(::grpc::CreateChannel(host, ::grpc::InsecureChannelCredentials()));
    // spawn a coroutine that will talk to the bidirectional streaming endpoint
    asio::co_spawn(grpc_context,
        [&]() -> asio::awaitable<void>
        {
            // create an RPC to the BidirectionalStreaming interface
            using RPC = example::AwaitableClientRPC<&example::v1::Example::Stub::PrepareAsyncBidirectionalStreaming>;
            RPC rpc(grpc_context);

            // start the RPC
            std::cout << "client: starting RPC\n";
            if (!co_await rpc.start(stub)) co_return;

            // send the first streaming message to the server
            std::cout << "client: writing streaming message\n";
            example::v1::Request cmd;
            auto write_ok = co_await rpc.write(cmd);

            // now, just wait forever for streaming responses back from the server until a read fails
            bool read_ok = true;
            while (write_ok && read_ok)
            {
                std::cout << "client: waiting for streaming message\n";
                example::v1::Response resp;
                read_ok = co_await rpc.read(resp);
            }

            std::cout << "client: done\n";
        },
        asio::detached);

    // run the gRPC context thread
    grpc_context.run();

    std::cout << "Shutdown completed\n";
}

My intended behavior when running the program would be:

  • The server starts.
  • The client coroutine initiates a BidirectionalStreaming RPC.
  • The client coroutine sends a Request streaming message to the server.
  • The server receives the Request.
  • The server sends Response streaming messages periodically to the client (indefinitely in this simple example)
  • At some point later, I can send SIGINT to the process and the ServerShutdown will shut down the server, the next read/write on the RPC fail for the server/client, and the coroutines exit cleanly.

I'm not seeing what I expected, likely because I'm missing something fundamental about how this should work. When I run the above example program, I get:

[user@host:~/git/asio-grpc/build]$ example/asio-grpc-example-streaming-server
client: starting RPC
client: writing streaming message
client: waiting for streaming message
server: started request
server: reading/waiting
server: got streaming message
server: reading/waiting

And nothing else. Specifically, the coroutine handling the BidirectionalStreaming RPC on the server side seems to get stuck in the co_await that should complete when either a new message is received or the alarm expires. The alarm expiration doesn't ever seem to fire.

Furthermore, if I try to send SIGINT or SIGTERM to the process, it does not respond. If I take the co_spawn for the client coroutine out, then I can at least use the signals to shut down the server cleanly.

Should this type of strategy with with asio-grpc?

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions