The chans package provides generic channel operations to help you build concurrent pipelines in Go. It aims to be flexible, unopinionated, and composable, without over-abstracting or taking control away from the developer.
// Given a channel of documents.
docs := make(chan []string, 10)
docs <- []string{"go", "is", "awesome"}
docs <- []string{"cats", "are", "cute"}
close(docs)
// Extract all words from the documents.
words := make(chan string, 10)
chans.Flatten(ctx, words, docs)
close(words)
// Calculate the total byte count of all words.
step := func(acc int, word string) int { return acc + len(word) }
count := chans.Reduce(ctx, words, 0, step)
fmt.Println("byte count =", count)
// byte count = 22
You can find function signatures and usage examples at the links below, or check out the full package documentation.
The golden trio:
- Filter sends values from the input channel to the output if a predicate returns true.
- Map reads values from the input channel, applies a function, and sends the result to the output.
- Reduce combines all values from the input channel into one using a function and returns the result.
Filtering and sampling:
- FilterOut ignores values from the input channel if a predicate returns true, otherwise sends them to the output.
- Drop skips the first N values from the input channel and sends the rest to the output.
- DropWhile skips values from the input channel as long as a predicate returns true, then sends the rest to the output.
- Take sends up to N values from the input channel to the output.
- TakeNth sends every Nth value from the input channel to the output.
- TakeWhile sends values from the input channel to the output while a predicate returns true.
- First returns the first value from the input channel that matches a predicate.
Batching and windowing:
- Chunk groups values from the input channel into fixed-size slices and sends them to the output.
- ChunkBy groups consecutive values from the input channel into slices whenever the key function's result changes.
- Flatten reads slices from the input channel and sends their elements to the output in order.
De-duplication:
- Compact sends values from the input channel to the output, skipping consecutive duplicates.
- CompactBy sends values from the input channel to the output, skipping consecutive duplicates as determined by a custom equality function.
- Distinct sends values from the input channel to the output, skipping all duplicates.
- DistinctBy sends values from the input channel to the output, skipping duplicates as determined by a key function.
Routing:
- Broadcast sends every value from the input channel to all output channels.
- Split sends values from the input channel to output channels in round-robin fashion.
- Partition sends values from the input channel to one of two outputs based on a predicate.
- Merge concurrently sends values from multiple input channels to the output, with no guaranteed order.
- Concat sends values from multiple input channels to the output, processing each input channel in order.
- Drain consumes and discards all values from the input channel.
I think third-party concurrency packages are often too opinionated and try to hide too much complexity. As a result, they end up being inflexible and don't fit a lot of use cases.
For example, here's how you use the Map function from the rill package:
// Concurrency = 3
users := rill.Map(ids, 3, func(id int) (*User, error) {
    return db.GetUser(ctx, id)
})
The code looks simple, but it makes Map pretty opinionated and not very flexible:
- The function is non-blocking and spawns a goroutine. There is no way to change this.
- The function doesn't exit early on error. There is no way to change this.
- The function creates the output channel. There is no way to control its buffering or lifecycle.
- The function can't be canceled.
- The function requires the developer to use a custom Try[T] type for both input and output channels.
- The "N workers" logic is baked in, so you can't use a custom concurrent group implementation.
While this approach works for many developers, I personally don't like it. With chans, my goal was to offer a fairly low-level set of composable channel operations and let developers decide how to use them.
For comparison, here's how you use the chans.Map function:
err := chans.Map(ctx, users, ids, func(id int) (*User, error) {
    return db.GetUser(ctx, id)
})
chans.Map only implements the core mapping logic:
- Reads values from the input channel.
- Calls the mapping function on each value.
- Writes results to the output channel.
- Stops if there's an error or if the context is canceled.
- Does not start any additional goroutines.
You decide the rest:
- Want Map to be non-blocking? Run it in a goroutine.
- Don't want to exit early? Gather the errors instead of returning them.
- Want to buffer the output channel or keep it open? You have full control.
- Need to process input in parallel? Use errgroup.Group, sync.WaitGroup, or any other implementation.
The same applies to other channel operations.
Contributions are welcome. For anything other than bug fixes, please open an issue first to discuss what you want to change.
Make sure to add or update tests as needed.
Created by Anton Zhiyanov. Released under the MIT License.