Skip to content

Commit 24d1dcf

Browse files
authored
Fluffy: Implement offer cache to hold content ids of recent offers (#3233)
1 parent bcff15e commit 24d1dcf

File tree

11 files changed

+146
-20
lines changed

11 files changed

+146
-20
lines changed

fluffy/conf.nim

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,22 @@ type
357357
name: "debug-disable-content-cache"
358358
.}: bool
359359

360+
offerCacheSize* {.
361+
hidden,
362+
desc:
363+
"Size of the in memory local offer cache. This is the max number " &
364+
"of content id values that can be stored in the cache.",
365+
defaultValue: defaultPortalProtocolConfig.offerCacheSize,
366+
name: "debug-offer-cache-size"
367+
.}: int
368+
369+
disableOfferCache* {.
370+
hidden,
371+
desc: "Disable the in memory local offer cache",
372+
defaultValue: defaultPortalProtocolConfig.disableOfferCache,
373+
name: "debug-disable-offer-cache"
374+
.}: bool
375+
360376
disablePoke* {.
361377
hidden,
362378
desc: "Disable POKE functionality for gossip mechanisms testing",

fluffy/database/content_db.nim

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,7 @@ proc createStoreHandler*(db: ContentDB, cfg: RadiusConfig): DbStoreHandler =
472472
return (
473473
proc(
474474
contentKey: ContentKeyByteList, contentId: ContentId, content: seq[byte]
475-
) {.raises: [], gcsafe.} =
475+
): bool {.raises: [], gcsafe.} =
476476
case cfg.kind
477477
of Dynamic:
478478
# In case of dynamic radius, the radius gets adjusted based on the
@@ -491,11 +491,12 @@ proc createStoreHandler*(db: ContentDB, cfg: RadiusConfig): DbStoreHandler =
491491
# small storage capacity or a very small `contentDeletionFraction`
492492
# combined with some big content.
493493
info "Database pruning attempt resulted in no content deleted"
494-
return
494+
return true # Indicate that the database was prunned
495495
of Static:
496496
# If the radius is static, it may never be adjusted, database capacity
497497
# is disabled and no pruning is ever done.
498498
db.put(contentId, content)
499+
return false
499500
)
500501

501502
proc createContainsHandler*(db: ContentDB): DbContainsHandler =

fluffy/fluffy.nim

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,8 @@ proc run(fluffy: Fluffy, config: PortalConf) {.raises: [CatchableError].} =
199199
portalProtocolConfig = PortalProtocolConfig.init(
200200
config.tableIpLimit, config.bucketIpLimit, config.bitsPerHop, config.alpha,
201201
config.radiusConfig, config.disablePoke, config.maxGossipNodes,
202-
config.contentCacheSize, config.disableContentCache, config.maxConcurrentOffers,
203-
config.disableBanNodes,
202+
config.contentCacheSize, config.disableContentCache, config.offerCacheSize,
203+
config.disableOfferCache, config.maxConcurrentOffers, config.disableBanNodes,
204204
)
205205

206206
portalNodeConfig = PortalNodeConfig(

fluffy/network/beacon/beacon_db.nim

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,7 @@ proc createStoreHandler*(db: BeaconDb): DbStoreHandler =
514514
return (
515515
proc(
516516
contentKey: ContentKeyByteList, contentId: ContentId, content: seq[byte]
517-
) {.raises: [], gcsafe.} =
517+
): bool {.raises: [], gcsafe.} =
518518
let contentKey = decode(contentKey).valueOr:
519519
# TODO: as this should not fail, maybe it is better to raiseAssert ?
520520
return
@@ -575,6 +575,8 @@ proc createStoreHandler*(db: BeaconDb): DbStoreHandler =
575575
db.put(contentId, content)
576576
else:
577577
db.put(contentId, content)
578+
579+
return false # No data pruned
578580
)
579581

580582
proc createContainsHandler*(db: BeaconDb): DbContainsHandler =

fluffy/network/beacon/beacon_network.nim

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,9 @@ proc validateContent(
360360
return false
361361

362362
let contentId = contentIdOpt.get()
363-
n.portalProtocol.storeContent(contentKey, contentId, contentItem)
363+
n.portalProtocol.storeContent(
364+
contentKey, contentId, contentItem, cacheOffer = true
365+
)
364366

365367
debug "Received offered content validated successfully", srcNodeId, contentKey
366368
else:

fluffy/network/history/history_network.nim

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -397,7 +397,9 @@ proc validateContent(
397397
warn "Received offered content with invalid content key", srcNodeId, contentKey
398398
return false
399399

400-
n.portalProtocol.storeContent(contentKey, contentId, contentItem)
400+
n.portalProtocol.storeContent(
401+
contentKey, contentId, contentItem, cacheOffer = true
402+
)
401403

402404
debug "Received offered content validated successfully", srcNodeId, contentKey
403405
else:

fluffy/network/state/state_network.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ proc processOffer*(
202202
return err("Received offered content with invalid content key")
203203

204204
n.portalProtocol.storeContent(
205-
contentKeyBytes, contentId, contentValue.toRetrieval().encode()
205+
contentKeyBytes, contentId, contentValue.toRetrieval().encode(), cacheOffer = true
206206
)
207207

208208
await gossipOffer(

fluffy/network/wire/portal_protocol.nim

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,17 @@ declareCounter portal_gossip_without_lookup,
7777
"Portal wire protocol neighborhood gossip that did not require a node lookup",
7878
labels = ["protocol_id"]
7979
declareCounter portal_content_cache_hits,
80-
"Portal wire protocol local content lookups that hit the cache",
80+
"Portal wire protocol local content lookups that hit the content cache",
8181
labels = ["protocol_id"]
8282
declareCounter portal_content_cache_misses,
83-
"Portal wire protocol local content lookups that don't hit the cache",
83+
"Portal wire protocol local content lookups that don't hit the content cache",
84+
labels = ["protocol_id"]
85+
declareCounter portal_offer_cache_hits,
86+
"Portal wire protocol local content lookups that hit the offer cache",
87+
labels = ["protocol_id"]
88+
declareCounter portal_offer_cache_misses,
89+
"Portal wire protocol local content lookups that don't hit the offer cache",
8490
labels = ["protocol_id"]
85-
8691
declareCounter portal_poke_offers,
8792
"Portal wire protocol offers through poke mechanism", labels = ["protocol_id"]
8893

@@ -141,7 +146,7 @@ type
141146

142147
DbStoreHandler* = proc(
143148
contentKey: ContentKeyByteList, contentId: ContentId, content: seq[byte]
144-
) {.raises: [], gcsafe.}
149+
): bool {.raises: [], gcsafe.}
145150

146151
DbContainsHandler* = proc(contentKey: ContentKeyByteList, contentId: ContentId): bool {.
147152
raises: [], gcsafe
@@ -153,8 +158,16 @@ type
153158

154159
RadiusCache* = LruCache[NodeId, UInt256]
155160

161+
# Caches content fetched from the network during lookups.
162+
# Content outside our radius is also cached in order to improve performance
163+
# of queries which may lookup data outside our radius.
156164
ContentCache = LruCache[ContentId, seq[byte]]
157165

166+
# Caches the content ids of the most recently received content offers.
167+
# Content is only stored in this cache if it falls within our radius and similarly
168+
# the cache is only checked if the content id is within our radius.
169+
OfferCache = LruCache[ContentId, bool]
170+
158171
ContentKV* = object
159172
contentKey*: ContentKeyByteList
160173
content*: seq[byte]
@@ -189,6 +202,7 @@ type
189202
radiusCache: RadiusCache
190203
offerQueue: AsyncQueue[OfferRequest]
191204
offerWorkers: seq[Future[void]]
205+
offerCache*: OfferCache
192206
pingTimings: Table[NodeId, chronos.Moment]
193207
config*: PortalProtocolConfig
194208
pingExtensionCapabilities*: set[uint16]
@@ -529,6 +543,16 @@ proc handleFindContent(
529543

530544
encodeMessage(ContentMessage(contentMessageType: enrsType, enrs: enrs))
531545

546+
proc containsContent(
547+
p: PortalProtocol, contentKey: ContentKeyByteList, contentId: ContentId
548+
): bool =
549+
if p.offerCache.contains(contentId):
550+
portal_offer_cache_hits.inc(labelValues = [$p.protocolId])
551+
true
552+
else:
553+
portal_offer_cache_misses.inc(labelValues = [$p.protocolId])
554+
p.dbContains(contentKey, contentId)
555+
532556
proc handleOffer(
533557
p: PortalProtocol, o: OfferMessage, srcId: NodeId
534558
): Result[AcceptMessage, string] =
@@ -571,7 +595,7 @@ proc handleOffer(
571595
discard contentKeysAcceptList.add(DeclinedNotWithinRadius)
572596
elif not p.stream.canAddPendingTransfer(srcId, contentId):
573597
discard contentKeysAcceptList.add(DeclinedInboundTransferInProgress)
574-
elif p.dbContains(contentKey, contentId):
598+
elif p.containsContent(contentKey, contentId):
575599
discard contentKeysAcceptList.add(DeclinedAlreadyStored)
576600
else:
577601
p.stream.addPendingTransfer(srcId, contentId)
@@ -712,6 +736,8 @@ proc new*(
712736
stream: stream,
713737
radiusCache: RadiusCache.init(256),
714738
offerQueue: newAsyncQueue[OfferRequest](config.maxConcurrentOffers),
739+
offerCache:
740+
OfferCache.init(if config.disableOfferCache: 0 else: config.offerCacheSize),
715741
pingTimings: Table[NodeId, chronos.Moment](),
716742
config: config,
717743
pingExtensionCapabilities: pingExtensionCapabilities,
@@ -1133,6 +1159,7 @@ proc offer(
11331159
return err("Error writing requested data")
11341160

11351161
trace "Offered content item send", dataWritten = dataWritten
1162+
11361163
await socket.closeWait()
11371164
trace "Content successfully offered"
11381165

@@ -1789,6 +1816,7 @@ proc storeContent*(
17891816
contentId: ContentId,
17901817
content: seq[byte],
17911818
cacheContent = false,
1819+
cacheOffer = false,
17921820
): bool {.discardable.} =
17931821
if cacheContent and not p.config.disableContentCache:
17941822
# We cache content regardless of whether it is in our radius or not
@@ -1797,8 +1825,15 @@ proc storeContent*(
17971825
# Always re-check that the key is still in the node range to make sure only
17981826
# content in range is stored.
17991827
if p.inRange(contentId):
1800-
doAssert(p.dbPut != nil)
1801-
p.dbPut(contentKey, contentId, content)
1828+
let dbPruned = p.dbPut(contentKey, contentId, content)
1829+
if dbPruned:
1830+
# invalidate all cached content incase it was removed from the database
1831+
# during pruning
1832+
p.offerCache = OfferCache.init(p.offerCache.capacity)
1833+
1834+
if cacheOffer and not p.config.disableOfferCache:
1835+
p.offerCache.put(contentId, true)
1836+
18021837
true
18031838
else:
18041839
false

fluffy/network/wire/portal_protocol_config.nim

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ type
4444
maxGossipNodes*: int
4545
contentCacheSize*: int
4646
disableContentCache*: bool
47+
offerCacheSize*: int
48+
disableOfferCache*: bool
4749
maxConcurrentOffers*: int
4850
disableBanNodes*: bool
4951

@@ -52,8 +54,10 @@ const
5254
defaultRadiusConfigDesc* = $defaultRadiusConfig.kind
5355
defaultDisablePoke* = false
5456
defaultMaxGossipNodes* = 4
55-
defaultContentCacheSize* = 100
57+
defaultContentCacheSize* = 128
5658
defaultDisableContentCache* = false
59+
defaultOfferCacheSize* = 1024
60+
defaultDisableOfferCache* = false
5761
defaultMaxConcurrentOffers* = 50
5862
defaultAlpha* = 3
5963
revalidationTimeout* = chronos.seconds(30)
@@ -68,6 +72,8 @@ const
6872
maxGossipNodes: defaultMaxGossipNodes,
6973
contentCacheSize: defaultContentCacheSize,
7074
disableContentCache: defaultDisableContentCache,
75+
offerCacheSize: defaultOfferCacheSize,
76+
disableOfferCache: defaultDisableOfferCache,
7177
maxConcurrentOffers: defaultMaxConcurrentOffers,
7278
disableBanNodes: defaultDisableBanNodes,
7379
)
@@ -83,6 +89,8 @@ proc init*(
8389
maxGossipNodes: int,
8490
contentCacheSize: int,
8591
disableContentCache: bool,
92+
offerCacheSize: int,
93+
disableOfferCache: bool,
8694
maxConcurrentOffers: int,
8795
disableBanNodes: bool,
8896
): T =
@@ -96,6 +104,8 @@ proc init*(
96104
maxGossipNodes: maxGossipNodes,
97105
contentCacheSize: contentCacheSize,
98106
disableContentCache: disableContentCache,
107+
offerCacheSize: offerCacheSize,
108+
disableOfferCache: disableOfferCache,
99109
maxConcurrentOffers: maxConcurrentOffers,
100110
disableBanNodes: disableBanNodes,
101111
)

fluffy/rpc/rpc_portal_beacon_api.nim

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,6 @@ proc installPortalBeaconApiHandlers*(rpcServer: RpcServer, p: PortalProtocol) =
115115
raise invalidKeyErr()
116116

117117
# TODO: Do we need to convert the received offer to a value without proofs before storing?
118-
119118
p.storeContent(keyBytes, contentId, offerValueBytes)
120119

121120
rpcServer.rpc("portal_beaconLocalContent") do(contentKey: string) -> string:

0 commit comments

Comments
 (0)