@@ -16,7 +16,7 @@
 use async_trait::async_trait;
 use config::{
     meta::stream::{FileKey, FileMeta, PartitionTimeLevel, StreamStats, StreamType},
-    utils::parquet::parse_file_key_columns,
+    utils::{hash::Sum64, parquet::parse_file_key_columns},
 };
 use hashbrown::HashMap;
 use sqlx::{Executor, Postgres, QueryBuilder, Row};
@@ -630,6 +630,20 @@ UPDATE stream_stats
     async fn get_pending_jobs(&self, node: &str, limit: i64) -> Result<Vec<super::MergeJobRecord>> {
         let pool = CLIENT.clone();
         let mut tx = pool.begin().await?;
+        let lock_key = "file_list_jobs:get_pending_jobs";
+        let lock_id = config::utils::hash::gxhash::new().sum64(lock_key);
+        let lock_id = if lock_id > i64::MAX as u64 {
+            (lock_id >> 1) as i64
+        } else {
+            lock_id as i64
+        };
+        let lock_sql = format!("SELECT pg_advisory_xact_lock({lock_id})");
+        if let Err(e) = sqlx::query(&lock_sql).execute(&mut *tx).await {
+            if let Err(e) = tx.rollback().await {
+                log::error!("[POSTGRES] rollback get_pending_jobs error: {}", e);
+            }
+            return Err(e.into());
+        };
         // get pending jobs group by stream and order by num desc
         let ret = match sqlx::query_as::<_, super::MergeJobPendingRecord>(
             r#"
@@ -638,8 +652,7 @@ SELECT stream, max(id) as id, COUNT(*)::BIGINT AS num
    WHERE status = $1
    GROUP BY stream
    ORDER BY num DESC
-   LIMIT $2
-   FOR UPDATE;"#,
+   LIMIT $2;"#,
         )
         .bind(super::FileListJobStatus::Pending)
         .bind(limit)
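
The hunks above swap the `FOR UPDATE` clause on the grouped SELECT for a transaction-scoped PostgreSQL advisory lock whose id is a 64-bit hash of a fixed key, folded into the signed i64 range that `pg_advisory_xact_lock()` accepts. The sketch below shows that pattern in isolation; it assumes sqlx 0.7 with the Postgres driver, and the function name `with_pending_jobs_lock` and the hard-coded hash constant are illustrative stand-ins, not code from this change.

// Sketch only, not part of the diff: serialize concurrent callers with a
// transaction-scoped advisory lock instead of a row-level FOR UPDATE lock.
use sqlx::PgPool;

async fn with_pending_jobs_lock(pool: &PgPool) -> Result<(), sqlx::Error> {
    let mut tx = pool.begin().await?;

    // pg_advisory_xact_lock takes a signed BIGINT, so a u64 hash of the lock
    // key is folded into the i64 range (the diff shifts the value right by one
    // bit when it exceeds i64::MAX). The constant stands in for the gxhash sum.
    let hashed: u64 = 0x9e37_79b9_7f4a_7c15; // placeholder for gxhash::new().sum64("file_list_jobs:get_pending_jobs")
    let lock_id: i64 = if hashed > i64::MAX as u64 {
        (hashed >> 1) as i64
    } else {
        hashed as i64
    };

    // The advisory lock is held until the transaction commits or rolls back,
    // so every statement issued through `tx` below runs while no other caller
    // holds the same lock id.
    sqlx::query("SELECT pg_advisory_xact_lock($1)")
        .bind(lock_id)
        .execute(&mut *tx)
        .await?;

    // ... SELECT the pending jobs and mark them as running here ...

    tx.commit().await?; // commit (or rollback) releases the advisory lock
    Ok(())
}

Because the advisory lock serializes callers for the whole transaction, the read-then-update of `file_list_jobs` no longer relies on row locks, which is presumably why the second hunk drops `FOR UPDATE` from the query.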