0% found this document useful (0 votes)
17 views10 pages

171 - PDFsam - Programming Pig

Uploaded by

mitmak
Copyright
© © All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as PDF, TXT or read online on Scribd
0% found this document useful (0 votes)
17 views10 pages

171 - PDFsam - Programming Pig

Uploaded by

mitmak
Copyright
© © All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as PDF, TXT or read online on Scribd

Using partitions

Some types of storage partition their data, allowing you to read only the relevant sec-
tions for a given job. The LoadMetadata interface also provides methods for working
with partitions in your data. In order for Pig to request the relevant partitions, it must
know how the data is partitioned. Pig determines this by calling getPartitionKeys. If
this returns a null or the LoadMetadata interface is not implemented by your loader, Pig
will assume it needs to read the entire input.
Pig expects getPartitionKeys to return an array of strings, where each string represents
one field name. Those fields are the keys used to partition the data. Pig will look for a
filter statement immediately following the load statement that includes one or more
of these fields. If such a statement is found, it will be passed to setPartitionFilter. If
the filter includes both partition and nonpartition keys and it can be split,† Pig will
split it and pass just the partition-key-related expression to setPartitionFilter. As an
example, consider an HCatalog‡ table web_server_logs that is partitioned by two fields,
date and colo:
logs = load 'web_server_logs' using HCatLoader();
cleaned = filter logs by date = '20110614' and NotABot(user_id);
...

Pig will call getPartitionKeys, and HCatLoader will return two key names, date and
colo. Pig will find the date field in the filter statement and rewrite the filter as shown
in the following example, pushing down the date = '20110614' predicate to HCat
Loader via setPartitionFilter:
logs = load 'web_server_logs' using HCatLoader();
cleaned = filter logs by NotABot(user_id);
...

It is now up to HCatalog loader to assure that it only returns data from


web_server_logs where date is 20110614.
The one exception to this is fields used in eval funcs or filter funcs. Pig assumes that
loaders do not understand how to invoke UDFs, so Pig will not push these expressions.
Our example loader works on file data, so it does not implement getPartitionKeys or
setPartitionFilter. For an example implementation of these methods, see the
HCatalog code at http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/
apache/hcatalog/pig/HCatLoader.java?view=markup.

† Meaning that the filter can be broken into two filters—one that contains the partition keys and one that does
not—and produce the same end result. This is possible when the expressions are connected by and but not
when they are connected by or.
‡ HCatalog is a table-management service for Hadoop. It includes Pig load and store functions. See “Metadata
in Hadoop” on page 169 for more information on HCatalog.

Load Functions | 155

www.it-ebooks.info
Casting bytearrays
If you need to control how binary data that your loader loads is cast to other data types,
you can implement the LoadCaster interface. Because this interface contains a lot of
methods, implementers often implement it as a separate class. This also allows load
functions to share implementations of LoadCaster, since Java does not support multiple
inheritance.
The interface consists of a series of methods: bytesToInteger, bytesToLong, etc. These
will be called to convert a bytearray to the appropriate type. Starting in 0.9, there are
two bytesToMap methods. You should implement the one that takes a ResourceField
Schema; the other one is for backward-compatibility. The bytesToBag, bytesToTuple, and
bytesToMap methods take a ResourceFieldSchema that describes the field being conver-
ted. Calling getSchema on this object will return a schema that describes this bag, tuple,
or map, if one exists. If Pig does not know the intended structure of the object, get
Schema will return null. Keep in mind that the schema of the bag will be one field, a
tuple, which in turn will have a schema describing the contents of that tuple.
A default load caster, Utf8StorageConverter, is provided. It handles converting UTF8-
encoded text to Pig types. Scalar conversions are done in a straightforward way. Maps
are expected to be surrounded by [] (square brackets), with keys separated by values
with # (hash) and key-value pairs separated by , (commas). Tuples are surrounded by
() (parentheses) and have fields separated by , (commas). Bags are surrounded by {}
(braces) and have tuples separated by , (commas). There is no ability to escape these
special characters.

Pushing down projections


Often a Pig Latin script will need to read only a few fields in the input. Some types of
storage formats store their data by fields instead of by records (for example, Hive’s
RCFile). For these types of formats, there is a significant performance gain to be had
by loading only those fields that will be used in the script. Even for record-oriented
storage formats, it can be useful to skip deserializing fields that will not be used.
As part of its optimizations, Pig analyzes Pig Latin scripts and determines what fields
in an input it needs at each step in the script. It uses this information to aggressively
drop fields it no longer needs. If the loader implements the LoadPushDown interface, Pig
can go a step further and provide this information to the loader.
Once Pig knows the fields it needs, it assembles them in a RequiredFieldList and passes
that to pushProjection. In the load function’s reply, it indicates whether it can meet the
request. It responds with a RequiredFieldResponse, which is a fancy wrapper around a
Boolean. If the Boolean is true, Pig will assume that only the required fields are being
returned from getNext. If it is false, Pig will assume that all fields are being returned by
getNext, and it will handle dropping the extra ones itself.

156 | Chapter 11: Writing Load and Store Functions

www.it-ebooks.info
The RequiredField class used to describe which fields are required is slightly complex.
Beyond allowing a user to specify whether a given field is required, it provides the ability
to specify which subfields of that field are required. For example, for maps, certain keys
can be listed as required. For tuples and bags, certain fields can be listed as required.
Load functions that implement LoadPushDown should not modify the schema object
returned by getSchema. This should always be the schema of the full input. Pig will
manage the translation between the schema having all of the fields and the results of
getNext having only some.
Our example loader does not implement LoadPushDown. For an example of a loader that
does, see HCatLoader at http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/
org/apache/hcatalog/pig/HCatLoader.java?view=markup.

Store Functions
Pig’s store function is, in many ways, a mirror image of the load function. It is built on
top of Hadoop’s OutputFormat. It takes Pig Tuples and creates key-value pairs that its
associated output format writes to storage.
StoreFunc is an abstract class, which allows it to provide default implementations for
some methods. However, some functions implement both load and store functionality;
PigStorage is one example. Because Java does not support multiple inheritance, the
interface StoreFuncInterface is provided. These dual load/store functions can imple-
ment this interface rather than extending StoreFunc.
Store function operations are split between the frontend and backend of Pig. Pig does
planning and optimization on the frontend. Store functions have an opportunity at this
time to check that a valid schema is being used and set up the storage location. On the
backend, store functions take a tuple from Pig, convert it to a key-value pair, and pass
it to a Hadoop RecordWriter. Store functions can pass information from frontend in-
vocations to backend invocations via UDFContext.

Store Function Frontend Planning


Store functions have three tasks to fulfill on the frontend:
• Instantiate the OutputFormat they will use to store data.
• Check the schema of the data being stored.
• Record the location where the data will be stored.

Determining OutputFormat
Pig calls getOutputFormat to get an instance of the output format that your store function
will use to store records. This method returns an instance rather than the classname or
the class itself. This allows your store function to control how the class is instantiated.

Store Functions | 157

www.it-ebooks.info
The example store function JsonStorage uses TextOutputFormat. This is an output for-
mat that stores text data in HDFS. We have to instantiate this with a key of LongWrita
ble and a value of Text to match the expectations of TextInputFormat:
// JsonStorage.java
public OutputFormat getOutputFormat() throws IOException {
return new TextOutputFormat<LongWritable, Text>();
}

Setting the output location


Pig calls setStoreLocation to communicate the location string the user provides to your
store function. Given the Pig Latin store Z into 'output';, “output” is the location
string. This method, called on both the frontend and the backend, could be called
multiple times; consequently, it should not have any side effects that will cause a prob-
lem if this happens. Your store function will need to communicate the location to its
output format. Our example store function uses the FileOutputFormat utility function
setOutputPath to do this:
// JsonStorage.java
public void setStoreLocation(String location, Job job) throws IOException {
FileOutputFormat.setOutputPath(job, new Path(location));
}

The Hadoop Job is passed to this function as well. Most output formats store the lo-
cation information in the job.
Pig calls setStoreLocation on both the frontend and backend because output formats
usually store their location in the job, as we see in our example store function. This
works for MapReduce jobs, where a single output format is guaranteed. But due to the
split operator, Pig can have more than one instance of the same store function in a
job. If multiple instances of a store function call FileOutputFormat.setOutputPath,
whichever instance calls it last will overwrite the others. Pig avoids this by keeping
output-specific information and calling setStoreLocation again on the backend so that
it can properly configure the output format.
For HDFS files, the user might provide a relative path. Pig needs to resolve these to
absolute paths using the current working directory at the time the store is called. To
accomplish this, Pig calls relToAbsPathForStoreLocation with the user-provided loca-
tion string before calling setStoreLocation. This method translates between relative
and absolute paths. For store functions writing to HDFS, the default implementation
in StoreFunc handles the conversion. If you are writing a store function that does not
use file paths (e.g., HBase), you should override this method to return the string it is
passed.

Checking the schema


As part of frontend planning, Pig gives your store function a chance to check the schema
of the data to be stored. If you are storing data to a system that expects a certain schema

158 | Chapter 11: Writing Load and Store Functions

www.it-ebooks.info
for the output (such as an RDBMS) or you cannot store certain data types, this is the
place to perform those checks. Oddly enough, this method returns a void rather than
a Boolean. So if you detect an issue with the schema, you must throw an IOException.
Our example store function does not have limitations on the schemas it can store.
However, it uses this function as a place to serialize the schema into UDFContext so that
it can be used on the backend when writing data:
// JsonStorage.java

public void checkSchema(ResourceSchema s) throws IOException {


UDFContext udfc = UDFContext.getUDFContext();
Properties p =
udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
p.setProperty("pig.jsonstorage.schema", s.toString());
}

Store Functions and UDFContext


Store functions work with UDFContext exactly as load functions do, but with one ex-
ception: the signature for store functions is passed to the store function via setStore
FuncUDFContextSignature. See “Passing Information from the Frontend to the Back-
end” on page 148 for a discussion of how load functions work with UDFContext. Our
example store function stores the signature in a member variable for later use:
// JsonStorage.java
public void setStoreFuncUDFContextSignature(String signature) {
udfcSignature = signature;
}

Writing Data
During backend processing, the store function is first initialized, and then takes Pig
tuples and converts them to key-value pairs to be written to storage.

Preparing to write
Pig calls your store function’s prepareToWrite method in each map or reduce task before
writing any data. This call passes a RecordWriter instance to use when writing data.
RecordWriter is a class that OutputFormat uses to write individual records. Pig will get
the record writer it passes to your store function by calling getRecordWriter on the
output format your store function returned from getOutputFormat. Your store function
will need to keep this reference so that it can be used in putNext.
The example store function JsonStorage also uses this method to read the schema out
of the UDFContext. It will use this schema when storing data. Finally, it creates a Json
Factory for use in putNext:
// JsonStorage.java
public void prepareToWrite(RecordWriter writer) throws IOException {

Store Functions | 159

www.it-ebooks.info
// Store the record writer reference so we can use it when it's time
// to write tuples.
this.writer = writer;

// Get the schema string from the UDFContext object.


UDFContext udfc = UDFContext.getUDFContext();
Properties p =
udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
String strSchema = p.getProperty("pig.jsonstorage.schema");
if (strSchema == null) {
throw new IOException("Could not find schema in UDF context");
}

// Parse the schema from the string stored in the properties object.
ResourceSchema schema =
new ResourceSchema(Utils.getSchemaFromString(strSchema));
fields = schema.getFields();

// Build a Json factory.


jsonFactory = new JsonFactory();
jsonFactory.configure(
JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false);
}

Writing records
putNext is the core method in the store function class. Pig calls this method for every
tuple it needs to store. Your store function needs to take these tuples and produce the
key-value pairs that its output format expects. For information on the Java objects in
which the data will be stored and how to extract them, see “Interacting with Pig val-
ues” on page 122.
JsonStorage encodes the contents of the tuple in JSON format and writes the resulting
string into the value field of TextOutputFormat. The key field is left null:
// JsonStorage.java
public void putNext(Tuple t) throws IOException {
// Build a ByteArrayOutputStream to write the JSON into.
ByteArrayOutputStream baos = new ByteArrayOutputStream(BUF_SIZE);
// Build the generator.
JsonGenerator json =
jsonFactory.createJsonGenerator(baos, JsonEncoding.UTF8);

// Write the beginning of the top-level tuple object.


json.writeStartObject();
for (int i = 0; i < fields.length; i++) {
writeField(json, fields[i], t.get(i));
}
json.writeEndObject();
json.close();

// Hand a null key and our string to Hadoop.


try {
writer.write(null, new Text(baos.toByteArray()));

160 | Chapter 11: Writing Load and Store Functions

www.it-ebooks.info
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}

private void writeField(JsonGenerator json,


ResourceFieldSchema field,
Object d) throws IOException {

// If the field is missing or the value is null, write a null.


if (d == null) {
json.writeNullField(field.getName());
return;
}

// Based on the field's type, write it out.


switch (field.getType()) {
case DataType.INTEGER:
json.writeNumberField(field.getName(), (Integer)d);
return;

case DataType.LONG:
json.writeNumberField(field.getName(), (Long)d);
return;

case DataType.FLOAT:
json.writeNumberField(field.getName(), (Float)d);
return;

case DataType.DOUBLE:
json.writeNumberField(field.getName(), (Double)d);
return;

case DataType.BYTEARRAY:
json.writeBinaryField(field.getName(), ((DataByteArray)d).get());
return;

case DataType.CHARARRAY:
json.writeStringField(field.getName(), (String)d);
return;

case DataType.MAP:
json.writeFieldName(field.getName());
json.writeStartObject();
for (Map.Entry<String, Object> e : ((Map<String, Object>)d).entrySet()) {
json.writeStringField(e.getKey(), e.getValue().toString());
}
json.writeEndObject();
return;

case DataType.TUPLE:
json.writeFieldName(field.getName());
json.writeStartObject();

ResourceSchema s = field.getSchema();

Store Functions | 161

www.it-ebooks.info
if (s == null) {
throw new IOException("Schemas must be fully specified to use "
+ "this storage function. No schema found for field " +
field.getName());
}
ResourceFieldSchema[] fs = s.getFields();

for (int j = 0; j < fs.length; j++) {


writeField(json, fs[j], ((Tuple)d).get(j));
}
json.writeEndObject();
return;

case DataType.BAG:
json.writeFieldName(field.getName());
json.writeStartArray();
s = field.getSchema();
if (s == null) {
throw new IOException("Schemas must be fully specified to use "
+ "this storage function. No schema found for field " +
field.getName());
}
fs = s.getFields();
if (fs.length != 1 || fs[0].getType() != DataType.TUPLE) {
throw new IOException("Found a bag without a tuple "
+ "inside!");
}
// Drill down the next level to the tuple's schema.
s = fs[0].getSchema();
if (s == null) {
throw new IOException("Schemas must be fully specified to use "
+ "this storage function. No schema found for field " +
field.getName());
}
fs = s.getFields();
for (Tuple t : (DataBag)d) {
json.writeStartObject();
for (int j = 0; j < fs.length; j++) {
writeField(json, fs[j], t.get(j));
}
json.writeEndObject();
}
json.writeEndArray();
return;
}
}

Failure Cleanup
When jobs fail after execution has started, your store function may need to clean up
partially stored results. Pig will call cleanupOnFailure to give your store function an
opportunity to do this. It passes the location string and the job object so that your store
function knows what it should clean up. In the HDFS case, the default implementation

162 | Chapter 11: Writing Load and Store Functions

www.it-ebooks.info
handles removing any output files created by the store function. You need to implement
this method only if you are storing data somewhere other than HDFS.

Storing Metadata
If your storage format can store schemas in addition to data, your store function can
implement the interface StoreMetadata. This provides a storeSchema method that is
called by Pig as part of its frontend operations. Pig passes storeSchema a Resource
Schema, the location string, and the job object so that it can connect to its storage. The
ResourceSchema is very similar to the Schema class described in “Input and Output Sche-
mas” on page 124. There is one important difference, however. In ResourceField
Schema, the schema object associated with a bag always has one field, which is a tuple.
The schema for the tuples in the bag is described by that tuple’s ResourceFieldSchema.
The example store function JsonStorage stores the schema in a side file named
_schema in the same directory as the data. The schema is stored as a string, using the
toString method provided by the class:
// JsonStorage.java
public void storeSchema(ResourceSchema schema, String location, Job job)
throws IOException {
// Store the schema in a side file in the same directory. MapReduce
// does not include files starting with "_" when reading data for a job.
FileSystem fs = FileSystem.get(job.getConfiguration());
DataOutputStream out = fs.create(new Path(location + "/_schema"));
out.writeBytes(schema.toString());
out.writeByte('\n');
out.close();
}

StoreMetadata also has a storeStatistics function, but Pig does not use this yet.

Store Functions | 163

www.it-ebooks.info
www.it-ebooks.info

You might also like