-
-
Notifications
You must be signed in to change notification settings - Fork 72
Closed
Description
We currently have a use case for using pond to parallel work being pulled down from a queue. We initialize pond with a max concurrency and queue size and allow it to run without waiting on any tasks to complete. However on some applications we see pond because non-responsive and seems to lock up without accepting anymore messages to SubmitErr
Below is some code in which I was able to replicate the issue.
package main
import (
"fmt"
"github.com/alitto/pond/v2"
log "github.com/sirupsen/logrus"
"time"
)
func main() {
pool := pond.NewPool(100,pond.WithQueueSize(5))
logPoolMetrics(pool)
//infinite loop to mimic reading from a queue
for {
for i := 0; i < 300; i++ {
pool.SubmitErr(func() error {
fmt.Println("HELLO WORLD")
return nil
})
}
}
}
func logPoolMetrics(pool pond.Pool) {
timer := time.NewTicker(10 * time.Second)
go func() {
for {
select {
case <-timer.C:
log.WithField("completed_tasks", pool.CompletedTasks()).
WithField("failed_tasks", pool.FailedTasks()).
WithField("successful_tasks", pool.SuccessfulTasks()).
WithField("submitted_tasks", pool.SubmittedTasks()).
WithField("running_workers", pool.RunningWorkers()).
WithField("max_concurrency", pool.MaxConcurrency()).
WithField("drop_tasks",pool.DroppedTasks()).
WithField("queue_size", pool.QueueSize()).
Info("pool metrics")
}
}
}()
}
Metadata
Metadata
Assignees
Labels
No labels