This repository was archived by the owner on Feb 1, 2023. It is now read-only.

sync sessions with blockstore #398

Closed

dirkmc wants to merge 41 commits into master from refactor/notif

Conversation

@dirkmc (Contributor) commented May 20, 2020

Fixes ipfs/boxo#92

There are some issues with the existing message flow through the code, as outlined in ipfs/boxo#92. The flow is complex and there are some synchronization issues.

Existing Message Flow

[Diagram: Bitswap message flow (existing)]

In the existing message flow, the Session

  • subscribes to keys with a Pubsub implementation, which in turn relies on an external library
  • registers the keys with the SessionInterestManager

When a message is received by Bitswap, it

  • informs Pubsub, which informs the relevant Sessions
  • informs the SessionManager which asks the SessionInterestManager which sessions are interested in the message, then passes the message to the interested Sessions
  • stores the blocks in the Blockstore

When the Session receives a block, it informs the SessionManager which checks the SessionInterestManager to see if there are any other interested sessions, and if not tells the PeerManager to cancel the want.
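The "any other interested sessions?" check in this flow can be sketched as follows (a minimal sketch; the type mirrors the SessionInterestManager's role, but the API here is illustrative, not the real go-bitswap code):

```go
package main

import "fmt"

// sessionInterestManager maps each key to the set of sessions that want it.
// Hypothetical names; illustrates the cancel decision described above.
type sessionInterestManager struct {
	interest map[string]map[uint64]struct{} // key -> session IDs
}

func newSessionInterestManager() *sessionInterestManager {
	return &sessionInterestManager{interest: make(map[string]map[uint64]struct{})}
}

// register records that session sid wants key k.
func (sim *sessionInterestManager) register(sid uint64, k string) {
	if sim.interest[k] == nil {
		sim.interest[k] = make(map[uint64]struct{})
	}
	sim.interest[k][sid] = struct{}{}
}

// received removes sid's interest in k and reports whether any other
// session still wants it; if not, the PeerManager can cancel the want.
func (sim *sessionInterestManager) received(sid uint64, k string) (othersInterested bool) {
	delete(sim.interest[k], sid)
	return len(sim.interest[k]) > 0
}

func main() {
	sim := newSessionInterestManager()
	sim.register(1, "QmKey")
	sim.register(2, "QmKey")
	// Session 1 got the block; session 2 still wants it, so no cancel yet.
	fmt.Println(sim.received(1, "QmKey"))
	// Session 2 got it too; now nobody is interested, so cancel is safe.
	fmt.Println(sim.received(2, "QmKey"))
}
```

The cross-component round trip (Session -> SessionManager -> SessionInterestManager -> PeerManager) is the complexity the PR is trying to remove.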

New Message Flow

[Diagram: Bitswap message flow (new)]

In the new message flow, the Session creates a WantRequest to manage a set of keys with the WantRequestManager. The WantRequestManager checks the Blockstore to see if the block is already there.

When a message is received by Bitswap, it informs the WantRequestManager, which

  • informs the relevant WantRequest which informs the Session
  • stores the blocks in the Blockstore

The PeerManager keeps track of which sessions have pending wants (by session ID).
When the Session receives a block, it sends a cancel for the session ID and want to the PeerManager.
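The new registration and fan-out can be sketched like this (a minimal sketch with hypothetical names; the real WantRequestManager also writes wanted blocks to the blockstore inside publish):

```go
package main

import "fmt"

// wantRequest tracks the keys one GetBlocks call is waiting on.
type wantRequest struct {
	keys     map[string]struct{}
	received []string
}

// wantRequestManager maps each key to the WantRequests interested in it.
type wantRequestManager struct {
	wrs map[string]map[*wantRequest]struct{}
}

func newWantRequestManager() *wantRequestManager {
	return &wantRequestManager{wrs: make(map[string]map[*wantRequest]struct{})}
}

// newWantRequest registers a set of keys and returns the WantRequest.
func (wrm *wantRequestManager) newWantRequest(keys []string) *wantRequest {
	wr := &wantRequest{keys: make(map[string]struct{})}
	for _, k := range keys {
		wr.keys[k] = struct{}{}
		if wrm.wrs[k] == nil {
			wrm.wrs[k] = make(map[*wantRequest]struct{})
		}
		wrm.wrs[k][wr] = struct{}{}
	}
	return wr
}

// publish delivers an incoming block key to every interested WantRequest.
func (wrm *wantRequestManager) publish(k string) {
	for wr := range wrm.wrs[k] {
		wr.received = append(wr.received, k)
	}
}

func main() {
	wrm := newWantRequestManager()
	wr := wrm.newWantRequest([]string{"QmA", "QmB"})
	wrm.publish("QmA")
	fmt.Println(wr.received)
}
```

A single component now owns the key-to-session mapping, which is the synchronization point the old three-way flow lacked.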

@dirkmc dirkmc requested a review from Stebalien May 20, 2020 18:04
@Stebalien (Member) left a comment:

This is going to need more docs and better function names before I can review the design.

  • Let's get rid of On* methods/functions where possible. That name doesn't say what the method does, and lets us just stick arbitrary logic in it.
  • Docs, lots of docs. Especially docs talking about the purpose and goal of types/functions; that's a great test of whether a function is well scoped. Even if we have TODOs, I need to know what the code is trying to do.
  • Avoid mixing patterns.
    • Don't mix callbacks with channels and event loops. The channel-and-event-loop pattern is designed to run specific logic on specific goroutines. If we start throwing callbacks around, these callbacks can get called anywhere.
    • If we're using the channel-and-event-loop pattern, all state updates need to go through the event loop.

// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
func (bs *Bitswap) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
return bsgetter.SyncGetBlock(parent, k, bs.GetBlocks)
@dirkmc:

Removed bsgetter - its functionality now lives inside the session


// Put wanted blocks into blockstore
if len(wanted) > 0 {
err := bs.blockstore.PutMany(wanted)
@dirkmc:

blockstore operations now happen inside of the WantRequestManager so it can synchronize access


// Send all block keys (including duplicates) to any sessions that want them.
// (The duplicates are needed by sessions for accounting purposes)
bs.sm.ReceiveFrom(ctx, from, allKs, haves, dontHaves)
@dirkmc:

Messages are now only published to the WantRequestManager, instead of sending blocks to the Notifier and messages to the SessionManager

// Free up block presence tracking for keys that no session is interested
// in anymore
unwanted := pm.pwm.unwanted(cancelKs)
pm.bpm.RemoveKeys(unwanted)
@dirkmc:

The PeerManager now keeps track of which sessions are interested in which wants.
So I moved the check for whether any sessions are interested in a key (and thus when it's safe to send a cancel) from the SessionManager / SessionInterestManager to here in the PeerManager.
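The check that moved into the PeerManager can be sketched like this (hypothetical names; the real pwm.unwanted works on cid.Cid keys): given the keys one session is cancelling, keep only those that no session still wants.

```go
package main

import "fmt"

// peerWantManager tracks, per key, the set of session IDs that want it.
type peerWantManager struct {
	wants map[string]map[uint64]struct{}
}

// unwanted removes sid's interest in cancelKs and returns the keys that
// no session wants anymore; only those are safe to cancel globally.
func (pwm *peerWantManager) unwanted(sid uint64, cancelKs []string) []string {
	var out []string
	for _, k := range cancelKs {
		delete(pwm.wants[k], sid)
		if len(pwm.wants[k]) == 0 {
			delete(pwm.wants, k)
			out = append(out, k)
		}
	}
	return out
}

func main() {
	pwm := &peerWantManager{wants: map[string]map[uint64]struct{}{
		"QmA": {1: {}},        // only session 1 wants QmA
		"QmB": {1: {}, 2: {}}, // sessions 1 and 2 want QmB
	}}
	// Session 1 cancels both keys, but only QmA is unwanted afterwards.
	fmt.Println(pwm.unwanted(1, []string{"QmA", "QmB"}))
}
```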

sessions map[uint64]struct{}
tp wantType
}

@dirkmc:

In the peerWantManager we now keep track of which sessions are interested in each want

consecutiveTicks int
initialSearchDelay time.Duration
periodicSearchDelay delay.D
wantRequests map[*bswrm.WantRequest]struct{}
@dirkmc:

The session now keeps track of individual WantRequests (each call to GetBlocks() creates a new WantRequest)

dontHaves = interestedRes[2]
s.logReceiveFrom(from, ks, haves, dontHaves)
// GetBlock fetches a single block.
func (s *Session) GetBlock(reqctx context.Context, k cid.Cid) (blocks.Block, error) {
@dirkmc:

The code in GetBlock() was moved from getter.go into the Session. Seems like a more natural place for it to live.

)

// SessionWantsCanceller provides a method to cancel wants
type SessionWantsCanceller interface {
@dirkmc:

We no longer cancel wants with the SessionManager - instead the sessions tell the PeerManager when they're no longer interested in a want and it decides whether to cancel the want


// Cancel keys that no session is interested in anymore
sm.cancelWants(cancelKs)

@dirkmc:

The PeerManager now takes care of cancelling wants

}

// ReceiveFrom is called when a new message is received
func (sm *SessionManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
@dirkmc:

The WantRequestManager now takes care of receiving incoming messages and distributing them to sessions

@Stebalien:

I haven't read it in-depth, but I like the general structure. I'll give it a more in-depth review once we finally ship a release.

// sent
func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
// sent. It will only send a cancel for keys that no session wants anymore.
func (pwm *peerWantManager) sendCancels(sid uint64, cancelKs []cid.Cid) {
@dirkmc commented Jun 11, 2020:

With these changes PeerWantManager actually ends up performing better than master on BenchmarkPeerManager:

$ go test ./internal/peermanager -run xyz -v -bench . -benchtime 5s
goos: darwin
goarch: amd64
pkg: github.com/ipfs/go-bitswap/internal/peermanager
BenchmarkPeerManager
BenchmarkPeerManager-8   	  203724	     29745 ns/op

master

$ go test ./internal/peermanager -run xyz -v -bench . -benchtime 5s
goos: darwin
goarch: amd64
pkg: github.com/ipfs/go-bitswap/internal/peermanager
BenchmarkPeerManager
BenchmarkPeerManager-8   	  169858	     35684 ns/op

var wr *bswrm.WantRequest
var err error
wr, err = s.wrm.NewWantRequest(keys, func(ks []cid.Cid) {
s.incoming <- op{
@dirkmc:

This cancel function is called when the request is cancelled or the session is shut down, so it will block the WantRequest.Run() goroutine, which is about to exit. When the session shuts down, it drains the incoming channel, so this channel send shouldn't ever block forever. I'll add a comment to clarify.
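The drain-on-shutdown pattern described here can be sketched as follows (hypothetical types; a simplification, since it only drains buffered ops rather than racing with in-flight unbuffered sends):

```go
package main

import "fmt"

// op is a message a WantRequest sends on the session's incoming channel.
type op struct{ keys []string }

// drain reads ops from incoming until done is closed and nothing remains
// buffered, returning how many ops were drained. This is what keeps a
// WantRequest goroutine's final send from blocking forever at shutdown.
func drain(incoming chan op, done chan struct{}) int {
	n := 0
	for {
		select {
		case <-incoming:
			n++
		case <-done:
			// Shutdown signalled: consume anything still pending, then stop.
			for {
				select {
				case <-incoming:
					n++
				default:
					return n
				}
			}
		}
	}
}

func main() {
	incoming := make(chan op, 1)
	done := make(chan struct{})
	incoming <- op{keys: []string{"QmA"}} // a pending cancel from a WantRequest
	close(done)                           // session is shutting down
	fmt.Println(drain(incoming, done))
}
```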

@dirkmc dirkmc marked this pull request as ready for review June 12, 2020 16:33
@dirkmc dirkmc requested a review from Stebalien June 12, 2020 16:33
@dirkmc commented Jun 12, 2020:

I ran the benchmarks manually and there seems to be no significant change in performance:

[Screenshot: benchmark comparison]

Left side: This branch / Right side: master

@Stebalien (Member) left a comment:

Basic pass


// Check if the block is wanted by one of the sessions
for wr := range wrm.wrs[c] {
if wr.wants(c) {
@Stebalien:

Why do we check this? Why not just if len(wrm.wrs[c]) > 0? I.e., "there exists a want request for the CID"?

@Stebalien:

Apparently, yes. What about having two maps?

@dirkmc:

We could have 2 maps but that would mean when a want changes state from wanted -> unwanted we have to update both maps, so I believe it would be less performant

@Stebalien:

Updating the wants might be slightly more expensive, but we'd be able to get rid of this loop. An extra map operation per block received is better than iterating over the entire wantlist.

@dirkmc:

Ah, so this code is actually iterating over all the WantRequests that have registered interest in the want (there will almost always be just one).

To clarify:

func (wrm *WantRequestManager) wantedBlocks(blks []blocks.Block) []blocks.Block {
	wrm.lk.Lock()
	defer wrm.lk.Unlock()

	wanted := make([]blocks.Block, 0, len(blks))
	for _, b := range blks {
		c := b.Cid()

		// Check if the block is wanted by one of the sessions
		for wr := range wrm.wrs[c] {
			if wr.wants(c) {
				wanted = append(wanted, b)
				break
			}
		}
	}
	return wanted
}

wrm.wrs is a mapping from CID -> <set of WantRequests>, so

for wr := range wrm.wrs[c] {

iterates over <set of WantRequests>


// Publish the message to WantRequests that are interested in the
// blocks / HAVEs / DONT_HAVEs in the message
return wrm.publish(msg), nil
@Stebalien commented Jun 12, 2020:

  • This will keep blocks in memory for longer. May not be an issue.
  • What about blocks we're not even interested in?

@Stebalien:

This is also returning the wrong thing. Don't we want to return the blocks we want, not the blocks we're simply interested in?

@dirkmc:

That is what it returns - added some comments to make this clearer

}

//
// The WantRequestManager keeps track of WantRequests.
@Stebalien:

Can we be a bit more explicit about the purpose of this service? That'll help us reason about where functionality should live.

@dirkmc:

How about:

//
// The WantRequestManager keeps track of which sessions want which blocks,
// distributes incoming messages to those sessions, and writes blocks to
// the blockstore.
// When the client calls Session.WantBlocks(keys), the session creates a
// WantRequest with those keys on the WantRequestManager.
// When a message arrives Bitswap calls PublishToSessions and the
// WantRequestManager writes the blocks to the blockstore and informs all
// Sessions that are interested in the message.
//

@Stebalien:

Awesome! I wonder if this should be called something like the Router, BlockRouter, or ResponseRouter, or something like that. WantRequestManager makes me think it manages wants.

But this description makes it clear what this service is supposed to do.

@dirkmc:

Agree that the naming is confusing, let's think about how to make names better as we work out the other details of the PR.

bitswap.go Outdated
for _, b := range notWanted {
log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from)
}
wantedKs, err := bs.wrm.PublishToSessions(&bswrm.IncomingMessage{
@Stebalien:

It would be nice to not have to return state from this. We'd be able to make this async. We should be able to move providing and bitswap engine updates inside, maybe?

@Stebalien:

Hm. Nevermind. That doesn't really change anything.

if !local {
// If the blocks came from the network, only put blocks to the
// blockstore if the local node actually wanted them
wanted = wrm.wantedBlocks(msg.Blks)
@Stebalien:

Hm. This is racy, isn't it? I guess it's racy already.

@dirkmc:

The steps are:

  1. figure out which blocks are wanted
  2. write wanted blocks to the blockstore (may take some time)
  3. publish message to sessions
  • figure out which blocks are wanted now
  • set wants to "received" state
  • send to sessions

We need to unlock the WantRequestManager for step 2, which is why we need to figure out which blocks are wanted twice. It's possible for the same block to occasionally be written twice to the blockstore, although I wouldn't expect that to happen very often and it's not a problem in terms of correctness (only in terms of performance). Does the blockstore detect when the same block is written twice?

@Stebalien:

Writing twice should be fine, it just might be slightly slower.
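The three steps dirkmc lists, including the re-check after the unlocked blockstore write, can be sketched like this (hypothetical types; a minimal sketch of the locking shape, not the real implementation):

```go
package main

import (
	"fmt"
	"sync"
)

// blockstore is a stand-in whose Put is idempotent, so an occasional
// duplicate write is a performance cost only, not a correctness problem.
type blockstore struct {
	mu   sync.Mutex
	blks map[string]bool
}

func (bs *blockstore) putMany(ks []string) {
	bs.mu.Lock()
	defer bs.mu.Unlock()
	for _, k := range ks {
		bs.blks[k] = true // storing the same block twice leaves one copy
	}
}

type manager struct {
	mu     sync.Mutex
	wanted map[string]bool
	bs     *blockstore
}

// receive implements the steps: figure out wants, write (unlocked), then
// re-check wants and publish, because wants may change during the write.
func (m *manager) receive(ks []string) (published []string) {
	m.mu.Lock()
	var want []string
	for _, k := range ks {
		if m.wanted[k] {
			want = append(want, k)
		}
	}
	m.mu.Unlock() // the blockstore write happens without the manager lock

	m.bs.putMany(want)

	m.mu.Lock()
	defer m.mu.Unlock()
	for _, k := range want {
		if m.wanted[k] { // re-check: still wanted after the write?
			delete(m.wanted, k) // mark received
			published = append(published, k)
		}
	}
	return published
}

func main() {
	bs := &blockstore{blks: map[string]bool{}}
	m := &manager{wanted: map[string]bool{"QmA": true}, bs: bs}
	fmt.Println(m.receive([]string{"QmA", "QmB"})) // QmB is not wanted
}
```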

for _, c := range interestedKs {
log.Debugw("Bitswap <- block", "local", s.self, "from", from, "cid", c, "session", s.id)
// Use a WantRequest to listen for incoming messages pertaining to the keys
wr, err := s.wrm.NewWantRequest(keys)
@Stebalien:

How will this work when we switch to streaming requests?

@dirkmc:

Currently WantRequest.Run() keeps track of which keys have been received, and closes the outgoing channel of blocks when all keys have been received.
I imagine that if we have a channel of incoming keys (instead of a slice) WantRequest.Run() will instead check if the channel is closed, and if so, close the outgoing channel of blocks.
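That streaming variant can be sketched like this (an illustrative signature, not the real Run; string keys stand in for CIDs): Run registers keys as they arrive on a channel, forwards received blocks, and closes the outgoing channel once the key channel is closed and every registered key has been received.

```go
package main

import "fmt"

// run reads wanted keys from keys and receipts from received, forwarding
// each received wanted key on out. When keys is closed and all pending
// keys have been received, it closes out.
func run(keys <-chan string, received <-chan string, out chan<- string) {
	pending := make(map[string]struct{})
	for keys != nil || len(pending) > 0 {
		// Prefer registering newly requested keys over handling receipts.
		if keys != nil {
			select {
			case k, ok := <-keys:
				if !ok {
					keys = nil // key stream closed
				} else {
					pending[k] = struct{}{}
				}
				continue
			default:
			}
		}
		select {
		case k, ok := <-keys: // a nil channel blocks forever once closed
			if !ok {
				keys = nil
			} else {
				pending[k] = struct{}{}
			}
		case k := <-received:
			if _, ok := pending[k]; ok {
				delete(pending, k)
				out <- k
			}
		}
	}
	close(out) // all keys received and no more are coming
}

func main() {
	keys := make(chan string, 2)
	received := make(chan string, 2)
	out := make(chan string, 2)
	keys <- "QmA"
	keys <- "QmB"
	close(keys)
	received <- "QmB"
	received <- "QmA"
	run(keys, received, out)
	for k := range out {
		fmt.Println("got", k)
	}
}
```

Setting a closed channel variable to nil is the standard trick for disabling a select case once its stream is exhausted.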


// Send the message and the set of cids of wanted blocks
select {
case wr.messages <- &messageWanted{fmsg, wanted}:
@Stebalien:

If there's any way to avoid this event loop, I'd do that. We're mixing locked state with event loops.

@Stebalien:

Looking at this, I'm pretty sure we don't need this event loop at all.

@dirkmc:

You're right it would be better not to mix locking with event loops.

We need locking so that we can return which blocks were wanted from PublishToSessions()

I believe we need an event loop to

  • send blocks on the outgoing channel (returned from Session.GetBlocks())
  • listen for requestCtx.Done() and sessCtx.Done()

In future we'll probably also need it to read from the incoming channel of keys.

Any way we can avoid that?

@Stebalien:

The outgoing channel is completely buffered, right? Can't we just write into it directly?

@Stebalien:

Ah, ok, if we're streaming wants, this will no longer apply.


// drainCancels receives on the incoming channel until a cancel request for
// each WantRequest has been received and passed to the sessionWantSender
func (s *Session) drainCancels() {
@Stebalien:

Is there really no better way?

@dirkmc:

Open to suggestions :)

@Stebalien:

Not really...

@Stebalien:

So, the message diagram is missing at least one message: blocks from the WantRequest to the user. Can we decouple that? Would that make things simpler?

That is:

  • User asks session for blocks.
  • Session tells the want request manager it's interested in the blocks.
  • Session starts fetching the blocks.
  • Eventually, the want request manager sends an incoming message to the session that includes the block.
  • The Session sends the blocks back to the user.

This will ensure that blocks take a more predictable path:

out: { { users } -> sessions } -> session peer manager -> { peer queues }
                        \
                         v
in:  { message } -> want request manager -> { sessions -> { users } }

@Stebalien:

What happens if I call GetBlocks with the same CIDs multiple times concurrently on the same session?

@dirkmc commented Jun 15, 2020:

  • the want request manager sends an incoming message to the session that includes the block
  • the Session sends the blocks back to the user.

I agree that it would be nice to have a more linear flow like that. The catch is that the Session would need to be aware of which WantRequests are interested in which keys. We could add that, although it seems a little redundant to route from WantRequest -> Session -> WantRequest.

We could also change from a WantRequestManager that knows which WantRequests are interested in each key to

  • a SessionWantManager that knows about which Sessions are interested in which keys
  • Sessions that know which WantRequests are interested in which keys

Again it's a little redundant.

@dirkmc commented Jun 15, 2020:

What happens if I call GetBlocks with the same CIDs multiple times concurrently on the same session?

I think that will cause problems (I think that's probably always been an issue)

@Stebalien:

I agree that it would be nice to have a more linear flow like that. The catch is that the Session would need to be aware of which WantRequests are interested in which keys. We could add that, although it seems a little redundant to Route from WantRequest -> Session -> WantRequest.

I'm suggesting a different flow:

  • The user asks the session for a block (refcounted in the session in case the user asks multiple times).
  • The session tells the wantrequestmanager that it's interested in wants (refcounted per session).
  • The session tells the session peer manager to start sending wants to peers.
  • When bitswap receives a block, it informs the wantrequestmanager.
  • The wantrequestmanager informs the sessions.
  • The sessions figure out which users want the blocks, and route them.

Effectively, this gets rid of wantrequests. I'm not sure if this is worth implementing in this patch, but it would definitely fix #398 (comment).

I've posted some architecture thoughts in ipfs/boxo#90 to sum up how I see this flow working. Unfortunately, I know I'm missing quite a few details.
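The two levels of refcounting in this proposal can be sketched with a small helper (hypothetical type; the session would hold one, and so would the wantrequestmanager, keyed by session):

```go
package main

import "fmt"

// refcount counts outstanding references to each key, reporting the
// transitions that matter: first reference (register interest upstream)
// and last reference (safe to cancel upstream).
type refcount struct {
	counts map[string]int
}

// add increments the count for k, reporting whether it is the first reference.
func (r *refcount) add(k string) (first bool) {
	r.counts[k]++
	return r.counts[k] == 1
}

// remove decrements the count for k, reporting whether it was the last reference.
func (r *refcount) remove(k string) (last bool) {
	r.counts[k]--
	if r.counts[k] <= 0 {
		delete(r.counts, k)
		return true
	}
	return false
}

func main() {
	r := &refcount{counts: map[string]int{}}
	fmt.Println(r.add("QmA"))    // first GetBlocks call: register the want
	fmt.Println(r.add("QmA"))    // concurrent duplicate call: nothing new to send
	fmt.Println(r.remove("QmA")) // one caller done: want still needed
	fmt.Println(r.remove("QmA")) // last caller done: cancel the want
}
```

This is also what would make concurrent GetBlocks calls for the same CIDs on one session safe, per the question above.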

@dirkmc commented Jun 17, 2020:

The sessions figure out which users want the blocks, and route them

I think this is the issue - we still need a mapping from the CID back to the channel for the user who wants the block (i.e. the WantRequest).

We also need to cancel wants when the request context is cancelled, which means we need a mapping from user context to wants (the WantRequest).

@Stebalien:

@dirkmc I can't remember where we left this, I just remember that you needed to hop to the lotus team. Should we try to land it?

@dirkmc commented Sep 15, 2021:

To be honest, I think it's probably better to drop this PR and start again at the design stage. It was more complicated than I anticipated, and I think it would be better to debate it at the design level before trying to implement it in code.

@Stebalien:

Yeah, SGTM.

@Stebalien Stebalien closed this Sep 15, 2021
Linked issue: [ipfs/go-bitswap] Sync sessions and received blocks