Skip to content

MaxOutstandingMessages setting is not always observed. #1074

@anasanzari

Description

@anasanzari

Client PubSub

Describe Your Environment

I am running pubsub client on Ubuntu 16.04. The following snippet explains my situation.

func TestPull(t *testing.T){

	client := pubsubclient.GetPubSubClient()
	ctx := context.Background()

	mu := sync.Mutex{}
	count := 0

	waitAndAck := func(msg *pubsub.Message) {
		time.Sleep(10 * time.Second)
		msg.Ack()
		mu.Lock()
		defer mu.Unlock()
		count--
		logrus.Println("count-- :", count)
	}

	sub := client.Subscription("testtoppicsubs")
	sub.ReceiveSettings.NumGoroutines = 1
	sub.ReceiveSettings.MaxOutstandingMessages = 10
	err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Printf("Got message: %q\n", string(msg.Data))
		mu.Lock()
		count++
		logrus.Println("count++ :", count) //shouldn't go beyond 10.
		mu.Unlock()
		go waitAndAck(msg)
	})

	if err != nil {
		t.Log("Error : ",err)
	}

}

Expected Behavior

As documented, sub.Receive should call f only 10 times, as msg acking are delayed. But count goes beyond 10 as soon as it run.

Actual Behavior

ReceiveSettings.MaxOutstandingMessages is not working as it is expected.

Metadata

Metadata

Assignees

Labels

triage meI really want to be triaged.

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions