/*
What is Snowpipe?
-- Snowpipe enables loading data from files as soon as they’re available in a
stage.
-- This means you can load data from files in micro-batches, making it available to
users within minutes,
rather than manually executing COPY statements on a schedule to load larger
batches.
*/
-- Create table to load CSV data
CREATE or replace TABLE HEALTHCARE_CSV(
AVERAGE_COVERED_CHARGES NUMBER(38,6)
,AVERAGE_TOTAL_PAYMENTS NUMBER(38,6)
,TOTAL_DISCHARGES NUMBER(38,0)
,BACHELORORHIGHER NUMBER(38,1)
,HSGRADORHIGHER NUMBER(38,1)
,TOTALPAYMENTS VARCHAR(128)
,REIMBURSEMENT VARCHAR(128)
,TOTAL_COVERED_CHARGES VARCHAR(128)
,REFERRALREGION_PROVIDER_NAME VARCHAR(256)
,REIMBURSEMENTPERCENTAGE NUMBER(38,9)
,DRG_DEFINITION VARCHAR(256)
,REFERRAL_REGION VARCHAR(26)
,INCOME_PER_CAPITA NUMBER(38,0)
,MEDIAN_EARNINGSBACHELORS NUMBER(38,0)
,MEDIAN_EARNINGS_GRADUATE NUMBER(38,0)
,MEDIAN_EARNINGS_HS_GRAD NUMBER(38,0)
,MEDIAN_EARNINGSLESS_THAN_HS NUMBER(38,0)
,MEDIAN_FAMILY_INCOME NUMBER(38,0)
,NUMBER_OF_RECORDS NUMBER(38,0)
,POP_25_OVER NUMBER(38,0)
,PROVIDER_CITY VARCHAR(128)
,PROVIDER_ID NUMBER(38,0)
,PROVIDER_NAME VARCHAR(256)
,PROVIDER_STATE VARCHAR(128)
,PROVIDER_STREET_ADDRESS VARCHAR(256)
,PROVIDER_ZIP_CODE NUMBER(38,0)
);
--Create integration object for external stage
create or replace storage integration s3_int
type = external_stage
storage_provider = s3
enabled = true
storage_aws_role_arn = 'arn:aws:iam::435098453023:role/snowflake-role'
storage_allowed_locations = ('s3://testsnowflake/snowflake/',
's3://testxyzsnowflake/');
--Describe integration object to fetch external_id and to be used in s3
DESC INTEGRATION s3_int;
create or replace file format demo_db.public.csv_format
type = csv
field_delimiter = '|'
skip_header = 1
null_if = ('NULL', 'null')
empty_field_as_null = true;
create or replace stage demo_db.public.ext_csv_stage
URL = 's3://testsnowflake/snowflake/csv'
STORAGE_INTEGRATION = s3_int
file_format = demo_db.public.csv_format;
--create pipe to automate data ingestion from s3 to snowflake
create or replace pipe demo_db.public.mypipe auto_ingest=true as
copy into healthcare_csv
from @demo_db.public.ext_csv_stage
on_error = CONTINUE;
show pipes;
select * from healthcare_csv;