Plugins make Versatile Data Kit adaptable to any organization's use-cases. Example use-cases for plugins are different database connections, file systems, different transformation templates, or organization-specific best practices and patterns.
You can find a list of plugins that we have already developed in the plugins directory.
Installing a third-party plugin can be quickly done with pip:
```shell
pip install vdk-PLUGIN-NAME
```
Once the plugin is installed, vdk automatically finds it and activates it.
Install the latest Cookiecutter if you haven't installed it yet (this requires Cookiecutter 1.4.0 or higher):
```shell
pip install -U cookiecutter
```
Generate a VDK Plugin package project:
```shell
cookiecutter https://github.com/tozka/cookiecutter-vdk-plugin.git
```
Then:
- Include your implementation files inside the `src` folder.
- Include any tests inside the `tests` folder so they can be run automatically by the CI framework.
A plugin is a Python module that enhances or changes the behavior of Versatile Data Kit. Concretely, a plugin is simply an implementation of one or more plugin hooks.
See all supported hook function specifications in specs.py. The spec documentation contains details and examples for how a hook can be used.
To create a new plugin, only two steps are necessary:
- Create your implementation of the plugin's hook(s). You will need to mark it with the `hookimpl` decorator. Check out its documentation here to see how you can configure the hook execution order.
```python
# this is module myproject.pluginmodule, which will be our plugin
# you need to have vdk-core as a dependency
from vdk.api.plugin.hook_markers import hookimpl

# internal vdk-core paths for the types used below
from vdk.internal.builtin_plugins.run.job_context import JobContext
from vdk.internal.core.config import ConfigurationBuilder


# the name of the function must match the name of a hookspec function
@hookimpl(tryfirst=True)
def vdk_configure(config_builder: ConfigurationBuilder) -> None:
    """
    Here we define what configuration settings are needed, with reasonable defaults.
    Other plugins will populate them. For example, there could be a plugin that
    reads environment variables or parses config files.
    """
    config_builder.add(
        key="my_config",
        default_value="default-value-to-use-if-not-set-later",
        description="Description of my config.",
    )


# And here we can create another hook implementation.
# Let's use our configuration to print "bar" if it is set to "foo" every time a job runs.
@hookimpl
def run_job(context: JobContext) -> None:
    value = context.configuration.get_required_option("my_config")
    if value == "foo":
        print("bar")
```
- Register it as a plugin by listing the plugin modules in the `vdk.plugin.run` entry point in your setup.py:

```python
entry_points={"vdk.plugin.run": ["name_of_plugin = myproject.pluginmodule"]}
```
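For context, a minimal setup.py for such a plugin package might look like the sketch below. The package name, version, and module path are illustrative placeholders, not a prescribed layout:

```python
# Illustrative setup.py for a VDK plugin package.
# Package name, version, and module path are hypothetical placeholders.
import setuptools

setuptools.setup(
    name="vdk-my-example-plugin",  # hypothetical package name
    version="0.1.0",
    install_requires=["vdk-core"],  # the plugin API lives in vdk-core
    package_dir={"": "src"},
    packages=setuptools.find_namespace_packages(where="src"),
    # vdk-core discovers plugins through the "vdk.plugin.run" entry point group;
    # the value points at the module containing your @hookimpl functions.
    entry_points={"vdk.plugin.run": ["my-example-plugin = myproject.pluginmodule"]},
)
```

Once the package is installed with pip, vdk scans this entry point group and activates the plugin automatically.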
- Execution order
By default, hook implementations follow a Last-In, First-Out (LIFO) execution order. This means the last registered hook is executed first. Hooks marked with historic=True deviate from the standard LIFO order. Instead, they operate on a First-In, First-Out (FIFO) basis. This unique behavior ensures that a historic hook is executed immediately after its registration. An example of such usage is within vdk_start, commonly employed for registering plugin classes that contain hook implementations. This mechanism is designed to ensure that essential setup hooks run in the expected order.
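As a toy illustration of the two orderings (plain Python, not pluggy or vdk-core itself; the `ToyHookRegistry` class is invented for this sketch):

```python
# Toy sketch of plugin-hook ordering (not pluggy/vdk-core code).
# Default hooks run LIFO (last registered runs first); "historic" hooks run FIFO
# (registration order), so early setup hooks execute in the order they were added.

class ToyHookRegistry:
    def __init__(self, historic: bool = False):
        self.historic = historic
        self._impls = []

    def register(self, impl) -> None:
        self._impls.append(impl)

    def call(self) -> list:
        # FIFO for historic hooks, LIFO otherwise
        impls = self._impls if self.historic else list(reversed(self._impls))
        return [impl() for impl in impls]


regular = ToyHookRegistry()
regular.register(lambda: "first-registered")
regular.register(lambda: "second-registered")
print(regular.call())  # LIFO: ['second-registered', 'first-registered']

historic = ToyHookRegistry(historic=True)
historic.register(lambda: "first-registered")
historic.register(lambda: "second-registered")
print(historic.call())  # FIFO: ['first-registered', 'second-registered']
```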
- Handling prioritization conflicts
In vdk-core, numerous hooks are annotated with tryfirst or trylast markers. These markers are critical for managing execution priority among various hook implementations. If you encounter unexpected behavior, it's likely due to the execution order of plugin hook implementations versus core hooks. A practical approach to diagnosing and resolving such issues involves debugging with a sample data job. This process will allow you to observe the execution order and adjust your hook's prioritization marker (tryfirst, trylast) accordingly to achieve the desired operational flow.
- Document hook dependencies
Clearly document any dependencies or expected execution order among your hooks. This will aid in future debugging efforts and facilitate a clearer understanding for new developers. For instance, should you employ trylast=True in vdk_initialize, it's advisable to annotate its necessity within a docstring or comment. Example:
```python
@hookimpl(
    trylast=True
)  # trylast because we want to have any lineage loggers already initialized
def vdk_initialize(self, context: CoreContext) -> None:
    self._lineage_logger = context.state.get(LINEAGE_LOGGER_KEY)
```
The plugin system is based on pluggy.
As hook implementations can be plain functions (without a class), it is recommended that all hooks of the same family share a common prefix, for example: db_connection_start, job_initialize.
Versatile Data Kit is used for executing different commands, some of them provided as plugins. Using the above hooks, one can extend the functionality of any command by adding monitoring, customizing logging, or adding new options.
Check out the CoreHookSpec class documentation for more details.
The above image shows the normal run cycle of a data job. The hooks shown are invoked only when the `vdk run` command is used to execute a data job.
Check out the JobRunSpecs class documentation for more details.
These are hook specifications that enable plugins to hook at PEP249 connection and cursor events during execution. Sequence of evaluation:
- db_connection_validate_operation
- db_connection_decorate_operation
- db_connection_execute_operation
- in case of recovery needed -> db_connection_recover_operation
Check out the connection hook spec documentation for more details.
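As a toy sketch (plain Python, not the actual vdk-core classes or hook signatures), the evaluation sequence above can be simulated like this:

```python
# Toy simulation of the PEP 249 connection-hook sequence (not actual vdk-core code).
# Hook names mirror the spec; the bodies here are illustrative stand-ins.

def execute_with_hooks(operation: str, fail_first: bool = False) -> list:
    calls = []

    calls.append("db_connection_validate_operation")  # e.g. reject forbidden SQL
    calls.append("db_connection_decorate_operation")  # e.g. prepend a comment to the query

    try:
        calls.append("db_connection_execute_operation")
        if fail_first:
            raise RuntimeError("simulated transient failure")
    except RuntimeError:
        # invoked only when recovery is needed, e.g. retry after reconnect
        calls.append("db_connection_recover_operation")

    return calls


print(execute_with_hooks("SELECT 1"))
print(execute_with_hooks("SELECT 1", fail_first=True))
```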
Data engineers use one of the IIngester methods to send data for ingestion. The way data is ingested is controlled by different ingestion plugins, which implement one or more of three possible methods (hooks):
- pre_ingest_process - called before data is about to be ingested
- ingest_payload - does the actual ingestion (sending the data to remote store)
- post_ingest_process - called after data is ingested (or failed to ingest in case of error)
Details about ingestion hooks can be seen here.
You can see an example of an ingest plugin here.
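As a toy sketch of how these three hooks compose (plain Python; the function bodies and signatures are illustrative, not the actual vdk-core interfaces):

```python
# Toy ingestion pipeline illustrating pre_ingest_process / ingest_payload /
# post_ingest_process. Signatures and bodies are illustrative stand-ins.

def pre_ingest_process(payload: list) -> list:
    # example: validate that every record has an "id" field
    for record in payload:
        if "id" not in record:
            raise ValueError(f"record missing 'id': {record!r}")
    return payload

def ingest_payload(payload: list, store: list) -> None:
    # example: "send" the data to a remote store (here, an in-memory list)
    store.extend(payload)

def post_ingest_process(payload: list, audit_log: list) -> None:
    # example: audit how many records were ingested
    audit_log.append({"ingested_records": len(payload)})


store, audit_log = [], []
payload = [{"id": 1, "amount": 10}, {"id": 2, "amount": 20}]

validated = pre_ingest_process(payload)
ingest_payload(validated, store)
post_ingest_process(validated, audit_log)

print(len(store))    # 2
print(audit_log[0])  # {'ingested_records': 2}
```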
Ingestion hooks can be used for the following example use-cases (this is not an exhaustive list):
- Data Validation (pre_ingest_process): Plugin validates incoming data against a predefined schema or rules. For example, it verifies if all necessary fields in sales data are present and correctly formatted.
- Data Transformation (pre_ingest_process): Plugin transforms the data into the required format. For instance, it might convert product names to uppercase or generate new fields in sales data, or anonymize PII data.
- Data Ingestion (ingest_payload): The Plugin Destination pushes data to the final storage, managing connections to systems like Amazon S3, Google Cloud Storage, or SQL databases.
- Data Auditing (post_ingest_process): In the post-ingest phase, the plugin serves as a data auditing tool, generating reports detailing data volume, errors, and timestamps of the ingestion process.
- Metadata Update (post_ingest_process): A plugin updates a metadata repository with information about the ingested data, like source, time, volume, schema.
Any backwards compatibility guarantees apply only to public interfaces. Public interfaces are modules and packages defined or imported in vdk.api.*, unless the documentation explicitly declares them to be provisional or internal interfaces. Anything else is considered internal. All public interfaces (classes or methods) must have documentation.