Hadoop Tutorials
Daniel Lanza
Zbigniew Baranowski
4 sessions
• Hadoop Foundations (today)
• Data Ingestion (20-July)
• Spark (3-Aug)
• Data analytics tools and techniques (31-Aug)
Hadoop Foundations
Goals for today
• Introduction to Hadoop
• Explore and run reports on example data with
Apache Impala (SQL)
• Visualize the result with HUE
• Evaluate different data formats and
techniques to improve performance
Hands-on setup
• 12 node virtualized cluster
– 8GB of RAM, 4 cores per node
– 20GB of SSD storage per node
• Access (haperf10[1-12].cern.ch)
– Everybody who subscribed should have access
– Try: ssh haperf105 'hdfs dfs -ls'
• List of commands and queries to be used
$> sh /afs/cern.ch/project/db/htutorials/tutorial_follow_up
What is Hadoop?
• A framework for large scale data processing
– Data Volume (Terabytes, Zettabytes)
– Data Variety (Structured, Unstructured)
– Data Velocity (Stream processing)
What is Hadoop? Architecture
• Data locality (shared nothing) – scales out
[Diagram: shared-nothing architecture – Node 1 … Node X, each with its own CPU, memory and disks, connected by an interconnect network]
What is Hadoop? Set of components
• HDFS – Hadoop Distributed File System
• YARN – Cluster resource manager
• MapReduce
• Spark – Large scale data processing
• Hive – SQL
• Impala – SQL
• Pig – Scripting
• HBase – NoSQL columnar store
• Sqoop – Data exchange with RDBMS
• Flume – Log data collector
• Oozie – Workflow manager
• Zookeeper – Coordination
• Mahout – Machine learning
Hadoop cluster architecture
• Master and slaves approach
[Diagram: master daemons on dedicated nodes – HDFS NameNode, YARN ResourceManager, Hive metastore and other component masters; every worker node (Node 1 … Node X) runs a YARN NodeManager, an HDFS DataNode and various component agents and daemons, all connected by an interconnect network]
HDFS in a nutshell
• Distributed files system for Hadoop
– Fault tolerant -> multiple replicas of data spread across the cluster (see the example below)
– Scalable -> designed to deliver high throughput, at the cost of access latency
– Files cannot be modified in place
• Architecture
– NameNode -> maintains and manages file system
metadata (in RAM)
– DataNodes -> store and manipulate the data (blocks)
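For example, after putting a file on HDFS you can check how it was split into blocks and where the replicas ended up (a minimal sketch; myfile.csv is just an example file):
$ hdfs dfs -put myfile.csv .
$ hdfs fsck /user/<user_name>/myfile.csv -files -blocks -locations  # one line per block, with its replica locations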
How HDFS stores the data
1) File to be stored on HDFS (e.g. a 1.1GB file)
2) Splitting into 256MB blocks (4 x 256MB + 1 x 102MB)
3) Ask NameNode where to put them
4) Blocks with their replicas (by default 3) are distributed across DataNodes
[Diagram: the 256MB and 102MB blocks and their replicas spread over DataNode1 … DataNode4]
Interacting with HDFS
• Command line (examples)
hdfs dfs -ls                 #listing home dir
hdfs dfs -ls /user           #listing user dir
hdfs dfs -du -h /user        #space used
hdfs dfs -mkdir newdir       #creating dir
hdfs dfs -put myfile.csv .   #storing a file on HDFS
hdfs dfs -get myfile.csv .   #getting a file from HDFS
• Programming bindings
– Java, Python, C++
More about HDFS: https://indico.cern.ch/event/404527/
Using Hadoop for data processing
• Get/produce the data
• Load data to Hadoop
• (optional) restructure it into optimized form
• Process the data (SQL, Scala, Java)
• Present/visualise the results
Using Hadoop for data processing
• Get/produce the data
• Load data to Hadoop
• (optional) restructure it into optimized form
• Process the data (SQL, Scala, Java)
• Present/visualise the results
Example data
• Source
– Meetups are: neighbours getting together to learn
something, do something, share something…
• Streaming API
– curl -s http://stream.meetup.com/2/rsvps
Using Hadoop for data processing
• Get/produce the data
• Load data to Hadoop
• (optional) restructure it into optimized form
• Process the data (SQL, Scala, Java)
• Present/visualise the results
Loading the data with HDFS command
• Store it locally and then move it to HDFS
– curl -s http://stream.meetup.com/2/rsvps -o meetup_data.json
• Stop the stream with Ctrl + C, then move it to HDFS
– hdfs dfs -moveFromLocal meetup_data.json meetup.json
• Directly
– curl -s http://stream.meetup.com/2/rsvps | head -10 |
hdfs dfs -put - meetup.json
• Showing
– hdfs dfs -cat meetup.json
Pre-processing required
• Convert JSON to Parquet
– SparkSQL
> spark-shell
scala> val meetup_data = sqlContext.read.json("meetup.json")
scala> val sel = meetup_data.select("*").withColumnRenamed("group","group_info")
scala> sel.saveAsParquetFile("meetup_parquet")
• Convert to CSV with Impala
– Create external table
CREATE EXTERNAL TABLE meetup_parquet
LIKE PARQUETFILE '/user/<user_name>/meetup_parquet/<any_parquet_file>.gz.parquet'
STORED AS parquet
LOCATION '/user/<user_name>/meetup_parquet/';
– Create table as select
CREATE TABLE meetup_csv
row format delimited fields terminated by '\t' ESCAPED BY '"' LINES TERMINATED BY '\n'
AS SELECT
... all interesting columns ...
FROM meetup_parquet;
Using Hadoop for data processing
• Produce the data
• Load data to Hadoop
• (optional) restructure it into optimized form
• Process the data (SQL, Scala, Java)
• Visualise the results
Why SQL?
• It is simple and powerful
– interactive, ad-hoc
– declarative data processing
– no need to compile
• Good for data exploration and reporting
• Structured data
– organization of the data in table abstractions
– optimized processing
Apache Impala
• MPP SQL query engine running on Apache Hadoop
• Low latency SQL queries on
– files stored on HDFS, Apache HBase and Apache Kudu
• Faster than MapReduce (Hive)
• Written in C++, no Java GC (see the connection example below)
[Diagram: an application submits a SQL query via ODBC to any node; each node runs a Query Planner, Query Coordinator and Query Executor on top of HDFS, and the result is returned to the application]
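To try it from any of the hands-on nodes, point impala-shell at one of the Impala daemons (a minimal sketch; haperf105 is just one of the tutorial hosts):
$ impala-shell -i haperf105   # -i selects the impalad to connect to
> SHOW TABLES;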
More about Impala and Hive: https://indico.cern.ch/event/434650/
Creating our own table
• Create table
CREATE TABLE meetup_csv
(event_id string, event_name string, ...);
CREATE TABLE meetup_csv
LIKE meetup_csv;
• Populate table
INSERT INTO meetup_csv
SELECT * FROM meetup_csv;
• Create table as select
CREATE TABLE meetup_csv
AS SELECT * from meetup_csv;
Querying the data
• Counting records (SQL Hello world!)
SELECT count(*) FROM meetup_csv;
• Most interesting meetups
SELECT DISTINCT event_name, group_name, venue_name
FROM meetup_csv
WHERE event_id IN
(SELECT event_id FROM meetup_csv
GROUP BY event_id ORDER BY count(*) desc
LIMIT 10);
• Not interesting meetings (people did not accept)
SELECT event_name, response, count(*)
FROM meetup_csv
WHERE response='no'
GROUP BY event_name, response
ORDER BY 3 desc;
Using Hadoop for data processing
• Produce the data
• Load data to Hadoop
• (optional) restructure it into optimized form
• Process the data (SQL, Scala, Java)
• Visualise the results
HUE – Hadoop User Experience
• Web interface to main Hadoop components
– HDFS, Hive, Impala, Sqoop, Oozie, Solr etc.
• HDFS: FS browser, permission and ACLs
configuration, file uploading
• SQL: query execution, results visualisation
• http://haperf100.cern.ch:8888/
How to check a profile of the execution
• Impala has a built-in query profile feature
$ impala-shell
> SELECT event_name, event_url, member_name, venue_name, venue_lat,
venue_lon FROM meetup_csv
WHERE time BETWEEN unix_timestamp("2016-07-06 10:30:00")*1000
AND unix_timestamp("2016-07-06 12:00:00")*1000;
> profile;
• See execution plan
• Per machine or cluster average
– How much data was read from HDFS
– How much CPU time was spent on certain operations
– etc.
profile
• Execution plan profile
• Details for HDFS SCAN fragment (averaged)
Can we optimize the execution?
• Reading all the data: 159.57MB
• Data is stored as text -> not optimal!
• Binary format?
• Apache Avro
Apache Avro data file
• Fast, binary serialization format
• Internal schema with multiple data types
including nested ones
– scalars, arrays, maps, structs, etc
• Schema in JSON
{
  "type": "record",
  "name": "test",
  "fields": [
    {"name": "a", "type": "long"},
    {"name": "b", "type": "string"}
  ]
}
• Example record {a=27, b='foo'} is encoded (hex) as: 36 06 66 6f 6f
– a: long, variable-length zigzag encoding
– b: string, length followed by the string chars
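Where those bytes come from (Avro writes integers as zigzag-encoded varints and strings as a length followed by UTF-8 bytes):
– a = 27 -> zigzag(27) = 2*27 = 54 = 0x36
– b = "foo" -> length 3 -> zigzag(3) = 6 = 0x06, followed by 66 6f 6f (UTF-8 for 'foo')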
Creating Avro table in Impala
• Creating table
CREATE TABLE meetup_avro
LIKE meetup_csv
STORED AS avro;
• Populating the table
INSERT INTO meetup_avro
SELECT * FROM meetup_csv;
• Data size in Avro: 76MB (in CSV it was 159MB) – see below for how to check table sizes
• Run the queries
– ~1.4s (with CSV it was ~2s)
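To check those sizes yourself, Impala can report the on-disk size and file format of a table directly (a minimal sketch):
$ impala-shell
> SHOW TABLE STATS meetup_csv;
> SHOW TABLE STATS meetup_avro;   -- reports #files, size and format (per partition for partitioned tables)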
Can we do it better? (2)
• Still reading more (all) data than needed!
• What if the data were stored in such a way that only a
subset needs to be read?
• Use partitioning!
Data partitioning (horizontal)
• Group data by certain attribute(s) in separate
directories
• Reduces the amount of data to be read
Day Month Year   No of customers
10  Aug  2013    17
11  Aug  2013    15     -> /user/zaza/mydata/Aug2013/data
12  Aug  2013    21
 2  Dec  2014    30
 3  Dec  2014    34     -> /user/zaza/mydata/Dec2014/data
 4  Dec  2014    31
17  Feb  2015    12     -> /user/zaza/mydata/Feb2015/data
18  Feb  2015    16
Partitioning the data with Impala
• Create a new partitioning table
CREATE TABLE meetup_avro_part
(event_id string, event_name string,
time bigint, event_url string,
group_id bigint, group_name string,
group_city string, group_country string,
group_lat double, group_lon double,
group_state string, group_urlname string,
guests bigint, member_id bigint,
member_name string, photo string,
mtime bigint, response string,
rsvp_id bigint, venue_id bigint,
venue_name string, venue_lat double,
venue_lon double)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS avro;
Partitioning the data with Impala
• Populating the partitioned table
– the data needs to be reloaded
INSERT INTO meetup_avro_part
PARTITION (year, month, day)
SELECT *,
year(from_unixtime(cast(time/1000 as bigint))),
month(from_unixtime(cast(time/1000 as bigint))),
day(from_unixtime(cast(time/1000 as bigint)))
FROM meetup_avro;
– Impala will automatically create directories like:
/user/zaza/mydata/year=2016/month=7/day=6/data
• Filter predicates have to be specified on the partitioning columns
– where year=2016 and month=7 and day=6 (see the example below)
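For example (a minimal sketch): list the partitions Impala created, then run a query restricted to a single day so that only that directory is read:
$ impala-shell
> SHOW PARTITIONS meetup_avro_part;
> SELECT count(*) FROM meetup_avro_part
  WHERE year=2016 AND month=7 AND day=6;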
Can we do even better? (3)
• We are interested in reading certain columns
• But we are always reading entire row data
• Solution?
• Columnar store
[Diagram: a table stored column by column (Col1, Col2, Col3, Col4)]
Parquet data format
• Based on Google “Dremel”
• Columnar storage
• Pushdowns
Slicing and dicing
• Horizontal and vertical partitioning – for
efficient data processing
[Diagram: a table split horizontally into row groups and vertically into columns (Col1 … Col4)]
Horizontal and vertical partitioning
• Create a new table
CREATE TABLE meetup_parquet_part
(event_id string, event_name string,
time bigint, event_url string,
group_id bigint, group_name string,
group_city string, group_country string,
group_lat double, group_lon double,
group_state string, group_urlname string,
guests bigint, member_id bigint,
member_name string, photo string,
mtime bigint, response string,
rsvp_id bigint, venue_id bigint,
venue_name string, venue_lat double,
venue_lon double)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS parquet;
Horizontal and vertical partitioning
• Populating the partitioned table
– the data needs to be reloaded
INSERT INTO meetup_parquet_part
PARTITION (year, month, day)
SELECT *,
year(from_unixtime(cast(time/1000 as bigint))),
month(from_unixtime(cast(time/1000 as bigint))),
day(from_unixtime(cast(time/1000 as bigint)))
FROM meetup_avro;
– Size 42MB
• Run queries
Can we query faster? (4)
• Use compression? (see the sketch below)
– Snappy – lightweight, with a decent compression ratio
– Gzip – saves more space but affects performance
• Using an index?
• In Hadoop there is a ‘format’ that has an index
-> HBase
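A minimal sketch of the compression option above, writing a gzip-compressed copy of the Parquet table with Impala (meetup_parquet_gzip is just an example name; Impala's default Parquet codec is already Snappy):
$ impala-shell
> SET COMPRESSION_CODEC=gzip;   -- or snappy (the default), or none
> CREATE TABLE meetup_parquet_gzip LIKE meetup_parquet_part STORED AS parquet;
> INSERT INTO meetup_parquet_gzip PARTITION (year, month, day)
  SELECT * FROM meetup_parquet_part;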
HBase in a nutshell
• HBase is a key-value store on top of HDFS
– horizontal (regions) + vertical (col. families) partitioning
– row key values are indexed within regions
– type-free – data is stored as byte arrays
• Fast random data access by key (see the hbase shell sketch below)
• Stored data can be modified (updated, deleted)
• Has multiple bindings
– SQL (Impala/Hive, Phoenix), Java, Python
• Very good for massive concurrent random data access
• ...but not good for sequential big data processing!
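What random access by key looks like directly in the hbase shell (a minimal sketch; it uses the meetup_<username> table created later in this hands-on, and the row key and value are made up):
$ hbase shell
> put 'meetup_<username>', 'somekey', 'event:event_name', 'Hadoop tutorial'
> get 'meetup_<username>', 'somekey'
> quit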
HBase: master-slaves architecture
• HBase master
– assigns table regions/partitions to region servers
– maintains metadata and table schemas
• HBase region servers
– serve client requests (reads and writes)
– maintain and store the region data on HDFS
– write a WAL (write-ahead log) in order to recover the data after a
failure
– perform region splitting when needed
[Diagram: HBase table data organisation]
More about HBase: https://indico.cern.ch/event/439742/
Creating and loading data to an HBase table with SQL
• Creating HBase table (with 4 column families)
$ hbase shell
> create 'meetup_<username>', 'event', 'group', 'member', 'venue'
> quit
• Mapping Hive/Impala table to HBase table
$ hive
> CREATE EXTERNAL TABLE meetup_hbase
(key string, event_id string, event_name string, time bigint, ...)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" =
":key,event:event_name,event:time,event:event_url,group:group_id,group:group_name,group:group_city,group:group_country,group:group_lat,… ")
TBLPROPERTIES("hbase.table.name" = "meetup_<username>");
• Populating the table (key = event time, event id, modification time)
$ hive
> INSERT INTO meetup_hbase SELECT concat(
cast(nvl(time, 0) as string), event_id, cast(mtime as string)), *
FROM meetup_csv;
Query data by key on HBase through Impala/Hive
• Run queries
$ impala-shell
> SELECT *
FROM meetup_hbase
WHERE key BETWEEN "1462060800" AND "1467331200";
> SELECT *
FROM meetup_hbase
WHERE key BETWEEN
cast(unix_timestamp("2016-07-06 10:30:00") as string)
AND cast(unix_timestamp("2016-07-06 12:00:00") as string);
> SELECT * FROM meetup_hbase
WHERE key = '14679936000002319268721467404430663';
Formats summary
• Hands-on results
Format               data size (MB)   query time (s)
CSV                  159              1.9
Avro                 76               1.4
Avro partitioned     76               0.87
Parquet partitioned  42               0.8
HBase                770              0.5
• Production data
When to use what?
• Partitioning -> always when possible
• Fast full data (all columns) processing -> Avro
• Fast analytics on subset of columns -> Parquet
• Only when predicates are on the key columns -> HBase
(data deduplication, low latency, parallel access)
• Compression in order to further reduce the data volume
– without sacrificing performance -> Snappy
– when data access is sporadic -> Gzip/Bzip or derived
Summary
• Hadoop is a framework for distributed data
processing
– designed to scale out
– optimized for sequential data processing
– HDFS is the core of the system
– many components with multiple functionalities
• You do not have to be a Java guru to start using it
• Choosing the right data format and partitioning scheme is key
to achieving good performance and optimal resource
utilisation
Questions & feedback
[email protected]