Skip to content

pubsub/awssnssqs: Add support for setting FIFO message metadata #3434

@bartventer

Description

@bartventer

Describe the bug

The gocloud.dev/pubsub package doesn't allow setting the MessageGroupId parameter (or any other FIFO-related parameter) through the message metadata when publishing to AWS SNS FIFO topics or AWS SQS FIFO queues. This is a limitation because MessageGroupId is a required parameter for FIFO topics and queues. The current workaround, setting it in a BeforeSend callback, is not portable across providers. It would be beneficial to support MessageGroupId in the message metadata.

Error when publishing to an SNS FIFO topic:

Error sending message: pubsub (code=InvalidArgument): InvalidParameter: Invalid parameter: The MessageGroupId parameter is required for FIFO topics.

Error when publishing to an SQS FIFO queue:

Error sending message: pubsub (code=Unknown): InvalidParameterValue: The request must contain the parameter MessageGroupId

To Reproduce

Directory structure:

.
├── main.tf
└── main_test.go

main.tf:

# Provisions the FIFO SNS topic and FIFO SQS queue used by main_test.go.
terraform {
  required_version = ">= 1.7.5"

  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.44.0"
    }
  }
}

variable "region" {
  type        = string
  description = "Region to create resources in. See https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Concepts.RegionsAndAvailabilityZones.html for valid values."
  default     = "us-west-2"
}

provider "aws" {
  region = var.region
}

# FIFO topic; content-based deduplication is enabled on the topic itself.
resource "aws_sns_topic" "fifo" {
  name                        = "gocdk-topic.fifo"
  fifo_topic                  = true
  content_based_deduplication = true
}

# FIFO queue; content-based deduplication is enabled on the queue itself.
resource "aws_sqs_queue" "fifo" {
  name                        = "gocdk-queue.fifo"
  fifo_queue                  = true
  content_based_deduplication = true
}

output "region" {
  value       = var.region
  description = "The region in which resources were created"
}

output "sns_topic_arn" {
  value       = aws_sns_topic.fifo.arn
  description = "The ARN of the SNS FIFO topic"
}

output "sqs_queue_url" {
  # Strip the scheme by name rather than by a hard-coded offset of 8
  # characters, so the intent is explicit and a URL without the expected
  # prefix is left intact instead of being truncated.
  value       = trimprefix(aws_sqs_queue.fifo.url, "https://")
  description = "The URL of the SQS FIFO queue (formatted without the protocol prefix)"
}

main_test.go:

package main

import (
	"context"
	"testing"

	"gocloud.dev/pubsub"
	_ "gocloud.dev/pubsub/awssnssqs"
)

// Before running go test, run in this directory:
//  $ terraform init
//  $ terraform apply
// When you're done testing run the following to clean up:
//  $ terraform destroy -auto-approve

// Update these constants with the outputs from terraform apply (or terraform output after apply).
const (
	// awsRegion is passed as the "region" query parameter of both the
	// SNS and SQS URLs opened by the test.
	awsRegion      = "us-west-2"
	// awsSNSTopicARN is appended to "awssns:///" to form the topic URL.
	awsSNSTopicARN = "arn:aws:sns:us-west-2:252051715350:gocdk-topic.fifo"
	// awsSQSQueueURL is the queue URL without a scheme; the test prepends
	// "awssqs://" to it.
	awsSQSQueueURL = "sqs.us-west-2.amazonaws.com/252051715350/gocdk-queue.fifo"
)

// TestFIFOPublish sends one message carrying FIFO-related metadata to an SNS
// FIFO topic and to an SQS FIFO queue, each in its own subtest. A send
// failure is reported with t.Error so the provider's error text is shown.
//
// It needs the AWS resources described by main.tf and the constants above
// filled in from the terraform outputs.
func TestFIFOPublish(t *testing.T) {
	message := pubsub.Message{
		Body: []byte("Hello, World!"),
		Metadata: map[string]string{
			// It would be nice to be able to do this
			"DeduplicationId": "1",
			"MessageGroupId":  "1",
		},
	}
	t.Run("SNSTopic", func(t *testing.T) {
		ctx := context.Background()
		topic, err := pubsub.OpenTopic(ctx, "awssns:///"+awsSNSTopicARN+"?region="+awsRegion)
		if err != nil {
			// Fatal rather than Error: without a usable topic the deferred
			// Shutdown and the Send below would run on a nil topic.
			t.Fatal("Error opening SNS topic:", err)
		}
		defer topic.Shutdown(ctx)
		if err := topic.Send(ctx, &message); err != nil {
			t.Error("Error sending message:", err)
		}
	})

	t.Run("SQSQueue", func(t *testing.T) {
		ctx := context.Background()
		queue, err := pubsub.OpenTopic(ctx, "awssqs://"+awsSQSQueueURL+"?region="+awsRegion)
		if err != nil {
			// Fatal rather than Error, for the same reason as in the SNS subtest.
			t.Fatal("Error opening SQS queue:", err)
		}
		defer queue.Shutdown(ctx)
		if err := queue.Send(ctx, &message); err != nil {
			t.Error("Error sending message:", err)
		}
	})
}

Expected behavior

Ideally, it should be possible to set MessageGroupId through the message metadata, so that the message can be published to the SNS FIFO topic and the SQS FIFO queue without any errors.

Version

gocloud.dev/pubsub v0.37.0

Additional context

N/A

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions