In a map-side join, the join operation is performed in the map phase itself: the mapper performs the join, so it is mandatory that the input to each map task is partitioned and sorted by the join key.
Reduce-side Join
The mapper reads the input data sets that are to be combined on a common column or join key.
The mapper tags each input record to indicate which source, data set, or database it belongs to.
The mapper outputs an intermediate key-value pair whose key is the join key.
After the sort-and-shuffle phase, each join key arrives at a reducer along with the list of tagged values.
The reducer then joins the values in that list to produce the final joined output.
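A minimal, hypothetical sketch of the tagging scheme above, assuming CSV records whose first field is the join key; all class names and the "C|"/"O|" tags are mine, not from the source:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class ReduceSideJoin {

  // Tags each customer record with "C|"; an analogous mapper would tag
  // order records with "O|". fields[0] is assumed to be the join key.
  public static class CustomerMapper
      extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context ctx)
        throws IOException, InterruptedException {
      String[] fields = value.toString().split(",");
      ctx.write(new Text(fields[0]), new Text("C|" + value));
    }
  }

  // Receives all tagged values for one join key, separates them by tag,
  // and emits the joined pairs.
  public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context ctx)
        throws IOException, InterruptedException {
      List<String> customers = new ArrayList<>();
      List<String> orders = new ArrayList<>();
      for (Text v : values) {
        String s = v.toString();
        if (s.startsWith("C|")) customers.add(s.substring(2));
        else orders.add(s.substring(2));
      }
      for (String c : customers)
        for (String o : orders)
          ctx.write(key, new Text(c + "\t" + o));
    }
  }
}
```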
When to Use MapReduce
MapReduce is a very low-level framework
best suited to experienced Java developers who are comfortable with the MapReduce programming paradigm
useful where detailed control of the execution has significant advantages
Spark
Overview
In 2009 Matei Zaharia and his team at UC Berkeley’s AMPLab researched possible improvements to the MapReduce framework.
Improves on
iterative machine learning
interactive data analysis
reusing a data set cached in memory for multiple processing tasks
Driver: the main function defines the RDDs (resilient distributed datasets) and their transformations / actions
DAG Scheduler
optimizes the code and arrives at an efficient DAG
Task Scheduler
gets info from the cluster manager (YARN, Mesos, etc.) about:
workers
assigned threads
location of data blocks
responsible for assigning tasks to workers
Worker
receives work and data
executes task without knowledge of the entire DAG
Basic Spark Concepts
RDD (RESILIENT DISTRIBUTED DATASETS)
RDDs are collections of serializable elements, and such a collection may be partitioned, in which case it is stored on multiple nodes
Spark determines the number of partitions by the input format
RDDs store their lineage — the set of transformations that was used to create the current state, starting from the first input format that was used to create the RDD
If the data is lost, Spark will replay the lineage to rebuild the lost RDDs so the job can continue
In the book’s example figure, Spark would replay the “Good Replay” boxes and the “Lost Block” boxes to get the data needed to execute the final step
SHARED VARIABLES
broadcast variables
accumulator variables
SPARKCONTEXT
represents the connection to a Spark cluster
used to create RDDs, broadcast data, and initialize accumulators
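A small sketch of these three pieces using the Spark 1.x-era Java API (app name, master URL, and values are arbitrary; Accumulator was later deprecated in Spark 2.x):

```java
import java.util.Arrays;
import java.util.List;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class SparkContextDemo {
  public static void main(String[] args) {
    // SparkContext: the connection to the cluster.
    SparkConf conf = new SparkConf().setAppName("demo").setMaster("local[2]");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Broadcast variable: read-only data shipped once to each worker.
    Broadcast<List<String>> stopWords = sc.broadcast(Arrays.asList("a", "the"));

    // Accumulator: workers can only add to it; the driver reads the total.
    Accumulator<Integer> badLines = sc.accumulator(0);

    sc.close();
  }
}
```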
TRANSFORMATIONS
transformations take one RDD and return another RDD
RDDs are immutable
transformations in Spark are always lazy
calling a transformation function only creates a new RDD with this specific lineage
transformations are only executed when an action is called
this laziness allows Spark to optimize the execution graph
Some core transformations:
map()
filter()
keyBy()
join()
groupByKey()
sort()
ACTIONS
take an RDD, perform a computation, and return the result to the driver application
result of the computation can be a collection, values printed to the screen, values saved to file, or similar
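A sketch of the lazy-evaluation behavior: the two transformations below only record lineage, and nothing runs until the action is called (Spark 1.x-era Java API, values arbitrary):

```java
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class LazyTransformations {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("lazy-demo").setMaster("local[2]"));

    JavaRDD<Integer> nums = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

    // Transformations: nothing executes yet; only the lineage is recorded.
    JavaRDD<Integer> squares = nums.map(x -> x * x);
    JavaRDD<Integer> evens = squares.filter(x -> x % 2 == 0);

    // Inspect the recorded lineage before anything runs.
    System.out.println(evens.toDebugString());

    // Action: triggers actual execution of the whole lineage.
    System.out.println(evens.collect()); // [4, 16]

    sc.close();
  }
}
```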
Impala
an MPP-style query engine for Hadoop, in the vein of massively parallel processing (MPP) data warehouses
such as Netezza, Greenplum, and Teradata
delivers query latency significantly lower, and concurrency significantly higher, than Hive running on MapReduce
uses Hive SQL dialect and Hive Metastore
supports both HDFS and HBase as data sources, like Hive
supports the popular data formats
delimited text, SequenceFiles, Avro, and Parquet
Overview
shared nothing architecture
Impala daemons, impalad
running on each node; identical and interchangeable
responsible for
query planner
query coordinator
query execution engine
focuses on the core functionality: executing queries as fast as possible
off-loads data storage to HDFS and HBase
off-loads database and table management to the Hive Metastore
distributed join strategies
broadcast hash joins
partitioned hash joins
query profiles
table scan rates
actual data sizes
amount of memory used
execution times
Speed-Oriented Design
in-memory processing
can spill to disk in version 2.0 and later
recommended minimum of 128 GB to 256 GB of RAM
not fault-tolerant: losing a node causes the running query to fail
long running daemons
no startup cost
high concurrency
colocated with the data for data locality
can be managed by YARN or Linux cgroups
efficient execution engine
implemented in C++
takes better advantage of vectorization and of CPU instructions for text parsing, CRC32 computation, etc.
no JVM overhead
no Java GC latency
use of LLVM
Low Level Virtual Machine
compiles the query to optimized machine code
the machine code improves execution efficiency in the CPU by getting rid of polymorphism
the generated machine code uses optimizations available in modern CPUs (such as Sandy Bridge) to improve its I/O efficiency
because the entire query and its functions are compiled into a single execution context, Impala avoids the overhead of context switching: all function calls are inlined and there are no branches in the instruction pipeline, which makes execution even faster
Data Ingestion
Common source systems:
data management systems such as relational databases
logs and event data
files from existing storage systems
Considerations:
Timeliness of data ingestion and accessibility
Incremental updates
Data access and processing
Source system and data structure
Partitioning and splitting of data
Storage format
Data transformation
Timeliness of Data Ingestion
Timeliness classification:
Macro batch (15 minutes or more)
Microbatch (every 2 to 15 minutes)
Near-Real-Time Decision Support (2 seconds to 2 minutes)
Near-Real-Time Event Processing (100 milliseconds to 2 seconds)
Real Time (under 100 milliseconds)
Tighter timeliness increases system complexity and cost, e.g., storing to memory rather than disk.
Incremental Updates
append
watch out for the small files problem; may require a periodic process to merge small files
update
HDFS
delta file and compaction job
only works for multi-minute timeliness intervals
HBase
millisecond timeliness
but scans are 8 to 10 times slower than HDFS
Access Patterns
scan: HDFS (supports memory cache from Hadoop 2.3.0)
random access: HBase
search: Solr
| Tool | Use cases | Storage device |
| --- | --- | --- |
| MapReduce | Large batch processes | HDFS preferred |
| Hive | Batch processing with SQL-like language | HDFS preferred |
| Pig | Batch processing with a data flow language | HDFS preferred |
| Spark | Fast interactive processing | HDFS preferred |
| Giraph | Batch graph processing | HDFS preferred |
| Impala | MPP style SQL | HDFS preferred for most cases |
| HBase API | Atomic puts, gets, and deletes on record-level data | HBase |
Original Source System and Data Structure
READ SPEED OF THE DEVICES ON SOURCE SYSTEMS
Disk I/O is often a major bottleneck in any processing pipeline. With Hadoop we’ll generally see read speeds of anywhere from 20 MBps to 100 MBps, and there are limitations on the motherboard or controller for reading from all the disks on the system. To maximize read speeds, take advantage of as many disks as possible on the source system; on a typical drive, three threads are normally required to maximize throughput, although this number will vary.
use as many disks as possible
use multiple threads
ORIGINAL FILE TYPE
COMPRESSION
RELATIONAL DATABASE MANAGEMENT SYSTEMS
Apache Sqoop: a very rich tool with lots of options, but at the same time it is simple and easy to learn
batch processing (slower timeliness): Sqoop
splitting the data pipeline (one copy to the RDBMS, one to HDFS): Flume or Kafka
network-limited source: do an RDBMS file dump and transfer it via an edge node
STREAMING DATA
LOGFILES
anti-pattern: read the logfiles from disk as they are written
because this is almost impossible to implement without losing data
recommend: Flume or Kafka
Transformations
Options:
Transformation
Partitioning
Splitting
For timeliness,
batch transformation
Hive, Pig, MapReduce, Spark
checkpoint for failure
all-or-nothing
streaming ingestion
Flume
interceptors
watch for performance issues: external calls, GC, etc.
selectors
decide which road each event will go down
Network Bottlenecks
increase bandwidth
compress data
Network Security
encrypt with OpenSSL
Push or Pull
requirements:
Keeping track of what has been sent
Handling retries or failover options in case of failure
Being respectful of the source system that data is being ingested from
Access and security
SQOOP
a pull solution, requires
connection information for the source database
one or more tables to extract
extraction can be run at a defined rate
scheduled so as not to interfere with the source system’s peak load times
FLUME
Log4J appender
pushing events through a pipeline
spooling directory source or the JMS source
events are being pulled
Failure Handling
Failure scenarios need to be documented, including failure delay expectations and how data loss will be handled.
double-hop from an edge node due to some network policy
MOUNTABLE HDFS
allows the use of common filesystem commands
not full POSIX semantics
does not support random writes
potential misuse: the small files problem
mitigated by using Solr for indexing, HBase, or a container format such as Avro
example implementations
Fuse-DFS
no need to modify the Linux kernel
multiple hops, poor consistency model
not yet production ready
NFSv3
scaling by Hadoop NFS Gateway nodes
recommended to use only in small, manual data transfers
Considerations
single sink or multiple sinks
whether reliability is required
whether transformation is needed
Sqoop: Batch Transfer Between Hadoop and Relational Databases
When used for importing data into Hadoop, Sqoop generates map-only MapReduce jobs where each mapper connects to the database using a Java database connectivity (JDBC) driver, selects a portion of the table to be imported, and writes the data to HDFS.
CHOOSING A SPLIT-BY COLUMN
split-by
num-mappers
USING DATABASE-SPECIFIC CONNECTORS WHENEVER AVAILABLE
USING THE GOLDILOCKS METHOD OF SQOOP PERFORMANCE TUNING
start with a very low number of mappers
gradually increase it to achieve a balance
LOADING MANY TABLES IN PARALLEL WITH FAIR SCHEDULER THROTTLING
Load the tables sequentially
Load the tables in parallel (Fair Scheduler)
DIAGNOSING BOTTLENECKS
Network bandwidth
likely to be either 1 GbE or 10 GbE (120 MBps or 1.2 GBps)
RDBMS
review the query generated by the mappers
in Sqoop incremental mode
make sure an index is used
ingest entire table
full table scans are typically preferred
monitor the database
schedule the execution time
Data skew
by default, Sqoop splits the range evenly between the min and max values of the split-by column
choose --split-by
define --boundary-query
Connector
RDBMS-specific connector is preferred
Hadoop
check disk I/O, CPU utilization, and swapping on the DataNodes where the mappers are running
Inefficient access path
it is incredibly important that the split column is either the partition key or has an index
if no such column, then use only one mapper
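For illustration, a hypothetical import showing these knobs together; the connection string, table, and column names are made up, but --split-by, --num-mappers, and --boundary-query are standard Sqoop options:

```
sqoop import \
  --connect jdbc:mysql://dbhost/sales \
  --username etl_user \
  --table orders \
  --split-by order_id \
  --num-mappers 4 \
  --boundary-query "SELECT MIN(order_id), MAX(order_id) FROM orders" \
  --target-dir /data/raw/orders
```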
KEEPING HADOOP UPDATED
small table
just overwrite it
large table
delta
Incremental Sequence ID
Timestamp
write to a new directory
check {output_dir}/_SUCCESS
compaction with sqoop-merge
a sorted and partitioned data set allows the merge to be optimized into a map-only job
Flume: Event-Based Data Collection and Processing
FLUME ARCHITECTURE
[Flume Components]
Main components inside of the Flume agent JVM:
Sources
consume events from external sources and forward to channels
including AvroSource, SpoolDirectorySource, HTTPSource, and JMSSource
Interceptors
allow events to be intercepted and modified in flight (see the sketch after this component list)
can do anything that can be implemented in a Java class
formatting, partitioning, filtering, splitting, validating, or applying metadata to events
Selectors
routing for events
fork to multiple channels, or send to a specific channel based on the event
Channels
store events until they’re consumed by a sink
memory channel and file channel balance performance against durability
Sinks
remove events from a channel and deliver to a destination
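As referenced above, a minimal custom interceptor sketch: it stamps each event with an ingest-time header. The class and header names are mine; the Interceptor and Builder interfaces are Flume’s standard extension points:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class IngestTimeInterceptor implements Interceptor {

  @Override public void initialize() { }

  @Override
  public Event intercept(Event event) {
    // Modify the event in flight: add an ingest-timestamp header.
    Map<String, String> headers = event.getHeaders();
    headers.put("ingest.time", Long.toString(System.currentTimeMillis()));
    return event; // returning null would drop (filter) the event
  }

  @Override
  public List<Event> intercept(List<Event> events) {
    List<Event> out = new ArrayList<>(events.size());
    for (Event e : events) {
      Event intercepted = intercept(e);
      if (intercepted != null) out.add(intercepted);
    }
    return out;
  }

  @Override public void close() { }

  // Flume instantiates interceptors through a Builder.
  public static class Builder implements Interceptor.Builder {
    @Override public Interceptor build() { return new IngestTimeInterceptor(); }
    @Override public void configure(Context context) { }
  }
}
```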
FLUME PATTERNS
Fan-in
Agents on Hadoop edge nodes.
Splitting data on ingest
Backup HDFS cluster for disaster recovery (DR).
Partitioning data on ingest
Ex. partition events by timestamp.
Splitting events for streaming analytics
Send events to a streaming system such as Storm or Spark Streaming (e.g., point a Flume Avro sink at Spark Streaming’s Flume stream).
FILE FORMATS
Text files
suboptimal on their own; prefer ingesting into a container format such as SequenceFiles or Avro
Avro is preferred because
it stores the schema as part of the file,
compresses more efficiently,
and provides better failure handling.
Columnar formats
RCFile, ORC, and Parquet are not well suited for Flume
higher risk of data loss, because they are written in large batches
Customized event serializers
override the EventSerializer interface to apply your own logic and create a custom output format
RECOMMENDATIONS
Flume sources
Batch size
account for the network latency of sending acknowledgments
start from 1,000 and tune
Threads
pushing source: add more clients or client threads
pulling source: configure more sources in the agent
Flume sinks
Number of sinks
channel to sink is one-to-many
each sink is single-threaded
with more sinks, the limiting factor should become the network or the CPU
Batch Sizes
overhead of an fsync system call
only big downside to large batches with a sink is an increased risk of duplicate events
balance between throughput and potential duplicates
Flume interceptors
capability to take an event or group of events and modify them, filter them, or split them
custom code comes with risk of issues like memory leaks or consuming excessive CPU
Flume memory channels
if performance is your primary consideration, and data loss is not an issue
better suited when feeding a streaming analytics sink
Flume file channels
it’s more durable than the memory channel
to use multiple disks
if using multiple file channels, use distinct directories, and preferably separate disks, for each channel
use dual checkpoint directories
better for persistent sink
JDBC channels
persists events to any JDBC-compliant data store
most durable channel, but also the least performant
Sizing Channels
Memory channels
can be fed by multiple sources
can be fetched from by multiple sinks
so one channel per node is usually enough for a pipeline
Channel size
a large memory channel can incur garbage collection activity that slows down the whole agent
FINDING FLUME BOTTLENECKS
Latency between nodes
batch size or more threads
Throughput between nodes
data compression
Number of threads
Number of sinks
Channel
Garbage collection issues
Kafka
producers, brokers, consumers
topic, partition, offset
Large number of partitions:
Each partition can be consumed by at most one consumer from the same group
Therefore, we recommend at least as many partitions per topic as there are servers in the cluster, possibly planning for a few years of growth.
There are no real downsides to having a few hundred partitions per topic.
KAFKA FAULT TOLERANCE
replica
leader, followers
Producer acknowledgment options:
all in-sync replicas
leader only
none (asynchronous)
Consumers only read committed messages (those replicated to all in-sync replicas).
Supported delivery semantics:
at least once
consumer advances the offset after processing the messages
at most once
consumer advances the offset before processing the messages
exactly once
the consumer advances the offset and processes the messages atomically, using two-phase commit
Supports multiple data center deployments.
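The placement of the offset commit is the whole difference between the first two semantics. A sketch using the modern Kafka Java consumer (which postdates the high-level consumer of this era); the broker address, group ID, and topic name are made up:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtLeastOnceConsumer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092");
    props.put("group.id", "demo-group");
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("enable.auto.commit", "false"); // we manage offsets ourselves

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("events"));
      while (true) {
        ConsumerRecords<String, String> records =
            consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> r : records) {
          process(r.value());
        }
        // At-least-once: advance the offset AFTER processing. Committing
        // BEFORE the loop above would give at-most-once instead.
        consumer.commitSync();
      }
    }
  }

  private static void process(String value) { /* application logic */ }
}
```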
KAFKA AND HADOOP
| | Kafka | Flume |
| --- | --- | --- |
| Hadoop ingest solution | less | more |
| Required code writing | yes | no |
| Fault tolerance | higher | lower |
| Performance | higher | lower |
Flume with Kafka
Kafka source
consumer, reads data from Kafka and sends it to the Flume channel
adding multiple sources with the same groupId for load balancing and high availability
batch size tuning
Kafka sink
producer, sends data from a Flume channel to Kafka
batch size tuning
Kafka channel
combines a producer and a consumer
each batch will be sent to a separate Kafka partition, so the writes will be load-balanced
Camus
ingesting data from Kafka to HDFS
automatic discovery of Kafka topics from ZooKeeper
conversion of messages to Avro and Avro schema management
automatic partitioning
all-or-nothing batch processing
you need to write a decoder to convert Kafka messages to Avro
Data Extraction
Moving data from Hadoop to an RDBMS or data warehouse
In most cases, Sqoop will be the appropriate choice for ingesting the transformed data into the target database.
Moving data between Hadoop clusters
DistCp uses MapReduce to perform parallel transfers of large volumes of data.
DistCp is also suitable when either the source or target is a non-HDFS filesystem—for example, an increasingly common need is to move data into a cloud-based system, such as Amazon’s Simple Storage System (S3).
The sync marker permits seeking to a random point in a file and then re-synchronizing input with record boundaries. This is required to be able to efficiently split large files for MapReduce processing.
But what does it actually mark? And how can it be used in “seeking”?
Here is my initial investigation.
The code for sync marker generation:
```java
public static class Writer implements java.io.Closeable, Syncable {
  ...
  MessageDigest digester = MessageDigest.getInstance("MD5");
  long time = Time.now();
  digester.update((new UID() + "@" + time).getBytes(StandardCharsets.UTF_8));
  sync = digester.digest();
  ...
```
From this code, the sync marker appears to be generated only once, at Writer initialization, then written into the file header, and again into the output whenever the output buffer fills past a certain size.
In Writer & RecordCompressWriter: refer to the SYNC_INTERVAL
refer to this commit, it has been changed from 100 * SYNC_SIZE to 5 * 1024 * SYNC_SIZE
In BlockCompressWriter: refer to IO_SEQFILE_COMPRESS_BLOCKSIZE_KEY/DEFAULT (default: 1,000,000)
```java
/** Seek to the next sync mark past a given position. */
public synchronized void sync(long position) throws IOException {
  if (position + SYNC_SIZE >= end) {
    seek(end);
    return;
  }

  if (position < headerEnd) {
    // seek directly to first record
    in.seek(headerEnd);
    // note the sync marker "seen" in the header
    syncSeen = true;
    return;
  }

  try {
    seek(position + 4);                      // skip escape
    in.readFully(syncCheck);
    int syncLen = sync.length;
    for (int i = 0; in.getPos() < end; i++) {
      int j = 0;
      for (; j < syncLen; j++) {
        if (sync[j] != syncCheck[(i + j) % syncLen])
          break;
      }
      if (j == syncLen) {
        in.seek(in.getPos() - SYNC_SIZE);    // position before sync
        return;
      }
      syncCheck[i % syncLen] = in.readByte();
    }
  } catch (ChecksumException e) {            // checksum failure
    handleChecksumException(e);
  }
}
```
RCFile has some deficiencies that prevent optimal performance for query times and compression (what’s this exactly?)
ORC
- lightweight, always-on compression
- zlib, LZO, Snappy
- predicate push down
- Hive types, including decimal and complex
- splittable
- only designed for Hive, not general purpose
Parquet
- per-column level compression
- supports nested data structures
- full metadata, self-documenting
- fully supports the Avro and Thrift APIs
- efficient and extensible encoding schemes, e.g., RLE
Avro and Parquet
single interface: recommended if you want a single interface across row-oriented (Avro) and columnar (Parquet) storage
compatibility: Parquet can be read and written to with Avro APIs and Avro schemas
Compression
reduces disk & network I/O
applies to both source and intermediate data
trades CPU load for I/O savings
splittability matters for parallelism and data locality
| Format | Summary | Limitation |
| --- | --- | --- |
| Snappy | developed at Google; high speed and reasonable compression rate | not inherently splittable; intended to be used with a container format |
| LZO | very efficient decompression; splittable | requires additional indexing; requires a separate installation from Hadoop because of licensing restrictions |
| Gzip | good compression rate, about 2.5x Snappy; reads almost as fast as Snappy | write speed about half of Snappy; not splittable; fewer blocks might lead to lower parallelism, so use smaller blocks |
| bzip2 | excellent compression rate, 9% better than Gzip | slow reads/writes, about 10x slower than Gzip; use only for archival purposes |
Compression Recommendation
Enable compression of the MapReduce intermediate data
Compress columnar chunks
Use splittable container formats, e.g., Avro or SequenceFiles, so that blocks can be compressed and decompressed individually
Unlike traditional data warehouses, however, HDFS doesn’t store indexes on the data. This lack of indexes plays a large role in speeding up data ingest, but it also means that every query will have to read the entire data set even when you’re processing only a small subset of the data (a pattern called full table scan).
PARTITIONING
Main purpose: reduce the amount of I/O required
Common pattern: <data set name>/<partition_column_name=partition_column_value>/{files}
Understood by: HCatalog, Hive, Impala, Pig, etc.
BUCKETING
Not every key is a good partitioning key; e.g., partitioning by physician may result in too many partitions with files that are too small.
small files problem:
Storing a large number of small files in Hadoop can lead to excessive memory use for the NameNode, since metadata for each file stored in HDFS is held in memory. Also, many small files can lead to many processing tasks, causing excessive overhead in processing.
Bucketing is the solution,
be able to control the size of the data subsets
good average bucket size is a few multiples of the HDFS block size
an even distribution of data when hashed on the bucketing column is important because it leads to consistently sized buckets
having the number of buckets as a power of two is quite common
joining with bucketing
reduce-side join
if two data sets are both bucketed on the join key
and their bucket counts are a factor/multiple of each other
the join can be done bucket by bucket, avoiding the complexity of a full reduce-side join
map-side join
if a bucket is small enough to fit into memory, a map-side join can further improve performance
merge join
if the data in the buckets is sorted, it is also possible to use a merge join
requires less memory
Based on common query patterns:
decide on partitioning and bucketing
for multiple query patterns, consider keeping multiple stores of the same data
trading space for query speed
DENORMALIZING
In relational databases, data is often stored in third normal form. In Hadoop, however, joins are often the slowest operations and consume the most resources from the cluster.
prejoined, preaggregated
consolidates many of the small dimension tables into a few larger dimensions
data preprocessing, like aggregation or data type conversion; comparable to materialized views
HBase Schema Design
A distributed key-value store which supports these operations:
put
get
iterate
value increment
delete
Row Key
RECORD RETRIEVAL
unlimited columns
single key
may need to combine multiple pieces of information in a single key
getting a single record by key is the fastest access path
so design keys so the most common uses of the data are served by a single get
denormalized
very “wide” table
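A hypothetical sketch of combining multiple pieces of information in a single key (HBase 1.x client API; table, family, and qualifier names are made up). A reversed timestamp is appended so a customer’s newest rows sort first and one get returns the most relevant record:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class CompositeKeyExample {
  // Combine two pieces of information into one row key.
  static byte[] rowKey(String customerId, long eventTime) {
    long reversedTs = Long.MAX_VALUE - eventTime; // newest rows sort first
    return Bytes.add(Bytes.toBytes(customerId), Bytes.toBytes(reversedTs));
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(TableName.valueOf("customer_events"))) {

      byte[] key = rowKey("cust42", System.currentTimeMillis());
      Put put = new Put(key);
      put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("amount"),
          Bytes.toBytes("99"));
      table.put(put);

      // Single get: the fastest access path.
      Result r = table.get(new Get(key));
      System.out.println(Bytes.toString(
          r.getValue(Bytes.toBytes("d"), Bytes.toBytes("amount"))));
    }
  }
}
```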
DISTRIBUTION
Row key determines scattering throughout various regions. So, it’s usually best to choose row keys so the load on the cluster is fairly distributed.
Anti-pattern: use a timestamp for row keys
writes pile into a single region, defeating the parallelism
BLOCK CACHE
HBase reads data in blocks (64 KB by default) and caches them in a least recently used (LRU) cache.
Anti-pattern: row key by hash of some attribute
records in the same block may be unrelated to one another, which reduces the cache hit rate
ABILITY TO SCAN
A wise selection of row key can be used to co-locate related records in the same region.
HBase scan rates are about eight times slower than HDFS scan rates.
SIZE
Trade-off:
shorter row keys: lower storage overhead and faster read/write performance
longer row keys: better get/scan properties
READABILITY
Recommended: use a readable prefix.
easier to identify and debug issues
easier to use the HBase console
UNIQUENESS
Row keys are required to be unique.
Timestamp
the timestamp’s important purposes:
determines which put is newer
determines the order when multiple versions are requested
determines when records are removed via time-to-live (TTL)
Hops
Hops: the number of synchronized get requests required to retrieve the requested info
best to avoid them through better schema design. ex, by leveraging denormalization.
every hop is a round-trip to HBase that incurs a significant performance overhead
Tables and Regions
one region server per node
multiple regions per region server
a region is pinned to one region server at a time
tables are split into regions and scattered across the region servers
The number of regions for a table is a trade-off between put performance and compaction time.
Put performance
memstore:
cache structure present on every HBase region server
write => cache => sort => flush
more regions in a region server => less memstore space per region => smaller flushes & HFiles => less performant
ideal flush size: 100 MB
Compaction time
region size limit: 20GB (default) - 120GB
region assignment:
auto splitting
a forever-growing data set that only updates the most recent data, with periodic TTL-based compaction, never needs to compact the old regions
assign the number of regions manually
recommended in most cases
set the region size to a high enough value (e.g., 100 GB per region) to avoid autosplitting
with the split policy set to ConstantSizeRegionSplitPolicy or DisabledRegionSplitPolicy (see the sketch below)
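A sketch of assigning the region count up front with autosplitting disabled (HBase 1.x admin API; table name, column family, and split points are illustrative):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;

public class PreSplitTable {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Admin admin = conn.getAdmin()) {

      HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("events"));
      desc.addFamily(new HColumnDescriptor("d"));
      // Pin the split policy so regions are never split automatically.
      desc.setValue(HTableDescriptor.SPLIT_POLICY,
          "org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy");

      // 16 evenly spaced split points => 17 regions, assuming hex-prefixed keys.
      byte[][] splits = new byte[16][];
      for (int i = 0; i < 16; i++) {
        splits[i] = Bytes.toBytes(String.format("%02x", (i + 1) * 15));
      }
      admin.createTable(desc, splits);
    }
  }
}
```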
Using Columns
Two different schema structures:
Physical Columns

| RowKey | TimeStamp | Column | Value |
| --- | --- | --- | --- |
| 101 | 1395531114 | F | A1 |
| 101 | 1395531114 | B | B1 |

Combined Logical Columns

| RowKey | TimeStamp | Column | Value |
| --- | --- | --- | --- |
| 101 | 1395531114 | X | A1\|B1 |
Considerations:
dependency on read, write, TTL
number of records can fit in the block cache
amount of data can fit through the WAL
number of records can fit into the memstore
compaction time
Using Column Families
column families: a column family is essentially a container for columns; each column family has its own set of HFiles and gets compacted independently of other column families in the same table.
Use case: when the get/put rates of subsets of columns are significantly different, separating them into different column families is beneficial:
lower compaction cost (for puts)
better use of the block cache (for gets)
Time-to-Live
TTL: built-in feature of HBase that ages out data based on its timestamp
expired records are dropped during major compaction
the HFile record timestamp is used to determine expiry
if TTL is not used and records are deleted manually instead, it requires a full scan and inserting the delete markers (the scan could cover TBs of data), and a major compaction is still eventually needed
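A short sketch of enabling TTL on a column family (HBase 1.x API; the 30-day value and the table/family names are illustrative):

```java
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;

public class TtlExample {
  public static void main(String[] args) {
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("events"));
    HColumnDescriptor family = new HColumnDescriptor("d");
    family.setTimeToLive(30 * 24 * 60 * 60); // TTL in seconds: 30 days
    desc.addFamily(family);
    // Pass desc to Admin.createTable(...) as in the earlier sketch; cells
    // older than the TTL are then aged out during major compaction.
  }
}
```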
Managing Metadata
What Is Metadata?
In general, metadata refers to data about the data.
about logical data sets, usually stored in a separate metadata repository
location
dir path in HDFS
table name in HBase
schema
partitioning and sorting properties
format
about files on HDFS, usually stored and managed by Hadoop NameNode
permissions and ownership
location of various blocks of that file on data nodes
about tables in HBase, stored and managed by HBase itself
about data set statistics, useful for various tools that can leverage it for optimizing their execution plans but also for data analysts, who can do quick analysis based on it
the number of rows in a data set
the number of unique values in each column
a histogram of the distribution of data
maximum and minimum values
Why Care About Metadata?
It allows you to:
interact with data at a higher-level logical abstraction
supply information that can then be leveraged by various tools
let data management tools “hook” into this metadata and perform data discovery and lineage analysis
Where to Store Metadata?
Hive metastore (database & service)
deployed mode
embedded metastore
local metastore
remote metastore
MySQL (most common), PostgreSQL, Derby, and Oracle
can be used by Hive and Impala seamlessly
HCatalog
WebHCat REST API
could be used for MapReduce, Pig, and standalone applications
Java API
could be used for MapReduce, Spark, or Cascading
CLI
Limitations of the Hive Metastore and HCatalog
High availability
HA for metastore database
HA for metastore service
concurrency issue unresolved, HIVE-4759
Fixed schema
only for tabular abstraction data sets
ex. not for image or video data sets
Additional dependency
the metastore database itself is just another dependent component
Other Ways of Storing Metadata
Embedded in HDFS paths
partitioned data sets
<data set name>/<partition_column_name=partition_column_value>/{files}