Skip to content

Fix item not-nil race#45

Merged
jjeffcaii merged 1 commit intojjeffcaii:masterfrom
echistyakov:fix-item-not-nil-race
Jan 20, 2025
Merged

Fix item not-nil race#45
jjeffcaii merged 1 commit intojjeffcaii:masterfrom
echistyakov:fix-item-not-nil-race

Conversation

@echistyakov
Copy link
Copy Markdown
Contributor

Investigation

I caught this bug while stress-testing RSocket-based Client/Server implementation in Facebook Thrift: https://github.com/facebook/fbthrift/blob/main/thrift/lib/go/thrift/stress/server_test.go

I noticed that occasionally an RSocket would return a rsocket: socket closed already error for no apparent reason. (Communication in this stress test is done between synthetic client/server via a unix socket - so there should be no errors at all under stress test.)

The investigation lead me to the processor class in this repo. I had a suspicion that upon RSocket close - the wrong processor instance was getting notified (due to a global processor pool being use in the implementation).
https://github.com/rsocket/rsocket-go/blob/099cb5babee5b6e19d9488de5d7ad12f107be5ef/internal/socket/duplex.go#L135

To test my theory I applied the following patch:

diff --git a/mono/initiate.go b/mono/initiate.go
index 4d0f4bb..1f9525f 100644
--- a/mono/initiate.go
+++ b/mono/initiate.go
@@ -61,6 +61,9 @@ func NewProcessor(sc scheduler.Scheduler, hook ProcessorFinallyHook) (Mono, Sink
        p := globalProcessorPool.get()
        p.mu.Lock()
        p.sc = sc
+       if p.item != nil {
+               panic("item not nil!!!")
+       }
        p.hookOnFinally = hook
        p.mu.Unlock()
        return wrap(p), p, p

And after a few hundred thousand iterations I got the following panic:

panic: item not nil!!!

goroutine 284085 [running]:
github.com/jjeffcaii/reactor-go/mono.NewProcessor({0x357990, 0xc004f82070}, 0xc0042300a0)
        fbcode/third-party-go/vendor/[github.com/jjeffcaii/reactor-go/mono/initiate.go:65](https://www.internalfb.com/phabricator/paste/view/github.com/jjeffcaii/reactor-go/mono/initiate.go:65) +0x1bb
[github.com/rsocket/rsocket-go/rx/mono.NewProcessor({0x357990, 0xc004f82070}, 0xc0042300a0)](https://www.internalfb.com/phabricator/paste/view/github.com/rsocket/rsocket-go/rx/mono.NewProcessor(%7B0x357990,%200xc004f82070%7D,%200xc0042300a0))
        fbcode/third-party-go/vendor/[github.com/rsocket/rsocket-go/rx/mono/utils.go:86](https://www.internalfb.com/phabricator/paste/view/github.com/rsocket/rsocket-go/rx/mono/utils.go:86) +0x50
[github.com/rsocket/rsocket-go/internal/socket.(*DuplexConnection).RequestResponse(0xc0034c0000, {0x357d30, 0xc003c6c0c0})](https://www.internalfb.com/phabricator/paste/view/github.com/rsocket/rsocket-go/internal/socket.(*DuplexConnection).RequestResponse(0xc0034c0000,%20%7B0x357d30,%200xc003c6c0c0%7D))
        fbcode/third-party-go/vendor/[github.com/rsocket/rsocket-go/internal/socket/duplex.go:274](https://www.internalfb.com/phabricator/paste/view/github.com/rsocket/rsocket-go/internal/socket/duplex.go:274) +0x22f
[github.com/rsocket/rsocket-go/internal/socket.(*BaseSocket).RequestResponse(0xc0039a4460, {0x357d30, 0xc003c6c0c0})](https://www.internalfb.com/phabricator/paste/view/github.com/rsocket/rsocket-go/internal/socket.(*BaseSocket).RequestResponse(0xc0039a4460,%20%7B0x357d30,%200xc003c6c0c0%7D))
        fbcode/third-party-go/vendor/[github.com/rsocket/rsocket-go/internal/socket/base_socket.go:53](https://www.internalfb.com/phabricator/paste/view/github.com/rsocket/rsocket-go/internal/socket/base_socket.go:53) +0xe7
thrift/lib/go/thrift.(*rsocketClient).RequestResponse(0xc004f84570, {0x357ba8, 0xc003a34000}, {0x2cb857, 0x4}, 0x2, 0x1, 0xc004f84630, {0xc000a400c0, 0x8, ...})
        fbcode/thrift/lib/go/thrift/rocket_rsocket_client.go:151 +0x142
thrift/lib/go/thrift.(*rocketClient).Flush(0xc0049ba000)
        fbcode/thrift/lib/go/thrift/rocket_client.go:123 +0x437
[github.com/facebook/fbthrift/thrift/lib/go/thrift/types.(*SerialChannel).sendMsg(0xc003b82060, {0x357ac8, 0x1acafe0}, {0x2cb857, 0x4}, {0x354b80, 0xc0022ca2b0}, 0x1)](https://www.internalfb.com/phabricator/paste/view/github.com/facebook/fbthrift/thrift/lib/go/thrift/types.(*SerialChannel).sendMsg(0xc003b82060,%20%7B0x357ac8,%200x1acafe0%7D,%20%7B0x2cb857,%200x4%7D,%20%7B0x354b80,%200xc0022ca2b0%7D,%200x1))
        fbcode/thrift/lib/go/thrift/types/serial_channel.go:61 +0x2f1
[github.com/facebook/fbthrift/thrift/lib/go/thrift/types.(*SerialChannel).Call(0xc003b82060, {0x357ac8, 0x1acafe0}, {0x2cb857, 0x4}, {0x354b80, 0xc0022ca2b0}, {0x354ba0, 0xc004f82050})](https://www.internalfb.com/phabricator/paste/view/github.com/facebook/fbthrift/thrift/lib/go/thrift/types.(*SerialChannel).Call(0xc003b82060,%20%7B0x357ac8,%200x1acafe0%7D,%20%7B0x2cb857,%200x4%7D,%20%7B0x354b80,%200xc0022ca2b0%7D,%20%7B0x354ba0,%200xc004f82050%7D))
        fbcode/thrift/lib/go/thrift/types/serial_channel.go:118 +0x150
[github.com/facebook/fbthrift/thrift/lib/go/thrift/dummy.(*DummyChannelClient).Echo(0xc0022ca260, {0x357ac8, 0x1acafe0}, {0x2cb9cd, 0x5})](https://www.internalfb.com/phabricator/paste/view/github.com/facebook/fbthrift/thrift/lib/go/thrift/dummy.(*DummyChannelClient).Echo(0xc0022ca260,%20%7B0x357ac8,%200x1acafe0%7D,%20%7B0x2cb9cd,%200x5%7D))
        fbcode/thrift/lib/go/thrift/dummy/svcs.go:87 +0x115
[github.com/facebook/fbthrift/thrift/lib/go/thrift/dummy.(*DummyClient).Echo(0xc004f82038, {0x2cb9cd, 0x5})](https://www.internalfb.com/phabricator/paste/view/github.com/facebook/fbthrift/thrift/lib/go/thrift/dummy.(*DummyClient).Echo(0xc004f82038,%20%7B0x2cb9cd,%200x5%7D))
        fbcode/thrift/lib/go/thrift/dummy/svcs.go:95 +0x6d
thrift/lib/go/thrift/stress.runStressTest.func4()
        fbcode/thrift/lib/go/thrift/stress/server_test.go:130 +0x40d
[golang.org/x/sync/errgroup.(*Group).Go.func1()](https://www.internalfb.com/phabricator/paste/view/golang.org/x/sync/errgroup.(*Group).Go.func1())
        fbcode/third-party-go/vendor/[golang.org/x/sync/errgroup/errgroup.go:78](https://www.internalfb.com/phabricator/paste/view/golang.org/x/sync/errgroup/errgroup.go:78) +0xa2
created by [golang.org/x/sync/errgroup.(*Group).Go](https://www.internalfb.com/phabricator/paste/view/golang.org/x/sync/errgroup.(*Group).Go) in goroutine 23
        fbcode/third-party-go/vendor/[golang.org/x/sync/errgroup/errgroup.go:75](https://www.internalfb.com/phabricator/paste/view/golang.org/x/sync/errgroup/errgroup.go:75) +0x15d

Race condition

The race condition is as follows:

  1. A processor instance is obtained from the global pool:
    https://github.com/echistyakov/reactor-go/blob/ee85b1f144c3b16dc7149f45573cf029c9c139cb/mono/initiate.go#L60-L61
  2. The caller provides a ProcessorFinallyHook which is implemented correctly and and has a call to Dispose() the processor instance (on "finally").
  3. The processor instance is used by its caller to subscribe to via processorSubscriber (in a multi-goroutine environment - where multiple goroutines might have a reference to this processor as a Sink interface).
  4. At some point, the aforementioned processorSubscriber runs its OnComplete method, invoking the "on finally" hook of the processor:
    https://github.com/echistyakov/reactor-go/blob/ee85b1f144c3b16dc7149f45573cf029c9c139cb/mono/processor.go#L188
  5. The "on finally" hook correctly calls "Dispose" method of the processor.
  6. Dispose, in turn, places the processor back into the global pool.
  7. However, the goroutine running the Dispose method gets interrupted by another goroutine right in between the following lines:
    https://github.com/echistyakov/reactor-go/blob/ee85b1f144c3b16dc7149f45573cf029c9c139cb/mono/processor.go#L46-L47
  8. And this other rogue goroutine calls either Success or Error on the processor instance - setting the item to some non-nil value.
  9. The original goroutine is able to finish running Dispose and the processor is placed back into the global pool.
  10. The processor is soon removed again from the pool to be used by another caller.
  11. But the item is not nil and at the same time unrelated to the new caller. That's both a leak (between unrelated callers) and a race at the same time!

After the fix

The issue with "socket already closed" goes away.

Copy link
Copy Markdown
Owner

@jjeffcaii jjeffcaii left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@jjeffcaii jjeffcaii merged commit 5248ec1 into jjeffcaii:master Jan 20, 2025
@jjeffcaii
Copy link
Copy Markdown
Owner

It has been released with v0.5.6. Thanks for the contribution~ 👍🏽

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants