Hadoop Application Architectures Ch.6 Orchestration

Overview

Workflow orchestration systems go by several names:

  • workflow orchestration
  • workflow automation
  • business process automation
  • scheduling, coordinating, and managing workflows

Each job, referred to as an action, can be

  • scheduled
    • at a particular time
    • periodic interval
    • triggered by events or status
  • coordinated
    • when a previous action finishes successfully
  • managed, e.g., to
    • send notification emails
    • record the time taken

Good workflow orchestration engines provide:

  • workflows expressed as a DAG
  • help in defining the interfaces between workflow components
  • metadata and data lineage tracking
  • integration with various software systems
  • data lifecycle management
  • data quality tracking and reporting
  • a repository of workflow components
  • flexible scheduling
  • dependency management
  • centralized status monitoring
  • workflow failure recovery
  • workflow rollback
  • report generation
  • parameterized workflows
  • argument passing

Orchestration Frameworks

Workflow engines:

  • Apache Oozie: developed by Yahoo! to support its growing Hadoop clusters and the increasing number of jobs and workflows running on those clusters
  • Azkaban: developed by LinkedIn, with the goal of being a visual and easy way to manage workflows
  • Luigi: an open source Python package from Spotify that allows you to orchestrate long-running batch jobs and has built-in support for Hadoop
  • Airflow: created by Airbnb, an open source Python workflow management system designed for authoring, scheduling, and monitoring workflows (see the DAG sketch below)
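
Because Airflow defines workflows as Python code, a minimal sketch of a two-task DAG may help make the DAG idea concrete (assumes Airflow 2.x; the dag_id, schedule, and commands are illustrative, not from the book):

# Minimal Airflow 2.x DAG sketch: "load" runs only after "extract" succeeds.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="example_ingest",          # illustrative name
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",       # frequency scheduling
    catchup=False,
) as dag:
    extract = BashOperator(task_id="extract", bash_command="echo extract")
    load = BashOperator(task_id="load", bash_command="echo load")

    extract >> load                   # DAG edge: coordinate "load" after "extract"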

Considerations,

  • ease of installation
  • community involvement and uptake
  • UI support
  • testing
  • logs
  • workflow management
  • error handling

Oozie Architecture

Azkaban Architecture

Workflow Patterns

Point-to-Point Workflow

Fan-out Workflow

Capture-and-Decision Workflow

Scheduling Patterns

  • Frequency Scheduling
    • Note: because of DST, a day (with time zone info) will not always be 24 hours
  • Time and Data Triggers

Hadoop Application Architectures Ch.5 Graph Processing on Hadoop

Use cases:

  • page ranking
  • social network
  • investment funds underlying equities
  • route planning

graph querying vs. graph processing

onion joining vs. message sending

The Bulk Synchronous Parallel (BSP) Model

  • proposed by Leslie Valiant of Harvard, a British computer scientist
  • at the core of the Google graph processing solution, Pregel
  • the distributed processes can send messages to each other, but they cannot act upon those messages until the next superstep (see the toy sketch below)
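
As a toy illustration of the superstep idea (plain Python, not the Giraph/Pregel API), the sketch below buffers messages produced in one superstep and only delivers them at the start of the next, mimicking the synchronization barrier:

# Toy single-process BSP sketch: each vertex propagates the maximum value it has
# seen; messages sent in superstep N are only visible in superstep N+1.
graph = {"a": ["b"], "b": ["a", "c"], "c": ["b"]}   # adjacency lists (toy graph)
value = {"a": 3, "b": 6, "c": 2}

inbox = {v: [] for v in graph}
active = set(graph)
superstep = 0

while active:
    outbox = {v: [] for v in graph}                 # messages produced this superstep
    for v in list(active):
        new_value = max([value[v]] + inbox[v])
        changed = new_value != value[v] or superstep == 0
        value[v] = new_value
        if changed:
            for neighbor in graph[v]:
                outbox[neighbor].append(new_value)
        else:
            active.discard(v)                       # vote to halt
    inbox = outbox                                  # synchronization barrier
    active |= {v for v, msgs in inbox.items() if msgs}  # messages reactivate vertices
    superstep += 1

print(value)   # every vertex converges to the global maximum: {'a': 6, 'b': 6, 'c': 6}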

Giraph

  • an open source implementation of Google’s Pregel
  • main stages
    • read and partition the data
    • batch-process the graph with BSP
    • write the graph back to disk

GraphX

  • contains an implementation of the Pregel API built on the Spark DAG engine
  • RDD representation of EdgeRDD and VertexRDD
  • could be mixed with Spark transformations

Join in Distributed SQL Engine

Join in MapReduce

https://www.edureka.co/blog/mapreduce-example-reduce-side-join/

  • Replicated Join (Map Side Join)
  • Reduce Side Join
  • Reduce Side Join with Bloom Filter
  • Composite Join

Join in Spark SQL

https://medium.com/datakaresolutions/optimize-spark-sql-joins-c81b4e3ed7da

  • Sort-Merge Join
  • Broadcast Join
  • Shuffle Hash Join

Join in Impala

https://impala.apache.org/docs/build/html/topics/impala_perf_joins.html

  • Broadcast Join
  • Partitioned Join

Join in Presto

https://prestodb.io/docs/current/admin/properties.html#general-properties

  • Broadcast Join
  • Distributed Hash Join

Hadoop Application Architectures Ch.4 Common Hadoop Processing Patterns

Examples:

  • Removing Duplicate Records by Primary Key (Compaction)
  • Using Windowing Analysis
  • Updating Time Series Data

Removing Duplicate Records by Primary Key

  • Spark
    • map() to a keyed RDD, then reduceByKey() to compact (see the sketch after this list)
  • SQL
    • GROUP BY primary key, SELECT MAX(TIME_STAMP)
    • JOIN back to filter on the original table
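
A minimal PySpark sketch of the RDD approach above (the record layout and the "keep the latest timestamp" rule are illustrative):

# Keep only the latest record per primary key: map() to a keyed RDD, then
# reduceByKey() to compact. Record layout (key, timestamp, payload) is made up.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("dedup-sketch").getOrCreate()
sc = spark.sparkContext

records = sc.parallelize([
    ("k1", 100, "v1-old"),
    ("k1", 200, "v1-new"),
    ("k2", 150, "v2"),
])

keyed = records.map(lambda r: (r[0], r))                           # keyed RDD
latest = keyed.reduceByKey(lambda a, b: a if a[1] >= b[1] else b)  # compaction
print(latest.values().collect())   # [('k1', 200, 'v1-new'), ('k2', 150, 'v2')]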

Windowing Analysis

Find the valleys and peaks.

  • Spark
    • partition by primary key’s hash, sorted by timestamp
    • mapPartitions
      • iterate the sorted partition to identify peaks and valleys
  • SQL
    • SELECT LEAD() and LAG() OVER (PARTITION BY PRIMARY_KEY ORDER BY POSITION)
    • SELECT CASE
      • WHEN VALUE > LEAD and LAG, THEN 'PEAK'
      • WHEN VALUE < LEAD and LAG, THEN 'VALLEY'
    • Note: multiple windowing operations in SQL increase disk I/O overhead and degrade performance (a PySpark window-function equivalent is sketched after this list)
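
A PySpark equivalent of the LEAD()/LAG() approach, as a sketch (column names and sample data are illustrative):

# Label peaks and valleys with window functions. Columns (ticker, ts, price) are
# made up; partitionBy("ticker").orderBy("ts") mirrors the SQL window above.
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("peak-valley-sketch").getOrCreate()
df = spark.createDataFrame(
    [("T", 1, 10.0), ("T", 2, 12.0), ("T", 3, 11.0), ("T", 4, 9.0), ("T", 5, 10.5)],
    ["ticker", "ts", "price"],
)

w = Window.partitionBy("ticker").orderBy("ts")
labeled = (
    df.withColumn("prev", F.lag("price").over(w))
      .withColumn("next", F.lead("price").over(w))
      .withColumn(
          "label",
          F.when((F.col("price") > F.col("prev")) & (F.col("price") > F.col("next")), "PEAK")
           .when((F.col("price") < F.col("prev")) & (F.col("price") < F.col("next")), "VALLEY"),
      )
)
labeled.show()   # ts=2 labeled PEAK, ts=4 labeled VALLEY, the rest null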

Time Series Modifications

  • HBase and Versioning
    • advantage:
      • modifications are very fast, simply update
    • disadvantage:
      • penalty in getting historical versions
      • performing large scans or block cache reads
  • HBase with a RowKey of RecordKey and StartTime
    • get existing record
    • put back with update stop time
    • put the new current record
    • advantage:
      • faster version retrieval
    • disadvantage:
      • slower update, requires 1 get and 2 puts
      • still has the large scan and block cache problems
  • Partitions on HDFS for Current and Historical Records
    • partitioning into
      • most current records partition
      • historic records partition
    • batch update
      • for updated “current” records, update the stop time and append to historic records partition
      • add new update into most current records partition

Hadoop Application Architectures Ch.3 Processing Data in Hadoop

  • MapReduce
  • Spark
  • Hive, Pig, Crunch, Cascading

Shared Nothing Architectures

  • scalability
  • fault-tolerant

MapReduce

Overview

  • introduced by Jeffrey Dean and Sanjay Ghemawat of Google in the MapReduce paper
  • map phase / sort & shuffle phase / reduce phase
  • input / output of each phase are key-value pairs
  • output of mapper and reducer is written to disk
    • synchronization barrier (inefficient for iterative processing)
  • mapper processes a single pair at a time
  • mappers pass key-value pairs as output to reducers
  • mapper can’t pass information to other mappers

Mapper

  • InputFormat class
    • getSplits()
      • determines the number of map processes
      • determines the cluster nodes on which they will execute
      • commonly used TextInputFormat generates an input split per block
    • getReader()
      • provides a reader to map tasks
      • could be overridden
  • RecordReader class
    • reads the data blocks, returns key-value records
    • implementations: text delimited, SequenceFile, Avro, Parquet, etc.
  • Mapper.setup()
    • Configuration object
  • Mapper.map()
    • inputs: key, value, and a context
    • output data is buffered and sorted; the buffer size is controlled by io.sort.mb
  • Partitioner
    • default, key hashed
    • custom partitioner
      • ex. secondary sort
        • key as ticker-time for sorting, partitioner on ticker symbol
  • Mapper.cleanup()
    • file closing, logging messages, etc.
  • Combiner.combine()
    • aggregate locally
    • output has to be in the same format as the map() output
    • can assume the input is sorted

Reducer

  • Shuffle
    • copy the output of the mappers from the map nodes to the reduce nodes
  • Reducer.setup()
    • initialize variables and file handles
  • Reducer.reduce()
    • sorted key
    • input with values
    • a key and all its values will never be split across more than one reducer
      • skewness, review partitioning
    • output to outputFileFormat
  • Reducer.cleanup()
    • file closing, logging messages, etc.
  • OutputFormat
    • a single reducer will always write a single file
      • ex. part-r-00000

Join

Reference: https://www.edureka.co/blog/mapreduce-example-reduce-side-join/

Map-side Join

The join operation is performed in the map phase itself: the mapper performs the join, and it is mandatory that the input to each map task is partitioned and sorted on the join key.

Reduce-side Join
  • The mapper reads the input data sets that are to be combined based on a common column, or join key.
  • The mapper adds a tag to each record to distinguish which source (data set or database) it came from.
  • The mapper outputs intermediate key-value pairs where the key is the join key.
  • After the sort-and-shuffle phase, a key and its list of values is passed to the reducer.
  • The reducer then joins the values present in the list with the key to give the final aggregated output (see the small simulation below).
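
To make the tagging idea concrete, here is a small plain-Python simulation of a reduce-side join (the data sets and field names are made up; in a real job the mapper and reducer functions would run as MapReduce tasks):

# Simulated reduce-side join: the mapper tags each record with its source,
# the shuffle groups by the join key, and the reducer combines both sides.
from collections import defaultdict

customers = [("c1", "Alice"), ("c2", "Bob")]           # (cust_id, name)
orders = [("c1", 25.0), ("c1", 40.0), ("c2", 10.0)]    # (cust_id, amount)

def mapper():
    for cust_id, name in customers:
        yield cust_id, ("CUST", name)                  # tag records by source
    for cust_id, amount in orders:
        yield cust_id, ("ORD", amount)

grouped = defaultdict(list)                            # stand-in for sort & shuffle
for key, tagged in mapper():
    grouped[key].append(tagged)

def reducer(key, tagged_values):
    names = [v for tag, v in tagged_values if tag == "CUST"]
    amounts = [v for tag, v in tagged_values if tag == "ORD"]
    return key, names[0] if names else None, sum(amounts)

for key in sorted(grouped):
    print(reducer(key, grouped[key]))   # ('c1', 'Alice', 65.0) / ('c2', 'Bob', 10.0)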

When to Use MapReduce

  • is a very low-level framework
  • for experienced Java developers who are comfortable with the MapReduce programming paradigm
  • where detailed control of the execution has significant advantages

Spark

Overview

  • In 2009 Matei Zaharia and his team at UC Berkeley’s AMPLab researched possible improvements to the MapReduce framework.
  • Improves on
    • iterative machine learning
    • interactive data analysis
    • reusing a data set cached in memory for multiple processing tasks
    • DAG model (directed acyclic graphs)
      • ex. Oozie for MapReduce
  • reference books

Spark Components

  • Driver
    • main function to define the RDD (resilient distributed datasets) and their transformations / actions
  • DAG Scheduler
    • optimizes the code and arrives at an efficient DAG
  • Task Scheduler
    • gets info from the cluster manager (YARN, Mesos, etc.) about
      • workers
      • assigned threads
      • location of data blocks
    • assigns tasks to workers
  • Worker
    • receives work and data
    • executes task without knowledge of the entire DAG

Basic Spark Concepts

RDD (RESILIENT DISTRIBUTED DATASETS)
  • RDDs are collections of serializable elements, and such a collection may be partitioned, in which case it is stored on multiple nodes
  • Spark determines the number of partitions by the input format
  • RDDs store their lineage — the set of transformations that was used to create the current state, starting from the first input format that was used to create the RDD
  • If the data is lost, Spark will replay the lineage to rebuild the lost RDDs so the job can continue
  • Spark would replay the “Good Replay” boxes and the “Lost Block” boxes to get the data needed to execute the final step
SHARED VARIABLES
  • broadcast variables
  • accumulator variables (see the sketch below)
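
A minimal PySpark sketch of both shared variable types (the lookup data and names are illustrative, not from the book):

# Broadcast a small lookup table to all workers and count lookup misses
# with an accumulator.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("shared-vars-sketch").getOrCreate()
sc = spark.sparkContext

country_names = sc.broadcast({"tw": "Taiwan", "jp": "Japan"})  # read-only on workers
misses = sc.accumulator(0)                                     # workers only add to it

def resolve(code):
    if code not in country_names.value:
        misses.add(1)
        return None
    return country_names.value[code]

print(sc.parallelize(["tw", "jp", "xx"]).map(resolve).collect())  # ['Taiwan', 'Japan', None]
print(misses.value)                                               # 1
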
SPARKCONTEXT
  • represents the connection to a Spark cluster
  • used to create RDDs, broadcast data, and initialize accumulators
TRANSFORMATIONS
  • transformations take one RDD and return another RDD
  • RDDs are immutable
  • transformations in Spark are always lazy
  • calling a transformation function only creates a new RDD with this specific lineage
  • transformations are only executed when an action is called
    • this allows Spark to optimize the execution graph

Some core transformations:

  • map()
  • filter()
  • keyBy()
  • join()
  • groupByKey()
  • sort()
ACTIONS
  • take an RDD, perform a computation, and return the result to the driver application
  • the result of the computation can be a collection, values printed to the screen, values saved to file, or similar
  • an action will never return an RDD (see the sketch below)
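
A small PySpark sketch of lazy transformations versus an action (the data is illustrative):

# filter() and map() only record lineage; nothing runs until count() is called.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("lazy-sketch").getOrCreate()
sc = spark.sparkContext

numbers = sc.parallelize(range(10))
evens = numbers.filter(lambda x: x % 2 == 0)   # transformation: no job yet
squares = evens.map(lambda x: x * x)           # transformation: still no job

print(squares.toDebugString())                 # shows the recorded lineage
print(squares.count())                         # action: a job actually runs -> 5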

Benefits of Using Spark

SIMPLICITY
  • simpler than those of MapReduce
VERSATILITY
  • extensible, general-purpose parallel processing framework
  • supports a stream-processing framework called Spark Streaming
  • a graph processing engine called GraphX
REDUCED DISK I/O
  • Spark’s RDDs can be stored in memory and processed in multiple steps or iterations without additional I/O
STORAGE
  • the developer controls the persistence
MULTILANGUAGE
  • Spark APIs are implemented for Java, Scala, and Python
RESOURCE MANAGER INDEPENDENCE
  • Spark supports YARN, Mesos, & Kubernetes
INTERACTIVE SHELL
  • REPL (read-eval-print loop)
APACHE TEZ: AN ADDITIONAL DAG-BASED PROCESSING FRAMEWORK
  • Tez is a framework that allows for expressing complex DAGs for processing data
  • the architecture of Tez is intended to provide performance improvements and better resource management than MapReduce

Abstraction

  • ETL Model: Pig, Crunch, and Cascading
  • Query Model: Hive

Apache Pig

  • developed at Yahoo, and released to Apache in 2007
  • Pig-specific workflow language, Pig Latin
  • compiled into a logical plan and then into a physical plan
  • Data container
    • relations, bag, tuples
  • Transformation functions
    • no execution is done until the STORE command is called (just as nothing runs in Spark until an action such as saveAsTextFile is called)
  • DESCRIBE and EXPLAIN
  • support UDFs
  • CLI to access HDFS

Apache Crunch

  • based on Google’s FlumeJava
  • in Java
  • full access to all MapReduce functionality
  • separation of business logic from integration logic
  • Pipeline object
  • actual execution of a Crunch pipeline occurs with a call to the done() method
  • MRPipeline, SparkPipeline, PCollection, PTable

Cascading

  • in Java
  • like Crunch, full access to all MapReduce functionality
  • like Crunch, separation of business logic from integration logic

Hive

Overview

  • SQL on Hadoop
  • cornerstone of newer SQL implementations
    • Impala, Presto, Spark SQL, Apache Drill
  • biggest drawback: performance, due to the MapReduce execution engine
    • addressed by
      • Hive-on-Tez, from 0.13.0
      • Hive-on-Spark, HIVE-7292
      • Vectorized query execution, from 0.13.0, supported on ORC and Parquet
  • Hive Metastore, becomes the standard for metadata management and sharing among different systems

Hive Architecture

  • In CREATE TABLE
    • external table: underlying data remains intact when the table is deleted
    • storage format declaration
  • ANALYZE STATISTICS
    • ANALYZE TABLE foo COMPUTE STATISTICS;
    • hive.stats.autogather, default true, but only triggered by INSERT
    • imported or moved data still needs an explicit ANALYZE command
  • optimized join
    • available in newer version only
    • hive.auto.convert.join
      • map join
      • bucketed join
      • sorted bucketed merge join
      • regular join
  • SQL is great for query, but not for
    • machine learning, text processing, graph algorithms
  • always review what happens under the hood, e.g., with EXPLAIN

When to Use Hive

  • Hive Metastore
  • SQL
  • Pluggable
    • custom data format, serialization / deserialization
    • execution engine, MapReduce, Tez, Spark
  • Batch processing
  • Fault-tolerant
  • Feature-rich
    • nested types

Impala

  • by 2012, Google had published the F1 and Dremel papers
  • Impala was inspired by Dremel
  • massively parallel processing (MPP) data warehouses
    • such as Netezza, Greenplum, and Teradata
  • delivers query latency and concurrency
    • significantly better than that of Hive running on MapReduce
  • uses Hive SQL dialect and Hive Metastore
  • supports both HDFS and HBase as data sources, like Hive
  • supports the popular data formats
    • delimited text, SequenceFiles, Avro, and Parquet

Overview

  • shared nothing architecture
  • Impala daemons, impalad
    • running on each node, identical and interchangeable
    • responsible for
      • query planner
      • query coordinator
      • query execution engine
  • focus on the core functionality, executing queries as fast as possible
    • off-loads data storage to HDFS and HBase
    • off-loads database and table management to the Hive Metastore
  • distributed join strategies
    • broadcast hash joins
    • partitioned hash joins
  • query profiles
    • table scan rates
    • actual data sizes
    • amount of memory used
    • execution times

Impala Architecture

Speed-Oriented Design

  • in-memory processing
    • can spill to disk in 2.0 and later
    • minimum of 128GB to 256GB of RAM
    • not fault-tolerant: losing a node causes the query to fail
  • long running daemons
    • no startup cost
    • high concurrency
    • colocate for data locality
    • could be managed by YARN or Linux CGroups
  • efficient execution engine
    • implemented in C++
      • takes better advantage of vectorization, CPU instructions for text parsing, CRC32 computation, etc.
      • no JVM overhead
      • no Java GC latency
  • use of LLVM
    • Low Level Virtual Machine
    • compile the query to optimized machine code
    • machine code improves the efficiency of the code execution in the CPU by getting rid of the polymorphism
    • machine code generated uses optimizations available in modern CPUs (such as Sandy Bridge) to improve its I/O efficiency
    • the entire query and its functions are compiled into a single context of execution, Impala doesn’t have the same overhead of context switching because all function calls are inlined and there are no branches in the instruction pipeline, which makes execution even faster

When to Use Impala

  • much faster than Hive
  • compared to Hive
    • not fault-tolerant
    • does not support nested data types
    • does not support custom data formats

Other Tools

Hadoop Application Architectures Ch.2 Data Movement

  • Ingestion
  • Extraction

Data Ingestion Considerations

Common data sources:

  • data management system such as relational databases
  • logs, event data
  • files from existing storage system

Considerations:

  • Timeliness of data ingestion and accessibility
  • Incremental updates
  • Data access and processing
  • Source system and data structure
  • Partitioning and splitting of data
  • Storage format
  • Data transformation

Timeliness of Data Ingestion

Timeliness classification:

  • Macro batch (15 minutes or more)
  • Microbatch (2 to 15 minutes)
  • Near-Real-Time Decision Support (2 seconds to 2 minutes)
  • Near-Real-Time Event Processing (100 milliseconds to 2 seconds)
  • Real Time (less than 100 milliseconds)

Tighter timeliness raises system complexity and cost (e.g., memory-based rather than disk-based storage).

Incremental Updates

  • append
    • notice the small files problem, may require periodic process to merge small files
    • write to
  • update
    • HDFS
      • delta file and compaction job
      • only work for multi-minute timeliness intervals
    • HBase
      • milliseconds timeliness
      • 8 to 10 times slower scans (compared to HDFS)

Access Patterns

  • scan: HDFS (supports memory cache from Hadoop 2.3.0)
  • random access: HBase
  • search: Solr

Tool, use cases, and preferred storage:

  • MapReduce: large batch processes (HDFS preferred)
  • Hive: batch processing with an SQL-like language (HDFS preferred)
  • Pig: batch processing with a data flow language (HDFS preferred)
  • Spark: fast interactive processing (HDFS preferred)
  • Giraph: batch graph processing (HDFS preferred)
  • Impala: MPP-style SQL (HDFS preferred for most cases)
  • HBase API: atomic puts, gets, and deletes on record-level data (HBase)

Original Source System and Data Structure

READ SPEED OF THE DEVICES ON SOURCE SYSTEMS

Disk I/O: often a major bottleneck in any processing pipeline.
Generally, with Hadoop we’ll see read speeds of anywhere from 20 MBps to 100 MBps, and there are limitations on the motherboard or controller for reading from all the disks on the system.
To maximize read speeds, make sure to take advantage of as many disks as possible on the source system.
On a typical drive, three threads are normally required to maximize throughput, although this number will vary.

  • use as many disks as possible on the source system
  • use multiple threads
ORIGINAL FILE TYPE
COMPRESSION
RELATIONAL DATABASE MANAGEMENT SYSTEMS

Apache Sqoop: a very rich tool with lots of options, but at the same time it is simple and easy to learn

  • batch process: slow timeliness
    • Sqoop
  • data pipeline split: one to RDBMS, one to HDFS
    • Flume or Kafka
  • network limited: edge node
    • RDBMS file dump
STREAMING DATA
LOGFILES
  • anti-pattern: read the logfiles from disk as they are written
    • because this is almost impossible to implement without losing data
  • recommend: Flume or Kafka

Transformations

Options:

  • Transformation
  • Partitioning
  • Splitting

For timeliness,

  • batch transformation
    • Hive, Pig, MapReduce, Spark
    • checkpoint for failure
    • all-or-nothing
  • streaming ingestion
    • Flume
      • interceptors
        • notice the performance issue, external call, GC, etc.
      • selectors
        • decide which of the roads the event data will go down

Network Bottlenecks

  • increase bandwidth
  • compress data

Network Security

  • encrypt with OpenSSL

Push or Pull

requirements:

  • Keeping track of what has been sent
  • Handling retries or failover options in case of failure
  • Being respectful of the source system that data is being ingested from
  • Access and security
SQOOP

a pull solution, requires

  • connection information for the source database
  • one or more tables to extract
  • ensure extraction at a defined rate
  • scheduled to not interfere with the source system’s peak load time
FLUME
  • Log4J appender
    • pushing events through a pipeline
  • spooling directory source or the JMS source
    • events are being pulled

Failure Handling

Failure scenarios need to be documented, including failure delay expectations and how data loss will be handled.

Level of Complexity

ease of use, ex. HDFS CLI, FUSE, or NFS

Data Ingestion Options

In here,

  • File transfer
  • Sqoop
  • Flume
  • Kafka

File Transfers

Hadoop client’s hadoop fs -put and hadoop fs -get

  • simplest
  • all-or-nothing batch processing
  • single-threaded
  • no transformations
  • any file types
HDFS CLIENT COMMANDS
  • configurable replicas, common default 3
  • checksum file accompanies each block
  • double-hop from an edge node due to network policies
MOUNTABLE HDFS
  • allow to use common filesystem commands
  • not full POSIX semantics
  • does not support random writes
  • potential misuse: small files problem
    • mitigated by Solr for indexing, HBase, or a container format such as Avro

implementations example

  • Fuse-DFS
    • no need to modify the Linux kernel
    • multiple hops, poor consistency model
    • not yet production ready
  • NFSv3
    • scaling by Hadoop NFS Gateway nodes
    • recommended to use only in small, manual data transfers

Considerations

  • single sink or multiple sinks
  • reliability or not
  • transformation or not

Sqoop: Batch Transfer Between Hadoop and Relational Databases

When used for importing data into Hadoop, Sqoop generates map-only MapReduce jobs where each mapper connects to the database using a Java database connectivity (JDBC) driver, selects a portion of the table to be imported, and writes the data to HDFS.

CHOOSING A SPLIT-BY COLUMN
  • split-by
  • num-mappers
USING DATABASE-SPECIFIC CONNECTORS WHENEVER AVAILABLE
USING THE GOLDILOCKS METHOD OF SQOOP PERFORMANCE TUNING
  • start with a very low number of mappers
  • gradually increase it to achieve a balance
LOADING MANY TABLES IN PARALLEL WITH FAIR SCHEDULER THROTTLING
  • Load the tables sequentially

  • Load the tables in parallel (Fair Scheduler)

DIAGNOSING BOTTLENECKS
  • Network bandwidth
    • likely to be either 1 GbE or 10 GbE (120 MBps or 1.2 GBps)
  • RDBMS
    • review the query generated by the mappers
    • in Sqoop incremental mode
      • make sure using index
    • ingest entire table
      • full table scans are typically preferred
    • monitor the database
    • schedule the execution time
  • Data skew
    • by default, the range between the min and max values of the split-by column is split evenly
    • choose --split-by
    • define --boundary-query
  • Connector
    • RDBMS-specific connector is preferred
  • Hadoop
    • check disk I/O, CPU utilization, and swapping on the DataNodes where the mappers are running
  • Inefficient access path
    • it is incredibly important that the split column either is the partition key or has an index
    • if no such column, then use only one mapper
KEEPING HADOOP UPDATED
  • small table
    • just overwrite it
  • large table
    • delta
      • Incremental Sequence ID
      • Timestamp
    • write to a new directory
      • check {output_dir}/_SUCCESS
    • compaction with sqoop-merge
      • a sorted and partitioned data set can be merged with an optimized, map-only job

Flume: Event-Based Data Collection and Processing

FLUME ARCHITECTURE

[Flume Components]

Main components inside of the Flume agent JVM:

  • Sources
    • consume events from external sources and forward to channels
    • including AvroSource, SpoolDirectorySource, HTTPSource, and JMSSource
  • Interceptors
    • allow events to be intercepted and modified in flight
    • anything that can be implemented in a Java class
    • formatting, partitioning, filtering, splitting, validating, or applying metadata to events
  • Selectors
    • routing for events
    • fork to multiple channels, or send to a specific channel based on the event
  • Channels
    • store events until they’re consumed by a sink
    • memory channel, file channel, balancing performance with durability
  • Sinks
    • remove events from a channel and deliver to a destination
FLUME PATTERNS
Fan-in

Agents on Hadoop edge nodes.

Splitting data on ingest

Backup HDFS cluster for disaster recovery (DR).

Partitioning data on ingest

Ex. partition events by timestamp.

Splitting events for streaming analytics

Send to streaming such as Storm or Spark Streaming. (point a Flume Avro sink to Spark Streaming’s Flume Stream)

FILE FORMATS
  • Text files
    • with container format SequenceFiles or Avro
    • Avro is preferred because
      • it stores the schema as part of the file
      • it compresses more efficiently
      • it provides better failure handling
  • Columnar formats
    • RCFile, ORC, or Parquet are also not well suited for Flume
    • more data loss risk due to batch processing
  • Customized event serializers
    • override the EventSerializer interface to apply your own logic and create a custom output format
RECOMMENDATIONS
Flume sources
  • Batch size
    • mind the network latency of sending acknowledgments
    • start from 1,000
  • Threads
    • pushing source: add more clients or client threads
    • pulling source: configure more sources in the agent
Flume sinks
  • Number of sinks
    • channel to sink is one-to-many
    • sink is single thread
    • with more sinks, the limiting factor should be the network or the CPU
  • Batch Sizes
    • overhead of an fsync system call
    • only big downside to large batches with a sink is an increased risk of duplicate events
    • balance between throughput and potential duplicates
Flume interceptors
  • capability to take an event or group of events and modify them, filter them, or split them
  • custom code comes with risk of issues like memory leaks or consuming excessive CPU
Flume memory channels
  • if performance is your primary consideration, and data loss is not an issue
  • better for streaming analytics sink
Flume file channels
  • it’s more durable than the memory channel
  • to use multiple disks
  • if using multiple file channels, use distinct directories, and preferably separate disks, for each channel
  • use dual checkpoint directories
  • better for persistent sink
JDBC channels
  • persists events to any JDBC-compliant data store
  • most durable channel, but also the least performant
Sizing Channels
  • Memory channels
    • can be fed by multiple sources
    • can be fetched from by multiple sinks
    • so for a pipeline, one channel in a node usually is enough
  • Channel size
    • large memory channel could have garbage collection activity that could slow down the whole agent
FINDING FLUME BOTTLENECKS
  • Latency between nodes
    • batch size or more threads
  • Throughput between nodes
    • data compression
  • Number of threads
  • Number of sinks
  • Channel
  • Garbage collection issues

Kafka

  • producers, brokers, consumers
  • topic, partition, offset

Large number of partitions:

  • Each partition can be consumed by at most one consumer from the same group
  • Therefore, we recommend at least as many partitions per node as there are servers in the cluster, possibly planning for a few years of growth.
  • There are no real downsides to having a few hundred partitions per topic.
KAFKA FAULT TOLERANCE
  • replica
  • leader, followers

Producer acknowledge:

  • all in-sync replicas acknowledge
  • leader only acknowledges
  • no acknowledgment (asynchronous)

Consumers only read committed messages, i.e., those written to all in-sync replicas.

Supported semantics,

  • at least once
    • consumer advances the offset after processing the messages
  • at most once
    • consumer advances the offset before processing the messages
  • exactly once
    • consumer advances the offset and processes the messages atomically, e.g., with a two-phase commit (see the toy sketch below)
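
A plain-Python toy (no Kafka client involved) showing how the order of "advance the offset" versus "process the message" produces at-most-once or at-least-once behavior when the consumer crashes:

# Toy illustration: the committed offset decides where a restarted consumer resumes.
messages = ["m0", "m1", "m2", "m3"]

def run(advance_offset_first, crash_at=1):
    committed, processed = 0, []
    for offset in range(committed, len(messages)):     # first attempt
        if advance_offset_first:
            committed = offset + 1                      # at-most-once ordering
            if offset == crash_at:
                break                                   # crash after commit, before processing
            processed.append(messages[offset])
        else:
            if offset == crash_at:
                break                                   # crash before processing and commit
            processed.append(messages[offset])
            committed = offset + 1                      # at-least-once ordering
    for offset in range(committed, len(messages)):      # restart from committed offset
        processed.append(messages[offset])
    return processed

print(run(advance_offset_first=True))    # ['m0', 'm2', 'm3']: m1 is lost (at most once)
print(run(advance_offset_first=False))   # ['m0', 'm1', 'm2', 'm3']: nothing lost; a crash
                                         # after processing but before commit would replay m1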

Multiple data centers deployment.

KAFKA AND HADOOP
Kafka vs. Flume:

  • Complete Hadoop ingest solution: Kafka less, Flume more
  • Required code writing: Kafka yes, Flume no
  • Fault tolerance: Kafka higher, Flume lower
  • Performance: Kafka higher, Flume lower
Flume with Kafka
  • Kafka source
    • consumer, reads data from Kafka and sends it to the Flume channel
    • adding multiple sources with the same groupId for load balancing and high availability
    • batch size tuning
  • Kafka sink
    • producer, sends data from a Flume channel to Kafka
    • batch size tuning
  • Kafka channel
    • combines a producer and a consumer
    • each batch will be sent to a separate Kafka partition, so the writes will be load-balanced
Camus
  • ingesting data from Kafka to HDFS
  • automatic discovery of Kafka topics from ZooKeeper
  • conversion of messages to Avro and Avro schema management
  • automatic partitioning
  • all-or-nothing batch processing
  • need to write decoder to convert Kafka messages to Avro

Data Extraction

  • Moving data from Hadoop to an RDBMS or data warehouse
    • In most cases, Sqoop will be the appropriate choice for ingesting the transformed data into the target database.
  • Moving data between Hadoop clusters
    • DistCp uses MapReduce to perform parallel transfers of large volumes of data.
    • DistCp is also suitable when either the source or target is a non-HDFS filesystem; for example, an increasingly common need is to move data into a cloud-based system, such as Amazon's Simple Storage Service (S3).

Hadoop SequenceFile Sync Marker

From Hadoop wiki,

The sync marker permits seeking to a random point in a file and then re-synchronizing input with record boundaries. This is required to be able to efficiently split large files for MapReduce processing.

But what does it actually mark? And how can it be used for seeking?

Here is my starting investigation.

The code piece for sync marker generation.

public static class Writer implements java.io.Closeable, Syncable {
  ...
  MessageDigest digester = MessageDigest.getInstance("MD5");
  long time = Time.now();
  digester.update((new UID()+"@"+time).getBytes(StandardCharsets.UTF_8));
  sync = digester.digest();
  ...

and the piece for insertion.

public void sync() throws IOException {
  if (sync != null && lastSyncPos != out.getPos()) {
    out.writeInt(SYNC_ESCAPE);   // mark the start of the sync
    out.write(sync);             // write sync
    lastSyncPos = out.getPos();  // update lastSyncPos
  }
}

From this code, the sync marker is generated only once, during Writer initialization; it is written into the file header, and then written into the output again whenever the output has grown past a certain interval.

  • In Writer & RecordCompressWriter: refer to the SYNC_INTERVAL
    • refer to this commit, it has been changed from 100 * SYNC_SIZE to 5 * 1024 * SYNC_SIZE
  • In BlockCompressWriter: refer to IO_SEQFILE_COMPRESS_BLOCKSIZE_KEY/DEFAULT (default: 1,000,000)
/**
* @see
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
* core-default.xml</a>
*/
public static final String IO_SEQFILE_COMPRESS_BLOCKSIZE_KEY =
"io.seqfile.compress.blocksize";
/** Default value for IO_SEQFILE_COMPRESS_BLOCKSIZE_KEY */
public static final int IO_SEQFILE_COMPRESS_BLOCKSIZE_DEFAULT = 1000000;

/**
* The number of bytes between sync points. 100 KB, default.
* Computed as 5 KB * 20 = 100 KB
*/
public static final int SYNC_INTERVAL = 5 * 1024 * SYNC_SIZE; // 5KB*(16+4)

Then, on the reading side, the sync marker is read during Reader initialization and used in Reader.sync(position) to re-align to the next record boundary:

/** Seek to the next sync mark past a given position.*/
public synchronized void sync(long position) throws IOException {
  if (position+SYNC_SIZE >= end) {
    seek(end);
    return;
  }

  if (position < headerEnd) {
    // seek directly to first record
    in.seek(headerEnd);
    // note the sync marker "seen" in the header
    syncSeen = true;
    return;
  }

  try {
    seek(position+4);                     // skip escape
    in.readFully(syncCheck);
    int syncLen = sync.length;
    for (int i = 0; in.getPos() < end; i++) {
      int j = 0;
      for (; j < syncLen; j++) {
        if (sync[j] != syncCheck[(i+j)%syncLen])
          break;
      }
      if (j == syncLen) {
        in.seek(in.getPos() - SYNC_SIZE); // position before sync
        return;
      }
      syncCheck[i%syncLen] = in.readByte();
    }
  } catch (ChecksumException e) {         // checksum failure
    handleChecksumException(e);
  }
}

Conclusion

  • This sync marker allows the seeking operation to align to record or block boundaries.
  • But it relies on an existing seek operation, which is implemented in Seekable.seek().
  • Next question: how is seek implemented on a distributed file?


Hadoop Application Architectures Ch.1 Data Modeling in Hadoop

The power of context in Hadoop: "Schema-on-Read", compared with "Schema-on-Write":

  • the structure is imposed at processing time, based on the requirements
  • shorter cycles of analysis, data modeling, ETL, testing, etc. before data can be processed
  • agility on schema evolution

Perspectives to consider for storing data:

  • Data storage formats
  • Multitenancy
  • Schema Design
  • Metadata Management

Beyond the scope:

Data Storage Options

  • File format
  • Compression
  • Data storage system

Standard File Formats

Text data

  • ex. server logs, emails, CSV files
  • with “splittable” compression, for parallel processing
    • container format: SequenceFiles, Avro

Structured text data

  • ex. XML, JSON
  • challenging to make XML or JSON splittable
    • using container format such as Avro
    • XMLLoader in PiggyBank library
    • LzoJsonInputFormat in the Elephant Bird project

Binary data

  • ex. images
  • in most cases, a container format is preferred
  • in cases where the binary data is larger than a certain size, e.g., 64 MB, consider not using a container format

Hadoop File Types

Important characteristics:

  • Splittable Compression
    • parallel processing
    • data locality
  • Agnostic Compression
    • codec in header metadata

File-based data structures

  • ex. SequenceFiles, MapFiles, SetFiles, ArrayFiles, and BloomMapFiles
  • MapReduce specific
  • SequenceFiles
    • most common
    • binary key-value pair
    • formats:
      • uncompressed
      • record-compressed (single record)
      • block-compressed (batch, “not” HDFS block)
    • sync marker
      • to allow for seeking

Serialization Formats

byte stream <=> data structures

Term:

  • IDL (Interface Definition Language)
  • RPC (Remote Procedure Calls)

Format, summary, and limitations:

  • Writables
    • summary: simple, efficient, serializable
    • limitation: only in Hadoop & Java
  • Thrift
    • summary: language-neutral; by Facebook; uses an IDL; robust RPC
    • limitation: no internal compression of records; not splittable; no native MapReduce support (addressed by Elephant Bird)
  • Protocol Buffers
    • summary: language-neutral; by Google; uses an IDL, stub code generation
    • limitation: same as Thrift
  • Avro (see the sketch below)
    • summary: language-neutral; optional IDL: JSON, C-like; native support for MapReduce; compressible: Snappy, Deflate; splittable: sync marker; self-describing: schema in each file header's metadata
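
As a quick illustration of Avro's self-describing files (the schema travels in the file header), here is a sketch using the fastavro Python package, one of several Avro libraries; the schema and file name are made up:

# Write and read an Avro container file; the reader recovers the schema from the
# file header, no external schema needed. Requires the fastavro package.
from fastavro import writer, reader, parse_schema

schema = parse_schema({
    "type": "record",
    "name": "Event",
    "fields": [
        {"name": "user", "type": "string"},
        {"name": "ts", "type": "long"},
    ],
})

records = [{"user": "alice", "ts": 1}, {"user": "bob", "ts": 2}]

with open("events.avro", "wb") as out:
    writer(out, schema, records, codec="deflate")   # compressed, still splittable

with open("events.avro", "rb") as fo:
    avro_reader = reader(fo)
    print(avro_reader.writer_schema)                # schema read back from the header
    print(list(avro_reader))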

Additional refer: http://blog.maxkit.com.tw/2017/10/thrift-protobuf-avro.html

Columnar Formats

  • skip I/O and decompression
  • efficient columnar compression rate

Term:

  • RCFile (Record Columnar File)
  • ORC (Optimized Row Columnar)
  • RLE (bit-packing/run-length encoding)

Format, summary, and limitations:

  • RCFile
    • summary: column-oriented storage within each row split
    • limitation: has some deficiencies that prevent optimal performance for query times and compression (what's this exactly?)
  • ORC
    • summary: lightweight, always-on compression; zlib, LZO, Snappy; predicate pushdown; Hive types, including decimal and complex types; splittable
    • limitation: designed only for Hive, not general purpose
  • Parquet
    • summary: per-column level compression; supports nested data structures; full metadata, self-documenting; fully supports Avro and Thrift APIs; efficient and extensible encoding schemas, e.g., RLE
Avro and Parquet
  • single interface: recommended if you are choosing based on having a single interface to work with
  • compatibility: Parquet can be read and written to with Avro APIs and Avro schemas

Compression

  • disk & network I/O
  • source & intermediate data
  • trade with CPU loading
  • splittability for parallelism and data locality

Format, summary, and limitations:

  • Snappy
    • summary: developed at Google; high speed and reasonable compression rate
    • limitation: not inherently splittable; intended to be used with a container format
  • LZO
    • summary: very efficient decompression; splittable
    • limitation: requires additional indexing; requires a separate installation from Hadoop because of license restrictions
  • Gzip
    • summary: good compression rate, 2.5x that of Snappy; reads almost as fast as Snappy
    • limitation: write speed about half of Snappy's; not splittable; fewer blocks might lead to lower parallelism => use smaller blocks
  • bzip2
    • summary: excellent compression rate, 9% better than Gzip
    • limitation: slow read/write, 10x slower than Gzip; usually only used for archival purposes

Compression Recommendation

  • Enable compression of the MapReduce intermediate data
  • Compress on columnar chunks
  • With splittable container formats, e.g., Avro or SequenceFiles, compressed data can still be split so that blocks are compressed and decompressed individually

Compression Block

HDFS Schema Design

Standard directory structure:

  • Easier to share data sets between teams
  • Allows access and quota controls
  • “Stage” data during process pipeline
  • Tool conventions compliant

Location of HDFS Files

  • /user/<username>
  • /etl
    • ex. /etl/<group>/<application>/<process>/{input,processing,output,bad}
  • /tmp
  • /data
  • /app
    • ex. /app/<group>/<application>/<version>/<artifact directory>/<artifact>
  • /metadata

Advanced HDFS Schema Design

PARTITIONING

Unlike traditional data warehouses, however, HDFS doesn’t store indexes on the data. This lack of indexes plays a large role in speeding up data ingest, but it also means that every query will have to read the entire data set even when you’re processing only a small subset of the data (a pattern called full table scan).

  • Main purpose: reduce the amount of I/O required
  • Common pattern: <data set name>/<partition_column_name=partition_column_value>/{files}
  • Understood by: HCatalog, Hive, Impala, Pig, etc. (a write sketch follows this list)
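
A PySpark sketch of writing data in this partitioned layout (column names and the output path are illustrative):

# Write a DataFrame using the <column>=<value> directory pattern so that engines
# like Hive and Impala can prune partitions. Columns and path are made up.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partition-sketch").getOrCreate()
df = spark.createDataFrame(
    [("2015-01-01", "c1", 10.0), ("2015-01-02", "c2", 7.5)],
    ["order_date", "customer", "amount"],
)

(df.write
   .partitionBy("order_date")   # produces .../order_date=2015-01-01/part-*
   .mode("overwrite")
   .parquet("/data/orders"))    # illustrative output path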

BUCKETING

Not every key is a good partitioning key; e.g., partitioning by physician may result in too many partitions with files that are too small.

small files problem:

Storing a large number of small files in Hadoop can lead to excessive memory use for the NameNode, since metadata for each file stored in HDFS is held in memory. Also, many small files can lead to many processing tasks, causing excessive overhead in processing.

Bucketing is the solution,

  • be able to control the size of the data subsets
  • good average bucket size is a few multiples of the HDFS block size
  • having an even distribution of data when hashed on the bucketing column is important because it leads to consistent bucketing
  • having the number of buckets as a power of two is quite common

joining with bucketing

  • reduce-side join
    • if, for two data sets, both are bucketed on the join key
    • and the number of buckets of one is a factor or multiple of the number of buckets of the other
    • the join can be done bucket by bucket, saving reduce-side join complexity
  • map-side join
    • if a bucket can fit into memory, a map-side join can further improve performance
  • merge join
    • if the data in the buckets is sorted, it is also possible to use a merge join
    • requires less memory

Based on common query patterns,

  • decide partitioning and bucketing
  • for multiple query patterns, consider keeping multiple copies of the data, organized differently
  • trade space for query speed

DENORMALIZING

In relational databases, data is often stored in third normal form. In Hadoop, however, joins are often the slowest operations and consume the most resources from the cluster.

  • prejoined, preaggregated
  • consolidates many of the small dimension tables into a few larger dimensions
  • data preprocessing, like aggregation or data type conversion, Materialized Views

HBase Schema Design

Distributed key-value store which could operate,

  • put
  • get
  • iterate
  • value increment
  • delete

Row Key

RECORD RETRIEVAL
  • unlimited columns
  • single key
    • may need to combine multiple pieces of information in a single key
  • get single record is the fastest
    • put most common uses of the data into a single get
    • denormalized
    • very “wide” table
DISTRIBUTION

Row key determines scattering throughout various regions.
So, it’s usually best to choose row keys so the load on the cluster is fairly distributed.

  • Anti-pattern: use a timestamp for row keys
    • writes easily hit a single region, which defeats the parallelism
BLOCK CACHE

HBase caches data in blocks, by default 64 KB chunks, using a least recently used (LRU) cache.

  • Anti-pattern: row key by hash of some attribute
    • records in the same block could be unrelated, which reduces the cache hit rate
ABILITY TO SCAN

A wise selection of row key can be used to co-locate related records in the same region.

  • HBase scan rates are about eight times slower than HDFS scan rates.
SIZE

Trade-off:

  • shorter row keys: lower storage overhead and faster read/write performance
  • longer row keys: better get/scan properties
READABILITY

Recommend to use readable prefix.

  • easier to identify and debug issues
  • easier to use the HBase console
UNIQUENESS

Row keys are required to be unique (a composition sketch follows).
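
A small plain-Python sketch of composing a row key along these guidelines (the fields, salt scheme, and formatting are illustrative, not from the book):

# Compose a row key: a hashed salt spreads writes across regions, a readable
# entity prefix helps debugging, and record id + timestamp give uniqueness.
import hashlib

NUM_SALT_BUCKETS = 16   # illustrative

def make_row_key(entity, record_id, ts_millis):
    salt = int(hashlib.md5(record_id.encode()).hexdigest(), 16) % NUM_SALT_BUCKETS
    return "%02d|%s|%s|%013d" % (salt, entity, record_id, ts_millis)

print(make_row_key("customer", "cust-00042", 1395531114000))
# e.g. '05|customer|cust-00042|1395531114000' (salt value depends on the hash)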

Timestamp

timestamp’s important purposes:

  • determines which put is the newer record
  • determines the order when multiple versions are requested
  • determines when to remove records if time-to-live (TTL) is used

Hops

Hops: the number of synchronized get requests required to retrieve the requested info

  • best to avoid them through better schema design. ex, by leveraging denormalization.
  • every hop is a round-trip to HBase that incurs a significant performance overhead

Tables and Regions

Region Table Topology

  • one region server per node
  • multiple regions per region server
  • for a region, it’s pinned to a region server at a time
  • tables are split into regions and scattered across region servers

The number of regions for a table is a trade-off between put performance and compaction time.

Put performance

memstore:

  • cache structure present on every HBase region server
  • write => cache => sort => flush
  • more regions in a region server => less memstore space per region => smaller flushes & HFiles => less performant
  • ideal flush size: 100 MB
Compaction time

region size limit: 20GB (default) - 120GB

region assignment:

  • auto splitting
    • forever-growing data set where only the most recent data is updated; with periodic TTL-based compaction, no need to compact the old regions
  • assign the region number
    • recommended in most of cases
    • set the region size to a high enough value (e.g., 100 GB per region) to avoid autosplitting
    • split policy selected, ConstantSizeRegionSplitPolicy or DisabledRegionSplitPolicy

Using Columns

Two different schema structures:

  • Physical Columns

    RowKey: 101, TimeStamp: 1395531114, Column: F, Value: A1
    RowKey: 101, TimeStamp: 1395531114, Column: B, Value: B1
  • Combined Logical Columns

    RowKey: 101, TimeStamp: 1395531114, Column: X, Value: A1|B1

Considerations:

  • dependency on read, write, TTL
  • number of records can fit in the block cache
  • amount of data can fit through the WAL
  • number of records can fit into the memstore
  • compaction time

Using Column Families

column families: a column family is essentially a container for columns, each column family has its own set of HFiles and gets compacted independently of other column families in the same table.

Use case: when the get/put rates of subsets of columns are significantly different, separating them into different column families is beneficial for

  • lower compaction cost (by put)
  • better use of block cache (by get)

Time-to-Live

TTL: built-in feature of HBase that ages out data based on its timestamp

  • ignore outdated records during the major compaction
  • the HFile record timestamp will be used
  • if TTL is not used but records are deleted manually, it would require a full scan and inserting the "delete records" (could be TBs), and eventually a major compaction as well

Managing Metadata

What Is Metadata?

In general, refers to data about the data.

  • about logical data sets, usually stored in a separate metadata repository
    • location
      • dir path in HDFS
      • table name in HBase
    • schema
    • partitioning and sorting properties
    • format
  • about files on HDFS, usually stored and managed by Hadoop NameNode
    • permissions and ownership
    • location of various blocks of that file on data nodes
  • about tables in HBase, stored and managed by HBase itself
    • table names
    • associated namespace
    • associated attributes (e.g., MAX_FILESIZE, READONLY, etc.)
    • names of column families
  • about data ingest and transformations
    • which user generated a given data set
    • where the data set came from
    • how long it took to generate it
    • how many records there are
    • the size of the data loaded
  • about data set statistics, useful for various tools that can leverage it for optimizing their execution plans but also for data analysts, who can do quick analysis based on it
    • the number of rows in a data set
    • the number of unique values in each column
    • a histogram of the distribution of data
    • maximum and minimum values

Why Care About Metadata?

It allows to,

  • interact with higher-level logical abstraction
  • supply information that can then be leveraged by various tools
  • data management tools to “hook” into this metadata and allow you to perform data discovery and lineage analysis

Where to Store Metadata?

  • Hive metastore (database & service)
    • deployed mode
      • embedded metastore
      • local metastore
      • remote metastore
        • MySQL (most common), PostgreSQL, Derby, and Oracle
    • could be used by Hive, Impala seamless
  • HCatalog
    • WebHCat REST API
      • could be used for MapReduce, Pig, and standalone applications
    • Java API
      • could be used for MapReduce, Spark, or Cascading
    • CLI

Limitations of the Hive Metastore and HCatalog

  • High availability
    • HA for metastore database
    • HA for metastore service
      • concurrency issue unresolved, HIVE-4759
  • Fixed schema
    • only for tabular abstraction data sets
    • ex. not for image or video data sets
  • Additional dependency
    • the metastore database itself is just another dependent component

Other Ways of Storing Metadata

  • Embedded in HDFS paths
    • partitioned data sets
    • <data set name>/<partition_column_name=partition_column_value>/{files}
  • Store in HDFS
    • maintain & manage by your own
      /data/event_log
      /data/event_log/file1.avro
      /data/event_log/.metadata
    • Kite SDK
      • supports multiple metadata providers
      • allows easily transform metadata from one source (say HCatalog) to another (say the .metadata directory in HDFS)

How to Setup My Development Environment

The following settings are based on Ubuntu 14.04 LTS.

How to Setup GitHub Client

Follow this official document to setup GitHub auth key.

How to Setup My rc Files

Follow my dotfile‘s document to setup my .bashrc, .inputrc, and .vimrc.

Where the term "rc" originated from can be found here.

How to Setup Python pyenv

Follow this document to install required libraries:

$ sudo apt-get install -y \
make \
build-essential \
libssl-dev \
zlib1g-dev \
libbz2-dev \
libreadline-dev \
libsqlite3-dev \
wget \
curl \
llvm \
libncurses5-dev \
libncursesw5-dev \
xz-utils \
tk-dev

Then by this document to install pyenv:

$ curl -L https://raw.githubusercontent.com/pyenv/pyenv-installer/master/bin/pyenv-installer | bash

...

# Load pyenv automatically by adding
# the following to ~/.bash_profile:

export PATH="/home/weasellin/.pyenv/bin:$PATH"
eval "$(pyenv init -)"
eval "$(pyenv virtualenv-init -)"

Install some python versions to pyenv:

$ pyenv install --list
...
$ pyenv install 2.7.8
$ pyenv install 3.6.0
$ pyenv install pypy3-2.4.0
$ pyenv versions
* system (set by /home/weasellin/.pyenv/version)
2.7.8
3.6.0
pypy3-2.4.0

Create virtual env from installed versions:

$ pyenv virtualenv 2.7.8 myenv-2.7.8
$ pyenv virtualenv 3.6.0 myenv-3.6.0
$ pyenv virtualenv pypy3-2.4.0 myenv-pypy3-2.4.0

Activate & deactivate virtual env:

$ pyenv activate myenv-3.6.0
$ pyenv deactivate

How to Setup Docker

Follow this official document to install Docker CE.
Then be sure to follow the post-install steps to allow non-super-user to use docker.

$ curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
$ sudo add-apt-repository \
"deb [arch=amd64] https://download.docker.com/linux/ubuntu \
$(lsb_release -cs) \
stable"
$ sudo apt-get update
$ sudo apt-get install docker-ce
$ sudo gpasswd -a ${USER} docker
$ newgrp docker
$ docker run hello-world
========================
Hello from Docker!
This message shows that your installation appears to be working correctly.
......