-
Notifications
You must be signed in to change notification settings - Fork 10.3k
Expand file tree
/
Copy pathhead_read.go
More file actions
788 lines (686 loc) · 26.4 KB
/
head_read.go
File metadata and controls
788 lines (686 loc) · 26.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"context"
"errors"
"fmt"
"math"
"slices"
"sync"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/index"
)
// headChunksBufMaxCap bounds the capacity of the reusable headChunksBuf scratch
// slice. Once the buffer has grown past this many entries it is dropped instead
// of being kept, so one series with a long head-chunk list does not leave a
// large backing array alive for the rest of the iteration.
const headChunksBufMaxCap = 1 << 8
// ExemplarQuerier returns a querier over the exemplars held by the head.
func (h *Head) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) {
	return h.exemplars.ExemplarQuerier(ctx)
}

// Index returns an IndexReader against the block.
func (h *Head) Index() (IndexReader, error) {
	ir := h.indexRange(math.MinInt64, math.MaxInt64)
	return ir, nil
}

// indexRange returns a headIndexReader limited to [mint, maxt]. A mint below
// the head's current minimum time is raised to that minimum.
func (h *Head) indexRange(mint, maxt int64) *headIndexReader {
	lower := max(mint, h.MinTime())
	return &headIndexReader{head: h, mint: lower, maxt: maxt}
}
// headIndexReader serves index lookups for the in-memory head block, limited
// to the time range [mint, maxt].
// A headIndexReader must not be used from more than one goroutine at a time.
type headIndexReader struct {
	head *Head
	mint int64
	maxt int64

	// headChunksBuf is scratch space that Series hands to appendSeriesChunks,
	// so that visiting many series does not allocate a fresh slice per series.
	headChunksBuf []*memChunk
}
// Close implements IndexReader. The head index reader owns nothing that needs
// releasing, so it always succeeds.
func (*headIndexReader) Close() error {
	return nil
}

// Symbols returns an iterator over every symbol known to the head's postings.
func (h *headIndexReader) Symbols() index.StringIter {
	return h.head.postings.Symbols()
}
// SortedLabelValues behaves like LabelValues but sorts the result. The values
// are restricted to the head's time range mint to maxt and, when matchers are
// given, to series that match all of them. On error, whatever LabelValues
// returned is passed through unsorted.
func (h *headIndexReader) SortedLabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error) {
	values, err := h.LabelValues(ctx, name, hints, matchers...)
	if err != nil {
		return values, err
	}
	slices.Sort(values)
	return values, nil
}
// LabelValues returns the values of label name present in the head within the
// time range mint to maxt. If the reader's range does not overlap the head at
// all, an empty (non-nil) slice is returned. If matchers are given, only values
// of series matching them are returned.
func (h *headIndexReader) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error) {
	outOfRange := h.maxt < h.head.MinTime() || h.mint > h.head.MaxTime()
	switch {
	case outOfRange:
		return []string{}, nil
	case len(matchers) > 0:
		return labelValuesWithMatchers(ctx, h, name, hints, matchers...)
	default:
		return h.head.postings.LabelValues(ctx, name, hints), nil
	}
}
// LabelNames returns the sorted set of label names present in the head within
// the time range mint to maxt. If the reader's range does not overlap the head,
// an empty (non-nil) slice is returned. If matchers are given, only names of
// series matching them are considered.
func (h *headIndexReader) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, error) {
	if h.maxt < h.head.MinTime() || h.mint > h.head.MaxTime() {
		return []string{}, nil
	}
	if len(matchers) > 0 {
		return labelNamesWithMatchers(ctx, h, matchers...)
	}

	names := h.head.postings.LabelNames()
	slices.Sort(names)
	return names, nil
}
// Postings returns the postings list iterator for the label name and values,
// taken directly from the head's in-memory postings. It never fails.
func (h *headIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) {
	p := h.head.postings.Postings(ctx, name, values...)
	return p, nil
}

// PostingsForLabelMatching delegates to the head postings, selecting series
// whose value for label name satisfies match.
func (h *headIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings {
	p := h.head.postings.PostingsForLabelMatching(ctx, name, match)
	return p
}

// PostingsForAllLabelValues delegates to the head postings'
// PostingsForAllLabelValues for label name.
func (h *headIndexReader) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings {
	p := h.head.postings.PostingsForAllLabelValues(ctx, name)
	return p
}
// SortedPostings drains p, resolves each reference to its in-memory series and
// returns the references re-ordered by the series' label sets.
// References that no longer resolve are dropped; how many were dropped is
// logged at debug level. If p fails while iterating, a failing postings list
// wrapping that error is returned.
func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
	var (
		found   = make([]*memSeries, 0, 128)
		missing int
	)
	// Look every series up exactly once.
	for p.Next() {
		if s := h.head.series.getByID(chunks.HeadSeriesRef(p.At())); s != nil {
			found = append(found, s)
			continue
		}
		missing++
	}
	if missing > 0 {
		h.head.logger.Debug("Looked up series not found", "count", missing)
	}
	if err := p.Err(); err != nil {
		return index.ErrPostings(fmt.Errorf("expand postings: %w", err))
	}

	slices.SortFunc(found, func(a, b *memSeries) int {
		return labels.Compare(a.labels(), b.labels())
	})

	refs := make([]storage.SeriesRef, len(found))
	for i, s := range found {
		refs[i] = storage.SeriesRef(s.ref)
	}
	return index.NewListPostings(refs)
}
// ShardedPostings implements IndexReader. It returns the references from p
// whose series fall into shard shardIndex out of shardCount, using each
// series' precomputed shard hash. References that no longer resolve are
// dropped; how many were dropped is logged at debug level.
//
// A failing postings list is returned when sharding has not been enabled in
// the Head, when shardCount is zero, or when p fails during iteration.
func (h *headIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings {
	if !h.head.opts.EnableSharding {
		return index.ErrPostings(errors.New("sharding is disabled"))
	}
	if shardCount == 0 {
		// Guard the modulo below, which would otherwise panic.
		return index.ErrPostings(errors.New("shard count must be greater than zero"))
	}

	out := make([]storage.SeriesRef, 0, 128)
	notFoundSeriesCount := 0

	for p.Next() {
		s := h.head.series.getByID(chunks.HeadSeriesRef(p.At()))
		if s == nil {
			notFoundSeriesCount++
			continue
		}

		// Check if the series belong to the shard.
		if s.shardHash%shardCount != shardIndex {
			continue
		}

		out = append(out, storage.SeriesRef(s.ref))
	}

	if notFoundSeriesCount > 0 {
		h.head.logger.Debug("Looked up series not found", "count", notFoundSeriesCount)
	}
	// Surface iteration errors instead of silently returning a truncated list,
	// the same way SortedPostings does.
	if err := p.Err(); err != nil {
		return index.ErrPostings(fmt.Errorf("expand postings: %w", err))
	}

	return index.NewListPostings(out)
}
// Series loads the labels of the series identified by ref into builder.
// When chks is non-nil, *chks is reset and filled with metadata for the
// series' chunks that overlap [h.mint, h.maxt]. Chunks are skipped if chks is
// nil. If ref does not resolve, the seriesNotFound metric is incremented and
// storage.ErrNotFound is returned.
func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
	s := h.head.series.getByID(chunks.HeadSeriesRef(ref))
	if s == nil {
		h.head.metrics.seriesNotFound.Inc()
		return storage.ErrNotFound
	}
	builder.Assign(s.labels())

	if chks == nil {
		// The caller only wants labels.
		return nil
	}

	s.Lock()
	defer s.Unlock()

	metas, buf := appendSeriesChunks(s, h.mint, h.maxt, (*chks)[:0], h.headChunksBuf)
	*chks = metas
	if cap(buf) > headChunksBufMaxCap {
		// Don't keep an oversized scratch buffer around between series.
		buf = nil
	}
	h.headChunksBuf = buf
	return nil
}
// staleIndex returns an index reader over [mint, maxt] for stale-series
// compaction. staleSeriesRefs is handed back unchanged for "all postings"
// requests. It is expected to be sorted by labels, because
// headStaleIndexReader.SortedPostings does not re-sort.
func (h *Head) staleIndex(mint, maxt int64, staleSeriesRefs []storage.SeriesRef) (*headStaleIndexReader, error) {
	r := &headStaleIndexReader{
		headIndexReader: h.indexRange(mint, maxt),
		staleSeriesRefs: staleSeriesRefs,
	}
	return r, nil
}

// headStaleIndexReader exposes only the stale series that have no out-of-order
// data. At the moment it is used solely by stale-series compaction, which asks
// for all series at once. To keep that cheap, the reader is given the
// pre-computed list of stale series refs and returns it without re-reading the
// Head.
type headStaleIndexReader struct {
	*headIndexReader

	staleSeriesRefs []storage.SeriesRef
}
// Postings returns the postings for name/values, restricted to stale series
// without out-of-order data and sorted by labels. A request for all postings is
// answered straight from the pre-computed staleSeriesRefs when that list is
// non-empty. On error, both a failing postings list and the error are returned.
func (h *headStaleIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) {
	allName, allValue := index.AllPostingsKey()
	wantsAll := name == allName && len(values) == 1 && values[0] == allValue
	if wantsAll && len(h.staleSeriesRefs) > 0 {
		return index.NewListPostings(h.staleSeriesRefs), nil
	}

	refs, err := h.head.filterStaleSeriesAndSortPostings(h.head.postings.Postings(ctx, name, values...))
	if err != nil {
		return index.ErrPostings(err), err
	}
	return index.NewListPostings(refs), nil
}
// PostingsForLabelMatching returns the stale, OOO-free series whose value for
// label name satisfies match, sorted by labels. Compaction does not call this,
// so no shortcut is attempted.
func (h *headStaleIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings {
	candidates := h.head.postings.PostingsForLabelMatching(ctx, name, match)
	refs, err := h.head.filterStaleSeriesAndSortPostings(candidates)
	if err != nil {
		return index.ErrPostings(err)
	}
	return index.NewListPostings(refs)
}

// PostingsForAllLabelValues filters the head's PostingsForAllLabelValues for
// label name down to stale, OOO-free series, sorted by labels. Compaction does
// not call this, so no shortcut is attempted.
func (h *headStaleIndexReader) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings {
	candidates := h.head.postings.PostingsForAllLabelValues(ctx, name)
	refs, err := h.head.filterStaleSeriesAndSortPostings(candidates)
	if err != nil {
		return index.ErrPostings(err)
	}
	return index.NewListPostings(refs)
}
// filterStaleSeriesAndSortPostings drains p and returns, sorted by labels, the
// references of series that have no out-of-order data and whose last recorded
// value is a stale NaN. The value checked is the float value, the histogram
// Sum or the float-histogram Sum. References that no longer resolve are
// skipped; how many were skipped is logged at debug level.
func (h *Head) filterStaleSeriesAndSortPostings(p index.Postings) ([]storage.SeriesRef, error) {
	// isStaleWithoutOOO takes s's lock for the duration of the check.
	isStaleWithoutOOO := func(s *memSeries) bool {
		s.Lock()
		defer s.Unlock()
		if s.ooo != nil {
			// A series receiving out-of-order samples cannot be judged stale.
			return false
		}
		switch {
		case value.IsStaleNaN(s.lastValue):
			return true
		case s.lastHistogramValue != nil && value.IsStaleNaN(s.lastHistogramValue.Sum):
			return true
		case s.lastFloatHistogramValue != nil && value.IsStaleNaN(s.lastFloatHistogramValue.Sum):
			return true
		default:
			return false
		}
	}

	stale := make([]*memSeries, 0, 1024)
	missing := 0
	for p.Next() {
		s := h.series.getByID(chunks.HeadSeriesRef(p.At()))
		if s == nil {
			missing++
			continue
		}
		if isStaleWithoutOOO(s) {
			stale = append(stale, s)
		}
	}
	if missing > 0 {
		h.logger.Debug("Looked up stale series not found", "count", missing)
	}
	if err := p.Err(); err != nil {
		return nil, fmt.Errorf("expand postings: %w", err)
	}

	slices.SortFunc(stale, func(a, b *memSeries) int {
		return labels.Compare(a.labels(), b.labels())
	})

	refs := make([]storage.SeriesRef, len(stale))
	for i, s := range stale {
		refs[i] = storage.SeriesRef(s.ref)
	}
	return refs, nil
}
// SortedPostings returns p unchanged. Every postings list produced by
// headStaleIndexReader is already sorted by labels, so nothing needs re-sorting.
func (*headStaleIndexReader) SortedPostings(p index.Postings) index.Postings {
	return p
}

// SortedStaleSeriesRefsNoOOOData returns, sorted by labels, the references of
// every stale series in the head that has no out-of-order data.
func (h *Head) SortedStaleSeriesRefsNoOOOData(ctx context.Context) ([]storage.SeriesRef, error) {
	allName, allValue := index.AllPostingsKey()
	all := h.postings.Postings(ctx, allName, allValue)
	return h.filterStaleSeriesAndSortPostings(all)
}
// appendSeriesChunks appends to chks the metadata of every chunk of s that
// overlaps the closed interval [mint, maxt]. M-mapped chunks come first,
// followed by the in-memory head chunks from oldest to newest. The newest head
// chunk is still open for appends, so its MaxTime is reported as
// math.MaxInt64. Series calls this with s's lock held.
//
// headChunksBuf is scratch space used only when s has more than one head chunk.
// The possibly grown buffer is returned so callers can pass it back on the next
// call and avoid per-series allocations. Once the buffer has been used, it is
// returned with length zero and every element cleared. That way it does not
// keep the previous series' *memChunk values reachable.
func appendSeriesChunks(s *memSeries, mint, maxt int64, chks []chunks.Meta, headChunksBuf []*memChunk) ([]chunks.Meta, []*memChunk) {
	for i, c := range s.mmappedChunks {
		// Do not expose chunks that are outside of the specified range.
		if !c.OverlapsClosedInterval(mint, maxt) {
			continue
		}
		chks = append(chks, chunks.Meta{
			MinTime: c.minTime,
			MaxTime: c.maxTime,
			Ref:     chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(i))),
		})
	}

	if s.headChunks == nil {
		return chks, headChunksBuf
	}

	// Fast path: a single head chunk needs neither the buffer nor a list walk.
	if s.headChunks.prev == nil {
		if s.headChunks.OverlapsClosedInterval(mint, maxt) {
			chks = append(chks, chunks.Meta{
				MinTime: s.headChunks.minTime,
				MaxTime: math.MaxInt64,
				Ref:     chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(len(s.mmappedChunks)))),
			})
		}
		return chks, headChunksBuf
	}

	// Multiple head chunks: collect them once (oldest first), then emit them in
	// a single O(N) pass.
	headChunksBuf = collectHeadChunks(s.headChunks, headChunksBuf[:0])
	newest := len(headChunksBuf) - 1
	for i, chk := range headChunksBuf {
		maxTime := chk.maxTime
		if i == newest {
			maxTime = math.MaxInt64 // Open (newest) chunk.
		}
		if chk.OverlapsClosedInterval(mint, maxt) {
			chks = append(chks, chunks.Meta{
				MinTime: chk.minTime,
				MaxTime: maxTime,
				Ref:     chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(len(s.mmappedChunks)+i))),
			})
		}
	}

	// The metadata appended above holds no *memChunk pointers, so drop every
	// reference in the scratch buffer (including any stale tail beyond len)
	// before handing it back.
	clear(headChunksBuf[:cap(headChunksBuf)])
	return chks, headChunksBuf[:0]
}
// headChunkID converts a chunk position into its HeadChunkID by offsetting it
// with s.firstChunkID.
//   - Positions 0 <= pos < len(s.mmappedChunks) address s.mmappedChunks[pos].
//   - Positions pos >= len(s.mmappedChunks) address the s.headChunks linked
//     list, counted from its oldest element.
func (s *memSeries) headChunkID(pos int) chunks.HeadChunkID {
	offset := chunks.HeadChunkID(pos)
	return s.firstChunkID + offset
}
// oooChunkIDMask is bit 23 of a HeadChunkID. It is set on out-of-order chunk
// IDs to tell them apart from in-order chunk IDs.
const oooChunkIDMask = 0x800000 // 1 << 23
// oooHeadChunkID converts an out-of-order chunk position into its HeadChunkID.
// Only the bottom 24 bits are used, and bit 23 (oooChunkIDMask) is always set.
//   - Positions 0 <= pos < len(s.oooMmappedChunks) address s.oooMmappedChunks[pos].
//   - Position pos == len(s.oooMmappedChunks) addresses s.oooHeadChunk.
//
// The caller must ensure that s.ooo is not nil.
func (s *memSeries) oooHeadChunkID(pos int) chunks.HeadChunkID {
	id := chunks.HeadChunkID(pos) + s.ooo.firstOOOChunkID
	return id | oooChunkIDMask
}

// unpackHeadChunkRef splits ref into its series reference and chunk ID. The
// chunk ID keeps only the bits below oooChunkIDMask. Whether that marker bit
// was set is reported separately as isOOO.
func unpackHeadChunkRef(ref chunks.ChunkRef) (seriesID chunks.HeadSeriesRef, chunkID chunks.HeadChunkID, isOOO bool) {
	seriesID, chunkID = chunks.HeadChunkRef(ref).Unpack()
	isOOO = chunkID&oooChunkIDMask != 0
	chunkID &= oooChunkIDMask - 1
	return seriesID, chunkID, isOOO
}
// LabelNamesFor returns the sorted, de-duplicated label names of the series
// referenced by the postings. References that no longer resolve are skipped.
// The context is checked every checkContextEveryNIterations series, and its
// error is returned once it is cancelled.
func (h *headIndexReader) LabelNamesFor(ctx context.Context, series index.Postings) ([]string, error) {
	seen := make(map[string]struct{})
	for n := 1; series.Next(); n++ {
		if n%checkContextEveryNIterations == 0 {
			if err := ctx.Err(); err != nil {
				return nil, err
			}
		}
		ms := h.head.series.getByID(chunks.HeadSeriesRef(series.At()))
		if ms == nil {
			// This happens during compaction when the series was garbage
			// collected after the caller obtained its ID.
			continue
		}
		ms.labels().Range(func(l labels.Label) {
			seen[l.Name] = struct{}{}
		})
	}
	if err := series.Err(); err != nil {
		return nil, err
	}

	names := make([]string, 0, len(seen))
	for name := range seen {
		names = append(names, name)
	}
	slices.Sort(names)
	return names, nil
}
// Chunks returns a ChunkReader against the block.
//
// The reader owns an isolation state covering all time, which the reader's
// Close releases. If the reader cannot be created (for example because the
// head is already closed), no caller will ever receive a reader to Close. In
// that case the isolation state is released here so it does not leak.
func (h *Head) Chunks() (ChunkReader, error) {
	is := h.iso.State(math.MinInt64, math.MaxInt64)
	cr, err := h.chunksRange(math.MinInt64, math.MaxInt64, is)
	if err != nil {
		is.Close()
		// Return a literal nil so callers don't see a non-nil interface
		// wrapping a nil *headChunkReader.
		return nil, err
	}
	return cr, nil
}
// chunksRange returns a headChunkReader restricted to [mint, maxt]. A mint
// below the head's minimum time is raised to that minimum. It fails if the head
// has already been closed. On success the reader takes is and releases it in
// its Close. On error, is is not released here.
func (h *Head) chunksRange(mint, maxt int64, is *isolationState) (*headChunkReader, error) {
	h.closedMtx.Lock()
	defer h.closedMtx.Unlock()

	if h.closed {
		return nil, errors.New("can't read from a closed head")
	}
	return &headChunkReader{
		head:     h,
		mint:     max(mint, h.MinTime()),
		maxt:     maxt,
		isoState: is,
	}, nil
}
// headChunkReader serves chunk reads for the head block within [mint, maxt],
// applying the isolation state it was created with.
// A headChunkReader must not be used from more than one goroutine at a time.
type headChunkReader struct {
	head       *Head
	mint, maxt int64
	isoState   *isolationState

	// enableCache switches on the head-chunks cache below. Range queries
	// benefit because they look up every chunk of a series. Instant queries
	// only need one chunk per series, so for them the cache is wasted overhead.
	enableCache bool

	// Head-chunk slice cache for the most recently read series. It avoids
	// O(n²) linked-list walks when all chunks of a series are read oldest to
	// newest.
	cachedSeriesRef      storage.SeriesRef
	cachedHeadChunks     []*memChunk
	cachedHeadChunksHead *memChunk // s.headChunks at collection time; detects head-chunk replacement.
	cachedMmapLen        int       // len(s.mmappedChunks) at collection time; detects mmap events.
}

// Close releases the isolation state held by the reader, if there is one.
func (h *headChunkReader) Close() error {
	if is := h.isoState; is != nil {
		is.Close()
	}
	return nil
}
// getOrCollectHeadChunks returns s's head chunks as a slice ordered oldest to
// newest. The slice collected on an earlier call is reused when it still
// matches s. The cache key is the series ref, the head-chunk pointer and the
// number of m-mapped chunks.
//
// It returns nil when caching is disabled or s has fewer than two head chunks.
// Callers then fall back to walking the linked list. chunk calls this with s
// locked.
func (h *headChunkReader) getOrCollectHeadChunks(s *memSeries) []*memChunk {
	if !h.enableCache {
		// Instant queries: the cache would not pay for itself.
		return nil
	}
	if s.headChunks == nil || s.headChunks.prev == nil {
		return nil
	}

	ref := storage.SeriesRef(s.ref)
	cacheHit := ref == h.cachedSeriesRef &&
		s.headChunks == h.cachedHeadChunksHead &&
		len(s.mmappedChunks) == h.cachedMmapLen
	if cacheHit {
		return h.cachedHeadChunks
	}

	// Re-slicing a nil slice yields nil, so this reuses any existing capacity.
	h.cachedHeadChunks = collectHeadChunks(s.headChunks, h.cachedHeadChunks[:0])
	// Let the GC reclaim *memChunk pointers left over from a previous, longer collection.
	clear(h.cachedHeadChunks[len(h.cachedHeadChunks):cap(h.cachedHeadChunks)])
	h.cachedSeriesRef = ref
	h.cachedHeadChunksHead = s.headChunks
	h.cachedMmapLen = len(s.mmappedChunks)
	return h.cachedHeadChunks
}
// ChunkOrIterable returns the chunk for the reference in meta. The head reader
// always returns a Chunk and never an Iterable.
func (h *headChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
	c, _, err := h.chunk(meta, false)
	return c, nil, err
}

// ChunkReaderWithCopy is implemented by chunk readers that can return a private
// copy of the in-memory (open) chunk together with the chunk's max time.
type ChunkReaderWithCopy interface {
	ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error)
}

// ChunkOrIterableWithCopy returns the chunk for the reference in meta plus its
// max time. If the chunk is the in-memory chunk, its bytes are copied first and
// the copy is returned. No Iterable is ever returned.
func (h *headChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) {
	c, chunkMaxTime, err := h.chunk(meta, true)
	return c, nil, chunkMaxTime, err
}
// chunk resolves meta.Ref to a chunk and its max time, holding the series lock
// while doing so. If copyLastChunk is true and the reference points at the open
// head chunk, a copy of that chunk is returned. storage.ErrNotFound is returned
// when the series no longer exists.
func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc.Chunk, int64, error) {
	sid, cid, isOOO := unpackHeadChunkRef(meta.Ref)

	s := h.head.series.getByID(sid)
	if s == nil {
		// The series has been garbage collected.
		return nil, 0, storage.ErrNotFound
	}

	s.Lock()
	defer s.Unlock()

	// The head-chunk slice cache only covers in-order chunks.
	var headChunks []*memChunk
	if !isOOO {
		headChunks = h.getOrCollectHeadChunks(s)
	}
	return h.head.chunkFromSeries(s, cid, isOOO, h.mint, h.maxt, h.isoState, copyLastChunk, headChunks)
}
// wrapOOOHeadChunk wraps the out-of-order chunks returned by chunkFromSeries so
// that their dynamic type is not a bare chunk encoding type. It exists to
// defeat the chunk pool.
// NOTE(review): confirm which pool type checks this relies on.
type wrapOOOHeadChunk struct {
	chunkenc.Chunk
}
// chunkFromSeries returns the chunk with ID cid of series s and its max time.
// It must be called with s locked.
//
// Out-of-order IDs (isOOO) are served from s's m-mapped OOO chunks, wrapped in
// wrapOOOHeadChunk. In-order chunks are returned as a safeHeadChunk, whose
// Iterator re-locks s and applies isoState. storage.ErrNotFound is returned if
// the chunk does not exist or does not overlap [mint, maxt].
//
// If copyLastChunk is set and the requested chunk is the open head chunk, its
// bytes are copied so the caller can read them without racing with appends.
// headChunks is an optional oldest-to-newest slice of s's head chunks used for
// indexed lookup. When it is nil, the linked list is walked instead.
func (h *Head) chunkFromSeries(s *memSeries, cid chunks.HeadChunkID, isOOO bool, mint, maxt int64, isoState *isolationState, copyLastChunk bool, headChunks []*memChunk) (chunkenc.Chunk, int64, error) {
	if isOOO {
		chk, maxTime, err := s.oooChunk(cid, h.chunkDiskMapper, &h.memChunkPool)
		if err != nil {
			// Don't hand back a non-nil wrapper around a nil chunk.
			return nil, 0, err
		}
		return wrapOOOHeadChunk{chk}, maxTime, nil
	}

	c, headChunk, isOpen, err := s.chunk(cid, h.chunkDiskMapper, &h.memChunkPool, headChunks)
	if err != nil {
		return nil, 0, err
	}
	defer func() {
		if !headChunk {
			// c is a pooled *memChunk wrapping an m-mapped chunk. Clear its
			// pointers so the GC can collect them, then recycle the wrapper.
			c.chunk = nil
			c.prev = nil
			h.memChunkPool.Put(c)
		}
	}()

	// This means that the chunk is outside the specified range.
	if !c.OverlapsClosedInterval(mint, maxt) {
		return nil, 0, storage.ErrNotFound
	}

	chk, maxTime := c.chunk, c.maxTime
	if headChunk && isOpen && copyLastChunk {
		// The caller may ask to copy the head chunk in order to take the
		// bytes of the chunk without causing the race between read and append.
		b := s.headChunks.chunk.Bytes()
		newB := make([]byte, len(b))
		copy(newB, b)
		// TODO(codesome): Put back in the pool (non-trivial).
		chk, err = h.opts.ChunkPool.Get(s.headChunks.chunk.Encoding(), newB)
		if err != nil {
			return nil, 0, err
		}
	}

	return &safeHeadChunk{
		Chunk:    chk,
		s:        s,
		cid:      cid,
		isoState: isoState,
	}, maxTime, nil
}
// chunk returns the chunk for the HeadChunkID, either from memory or by
// m-mapping it from disk.
//
// headChunks is an optional slice of s.headChunks ordered oldest to newest.
// When it is non-nil, head chunks are looked up by index. When it is nil, the
// s.headChunks linked list is walked instead.
//
// Results:
//   - chunk: the requested chunk. For an m-mapped chunk this is a *memChunk
//     taken from memChunkPool. Only the wrapper (not the chunkenc.Chunk inside
//     it) may be recycled once the caller is done with it.
//   - headChunk: true if chunk is one of the in-memory head chunks. false means
//     the *memChunk is a pooled wrapper around an m-mapped chunk.
//   - isOpen: true if chunk is the newest head chunk, i.e. the one still used
//     for appends.
//   - err: storage.ErrNotFound if id is out of range, or the disk mapper's
//     error. A *chunks.CorruptionErr from the disk mapper panics instead.
func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool, headChunks []*memChunk) (chunk *memChunk, headChunk, isOpen bool, err error) {
	// ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are
	// incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index.
	// The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix
	// is >= len(s.mmappedChunks), it represents one of the chunks on s.headChunks linked list.
	// The order of elements is different for slice and linked list.
	// For s.mmappedChunks slice newer chunks are appended to it.
	// For s.headChunks list newer chunks are prepended to it.
	//
	// memSeries {
	// mmappedChunks: [t0, t1, t2]
	// headChunk: {t5}->{t4}->{t3}
	// }
	ix := int(id) - int(s.firstChunkID)
	// Number of head chunks. Taken from the pre-collected slice when given,
	// otherwise counted by walking the list; stays 0 when there are none.
	var headChunksLen int
	if headChunks != nil {
		headChunksLen = len(headChunks)
	} else if s.headChunks != nil {
		headChunksLen = s.headChunks.len()
	}
	// Valid indexes span the m-mapped chunks followed by the head chunks.
	if ix < 0 || ix > len(s.mmappedChunks)+headChunksLen-1 {
		return nil, false, false, storage.ErrNotFound
	}
	if ix < len(s.mmappedChunks) {
		chk, err := chunkDiskMapper.Chunk(s.mmappedChunks[ix].ref)
		if err != nil {
			var cerr *chunks.CorruptionErr
			if errors.As(err, &cerr) {
				// Corruption is not recoverable here; fail loudly.
				panic(err)
			}
			return nil, false, false, err
		}
		// Wrap the m-mapped chunk in a pooled *memChunk; the caller is
		// expected to hand the wrapper back to the pool (headChunk == false).
		mc := memChunkPool.Get().(*memChunk)
		mc.chunk = chk
		mc.minTime = s.mmappedChunks[ix].minTime
		mc.maxTime = s.mmappedChunks[ix].maxTime
		return mc, false, false, nil
	}
	// Head chunk lookup.
	// From here on ix is the position in the head chunks, counted from the oldest.
	ix -= len(s.mmappedChunks)
	// Fast path: use pre-collected slice for O(1) indexed lookup.
	if headChunks != nil {
		if ix >= len(headChunks) {
			return nil, false, false, storage.ErrNotFound
		}
		// The slice is oldest-first, so its last element is the open chunk.
		return headChunks[ix], true, ix == len(headChunks)-1, nil
	}
	// Fallback: walk the linked list.
	// The range check above guarantees s.headChunks is non-nil here: with no
	// head chunks headChunksLen is 0, so only m-mapped indexes pass it.
	offset := headChunksLen - ix - 1
	// headChunks is a linked list where first element is the most recent one and the last one is the oldest.
	// This order is reversed when compared with mmappedChunks, since mmappedChunks[0] is the oldest chunk,
	// while headChunk.atOffset(0) would give us the most recent chunk.
	// So when calling headChunk.atOffset() we need to reverse the value of ix.
	elem := s.headChunks.atOffset(offset)
	if elem == nil {
		// This should never really happen and would mean that headChunksLen value is NOT equal
		// to the length of the headChunks list.
		return nil, false, false, storage.ErrNotFound
	}
	// Offset 0 is the list head, i.e. the newest (open) chunk.
	return elem, true, offset == 0, nil
}
// oooChunk returns the m-mapped out-of-order chunk with the given ID and that
// chunk's max time. It never returns the in-memory OOO head chunk. If id falls
// outside s.ooo.oooMmappedChunks, it returns storage.ErrNotFound. Otherwise it
// returns the disk mapper's result. The *sync.Pool argument is unused.
// s.ooo must be non-nil.
func (s *memSeries) oooChunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper, _ *sync.Pool) (chunk chunkenc.Chunk, maxTime int64, err error) {
	// OOO chunk IDs are handed out sequentially starting at firstOOOChunkID,
	// so the distance from it is the slice index.
	ix := int(id) - int(s.ooo.firstOOOChunkID)
	inRange := ix >= 0 && ix < len(s.ooo.oooMmappedChunks)
	if !inRange {
		return nil, 0, storage.ErrNotFound
	}

	chunk, err = chunkDiskMapper.Chunk(s.ooo.oooMmappedChunks[ix].ref)
	maxTime = s.ooo.oooMmappedChunks[ix].maxTime
	return chunk, maxTime, err
}
// safeHeadChunk wraps a chunk of series s so that it can be read without racing
// with concurrent appends. Iterator takes the series lock and honours isoState.
type safeHeadChunk struct {
	chunkenc.Chunk
	s        *memSeries
	cid      chunks.HeadChunkID
	isoState *isolationState
}

// Iterator returns an iterator over the chunk that only yields samples visible
// under c.isoState. reuseIter may be recycled. The series lock is held while
// the iterator is built.
func (c *safeHeadChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator {
	series := c.s
	series.Lock()
	result := series.iterator(c.cid, c.Chunk, c.isoState, reuseIter)
	series.Unlock()
	return result
}
// iterator returns an iterator over chunk c, whose HeadChunkID is id. The
// iterator stops before the first sample that belongs to an append isoState
// does not allow to be seen. If no sample of c is visible, a NopIterator is
// returned. Otherwise it is passed on to c.Iterator / makeStopIterator for
// reuse. Isolation is skipped when isoState is nil or has isolation disabled.
//
// It is unsafe to call this concurrently with s.append(...) without holding the series lock.
func (s *memSeries) iterator(id chunks.HeadChunkID, c chunkenc.Chunk, isoState *isolationState, it chunkenc.Iterator) chunkenc.Iterator {
	ix := int(id) - int(s.firstChunkID)

	numSamples := c.NumSamples()
	stopAfter := numSamples

	if isoState != nil && !isoState.IsolationDisabled() {
		totalSamples := 0    // Total samples in this series.
		previousSamples := 0 // Samples before this chunk.

		for j, d := range s.mmappedChunks {
			totalSamples += int(d.numSamples)
			if j < ix {
				previousSamples += int(d.numSamples)
			}
		}

		ix -= len(s.mmappedChunks)
		if s.headChunks != nil {
			// Walk the head chunks once, newest to oldest, following prev.
			// The chunk at list offset k (0 = newest) has position
			// headChunksLen-1-k counted from the oldest. That position matches
			// the chunk ID layout: len(s.mmappedChunks) + position.
			// A single walk replaces calling atOffset for every position,
			// which was O(n²) in the number of head chunks.
			headChunksLen := s.headChunks.len()
			offset := 0
			for chk := s.headChunks; chk != nil; chk = chk.prev {
				chkSamples := chk.chunk.NumSamples()
				totalSamples += chkSamples
				if headChunksLen-1-offset < ix {
					previousSamples += chkSamples
				}
				offset++
			}
		}

		// Removing the extra transactionIDs that are relevant for samples that
		// come after this chunk, from the total transactionIDs.
		appendIDsToConsider := int(s.txs.txIDCount) - (totalSamples - (previousSamples + numSamples))

		// Iterate over the appendIDs, find the first one that the isolation state says not
		// to return.
		txIt := s.txs.iterator()
		for i := range appendIDsToConsider {
			appendID := txIt.At()
			if appendID <= isoState.maxAppendID { // Easy check first.
				if _, ok := isoState.incompleteAppends[appendID]; !ok {
					txIt.Next()
					continue
				}
			}
			// A result of 0 means the first invisible append was in a previous chunk.
			stopAfter = max(numSamples-(appendIDsToConsider-i), 0)
			break
		}
	}

	if stopAfter == 0 {
		return chunkenc.NewNopIterator()
	}
	if stopAfter == numSamples {
		return c.Iterator(it)
	}
	return makeStopIterator(c, it, stopAfter)
}
// stopIterator wraps an Iterator and lets Next yield at most stopAfter values.
// It must be initialised with i == -1.
type stopIterator struct {
	chunkenc.Iterator

	i, stopAfter int
}

// Next advances the wrapped iterator, or reports chunkenc.ValNone once
// stopAfter values have already been produced.
func (it *stopIterator) Next() chunkenc.ValueType {
	next := it.i + 1
	if next >= it.stopAfter {
		return chunkenc.ValNone
	}
	it.i = next
	return it.Iterator.Next()
}

// makeStopIterator returns an iterator over c that stops after stopAfter
// samples. If it is already a *stopIterator, that object and its wrapped
// iterator are reused instead of allocating a new one.
func makeStopIterator(c chunkenc.Chunk, it chunkenc.Iterator, stopAfter int) chunkenc.Iterator {
	stopIter, ok := it.(*stopIterator)
	if !ok {
		return &stopIterator{
			Iterator:  c.Iterator(it),
			i:         -1,
			stopAfter: stopAfter,
		}
	}
	stopIter.Iterator = c.Iterator(stopIter.Iterator)
	stopIter.i = -1
	stopIter.stopAfter = stopAfter
	return stopIter
}