-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathmgr.go
More file actions
131 lines (119 loc) · 3.1 KB
/
mgr.go
File metadata and controls
131 lines (119 loc) · 3.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package gorums
import (
"errors"
"fmt"
"log"
"sync"
"sync/atomic"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
)
// Manager maintains a connection pool of nodes on
// which quorum calls can be performed.
//
// A Manager contains a mutex and must not be copied; use it through the
// pointer returned by NewManager.
type Manager struct {
	// nextMsgID is the message ID counter, advanced atomically by getMsgID.
	// It is deliberately the first field: sync/atomic only guarantees 64-bit
	// alignment for the first word of an allocated struct on 32-bit
	// platforms, and a misaligned 64-bit atomic operation panics there.
	nextMsgID uint64

	// mu guards nodes and lookup.
	mu sync.Mutex
	// nodes holds the managed nodes in the order they were added.
	nodes []*Node
	// lookup indexes the same nodes by their identifier.
	lookup map[uint32]*Node
	// closeOnce ensures the shutdown performed by Close runs at most once.
	closeOnce sync.Once
	// logger is optional; it stays nil unless the options supply one.
	logger *log.Logger
	// opts holds the effective manager options after all ManagerOption
	// values have been applied.
	opts managerOptions
}
// NewManager creates a Manager that initially holds no nodes. It starts from
// the defaults produced by newManagerOptions and then applies each of the
// given options in order.
//
// The manager's gRPC dial options are extended with a default call option
// selecting ContentSubtype, and, when the configured backoff differs from
// backoff.DefaultConfig, with matching connect parameters. If the options
// supply a logger, "ready" is logged before the manager is returned.
func NewManager(opts ...ManagerOption) *Manager {
	mgr := &Manager{
		lookup: make(map[uint32]*Node),
		opts:   newManagerOptions(),
	}
	for _, apply := range opts {
		apply(&mgr.opts)
	}
	if custom := mgr.opts.logger; custom != nil {
		mgr.logger = custom
	}

	// Make ContentSubtype the default content subtype for calls.
	subtype := grpc.WithDefaultCallOptions(grpc.CallContentSubtype(ContentSubtype))
	mgr.opts.grpcDialOpts = append(mgr.opts.grpcDialOpts, subtype)

	// Leave gRPC's own connect parameters alone unless a non-default
	// backoff configuration was requested.
	if mgr.opts.backoff != backoff.DefaultConfig {
		params := grpc.ConnectParams{Backoff: mgr.opts.backoff}
		mgr.opts.grpcDialOpts = append(mgr.opts.grpcDialOpts, grpc.WithConnectParams(params))
	}

	if mgr.logger != nil {
		mgr.logger.Printf("ready")
	}
	return mgr
}
// Close closes all node connections and any client streams by calling close
// on every node currently registered with the Manager. The errors returned by
// the individual nodes are combined with errors.Join.
//
// The shutdown runs at most once: only the first call closes the nodes and
// reports their errors; every later call returns nil.
func (m *Manager) Close() error {
	var err error
	m.closeOnce.Do(func() {
		// Take a snapshot of the node list under the lock so that reading
		// m.nodes does not race with a concurrent addNode. The lock is
		// released before closing so node shutdown never runs while the
		// manager's mutex is held.
		m.mu.Lock()
		nodes := m.nodes
		m.mu.Unlock()

		for _, node := range nodes {
			err = errors.Join(err, node.close())
		}
	})
	return err
}
// NodeIDs returns the identifiers of all nodes held by the Manager, in the
// order in which the nodes were added. The returned slice is freshly
// allocated on every call.
func (m *Manager) NodeIDs() []uint32 {
	m.mu.Lock()
	defer m.mu.Unlock()

	ids := make([]uint32, len(m.nodes))
	for i, n := range m.nodes {
		ids[i] = n.ID()
	}
	return ids
}
// Node looks up the node registered under id. found reports whether such a
// node exists; when it is false, node is nil.
func (m *Manager) Node(id uint32) (node *Node, found bool) {
	m.mu.Lock()
	node, found = m.lookup[id]
	m.mu.Unlock()
	return node, found
}
// Nodes returns a slice of each available node. Nodes are returned in the
// order in which they were added to the Manager.
//
// The returned slice is a copy: callers may reorder, truncate, or append to
// it without affecting the Manager. The *Node values themselves are shared.
// The result is nil when the Manager holds no nodes.
func (m *Manager) Nodes() []*Node {
	m.mu.Lock()
	defer m.mu.Unlock()
	// Appending to a nil slice copies the elements into a fresh backing
	// array and keeps the result nil when there is nothing to copy.
	return append([]*Node(nil), m.nodes...)
}
// Size reports how many nodes the Manager currently holds.
func (m *Manager) Size() (nodes int) {
	m.mu.Lock()
	count := len(m.nodes)
	m.mu.Unlock()
	return count
}
// addNode registers node with the Manager: it is indexed by its id in the
// lookup table and appended to the ordered node list. An existing entry with
// the same id is overwritten in the lookup table; no duplicate check is made
// here.
func (m *Manager) addNode(node *Node) {
	m.mu.Lock()
	defer m.mu.Unlock()

	id := node.id
	m.lookup[id] = node
	m.nodes = append(m.nodes, node)
}
// newNode creates a node for addr with the given id, configured from the
// manager's options, and registers it with the Manager.
//
// It returns an error if a node with the same id is already registered, or
// if the package-level newNode constructor fails; in both cases nothing is
// added to the Manager.
//
// NOTE(review): the existence check and the registration are separate
// critical sections, so concurrent calls with the same id are not
// serialized here — confirm callers do not rely on that.
func (m *Manager) newNode(addr string, id uint32) (*Node, error) {
	_, exists := m.Node(id)
	if exists {
		return nil, fmt.Errorf("node %d already exists", id)
	}

	created, err := newNode(addr, nodeOptions{
		ID:             id,
		SendBufferSize: m.opts.sendBuffer,
		MsgIDGen:       m.getMsgID,
		Metadata:       m.opts.metadata,
		PerNodeMD:      m.opts.perNodeMD,
		DialOpts:       m.opts.grpcDialOpts,
		Manager:        m,
	})
	if err != nil {
		return nil, err
	}

	m.addNode(created)
	return created, nil
}
// getMsgID atomically increments the manager's message counter and returns
// the new value, so the first ID handed out is 1 and concurrent callers
// never receive the same ID.
func (m *Manager) getMsgID() uint64 {
	id := atomic.AddUint64(&m.nextMsgID, 1)
	return id
}