Skip to content

Commit 1f496ad

Browse files
add manual checkpoint parameter to consume DSM API (#6190)
* made manual checkpoint configurable * add comments * lint * rewrote tests
1 parent 588b852 commit 1f496ad

File tree

2 files changed

+45
-2
lines changed

2 files changed

+45
-2
lines changed

packages/dd-trace/src/datastreams/checkpointer.js

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@ class DataStreamsCheckpointer {
99
this.dsmProcessor = tracer._dataStreamsProcessor
1010
}
1111

12+
/**
13+
* @param {string} type - The type of the checkpoint, usually the streaming technology being used.
14+
* Examples include kafka, kinesis, sns etc.
15+
* @param {string} target - The target of data. This can be a topic, exchange or stream name.
16+
* @param {Object} carrier - The carrier object to inject context into.
17+
*/
1218
setProduceCheckpoint (type, target, carrier) {
1319
if (!this.config.dsmEnabled) return
1420

@@ -23,14 +29,29 @@ class DataStreamsCheckpointer {
2329
this.tracer.inject(ctx, 'text_map_dsm', carrier)
2430
}
2531

26-
setConsumeCheckpoint (type, source, carrier) {
32+
/**
33+
* @param {string} type - The type of the checkpoint, usually the streaming technology being used.
34+
* Examples include kafka, kinesis, sns etc.
35+
* @param {string} source - The source of data. This can be a topic, exchange or stream name.
36+
* @param {Object} carrier - The carrier object to extract context from.
37+
* @param {boolean} [manualCheckpoint=true] - Whether this checkpoint was manually set. Keep true if manually
38+
* instrumenting. Manual instrumentation always overrides automatic
39+
* instrumentation in the case a call is both manually and automatically
40+
* instrumented.
41+
*/
42+
setConsumeCheckpoint (type, source, carrier, manualCheckpoint = true) {
2743
if (!this.config.dsmEnabled) return
2844

2945
const parentCtx = this.tracer.extract('text_map_dsm', carrier)
3046
DataStreamsContext.setDataStreamsContext(parentCtx)
3147

48+
const tags = ['type:' + type, 'topic:' + source, 'direction:in']
49+
if (manualCheckpoint) {
50+
tags.push('manual_checkpoint:true')
51+
}
52+
3253
const ctx = this.dsmProcessor.setCheckpoint(
33-
['type:' + type, 'topic:' + source, 'direction:in', 'manual_checkpoint:true'],
54+
tags,
3455
null,
3556
parentCtx,
3657
null

packages/dd-trace/test/datastreams/data_streams_checkpointer.spec.js

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,26 @@ describe('data streams checkpointer manual api', () => {
7070

7171
expect(DSM_CONTEXT_HEADER in headers).to.equal(true)
7272
})
73+
74+
it('should set manual checkpoint when setConsumeCheckpoint is called without additional parameters', function () {
75+
const headers = {}
76+
const mockSetCheckpoint = sinon.stub().returns({ hash: Buffer.from([1, 2, 3, 4]) })
77+
78+
tracer._tracer._dataStreamsProcessor.setCheckpoint = mockSetCheckpoint
79+
80+
tracer.dataStreamsCheckpointer.setConsumeCheckpoint('kinesis', 'stream-123', headers)
81+
const calledTags = mockSetCheckpoint.getCall(0).args[0]
82+
expect(calledTags).to.include('manual_checkpoint:true')
83+
})
84+
85+
it('should set an automatic checkpoint when setConsumeCheckpoint is called with manualCheckpoint:false', function () {
86+
const headers = {}
87+
const mockSetCheckpoint = sinon.stub().returns({ hash: Buffer.from([1, 2, 3, 4]) })
88+
89+
tracer._tracer._dataStreamsProcessor.setCheckpoint = mockSetCheckpoint
90+
91+
tracer.dataStreamsCheckpointer.setConsumeCheckpoint('kinesis', 'stream-123', headers, false)
92+
const calledTags = mockSetCheckpoint.getCall(0).args[0]
93+
expect(calledTags).to.not.include('manual_checkpoint:true')
94+
})
7395
})

0 commit comments

Comments
 (0)