[PRW 2.0] (part X) generalize remote write logic for DRY/maintainability #14338
bwplotka wants to merge 2 commits into remote-write-2.0
Spec: prometheus/docs#2462
Supersedes #13968
Signed-off-by: bwplotka <[email protected]>
…ity. Signed-off-by: bwplotka <[email protected]>
cstyan left a comment
Some basic comments, just from trying to grok the changes. I also started trying to fix the tests locally, but there are some bigger changes needed there as well to ensure we still have proper coverage after the removal of the direct buildWriteRequest functions.
It looks like some tests also just don't complete or get incorrect responses, such as TestBasicContentNegotiation.
| } | ||
| if sendNativeHistograms { | ||
| pendingData[nPending].Histograms = pendingData[nPending].Histograms[:0] | ||
| // protoTimeSeriesQueue is a generic queue for both v1 and v2 Remote Write |
Suggested change:
| // protoTimeSeriesQueue is a generic queue for both v1 and v2 Remote Write | |
| // protoTimeSeriesBuffer is a generic queue for both v1 and v2 Remote Write |
| if protoMsg == config.RemoteWriteProtoMsgV1 { | ||
| ret.v1 = make([]prompb.TimeSeries, max) | ||
| for i := range ret.v1 { | ||
| // NOTO(bwplotka): Why empty one-elem samples and exemplar? |
Because we never attempted to insert an exemplar into the TimeSeries it is associated with, so in theory any TimeSeries in the buffer could carry either a sample or an exemplar.
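To make the answer above concrete, here is a minimal sketch of that preallocation pattern. It is not the PR's code: `sample`, `exemplar`, `timeSeries`, `newBuffer`, and `reset` are hypothetical stand-ins for the prompb/writev2 types and the buffer constructor, and it only assumes what the hunks show (one-element slices allocated up front, then truncated with `[:0]` before reuse).

```go
package main

import "fmt"

// Hypothetical stand-ins for the prompb/writev2 message types.
type sample struct {
	Value     float64
	Timestamp int64
}

type exemplar struct {
	Value     float64
	Timestamp int64
}

type timeSeries struct {
	Samples   []sample
	Exemplars []exemplar
}

// newBuffer preallocates size slots. Every slot gets a one-element Samples
// slice, and a one-element Exemplars slice only if sendExemplars is set,
// because any slot may end up carrying a single sample or a single exemplar.
func newBuffer(size int, sendExemplars bool) []timeSeries {
	buf := make([]timeSeries, size)
	for i := range buf {
		buf[i].Samples = []sample{{}}
		if sendExemplars {
			buf[i].Exemplars = []exemplar{{}}
		}
	}
	return buf
}

// reset truncates the slices to length zero and keeps their capacity, so the
// next fill of this slot appends without allocating.
func reset(ts *timeSeries) {
	ts.Samples = ts.Samples[:0]
	ts.Exemplars = ts.Exemplars[:0]
}

func main() {
	buf := newBuffer(2, true)
	for i := range buf {
		reset(&buf[i])
	}
	// Slot 0 carries a sample, slot 1 carries an exemplar.
	buf[0].Samples = append(buf[0].Samples, sample{Value: 1, Timestamp: 1000})
	buf[1].Exemplars = append(buf[1].Exemplars, exemplar{Value: 2, Timestamp: 1000})
	fmt.Println(len(buf[0].Samples), len(buf[0].Exemplars))
	fmt.Println(len(buf[1].Samples), len(buf[1].Exemplars))
}
```

The one-element allocation is what makes the later `append` calls allocation-free, whichever kind of datum lands in a given slot.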
| } | ||
| func (s *shards) updateMetrics(_ context.Context, err error, sampleCount, exemplarCount, histogramCount, metadataCount int, duration time.Duration) { | ||
| func (p *protoTimeSeriesBuffer) FilterOutTooOldSamples(logger log.Logger, metrics *queueManagerMetrics, baseTime time.Time, sampleAgeLimit time.Duration) (highest, lowest int64) { |
Suggested change:
| func (p *protoTimeSeriesBuffer) FilterOutTooOldSamples(logger log.Logger, metrics *queueManagerMetrics, baseTime time.Time, sampleAgeLimit time.Duration) (highest, lowest int64) { | |
| func (p *protoTimeSeriesBuffer) FilterOldSamples(logger log.Logger, |
We might also want a comment:
// Filter samples older than sampleAgeLimit; this allows for quicker catching up when recovering.
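For readers following along, the filtering step could be sketched roughly as below. This is an illustration under stated assumptions, not the method in the PR: `sample`, `filterOldSamples`, and `example` are hypothetical names, timestamps are assumed to be Unix milliseconds, a zero `ageLimit` is assumed to disable filtering, and the dropped count is returned explicitly so a caller could account for it.

```go
package main

import (
	"fmt"
	"time"
)

// sample is a hypothetical stand-in for a remote write sample.
type sample struct {
	Value     float64
	Timestamp int64 // assumed: milliseconds since the Unix epoch
}

// filterOldSamples drops samples older than ageLimit relative to baseTime.
// It filters in place and returns the kept samples, the number dropped, and
// the highest and lowest kept timestamps (both zero if nothing is kept).
// Assumption: ageLimit == 0 means filtering is disabled.
func filterOldSamples(samples []sample, baseTime time.Time, ageLimit time.Duration) (kept []sample, dropped int, highest, lowest int64) {
	cutoff := baseTime.Add(-ageLimit).UnixMilli()
	kept = samples[:0] // reuse the backing array; writes never pass reads
	for _, s := range samples {
		if ageLimit > 0 && s.Timestamp < cutoff {
			dropped++
			continue
		}
		if len(kept) == 0 || s.Timestamp > highest {
			highest = s.Timestamp
		}
		if len(kept) == 0 || s.Timestamp < lowest {
			lowest = s.Timestamp
		}
		kept = append(kept, s)
	}
	return kept, dropped, highest, lowest
}

// example runs the filter on fixed input: a 10s limit at t=100s gives a
// cutoff of 90s, so only the sample at t=10s is too old.
func example() ([]sample, int, int64, int64) {
	in := []sample{{1, 10_000}, {2, 95_000}, {3, 99_000}}
	return filterOldSamples(in, time.UnixMilli(100_000), 10*time.Second)
}

func main() {
	kept, dropped, highest, lowest := example()
	fmt.Println(len(kept), dropped, highest, lowest)
}
```

Returning the dropped count alongside the timestamps keeps the accounting in one place instead of spreading it across the caller and a metrics object.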
| for i := range pendingDataV2 { | ||
| pendingDataV2[i].Samples = []writev2.Sample{{}} | ||
| } | ||
| pendingSeries := newProtoTimeSeriesBuffer(s.qm.protoMsg, max, s.qm.sendExemplars, s.qm.sendNativeHistograms) |
Can we call the var pendingProtoSeries, to make it clearer that this will already be serialized as protobuf by the time we get to sendSamplesWithBackoff? As it stands, I had to trace back through the call chain to confirm that.
| func (s *shards) sendV2SamplesWithBackoff(ctx context.Context, samples []writev2.TimeSeries, labels []string, sampleCount, exemplarCount, histogramCount, metadataCount int, pBuf, buf *[]byte, enc Compression) error { | ||
| // Build the WriteRequest with no metadata. | ||
| req, highest, lowest, err := buildV2WriteRequest(s.qm.logger, samples, labels, pBuf, buf, nil, enc) | ||
| func (s *shards) sendSamplesWithBackoff(ctx context.Context, series *protoTimeSeriesBuffer, enc Compression) error { |
Maybe both send-samples functions should be renamed to sendRequest?
| // TODO(bwplotka): This does not count dropped samples in the filter above. | ||
| // Is this on purpose? Given drop samples metric? | ||
| attribute.Int("samples", series.nPendingSamples), |
Not that I know of. I think this got missed during the review of the change that added the filtering functionality.
| func v2WriteRequestToWriteRequest(reqV2 *writev2.Request) (*prompb.WriteRequest, error) { | ||
| req := &prompb.WriteRequest{ | ||
| Timeseries: make([]prompb.TimeSeries, len(reqV2.Timeseries)), | ||
| // TODO handle metadata? |
We probably should, though we'll lose some granularity during the conversion.
MetricMetadata only differentiates on metric family name, so either the first-seen or the last-seen set of metadata for a given metric family name wins.
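As a rough illustration of the granularity loss described above, here is a sketch of collapsing per-series metadata into one entry per metric family with a first-seen-wins policy. The types `seriesMetadata` and `familyMetadata` and the function `collapseMetadata` are hypothetical stand-ins, not the writev2 or prompb definitions, and the sketch assumes the family name has already been derived for each series.

```go
package main

import "fmt"

// seriesMetadata is a hypothetical stand-in for metadata attached to a
// single series in a v2 request. FamilyName is assumed to be known already.
type seriesMetadata struct {
	FamilyName string
	Type       string
	Help       string
	Unit       string
}

// familyMetadata is a hypothetical stand-in for v1-style metadata, which is
// keyed by metric family name only.
type familyMetadata struct {
	MetricFamilyName string
	Type             string
	Help             string
	Unit             string
}

// collapseMetadata keeps the first metadata entry seen for each metric
// family and silently discards later entries for the same family.
// A last-seen-wins policy would overwrite the stored entry instead.
func collapseMetadata(in []seriesMetadata) []familyMetadata {
	seen := make(map[string]struct{}, len(in))
	out := make([]familyMetadata, 0, len(in))
	for _, m := range in {
		if _, ok := seen[m.FamilyName]; ok {
			continue // granularity lost here: this series' metadata is dropped
		}
		seen[m.FamilyName] = struct{}{}
		out = append(out, familyMetadata{
			MetricFamilyName: m.FamilyName,
			Type:             m.Type,
			Help:             m.Help,
			Unit:             m.Unit,
		})
	}
	return out
}

func main() {
	out := collapseMetadata([]seriesMetadata{
		{FamilyName: "http_requests_total", Type: "counter", Help: "first"},
		{FamilyName: "http_requests_total", Type: "counter", Help: "second"},
		{FamilyName: "up", Type: "gauge", Help: "target is up"},
	})
	for _, m := range out {
		fmt.Println(m.MetricFamilyName, m.Help)
	}
}
```

Whichever policy is picked, it is probably worth stating it in a comment on the conversion function, since the discarded entries are not recoverable on the receiving side.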
I will make sure these comments are applied in further PRs, but I switched tactics a bit (split), so this has to be recreated on top of #14347.
This is chained on top of #14329