Revert "Remove deprecated mqtrigger with kind fission (#2875)" #2946
Conversation
This reverts commit f44174d.
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #2946 +/- ##
==========================================
- Coverage 45.37% 44.57% -0.80%
==========================================
Files 230 236 +6
Lines 23619 24216 +597
==========================================
+ Hits 10717 10795 +78
- Misses 11506 12019 +513
- Partials 1396 1402 +6
Flags with carried forward coverage won't be shown. ☔ View full report in Codecov by Sentry.
Sweep: PR Review
Author of pull request: @soharab-ic
This pull request added support for Kafka message queue triggers to the Fission project. New Kafka-specific RBAC rules were defined in the chart templates, a new deployment configuration for the Kafka message queue trigger was added, the main application (fission-bundle) was wired up to run the message queue manager, a new Kafka message queue package was introduced, and unit tests were added for the message queue trigger manager.
{{- define "kafka-rules" }} | |
rules: | |
- apiGroups: | |
- fission.io | |
resources: | |
- environments | |
- functions | |
- messagequeuetriggers | |
- packages | |
verbs: | |
- create | |
- get | |
- list | |
- watch | |
- update | |
- patch | |
- delete |
View Diff
charts/fission-all/templates/_fission-kubernetes-roles.tpl
- The new RBAC rules for Kafka allow extensive permissions, including create, delete, and patch actions on critical resources like configmaps, pods, and secrets, which could pose a security risk if not properly controlled.
- Sweep has identified a redundant function: The new function "kafka-kuberules" is redundant as its purpose and functionality are already covered by existing methods like "executor-kuberules" and "kubewatcher-kuberules".
fission/charts/fission-all/templates/_fission-kubernetes-roles.tpl
Lines 207 to 254 in 65aa6ec
{{- define "kafka-kuberules" }} | |
rules: | |
- apiGroups: | |
- "" | |
resources: | |
- configmaps | |
- pods | |
- secrets | |
- services | |
- replicationcontrollers | |
- events | |
verbs: | |
- create | |
- delete | |
- get | |
- list | |
- watch | |
- patch | |
- apiGroups: | |
- "" | |
resources: | |
- configmaps | |
- secrets | |
verbs: | |
- get | |
- apiGroups: | |
- apps | |
resources: | |
- deployments | |
- deployments/scale | |
- replicasets | |
verbs: | |
- create | |
- get | |
- list | |
- watch | |
- update | |
- patch | |
- delete | |
- apiGroups: | |
- apiextensions.k8s.io | |
resources: | |
- customresourcedefinitions | |
verbs: | |
- get | |
- list | |
- watch | |
{{- end }} |
View Diff
charts/fission-all/templates/_fission-kuberntes-role-generator.tpl
- Sweep has identified a redundant function: The new function is redundant because its purpose and functionality are already covered by the "kubernetes-role-generator" function, which includes "kafka-kuberules" when the component is "kafka".
fission/charts/fission-all/templates/_fission-kuberntes-role-generator.tpl
Lines 29 to 31 in 65aa6ec
{{- if eq "kafka" .component }}
{{- include "kafka-kuberules" . }}
{{- end }}
charts/fission-all/templates/_fission-role-generator.tpl
- Sweep has identified a redundant function: The new function is redundant as its purpose and functionality are already covered by existing functions like "kubernetes-role-generator" and "fission-role-generator".
fission/charts/fission-all/templates/_fission-role-generator.tpl
Lines 23 to 25 in 65aa6ec
{{- if eq "kafka" .component }}
{{- include "kafka-rules" . }}
{{- end }}
charts/fission-all/templates/mqt-fission-kafka/deployment.yaml
- The deployment will fail if any of the required TLS files (CA certificate, user certificate, or user key) are missing, as the chart will fail with an error message.
fission/charts/fission-all/templates/mqt-fission-kafka/deployment.yaml
Lines 89 to 103 in 65aa6ec
{{- if .Files.Get (printf "%s" .Values.kafka.authentication.tls.caCert) }}
caCert: {{ .Files.Get (printf "%s" .Values.kafka.authentication.tls.caCert) | b64enc }}
{{- else }}
{{ fail "Invalid chart. CA Certificate not found." }}
{{- end }}
{{- if .Files.Get (printf "%s" .Values.kafka.authentication.tls.userCert) }}
userCert: {{ .Files.Get (printf "%s" .Values.kafka.authentication.tls.userCert) | b64enc }}
{{- else }}
{{ fail "Invalid chart. User Certificate not found." }}
{{- end }}
{{- if .Files.Get (printf "%s" .Values.kafka.authentication.tls.userKey) }}
userKey: {{ .Files.Get (printf "%s" .Values.kafka.authentication.tls.userKey) | b64enc }}
{{- else }}
{{ fail "Invalid chart. User Key not found." }}
{{- end }}
charts/fission-all/templates/mqt-fission-kafka/role-fission-cr.yaml
- The code does not handle the case where `.Values.additionalFissionNamespaces` is not a list, which could lead to runtime errors.
- Sweep has identified a redundant function: The new function is redundant because it follows the same pattern as existing methods that include the "fission-role-generator" template with different components. For example, the function in 'charts/fission-all/templates/executor/role-fission-cr.yaml' serves a similar purpose.
- Sweep has identified a redundant function: The new function is redundant because its purpose and functionality are already covered by existing methods in the code snippets, such as those in 'charts/fission-all/templates/kubewatcher/role-fission-cr.yaml' and 'charts/fission-all/templates/mqt-keda/role-fission-cr.yaml'.
{{- if gt (len .Values.additionalFissionNamespaces) 0 }}
{{- range $namespace := $.Values.additionalFissionNamespaces }}
{{ include "fission-role-generator" (merge (dict "namespace" $namespace "component" "kafka") $) }}
{{- end }}
{{- end }}

{{- include "fission-role-generator" (merge (dict "namespace" .Values.defaultNamespace "component" "kafka") .) }}
charts/fission-all/templates/mqt-fission-kafka/role-kubernetes.yaml
- The code does not handle the case where `.Values.defaultNamespace` or `.Values.additionalFissionNamespaces` are not defined, which could lead to runtime errors.
- Sweep has identified a redundant function: The new function is redundant as it follows the same pattern as existing methods for other components like "executor", "buildermgr", "fluentbit", "kubewatcher", and "keda".
- Sweep has identified a redundant function: The new function is redundant as its purpose and functionality are already covered by existing methods in the code snippets.
- Sweep has identified a redundant function: The new function is redundant because the iteration over `$.Values.additionalFissionNamespaces` is already handled by existing methods in the code snippets.
- Sweep has identified a redundant function: The new function is redundant because its purpose and functionality are already covered by the existing method in 'charts/fission-all/templates/mqt-fission-kafka/role-kubernetes.yaml'.
- Sweep has identified a redundant function: The new function is redundant as it does not contain any code and does not provide any unique contributions or improvements. The existing functions already cover the necessary functionalities.
{{- include "kubernetes-role-generator" (merge (dict "namespace" .Values.defaultNamespace "component" "kafka") .) }} | |
{{- if gt (len .Values.additionalFissionNamespaces) 0 }} | |
{{- range $namespace := $.Values.additionalFissionNamespaces }} | |
{{ include "kubernetes-role-generator" (merge (dict "namespace" $namespace "component" "kafka") $) }} | |
{{- end }} | |
{{- end }} |
View Diff
{{- include "kubernetes-role-generator" (merge (dict "namespace" .Values.defaultNamespace "component" "kafka") .) }} |
View Diff
{{- if gt (len .Values.additionalFissionNamespaces) 0 }} |
View Diff
{{- range $namespace := $.Values.additionalFissionNamespaces }} |
View Diff
{{ include "kubernetes-role-generator" (merge (dict "namespace" $namespace "component" "kafka") $) }} |
View Diff
{{- end }} |
View Diff
cmd/fission-bundle/main.go
- The new `runMessageQueueMgr` function does not include any specific error handling or logging within the `mqtrigger.Start` call, which could make debugging difficult if the function fails.
- Sweep has identified a redundant function: The new function `runMessageQueueMgr` is redundant because its purpose and functionality are already covered by the existing `runMQManager` function.
fission/cmd/fission-bundle/main.go
Lines 78 to 80 in 65aa6ec
func runMessageQueueMgr(ctx context.Context, clientGen crd.ClientGeneratorInterface, logger *zap.Logger, mgr manager.Interface, routerUrl string) error {
	return mqtrigger.Start(ctx, clientGen, logger, mgr, routerUrl)
}
cmd/fission-bundle/mqtrigger/mqtrigger.go
- The `readSecrets` function returns an error if the secrets directory does not exist, which could cause the `Start` function to fail even if secrets are optional (a hedged sketch of one possible fix follows the first snippet below).
- The `Start` function calls `logger.Fatal` on failure to connect to the message queue server, which will terminate the application instead of allowing for graceful error handling (a sketch follows the full `Start` snippet below).
- Sweep has identified a redundant function: The new function is redundant as its purpose and functionality are already covered by existing methods such as `Start` in `pkg/timer/main.go`, `pkg/buildermgr/buildermgr.go`, `pkg/kubewatcher/main.go`, and `StartScalerManager` in `pkg/mqtrigger/scalermanager.go`.
fission/cmd/fission-bundle/mqtrigger/mqtrigger.go
Lines 85 to 87 in 65aa6ec
// return if no secrets exist
if _, err := os.Stat(secretsPath); os.IsNotExist(err) {
	return nil, err
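If message queue secrets are genuinely optional, the stat check could report "no secrets" instead of an error. A minimal sketch, assuming the readSecrets signature shown above; the name readOptionalSecrets and the log wording are illustrative only, not part of the PR:

package mqtrigger

import (
	"os"

	"go.uber.org/zap"
)

// readOptionalSecrets sketches a readSecrets variant that treats a missing
// secrets directory as "no secrets configured" rather than a failure.
// The directory-walking part of the original function is elided here.
func readOptionalSecrets(logger *zap.Logger, secretsPath string) (map[string][]byte, error) {
	if _, err := os.Stat(secretsPath); os.IsNotExist(err) {
		logger.Info("secrets path does not exist, continuing without message queue secrets",
			zap.String("path", secretsPath))
		return nil, nil
	}
	// ... read the files under secretsPath into the map, as the original readSecrets does ...
	return map[string][]byte{}, nil
}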
fission/cmd/fission-bundle/mqtrigger/mqtrigger.go
Lines 73 to 74 in 65aa6ec
if err != nil {
	logger.Fatal("failed to connect to remote message queue server", zap.Error(err))
fission/cmd/fission-bundle/mqtrigger/mqtrigger.go
Lines 38 to 82 in 65aa6ec
func Start(ctx context.Context, clientGen crd.ClientGeneratorInterface, logger *zap.Logger, mgr manager.Interface, routerUrl string) error { | |
fissionClient, err := clientGen.GetFissionClient() | |
if err != nil { | |
return errors.Wrap(err, "failed to get fission client") | |
} | |
err = crd.WaitForFunctionCRDs(ctx, logger, fissionClient) | |
if err != nil { | |
return errors.Wrap(err, "error waiting for CRDs") | |
} | |
mqType := (fv1.MessageQueueType)(os.Getenv("MESSAGE_QUEUE_TYPE")) | |
mqUrl := os.Getenv("MESSAGE_QUEUE_URL") | |
secretsPath := strings.TrimSpace(os.Getenv("MESSAGE_QUEUE_SECRETS")) | |
var secrets map[string][]byte | |
if len(secretsPath) > 0 { | |
// For authentication with message queue | |
secrets, err = readSecrets(logger, secretsPath) | |
if err != nil { | |
return err | |
} | |
} | |
mq, err := factory.Create( | |
logger, | |
mqType, | |
messageQueue.Config{ | |
MQType: (string)(mqType), | |
Url: mqUrl, | |
Secrets: secrets, | |
}, | |
routerUrl, | |
) | |
if err != nil { | |
logger.Fatal("failed to connect to remote message queue server", zap.Error(err)) | |
} | |
mqtMgr := mqtrigger.MakeMessageQueueTriggerManager(logger, fissionClient, mqType, mq) | |
err = mqtMgr.Run(ctx, mgr) | |
if err != nil { | |
return err | |
} | |
return nil | |
} |
View Diff
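Returning the error keeps shutdown in the caller's hands. A hedged sketch of what the error branch after factory.Create inside Start could look like, using the same errors.Wrap style as the rest of the function; this is a suggestion, not the PR's code:

	// Sketch: propagate the failure instead of terminating the process here.
	if err != nil {
		return errors.Wrap(err, "failed to connect to remote message queue server")
	}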
go.sum
- The addition of multiple `github.com/jcmturner` libraries introduces several new dependencies that may have compatibility issues or introduce security vulnerabilities, especially with cryptographic and authentication functionalities.
Lines 257 to 284 in 65aa6ec
github.com/hashicorp/go-retryablehttp v0.7.5/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8= | |
github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= | |
github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= | |
github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= | |
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= | |
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= | |
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= | |
github.com/huandu/xstrings v1.3.3 h1:/Gcsuc1x8JVbJ9/rlye4xZnVAbEkGauT8lbebqcQws4= | |
github.com/huandu/xstrings v1.3.3/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= | |
github.com/imdario/mergo v0.3.11 h1:3tnifQM4i+fbajXKBHXWEH+KvNHqojZ778UH75j3bGA= | |
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= | |
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= | |
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= | |
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= | |
github.com/influxdata/influxdb v1.11.5 h1:+em5VOl6lhAZubXj5o6SobCwvrRs3XDlBx/MUI4schI= | |
github.com/influxdata/influxdb v1.11.5/go.mod h1:k8sWREQl1/9t46VrkrH5adUM4UNGIt206ipO3plbkw8= | |
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A= | |
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo= | |
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= | |
github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= | |
github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= | |
github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= | |
github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= | |
github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= | |
github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= | |
github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= | |
github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= | |
github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= |
View Diff
pkg/apis/core/v1/validation.go
- The change to `validator.IsValidTopic` introduces a potential issue if the `validator.IsValidTopic` function does not handle the additional `MessageQueueType` parameter correctly, which could lead to validation failures.
fission/pkg/apis/core/v1/validation.go
Lines 526 to 531 in 65aa6ec
if !validator.IsValidTopic((string)(spec.MessageQueueType), spec.Topic, spec.MqtKind) {
	result = multierror.Append(result, MakeValidationErr(ErrorInvalidValue, "MessageQueueTriggerSpec.Topic", spec.Topic, "not a valid topic"))
}
if len(spec.ResponseTopic) > 0 && !validator.IsValidTopic((string)(spec.MessageQueueType), spec.ResponseTopic, spec.MqtKind) {
	result = multierror.Append(result, MakeValidationErr(ErrorInvalidValue, "MessageQueueTriggerSpec.ResponseTopic", spec.ResponseTopic, "not a valid topic"))
pkg/mqtrigger/factory/factory.go
- The panic messages in the `Register` function could cause the application to crash if a nil factory is registered or if a duplicate registration occurs (a hedged, panic-free variant is sketched after the `Register` snippet below).
- Sweep has identified a redundant function: The new function is redundant because the `Register` function in the `pkg/mqtrigger/validator/validator.go` file already performs similar tasks of registering with thread safety.
- Sweep has identified a redundant function: The new function is redundant because its functionality is already covered by the existing `Create` method in the `factory` package.
fission/pkg/mqtrigger/factory/factory.go
Lines 45 to 50 in 65aa6ec
panic("Nil message queue factory") | |
} | |
_, registered := messageQueueFactories[mqType] | |
if registered { | |
panic("Message queue factory already register") |
View Diff
fission/pkg/mqtrigger/factory/factory.go
Lines 40 to 54 in 65aa6ec
func Register(mqType fv1.MessageQueueType, factory MessageQueueFactory) {
	lock.Lock()
	defer lock.Unlock()
	if factory == nil {
		panic("Nil message queue factory")
	}
	_, registered := messageQueueFactories[mqType]
	if registered {
		panic("Message queue factory already register")
	}
	messageQueueFactories[mqType] = factory
}
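Panicking on registration errors is a common Go idiom for init-time programmer mistakes (database/sql drivers do the same), so keeping the panic is defensible; if crashing is a concern, registration could instead report an error. A minimal sketch that reuses the package-level lock and messageQueueFactories map shown above; RegisterSafe is a hypothetical name:

// RegisterSafe sketches a panic-free alternative to Register: nil or duplicate
// registrations are reported as errors and the caller decides what to do.
func RegisterSafe(mqType fv1.MessageQueueType, f MessageQueueFactory) error {
	lock.Lock()
	defer lock.Unlock()
	if f == nil {
		return errors.Errorf("nil message queue factory for type %q", mqType)
	}
	if _, registered := messageQueueFactories[mqType]; registered {
		return errors.Errorf("message queue factory for type %q already registered", mqType)
	}
	messageQueueFactories[mqType] = f
	return nil
}

The trade-off is that every init-time caller then has to check the returned error, which is why the panicking form is often preferred for registries populated from init().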
fission/pkg/mqtrigger/factory/factory.go
Lines 56 to 62 in 65aa6ec
func Create(logger *zap.Logger, mqType fv1.MessageQueueType, mqConfig messageQueue.Config, routerUrl string) (messageQueue.MessageQueue, error) {
	factory, registered := messageQueueFactories[mqType]
	if !registered {
		return nil, errors.Errorf("no supported message queue type found for %q", mqType)
	}
	return factory.Create(logger, mqConfig, routerUrl)
}
pkg/mqtrigger/messageQueue/kafka/consumer.go
- The `ConsumeClaim` method does not handle the case where `claim.Messages()` returns nil, which could lead to a nil pointer dereference (a hedged sketch of a closed-channel check follows the `ConsumeClaim` snippet below).
- The `kafkaMsgHandler` method retries HTTP requests without any delay, which could lead to rapid retry loops and potential rate limiting issues (a hedged backoff sketch follows the retry-loop snippet below).
- Sweep has identified a redundant function: The new function is redundant because its purpose, functionality, and data processing steps are already covered by the existing `NewMqtConsumerGroupHandler` function in `pkg/mqtrigger/messageQueue/kafka/kafka.go`.
- Sweep has identified a redundant function: The new function is redundant because the existing `Cleanup` method in the `MqtConsumerGroupHandler` struct already performs the same logging tasks.
- Sweep has identified a redundant function: The new function is redundant because its functionality is already covered by the existing `ConsumeClaims` method in the `MqtConsumerGroupHandler` struct.
- Sweep has identified a redundant function: The new function `kafkaMsgHandler` is redundant because its purpose and functionality are already covered by the existing `kafkaMsgHandler` method in `pkg/mqtrigger/messageQueue/kafka/consumer.go`.
- Sweep has identified a redundant function: The new function is redundant because its functionality is already covered by the existing `errorHandler` function in `pkg/mqtrigger/messageQueue/kafka/consumer.go`.
fission/pkg/mqtrigger/messageQueue/kafka/consumer.go
Lines 116 to 117 in 65aa6ec
case msg := <-claim.Messages():
	if msg != nil {
fission/pkg/mqtrigger/messageQueue/kafka/consumer.go
Lines 162 to 179 in 65aa6ec
for attempt := 0; attempt <= ch.trigger.Spec.MaxRetries; attempt++ { | |
// Make the request | |
resp, err = http.DefaultClient.Do(req) | |
if err != nil { | |
ch.logger.Error("sending function invocation request failed", | |
zap.Error(err), | |
zap.String("function_url", ch.fnUrl), | |
zap.String("trigger", ch.trigger.ObjectMeta.Name)) | |
continue | |
} | |
if resp == nil { | |
continue | |
} | |
if err == nil && resp.StatusCode == http.StatusOK { | |
// Success, quit retrying | |
break | |
} | |
} |
View Diff
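A short exponential backoff between attempts would avoid hammering the router, and rebuilding the request per attempt avoids resending an already-consumed body. A self-contained sketch; the helper name, parameters, and backoff values are assumptions, not part of the PR:

package kafka

import (
	"fmt"
	"net/http"
	"strings"
	"time"

	"go.uber.org/zap"
)

// invokeWithRetry sketches the retry loop with exponential backoff. It builds
// a fresh request on every attempt because a body created with
// strings.NewReader is consumed by the first send.
func invokeWithRetry(logger *zap.Logger, fnUrl, payload string, headers map[string]string, maxRetries int) (*http.Response, error) {
	backoff := 100 * time.Millisecond
	var lastErr error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		req, err := http.NewRequest(http.MethodPost, fnUrl, strings.NewReader(payload))
		if err != nil {
			return nil, err
		}
		for k, v := range headers {
			req.Header.Set(k, v)
		}
		resp, err := http.DefaultClient.Do(req)
		if err == nil && resp.StatusCode == http.StatusOK {
			return resp, nil // success, stop retrying
		}
		if err != nil {
			lastErr = err
			logger.Error("function invocation failed, retrying", zap.Error(err), zap.Int("attempt", attempt))
		} else {
			lastErr = fmt.Errorf("unexpected status %d", resp.StatusCode)
			resp.Body.Close()
		}
		time.Sleep(backoff)
		backoff *= 2
	}
	return nil, fmt.Errorf("request exceeded retries %v: %w", maxRetries, lastErr)
}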
fission/pkg/mqtrigger/messageQueue/kafka/consumer.go
Lines 45 to 72 in 65aa6ec
func NewMqtConsumerGroupHandler(version sarama.KafkaVersion, | |
logger *zap.Logger, | |
trigger *fv1.MessageQueueTrigger, | |
producer sarama.SyncProducer, | |
routerUrl string) MqtConsumerGroupHandler { | |
ch := MqtConsumerGroupHandler{ | |
version: version, | |
logger: logger, | |
trigger: trigger, | |
producer: producer, | |
ready: make(chan bool), | |
} | |
// Support other function ref types | |
if ch.trigger.Spec.FunctionReference.Type != fv1.FunctionReferenceTypeFunctionName { | |
ch.logger.Fatal("unsupported function reference type for trigger", | |
zap.Any("function_reference_type", ch.trigger.Spec.FunctionReference.Type), | |
zap.String("trigger", ch.trigger.ObjectMeta.Name)) | |
} | |
// Generate the Headers | |
ch.fissionHeaders = map[string]string{ | |
"X-Fission-MQTrigger-Topic": ch.trigger.Spec.Topic, | |
"X-Fission-MQTrigger-RespTopic": ch.trigger.Spec.ResponseTopic, | |
"X-Fission-MQTrigger-ErrorTopic": ch.trigger.Spec.ErrorTopic, | |
"Content-Type": ch.trigger.Spec.ContentType, | |
} | |
ch.fnUrl = routerUrl + "/" + strings.TrimPrefix(utils.UrlForFunction(ch.trigger.Spec.FunctionReference.Name, ch.trigger.ObjectMeta.Namespace), "/") | |
ch.logger.Debug("function HTTP URL", zap.String("url", ch.fnUrl)) | |
return ch |
View Diff
fission/pkg/mqtrigger/messageQueue/kafka/consumer.go
Lines 90 to 98 in 65aa6ec
func (ch MqtConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
	ch.logger.With(
		zap.String("trigger", ch.trigger.ObjectMeta.Name),
		zap.String("topic", ch.trigger.Spec.Topic),
		zap.String("memberID", session.MemberID()),
		zap.Int32("generationID", session.GenerationID()),
		zap.String("claims", fmt.Sprintf("%v", session.Claims())),
	).Info("consumer group session cleanup")
	return nil
fission/pkg/mqtrigger/messageQueue/kafka/consumer.go
Lines 102 to 131 in 65aa6ec
func (ch MqtConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { | |
trigger := ch.trigger.Name | |
triggerNamespace := ch.trigger.Namespace | |
topic := claim.Topic() | |
partition := string(claim.Partition()) | |
// initially set message lag count | |
mqtrigger.SetMessageLagCount(trigger, triggerNamespace, topic, partition, claim.HighWaterMarkOffset()-claim.InitialOffset()) | |
// Do not move the code below to a goroutine. | |
// The `ConsumeClaim` itself is called within a goroutine | |
for { | |
select { | |
case msg := <-claim.Messages(): | |
if msg != nil { | |
ch.kafkaMsgHandler(msg) | |
session.MarkMessage(msg, "") | |
mqtrigger.IncreaseMessageCount(trigger, triggerNamespace) | |
} | |
mqtrigger.SetMessageLagCount(trigger, triggerNamespace, topic, partition, | |
claim.HighWaterMarkOffset()-msg.Offset-1) | |
// Should return when `session.Context()` is done. | |
case <-session.Context().Done(): | |
return nil | |
} | |
} | |
} |
View Diff
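A hedged sketch of the select loop above that detects a closed Messages() channel (rebalance or shutdown) and only touches msg when one was actually delivered, so the lag calculation cannot dereference a nil message; identifiers are those from the snippet:

	for {
		select {
		case msg, ok := <-claim.Messages():
			if !ok {
				// channel closed on rebalance/shutdown: stop consuming this claim
				return nil
			}
			ch.kafkaMsgHandler(msg)
			session.MarkMessage(msg, "")
			mqtrigger.IncreaseMessageCount(trigger, triggerNamespace)
			mqtrigger.SetMessageLagCount(trigger, triggerNamespace, topic, partition,
				claim.HighWaterMarkOffset()-msg.Offset-1)
		case <-session.Context().Done():
			return nil
		}
	}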
fission/pkg/mqtrigger/messageQueue/kafka/consumer.go
Lines 133 to 252 in 65aa6ec
func (ch *MqtConsumerGroupHandler) kafkaMsgHandler(msg *sarama.ConsumerMessage) { | |
value := string(msg.Value) | |
// Create request | |
req, err := http.NewRequest("POST", ch.fnUrl, strings.NewReader(value)) | |
if err != nil { | |
ch.logger.Error("failed to create HTTP request to invoke function", | |
zap.Error(err), | |
zap.String("function_url", ch.fnUrl)) | |
return | |
} | |
// Set the headers came from Kafka record | |
// Using Header.Add() as msg.Headers may have keys with more than one value | |
if ch.version.IsAtLeast(sarama.V0_11_0_0) { | |
for _, h := range msg.Headers { | |
req.Header.Add(string(h.Key), string(h.Value)) | |
} | |
} else { | |
ch.logger.Warn("headers are not supported by current Kafka version, needs v0.11+: no record headers to add in HTTP request", | |
zap.Any("current_version", ch.version)) | |
} | |
for k, v := range ch.fissionHeaders { | |
req.Header.Set(k, v) | |
} | |
// Make the request | |
var resp *http.Response | |
for attempt := 0; attempt <= ch.trigger.Spec.MaxRetries; attempt++ { | |
// Make the request | |
resp, err = http.DefaultClient.Do(req) | |
if err != nil { | |
ch.logger.Error("sending function invocation request failed", | |
zap.Error(err), | |
zap.String("function_url", ch.fnUrl), | |
zap.String("trigger", ch.trigger.ObjectMeta.Name)) | |
continue | |
} | |
if resp == nil { | |
continue | |
} | |
if err == nil && resp.StatusCode == http.StatusOK { | |
// Success, quit retrying | |
break | |
} | |
} | |
generateErrorHeaders := func(errString string) []sarama.RecordHeader { | |
var errorHeaders []sarama.RecordHeader | |
if ch.version.IsAtLeast(sarama.V0_11_0_0) { | |
if count, ok := errorMessageMap[errString]; ok { | |
errorMessageMap[errString] = count + 1 | |
} else { | |
errorMessageMap[errString] = 1 | |
} | |
errorHeaders = append(errorHeaders, sarama.RecordHeader{Key: []byte("MessageSource"), Value: []byte(ch.trigger.Spec.Topic)}) | |
errorHeaders = append(errorHeaders, sarama.RecordHeader{Key: []byte("RecycleCounter"), Value: []byte(strconv.Itoa(errorMessageMap[errString]))}) | |
} | |
return errorHeaders | |
} | |
if resp == nil { | |
errorString := fmt.Sprintf("request exceed retries: %v", ch.trigger.Spec.MaxRetries) | |
errorHeaders := generateErrorHeaders(errorString) | |
errorHandler(ch.logger, ch.trigger, ch.producer, ch.fnUrl, | |
fmt.Errorf(errorString), errorHeaders) | |
return | |
} | |
defer resp.Body.Close() | |
body, err := io.ReadAll(resp.Body) | |
ch.logger.Debug("got response from function invocation", | |
zap.String("function_url", ch.fnUrl), | |
zap.String("trigger", ch.trigger.ObjectMeta.Name), | |
zap.String("body", string(body))) | |
if err != nil { | |
errorString := "request body error: " + string(body) | |
errorHeaders := generateErrorHeaders(errorString) | |
errorHandler(ch.logger, ch.trigger, ch.producer, ch.fnUrl, | |
errors.Wrapf(err, errorString), errorHeaders) | |
return | |
} | |
if resp.StatusCode != 200 { | |
errorString := fmt.Sprintf("request returned failure: %v, request body error: %v", resp.StatusCode, body) | |
errorHeaders := generateErrorHeaders(errorString) | |
errorHandler(ch.logger, ch.trigger, ch.producer, ch.fnUrl, | |
fmt.Errorf("request returned failure: %v", resp.StatusCode), errorHeaders) | |
return | |
} | |
if len(ch.trigger.Spec.ResponseTopic) > 0 { | |
// Generate Kafka record headers | |
var kafkaRecordHeaders []sarama.RecordHeader | |
if ch.version.IsAtLeast(sarama.V0_11_0_0) { | |
for k, v := range resp.Header { | |
// One key may have multiple values | |
for _, v := range v { | |
kafkaRecordHeaders = append(kafkaRecordHeaders, sarama.RecordHeader{Key: []byte(k), Value: []byte(v)}) | |
} | |
} | |
} else { | |
ch.logger.Warn("headers are not supported by current Kafka version, needs v0.11+: no record headers to add in HTTP request", | |
zap.Any("current_version", ch.version)) | |
} | |
_, _, err := ch.producer.SendMessage(&sarama.ProducerMessage{ | |
Topic: ch.trigger.Spec.ResponseTopic, | |
Value: sarama.StringEncoder(body), | |
Headers: kafkaRecordHeaders, | |
}) | |
if err != nil { | |
ch.logger.Warn("failed to publish response body from function invocation to topic", | |
zap.Error(err), | |
zap.String("topic", ch.trigger.Spec.Topic), | |
zap.String("function_url", ch.fnUrl)) | |
return | |
} | |
} | |
} |
View Diff
fission/pkg/mqtrigger/messageQueue/kafka/consumer.go
Lines 254 to 272 in 65aa6ec
func errorHandler(logger *zap.Logger, trigger *fv1.MessageQueueTrigger, producer sarama.SyncProducer, funcUrl string, err error, errorTopicHeaders []sarama.RecordHeader) { | |
if len(trigger.Spec.ErrorTopic) > 0 { | |
_, _, e := producer.SendMessage(&sarama.ProducerMessage{ | |
Topic: trigger.Spec.ErrorTopic, | |
Value: sarama.StringEncoder(err.Error()), | |
Headers: errorTopicHeaders, | |
}) | |
if e != nil { | |
logger.Error("failed to publish message to error topic", | |
zap.Error(e), | |
zap.String("trigger", trigger.ObjectMeta.Name), | |
zap.String("message", err.Error()), | |
zap.String("topic", trigger.Spec.Topic)) | |
} | |
} else { | |
logger.Error("message received to publish to error topic, but no error topic was set", | |
zap.String("message", err.Error()), zap.String("trigger", trigger.ObjectMeta.Name), zap.String("function_url", funcUrl)) | |
} | |
} |
View Diff
pkg/mqtrigger/messageQueue/kafka/kafka.go
- The `Subscribe` method does not handle the case where the consumer group fails to consume messages, potentially leading to an infinite loop.
- The `New` function does not handle the case where the Kafka version parsing fails and defaults to an uninitialized `kafkaVersion` (a hedged sketch of an explicit fallback follows the version-parsing snippet below).
- The `getTLSConfig` method does not check if the `caCert` is successfully appended to the `caCertPool`, which could lead to TLS configuration issues (a hedged sketch follows the certificate-pool snippet below).
- Sweep has identified a redundant function: The new function duplicates the functionality of the `Register` function in `pkg/mqtrigger/factory/factory.go` and the `IsTopicValid` function in `pkg/mqtrigger/messageQueue/kafka/kafka.go`.
- Sweep has identified a redundant function: The new function is redundant because its purpose and functionality are already covered by the existing `Create` method in the `MessageQueueFactory` interface and its implementations.
- Sweep has identified a redundant function: The new function is redundant because its purpose and functionality are already covered by the existing `Create` method in the `Factory` struct in `pkg/mqtrigger/messageQueue/kafka/kafka.go`.
- Sweep has identified a redundant function: The new function is redundant because its functionality is already covered by existing methods such as `NewMqtConsumerGroupHandler`, `MqtConsumerGroupHandler.Setup`, `MqtConsumerGroupHandler.Cleanup`, and `MqtConsumerGroupHandler.ConsumeClaim`.
- Sweep has identified a redundant function: The new function is redundant because the existing `getTLSConfig` method in the `Kafka` struct already handles the same tasks and operations.
- Sweep has identified a redundant function: The new function is redundant as its purpose and functionality are already covered by the existing `Unsubscribe` methods in the `fakeMessageQueue` and `Kafka` structs.
- Sweep has identified a redundant function: The new function is redundant because the existing `IsTopicValid` function in `pkg/mqtrigger/messageQueue/kafka/kafka.go` already performs the same validations.
fission/pkg/mqtrigger/messageQueue/kafka/kafka.go
Lines 185 to 197 in 65aa6ec
for {
	// Consume messages
	err := consumer.Consume(ctx, topic, ch)
	if err != nil {
		kafka.logger.Error("consumer error", zap.Error(err), zap.String("trigger", trigger.ObjectMeta.Name))
	}
	if ctx.Err() != nil {
		kafka.logger.Info("consumer context cancelled", zap.String("trigger", trigger.ObjectMeta.Name))
		return
	}
	ch.ready = make(chan bool)
}
fission/pkg/mqtrigger/messageQueue/kafka/kafka.go
Lines 82 to 88 in 65aa6ec
kafkaVersion, err := sarama.ParseKafkaVersion(mqKafkaVersion)
if err != nil {
	logger.Warn("error parsing kafka version string - falling back to default",
		zap.Error(err),
		zap.String("failed_version", mqKafkaVersion),
		zap.Any("default_version", kafkaVersion))
}
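Making the fallback explicit avoids depending on whatever value ParseKafkaVersion happens to return alongside the error. A minimal sketch; sarama.V1_0_0_0 is an arbitrary illustrative default, not the version the PR relies on:

	kafkaVersion, err := sarama.ParseKafkaVersion(mqKafkaVersion)
	if err != nil {
		kafkaVersion = sarama.V1_0_0_0 // explicit, documented fallback
		logger.Warn("error parsing kafka version string - falling back to default",
			zap.Error(err),
			zap.String("failed_version", mqKafkaVersion),
			zap.Any("default_version", kafkaVersion))
	}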
fission/pkg/mqtrigger/messageQueue/kafka/kafka.go
Lines 227 to 229 in 65aa6ec
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(kafka.authKeys["caCert"])
tlsConfig.RootCAs = caCertPool
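x509.CertPool.AppendCertsFromPEM reports whether any certificate was parsed, so the result can be checked instead of silently continuing with an empty pool. A minimal sketch against the lines above:

	caCertPool := x509.NewCertPool()
	if ok := caCertPool.AppendCertsFromPEM(kafka.authKeys["caCert"]); !ok {
		return nil, errors.New("failed to parse CA certificate for kafka TLS config")
	}
	tlsConfig.RootCAs = caCertPool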
fission/pkg/mqtrigger/messageQueue/kafka/kafka.go
Lines 38 to 40 in 65aa6ec
func init() {
	factory.Register(fv1.MessageQueueTypeKafka, &Factory{})
	validator.Register(fv1.MessageQueueTypeKafka, IsTopicValid)
fission/pkg/mqtrigger/messageQueue/kafka/kafka.go
Lines 71 to 73 in 65aa6ec
func (factory *Factory) Create(logger *zap.Logger, mqCfg messageQueue.Config, routerUrl string) (messageQueue.MessageQueue, error) {
	return New(logger, mqCfg, routerUrl)
}
fission/pkg/mqtrigger/messageQueue/kafka/kafka.go
Lines 75 to 146 in 65aa6ec
func New(logger *zap.Logger, mqCfg messageQueue.Config, routerUrl string) (messageQueue.MessageQueue, error) { | |
if len(routerUrl) == 0 || len(mqCfg.Url) == 0 { | |
return nil, errors.New("the router URL or MQ URL is empty") | |
} | |
mqKafkaVersion := os.Getenv("MESSAGE_QUEUE_KAFKA_VERSION") | |
// Parse version string | |
kafkaVersion, err := sarama.ParseKafkaVersion(mqKafkaVersion) | |
if err != nil { | |
logger.Warn("error parsing kafka version string - falling back to default", | |
zap.Error(err), | |
zap.String("failed_version", mqKafkaVersion), | |
zap.Any("default_version", kafkaVersion)) | |
} | |
kafka := Kafka{ | |
logger: logger.Named("kafka"), | |
routerUrl: routerUrl, | |
brokers: strings.Split(mqCfg.Url, ","), | |
version: kafkaVersion, | |
} | |
if tls, _ := strconv.ParseBool(os.Getenv("TLS_ENABLED")); tls { | |
kafka.tls = true | |
authKeys := make(map[string][]byte) | |
if mqCfg.Secrets == nil { | |
return nil, errors.New("no secrets were loaded") | |
} | |
authKeys["caCert"] = mqCfg.Secrets["caCert"] | |
authKeys["userCert"] = mqCfg.Secrets["userCert"] | |
authKeys["userKey"] = mqCfg.Secrets["userKey"] | |
kafka.authKeys = authKeys | |
} | |
logger.Info("created kafka queue", zap.Any("kafka brokers", kafka.brokers), | |
zap.Any("kafka version", kafka.version)) | |
// Create new config | |
saramaConfig := sarama.NewConfig() | |
saramaConfig.Version = kafka.version | |
// consumer config | |
saramaConfig.Consumer.Return.Errors = true | |
// producer config | |
saramaConfig.Producer.RequiredAcks = sarama.WaitForAll | |
saramaConfig.Producer.Retry.Max = 10 | |
saramaConfig.Producer.Return.Successes = true | |
// Setup TLS for both producer and consumer | |
if kafka.tls { | |
tlsConfig, err := kafka.getTLSConfig() | |
if err != nil { | |
return nil, err | |
} | |
saramaConfig.Net.TLS.Enable = true | |
saramaConfig.Net.TLS.Config = tlsConfig | |
} | |
saramaClient, err := sarama.NewClient(kafka.brokers, saramaConfig) | |
if err != nil { | |
return nil, err | |
} | |
kafka.client = saramaClient | |
return kafka, nil |
View Diff
fission/pkg/mqtrigger/messageQueue/kafka/kafka.go
Lines 149 to 207 in 65aa6ec
func (kafka Kafka) Subscribe(trigger *fv1.MessageQueueTrigger) (messageQueue.Subscription, error) { | |
kafka.logger.Debug("inside kakfa subscribe", zap.Any("trigger", trigger)) | |
kafka.logger.Debug("brokers set", zap.Strings("brokers", kafka.brokers)) | |
consumer, err := sarama.NewConsumerGroupFromClient(string(trigger.ObjectMeta.UID), kafka.client) | |
if err != nil { | |
return nil, err | |
} | |
producer, err := sarama.NewSyncProducerFromClient(kafka.client) | |
if err != nil { | |
return nil, err | |
} | |
kafka.logger.Info("created a new producer and a new consumer", zap.Strings("brokers", kafka.brokers), | |
zap.String("topic", trigger.Spec.Topic), | |
zap.String("response topic", trigger.Spec.ResponseTopic), | |
zap.String("error topic", trigger.Spec.ErrorTopic), | |
zap.String("trigger", trigger.ObjectMeta.Name), | |
zap.String("function namespace", trigger.ObjectMeta.Namespace), | |
zap.String("function name", trigger.Spec.FunctionReference.Name)) | |
// consume errors | |
go func() { | |
for err := range consumer.Errors() { | |
kafka.logger.With(zap.String("trigger", trigger.ObjectMeta.Name), zap.String("topic", trigger.Spec.Topic)).Error("consumer error received", zap.Error(err)) | |
} | |
}() | |
ctx, cancel := context.WithCancel(context.Background()) | |
ch := NewMqtConsumerGroupHandler(kafka.version, kafka.logger, trigger, producer, kafka.routerUrl) | |
// consume messages | |
go func() { | |
topic := []string{trigger.Spec.Topic} | |
// Create a new session for the consumer group until the context is cancelled | |
for { | |
// Consume messages | |
err := consumer.Consume(ctx, topic, ch) | |
if err != nil { | |
kafka.logger.Error("consumer error", zap.Error(err), zap.String("trigger", trigger.ObjectMeta.Name)) | |
} | |
if ctx.Err() != nil { | |
kafka.logger.Info("consumer context cancelled", zap.String("trigger", trigger.ObjectMeta.Name)) | |
return | |
} | |
ch.ready = make(chan bool) | |
} | |
}() | |
<-ch.ready // wait for consumer to be ready | |
mqtConsumer := MqtConsumer{ | |
ctx: ctx, | |
cancel: cancel, | |
consumer: consumer, | |
} | |
return mqtConsumer, nil |
View Diff
fission/pkg/mqtrigger/messageQueue/kafka/kafka.go
Lines 210 to 231 in 65aa6ec
func (kafka Kafka) getTLSConfig() (*tls.Config, error) { | |
tlsConfig := tls.Config{} | |
cert, err := tls.X509KeyPair(kafka.authKeys["userCert"], kafka.authKeys["userKey"]) | |
if err != nil { | |
return nil, err | |
} | |
tlsConfig.Certificates = []tls.Certificate{cert} | |
skipVerify, err := strconv.ParseBool(os.Getenv("INSECURE_SKIP_VERIFY")) | |
if err != nil { | |
kafka.logger.Error("failed to parse value of env variable INSECURE_SKIP_VERIFY taking default value false, expected boolean value: true/false", | |
zap.String("received", os.Getenv("INSECURE_SKIP_VERIFY"))) | |
} else { | |
tlsConfig.InsecureSkipVerify = skipVerify | |
} | |
caCertPool := x509.NewCertPool() | |
caCertPool.AppendCertsFromPEM(kafka.authKeys["caCert"]) | |
tlsConfig.RootCAs = caCertPool | |
return &tlsConfig, nil |
View Diff
fission/pkg/mqtrigger/messageQueue/kafka/kafka.go
Lines 234 to 237 in 65aa6ec
func (kafka Kafka) Unsubscribe(subscription messageQueue.Subscription) error {
	mqtConsumer := subscription.(MqtConsumer)
	mqtConsumer.cancel()
	return mqtConsumer.consumer.Close()
fission/pkg/mqtrigger/messageQueue/kafka/kafka.go
Lines 242 to 255 in 65aa6ec
func IsTopicValid(topic string) bool {
	if len(topic) == 0 {
		return false
	}
	if topic == "." || topic == ".." {
		return false
	}
	if len(topic) > 249 {
		return false
	}
	if !validKafkaTopicName.MatchString(topic) {
		return false
	}
	return true
pkg/mqtrigger/messageQueue/messageQueue.go
- Changing the package name from "mqtrigger" to "messageQueue" requires updating all import paths in the codebase that reference this package, which could lead to broken imports if not done comprehensively.
package messageQueue
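For illustration, a caller would now import the package under its new identifier; the import path below is assumed from the repository layout referenced in this review:

package main

import (
	"fmt"

	"github.com/fission/fission/pkg/mqtrigger/messageQueue" // assumed path after the rename
)

func main() {
	// Importers reference the renamed package identifier, e.g. messageQueue.Config.
	var cfg messageQueue.Config
	fmt.Printf("%+v\n", cfg)
}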
pkg/mqtrigger/metrics.go
- Sweep has identified a redundant function: The new function `IncreaseSubscriptionCount` is redundant because the same functionality is already provided by the existing `IncreaseSubscriptionCount` function in the `pkg/mqtrigger/mqtmanager.go` file.
- Sweep has identified a redundant function: The new function `DecreaseSubscriptionCount` is redundant because the existing `DecreaseSubscriptionCount` function in the `pkg/mqtrigger/mqtmanager.go` file already performs the same task.
- Sweep has identified a redundant function: The new function `SetMessageLagCount` is redundant because its functionality is already implemented in the `ConsumeClaim` method of the `MqtConsumerGroupHandler` struct.
- Sweep has identified a redundant function: The new function is redundant because the `init` function in `pkg/mqtrigger/metrics.go` already registers the same metrics (`subscriptionCount`, `messageCount`, `messageLagCount`) with the metrics registry.
fission/pkg/mqtrigger/metrics.go
Lines 50 to 52 in 65aa6ec
func IncreaseSubscriptionCount() {
	subscriptionCount.WithLabelValues().Inc()
}
fission/pkg/mqtrigger/metrics.go
Lines 54 to 56 in 65aa6ec
func DecreaseSubscriptionCount() {
	subscriptionCount.WithLabelValues().Dec()
}
fission/pkg/mqtrigger/metrics.go
Lines 62 to 64 in 65aa6ec
func SetMessageLagCount(trigname, trignamespace, topic, partition string, lag int64) {
	messageLagCount.WithLabelValues(trigname, trignamespace, topic, partition).Set(float64(lag))
}
fission/pkg/mqtrigger/metrics.go
Lines 66 to 71 in 65aa6ec
func init() {
	registry := metrics.Registry
	registry.MustRegister(subscriptionCount)
	registry.MustRegister(messageCount)
	registry.MustRegister(messageLagCount)
}
pkg/mqtrigger/mqtmanager.go
- The `RegisterTrigger` method does not handle the case where `Subscribe` returns an error but the subscription is not nil, which could lead to inconsistent state (a hedged cleanup sketch follows the `RegisterTrigger` snippet below).
- The `service` method does not handle the case where `reqChan` is closed, which could lead to a panic (a hedged sketch using `range` over the channel follows the `service` snippet below).
- Sweep has identified a redundant function: The new function is redundant because the existing `MakeMessageQueueTriggerManager` function in `pkg/mqtrigger/mqtmanager.go` already performs the same initialization tasks.
- Sweep has identified a redundant function: The new function is redundant because its tasks and purpose are already covered by `StartScalerManager` in `pkg/mqtrigger/scalermanager.go` and `Run` in `pkg/timer/timerSync.go`.
- Sweep has identified a redundant function: The new function is redundant because its purpose and functionality are already covered by the existing `service` method in the `MessageQueueTriggerManager` struct.
- Sweep has identified a redundant function: The new function `addTrigger` is redundant because its functionality is already covered by the existing `addTrigger` method in the `MessageQueueTriggerManager` class.
- Sweep has identified a redundant function: The new function `getTriggerSubscription` is redundant because its functionality is already covered by the existing `checkTriggerSubscription` method, which internally calls `getTriggerSubscription`.
- Sweep has identified a redundant function: The new function `checkTriggerSubscription` is redundant because its purpose and functionality are already covered by the existing method `getTriggerSubscription`. It does not provide any unique contributions.
- Sweep has identified a redundant function: The new function `delTriggerSubscription` is redundant because its functionality is already covered by the `service` method, which handles `DELETE_TRIGGER` requests.
- Sweep has identified a redundant function: The new function `mqtInformerHandlers` is redundant as its functionality is already covered by the existing `mqTriggerEventHandlers` function in `scalermanager.go`.
fission/pkg/mqtrigger/mqtmanager.go
Lines 164 to 191 in 65aa6ec
func (mqt *MessageQueueTriggerManager) RegisterTrigger(trigger *fv1.MessageQueueTrigger) { | |
isPresent := mqt.checkTriggerSubscription(trigger) | |
if isPresent { | |
mqt.logger.Debug("message queue trigger already registered", zap.String("trigger_name", trigger.ObjectMeta.Name)) | |
return | |
} | |
// actually subscribe using the message queue client impl | |
sub, err := mqt.messageQueue.Subscribe(trigger) | |
if err != nil { | |
mqt.logger.Warn("failed to subscribe to message queue trigger", zap.Error(err), zap.String("trigger_name", trigger.ObjectMeta.Name)) | |
return | |
} | |
if sub == nil { | |
mqt.logger.Warn("subscription is nil", zap.String("trigger_name", trigger.ObjectMeta.Name)) | |
return | |
} | |
triggerSub := triggerSubscription{ | |
trigger: *trigger, | |
subscription: sub, | |
} | |
// add to our list | |
err = mqt.addTrigger(&triggerSub) | |
if err != nil { | |
mqt.logger.Fatal("adding message queue trigger failed", zap.Error(err), zap.String("trigger_name", trigger.ObjectMeta.Name)) | |
} | |
mqt.logger.Info("message queue trigger created", zap.String("trigger_name", trigger.ObjectMeta.Name)) | |
} |
View Diff
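If addTrigger fails (for example, the key already exists), the subscription created just above is left dangling and logger.Fatal takes the whole process down. A hedged sketch of the tail of RegisterTrigger that releases the subscription and returns instead; identifiers are those from the snippet:

	// add to our list; on failure, release the subscription instead of exiting
	err = mqt.addTrigger(&triggerSub)
	if err != nil {
		mqt.logger.Warn("adding message queue trigger failed, unsubscribing",
			zap.Error(err), zap.String("trigger_name", trigger.ObjectMeta.Name))
		if uerr := mqt.messageQueue.Unsubscribe(sub); uerr != nil {
			mqt.logger.Warn("failed to clean up subscription", zap.Error(uerr))
		}
		return
	}
	mqt.logger.Info("message queue trigger created", zap.String("trigger_name", trigger.ObjectMeta.Name))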
fission/pkg/mqtrigger/mqtmanager.go
Lines 102 to 136 in 65aa6ec
func (mqt *MessageQueueTriggerManager) service() { | |
for { | |
req := <-mqt.reqChan | |
resp := response{triggerSub: nil, err: nil} | |
k, err := k8sCache.MetaNamespaceKeyFunc(&req.triggerSub.trigger) | |
if err != nil { | |
resp.err = err | |
req.respChan <- resp | |
continue | |
} | |
switch req.requestType { | |
case ADD_TRIGGER: | |
if _, ok := mqt.triggers[k]; ok { | |
resp.err = errors.New("trigger already exists") | |
} else { | |
mqt.triggers[k] = req.triggerSub | |
mqt.logger.Debug("set trigger subscription", zap.String("key", k)) | |
IncreaseSubscriptionCount() | |
} | |
req.respChan <- resp | |
case GET_TRIGGER_SUBSCRIPTION: | |
if _, ok := mqt.triggers[k]; !ok { | |
resp.err = errors.New("trigger does not exist") | |
} else { | |
resp.triggerSub = mqt.triggers[k] | |
} | |
req.respChan <- resp | |
case DELETE_TRIGGER: | |
delete(mqt.triggers, k) | |
mqt.logger.Debug("delete trigger", zap.String("key", k)) | |
DecreaseSubscriptionCount() | |
req.respChan <- resp | |
} | |
} |
View Diff
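Receiving from a closed reqChan would yield zero-value requests and then panic on the req.triggerSub dereference; ranging over the channel ends the loop cleanly when the channel is closed. A hedged sketch of the same service loop, with the switch body unchanged from the snippet above:

func (mqt *MessageQueueTriggerManager) service() {
	for req := range mqt.reqChan { // exits when reqChan is closed
		resp := response{triggerSub: nil, err: nil}
		k, err := k8sCache.MetaNamespaceKeyFunc(&req.triggerSub.trigger)
		if err != nil {
			resp.err = err
			req.respChan <- resp
			continue
		}
		switch req.requestType {
		case ADD_TRIGGER:
			if _, ok := mqt.triggers[k]; ok {
				resp.err = errors.New("trigger already exists")
			} else {
				mqt.triggers[k] = req.triggerSub
				mqt.logger.Debug("set trigger subscription", zap.String("key", k))
				IncreaseSubscriptionCount()
			}
			req.respChan <- resp
		case GET_TRIGGER_SUBSCRIPTION:
			if _, ok := mqt.triggers[k]; !ok {
				resp.err = errors.New("trigger does not exist")
			} else {
				resp.triggerSub = mqt.triggers[k]
			}
			req.respChan <- resp
		case DELETE_TRIGGER:
			delete(mqt.triggers, k)
			mqt.logger.Debug("delete trigger", zap.String("key", k))
			DecreaseSubscriptionCount()
			req.respChan <- resp
		}
	}
	mqt.logger.Info("request channel closed, stopping message queue trigger service loop")
}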
fission/pkg/mqtrigger/mqtmanager.go
Lines 69 to 80 in 65aa6ec
func MakeMessageQueueTriggerManager(logger *zap.Logger,
	fissionClient versioned.Interface, mqType fv1.MessageQueueType, messageQueue messageQueue.MessageQueue) *MessageQueueTriggerManager {
	mqTriggerMgr := MessageQueueTriggerManager{
		logger:           logger.Named("message_queue_trigger_manager"),
		reqChan:          make(chan request),
		triggers:         make(map[string]*triggerSubscription),
		fissionClient:    fissionClient,
		messageQueueType: mqType,
		messageQueue:     messageQueue,
	}
	return &mqTriggerMgr
}
fission/pkg/mqtrigger/mqtmanager.go
Lines 82 to 99 in 65aa6ec
func (mqt *MessageQueueTriggerManager) Run(ctx context.Context, mgr manager.Interface) error { | |
go mqt.service() | |
for _, informer := range utils.GetInformersForNamespaces(mqt.fissionClient, time.Minute*30, fv1.MessageQueueResource) { | |
_, err := informer.AddEventHandler(mqt.mqtInformerHandlers()) | |
if err != nil { | |
return err | |
} | |
mgr.Add(ctx, func(ctx context.Context) { | |
informer.Run(ctx.Done()) | |
}) | |
if ok := k8sCache.WaitForCacheSync(ctx.Done(), informer.HasSynced); !ok { | |
mqt.logger.Fatal("failed to wait for caches to sync") | |
} | |
} | |
mgr.Add(ctx, func(ctx context.Context) { | |
metrics.ServeMetrics(ctx, "mqtrigger", mqt.logger, mgr) | |
}) | |
return nil |
View Diff
fission/pkg/mqtrigger/mqtmanager.go
Lines 145 to 147 in 65aa6ec
func (mqt *MessageQueueTriggerManager) addTrigger(triggerSub *triggerSubscription) error {
	resp := mqt.makeRequest(ADD_TRIGGER, triggerSub)
	return resp.err
fission/pkg/mqtrigger/mqtmanager.go
Lines 150 to 152 in 65aa6ec
func (mqt *MessageQueueTriggerManager) getTriggerSubscription(trigger *fv1.MessageQueueTrigger) *triggerSubscription {
	resp := mqt.makeRequest(GET_TRIGGER_SUBSCRIPTION, &triggerSubscription{trigger: *trigger})
	return resp.triggerSub
fission/pkg/mqtrigger/mqtmanager.go
Lines 155 to 157 in 65aa6ec
func (mqt *MessageQueueTriggerManager) checkTriggerSubscription(trigger *fv1.MessageQueueTrigger) bool {
	return mqt.getTriggerSubscription(trigger) != nil
}
fission/pkg/mqtrigger/mqtmanager.go
Lines 159 to 161 in 65aa6ec
func (mqt *MessageQueueTriggerManager) delTriggerSubscription(trigger *fv1.MessageQueueTrigger) error {
	resp := mqt.makeRequest(DELETE_TRIGGER, &triggerSubscription{trigger: *trigger})
	return resp.err
fission/pkg/mqtrigger/mqtmanager.go
Lines 193 to 225 in 65aa6ec
func (mqt *MessageQueueTriggerManager) mqtInformerHandlers() k8sCache.ResourceEventHandlerFuncs { | |
return k8sCache.ResourceEventHandlerFuncs{ | |
AddFunc: func(obj interface{}) { | |
trigger := obj.(*fv1.MessageQueueTrigger) | |
mqt.logger.Debug("Added mqt", zap.Any("trigger: ", trigger.ObjectMeta)) | |
mqt.RegisterTrigger(trigger) | |
}, | |
DeleteFunc: func(obj interface{}) { | |
trigger := obj.(*fv1.MessageQueueTrigger) | |
mqt.logger.Debug("Delete mqt", zap.Any("trigger: ", trigger.ObjectMeta)) | |
triggerSubscription := mqt.getTriggerSubscription(trigger) | |
if triggerSubscription == nil { | |
mqt.logger.Info("Unsubscribe failed", zap.String("trigger_name", trigger.ObjectMeta.Name)) | |
return | |
} | |
err := mqt.messageQueue.Unsubscribe(triggerSubscription.subscription) | |
if err != nil { | |
mqt.logger.Warn("failed to unsubscribe from message queue trigger", zap.Error(err), zap.String("trigger_name", trigger.ObjectMeta.Name)) | |
return | |
} | |
err = mqt.delTriggerSubscription(trigger) | |
if err != nil { | |
mqt.logger.Warn("deleting message queue trigger failed", zap.Error(err), zap.String("trigger_name", trigger.ObjectMeta.Name)) | |
} | |
mqt.logger.Info("message queue trigger deleted", zap.String("trigger_name", trigger.ObjectMeta.Name)) | |
}, | |
UpdateFunc: func(oldObj interface{}, newObj interface{}) { | |
trigger := newObj.(*fv1.MessageQueueTrigger) | |
mqt.logger.Debug("Updated mqt", zap.Any("trigger: ", trigger.ObjectMeta)) | |
mqt.RegisterTrigger(trigger) | |
}, | |
} |
View Diff
pkg/mqtrigger/mqtmanager_test.go
- The test function does not wait for the `mgr.service()` goroutine to complete, which could lead to race conditions or incomplete test execution.
- Sweep has identified a redundant function: The new function is redundant because its functionality is already covered by the `Subscribe` method in the `Kafka` struct found in `pkg/mqtrigger/messageQueue/kafka/kafka.go`.
- Sweep has identified a redundant function: The new function is redundant because its purpose and functionality are already covered by the existing `Unsubscribe` method in the `Kafka` struct.
- Sweep has identified a redundant function: The new function `TestMqtManager` is redundant as its purpose and functionality are already covered by existing test methods in the `pkg/mqtrigger/mqtmanager_test.go` file.
fission/pkg/mqtrigger/mqtmanager_test.go
Line 58 in 65aa6ec
go mgr.service()
fission/pkg/mqtrigger/mqtmanager_test.go
Lines 38 to 45 in 65aa6ec
func (f fakeMessageQueue) Subscribe(trigger *fv1.MessageQueueTrigger) (messageQueue.Subscription, error) {
	ctx, cancel := context.WithCancel(context.Background())
	mqtConsumer := mqtConsumer{
		ctx:    ctx,
		cancel: cancel,
	}
	return mqtConsumer, nil
}
fission/pkg/mqtrigger/mqtmanager_test.go
Lines 47 to 51 in 65aa6ec
func (f fakeMessageQueue) Unsubscribe(triggerSub messageQueue.Subscription) error {
	sub := triggerSub.(mqtConsumer)
	sub.cancel()
	return nil
}
fission/pkg/mqtrigger/mqtmanager_test.go
Lines 53 to 98 in 65aa6ec
func TestMqtManager(t *testing.T) { | |
logger := loggerfactory.GetLogger() | |
defer logger.Sync() | |
msgQueue := fakeMessageQueue{} | |
mgr := MakeMessageQueueTriggerManager(logger, nil, fv1.MessageQueueTypeKafka, msgQueue) | |
go mgr.service() | |
trigger := fv1.MessageQueueTrigger{ | |
ObjectMeta: metav1.ObjectMeta{ | |
Name: "test", | |
Namespace: "default", | |
}, | |
} | |
if mgr.checkTriggerSubscription(&trigger) { | |
t.Errorf("checkTrigger should return false") | |
} | |
sub, err := msgQueue.Subscribe(&trigger) | |
if err != nil { | |
t.Errorf("Subscribe should not return error") | |
} | |
triggerSub := triggerSubscription{ | |
trigger: trigger, | |
subscription: sub, | |
} | |
err = mgr.addTrigger(&triggerSub) | |
if err != nil { | |
t.Errorf("addTrigger should not return error") | |
} | |
if !mgr.checkTriggerSubscription(&trigger) { | |
t.Errorf("checkTrigger should return true") | |
} | |
getSub := mgr.getTriggerSubscription(&trigger) | |
if getSub == nil { | |
t.Fatal("getTriggerSubscription should return triggerSub") | |
} | |
if getSub.trigger.ObjectMeta.Name != trigger.ObjectMeta.Name { | |
t.Errorf("getTriggerSubscription should return triggerSub with trigger name %s", trigger.ObjectMeta.Name) | |
} | |
getSub.subscription.(mqtConsumer).cancel() | |
err = mgr.delTriggerSubscription(&trigger) | |
if err != nil { | |
t.Errorf("delTriggerSubscription should not return error") | |
} | |
if mgr.checkTriggerSubscription(&trigger) { | |
t.Errorf("checkTrigger should return false") | |
} | |
} |
View Diff
pkg/mqtrigger/validator/validator.go
- The `Register` function panics if a validator is nil or already registered, which could cause the application to crash if not handled properly.
- The `IsValidTopic` function does not handle the case where `mqtKind` is neither "keda" nor a registered `mqType`, potentially leading to unexpected behavior.
- Sweep has identified a redundant function: The new function is redundant as its purpose and functionality are already covered by the existing `Register` function in `pkg/mqtrigger/factory/factory.go`.
fission/pkg/mqtrigger/validator/validator.go
Lines 42 to 56 in 65aa6ec
func Register(mqType string, validator TopicValidator) {
	lock.Lock()
	defer lock.Unlock()
	if validator == nil {
		panic("Nil message queue topic validator")
	}
	_, registered := topicValidators[mqType]
	if registered {
		panic("Message queue topic validator already register")
	}
	topicValidators[mqType] = validator
}
fission/pkg/mqtrigger/validator/validator.go
Lines 58 to 67 in 65aa6ec
func IsValidTopic(mqType, topic, mqtKind string) bool {
	if mqtKind == "keda" {
		return true
	}
	validator, registered := topicValidators[mqType]
	if !registered {
		return false
	}
	return validator(topic)
}
Potential Issues
Sweep is unsure if these are issues, but they might be worth checking out.
charts/fission-all/templates/_fission-kuberntes-role-generator.tpl
- The new conditional block for the "kafka" component does not include any validation or error handling if the "kafka-kuberules" template is missing or incorrectly defined.
fission/charts/fission-all/templates/_fission-kuberntes-role-generator.tpl
Lines 29 to 31 in 65aa6ec
{{- if eq "kafka" .component }}
{{- include "kafka-kuberules" . }}
{{- end }}
charts/fission-all/templates/mqt-fission-kafka/podmonitor.yaml
- If `.Values.podMonitor.namespace` is not defined, the `PodMonitor` will be created in the default namespace, which may not be the intended behavior.
{{- if .Values.podMonitor.namespace }}
namespace: {{ .Values.podMonitor.namespace }}
{{- end }}
charts/fission-all/templates/mqt-fission-kafka/serviceaccount.yaml
- The namespace for the ServiceAccount is templated but not validated, which could lead to deployment issues if the Helm release namespace is not correctly specified.
namespace: {{ .Release.Namespace }}
charts/fission-all/values.yaml
- The `insecureSkipVerify` field in the TLS configuration, if set to true, could make the system susceptible to man-in-the-middle attacks.
fission/charts/fission-all/values.yaml
Lines 487 to 491 in 65aa6ec
enabled: false
## InsecureSkipVerify controls whether a client verifies the server's certificate chain and host name.
## Warning: Setting this to true, makes TLS susceptible to man-in-the-middle attacks
##
insecureSkipVerify: false
go.mod
- The inclusion of multiple indirect dependencies from `github.com/jcmturner` libraries may introduce security vulnerabilities if these libraries are not properly vetted for security issues.
Lines 117 to 121 in 65aa6ec
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
go.sum
- The addition of `github.com/IBM/sarama` introduces a new dependency that may have compatibility issues with existing Kafka client implementations or configurations.
Lines 17 to 21 in 65aa6ec
github.com/Azure/go-autorest/logger v0.1.0/go.mod h1:oExouG+K6PryycPJfVSxi/koC6LSNgds39diKLz7Vrc=
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/IBM/sarama v1.42.1 h1:wugyWa15TDEHh2kvq2gAy1IHLjEjuYOYgXz/ruC/OSQ=
github.com/IBM/sarama v1.42.1/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ=
pkg/mqtrigger/metrics.go
- The `IncreaseSubscriptionCount` and `DecreaseSubscriptionCount` functions do not use any labels, which may lead to issues if multiple instances of the metric are registered (a hedged label-free alternative is sketched after the snippet below).
fission/pkg/mqtrigger/metrics.go
Lines 50 to 55 in 65aa6ec
func IncreaseSubscriptionCount() {
	subscriptionCount.WithLabelValues().Inc()
}
func DecreaseSubscriptionCount() {
	subscriptionCount.WithLabelValues().Dec()
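Since the subscription counter has no label dimensions, a plain prometheus.Gauge would drop the empty WithLabelValues() calls entirely. A minimal sketch; the variable and metric names are assumptions, and registration with the metrics registry would stay as in the init function shown earlier:

package mqtrigger

import "github.com/prometheus/client_golang/prometheus"

// subscriptionGauge sketches a label-free alternative to the subscriptionCount
// GaugeVec used above; with no labels a plain Gauge is simpler.
var subscriptionGauge = prometheus.NewGauge(prometheus.GaugeOpts{
	Name: "fission_mqt_subscription_count", // assumed metric name
	Help: "Number of active message queue trigger subscriptions.",
})

func IncreaseSubscriptionCount() { subscriptionGauge.Inc() }
func DecreaseSubscriptionCount() { subscriptionGauge.Dec() }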
pkg/mqtrigger/mqtmanager.go
- The `RegisterTrigger` method does not handle the case where `Subscribe` returns an error but the subscription is not nil, potentially leading to inconsistent state.
fission/pkg/mqtrigger/mqtmanager.go
Lines 164 to 191 in 65aa6ec
func (mqt *MessageQueueTriggerManager) RegisterTrigger(trigger *fv1.MessageQueueTrigger) { | |
isPresent := mqt.checkTriggerSubscription(trigger) | |
if isPresent { | |
mqt.logger.Debug("message queue trigger already registered", zap.String("trigger_name", trigger.ObjectMeta.Name)) | |
return | |
} | |
// actually subscribe using the message queue client impl | |
sub, err := mqt.messageQueue.Subscribe(trigger) | |
if err != nil { | |
mqt.logger.Warn("failed to subscribe to message queue trigger", zap.Error(err), zap.String("trigger_name", trigger.ObjectMeta.Name)) | |
return | |
} | |
if sub == nil { | |
mqt.logger.Warn("subscription is nil", zap.String("trigger_name", trigger.ObjectMeta.Name)) | |
return | |
} | |
triggerSub := triggerSubscription{ | |
trigger: *trigger, | |
subscription: sub, | |
} | |
// add to our list | |
err = mqt.addTrigger(&triggerSub) | |
if err != nil { | |
mqt.logger.Fatal("adding message queue trigger failed", zap.Error(err), zap.String("trigger_name", trigger.ObjectMeta.Name)) | |
} | |
mqt.logger.Info("message queue trigger created", zap.String("trigger_name", trigger.ObjectMeta.Name)) | |
} |
View Diff
The following file, crds/v1/fission.io_messagequeuetriggers.yaml, was not reviewed as it was deemed unsuitable for the following reason: None. If this is an error please let us know.
Signed-off-by: Md Soharab Ansari <soharab.ansari@infracloud.io>
Description
This PR reverts commit f44174d of PR #2875.
This PR adds the Fission mqtrigger for Kafka, which implements the old approach.
https://fission.io/docs/usage/triggers/message-queue-trigger/kafka/
Which issue(s) this PR fixes:
Fixes #
Testing
Follow the kafka-mqt-fission example and generate specs as follows:
Invoke the producer and watch the consumer logs.
Checklist: