-
Notifications
You must be signed in to change notification settings - Fork 2.6k
[jaeger-v2] Add kafka exporter and receiver configuration #4971
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
998fe18
6165551
5198ff7
46ef03f
8da3224
5a811c0
d6c2dc9
51b71d2
7607376
baa5eb8
caae89d
1b4980a
52f95fb
7352bfc
59d3071
2375329
657f192
fb44ba5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
service: | ||
pipelines: | ||
traces: | ||
receivers: [otlp, jaeger, zipkin] | ||
processors: [batch] | ||
exporters: [kafka] | ||
|
||
receivers: | ||
otlp: | ||
protocols: | ||
grpc: | ||
http: | ||
|
||
jaeger: | ||
protocols: | ||
grpc: | ||
thrift_binary: | ||
thrift_compact: | ||
thrift_http: | ||
|
||
zipkin: | ||
|
||
processors: | ||
batch: | ||
|
||
exporters: | ||
kafka: | ||
brokers: | ||
- localhost:9092 | ||
encoding: otlp_proto # available encodings are otlp_proto, jaeger_proto, jaeger_json, zipkin_proto, zipkin_json, zipkin_thrift |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
service: | ||
extensions: [jaeger_storage] | ||
pipelines: | ||
traces: | ||
receivers: [kafka] | ||
processors: [batch] | ||
exporters: [jaeger_storage_exporter] # same as in the default cmd/jaeger-v2/config.yaml | ||
telemetry: | ||
metrics: | ||
address: 0.0.0.0:8889 # to avoid port conflict with collector-with-kafka.yaml | ||
|
||
extensions: | ||
jaeger_storage: | ||
grpc: | ||
memstore: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: we don't know it's a |
||
server: localhost:17271 | ||
connection-timeout: 5s | ||
|
||
receivers: | ||
kafka: | ||
brokers: | ||
- localhost:9092 | ||
encoding: otlp_proto # available encodings are otlp_proto, jaeger_proto, jaeger_json, zipkin_proto, zipkin_json, zipkin_thrift | ||
initial_offset: earliest # consume messages from the beginning | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. perhaps we should parameterize this and default to latest checkpoint rather than earliest, since it would be very bad to run with earliest in production. The integration tests can override the value via env var. |
||
|
||
processors: | ||
batch: | ||
|
||
exporters: | ||
jaeger_storage_exporter: | ||
trace_storage: memstore |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
service: | ||
extensions: [jaeger_storage] | ||
pipelines: | ||
traces: | ||
receivers: [kafka] | ||
processors: [batch] | ||
exporters: [jaeger_storage_exporter] # same as in the default cmd/jaeger-v2/config.yaml | ||
telemetry: | ||
metrics: | ||
address: 0.0.0.0:8889 # to avoid port conflict with collector-with-kafka.yaml | ||
|
||
extensions: | ||
jaeger_storage: | ||
memory: | ||
memstore: | ||
max_traces: 100000 | ||
memstore_archive: | ||
max_traces: 100000 | ||
|
||
receivers: | ||
kafka: | ||
brokers: | ||
- localhost:9092 | ||
encoding: otlp_proto # available encodings are otlp_proto, jaeger_proto, jaeger_json, zipkin_proto, zipkin_json, zipkin_thrift | ||
initial_offset: earliest # consume messages from the beginning | ||
|
||
processors: | ||
batch: | ||
|
||
exporters: | ||
jaeger_storage_exporter: | ||
trace_storage: memstore |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
# Integration | ||
|
||
Jaeger v2 integration tests are built on top of [OTEL Testbed module](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/testbed). OTEL Testbed provide comprehensive tools for conducting end-to-end tests for the OTEL Collector, such as reproducible short-term benchmarks, correctness tests, long-running stability tests and maximum load stress tests. To learn more about OTEL Testbed, please refer to the their [README](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/testbed/README.md) | ||
|
||
## kafka_test | ||
|
||
Kafka e2e test checks if the pipelines through `kafka` and finally at `remote-storage` have stored match exactly with the provided data using `GoldenDataProvider` (Provides data from the "Golden" dataset generated using pairwise combinatorial testing a.k.a PICT techniques for use in correctness tests) and validated using `CorrectnessTestValidator`. | ||
|
||
The pipelines are checked in 2 steps, which the first test case verifies if the spans sent to Kafka are correct, and the second one checks the spans stored in the remote storage. | ||
 | ||
|
||
To conduct the tests, run the following command: | ||
|
||
``` | ||
scripts/otel-kafka-integration-test.sh [kafka_version=latest] [remote_storage_version=latest] | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
FIXME |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
// Copyright (c) 2024 The Jaeger Authors. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package datareceivers | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed" | ||
"go.opentelemetry.io/collector/component/componenttest" | ||
"go.opentelemetry.io/collector/consumer" | ||
"go.opentelemetry.io/collector/receiver" | ||
"go.opentelemetry.io/collector/receiver/receivertest" | ||
|
||
"github.com/jaegertracing/jaeger/cmd/jaeger/integration/receivers/storagereceiver" | ||
) | ||
|
||
type jaegerStorageDataReceiver struct { | ||
Port int | ||
receiver receiver.Traces | ||
} | ||
|
||
func NewJaegerStorageDataReceiver(port int) testbed.DataReceiver { | ||
return &jaegerStorageDataReceiver{Port: port} | ||
} | ||
|
||
func (dr *jaegerStorageDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, _ consumer.Logs) error { | ||
factory := storagereceiver.NewFactory() | ||
cfg := factory.CreateDefaultConfig().(*storagereceiver.Config) | ||
cfg.GRPC.RemoteServerAddr = fmt.Sprintf("localhost:%d", dr.Port) | ||
cfg.GRPC.RemoteConnectTimeout = time.Duration(5 * time.Second) | ||
// TODO add support for other backends | ||
|
||
var err error | ||
set := receivertest.NewNopCreateSettings() | ||
dr.receiver, err = factory.CreateTracesReceiver(context.Background(), set, cfg, tc) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return dr.receiver.Start(context.Background(), componenttest.NewNopHost()) | ||
} | ||
|
||
func (dr *jaegerStorageDataReceiver) Stop() error { | ||
return dr.receiver.Shutdown(context.Background()) | ||
} | ||
|
||
func (dr *jaegerStorageDataReceiver) GenConfigYAMLStr() string { | ||
return fmt.Sprintf(` | ||
jaeger_storage_receiver: | ||
grpc: | ||
server: localhost:%d`, dr.Port) | ||
} | ||
|
||
func (dr *jaegerStorageDataReceiver) ProtocolName() string { | ||
return "jaeger_storage_receiver" | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
// Copyright (c) 2024 The Jaeger Authors. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package datareceivers | ||
yurishkuro marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver" | ||
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed" | ||
"go.opentelemetry.io/collector/component/componenttest" | ||
"go.opentelemetry.io/collector/consumer" | ||
"go.opentelemetry.io/collector/receiver" | ||
"go.opentelemetry.io/collector/receiver/receivertest" | ||
) | ||
|
||
type kafkaDataReceiver struct { | ||
testbed.DataReceiverBase | ||
receiver receiver.Traces | ||
} | ||
|
||
func NewKafkaDataReceiver(port int) testbed.DataReceiver { | ||
return &kafkaDataReceiver{DataReceiverBase: testbed.DataReceiverBase{Port: port}} | ||
} | ||
|
||
func (dr *kafkaDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, _ consumer.Logs) error { | ||
factory := kafkareceiver.NewFactory() | ||
cfg := factory.CreateDefaultConfig().(*kafkareceiver.Config) | ||
cfg.Brokers = []string{fmt.Sprintf("localhost:%d", dr.Port)} | ||
cfg.GroupID = "testbed_collector" | ||
|
||
var err error | ||
set := receivertest.NewNopCreateSettings() | ||
dr.receiver, err = factory.CreateTracesReceiver(context.Background(), set, cfg, tc) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return dr.receiver.Start(context.Background(), componenttest.NewNopHost()) | ||
} | ||
|
||
func (dr *kafkaDataReceiver) Stop() error { | ||
return dr.receiver.Shutdown(context.Background()) | ||
} | ||
|
||
func (dr *kafkaDataReceiver) GenConfigYAMLStr() string { | ||
return fmt.Sprintf(` | ||
kafka: | ||
brokers: | ||
- localhost:%d | ||
encoding: otlp_proto`, dr.Port) | ||
} | ||
|
||
func (dr *kafkaDataReceiver) ProtocolName() string { | ||
return "kafka" | ||
} |
Uh oh!
There was an error while loading. Please reload this page.