@@ -108,19 +108,19 @@ def listen(self):
108108 (table , row ) = key .split (self .TABLE_NAME_SEPARATOR , 1 )
109109 if self .handlers .has_key (table ):
110110 client = self .get_redis_client (self .db_name )
111- data = self .__raw_to_typed (client .hgetall (key ))
111+ data = self .raw_to_typed (client .hgetall (key ))
112112 self .__fire (table , row , data )
113113 except ValueError :
114114 pass #Ignore non table-formated redis entries
115115
116- def __raw_to_typed (self , raw_data ):
117- if raw_data == None :
116+ def raw_to_typed (self , raw_data ):
117+ if raw_data is None :
118118 return None
119119 typed_data = {}
120120 for raw_key in raw_data :
121121 key = raw_key
122122 if PY3K :
123- key = raw_key .decode ('utf-8' )
123+ key = raw_key .decode ()
124124
125125 # "NULL:NULL" is used as a placeholder for objects with no attributes
126126 if key == "NULL" :
@@ -136,13 +136,13 @@ def __raw_to_typed(self, raw_data):
136136 typed_data [key [:- 1 ]] = value
137137 else :
138138 if PY3K :
139- typed_data [key ] = raw_data [raw_key ].decode ('utf-8' )
139+ typed_data [key ] = raw_data [raw_key ].decode ()
140140 else :
141141 typed_data [key ] = raw_data [raw_key ]
142142 return typed_data
143143
144- def __typed_to_raw (self , typed_data ):
145- if typed_data == None :
144+ def typed_to_raw (self , typed_data ):
145+ if typed_data is None :
146146 return None
147147 elif typed_data == {}:
148148 return { "NULL" : "NULL" }
@@ -183,11 +183,11 @@ def set_entry(self, table, key, data):
183183 key = self .serialize_key (key )
184184 client = self .get_redis_client (self .db_name )
185185 _hash = '{}{}{}' .format (table .upper (), self .TABLE_NAME_SEPARATOR , key )
186- if data == None :
186+ if data is None :
187187 client .delete (_hash )
188188 else :
189189 original = self .get_entry (table , key )
190- client .hmset (_hash , self .__typed_to_raw (data ))
190+ client .hmset (_hash , self .typed_to_raw (data ))
191191 for k in [ k for k in original .keys () if k not in data .keys () ]:
192192 if type (original [k ]) == list :
193193 k = k + '@'
@@ -205,10 +205,10 @@ def mod_entry(self, table, key, data):
205205 key = self .serialize_key (key )
206206 client = self .get_redis_client (self .db_name )
207207 _hash = '{}{}{}' .format (table .upper (), self .TABLE_NAME_SEPARATOR , key )
208- if data == None :
208+ if data is None :
209209 client .delete (_hash )
210210 else :
211- client .hmset (_hash , self .__typed_to_raw (data ))
211+ client .hmset (_hash , self .typed_to_raw (data ))
212212
213213 def get_entry (self , table , key ):
214214 """Read a table entry from config db.
@@ -222,7 +222,7 @@ def get_entry(self, table, key):
222222 key = self .serialize_key (key )
223223 client = self .get_redis_client (self .db_name )
224224 _hash = '{}{}{}' .format (table .upper (), self .TABLE_NAME_SEPARATOR , key )
225- return self .__raw_to_typed (client .hgetall (_hash ))
225+ return self .raw_to_typed (client .hgetall (_hash ))
226226
227227 def get_keys (self , table , split = True ):
228228 """Read all keys of a table from config db.
@@ -240,7 +240,7 @@ def get_keys(self, table, split=True):
240240 for key in keys :
241241 try :
242242 if PY3K :
243- key = key .decode ('utf-8' )
243+ key = key .decode ()
244244 if split :
245245 (_ , row ) = key .split (self .TABLE_NAME_SEPARATOR , 1 )
246246 data .append (self .deserialize_key (row ))
@@ -266,10 +266,10 @@ def get_table(self, table):
266266 data = {}
267267 for key in keys :
268268 try :
269- entry = self .__raw_to_typed (client .hgetall (key ))
269+ entry = self .raw_to_typed (client .hgetall (key ))
270270 if entry != None :
271271 if PY3K :
272- key = key .decode ('utf-8' )
272+ key = key .decode ()
273273 (_ , row ) = key .split (self .TABLE_NAME_SEPARATOR , 1 )
274274 data [self .deserialize_key (row )] = entry
275275 else :
@@ -325,13 +325,138 @@ def get_config(self):
325325 data = {}
326326 for key in keys :
327327 if PY3K :
328- key = key .decode ('utf-8' )
328+ key = key .decode ()
329329 try :
330330 (table_name , row ) = key .split (self .TABLE_NAME_SEPARATOR , 1 )
331- entry = self .__raw_to_typed (client .hgetall (key ))
331+ entry = self .raw_to_typed (client .hgetall (key ))
332332 if entry != None :
333333 data .setdefault (table_name , {})[self .deserialize_key (row )] = entry
334334 except ValueError :
335335 pass #Ignore non table-formated redis entries
336336 return data
337337
338+
339+ class ConfigDBPipeConnector (ConfigDBConnector ):
340+ REDIS_SCAN_BATCH_SIZE = 30
341+
342+ def __init__ (self , ** kwargs ):
343+ super (ConfigDBPipeConnector , self ).__init__ (** kwargs )
344+
345+ def __delete_entries (self , client , pipe , pattern , cursor ):
346+ """Helper method to delete table entries from config db using Redis pipeline
347+ with batch size of REDIS_SCAN_BATCH_SIZE.
348+ The caller should call pipeline execute once ready
349+ Args:
350+ client: Redis client
351+ pipe: Redis DB pipe
352+ pattern: key pattern
353+ cursor: position to start scanning from
354+
355+ Returns:
356+ cur: poition of next item to scan
357+ """
358+ cur , keys = client .scan (cursor = cursor , match = pattern , count = self .REDIS_SCAN_BATCH_SIZE )
359+ for key in keys :
360+ pipe .delete (key )
361+
362+ return cur
363+
364+ def __delete_table (self , client , pipe , table ):
365+ """Helper method to delete table entries from config db using Redis pipeline.
366+ The caller should call pipeline execute once ready
367+ Args:
368+ client: Redis client
369+ pipe: Redis DB pipe
370+ table: Table name.
371+ """
372+ pattern = '{}{}*' .format (table .upper (), self .TABLE_NAME_SEPARATOR )
373+ cur = self .__delete_entries (client , pipe , pattern , 0 )
374+ while cur != 0 :
375+ cur = self .__delete_entries (client , pipe , pattern , cur )
376+
377+ def __mod_entry (self , pipe , table , key , data ):
378+ """Modify a table entry to config db.
379+ Args:
380+ table: Table name.
381+ pipe: Redis DB pipe
382+ table: Table name.
383+ key: Key of table entry, or a tuple of keys if it is a multi-key table.
384+ data: Table row data in a form of dictionary {'column_key': 'value', ...}.
385+ Pass {} as data will create an entry with no column if not already existed.
386+ Pass None as data will delete the entry.
387+ """
388+ key = self .serialize_key (key )
389+ _hash = '{}{}{}' .format (table .upper (), self .TABLE_NAME_SEPARATOR , key )
390+ if data is None :
391+ pipe .delete (_hash )
392+ else :
393+ pipe .hmset (_hash , self .typed_to_raw (data ))
394+
395+ def mod_config (self , data ):
396+ """Write multiple tables into config db.
397+ Extra entries/fields in the db which are not in the data are kept.
398+ Args:
399+ data: config data in a dictionary form
400+ {
401+ 'TABLE_NAME': { 'row_key': {'column_key': 'value', ...}, ...},
402+ 'MULTI_KEY_TABLE_NAME': { ('l1_key', 'l2_key', ...) : {'column_key': 'value', ...}, ...},
403+ ...
404+ }
405+ """
406+ client = self .get_redis_client (self .db_name )
407+ pipe = client .pipeline ()
408+ for table_name in data :
409+ table_data = data [table_name ]
410+ if table_data is None :
411+ self .__delete_table (client , pipe , table_name )
412+ continue
413+ for key in table_data :
414+ self .__mod_entry (pipe , table_name , key , table_data [key ])
415+ pipe .execute ()
416+ client .bgsave ()
417+
418+ def __get_config (self , client , pipe , data , cursor ):
419+ """Read config data in batches of size REDIS_SCAN_BATCH_SIZE using Redis pipelines
420+ Args:
421+ client: Redis client
422+ pipe: Redis DB pipe
423+ data: config dictionary
424+ cursor: position to start scanning from
425+
426+ Returns:
427+ cur: poition of next item to scan
428+ """
429+ cur , keys = client .scan (cursor = cursor , match = '*' , count = self .REDIS_SCAN_BATCH_SIZE )
430+ keys = [key .decode () for key in keys if key != self .INIT_INDICATOR ]
431+ for key in keys :
432+ pipe .hgetall (key )
433+ records = pipe .execute ()
434+
435+ for index , key in enumerate (keys ):
436+ (table_name , row ) = key .split (self .TABLE_NAME_SEPARATOR , 1 )
437+ entry = self .raw_to_typed (records [index ])
438+ if entry is not None :
439+ data .setdefault (table_name , {})[self .deserialize_key (row )] = entry
440+
441+ return cur
442+
443+ def get_config (self ):
444+ """Read all config data.
445+ Returns:
446+ Config data in a dictionary form of
447+ {
448+ 'TABLE_NAME': { 'row_key': {'column_key': 'value', ...}, ...},
449+ 'MULTI_KEY_TABLE_NAME': { ('l1_key', 'l2_key', ...) : {'column_key': 'value', ...}, ...},
450+ ...
451+ }
452+ """
453+ client = self .get_redis_client (self .db_name )
454+ pipe = client .pipeline ()
455+ data = {}
456+
457+ cur = self .__get_config (client , pipe , data , 0 )
458+ while cur != 0 :
459+ cur = self .__get_config (client , pipe , data , cur )
460+
461+ return data
462+
0 commit comments