@soharab-ic (Contributor) commented May 24, 2024

Description

This PR reverts commit f44174d from PR #2875.

It adds the Fission mqtrigger for Kafka, which implements the old approach:

https://fission.io/docs/usage/triggers/message-queue-trigger/kafka/

Which issue(s) this PR fixes:

Fixes #

Testing

Follow the kafka-mqt-fission example and generate the specs as follows:

fission spec init
fission env create --spec --name go --image fission/go-env-1.16:1.32.1 --builder fission/go-builder-1.16:1.32.1
fission package create --spec --src producer.zip --env go --name kafka-producer
fission package create --spec --src consumer.zip --env go --name kafka-consumer
fission fn create --spec --name kafka-producer --env go --pkg kafka-producer --entrypoint Handler --configmap mqt-kafka-configmap
fission fn create --spec --name kafka-consumer --env go --pkg kafka-consumer --entrypoint Handler
fission mqt create --spec --name kafkatest --function kafka-consumer --mqtype kafka --mqtkind fission --topic request-topic --resptopic response-topic --errortopic error-topic

Invoke the producer and watch the consumer logs.

fission fn test --name kafka-producer
fission fn log --name kafka-consumer

Checklist:

  • I ran tests as well as code linting locally to verify my changes.
  • I have manually verified my changes; they work as expected.
  • I have added new tests to cover my changes.
  • My changes follow contributing guidelines of Fission.
  • I have signed all of my commits.

@soharab-ic soharab-ic requested a review from sanketsudake May 24, 2024 06:08
codecov bot commented May 24, 2024

Codecov Report

Attention: Patch coverage is 12.76949%, with 526 lines in your changes missing coverage. Please review.

Project coverage is 44.57%. Comparing base (f7e9e71) to head (066bfc6).

Files                                         Patch %   Lines
pkg/mqtrigger/messageQueue/kafka/consumer.go  0.00%     187 Missing ⚠️
pkg/mqtrigger/messageQueue/kafka/kafka.go     1.89%     155 Missing ⚠️
pkg/mqtrigger/mqtmanager.go                   38.28%    76 Missing and 3 partials ⚠️
cmd/fission-bundle/mqtrigger/mqtrigger.go     0.00%     65 Missing ⚠️
cmd/fission-bundle/main.go                    0.00%     12 Missing ⚠️
pkg/mqtrigger/validator/validator.go          47.61%    9 Missing and 2 partials ⚠️
pkg/mqtrigger/factory/factory.go              37.50%    8 Missing and 2 partials ⚠️
pkg/mqtrigger/metrics.go                      69.23%    4 Missing ⚠️
pkg/apis/core/v1/validation.go                0.00%     0 Missing and 2 partials ⚠️
pkg/fission-cli/cmd/mqtrigger/create.go       0.00%     0 Missing and 1 partial ⚠️

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2946      +/-   ##
==========================================
- Coverage   45.37%   44.57%   -0.80%     
==========================================
  Files         230      236       +6     
  Lines       23619    24216     +597     
==========================================
+ Hits        10717    10795      +78     
- Misses      11506    12019     +513     
- Partials     1396     1402       +6     
Flag Coverage Δ
unittests 44.57% <12.76%> (-0.80%) ⬇️

sweep-ai bot commented May 24, 2024

Sweep: PR Review

Authors of pull request: @soharab-ic

This pull request added support for Kafka message queue triggers to the Fission project.

New Kafka-specific RBAC rules were defined in charts/fission-all/templates/_fission-component-roles.tpl and charts/fission-all/templates/_fission-kubernetes-roles.tpl to grant necessary permissions for Kafka operations. These rules were integrated into the role generator templates to ensure they are applied when Kafka is enabled.

A new deployment configuration for Kafka message queue triggers was added in charts/fission-all/templates/mqt-fission-kafka/deployment.yaml, which includes settings for Kafka brokers, TLS authentication, and environment variables. Additionally, a PodMonitor resource was introduced in charts/fission-all/templates/mqt-fission-kafka/podmonitor.yaml to enable Prometheus monitoring for Kafka pods.

The main application (cmd/fission-bundle/main.go) was updated to include a new runMessageQueueMgr function, which starts the Kafka message queue manager. Command-line options and usage documentation were also updated to support the new --mqt flag for starting the message queue trigger manager.

A new package mqtrigger was introduced, including the Start function in cmd/fission-bundle/mqtrigger/mqtrigger.go to initialize the message queue trigger manager. The factory package was created to register and create message queue instances, and the kafka package was added to handle Kafka-specific consumer group sessions and message processing.

The go.mod and go.sum files were updated to include dependencies for Kafka, such as github.com/IBM/sarama and other related libraries. New metrics for message queue triggers were introduced in pkg/mqtrigger/metrics.go to track subscription counts, message counts, and message lag.

Unit tests for the message queue trigger manager were added in pkg/mqtrigger/mqtmanager_test.go to ensure the correct handling of trigger subscriptions and message processing.


Sweep Found These Issues

charts/fission-all/templates/_fission-component-roles.tpl
  • The new "kafka-rules" block grants extensive permissions (create, get, list, watch, update, patch, delete) to resources, which could potentially lead to security risks if not properly scoped or monitored.
  • {{- define "kafka-rules" }}
    rules:
    - apiGroups:
    - fission.io
    resources:
    - environments
    - functions
    - messagequeuetriggers
    - packages
    verbs:
    - create
    - get
    - list
    - watch
    - update
    - patch
    - delete

charts/fission-all/templates/_fission-kubernetes-roles.tpl
  • The new RBAC rules for Kafka allow extensive permissions, including create, delete, and patch actions on critical resources like configmaps, pods, and secrets, which could pose a security risk if not properly controlled.
  • {{- define "kafka-kuberules" }}
    rules:
    - apiGroups:
    - ""
    resources:
    - configmaps
    - pods
    - secrets
    - services
    - replicationcontrollers
    - events
    verbs:
    - create
    - delete
    - get
    - list
    - watch
    - patch
    - apiGroups:
    - ""
    resources:
    - configmaps
    - secrets
    verbs:
    - get
    - apiGroups:
    - apps
    resources:
    - deployments
    - deployments/scale
    - replicasets
    verbs:
    - create
    - get
    - list
    - watch
    - update
    - patch
    - delete
    - apiGroups:
    - apiextensions.k8s.io
    resources:
    - customresourcedefinitions
    verbs:
    - get
    - list
    - watch
    {{- end }}

  • Sweep has identified a redundant function: The new function "kafka-kuberules" is redundant as its purpose and functionality are already covered by existing methods like "executor-kuberules" and "kubewatcher-kuberules".

charts/fission-all/templates/_fission-kuberntes-role-generator.tpl

charts/fission-all/templates/_fission-role-generator.tpl

charts/fission-all/templates/mqt-fission-kafka/deployment.yaml
  • Chart rendering aborts with an error message if any of the required TLS files (CA certificate, user certificate, or user key) is missing, so the deployment cannot proceed without all three.
  • {{- if .Files.Get (printf "%s" .Values.kafka.authentication.tls.caCert) }}
    caCert: {{ .Files.Get (printf "%s" .Values.kafka.authentication.tls.caCert) | b64enc }}
    {{- else }}
    {{ fail "Invalid chart. CA Certificate not found." }}
    {{- end }}
    {{- if .Files.Get (printf "%s" .Values.kafka.authentication.tls.userCert) }}
    userCert: {{ .Files.Get (printf "%s" .Values.kafka.authentication.tls.userCert) | b64enc }}
    {{- else }}
    {{ fail "Invalid chart. User Certificate not found." }}
    {{- end }}
    {{- if .Files.Get (printf "%s" .Values.kafka.authentication.tls.userKey) }}
    userKey: {{ .Files.Get (printf "%s" .Values.kafka.authentication.tls.userKey) | b64enc }}
    {{- else }}
    {{ fail "Invalid chart. User Key not found." }}
    {{- end }}

charts/fission-all/templates/mqt-fission-kafka/role-fission-cr.yaml
  • The code does not handle the case where .Values.additionalFissionNamespaces is not a list, which could lead to runtime errors.
  • {{- if gt (len .Values.additionalFissionNamespaces) 0 }}
    {{- range $namespace := $.Values.additionalFissionNamespaces }}
    {{ include "fission-role-generator" (merge (dict "namespace" $namespace "component" "kafka") $) }}
    {{- end }}
    {{- end }}

  • Sweep has identified a redundant function: The new function is redundant because it follows the same pattern as existing methods that include the "fission-role-generator" template with different components. For example, the function in 'charts/fission-all/templates/executor/role-fission-cr.yaml' serves a similar purpose.
  • {{- include "fission-role-generator" (merge (dict "namespace" .Values.defaultNamespace "component" "kafka") .) }}

  • Sweep has identified a redundant function: The new function is redundant because its purpose and functionality are already covered by existing methods in the code snippets, such as those in 'charts/fission-all/templates/kubewatcher/role-fission-cr.yaml' and 'charts/fission-all/templates/mqt-keda/role-fission-cr.yaml'.

charts/fission-all/templates/mqt-fission-kafka/role-kubernetes.yaml

cmd/fission-bundle/main.go
  • The new runMessageQueueMgr function does not include any specific error handling or logging within the mqtrigger.Start call, which could make debugging difficult if the function fails.
  • func runMessageQueueMgr(ctx context.Context, clientGen crd.ClientGeneratorInterface, logger *zap.Logger, mgr manager.Interface, routerUrl string) error {
    return mqtrigger.Start(ctx, clientGen, logger, mgr, routerUrl)
    }

  • Sweep has identified a redundant function: The new function runMessageQueueMgr is redundant because its purpose and functionality are already covered by the existing runMQManager function.

cmd/fission-bundle/mqtrigger/mqtrigger.go
  • The readSecrets function returns an error if the secrets directory does not exist, which could cause the Start function to fail even if secrets are optional.
  • // return if no secrets exist
    if _, err := os.Stat(secretsPath); os.IsNotExist(err) {
    return nil, err
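
One way to address the point above is to treat a missing secrets directory as "no secrets configured" rather than as a failure. The following is a minimal sketch under that assumption, not the PR's actual readSecrets: the name readOptionalSecrets and its behavior are hypothetical, and it uses only the Go standard library.

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// readOptionalSecrets is a hypothetical variant of readSecrets: a missing
// directory yields an empty map instead of an error.
func readOptionalSecrets(dir string) (map[string][]byte, error) {
	secrets := make(map[string][]byte)
	entries, err := os.ReadDir(dir)
	if errors.Is(err, fs.ErrNotExist) {
		return secrets, nil // nothing mounted: treat as "no secrets configured"
	}
	if err != nil {
		return nil, fmt.Errorf("reading secrets directory %q: %w", dir, err)
	}
	for _, e := range entries {
		path := filepath.Join(dir, e.Name())
		// os.Stat follows symlinks, so symlinked directories are skipped too.
		info, err := os.Stat(path)
		if err != nil {
			return nil, fmt.Errorf("inspecting %q: %w", path, err)
		}
		if info.IsDir() {
			continue
		}
		data, err := os.ReadFile(path)
		if err != nil {
			return nil, fmt.Errorf("reading secret %q: %w", e.Name(), err)
		}
		secrets[e.Name()] = data
	}
	return secrets, nil
}

func main() {
	secrets, err := readOptionalSecrets("/nonexistent/secrets")
	fmt.Println(len(secrets), err)
}
```

Whether secrets should be optional is a design decision for the maintainers. When TLS is enabled, the quoted New function already rejects a nil secrets map, so a stricter check could stay there.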

  • The Start function calls logger.Fatal on failure to connect to the message queue server, which will terminate the application instead of allowing for graceful error handling.
  • if err != nil {
    logger.Fatal("failed to connect to remote message queue server", zap.Error(err))
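
Since Start already returns an error, the failure could be propagated to the caller instead of terminating the process. The sketch below illustrates that idea only. The queue type and the createQueue and start functions are hypothetical stand-ins for messageQueue.MessageQueue, factory.Create, and Start, and it wraps errors with the standard library's %w verb rather than the errors.Wrap helper used in the PR.

```go
package main

import (
	"errors"
	"fmt"
)

// queue and createQueue are hypothetical stand-ins for
// messageQueue.MessageQueue and factory.Create.
type queue struct{ url string }

func createQueue(url string) (*queue, error) {
	if url == "" {
		return nil, errors.New("the MQ URL is empty")
	}
	return &queue{url: url}, nil
}

// start returns the connection failure to its caller instead of exiting,
// so the caller decides whether to retry, log, or shut down.
func start(url string) error {
	mq, err := createQueue(url)
	if err != nil {
		return fmt.Errorf("failed to connect to remote message queue server: %w", err)
	}
	_ = mq // the real Start would hand mq to the trigger manager here
	return nil
}

func main() {
	if err := start(""); err != nil {
		fmt.Println("startup error:", err)
	}
}
```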

  • Sweep has identified a redundant function: The new function is redundant as its purpose and functionality are already covered by existing methods such as Start in pkg/timer/main.go, pkg/buildermgr/buildermgr.go, pkg/kubewatcher/main.go, and StartScalerManager in pkg/mqtrigger/scalermanager.go.
  • func Start(ctx context.Context, clientGen crd.ClientGeneratorInterface, logger *zap.Logger, mgr manager.Interface, routerUrl string) error {
    fissionClient, err := clientGen.GetFissionClient()
    if err != nil {
    return errors.Wrap(err, "failed to get fission client")
    }
    err = crd.WaitForFunctionCRDs(ctx, logger, fissionClient)
    if err != nil {
    return errors.Wrap(err, "error waiting for CRDs")
    }
    mqType := (fv1.MessageQueueType)(os.Getenv("MESSAGE_QUEUE_TYPE"))
    mqUrl := os.Getenv("MESSAGE_QUEUE_URL")
    secretsPath := strings.TrimSpace(os.Getenv("MESSAGE_QUEUE_SECRETS"))
    var secrets map[string][]byte
    if len(secretsPath) > 0 {
    // For authentication with message queue
    secrets, err = readSecrets(logger, secretsPath)
    if err != nil {
    return err
    }
    }
    mq, err := factory.Create(
    logger,
    mqType,
    messageQueue.Config{
    MQType: (string)(mqType),
    Url: mqUrl,
    Secrets: secrets,
    },
    routerUrl,
    )
    if err != nil {
    logger.Fatal("failed to connect to remote message queue server", zap.Error(err))
    }
    mqtMgr := mqtrigger.MakeMessageQueueTriggerManager(logger, fissionClient, mqType, mq)
    err = mqtMgr.Run(ctx, mgr)
    if err != nil {
    return err
    }
    return nil
    }

go.sum
  • The addition of multiple github.com/jcmturner libraries introduces several new dependencies that may have compatibility issues or introduce security vulnerabilities, especially with cryptographic and authentication functionalities.
  • fission/go.sum

    Lines 257 to 284 in 65aa6ec

    github.com/hashicorp/go-retryablehttp v0.7.5/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8=
    github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
    github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8=
    github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
    github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
    github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
    github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
    github.com/huandu/xstrings v1.3.3 h1:/Gcsuc1x8JVbJ9/rlye4xZnVAbEkGauT8lbebqcQws4=
    github.com/huandu/xstrings v1.3.3/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
    github.com/imdario/mergo v0.3.11 h1:3tnifQM4i+fbajXKBHXWEH+KvNHqojZ778UH75j3bGA=
    github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
    github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
    github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
    github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
    github.com/influxdata/influxdb v1.11.5 h1:+em5VOl6lhAZubXj5o6SobCwvrRs3XDlBx/MUI4schI=
    github.com/influxdata/influxdb v1.11.5/go.mod h1:k8sWREQl1/9t46VrkrH5adUM4UNGIt206ipO3plbkw8=
    github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A=
    github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo=
    github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
    github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs=
    github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo=
    github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM=
    github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg=
    github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo=
    github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o=
    github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg=
    github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8=
    github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs=

pkg/apis/core/v1/validation.go
  • The change to validator.IsValidTopic introduces a potential issue if the validator.IsValidTopic function does not handle the additional MessageQueueType parameter correctly, which could lead to validation failures.
  • if !validator.IsValidTopic((string)(spec.MessageQueueType), spec.Topic, spec.MqtKind) {
    result = multierror.Append(result, MakeValidationErr(ErrorInvalidValue, "MessageQueueTriggerSpec.Topic", spec.Topic, "not a valid topic"))
    }
    if len(spec.ResponseTopic) > 0 && !validator.IsValidTopic((string)(spec.MessageQueueType), spec.ResponseTopic, spec.MqtKind) {
    result = multierror.Append(result, MakeValidationErr(ErrorInvalidValue, "MessageQueueTriggerSpec.ResponseTopic", spec.ResponseTopic, "not a valid topic"))

pkg/mqtrigger/factory/factory.go
  • The panic messages in the Register function could cause the application to crash if a nil factory is registered or if a duplicate registration occurs.
  • panic("Nil message queue factory")
    }
    _, registered := messageQueueFactories[mqType]
    if registered {
    panic("Message queue factory already register")
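
For comparison, a registration function can report these conditions as errors rather than panicking. This is a minimal sketch, not the PR's code: register, factoryFn, and the string-keyed map are hypothetical simplifications of Register, MessageQueueFactory, and the fv1.MessageQueueType-keyed map.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// factoryFn is a hypothetical stand-in for the MessageQueueFactory interface.
type factoryFn func() string

var (
	lock      sync.Mutex
	factories = map[string]factoryFn{}
)

// register returns an error for a nil factory or a duplicate registration
// instead of panicking.
func register(mqType string, f factoryFn) error {
	lock.Lock()
	defer lock.Unlock()
	if f == nil {
		return errors.New("nil message queue factory")
	}
	if _, ok := factories[mqType]; ok {
		return fmt.Errorf("message queue factory %q already registered", mqType)
	}
	factories[mqType] = f
	return nil
}

func main() {
	f := func() string { return "kafka" }
	fmt.Println(register("kafka", f)) // first registration succeeds
	fmt.Println(register("kafka", f)) // duplicate is reported as an error
}
```

The trade-off is that the PR calls Register from an init function, which cannot return an error. Panicking on a nil or duplicate registration at init time is an established Go pattern (database/sql.Register behaves this way), so the original behavior may well be intentional.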

  • Sweep has identified a redundant function: The new function is redundant because the Register function in the pkg/mqtrigger/validator/validator.go file already performs similar tasks of registering with thread safety.
  • func Register(mqType fv1.MessageQueueType, factory MessageQueueFactory) {
    lock.Lock()
    defer lock.Unlock()
    if factory == nil {
    panic("Nil message queue factory")
    }
    _, registered := messageQueueFactories[mqType]
    if registered {
    panic("Message queue factory already register")
    }
    messageQueueFactories[mqType] = factory
    }

  • Sweep has identified a redundant function: The new function is redundant because its functionality is already covered by the existing Create method in the factory package.
  • func Create(logger *zap.Logger, mqType fv1.MessageQueueType, mqConfig messageQueue.Config, routerUrl string) (messageQueue.MessageQueue, error) {
    factory, registered := messageQueueFactories[mqType]
    if !registered {
    return nil, errors.Errorf("no supported message queue type found for %q", mqType)
    }
    return factory.Create(logger, mqConfig, routerUrl)
    }

pkg/mqtrigger/messageQueue/kafka/consumer.go
  • The ConsumeClaim method does not handle the case where claim.Messages() returns nil, which could lead to a nil pointer dereference.
  • case msg := <-claim.Messages():
    if msg != nil {
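
A receive from a closed Go channel yields the zero value, which here is a nil message. In the full ConsumeClaim quoted elsewhere in this review, msg.Offset appears to be read after the nil check rather than inside it, so a closed channel could trigger exactly this dereference. The sketch below shows the usual two-value receive pattern. The message type, the consume function, and the done channel (standing in for session.Context().Done()) are hypothetical and do not use sarama.

```go
package main

import "fmt"

// message is a hypothetical stand-in for *sarama.ConsumerMessage.
type message struct{ Offset int64 }

// consume handles messages until msgs is closed or done fires, and
// returns how many messages were handled.
func consume(done <-chan struct{}, msgs <-chan *message, handle func(*message)) int {
	handled := 0
	for {
		select {
		case msg, ok := <-msgs:
			if !ok {
				return handled // channel closed: stop instead of looping on nil
			}
			if msg == nil {
				continue // skip nil entries; never touch msg.Offset here
			}
			handle(msg)
			handled++
		case <-done:
			return handled
		}
	}
}

func main() {
	msgs := make(chan *message, 2)
	msgs <- &message{Offset: 1}
	msgs <- &message{Offset: 2}
	close(msgs)
	n := consume(make(chan struct{}), msgs, func(m *message) {
		fmt.Println("offset", m.Offset)
	})
	fmt.Println("handled", n)
}
```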

  • The kafkaMsgHandler method retries HTTP requests without any delay, which could lead to rapid retry loops and potential rate limiting issues.
  • for attempt := 0; attempt <= ch.trigger.Spec.MaxRetries; attempt++ {
    // Make the request
    resp, err = http.DefaultClient.Do(req)
    if err != nil {
    ch.logger.Error("sending function invocation request failed",
    zap.Error(err),
    zap.String("function_url", ch.fnUrl),
    zap.String("trigger", ch.trigger.ObjectMeta.Name))
    continue
    }
    if resp == nil {
    continue
    }
    if err == nil && resp.StatusCode == http.StatusOK {
    // Success, quit retrying
    break
    }
    }
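
A delay between attempts could look like the sketch below. It is illustrative only: retryWithBackoff and errTemporary are hypothetical names, the doubling delay is an assumed policy, and the HTTP call is replaced by a plain function so the example runs on its own.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTemporary is a hypothetical error used to simulate a failed attempt.
var errTemporary = errors.New("temporary failure")

// retryWithBackoff calls fn up to maxRetries+1 times, sleeping between
// attempts and doubling the delay each time. It returns the number of
// attempts made and the last error (nil on success).
func retryWithBackoff(maxRetries int, base time.Duration, fn func() error) (int, error) {
	var err error
	delay := base
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if err = fn(); err == nil {
			return attempt + 1, nil
		}
		if attempt < maxRetries {
			time.Sleep(delay) // wait before the next attempt
			delay *= 2
		}
	}
	return maxRetries + 1, err
}

func main() {
	calls := 0
	attempts, err := retryWithBackoff(3, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errTemporary
		}
		return nil
	})
	fmt.Println(attempts, err)
}
```

Two related points may be worth checking in the quoted loop, although neither is confirmed here: the same req is reused across attempts even though its body reader is consumed by the first send, and responses from non-200 attempts are not closed before the next attempt.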

  • Sweep has identified a redundant function: The new function is redundant because its purpose, functionality, and data processing steps are already covered by the existing NewMqtConsumerGroupHandler function in pkg/mqtrigger/messageQueue/kafka/kafka.go.
  • func NewMqtConsumerGroupHandler(version sarama.KafkaVersion,
    logger *zap.Logger,
    trigger *fv1.MessageQueueTrigger,
    producer sarama.SyncProducer,
    routerUrl string) MqtConsumerGroupHandler {
    ch := MqtConsumerGroupHandler{
    version: version,
    logger: logger,
    trigger: trigger,
    producer: producer,
    ready: make(chan bool),
    }
    // Support other function ref types
    if ch.trigger.Spec.FunctionReference.Type != fv1.FunctionReferenceTypeFunctionName {
    ch.logger.Fatal("unsupported function reference type for trigger",
    zap.Any("function_reference_type", ch.trigger.Spec.FunctionReference.Type),
    zap.String("trigger", ch.trigger.ObjectMeta.Name))
    }
    // Generate the Headers
    ch.fissionHeaders = map[string]string{
    "X-Fission-MQTrigger-Topic": ch.trigger.Spec.Topic,
    "X-Fission-MQTrigger-RespTopic": ch.trigger.Spec.ResponseTopic,
    "X-Fission-MQTrigger-ErrorTopic": ch.trigger.Spec.ErrorTopic,
    "Content-Type": ch.trigger.Spec.ContentType,
    }
    ch.fnUrl = routerUrl + "/" + strings.TrimPrefix(utils.UrlForFunction(ch.trigger.Spec.FunctionReference.Name, ch.trigger.ObjectMeta.Namespace), "/")
    ch.logger.Debug("function HTTP URL", zap.String("url", ch.fnUrl))
    return ch

  • Sweep has identified a redundant function: The new function is redundant because the existing Cleanup method in the MqtConsumerGroupHandler struct already performs the same logging tasks.
  • func (ch MqtConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
    ch.logger.With(
    zap.String("trigger", ch.trigger.ObjectMeta.Name),
    zap.String("topic", ch.trigger.Spec.Topic),
    zap.String("memberID", session.MemberID()),
    zap.Int32("generationID", session.GenerationID()),
    zap.String("claims", fmt.Sprintf("%v", session.Claims())),
    ).Info("consumer group session cleanup")
    return nil

  • Sweep has identified a redundant function: The new function is redundant because its functionality is already covered by the existing ConsumeClaims method in the MqtConsumerGroupHandler struct.
  • func (ch MqtConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    trigger := ch.trigger.Name
    triggerNamespace := ch.trigger.Namespace
    topic := claim.Topic()
    partition := string(claim.Partition())
    // initially set message lag count
    mqtrigger.SetMessageLagCount(trigger, triggerNamespace, topic, partition, claim.HighWaterMarkOffset()-claim.InitialOffset())
    // Do not move the code below to a goroutine.
    // The `ConsumeClaim` itself is called within a goroutine
    for {
    select {
    case msg := <-claim.Messages():
    if msg != nil {
    ch.kafkaMsgHandler(msg)
    session.MarkMessage(msg, "")
    mqtrigger.IncreaseMessageCount(trigger, triggerNamespace)
    }
    mqtrigger.SetMessageLagCount(trigger, triggerNamespace, topic, partition,
    claim.HighWaterMarkOffset()-msg.Offset-1)
    // Should return when `session.Context()` is done.
    case <-session.Context().Done():
    return nil
    }
    }
    }

  • Sweep has identified a redundant function: The new function kafkaMsgHandler is redundant because its purpose and functionality are already covered by the existing kafkaMsgHandler method in pkg/mqtrigger/messageQueue/kafka/consumer.go.
  • func (ch *MqtConsumerGroupHandler) kafkaMsgHandler(msg *sarama.ConsumerMessage) {
    value := string(msg.Value)
    // Create request
    req, err := http.NewRequest("POST", ch.fnUrl, strings.NewReader(value))
    if err != nil {
    ch.logger.Error("failed to create HTTP request to invoke function",
    zap.Error(err),
    zap.String("function_url", ch.fnUrl))
    return
    }
    // Set the headers came from Kafka record
    // Using Header.Add() as msg.Headers may have keys with more than one value
    if ch.version.IsAtLeast(sarama.V0_11_0_0) {
    for _, h := range msg.Headers {
    req.Header.Add(string(h.Key), string(h.Value))
    }
    } else {
    ch.logger.Warn("headers are not supported by current Kafka version, needs v0.11+: no record headers to add in HTTP request",
    zap.Any("current_version", ch.version))
    }
    for k, v := range ch.fissionHeaders {
    req.Header.Set(k, v)
    }
    // Make the request
    var resp *http.Response
    for attempt := 0; attempt <= ch.trigger.Spec.MaxRetries; attempt++ {
    // Make the request
    resp, err = http.DefaultClient.Do(req)
    if err != nil {
    ch.logger.Error("sending function invocation request failed",
    zap.Error(err),
    zap.String("function_url", ch.fnUrl),
    zap.String("trigger", ch.trigger.ObjectMeta.Name))
    continue
    }
    if resp == nil {
    continue
    }
    if err == nil && resp.StatusCode == http.StatusOK {
    // Success, quit retrying
    break
    }
    }
    generateErrorHeaders := func(errString string) []sarama.RecordHeader {
    var errorHeaders []sarama.RecordHeader
    if ch.version.IsAtLeast(sarama.V0_11_0_0) {
    if count, ok := errorMessageMap[errString]; ok {
    errorMessageMap[errString] = count + 1
    } else {
    errorMessageMap[errString] = 1
    }
    errorHeaders = append(errorHeaders, sarama.RecordHeader{Key: []byte("MessageSource"), Value: []byte(ch.trigger.Spec.Topic)})
    errorHeaders = append(errorHeaders, sarama.RecordHeader{Key: []byte("RecycleCounter"), Value: []byte(strconv.Itoa(errorMessageMap[errString]))})
    }
    return errorHeaders
    }
    if resp == nil {
    errorString := fmt.Sprintf("request exceed retries: %v", ch.trigger.Spec.MaxRetries)
    errorHeaders := generateErrorHeaders(errorString)
    errorHandler(ch.logger, ch.trigger, ch.producer, ch.fnUrl,
    fmt.Errorf(errorString), errorHeaders)
    return
    }
    defer resp.Body.Close()
    body, err := io.ReadAll(resp.Body)
    ch.logger.Debug("got response from function invocation",
    zap.String("function_url", ch.fnUrl),
    zap.String("trigger", ch.trigger.ObjectMeta.Name),
    zap.String("body", string(body)))
    if err != nil {
    errorString := "request body error: " + string(body)
    errorHeaders := generateErrorHeaders(errorString)
    errorHandler(ch.logger, ch.trigger, ch.producer, ch.fnUrl,
    errors.Wrapf(err, errorString), errorHeaders)
    return
    }
    if resp.StatusCode != 200 {
    errorString := fmt.Sprintf("request returned failure: %v, request body error: %v", resp.StatusCode, body)
    errorHeaders := generateErrorHeaders(errorString)
    errorHandler(ch.logger, ch.trigger, ch.producer, ch.fnUrl,
    fmt.Errorf("request returned failure: %v", resp.StatusCode), errorHeaders)
    return
    }
    if len(ch.trigger.Spec.ResponseTopic) > 0 {
    // Generate Kafka record headers
    var kafkaRecordHeaders []sarama.RecordHeader
    if ch.version.IsAtLeast(sarama.V0_11_0_0) {
    for k, v := range resp.Header {
    // One key may have multiple values
    for _, v := range v {
    kafkaRecordHeaders = append(kafkaRecordHeaders, sarama.RecordHeader{Key: []byte(k), Value: []byte(v)})
    }
    }
    } else {
    ch.logger.Warn("headers are not supported by current Kafka version, needs v0.11+: no record headers to add in HTTP request",
    zap.Any("current_version", ch.version))
    }
    _, _, err := ch.producer.SendMessage(&sarama.ProducerMessage{
    Topic: ch.trigger.Spec.ResponseTopic,
    Value: sarama.StringEncoder(body),
    Headers: kafkaRecordHeaders,
    })
    if err != nil {
    ch.logger.Warn("failed to publish response body from function invocation to topic",
    zap.Error(err),
    zap.String("topic", ch.trigger.Spec.Topic),
    zap.String("function_url", ch.fnUrl))
    return
    }
    }
    }

  • Sweep has identified a redundant function: The new function is redundant because its functionality is already covered by the existing errorHandler function in pkg/mqtrigger/messageQueue/kafka/consumer.go.
  • func errorHandler(logger *zap.Logger, trigger *fv1.MessageQueueTrigger, producer sarama.SyncProducer, funcUrl string, err error, errorTopicHeaders []sarama.RecordHeader) {
    if len(trigger.Spec.ErrorTopic) > 0 {
    _, _, e := producer.SendMessage(&sarama.ProducerMessage{
    Topic: trigger.Spec.ErrorTopic,
    Value: sarama.StringEncoder(err.Error()),
    Headers: errorTopicHeaders,
    })
    if e != nil {
    logger.Error("failed to publish message to error topic",
    zap.Error(e),
    zap.String("trigger", trigger.ObjectMeta.Name),
    zap.String("message", err.Error()),
    zap.String("topic", trigger.Spec.Topic))
    }
    } else {
    logger.Error("message received to publish to error topic, but no error topic was set",
    zap.String("message", err.Error()), zap.String("trigger", trigger.ObjectMeta.Name), zap.String("function_url", funcUrl))
    }
    }

pkg/mqtrigger/messageQueue/kafka/kafka.go
  • The Subscribe method does not handle the case where the consumer group fails to consume messages, potentially leading to an infinite loop.
  • for {
    // Consume messages
    err := consumer.Consume(ctx, topic, ch)
    if err != nil {
    kafka.logger.Error("consumer error", zap.Error(err), zap.String("trigger", trigger.ObjectMeta.Name))
    }
    if ctx.Err() != nil {
    kafka.logger.Info("consumer context cancelled", zap.String("trigger", trigger.ObjectMeta.Name))
    return
    }
    ch.ready = make(chan bool)
    }
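
The quoted loop does exit once the context is cancelled, so the practical concern is probably a tight loop when Consume keeps failing. One hedged option is to pause after each failure while still reacting to shutdown. In this sketch, consumeLoop, errConsume, and the done channel (standing in for ctx.Done()) are hypothetical, and the pause length is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errConsume is a hypothetical error simulating a failing Consume call.
var errConsume = errors.New("consumer error")

// consumeLoop re-invokes consume until done is closed, pausing after each
// failure. It returns the number of failures observed.
func consumeLoop(done <-chan struct{}, pause time.Duration, consume func() error) int {
	failures := 0
	for {
		if err := consume(); err != nil {
			failures++
			select {
			case <-time.After(pause): // wait before rejoining the group
			case <-done:
				return failures
			}
		}
		select {
		case <-done:
			return failures
		default:
		}
	}
}

func main() {
	done := make(chan struct{})
	calls := 0
	failures := consumeLoop(done, 10*time.Millisecond, func() error {
		calls++
		if calls == 3 {
			close(done) // simulate shutdown during the third attempt
		}
		return errConsume
	})
	fmt.Println("failures before shutdown:", failures)
}
```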

  • The New function does not handle the case where the Kafka version parsing fails and defaults to an uninitialized kafkaVersion.
  • kafkaVersion, err := sarama.ParseKafkaVersion(mqKafkaVersion)
    if err != nil {
    logger.Warn("error parsing kafka version string - falling back to default",
    zap.Error(err),
    zap.String("failed_version", mqKafkaVersion),
    zap.Any("default_version", kafkaVersion))
    }

  • The getTLSConfig method does not check if the caCert is successfully appended to the caCertPool, which could lead to TLS configuration issues.
  • caCertPool := x509.NewCertPool()
    caCertPool.AppendCertsFromPEM(kafka.authKeys["caCert"])
    tlsConfig.RootCAs = caCertPool
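
AppendCertsFromPEM reports through its boolean return value whether any certificate was parsed, so the check is a small addition. A minimal sketch, with newCertPool as a hypothetical helper name rather than a function from the PR:

```go
package main

import (
	"crypto/x509"
	"errors"
	"fmt"
)

// newCertPool builds a certificate pool from PEM data and fails if no
// certificate could be parsed from it.
func newCertPool(caPEM []byte) (*x509.CertPool, error) {
	pool := x509.NewCertPool()
	if ok := pool.AppendCertsFromPEM(caPEM); !ok {
		return nil, errors.New("no valid CA certificate found in PEM data")
	}
	return pool, nil
}

func main() {
	_, err := newCertPool([]byte("not a certificate"))
	fmt.Println(err)
}
```

In getTLSConfig, the returned pool would be assigned to tlsConfig.RootCAs, and the error returned alongside the other TLS setup errors.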

  • Sweep has identified a redundant function: The new function duplicates the functionality of the Register function in pkg/mqtrigger/factory/factory.go and the IsTopicValid function in pkg/mqtrigger/messageQueue/kafka/kafka.go.
  • func init() {
    factory.Register(fv1.MessageQueueTypeKafka, &Factory{})
    validator.Register(fv1.MessageQueueTypeKafka, IsTopicValid)

  • Sweep has identified a redundant function: The new function is redundant because its purpose and functionality are already covered by the existing Create method in the MessageQueueFactory interface and its implementations.
  • func (factory *Factory) Create(logger *zap.Logger, mqCfg messageQueue.Config, routerUrl string) (messageQueue.MessageQueue, error) {
    return New(logger, mqCfg, routerUrl)
    }

  • Sweep has identified a redundant function: The new function is redundant because its purpose and functionality are already covered by the existing Create method in the Factory struct in pkg/mqtrigger/messageQueue/kafka/kafka.go.
  • func New(logger *zap.Logger, mqCfg messageQueue.Config, routerUrl string) (messageQueue.MessageQueue, error) {
    if len(routerUrl) == 0 || len(mqCfg.Url) == 0 {
    return nil, errors.New("the router URL or MQ URL is empty")
    }
    mqKafkaVersion := os.Getenv("MESSAGE_QUEUE_KAFKA_VERSION")
    // Parse version string
    kafkaVersion, err := sarama.ParseKafkaVersion(mqKafkaVersion)
    if err != nil {
    logger.Warn("error parsing kafka version string - falling back to default",
    zap.Error(err),
    zap.String("failed_version", mqKafkaVersion),
    zap.Any("default_version", kafkaVersion))
    }
    kafka := Kafka{
    logger: logger.Named("kafka"),
    routerUrl: routerUrl,
    brokers: strings.Split(mqCfg.Url, ","),
    version: kafkaVersion,
    }
    if tls, _ := strconv.ParseBool(os.Getenv("TLS_ENABLED")); tls {
    kafka.tls = true
    authKeys := make(map[string][]byte)
    if mqCfg.Secrets == nil {
    return nil, errors.New("no secrets were loaded")
    }
    authKeys["caCert"] = mqCfg.Secrets["caCert"]
    authKeys["userCert"] = mqCfg.Secrets["userCert"]
    authKeys["userKey"] = mqCfg.Secrets["userKey"]
    kafka.authKeys = authKeys
    }
    logger.Info("created kafka queue", zap.Any("kafka brokers", kafka.brokers),
    zap.Any("kafka version", kafka.version))
    // Create new config
    saramaConfig := sarama.NewConfig()
    saramaConfig.Version = kafka.version
    // consumer config
    saramaConfig.Consumer.Return.Errors = true
    // producer config
    saramaConfig.Producer.RequiredAcks = sarama.WaitForAll
    saramaConfig.Producer.Retry.Max = 10
    saramaConfig.Producer.Return.Successes = true
    // Setup TLS for both producer and consumer
    if kafka.tls {
    tlsConfig, err := kafka.getTLSConfig()
    if err != nil {
    return nil, err
    }
    saramaConfig.Net.TLS.Enable = true
    saramaConfig.Net.TLS.Config = tlsConfig
    }
    saramaClient, err := sarama.NewClient(kafka.brokers, saramaConfig)
    if err != nil {
    return nil, err
    }
    kafka.client = saramaClient
    return kafka, nil


  • Sweep has identified a redundant function: The new function is redundant because its functionality is already covered by existing methods such as NewMqtConsumerGroupHandler, MqtConsumerGroupHandler.Setup, MqtConsumerGroupHandler.Cleanup, and MqtConsumerGroupHandler.ConsumeClaim.
  • func (kafka Kafka) Subscribe(trigger *fv1.MessageQueueTrigger) (messageQueue.Subscription, error) {
    kafka.logger.Debug("inside kakfa subscribe", zap.Any("trigger", trigger))
    kafka.logger.Debug("brokers set", zap.Strings("brokers", kafka.brokers))
    consumer, err := sarama.NewConsumerGroupFromClient(string(trigger.ObjectMeta.UID), kafka.client)
    if err != nil {
    return nil, err
    }
    producer, err := sarama.NewSyncProducerFromClient(kafka.client)
    if err != nil {
    return nil, err
    }
    kafka.logger.Info("created a new producer and a new consumer", zap.Strings("brokers", kafka.brokers),
    zap.String("topic", trigger.Spec.Topic),
    zap.String("response topic", trigger.Spec.ResponseTopic),
    zap.String("error topic", trigger.Spec.ErrorTopic),
    zap.String("trigger", trigger.ObjectMeta.Name),
    zap.String("function namespace", trigger.ObjectMeta.Namespace),
    zap.String("function name", trigger.Spec.FunctionReference.Name))
    // consume errors
    go func() {
    for err := range consumer.Errors() {
    kafka.logger.With(zap.String("trigger", trigger.ObjectMeta.Name), zap.String("topic", trigger.Spec.Topic)).Error("consumer error received", zap.Error(err))
    }
    }()
    ctx, cancel := context.WithCancel(context.Background())
    ch := NewMqtConsumerGroupHandler(kafka.version, kafka.logger, trigger, producer, kafka.routerUrl)
    // consume messages
    go func() {
    topic := []string{trigger.Spec.Topic}
    // Create a new session for the consumer group until the context is cancelled
    for {
    // Consume messages
    err := consumer.Consume(ctx, topic, ch)
    if err != nil {
    kafka.logger.Error("consumer error", zap.Error(err), zap.String("trigger", trigger.ObjectMeta.Name))
    }
    if ctx.Err() != nil {
    kafka.logger.Info("consumer context cancelled", zap.String("trigger", trigger.ObjectMeta.Name))
    return
    }
    ch.ready = make(chan bool)
    }
    }()
    <-ch.ready // wait for consumer to be ready
    mqtConsumer := MqtConsumer{
    ctx: ctx,
    cancel: cancel,
    consumer: consumer,
    }
    return mqtConsumer, nil


  • Sweep has identified a redundant function: The new function is redundant because the existing getTLSConfig method in the Kafka struct already handles the same tasks and operations.
  • func (kafka Kafka) getTLSConfig() (*tls.Config, error) {
    tlsConfig := tls.Config{}
    cert, err := tls.X509KeyPair(kafka.authKeys["userCert"], kafka.authKeys["userKey"])
    if err != nil {
    return nil, err
    }
    tlsConfig.Certificates = []tls.Certificate{cert}
    skipVerify, err := strconv.ParseBool(os.Getenv("INSECURE_SKIP_VERIFY"))
    if err != nil {
    kafka.logger.Error("failed to parse value of env variable INSECURE_SKIP_VERIFY taking default value false, expected boolean value: true/false",
    zap.String("received", os.Getenv("INSECURE_SKIP_VERIFY")))
    } else {
    tlsConfig.InsecureSkipVerify = skipVerify
    }
    caCertPool := x509.NewCertPool()
    caCertPool.AppendCertsFromPEM(kafka.authKeys["caCert"])
    tlsConfig.RootCAs = caCertPool
    return &tlsConfig, nil


  • Sweep has identified a redundant function: The new function is redundant as its purpose and functionality are already covered by the existing Unsubscribe methods in fakeMessageQueue and Kafka structs.
  • func (kafka Kafka) Unsubscribe(subscription messageQueue.Subscription) error {
    mqtConsumer := subscription.(MqtConsumer)
    mqtConsumer.cancel()
    return mqtConsumer.consumer.Close()

    View Diff

  • Sweep has identified a redundant function: The new function is redundant because the existing IsTopicValid function in pkg/mqtrigger/messageQueue/kafka/kafka.go already performs the same validations.
  • func IsTopicValid(topic string) bool {
    if len(topic) == 0 {
    return false
    }
    if topic == "." || topic == ".." {
    return false
    }
    if len(topic) > 249 {
    return false
    }
    if !validKafkaTopicName.MatchString(topic) {
    return false
    }
    return true


pkg/mqtrigger/messageQueue/messageQueue.go

pkg/mqtrigger/metrics.go
  • Sweep has identified a redundant function: The new function IncreaseSubscriptionCount is redundant because the same functionality is already provided by the existing IncreaseSubscriptionCount function in the pkg/mqtrigger/mqtmanager.go file.
  • func IncreaseSubscriptionCount() {
    subscriptionCount.WithLabelValues().Inc()
    }


  • Sweep has identified a redundant function: The new function DecreaseSubscriptionCount is redundant because the existing DecreaseSubscriptionCount function in the pkg/mqtrigger/mqtmanager.go file already performs the same task.
  • func DecreaseSubscriptionCount() {
    subscriptionCount.WithLabelValues().Dec()
    }


  • Sweep has identified a redundant function: The new function SetMessageLagCount is redundant because its functionality is already implemented in the ConsumeClaim method of the MqtConsumerGroupHandler struct.
  • func SetMessageLagCount(trigname, trignamespace, topic, partition string, lag int64) {
    messageLagCount.WithLabelValues(trigname, trignamespace, topic, partition).Set(float64(lag))
    }


  • Sweep has identified a redundant function: The new function is redundant because the init function in pkg/mqtrigger/metrics.go already registers the same metrics (subscriptionCount, messageCount, messageLagCount) with the metrics registry.
  • func init() {
    registry := metrics.Registry
    registry.MustRegister(subscriptionCount)
    registry.MustRegister(messageCount)
    registry.MustRegister(messageLagCount)
    }


pkg/mqtrigger/mqtmanager.go
  • The RegisterTrigger method does not handle the case where Subscribe returns an error but the subscription is not nil, which could lead to inconsistent state.
  • func (mqt *MessageQueueTriggerManager) RegisterTrigger(trigger *fv1.MessageQueueTrigger) {
    isPresent := mqt.checkTriggerSubscription(trigger)
    if isPresent {
    mqt.logger.Debug("message queue trigger already registered", zap.String("trigger_name", trigger.ObjectMeta.Name))
    return
    }
    // actually subscribe using the message queue client impl
    sub, err := mqt.messageQueue.Subscribe(trigger)
    if err != nil {
    mqt.logger.Warn("failed to subscribe to message queue trigger", zap.Error(err), zap.String("trigger_name", trigger.ObjectMeta.Name))
    return
    }
    if sub == nil {
    mqt.logger.Warn("subscription is nil", zap.String("trigger_name", trigger.ObjectMeta.Name))
    return
    }
    triggerSub := triggerSubscription{
    trigger: *trigger,
    subscription: sub,
    }
    // add to our list
    err = mqt.addTrigger(&triggerSub)
    if err != nil {
    mqt.logger.Fatal("adding message queue trigger failed", zap.Error(err), zap.String("trigger_name", trigger.ObjectMeta.Name))
    }
    mqt.logger.Info("message queue trigger created", zap.String("trigger_name", trigger.ObjectMeta.Name))
    }


  • The service method does not handle the case where reqChan is closed, which could lead to a panic.
  • func (mqt *MessageQueueTriggerManager) service() {
    for {
    req := <-mqt.reqChan
    resp := response{triggerSub: nil, err: nil}
    k, err := k8sCache.MetaNamespaceKeyFunc(&req.triggerSub.trigger)
    if err != nil {
    resp.err = err
    req.respChan <- resp
    continue
    }
    switch req.requestType {
    case ADD_TRIGGER:
    if _, ok := mqt.triggers[k]; ok {
    resp.err = errors.New("trigger already exists")
    } else {
    mqt.triggers[k] = req.triggerSub
    mqt.logger.Debug("set trigger subscription", zap.String("key", k))
    IncreaseSubscriptionCount()
    }
    req.respChan <- resp
    case GET_TRIGGER_SUBSCRIPTION:
    if _, ok := mqt.triggers[k]; !ok {
    resp.err = errors.New("trigger does not exist")
    } else {
    resp.triggerSub = mqt.triggers[k]
    }
    req.respChan <- resp
    case DELETE_TRIGGER:
    delete(mqt.triggers, k)
    mqt.logger.Debug("delete trigger", zap.String("key", k))
    DecreaseSubscriptionCount()
    req.respChan <- resp
    }
    }

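A receive from a closed channel yields a zero-value request, which is the situation the comment above warns about. A minimal sketch of a loop that ends when the channel is closed is shown below; the `request` type and `serve` function are simplified, hypothetical stand-ins for the manager's own types.

```go
package main

import "fmt"

// request is a simplified, hypothetical stand-in for the manager's request type.
type request struct {
	name     string
	respChan chan string
}

// serve handles requests until reqChan is closed. Ranging over the channel
// ends the loop on close instead of processing zero-value requests.
func serve(reqChan <-chan request) int {
	handled := 0
	for req := range reqChan {
		req.respChan <- "ok: " + req.name
		handled++
	}
	return handled
}

func main() {
	reqChan := make(chan request)
	done := make(chan int)
	go func() { done <- serve(reqChan) }()

	resp := make(chan string)
	reqChan <- request{name: "add", respChan: resp}
	fmt.Println(<-resp)

	close(reqChan) // the loop in serve exits cleanly
	fmt.Println("handled:", <-done)
}
```

The same effect is available with the two-value receive `req, ok := <-reqChan` if the loop needs to stay a plain `for`.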

  • Sweep has identified a redundant function: The new function is redundant because the existing MakeMessageQueueTriggerManager function in pkg/mqtrigger/mqtmanager.go already performs the same initialization tasks.
  • func MakeMessageQueueTriggerManager(logger *zap.Logger,
    fissionClient versioned.Interface, mqType fv1.MessageQueueType, messageQueue messageQueue.MessageQueue) *MessageQueueTriggerManager {
    mqTriggerMgr := MessageQueueTriggerManager{
    logger: logger.Named("message_queue_trigger_manager"),
    reqChan: make(chan request),
    triggers: make(map[string]*triggerSubscription),
    fissionClient: fissionClient,
    messageQueueType: mqType,
    messageQueue: messageQueue,
    }
    return &mqTriggerMgr
    }


  • Sweep has identified a redundant function: The new function is redundant because its tasks and purpose are already covered by StartScalerManager in pkg/mqtrigger/scalermanager.go and Run in pkg/timer/timerSync.go.
  • func (mqt *MessageQueueTriggerManager) Run(ctx context.Context, mgr manager.Interface) error {
    go mqt.service()
    for _, informer := range utils.GetInformersForNamespaces(mqt.fissionClient, time.Minute*30, fv1.MessageQueueResource) {
    _, err := informer.AddEventHandler(mqt.mqtInformerHandlers())
    if err != nil {
    return err
    }
    mgr.Add(ctx, func(ctx context.Context) {
    informer.Run(ctx.Done())
    })
    if ok := k8sCache.WaitForCacheSync(ctx.Done(), informer.HasSynced); !ok {
    mqt.logger.Fatal("failed to wait for caches to sync")
    }
    }
    mgr.Add(ctx, func(ctx context.Context) {
    metrics.ServeMetrics(ctx, "mqtrigger", mqt.logger, mgr)
    })
    return nil


  • Sweep has identified a redundant function: The new function is redundant because its purpose and functionality are already covered by the existing service method in the MessageQueueTriggerManager struct.
  • func (mqt *MessageQueueTriggerManager) service() {
    for {
    req := <-mqt.reqChan
    resp := response{triggerSub: nil, err: nil}
    k, err := k8sCache.MetaNamespaceKeyFunc(&req.triggerSub.trigger)
    if err != nil {
    resp.err = err
    req.respChan <- resp
    continue
    }
    switch req.requestType {
    case ADD_TRIGGER:
    if _, ok := mqt.triggers[k]; ok {
    resp.err = errors.New("trigger already exists")
    } else {
    mqt.triggers[k] = req.triggerSub
    mqt.logger.Debug("set trigger subscription", zap.String("key", k))
    IncreaseSubscriptionCount()
    }
    req.respChan <- resp
    case GET_TRIGGER_SUBSCRIPTION:
    if _, ok := mqt.triggers[k]; !ok {
    resp.err = errors.New("trigger does not exist")
    } else {
    resp.triggerSub = mqt.triggers[k]
    }
    req.respChan <- resp
    case DELETE_TRIGGER:
    delete(mqt.triggers, k)
    mqt.logger.Debug("delete trigger", zap.String("key", k))
    DecreaseSubscriptionCount()
    req.respChan <- resp
    }
    }
    }


  • Sweep has identified a redundant function: The new function addTrigger is redundant because its functionality is already covered by the existing addTrigger method in the MessageQueueTriggerManager class.
  • func (mqt *MessageQueueTriggerManager) addTrigger(triggerSub *triggerSubscription) error {
    resp := mqt.makeRequest(ADD_TRIGGER, triggerSub)
    return resp.err


  • Sweep has identified a redundant function: The new function getTriggerSubscription is redundant because its functionality is already covered by the existing checkTriggerSubscription method, which internally calls getTriggerSubscription.
  • func (mqt *MessageQueueTriggerManager) getTriggerSubscription(trigger *fv1.MessageQueueTrigger) *triggerSubscription {
    resp := mqt.makeRequest(GET_TRIGGER_SUBSCRIPTION, &triggerSubscription{trigger: *trigger})
    return resp.triggerSub


  • Sweep has identified a redundant function: The new function checkTriggerSubscription is redundant because its purpose and functionality are already covered by the existing method getTriggerSubscription. It does not provide any unique contributions.
  • func (mqt *MessageQueueTriggerManager) checkTriggerSubscription(trigger *fv1.MessageQueueTrigger) bool {
    return mqt.getTriggerSubscription(trigger) != nil
    }


  • Sweep has identified a redundant function: The new function delTriggerSubscription is redundant because its functionality is already covered by the service method, which handles DELETE_TRIGGER requests.
  • func (mqt *MessageQueueTriggerManager) delTriggerSubscription(trigger *fv1.MessageQueueTrigger) error {
    resp := mqt.makeRequest(DELETE_TRIGGER, &triggerSubscription{trigger: *trigger})
    return resp.err


  • Sweep has identified a redundant function: The new function mqtInformerHandlers is redundant as its functionality is already covered by the existing mqTriggerEventHandlers function in scalermanager.go.
  • func (mqt *MessageQueueTriggerManager) mqtInformerHandlers() k8sCache.ResourceEventHandlerFuncs {
    return k8sCache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) {
    trigger := obj.(*fv1.MessageQueueTrigger)
    mqt.logger.Debug("Added mqt", zap.Any("trigger: ", trigger.ObjectMeta))
    mqt.RegisterTrigger(trigger)
    },
    DeleteFunc: func(obj interface{}) {
    trigger := obj.(*fv1.MessageQueueTrigger)
    mqt.logger.Debug("Delete mqt", zap.Any("trigger: ", trigger.ObjectMeta))
    triggerSubscription := mqt.getTriggerSubscription(trigger)
    if triggerSubscription == nil {
    mqt.logger.Info("Unsubscribe failed", zap.String("trigger_name", trigger.ObjectMeta.Name))
    return
    }
    err := mqt.messageQueue.Unsubscribe(triggerSubscription.subscription)
    if err != nil {
    mqt.logger.Warn("failed to unsubscribe from message queue trigger", zap.Error(err), zap.String("trigger_name", trigger.ObjectMeta.Name))
    return
    }
    err = mqt.delTriggerSubscription(trigger)
    if err != nil {
    mqt.logger.Warn("deleting message queue trigger failed", zap.Error(err), zap.String("trigger_name", trigger.ObjectMeta.Name))
    }
    mqt.logger.Info("message queue trigger deleted", zap.String("trigger_name", trigger.ObjectMeta.Name))
    },
    UpdateFunc: func(oldObj interface{}, newObj interface{}) {
    trigger := newObj.(*fv1.MessageQueueTrigger)
    mqt.logger.Debug("Updated mqt", zap.Any("trigger: ", trigger.ObjectMeta))
    mqt.RegisterTrigger(trigger)
    },
    }


pkg/mqtrigger/mqtmanager_test.go
  • The test function does not wait for the mgr.service() goroutine to complete, which could lead to race conditions or incomplete test execution.

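A common way to make a test wait for a background goroutine is to pair it with a `sync.WaitGroup` and an explicit stop function. The sketch below is generic: `startWorker` and its job channel are hypothetical and do not reflect how `mgr.service()` is actually structured.

```go
package main

import (
	"fmt"
	"sync"
)

// startWorker launches a goroutine that doubles each job and appends it to
// results. The returned stop function closes the channel and blocks until
// the goroutine has finished, so callers never race with it.
func startWorker(jobs chan int, results *[]int) (stop func()) {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for j := range jobs {
			*results = append(*results, j*2)
		}
	}()
	return func() {
		close(jobs)
		wg.Wait()
	}
}

func main() {
	jobs := make(chan int)
	var results []int
	stop := startWorker(jobs, &results)
	jobs <- 1
	jobs <- 2
	stop() // results is safe to read only after this returns
	fmt.Println(results)
}
```

In a test, the stop function would typically be deferred or registered with `t.Cleanup` so the goroutine is always joined before the test returns.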

  • Sweep has identified a redundant function: The new function is redundant because its functionality is already covered by the Subscribe method in the Kafka struct found in pkg/mqtrigger/messageQueue/kafka/kafka.go.
  • func (f fakeMessageQueue) Subscribe(trigger *fv1.MessageQueueTrigger) (messageQueue.Subscription, error) {
    ctx, cancel := context.WithCancel(context.Background())
    mqtConsumer := mqtConsumer{
    ctx: ctx,
    cancel: cancel,
    }
    return mqtConsumer, nil
    }


  • Sweep has identified a redundant function: The new function is redundant because its purpose and functionality are already covered by the existing Unsubscribe method in the Kafka struct.
  • func (f fakeMessageQueue) Unsubscribe(triggerSub messageQueue.Subscription) error {
    sub := triggerSub.(mqtConsumer)
    sub.cancel()
    return nil
    }


  • Sweep has identified a redundant function: The new function TestMqtManager is redundant as its purpose and functionality are already covered by existing test methods in the pkg/mqtrigger/mqtmanager_test.go file.
  • func TestMqtManager(t *testing.T) {
    logger := loggerfactory.GetLogger()
    defer logger.Sync()
    msgQueue := fakeMessageQueue{}
    mgr := MakeMessageQueueTriggerManager(logger, nil, fv1.MessageQueueTypeKafka, msgQueue)
    go mgr.service()
    trigger := fv1.MessageQueueTrigger{
    ObjectMeta: metav1.ObjectMeta{
    Name: "test",
    Namespace: "default",
    },
    }
    if mgr.checkTriggerSubscription(&trigger) {
    t.Errorf("checkTrigger should return false")
    }
    sub, err := msgQueue.Subscribe(&trigger)
    if err != nil {
    t.Errorf("Subscribe should not return error")
    }
    triggerSub := triggerSubscription{
    trigger: trigger,
    subscription: sub,
    }
    err = mgr.addTrigger(&triggerSub)
    if err != nil {
    t.Errorf("addTrigger should not return error")
    }
    if !mgr.checkTriggerSubscription(&trigger) {
    t.Errorf("checkTrigger should return true")
    }
    getSub := mgr.getTriggerSubscription(&trigger)
    if getSub == nil {
    t.Fatal("getTriggerSubscription should return triggerSub")
    }
    if getSub.trigger.ObjectMeta.Name != trigger.ObjectMeta.Name {
    t.Errorf("getTriggerSubscription should return triggerSub with trigger name %s", trigger.ObjectMeta.Name)
    }
    getSub.subscription.(mqtConsumer).cancel()
    err = mgr.delTriggerSubscription(&trigger)
    if err != nil {
    t.Errorf("delTriggerSubscription should not return error")
    }
    if mgr.checkTriggerSubscription(&trigger) {
    t.Errorf("checkTrigger should return false")
    }
    }


pkg/mqtrigger/validator/validator.go
  • The Register function panics if a validator is nil or already registered, which could cause the application to crash if not handled properly.
  • func Register(mqType string, validator TopicValidator) {
    lock.Lock()
    defer lock.Unlock()
    if validator == nil {
    panic("Nil message queue topic validator")
    }
    _, registered := topicValidators[mqType]
    if registered {
    panic("Message queue topic validator already register")
    }
    topicValidators[mqType] = validator
    }

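If the panic is considered too harsh, one alternative is a registration function that returns an error. The sketch below is hypothetical (`tryRegister` is not part of the PR) and reuses the names from the snippet only for readability.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// TopicValidator matches the shape used in the quoted snippet.
type TopicValidator func(topic string) bool

var (
	lock            sync.Mutex
	topicValidators = map[string]TopicValidator{}
)

// tryRegister is a hypothetical, non-panicking variant of Register: it
// reports nil validators and duplicate registrations as errors.
func tryRegister(mqType string, validator TopicValidator) error {
	lock.Lock()
	defer lock.Unlock()
	if validator == nil {
		return errors.New("nil message queue topic validator")
	}
	if _, registered := topicValidators[mqType]; registered {
		return fmt.Errorf("topic validator for %q already registered", mqType)
	}
	topicValidators[mqType] = validator
	return nil
}

func main() {
	nonEmpty := func(topic string) bool { return topic != "" }
	fmt.Println(tryRegister("kafka", nonEmpty)) // first registration succeeds
	fmt.Println(tryRegister("kafka", nonEmpty)) // duplicate is reported
	fmt.Println(tryRegister("nats", nil))       // nil validator is reported
}
```

Panicking during registration is an established Go idiom when registration happens from `init` (for example, `database/sql.Register` panics on a nil or duplicate driver), so the original behavior may well be deliberate.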

  • The IsValidTopic function does not handle the case where mqtKind is neither "keda" nor a registered mqType, potentially leading to unexpected behavior.
  • func IsValidTopic(mqType, topic, mqtKind string) bool {
    if mqtKind == "keda" {
    return true
    }
    validator, registered := topicValidators[mqType]
    if !registered {
    return false
    }
    return validator(topic)
    }


  • Sweep has identified a redundant function: The new function is redundant as its purpose and functionality are already covered by the existing Register function in pkg/mqtrigger/factory/factory.go.
  • func Register(mqType string, validator TopicValidator) {
    lock.Lock()
    defer lock.Unlock()
    if validator == nil {
    panic("Nil message queue topic validator")
    }
    _, registered := topicValidators[mqType]
    if registered {
    panic("Message queue topic validator already register")
    }
    topicValidators[mqType] = validator
    }



Potential Issues

Sweep is unsure if these are issues, but they might be worth checking out.

charts/fission-all/templates/_fission-kuberntes-role-generator.tpl

charts/fission-all/templates/mqt-fission-kafka/podmonitor.yaml

charts/fission-all/templates/mqt-fission-kafka/serviceaccount.yaml

charts/fission-all/values.yaml
  • The insecureSkipVerify field in the TLS configuration, if set to true, could make the system susceptible to man-in-the-middle attacks.
  • enabled: false
    ## InsecureSkipVerify controls whether a client verifies the server's certificate chain and host name.
    ## Warning: Setting this to true, makes TLS susceptible to man-in-the-middle attacks
    ##
    insecureSkipVerify: false

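On the Go side, a setting like this is usually parsed so that anything unparsable leaves certificate verification enabled, which is also what the `INSECURE_SKIP_VERIFY` handling in `getTLSConfig` quoted earlier does. The sketch below illustrates that default-to-safe behavior; `tlsConfigFromFlag` is a hypothetical helper, not code from the PR.

```go
package main

import (
	"crypto/tls"
	"fmt"
	"strconv"
)

// tlsConfigFromFlag is a hypothetical helper. raw is the string value of a
// setting such as INSECURE_SKIP_VERIFY; any value that does not parse as a
// boolean leaves verification enabled (InsecureSkipVerify stays false).
func tlsConfigFromFlag(raw string) *tls.Config {
	cfg := &tls.Config{}
	skip, err := strconv.ParseBool(raw)
	if err != nil {
		return cfg // keep the safe default
	}
	cfg.InsecureSkipVerify = skip
	return cfg
}

func main() {
	for _, raw := range []string{"true", "false", "yes"} {
		fmt.Printf("%q -> InsecureSkipVerify=%v\n", raw, tlsConfigFromFlag(raw).InsecureSkipVerify)
	}
}
```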

go.mod
  • The inclusion of multiple indirect dependencies from github.com/jcmturner libraries may introduce security vulnerabilities if these libraries are not properly vetted for security issues.
  • fission/go.mod

    Lines 117 to 121 in 65aa6ec

    github.com/jcmturner/aescts/v2 v2.0.0 // indirect
    github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
    github.com/jcmturner/gofork v1.7.6 // indirect
    github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
    github.com/jcmturner/rpc/v2 v2.0.3 // indirect


go.sum
  • The addition of github.com/IBM/sarama introduces a new dependency that may have compatibility issues with existing Kafka client implementations or configurations.
  • fission/go.sum

    Lines 17 to 21 in 65aa6ec

    github.com/Azure/go-autorest/logger v0.1.0/go.mod h1:oExouG+K6PryycPJfVSxi/koC6LSNgds39diKLz7Vrc=
    github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
    github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
    github.com/IBM/sarama v1.42.1 h1:wugyWa15TDEHh2kvq2gAy1IHLjEjuYOYgXz/ruC/OSQ=
    github.com/IBM/sarama v1.42.1/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ=


pkg/mqtrigger/metrics.go
  • The IncreaseSubscriptionCount and DecreaseSubscriptionCount functions do not use any labels, which may lead to issues if multiple instances of the metric are registered.
  • func IncreaseSubscriptionCount() {
    subscriptionCount.WithLabelValues().Inc()
    }
    func DecreaseSubscriptionCount() {
    subscriptionCount.WithLabelValues().Dec()


pkg/mqtrigger/mqtmanager.go
  • The RegisterTrigger method does not handle the case where Subscribe returns an error but the subscription is not nil, potentially leading to inconsistent state.
  • func (mqt *MessageQueueTriggerManager) RegisterTrigger(trigger *fv1.MessageQueueTrigger) {
    isPresent := mqt.checkTriggerSubscription(trigger)
    if isPresent {
    mqt.logger.Debug("message queue trigger already registered", zap.String("trigger_name", trigger.ObjectMeta.Name))
    return
    }
    // actually subscribe using the message queue client impl
    sub, err := mqt.messageQueue.Subscribe(trigger)
    if err != nil {
    mqt.logger.Warn("failed to subscribe to message queue trigger", zap.Error(err), zap.String("trigger_name", trigger.ObjectMeta.Name))
    return
    }
    if sub == nil {
    mqt.logger.Warn("subscription is nil", zap.String("trigger_name", trigger.ObjectMeta.Name))
    return
    }
    triggerSub := triggerSubscription{
    trigger: *trigger,
    subscription: sub,
    }
    // add to our list
    err = mqt.addTrigger(&triggerSub)
    if err != nil {
    mqt.logger.Fatal("adding message queue trigger failed", zap.Error(err), zap.String("trigger_name", trigger.ObjectMeta.Name))
    }
    mqt.logger.Info("message queue trigger created", zap.String("trigger_name", trigger.ObjectMeta.Name))
    }



The file crds/v1/fission.io_messagequeuetriggers.yaml was not reviewed because it was deemed unsuitable; no reason was given. If this is an error, please let us know.

Signed-off-by: Md Soharab Ansari <soharab.ansari@infracloud.io>
@sanketsudake sanketsudake merged commit c126298 into main May 27, 2024
@sanketsudake sanketsudake deleted the revert-pr-2875 branch June 26, 2024 05:54