Skip to content

Proxy doesn't clean up after buffer sees a client close? #899

@sfroment

Description

@sfroment

Hello,

I'm getting a "turning buffer closed into 500" error. With my limited understanding of Conduit, the flow of my request is svc-b -> conduit_proxy of svc-b -> conduit_proxy of svc-a -> svc-a.
1st request before kill

DBUG conduit_proxy::telemetry::tap "controller-client", inspect event=StreamRequestOpen(Request { id: 37, uri: http://svc-a.internal.svc.cluster.local/Login, method: POST, server: Server { proxy: Outbound(Process { scheduled_namespace: "internal", start_time: SystemTime { tv_sec: 1525372698, tv_nsec: 663213849 } }), remote: V4(10.20.1.166:34804), local: V4(127.0.0.1:4140), orig_dst: Some(V4(10.20.0.212:9001)), protocol: Http }, client: Client { proxy: Outbound(Process { scheduled_namespace: "internal", start_time: SystemTime { tv_sec: 1525372698, tv_nsec: 663213849 } }), remote: V4(10.20.1.165:9001), protocol: Http, dst_labels: Some(Watch { shared: Shared { value: RwLock { data: Some(DstLabels { formatted: "dst_namespace=\"internal\",dst_service=\"svc-a\",dst_pod=\"svc-a-74477fdcb8-qgzf4\",dst_deployment=\"svc-a\",dst_pod_template_hash=\"3003398764\"", original: {"service": "svc-a", "pod_template_hash": "3003398764", "namespace": "internal", "deployment": "svc-a", "pod": "svc-a-74477fdcb8-qgzf4"} }) }, version: AtomicUsize(0), watchers: Mutex { data: Watchers { next_id: 3, watchers: {2: WatchInner { task: AtomicTask }} } }, cancel: AtomicTask }, inner: WatchInner { task: AtomicTask }, id: 2, ver: 0 }) } }) with tap=Tap { match_: All([]), tx: Sender { capacity: AtomicUsize(100) } }
DBUG conduit_proxy::telemetry::tap "controller-client", inspect event=StreamRequestEnd(Request { id: 37, uri: http://svc-a.internal.svc.cluster.local/Login, method: POST, server: Server { proxy: Outbound(Process { scheduled_namespace: "internal", start_time: SystemTime { tv_sec: 1525372698, tv_nsec: 663213849 } }), remote: V4(10.20.1.166:34804), local: V4(127.0.0.1:4140), orig_dst: Some(V4(10.20.0.212:9001)), protocol: Http }, client: Client { proxy: Outbound(Process { scheduled_namespace: "internal", start_time: SystemTime { tv_sec: 1525372698, tv_nsec: 663213849 } }), remote: V4(10.20.1.165:9001), protocol: Http, dst_labels: Some(Watch { shared: Shared { value: RwLock { data: Some(DstLabels { formatted: "dst_namespace=\"internal\",dst_service=\"svc-a\",dst_pod=\"svc-a-74477fdcb8-qgzf4\",dst_deployment=\"svc-a\",dst_pod_template_hash=\"3003398764\"", original: {"service": "svc-a", "pod_template_hash": "3003398764", "namespace": "internal", "deployment": "svc-a", "pod": "svc-a-74477fdcb8-qgzf4"} }) }, version: AtomicUsize(0), watchers: Mutex { data: Watchers { next_id: 3, watchers: {2: WatchInner { task: AtomicTask }} } }, cancel: AtomicTask }, inner: WatchInner { task: AtomicTask }, id: 2, ver: 0 }) } }, StreamRequestEnd { since_request_open: Duration { secs: 0, nanos: 139097 } }) with tap=Tap { match_: All([]), tx: Sender { capacity: AtomicUsize(99) } }
DBUG conduit_proxy::telemetry::tap "controller-client", inspect event=StreamResponseOpen(Response { request: Request { id: 37, uri: http://svc-a.internal.svc.cluster.local/Login, method: POST, server: Server { proxy: Outbound(Process { scheduled_namespace: "internal", start_time: SystemTime { tv_sec: 1525372698, tv_nsec: 663213849 } }), remote: V4(10.20.1.166:34804), local: V4(127.0.0.1:4140), orig_dst: Some(V4(10.20.0.212:9001)), protocol: Http }, client: Client { proxy: Outbound(Process { scheduled_namespace: "internal", start_time: SystemTime { tv_sec: 1525372698, tv_nsec: 663213849 } }), remote: V4(10.20.1.165:9001), protocol: Http, dst_labels: Some(Watch { shared: Shared { value: RwLock { data: Some(DstLabels { formatted: "dst_namespace=\"internal\",dst_service=\"svc-a\",dst_pod=\"svc-a-74477fdcb8-qgzf4\",dst_deployment=\"svc-a\",dst_pod_template_hash=\"3003398764\"", original: {"service": "svc-a", "pod_template_hash": "3003398764", "namespace": "internal", "deployment": "svc-a", "pod": "svc-a-74477fdcb8-qgzf4"} }) }, version: AtomicUsize(0), watchers: Mutex { data: Watchers { next_id: 3, watchers: {2: WatchInner { task: AtomicTask }} } }, cancel: AtomicTask }, inner: WatchInner { task: AtomicTask }, id: 2, ver: 0 }) } }, status: 200 }, StreamResponseOpen { since_request_open: Duration { secs: 4, nanos: 800338855 } }) with tap=Tap { match_: All([]), tx: Sender { capacity: AtomicUsize(100) } }
DBUG conduit_proxy::telemetry::tap "controller-client", inspect event=StreamResponseEnd(Response { request: Request { id: 37, uri: http://svc-a.internal.svc.cluster.local/Login, method: POST, server: Server { proxy: Outbound(Process { scheduled_namespace: "internal", start_time: SystemTime { tv_sec: 1525372698, tv_nsec: 663213849 } }), remote: V4(10.20.1.166:34804), local: V4(127.0.0.1:4140), orig_dst: Some(V4(10.20.0.212:9001)), protocol: Http }, client: Client { proxy: Outbound(Process { scheduled_namespace: "internal", start_time: SystemTime { tv_sec: 1525372698, tv_nsec: 663213849 } }), remote: V4(10.20.1.165:9001), protocol: Http, dst_labels: Some(Watch { shared: Shared { value: RwLock { data: Some(DstLabels { formatted: "dst_namespace=\"internal\",dst_service=\"svc-a\",dst_pod=\"svc-a-74477fdcb8-qgzf4\",dst_deployment=\"svc-a\",dst_pod_template_hash=\"3003398764\"", original: {"service": "svc-a", "pod_template_hash": "3003398764", "namespace": "internal", "deployment": "svc-a", "pod": "svc-a-74477fdcb8-qgzf4"} }) }, version: AtomicUsize(0), watchers: Mutex { data: Watchers { next_id: 3, watchers: {2: WatchInner { task: AtomicTask }} } }, cancel: AtomicTask }, inner: WatchInner { task: AtomicTask }, id: 2, ver: 0 }) } }, status: 200 }, StreamResponseEnd { grpc_status: Some(0), since_request_open: Duration { secs: 4, nanos: 800367189 }, since_response_open: Duration { secs: 0, nanos: 15201 }, bytes_sent: 1081, frames_sent: 1 }) with tap=Tap { match_: All([]), tx: Sender { capacity: AtomicUsize(99) } }

2nd request after the kill

DBUG conduit_proxy::telemetry::tap "controller-client", inspect event=StreamRequestOpen(Request { id: 53, uri: http://svc-a.internal.svc.cluster.local/Login, method: POST, server: Server { proxy: Outbound(Process { scheduled_namespace: "internal", start_time: SystemTime { tv_sec: 1525372698, tv_nsec: 663213849 } }), remote: V4(10.20.1.166:34804), local: V4(127.0.0.1:4140), orig_dst: Some(V4(10.20.0.212:9001)), protocol: Http }, client: Client { proxy: Outbound(Process { scheduled_namespace: "internal", start_time: SystemTime { tv_sec: 1525372698, tv_nsec: 663213849 } }), remote: V4(10.20.4.103:9001), protocol: Http, dst_labels: Some(Watch { shared: Shared { value: RwLock { data: Some(DstLabels { formatted: "dst_namespace=\"internal\",dst_service=\"svc-a\",dst_pod=\"svc-a-74477fdcb8-t4l9k\",dst_deployment=\"svc-a\",dst_pod_template_hash=\"3003398764\"", original: {"namespace": "internal", "pod_template_hash": "3003398764", "pod": "svc-a-74477fdcb8-t4l9k", "service": "svc-a", "deployment": "svc-a"} }) }, version: AtomicUsize(0), watchers: Mutex { data: Watchers { next_id: 3, watchers: {2: WatchInner { task: AtomicTask }} } }, cancel: AtomicTask }, inner: WatchInner { task: AtomicTask }, id: 2, ver: 0 }) } }) with tap=Tap { match_: All([]), tx: Sender { capacity: AtomicUsize(100) } }
DBUG conduit_proxy::telemetry::tap "controller-client", inspect event=StreamRequestEnd(Request { id: 53, uri: http://svc-a.internal.svc.cluster.local/Login, method: POST, server: Server { proxy: Outbound(Process { scheduled_namespace: "internal", start_time: SystemTime { tv_sec: 1525372698, tv_nsec: 663213849 } }), remote: V4(10.20.1.166:34804), local: V4(127.0.0.1:4140), orig_dst: Some(V4(10.20.0.212:9001)), protocol: Http }, client: Client { proxy: Outbound(Process { scheduled_namespace: "internal", start_time: SystemTime { tv_sec: 1525372698, tv_nsec: 663213849 } }), remote: V4(10.20.4.103:9001), protocol: Http, dst_labels: Some(Watch { shared: Shared { value: RwLock { data: Some(DstLabels { formatted: "dst_namespace=\"internal\",dst_service=\"svc-a\",dst_pod=\"svc-a-74477fdcb8-t4l9k\",dst_deployment=\"svc-a\",dst_pod_template_hash=\"3003398764\"", original: {"namespace": "internal", "pod_template_hash": "3003398764", "pod": "svc-a-74477fdcb8-t4l9k", "service": "svc-a", "deployment": "svc-a"} }) }, version: AtomicUsize(0), watchers: Mutex { data: Watchers { next_id: 3, watchers: {2: WatchInner { task: AtomicTask }} } }, cancel: AtomicTask }, inner: WatchInner { task: AtomicTask }, id: 2, ver: 0 }) } }, StreamRequestEnd { since_request_open: Duration { secs: 0, nanos: 119650 } }) with tap=Tap { match_: All([]), tx: Sender { capacity: AtomicUsize(99) } }
DBUG conduit_proxy::telemetry::tap "controller-client", inspect event=StreamResponseOpen(Response { request: Request { id: 53, uri: http://svc-a.internal.svc.cluster.local/Login, method: POST, server: Server { proxy: Outbound(Process { scheduled_namespace: "internal", start_time: SystemTime { tv_sec: 1525372698, tv_nsec: 663213849 } }), remote: V4(10.20.1.166:34804), local: V4(127.0.0.1:4140), orig_dst: Some(V4(10.20.0.212:9001)), protocol: Http }, client: Client { proxy: Outbound(Process { scheduled_namespace: "internal", start_time: SystemTime { tv_sec: 1525372698, tv_nsec: 663213849 } }), remote: V4(10.20.4.103:9001), protocol: Http, dst_labels: Some(Watch { shared: Shared { value: RwLock { data: Some(DstLabels { formatted: "dst_namespace=\"internal\",dst_service=\"svc-a\",dst_pod=\"svc-a-74477fdcb8-t4l9k\",dst_deployment=\"svc-a\",dst_pod_template_hash=\"3003398764\"", original: {"namespace": "internal", "pod_template_hash": "3003398764", "pod": "svc-a-74477fdcb8-t4l9k", "service": "svc-a", "deployment": "svc-a"} }) }, version: AtomicUsize(0), watchers: Mutex { data: Watchers { next_id: 3, watchers: {2: WatchInner { task: AtomicTask }} } }, cancel: AtomicTask }, inner: WatchInner { task: AtomicTask }, id: 2, ver: 0 }) } }, status: 500 }, StreamResponseOpen { since_request_open: Duration { secs: 0, nanos: 1028917 } }) with tap=Tap { match_: All([]), tx: Sender { capacity: AtomicUsize(100) } }
DBUG conduit_proxy::telemetry::tap "controller-client", inspect event=StreamResponseEnd(Response { request: Request { id: 53, uri: http://svc-a.internal.svc.cluster.local/Login, method: POST, server: Server { proxy: Outbound(Process { scheduled_namespace: "internal", start_time: SystemTime { tv_sec: 1525372698, tv_nsec: 663213849 } }), remote: V4(10.20.1.166:34804), local: V4(127.0.0.1:4140), orig_dst: Some(V4(10.20.0.212:9001)), protocol: Http }, client: Client { proxy: Outbound(Process { scheduled_namespace: "internal", start_time: SystemTime { tv_sec: 1525372698, tv_nsec: 663213849 } }), remote: V4(10.20.4.103:9001), protocol: Http, dst_labels: Some(Watch { shared: Shared { value: RwLock { data: Some(DstLabels { formatted: "dst_namespace=\"internal\",dst_service=\"svc-a\",dst_pod=\"svc-a-74477fdcb8-t4l9k\",dst_deployment=\"svc-a\",dst_pod_template_hash=\"3003398764\"", original: {"namespace": "internal", "pod_template_hash": "3003398764", "pod": "svc-a-74477fdcb8-t4l9k", "service": "svc-a", "deployment": "svc-a"} }) }, version: AtomicUsize(0), watchers: Mutex { data: Watchers { next_id: 3, watchers: {2: WatchInner { task: AtomicTask }} } }, cancel: AtomicTask }, inner: WatchInner { task: AtomicTask }, id: 2, ver: 0 }) } }, status: 500 }, StreamResponseEnd { grpc_status: None, since_request_open: Duration { secs: 0, nanos: 1039716 }, since_response_open: Duration { secs: 0, nanos: 0 }, bytes_sent: 0, frames_sent: 0 }) with tap=Tap { match_: All([]), tx: Sender { capacity: AtomicUsize(99) } }

In the conduit proxy logs of svc-a:

DBUG tokio_core::reactor loop poll - Duration { secs: 9, nanos: 287922607 }
DBUG tokio_core::reactor loop time - Instant { tv_sec: 729258, tv_nsec: 537438748 }
DBUG tokio_core::reactor notifying a task handle
DBUG tokio_core::reactor loop process - 1 events, Duration { secs: 0, nanos: 112986 }
DBUG tokio_core::reactor loop poll - Duration { secs: 0, nanos: 8245 }
DBUG tokio_core::reactor loop time - Instant { tv_sec: 729258, tv_nsec: 537575112 }
DBUG h2::codec::framed_read received; frame=Frame::Headers(Headers { stream_id: StreamId(21), stream_dep: None, flags: HeadersFlag { end_stream: false, end_headers: true, padded: false, priority: false } })
DBUG h2::codec::framed_read received; frame=Frame::Data(Data { stream_id: StreamId(21), flags: DataFlags { end_stream: true }, pad_len: None })
DBUG tokio_core::reactor scheduling direction for: 37
DBUG tokio_core::reactor blocking
DBUG tokio_core::reactor scheduling direction for: 37
DBUG tokio_core::reactor blocking
DBUG tokio_core::reactor loop process - 1 events, Duration { secs: 0, nanos: 461020 }
DBUG tokio_core::reactor loop poll - Duration { secs: 0, nanos: 3092 }
DBUG tokio_core::reactor loop time - Instant { tv_sec: 729258, tv_nsec: 538058390 }
ERR! conduit_proxy turning buffer closed into 500

You can reproduce the error with the demo app — just scale the voting pods to 2.
I've changed the server.go file a bit to get a gRPC client that makes many requests:

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"

	pb "github.com/runconduit/conduit-examples/emojivoto/emojivoto-web/gen/proto"
	"google.golang.org/grpc"
)

var (
	// Configuration mirrored from the original emojivoto web server.
	// Only votingsvcHost is used by this repro client; the env-derived
	// values are kept so the file stays a drop-in replacement.
	webPort              = os.Getenv("WEB_PORT")
	emojisvcHost         = "emoji-svc.emojivoto.svc.cluster.local:8080"
	votingsvcHost        = "voting-svc.emojivoto.svc.cluster.local:8080"
	indexBundle          = os.Getenv("INDEX_BUNDLE")
	webpackDevServerHost = os.Getenv("WEBPACK_DEV_SERVER")
)

// main dials the voting service and hammers it with concurrent Vote100
// requests until the process receives SIGINT or SIGTERM. It exists to
// reproduce the proxy's "turning buffer closed into 500" error under
// sustained load.
func main() {
	votingSvcConn := openGrpcClientConnection(votingsvcHost)
	votingClient := pb.NewVotingServiceClient(votingSvcConn)
	defer votingSvcConn.Close()

	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
	log.Printf("Start voting")
	ctx := context.Background()
	// Four goroutines voting in a tight loop generate the request load.
	// They run until the process exits; no explicit shutdown is needed
	// for a throwaway repro client.
	for i := 0; i < 4; i++ {
		go func() {
			for {
				voteRequest := &pb.VoteRequest{}
				// Log failures instead of silently discarding them, so
				// the proxy-induced 500s show up in this client's output.
				if _, err := votingClient.Vote100(ctx, voteRequest); err != nil {
					log.Printf("vote failed: %v", err)
				}
			}
		}()
	}
	<-c
}

// openGrpcClientConnection dials host without TLS and returns the live
// connection. The dial is non-blocking, so a returned connection does not
// guarantee the server is reachable yet. It panics on a dial error; for a
// throwaway load generator, aborting immediately is acceptable.
func openGrpcClientConnection(host string) *grpc.ClientConn {
	log.Printf("Connecting to [%s]", host)
	conn, err := grpc.Dial(host, grpc.WithInsecure())
	if err != nil {
		// Wrap the error so the panic message records which host failed.
		panic(fmt.Errorf("dialing %s: %w", host, err))
	}
	return conn
}

Then just run the Go program and you'll quickly see errors on the voting service.

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions