[PRW 2.0] (part X) generalize remote write logic for DRY/maintainability #14338
Spec: prometheus/docs#2462
Supersedes #13968
Signed-off-by: bwplotka <bwplotka@gmail.com>
…ity. Signed-off-by: bwplotka <bwplotka@gmail.com>
Some basic comments just from trying to grok the changes. I also started trying to fix the tests locally, but bigger changes are needed there as well to ensure we still have proper coverage after the removal of the direct buildWriteRequest functions.
It looks like some tests also just don't complete or get incorrect responses, such as TestBasicContentNegotiation.
}
if sendNativeHistograms {
pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0]
// protoTimeSeriesQueue is a generic queue for both v1 and v2 Remote Write
Suggested change:
- // protoTimeSeriesQueue is a generic queue for both v1 and v2 Remote Write
+ // protoTimeSeriesBuffer is a generic queue for both v1 and v2 Remote Write
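To make the "one buffer for both v1 and v2" idea concrete, here is a minimal sketch under simplified assumptions: `protoVersion`, `seriesV1`, `seriesV2`, `timeSeriesBuffer` and `newTimeSeriesBuffer` are hypothetical stand-ins, not the PR's actual types. Only the slice for the configured protocol version is allocated, in the spirit of the `if protoMsg == config.RemoteWriteProtoMsgV1` branch in the diff.

```go
package main

// protoVersion is a hypothetical stand-in for config.RemoteWriteProtoMsg.
type protoVersion int

const (
	protoV1 protoVersion = iota
	protoV2
)

// Simplified, hypothetical stand-ins for prompb.TimeSeries and writev2.TimeSeries.
type seriesV1 struct{ Samples []float64 }

type seriesV2 struct {
	LabelsRefs []uint32 // v2 references interned symbols instead of label strings
	Samples    []float64
}

// timeSeriesBuffer holds pending series for exactly one protocol version;
// the slice for the other version stays nil.
type timeSeriesBuffer struct {
	version protoVersion
	v1      []seriesV1
	v2      []seriesV2
}

// newTimeSeriesBuffer pre-allocates max slots for the chosen version only.
func newTimeSeriesBuffer(version protoVersion, max int) *timeSeriesBuffer {
	b := &timeSeriesBuffer{version: version}
	switch version {
	case protoV1:
		b.v1 = make([]seriesV1, max)
	case protoV2:
		b.v2 = make([]seriesV2, max)
	}
	return b
}

// Len reports the number of pre-allocated slots, whichever version is in use.
func (b *timeSeriesBuffer) Len() int {
	if b.version == protoV1 {
		return len(b.v1)
	}
	return len(b.v2)
}
```

Callers then only deal with one buffer type, and version-specific code stays inside the buffer's methods.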
if protoMsg == config.RemoteWriteProtoMsgV1 {
ret.v1 = make([]prompb.TimeSeries, max)
for i := range ret.v1 {
// NOTE(bwplotka): Why empty one-elem samples and exemplar?
Because we never attempted to insert the exemplar into the TimeSeries it is associated with, so in theory any TimeSeries could hold either a sample or an exemplar.
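One way to read the one-element preallocation: since any slot may end up carrying a single sample or a single exemplar, both slices get capacity one, and truncating with `[:0]` before reuse lets `append` write into the existing backing array instead of allocating. A minimal sketch with hypothetical stand-in types (`sample`, `exemplar`, `slot`, `newSlots`, `fill`), not the actual queue manager code:

```go
package main

// Hypothetical, simplified stand-ins for prompb.Sample and prompb.Exemplar.
type sample struct {
	Value     float64
	Timestamp int64
}

type exemplar struct {
	Value     float64
	Timestamp int64
}

// slot mirrors the shape of a pre-allocated prompb.TimeSeries.
type slot struct {
	Samples   []sample
	Exemplars []exemplar
}

// newSlots pre-allocates n slots, each with room (capacity 1) for one sample
// and, when exemplars are enabled, one exemplar.
func newSlots(n int, sendExemplars bool) []slot {
	slots := make([]slot, n)
	for i := range slots {
		slots[i].Samples = []sample{{}}
		if sendExemplars {
			slots[i].Exemplars = []exemplar{{}}
		}
	}
	return slots
}

// fill resets a slot and stores either a sample or an exemplar in it.
// Truncating with [:0] keeps the capacity of 1, so append reuses the
// existing backing array rather than allocating a new one.
func fill(s *slot, isExemplar bool, v float64, ts int64) {
	s.Samples = s.Samples[:0]
	s.Exemplars = s.Exemplars[:0]
	if isExemplar {
		s.Exemplars = append(s.Exemplars, exemplar{Value: v, Timestamp: ts})
		return
	}
	s.Samples = append(s.Samples, sample{Value: v, Timestamp: ts})
}
```

Attaching exemplars to the series they belong to would remove the need to reserve room for both in every slot.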
}
func (s *shards) updateMetrics(_ context.Context, err error, sampleCount, exemplarCount, histogramCount, metadataCount int, duration time.Duration) {
func (p *protoTimeSeriesBuffer) FilterOutTooOldSamples(logger log.Logger, metrics *queueManagerMetrics, baseTime time.Time, sampleAgeLimit time.Duration) (highest, lowest int64) {
Suggested change:
- func (p *protoTimeSeriesBuffer) FilterOutTooOldSamples(logger log.Logger, metrics *queueManagerMetrics, baseTime time.Time, sampleAgeLimit time.Duration) (highest, lowest int64) {
+ func (p *protoTimeSeriesBuffer) FilterOldSamples(logger log.Logger,
We might also want a comment:
// Filter samples older than sampleAgeLimit, allows for quicker catching up when recovering.
for i := range pendingDataV2 {
pendingDataV2[i].Samples = []writev2.Sample{{}}
}
pendingSeries := newProtoTimeSeriesBuffer(s.qm.protoMsg, max, s.qm.sendExemplars, s.qm.sendNativeHistograms)
Can we call the variable pendingProtoSeries to make it clearer that it will already be serialized as protobuf by the time we get to sendSamplesWithBackoff? As it stands, I had to trace back through the call chain to confirm that.
func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte, enc Compression) error {
// Build the WriteRequest with no metadata.
req, highest, lowest, err := buildV2WriteRequest(s.qm.logger, samples, labels, pBuf, buf, nil, enc)
func (s *shards) sendSamplesWithBackoff(ctx context.Context, series *protoTimeSeriesBuffer, enc Compression) error {
Maybe both send-samples functions should be renamed to sendRequest?
// TODO(bwplotka): This does not count dropped samples in the filter above.
// Is this on purpose? Given drop samples metric?
attribute.Int("samples", series.nPendingSamples),
Not that I know of; I think this got missed during the review of the change that added the filtering functionality.
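If the span attribute is meant to describe what is actually sent, one option is to subtract what the age filter dropped before recording it. A hypothetical sketch with plain integers standing in for the OpenTelemetry `attribute.Int` call; `pendingCounts` and `sent` are illustrative names only, not part of the PR.

```go
package main

// pendingCounts is a hypothetical summary of one batch: what was pending
// before the age filter ran and how much of each kind the filter dropped.
type pendingCounts struct {
	Samples, Exemplars, Histograms                      int
	DroppedSamples, DroppedExemplars, DroppedHistograms int
}

// sent returns how many of each kind remain after filtering, i.e. the values
// a span attribute would report if it should exclude dropped data.
func (c pendingCounts) sent() (samples, exemplars, histograms int) {
	return c.Samples - c.DroppedSamples,
		c.Exemplars - c.DroppedExemplars,
		c.Histograms - c.DroppedHistograms
}
```

The dropped counts themselves would still feed the separate dropped-samples metric, so neither number is lost.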
func v2WriteRequestToWriteRequest(reqV2 *writev2.Request) (*prompb.WriteRequest, error) {
req := &prompb.WriteRequest{
Timeseries: make([]prompb.TimeSeries, len(reqV2.Timeseries)),
// TODO handle metadata?
Probably we should, though we'll lose some granularity during the conversion: MetricMetadata only differentiates on metric family name, so either the first-seen or the last-seen set of metadata for a metric family name wins.
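A minimal sketch of the "first seen wins" option for collapsing per-series metadata into per-family metadata. The types are hypothetical simplifications: `v2Series` carries the metric name as a plain field (in the v2 format it would come from the series' `__name__` label), and `v1Metadata` only mirrors the shape of `prompb.MetricMetadata`.

```go
package main

// v2Series is a hypothetical stand-in for a v2 series with its metadata.
type v2Series struct {
	MetricName string
	Type       string
	Help       string
	Unit       string
}

// v1Metadata mirrors the shape of prompb.MetricMetadata, keyed by family name.
type v1Metadata struct {
	MetricFamilyName string
	Type             string
	Help             string
	Unit             string
}

// collapseMetadata keeps the first metadata seen for each metric family name;
// later series of the same family with different metadata are ignored.
func collapseMetadata(series []v2Series) []v1Metadata {
	seen := make(map[string]bool, len(series))
	out := make([]v1Metadata, 0, len(series))
	for _, s := range series {
		if seen[s.MetricName] {
			continue
		}
		seen[s.MetricName] = true
		out = append(out, v1Metadata{
			MetricFamilyName: s.MetricName,
			Type:             s.Type,
			Help:             s.Help,
			Unit:             s.Unit,
		})
	}
	return out
}
```

A "last seen wins" variant would instead overwrite an entry in a map keyed by family name; either way, differing metadata within one family cannot survive the conversion.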
I will make sure these comments are applied in follow-up PRs, but I switched tactics a bit (splitting the work), so this has to be recreated on top of #14347.
This is chained on top of #14329.