11use crate :: {
22 beacon_chain:: BeaconChainTypes ,
3- summaries_dag:: { DAGStateSummaryV22 , StateSummariesDAG } ,
3+ summaries_dag:: { DAGStateSummary , DAGStateSummaryV22 , StateSummariesDAG } ,
44} ;
5- use ssz:: Decode ;
5+ use ssz:: { Decode , Encode } ;
66use ssz_derive:: { Decode , Encode } ;
77use std:: {
88 sync:: Arc ,
99 time:: { Duration , Instant } ,
1010} ;
1111use store:: {
12- get_full_state_v22, hdiff:: StorageStrategy , hot_cold_store:: DiffBaseStateRoot , DBColumn , Error ,
13- HotColdDB , HotStateSummary , KeyValueStore , KeyValueStoreOp , StoreItem ,
12+ get_full_state_v22, hdiff:: StorageStrategy , hot_cold_store:: DiffBaseStateRoot ,
13+ store_full_state_v22, DBColumn , Error , HotColdDB , HotStateSummary , KeyValueStore ,
14+ KeyValueStoreOp , StoreItem ,
1415} ;
1516use tracing:: { debug, info, warn} ;
1617use types:: { EthSpec , Hash256 , Slot } ;
@@ -94,8 +95,8 @@ pub fn upgrade_to_v24<T: BeaconChainTypes>(
9495 slots_count = summaries_by_slot. len( ) ,
9596 min_slot = ?summaries_by_slot. first_key_value( ) . map( |( slot, _) | slot) ,
9697 max_slot = ?summaries_by_slot. last_key_value( ) . map( |( slot, _) | slot) ,
97- state_summaries_dag_roots = ?state_summaries_dag_roots,
98- hot_hdiff_start_slot = %hot_hdiff_start_slot,
98+ ?state_summaries_dag_roots,
99+ %hot_hdiff_start_slot,
99100 split_state_root = ?split. state_root,
100101 "Starting hot states migration"
101102 ) ;
@@ -178,21 +179,22 @@ pub fn upgrade_to_v24<T: BeaconChainTypes>(
178179 } ) ?
179180 } ;
180181
181- let diff_base_state_root =
182- if let Some ( diff_base_slot) = storage_strategy. diff_base_slot ( ) {
183- DiffBaseStateRoot :: new (
182+ let diff_base_state_root = if let Some ( diff_base_slot) =
183+ storage_strategy. diff_base_slot ( )
184+ {
185+ DiffBaseStateRoot :: new (
184186 diff_base_slot,
185187 state_summaries_dag
186188 . ancestor_state_root_at_slot ( state_root, diff_base_slot)
187189 . map_err ( |e| {
188190 Error :: MigrationError ( format ! (
189- "error computing ancestor_state_root_at_slot {e:?}"
191+ "error computing ancestor_state_root_at_slot({state_root:?}, {diff_base_slot}) {e:?}"
190192 ) )
191193 } ) ?,
192194 )
193- } else {
194- DiffBaseStateRoot :: zero ( )
195- } ;
195+ } else {
196+ DiffBaseStateRoot :: zero ( )
197+ } ;
196198
197199 let new_summary = HotStateSummary {
198200 slot,
@@ -227,8 +229,8 @@ pub fn upgrade_to_v24<T: BeaconChainTypes>(
227229 if last_log_time. elapsed ( ) > Duration :: from_secs ( 5 ) {
228230 last_log_time = Instant :: now ( ) ;
229231 info ! (
230- diffs_written = diffs_written ,
231- summaries_written = summaries_written ,
232+ diffs_written,
233+ summaries_written,
232234 summaries_count = state_summaries_dag. summaries_count( ) ,
233235 "Hot states migration in progress"
234236 ) ;
@@ -239,8 +241,8 @@ pub fn upgrade_to_v24<T: BeaconChainTypes>(
239241 // TODO(hdiff): Should run hot DB compaction after deleting potentially a lot of states. Or should wait
240242 // for the next finality event?
241243 info ! (
242- diffs_written = diffs_written ,
243- summaries_written = summaries_written ,
244+ diffs_written,
245+ summaries_written,
244246 summaries_count = state_summaries_dag. summaries_count( ) ,
245247 "Hot states migration complete"
246248 ) ;
@@ -249,10 +251,100 @@ pub fn upgrade_to_v24<T: BeaconChainTypes>(
249251}
250252
251253pub fn downgrade_from_v24 < T : BeaconChainTypes > (
252- _db : Arc < HotColdDB < T :: EthSpec , T :: HotStore , T :: ColdStore > > ,
254+ db : Arc < HotColdDB < T :: EthSpec , T :: HotStore , T :: ColdStore > > ,
253255) -> Result < Vec < KeyValueStoreOp > , Error > {
254- // TODO(hdiff): proper error
255- panic ! ( "downgrade not supported" ) ;
256+ let state_summaries = db
257+ . load_hot_state_summaries ( ) ?
258+ . into_iter ( )
259+ . map ( |( state_root, summary) | ( state_root, summary. into ( ) ) )
260+ . collect :: < Vec < ( Hash256 , DAGStateSummary ) > > ( ) ;
261+
262+ info ! (
263+ summaries_count = state_summaries. len( ) ,
264+ "DB downgrade of v24 state summaries started"
265+ ) ;
266+
267+ let state_summaries_dag = StateSummariesDAG :: new ( state_summaries)
268+ . map_err ( |e| Error :: MigrationError ( format ! ( "Error on new StateSumariesDAG {e:?}" ) ) ) ?;
269+
270+ let mut migrate_ops = vec ! [ ] ;
271+ let mut states_written = 0 ;
272+ let mut summaries_written = 0 ;
273+ let mut last_log_time = Instant :: now ( ) ;
274+
275+ // TODO(tree-states): What about the anchor_slot? Is it safe to run the prior version of
276+ // Lighthouse with an a higher anchor_slot than expected?
277+
278+ for ( state_root, summary) in state_summaries_dag
279+ . summaries_by_slot_ascending ( )
280+ . into_iter ( )
281+ . flat_map ( |( _, summaries) | summaries)
282+ {
283+ // If boundary state persist
284+ if summary. slot % T :: EthSpec :: slots_per_epoch ( ) == 0 {
285+ let ( state, _) = db
286+ . load_hot_state ( & state_root) ?
287+ . ok_or ( Error :: MissingState ( state_root) ) ?;
288+
289+ // Immediately commit the state. Otherwise we will OOM and it's stored in a different
290+ // column. So if the migration crashes we just get extra harmless junk in the DB.
291+ let mut state_write_ops = vec ! [ ] ;
292+ store_full_state_v22 ( & state_root, & state, & mut state_write_ops) ?;
293+ db. hot_db . do_atomically ( state_write_ops) ?;
294+ states_written += 1 ;
295+ }
296+
297+ // Persist old summary
298+ let epoch_boundary_state_slot = summary. slot - summary. slot % T :: EthSpec :: slots_per_epoch ( ) ;
299+ let old_summary = HotStateSummaryV22 {
300+ slot : summary. slot ,
301+ latest_block_root : summary. latest_block_root ,
302+ epoch_boundary_state_root : state_summaries_dag
303+ . ancestor_state_root_at_slot ( state_root, epoch_boundary_state_slot)
304+ . map_err ( |e| {
305+ Error :: MigrationError ( format ! (
306+ "error computing ancestor_state_root_at_slot({state_root:?}, {epoch_boundary_state_slot}) {e:?}"
307+ ) )
308+ } ) ?,
309+ } ;
310+ migrate_ops. push ( KeyValueStoreOp :: PutKeyValue (
311+ DBColumn :: BeaconStateSummary ,
312+ state_root. as_slice ( ) . to_vec ( ) ,
313+ old_summary. as_ssz_bytes ( ) ,
314+ ) ) ;
315+ summaries_written += 1 ;
316+
317+ // Delete existing data
318+ for db_column in [
319+ DBColumn :: BeaconStateHotSummary ,
320+ DBColumn :: BeaconStateHotDiff ,
321+ DBColumn :: BeaconStateHotSnapshot ,
322+ ] {
323+ migrate_ops. push ( KeyValueStoreOp :: DeleteKey (
324+ db_column,
325+ state_root. as_slice ( ) . to_vec ( ) ,
326+ ) ) ;
327+ }
328+
329+ if last_log_time. elapsed ( ) > Duration :: from_secs ( 5 ) {
330+ last_log_time = Instant :: now ( ) ;
331+ info ! (
332+ states_written,
333+ summaries_written,
334+ summaries_count = state_summaries_dag. summaries_count( ) ,
335+ "DB downgrade of v24 state summaries in progress"
336+ ) ;
337+ }
338+ }
339+
340+ info ! (
341+ states_written,
342+ summaries_written,
343+ summaries_count = state_summaries_dag. summaries_count( ) ,
344+ "DB downgrade of v24 state summaries completed"
345+ ) ;
346+
347+ Ok ( migrate_ops)
256348}
257349
258350fn new_dag < T : BeaconChainTypes > (
0 commit comments