Closed
Labels
triage me
Description
Client PubSub
Describe Your Environment
I am running the Pub/Sub Go client on Ubuntu 16.04. The following snippet reproduces my situation.
func TestPull(t *testing.T) {
	client := pubsubclient.GetPubSubClient()
	ctx := context.Background()

	// count tracks messages that have been delivered but not yet acked.
	mu := sync.Mutex{}
	count := 0

	// waitAndAck delays the ack by 10 seconds, then decrements count.
	waitAndAck := func(msg *pubsub.Message) {
		time.Sleep(10 * time.Second)
		msg.Ack()
		mu.Lock()
		defer mu.Unlock()
		count--
		logrus.Println("count-- :", count)
	}

	sub := client.Subscription("testtoppicsubs")
	sub.ReceiveSettings.NumGoroutines = 1
	sub.ReceiveSettings.MaxOutstandingMessages = 10

	// The callback returns immediately; the ack happens later in waitAndAck.
	err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Printf("Got message: %q\n", string(msg.Data))
		mu.Lock()
		count++
		logrus.Println("count++ :", count) // shouldn't go beyond 10.
		mu.Unlock()
		go waitAndAck(msg)
	})
	if err != nil {
		t.Log("Error:", err)
	}
}
Expected Behavior
As documented, sub.Receive should have at most 10 calls to f outstanding at any time, because each ack is delayed by 10 seconds. But count goes beyond 10 as soon as the test runs.
Actual Behavior
ReceiveSettings.MaxOutstandingMessages does not work as expected: more than 10 messages are delivered before any of them has been acked.