libnetwork: use go-immutable-radix instead of radix #44501
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -396,7 +396,7 @@ func (nDB *NetworkDB) reapTableEntries() { | |
| // The lock is taken at the beginning of the cycle and the deletion is inline | ||
| for _, nid := range nodeNetworks { | ||
| nDB.Lock() | ||
| nDB.indexes[byNetwork].WalkPrefix("/"+nid, func(path string, v interface{}) bool { | ||
| nDB.indexes[byNetwork].Root().WalkPrefix([]byte("/"+nid), func(path []byte, v interface{}) bool { | ||
| // timeCompensation compensate in case the lock took some time to be released | ||
| timeCompensation := time.Since(cycleStart) | ||
| entry, ok := v.(*entry) | ||
|
|
@@ -411,7 +411,7 @@ func (nDB *NetworkDB) reapTableEntries() { | |
| return false | ||
| } | ||
|
|
||
| params := strings.Split(path[1:], "/") | ||
| params := strings.Split(string(path[1:]), "/") | ||
|
Member

Perhaps a micro-optimisation, but if these loops run many times, perhaps worth it while we're changing; how about using `bytes.Split` instead of converting to a string first?

```go
package main

import (
	"bytes"
	"strings"
	"testing"
)

func BenchmarkSplitString(b *testing.B) {
	path := []byte("/one/two/three/four")
	sep := "/"
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		params := strings.Split(string(path[1:]), sep)
		nid := params[0]
		tname := params[1]
		key := params[2]
		_, _, _ = nid, tname, key
	}
}

func BenchmarkSplitBytes(b *testing.B) {
	path := []byte("/one/two/three/four")
	sep := []byte("/")
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		params := bytes.Split(path[1:], sep)
		nid := string(params[0])
		tname := string(params[1])
		key := string(params[2])
		_, _, _ = nid, tname, key
	}
}

func BenchmarkSplitNString(b *testing.B) {
	path := []byte("/one/two/three/four")
	sep := "/"
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		params := strings.SplitN(string(path[1:]), sep, 4)
		nid := params[0]
		tname := params[1]
		key := params[2]
		_, _, _ = nid, tname, key
	}
}

func BenchmarkSplitNBytes(b *testing.B) {
	path := []byte("/one/two/three/four")
	sep := []byte("/")
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		params := bytes.SplitN(path[1:], sep, 4)
		nid := string(params[0])
		tname := string(params[1])
		key := string(params[2])
		_, _, _ = nid, tname, key
	}
}
```
Contributor
Author
I initially had that, but whatever you gain by it is offset by the fact that you need to redo the conversion in `deleteEntry`.
||
| nid := params[0] | ||
|
Member

Looks like this is redundant, as we already know the prefix (which is what we're passing to iterate), which means we can also (instead of only stripping the leading `/`) trim the whole prefix before splitting:

```go
// Format is "/<networkID>/<tableName>/<endpointID>/<value>",
// trim "/"+nid+"/" before splitting.
params := bytes.Split(path[len(nid)+2:], []byte("/"))
tname := string(params[0])
key := string(params[1])
```
Contributor
Author
`deleteEntry` needs both.
||
| tname := params[1] | ||
| key := params[2] | ||
|
Member
Silly question (I can't comment on the line below): as the problem was that we were deleting inside the loop, and as we're already taking a lock, would it make sense to collect the list of entries to delete inside the walk, and do the actual deletes afterwards? Also wondering: it seems we could use the tree's `DeletePrefix` here.
Contributor
Author
Yeah, I thought about this too. The current logic is very hairy because it's not simply deleting a bunch of entries: for each entry it also checks whether the delete succeeded, and only then updates the count of entries in that network: https://github.com/moby/moby/pull/44501/files#diff-7b5363044492af3bcc6d8117470bfc51476d389c7a90670cfd5c62a83c32445aR777 And there's that whole timeCompensation logic too. So yeah, again, I tried to change the least amount of logic in an area I'm not very comfortable in. If somebody wants to decipher the timeCompensation logic and use `DeletePrefix`, that's just an optimization, which is not the purpose of this PR. |
||
|
|
@@ -629,7 +629,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b | |
| } | ||
|
|
||
| for _, nid := range networks { | ||
| nDB.indexes[byNetwork].WalkPrefix("/"+nid, func(path string, v interface{}) bool { | ||
| nDB.indexes[byNetwork].Root().WalkPrefix([]byte("/"+nid), func(path []byte, v interface{}) bool { | ||
| entry, ok := v.(*entry) | ||
| if !ok { | ||
| return false | ||
|
|
@@ -640,7 +640,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b | |
| eType = TableEventTypeDelete | ||
| } | ||
|
|
||
| params := strings.Split(path[1:], "/") | ||
| params := strings.Split(string(path[1:]), "/") | ||
| tEvent := TableEvent{ | ||
| Type: eType, | ||
| LTime: entry.ltime, | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -10,10 +10,10 @@ import ( | |||||||||||||
| "sync" | ||||||||||||||
| "time" | ||||||||||||||
|
|
||||||||||||||
| "github.com/armon/go-radix" | ||||||||||||||
| "github.com/docker/docker/libnetwork/types" | ||||||||||||||
| "github.com/docker/docker/pkg/stringid" | ||||||||||||||
| "github.com/docker/go-events" | ||||||||||||||
| iradix "github.com/hashicorp/go-immutable-radix" | ||||||||||||||
| "github.com/hashicorp/memberlist" | ||||||||||||||
| "github.com/hashicorp/serf/serf" | ||||||||||||||
| "github.com/sirupsen/logrus" | ||||||||||||||
|
|
@@ -43,7 +43,7 @@ type NetworkDB struct { | |||||||||||||
|
|
||||||||||||||
| // All the tree index (byTable, byNetwork) that we maintain | ||||||||||||||
| // the db. | ||||||||||||||
| indexes map[int]*radix.Tree | ||||||||||||||
| indexes map[int]*iradix.Tree | ||||||||||||||
|
|
||||||||||||||
| // Memberlist we use to drive the cluster. | ||||||||||||||
| memberlist *memberlist.Memberlist | ||||||||||||||
|
|
@@ -255,7 +255,7 @@ func New(c *Config) (*NetworkDB, error) { | |||||||||||||
|
|
||||||||||||||
| nDB := &NetworkDB{ | ||||||||||||||
| config: c, | ||||||||||||||
| indexes: make(map[int]*radix.Tree), | ||||||||||||||
| indexes: make(map[int]*iradix.Tree), | ||||||||||||||
| networks: make(map[string]map[string]*network), | ||||||||||||||
| nodes: make(map[string]*node), | ||||||||||||||
| failedNodes: make(map[string]*node), | ||||||||||||||
|
|
@@ -265,8 +265,8 @@ func New(c *Config) (*NetworkDB, error) { | |||||||||||||
| broadcaster: events.NewBroadcaster(), | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| nDB.indexes[byTable] = radix.New() | ||||||||||||||
| nDB.indexes[byNetwork] = radix.New() | ||||||||||||||
| nDB.indexes[byTable] = iradix.New() | ||||||||||||||
| nDB.indexes[byNetwork] = iradix.New() | ||||||||||||||
|
|
||||||||||||||
| logrus.Infof("New memberlist node - Node:%v will use memberlist nodeID:%v with config:%+v", c.Hostname, c.NodeID, c) | ||||||||||||||
| if err := nDB.clusterInit(); err != nil { | ||||||||||||||
|
|
@@ -348,7 +348,7 @@ func (nDB *NetworkDB) GetEntry(tname, nid, key string) ([]byte, error) { | |||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) { | ||||||||||||||
| e, ok := nDB.indexes[byTable].Get(fmt.Sprintf("/%s/%s/%s", tname, nid, key)) | ||||||||||||||
| e, ok := nDB.indexes[byTable].Get([]byte(fmt.Sprintf("/%s/%s/%s", tname, nid, key))) | ||||||||||||||
|
Member

We should probably also consider using concatenation for these simple cases (instead of `fmt.Sprintf`).

Contributor

Author

I don't think it's worth sacrificing readability in this case. |
||||||||||||||
| if !ok { | ||||||||||||||
| return nil, types.NotFoundErrorf("could not get entry in table %s with network id %s and key %s", tname, nid, key) | ||||||||||||||
| } | ||||||||||||||
|
|
@@ -422,12 +422,13 @@ type TableElem struct { | |||||||||||||
| // returns a map of keys and values | ||||||||||||||
| func (nDB *NetworkDB) GetTableByNetwork(tname, nid string) map[string]*TableElem { | ||||||||||||||
| entries := make(map[string]*TableElem) | ||||||||||||||
| nDB.indexes[byTable].WalkPrefix(fmt.Sprintf("/%s/%s", tname, nid), func(k string, v interface{}) bool { | ||||||||||||||
| nDB.indexes[byTable].Root().WalkPrefix([]byte(fmt.Sprintf("/%s/%s", tname, nid)), func(k []byte, v interface{}) bool { | ||||||||||||||
| entry := v.(*entry) | ||||||||||||||
| if entry.deleting { | ||||||||||||||
| return false | ||||||||||||||
| } | ||||||||||||||
| key := k[strings.LastIndex(k, "/")+1:] | ||||||||||||||
| key := string(k) | ||||||||||||||
| key = key[strings.LastIndex(key, "/")+1:] | ||||||||||||||
| entries[key] = &TableElem{Value: entry.value, owner: entry.node} | ||||||||||||||
| return false | ||||||||||||||
| }) | ||||||||||||||
|
|
@@ -499,10 +500,10 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) { | |||||||||||||
| // Indicates if the delete is triggered for the local node | ||||||||||||||
| isNodeLocal := node == nDB.config.NodeID | ||||||||||||||
|
|
||||||||||||||
| nDB.indexes[byNetwork].WalkPrefix("/"+nid, | ||||||||||||||
| func(path string, v interface{}) bool { | ||||||||||||||
| nDB.indexes[byNetwork].Root().WalkPrefix([]byte("/"+nid), | ||||||||||||||
| func(path []byte, v interface{}) bool { | ||||||||||||||
| oldEntry := v.(*entry) | ||||||||||||||
| params := strings.Split(path[1:], "/") | ||||||||||||||
| params := strings.Split(string(path[1:]), "/") | ||||||||||||||
| nid := params[0] | ||||||||||||||
| tname := params[1] | ||||||||||||||
| key := params[2] | ||||||||||||||
|
|
@@ -554,13 +555,13 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) { | |||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| func (nDB *NetworkDB) deleteNodeTableEntries(node string) { | ||||||||||||||
| nDB.indexes[byTable].Walk(func(path string, v interface{}) bool { | ||||||||||||||
| nDB.indexes[byTable].Root().Walk(func(path []byte, v interface{}) bool { | ||||||||||||||
| oldEntry := v.(*entry) | ||||||||||||||
| if oldEntry.node != node { | ||||||||||||||
| return false | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| params := strings.Split(path[1:], "/") | ||||||||||||||
| params := strings.Split(string(path[1:]), "/") | ||||||||||||||
| tname := params[0] | ||||||||||||||
| nid := params[1] | ||||||||||||||
| key := params[2] | ||||||||||||||
|
|
@@ -580,8 +581,8 @@ func (nDB *NetworkDB) deleteNodeTableEntries(node string) { | |||||||||||||
| func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte, bool) bool) error { | ||||||||||||||
| nDB.RLock() | ||||||||||||||
| values := make(map[string]interface{}) | ||||||||||||||
| nDB.indexes[byTable].WalkPrefix("/"+tname, func(path string, v interface{}) bool { | ||||||||||||||
| values[path] = v | ||||||||||||||
| nDB.indexes[byTable].Root().WalkPrefix([]byte("/"+tname), func(path []byte, v interface{}) bool { | ||||||||||||||
| values[string(path)] = v | ||||||||||||||
| return false | ||||||||||||||
| }) | ||||||||||||||
| nDB.RUnlock() | ||||||||||||||
|
|
@@ -751,9 +752,9 @@ func (nDB *NetworkDB) updateLocalNetworkTime() { | |||||||||||||
|
|
||||||||||||||
| // createOrUpdateEntry this function handles the creation or update of entries into the local | ||||||||||||||
| // tree store. It is also used to keep in sync the entries number of the network (all tables are aggregated) | ||||||||||||||
| func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, entry interface{}) (bool, bool) { | ||||||||||||||
| _, okTable := nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry) | ||||||||||||||
| _, okNetwork := nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) | ||||||||||||||
| func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, entry interface{}) (okTable bool, okNetwork bool) { | ||||||||||||||
| nDB.indexes[byTable], _, okTable = nDB.indexes[byTable].Insert([]byte(fmt.Sprintf("/%s/%s/%s", tname, nid, key)), entry) | ||||||||||||||
| nDB.indexes[byNetwork], _, okNetwork = nDB.indexes[byNetwork].Insert([]byte(fmt.Sprintf("/%s/%s/%s", nid, tname, key)), entry) | ||||||||||||||
| if !okNetwork { | ||||||||||||||
| // Add only if it is an insert not an update | ||||||||||||||
| n, ok := nDB.networks[nDB.config.NodeID][nid] | ||||||||||||||
|
|
@@ -766,9 +767,9 @@ func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, entry interfac | |||||||||||||
|
|
||||||||||||||
| // deleteEntry this function handles the deletion of entries into the local tree store. | ||||||||||||||
| // It is also used to keep in sync the entries number of the network (all tables are aggregated) | ||||||||||||||
| func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (bool, bool) { | ||||||||||||||
| _, okTable := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)) | ||||||||||||||
| _, okNetwork := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)) | ||||||||||||||
| func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (okTable bool, okNetwork bool) { | ||||||||||||||
| nDB.indexes[byTable], _, okTable = nDB.indexes[byTable].Delete([]byte(fmt.Sprintf("/%s/%s/%s", tname, nid, key))) | ||||||||||||||
| nDB.indexes[byNetwork], _, okNetwork = nDB.indexes[byNetwork].Delete([]byte(fmt.Sprintf("/%s/%s/%s", nid, tname, key))) | ||||||||||||||
|
Comment on lines +770 to +772

Member

See my other comment; perhaps if we drop the per-entry deletes we could use `DeletePrefix` instead. We may want to do the actual delete outside of the `WalkPrefix` function in that case though, to remove duplicates first.

Contributor

Author

What happens when, for some unknown reason (something something timeCompensation that I don't understand), not all entries in a prefix are to be deleted? Then you cannot use `DeletePrefix`; you have to redo a walk for each entry. |
||||||||||||||
| if okNetwork { | ||||||||||||||
| // Remove only if the delete is successful | ||||||||||||||
| n, ok := nDB.networks[nDB.config.NodeID][nid] | ||||||||||||||
|
|
||||||||||||||
This file was deleted.
Still trying to get my head around this weird construct (added in moby/libnetwork@3feb3aa), even more with the "timeCompensate" below. Do we know why the lock/unlock has to happen inside the loop here (while the others lock outside)?
The comment above the for loop attempts to explain it, but I'm not sure I understand it. Could it be that releasing the lock in every iteration allows other goroutines to continue working instead of lagging too much? Either way, this PR is not modifying that logic.
Actually, the commit message of the commit you posted confirms it.
Yeah, I was looking at that description and wondered whether that changed now that there's an RWMutex, but this code is complicated, so perhaps that didn't work.