
5.1 Introduction

The data mining community has often assumed that performance improvements on existing techniques would come from the continuous improvement of processor technology. Unfortunately, due to physical and economic limitations, it is no longer advisable to rely on the exponential frequency scaling of CPUs. Furthermore, the low price and ubiquity of data generation devices have led not only to larger datasets that need to be digested in a timely manner, but also to growth in the dimensionality, categories and formats of the data. Simultaneously, an increasingly heterogeneous computing ecosystem has defined three computing families:

  1.

    Commodity Computing: It encompasses large-scale, geographically distributed clusters of commodity machines running primarily open source software. Their proven reliability in hosting batch processing systems, such as Hadoop [12], and storage systems, such as BigTable [5] or Cassandra [17], across tens of thousands of nodes and petabytes of data, has made commodity computing the foundation of internet-scale companies and the cloud.

  2.

    High Performance Computing/Supercomputing: It refers to centralized, multi-million-dollar computer systems capable of delivering high throughput for complex tasks that demand large computational power. Typically, these systems are funded and operated by governments or large corporations, and are utilized for the resolution of scientific problems.

  3.

    Appliance Computing: It refers to highly specialized systems exclusively designed to carry out one or a few similar tasks with maximum performance and reliability. These systems combine state-of-the-art processor, storage and interconnect technologies and cost one order of magnitude less than supercomputers. Such computing appliances have been successfully utilized for large-scale analytics and enterprise business intelligence operations.

Under these circumstances, the research community needs to investigate the adaptation of classic and novel data-intensive algorithms to this heterogeneous variety of parallel computing ecosystems and the technologies that compose them. This adaptation process can be separated into two phases: the Extraction Phase, in which the parallelizable parts of the algorithm, called parallel tasks, are identified and separated; and the Integration Phase, in which these tasks are implemented for the most suitable parallel computing platform or combination of platforms.

5.1.1 Extraction Phase

The extraction of parallelism from a data intensive algorithm can be carried out at different levels, each with a different impact on performance and programming complexity:

  1.

    Independent Runs: This is the most common technique; it simply runs the same algorithm with different configuration parameters on different processing nodes. Each run is independent, and parallel execution does not speed up individual runs.

  2.

    Statistical Query & Summation: This technique decomposes the algorithm into an adaptive sequence of statistical queries and parallelizes these queries over the sample [16]. This approach is effective in speeding up slow algorithms in which little communication is needed.

  3.

    Structural Parallelism: This technique exploits fine-grained data parallelism [15], handling each data point with one or a few processing threads.

These three techniques are complementary and are often combined to yield maximum performance on a given target parallel computing platform. Successful parallelization transforms a compute-bound problem into a bandwidth-bound one, in which communication between processing units becomes the bottleneck and optimizing for minimum latency gains critical importance. Fully exposing the parallelism of the algorithm, and with it the complexity of parallel programming, yields the largest performance gain.

Individual parallel tasks extracted through Statistical Query & Summation and/or Structural Parallelism can be directly modeled using the MapReduce programming paradigm [8]. The MapReduce framework is illustrated in Fig. 5.1.

Fig. 5.1 MapReduce primitives and runtime

The Map and Reduce operators are defined with respect to structured (key, value) pairs. Map (M) takes a pair in one domain and returns a list of pairs in a different domain:

$$\displaystyle\begin{array}{rcl} M\left [k_{1},v_{1}\right ] \rightarrow \left [k_{2},v_{2}\right ]& &{}\end{array}$$
(5.1)

The Map operator is applied in parallel to every item in the input dataset. This produces a list of \((k_{2},v_{2})\) pairs for each call. Then, the framework collects all the pairs with the same key and groups them together. The Reduce (R) operator is then applied to produce a value \(v_{3}\).

$$\displaystyle\begin{array}{rcl} R\left [k_{2},\{v_{2}\}\right ] \rightarrow \left [v_{3}\right ]& &{}\end{array}$$
(5.2)

The advantage of the MapReduce model is that it makes parallelism explicit and, more importantly, language and platform agnostic, which allows executing a given algorithm on any combination of platforms in the parallel computing ecosystem. M or R tasks are distributed dynamically among a collection of Workers. A Worker is an abstraction that can represent nodes, processors or Massively Parallel Processor (MPP) devices.
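To make the primitives concrete, the following sketch (a minimal, sequential Python emulation written for exposition, not part of any MapReduce runtime) shows the Map, shuffle and Reduce phases over (key, value) pairs:

```python
from collections import defaultdict

def map_reduce(records, map_fn, reduce_fn):
    """Minimal sequential emulation of the MapReduce primitives.

    map_fn:    (k1, v1) -> list of (k2, v2) pairs     (Eq. 5.1)
    reduce_fn: (k2, [v2, ...]) -> v3                  (Eq. 5.2)
    """
    # Map phase: apply map_fn to every input pair.
    intermediate = []
    for k1, v1 in records:
        intermediate.extend(map_fn(k1, v1))

    # Shuffle phase: group intermediate values by key.
    groups = defaultdict(list)
    for k2, v2 in intermediate:
        groups[k2].append(v2)

    # Reduce phase: collapse each group of values into a single value.
    return {k2: reduce_fn(k2, values) for k2, values in groups.items()}

# Example: word count over (doc_id, text) pairs.
docs = [(1, "map reduce map"), (2, "reduce")]
counts = map_reduce(docs,
                    map_fn=lambda _, text: [(w, 1) for w in text.split()],
                    reduce_fn=lambda _, ones: sum(ones))
print(counts)  # {'map': 2, 'reduce': 2}
```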

Researchers have focused their effort on the decomposition of Machine Learning algorithms as iterative flows of Map and Reduce tasks. Next, the decomposition of three classic Machine Learning algorithms into flows of Map (M) and Reduce (R) tasks is explained:

  • K-means: The K-means clustering algorithm can be represented as an iterative sequence of (M, R) tasks that run until the stop criteria are met. M represents the assignment of points to clusters and R the recalculation of the cluster centroids. K-means is illustrated in Fig. 5.2 (a minimal sketch of this flow is given after Fig. 5.4).

  • Expectation Maximization: The EM algorithm for Gaussian mixtures is represented by iterations of (M, R, R, R) tasks running until convergence. M corresponds to the E-step of the algorithm, while (R, R, R) correspond to the M-step that calculates the mixture weights \(a_{k}\), means \(\bar{\mu _{k}}\) and covariance matrices \(\Sigma _{k}\), respectively. EM is illustrated in Fig. 5.3.

  • Support Vector Machine: The resolution of the dual SVM problem using the Sequential Minimal Optimization (SMO) algorithm [20] is represented by iterations of (M, M, R, M) tasks running until convergence. These tasks reproduce the identification of the two Lagrange multipliers to be optimized in each iteration and their analytic calculation. The SVM is illustrated in Fig. 5.4.

Fig. 5.2 Decomposition of K-means into MapReduce tasks

Fig. 5.3 Decomposition of EM using Gaussian mixtures into MapReduce tasks

Fig. 5.4 Decomposition of SVM into MapReduce tasks
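To make the first decomposition concrete, the sketch below (Python/NumPy, a didactic simplification and not the chapter's implementation) expresses K-means as the iterative (M, R) flow of Fig. 5.2: the Map assigns each point to its nearest centroid and the Reduce recomputes the centroids.

```python
import numpy as np

def kmeans_mapreduce(points, centroids, max_iters=100, tol=1e-6):
    """K-means expressed as an iterative (M, R) sequence (cf. Fig. 5.2)."""
    for _ in range(max_iters):
        # Map: key = index of the nearest centroid for every point.
        distances = np.linalg.norm(points[:, None, :] - centroids[None, :, :], axis=2)
        assignments = np.argmin(distances, axis=1)

        # Reduce: one reduction per cluster key recomputes its centroid.
        new_centroids = np.array([
            points[assignments == k].mean(axis=0) if np.any(assignments == k) else centroids[k]
            for k in range(len(centroids))
        ])

        # Stop criterion: the centroids no longer move.
        if np.linalg.norm(new_centroids - centroids) < tol:
            break
        centroids = new_centroids
    return centroids, assignments
```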

5.1.2 Integration Phase

The integration of parallel MapReduce tasks into diverse computing platforms spans a wide and heterogeneous variety of parallel system architectures. Originally, internet-scale companies decomposed indexing and log-processing jobs into Map and Reduce tasks that were executed in batches on top of a distributed file system hosted by hundreds or thousands of commodity nodes. The model's proven reliability in production, along with its affinity for virtualized environments, made MapReduce one of the key data processing paradigms of cloud service infrastructures. Research initiatives have investigated the applicability of this model in scientific environments and enterprise analytics, and have tested the implementation of MapReduce tasks on alternative platforms, such as multicores or MPPs (GPUs, Cell microprocessors or FPGAs), aiming to boost the performance of computationally expensive jobs.

In this chapter, a hybrid solution that boosts the computational throughput of commodity nodes is proposed, based on the integration of multiple MPPs into the MapReduce runtime. For this purpose, a programming model to orchestrate MPPs is developed. In order to test the computational capabilities of this solution, a multiclass Support Vector Machine (SVM) is implemented for this hybrid system and its performance results for large datasets are reported.

The rest of this chapter is organized as follows: Sect. 5.2 reviews previous initiatives that accelerate the execution of data intensive MapReduce jobs, either by optimizing the cluster runtimes or by exploiting the capabilities of massively parallel platforms. Section 5.3 enumerates the research contributions presented by this work. Our proposed unified heterogeneous architecture is described in Sect. 5.4. The decomposition of the SVM problem into MapReduce tasks and its integration into the GPU cluster architecture are explained in Sect. 5.5. Section 5.6 contains details of the performance gain provided by our massively threaded implementation. The conclusions of this work are presented in Sect. 5.7.

5.2 Related Work

In general, the research efforts for the performance improvement of large-scale Machine Learning algorithms expressed as MapReduce jobs can be classified into two families: the Cluster category and the Multiprocessor category.

Cluster Category: These efforts focus on the adaptation of the cluster runtime to satisfy the particular needs of Machine Learning algorithms and facilitate their integration into clusters. These needs have been identified as: (1) support for iterative jobs, (2) static and variable data types, and (3) dense and sparse BLAS operations. Ekanayake et al. [10] presented Twister, a modified runtime that accommodates multiple Machine Learning algorithms by supporting long-running stateful tasks. Ghoting et al. [11] designed SystemML, a declarative language to express Machine Learning primitives and simplify their direct integration into clusters. The Apache Mahout [1] project compiled a library of the most popular MR-able algorithms for the standard Hadoop implementation of MapReduce.

Multiprocessor Category: In this category MapReduce jobs are scattered and gathered among multiple processing cores on a shared-memory multiprocessor device, or on multiple devices hosted by interconnected nodes. Typically, the processing units in these systems are built to run tens of threads simultaneously, reducing the load of MapReduce tasks assigned to each thread while increasing the degree of parallelism. Communication between cores is carried out through the shared-memory hierarchy. Popular systems in this category are Phoenix (multicore) [27], Mars (GPU) [13], CellMR (Cell) [21] and GPMR [22].

The hybrid MapReduce runtime proposed in this chapter is unique in the sense that it combines the best of both worlds to deliver an efficient framework that meets the specific needs of Machine Learning algorithms, and produces up to two orders of magnitude of acceleration using massively threaded hardware.

Particularly for the case of SVM implementations on shared-memory multiprocessors, Chu et al. [23] provide an SVM solver for multicore based on MapReduce jobs obtained through Statistical Query & Summation. Similarly, researchers have focused on the GPU adaptation of dual-form SVMs for binary and multiclass classification [3, 14]. Specifically for the case of SVM implementations on clusters, Chang et al. [6] provide the performance and scalability analysis of a deployment of their PSVM algorithm on Google’s MapReduce infrastructure.

5.3 Research Contributions

Typically, both system categories, Cluster and Multiprocessor, have shown complementary characteristics. Batch processing systems running on commodity clusters provide high reliability through redundancy and near-linear scalability by adding nodes to the cluster at low cost. Nevertheless, by nature, their intensive cross-machine communication leads to higher latencies and increased complexity for computer cluster administrators. On the contrary, shared-memory multiprocessors do not have any built-in reliability mechanism and their scalability is limited by the number of processing cores and the capacity of the memory hierarchy in place. In these devices, latencies in cross-processor communication are orders of magnitude lower, and single-node execution drastically reduces the system administration complexity. Ideally, a unified system including the benefits of both solutions and meeting the needs of Machine Learning algorithms is desired, in order to execute these algorithms on large-scale datasets and obtain the results in a timely manner.

The authors of this chapter believe that both categories can be merged to create a unified heterogeneous MapReduce framework and increase the computational throughput of individual nodes. The contributions of this new hybrid system are the following:

  1.

    Runtime Adaptation: The original MapReduce runtime was not designed specifically for Machine Learning algorithms. Even though libraries such as Mahout [1] implement a variety of classic algorithms, the framework has inherent inefficiencies that prevent it from providing timely responses. Our proposed hybrid design integrates a series of modifications to accommodate some of the common needs of Machine Learning algorithms. These runtime modifications are introduced next (a minimal sketch of a job descriptor capturing them follows this list):

    • Iterative MapReduce Jobs: Most Machine Learning algorithms are iterative. The state of the algorithm is maintained across iterations and is reused in each step towards the resolution of the problem. Like Twister [10], our solution enables executing long-running iterative jobs that keep a state between iterations.

    • Static and Variable Data Support: Most iterative Machine Learning algorithms define two types of data: static and variable. Static data is read-only and is utilized in every iteration, while variable data can be modified and is typically of smaller size. In order to minimize data movements and memory transfers, our runtime allows specifying the nature of the data.

    • Dense & Sparse BLAS: The execution of a task may require as input the results of a dense or sparse BLAS operation. Our solution enables interleaving massively threaded BLAS operations to prepare the input data of M and R steps.

  2.

    MPP Integration: As opposed to Mars [13], Phoenix [27] and CellMR [21], which were constructed to run MapReduce jobs within a single isolated multiprocessor and were not designed to scale out, our solution takes a different approach based on the integration of MPPs into the existing MapReduce framework as coprocessors. GPMR [22] follows the same direction, but keeps the standard runtime and does not optimize it for Machine Learning algorithms.

  3.

    MPP Orchestration: A programming model to manage multiple MPPs towards the execution of MapReduce tasks is presented. We use an abstraction, called Asynchronous Port-based Programming, which allows creating coordination primitives, such as Scatter-Gather.

  4.

    Massively Threaded SVM: While implementations of SVM solvers for multiprocessors and clusters have provided satisfactory performance as part of isolated experiments, to the best of our knowledge this work pioneers the execution of a multiclass SVM on a topology of multiple MPPs, intertwining tens of CPU threads and thousands of MPP threads working collaboratively towards an even faster resolution of the SVM training problem.
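To illustrate contributions 1 and 3, the hypothetical job descriptor below (names and fields are ours, not part of Hadoop or of the runtime described here) shows how long-running iterative jobs, the static/variable data split and an interleaved BLAS-style preparation step could be surfaced to the programmer:

```python
from dataclasses import dataclass
from typing import Any, Callable

@dataclass
class IterativeMRJob:
    """Hypothetical descriptor for a long-running iterative MapReduce job."""
    map_fn: Callable                  # M step over the (static) data block
    reduce_fn: Callable               # R step that updates the variable state
    static_data: Any                  # read-only, loaded/transferred once
    variable_data: Any                # small mutable state kept across iterations
    prepare_fn: Callable = None       # optional interleaved BLAS-style step
    max_iters: int = 100

    def run(self, converged: Callable[[Any], bool]):
        state = self.variable_data
        for _ in range(self.max_iters):
            inputs = self.prepare_fn(self.static_data, state) if self.prepare_fn else self.static_data
            pairs = self.map_fn(inputs, state)     # M
            state = self.reduce_fn(pairs, state)   # R: new variable state
            if converged(state):
                break
        return state
```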

5.4 A Unified Heterogeneous Architecture

In this section we provide an overview of the foundations of MapReduce-based batch processing systems. We take the Hadoop architecture as a reference due to its popularity and public nature. First, we explain the characteristics and principles of operation of the currently existing data processing nodes, called Data Nodes (DN). Then, we introduce our modifications by integrating more powerful nodes composed of multiple Massively Parallel Processor (MPP) devices; we call these nodes MPP Nodes (MPPN). DNs and MPPNs may coexist within a MapReduce cluster; nevertheless, they are meant to address MapReduce jobs with different requirements: DNs should work on batch, high-latency jobs, whereas MPPNs take responsibility for compute-intensive jobs.

5.4.1 MapReduce Architecture Background

Fig. 5.5 The MapReduce architecture

Typically, the architecture of MapReduce and MapReduce-like systems consists of two layers: (i) a data storage layer in the form of a Distributed File System (DFS), responsible for providing scalability to the system and reliability through replication of the files, and (ii) a data processing layer in the form of a MapReduce Framework (MRF), responsible for distributing and load balancing tasks across nodes. Files in the DFS are broken into blocks of fixed size and distributed among the DNs in the cluster. The distribution and load balancing are managed centrally by a node called the NameNode (NN). The NN not only contains metadata about the files in the DFS, but also manages the replication policy. The MRF follows a master-slave paradigm. There is a single master, called the JobTracker, and multiple slaves, called TaskTrackers. The JobTracker is responsible for scheduling MapReduce jobs in the cluster, along with maintaining information about each TaskTracker’s status and task load. Each job is decomposed into MapReduce tasks that are assigned to different TaskTrackers based on the locality of the data and their status. In general, the output of the Map task is materialized to disk before proceeding to the Reduce task. The Reduce task may get shuffled input data from different DNs. Periodically, TaskTrackers send a heartbeat to the JobTracker to keep it up to date. Typically, TaskTrackers are single or dual threaded and, consequently, can launch one or two Map or Reduce tasks simultaneously. Hence, each task is single-threaded and works on a single block, point by point, sequentially. The architecture is illustrated in Fig. 5.5.
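As an illustration of the locality-aware task assignment just described, the sketch below (a simplification written for this exposition, not Hadoop's actual scheduler) assigns (Task, Block) pairs to TaskTrackers, preferring trackers that already hold a replica of the block:

```python
from dataclasses import dataclass

@dataclass
class TaskTracker:
    name: str
    slots: int          # concurrent Map/Reduce tasks (typically 1-2 on a DN)
    replicas: set       # ids of DFS blocks stored locally
    load: int = 0

    def has_free_slot(self):
        return self.load < self.slots

def assign_tasks(block_ids, tasktrackers):
    """Prefer a TaskTracker holding a local replica; otherwise pick the least
    loaded tracker with a free slot (assumes enough free slots exist)."""
    plan = []
    for block in block_ids:
        free = [tt for tt in tasktrackers if tt.has_free_slot()]
        local = [tt for tt in free if block in tt.replicas]
        chosen = min(local or free, key=lambda tt: tt.load)
        chosen.load += 1
        plan.append((block, chosen.name))
    return plan
```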

5.4.2 MPP Integration

We propose the addition of massively parallel processors in order to increase the computational capabilities of the DNs. Currently, DNs use a single thread to process the entire set of data points confined in a given block. This is shown in Fig. 5.6. Parallelism is achieved through the partitioning of data into blocks and the concurrent execution of tasks on different nodes; nevertheless, this setup does not leverage fine-grained parallelism, which can be predominant in data mining algorithms. Fortunately, the introduction of MPPs enables MapReduce tasks to be carried out by hundreds or thousands of threads, giving each thread one or a few data points to work with. This is described in Fig. 5.7. The main differences between DNs and MPPNs are the following:

Fig. 5.6 Data node (DN)

  • Multithreading: In DNs the TaskTracker assigns the pair (Task, Block) to a single core. Then, the thread running on that core executes the MapReduce function on the block point by point, sequentially. On the contrary, in MPPNs the TaskTracker assigns the pair (Task, Block) to a massively threaded multiprocessor device, which launches hundreds or thousands of threads that execute the same task on multiple data points simultaneously (a sketch contrasting the two models follows this list).

    Fig. 5.7 Massively parallel processor node (MPPN)

  • Pipelining: In DNs the intermediate result generated by the Map task is materialized by writing the result locally in the node. Before the execution of the Reduce task, the intermediate result is read from the disk and possibly transmitted over the network to a different DN as part of the shuffling process. On the contrary, MPPNs do not materialize the intermediate result. The output of the Map task is kept on the MPP memory and, if necessary, is forwarded to a different device as part of the shuffling process.

  • Communication: In DNs the shuffling process requires slower cross-machine communication leading to increased latency between MapReduce operators. On the contrary, the shuffling process in MPPNs is carried out through message passing between host CPU threads.

  • Iteration: In DNs a job is terminated after the conclusion of the R step. Any additional iteration would be executed as an independent job. MPPNs provide support for iterative algorithms allowing repeatable tasks to be part of the same long-running iterative job.
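The contrast between the two processing models can be sketched as follows (host threads emulate the MPP fan-out; on a real MPPN the work would be spread over thousands of lightweight device threads and the intermediate output would stay in device memory):

```python
from concurrent.futures import ThreadPoolExecutor

def dn_map_task(block, map_fn):
    """DN behaviour: a single thread walks the (Task, Block) pair point by point."""
    return [map_fn(point) for point in block]

def mppn_map_task(block, map_fn, n_threads=512):
    """MPPN behaviour (emulated): the block is fanned out so that each thread
    handles one or a few data points."""
    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        return list(pool.map(map_fn, block))
```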

5.4.3 MPP Orchestration

In general, DNs running on commodity hardware are single or dual threaded. Each CPU thread operates on a different data block, and since the results of each task are materialized to the disk, synchronization between CPU threads is not necessary. Nevertheless, the introduction of MPPs into data nodes requires the interaction of two different threading models, the classic CPU threads and the MPP threads.

As opposed to CPU threads, which are heavyweight, MPP threads are lightweight: they have fewer registers at their disposal and are individually slower, but they can be launched simultaneously in groups toward the execution of the same task. Furthermore, the fact that MPP threads run distributed across multiple devices within the same node raises a challenge not only in the efficient coordination of thousands of these threads towards the collaborative execution of an algorithm, but also in the responsiveness and error handling of the devices running these threads.

In this section, we propose an event-driven model to orchestrate both CPU and MPP threads towards the execution of MapReduce tasks. Unlike ordinary event-driven libraries, which usually directly build upon the asynchronous operations, the method proposed in this chapter is based on the principles of Active Messages [25] and the abstraction layer provided by the Concurrency and Coordination Runtime (CCR) [7]. These abstractions are: the Port, the Arbiter and the Dispatcher Queue.

Fig. 5.8 Port abstraction and its components

Figure 5.8 illustrates the three abstractions. The Port is an event channel in which messages are received. Posting a message to a port is a non-blocking call and the calling CPU thread continues to the next statement. The Arbiter decides which registered callback method should be executed to consume the message or messages. Once the method is selected, the arbiter creates the pair (Task, Block), which is passed to the Dispatcher Queue associated with the port. This is an indirection that enables the creation of high-level coordination primitives. Some of the possible primitives are discussed later in this section. Each port is assigned a Dispatcher Queue and multiple ports can be associated with the same Dispatcher Queue. The Dispatcher Queue consists of a thread pool composed of one or more CPU threads. Available threads pick (Task, Block) pairs passed by the Arbiter and proceed to execute the task on the corresponding data block in the MPP. The MPP is stateful: it keeps a state in the memory of the device (DState) across iterations to minimize memory transfers. If necessary, it synchronizes with the state in the host memory (HState).

Next, some of the coordination primitives that can be constructed using these three abstractions are introduced:

  • Single Item Receiver: It registers callback X to be launched when a single message of type M is received in Port A.

  • Multiple Item Receiver: Registers callback X to be launched when n messages are received in Port A. p messages will be of type M (success) and q messages of exception type (failures), so that \(p + q = n\).

  • Join Receiver: Registers callback X to be launched when one message of type M is received in Port A and another in Port B.

  • Choice: Registers callback X to be launched when one message of type M is received in Port A and registers callback Y to be launched when one message of type N is received in Port A.

In the context of the MapReduce framework, these abstractions are utilized to construct a Scatter-Gather mechanism in which a master CPU thread distributes MapReduce tasks among the available MPP devices and, upon termination, these return the control and the results back to the master thread. Each MPP device has a Port instance for every type of MapReduce task, a single Arbiter and a single Dispatcher Queue. The Arbiter registers each Port following the Single Item Receiver primitive with the assigned callback method that represents the MapReduce task. Requests to launch a task contain a pointer to the data block to be manipulated and the response port in which all the responses need to be gathered. The callback method contains the invocation of the computing kernel, which spawns hundreds of MPP threads that operate on the data simultaneously. The response Port follows a Multiple Item Receiver primitive and is registered to launch a callback method in the master thread when all the devices have answered.

This Scatter-Gather mechanism is illustrated in Fig. 5.9. One of the key benefits of this event-driven model is that it not only enables coordinating multiple MPP devices towards the execution of MapReduce tasks, but also deals with the potential failure of any device, which is fundamental to preserving the robustness of the MapReduce framework.
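A minimal sketch of this Scatter-Gather mechanism is given below (Python threads stand in for the Dispatcher Queue and the device kernels; the class and method names are illustrative and do not reproduce the CCR API):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class Port:
    """Simplified Port: non-blocking message channel whose Arbiter pairs each
    message with the registered callback (Single Item Receiver) and hands the
    work to its Dispatcher Queue (here a thread pool)."""
    def __init__(self, dispatcher):
        self.dispatcher = dispatcher
        self.callback = None

    def register(self, callback):
        self.callback = callback

    def post(self, message):                      # non-blocking post
        self.dispatcher.submit(self.callback, message)

class GatherPort:
    """Multiple Item Receiver: fires on_complete once n responses arrive."""
    def __init__(self, n, on_complete):
        self.n, self.on_complete = n, on_complete
        self.results, self.lock = [], threading.Lock()

    def post(self, result):
        with self.lock:
            self.results.append(result)
            if len(self.results) == self.n:
                self.on_complete(self.results)

def scatter_gather(device_ports, blocks, task, on_all_done):
    """Master thread scatters (Task, Block) requests and gathers the replies."""
    response = GatherPort(len(device_ports), on_all_done)
    for port, block in zip(device_ports, blocks):
        port.post({"task": task, "block": block, "reply_to": response})

# Example wiring: four "devices", each running a toy kernel on its block.
pool = ThreadPoolExecutor(max_workers=4)              # the Dispatcher Queue
ports = [Port(pool) for _ in range(4)]
for p in ports:
    p.register(lambda msg: msg["reply_to"].post(sum(msg["block"])))

done = threading.Event()
scatter_gather(ports, [[1, 2], [3, 4], [5, 6], [7, 8]], "sum",
               lambda results: (print(sorted(results)), done.set()))
done.wait()   # prints [3, 7, 11, 15]
```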

Fig. 5.9 Scatter-Gather using ports and MPPs

5.5 Massively Multithreaded SVM

In order to investigate the performance of the architecture proposed in Sect. 5.4, in this section we take the SVM classifier, explore its decomposition into a MapReduce job, and launch it on an MPPN composed of multiple MPP devices. First, we provide a brief introduction to the SVM classification problem. Second, we describe the MapReduce job that solves the training phase on a single MPP device. Third, we coordinate several MPP devices using the Scatter-Gather primitive to solve a larger classification problem. Figures in this section hide the Port, Arbiter and Dispatcher Queue components to facilitate the understanding of the MapReduce task sequences.

5.5.1 Binary SVM

The binary SVM classification problem is defined as follows: Find the classification function that, given l examples \(\left (\bar{x}_{1},y_{1}\right ),\ldots,(\bar{x}_{l},y_{l})\) with \(\bar{x}_{i} \in {R}^{n}\) and \(y_{i} \in \left \{-1,1\right \}\;\forall i\), predicts the label of new unseen samples \(\bar{z}_{i} \in {R}^{n}\). This is achieved by solving the following regularized learning problem, where the regularization is controlled via C.

$$\displaystyle{ \min _{f\in H}C\sum _{i=1}^{l}\left (1 - y_{ i}f(\bar{x_{i}})\right )_{+} + \frac{1} {2}\left \|f\right \|_{k}^{2}, }$$
(5.3)

where \((k)_{+} =\max (k,0)\). Then slack variables ξ i are introduced to overcome the problem introduced by its non-differentiability:

$$\displaystyle{ \min _{f\in H}C\sum _{i=1}^{l}\xi _{ i} + \frac{1} {2}\left \|f\right \|_{k}^{2} }$$
(5.4)

subject to \(y_{i}f(\bar{x}_{i}) \geq 1 -\xi _{i}\) and \(\xi _{i} \geq 0\), \(i = 1,\ldots,l\). The dual form of this problem is given by:

$$\displaystyle{ \max _{\alpha \in {R}^{l}}\sum _{i=1}^{l}\alpha _{ i} -{\frac{1} {2}\alpha }^{T}K\alpha }$$
(5.5)

subject to \(\sum _{i=1}^{l}y_{i}\alpha _{i} = 0\) and \(0 \leq \alpha _{i} \leq C\), \(i = 1,\ldots,l\), where \(K_{ij} = y_{i}y_{j}k\left (\bar{x}_{i},\bar{x}_{j}\right )\) and \(k\) is a kernel function. Equation (5.5) is a quadratic programming optimization problem and its solution defines the classification function:

$$\displaystyle{ f(x) =\sum _{ i=1}^{l}y_{ i}\alpha _{i}k\left (\bar{x},\bar{x}_{i}\right ) + b, }$$
(5.6)

where b is an unregularized bias term.

5.5.2 MapReduce Decomposition of the SVM

The binary SVM problem can be solved using the Sequential Minimal Optimization (SMO) algorithm [20]. SMO tackles the large-scale Quadratic Programming (QP) optimization arising from the dual form of the SVM problem by choosing the smallest possible optimization subproblem at every step, which involves only two Lagrange multipliers \((\alpha _{I_{low}},\alpha _{I_{up}})\). For two Lagrange multipliers the QP problem can be solved analytically, without the need for numerical QP solvers. Next, we present SMO as an iterative sequence of MapReduce operators.

First, there are two consecutive Map operators. The first Map updates the values of the classifier function \(f_{i}\) based on the variation of the two Lagrange multipliers, \(\varDelta \alpha _{I_{low}} =\alpha _{ I_{low}}^{'} -\alpha _{ I_{low}}\), \(\varDelta \alpha _{I_{up}} =\alpha _{ I_{up}}^{'} -\alpha _{ I_{up}}\), their label values \(\left (y_{I_{up}},y_{I_{low}}\right )\) and their associated kernel evaluations:

$$\displaystyle\begin{array}{rcl} f_{i}^{'} = f_{ i}& +& \varDelta \alpha _{I_{up}}y_{I_{up}}k(\bar{x}_{I_{up}},\bar{x}_{i}) \\ & +& \varDelta \alpha _{I_{low}}y_{I_{low}}k(\bar{x}_{I_{low}},\bar{x}_{i}){}\end{array}$$
(5.7)

with \(i = 1,\ldots,l\). The initialization values for the first Map of the iterative sequence are: \(f_{i} = -y_{i}\), \(\varDelta \alpha _{I_{up}} =\varDelta \alpha _{I_{low}} = 0\), \(\alpha _{I_{low}} =\alpha _{I_{up}} = 0\), \(I_{low} = I_{up} = 0\).

$$\displaystyle\begin{array}{rcl} M\left [i,f_{i}\right ] \rightarrow \left [i,f_{i}^{'}\right ]& &{}\end{array}$$
(5.8)

The second Map classifies the function values \(f_{i}^{'}\) into two groups, \(k_{up}\) and \(k_{low}\), according to the following index sets, in which C is the regularization parameter and \(k_{i} \in \{k_{up},k_{low}\}\):

$$\displaystyle\begin{array}{rcl} I_{0} =& & \left \{i: y_{i} = 1,0 <\alpha _{i} < C\right \} \cup \\ & &\left \{i: y_{i} = -1,0 <\alpha _{i} < C\right \}{}\end{array}$$
(5.9)
$$\displaystyle\begin{array}{rcl} I_{1} =& & \left \{i: y_{i} = 1,\alpha _{i} = 0\right \}{}\end{array}$$
(5.10)
$$\displaystyle\begin{array}{rcl} I_{2} =& & \left \{i: y_{i} = -1,\alpha _{i} = C\right \}{}\end{array}$$
(5.11)
$$\displaystyle\begin{array}{rcl} I_{3} =& & \left \{i: y_{i} = 1,\alpha _{i} = C\right \}{}\end{array}$$
(5.12)
$$\displaystyle\begin{array}{rcl} I_{4} =& & \left \{i: y_{i} = -1,\alpha _{i} = 0\right \}{}\end{array}$$
(5.13)
$$\displaystyle\begin{array}{rcl} k_{up} = \left \{i \in I_{0} \cup I_{1} \cup I_{2}\right \}& &{}\end{array}$$
(5.14)
$$\displaystyle\begin{array}{rcl} k_{low} = \left \{i \in I_{0} \cup I_{3} \cup I_{4}\right \}& &{}\end{array}$$
(5.15)
$$\displaystyle\begin{array}{rcl} M\left [i,\alpha _{i}\right ] \rightarrow \left [i,k_{i}\right ]& &{}\end{array}$$
(5.16)

The Reduce operator takes the list of values generated by the Maps and applies a different reduction based on the group they belong to: for \(k_{up}\), min and \(\arg \min\) are used, while \(k_{low}\) requires max and \(\arg \max\).

$$\displaystyle\begin{array}{rcl} & & b_{up} =\min \left \{f_{i}: k_{i} = k_{up}\right \}{}\end{array}$$
(5.17)
$$\displaystyle\begin{array}{rcl} & & I_{up} =\arg \min _{k_{i}=k_{up}}f_{i}{}\end{array}$$
(5.18)
$$\displaystyle\begin{array}{rcl} & & b_{low} =\max \left \{f_{i}: k_{i} = k_{low}\right \}{}\end{array}$$
(5.19)
$$\displaystyle\begin{array}{rcl} & & I_{low} =\arg \max _{k_{i}=k_{low}}f_{i}{}\end{array}$$
(5.20)

The indices \(\left (I_{up},I_{low}\right )\) indicate the Lagrange multipliers that will be optimized.

$$\displaystyle\begin{array}{rcl} R\left [k,\{f_{i}\}_{k_{i}=k}\right ] \rightarrow \left [b,I\right ]& &{}\end{array}$$
(5.21)

The last Map uses these indices to calculate the new Lagrange multipliers:

$$\displaystyle\begin{array}{rcl} \alpha _{I_{up}}^{' } =\alpha _{I_{up}} -\frac{y_{I_{up}}(f_{I_{low}} - f_{I_{up}})} {\eta } & &{}\end{array}$$
(5.22)
$$\displaystyle\begin{array}{rcl} \alpha _{I_{low}}^{' } =\alpha _{I_{low}} + s(\alpha _{I_{up}} -\alpha _{I_{up}}^{' })& &{}\end{array}$$
(5.23)

where

$$\displaystyle\begin{array}{rcl} s& =& y_{I_{up}}y_{I_{low}}{}\end{array}$$
(5.24)
$$\displaystyle\begin{array}{rcl} \eta & =& 2k(\bar{x}_{I_{low}},\bar{x}_{I_{up}}) - k(\bar{x}_{I_{low}},\bar{x}_{I_{low}}) - k(\bar{x}_{I_{up}},\bar{x}_{I_{up}}){}\end{array}$$
(5.25)
$$\displaystyle\begin{array}{rcl} M\left [i,\alpha _{i}\right ] \rightarrow \left [i,\alpha _{i}^{'}\right ]& &{}\end{array}$$
(5.26)

Convergence is achieved when \(b_{low} < b_{up} + 2\tau\), where τ is the stopping tolerance.
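Putting the operators together, the sketch below runs the (M, M, R, M) sequence of Eqs. (5.7)–(5.26) on a single device with a dense RBF kernel (a didactic NumPy simplification, not the chapter's CUDA implementation; the clipping of the multipliers to the box constraint 0 ≤ α ≤ C, standard in SMO but omitted from the equations above, is made explicit):

```python
import numpy as np

def smo_mapreduce(X, y, C, beta, tau=1e-3, max_iters=100_000):
    """SMO as an iterative (M, M, R, M) MapReduce flow (single device, dense)."""
    l = len(y)
    alpha = np.zeros(l)
    f = -y.astype(float)                                  # f_i = -y_i
    K = np.exp(-beta * np.sum((X[:, None, :] - X[None, :, :]) ** 2, axis=2))
    I_up = I_low = 0
    d_up = d_low = 0.0                                    # Δα_{I_up}, Δα_{I_low}

    for _ in range(max_iters):
        # Map 1 (Eq. 5.7): update every f_i from the last two multiplier changes.
        f = f + d_up * y[I_up] * K[I_up] + d_low * y[I_low] * K[I_low]

        # Map 2 (Eqs. 5.9-5.16): classify indices into the k_up / k_low groups.
        in_up = ((alpha > 0) & (alpha < C)) | ((y == 1) & (alpha == 0)) | ((y == -1) & (alpha == C))
        in_low = ((alpha > 0) & (alpha < C)) | ((y == 1) & (alpha == C)) | ((y == -1) & (alpha == 0))

        # Reduce (Eqs. 5.17-5.21): extreme f values and their indices per group.
        up_idx, low_idx = np.where(in_up)[0], np.where(in_low)[0]
        I_up = up_idx[np.argmin(f[up_idx])]
        I_low = low_idx[np.argmax(f[low_idx])]
        b_up, b_low = f[I_up], f[I_low]

        if b_low < b_up + 2 * tau:                        # convergence check
            break

        # Map 3 (Eqs. 5.22-5.25): analytic update of the two multipliers.
        eta = 2 * K[I_low, I_up] - K[I_low, I_low] - K[I_up, I_up]
        s = y[I_up] * y[I_low]
        a_up_new = np.clip(alpha[I_up] - y[I_up] * (b_low - b_up) / eta, 0.0, C)
        a_low_new = np.clip(alpha[I_low] + s * (alpha[I_up] - a_up_new), 0.0, C)
        d_up, d_low = a_up_new - alpha[I_up], a_low_new - alpha[I_low]
        alpha[I_up], alpha[I_low] = a_up_new, a_low_new

    return alpha, (b_up + b_low) / 2.0                    # a common choice of bias
```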

5.5.3 Single-MPP Device SVM

As discussed in Sect. 5.4.2, unlike single- or dual-core-based MapReduce-like systems, MPP devices can carry out multithreaded MapReduce tasks. For the case of the single-device SVM, the data block provided by the TaskTracker represents the entire training dataset. This data block is further split into subblocks that are passed to the processors in the device. Typically, each processor can run several threads simultaneously, which enables a large number of Map or Reduce tasks to be executed in parallel. Figure 5.10 schematically shows the flow of MapReduce tasks on an MPP device. Two versions of the SVM MapReduce job were constructed, one for each data structure type: dense and sparse. In general, sparse data structures can reduce memory utilization and data transfer times, which benefits communication within the MapReduce framework. Nevertheless, the performance of sparse algebraic operations on the MPP depends directly on the degree of sparsity of the data, and the effect may be adverse: the additional indirection duplicates memory accesses, which can hurt performance. The dense and sparse matrix-vector multiplications on MPPs used in this work are based on Bell et al. [2] and Vazquez et al. [24], respectively. Their impact on the SVM speed is reflected in Sect. 5.6.3.
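To illustrate the dense versus sparse trade-off, the kernel-row evaluation needed by the Maps can be written with either a dense or a CSR matrix-vector product (an illustrative NumPy/SciPy sketch; the actual MPP versions used in this work follow Bell et al. [2] and Vazquez et al. [24]):

```python
import numpy as np
from scipy.sparse import csr_matrix

def rbf_row_dense(X, i, beta):
    """k(x_i, x_j) for all j; ||x_i - x_j||^2 is expanded so that the dominant
    cost is the dense matrix-vector product X @ x_i."""
    sq = np.sum(X * X, axis=1)
    d2 = sq + sq[i] - 2.0 * (X @ X[i])
    return np.exp(-beta * d2)

def rbf_row_sparse(X_csr, i, beta):
    """Same computation with X stored in CSR form; profitable only when the
    data is sparse enough to offset the extra index indirection."""
    sq = np.asarray(X_csr.multiply(X_csr).sum(axis=1)).ravel()
    xi = X_csr.getrow(i)
    d2 = sq + sq[i] - 2.0 * (X_csr @ xi.T).toarray().ravel()
    return np.exp(-beta * d2)

# Example: X = np.random.rand(1000, 300)
# np.allclose(rbf_row_dense(X, 0, 0.5), rbf_row_sparse(csr_matrix(X), 0, 0.5))  -> True
```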

Fig. 5.10 Binary SVM decomposed into MapReduce tasks

5.5.4 Multiple-MPP Device SVM

In Sect. 5.5.3 a data block representing the entire training set was forwarded to one MPP device, where MapReduce tasks were executed iteratively until convergence without any interaction with other devices. In this section we extend the decomposition of MapReduce tasks to break the SVM problem across multiple MPP devices. Figure 5.11 describes the interactions between four MPP devices to solve a single SVM problem. The training dataset is split into four data blocks stored in the distributed file system. The TaskTracker, which manages the master thread, forwards the corresponding block to each device. Each device performs the Map operator and a local Reduce on its data block. The results of the local Reduces are gathered by the master thread, which carries out a Global Reduce in order to find \(\left (b_{up},I_{up}\right )\) and \(\left (b_{low},I_{low}\right )\). These values are then scattered to the devices in order to update the Lagrange multipliers \(\left (\alpha _{I_{up}}^{'},\alpha _{ I_{low}}^{'}\right )\). Finally, the master thread synchronizes, checks for convergence and, if required, proceeds to scatter the next Map task to the MPP group.
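The local/global reduction pattern of Fig. 5.11 can be sketched as follows (host threads emulate the four devices; the chapter's implementation performs the per-block work in CUDA kernels):

```python
import numpy as np
from concurrent.futures import ThreadPoolExecutor

def local_map_reduce(f_block, in_up_block, in_low_block, offset):
    """Per-device work: local Reduce returning the block's candidate
    (b_up, I_up) and (b_low, I_low), with indices shifted to global positions."""
    up = np.where(in_up_block)[0]
    low = np.where(in_low_block)[0]
    i_up, i_low = up[np.argmin(f_block[up])], low[np.argmax(f_block[low])]
    return (f_block[i_up], offset + i_up), (f_block[i_low], offset + i_low)

def global_reduce(partials):
    """Master-thread Global Reduce over the gathered per-device candidates."""
    ups, lows = zip(*partials)
    return min(ups, key=lambda t: t[0]), max(lows, key=lambda t: t[0])

def distributed_reduce(blocks):
    """Scatter the per-block reductions (one per device) and gather them."""
    with ThreadPoolExecutor(max_workers=len(blocks)) as pool:
        partials = list(pool.map(lambda args: local_map_reduce(*args), blocks))
    return global_reduce(partials)   # ((b_up, I_up), (b_low, I_low))
```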

Fig. 5.11 Multiple-MPP device SVM

5.6 Implementation and Experimental Results

In this section we provide implementation details and performance results for the MapReduce jobs presented in Sect. 5.5, along with the incremental performance gain from one method to another. As a baseline for comparison, we take a popular SVM solver, LIBSVM [4], which is a single-threaded implementation of the SMO algorithm. Then, we compare LIBSVM to the SVM algorithm running on the standard Hadoop platform. Having evaluated these two popular options, we proceed to assess the performance boost obtained from the inclusion of GPUs in MapReduce cluster nodes. Throughout all the experiments the same SVM kernel function \(k(\bar{x_{i}},\bar{x_{j}})\), regularization parameter C and stopping tolerance τ were used.

5.6.1 Datasets

SVM training performance comparisons are carried out over five publicly available datasets: WEB [20], MNIST [18], RCV1 [19], PROTEIN [26] and SENSIT [9]. These datasets were chosen based on their computational complexity, since they have hundreds of features per data sample. The sizes of these datasets and the parameters used for training are indicated in Table 5.1. The Radial Basis kernel, \(k\left (\bar{x}_{i},\bar{x}_{j}\right ) = {e}^{-\beta {\left \|\bar{x}_{i}-\bar{x}_{j}\right \|}^{2} }\), was used for the training phase throughout all the experiments, as well as τ = 0.001. Multiclass datasets, such as MNIST, RCV1, PROTEIN and SENSIT, are decomposed into binary SVM problems following the One-vs-All (OVA) output code. The resulting collection of binary SVMs is then solved in parallel as independent MapReduce jobs.

Table 5.1 Datasets
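For illustration, the One-vs-All decomposition of a multiclass dataset and the corresponding prediction rule can be sketched as follows (a generic NumPy sketch, independent of the MapReduce machinery):

```python
import numpy as np

def one_vs_all_jobs(y, classes):
    """Each class c yields an independent binary SVM job with labels +1 for
    class c and -1 for the rest; the resulting jobs can then be launched in
    parallel as independent MapReduce jobs."""
    return {c: np.where(y == c, 1, -1) for c in classes}

def ova_predict(scores):
    """scores maps class -> array of decision values f_c(x) on the test set;
    the class with the largest decision value wins."""
    classes = list(scores)
    stacked = np.vstack([scores[c] for c in classes])
    return np.array(classes)[np.argmax(stacked, axis=0)]
```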

5.6.2 Implementation and Setup

The measurements collected in the next subsection (i.e. Sect. 5.6.3) were carried out on a single machine with a dual-socket Intel Xeon E5520 @ 2.26 GHz (8 cores, 16 threads) and 32 GB of RAM.

Hadoop Setup: Using this machine as a host, the SVM algorithm running on Hadoop was executed on 4 Virtual Machines (VMs), with a single core and 4 GB of RAM each. The host ran the Master Node, which contained the NameNode and the JobTracker, while the four VMs ran the DataNodes with the TaskTrackers.

Multiprocessor Setup: This machine also accommodated four GPUs. The multiprocessors utilized are NVIDIA Tesla C1060 GPUs with 240 Stream Processors @ 1.3 GHz. Each GPU has 4 GB of memory and a memory bandwidth of 102 GB/s. Similar to the Hadoop case, the host machine ran the Master Node, which contained the NameNode with the JobTracker, and an MPP Node with four GPU devices. The computing kernels representing MapReduce tasks were implemented using NVIDIA CUDA.

The distribution of resources across the different experiments is summarized in Table 5.2, where a + b denotes a master threads and b device threads. We denote the single-GPU experiment SD and the multi-GPU experiment MD.

Table 5.2 SVM experiments
Table 5.3 Performance results for SVM training

5.6.3 Experimental Results

In this subsection we report the performance gain obtained by each architecture/MapReduce task flow, compared to the reference implementation, for all the datasets: WEB, MNIST, RCV1, PROTEIN and SENSIT. For each experiment we present its training time, the measured acceleration with respect to the reference implementation and the accuracy obtained from testing the calculated Support Vectors (SVs) on the test dataset. These results are collected in Table 5.3. The acceleration of the testing phase falls outside the scope of this work due to its triviality.

The execution of the Map and Reduce operators, introduced in Sect. 5.5.3, on the standard Hadoop infrastructure yielded a modest performance improvement, in the range of 1.20×–3.07× when compared to LIBSVM. Nevertheless, running these same operators on the SD SVM produced an order of magnitude of acceleration, in the range of 15.13×–60.61×, which is consistent with the values obtained by Catanzaro et al. [3] and Herrero-Lopez et al. [14]. Scaling out the problem to four GPUs (MD SVM) and using the GPU orchestration model presented in this chapter outperformed all the previous solutions, producing an overall acceleration in the range of 29.79×–192.75×. These results also show that the use of sparse data structures is beneficial for cases with a high degree of sparsity (WEB and RCV1), while it is adverse for the rest. The execution of the sparse MD SVM on the WEB dataset produced a 1.28× gain compared to the dense MD SVM on the same dataset, while the SVM for the RCV1 dataset could not be solved in its dense versions nor in single-device form, since the data structures would not fit in the GPU memory. The SVM for the RCV1 dataset was solved only in its sparse MD version, which produced the highest acceleration (192.75×) in this set of experiments. Finally, it is necessary to point out that no accuracy loss was observed and that the same classification results were obtained on all the testing datasets across all the different systems.

5.7 Conclusions and Future Work

In this chapter, our goal was to accelerate the execution of Machine Learning algorithms running on a MapReduce cluster, while maintaining the reliability and simplicity of its infrastructure. For this purpose, we integrated massively threaded multiprocessors into the nodes of the cluster, and proposed a concurrency model that orchestrates host threads and thousands of multiprocessor threads spread across different devices so as to collaboratively solve MapReduce jobs. In order to verify the validity of this system, we decomposed the SVM algorithm into MapReduce tasks and created a combined solution that extracts the maximum degree of fine-grained parallelism. The execution of the SVM algorithm in our proposed system yielded an acceleration in the range of 29.79×–192.75× when compared to LIBSVM, and in the range of 15.68×–91.83× when compared to the standard Hadoop implementation. To the best of our knowledge, this is the shortest training time reported on these datasets for a single machine, without leaving commodity hardware or the MapReduce paradigm. In the future, we plan to explore maximizing the utilization of the GPUs in the MPP Node through the concurrent execution of multiple MapReduce tasks on each device.