0% found this document useful (0 votes)
13 views, 3 pages

Functions

The document defines two PostgreSQL functions for managing data warehouse loads. The first function, 'dwh_dynamic_insert_update_delete', loads records from a source (staging) table into a versioned target table — inserting new records, adding new versions of updated records, and flagging deleted records — and logs the resulting row counts. The second function, 'process_job_queue', processes jobs from a queue, calling a variant of the load function ('dwh_dynamic_insert_update_delete_2') for each pending job and marking each job as completed.

Uploaded by

rafreenimam
Copyright
© All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as TXT or PDF, or read online on Scribd
0% found this document useful (0 votes)
13 views, 3 pages

Functions

The document defines two PostgreSQL functions for managing data warehouse loads. The first function, 'dwh_dynamic_insert_update_delete', loads records from a source (staging) table into a versioned target table — inserting new records, adding new versions of updated records, and flagging deleted records — and logs the resulting row counts. The second function, 'process_job_queue', processes jobs from a queue, calling a variant of the load function ('dwh_dynamic_insert_update_delete_2') for each pending job and marking each job as completed.

Uploaded by

rafreenimam
Copyright
© All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as TXT or PDF, or read online on Scribd

-- DROP FUNCTION datawarehouse.dwh_dynamic_insert_update_delete(text, text, text, text, text, text);

-- Loads a source (staging) table into a versioned target table and logs the
-- resulting row counts to datawarehouse.log_gegevens.
--
-- Each target row holds the source columns followed by four bookkeeping columns:
--   wa_iscurr  'Y' for the current version of a key, 'N' for closed versions
--   wa_isdel   'Y' once the key has disappeared from the source
--   wk_root    value drawn from sequence datawarehouse.wk_root
--   load_date  time the version was written
-- NOTE(review): the INSERTs use "SELECT st.*" without a column list, so the
-- target is assumed to have exactly the source columns, in the same order,
-- followed by the four columns above — confirm against the table DDL.
--
-- Parameters:
--   source_schema, source_table  staging table to read from
--   target_schema, target_table  versioned table to write to
--   unique_key                   column identifying a record in both tables
--   hash_column                  column whose change marks a record as updated
--
-- All schema/table/column names are injected with format('%I'), so they are
-- quoted as identifiers rather than spliced in as raw SQL.
CREATE OR REPLACE FUNCTION datawarehouse.dwh_dynamic_insert_update_delete(
    source_schema text,
    source_table  text,
    target_schema text,
    target_table  text,
    unique_key    text,
    hash_column   text
)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
    inserted_count integer;
    updated_count  integer;
    deleted_count  integer;
BEGIN
    -- 1. New records: source rows whose key has no current version in the
    --    target. Checking only current versions means a key that was flagged
    --    as deleted earlier and reappears in the source is loaded again.
    --    Format args: %1$I.%2$I = target, %3$I.%4$I = source, %5$I = key.
    EXECUTE format($sql$
        INSERT INTO %1$I.%2$I
        SELECT
            st.*,
            'Y' AS wa_iscurr,
            'N' AS wa_isdel,
            nextval('datawarehouse.wk_root')::bigint AS wk_root,
            current_timestamp AS load_date
        FROM %3$I.%4$I AS st
        WHERE NOT EXISTS (
            SELECT 1
            FROM %1$I.%2$I AS dw
            WHERE dw.%5$I = st.%5$I
              AND dw.wa_iscurr = 'Y'
        )
    $sql$, target_schema, target_table, source_schema, source_table, unique_key);

    GET DIAGNOSTICS inserted_count = ROW_COUNT;

    -- 2. Updated records: close the current version whose hash differs from the
    --    source row, then insert the source row as the new current version.
    --    Both happen in one statement (data-modifying CTE), so new versions are
    --    written for exactly the versions that were closed. Older, already
    --    closed versions are never compared, so history is not re-inserted.
    --    IS DISTINCT FROM also treats a change to/from NULL as a change.
    --    Format args as in step 1, plus %6$I = hash column.
    EXECUTE format($sql$
        WITH closed AS (
            UPDATE %1$I.%2$I AS dw
            SET wa_iscurr = 'N'
            FROM %3$I.%4$I AS st
            WHERE dw.%5$I = st.%5$I
              AND dw.wa_iscurr = 'Y'
              AND dw.%6$I IS DISTINCT FROM st.%6$I
            RETURNING dw.%5$I AS closed_key
        )
        INSERT INTO %1$I.%2$I
        SELECT
            st.*,
            'Y' AS wa_iscurr,
            'N' AS wa_isdel,
            nextval('datawarehouse.wk_root')::bigint AS wk_root,
            current_timestamp AS load_date
        FROM %3$I.%4$I AS st
        WHERE st.%5$I IN (SELECT closed_key FROM closed)
    $sql$, target_schema, target_table, source_schema, source_table, unique_key, hash_column);

    -- ROW_COUNT of the top-level INSERT = number of new versions written.
    GET DIAGNOSTICS updated_count = ROW_COUNT;

    -- 3. Deleted records: current versions whose key no longer exists in the
    --    source (staging) table are flagged as deleted and closed. Restricting
    --    to current versions keeps already-flagged rows from being counted again.
    EXECUTE format($sql$
        UPDATE %1$I.%2$I AS dw
        SET wa_isdel  = 'Y',
            wa_iscurr = 'N'
        WHERE dw.wa_iscurr = 'Y'
          AND NOT EXISTS (
              SELECT 1
              FROM %3$I.%4$I AS st
              WHERE st.%5$I = dw.%5$I
          )
    $sql$, target_schema, target_table, source_schema, source_table, unique_key);

    GET DIAGNOSTICS deleted_count = ROW_COUNT;

    -- Record the counts of this run in the log table so the effect of every
    -- load can be checked afterwards.
    INSERT INTO datawarehouse.log_gegevens (table_name, inserted_count,
        updated_count, deleted_count, load_time)
    VALUES (target_table, inserted_count, updated_count, deleted_count,
        current_timestamp);
END;
$function$
;

-- DROP FUNCTION datawarehouse.process_job_queue();

-- Works through job_queue_dwh: runs every job whose status is 'Pending', one at
-- a time in id order, and marks each one 'Completed' after it has run.
--
-- Cycle reset: when no job with target_table LIKE 'dwh_wender%' is left in a
-- status other than 'Completed' (the previous cycle has finished), every job in
-- the queue is set back to 'Pending' first, so this call starts a new cycle.
--
-- Each job is executed by datawarehouse.dwh_dynamic_insert_update_delete_2, an
-- 8-argument variant of the load function that is not shown here.
-- There is no exception handling, so an error in any job aborts the whole call.
-- job_queue_dwh is not schema-qualified; it is resolved through search_path.
CREATE OR REPLACE FUNCTION datawarehouse.process_job_queue()
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
    curr_job RECORD;
BEGIN
    -- Reset the queue only when nothing is left to do, i.e. no matching job
    -- has a status other than 'Completed' (IS DISTINCT FROM also counts NULL).
    -- NOTE(review): the check is limited to target_table LIKE 'dwh_wender%'
    -- while the reset applies to every job — confirm this scoping is intended.
    IF NOT EXISTS (
        SELECT 1
        FROM job_queue_dwh
        WHERE status IS DISTINCT FROM 'Completed'
          AND target_table LIKE 'dwh_wender%'
    ) THEN
        -- Deliberately queue-wide; rows that are already 'Pending' are skipped.
        UPDATE job_queue_dwh
        SET status = 'Pending'
        WHERE status IS DISTINCT FROM 'Pending';
    END IF;

    LOOP
        -- Claim the lowest-id pending job; SKIP LOCKED passes over jobs that
        -- another transaction currently holds a lock on.
        SELECT *
        INTO curr_job
        FROM job_queue_dwh
        WHERE status = 'Pending'
        ORDER BY id
        LIMIT 1
        FOR UPDATE SKIP LOCKED;

        EXIT WHEN NOT FOUND;  -- no pending jobs left

        PERFORM datawarehouse.dwh_dynamic_insert_update_delete_2(
            curr_job.source_schema,
            curr_job.source_table,
            curr_job.target_schema,
            curr_job.target_table,
            curr_job.unique_key,
            curr_job.hash_column,
            curr_job.unique_key2,
            curr_job.unique_key3
        );

        UPDATE job_queue_dwh
        SET status = 'Completed'
        WHERE id = curr_job.id;
    END LOOP;
END;
$function$
;

You might also like