
1 Introduction to Big Data

Big Data has changed the way we manage, analyze, and leverage data across all industry sectors. Big Data has the potential to examine and reveal trends, find unseen patterns, discover hidden correlations, reveal new information, extract insight, enhance decision making and automation, etc. Managing and analyzing data, in particular in the era of IoT, has always been one of the greatest challenges within organizations across industries. Finding an efficient and scalable approach to capturing, integrating, organizing, and analyzing information about IoT devices, products, and services can be a perplexing task for any organization regardless of its size or line of business. In the age of the Internet and digital transformation, the notion of Big Data reflects the changing world we live in. Everywhere around the world, more and more data is captured and recorded every day. Companies and organizations are becoming overwhelmed by the complexity and sheer volume of their data. While some data is still structured and stored in traditional relational databases or data warehouses, the vast majority of modern data sources produce unstructured data including documents, conversations, pictures, videos, Tweets, posts, Snapchats, sensor readouts, click streams, and machine-to-machine data. Further, the availability and adoption of newer, more powerful mobile devices, coupled with ubiquitous access to global networks, is continuously driving the creation of new data sources.

Although each data source can be independently managed and searched, the challenge today is how to make sense of the intersection of all these different types of data. When large amounts of data arrive in so many different forms, traditional data management techniques falter. While data volumes have always challenged existing IT infrastructure, the difference today is stark in terms of the sheer volume, speed, and complexity of the data. In parallel, the value of data is growing along with its volume and diversity. Data is emerging as the world’s newest resource for competitive advantage, as it enables efficient, data-driven decision making. As the value of data continues to grow, new technologies are emerging to support the new requirements surrounding it.

1.1 Defining Big Data

There has been much hype about Big Data in the past several years for various reasons. Before investigating the major drivers of Big Data’s popularity, we must first define the meaning of Big Data as a term. Dictionary.com defines Big Data as:

…data sets, typically consisting of billions or trillions of records, that are so vast and complex that they require new and powerful computational resources to process.

The British Dictionary defines Big Data as:

Data held in such large amounts that it can be difficult to process.

Big Data can be defined and often is described in terms of its four major characteristics, as shown in Fig. 6.1: volume, velocity, variety, and veracity:

Fig. 6.1 4 Vs of Big Data

1.2 Volume

Volume indicates how much data has been collected. It is perhaps the most obvious characteristic of Big Data, as the current amount of data created is quite staggering. For example, in 1 minute on the Internet, Snapchat users share over half a million messages, YouTube viewers watch over 4 million videos, Twitter users post over four hundred thousand Tweets, and so on. All of these transactions can be archived for later consideration.

1.3 Velocity

Velocity refers to the speed at which data is being transacted. Streaming data can arrive in milliseconds and require a response within seconds or less. For example, Facebook’s data warehouse not only stores hundreds of petabytes of data but needs to accommodate data coming in at a rate of more than 600 terabytes per day. Similarly, Google processes more than 3.5 billion searches per day, which translates to a velocity of over 40,000 search queries per second.

1.4 Variety

The huge variety of the types and structures of data has become one of the critical challenges of Big Data. Big Data requires systems to handle not only structured data but also semi-structured and unstructured data. As described above, most Big Data is in unstructured forms such as audio, images, video files, social media updates, log files, click data, machine and sensor data, etc. Unfortunately, most analytics techniques have traditionally been focused on analyzing only structured data, so new techniques and approaches need to be developed. This fact alone helps explain why there is such a large and growing number of start-ups in Big Data.

1.5 Veracity

Perhaps the most nuanced of the four Vs is veracity. Veracity describes how accurate and trustworthy data is in predicting business value through Big Data analytics. Uncertainty in data is typically due to inconsistency, incompleteness, latency, ambiguities, approximations, etc. Data must be able to be verified based on both accuracy and context. It is necessary to identify the right amount of high-quality data that can be analyzed in order to impact business outcomes.

These four characteristics suggest that Big Data typically requires resources (computation and data infrastructure, tools, techniques, expertise, etc.) beyond the current capabilities of many organizations [1]. Big Data solutions comprise a set of analytical tools that are geared toward the fast and meaningful processing of large data sets. This is a key aspect of Big Data analytics, as the goal is to derive meaning or insight from data that can be used for making data-driven business decisions. Big Data can be thought of as a process that is used when traditional data processing and handling techniques alone cannot uncover the insights and meaning of the underlying data. Oftentimes, real-time processing is needed for a massive amount of different types and frequencies of data in order to reveal patterns, trends, and associations. This is especially true when relating to human behavior and interactions. The goal for Big Data is to enable organizations to gather, store, manage, and manipulate vast amounts of data at the right speed and at the right time in order to gain the most valuable insights.

The data growth in the past decade has been unprecedented. There are 2.5 quintillion bytes of data created each day, and this will only increase more rapidly with the growth of the Internet of Things (IoT): an estimated 50 billion IoT devices will be connected by 2020 [2]. With IoT’s network of RFID tags, machines, appliances, smartphones, buildings, and many other devices with embedded technology that can be accessed over the Internet, estimates are projecting that between 40 and 50 Zettabytes (a Zettabyte being 2^70 bytes, or 1,180,591,620,717,411,303,424 bytes) of data will be created by 2020. Over 92% of the data in the world was generated in the last 2 years alone. Large contributors to this skyrocketing data generation are Internet applications like Snapchat, YouTube, Twitter, and Instagram [3].

In order to convert the vast amount of available data into insight, it is important to consider the functional requirements for Big Data. Figure 6.2 illustrates a set of iterative steps in the Big Data functional requirements life cycle. Data first needs to be captured, then organized and integrated. The integration process includes data cleaning and preparation. Once integrated, data can be analyzed in order to solve the business problems at hand. Oftentimes, this analysis includes developing predictive models utilizing Data Science approaches. Finally, the organization can act based on the outcome of the Big Data analysis, frequently via the implementation and utilization of the predictive models.

Fig. 6.2 Big Data life cycle

2 Big Data Management and Computing Platforms

As the volume and velocity of data grow, so does the need to manage and process it. Optimization, which enables the rapid formulation and testing of many diverse models as well as real-time operations, becomes essential, especially in the case of streaming data. Distributed and parallel processing approaches are well suited for these kinds of problems. Distributed processing typically segments large datasets, while parallel processing simultaneously processes all data or subsets of data. In order for modern systems to process Petabytes to Exabytes of data in a scalable and practical manner, a system must be able to sustain partial failure. Any Big Data processing system needs to continue processing in the face of failures without losing any data and should be able to recover failed components and then allow them to rejoin the process when ready. Additionally, failures during execution should not affect the final result for consistency purposes, and the addition of resources should automatically increase performance to enable seamless scalability. The Hadoop framework is able to satisfy all of these requirements and has been one of the leading components of Big Data platforms for the last decade.

Hadoop is based on work done by Google in the early 2000s [4, 5]. More specifically, Hadoop leverages concepts from the Google File System (GFS) and MapReduce, taking a fundamentally new approach to distributed computing [6,7,8]. Hadoop meets the requirement for a low-cost, scalable, flexible, and fault-tolerant large-scale system, while enabling a shared-nothing architecture and the use of applications written in high-level programming languages. The overall goal of Hadoop is to execute computation where the data is stored, instead of moving large amounts of data to computational resources. Additionally, data is replicated multiple times on the system for increased availability and reliability.

The Apache Hadoop software library [9] is an open source framework that enables distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage. Rather than relying on hardware to deliver high availability, the library itself is designed to detect and handle failures at the application layer. This enables highly available services on top of a commodity cluster where each machine may be prone to failures. Additional open source projects, including Spark, have been built around the original Hadoop implementation. Figure 6.3 depicts the basic Hadoop environment, which enables large-scale data management and computing.

Fig. 6.3 Hadoop ecosystem. (A list of all of the Apache projects can be found at https://hadoop.apache.org/)

2.1 Big Data System Architecture Components

This section examines in depth the key concepts required when considering implementation of Big Data, from both a technical and an analytics perspective. As data systems became larger in recent years, performance became an acute concern. An approach to efficiently solving a wide range of problems without needing to change the underlying environment was needed. This type of approach would have to take advantage of parallel processing of data and the availability of additional CPUs. The goal of any analyst or data scientist, regardless of the field of study, is to work with as much data as is available, build as many models as possible, and improve model training time and accuracy by simply adding additional CPUs. The ultimate goal of any Big Data system is to enable development of the best data analytics in the shortest amount of time. Finding the patterns hidden within a large, complex dataset requires processing, filtering, and analyzing massive amounts of data. Over time, several solutions have been proposed, such as developing parallel data mining algorithms and running them against parallel data sources on parallel hardware.

From the technology perspective, Hadoop initially enabled the power of Big Data by providing large-scale computing and sufficiently flexible, affordable storage, so that a wide variety of organizations could leverage Big Data technology. This is one of the reasons behind the synonymous use of the terms Big Data and Hadoop. However, the evolution of technologies has changed the high-tech landscape with a number of additional tools as discussed in the following sections.

2.2 Hadoop History

One of the pillars of the Big Data framework is Apache Hadoop. Hadoop is an Apache-managed software framework derived from MapReduce and Big Table. Hadoop is an Apache top-level project built and used by a global community of contributors and users, licensed under the Apache License. Hadoop was originally developed by Doug Cutting and Mike Cafarella in 2005 to support distribution for the Nutch search engine project at Yahoo [7]. Doug named the project after his then 2-year-old son’s toy elephant, hence the product’s current pachyderm logo.

Hadoop allows applications based on MapReduce to run on large clusters of commodity hardware. Hadoop is designed to parallelize data processing across computing nodes to speed computations and minimize latency. Two major components of Hadoop are the massively scalable distributed file system that can support petabytes of data and a massively scalable MapReduce engine that computes results in batch mode.

Hadoop started out as a scalability solution to high volume and velocity batch processing. The idea behind the processing paradigm called MapReduce [10] is to provide a simple yet powerful computing framework that enables computations to scale over big data easily. This type of system requires high efficiency. Therefore, instead of moving data to computation, computation is moved closer to data in Hadoop. Hadoop and MapReduce provide a shared and integrated foundation for enabling this type of computation and seamlessly integrate additional tools to support other necessary system functionalities. All of the modules in Hadoop are designed with a fundamental assumption that hardware failures, whether of individual machines or racks of machines, are common and thus should be automatically handled in software by the framework. The notion behind Hadoop’s reliability requirements is based on the idea that if one computer fails once a year, then a 365-computer cluster will have a failure daily. If this number is scaled by an order of magnitude, the cluster could be expected to have a hardware failure hourly. It is essential for truly scalable systems to endure failure of any component.

Since data has been regarded as the “new oil” or “new gold” in terms of its asset value, a new approach to storing and computing has emerged. As the cost of storing data continues to decrease, organizations have developed a new approach of keeping all of their data. Since data is growing rapidly in size and complexity, the “schema on read” style has become the approach of choice. This approach enables all of the data to be ingested in a rough form and then projected into a schema on the fly, as it is pulled out of the stored location, thereby enabling experiments and new types of analysis.

2.3 The Apache Hadoop Framework Components

All of the Hadoop modules are designed around a fundamental assumption of hardware failures. The entire Apache Hadoop “platform” is now commonly considered to consist of a number of related projects including Apache Pig, Apache Hive, Apache HBase, and others. These components will be described in the following sections and are illustrated in Fig. 6.4.

Fig. 6.4 Apache Hadoop basic components

The Apache Hadoop framework is composed of the following modules:

  • Hadoop Common: contains the libraries and utilities needed by other Hadoop modules

  • Hadoop Distributed File System (HDFS): a distributed file system that stores data on the commodity machines, providing very high aggregate bandwidth across the cluster

  • Hadoop YARN: a resource-management platform responsible for managing compute resources in clusters and using them for scheduling of users’ applications

  • Hadoop MapReduce: a programming model for large-scale data processing

Although the MapReduce Java code is common, any programming language can be utilized to implement the “map” and “reduce” parts of the user’s program. Apache Pig [12] and Apache Hive [13], among other related projects, expose higher level user interfaces like Pig Latin and a SQL variant, respectively. The Hadoop framework itself is mostly written in the Java programming language, with some native code in C and command line utilities written as shell scripts.
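For illustration only, the sketch below shows how the “map” and “reduce” parts of a word count might be written in Python and run through Hadoop Streaming; the mapper.py and reducer.py file names are hypothetical, and a real job would be submitted with the hadoop-streaming JAR along with HDFS input and output paths.

# mapper.py -- emits one tab-separated (word, 1) pair per input word
import sys
for line in sys.stdin:
    for word in line.strip().split():
        print(f"{word}\t1")

# reducer.py -- Hadoop Streaming delivers the mapper output sorted by key,
# so counts for the same word arrive consecutively and can be summed
import sys
current_word, current_count = None, 0
for line in sys.stdin:
    word, count = line.rstrip("\n").split("\t")
    if word == current_word:
        current_count += int(count)
    else:
        if current_word is not None:
            print(f"{current_word}\t{current_count}")
        current_word, current_count = word, int(count)
if current_word is not None:
    print(f"{current_word}\t{current_count}")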

The two primary components at the core of Apache Hadoop version 1 are the Hadoop Distributed File System (HDFS) [14] and the MapReduce parallel processing framework (Fig. 6.5). These are both open source projects, inspired by technologies initially developed by Google. Hadoop’s MapReduce and HDFS components originally derived from Google’s MapReduce and Google File System (GFS) [11], respectively.

Fig. 6.5 Hadoop’s distributed data storage and processing

2.4 Hadoop Distributed File System

The Hadoop Distributed File System (HDFS) is a distributed, scalable, and portable file system written in Java. A Hadoop instance typically has a single NameNode and a cluster of DataNodes forming the HDFS cluster [14]. Each DataNode serves blocks of data using an HDFS-specific block protocol (Fig. 6.6). The file system uses the TCP/IP layer for network communication. HDFS enables storing and manipulating large data sets by distributing them across multiple hosts. In addition, it enables reliability by replicating the data across multiple hosts, without requiring RAID (Redundant Array of Independent Disks) storage. Typically, data is replicated on three nodes: two on the same rack and one on a different physical rack. DataNodes can communicate with each other to rebalance data, transfer copies of data, and keep the replication of data at the specified level [14]. HDFS provides high-availability capabilities and automatic failover in the event of failure.

Fig. 6.6 Hadoop Distributed File System operations

The HDFS architecture includes a secondary NameNode; however its role is not to take over when the primary NameNode goes offline. The secondary NameNode connects with the primary NameNode on a regular basis and generates snapshots of the primary NameNode’s directory information. This information is then saved by the system and can be used to restart a failed primary NameNode in order to create an up-to-date directory structure without having to repeat the entire set of file system actions [15].

In the original Hadoop release, the NameNode was the single metadata storage and management information center. However, with the continually increasing number of files, this approach created challenges. HDFS Federation addresses this bottleneck by allowing multiple namespaces to be served by distinct NameNodes. A further strength of the Hadoop architecture is data awareness between the job tracker and task tracker: the job tracker schedules map or reduce jobs on task trackers with an awareness of where the data resides, minimizing the amount of data movement. This can have a significant impact on job completion times, especially with data-intensive jobs.

2.4.1 Overview of Data Formats

There are a number of file and compression formats supported by the Hadoop framework, each with a corresponding set of application-specific strengths and weaknesses. HDFS supports several formats for storing data, including those used by HBase for data access functionality and by Hive for data management and querying functionality. These file formats are designed for the MapReduce or Spark computing engines for specific purposes, ranging from basic analytics to machine learning applications.

Choosing the most appropriate file format can have a significant impact on performance. It influences many aspects of the file system including read and write times, the ability to split files into smaller components enabling partial reads, and advanced compression support.

Hadoop enables the storage of text, binary, images, or other Hadoop-specific formats. It provides built-in support for a number of formats specifically optimized for Hadoop storage and processing. Some of the most common basic data formats include text, CSV files, and JSON records. More complex formats such as Apache Avro, Parquet, HBase, or Kudu can also be utilized. While text and CSV files are very common, they do not support block compression and therefore often come with a significant read performance cost. One common approach is to create a JSON document in order to add structure to text files and utilize structured data in HDFS. JSON is a common data format often used for asynchronous communication. JSON stands for JavaScript Object Notation and is an open-standard file format that uses human-readable text to transmit data objects consisting of attribute-value pairs and array data types. JSON files store metadata with the data and are splittable; however, they do not support block compression [16].
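As a brief, hedged sketch (the HDFS path and field layout are illustrative), newline-delimited JSON records can be loaded directly into a Spark DataFrame, which infers the schema from the attribute-value pairs:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("json-example").getOrCreate()
# Read newline-delimited JSON records stored on HDFS; the schema is inferred
events = spark.read.json('hdfs://user1/events.json')
events.printSchema()
events.show(5)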

Several additional more sophisticated and specialized file formats are available in the Hadoop environment. One such format is Avro [17]. Avro is a data serialization standard for the compact binary data format used for storing persistent data on HDFS. It provides numerous benefits and has evolved into the de facto standard. Avro’s lightweight and fast data serialization and deserialization enables fast data ingestion. It stores metadata with the data itself and allows specification of an independent schema for reading the files. It can quickly navigate to the data collections in fast, random data access fashion. In addition, Avro files are “splittable,” support block compression, and are accompanied by a wide and mature set of open source tools.

The RC (Record Columnar) file format was the first columnar file format in Hadoop. It provides substantial compression and query performance benefits. However, it does not support schema evolution. Optimized RC Files (ORC) represent the compressed version of RC files with additional improvements including enhanced compression and faster querying.

The Parquet file format is another column-oriented data serialization standard enabling compression, encodings, query performance benefits, and efficient data analytics. This format gained popularity as it became the format of choice for Cloudera Impala. This optimization and usability contributed to its popularity in other ecosystems as well.
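As a sketch only (the paths and the column name are illustrative), converting a CSV extract to compressed Parquet and reading back a single column could look as follows in PySpark:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("parquet-example").getOrCreate()
# Load a CSV extract and write it out as compressed, columnar Parquet
readings = spark.read.csv('hdfs://user1/readings.csv', header=True, inferSchema=True)
readings.write.mode('overwrite').option('compression', 'snappy').parquet('hdfs://user1/readings_parquet')
# The columnar layout allows a single column to be read without scanning whole rows
spark.read.parquet('hdfs://user1/readings_parquet').select('temperature').show(5)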

Apache HBase is a scalable and distributed NoSQL database on HDFS for storing key-value pairs. Keys are indexed, which typically enables fast access to the records.

The Apache Kudu file format is scalable and distributed table-based storage. Kudu provides indexing and columnar data organization to achieve a balance between ingestion speed and analytics performance. As in the case of HBase, Kudu’s API enables modification of the data that is already stored in the system.

In general, three major factors should be considered when choosing the best format for the task at hand: write performance, partial read performance, and full read performance. These factors provide an indication of how fast the data can be written, how fast individual columns can be read, and how fast the full data set can be read from the data source. Columnar formats typically perform better in terms of read performance. CSV and other non-compressed formats typically demonstrate better write performance but generally demonstrate slower reads due to lack of compression. Some additional key factors that should be considered while selecting the best file format include the type of the Hadoop distribution and its associated formats. Additionally, querying and processing requirements should be considered alongside the processing tools. Further, extraction requirements merit attention, especially when extracting data from a Hadoop environment into an external database or other platforms. Finally, storage requirements are critical, as volume may become a significant factor, and compression may be required.

2.5 MapReduce

The MapReduce paradigm is the core of the Hadoop system. MapReduce is a distributed computing-based processing technique (Fig. 6.7). MapReduce was designed by Google in order to satisfy the need for efficient execution of a set of functions on a large amount of data in batch mode. The “map” function distributes the programming tasks across a large number of commodity cluster nodes. It handles the placement of the tasks in a way that balances the load and manages recovery from failures. After the distributed computation is completed, another function called “reduce” aggregates the elements back together, after a “shuffle” step groups the intermediate results by key, and organizes the final result. An example of MapReduce usage would be to determine a word count across thousands of newspaper articles.
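To make the flow concrete, the following single-process Python sketch imitates the map, shuffle, and reduce phases of such a word count; it illustrates the paradigm only, does not use the Hadoop API, and the sample articles are made up.

from collections import defaultdict

articles = ["big data changes decision making",
            "data is the new oil",
            "big clusters process big data"]

# Map phase: every article is turned into (word, 1) pairs
mapped = [(word, 1) for article in articles for word in article.split()]

# Shuffle phase: pairs sharing the same key are grouped together
groups = defaultdict(list)
for word, count in mapped:
    groups[word].append(count)

# Reduce phase: the counts for each word are aggregated
word_counts = {word: sum(counts) for word, counts in groups.items()}
print(word_counts)   # e.g., 'big' -> 3, 'data' -> 3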

Fig. 6.7 The MapReduce process

MapReduce works with the underlying file system and typically consists of one JobTracker that receives the client’s MapReduce job requests (Fig. 6.8). The JobTracker distributes processing to the available TaskTracker nodes in the cluster, while striving to keep the work as close to the data as possible. JobTracker is aware of which node contains the data and what neighboring processing is available. If for some reason processing cannot be executed on the same data hosting node, then priority is given to computing nodes in the same rack, thereby minimizing network traffic.

Fig. 6.8 MapReduce, detailed view

If a TaskTracker fails or times out, that part of the job is rescheduled. The TaskTracker on each node spawns a separate Java Virtual Machine process for each task, so that a failing task does not bring down the TaskTracker itself. A heartbeat is sent from the TaskTracker to the JobTracker on a regular basis to report its status.

2.6 YARN

Apache Hadoop YARN is a sub-project of Hadoop. MapReduce underwent a complete retrofit in an early Hadoop version (v0.23), and MapReduce 2.0 was designed with YARN [18] as a key component. It separates the resource management and processing components, enabling a broader array of interaction patterns for data stored in HDFS. The YARN-based architecture provides a more general processing platform that is not constrained by MapReduce limitations.

The fundamental concept underlying YARN is to split up the two major functionalities of the JobTracker, resource management and job scheduling/monitoring, into separate components. The global Resource Manager (RM) and the per-application Application Master (AM) are the key components. The RM is the ultimate authority that arbitrates resources among all the applications in the system. The AM is a framework-specific library tasked with negotiating resources from the RM and working with the Node Manager(s) to execute and monitor the tasks [18].

YARN enhances the power of a Hadoop-based cluster in several key ways. First, it enables a higher level of scalability as the processing power in data centers continues to grow quickly. The YARN RM focuses solely on scheduling and therefore is able to manage extremely large clusters very efficiently. Furthermore, YARN enables compatibility with existing MapReduce applications without disruption to the existing processes. Additionally, YARN significantly improves cluster utilization by enabling workloads beyond that of MapReduce. The YARN RM is a pure scheduler that optimizes cluster utilization according to specified criteria such as capacity guarantees, fairness, and SLAs. In contrast to classic MapReduce, YARN enables additional programming models for real-time processing such as Spark, graph processing, machine learning, and iterative modeling. The processing frameworks running on YARN also have the ability to evolve independently of the underlying RM layer in a much more agile manner.

3 An Introduction to Big Data Modeling and Manipulation

With the evolution of computing technology, it is now possible to manage immense volumes of data that previously could have only been handled at great expense by supercomputers. Prices of computing and storage systems continue to drop, and as a result, new techniques for distributed computing have become mainstream. The key inflection point for Big Data occurred when companies like Yahoo!, Google, and Facebook came to the realization that there was an opportunity to monetize the massive amounts of data collected. New technologies were required to create large data stores, access those stores, and process huge amounts of data in near real-time. The resulting solutions have transformed the data management market. In particular, Hadoop, MapReduce, and Big Table proved to be the start of a new approach to data management. These technologies address one of the most fundamental problems: the need to process massive amounts of data efficiently, cost effectively, and in a timely fashion.

3.1 Big Table

Big Table was developed by Google as a distributed storage system intended to manage highly scalable structured data. Data is organized into tables with rows and columns. Unlike a traditional relational database model, Big Table is a sparse, distributed, persistent multidimensional sorted map. It is intended to store massive volumes of data across a scalable array of commodity servers.

3.2 Pig

Pig is the high-level programming component running on top of Hadoop’s MapReduce component. Pig is a procedural language for creating MapReduce programs used with Hadoop [12]. Pig was originally developed at Yahoo Research in 2006 to enable ad hoc execution of MapReduce jobs on very large data sets. The language is called Pig Latin and enables a variety of data manipulations in Hadoop. It is a SQL-like language that enables a multi-query approach on a nested relational data model where schema is optional. Apache Pig provides a rich set of built-in operators to support data operations including joining, filtering, sorting, ordering, nested data types, etc. on both structured and unstructured data. In addition, through User Defined Functions (UDFs), Pig can invoke code in many other languages including JRuby, Jython, and Java. This allows for the development of larger, more complex applications.

Pig is typically used in ETL applications for describing how a process will extract data from a source, transform it according to a rule set, and then load it into a data store. Pig can ingest data from files, streams, or other sources using the UDF. Once data is ingested, operations similar to the select command, various iterations, and other complex transformations can be performed. Once the processing is finalized, Pig stores the results of the transformations into the HDFS. Throughout the processing steps, Pig scripts are translated into a series of MapReduce jobs executed on the underlying Hadoop cluster.

3.3 Sqoop

Apache Sqoop is a tool designed for efficiently transferring bulk data between Hadoop and structured data stores such as relational databases [18]. Sqoop is a portmanteau of SQL-to-Hadoop and is a simple command-line tool with several valuable capabilities. Sqoop has the capability to import individual tables or entire databases to files in HDFS. It also generates Java classes to allow interaction with imported data. Additionally, Sqoop provides the ability to import from SQL databases directly into the Hive data warehouse within the Hadoop environment, thereby enabling computation on the data very rapidly.

3.4 Hive

Hive is a data warehouse software platform that provides a SQL-like language for querying and managing large datasets residing in HDFS storage [13]. Oftentimes referred to as the Hadoop data warehouse, the Hive infrastructure sits on top of Hadoop and provides data query and analysis. HiveQL is the mechanism used to project structure onto the data and query the data using a SQL-like language [19]. HiveQL provides schema on read and transparently converts queries to MapReduce, Apache Tez, and Spark jobs. All three execution engines can run in Hadoop YARN. To accelerate queries, it provides indexes including bitmap indexes [13]. Additionally, Hive allows traditional and custom map and reduce mechanisms when HiveQL might be insufficient.

Initially developed by Facebook, Apache Hive is now used and developed industry wide. Hive supports analysis of large datasets stored in Hadoop’s HDFS as well as compatible file systems such as the Amazon S3 filesystem.

3.5 HBase

Apache HBase is a column-oriented, distributed, and scalable database management system that runs on top of HDFS. HBase is a key component of the Hadoop stack, enabling fast, random access to large data sets. HBase is modeled after Google’s BigTable, in order to handle massive data tables containing billions of rows and millions of columns.

It is well suited for sparse data sets, which are common in numerous Big Data use cases. Unlike relational database systems, HBase does not support a structured query language like SQL. HBase applications are written in Java similar to a typical MapReduce application. HBase also supports writing applications in Avro, REST, and Thrift.

3.6 Oozie

Apache Oozie is a scalable, reliable, and extensible workflow scheduler system that manages and coordinates Apache Hadoop jobs while supporting MapReduce, Pig, Hive, Sqoop, etc. Oozie workflow jobs are Directed Acyclic Graphs (DAGs) of actions, while Oozie coordinator jobs are recurrent workflow jobs triggered by time frequency and data availability [20]. Oozie is integrated with the Hadoop stack, typically with YARN, and supports numerous types of Hadoop jobs including Java MapReduce, Pig, Hive, Streaming MapReduce, Sqoop, general-purpose Java code, shell scripts, etc. Oozie itself is a Java Web application that combines multiple jobs sequentially into one logical unit of work. Oozie Bundle enables packaging of multiple coordinator and workflow jobs and management of the jobs’ life cycle. It enables cluster administrators to develop complex data transformations with multiple component tasks, thereby providing greater control over job recurrence.

3.7 Zookeeper

Apache ZooKeeper [21] provides operational services for a Hadoop cluster by enabling a distributed configuration service, a synchronization service, group services, and a naming registry for distributed systems [22]. Distributed applications use Zookeeper to store and mediate updates to important configuration information.

Due to the diversity of types of service implementations for applications, management can become rather challenging when the applications are deployed. ZooKeeper’s purpose is to extract the essence of these different services into a simple interface via a centralized coordination service. The service itself is distributed and reliable supporting consensus, group management, and presence protocols. Application-specific utilization consists of a mixture of specific components of ZooKeeper and application-specific conventions. ZooKeeper recipes [22] provide a simple service that can be used to build powerful abstractions.

3.8 Data Lakes and Warehouses

A data lake is defined as a centralized storage repository or system that contains a massive amount of structured, semi-structured, and unstructured raw data. The data structure and requirements are not defined until the data is needed to run different types of analytics. Oftentimes, a data lake is a single store of all raw enterprise data including copies of source system data, log files, clickstreams, social media, and output from IoT devices. It can also include transformed data used for delivering dashboards, reporting, visualization, new types of real-time analytics, and machine learning.

A data warehouse is a database optimized to analyze relational data coming from transactional systems and business applications. The data structure and schema are defined in advance to optimize for fast queries. The data from a warehouse is typically used for reporting and analysis. Data is cleaned, enriched, and transformed, so it can act as the “single source of truth” that users can trust [23]. Many organizations support both a data warehouse and a data lake, as they serve different needs and use cases. A data lake enables storage of non-relational data from mobile apps, IoT devices, and social media and does not require the schema to be defined when data is captured (referred to as “schema on read”). Massive amounts of data can be stored without careful schema design, thereby enabling flexibility in terms of the kinds of questions or data analytics that might need to be performed in the future. Data lakes enable different types of analytics including big data analytics, text mining, real-time stream data processing, and machine learning.

One great, early example of a successful data lake is the Big Data implementation at Mercy Hospital. The hospital leveraged technology to improve medical outcomes for patients by utilizing one of the first comprehensive, integrated electronic health record (EHR) systems to provide real-time, paperless access to patient information. Utilizing the EHR from Epic Systems, every patient’s activity, including clinical and financial interactions, was captured. The hospital needed to address several typical challenges associated with Big Data implementations including scalability, data schema requirements, and response to large data queries. Mercy, in partnership with Hortonworks (one of the early Big Data providers at the time), created the Mercy Data Library, a Hadoop-based data lake. This data lake enabled the integration, ingestion, and processing of large amounts of batch data extracts from relational systems, real-time data directly from the Epic EHR, and information from social media and even weather sources [24]. The combination of all of these data sets in a common platform enables the hospital to ask and answer questions that were previously impossible to address due to scale, cost, or both.

One of the projects implemented on the Mercy system applied advanced analytics techniques to a large amount of intensive care unit (ICU) patients’ vitals data. When a patient is admitted to the ICU, devices reading the patient’s vitals send a new data record every second. Each ICU patient generates a large amount of data, which is oftentimes very noisy. This data is traditionally summarized into 15-minute or longer intervals, either because systems cannot store and process the large amounts of more granular data or because doing so is not cost-effective. This approach limits the types of signal detection and analysis that can be performed on the data at scale. For example, determining which medicines bring down fever fastest would require a fine-grained measure of various streams (heart rate, breathing, movement, pain, etc.). Determining the efficacy of the medicine and enabling decision making in real time or near real time would require data time resolution of seconds or minutes. By implementing the data lake, researchers at Mercy Hospital were able to capture diagnostic data at high temporal rates, which in turn enabled real-time and near-real-time processing of the data.

This real-time data-on-demand model for researchers and clinicians is enabled by a combination of Sqoop, Storm, and HBase for more granular updates. In addition, Hive has provided a SQL-like approach to enabling the scalability of the Hadoop data lake.

4 An Introduction to Spark: An Innovative Paradigm in Big Data

One fundamental component of the Big Data ecosystem not yet mentioned is Spark [25]. Although Hadoop captures the most attention for distributed data analytics, there are alternatives that provide advantages over the typical Hadoop platform. Apache Spark is an open source cluster computing framework originally developed in the AMPLab at the University of California, Berkeley, and later donated to the Apache Software Foundation where it remains today [26].

Spark is a scalable data analytics platform that incorporates primitives for in-memory computing and typically demonstrates a significant speedup over the classic Hadoop cluster compute and storage approach. Spark is implemented in the Scala programming language and provides a unique environment for large-scale data storage and processing. In contrast to Hadoop’s two-stage, disk-based MapReduce paradigm, Spark’s multi-stage, in-memory primitives approach provides performance up to 100 times faster for certain applications [25]. By allowing user programs to load data into a cluster’s memory and query it repeatedly, Spark can enable a fast, efficient implementation of a variety of machine learning algorithms.

Similar to a traditional Hadoop system, Spark requires a cluster manager and a distributed storage system. For cluster management, Spark supports the standalone native Spark cluster, Hadoop YARN, or Apache Mesos. For distributed storage, Spark can interface with a wide variety of systems including HDFS, Cassandra, OpenStack Swift, Amazon S3, or custom solutions.

4.1 The Spark Ecosystem

While often compared to Hadoop and MapReduce, Spark is not a modified version of Hadoop. Hadoop is simply one of several platforms on which Spark can be deployed. In fact, Spark can run completely independently from Hadoop, powered by its own cluster management.

When Spark does leverage Hadoop, it typically does so in two ways: for storage and for processing. Since Spark has its own cluster management and computation engine, it oftentimes uses Hadoop only for storage.

Spark Core is the underlying general execution engine for the Spark platform. It enables in-memory computing and referencing datasets in external storage systems (Fig. 6.9).

Fig. 6.9 Spark ecosystem

Spark SQL is a component on top of Spark Core that introduces a new data abstraction called SchemaRDD, which provides support for structured and semi-structured data [27].

Spark Streaming leverages Spark Core’s fast scheduling capability to perform streaming analytics. It ingests data in mini-batches and performs RDD (Resilient Distributed Datasets) transformations on those mini-batches of data [28].
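As an illustrative sketch only (the socket source on localhost:9999 stands in for a real stream), a DStream word count over 5-second mini-batches might look like this:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, batchDuration=5)        # 5-second mini-batches
lines = ssc.socketTextStream("localhost", 9999)    # illustrative source
counts = (lines.flatMap(lambda line: line.split())
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()    # print the counts computed for each mini-batch
ssc.start()
ssc.awaitTermination()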

Spark’s machine learning library called MLlib is a distributed machine learning framework capable of taking advantage of the computational speedup related to the distributed memory-based Spark architecture. Spark MLlib is often an order of magnitude faster than Hadoop’s original, but now retired, Mahout library [29].

GraphX is a distributed graph-processing framework layered on top of Spark. It provides an API for expressing graph computation. It also enables and optimizes user-defined graphs and processing by leveraging Pregel abstraction API [30].

Spark provides built-in APIs for, and is compatible with, many languages and frameworks including Java, Scala, Python, R, Ruby, JavaScript, SparkSQL, Hive, Pig, H20, etc.

Spark can run standalone or on top of a cluster computing framework such as Hadoop. It handles batch, interactive, and real-time analysis within a single framework. It provides native integration with Java, Python, and Scala, thereby enabling programming at a higher level of abstraction.

One of the main advantages of Spark is that it is a general-purpose computing engine that seamlessly encompasses data streaming management, data queries, machine learning prediction, and real-time access to various analyses.

4.2 The Core Difference Between Spark and Hadoop

MapReduce enables users to write parallel computations using a set of high-level operators without having to worry about work distribution and fault tolerance. However, it also demonstrates a number of limitations for complex computational tasks. While MapReduce is great at one-pass computation, it becomes rather inefficient for multi-step algorithms due to the lack of efficient data sharing. Every intermediate state between the map and reduce steps requires a connection to the distributed file system and is slow due to replication and disk storage. In most current frameworks, the only way to reuse data between MapReduce jobs is to write data to an external storage system and then read from the same location at a later time. Since iterative and interactive applications require faster data sharing across parallel jobs, Hadoop’s slow data sharing via MapReduce, due to replication, serialization, and disk I/O, can cause serious computational delays. This is a major contributor to the often-presented statistic claiming that most Hadoop applications spend more than 90% of their time on HDFS read and write operations [31].

Iterative operations on MapReduce typically need to reuse intermediate results across multiple computations in multi-stage applications. Figure 6.10 depicts how a traditional Hadoop framework handles iterative operations via MapReduce, showcasing the data replication, disk I/O, and serialization overheads that slow down overall computation.

Fig. 6.10 MapReduce execution of iterative operations

Interactive operations on MapReduce are typically performed when ad hoc queries are executed on the same subset of data. In this scenario, each query will perform the disk I/O, which can dominate application execution time. Figure 6.11 illustrates how the traditional Hadoop framework accomplishes the interactive queries on MapReduce.

Fig. 6.11 MapReduce interactive execution illustration

As one might expect, large performance hits were experienced when complex workflows were executed on large amounts of data. Executing complex workflows without writing intermediate results to disk after every operation became a pressing need. A new two-pronged approach was proposed to solve these challenges. One aspect of the approach is to cache intermediate results in memory. The second is to allow users to specify persistence in memory and partition the dataset across nodes. In order to ensure fault tolerance, granular atomicity via partitions and transaction logging were used instead of replication.

The highest-level unit of computation in MapReduce is a job that loads data, applies a map function, shuffles, applies a reduce function, and writes data to persistent storage. In Spark, on the other hand, the highest-level unit of computation is an application that can be used for a single batch job, an interactive session with multiple jobs, or a server repeatedly fulfilling requests. A Spark application can have processes running on its behalf even when it is not running a job. Furthermore, multiple tasks can run within the same executor, resulting in orders of magnitude faster performance when compared to MapReduce.

Spark’s goal was to generalize MapReduce to support new applications and enable more complex, often iterative or recursive, computations within the same engine. Two main additions to Hadoop’s approach were powerful enough to express these types of computation and overcome the deficiencies of the previous models: fast data sharing and general Directed Acyclic Graphs (DAGs) for computation. These approaches will be presented in more detail in the following sections. The result is a much more efficient and much simpler approach, as end users can utilize libraries instead of specialized systems to run complex computational workflows.

4.3 Resilient Distributed Datasets in Spark

The Resilient Distributed Dataset (RDD) is Spark’s fundamental data structure. It is an immutable, foundational, distributed collection of objects [32]. The original published paper proposed the concept of the RDD as a resilient, fault-tolerant data structure. The RDD lineage graph makes it possible to recompute partitions that are missing or damaged due to node failures. RDDs are distributed, with data residing on multiple nodes in a cluster. RDDs do not change once created and can only be transformed into new RDDs using transformations.

Each dataset in an RDD is divided into logical partitions, which may be computed on different nodes of the cluster. By definition, an RDD is a read-only, partitioned, fault-tolerant collection of records that can be processed in a parallel manner [32]. RDDs can contain objects of any type from Python, Java, or Scala, including user-defined classes. RDDs can be created in one of two ways: either by parallelizing an existing collection or by referencing a dataset in an external storage system (HDFS, HBase, etc.). Spark makes use of the concept of RDDs to achieve faster and more efficient MapReduce operations.
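As a minimal sketch (the HDFS path is illustrative), both creation styles look as follows in PySpark:

from pyspark import SparkContext

sc = SparkContext.getOrCreate()
# 1) Parallelize an existing Python collection into an RDD with two partitions
numbers = sc.parallelize([1, 2, 3, 4, 5], numSlices=2)
# 2) Reference a dataset in an external storage system such as HDFS
lines = sc.textFile('hdfs://user1/mytext.txt')
print(numbers.count())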

As shown in Fig. 6.12, iterative operations on Spark RDDs store intermediate results in distributed memory and thereby offer much faster computation. In cases when the distributed memory (RAM) is not sufficient to store intermediate results, Spark will default to storing those results on disk.

Fig. 6.12 Spark’s approach to fast data sharing for iterative operation

The interactive operations on Spark RDD illustrated in Fig. 6.13 are typically utilized when different queries are executed on the same set of data repeatedly. This particular data can be kept in memory to realize significant improvements in execution times.

Fig. 6.13 Spark’s approach to fast data sharing for queries

Each transformed RDD may be recomputed each time an action is executed on that RDD. However, RDDs can also be persisted in memory, in which case Spark will keep the elements on the cluster for considerably faster access the next time they are queried. Spark also supports persisting RDDs on disk and replicating them across multiple nodes.
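A short, hedged sketch of persistence, reusing the illustrative HDFS path from the earlier examples:

from pyspark import SparkContext, StorageLevel

sc = SparkContext.getOrCreate()
words = sc.textFile('hdfs://user1/mytext.txt').flatMap(lambda line: line.split())
# Keep the transformed RDD in cluster memory; words.cache() is equivalent
words.persist(StorageLevel.MEMORY_ONLY)
print(words.count())              # the first action computes and caches the RDD
print(words.distinct().count())   # later actions reuse the in-memory copy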

4.4 RDD Transformations and Actions

RDDs enable two main types of operations: transformations and actions. Transformation operations are applied to RDDs and return another RDD, while action operations trigger computation and return values.

A Spark transformation is a function that produces new RDDs from existing RDDs. It takes RDDs as input and produces one or more RDDs as output based on the transformation applied. Each time a transformation is applied, a new RDD is created. Note that the input RDDs cannot be changed, since RDDs are immutable by design.

The process of applying transformations builds a RDD lineage. It keeps track of all of the parent RDDs of the final RDD(s). RDD lineage is also known as the RDD operator graph or RDD dependency graph. It represents a logical execution plan in the form of a Directed Acyclic Graph (DAG) of the entire set of parent RDDs.

RDD transformations are evaluated in a “lazy” manner: the computation is performed only when an action requires a result to be returned. Therefore, they are not executed immediately. Two of the most basic and often used transformations are map and filter. A map function applies a user-supplied function to every element of an RDD, and the element type of the returned RDD may differ from that of the input. For example, the input RDD can contain strings while the returned RDD contains Booleans. Filter functions return a new RDD containing only the elements that meet a predicate [32].
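For instance (a sketch with made-up input), a map that changes the element type and a filter that applies a predicate:

from pyspark import SparkContext

sc = SparkContext.getOrCreate()
lines = sc.parallelize(["big data", "spark", "distributed computing"])
# map: the input elements are strings, the returned elements are Booleans
is_long = lines.map(lambda s: len(s) > 10)
# filter: keep only the elements that satisfy the predicate
long_lines = lines.filter(lambda s: len(s) > 10)
print(is_long.collect())      # [False, False, True]
print(long_lines.collect())   # ['distributed computing']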

After the transformation, the resultant RDD is different from its parent RDD. It can be smaller if functions like count, filter, or sample are applied or larger when functions like Cartesian or union are applied. Alternatively, it could remain the same size when a map function is applied.

There are two types of transformations: narrow and wide. In a narrow transformation, all the elements that are required to compute the records in a single partition reside in a single partition of the parent RDD. A limited subset of partitions is used to calculate the result. Typical narrow transformations result from map or filter functions. Alternatively, in a wide transformation, the elements that are required to compute the records in a single partition may reside in numerous partitions of the parent RDD. Wide transformations typically result from the application of join or intersection.
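The distinction can be sketched in PySpark as follows; reduceByKey is used here as another common wide transformation alongside the join and intersection mentioned above, and the key/value pairs are made up:

from pyspark import SparkContext

sc = SparkContext.getOrCreate()
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)], numSlices=2)
# Narrow: each output partition depends only on one parent partition
doubled = pairs.map(lambda kv: (kv[0], kv[1] * 2))
# Wide: values for the same key must be shuffled across partitions
totals = pairs.reduceByKey(lambda x, y: x + y)
print(totals.collect())    # e.g., [('a', 4), ('b', 2)]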

Transformations create RDDs from each other. However, in order to work with the actual dataset, actions need to be performed. Actions are Spark RDD operations that produce non-RDD values. When an action is triggered and the result is calculated, a new RDD is not formed, as was the case with transformations. The values produced by actions are returned to the driver or written to an external storage system. In the example below, the first line defines a base RDD from an external file.

#Create an RDD from a file on HDFS
text = sc.textFile('hdfs://user1/mytext.txt')
#Transform the RDD of lines into an RDD of words
mywords = text.flatMap(lambda line: line.split())
#Transform the RDD of words into an RDD of key/value pairs
mykeyvals = mywords.map(lambda word: (word, 1))

RDD transformation vs. action

#Map Transformation example counting line lengths
lineLength = text.map(lambda s: len(s))
#Reduce Action example
totalLength = lineLength.reduce(lambda a, b: a + b)
#More RDD manipulation examples
# (rdd below stands for any previously created RDD, e.g., mykeyvals above)
# The saveAsTextFile action writes the contents of
# an RDD to the disk
rdd.saveAsTextFile('hdfs://user1/myRDDoutput.txt')
# The count action returns the number of elements
# in an RDD
numElements = rdd.count()
print(numElements)

This approach enables execution to be optimized, while operations are automatically parallelized and distributed across the cluster. These operations can efficiently handle many machine learning algorithms that are iterative by nature, as well as the interactive ad hoc queries needed for many analytics applications. This efficiency comes from reusing intermediate in-memory results across multiple data-intensive workloads, thereby avoiding movement of large amounts of data over the network.

4.5 Datasets and DataFrames in Spark

A Dataset is a distributed collection of data [33]. The Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs, including strong typing and powerful lambda functions, with the benefits of Spark SQL’s optimized execution engine [33, 34]. A Dataset can be constructed from JVM objects and then manipulated using the functional transformations mentioned in the RDD section, including map, filter, etc. The Dataset API is available in Scala and Java. Python and R do not have support for the Dataset API, but due to those languages’ dynamic nature, many of the benefits of the Dataset API are already available and easily usable.

A DataFrame is a dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R or Python, but with richer built-in optimizations. DataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R, typically represented by a Dataset of Rows [33]. More details on DataFrames are presented below.
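A brief illustrative sketch (the names and values are made up) of building and querying a DataFrame in PySpark:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("dataframe-example").getOrCreate()
# Build a DataFrame (a Dataset of Rows) from an existing collection
people = spark.createDataFrame([("Ana", 34), ("Bo", 29)], ["name", "age"])
people.printSchema()    # named, typed columns
people.filter(people.age > 30).select("name").show()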

4.6 The Spark Processing Engine

While MapReduce is widely adopted for processing and generating large datasets with a parallel, distributed algorithm on a cluster, more and more iterative and interactive modes of operation have emerged that require faster data sharing across parallel jobs. Data sharing is slow in MapReduce due to replication, serialization, and disk IO. As noted earlier, studies have shown that most of the Hadoop applications spend more than 90% of the time doing HDFS read-write operations [25]. In contrast to Hadoop’s two-stage disk-based MapReduce paradigm, Spark’s iterative in-memory approach enables added computing flexibility and significant speedup. Additionally, Spark’s ability to load data into memory and query it repeatedly enables scalable machine learning algorithm performance via the MLlib library [29]. The ability to perform sophisticated, advanced analytics is one of the main advantages of Spark, as it also supports SQL queries, streaming data, machine learning (ML), and graph algorithms.

While MapReduce can achieve complex tasks by defining and chaining various map and reduce tasks, it is limited to one-directional, sequential execution of the mappers and reducers. This limitation has been overcome in Spark by allowing task definition using DAGs. The DAG in Apache Spark is an alternative to MapReduce. It is a programming style used in distributed systems that enables multiple processing levels, forming a tree-like structure, without having to write the intermediate results to disk. A DAG is a finite directed graph with no directed cycles, consisting of finitely many vertices and edges. Each edge is directed from one vertex to another in a consistently directed sequence of edges that can never form a cycle [20].

The DAG concept has successfully been applied to Spark processing. A DAG is represented by a set of vertices and edges, where vertices represent the RDDs and edges represent the operations to be applied to the RDDs. When an action is to be executed on an RDD, Spark creates the DAG of the tasks to be executed. The DAG is then submitted to the DAG scheduler, which divides the operators into stages of tasks in order to perform parallel computation.

The principal unit of Spark's computations is a job. It is typically a piece of code that reads some input from HDFS, performs computation on the data, and writes output data. Jobs are divided into stages. Stages are classified as Map or Reduce stages and are divided based on computational boundaries. Most computations are executed over many stages. Each stage consists of some number of tasks, and typically one task is executed on one partition of data on one machine. The Executor is the process responsible for executing a task. The program/process responsible for running the job over the Spark engine is typically referred to as the driver. The driver runs on the master node, while the machine on which an executor runs is referred to as the slave.
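To see how a chain of transformations turns into a DAG with stages separated at shuffle boundaries, the lineage of an RDD can be inspected with toDebugString(). The sketch below assumes a running SparkContext named sc and a placeholder input file.

# Assumes an existing SparkContext named sc; the input path is a placeholder
rdd = sc.textFile("/data/sample.txt")
words = rdd.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda w: (w, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)  # introduces a shuffle boundary

# Print the RDD lineage; the indentation levels correspond to the
# stages the DAG scheduler will create around the shuffle
print(counts.toDebugString())

# Nothing has been computed yet; this action triggers the actual job
print(counts.take(5))

The narrow transformations (flatMap, map) are pipelined into a single stage, while reduceByKey forces a new stage because data must be shuffled across partitions.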

4.7 Spark Components

Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object located in the driver (see Fig. 6.14). The driver program defines the processing to be executed and creates the SparkContext in order to schedule jobs and negotiate resources with the cluster manager.

Fig. 6.14
figure 14

Spark components

SparkContext can connect to several types of cluster managers (standalone Spark, Mesos, YARN, or Kubernetes) in order to allocate resources across applications. Once connected, Spark acquires executors on nodes in the cluster. Executor processes run computations and store application data. Subsequently, Spark sends the application code (defined by the JAR or Python files passed to SparkContext) to the executors. Finally, SparkContext sends tasks to the executors to run.
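The cluster manager is selected through the master URL supplied when the session (and its SparkContext) is created. The sketch below lists the standard master URL forms; host names and ports are placeholders, and the application name is illustrative.

from pyspark.sql import SparkSession

# The master URL selects the cluster manager; typical forms include
#   "local[*]"                - run locally, using all available cores
#   "spark://host:7077"       - standalone Spark cluster manager
#   "yarn"                    - Hadoop YARN
#   "mesos://host:5050"       - Apache Mesos
#   "k8s://https://host:443"  - Kubernetes
spark = (SparkSession.builder
         .master("local[*]")
         .appName("ClusterManagerExample")
         .getOrCreate())

# The SparkContext (and hence the connection to the cluster manager)
# is available through the session
print(spark.sparkContext.master)

In practice, the master is often supplied on the command line via spark-submit --master rather than hard-coded in the application.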

Spark jobs contain a series of operators and run on a set of data. All the operators in a job are used to construct a DAG, as shown in Fig. 6.15. The DAG is optimized by rearranging and combining operators where possible. For example, if a submitted Spark job contains a map operation followed by a filter operation, Spark's DAG optimizer will reorder the operators so that the filter reduces the number of records before the map operation is applied.

Fig. 6.15
figure 15

DAG for a set of Spark jobs
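For DataFrame and Spark SQL workloads, the effect of this optimization can be inspected with explain(), which prints the plans produced by the Catalyst optimizer. The sketch below assumes a DataFrame named df with at least the columns name and salary, as in the employees examples later in this chapter.

# Assumes a DataFrame df with (at least) the columns "name" and "salary"
result = (df.select("name", "salary")
            .filter(df["salary"] > 3000))

# Print the parsed, analyzed, optimized, and physical plans; pushed-down
# filters and pruned columns can be inspected in the output
result.explain(True)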

The Spark system is divided into various layers with individual responsibilities in order to execute tasks efficiently. The layers are independent of each other. The primary layer is the interpreter, and Spark uses a Scala interpreter. As commands are entered in the Spark console, Spark creates an operator graph. When an action is executed, the graph is submitted to the DAG scheduler. The DAG scheduler divides the operator graph into map and reduce stages. Each stage comprises tasks based on partitions of the input data. The DAG scheduler orders operators to optimize the graph, which is key to Spark's fast performance. The final result of the DAG scheduler is a set of stages that are passed to the Task Scheduler. The Task Scheduler launches tasks via a cluster manager (Spark Standalone, YARN, Mesos, Kubernetes); however, it is unaware of any dependencies among stages, as illustrated in Fig. 6.16. The Worker executes the tasks, but it knows only about the code it has received.

Fig. 6.16
figure 16

Spark’s internal job scheduling process

There are several advantages of the DAG in Spark. If an RDD is lost, Spark can recover it using the lineage information captured in the DAG, and, with multiple levels of execution, it can run a SQL query or ML operations with much more flexibility and efficiency than MapReduce.

4.8 Spark SQL

Spark SQL is a Spark module for structured data processing on very large data sets. Spark SQL provides Spark with additional information about the structure of the data and the computation and uses this information to perform optimizations. By utilizing Spark as the underlying execution engine, Spark SQL delivers low-latency, interactive queries as well as scale-out and failure recovery. The most common use of Spark SQL is to execute SQL queries. However, it is also Hive compatible via the Hive Query Language (HQL), which allows it to read data from an existing Hive warehouse without a need to change queries or move data [26]. In addition to Hive tables, Spark enables querying of various data sources including Parquet and JSON. Spark SQL also enables combining SQL queries with the data manipulations and complex analytics supported by RDDs in Python, Java, and Scala [35].

Spark SQL provides three main capabilities for using structured and semi-structured data. First, it provides a DataFrame abstraction in Python, Java, and Scala, thereby simplifying the manipulation of structured datasets. Second, it can read and write data in a variety of structured formats including JSON, Hive Tables, and Parquet. Third, it enables data query using SQL. This can be accomplished both inside a Spark program and from external tools that connect Spark SQL to third-party tools via standard database connectors like JDBC and ODBC [36].
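These three capabilities fit together in a few lines of PySpark. The sketch below (paths and column names are placeholders, consistent with the employees examples that follow) reads a JSON file into a DataFrame, writes it back out as Parquet, and queries it with SQL.

# Read a structured file into a DataFrame (placeholder path)
df_json = spark.read.json("/user1/employees.json")

# Write the same data out in a different structured format
df_json.write.mode("overwrite").parquet("/user1/employees_parquet")

# Query the data with SQL by registering a temporary view
df_json.createOrReplaceTempView("employees_json")
spark.sql("SELECT name, salary FROM employees_json WHERE salary > 3000").show()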

4.9 Spark DataFrames

A DataFrame in Spark represents a distributed collection of data organized into named columns [33]. A DataFrame is conceptually equivalent to a table in a relational database, a data frame in R, or a Pandas DataFrame in Python, but with additional optimizations for the Spark engine. DataFrames support, and can be constructed from, a wide array of sources including structured data files, Hive tables, JSON, Parquet, external databases, HDFS, S3, etc. Additionally, through Spark SQL's external data sources API, DataFrames can be extended to support any third-party data formats or sources, including Avro, CSV, ElasticSearch, Cassandra, etc. DataFrames are evaluated lazily, just like RDDs, while operations are automatically parallelized and distributed on clusters. State-of-the-art optimization and code generation is woven throughout the Spark SQL Catalyst optimizer utilizing a tree transformation framework. DataFrames can be easily integrated with the rest of the Hadoop ecosystem tools and frameworks via Spark Core and provide APIs for Python, Java, Scala, and R (Fig. 6.17).

Fig. 6.17
figure 17

Many ways to create a DataFrame in Spark

4.10 Creating a DataFrame

In order to start any Spark computation, a basic Spark session needs to be initialized: in R this is done with the sparkR.session() command, while in Python a SparkSession is created through the builder pattern [33]. The code presented below is adapted from the Apache Spark website (http://spark.apache.org).

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("Python Spark example")
         .config("myspark.config.option", "myvalue")
         .getOrCreate())

Reading data from a JSON file into a DataFrame is demonstrated in the code below:

# Read the data file in JSON format
df = spark.read.json("/user1/employees.json")

# Display the content of the DataFrame
df.show()

4.10.1 Example of Reading DataFrame from the Parquet File

# Read the data file in Parquet format and display the content
dfParquet = spark.read.parquet("/user1/employees.parquet")
dfParquet.show()

4.11 DataFrame Operations

DataFrames provide a domain-specific language for structured data manipulation in Scala, Java, Python, and R. In the Scala and Java APIs, DataFrames are simply Datasets of Rows. These operations are also referred to as "untyped transformations," in contrast to the "typed transformations" associated with strongly typed Scala or Java Datasets.

Basic examples of structured data processing using DataFrames are demonstrated below:

# Print the schema in a tree format
df.printSchema()

# Select only the "name" column
df.select("name").show()

# Select employees with salary greater than 3000
df.filter(df['salary'] > 3000).show()

# Count people by salary
df.groupBy("salary").count().show()

The advantage of the sql function on a SparkSession is that it enables applications to run SQL queries programmatically and returns the result as a DataFrame [33]. Temporary views in Spark SQL are session-scoped and will disappear if the session that creates them terminates. If a temporary view needs to persist and be shared among all sessions until the Spark application terminates, a global temporary view should be created. A global temporary view is tied to the system-preserved database global_temp, and the qualified name must be used to refer to it, e.g., SELECT * FROM global_temp.employees.

# Temporary view utilized to query the data
df.createOrReplaceTempView("employees")
sqlDF = spark.sql("SELECT * FROM employees")
sqlDF.show()

# DataFrame registered as a global temporary view
df.createGlobalTempView("employees")
spark.sql("SELECT * FROM global_temp.employees").show()

# The global view is also available in a new Spark session
spark.newSession().sql("SELECT * FROM global_temp.employees").show()

Remember to use SELECT * only on small data sets like the ones in these illustrative examples; otherwise, a WHERE clause (and, where appropriate, LIMIT) should be used to avoid returning a very large amount of data, as shown below.
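A minimal sketch of this advice, using the employees view registered above (the salary threshold and row limit are arbitrary):

# Restrict both the rows matched and the rows returned
limitedDF = spark.sql(
    "SELECT name, salary FROM employees WHERE salary > 3000 LIMIT 100")
limitedDF.show()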

Some additional examples of the queries enabled by Spark SQL are shown below:

from pyspark.sql import functions as F

# Show all entries in the column firstName
df.select("firstName").show()

# Show all entries where salary > 2000
df.filter(df['salary'] > 2000).show()

# Show first name and 0 or 1, depending on whether the
# person is older or younger than 25
df.select("firstName", F.when(df.age > 25, 1).otherwise(0)).show()

4.12 Spark MLlib

Spark MLlib is a library containing various machine learning (ML) functionalities optimized for the Spark computing framework. MLlib provides an extensive number of machine learning algorithms and utilities including classification, regression, clustering, association rules, sequential pattern mining, ensemble models, decomposition, topic modeling, and collaborative filtering [30]. In addition, MLlib supports various functionalities such as feature extraction, model evaluation, and validation. All of these methods are designed and optimized to scale across a Spark cluster. Spark’s machine learning utilities enable construction of pipelines including tasks that range from data ingest and feature transformations, data standardization, normalization, summary statistics, dimensionality reduction, etc. to model building, hyper-parameter tuning, and evaluation. Finally, Spark enables machine learning persistence by saving and loading models and pipelines [37].

4.13 MLlib Capabilities

MLlib's capabilities enable utilization of a large number of major machine learning algorithms, including regression (linear, generalized linear, logistic), classification (decision trees, random forests, gradient-boosted trees, multilayer perceptrons, support vector machines, naive Bayes, etc.), clustering (K-means, K-medoids, bisecting K-means, latent Dirichlet allocation, and Gaussian mixture models), and collaborative filtering. In addition, it supports feature extraction, transformations, dimensionality reduction, selection, and the designing, constructing, evaluating, and tuning of machine learning pipelines.

There are many advantages to MLlib's design, including simplicity, scalability, and compatibility. Spark's APIs are simple by design and provide utilities that look and feel like typical data science tools such as R and Python. Machine learning methods can easily be executed with effective parameter tuning [38]. Additionally, MLlib provides seamless scalability by enabling the execution of ML methods on a large computing cluster with minimal or no adjustment to the code. Spark is compatible with R, Python pandas, scikit-learn, and many other prevalent ML tools. Spark's DataFrames and MLlib integrate readily with existing data science tools and workflows.
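The pipeline, tuning, and persistence utilities mentioned above can be combined in a few lines. The sketch below is illustrative only: it assumes a DataFrame named data with numeric feature columns "f1" and "f2" and a numeric "label" column, and the model path is a placeholder.

from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Assemble placeholder feature columns and define the classifier
assembler = VectorAssembler(inputCols=["f1", "f2"], outputCol="features")
lr = LogisticRegression(labelCol="label", featuresCol="features")
pipeline = Pipeline(stages=[assembler, lr])

# Hyper-parameter grid and cross-validation
grid = (ParamGridBuilder()
        .addGrid(lr.regParam, [0.01, 0.1])
        .build())
evaluator = MulticlassClassificationEvaluator(labelCol="label",
                                              metricName="accuracy")
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid,
                    evaluator=evaluator, numFolds=3)
cvModel = cv.fit(data)

# Persist the best pipeline model and load it back later
cvModel.bestModel.write().overwrite().save("/models/best_pipeline")
reloaded = PipelineModel.load("/models/best_pipeline")

CrossValidator fits the pipeline once per parameter combination and fold, so the grid is kept deliberately small here.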

The goal of most machine learning experiments is to create an accurate model in order to make predictions on future, unseen data. To accomplish this goal, a training data set is used to "train" (fit) the model, and a testing data set is used to evaluate and validate the model obtained on the training data set.

Utilizing the PySpark MLlib features, traditional approaches to machine learning can now be scaled to large and complex data sets. For example, we can use the well-known Iris data set to demonstrate the capabilities of MLlib for developing predictive models on Spark.

from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.linalg import Vectors

# Get the Spark context from the Spark session
sc = spark.sparkContext

# Load the Iris CSV file
df = (spark.read.csv("Iris.data", inferSchema=True)
      .toDF("sepLength", "sepWidth", "petLength", "petWidth", "class"))

# Print the first 10 rows of the DataFrame
df.show(10)

Index the label by converting class to numeric using StringIndexer:

# Convert the class label to a numeric index
class_index = StringIndexer(inputCol="class", outputCol="classIndex")
df = class_index.fit(df).transform(df)

# Assemble the four measurements into the features vector expected
# by the classifier
assembler = VectorAssembler(
    inputCols=["sepLength", "sepWidth", "petLength", "petWidth"],
    outputCol="features")
df = assembler.transform(df)

# Split the data into train and test sets
(trainingData, testData) = df.randomSplit([0.8, 0.2])

# Train the decision tree model
dt = DecisionTreeClassifier(labelCol="classIndex", featuresCol="features")
model = dt.fit(trainingData)
predictions = model.transform(testData)

# Evaluate the model's accuracy
evaluation = MulticlassClassificationEvaluator(
    labelCol="classIndex", predictionCol="prediction",
    metricName="accuracy")
accuracy = evaluation.evaluate(predictions)

# Print the model's error
print("Test Error = %g" % (1.0 - accuracy))

# Print the model summary
print(model)

Spark enables solving multiple data problems on one platform, from analytics to graph analysis and machine learning. The Spark ecosystem also provides a utility for graph computations called GraphX in addition to streaming and real-time interactive query processing with Spark SQL and DataFrames [36].

4.14 Spark Streaming

Spark Streaming is a Spark component that enables processing of live streams of data and enables scalable, high-throughput, fault-tolerant data stream processing.

4.15 Intro to Batch and Stream Processing

Before looking into the specifics of how Spark Streaming works, the difference between batch and stream processing should be defined. Typically, batch processing collects a large volume of data elements into a group. The entire group is then processed together in a batch at a specified time. The timing of the batch computation can be determined in a number of ways: it can be based on a prespecified schedule or on a triggering condition, such as the number of elements or the amount of data collected. Batch data processing is a very efficient way to process large amounts of data collected over a period of time when there is no need for real-time analytics. Historically, this has been the most common data processing approach. Traditional databases and data warehouses, as well as Hadoop, are common examples of batch processing systems.

Stream processing typically operates on continuous data and is a key component in enabling fast data processing. Streaming enables almost instantaneous analysis of data flowing from one device to another. This method of continuous computation happens as data flows through the system, with no required time limitations on the output. Because the data flows through nearly instantly, such systems do not require large amounts of data to be stored.

The streaming approach processes each new individual piece of data upon arrival. In contrast to batch processing, there is no waiting until the next batch processing interval. The term micro-batch is frequently associated with streaming, when batches are small or processed at small intervals. Although processing may occur at high frequency, data is still processed a batch at a time in the micro-batch paradigm. Spark Streaming is an example of a system that supports micro-batch processing. Stream processing is highly beneficial if the events are frequent, especially over rapid time intervals, and there is a need for fast detection and response.

4.16 Spark Streaming

Spark Streaming is a Spark component that enables processing of live streams of data by providing an API for manipulating data streams similar to Spark Core's RDD API. It enables scalable, high-throughput, fault-tolerant data stream processing, offering the same high degree of fault tolerance, throughput, and scalability as Spark Core. Spark Streaming receives input data streams and divides them into small batches, exposed as DStreams. DStreams can be created from a number of sources such as Kafka, Flume, and Kinesis or by applying operations on other DStreams (Fig. 6.18).

Fig. 6.18
figure 18

Spark Streaming processing

4.17 Spark Functionality

Spark Streaming receives input data streams and divides the data into batches. These batches are then processed by the Spark engine to generate the final stream of results in batches.

Discretized Stream, or DStream, is the core concept enabled by Spark Streaming. It represents a continuous stream of data and is implemented as a continuous series of RDDs. Operations applied to DStreams translate to operations on the underlying RDDs. Spark Streaming discretizes the data into small micro-batches. Spark Streaming receivers accept data in parallel and buffer it in the worker nodes' memory. The Spark engine processes the batches, while optimizing latency, and outputs the results to external systems as shown in Fig. 6.19.

Fig. 6.19
figure 19

Spark Streaming functionality

Spark Streaming can maintain state based on the data coming in a stream, often referred to as stateful computations. In addition, Spark Streaming allows window operations, where a specified time frame is used to perform operations on the data. A window is defined by two parameters: the window length and the sliding interval at which the window is updated. When the window slides over a source DStream, the underlying RDDs that fall within the window are combined and operated upon to produce the RDDs of the windowed DStream [28]. Spark tasks are assigned to the workers dynamically on the basis of data locality and available resources, thereby optimizing load balancing and fault recovery.
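A minimal sketch of a window operation, assuming a DStream of (word, 1) pairs named wordPairs such as the one built in the word count example later in this section; the window length and sliding interval (in seconds) are arbitrary, but both must be multiples of the batch interval.

# Count words over the last 30 seconds of data, recomputed every 10 seconds
windowedCounts = wordPairs.reduceByKeyAndWindow(
    lambda a, b: a + b,  # reduce function applied within each window
    None,                # no inverse function, so no checkpointing is required
    30,                  # window length in seconds
    10)                  # sliding interval in seconds
windowedCounts.pprint()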

Spark Streaming’s data stream can originate from the source data stream or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs. Every input DStream is associated with a Receiver, which receives the data from a source and stores it in executor memory.

Analogous to Spark RDDs, Spark transformations enable DStream modifications. Input DStreams support many of the transformations that are applicable to RDDs, including map, filter, count, countByValue, reduce, union, etc. Spark Streaming provides two categories of built-in streaming sources: basic and advanced sources. Basic sources, such as file systems and socket connections, are directly available in the StreamingContext API. Advanced sources, such as Kafka, Flume, and Kinesis, are available through extra utility classes and require linking against additional dependencies [28]. If the application requires multiple streams of data in parallel, multiple DStreams can be created. Multiple receivers, simultaneously receiving multiple data streams, can be created, often requiring the allocation of multiple cores to process all of the receivers' data [28].
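As a small illustration of a basic source, the sketch below monitors a directory for new text files using an existing StreamingContext named ssc (the HDFS path is a placeholder):

# A basic source available directly on the StreamingContext:
# watch a directory for newly created text files
fileStream = ssc.textFileStream("hdfs://namenode:8020/streaming/input")

# Each batch interval, count the lines arriving in new files
fileStream.count().pprint()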

Output of DStream data to external systems, including HDFS, databases, or other file systems, is handled by output operations. Output operations trigger the actual execution of the DStream transformations and include print, saveAsTextFiles, saveAsObjectFiles, saveAsHadoopFiles, etc. Similar to RDDs, DStreams are executed lazily, driven by the output operations.
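For example, the wordCounts DStream built in the example that follows could be written out batch by batch (the output prefix is a placeholder; Spark appends the batch time to each output path):

# Persist each processed batch to external storage
wordCounts.saveAsTextFiles("hdfs://namenode:8020/streaming/wordcounts")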

The example below illustrates a basic application of Spark Streaming: counting the number of words in text data received from a data server listening on a TCP socket, as adapted from [28].

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with a batch interval of 5 seconds
sc = SparkContext(appName="NetworkWordCount")
ssc = StreamingContext(sc, 5)

# Create a DStream for the TCP data stream; specify the local host and
# the port number where the system will listen for streaming data
MyStream = ssc.socketTextStream("localhost", 9999)

# Split each DStream line into individual words;
# flatMap creates a new DStream of words
wordStream = MyStream.flatMap(lambda line: line.split(" "))

# Count each word per batch

wordPairs = wordStream.map(lambda word: (word, 1))
wordCounts = wordPairs.reduceByKey(lambda a, b: a + b)

# Print the first ten elements of each RDD
wordCounts.pprint()

# Start the computation
ssc.start()

# Wait for the computation to terminate
ssc.awaitTermination()

# Run Netcat to enable the application execution in Spark:
# open a socket on port 9999
$ nc -lk 9999

# Check that the port is open
$ nc localhost 9999

Then type the text you would like to be counted, for example:

Spark streaming is amazing! This is a great example of a spark streaming application Run Execute Run

# In another terminal, run the application with spark-submit
$ spark-submit mynetworkcount.py localhost 9999

The output on the screen will indicate the number of words counted.

This example illustrates the Spark Streaming process of ingesting data into the Discretized Stream (DStream) framework. DStreams enable users to capture the data and perform many different types of computations, as illustrated in this example by a simple word count of the incoming data. DStreams and RDDs are a crucial set of building blocks that enable the construction of complex streaming applications with Spark and Spark Streaming.

5 Big Data Analytics: Building the Data Pipeline

Several different maturity levels can be considered with regard to Big Data Analytics. A number of organizations (DAMA, Gartner, IIA, HIMSS, TDWI, IBM, etc.) have defined their own versions of analytics maturity levels. However, they all agree on three general tiers. All organizations start with raw data and move first to cleaned, standardized, and organized data. They next progress to basic and advanced reporting. Finally, they may graduate to building predictive models. This progression highlights the levels of sophistication in analytics, moving from descriptive, to diagnostic, to predictive, and finally to prescriptive modeling. Descriptive analytics helps understand what has happened in the past, while diagnostic analytics looks into the reasons why something might have happened. Predictive analytics techniques build machine learning models to predict what will happen. These models can then be fed into prescriptive models, which take the process directly to decision making and action by recommending what should be done under certain conditions.

5.1 Developing Predictive and Prescriptive Models

John Naisbitt famously said, “We are drowning in data, but starving for knowledge!” A great quote that is made more amazing when one considers that it was made in 1982. His observation rings ever more true today. While the scale of data has changed, the need for skills, tools, and techniques to find meaning in the mayhem of the Big Data world has not. It is costly to collect, store, and secure Big Data properly, and real return on investment (ROI) hinges on the ability to extract actionable information from the data. The field of Data Science is one angle from which to approach the data deluge. Data scientists endeavor to extract meaning and tell the story of the data in order to provide insight and guidance. Data scientists have established technologies that uncover relationships and patterns within large volumes of data that then can be leveraged to predict future behavior and events. For example, the development of predictive modeling techniques utilizing machine learning methods was driven by the necessity to address the data explosion. This technology learns from experience and predicts future outcomes in order to drive better business decisions. It extracts rules, regularities, patterns, and constraints from raw data, with the goal of discovering implicit, previously unknown and unexpected, valuable information from data.

5.2 The Cross Industry Standard Process for Data Mining (CRISP-DM)

The process of moving from raw data to effective models is an iterative and multi-phase one. As discussed in Chap. 5, the CRISP-DM standard, depicted in Fig. 6.20, identifies the six major phases of this data mining process. When approaching predictive model development, it is essential to deeply understand the application domain characteristics. This is the goal of phase one, the Business Understanding phase.

Fig. 6.20
figure 20

CRISP-DM process model

Once the business problem and the overall project goals are fully understood, the project moves into the Data Understanding phase. Creating the proper dataset is the goal of this phase. It may involve bringing together data from different sources and of different types to be able to develop comprehensive models. The rate, quantity, and quality of the data are carefully considered. The execution of this phase may require reconsideration of the business understanding based on data availability, resource limitations, and such.

The data preparation phase is frequently the most time consuming and resource intensive phase of the process. The preprocessing and cleaning of the data undertaken in this phase can require considerable effort and should not be underestimated. Careful, advanced planning of data collection and storage can help minimize the effort expended in this phase.

The modeling phase can be initiated once the data has been sufficiently prepared. However, it is typical for data preparation efforts to continue and/or be revised based on the progress made and insights gained during the modeling process. The modeling phase involves applying one or more data science techniques to the data set in order to extract actionable insight.

Once models are developed (or “trained”) in the modeling phase, the evaluation phase considers the value of the models in the context of the original business understanding. Frequently, multiple iterations through the process are required to arrive at a satisfactory data mining solution.

Finally, the deployment phase addresses the implementation of the models within the organization and completes the process. This may involve multiple personnel and expertise from a wide variety of groups in addition to the data science team.

6 Conclusion

Big Data is fundamentally changing the way organizations and businesses operate and compete. Big Data and IoT also share a closely knit future in offering data-driven analysis and insight. In this chapter, we explained how to build and maintain reliable, scalable, distributed systems with Apache Hadoop and Apache Spark. We also discussed how to utilize Hadoop and Spark for different types of Big Data analytics in IoT projects, including batch and real-time stream analysis as well as machine learning.