Skip to content

Commit 20fd23a

Browse files
authored
Make layer pulls / push configurations tunable (#3480)
1 parent ff60cb2 commit 20fd23a

File tree

7 files changed

+133
-22
lines changed

7 files changed

+133
-22
lines changed

synapse/cortex.py

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import synapse.lib.hive as s_hive
2424
import synapse.lib.view as s_view
2525
import synapse.lib.cache as s_cache
26+
import synapse.lib.const as s_const
2627
import synapse.lib.layer as s_layer
2728
import synapse.lib.nexus as s_nexus
2829
import synapse.lib.oauth as s_oauth
@@ -110,20 +111,6 @@
110111
SYNC_LAYR_ADD = 3 # A layer was added
111112
SYNC_LAYR_DEL = 4 # A layer was deleted
112113

113-
# push/pull def
114-
reqValidPush = s_config.getJsValidator({
115-
'type': 'object',
116-
'properties': {
117-
'url': {'type': 'string'},
118-
'time': {'type': 'number'},
119-
'iden': {'type': 'string', 'pattern': s_config.re_iden},
120-
'user': {'type': 'string', 'pattern': s_config.re_iden},
121-
},
122-
'additionalProperties': True,
123-
'required': ['iden', 'url', 'user', 'time'],
124-
})
125-
reqValidPull = reqValidPush
126-
127114
reqValidTagModel = s_config.getJsValidator({
128115
'type': 'object',
129116
'properties': {
@@ -1024,6 +1011,7 @@ async def initServiceStorage(self):
10241011

10251012
await self._bumpCellVers('cortex:storage', (
10261013
(1, self._storUpdateMacros),
1014+
(2, self._storLayrFeedDefaults),
10271015
), nexs=False)
10281016

10291017
async def _storUpdateMacros(self):
@@ -1415,6 +1403,26 @@ async def _addAllLayrRead(self):
14151403
role = await self.auth.getRoleByName('all')
14161404
await role.addRule((True, ('layer', 'read')), gateiden=layriden)
14171405

1406+
async def _storLayrFeedDefaults(self):
1407+
1408+
for layer in list(self.layers.values()):
1409+
layrinfo = layer.layrinfo # type: s_hive.HiveDict
1410+
1411+
pushs = layrinfo.get('pushs', {})
1412+
if pushs:
1413+
for pdef in pushs.values():
1414+
pdef.setdefault('chunk:size', s_const.layer_pdef_csize)
1415+
pdef.setdefault('queue:size', s_const.layer_pdef_qsize)
1416+
await layrinfo.set('pushs', pushs, nexs=False)
1417+
1418+
pulls = layrinfo.get('pulls', {})
1419+
if pulls:
1420+
pulls = layrinfo.get('pulls', {})
1421+
for pdef in pulls.values():
1422+
pdef.setdefault('chunk:size', s_const.layer_pdef_csize)
1423+
pdef.setdefault('queue:size', s_const.layer_pdef_qsize)
1424+
await layrinfo.set('pulls', pulls, nexs=False)
1425+
14181426
async def initServiceRuntime(self):
14191427

14201428
# do any post-nexus initialization here...
@@ -4740,7 +4748,7 @@ async def _initCoreLayers(self):
47404748
@s_nexus.Pusher.onPushAuto('layer:push:add')
47414749
async def addLayrPush(self, layriden, pdef):
47424750

4743-
reqValidPush(pdef)
4751+
s_schemas.reqValidPush(pdef)
47444752

47454753
iden = pdef.get('iden')
47464754

@@ -4782,7 +4790,7 @@ async def delLayrPush(self, layriden, pushiden):
47824790
@s_nexus.Pusher.onPushAuto('layer:pull:add')
47834791
async def addLayrPull(self, layriden, pdef):
47844792

4785-
reqValidPull(pdef)
4793+
s_schemas.reqValidPull(pdef)
47864794

47874795
iden = pdef.get('iden')
47884796

@@ -4849,12 +4857,14 @@ async def _pushBulkEdits(self, layr0, layr1, pdef):
48494857

48504858
iden = pdef.get('iden')
48514859
user = pdef.get('user')
4852-
48534860
gvar = f'push:{iden}'
4861+
# TODO Remove the defaults in 3.0.0
4862+
csize = pdef.get('chunk:size', s_const.layer_pdef_csize)
4863+
qsize = pdef.get('queue:size', s_const.layer_pdef_qsize)
48544864

48554865
async with await s_base.Base.anit() as base:
48564866

4857-
queue = s_queue.Queue(maxsize=10000)
4867+
queue = s_queue.Queue(maxsize=qsize)
48584868

48594869
async def fill():
48604870

@@ -4881,7 +4891,7 @@ async def fill():
48814891
for offs, edits in chunk:
48824892
# prevent push->push->push nodeedits growth
48834893
alledits.extend(edits)
4884-
if len(alledits) > 1000:
4894+
if len(alledits) > csize:
48854895
await layr1.storNodeEdits(alledits, meta)
48864896
await self.setStormVar(gvar, offs)
48874897
alledits.clear()

synapse/lib/const.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,11 @@
3939
week = day * 7
4040
month = day * 30
4141
year = day * 365
42+
43+
# function specific constants shared across multiple files
44+
layer_pdef_csize = 1_000
45+
layer_pdef_qsize = 10_000
46+
layer_pdef_csize_max = 1_000
47+
layer_pdef_qsize_max = 10_000
48+
assert layer_pdef_csize <= layer_pdef_csize_max
49+
assert layer_pdef_qsize <= layer_pdef_qsize_max

synapse/lib/schemas.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import synapse.lib.const as s_const
12
import synapse.lib.config as s_config
23
import synapse.lib.msgpack as s_msgpack
34

@@ -65,6 +66,25 @@
6566

6667
reqValidHttpExtAPIConf = s_config.getJsValidator(_HttpExtAPIConfSchema)
6768

69+
_LayerPushPullSchema = {
70+
'type': 'object',
71+
'properties': {
72+
'url': {'type': 'string'},
73+
'time': {'type': 'number'},
74+
'iden': {'type': 'string', 'pattern': s_config.re_iden},
75+
'user': {'type': 'string', 'pattern': s_config.re_iden},
76+
'queue:size': {'type': 'integer', 'default': s_const.layer_pdef_qsize,
77+
'minimum': 1, 'maximum': s_const.layer_pdef_qsize_max},
78+
'chunk:size': {'type': 'integer', 'default': s_const.layer_pdef_csize,
79+
'minimum': 1, 'maximum': s_const.layer_pdef_csize_max}
80+
81+
},
82+
'additionalProperties': True,
83+
'required': ['iden', 'url', 'user', 'time'],
84+
}
85+
reqValidPush = s_config.getJsValidator(_LayerPushPullSchema)
86+
reqValidPull = reqValidPush
87+
6888
_CronJobSchema = {
6989
'type': 'object',
7090
'properties': {

synapse/lib/stormtypes.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import synapse.lib.node as s_node
2929
import synapse.lib.time as s_time
3030
import synapse.lib.cache as s_cache
31+
import synapse.lib.const as s_const
3132
import synapse.lib.queue as s_queue
3233
import synapse.lib.scope as s_scope
3334
import synapse.lib.msgpack as s_msgpack
@@ -6238,6 +6239,11 @@ class Layer(Prim):
62386239
{'name': 'url', 'type': 'str', 'desc': 'A telepath URL of the target layer/feed.', },
62396240
{'name': 'offs', 'type': 'int', 'desc': 'The local layer offset to begin pushing from',
62406241
'default': 0, },
6242+
{'name': 'queue_size', 'type': 'int', 'desc': 'The queue size of the pusher.',
6243+
'default': s_const.layer_pdef_qsize},
6244+
{'name': 'chunk_size', 'type': 'int',
6245+
'desc': 'The chunk size of the pusher when pushing edits.',
6246+
'default': s_const.layer_pdef_csize}
62416247
),
62426248
'returns': {'type': 'dict', 'desc': 'Dictionary containing the push definition.', }}},
62436249
{'name': 'delPush', 'desc': 'Remove a push config from the layer.',
@@ -6251,6 +6257,11 @@ class Layer(Prim):
62516257
'args': (
62526258
{'name': 'url', 'type': 'str', 'desc': 'The telepath URL to a layer/feed.', },
62536259
{'name': 'offs', 'type': 'int', 'desc': 'The offset to begin from.', 'default': 0, },
6260+
{'name': 'queue_size', 'type': 'int', 'desc': 'The queue size of the puller.',
6261+
'default': s_const.layer_pdef_qsize},
6262+
{'name': 'chunk_size', 'type': 'int',
6263+
'desc': 'The chunk size of the puller when consuming edits.',
6264+
'default': s_const.layer_pdef_csize}
62546265
),
62556266
'returns': {'type': 'dict', 'desc': 'Dictionary containing the pull definition.', }}},
62566267
{'name': 'delPull', 'desc': 'Remove a pull config from the layer.',
@@ -6568,9 +6579,11 @@ async def getMirrorStatus(self):
65686579
layr = self.runt.snap.core.getLayer(iden)
65696580
return await layr.getMirrorStatus()
65706581

6571-
async def _addPull(self, url, offs=0):
6582+
async def _addPull(self, url, offs=0, queue_size=s_const.layer_pdef_qsize, chunk_size=s_const.layer_pdef_csize):
65726583
url = await tostr(url)
65736584
offs = await toint(offs)
6585+
queue_size = await toint(queue_size)
6586+
chunk_size = await toint(chunk_size)
65746587

65756588
useriden = self.runt.user.iden
65766589
layriden = self.valu.get('iden')
@@ -6592,6 +6605,8 @@ async def _addPull(self, url, offs=0):
65926605
'user': useriden,
65936606
'time': s_common.now(),
65946607
'iden': s_common.guid(),
6608+
'queue:size': queue_size,
6609+
'chunk:size': chunk_size,
65956610
}
65966611
todo = s_common.todo('addLayrPull', layriden, pdef)
65976612
await self.runt.dyncall('cortex', todo)
@@ -6608,9 +6623,11 @@ async def _delPull(self, iden):
66086623
todo = s_common.todo('delLayrPull', layriden, iden)
66096624
await self.runt.dyncall('cortex', todo)
66106625

6611-
async def _addPush(self, url, offs=0):
6626+
async def _addPush(self, url, offs=0, queue_size=s_const.layer_pdef_qsize, chunk_size=s_const.layer_pdef_csize):
66126627
url = await tostr(url)
66136628
offs = await toint(offs)
6629+
queue_size = await toint(queue_size)
6630+
chunk_size = await toint(chunk_size)
66146631

66156632
useriden = self.runt.user.iden
66166633
layriden = self.valu.get('iden')
@@ -6633,6 +6650,8 @@ async def _addPush(self, url, offs=0):
66336650
'user': useriden,
66346651
'time': s_common.now(),
66356652
'iden': s_common.guid(),
6653+
'queue:size': queue_size,
6654+
'chunk:size': chunk_size,
66366655
}
66376656
todo = s_common.todo('addLayrPush', layriden, pdef)
66386657
await self.runt.dyncall('cortex', todo)

synapse/tests/test_lib_layer.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1972,3 +1972,33 @@ async def test_layer_readonly_new(self):
19721972
writelayr = core.getLayer(writeidens[0])
19731973

19741974
self.eq(readlayr.meta.get('version'), writelayr.meta.get('version'))
1975+
1976+
async def test_push_pull_default_migration(self):
1977+
async with self.getRegrCore('2.159.0-layr-pdefs') as core:
1978+
def_tree = await core.saveHiveTree(('cortex', 'layers', '507ebf7e6ec7aadc47ace6f1f8f77954'))
1979+
dst_tree = await core.saveHiveTree(('cortex', 'layers', '9bf7a3adbf69bd16832529ab1fcd1c83'))
1980+
1981+
epulls = {'value':
1982+
{'28cb757e9e390a234822f55b922f3295':
1983+
{'chunk:size': 1000,
1984+
'iden': '28cb757e9e390a234822f55b922f3295',
1985+
'offs': 0,
1986+
'queue:size': 10000,
1987+
'time': 1703781215891,
1988+
'url': 'cell://./cells/pdefmigr00:*/layer/9bf7a3adbf69bd16832529ab1fcd1c83',
1989+
'user': '1d8e6e87a2931f8d27690ff408debdab'}}}
1990+
epushs = {'value':
1991+
{'e112f93f09e43f3a10ae945b84721778':
1992+
{'chunk:size': 1000,
1993+
'iden': 'e112f93f09e43f3a10ae945b84721778',
1994+
'offs': 0,
1995+
'queue:size': 10000,
1996+
'time': 1703781208684,
1997+
'url': 'cell://./cells/pdefmigr00:*/layer/9bf7a3adbf69bd16832529ab1fcd1c83',
1998+
'user': '1d8e6e87a2931f8d27690ff408debdab'}}}
1999+
2000+
self.eq(def_tree.get('kids').get('pulls'), epulls)
2001+
self.eq(def_tree.get('kids').get('pushs'), epushs)
2002+
2003+
self.notin('pulls', dst_tree.get('kids'))
2004+
self.notin('pushs', dst_tree.get('kids'))

synapse/tests/test_lib_storm.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3580,14 +3580,17 @@ async def test_storm_pushpull(self):
35803580
view0, layr0 = await core.callStorm('$view = $lib.view.get().fork() return(($view.iden, $view.layers.0.iden))')
35813581
view1, layr1 = await core.callStorm('$view = $lib.view.get().fork() return(($view.iden, $view.layers.0.iden))')
35823582
view2, layr2 = await core.callStorm('$view = $lib.view.get().fork() return(($view.iden, $view.layers.0.iden))')
3583+
view3, layr3 = await core.callStorm('$view = $lib.view.get().fork() return(($view.iden, $view.layers.0.iden))')
35833584

35843585
opts = {'vars': {
35853586
'view0': view0,
35863587
'view1': view1,
35873588
'view2': view2,
3589+
'view3': view3,
35883590
'layr0': layr0,
35893591
'layr1': layr1,
35903592
'layr2': layr2,
3593+
'layr3': layr3,
35913594
}}
35923595

35933596
# lets get some auth denies...
@@ -3729,6 +3732,25 @@ async def test_storm_pushpull(self):
37293732
msgs = await core.stormlist('layer.pull.list $layr2', opts=opts)
37303733
self.stormIsInPrint('No pulls configured', msgs)
37313734

3735+
# Add slow pushers
3736+
q = f'''$url="tcp://root:[email protected]:{port}/*/layer/{layr3}"
3737+
$pdef = $lib.layer.get($layr0).addPush($url, queue_size=10, chunk_size=1)
3738+
return($pdef.iden)'''
3739+
slowpush = await core.callStorm(q, opts=opts)
3740+
q = f'''$url="tcp://root:[email protected]:{port}/*/layer/{layr0}"
3741+
$pdef = $lib.layer.get($layr3).addPull($url, queue_size=20, chunk_size=10)
3742+
return($pdef.iden)'''
3743+
slowpull = await core.callStorm(q, opts=opts)
3744+
3745+
pushs = await core.callStorm('return($lib.layer.get($layr0).get(pushs))', opts=opts)
3746+
self.isin(slowpush, pushs)
3747+
3748+
pulls = await core.callStorm('return($lib.layer.get($layr3).get(pulls))', opts=opts)
3749+
self.isin(slowpull, pulls)
3750+
3751+
self.none(await core.callStorm(f'return($lib.layer.get($layr0).delPush({slowpush}))', opts=opts))
3752+
self.none(await core.callStorm(f'return($lib.layer.get($layr3).delPull({slowpull}))', opts=opts))
3753+
37323754
# add a push/pull and remove the layer to cancel it...
37333755
await core.callStorm(f'$lib.layer.get($layr0).addPush("tcp://root:[email protected]:{port}/*/layer/{layr1}")', opts=opts)
37343756
await core.callStorm(f'$lib.layer.get($layr2).addPull("tcp://root:[email protected]:{port}/*/layer/{layr1}")', opts=opts)
@@ -3750,9 +3772,11 @@ async def test_storm_pushpull(self):
37503772
await core.callStorm('$lib.view.del($view0)', opts=opts)
37513773
await core.callStorm('$lib.view.del($view1)', opts=opts)
37523774
await core.callStorm('$lib.view.del($view2)', opts=opts)
3775+
await core.callStorm('$lib.view.del($view3)', opts=opts)
37533776
await core.callStorm('$lib.layer.del($layr0)', opts=opts)
37543777
await core.callStorm('$lib.layer.del($layr1)', opts=opts)
37553778
await core.callStorm('$lib.layer.del($layr2)', opts=opts)
3779+
await core.callStorm('$lib.layer.del($layr3)', opts=opts)
37563780

37573781
# Wait for the active coros to die
37583782
for task in [t for t in tasks if t is not None]:

synapse/tests/test_lib_stormtypes.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6420,7 +6420,7 @@ async def test_view_quorum(self):
64206420
opts = {'view': fork.iden, 'user': visi.iden}
64216421
await core.callStorm('return($lib.view.get().setMergeVote())', opts=opts)
64226422

6423-
msgs = await waiter.wait(timeout=3)
6423+
msgs = await waiter.wait(timeout=12)
64246424
self.eq(msgs[-2][1]['event'], 'view:merge:prog')
64256425
self.eq(msgs[-1][1]['event'], 'view:merge:fini')
64266426

0 commit comments

Comments
 (0)