2828import synapse .lib .node as s_node
2929import synapse .lib .time as s_time
3030import synapse .lib .cache as s_cache
31+ import synapse .lib .const as s_const
3132import synapse .lib .queue as s_queue
3233import synapse .lib .scope as s_scope
3334import synapse .lib .msgpack as s_msgpack
@@ -6238,6 +6239,11 @@ class Layer(Prim):
62386239 {'name' : 'url' , 'type' : 'str' , 'desc' : 'A telepath URL of the target layer/feed.' , },
62396240 {'name' : 'offs' , 'type' : 'int' , 'desc' : 'The local layer offset to begin pushing from' ,
62406241 'default' : 0 , },
6242+ {'name' : 'queue_size' , 'type' : 'int' , 'desc' : 'The queue size of the pusher.' ,
6243+ 'default' : s_const .layer_pdef_qsize },
6244+ {'name' : 'chunk_size' , 'type' : 'int' ,
6245+ 'desc' : 'The chunk size of the pusher when pushing edits.' ,
6246+ 'default' : s_const .layer_pdef_csize }
62416247 ),
62426248 'returns' : {'type' : 'dict' , 'desc' : 'Dictionary containing the push definition.' , }}},
62436249 {'name' : 'delPush' , 'desc' : 'Remove a push config from the layer.' ,
@@ -6251,6 +6257,11 @@ class Layer(Prim):
62516257 'args' : (
62526258 {'name' : 'url' , 'type' : 'str' , 'desc' : 'The telepath URL to a layer/feed.' , },
62536259 {'name' : 'offs' , 'type' : 'int' , 'desc' : 'The offset to begin from.' , 'default' : 0 , },
6260+ {'name' : 'queue_size' , 'type' : 'int' , 'desc' : 'The queue size of the puller.' ,
6261+ 'default' : s_const .layer_pdef_qsize },
6262+ {'name' : 'chunk_size' , 'type' : 'int' ,
6263+ 'desc' : 'The chunk size of the puller when consuming edits.' ,
6264+ 'default' : s_const .layer_pdef_csize }
62546265 ),
62556266 'returns' : {'type' : 'dict' , 'desc' : 'Dictionary containing the pull definition.' , }}},
62566267 {'name' : 'delPull' , 'desc' : 'Remove a pull config from the layer.' ,
@@ -6568,9 +6579,11 @@ async def getMirrorStatus(self):
65686579 layr = self .runt .snap .core .getLayer (iden )
65696580 return await layr .getMirrorStatus ()
65706581
6571- async def _addPull (self , url , offs = 0 ):
6582+ async def _addPull (self , url , offs = 0 , queue_size = s_const . layer_pdef_qsize , chunk_size = s_const . layer_pdef_csize ):
65726583 url = await tostr (url )
65736584 offs = await toint (offs )
6585+ queue_size = await toint (queue_size )
6586+ chunk_size = await toint (chunk_size )
65746587
65756588 useriden = self .runt .user .iden
65766589 layriden = self .valu .get ('iden' )
@@ -6592,6 +6605,8 @@ async def _addPull(self, url, offs=0):
65926605 'user' : useriden ,
65936606 'time' : s_common .now (),
65946607 'iden' : s_common .guid (),
6608+ 'queue:size' : queue_size ,
6609+ 'chunk:size' : chunk_size ,
65956610 }
65966611 todo = s_common .todo ('addLayrPull' , layriden , pdef )
65976612 await self .runt .dyncall ('cortex' , todo )
@@ -6608,9 +6623,11 @@ async def _delPull(self, iden):
66086623 todo = s_common .todo ('delLayrPull' , layriden , iden )
66096624 await self .runt .dyncall ('cortex' , todo )
66106625
6611- async def _addPush (self , url , offs = 0 ):
6626+ async def _addPush (self , url , offs = 0 , queue_size = s_const . layer_pdef_qsize , chunk_size = s_const . layer_pdef_csize ):
66126627 url = await tostr (url )
66136628 offs = await toint (offs )
6629+ queue_size = await toint (queue_size )
6630+ chunk_size = await toint (chunk_size )
66146631
66156632 useriden = self .runt .user .iden
66166633 layriden = self .valu .get ('iden' )
@@ -6633,6 +6650,8 @@ async def _addPush(self, url, offs=0):
66336650 'user' : useriden ,
66346651 'time' : s_common .now (),
66356652 'iden' : s_common .guid (),
6653+ 'queue:size' : queue_size ,
6654+ 'chunk:size' : chunk_size ,
66366655 }
66376656 todo = s_common .todo ('addLayrPush' , layriden , pdef )
66386657 await self .runt .dyncall ('cortex' , todo )
0 commit comments