
bwplotka
Member

This is chained on top of #14329

bwplotka added 2 commits June 24, 2024 09:20
Spec: prometheus/docs#2462

Supersedes #13968

Signed-off-by: bwplotka <bwplotka@gmail.com>
…ity.

Signed-off-by: bwplotka <bwplotka@gmail.com>
@bwplotka bwplotka changed the title [PRW 2.0] (part 3) generalize remote write logic for DRY/maintainability [PRW 2.0] (part X) generalize remote write logic for DRY/maintainability Jun 25, 2024
Member

@cstyan cstyan left a comment


Some basic comments, just from trying to grok the changes. I also started trying to fix the tests locally, but some bigger changes are needed there as well to ensure we still have proper coverage after the removal of the direct buildWriteRequest functions.

It also looks like some tests just don't complete or get incorrect responses, such as TestBasicContentNegotiation.

}
if sendNativeHistograms {
pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0]
// protoTimeSeriesQueue is a generic queue for both v1 and v2 Remote Write

Suggested change
// protoTimeSeriesQueue is a generic queue for both v1 and v2 Remote Write
// protoTimeSeriesBuffer is a generic queue for both v1 and v2 Remote Write
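The rename fits because the type behaves like a preallocated, reusable buffer rather than a FIFO queue: it holds slots for whichever protobuf message is configured and is reset between sends. A minimal sketch of that idea, assuming hypothetical stand-in types (protoMsg, sample, v1Series, v2Series, seriesBuffer) in place of the real prompb/writev2 types; it is an illustration, not the PR's implementation:

```go
package main

// protoMsg is a hypothetical stand-in for the configured Remote Write protobuf message.
type protoMsg string

const (
	protoMsgV1 protoMsg = "v1"
	protoMsgV2 protoMsg = "v2"
)

// sample, v1Series and v2Series are simplified stand-ins for the real proto types.
type sample struct {
	Value     float64
	Timestamp int64
}

type v1Series struct{ Samples []sample }
type v2Series struct{ Samples []sample }

// seriesBuffer preallocates slots for exactly one protocol version and reuses them.
type seriesBuffer struct {
	msg protoMsg
	v1  []v1Series
	v2  []v2Series
	n   int // number of slots currently in use
}

// newSeriesBuffer allocates size slots, only for the configured message type.
func newSeriesBuffer(msg protoMsg, size int) *seriesBuffer {
	b := &seriesBuffer{msg: msg}
	if msg == protoMsgV1 {
		b.v1 = make([]v1Series, size)
	} else {
		b.v2 = make([]v2Series, size)
	}
	return b
}

// Append writes s into the next free slot, reusing that slot's backing array.
// It returns false when the buffer is full.
func (b *seriesBuffer) Append(s sample) bool {
	if b.msg == protoMsgV1 {
		if b.n >= len(b.v1) {
			return false
		}
		b.v1[b.n].Samples = append(b.v1[b.n].Samples[:0], s)
	} else {
		if b.n >= len(b.v2) {
			return false
		}
		b.v2[b.n].Samples = append(b.v2[b.n].Samples[:0], s)
	}
	b.n++
	return true
}

// Reset marks every slot free without releasing memory, ready for the next batch.
func (b *seriesBuffer) Reset() { b.n = 0 }
```

Keeping only one of the two slices allocated means the unused protocol costs nothing, while the send path can branch once on msg.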

if protoMsg == config.RemoteWriteProtoMsgV1 {
ret.v1 = make([]prompb.TimeSeries, max)
for i := range ret.v1 {
// NOTE(bwplotka): Why empty one-elem samples and exemplar?

Because we never attempt to insert an exemplar into the TimeSeries it is associated with, in theory any TimeSeries entry could hold either a sample or an exemplar.
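As we read it, each preallocated slot gets a one-element Samples slice and a one-element Exemplars slice so that either kind of datum can be written into it by truncating to [:0] and appending, without a fresh allocation. A minimal sketch of that pattern, assuming hypothetical stand-in types (sample, exemplar, series, datum, dataKind) rather than prompb.TimeSeries:

```go
package main

type sample struct {
	Value     float64
	Timestamp int64
}

type exemplar struct {
	Value     float64
	Timestamp int64
}

// series is a hypothetical stand-in for prompb.TimeSeries (labels omitted).
type series struct {
	Samples   []sample
	Exemplars []exemplar
}

// dataKind marks whether a queued datum is a sample or an exemplar.
type dataKind int

const (
	kindSample dataKind = iota
	kindExemplar
)

type datum struct {
	kind dataKind
	v    float64
	t    int64
}

// newPending preallocates size slots, each with a one-element backing array
// for samples and for exemplars, so later appends reuse that memory.
func newPending(size int) []series {
	p := make([]series, size)
	for i := range p {
		p[i].Samples = []sample{{}}
		p[i].Exemplars = []exemplar{{}}
	}
	return p
}

// populate writes each datum into its own slot. Because an exemplar is not
// merged into the series it belongs to, a slot ends up holding exactly one
// sample or exactly one exemplar. It returns the number of slots used.
func populate(p []series, data []datum) int {
	n := 0
	for _, d := range data {
		if n >= len(p) {
			break
		}
		s := &p[n]
		s.Samples = s.Samples[:0]
		s.Exemplars = s.Exemplars[:0]
		switch d.kind {
		case kindSample:
			s.Samples = append(s.Samples, sample{Value: d.v, Timestamp: d.t})
		case kindExemplar:
			s.Exemplars = append(s.Exemplars, exemplar{Value: d.v, Timestamp: d.t})
		}
		n++
	}
	return n
}
```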

}

func (s *shards) updateMetrics(_ context.Context, err error, sampleCount, exemplarCount, histogramCount, metadataCount int, duration time.Duration) {
func (p *protoTimeSeriesBuffer) FilterOutTooOldSamples(logger log.Logger, metrics *queueManagerMetrics, baseTime time.Time, sampleAgeLimit time.Duration) (highest, lowest int64) {

Suggested change
func (p *protoTimeSeriesBuffer) FilterOutTooOldSamples(logger log.Logger, metrics *queueManagerMetrics, baseTime time.Time, sampleAgeLimit time.Duration) (highest, lowest int64) {
func (p *protoTimeSeriesBuffer) FilterOldSamples(logger log.Logger,

We might also want a comment:

// Filter samples older than sampleAgeLimit; this allows for quicker catch-up when recovering.

for i := range pendingDataV2 {
pendingDataV2[i].Samples = []writev2.Sample{{}}
}
pendingSeries := newProtoTimeSeriesBuffer(s.qm.protoMsg, max, s.qm.sendExemplars, s.qm.sendNativeHistograms)

Can we call the variable pendingProtoSeries, to make it clearer that this will already be serialized as protobuf by the time we get to sendSamplesWithBackoff? As it is now, I had to trace back through the call chain to confirm that.

func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte, enc Compression) error {
// Build the WriteRequest with no metadata.
req, highest, lowest, err := buildV2WriteRequest(s.qm.logger, samples, labels, pBuf, buf, nil, enc)
func (s *shards) sendSamplesWithBackoff(ctx context.Context, series *protoTimeSeriesBuffer, enc Compression) error {

Maybe both send-samples functions should be renamed to something like sendRequest?

Comment on lines +1836 to +1838
// TODO(bwplotka): This does not count dropped samples in the filter above.
// Is this on purpose? Given drop samples metric?
attribute.Int("samples", series.nPendingSamples),

Not that I know of. I think this got missed during review of the change that added the filtering functionality.

func v2WriteRequestToWriteRequest(reqV2 *writev2.Request) (*prompb.WriteRequest, error) {
req := &prompb.WriteRequest{
Timeseries: make([]prompb.TimeSeries, len(reqV2.Timeseries)),
// TODO handle metadata?

Probably we should, though we'll lose some granularity during the conversion.

The v1 MetricMetadata only differentiates on metric family name, so either the first-seen or the last-seen set of metadata for a given metric family name wins.
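To make the granularity loss concrete: a v2 request carries metadata per series, while v1 metadata is keyed only by metric family name. A minimal sketch of a first-seen-wins collapse, assuming simplified hypothetical types (metadata, v2Series, familyMetadata) and, as a further simplification, treating each series' metric name as its family name (real histogram or summary families span several series names such as _bucket, _sum and _count):

```go
package main

// metadata is a hypothetical stand-in for per-series metadata in a v2 request.
type metadata struct {
	Type string
	Help string
	Unit string
}

// v2Series is a simplified stand-in: Name is the series' __name__ label.
type v2Series struct {
	Name     string
	Metadata metadata
}

// familyMetadata is a stand-in for a v1 per-family metadata entry.
type familyMetadata struct {
	MetricFamilyName string
	metadata
}

// toFamilyMetadata collapses per-series metadata into one entry per family
// name, keeping the first one seen. Later series with the same name but
// different metadata are ignored: that is the granularity lost in conversion.
// Output order follows first appearance, so the result is deterministic.
func toFamilyMetadata(series []v2Series) []familyMetadata {
	seen := make(map[string]bool, len(series))
	var out []familyMetadata
	for _, s := range series {
		if seen[s.Name] {
			continue
		}
		seen[s.Name] = true
		out = append(out, familyMetadata{MetricFamilyName: s.Name, metadata: s.Metadata})
	}
	return out
}
```

Switching to last-seen-wins would only mean overwriting the stored entry instead of skipping it; either way, only one metadata set per family survives.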

@bwplotka
Member Author

I will make sure these comments are addressed in follow-up PRs, but I switched tactics a bit (splitting the change up), so this has to be recreated on top of #14347.

@bwplotka bwplotka closed this Jun 26, 2024
2 participants