Conversation
Stebalien left a comment:
This is going to need more docs and better function names before I can review the design.
- Let's get rid of `On*` methods/functions where possible. That doesn't say what the method does and lets us just stick arbitrary logic in it.
- Docs, lots of docs. Especially docs talking about the purpose and goal of types/functions. This is a great test to see if the function has a clear purpose. Even if we have TODOs, I need to know what the code is trying to do.
- Avoid mixing patterns.
  - Don't mix callbacks with channels and event loops. The channel/event-loop pattern is designed to run specific logic on specific goroutines. If we start throwing callbacks around, these callbacks can get called anywhere.
  - If we're using the channel and event loop pattern, all state updates need to go through the event loop.
```go
// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
func (bs *Bitswap) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
	return bsgetter.SyncGetBlock(parent, k, bs.GetBlocks)
```
Removed bsgetter - its functionality now lives inside the session
```go
// Put wanted blocks into blockstore
if len(wanted) > 0 {
	err := bs.blockstore.PutMany(wanted)
```
blockstore operations now happen inside of the WantRequestManager so it can synchronize access
```go
// Send all block keys (including duplicates) to any sessions that want them.
// (The duplicates are needed by sessions for accounting purposes)
bs.sm.ReceiveFrom(ctx, from, allKs, haves, dontHaves)
```
Messages are now only published to the WantRequestManager, instead of sending blocks to the Notifier and messages to the SessionManager
```go
// Free up block presence tracking for keys that no session is interested
// in anymore
unwanted := pm.pwm.unwanted(cancelKs)
pm.bpm.RemoveKeys(unwanted)
```
The PeerManager now keeps track of which sessions are interested in which wants.
So I moved the check for whether any sessions are interested in a key (and thus when it's safe to send a cancel) from the SessionManager / SessionInterestManager to here in the PeerManager.
```go
	sessions map[uint64]struct{}
	tp       wantType
}
```
In the peerWantManager we now keep track of which sessions are interested in each want
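The "only cancel once no session is interested" bookkeeping described here can be sketched roughly as follows. This is an illustrative stand-alone sketch (the type and method names are mine, not the PR's actual API): each want records the set of session IDs that asked for it, and a cancel from one session only results in a network cancel once that set is empty.

```go
package main

import "fmt"

// want tracks which sessions asked for a key.
type want struct {
	sessions map[uint64]struct{}
}

// peerWantManager maps keys to their tracked wants.
type peerWantManager struct {
	wants map[string]*want
}

// cancelWant removes session sid's interest in key k. It returns true
// only when no session wants k anymore, i.e. when it is safe to send a
// cancel to peers.
func (pwm *peerWantManager) cancelWant(sid uint64, k string) bool {
	w, ok := pwm.wants[k]
	if !ok {
		return false
	}
	delete(w.sessions, sid)
	if len(w.sessions) == 0 {
		delete(pwm.wants, k)
		return true
	}
	return false
}

func main() {
	pwm := &peerWantManager{wants: map[string]*want{
		"k": {sessions: map[uint64]struct{}{1: {}, 2: {}}},
	}}
	fmt.Println(pwm.cancelWant(1, "k")) // false: session 2 still wants it
	fmt.Println(pwm.cancelWant(2, "k")) // true: last session gone, send cancel
}
```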
```go
	consecutiveTicks    int
	initialSearchDelay  time.Duration
	periodicSearchDelay delay.D
	wantRequests        map[*bswrm.WantRequest]struct{}
```
The session now keeps track of individual WantRequests (each call to GetBlocks() creates a new WantRequest)
```go
	dontHaves = interestedRes[2]
	s.logReceiveFrom(from, ks, haves, dontHaves)

// GetBlock fetches a single block.
func (s *Session) GetBlock(reqctx context.Context, k cid.Cid) (blocks.Block, error) {
```
The code in GetBlock() was moved from getter.go into the Session. Seems like a more natural place for it to live.
```go
)

// SessionWantsCanceller provides a method to cancel wants
type SessionWantsCanceller interface {
```
We no longer cancel wants with the SessionManager - instead the sessions tell the PeerManager when they're no longer interested in a want and it decides whether to cancel the want
```go
// Cancel keys that no session is interested in anymore
sm.cancelWants(cancelKs)
```
The PeerManager now takes care of cancelling wants
```go
}

// ReceiveFrom is called when a new message is received
func (sm *SessionManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
```
The WantRequestManager now takes care of receiving incoming messages and distributing them to sessions
I haven't read it in-depth, but I like the general structure. I'll give it a more in-depth review once we finally ship a release.
```go
// sent
func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {

// sent. It will only send a cancel for keys that no session wants anymore.
func (pwm *peerWantManager) sendCancels(sid uint64, cancelKs []cid.Cid) {
```
With these changes PeerWantManager actually ends up performing better than master on BenchmarkPeerManager:
```
$ go test ./internal/peermanager -run xyz -v -bench . -benchtime 5s
goos: darwin
goarch: amd64
pkg: github.com/ipfs/go-bitswap/internal/peermanager
BenchmarkPeerManager
BenchmarkPeerManager-8    203724    29745 ns/op
```

master:

```
$ go test ./internal/peermanager -run xyz -v -bench . -benchtime 5s
goos: darwin
goarch: amd64
pkg: github.com/ipfs/go-bitswap/internal/peermanager
BenchmarkPeerManager
BenchmarkPeerManager-8    169858    35684 ns/op
```
internal/session/session.go
Outdated
```go
var wr *bswrm.WantRequest
var err error
wr, err = s.wrm.NewWantRequest(keys, func(ks []cid.Cid) {
	s.incoming <- op{
```
This cancel function is called when the request is cancelled or the session is shutdown, so it will block the WantRequest.Run() go-routine which is about to exit. When the session shuts down, it drains the incoming channel, so this channel send shouldn't ever block forever. I'll add a comment to clarify.
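The guarantee being relied on here, that a late channel send cannot block forever because the shutdown path drains the channel, is a standard Go pattern. A minimal stand-alone sketch (not the session's actual code):

```go
package main

import "fmt"

func main() {
	incoming := make(chan int) // unbuffered, like a session op channel
	done := make(chan struct{})

	// Late sender: may fire after the main loop has already exited
	// (like the WantRequest cancel callback described above).
	go func() {
		incoming <- 42 // safe: the drain goroutine below receives it
		close(done)
	}()

	// Shutdown path: drain the channel so in-flight senders can finish.
	// In the real session this loop runs until all producers are done.
	go func() {
		for range incoming {
		}
	}()

	<-done
	fmt.Println("sender completed without blocking")
}
```

Without the drain goroutine, the send on `incoming` would block forever once the receiver loop exits, which is exactly the hazard the comment promises to document.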
```go
// Check if the block is wanted by one of the sessions
for wr := range wrm.wrs[c] {
	if wr.wants(c) {
```
Why do we check this? Why not just if len(wrm.wrs[c]) > 0? I.e., "there exists a want request for the CID"?
Apparently, yes. What about having two maps?
We could have 2 maps but that would mean when a want changes state from wanted -> unwanted we have to update both maps, so I believe it would be less performant
Updating the wants might be slightly more expensive, but we'd be able to get rid of this loop. An extra map operation per block received is better than iterating over the entire wantlist.
Ah so this code is actually iterating over all the WantRequests who have registered interest in the want (there will almost always be just one).
To clarify:
```go
func (wrm *WantRequestManager) wantedBlocks(blks []blocks.Block) []blocks.Block {
	wrm.lk.Lock()
	defer wrm.lk.Unlock()

	wanted := make([]blocks.Block, 0, len(blks))
	for _, b := range blks {
		c := b.Cid()

		// Check if the block is wanted by one of the sessions
		for wr := range wrm.wrs[c] {
			if wr.wants(c) {
				wanted = append(wanted, b)
				break
			}
		}
	}
	return wanted
}
```
`wrm.wrs` is a mapping from CID -> &lt;set of WantRequests&gt;, so `for wr := range wrm.wrs[c]` iterates over the &lt;set of WantRequests&gt;.
```go
// Publish the message to WantRequests that are interested in the
// blocks / HAVEs / DONT_HAVEs in the message
return wrm.publish(msg), nil
```
- This will keep blocks in memory for longer. May not be an issue.
- What about blocks we're not even interested in?
This is also returning the wrong thing. Don't we want to return the blocks we want, not the blocks we're simply interested in?
That is what it returns - added some comments to make this clearer
```go
}

//
// The WantRequestManager keeps track of WantRequests.
```
Can we be a bit more explicit about the purpose of this service? That'll help us reason about where functionality should live.
How about:
```go
//
// The WantRequestManager keeps track of which sessions want which blocks,
// distributes incoming messages to those sessions, and writes blocks to
// the blockstore.
// When the client calls Session.WantBlocks(keys), the session creates a
// WantRequest with those keys on the WantRequestManager.
// When a message arrives Bitswap calls PublishToSessions and the
// WantRequestManager writes the blocks to the blockstore and informs all
// Sessions that are interested in the message.
//
```
Awesome! I wonder if this should be called something like the Router, BlockRouter, or ResponseRouter, or something like that. WantRequestManager makes me think it manages wants.
But this description makes it clear what this service is supposed to do.
Agree that the naming is confusing, let's think about how to make names better as we work out the other details of the PR.
bitswap.go
Outdated
```go
for _, b := range notWanted {
	log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from)
}
wantedKs, err := bs.wrm.PublishToSessions(&bswrm.IncomingMessage{
```
It would be nice to not have to return state from this. We'd be able to make this async. We should be able to move providing and bitswap engine updates inside, maybe?
Hm. Nevermind. That doesn't really change anything.
```go
if !local {
	// If the blocks came from the network, only put blocks to the
	// blockstore if the local node actually wanted them
	wanted = wrm.wantedBlocks(msg.Blks)
```
Hm. This is racy, isn't it? I guess it's racy already.
The steps are:

1. figure out which blocks are wanted
2. write wanted blocks to the blockstore (may take some time)
3. publish message to sessions:
   - figure out which blocks are wanted now
   - set wants to "received" state
   - send to sessions
We need to unlock the WantRequestManager for step 2, which is why we need to figure out which blocks are wanted twice. It's possible for the same block to occasionally be written twice to the blockstore, although I wouldn't expect that to happen very often and it's not a problem in terms of correctness (only in terms of performance). Does the blockstore detect when the same block is written twice?
Writing twice should be fine, it just might be slightly slower.
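The check/unlock/write/re-check sequence from the steps above can be sketched as follows. This is an illustrative stand-alone sketch with my own names (plain strings for keys, a callback for the blockstore write), not the WantRequestManager's actual code:

```go
package main

import (
	"fmt"
	"sync"
)

// wantTracker is a minimal stand-in for the WantRequestManager's want state.
type wantTracker struct {
	lk    sync.Mutex
	wants map[string]bool
}

// wantedNow returns the subset of keys still wanted, under the lock.
func (wt *wantTracker) wantedNow(keys []string) []string {
	wt.lk.Lock()
	defer wt.lk.Unlock()
	var out []string
	for _, k := range keys {
		if wt.wants[k] {
			out = append(out, k)
		}
	}
	return out
}

// receive follows the steps from the discussion: check which blocks are
// wanted, release the lock while writing to the (slow) blockstore, then
// re-check before marking wants received. Because the lock is released
// during the write, the same block may occasionally be written twice,
// which is safe (just slightly slower).
func (wt *wantTracker) receive(keys []string, put func([]string)) []string {
	wanted := wt.wantedNow(keys)        // step 1: which blocks are wanted
	put(wanted)                         // step 2: blockstore write, lock released
	stillWanted := wt.wantedNow(wanted) // step 3: wants may have changed meanwhile

	wt.lk.Lock()
	for _, k := range stillWanted {
		delete(wt.wants, k) // mark received
	}
	wt.lk.Unlock()
	return stillWanted
}

func main() {
	wt := &wantTracker{wants: map[string]bool{"a": true, "b": true}}
	got := wt.receive([]string{"a", "b", "c"}, func(ks []string) {})
	fmt.Println(got)
}
```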
```go
for _, c := range interestedKs {
	log.Debugw("Bitswap <- block", "local", s.self, "from", from, "cid", c, "session", s.id)

// Use a WantRequest to listen for incoming messages pertaining to the keys
wr, err := s.wrm.NewWantRequest(keys)
```
How will this work when we switch to streaming requests?
Currently WantRequest.Run() keeps track of which keys have been received, and closes the outgoing channel of blocks when all keys have been received.
I imagine that if we have a channel of incoming keys (instead of a slice) WantRequest.Run() will instead check if the channel is closed, and if so, close the outgoing channel of blocks.
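The completion logic being described (track which keys are still pending, close the outgoing channel when none remain) can be sketched like this. It is a simplified illustration with my own names, not the actual `WantRequest.Run()`:

```go
package main

import "fmt"

// run sketches the completion logic: it tracks which of the requested
// keys are still pending, forwards received blocks on the outgoing
// channel, and closes that channel once every key has been received.
func run(keys []string, received <-chan string, out chan<- string) {
	pending := make(map[string]bool, len(keys))
	for _, k := range keys {
		pending[k] = true
	}
	for k := range received {
		if pending[k] {
			delete(pending, k)
			out <- k
		}
		if len(pending) == 0 {
			close(out) // all keys received: signal completion to the caller
			return
		}
	}
}

func main() {
	received := make(chan string, 3)
	received <- "a"
	received <- "x" // not one of our keys: ignored
	received <- "b"

	out := make(chan string, 2)
	go run([]string{"a", "b"}, received, out)
	for k := range out {
		fmt.Println(k)
	}
}
```

With streaming requests, instead of a fixed key slice the loop would also select on an incoming channel of keys, treating its closure (rather than an empty pending set alone) as the signal to finish.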
```go
// Send the message and the set of cids of wanted blocks
select {
case wr.messages <- &messageWanted{fmsg, wanted}:
```
If there's any way to avoid this event loop, I'd do that. We're mixing locked state with event loops.
Looking at this, I'm pretty sure we don't need this event loop at all.
You're right it would be better not to mix locking with event loops.
We need locking so that we can return which blocks were wanted from PublishToSessions()
I believe we need an event loop to
- send blocks on the outgoing channel (returned from Session.GetBlocks())
- listen for `requestCtx.Done()` and `sessCtx.Done()`
In future we'll probably also need it to read from the incoming channel of keys.
Any way we can avoid that?
The outgoing channel is completely buffered, right? Can't we just write into it directly?
Ah, ok, if we're streaming wants, this will no longer apply.
```go
// drainCancels receives on the incoming channel until a cancel request for
// each WantRequest has been received and passed to the sessionWantSender
func (s *Session) drainCancels() {
```
Is there really no better way?
Open to suggestions :)
So, the message diagram is missing at least one message: blocks from the WantRequest to the user. Can we decouple that? Would that make things simpler? That is:

This will ensure that blocks take a more predictable path:
What happens if I call
I agree that it would be nice to have a more linear flow like that. The catch is that the Session would need to be aware of which WantRequests are interested in which keys. We could add that, although it seems a little redundant to route from. We could also change from a WantRequestManager that knows about which WantRequests are interested in each key to
Again it's a little redundant.
I think that will cause problems (I think that's probably always been an issue)
I'm suggesting a different flow:
Effectively, this gets rid of WantRequests. I'm not sure if this is worth implementing in this patch, but it would definitely fix #398 (comment). I've posted some architecture thoughts in ipfs/boxo#90 to sum up how I see this flow working. Unfortunately, I know I'm missing quite a few details.
I think this is the issue - we still need to have a mapping from the CID back to the channel for the user who wants the block (i.e. the WantRequest). We also need to cancel wants when the request context is cancelled, which means we need a mapping from user context to wants (the WantRequest).
@dirkmc I can't remember where we left this, I just remember that you needed to hop to the lotus team. Should we try to land it?
To be honest I think it's probably better to drop this PR and start again at the design stage. It was more complicated than I anticipated, and I think it would be better to debate it at the design level before trying to implement it in code.
Yeah, SGTM.
Fixes ipfs/boxo#92
There are some issues with the existing message flow through the code as outlined in ipfs/boxo#92. The flow is complex and there are some synchronization issues.
Existing Message Flow
In the existing message flow, the Session relies on a Pubsub implementation, that in turn relies on an external library, and on the SessionInterestManager.

When a message is received by Bitswap, it:
- sends the blocks to Pubsub, which informs the relevant Sessions
- informs the SessionManager, which asks the SessionInterestManager which sessions are interested in the message, then passes the message to the interested Sessions
- puts the blocks to the Blockstore

When the Session receives a block, it informs the SessionManager, which checks the SessionInterestManager to see if there are any other interested sessions, and if not tells the PeerManager to cancel the want.
In the new message flow, the Session creates a WantRequest to manage a set of keys with the WantRequestManager. The WantRequestManager checks the Blockstore to see if the block is already there.

When a message is received by Bitswap, it informs the WantRequestManager, which:
- informs each interested WantRequest, which informs the Session
- puts the blocks to the Blockstore

The PeerManager keeps track of which sessions have pending wants (by session ID).

When the Session receives a block, it sends a cancel for the session ID and want to the PeerManager.
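The core of the new flow, a manager that maps each key to the set of WantRequests interested in it and distributes incoming messages to exactly those requests, can be sketched as follows. This is an illustrative stand-alone sketch (strings for keys, integer IDs for requests, my own method names), not the PR's actual types:

```go
package main

import "fmt"

// wantRequest stands in for a WantRequest created by a Session.
type wantRequest struct{ id int }

// wantRequestManager maps each key to the set of WantRequests that
// registered interest in it (the wrm.wrs structure from the discussion).
type wantRequestManager struct {
	wrs map[string]map[*wantRequest]struct{}
}

// register records wr's interest in each of the given keys.
func (wrm *wantRequestManager) register(wr *wantRequest, keys []string) {
	for _, k := range keys {
		if wrm.wrs[k] == nil {
			wrm.wrs[k] = make(map[*wantRequest]struct{})
		}
		wrm.wrs[k][wr] = struct{}{}
	}
}

// publish returns the IDs of the want requests interested in any of the
// keys in an incoming message; in the real flow each of these requests
// would be handed the message and would inform its Session.
func (wrm *wantRequestManager) publish(keys []string) []int {
	seen := make(map[int]bool)
	var ids []int
	for _, k := range keys {
		for wr := range wrm.wrs[k] {
			if !seen[wr.id] {
				seen[wr.id] = true
				ids = append(ids, wr.id)
			}
		}
	}
	return ids
}

func main() {
	wrm := &wantRequestManager{wrs: map[string]map[*wantRequest]struct{}{}}
	a, b := &wantRequest{1}, &wantRequest{2}
	wrm.register(a, []string{"k1"})
	wrm.register(b, []string{"k2"})
	fmt.Println(wrm.publish([]string{"k1"})) // only request 1 is interested
}
```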