A Go package that provides a channel-based adapter for ZeroMQ sockets, bridging ZMQ's message-passing model with Go's channel-based concurrency model.
- Go Channel Interface: Provides Go-native channels for sending and receiving ZMQ messages
- All ZMQ Socket Types: Supports REQ/REP, DEALER/ROUTER, PUB/SUB, PUSH/PULL, PAIR, and more
- Optimized Performance: Runs only as many goroutines as the socket's capabilities require, reducing resource usage
- Thread-Safe: Concurrent access to channels is handled safely
- Multi-part Messages: Full support for ZMQ multi-part messages
- Context Cancellation: Graceful shutdown using Go's context pattern
- Comprehensive Testing: Test coverage for a range of socket types and scenarios
go get github.com/thinkdoggie/zmq4chan
This package depends on zmq4, which requires ZeroMQ 4.x to be installed on your system.
macOS:
brew install zeromq
Ubuntu/Debian (the libzmq3-dev package provides ZeroMQ 4.x despite its name):
sudo apt-get install libzmq3-dev
RHEL/CentOS:
sudo yum install zeromq-devel
Here's a simple REQ/REP example:
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	zmq "github.com/pebbe/zmq4"
	"github.com/thinkdoggie/zmq4chan"
)

func main() {
	// Create REP socket (server)
	repSocket, err := zmq.NewSocket(zmq.REP)
	if err != nil {
		log.Fatal(err)
	}
	defer repSocket.Close()

	err = repSocket.Bind("tcp://*:5555")
	if err != nil {
		log.Fatal(err)
	}

	// Create channel adapter for REP socket
	repAdapter := zmq4chan.NewChanAdapter(repSocket, 10, 10)
	defer repAdapter.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	repAdapter.Start(ctx)

	// Server goroutine
	go func() {
		for {
			select {
			case msg := <-repAdapter.RxChan():
				fmt.Printf("Server received: %s\n", string(msg[0]))
				// Echo back the message
				reply := [][]byte{[]byte("Echo: " + string(msg[0]))}
				repAdapter.TxChan() <- reply
			case <-ctx.Done():
				return
			}
		}
	}()

	// Create REQ socket (client)
	reqSocket, err := zmq.NewSocket(zmq.REQ)
	if err != nil {
		log.Fatal(err)
	}
	defer reqSocket.Close()

	err = reqSocket.Connect("tcp://localhost:5555")
	if err != nil {
		log.Fatal(err)
	}

	reqAdapter := zmq4chan.NewChanAdapter(reqSocket, 10, 10)
	defer reqAdapter.Close()
	reqAdapter.Start(ctx)

	// Send a request
	request := [][]byte{[]byte("Hello, World!")}
	reqAdapter.TxChan() <- request

	// Receive reply
	select {
	case reply := <-reqAdapter.RxChan():
		fmt.Printf("Client received: %s\n", string(reply[0]))
	case <-time.After(5 * time.Second):
		fmt.Println("Request timeout")
	}
}
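The example above sends with a bare channel send, which blocks for as long as the transmit buffer is full. One way to bound that wait is sketched below. It assumes TxChan() yields a send-only channel of [][]byte, as the API summary describes. The helper sendWithTimeout and the error errSendTimeout are hypothetical names for illustration and are not part of zmq4chan. A plain buffered channel stands in for the adapter so the sketch runs without libzmq.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errSendTimeout is a hypothetical sentinel error, not part of zmq4chan.
var errSendTimeout = errors.New("send timed out")

// sendWithTimeout is a hypothetical helper: it tries to queue msg on tx and
// gives up after d instead of blocking indefinitely on a full buffer.
func sendWithTimeout(tx chan<- [][]byte, msg [][]byte, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case tx <- msg:
		return nil
	case <-timer.C:
		return errSendTimeout
	}
}

func main() {
	// Stand-in for adapter.TxChan(): capacity 1 and nobody draining it.
	tx := make(chan [][]byte, 1)
	msg := [][]byte{[]byte("Hello, World!")}

	fmt.Println(sendWithTimeout(tx, msg, 50*time.Millisecond)) // buffer has room
	fmt.Println(sendWithTimeout(tx, msg, 50*time.Millisecond)) // buffer is full
}
```

With a real adapter the call would presumably look like sendWithTimeout(reqAdapter.TxChan(), request, time.Second). Note that a successful send only means the message was queued on the channel, not that the peer received it.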
For complete API documentation, visit pkg.go.dev/github.com/thinkdoggie/zmq4chan.
ChanAdapter is the main type that bridges ZMQ sockets with Go channels.
type ChanAdapter struct {
// ... internal fields
}
| Function | Description |
|---|---|
| NewChanAdapter(socket, rxSize, txSize) | Creates a new adapter for a ZMQ socket |
| Start(ctx) | Starts the adapter with context for cancellation |
| RxChan() | Returns receive-only channel for incoming messages |
| TxChan() | Returns send-only channel for outgoing messages |
| Close() | Gracefully shuts down the adapter |
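A consumer of RxChan() typically pairs the receive with ctx.Done() so that it stops when the context passed to Start(ctx) is cancelled. The sketch below shows that pattern with a plain channel standing in for the adapter, so it runs without libzmq. The helper receiveUntilDone and the function runDemo are hypothetical names, not part of zmq4chan. The closed-channel check is defensive: this document does not state whether the adapter closes its receive channel on shutdown.

```go
package main

import (
	"context"
	"fmt"
)

// receiveUntilDone is a hypothetical helper, not part of zmq4chan. It passes
// each message from rx to handle until ctx is cancelled or rx is closed.
func receiveUntilDone(ctx context.Context, rx <-chan [][]byte, handle func([][]byte)) {
	for {
		select {
		case msg, ok := <-rx:
			if !ok {
				return // channel closed, nothing more to read
			}
			handle(msg)
		case <-ctx.Done():
			return
		}
	}
}

// runDemo feeds two messages through a stand-in channel and returns the
// first part of each message in arrival order.
func runDemo() []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Stand-in for adapter.RxChan().
	rx := make(chan [][]byte, 2)
	rx <- [][]byte{[]byte("first")}
	rx <- [][]byte{[]byte("second")}
	close(rx)

	var got []string
	receiveUntilDone(ctx, rx, func(msg [][]byte) {
		got = append(got, string(msg[0]))
	})
	return got
}

func main() {
	fmt.Println(runDemo())
}
```

Keeping the loop in a function that takes a plain receive-only channel also makes the handler easy to unit test without opening any sockets.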
Messages are represented as [][]byte, where each []byte is a message part:
- Single-part: [][]byte{[]byte("hello")}
- Multi-part: [][]byte{[]byte("part1"), []byte("part2")}
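Multi-part messages matter most with ROUTER sockets, where ZeroMQ prepends the sender's identity frame and, for REQ peers, an empty delimiter frame before the payload. A minimal sketch of building and splitting such a message in the [][]byte form follows. The helpers newMessage and splitEnvelope are hypothetical names for illustration and are not part of zmq4chan, and the identity "client-1" is made up.

```go
package main

import "fmt"

// newMessage is a hypothetical helper that builds a message with one part
// per string argument.
func newMessage(parts ...string) [][]byte {
	msg := make([][]byte, 0, len(parts))
	for _, p := range parts {
		msg = append(msg, []byte(p))
	}
	return msg
}

// splitEnvelope is a hypothetical helper that splits msg at the first empty
// frame: parts before it are the routing envelope, parts after it are the
// body. Without an empty frame, the whole message is treated as body.
func splitEnvelope(msg [][]byte) (envelope, body [][]byte) {
	for i, part := range msg {
		if len(part) == 0 {
			return msg[:i], msg[i+1:]
		}
	}
	return nil, msg
}

func main() {
	// Shape of a message a ROUTER socket might receive from a REQ peer:
	// identity frame, empty delimiter, then the payload parts.
	msg := newMessage("client-1", "", "hello", "world")

	envelope, body := splitEnvelope(msg)
	fmt.Printf("envelope=%q body=%q\n", envelope, body)
}
```

To reply through a ROUTER socket, the same envelope and delimiter would go back in front of the reply parts so that ZeroMQ can route the message to the right peer.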
- Fork the repository
- Create a feature branch (git checkout -b feature/amazing-feature)
- Commit your changes (git commit -am 'Add some amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
This project is licensed under the BSD 2-Clause License - see the LICENSE file for details.
- Built on top of the excellent zmq4 Go bindings
- Inspired by the need to integrate ZMQ with idiomatic Go patterns