Skip to content

Commit e397079

Browse files
committed
fix: get pending jobs with postgres
1 parent 30db54d commit e397079

File tree

1 file changed

+16
-3
lines changed

1 file changed

+16
-3
lines changed

src/infra/src/file_list/postgres.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
use async_trait::async_trait;
1717
use config::{
1818
meta::stream::{FileKey, FileMeta, PartitionTimeLevel, StreamStats, StreamType},
19-
utils::parquet::parse_file_key_columns,
19+
utils::{hash::Sum64, parquet::parse_file_key_columns},
2020
};
2121
use hashbrown::HashMap;
2222
use sqlx::{Executor, Postgres, QueryBuilder, Row};
@@ -630,6 +630,20 @@ UPDATE stream_stats
630630
async fn get_pending_jobs(&self, node: &str, limit: i64) -> Result<Vec<super::MergeJobRecord>> {
631631
let pool = CLIENT.clone();
632632
let mut tx = pool.begin().await?;
633+
let lock_key = "file_list_jobs:get_pending_jobs";
634+
let lock_id = config::utils::hash::gxhash::new().sum64(lock_key);
635+
let lock_id = if lock_id > i64::MAX as u64 {
636+
(lock_id >> 1) as i64
637+
} else {
638+
lock_id as i64
639+
};
640+
let lock_sql = format!("SELECT pg_advisory_xact_lock({lock_id})");
641+
if let Err(e) = sqlx::query(&lock_sql).execute(&mut *tx).await {
642+
if let Err(e) = tx.rollback().await {
643+
log::error!("[POSTGRES] rollback get_pending_jobs error: {}", e);
644+
}
645+
return Err(e.into());
646+
};
633647
// get pending jobs group by stream and order by num desc
634648
let ret = match sqlx::query_as::<_, super::MergeJobPendingRecord>(
635649
r#"
@@ -638,8 +652,7 @@ SELECT stream, max(id) as id, COUNT(*)::BIGINT AS num
638652
WHERE status = $1
639653
GROUP BY stream
640654
ORDER BY num DESC
641-
LIMIT $2
642-
FOR UPDATE;"#,
655+
LIMIT $2;"#,
643656
)
644657
.bind(super::FileListJobStatus::Pending)
645658
.bind(limit)

0 commit comments

Comments
 (0)