Skip to content

[PRW 2.0] (part X) generalize remote write logic for DRY/maintainability#14338

Closed
bwplotka wants to merge 2 commits intoremote-write-2.0from
rw20-gen
Closed

[PRW 2.0] (part X) generalize remote write logic for DRY/maintainability#14338
bwplotka wants to merge 2 commits intoremote-write-2.0from
rw20-gen

Conversation

@bwplotka
Copy link
Copy Markdown
Member

This is chained on top of #14329

@bwplotka bwplotka changed the title [PRW 2.0] (part 3) generalize remote write logic for DRY/maintainability [PRW 2.0] (part X) generalize remote write logic for DRY/maintainability Jun 25, 2024
Copy link
Copy Markdown
Member

@cstyan cstyan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some basic comments just from trying to grok the changes. I also started trying to fix tests locally but there's some bigger changes needed there as well to ensure we still have proper coverage after the removal of the direct buildWriteRequest functions.

It looks like some tests also just don't complete or get incorrect responses, such as TestBasicContentNegotiation

}
if sendNativeHistograms {
pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0]
// protoTimeSeriesQueue is a generic queue for both v1 and v2 Remote Write
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// protoTimeSeriesQueue is a generic queue for both v1 and v2 Remote Write
// protoTimeSeriesBuffer is a generic queue for both v1 and v2 Remote Write

if protoMsg == config.RemoteWriteProtoMsgV1 {
ret.v1 = make([]prompb.TimeSeries, max)
for i := range ret.v1 {
// NOTO(bwplotka): Why empty one-elem samples and exemplar?
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because we didn't ever attempt to insert the exemplar into the TS it is associated with, so in theory any TimeSeries could be a sample or exemplar

}

func (s *shards) updateMetrics(_ context.Context, err error, sampleCount, exemplarCount, histogramCount, metadataCount int, duration time.Duration) {
func (p *protoTimeSeriesBuffer) FilterOutTooOldSamples(logger log.Logger, metrics *queueManagerMetrics, baseTime time.Time, sampleAgeLimit time.Duration) (highest, lowest int64) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func (p *protoTimeSeriesBuffer) FilterOutTooOldSamples(logger log.Logger, metrics *queueManagerMetrics, baseTime time.Time, sampleAgeLimit time.Duration) (highest, lowest int64) {
func (p *protoTimeSeriesBuffer) FilterOldSamples(logger log.Logger,

we might also want a comment

// Filter samples older than sampleAgeLimit, allows for quicker catching up when recovering.

for i := range pendingDataV2 {
pendingDataV2[i].Samples = []writev2.Sample{{}}
}
pendingSeries := newProtoTimeSeriesBuffer(s.qm.protoMsg, max, s.qm.sendExemplars, s.qm.sendNativeHistograms)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we call the var pendingProtoSeries to make it clearer this will already be serialized as protobuf by the time we get to sendSamplesWithBackoff? I had to trace back through the call chain to confirm that as it is atm

func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte, enc Compression) error {
// Build the WriteRequest with no metadata.
req, highest, lowest, err := buildV2WriteRequest(s.qm.logger, samples, labels, pBuf, buf, nil, enc)
func (s *shards) sendSamplesWithBackoff(ctx context.Context, series *protoTimeSeriesBuffer, enc Compression) error {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe both send samples functions should have name changes to sendRequest?

Comment on lines +1836 to +1838
// TODO(bwplotka): This does not count dropped samples in the filter above.
// Is this on purpose? Given drop samples metric?
attribute.Int("samples", series.nPendingSamples),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not that I know of, I think this got missed during the review for the change that added the filtering functionality

func v2WriteRequestToWriteRequest(reqV2 *writev2.Request) (*prompb.WriteRequest, error) {
req := &prompb.WriteRequest{
Timeseries: make([]prompb.TimeSeries, len(reqV2.Timeseries)),
// TODO handle metadata?
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably we should, though we'll lose some granularity during the conversion

the metricmetadata only differentiates on metric family name, so either first seen or last seen set of metadata for a metric family name wins

@bwplotka
Copy link
Copy Markdown
Member Author

I will make sure comments apply to further PRs, but I switched tactic a bit (split), so this has to be recreated on top of #14347

@bwplotka bwplotka closed this Jun 26, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants