Migrating Data from HDFS to BigQuery
This guide will walk you through the entire process of migrating 500 Hive external tables in
Parquet format to BigQuery using Google Cloud services such as Google Cloud Storage
(GCS), Dataflow, Dataform, and Cloud Composer.
Step 1: Set Up IAM Permissions
Ensure the necessary IAM roles are assigned to the service accounts used by Cloud
Composer, Dataflow, and other Google Cloud services:
Google Cloud Storage: roles/storage.admin
BigQuery: roles/bigquery.admin
Dataflow: roles/dataflow.admin
Cloud Composer: roles/composer.admin
Service Account: Ensure the service account used by your Cloud Composer (Airflow) environment has been granted the roles above. Note that the *.admin roles are broad; in production, prefer the narrowest roles that still let each service do its job.
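For example, the roles can be granted with gcloud. A minimal sketch; the service account e-mail below is a placeholder for the account your environment actually uses:
sh
# Grant one role per command; repeat for each role listed above.
gcloud projects add-iam-policy-binding your-project-id \
  --member="serviceAccount:your-sa@your-project-id.iam.gserviceaccount.com" \
  --role="roles/storage.admin"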
Step 2: Transfer Data from HDFS to GCS using hadoop distcp
If you have direct access to the Hadoop cluster, you can use hadoop distcp to transfer the
data from HDFS to GCS:
sh
hadoop distcp hdfs://namenode:8020/path-to-hive-tables/ gs://your-bucket/path/
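For a copy of this size it helps to make the transfer resumable and to cap parallelism, and the Cloud Storage connector must be installed on the cluster so the gs:// scheme resolves. A sketch using standard distcp flags (the map count of 50 is illustrative):
sh
# -update copies only missing or changed files on reruns; -m caps the number of parallel map tasks.
hadoop distcp -update -m 50 hdfs://namenode:8020/path-to-hive-tables/ gs://your-bucket/path/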
Step 3: Create Dataflow Pipeline for Transformation and Loading
Create a Dataflow job to read Parquet files from GCS, add new columns, and load the data
into BigQuery.
Dataflow Pipeline (Python)
1. Create the Dataflow script (dataflow_pipeline.py). A parameterized per-table variant is sketched at the end of this step:
python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions
from apache_beam.io import ReadFromParquet, WriteToBigQuery
from datetime import datetime


def add_columns(element):
    # ReadFromParquet yields one dict per record; add the audit columns here.
    element['timestamp'] = datetime.utcnow().isoformat()
    element['source_name'] = 'hive_source'
    return element


def run():
    options = PipelineOptions()
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = 'your-project-id'
    google_cloud_options.job_name = 'hive-to-bigquery'
    google_cloud_options.staging_location = 'gs://your-bucket/staging'
    google_cloud_options.temp_location = 'gs://your-bucket/temp'
    options.view_as(StandardOptions).runner = 'DataflowRunner'

    with beam.Pipeline(options=options) as p:
        (p
         | 'ReadFromParquet' >> ReadFromParquet('gs://your-bucket/path-to-parquet-files/*.parquet')
         | 'AddColumns' >> beam.Map(add_columns)
         | 'WriteToBigQuery' >> WriteToBigQuery(
             'your-project-id:your_dataset.your_table',
             schema='SCHEMA_AUTODETECT',
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
         )


if __name__ == '__main__':
    run()
2. Upload the script to GCS:
sh
gsutil cp dataflow_pipeline.py gs://your-bucket/path-to-dataflow-script.py
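The script above hard-codes one input path and one destination table. To reuse it for all 500 tables, it can accept the input and output as pipeline options and be launched once per table. A minimal sketch, assuming hypothetical --input and --output_table arguments (the AddColumns step from the script above would slot in unchanged):
python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class HiveMigrationOptions(PipelineOptions):
    """Hypothetical per-table options supplied at launch time."""

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument('--input', help='GCS glob of Parquet files for one table')
        parser.add_argument('--output_table', help='Destination as project:dataset.table')


def run():
    options = HiveMigrationOptions()
    with beam.Pipeline(options=options) as p:
        (p
         | 'ReadFromParquet' >> beam.io.ReadFromParquet(options.input)
         | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
             options.output_table,
             schema='SCHEMA_AUTODETECT',
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
         )


if __name__ == '__main__':
    run()
With this variant, runner, project, and temp location are passed on the command line (or by the Airflow operator in Step 5) instead of being hard-coded.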
Step 4: Set Up Dataform for Schema Management
1. Initialize a Dataform project:
sh
dataform init my_dataform_project
cd my_dataform_project
2. Configure dataform.json:
json
{
  "warehouse": "bigquery",
  "defaultSchema": "your_dataset"
}
3. Create SQLX files for table definitions:
Create a definitions directory if it doesn't exist, and add SQLX files for your tables (a script to generate them in bulk is sketched at the end of this step).
For example, definitions/example_table.sqlx:
sqlx
config {
  type: "table",
  description: "This is an example table created from a Dataform script",
  columns: {
    id: "The unique identifier",
    name: "The name of the entity",
    timestamp: "The timestamp when the row was inserted",
    source_name: "The source of the data"
  }
}

select
  id,
  name,
  current_timestamp() as timestamp,
  'hive_source' as source_name
from
  your_dataset.your_table
4. Run Dataform:
sh
dataform run
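Hand-writing 500 SQLX files is impractical, so one option is to generate them from a list of table names. A minimal sketch, assuming a tables.txt file with one table name per line and a shared template (both the file name and the template are illustrative; since the Dataflow job already adds timestamp and source_name, the template simply selects everything from the loaded table):
python
from pathlib import Path

# Hypothetical template applied to every table listed in tables.txt.
TEMPLATE = """config {{
  type: "table",
  description: "Curated copy of {table} migrated from Hive"
}}

select * from your_dataset.{table}
"""

definitions = Path('definitions')
definitions.mkdir(exist_ok=True)

for table in Path('tables.txt').read_text().splitlines():
    table = table.strip()
    if table:
        (definitions / f'{table}.sqlx').write_text(TEMPLATE.format(table=table))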
Step 5: Orchestrate the Process Using Cloud Composer
Create modular Airflow DAGs to automate each step of the process.
5.1 Airflow DAG to Transfer Data to GCS
Create an Airflow DAG (transfer_data_to_gcs_dag.py) to transfer data from HDFS to GCS. Note that a plain BashOperator only works if the Composer workers have the Hadoop client installed and network access to the cluster; an SSH-based alternative is sketched after this DAG:
python
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'retries': 1,
}

with DAG('transfer_data_to_gcs', default_args=default_args,
         schedule_interval='@daily') as dag:
    transfer_data = BashOperator(
        task_id='distcp_to_gcs',
        bash_command='hadoop distcp hdfs://namenode:8020/path-to-hive-tables/ gs://your-bucket/path/'
    )
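If the Hadoop client is not available on the Composer workers, a common alternative is to run distcp on a cluster edge node over SSH. A minimal sketch, assuming an Airflow SSH connection named hadoop_edge_node has been configured (the connection ID is a placeholder):
python
from airflow import DAG
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.utils.dates import days_ago

with DAG('transfer_data_to_gcs_ssh', start_date=days_ago(1),
         schedule_interval='@daily') as dag:
    # Run distcp on an edge node that already has Hadoop and the Cloud Storage connector.
    transfer_data = SSHOperator(
        task_id='distcp_to_gcs',
        ssh_conn_id='hadoop_edge_node',  # assumed Airflow connection
        command='hadoop distcp hdfs://namenode:8020/path-to-hive-tables/ gs://your-bucket/path/'
    )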
5.2 Airflow DAG to Run Dataflow Job
Create an Airflow DAG (load_data_to_bigquery_dag.py) to run the Dataflow job:
python
from airflow import DAG
from airflow.providers.google.cloud.operators.dataflow import DataflowCreatePythonJobOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'retries': 1,
}

with DAG('load_data_to_bigquery', default_args=default_args,
         schedule_interval='@daily') as dag:
    run_dataflow_job = DataflowCreatePythonJobOperator(
        task_id='run_dataflow',
        py_file='gs://your-bucket/path-to-dataflow-script.py',
        dataflow_default_options={
            'project': 'your-project-id',
            'region': 'us-central1',
            'staging_location': 'gs://your-bucket/staging',
            'temp_location': 'gs://your-bucket/temp',
            'runner': 'DataflowRunner'
        }
    )
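With 500 tables you will typically want one Dataflow task per table (or per batch) rather than a single hard-coded job. A sketch of dynamic task generation, assuming the pipeline has been parameterized with the hypothetical --input and --output_table options from the end of Step 3 (the table list here is illustrative):
python
from airflow import DAG
from airflow.providers.google.cloud.operators.dataflow import DataflowCreatePythonJobOperator
from airflow.utils.dates import days_ago

# Illustrative subset of the 500 tables; in practice load this from a config file.
TABLES = ['orders', 'customers', 'payments']

with DAG('load_all_tables_to_bigquery', start_date=days_ago(1),
         schedule_interval='@daily') as dag:
    for table in TABLES:
        DataflowCreatePythonJobOperator(
            task_id=f'run_dataflow_{table}',
            py_file='gs://your-bucket/path-to-dataflow-script.py',
            options={
                # Hypothetical per-table options added in the parameterized script.
                'input': f'gs://your-bucket/path/{table}/*.parquet',
                'output_table': f'your-project-id:your_dataset.{table}',
            },
            dataflow_default_options={
                'project': 'your-project-id',
                'region': 'us-central1',
                'temp_location': 'gs://your-bucket/temp',
            }
        )
In practice the table list would come from a configuration file or an export of the Hive metastore rather than being hard-coded in the DAG.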
5.3 Airflow DAG to Run Dataform
Create an Airflow DAG (run_dataform_dag.py) to run Dataform (this assumes the Dataform CLI is installed and the project is available on the Airflow workers):
python
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'retries': 1,
}

with DAG('run_dataform', default_args=default_args,
         schedule_interval='@daily') as dag:
    run_dataform = BashOperator(
        task_id='run_dataform',
        bash_command='dataform run --project-dir /path/to/your/dataform/project'
    )
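If you prefer a single end-to-end workflow instead of three independent daily DAGs, the same tasks can be chained in one DAG so each step runs only after the previous one succeeds. A minimal sketch reusing the operators above (paths and bucket names remain placeholders):
python
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowCreatePythonJobOperator
from airflow.utils.dates import days_ago

with DAG('hive_to_bigquery_end_to_end', start_date=days_ago(1),
         schedule_interval='@daily') as dag:
    transfer_data = BashOperator(
        task_id='distcp_to_gcs',
        bash_command='hadoop distcp hdfs://namenode:8020/path-to-hive-tables/ gs://your-bucket/path/'
    )

    run_dataflow_job = DataflowCreatePythonJobOperator(
        task_id='run_dataflow',
        py_file='gs://your-bucket/path-to-dataflow-script.py',
        dataflow_default_options={
            'project': 'your-project-id',
            'region': 'us-central1',
            'temp_location': 'gs://your-bucket/temp',
        }
    )

    run_dataform = BashOperator(
        task_id='run_dataform',
        bash_command='dataform run --project-dir /path/to/your/dataform/project'
    )

    # Enforce ordering: copy to GCS, then load to BigQuery, then build Dataform models.
    transfer_data >> run_dataflow_job >> run_dataform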
Conclusion
By following these steps, you can efficiently migrate your Hive external tables to BigQuery.
This approach leverages GCS for intermediate storage, Dataflow for transformation and
loading, Dataform for schema management, and Cloud Composer for orchestration. Each
component is modular, allowing for maintainability and scalability.