-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathcorrectable.go
More file actions
137 lines (121 loc) · 3.41 KB
/
correctable.go
File metadata and controls
137 lines (121 loc) · 3.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package gorums
import (
"sync"
)
// LevelNotSet is the initial level of a correctable quorum call. It indicates
// that no level (and thereby no reply) has been set yet.
//
// Note that the value is -1, which is not the zero value of int; a Correctable
// must therefore be initialized with it explicitly, as NewCorrectable does.
const LevelNotSet = -1
// watcher pairs a level with the channel that is closed to signal that
// the correctable call has reached that level (or has completed).
type watcher struct {
// level is the level this watcher is waiting for.
level int
// ch is closed to notify the waiting caller; nothing is ever sent on it.
ch chan struct{}
}
// Correctable is a generic type for correctable quorum calls.
// It encapsulates the state of a correctable call and provides methods
// for checking the status or waiting for completion at specific levels.
//
// Type parameter Resp is the response type from nodes.
//
// A Correctable contains a mutex and must not be copied; all methods use
// pointer receivers.
type Correctable[Resp any] struct {
// mu guards reply, level, err, done, and watchers.
mu sync.Mutex
// reply is the most recent response passed to update.
reply Resp
// level is the most recent level passed to update; it is LevelNotSet
// when the value is created through NewCorrectable and no update has
// happened yet.
level int
// err is the most recent error passed to update.
err error
// done is true once update has been called with done set; further
// calls to update panic.
done bool
// watchers holds the pending level watchers registered through Watch.
// Entries are set to nil by update once their channel has been closed.
watchers []*watcher
// donech is closed by update when the call completes. It is returned
// by Done without holding mu.
donech chan struct{}
}
// NewCorrectable allocates a Correctable in its initial state: the level is
// LevelNotSet (no reply has been recorded yet) and the Done channel is open.
func NewCorrectable[Resp any]() *Correctable[Resp] {
	c := new(Correctable[Resp])
	c.level = LevelNotSet
	c.donech = make(chan struct{}, 1)
	return c
}
// Get reports a consistent snapshot of the call's current state: the most
// recently recorded response, the level it was recorded at, and the most
// recently recorded error. The three values are read together under the
// Correctable's mutex.
func (c *Correctable[Resp]) Get() (Resp, int, error) {
	c.mu.Lock()
	reply, level, err := c.reply, c.level, c.err
	c.mu.Unlock()
	return reply, level, err
}
// Done exposes a receive-only channel that is closed once the correctable
// call has completed, i.e. after update has been invoked with done set.
func (c *Correctable[Resp]) Done() <-chan struct{} {
	// Within this file donech is assigned at construction and afterwards
	// only closed, which is why it is read here without taking c.mu.
	var done <-chan struct{} = c.donech
	return done
}
// Watch returns a channel that is closed once the correctable call has
// reached the specified level, or once the call has completed, whichever
// happens first.
//
// If the level has already been reached, or the call is already done, the
// returned channel is closed before Watch returns. Callers that need to
// distinguish "level reached" from "completed early" should inspect the
// values returned by Get after the channel closes.
func (c *Correctable[Resp]) Watch(level int) <-chan struct{} {
	ch := make(chan struct{})
	c.mu.Lock()
	defer c.mu.Unlock()
	// A done correctable never receives another update (update panics if
	// called again), so a watcher registered now would never be closed and
	// the caller would block forever. Completion releases all pending
	// watchers in update; late watchers get the same treatment here.
	if c.done || level <= c.level {
		close(ch)
		return ch
	}
	c.watchers = append(c.watchers, &watcher{level: level, ch: ch})
	return ch
}
// update records a new state for the correctable call and wakes up waiters.
//
// The reply, level, and error replace the previously stored values. For an
// intermediate update (done == false), every pending watcher whose level is
// at or below the new level has its channel closed and its slot cleared.
// For the final update (done == true), the Done channel and all still-pending
// watcher channels are closed regardless of their level.
//
// update panics if the call has already been marked done.
func (c *Correctable[Resp]) update(reply Resp, level int, done bool, err error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.done {
		panic("update(...) called on a done correctable")
	}

	c.reply = reply
	c.level = level
	c.err = err
	c.done = done

	if !done {
		// Intermediate update: release only the watchers that are satisfied.
		for i, w := range c.watchers {
			if w == nil || w.level > level {
				continue
			}
			close(w.ch)
			// Clear the slot so the channel is never closed twice.
			c.watchers[i] = nil
		}
		return
	}

	// Final update: signal completion and release everyone still waiting.
	close(c.donech)
	for _, w := range c.watchers {
		if w == nil {
			continue
		}
		close(w.ch)
	}
}
// Correctable converts the response stream into a Correctable that is updated
// progressively as responses arrive. Each successful response raises the level
// by one and becomes the latest reply; failed responses are collected and only
// reported if the threshold is never reached.
//
// The call completes successfully as soon as threshold successful responses
// have been seen. If the response sequence ends first, the call completes with
// a QuorumCallError whose cause is ErrIncomplete and which carries the
// collected per-node errors.
//
// The returned Correctable is populated by a goroutine started here; that
// goroutine ends when the threshold is reached or the sequence is exhausted.
//
// Example:
//
//	corr := ReadQC(ctx, req).Correctable(2)
//	// Wait for level 2 to be reached
//	<-corr.Watch(2)
//	resp, level, err := corr.Get()
func (r *Responses[Resp]) Correctable(threshold int) *Correctable[Resp] {
	corr := new(Correctable[Resp])
	corr.level = LevelNotSet
	corr.donech = make(chan struct{}, 1)

	go func() {
		var (
			latest    Resp
			successes int
			failures  []nodeError
		)
		for res := range r.ResponseSeq {
			if res.Err != nil {
				// Remember the failure; it is only surfaced if the
				// threshold is never reached.
				failures = append(failures, nodeError{nodeID: res.NodeID, cause: res.Err})
				continue
			}
			successes++
			latest = res.Value
			if successes >= threshold {
				// Threshold reached: publish the final state and stop
				// consuming further responses.
				corr.update(latest, successes, true, nil)
				return
			}
			corr.update(latest, successes, false, nil)
		}
		// The sequence ended before the threshold was reached.
		// NOTE(review): with zero successful responses this records level 0
		// and a zero-value reply rather than LevelNotSet — confirm intended.
		corr.update(latest, successes, true, QuorumCallError{cause: ErrIncomplete, errors: failures})
	}()

	return corr
}