Data Validation Tool (DVT) CLI (Command-Line Interface)

The Data Validation Tool (DVT) is an open-source Python CLI tool used for validating data quality and consistency in datasets, especially during migration projects. It supports various validation types, including column, row, schema, and custom query validations, and can output results to stdout or BigQuery. The document also explains an example Apache Airflow DAG that integrates DVT for data validation tasks in a GCP project, detailing its setup, execution, and the validation process.


DVT (DATA VALIDATION TOOL):

The Data Validation Tool (DVT) is an open-source, Python-based CLI
(Command-Line Interface) tool for performing data validation tasks. It is often
used to validate and check data quality, format, and consistency within
datasets. Data validation is a critical step in a data warehouse, database, or
data lake migration project, where data from the source and target tables are
compared to ensure they match and are correct after each migration step.

DVT supports the following validations:


 Column validation (count, sum, avg, min, max, stddev, group by)
 Row validation (Not supported for FileSystem connections)
 Schema validation
 Custom Query validation

Before using this tool, you will need to create connections to the source and
target tables. Once the connections are created, you can run validations on
those tables. Validation results can be printed to stdout (the default) or
written to BigQuery (recommended).

Alternatives to running DVT in the CLI include deploying DVT to Cloud Run,
Cloud Functions, or Airflow.
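
Before the Airflow example that follows, a minimal standalone sketch may help
illustrate the workflow. It assumes the google-pso-data-validator package is
installed, uses a placeholder project ID ("my-gcp-project"), and keeps the
default stdout-style result handling instead of writing results to BigQuery.

# Minimal sketch: run a DVT column (COUNT) validation from a plain Python script.
# Assumes google-pso-data-validator is installed and the placeholder project
# "my-gcp-project" has access to BigQuery.
from data_validation import data_validation

BQ_CONN = {"source_type": "BigQuery", "project_id": "my-gcp-project"}

CONFIG_COUNT_VALID = {
    "source_conn": BQ_CONN,
    "target_conn": BQ_CONN,  # source and target are the same connection here
    "type": "Column",
    "schema_name": "bigquery-public-data.new_york_citibike",
    "table_name": "citibike_trips",
    "aggregates": [
        {
            "field_alias": "count",
            "source_column": None,
            "target_column": None,
            "type": "count",
        },
    ],
}

# Without an explicit result handler, DVT falls back to its default handler,
# so the results (a DataFrame in current DVT versions) can simply be printed.
validator = data_validation.DataValidation(CONFIG_COUNT_VALID, verbose=True)
results = validator.execute()
print(results)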


EXAMPLE DAG
This code defines an Apache Airflow DAG (Directed Acyclic Graph) that
integrates with the Data Validation Tool (DVT) for running data validation tasks
in a Google Cloud Platform (GCP) project. The DAG is set up to execute a
Python function inside a virtual environment using the
PythonVirtualenvOperator. Below is an explanation of the code in detail:
1. Imports and Default Arguments

from datetime import timedelta


import airflow
from airflow import DAG
from airflow import models
from airflow.operators.python import PythonVirtualenvOperator
 timedelta: This is used for time-related calculations (e.g., for retries,
timeouts).
 airflow: Importing Airflow components to create and manage the DAG.
 models: This is used to interact with Airflow’s variables (such as fetching
a project ID).
 PythonVirtualenvOperator: This operator runs a Python function in a
virtual environment, which is useful for isolating dependencies.
2. Default Arguments (default_args)
default_args = {
    "start_date": airflow.utils.dates.days_ago(1),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}
 start_date: Specifies the start date for the DAG’s execution. Here it is set to
one day before the current date, so the DAG is immediately eligible to run.
 retries: If a task fails, it will be retried once (since retries: 1).
 retry_delay: Specifies a delay of 5 minutes between retries for a failed
task.
3. DAG Definition (with DAG)
with DAG(
    "dvt_example_dag",
    default_args=default_args,
    description="Data Validation Tool Example DAG",
    schedule_interval=None,
    dagrun_timeout=timedelta(minutes=60),
) as dag:
 "dvt_example_dag": The ID of the DAG.
 default_args: Applies the default arguments defined earlier (like
start_date, retries, etc.).
 description: A description of what this DAG does.
 schedule_interval=None: This means the DAG is not scheduled to run
automatically; you need to trigger it manually. (A scheduled variant is
sketched after this list.)
 dagrun_timeout=timedelta(minutes=60): The maximum time allowed
for this DAG run is 60 minutes. If the DAG takes longer than that, it will
be terminated.
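
By contrast, a hypothetical variant (not part of the original example) could let
Airflow schedule the DAG itself by replacing schedule_interval=None with a cron
expression:

# Hypothetical variant: run the DAG automatically every day at 02:00
# instead of relying on manual triggers only.
with DAG(
    "dvt_example_dag",
    default_args=default_args,
    description="Data Validation Tool Example DAG",
    schedule_interval="0 2 * * *",  # standard cron syntax
    dagrun_timeout=timedelta(minutes=60),
) as dag:
    ...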
4. Validation Function (validation_function)
def validation_function(project):
    from data_validation import data_validation
    from data_validation.result_handlers import bigquery as bqhandler

    BQ_CONN = {"source_type": "BigQuery", "project_id": project}

    GROUPED_CONFIG_COUNT_VALID = {
        "source_conn": BQ_CONN,
        "target_conn": BQ_CONN,
        "type": "Column",
        "schema_name": "bigquery-public-data.new_york_citibike",
        "table_name": "citibike_trips",
        "aggregates": [
            {
                "field_alias": "count",
                "source_column": None,
                "target_column": None,
                "type": "count",
            },
        ],
    }

    handler = bqhandler.BigQueryResultHandler.get_handler_for_project(project)
    validator = data_validation.DataValidation(
        GROUPED_CONFIG_COUNT_VALID, verbose=True, result_handler=handler
    )
    validator.execute()
 validation_function: This is the core function that performs the data
validation using DVT.
o It imports the necessary modules from the data_validation library
and BigQuery result handler (bqhandler).
o BQ_CONN: Defines the connection to BigQuery using the project
ID (project), which will be passed when the task runs.
o GROUPED_CONFIG_COUNT_VALID: This dictionary contains the
configuration for the data validation:
 source_conn and target_conn: Both are set to the BigQuery
connection details (BQ_CONN).
 type: "Column": Specifies that the validation type is
column-based.
 schema_name and table_name: The schema (bigquery-
public-data.new_york_citibike) and table (citibike_trips) in
BigQuery to validate.
 aggregates: Specifies an aggregation type of count (i.e.,
counting rows in the specified BigQuery table); a variant with
an additional aggregate is sketched after this list.
o handler: Retrieves the BigQuery result handler for the project
using bqhandler.
o validator: Creates a DataValidation object with the configuration
and the result handler.
o validator.execute(): Runs the validation based on the provided
configuration.
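
For illustration, the aggregates list can hold more than one entry. The sketch
below is a hypothetical variant of the configuration that also compares a SUM;
the tripduration column name is an assumption about the citibike_trips schema,
so substitute a column that exists in your table.

# Hypothetical variant of GROUPED_CONFIG_COUNT_VALID that also validates a SUM.
# "tripduration" is an assumed column name; replace it with a real column.
GROUPED_CONFIG_COUNT_AND_SUM = {
    "source_conn": BQ_CONN,
    "target_conn": BQ_CONN,
    "type": "Column",
    "schema_name": "bigquery-public-data.new_york_citibike",
    "table_name": "citibike_trips",
    "aggregates": [
        {
            "field_alias": "count",
            "source_column": None,
            "target_column": None,
            "type": "count",
        },
        {
            "field_alias": "sum_tripduration",
            "source_column": "tripduration",
            "target_column": "tripduration",
            "type": "sum",
        },
    ],
}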
5. Fetching the GCP Project ID
gcp_project = models.Variable.get("gcp_project")
 This line retrieves the GCP project ID stored in an Airflow Variable called
gcp_project. This variable is assumed to be predefined in Airflow and will
be passed to the validation_function when the task runs.
6. PythonVirtualenvOperator
virtualenv_task = PythonVirtualenvOperator(
    task_id="dvt-virtualenv",
    python_callable=validation_function,
    op_args=[gcp_project],
    requirements=["google-pso-data-validator"],
    system_site_packages=False,
)
 task_id="dvt-virtualenv": This is the task ID for the
PythonVirtualenvOperator.
 python_callable=validation_function: The Python function that will be
executed in the virtual environment (i.e., validation_function).
 op_args=[gcp_project]: The argument passed to the validation_function.
The GCP project ID retrieved earlier (gcp_project) is passed as an
argument to the function.
 requirements=["google-pso-data-validator"]: Specifies the Python
dependencies to install in the virtual environment. In this case, it installs
the google-pso-data-validator package, which provides the DVT library
used by validation_function.
 system_site_packages=False: This means the virtual environment will
not inherit system site packages, ensuring that the environment is
isolated and contains only the specified dependencies.
7. Final Execution
virtualenv_task
 This line just references the task (virtualenv_task) that will be executed
as part of the DAG. It ensures that the task is included in the DAG’s
execution plan.
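
If the DAG grew to more than one task, dependencies would be declared
explicitly. The snippet below is a hypothetical extension, not part of the
original example (EmptyOperator requires Airflow 2.3 or newer):

# Hypothetical extension: run a follow-up task after the validation task.
from airflow.operators.empty import EmptyOperator

notify_task = EmptyOperator(task_id="notify")  # placeholder downstream task
virtualenv_task >> notify_task  # notify runs only after dvt-virtualenv succeeds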
Summary of What This Code Does
1. DAG Setup: The code defines an Airflow DAG (dvt_example_dag) with a
60-minute timeout for each run. It will run manually, not on a schedule.
2. Validation Function: The validation_function is defined to execute a data
validation task on a BigQuery table using the Data Validation Tool (DVT).
The function performs a count validation on the citibike_trips table and
writes the results to BigQuery through the BigQueryResultHandler.
3. Virtual Environment: The validation function is executed inside a Python
virtual environment using the PythonVirtualenvOperator. The required
dependencies (google-pso-data-validator) are installed in the virtual
environment before the function runs.
4. Airflow Variable: The GCP project ID (gcp_project) is retrieved from an
Airflow variable and passed into the validation function.
When This DAG Runs
 The DAG runs only when triggered manually. When triggered, it runs the
validation_function in a virtual environment with the specified
dependencies, performing the configured COUNT validation on the
citibike_trips table in BigQuery and recording the results.
This setup is useful for integrating data validation tasks within a workflow,
ensuring that the data in a table meets the expected conditions before it is
used downstream in other processes or pipelines.

Step-by-Step Explanation of How It Works:


1. Manual Trigger (since schedule_interval=None):
o The DAG (dvt_example_dag) is configured with
schedule_interval=None, which means it is not automatically
scheduled to run at regular intervals.
o Instead, you will need to manually trigger the DAG when you want
it to run.
2. DAG Execution:
o When you trigger the DAG, Airflow will start the execution of the
tasks defined in the DAG.
o The PythonVirtualenvOperator task (virtualenv_task) will run the
validation_function inside a Python virtual environment. This
function is responsible for the data validation using the Data
Validation Tool (DVT).
3. Data Validation:
o Inside validation_function, the DVT library will:
 Connect to BigQuery using the project ID (gcp_project).
 Perform a column-based validation on the specified table
(citibike_trips).
 The validation logic uses the count aggregation as an
example.
o The results of the validation are handled by the
BigQueryResultHandler (obtained via bqhandler), which writes
them to BigQuery.
4. Execution in a Virtual Environment:
o The PythonVirtualenvOperator ensures that the validation
function runs inside an isolated Python environment with only the
dependencies you specify.
o In this case, the required dependency (google-pso-data-validator)
will be installed in the virtual environment, ensuring that the
function has the necessary libraries available to perform the
validation.
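
For convenience, the example DAG is assembled below from the fragments above
into a single file. The original text replaced several module paths with
"[Link]"; the standard Airflow and DVT paths have been restored here as a best
guess, so verify them against your Airflow version (older releases import
PythonVirtualenvOperator from airflow.operators.python_operator).

from datetime import timedelta

import airflow
from airflow import DAG
from airflow import models
from airflow.operators.python import PythonVirtualenvOperator

default_args = {
    "start_date": airflow.utils.dates.days_ago(1),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def validation_function(project):
    # Imported inside the function so the dependencies only need to exist
    # in the task's virtual environment, not on the Airflow workers.
    from data_validation import data_validation
    from data_validation.result_handlers import bigquery as bqhandler

    BQ_CONN = {"source_type": "BigQuery", "project_id": project}

    GROUPED_CONFIG_COUNT_VALID = {
        "source_conn": BQ_CONN,
        "target_conn": BQ_CONN,
        "type": "Column",
        "schema_name": "bigquery-public-data.new_york_citibike",
        "table_name": "citibike_trips",
        "aggregates": [
            {
                "field_alias": "count",
                "source_column": None,
                "target_column": None,
                "type": "count",
            },
        ],
    }

    # Write validation results to BigQuery for the given project.
    handler = bqhandler.BigQueryResultHandler.get_handler_for_project(project)
    validator = data_validation.DataValidation(
        GROUPED_CONFIG_COUNT_VALID, verbose=True, result_handler=handler
    )
    validator.execute()


with DAG(
    "dvt_example_dag",
    default_args=default_args,
    description="Data Validation Tool Example DAG",
    schedule_interval=None,
    dagrun_timeout=timedelta(minutes=60),
) as dag:
    # The GCP project ID is read from an Airflow Variable named "gcp_project".
    gcp_project = models.Variable.get("gcp_project")

    virtualenv_task = PythonVirtualenvOperator(
        task_id="dvt-virtualenv",
        python_callable=validation_function,
        op_args=[gcp_project],
        requirements=["google-pso-data-validator"],
        system_site_packages=False,
    )

    virtualenv_task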

Characteristic  | Data Lake                                | Data Warehouse
----------------|------------------------------------------|--------------------------------
Data Type       | Raw, unstructured, semi-structured data  | Structured, clean data
Data Processing | ELT (Extract, Load, Transform)           | ETL (Extract, Transform, Load)
Purpose         | Big data analytics, machine learning     | BI, reporting, querying
Cost            | More cost-effective                      | More expensive
Performance     | Lower performance (due to raw data)      | High performance for queries
Users           | Data scientists, engineers, analysts     | Business analysts, executives

Structured Data: Highly organized, follows a predefined schema (e.g., tables in
relational databases).
Semi-structured Data: Some organization or tags but not as rigid as structured
data (e.g., JSON, XML).
Unstructured Data: Raw, unorganized, with no specific schema (e.g., text
documents, videos, social media posts).

Batch Data:
 Definition: Batch data refers to data that is collected, stored, and
processed in chunks or batches over a specific time period (e.g., daily,
hourly).
 Processing: Processing of batch data occurs at scheduled intervals,
typically after the data has been accumulated.
 Latency: Since the data is processed in batches, there is usually a delay
between the data collection and its processing/analysis. The latency can
be minutes, hours, or even days.

Streaming Data:
 Definition: Streaming data refers to data that is continuously generated
and processed in real-time or near-real-time. It is typically transmitted
and analysed as it is created.
 Processing: Streaming data is processed on the fly, often in small,
incremental chunks, allowing for real-time insights and actions.
 Latency: Because data is processed in real-time, the latency is much
lower compared to batch data. Processing occurs immediately as new
data arrives.
IAM: In Google Cloud Platform (GCP), IAM stands for Identity and Access
Management. IAM is a service that allows you to control who can access your
resources, what actions they can perform on those resources, and under what
conditions. It helps you manage users, groups, and permissions in a secure and
organized way.
If you have a team working on a GCP project, you could:
 Assign a developer role to some users so they can create and manage
virtual machines but not change billing settings.
 Assign an Owner role to a project lead who needs full access to configure
and manage all resources.
 Assign Viewer roles to stakeholders who only need to read project data
without making any changes.

