Graph-based Neural Structured Learning in TFX

This tutorial describes graph regularization from the Neural Structured Learning framework and demonstrates an end-to-end workflow for sentiment classification in a TFX pipeline.

This notebook classifies movie reviews as positive or negative using the text of the review. This is an example of binary classification, an important and widely applicable kind of machine learning problem.

We will demonstrate the use of graph regularization in this notebook by building a graph from the given input. The general recipe for building a graph-regularized model using the Neural Structured Learning (NSL) framework when the input does not contain an explicit graph is as follows:

  1. Create embeddings for each text sample in the input. This can be done using pre-trained models such as word2vec, Swivel, BERT, etc.
  2. Build a graph based on these embeddings by using a similarity metric such as the 'L2' distance, 'cosine' distance, etc. Nodes in the graph correspond to samples and edges in the graph correspond to similarity between pairs of samples.
  3. Generate training data from the above synthesized graph and sample features. The resulting training data will contain neighbor features in addition to the original node features.
  4. Create a neural network as a base model using Estimators.
  5. Wrap the base model with the add_graph_regularization wrapper function, which is provided by the NSL framework, to create a new graph Estimator model. This new model will include a graph regularization loss as the regularization term in its training objective.
  6. Train and evaluate the graph Estimator model.
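
To make the training objective in step 5 more concrete, the sketch below computes a graph-regularized loss for a single sample in plain Python: the supervised loss plus a weighted penalty on the distance between the sample's representation and those of its neighbors. The function names, the squared L2 distance, and the multiplier value are illustrative assumptions, not NSL's implementation.

```python
import math


def binary_cross_entropy(label, prob, eps=1e-7):
  """Supervised loss for one sample given its predicted positive probability."""
  prob = min(max(prob, eps), 1.0 - eps)
  return -(label * math.log(prob) + (1 - label) * math.log(1.0 - prob))


def squared_l2(a, b):
  """Squared L2 distance between two equal-length vectors."""
  return sum((x - y) ** 2 for x, y in zip(a, b))


def graph_regularized_loss(label, prob, sample_repr, neighbors, multiplier=0.1):
  """Supervised loss plus a weighted neighbor-distance penalty.

  neighbors: list of (neighbor_repr, edge_weight) pairs (hypothetical format).
  """
  supervised = binary_cross_entropy(label, prob)
  if not neighbors:
    return supervised
  penalty = sum(w * squared_l2(sample_repr, n) for n, w in neighbors)
  return supervised + multiplier * penalty / len(neighbors)


loss_alone = graph_regularized_loss(1, 0.9, [0.2, 0.8], [])
loss_close = graph_regularized_loss(1, 0.9, [0.2, 0.8], [([0.2, 0.8], 0.99)])
loss_far = graph_regularized_loss(1, 0.9, [0.2, 0.8], [([0.9, 0.1], 0.99)])
print(loss_alone, loss_close, loss_far)
```

A neighbor with an identical representation adds no penalty, while a distant neighbor increases the loss, which is what pushes similar samples toward similar outputs during training.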

In this tutorial, we integrate the above workflow in a TFX pipeline using several custom TFX components as well as a custom graph-regularized trainer component.

Below is the schematic for our TFX pipeline. Orange boxes represent off-the-shelf TFX components and pink boxes represent custom TFX components.

[Figure: TFX Pipeline]

To avoid upgrading Pip in a system when running locally, check to make sure that we're running in Colab. Local systems can of course be upgraded separately.

import sys
if 'google.colab' in sys.modules:
  !pip install --upgrade pip

Install Required Packages

# TFX has a constraint of 1.16 due to the removal of tf.estimator support.
!pip install -q \
  "tfx<1.16" \
  neural-structured-learning \
  tensorflow-hub \
  tensorflow-datasets

Dependencies and imports

import apache_beam as beam
import gzip as gzip_lib
import numpy as np
import os
import pprint
import shutil
import tempfile
import urllib
import uuid
pp = pprint.PrettyPrinter()

import tensorflow as tf
import neural_structured_learning as nsl

import tfx
from tfx.components.evaluator.component import Evaluator
from tfx.components.example_gen.import_example_gen.component import ImportExampleGen
from tfx.components.example_validator.component import ExampleValidator
from tfx.components.model_validator.component import ModelValidator
from tfx.components.pusher.component import Pusher
from tfx.components.schema_gen.component import SchemaGen
from tfx.components.statistics_gen.component import StatisticsGen
from tfx.components.trainer import executor as trainer_executor
from tfx.components.trainer.component import Trainer
from tfx.components.transform.component import Transform
from tfx.dsl.components.base import executor_spec
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.proto import evaluator_pb2
from tfx.proto import example_gen_pb2
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2

from tfx.types import artifact
from tfx.types import artifact_utils
from tfx.types import channel
from tfx.types import standard_artifacts
from tfx.types.standard_artifacts import Examples

from tfx.dsl.component.experimental.annotations import InputArtifact
from tfx.dsl.component.experimental.annotations import OutputArtifact
from tfx.dsl.component.experimental.annotations import Parameter
from tfx.dsl.component.experimental.decorators import component

from tensorflow_metadata.proto.v0 import anomalies_pb2
from tensorflow_metadata.proto.v0 import schema_pb2
from tensorflow_metadata.proto.v0 import statistics_pb2

import tensorflow_data_validation as tfdv
import tensorflow_transform as tft
import tensorflow_model_analysis as tfma
import tensorflow_hub as hub
import tensorflow_datasets as tfds

print("TF Version: ", tf.__version__)
print("Eager mode: ", tf.executing_eagerly())
print(
    "GPU is",
    "available" if tf.config.list_physical_devices("GPU") else "NOT AVAILABLE")
print("NSL Version: ", nsl.__version__)
print("TFX Version: ", tfx.__version__)
print("TFDV version: ", tfdv.__version__)
print("TFT version: ", tft.__version__)
print("TFMA version: ", tfma.__version__)
print("Hub version: ", hub.__version__)
print("Beam version: ", beam.__version__)
TF Version:  2.15.1
Eager mode:  True
GPU is available
NSL Version:  1.4.0
TFX Version:  1.15.1
TFDV version:  1.15.1
TFT version:  1.15.0
TFMA version:  0.46.0
Hub version:  0.15.0
Beam version:  2.57.0

IMDB dataset

The IMDB dataset contains the text of 50,000 movie reviews from the Internet Movie Database. These are split into 25,000 reviews for training and 25,000 reviews for testing. The training and testing sets are balanced, meaning they contain an equal number of positive and negative reviews. Moreover, there are 50,000 additional unlabeled movie reviews.

Download preprocessed IMDB dataset

The following code downloads the IMDB dataset (or uses a cached copy if it has already been downloaded) using TFDS. To speed up this notebook we will use only 10,000 labeled reviews and 10,000 unlabeled reviews for training, and 10,000 test reviews for evaluation.

train_set, eval_set = tfds.load(
    "imdb_reviews",
    split=["train[:10000]+unsupervised[:10000]", "test[:10000]"],
    shuffle_files=False)

Let's look at a few reviews from the training set:

for tfrecord in train_set.take(4):
  print("Review: {}".format(tfrecord["text"].numpy().decode("utf-8")[:300]))
  print("Label: {}\n".format(tfrecord["label"].numpy()))
Review: This was an absolutely terrible movie. Don't be lured in by Christopher Walken or Michael Ironside. Both are great actors, but this must simply be their worst role in history. Even their great acting could not redeem this movie's ridiculous storyline. This movie is an early nineties US propaganda pi
Label: 0

Review: I have been known to fall asleep during films, but this is usually due to a combination of things including, really tired, being warm and comfortable on the sette and having just eaten a lot. However on this occasion I fell asleep because the film was rubbish. The plot development was constant. Cons
Label: 0

Review: Mann photographs the Alberta Rocky Mountains in a superb fashion, and Jimmy Stewart and Walter Brennan give enjoyable performances as they always seem to do. <br /><br />But come on Hollywood - a Mountie telling the people of Dawson City, Yukon to elect themselves a marshal (yes a marshal!) and to e
Label: 0

Review: This is the kind of film for a snowy Sunday afternoon when the rest of the world can go ahead with its own business as you descend into a big arm-chair and mellow for a couple of hours. Wonderful performances from Cher and Nicolas Cage (as always) gently row the plot along. There are no rapids to cr
Label: 1
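
def _dict_to_example(instance):
  """Converts a dict of NumPy arrays to a tf.train.Example."""
  feature = {}
  for key, value in instance.items():
    if value is None:
      feature[key] = tf.train.Feature()
    elif np.issubdtype(value.dtype, np.integer):
      feature[key] = tf.train.Feature(
          int64_list=tf.train.Int64List(value=value.tolist()))
    elif np.issubdtype(value.dtype, np.floating):
      feature[key] = tf.train.Feature(
          float_list=tf.train.FloatList(value=value.tolist()))
    else:
      # Anything else (here, the review text) is stored as bytes.
      feature[key] = tf.train.Feature(
          bytes_list=tf.train.BytesList(value=value.tolist()))
  return tf.train.Example(features=tf.train.Features(feature=feature))

examples_path = tempfile.mkdtemp(prefix="tfx-data")
train_path = os.path.join(examples_path, "train.tfrecord")
eval_path = os.path.join(examples_path, "eval.tfrecord")

# Write each split as a TFRecord file of serialized tf.train.Example protos.
for path, dataset in [(train_path, train_set), (eval_path, eval_set)]:
  with tf.io.TFRecordWriter(path) as writer:
    for example in dataset:
      writer.write(
          _dict_to_example({
              "label": np.array([example["label"].numpy()]),
              "text": np.array([example["text"].numpy()]),
          }).SerializeToString())
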
Run TFX Components Interactively

In the cells that follow you will construct TFX components and run each one interactively within the InteractiveContext to obtain ExecutionResult objects. This mirrors the process of an orchestrator running components in a TFX DAG based on when the dependencies for each component are met.
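
As a rough illustration of how an orchestrator decides run order, the sketch below topologically sorts the components used in this notebook up to Transform, using only the Python standard library. The dependency edges are assumptions read off the component descriptions in this tutorial; this is not how TFX itself schedules work.

```python
from graphlib import TopologicalSorter

# Each component maps to the set of components whose outputs it consumes.
# These edges are assumed from the component descriptions in this tutorial.
dependencies = {
    "ImportExampleGen": set(),
    "IdentifyExamples": {"ImportExampleGen"},
    "StatisticsGen": {"IdentifyExamples"},
    "SchemaGen": {"StatisticsGen"},
    "ExampleValidator": {"StatisticsGen", "SchemaGen"},
    "SynthesizeGraph": {"IdentifyExamples"},
    "Transform": {"IdentifyExamples", "SchemaGen"},
}

# static_order() yields a component only after all of its dependencies.
run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)
```

Running the cells of this notebook from top to bottom follows one such valid order by hand.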

context = InteractiveContext()
WARNING:absl:InteractiveContext pipeline_root argument not provided: using temporary directory /tmpfs/tmp/tfx-interactive-2024-08-02T09_08_00.145456-zzsoiua3 as root for pipeline outputs.
WARNING:absl:InteractiveContext metadata_connection_config not provided: using SQLite ML Metadata database at /tmpfs/tmp/tfx-interactive-2024-08-02T09_08_00.145456-zzsoiua3/metadata.sqlite.

The ExampleGen Component

The first step in any ML development process is to ingest the training and test datasets. The ExampleGen component brings data into the TFX pipeline.

Create an ExampleGen component and run it.

input_config = example_gen_pb2.Input(splits=[
    example_gen_pb2.Input.Split(name='train', pattern='train.tfrecord'),
    example_gen_pb2.Input.Split(name='eval', pattern='eval.tfrecord')
])

example_gen = ImportExampleGen(input_base=examples_path, input_config=input_config)

context.run(example_gen, enable_cache=True)
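for artifact in example_gen.outputs['examples'].get():
  print(artifact)

print('\nexample_gen.outputs is a {}'.format(type(example_gen.outputs)))
print(example_gen.outputs)

print(example_gen.outputs['examples'].get()[0].split_names)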

Artifact(artifact: id: 1
type_id: 14
uri: "/tmpfs/tmp/tfx-interactive-2024-08-02T09_08_00.145456-zzsoiua3/ImportExampleGen/examples/1"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
  }
}
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:train,num_files:1,total_bytes:27706811,xor_checksum:1722589677,sum_checksum:1722589677\nsplit:eval,num_files:1,total_bytes:13374744,xor_checksum:1722589680,sum_checksum:1722589680"
  }
}
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.15.1"
  }
}
state: LIVE
, artifact_type: id: 14
name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)

example_gen.outputs is a <class 'dict'>
{'examples': OutputChannel(artifact_type=Examples, producer_component_id=ImportExampleGen, output_key=examples, additional_properties={}, additional_custom_properties={}, _input_trigger=None, _is_async=False)}
["train", "eval"]

The component's output is a single Examples artifact containing 2 splits:

  • the training examples (10,000 labeled reviews + 10,000 unlabeled reviews)
  • the eval examples (10,000 labeled reviews)

The IdentifyExamples Custom Component

To use NSL, we will need each instance to have a unique ID. We create a custom component that adds such a unique ID to all instances across all splits. We leverage Apache Beam to be able to easily scale to large datasets if needed.

def make_example_with_unique_id(example, id_feature_name):
  """Adds a unique ID to the given `tf.train.Example` proto.

  This function uses Python's 'uuid' module to generate a universally unique
  identifier for each example.

  Args:
    example: An instance of a `tf.train.Example` proto.
    id_feature_name: The name of the feature in the resulting `tf.train.Example`
      that will contain the unique identifier.

  Returns:
    A new `tf.train.Example` proto that includes a unique identifier as an
    additional feature.
  """
  result = tf.train.Example()
  result.CopyFrom(example)
  unique_id = uuid.uuid4()
  # Store the UUID as a single UTF-8 encoded bytes value.
  result.features.feature[id_feature_name].bytes_list.value.append(
      str(unique_id).encode('utf-8'))
  return result

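@component
def IdentifyExamples(orig_examples: InputArtifact[Examples],
                     identified_examples: OutputArtifact[Examples],
                     id_feature_name: Parameter[str],
                     component_name: Parameter[str]) -> None:

  # Get a list of the splits in input_data
  splits_list = artifact_utils.decode_split_names(
      split_names=orig_examples.split_names)
  # For completeness, encode the splits names and payload_format.
  # We could also just use input_data.split_names.
  identified_examples.split_names = artifact_utils.encode_split_names(
      splits=splits_list)
  # TODO(b/168616829): Remove populating payload_format after tfx 0.25.0.
  identified_examples.set_string_custom_property(
      "payload_format",
      orig_examples.get_string_custom_property("payload_format"))

  for split in splits_list:
    input_dir = artifact_utils.get_split_uri([orig_examples], split)
    output_dir = artifact_utils.get_split_uri([identified_examples], split)
    os.makedirs(output_dir, exist_ok=True)
    with beam.Pipeline() as pipeline:
      (pipeline
       | 'ReadExamples' >> beam.io.ReadFromTFRecord(
           os.path.join(input_dir, '*'),
           coder=beam.coders.ProtoCoder(tf.train.Example))
       | 'AddUniqueId' >> beam.Map(make_example_with_unique_id, id_feature_name)
       | 'WriteIdentifiedExamples' >> beam.io.WriteToTFRecord(
           file_path_prefix=os.path.join(output_dir, 'data_tfrecord'),
           coder=beam.coders.ProtoCoder(tf.train.Example),
           file_name_suffix='.gz'))

identify_examples = IdentifyExamples(
    orig_examples=example_gen.outputs['examples'],
    component_name=u'IdentifyExamples',
    id_feature_name=u'id')
context.run(identify_examples, enable_cache=False)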
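
Stripped of TFX and Beam, the ID-assignment step amounts to copying each record and attaching a freshly generated UUID. The sketch below shows that idea on plain dictionaries; `add_unique_id` and the sample records are hypothetical stand-ins for the `tf.train.Example` protos handled by the component.

```python
import uuid


def add_unique_id(record, id_feature_name="id"):
  """Returns a copy of `record` with a random UUID string added."""
  result = dict(record)  # Copy so the input record is left unchanged.
  result[id_feature_name] = str(uuid.uuid4())
  return result


reviews = [
    {"text": "great film", "label": 1},
    {"text": "dull plot", "label": 0},
    {"text": "no label yet", "label": -1},
]
identified = [add_unique_id(r) for r in reviews]
ids = [r["id"] for r in identified]
print(ids)
```

Because `uuid4` IDs are random, every record can be tagged independently, which is what lets the step run as a parallel map over a large dataset.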

The StatisticsGen Component

The StatisticsGen component computes descriptive statistics for your dataset. The statistics that it generates can be visualized for review, and are used for example validation and to infer a schema.

Create a StatisticsGen component and run it.

# Computes statistics over data for visualization and example validation.
statistics_gen = StatisticsGen(
    examples=identify_examples.outputs["identified_examples"])
context.run(statistics_gen, enable_cache=True)

The SchemaGen Component

The SchemaGen component generates a schema for your data based on the statistics from StatisticsGen. It tries to infer the data types of each of your features, and the ranges of legal values for categorical features.

Create a SchemaGen component and run it.

# Generates schema based on statistics files.
schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'], infer_feature_shape=False)
context.run(schema_gen, enable_cache=True)

The generated artifact is just a schema.pbtxt containing a text representation of a schema_pb2.Schema protobuf:

train_uri = schema_gen.outputs['schema'].get()[0].uri
schema_filename = os.path.join(train_uri, 'schema.pbtxt')
schema = tfx.utils.io_utils.parse_pbtxt_file(
    file_name=schema_filename, message=schema_pb2.Schema())

It can be visualized using tfdv.display_schema() (we will look at this in more detail in a subsequent lab):

tfdv.display_schema(schema)


The ExampleValidator Component

The ExampleValidator performs anomaly detection, based on the statistics from StatisticsGen and the schema from SchemaGen. It looks for problems such as missing values, values of the wrong type, or categorical values outside of the domain of acceptable values.
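
The kinds of checks described above can be illustrated with a small, self-contained sketch. The toy schema format and the `find_anomalies` helper below are assumptions made for illustration only; they do not reflect how TFDV or ExampleValidator are implemented.

```python
def find_anomalies(examples, schema):
  """Checks each example against a toy schema and returns anomaly tuples.

  schema maps feature name -> {"type": python type, "domain": optional set}.
  """
  anomalies = []
  for i, example in enumerate(examples):
    for name, spec in schema.items():
      value = example.get(name)
      if value is None:
        anomalies.append((i, name, "missing value"))
      elif not isinstance(value, spec["type"]):
        anomalies.append((i, name, "wrong type"))
      elif spec.get("domain") is not None and value not in spec["domain"]:
        anomalies.append((i, name, "value outside domain"))
  return anomalies


schema = {
    "text": {"type": str},
    "label": {"type": int, "domain": {-1, 0, 1}},
}
examples = [
    {"text": "fine movie", "label": 1},
    {"text": None, "label": 0},
    {"text": "odd", "label": "positive"},
    {"text": "odd", "label": 7},
]
anomalies = find_anomalies(examples, schema)
for anomaly in anomalies:
  print(anomaly)
```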

Create an ExampleValidator component and run it.

# Performs anomaly detection based on statistics and data schema.
validate_stats = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema'])
context.run(validate_stats, enable_cache=False)

The SynthesizeGraph Component

Graph construction involves creating embeddings for text samples and then using a similarity function to compare the embeddings.

We will use pretrained Swivel embeddings to create embeddings in the tf.train.Example format for each sample in the input. We will store the resulting embeddings in the TFRecord format along with the sample's ID. This is important because it will allow us to match sample embeddings with the corresponding nodes in the graph later.

Once we have the sample embeddings, we will use them to build a similarity graph, i.e., nodes in this graph will correspond to samples and edges in this graph will correspond to similarity between pairs of nodes.

Neural Structured Learning provides a graph building library to build a graph based on sample embeddings. It uses cosine similarity as the similarity measure to compare embeddings and build edges between them. It also allows us to specify a similarity threshold, which can be used to discard dissimilar edges from the final graph. In the following example, using 0.99 as the similarity threshold, we end up with a graph that has 111,066 bi-directional edges.
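
The idea of thresholded cosine-similarity edges can be sketched in a few lines of plain Python. This is a simplified illustration under stated assumptions (all-pairs comparison, edges kept at or above the threshold, made-up IDs and 2-D embeddings); it is not the NSL graph builder.

```python
import math
from itertools import combinations


def cosine_similarity(a, b):
  """Cosine similarity between two non-zero vectors of equal length."""
  dot = sum(x * y for x, y in zip(a, b))
  norm_a = math.sqrt(sum(x * x for x in a))
  norm_b = math.sqrt(sum(y * y for y in b))
  return dot / (norm_a * norm_b)


def build_similarity_graph(embeddings, similarity_threshold):
  """Returns (source, target, weight) rows, two rows per kept edge.

  embeddings: dict mapping sample ID -> embedding vector.
  Every pair is compared, which is only practical for small inputs.
  """
  rows = []
  for (id_a, emb_a), (id_b, emb_b) in combinations(embeddings.items(), 2):
    weight = cosine_similarity(emb_a, emb_b)
    if weight >= similarity_threshold:
      rows.append((id_a, id_b, weight))
      rows.append((id_b, id_a, weight))
  return rows


embeddings = {
    "a": [1.0, 0.0],
    "b": [0.99, 0.05],
    "c": [0.0, 1.0],
}
rows = build_similarity_graph(embeddings, similarity_threshold=0.99)
for source, target, weight in rows:
  print(f"{source}\t{target}\t{weight:.6f}")
```

Each kept edge is written once per direction, which matches the graph file in this notebook: 111,066 bi-directional edges occupy 222,132 lines.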

swivel_url = ''
hub_layer = hub.KerasLayer(swivel_url, input_shape=[], dtype=tf.string)

def _bytes_feature(value):
  """Returns a bytes_list from a string / byte."""
  return tf.train.Feature(bytes_list=tf.train.BytesList(value=value))

def _float_feature(value):
  """Returns a float_list from a float / double."""
  return tf.train.Feature(float_list=tf.train.FloatList(value=value))

def create_embedding_example(example):
  """Create tf.Example containing the sample's embedding and its ID."""
  sentence_embedding = hub_layer(tf.sparse.to_dense(example['text']))

  # Flatten the sentence embedding back to 1-D.
  sentence_embedding = tf.reshape(sentence_embedding, shape=[-1])

  feature_dict = {
      'id': _bytes_feature(tf.sparse.to_dense(example['id']).numpy()),
      'embedding': _float_feature(sentence_embedding.numpy().tolist())
  }

  return tf.train.Example(features=tf.train.Features(feature=feature_dict))

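def create_dataset(uri):
  tfrecord_filenames = [os.path.join(uri, name) for name in os.listdir(uri)]
  return tf.data.TFRecordDataset(tfrecord_filenames, compression_type='GZIP')

def create_embeddings(train_path, output_path):
  dataset = create_dataset(train_path)
  embeddings_path = os.path.join(output_path, 'embeddings.tfr')

  # 'id' and 'text' are parsed as sparse features and densified later.
  feature_map = {
      'label': tf.io.FixedLenFeature([], tf.int64),
      'id': tf.io.VarLenFeature(tf.string),
      'text': tf.io.VarLenFeature(tf.string)
  }

  with tf.io.TFRecordWriter(embeddings_path) as writer:
    for tfrecord in dataset:
      tensor_dict = tf.io.parse_single_example(tfrecord, feature_map)
      embedding_example = create_embedding_example(tensor_dict)
      writer.write(embedding_example.SerializeToString())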

def build_graph(output_path, similarity_threshold):
  embeddings_path = os.path.join(output_path, 'embeddings.tfr')
  graph_path = os.path.join(output_path, 'graph.tsv')
  # LSH options such as lsh_splits and lsh_rounds can also be set here to
  # speed up graph building on larger inputs.
  graph_builder_config = nsl.configs.GraphBuilderConfig(
      similarity_threshold=similarity_threshold,
      random_seed=12345)
  nsl.tools.build_graph_from_config([embeddings_path], graph_path,
                                    graph_builder_config)

"""Custom Artifact type"""

class SynthesizedGraph(tfx.types.artifact.Artifact):
  """Output artifact of the SynthesizeGraph component"""
  TYPE_NAME = 'SynthesizedGraphPath'
  PROPERTIES = {
      'span': standard_artifacts.SPAN_PROPERTY,
      'split_names': standard_artifacts.SPLIT_NAMES_PROPERTY,
  }

@component
def SynthesizeGraph(identified_examples: InputArtifact[Examples],
                    synthesized_graph: OutputArtifact[SynthesizedGraph],
                    similarity_threshold: Parameter[float],
                    component_name: Parameter[str]) -> None:

  # Get a list of the splits in input_data
  splits_list = artifact_utils.decode_split_names(
      split_names=identified_examples.split_names)

  # We build a graph only based on the 'Split-train' split which includes both
  # labeled and unlabeled examples.
  train_input_examples_uri = os.path.join(identified_examples.uri,
                                          'Split-train')
  output_graph_uri = os.path.join(synthesized_graph.uri, 'Split-train')
  os.makedirs(output_graph_uri, exist_ok=True)

  print('Creating embeddings...')
  create_embeddings(train_input_examples_uri, output_graph_uri)

  print('Synthesizing graph...')
  build_graph(output_graph_uri, similarity_threshold)

  synthesized_graph.split_names = artifact_utils.encode_split_names(
      splits=['Split-train'])

synthesize_graph = SynthesizeGraph(
    identified_examples=identify_examples.outputs['identified_examples'],
    component_name=u'SynthesizeGraph',
    similarity_threshold=0.99)
context.run(synthesize_graph, enable_cache=False)
Creating embeddings...
Synthesizing graph...
train_uri = synthesize_graph.outputs["synthesized_graph"].get()[0].uri
graph_path = os.path.join(train_uri, "Split-train", "graph.tsv")
print("node 1\t\t\t\t\tnode 2\t\t\t\t\tsimilarity")
!head {graph_path}
!tail {graph_path}
node 1                  node 2                  similarity
1e5a20fa-113d-4a4b-b901-2f51f4b74670    0a39f83e-8d3c-4ad5-849f-38675194e720    0.991234
0a39f83e-8d3c-4ad5-849f-38675194e720    1e5a20fa-113d-4a4b-b901-2f51f4b74670    0.991234
0a39f83e-8d3c-4ad5-849f-38675194e720    771c9b5c-07a4-491f-a515-bb10a11a8705    0.990838
771c9b5c-07a4-491f-a515-bb10a11a8705    0a39f83e-8d3c-4ad5-849f-38675194e720    0.990838
030d35c0-0db9-4f28-8707-cfa8c5f483d3    771c9b5c-07a4-491f-a515-bb10a11a8705    0.990184
771c9b5c-07a4-491f-a515-bb10a11a8705    030d35c0-0db9-4f28-8707-cfa8c5f483d3    0.990184
55763705-7e7a-4a39-9a65-c61f06871924    095d197d-3cf3-40df-a4b0-1ebdd6ace9a5    0.992823
095d197d-3cf3-40df-a4b0-1ebdd6ace9a5    55763705-7e7a-4a39-9a65-c61f06871924    0.992823
c471129f-08ff-4eb7-887f-7c8e47b6c224    465070dd-60cf-4a54-8171-369bc1c68637    0.990020
465070dd-60cf-4a54-8171-369bc1c68637    c471129f-08ff-4eb7-887f-7c8e47b6c224    0.990020
c5c4b0cd-b721-4993-9840-0831d3fdb5f0    7c8b2f1c-2e23-43ad-acab-4acf280e007b    0.991327
7c8b2f1c-2e23-43ad-acab-4acf280e007b    c5c4b0cd-b721-4993-9840-0831d3fdb5f0    0.991327
202e428e-c62b-4202-a2a4-eeb0eccf6cee    d5dfaadb-7e66-41cf-9b30-fa594b63534c    0.991046
d5dfaadb-7e66-41cf-9b30-fa594b63534c    202e428e-c62b-4202-a2a4-eeb0eccf6cee    0.991046
58c91cb8-59ef-421f-b014-eff6e66c5214    564daa6f-1f36-46a0-87cc-56b0a6c7bd4c    0.991198
564daa6f-1f36-46a0-87cc-56b0a6c7bd4c    58c91cb8-59ef-421f-b014-eff6e66c5214    0.991198
007922e8-2a03-4ba9-9e9c-af99f22303b2    e6319e60-e3c8-4d1b-b269-97dc552d8a15    0.990260
e6319e60-e3c8-4d1b-b269-97dc552d8a15    007922e8-2a03-4ba9-9e9c-af99f22303b2    0.990260
3e42725a-bd42-4fba-91d1-eb6fd6e70a8b    ee568e35-a727-489e-b458-e4b7cce70a81    0.991317
ee568e35-a727-489e-b458-e4b7cce70a81    3e42725a-bd42-4fba-91d1-eb6fd6e70a8b    0.991317
!wc -l {graph_path}
222132 /tmpfs/tmp/tfx-interactive-2024-08-02T09_08_00.145456-zzsoiua3/SynthesizeGraph/synthesized_graph/6/Split-train/graph.tsv

The Transform Component

The Transform component performs data transformations and feature engineering. The results include an input TensorFlow graph which is used during both training and serving to preprocess the data before training or inference. This graph becomes part of the SavedModel that is the result of model training. Since the same input graph is used for both training and serving, the preprocessing will always be the same, and only needs to be written once.

The Transform component requires more code than many other components because of the arbitrary complexity of the feature engineering that you may need for the data and/or model that you're working with. It requires code files to be available which define the processing needed.

Each sample will include the following three features:

  1. id: The node ID of the sample.
  2. text_xf: An int64 list containing word IDs.
  3. label_xf: A singleton int64 identifying the target class of the review: 0=negative, 1=positive.
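
As a rough, framework-free illustration of how review text can become a fixed-length list of word IDs, the sketch below tokenizes a string, pads it, and maps tokens to IDs with out-of-vocabulary (OOV) buckets. The tiny vocabulary, the helper names, and the use of `zlib.crc32` for bucketing are assumptions for illustration; TensorFlow Transform builds its vocabulary from the data and uses its own hashing.

```python
import re
import zlib


def tokenize_review(review, sequence_length):
  """Lowercases, strips punctuation, then pads or truncates to a fixed length."""
  review = review.lower()
  review = re.sub(r" '| '|^'|'$", " ", review)
  review = re.sub(r"[^a-z' ]", " ", review)
  tokens = ["<START>"] + review.split()[:sequence_length] + ["<END>"]
  tokens = tokens[:sequence_length]
  return tokens + ["<PAD>"] * (sequence_length - len(tokens))


def tokens_to_ids(tokens, vocabulary, num_oov_buckets):
  """Maps known tokens to their index and unknown tokens to an OOV bucket."""
  ids = []
  for token in tokens:
    if token in vocabulary:
      ids.append(vocabulary[token])
    else:
      # OOV IDs start right after the last in-vocabulary ID.
      bucket = zlib.crc32(token.encode("utf-8")) % num_oov_buckets
      ids.append(len(vocabulary) + bucket)
  return ids


vocabulary = {"<PAD>": 0, "<START>": 1, "<END>": 2, "the": 3, "movie": 4}
tokens = tokenize_review("The movie: WONDERFUL!", sequence_length=6)
ids = tokens_to_ids(tokens, vocabulary, num_oov_buckets=100)
print(tokens)
print(ids)
```

With a vocabulary of 10,000 entries and 100 OOV buckets, unknown words would land in the ID range 10000 to 10099 under this scheme.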

Let's define a module containing the preprocessing_fn() function that we will pass to the Transform component:

_transform_module_file = ''
%%writefile {_transform_module_file}

import tensorflow as tf

import tensorflow_transform as tft

SEQUENCE_LENGTH = 100  # Tokens kept per review (assumed value; adjust as needed).
VOCAB_SIZE = 10000
OOV_SIZE = 100

def tokenize_reviews(reviews, sequence_length=SEQUENCE_LENGTH):
  reviews = tf.strings.lower(reviews)
  reviews = tf.strings.regex_replace(reviews, r" '| '|^'|'$", " ")
  reviews = tf.strings.regex_replace(reviews, "[^a-z' ]", " ")
  tokens = tf.strings.split(reviews)[:, :sequence_length]
  start_tokens = tf.fill([tf.shape(reviews)[0], 1], "<START>")
  end_tokens = tf.fill([tf.shape(reviews)[0], 1], "<END>")
  tokens = tf.concat([start_tokens, tokens, end_tokens], axis=1)
  tokens = tokens[:, :sequence_length]
  tokens = tokens.to_tensor(default_value="<PAD>")
  pad = sequence_length - tf.shape(tokens)[1]
  tokens = tf.pad(tokens, [[0, 0], [0, pad]], constant_values="<PAD>")
  return tf.reshape(tokens, [-1, sequence_length])

def preprocessing_fn(inputs):
  """tf.transform's callback function for preprocessing inputs.

  Args:
    inputs: map from feature keys to raw not-yet-transformed features.

  Returns:
    Map from string feature key to transformed feature operations.
  """
  outputs = {}
  outputs["id"] = inputs["id"]
  tokens = tokenize_reviews(_fill_in_missing(inputs["text"], ''))
  outputs["text_xf"] = tft.compute_and_apply_vocabulary(
      tokens,
      top_k=VOCAB_SIZE,
      num_oov_buckets=OOV_SIZE)
  outputs["label_xf"] = _fill_in_missing(inputs["label"], -1)
  return outputs

def _fill_in_missing(x, default_value):
  """Replace missing values in a SparseTensor.

  Fills in missing values of `x` with the default_value.

  Args:
    x: A `SparseTensor` of rank 2.  Its dense shape should have size at most 1
      in the second dimension.
    default_value: the value with which to replace the missing values.

  Returns:
    A rank 1 tensor where missing values of `x` have been filled in.
  """
  if not isinstance(x, tf.sparse.SparseTensor):
    return x
  return tf.squeeze(
      tf.sparse.to_dense(
          tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
          default_value),
      axis=1)

Create and run the Transform component, referring to the files that were created above.

# Performs transformations and feature engineering in training and serving.
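transform = Transform(
    examples=identify_examples.outputs['identified_examples'],
    schema=schema_gen.outputs['schema'],
    module_file=_transform_module_file)
context.run(transform, enable_cache=True)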
WARNING:absl:Tables initialized inside a tf.function  will be re-initialized on every invocation of the function. This  re-initialization can have significant impact on performance. Consider lifting  them out of the graph context using  `tf.init_scope`.: compute_and_apply_vocabulary/apply_vocab/text_file_init/InitializeTableFromTextFileV2
INFO:tensorflow:Assets written to: /tmpfs/tmp/tfx-interactive-2024-08-02T09_08_00.145456-zzsoiua3/Transform/transform_graph/7/.temp_path/tftransform_tmp/394d576f22a342099b3a96428fe005f1/assets
INFO:tensorflow:struct2tensor is not available.
INFO:tensorflow:tensorflow_decision_forests is not available.
INFO:tensorflow:tensorflow_text is not available.
INFO:tensorflow:Assets written to: /tmpfs/tmp/tfx-interactive-2024-08-02T09_08_00.145456-zzsoiua3/Transform/transform_graph/7/.temp_path/tftransform_tmp/1d81f1db01d34c12bd9832f63dede4e7/assets

The Transform component has 2 types of outputs:

  • transform_graph is the graph that can perform the preprocessing operations (this graph will be included in the serving and evaluation models).
  • transformed_examples represents the preprocessed training and evaluation data.

transform.outputs
{'transform_graph': OutputChannel(artifact_type=TransformGraph, producer_component_id=Transform, output_key=transform_graph, additional_properties={}, additional_custom_properties={}, _input_trigger=None, _is_async=False),
 'transformed_examples': OutputChannel(artifact_type=Examples, producer_component_id=Transform, output_key=transformed_examples, additional_properties={}, additional_custom_properties={}, _input_trigger=None, _is_async=False),
 'updated_analyzer_cache': OutputChannel(artifact_type=TransformCache, producer_component_id=Transform, output_key=updated_analyzer_cache, additional_properties={}, additional_custom_properties={}, _input_trigger=None, _is_async=False),
 'pre_transform_schema': OutputChannel(artifact_type=Schema, producer_component_id=Transform, output_key=pre_transform_schema, additional_properties={}, additional_custom_properties={}, _input_trigger=None, _is_async=False),
 'pre_transform_stats': OutputChannel(artifact_type=ExampleStatistics, producer_component_id=Transform, output_key=pre_transform_stats, additional_properties={}, additional_custom_properties={}, _input_trigger=None, _is_async=False),
 'post_transform_schema': OutputChannel(artifact_type=Schema, producer_component_id=Transform, output_key=post_transform_schema, additional_properties={}, additional_custom_properties={}, _input_trigger=None, _is_async=False),
 'post_transform_stats': OutputChannel(artifact_type=ExampleStatistics, producer_component_id=Transform, output_key=post_transform_stats, additional_properties={}, additional_custom_properties={}, _input_trigger=None, _is_async=False),
 'post_transform_anomalies': OutputChannel(artifact_type=ExampleAnomalies, producer_component_id=Transform, output_key=post_transform_anomalies, additional_properties={}, additional_custom_properties={}, _input_trigger=None, _is_async=False)}

Take a peek at the transform_graph artifact: it points to a directory containing 3 subdirectories:

train_uri = transform.outputs['transform_graph'].get()[0].uri
os.listdir(train_uri)
['metadata', 'transformed_metadata', 'transform_fn']

The transform_fn subdirectory contains the actual preprocessing graph. The metadata subdirectory contains the schema of the original data. The transformed_metadata subdirectory contains the schema of the preprocessed data.

Take a look at some of the transformed examples and check that they are indeed processed as intended.

def pprint_examples(artifact, n_examples=3):
  print("artifact:", artifact)
  uri = os.path.join(artifact.uri, "Split-train")
  print("uri:", uri)
  tfrecord_filenames = [os.path.join(uri, name) for name in os.listdir(uri)]
  print("tfrecord_filenames:", tfrecord_filenames)
  dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
  for tfrecord in dataset.take(n_examples):
    serialized_example = tfrecord.numpy()
    example = tf.train.Example.FromString(serialized_example)
    pp.pprint(example)

pprint_examples(transform.outputs['transformed_examples'].get()[0])
artifact: Artifact(artifact: id: 8
type_id: 14
uri: "/tmpfs/tmp/tfx-interactive-2024-08-02T09_08_00.145456-zzsoiua3/Transform/transformed_examples/7"
properties {
  key: "split_names"
  value {
    string_value: "[\"eval\", \"train\"]"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "transformed_examples:2024-08-02T09:09:59.887990"
  }
}
custom_properties {
  key: "producer_component"
  value {
    string_value: "Transform"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.15.1"
  }
}
state: LIVE
name: "transformed_examples:2024-08-02T09:09:59.887990"
, artifact_type: id: 14
name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)
uri: /tmpfs/tmp/tfx-interactive-2024-08-02T09_08_00.145456-zzsoiua3/Transform/transformed_examples/7/Split-train
tfrecord_filenames: ['/tmpfs/tmp/tfx-interactive-2024-08-02T09_08_00.145456-zzsoiua3/Transform/transformed_examples/7/Split-train/transformed_examples-00000-of-00001.gz']
features {
  feature {
    key: "id"
    value {
      bytes_list {
        value: "d7a8912a-cea4-4453-a79e-fbbe5785e9ee"
  feature {
    key: "label_xf"
    value {
      int64_list {
        value: 0
  feature {
    key: "text_xf"
    value {
      int64_list {
        value: 13
        value: 8
        value: 14
        value: 32
        value: 338
        value: 310
        value: 15
        value: 95
        value: 27
        value: 10001
        value: 9
        value: 31
        value: 1173
        value: 3153
        value: 43
        value: 495
        value: 10060
        value: 214
        value: 26
        value: 71
        value: 142
        value: 19
        value: 8
        value: 204
        value: 339
        value: 27
        value: 74
        value: 181
        value: 238
        value: 9
        value: 440
        value: 67
        value: 74
        value: 71
        value: 94
        value: 100
        value: 22
        value: 5442
        value: 8
        value: 1573
        value: 607
        value: 530
        value: 8
        value: 15
        value: 6
        value: 32
        value: 378
        value: 6292
        value: 207
        value: 2276
        value: 388
        value: 0
        value: 84
        value: 1023
        value: 154
        value: 65
        value: 155
        value: 52
        value: 0
        value: 10080
        value: 7871
        value: 65
        value: 250
        value: 74
        value: 3202
        value: 20
        value: 10000
        value: 3720
        value: 10020
        value: 10008
        value: 1282
        value: 3862
        value: 3
        value: 53
        value: 3952
        value: 110
        value: 1879
        value: 17
        value: 3153
        value: 14
        value: 166
        value: 19
        value: 2
        value: 1023
        value: 1007
        value: 9405
        value: 9
        value: 2
        value: 15
        value: 12
        value: 14
        value: 4504
        value: 4
        value: 109
        value: 158
        value: 1202
        value: 7
        value: 174
        value: 505
        value: 12

features {
  feature {
    key: "id"
    value {
      bytes_list {
        value: "ed97ab48-01a6-46f7-81d7-2bf3f448ff22"
  feature {
    key: "label_xf"
    value {
      int64_list {
        value: 0
  feature {
    key: "text_xf"
    value {
      int64_list {
        value: 13
        value: 7
        value: 23
        value: 75
        value: 494
        value: 5
        value: 748
        value: 2155
        value: 307
        value: 91
        value: 19
        value: 8
        value: 6
        value: 499
        value: 763
        value: 5
        value: 2
        value: 1690
        value: 4
        value: 200
        value: 593
        value: 57
        value: 1244
        value: 120
        value: 2364
        value: 3
        value: 4407
        value: 21
        value: 0
        value: 10081
        value: 3
        value: 263
        value: 42
        value: 6947
        value: 2
        value: 169
        value: 185
        value: 21
        value: 8
        value: 5143
        value: 7
        value: 1339
        value: 2155
        value: 81
        value: 0
        value: 18
        value: 14
        value: 1468
        value: 0
        value: 86
        value: 986
        value: 14
        value: 2259
        value: 1790
        value: 562
        value: 3
        value: 284
        value: 200
        value: 401
        value: 5
        value: 668
        value: 19
        value: 17
        value: 58
        value: 1934
        value: 4
        value: 45
        value: 14
        value: 4212
        value: 113
        value: 43
        value: 135
        value: 7
        value: 753
        value: 7
        value: 224
        value: 23
        value: 1155
        value: 179
        value: 4
        value: 0
        value: 18
        value: 19
        value: 7
        value: 191
        value: 0
        value: 2047
        value: 4
        value: 10
        value: 3
        value: 283
        value: 42
        value: 401
        value: 5
        value: 668
        value: 4
        value: 90
        value: 234
        value: 10023
        value: 227

features {
  feature {
    key: "id"
    value {
      bytes_list {
        value: "8fe0aa92-b194-4ed1-98e7-3f72be67ea75"
  feature {
    key: "label_xf"
    value {
      int64_list {
        value: 0
  feature {
    key: "text_xf"
    value {
      int64_list {
        value: 13
        value: 4577
        value: 7158
        value: 0
        value: 10047
        value: 3778
        value: 3346
        value: 9
        value: 2
        value: 758
        value: 1915
        value: 3
        value: 2280
        value: 1511
        value: 3
        value: 2003
        value: 10020
        value: 225
        value: 786
        value: 382
        value: 16
        value: 39
        value: 203
        value: 361
        value: 5
        value: 93
        value: 11
        value: 11
        value: 19
        value: 220
        value: 21
        value: 341
        value: 2
        value: 10000
        value: 966
        value: 0
        value: 77
        value: 4
        value: 6677
        value: 464
        value: 10071
        value: 5
        value: 10042
        value: 630
        value: 2
        value: 10044
        value: 404
        value: 2
        value: 10044
        value: 3
        value: 5
        value: 10008
        value: 0
        value: 1259
        value: 630
        value: 106
        value: 10042
        value: 6721
        value: 10
        value: 49
        value: 21
        value: 0
        value: 2071
        value: 20
        value: 1292
        value: 4
        value: 0
        value: 431
        value: 11
        value: 11
        value: 166
        value: 67
        value: 2342
        value: 5815
        value: 12
        value: 575
        value: 21
        value: 0
        value: 1691
        value: 537
        value: 4
        value: 0
        value: 3605
        value: 307
        value: 0
        value: 10054
        value: 1563
        value: 3115
        value: 467
        value: 4577
        value: 3
        value: 1069
        value: 1158
        value: 5
        value: 23
        value: 4279
        value: 6677
        value: 464
        value: 20
        value: 10004

The GraphAugmentation Component

Since we have the sample features and the synthesized graph, we can generate the augmented training data for Neural Structured Learning. The NSL framework provides a library to combine the graph and the sample features to produce the final training data for graph regularization. The resulting training data will include original sample features as well as features of their corresponding neighbors.

In this tutorial, we consider undirected edges and use a maximum of 3 neighbors per sample to augment training data with graph neighbors.
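To make the augmentation step concrete, here is a minimal pure-Python sketch of what neighbor packing does conceptually. It is not the NSL implementation: the function pack_neighbors and its data layout are hypothetical, and choosing neighbors by highest edge weight is an assumption of this sketch. The feature names follow the NL_nbr_<index>_<feature> and NL_num_nbrs pattern; only NL_num_nbrs is visible in the output shown in this notebook.

```python
from collections import defaultdict


def pack_neighbors(examples, edges, max_nbrs=3, add_undirected_edges=True):
    """Toy neighbor packing (hypothetical helper, not the NSL library code).

    examples: dict mapping example id -> dict of feature name -> value.
    edges: iterable of (source_id, target_id, weight) tuples.
    """
    adjacency = defaultdict(dict)
    for source, target, weight in edges:
        adjacency[source][target] = weight
        if add_undirected_edges:
            # Also record the reverse direction so the edge acts as undirected.
            adjacency[target].setdefault(source, weight)

    packed = {}
    for example_id, features in examples.items():
        # Only neighbors that have features can be merged in.
        candidates = [
            (nbr, w) for nbr, w in adjacency[example_id].items() if nbr in examples
        ]
        # Assumption of this sketch: prefer the most similar neighbors.
        candidates.sort(key=lambda pair: pair[1], reverse=True)
        merged = dict(features)
        for index, (nbr, weight) in enumerate(candidates[:max_nbrs]):
            for name, value in examples[nbr].items():
                merged[f"NL_nbr_{index}_{name}"] = value
            merged[f"NL_nbr_{index}_weight"] = weight
        merged["NL_num_nbrs"] = min(len(candidates), max_nbrs)
        packed[example_id] = merged
    return packed


# Three tiny examples and one edge between "a" and "b".
examples = {
    "a": {"text_xf": [13, 8, 14]},
    "b": {"text_xf": [13, 7, 23]},
    "c": {"text_xf": [13, 16, 423]},
}
edges = [("a", "b", 0.99)]
packed = pack_neighbors(examples, edges)
print(packed["b"])
print(packed["c"]["NL_num_nbrs"])
```

Because the edge is treated as undirected, "b" receives "a" as a neighbor even though the edge was listed only as a-to-b, while "c" has no edges and ends up with NL_num_nbrs equal to 0, just like the examples printed later in this section.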

def split_train_and_unsup(input_uri):
  'Separate the labeled and unlabeled instances.'

  tmp_dir = tempfile.mkdtemp(prefix='tfx-data')
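  tfrecord_filenames = [
      os.path.join(input_uri, filename) for filename in os.listdir(input_uri)
  ]
  train_path = os.path.join(tmp_dir, 'train.tfrecord')
  unsup_path = os.path.join(tmp_dir, 'unsup.tfrecord')
  with tf.io.TFRecordWriter(train_path) as train_writer, \
       tf.io.TFRecordWriter(unsup_path) as unsup_writer:
    for tfrecord in tf.data.TFRecordDataset(
        tfrecord_filenames, compression_type='GZIP'):
      example = tf.train.Example()
      example.ParseFromString(tfrecord.numpy())
      # Examples with no label, or with the placeholder label -1, are unlabeled.
      if ('label_xf' not in example.features.feature or
          example.features.feature['label_xf'].int64_list.value[0] == -1):
        writer = unsup_writer
      else:
        writer = train_writer
      writer.write(tfrecord.numpy())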
  return train_path, unsup_path

def gzip(filepath):
  with open(filepath, 'rb') as f_in:
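    # `gzip_lib` is the standard-library gzip module, assumed to be imported
    # under an alias (import gzip as gzip_lib) since this helper is named gzip.
    with gzip_lib.open(filepath + '.gz', 'wb') as f_out:
      shutil.copyfileobj(f_in, f_out)
  # Keep only the compressed copy.
  os.remove(filepath)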

def copy_tfrecords(input_uri, output_uri):
  for filename in os.listdir(input_uri):
    input_filename = os.path.join(input_uri, filename)
    output_filename = os.path.join(output_uri, filename)
    shutil.copyfile(input_filename, output_filename)

@component
def GraphAugmentation(identified_examples: InputArtifact[Examples],
                      synthesized_graph: InputArtifact[SynthesizedGraph],
                      augmented_examples: OutputArtifact[Examples],
                      num_neighbors: Parameter[int],
                      component_name: Parameter[str]) -> None:

  # Get a list of the splits in input_data
  splits_list = artifact_utils.decode_split_names(
      split_names=identified_examples.split_names)

  train_input_uri = os.path.join(identified_examples.uri, 'Split-train')
  eval_input_uri = os.path.join(identified_examples.uri, 'Split-eval')
  train_graph_uri = os.path.join(synthesized_graph.uri, 'Split-train')
  train_output_uri = os.path.join(augmented_examples.uri, 'Split-train')
  eval_output_uri = os.path.join(augmented_examples.uri, 'Split-eval')

  # The output split directories must exist before anything is written to them.
  os.mkdir(train_output_uri)
  os.mkdir(eval_output_uri)

  # Separate the labeled and unlabeled examples from the 'Split-train' split.
  train_path, unsup_path = split_train_and_unsup(train_input_uri)

  output_path = os.path.join(train_output_uri, 'nsl_train_data.tfr')
  pack_nbrs_args = dict(
      labeled_examples_path=train_path,
      unlabeled_examples_path=unsup_path,
      graph_path=os.path.join(train_graph_uri, 'graph.tsv'),
      output_training_data_path=output_path,
      add_undirected_edges=True,
      max_nbrs=num_neighbors)
  print('nsl.tools.pack_nbrs arguments:', pack_nbrs_args)
  nsl.tools.pack_nbrs(**pack_nbrs_args)

  # Downstream components expect gzip'ed TFRecords.
  gzip(output_path)

  # The test examples are left untouched and are simply copied over.
  copy_tfrecords(eval_input_uri, eval_output_uri)

  augmented_examples.split_names = identified_examples.split_names

# Augments training data with graph neighbors.
# `identify_examples` and `synthesize_graph` are the upstream component
# instances created earlier in this notebook.
graph_augmentation = GraphAugmentation(
    identified_examples=identify_examples.outputs['identified_examples'],
    synthesized_graph=synthesize_graph.outputs['synthesized_graph'],
    component_name=u'GraphAugmentation',
    num_neighbors=3)
context.run(graph_augmentation, enable_cache=False)
nsl.tools.pack_nbrs arguments: {'labeled_examples_path': '/tmpfs/tmp/tfx-dataetf_kj51/train.tfrecord', 'unlabeled_examples_path': '/tmpfs/tmp/tfx-dataetf_kj51/unsup.tfrecord', 'graph_path': '/tmpfs/tmp/tfx-interactive-2024-08-02T09_08_00.145456-zzsoiua3/SynthesizeGraph/synthesized_graph/6/Split-train/graph.tsv', 'output_training_data_path': '/tmpfs/tmp/tfx-interactive-2024-08-02T09_08_00.145456-zzsoiua3/GraphAugmentation/augmented_examples/8/Split-train/nsl_train_data.tfr', 'add_undirected_edges': True, 'max_nbrs': 3}
pprint_examples(graph_augmentation.outputs['augmented_examples'].get()[0], 6)
artifact: Artifact(artifact: id: 15
type_id: 14
uri: "/tmpfs/tmp/tfx-interactive-2024-08-02T09_08_00.145456-zzsoiua3/GraphAugmentation/augmented_examples/8"
properties {
  key: "split_names"
  value {
    string_value: "[\"eval\", \"train\"]"
custom_properties {
  key: "name"
  value {
    string_value: "augmented_examples:2024-08-02T09:10:31.301021"
custom_properties {
  key: "producer_component"
  value {
    string_value: "GraphAugmentation"
name: "augmented_examples:2024-08-02T09:10:31.301021"
, artifact_type: id: 14
name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
uri: /tmpfs/tmp/tfx-interactive-2024-08-02T09_08_00.145456-zzsoiua3/GraphAugmentation/augmented_examples/8/Split-train
tfrecord_filenames: ['/tmpfs/tmp/tfx-interactive-2024-08-02T09_08_00.145456-zzsoiua3/GraphAugmentation/augmented_examples/8/Split-train/nsl_train_data.tfr.gz']
features {
  feature {
    key: "NL_num_nbrs"
    value {
      int64_list {
        value: 0
  feature {
    key: "id"
    value {
      bytes_list {
        value: "d7a8912a-cea4-4453-a79e-fbbe5785e9ee"
  feature {
    key: "label_xf"
    value {
      int64_list {
        value: 0
  feature {
    key: "text_xf"
    value {
      int64_list {
        value: 13
        value: 8
        value: 14
        value: 32
        value: 338
        value: 310
        value: 15
        value: 95
        value: 27
        value: 10001
        value: 9
        value: 31
        value: 1173
        value: 3153
        value: 43
        value: 495
        value: 10060
        value: 214
        value: 26
        value: 71
        value: 142
        value: 19
        value: 8
        value: 204
        value: 339
        value: 27
        value: 74
        value: 181
        value: 238
        value: 9
        value: 440
        value: 67
        value: 74
        value: 71
        value: 94
        value: 100
        value: 22
        value: 5442
        value: 8
        value: 1573
        value: 607
        value: 530
        value: 8
        value: 15
        value: 6
        value: 32
        value: 378
        value: 6292
        value: 207
        value: 2276
        value: 388
        value: 0
        value: 84
        value: 1023
        value: 154
        value: 65
        value: 155
        value: 52
        value: 0
        value: 10080
        value: 7871
        value: 65
        value: 250
        value: 74
        value: 3202
        value: 20
        value: 10000
        value: 3720
        value: 10020
        value: 10008
        value: 1282
        value: 3862
        value: 3
        value: 53
        value: 3952
        value: 110
        value: 1879
        value: 17
        value: 3153
        value: 14
        value: 166
        value: 19
        value: 2
        value: 1023
        value: 1007
        value: 9405
        value: 9
        value: 2
        value: 15
        value: 12
        value: 14
        value: 4504
        value: 4
        value: 109
        value: 158
        value: 1202
        value: 7
        value: 174
        value: 505
        value: 12

features {
  feature {
    key: "NL_num_nbrs"
    value {
      int64_list {
        value: 0
  feature {
    key: "id"
    value {
      bytes_list {
        value: "ed97ab48-01a6-46f7-81d7-2bf3f448ff22"
  feature {
    key: "label_xf"
    value {
      int64_list {
        value: 0
  feature {
    key: "text_xf"
    value {
      int64_list {
        value: 13
        value: 7
        value: 23
        value: 75
        value: 494
        value: 5
        value: 748
        value: 2155
        value: 307
        value: 91
        value: 19
        value: 8
        value: 6
        value: 499
        value: 763
        value: 5
        value: 2
        value: 1690
        value: 4
        value: 200
        value: 593
        value: 57
        value: 1244
        value: 120
        value: 2364
        value: 3
        value: 4407
        value: 21
        value: 0
        value: 10081
        value: 3
        value: 263
        value: 42
        value: 6947
        value: 2
        value: 169
        value: 185
        value: 21
        value: 8
        value: 5143
        value: 7
        value: 1339
        value: 2155
        value: 81
        value: 0
        value: 18
        value: 14
        value: 1468
        value: 0
        value: 86
        value: 986
        value: 14
        value: 2259
        value: 1790
        value: 562
        value: 3
        value: 284
        value: 200
        value: 401
        value: 5
        value: 668
        value: 19
        value: 17
        value: 58
        value: 1934
        value: 4
        value: 45
        value: 14
        value: 4212
        value: 113
        value: 43
        value: 135
        value: 7
        value: 753
        value: 7
        value: 224
        value: 23
        value: 1155
        value: 179
        value: 4
        value: 0
        value: 18
        value: 19
        value: 7
        value: 191
        value: 0
        value: 2047
        value: 4
        value: 10
        value: 3
        value: 283
        value: 42
        value: 401
        value: 5
        value: 668
        value: 4
        value: 90
        value: 234
        value: 10023
        value: 227

features {
  feature {
    key: "NL_num_nbrs"
    value {
      int64_list {
        value: 0
  feature {
    key: "id"
    value {
      bytes_list {
        value: "8fe0aa92-b194-4ed1-98e7-3f72be67ea75"
  feature {
    key: "label_xf"
    value {
      int64_list {
        value: 0
  feature {
    key: "text_xf"
    value {
      int64_list {
        value: 13
        value: 4577
        value: 7158
        value: 0
        value: 10047
        value: 3778
        value: 3346
        value: 9
        value: 2
        value: 758
        value: 1915
        value: 3
        value: 2280
        value: 1511
        value: 3
        value: 2003
        value: 10020
        value: 225
        value: 786
        value: 382
        value: 16
        value: 39
        value: 203
        value: 361
        value: 5
        value: 93
        value: 11
        value: 11
        value: 19
        value: 220
        value: 21
        value: 341
        value: 2
        value: 10000
        value: 966
        value: 0
        value: 77
        value: 4
        value: 6677
        value: 464
        value: 10071
        value: 5
        value: 10042
        value: 630
        value: 2
        value: 10044
        value: 404
        value: 2
        value: 10044
        value: 3
        value: 5
        value: 10008
        value: 0
        value: 1259
        value: 630
        value: 106
        value: 10042
        value: 6721
        value: 10
        value: 49
        value: 21
        value: 0
        value: 2071
        value: 20
        value: 1292
        value: 4
        value: 0
        value: 431
        value: 11
        value: 11
        value: 166
        value: 67
        value: 2342
        value: 5815
        value: 12
        value: 575
        value: 21
        value: 0
        value: 1691
        value: 537
        value: 4
        value: 0
        value: 3605
        value: 307
        value: 0
        value: 10054
        value: 1563
        value: 3115
        value: 467
        value: 4577
        value: 3
        value: 1069
        value: 1158
        value: 5
        value: 23
        value: 4279
        value: 6677
        value: 464
        value: 20
        value: 10004

features {
  feature {
    key: "NL_num_nbrs"
    value {
      int64_list {
        value: 0
  feature {
    key: "id"
    value {
      bytes_list {
        value: "66bc21ee-32ed-486e-8c16-ccca8252442c"
  feature {
    key: "label_xf"
    value {
      int64_list {
        value: 1
  feature {
    key: "text_xf"
    value {
      int64_list {
        value: 13
        value: 8
        value: 6
        value: 0
        value: 251
        value: 4
        value: 18
        value: 20
        value: 2
        value: 6783
        value: 2295
        value: 2338
        value: 52
        value: 0
        value: 468
        value: 4
        value: 0
        value: 189
        value: 73
        value: 153
        value: 1294
        value: 17
        value: 90
        value: 234
        value: 935
        value: 16
        value: 25
        value: 10024
        value: 92
        value: 2
        value: 192
        value: 4218
        value: 3317
        value: 3
        value: 10098
        value: 20
        value: 2
        value: 356
        value: 4
        value: 565
        value: 334
        value: 382
        value: 36
        value: 6989
        value: 3
        value: 6065
        value: 2510
        value: 16
        value: 203
        value: 7264
        value: 2849
        value: 0
        value: 86
        value: 346
        value: 50
        value: 26
        value: 58
        value: 10020
        value: 5
        value: 1464
        value: 58
        value: 2081
        value: 2969
        value: 42
        value: 2
        value: 2364
        value: 3
        value: 1402
        value: 10062
        value: 138
        value: 147
        value: 614
        value: 115
        value: 29
        value: 90
        value: 105
        value: 2
        value: 223
        value: 18
        value: 9
        value: 160
        value: 324
        value: 3
        value: 24
        value: 12
        value: 1252
        value: 0
        value: 2142
        value: 10
        value: 1832
        value: 111
        value: 1
        value: 1
        value: 1
        value: 1
        value: 1
        value: 1
        value: 1
        value: 1
        value: 1

features {
  feature {
    key: "NL_num_nbrs"
    value {
      int64_list {
        value: 0
  feature {
    key: "id"
    value {
      bytes_list {
        value: "036e6ead-d36e-4fa3-83ad-15b1f66250c2"
  feature {
    key: "label_xf"
    value {
      int64_list {
        value: 1
  feature {
    key: "text_xf"
    value {
      int64_list {
        value: 13
        value: 16
        value: 423
        value: 23
        value: 1367
        value: 30
        value: 0
        value: 363
        value: 12
        value: 153
        value: 3174
        value: 9
        value: 8
        value: 18
        value: 26
        value: 667
        value: 338
        value: 1372
        value: 0
        value: 86
        value: 46
        value: 9200
        value: 282
        value: 0
        value: 10091
        value: 4
        value: 0
        value: 694
        value: 10028
        value: 52
        value: 362
        value: 26
        value: 202
        value: 39
        value: 216
        value: 5
        value: 27
        value: 5822
        value: 19
        value: 52
        value: 58
        value: 362
        value: 26
        value: 202
        value: 39
        value: 474
        value: 0
        value: 10029
        value: 4
        value: 2
        value: 243
        value: 143
        value: 386
        value: 3
        value: 0
        value: 386
        value: 579
        value: 2
        value: 132
        value: 57
        value: 725
        value: 88
        value: 140
        value: 30
        value: 27
        value: 33
        value: 1359
        value: 29
        value: 8
        value: 567
        value: 35
        value: 106
        value: 230
        value: 60
        value: 0
        value: 3041
        value: 5
        value: 7879
        value: 28
        value: 281
        value: 110
        value: 111
        value: 1
        value: 1
        value: 1
        value: 1
        value: 1
        value: 1
        value: 1
        value: 1
        value: 1
        value: 1
        value: 1
        value: 1
        value: 1
        value: 1
        value: 1
        value: 1
        value: 1
        value: 1

features {
  feature {
    key: "NL_num_nbrs"
    value {
      int64_list {
        value: 0
  feature {
    key: "id"
    value {
      bytes_list {
        value: "3ba1a554-cd8d-4095-8185-d2d0a04ec5bb"
  feature {
    key: "label_xf"
    value {
      int64_list {
        value: 1
  feature {
    key: "text_xf"
    value {
      int64_list {
        value: 13
        value: 8
        value: 6
        value: 2
        value: 18
        value: 69
        value: 140
        value: 27
        value: 83
        value: 31
        value: 1877
        value: 905
        value: 9
        value: 10057
        value: 31
        value: 43
        value: 2115
        value: 36
        value: 32
        value: 2057
        value: 6133
        value: 10
        value: 6
        value: 32
        value: 2474
        value: 1614
        value: 3
        value: 2707
        value: 990
        value: 4
        value: 10067
        value: 9
        value: 2
        value: 1532
        value: 242
        value: 90
        value: 3757
        value: 3
        value: 90
        value: 10026
        value: 0
        value: 242
        value: 6
        value: 260
        value: 31
        value: 24
        value: 4
        value: 0
        value: 84
        value: 497
        value: 177
        value: 1151
        value: 777
        value: 9
        value: 397
        value: 552
        value: 7726
        value: 10051
        value: 34
        value: 14
        value: 379
        value: 33
        value: 1829
        value: 9
        value: 123
        value: 0
        value: 916
        value: 10028
        value: 7
        value: 64
        value: 571
        value: 12
        value: 8
        value: 18
        value: 27
        value: 687
        value: 9
        value: 30
        value: 5609
        value: 16
        value: 25
        value: 99
        value: 117
        value: 66
        value: 2
        value: 130
        value: 21
        value: 8
        value: 842
        value: 7726
        value: 10051
        value: 6
        value: 338
        value: 1107
        value: 3
        value: 24
        value: 10020
        value: 29
        value: 53
        value: 1476

The Trainer Component

The Trainer component trains models using TensorFlow.

Create a Python module containing a trainer_fn function, which must return an estimator. If you prefer creating a Keras model, you can do so and then convert it to an estimator using keras.model_to_estimator().
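Before looking at the trainer module, it may help to see the shape of the objective it optimizes. The sketch below illustrates, in plain Python, how a graph regularization term can be added to a supervised loss: each neighbor contributes its edge weight times the distance between its embedding and the sample's embedding, and the sum is scaled by a multiplier (the role played by graph_regularization_multiplier in the hyperparameters). The function names here are hypothetical, and the distance is the plain Euclidean distance; the NSL library's exact distance reduction and normalization may differ.

```python
import math


def l2_distance(u, v):
    """Euclidean (L2) distance between two equal-length vectors."""
    return math.sqrt(sum((a - b) ** 2 for a, b in zip(u, v)))


def graph_regularized_loss(supervised_loss, sample_embedding,
                           neighbor_embeddings, neighbor_weights,
                           multiplier=0.1):
    """Supervised loss plus a weighted penalty for differing from neighbors.

    Illustrative only: a hypothetical helper, not the NSL implementation.
    """
    penalty = sum(
        weight * l2_distance(sample_embedding, neighbor)
        for neighbor, weight in zip(neighbor_embeddings, neighbor_weights)
    )
    return supervised_loss + multiplier * penalty


# One neighbor at distance 5 from the sample, with edge weight 1.0.
loss = graph_regularized_loss(
    supervised_loss=0.5,
    sample_embedding=[0.0, 0.0],
    neighbor_embeddings=[[3.0, 4.0]],
    neighbor_weights=[1.0],
    multiplier=0.1,
)
print(loss)
```

A sample with no neighbors contributes no penalty, so its loss reduces to the supervised loss alone. A larger multiplier pushes the embeddings of neighboring samples closer together at the expense of fitting the labels.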

# Setup paths.
_trainer_module_file = ''
%%writefile {_trainer_module_file}

import neural_structured_learning as nsl

import tensorflow as tf

import tensorflow_model_analysis as tfma
import tensorflow_transform as tft
from tensorflow_transform.tf_metadata import schema_utils

LABEL_KEY = 'label'

def _transformed_name(key):
  return key + '_xf'

def _transformed_names(keys):
  return [_transformed_name(key) for key in keys]

# Hyperparameters:
# We will use an instance of `HParams` to include various hyperparameters and
# constants used for training and evaluation. We briefly describe each of them
# below:
# -   max_seq_length: This is the maximum number of words considered from each
#                     movie review in this example.
# -   vocab_size: This is the size of the vocabulary considered for this
#                 example.
# -   oov_size: This is the out-of-vocabulary size considered for this example.
# -   distance_type: This is the distance metric used to regularize the sample
#                    with its neighbors.
# -   graph_regularization_multiplier: This controls the relative weight of the
#                                      graph regularization term in the overall
#                                      loss function.
# -   num_neighbors: The number of neighbors used for graph regularization. This
#                    value has to be less than or equal to the `num_neighbors`
#                    argument used above in the GraphAugmentation component when
#                    invoking `nsl.tools.pack_nbrs`.
# -   num_fc_units: The number of units in the fully connected layer of the
#                   neural network.
class HParams(object):
  """Hyperparameters used for training."""
  def __init__(self):
    ### dataset parameters
    # The following 3 values should match those defined in the Transform
    # Component.
    self.max_seq_length = 100
    self.vocab_size = 10000
    self.oov_size = 100
    ### Neural Graph Learning parameters
    self.distance_type = nsl.configs.DistanceType.L2
    self.graph_regularization_multiplier = 0.1
    # The following value has to be at most the value of 'num_neighbors' used
    # in the GraphAugmentation component.
    self.num_neighbors = 1
    ### Model Architecture
    self.num_embedding_dims = 16
    self.num_fc_units = 64

HPARAMS = HParams()

def optimizer_fn():
  """Returns an instance of `tf.Optimizer`."""
  return tf.compat.v1.train.RMSPropOptimizer(
    learning_rate=0.0001, decay=1e-6)

def build_train_op(loss, global_step):
  """Builds a train op to optimize the given loss using gradient descent."""
  with tf.name_scope('train'):
    optimizer = optimizer_fn()
    train_op = optimizer.minimize(loss=loss, global_step=global_step)
  return train_op

# Building the model:
# A neural network is created by stacking layers—this requires two main
# architectural decisions:
# * How many layers to use in the model?
# * How many *hidden units* to use for each layer?
# In this example, the input data consists of an array of word-indices. The
# labels to predict are either 0 or 1. We will use a feed-forward neural network
# as our base model in this tutorial.
def feed_forward_model(features, is_training, reuse=tf.compat.v1.AUTO_REUSE):
  """Builds a simple 2 layer feed forward neural network.

  The layers are effectively stacked sequentially to build the classifier. The
  first layer is an Embedding layer, which takes the integer-encoded vocabulary
  and looks up the embedding vector for each word-index. These vectors are
  learned as the model trains. The vectors add a dimension to the output array.
  The resulting dimensions are: (batch, sequence, embedding). Next is a global
  average pooling 1D layer, which reduces the dimensionality of its inputs from
  3D to 2D. This fixed-length output vector is piped through a fully-connected
  (Dense) layer with 16 hidden units. The last layer is densely connected with a
  single output node. Using the sigmoid activation function, this value is a
  float between 0 and 1, representing a probability, or confidence level.

  Args:
    features: A dictionary containing batch features returned from the
      `input_fn`, that include sample features, corresponding neighbor features,
      and neighbor weights.
    is_training: a Python Boolean value or a Boolean scalar Tensor, indicating
      whether to apply dropout.
    reuse: a Python Boolean value for reusing variable scope.

  Returns:
    logits: Tensor of shape [batch_size, 1].
    representations: Tensor of shape [batch_size, _] for graph regularization.
      This is the representation of each example at the graph regularization
      layer.
  """

  with tf.compat.v1.variable_scope('ff', reuse=reuse):
    inputs = features[_transformed_name('text')]
    embeddings = tf.compat.v1.get_variable(
        'embeddings',
        shape=[
            HPARAMS.vocab_size + HPARAMS.oov_size, HPARAMS.num_embedding_dims
        ])
    embedding_layer = tf.nn.embedding_lookup(embeddings, inputs)

    pooling_layer = tf.compat.v1.layers.AveragePooling1D(
        pool_size=HPARAMS.max_seq_length, strides=HPARAMS.max_seq_length)(
            embedding_layer)
    # Shape of pooling_layer is now [batch_size, 1, HPARAMS.num_embedding_dims]
    pooling_layer = tf.reshape(pooling_layer, [-1, HPARAMS.num_embedding_dims])

    dense_layer = tf.compat.v1.layers.Dense(
        16, activation='relu')(
            pooling_layer)

    output_layer = tf.compat.v1.layers.Dense(
        1, activation='sigmoid')(
            dense_layer)

    # Graph regularization will be done on the penultimate (dense) layer
    # because the output layer is a single floating point number.
    return output_layer, dense_layer

# A note on hidden units:
# The above model has two intermediate or "hidden" layers, between the input and
# output, and excluding the Embedding layer. The number of outputs (units,
# nodes, or neurons) is the dimension of the representational space for the
# layer. In other words, the amount of freedom the network is allowed when
# learning an internal representation. If a model has more hidden units
# (a higher-dimensional representation space), and/or more layers, then the
# network can learn more complex representations. However, it makes the network
# more computationally expensive and may lead to learning unwanted
# patterns—patterns that improve performance on training data but not on the
# test data. This is called overfitting.

# This function will be used to generate the embeddings for samples and their
# corresponding neighbors, which will then be used for graph regularization.
def embedding_fn(features, mode, **params):
  """Returns the embedding corresponding to the given features.

  Args:
    features: A dictionary containing batch features returned from the
      `input_fn`, that include sample features, corresponding neighbor features,
      and neighbor weights.
    mode: Specifies if this is training, evaluation, or prediction. See
      tf.estimator.ModeKeys.

  Returns:
    The embedding that will be used for graph regularization.
  """
  is_training = (mode == tf.estimator.ModeKeys.TRAIN)
  _, embedding = feed_forward_model(features, is_training)
  return embedding

def feed_forward_model_fn(features, labels, mode, params, config):
  """Implementation of the model_fn for the base feed-forward model.

  Args:
    features: This is the first item returned from the `input_fn` passed to
      `train`, `evaluate`, and `predict`. This should be a single `Tensor` or
      `dict` of same.
    labels: This is the second item returned from the `input_fn` passed to
      `train`, `evaluate`, and `predict`. This should be a single `Tensor` or
      `dict` of same (for multi-head models). If mode is `ModeKeys.PREDICT`,
      `labels=None` will be passed. If the `model_fn`'s signature does not
      accept `mode`, the `model_fn` must still be able to handle `labels=None`.
    mode: Optional. Specifies if this is training, evaluation, or prediction.
      See `tf.estimator.ModeKeys`.
    params: An HParams instance as returned by get_hyper_parameters().
    config: Optional configuration object. Will receive what is passed to
      Estimator in `config` parameter, or the default `config`. Allows updating
      things in your model_fn based on configuration such as `num_ps_replicas`,
      or `model_dir`. Unused currently.

  Returns:
    A `tf.estimator.EstimatorSpec` for the base feed-forward model. This does
    not include graph-based regularization.
  """

  is_training = mode == tf.estimator.ModeKeys.TRAIN

  # Build the computation graph.
  probabilities, _ = feed_forward_model(features, is_training)
  predictions = tf.round(probabilities)

  if mode == tf.estimator.ModeKeys.PREDICT:
    # labels will be None, and no loss to compute.
    cross_entropy_loss = None
    eval_metric_ops = None
  else:
    # Loss is required in train and eval modes.
    # Flatten 'probabilities' to 1-D.
    probabilities = tf.reshape(probabilities, shape=[-1])
    cross_entropy_loss = tf.compat.v1.keras.losses.binary_crossentropy(
        labels, probabilities)
    eval_metric_ops = {
        'accuracy': tf.compat.v1.metrics.accuracy(labels, predictions)
    }

  if is_training:
    global_step = tf.compat.v1.train.get_or_create_global_step()
    train_op = build_train_op(cross_entropy_loss, global_step)
  else:
    train_op = None

  return tf.estimator.EstimatorSpec(
      mode=mode,
      predictions={
          'probabilities': probabilities,
          'predictions': predictions
      },
      loss=cross_entropy_loss,
      train_op=train_op,
      eval_metric_ops=eval_metric_ops)

# Tf.Transform considers these features as "raw"
def _get_raw_feature_spec(schema):
  return schema_utils.schema_as_feature_spec(schema).feature_spec

def _gzip_reader_fn(filenames):
  """Small utility returning a record reader that can read gzip'ed files."""
  return tf.data.TFRecordDataset(filenames, compression_type='GZIP')

def _example_serving_receiver_fn(tf_transform_output, schema):
  """Build the serving inputs.

  Args:
    tf_transform_output: A TFTransformOutput.
    schema: the schema of the input data.

  Returns:
    Tensorflow graph which parses examples, applying tf-transform to them.
  """
  raw_feature_spec = _get_raw_feature_spec(schema)

  # We don't need the ID feature for serving.

  raw_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
      raw_feature_spec, default_batch_size=None)
  serving_input_receiver = raw_input_fn()

  transformed_features = tf_transform_output.transform_raw_features(
      serving_input_receiver.features)

  # Even though LABEL_KEY was removed from 'raw_feature_spec', the transform
  # operation would have injected the transformed LABEL_KEY feature with a
  # default value.
  transformed_features.pop(_transformed_name(LABEL_KEY))
  return tf.estimator.export.ServingInputReceiver(
      transformed_features, serving_input_receiver.receiver_tensors)

def _eval_input_receiver_fn(tf_transform_output, schema):
  """Build everything needed for the tf-model-analysis to run the model.

  Args:
    tf_transform_output: A TFTransformOutput.
    schema: the schema of the input data.

  Returns:
    EvalInputReceiver function, which contains:
      - Tensorflow graph which parses raw untransformed features, applies the
        tf-transform preprocessing operators.
      - Set of raw, untransformed features.
      - Label against which predictions will be compared.
  """
  # Notice that the inputs are raw features, not transformed features here.
  raw_feature_spec = _get_raw_feature_spec(schema)

  # We don't need the ID feature for TFMA.

  raw_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
      raw_feature_spec, default_batch_size=None)
  serving_input_receiver = raw_input_fn()

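  transformed_features = tf_transform_output.transform_raw_features(
      serving_input_receiver.features)

  labels = transformed_features.pop(_transformed_name(LABEL_KEY))
  return tfma.export.EvalInputReceiver(
      features=transformed_features,
      receiver_tensors=serving_input_receiver.receiver_tensors,
      labels=labels)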

def _augment_feature_spec(feature_spec, num_neighbors):
  """Augments `feature_spec` to include neighbor features.

  Args:
    feature_spec: Dictionary of feature keys mapping to TF feature types.
    num_neighbors: Number of neighbors to use for feature key augmentation.

  Returns:
    An augmented `feature_spec` that includes neighbor feature keys.
  """
  for i in range(num_neighbors):
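    # Assumption: neighbor IDs are parsed as variable-length string features.
    feature_spec['{}{}_{}'.format(NBR_FEATURE_PREFIX, i, 'id')] = \
        tf.io.VarLenFeature(dtype=tf.string)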
    # We don't care about the neighbor features corresponding to
    # _transformed_name(LABEL_KEY) because the LABEL_KEY feature will be
    # removed from the feature spec during training/evaluation.
    feature_spec['{}{}_{}'.format(NBR_FEATURE_PREFIX, i, 'text_xf')] = \
        tf.io.FixedLenFeature(
            shape=[HPARAMS.max_seq_length],
            dtype=tf.int64,
            default_value=tf.constant(
                0, dtype=tf.int64, shape=[HPARAMS.max_seq_length]))
    # The 'NL_num_nbrs' feature is currently not used.

  # Set the neighbor weight feature keys.
  for i in range(num_neighbors):
    feature_spec['{}{}{}'.format(NBR_FEATURE_PREFIX, i, NBR_WEIGHT_SUFFIX)] = \
        tf.io.FixedLenFeature(shape=[1], dtype=tf.float32, default_value=[0.0])

  return feature_spec

def _input_fn(filenames, tf_transform_output, is_training, batch_size=200):
  """Generates features and labels for training or evaluation.

  Args:
    filenames: [str] list of gzip-compressed record files to read data from.
    tf_transform_output: A TFTransformOutput.
    is_training: Boolean indicating if we are in training mode.
    batch_size: int. First dimension size of the Tensors returned by input_fn.

  Returns:
    A (features, indices) tuple where features is a dictionary of
      Tensors, and indices is a single Tensor of label indices.
  """
  transformed_feature_spec = (
      tf_transform_output.transformed_feature_spec().copy())

  # During training, NSL uses augmented training data (which includes features
  # from graph neighbors). So, update the feature spec accordingly. This needs
  # to be done because we are using different schemas for NSL training and eval,
  # but the Trainer Component only accepts a single schema.
  if is_training:
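    transformed_feature_spec = _augment_feature_spec(transformed_feature_spec,
                                                     HPARAMS.num_neighbors)

  dataset = tf.data.experimental.make_batched_features_dataset(
      filenames, batch_size, transformed_feature_spec, reader=_gzip_reader_fn)

  transformed_features = tf.compat.v1.data.make_one_shot_iterator(
      dataset).get_next()
  # We pop the label because we do not want to use it as a feature while we're
  # training.
  return transformed_features, transformed_features.pop(
      _transformed_name(LABEL_KEY))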

# TFX will call this function
def trainer_fn(hparams, schema):
  """Build the estimator using the high level API.

  Args:
    hparams: Holds hyperparameters used to train the model as name/value pairs.
    schema: Holds the schema of the training examples.

  Returns:
    A dict of the following:
      - estimator: The estimator that will be used for training and eval.
      - train_spec: Spec for training.
      - eval_spec: Spec for eval.
      - eval_input_receiver_fn: Input function for eval.
  """
  train_batch_size = 40
  eval_batch_size = 40

  tf_transform_output = tft.TFTransformOutput(hparams.transform_output)

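  train_input_fn = lambda: _input_fn(
      hparams.train_files,
      tf_transform_output,
      is_training=True,
      batch_size=train_batch_size)

  eval_input_fn = lambda: _input_fn(
      hparams.eval_files,
      tf_transform_output,
      is_training=False,
      batch_size=eval_batch_size)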

  train_spec = tf.estimator.TrainSpec(

  serving_receiver_fn = lambda: _example_serving_receiver_fn(
      tf_transform_output, schema)

  exporter = tf.estimator.FinalExporter('imdb', serving_receiver_fn)
  eval_spec = tf.estimator.EvalSpec(

  run_config = tf.estimator.RunConfig(
      save_checkpoints_steps=999, keep_checkpoint_max=1)

  run_config = run_config.replace(model_dir=hparams.serving_model_dir)

  estimator = tf.estimator.Estimator(
      model_fn=feed_forward_model_fn, config=run_config, params=HPARAMS)

  # Create a graph regularization config.
  graph_reg_config = nsl.configs.make_graph_reg_config(

  # Invoke the Graph Regularization Estimator wrapper to incorporate
  # graph-based regularization for training.
  graph_nsl_estimator = nsl.estimator.add_graph_regularization(

  # Create an input receiver for TFMA processing
  receiver_fn = lambda: _eval_input_receiver_fn(
      tf_transform_output, schema)

  return {
      'estimator': graph_nsl_estimator,
      'train_spec': train_spec,
      'eval_spec': eval_spec,
      'eval_input_receiver_fn': receiver_fn
  }
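
The neighbor feature keys that `_augment_feature_spec` adds can be easier to follow when listed out. The snippet below is a minimal, TensorFlow-free sketch of the naming scheme only. The constant values `'NL_nbr_'` and `'_weight'` are assumptions made for illustration (the module defines the real `NBR_FEATURE_PREFIX` and `NBR_WEIGHT_SUFFIX` elsewhere), and `neighbor_feature_keys` is a hypothetical helper, not part of the tutorial.

```python
# Assumed values for illustration; the tutorial's module defines the real
# constants elsewhere.
NBR_FEATURE_PREFIX = 'NL_nbr_'
NBR_WEIGHT_SUFFIX = '_weight'


def neighbor_feature_keys(num_neighbors, base_features=('id', 'text_xf')):
    """Lists the neighbor keys that an augmented feature spec would contain."""
    keys = []
    # One key per (neighbor, base feature) pair.
    for i in range(num_neighbors):
        for name in base_features:
            keys.append('{}{}_{}'.format(NBR_FEATURE_PREFIX, i, name))
    # One edge-weight key per neighbor.
    for i in range(num_neighbors):
        keys.append('{}{}{}'.format(NBR_FEATURE_PREFIX, i, NBR_WEIGHT_SUFFIX))
    return keys


for key in neighbor_feature_keys(2):
    print(key)
```

Each training example therefore carries its own features plus, per neighbor, a copy of the neighbor's features and one edge weight. Evaluation examples carry none of these keys, which is why the feature spec is augmented only when `is_training` is true.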

Create and run the Trainer component, passing it the file that we created above.

# Uses user-provided Python function that implements a model using TensorFlow's
# Estimators API.
trainer = Trainer(

WARNING:absl:`custom_executor_spec` is deprecated. Please customize component directly.
WARNING:absl:`transformed_examples` is deprecated. Please use `examples` instead.

Take a peek at the trained model which was exported from Trainer.

train_uri = trainer.outputs['model'].get()[0].uri
serving_model_path = os.path.join(train_uri, 'Format-Serving')
exported_model = tf.saved_model.load(serving_model_path)
exported_model.graph.get_operations()[:10] + ["..."]
[<tf.Operation 'global_step/Initializer/zeros' type=Const>,
 <tf.Operation 'global_step' type=VarHandleOp>,
 <tf.Operation 'global_step/IsInitialized/VarIsInitializedOp' type=VarIsInitializedOp>,
 <tf.Operation 'global_step/Assign' type=AssignVariableOp>,
 <tf.Operation 'global_step/Read/ReadVariableOp' type=ReadVariableOp>,
 <tf.Operation 'input_example_tensor' type=Placeholder>,
 <tf.Operation 'ParseExample/ParseExampleV2/names' type=Const>,
 <tf.Operation 'ParseExample/ParseExampleV2/sparse_keys' type=Const>,
 <tf.Operation 'ParseExample/ParseExampleV2/dense_keys' type=Const>,
 <tf.Operation 'ParseExample/ParseExampleV2/ragged_keys' type=Const>,
 '...']

Let's visualize the model's metrics using TensorBoard.

#docs_infra: no_execute

# Get the URI of the output artifact representing the training logs,
# which is a directory
model_run_dir = trainer.outputs['model_run'].get()[0].uri

%load_ext tensorboard
%tensorboard --logdir {model_run_dir}

Model Serving

Graph regularization affects only the training workflow: it adds a regularization term to the loss function. As a result, the model evaluation and serving workflows remain unchanged. For the same reason, we have omitted the downstream TFX components that typically follow the Trainer component, such as the Evaluator and Pusher.
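
To make the extra loss term concrete, here is a minimal plain-Python sketch. It is not the NSL implementation: it assumes a squared L2 distance and a simple weighted sum over neighbors, whereas NSL's distance type and reduction are configurable and may differ. The names `squared_l2`, `graph_regularized_loss`, and `multiplier` are illustrative only.

```python
def squared_l2(u, v):
    """Squared Euclidean distance between two equal-length vectors."""
    return sum((a - b) ** 2 for a, b in zip(u, v))


def graph_regularized_loss(supervised_loss, embedding, nbr_embeddings,
                           nbr_weights, multiplier=0.1):
    """Adds a weighted neighbor-distance penalty to the supervised loss."""
    graph_loss = sum(
        weight * squared_l2(embedding, nbr)
        for nbr, weight in zip(nbr_embeddings, nbr_weights))
    return supervised_loss + multiplier * graph_loss


# Training: the sample's embedding is penalized for being far from neighbors.
train_loss = graph_regularized_loss(
    supervised_loss=0.5,
    embedding=[1.0, 0.0],
    nbr_embeddings=[[1.0, 1.0], [0.0, 0.0]],
    nbr_weights=[1.0, 0.5])

# Evaluation and serving: no neighbors are supplied, so the penalty is zero.
eval_loss = graph_regularized_loss(0.5, [1.0, 0.0], [], [])

print(train_loss, eval_loss)
```

With no neighbors, the function returns the supervised loss unchanged, which mirrors why evaluation and serving inputs need no neighbor features.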


Conclusion

We have demonstrated the use of graph regularization using the Neural Structured Learning (NSL) framework in a TFX pipeline even when the input does not contain an explicit graph. We considered the task of sentiment classification of IMDB movie reviews for which we synthesized a similarity graph based on review embeddings. We encourage users to experiment further by using different embeddings for graph construction, varying hyperparameters, changing the amount of supervision, and by defining different model architectures.