1 Introduction

MapReduce [1] is a flexible programming model proposed by Google for processing and generating large data sets over a cluster of computers. The MapReduce model hides extraneous details inherent in distributed programming, such as parallelization, fault tolerance, data distribution and load balancing, within a library. This simplifies the process of writing distributed programs, which is an advantage MapReduce has over other distributed programming models such as MPI, which require the programmer to explicitly handle the data flow [2].

Programmers who use the MapReduce library need to write two functions, a map function and a reduce function. The map function takes input key/value pairs from an input source, processes them, and produces a set of intermediate key/value pairs. These intermediate key/value pairs are then sent to a reduce function as input. The reduce function processes this input and generates its own set of key/value pairs as output.

Parallelism is achieved by running multiple map and reduce functions on multiple processors or machines. The intermediate key/value pairs produced by each of the map functions are partitioned so that intermediate key/value pairs that share the same key are all sent to the same reduce function. The reduce function then processes each key alongside a list of associated values.

Since its conception by Google, MapReduce has had many adopters in industry and academia. One of the more well-known adopters is Yahoo, who developed an open source implementation known as Hadoop [3]. Hadoop is a Java-based implementation, which by default uses its own distributed file system (HDFS). Because Hadoop is open source, well documented and easy to use, the tool has gained prominence in the distributed programming community. For this reason, we use Hadoop as our reference platform for MapReduce in this paper.

The MapReduce model is effective at processing large datasets. A dataset is essentially a set of tuples stored in a file. Most computing devices can generate datasets, and the data can be about almost anything. Examples of data sources include log files [4], sensors [5] and social media [6]. In this paper, we look at one of the more common data processing operations, the join. A join combines two or more datasets based on some common value. There are many possible ways to implement a join, and the efficiency of an implementation depends on how many datasets there are and how large they are. A MapReduce join can be implemented as a map-side join or a reduce-side join, and multiple datasets may be handled using either a cascade join or a multiway join [7].

Cascade joins handle multiple joins using successive two-way joins. In other words, a cascade join is performed by joining datasets two at a time; it is essentially an iterative version of a two-way join. The main advantage of a cascade join is that it can handle any number of datasets of any size, so long as they can all be stored on the HDFS. With n tables, T\(_{1}\), T\(_{2}\), T\(_{3}\), \(\ldots\), T\(_\mathrm{n}\), table T\(_{1}\) and table T\(_{2}\) are joined in one job. The table created by this join is then joined with T\(_{3}\). This continues until all the tables are joined. Using relational algebra this can be expressed as:

$$\begin{aligned} ((\ldots ((\text{ T }_1\;\bowtie \;\text{ T }_2)\;\bowtie \;\text{ T }_3)\;\bowtie \;\ldots )\;\bowtie \;\text{ T }_{\text{ n }-1})\;\bowtie \;\text{ T }_\mathrm{n} \end{aligned}$$
(1)

Multiway joins handle multiple joins simultaneously. Using relational algebra a multiway join can be expressed as:

$$\begin{aligned} \text{ T }_1 \;\bowtie \;\text{ T }_2 \;\bowtie \;\text{ T }_3 \;\bowtie \;\ldots \;\bowtie \;\text{ T }_{\text{ n }-1} \;\bowtie \;\text{ T }_\mathrm{n} \end{aligned}$$
(2)

Multiway joins have certain advantages and disadvantages compared with cascade joins. First, a multiway join avoids considerable overhead since it does not need to set up multiple jobs. Second, it can save network and storage resources since it does not need to write intermediate results. However, there are some drawbacks. When a multiway join is performed, it needs to buffer tuples, which can lead to memory problems, especially if the data is skewed. Therefore, the number and size of the datasets are limited by the memory resources available.

The main idea of SmartJoin, the method proposed in this paper, is to improve the processing time of a multiway join by dynamically redistributing the workload between reducers. Furthermore, it considers network topology during workload redistribution.

The main contributions of our work are as follows. First, we present a model to redistribute tuples amongst reducers on the MapReduce framework for a multiway join. Second, we show how the SmartJoin redistribution algorithm can reduce job response times for a multiway join by considering network distance and reducer workload. Third, we compare our method to alternative methods that do not take into account these factors.

The rest of this paper is organized as follows. Section 2 explains our network model. Section 3 presents the proposed techniques on multiway joins and tuple redistribution. In Sect. 4, the simulation results and performance analysis are given to weigh the pros and cons of the proposed method. In Sect. 5, we discuss related work. Finally, the conclusion and future work are presented in Sect. 6.

2 Background

2.1 Network model

The research model for this study is presented in Fig. 1, which shows a network environment consisting of switches, racks and nodes.

Fig. 1 Network model used in this paper: (a) a tree network consisting of racks and nodes; (b) a node running a set of reduce tasks

The two-level tree topology shown in Fig. 1a is a common network layout used by Hadoop. Each rack contains a set of servers (nodes) all interlinked by a switch. The racks themselves then uplink to a core switch or router. The nodes run map or reduce tasks as shown in Fig. 1b. In this paper, map tasks and reduce tasks are also referred to as mappers and reducers respectively.

The purpose of the MapReduce model is to process large amounts of data over a cluster of computers. This requires sending data between nodes, and the rate at which data can be sent over the network is restricted by the available bandwidth.

In Hadoop, the distance between two nodes is calculated as the sum of the distances to the lowest common ancestor in the network tree. Although the structure of the tree is not predetermined, it is common to assign the levels of the tree based on the data center, rack and node on which a process resides. The purpose of this model is to reflect the way bandwidth decreases between processes running on the same node, nodes on the same rack, nodes on different racks and potentially nodes in different clusters.

Three different network scenarios may occur in a data center. These scenarios can be expressed using the notation N\(_{(i,j)}\), which represents node \(i\) on rack \(j\). Table 1 describes the scenarios that may arise on a network with a two-level tree topology, as shown in Fig. 1, and the corresponding network distances.

Table 1 Network distance

Based on the network distance one can determine the relative distance between two processes residing on a network. If a process resides on the same node and same rack as another process, its network distance is 0, since both processes reside on the same node. If a process resides on a different node on the same rack, its distance is 2. If a process resides on a different node on a different rack, its distance is 4. Essentially, the distance metric counts the number of hops that data needs to travel to get from one node to another.
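To make this metric concrete, the following is a minimal sketch of the distance calculation for the two-level tree in Fig. 1, matching the values above; the integer node and rack identifiers are illustrative assumptions.

```java
// Illustrative distance calculation for a two-level tree topology:
// 0 for the same node, 2 for different nodes on the same rack,
// and 4 for nodes on different racks.
public final class NetworkDistance {
    public static int between(int nodeA, int rackA, int nodeB, int rackB) {
        if (rackA == rackB && nodeA == nodeB) return 0; // same node
        if (rackA == rackB) return 2;                   // same rack, different node
        return 4;                                       // different racks
    }
}
```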

When data is sent by one process to another on the network, the distance the data needs to travel increases. Essentially, network distance is based on the number of switches or devices that data has to traverse to reach its destination (not including the point of origin). If both processes reside on the same node, communication and resources can be shared locally. If both processes reside on the same rack, data has to traverse the switch on that rack, before it can arrive at its destination node. If both processes reside on different racks, data has to traverse multiple switches. Thus, if a network has a two-level tree topology, and if data is being sent between nodes on different racks, there would be three switches for the data to traverse.

The speed of switches on a network can differ in practice. The cost of the fiber optics used by high-speed networks is often prohibitive for many consumers, and often a conventional switch is all that is deployed on any given rack. If the switch on one rack is slower than the switch on another rack, the effective network bandwidth between those racks is reduced to that of the slowest switch.

2.2 Join algorithms

Join algorithms have been studied extensively over the years, with many different variants existing for each type of algorithm. Many join algorithms in academia predate the invention of MapReduce, due to their ubiquitous use throughout the database community. The multiway join algorithm presented in this paper is a hybrid join and is a combination of a reduce-side join and a hash-join.

2.2.1 Reduce-side join

Reduce-side joins are based on the MapReduce programming model, which is composed of a map phase and a reduce phase. In the map phase, the datasets are read by a map function, which is executed by a map task, one tuple at a time. The purpose of the map function is to pre-process the tuples and sort them by the join key. Before tuples are partitioned based on their join key, they are tagged so that the reduce function knows which table each tuple originated from. In Hadoop, tagging can be performed with a custom TextPair class. As its name suggests, a TextPair contains two Text values. The purpose of the TextPair class is to bind a tag to the key and value so that the reducer can discern which table they came from.
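As an illustration, the following is a minimal sketch of such a tagging mapper. It assumes a custom TextPair WritableComparable (with a String-pair constructor and getFirst()/getSecond() accessors, not shown here) and tab-separated input tuples whose first field is the join key; the class and field layout are illustrative rather than the exact implementation.

```java
// Hypothetical mapper for a reduce-side join: tags each tuple with the name
// of its source table so the reducer can tell the datasets apart.
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class TaggingJoinMapper extends Mapper<LongWritable, Text, TextPair, TextPair> {
    private String tag;

    @Override
    protected void setup(Context context) {
        // Derive the tag from the input file name, e.g. "orders" or "customers".
        tag = ((FileSplit) context.getInputSplit()).getPath().getName();
    }

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // Assume tab-separated tuples whose first field is the join key.
        String[] fields = line.toString().split("\t", 2);
        if (fields.length < 2) return;
        // The tag is bound to both the key and the value, as described above.
        context.write(new TextPair(fields[0], tag), new TextPair(fields[1], tag));
    }
}
```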

After the key and value are tagged, the mapper partitions the key-value pairs. In Hadoop, a partitioner class determines how the partitions are divided, and the number of reducers determines the number of partitions. The partitioner class partitions the data based on the join key so that all the tuples that share the same key are sent to the same reducer. A custom partitioner class is then used to override the default partitioner so that partitioning is only performed with the key and not the tag.
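A minimal sketch of such a custom partitioner is shown below, again assuming the hypothetical TextPair class above; it hashes only the join key so that the tag has no influence on which reducer receives a tuple.

```java
// Hypothetical partitioner: partitions on the join key (the first element of
// the composite key) only, so tuples with the same key reach the same reducer
// regardless of their tag.
import org.apache.hadoop.mapreduce.Partitioner;

public class KeyOnlyPartitioner extends Partitioner<TextPair, TextPair> {
    @Override
    public int getPartition(TextPair key, TextPair value, int numPartitions) {
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
```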

Although this ensures all tuples go to the correct reducer, the reducer groups all values based on the key and processes them together in the reduce function. Since the key is a TextPair containing two values, the default grouping function incorrectly treats it as a single entity. Therefore, a custom comparator class is also required so that only the join key is considered when the reducer forms these groups. Within each group, tuples are sorted by the composite key, so they are secondary-sorted by their tag. In the two-way join shown in Fig. 2, this means tuples from the first table arrive before tuples from the second table.
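A hedged sketch of such a grouping comparator follows; it assumes the same hypothetical TextPair class and compares only the join key, while the full composite key continues to drive the sort order within each group.

```java
// Hypothetical grouping comparator: groups reduce input by the join key alone,
// while the (key, tag) composite still controls the secondary sort, so tuples
// from the first table arrive before tuples from the second.
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class KeyOnlyGroupingComparator extends WritableComparator {
    protected KeyOnlyGroupingComparator() {
        super(TextPair.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return ((TextPair) a).getFirst().compareTo(((TextPair) b).getFirst());
    }
}
```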

Fig. 2 Reduce-side join (2-way)

The reducer then calls its reduce function for each group of keys. In Hadoop, tuples are read from the HDFS stream. There is no random data access, so the reduce function buffers the tuples from the first dataset. These are then joined to tuples from the second dataset, which are read directly from the stream. Only a single key is presented for each group; therefore, the tag on the values is used to identify which dataset each tuple came from.
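The following sketch illustrates this buffering strategy for the two-way case. The tag value "T1" for the first table, and the TextPair accessors, are assumptions made for illustration.

```java
// Hypothetical reduce function for the 2-way reduce-side join: buffers the
// tuples tagged with the first table, then streams the second table's tuples
// and joins them against the buffer.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ReduceSideJoinReducer extends Reducer<TextPair, TextPair, Text, Text> {
    @Override
    protected void reduce(TextPair key, Iterable<TextPair> values, Context context)
            throws IOException, InterruptedException {
        List<String> firstTable = new ArrayList<>();
        for (TextPair value : values) {
            if (value.getSecond().toString().equals("T1")) {  // tag of the first table
                firstTable.add(value.getFirst().toString());  // buffer first-table tuples
            } else {
                // Second-table tuples arrive after the buffer is complete
                // (secondary sort), so they are joined directly from the stream.
                for (String left : firstTable) {
                    context.write(key.getFirst(),
                                  new Text(left + "\t" + value.getFirst()));
                }
            }
        }
    }
}
```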

2.2.2 Hash join

A hash join is a traditional algorithm used by databases for joining two datasets together. A hash join consists of two distinct phases: a ‘build’ phase and a ‘probe’ phase. In the build phase, the smaller dataset is inserted into an in-memory hash table. In the probe phase, the larger dataset is scanned and joined with the matching tuples stored in the hash table.

Consider two datasets P and Q. The algorithm for a simple hash join is as follows:

[Algorithm listing: simple hash join]
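The following is a minimal, self-contained sketch of the build and probe phases using plain in-memory collections; the two-field tuple layout and class name are illustrative assumptions.

```java
// Minimal sketch of a build/probe hash join: dataset P (the smaller one) is
// loaded into an in-memory hash table keyed by the join key, and dataset Q is
// scanned once, probing the table for matches.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SimpleHashJoin {
    // Tuples are assumed to be String arrays whose first field is the join key.
    public static List<String[]> join(List<String[]> p, List<String[]> q) {
        // Build phase: index the smaller dataset P by its join key.
        Map<String, List<String[]>> hashTable = new HashMap<>();
        for (String[] pTuple : p) {
            hashTable.computeIfAbsent(pTuple[0], k -> new ArrayList<>()).add(pTuple);
        }
        // Probe phase: scan the larger dataset Q and emit matching pairs.
        List<String[]> result = new ArrayList<>();
        for (String[] qTuple : q) {
            List<String[]> matches = hashTable.get(qTuple[0]);
            if (matches == null) continue;
            for (String[] pTuple : matches) {
                result.add(new String[] { qTuple[0], pTuple[1], qTuple[1] });
            }
        }
        return result;
    }
}
```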

2.2.3 Map-side join

MapReduce joins can be performed on either the map side or the reduce side. Map-side joins can operate on large inputs by joining the data before it reaches the map function. However, the inputs to each mapper have to be partitioned and sorted in a particular way. Firstly, all inputs must be sorted by the same join key. Secondly, all records for a specific key must reside in the same partition. Finally, each input must be divided into the same number of partitions [3]. These requirements can be met if the inputs have been pre-processed and output by a prior MapReduce job.

Consequently, for a map-side join to work, the data has to have been pre-processed by another job. This is in contrast to the reduce-side join, which does not require the input datasets to be structured in any particular way. In this paper, the two largest tables are joined first via a reduce-side join, both to avoid the limitations imposed by a map-side join and because a map-side join would require an additional MapReduce job.

Finally, the SmartJoin presented in this paper joins the smaller tables on the reduce side in order to exploit the fact that the partitioned tables are sorted by their join attribute prior to the reduce phase [8]. Since smaller tables tend to be joined first, this join tends to be more effective when the initial join is highly selective.

3 SmartJoin

In this section, we present our proposed multiway join. The purpose of a multiway join is to join multiple datasets together. Our proposed join improves the performance of the multiway join by redistributing the workload amongst reducers. Unlike other schemes that redistribute the workload using a distributed queue [8], our methodology redistributes the workload directly between reducers with the help of a mediator service, as shown in Fig. 3.

Fig. 3 Multiway join tuple redistribution with SmartJoin

Reduce-side joins and hash joins are both two-way joins. Two-way joins are joins that involve only two tables. Multiway joins are joins involving more than two tables. The multiway join presented in this paper uses a reduce-side join to join the two largest datasets and a hash join on the reducer side to join that result with several smaller datasets. The two largest datasets are partitioned and sent to the various reducers using the typical reduce-side join mechanism.

Unlike the typical multiway join mechanism [9], the smaller datasets are sent to all the reducers. This can be done by duplicating tuples in the mapper phase so that a copy is sent to each reducer, or by using Hadoop’s distributed cache mechanism. Therefore, this method is appropriate in situations where the aggregate size of the smaller datasets can fit in the memory of each node used to execute a reduce task.
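As an illustration of the distributed-cache option, a driver might ship the small datasets to every reduce task as sketched below, so that each reducer can build in-memory hash tables for them; the file paths and job name are hypothetical.

```java
// Hedged sketch: distributing the small datasets to every reducer through
// Hadoop's distributed cache. The two large datasets would go through the
// normal map/shuffle path (not shown).
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class MultiwayJoinDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "smartjoin-multiway");
        // Each small dataset is shipped to every reduce task via the cache.
        job.addCacheFile(new URI("/data/small_table_1.tsv"));
        job.addCacheFile(new URI("/data/small_table_2.tsv"));
        // ... remaining job configuration and submission (not shown).
    }
}
```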

For the sake of clarity, we define three types of reducers: unregistered, senders and receivers. The term unregistered refers to reducers that are working on the primary join and have yet to register with the mediator. The term sender refers to reducers that have the potential to send data, that is, reducers that are working on the hash join and still have tuples to join. The term receiver refers to reducers that have already processed their workload.

Once the mappers have sent the reducers all the tuples, the reducers are able to process them. The two largest datasets are joined first with a traditional reduce-side join. After completing this initial primary join, each reducer registers with the mediator, providing details of how many tuples it has and the node on which it resides. The mediator stores this information in its registry. From then on, reducers periodically update the mediator with their status and the number of tuples they have left to process. Reducers then continue to process their data using a hash join. At this stage, the reducers are potential senders that may send tuples to receivers if requested. Before a sender sends its tuples to a receiver, both reducers (sender and receiver) are marked as busy in the registry. After they complete the transfer of tuples, they update the mediator registry to signal that they are no longer busy. Once a reducer's hash join has processed all its allocated tuples, that reducer becomes a receiver. Receivers then ask the mediator to have tuples sent to them. If there are no senders available, the receiver is stored in a list (Table 2).

Table 2 Mediator registry: reducer metadata
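For illustration, a registry entry of this kind might be modelled as follows; the field names and role states are assumptions rather than the exact metadata listed in Table 2.

```java
// Illustrative sketch of the mediator's registry entry for one reducer:
// node/rack location, remaining tuple count, and the reducer's current role.
public class ReducerRecord {
    public enum Role { UNREGISTERED, SENDER, RECEIVER }

    final String reducerId;
    final int nodeId;        // node the reducer resides on
    final int rackId;        // rack the node belongs to
    long remainingTuples;    // updated periodically by the reducer
    Role role = Role.UNREGISTERED;
    boolean busy = false;    // set while a tuple transfer is in progress

    public ReducerRecord(String reducerId, int nodeId, int rackId, long remainingTuples) {
        this.reducerId = reducerId;
        this.nodeId = nodeId;
        this.rackId = rackId;
        this.remainingTuples = remainingTuples;
    }
}
```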

Once one or more senders are available, the mediator determines which senders should send tuples to which receivers. Since the time a MapReduce job takes to complete depends on the last reducer to finish its workload, priority is given to the senders with the most tuples. The pairwise algorithm is described as follows:

[Algorithm listing: pairwise algorithm]
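A hedged sketch of this pairing logic, reusing the illustrative ReducerRecord from the previous section, is given below; the cost function simply encodes the preferences described in the remainder of this section (same node, then same rack, then a rack with an equal or faster switch, then the fastest of the slower racks).

```java
// Minimal sketch of pairwise matching: pick the sender with the most remaining
// tuples and pair it with the "closest" free receiver.
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class PairwiseMatcher {
    private final Map<Integer, Integer> rackSwitchSpeed; // rackId -> switch speed (from config)

    public PairwiseMatcher(Map<Integer, Integer> rackSwitchSpeed) {
        this.rackSwitchSpeed = rackSwitchSpeed;
    }

    public Optional<ReducerRecord[]> match(List<ReducerRecord> senders,
                                           List<ReducerRecord> receivers) {
        // Priority goes to the non-busy sender with the most remaining tuples.
        Optional<ReducerRecord> senderOpt = senders.stream()
                .filter(s -> !s.busy)
                .max(Comparator.comparingLong((ReducerRecord s) -> s.remainingTuples));
        if (!senderOpt.isPresent()) return Optional.empty();
        ReducerRecord sender = senderOpt.get();

        // Choose the free receiver with the lowest cost relative to the sender.
        Optional<ReducerRecord> receiver = receivers.stream()
                .filter(r -> !r.busy)
                .min(Comparator.comparingInt((ReducerRecord r) -> cost(sender, r)));
        return receiver.map(r -> new ReducerRecord[] { sender, r });
    }

    // Lower cost = preferred receiver.
    private int cost(ReducerRecord sender, ReducerRecord receiver) {
        if (receiver.nodeId == sender.nodeId) return 0;          // same node
        if (receiver.rackId == sender.rackId) return 1;          // same rack
        int senderSpeed = rackSwitchSpeed.get(sender.rackId);
        int receiverSpeed = rackSwitchSpeed.get(receiver.rackId);
        // Off-rack: prefer racks whose switch is at least as fast as the sender's;
        // otherwise prefer the fastest of the slower racks.
        return receiverSpeed >= senderSpeed ? 2 : 3 + (senderSpeed - receiverSpeed);
    }
}
```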

Once a sender has been identified, the mediator finds the receiver nearest to that sender in terms of network distance. When redistributing data in MapReduce, one must contend with the network topology. For reasons of both performance and reduced network traffic, the target node should be as close as possible to the source node. Whether two nodes are close together is determined by their network distance. The second factor one needs to contend with when sending data over the network is the speed of the switches. When sending data between racks, the transfer time depends on the speed of the slowest switch along the path. Therefore, when sending tuples between racks, the best rack to send data to is a rack with the same switch speed or faster.

The speed of the switch on each rack is stored in a configuration file, which the mediator uses to sort racks by their switch speed. In the pairwise algorithm, if there are no receivers available on the same rack, then a receiver is chosen from a different rack. When data is sent between racks, the speed of transmission is determined by the slowest switch. Therefore, the pairwise algorithm tries to find a receiver on a rack with a switch speed equal to or greater than that of the sender's rack. If the sender can only send tuples to racks with slower switches, the pairwise algorithm gives preference to the rack with the fastest of those switches.

Once the mediator makes a pairing it informs the sender that a receiver is requesting tuples. The sender then sends a batch of tuples directly to the receiver. During the transfer of tuples from the sender to the receiver, the mediator flags both reducers as busy. The mediator then ignores the busy reducers until the transfer of tuples is complete. The redistribution of tuples continues until all tuples have been processed.

The number of tuples transferred in a batch is based on the number of tuples stored on reducers registered with the mediator. The following algorithm calculates the number of tuples a sender sends to a receiver.

[Algorithm listing: batchsize algorithm]

The batchsize algorithm is based on the following rules:

1. localTuples = the number of tuples stored on the local reducer
2. averageTuples = the average number of tuples stored on each reducer
3. A sender should not send tuples if localTuples \(\le\) averageTuples
4. A sender should not send more than half the tuples stored locally
5. A sender should not send more tuples than averageTuples

These rules ensure that when tuples are sent to a receiver, the receiver's tuples do not exceed the number of tuples each reducer would hold if the workload were balanced. Furthermore, they ensure that the number of tuples sent is less than the number that remains on the sender, because it is better to spend the time processing tuples locally than to ship a proportionally larger workload to another reducer. Finally, a sender should not send tuples at all if it holds no more tuples than the target workload needed for it to be balanced; at that point it is better for the sender simply to process all of its own tuples, after which it can start to acquire tuples as a receiver instead.
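A minimal sketch of these rules follows; the method and variable names mirror the rules above and are otherwise illustrative.

```java
// Minimal sketch of the batch-size rules: a sender sends nothing if it already
// holds no more than the average (rule 3), and otherwise sends at most half of
// its local tuples (rule 4) and at most the average (rule 5).
public class BatchSize {
    public static long compute(long localTuples, long averageTuples) {
        if (localTuples <= averageTuples) {
            return 0;                           // rule 3: already at or below target
        }
        return Math.min(localTuples / 2,        // rule 4: at most half of local tuples
                        averageTuples);         // rule 5: at most the average
    }
}
```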

4 Evaluation

4.1 Experiment configuration

To evaluate the performance of the proposed technique, we implemented the SmartJoin method and tested its performance on a simulated MapReduce environment. The simulated environment was built in-house and was based on the Hadoop MapReduce platform [3]. We then evaluated the performance of SmartJoin against tuple redistribution methods that do not take into account the network configuration (Fig. 4).

Fig. 4 Experiment network configuration

In order to test our proposed algorithm, the MapReduce environment was set up to emulate a cluster of computers. For this purpose, we model a network with three racks, each containing 10 nodes. One rack has a 750 Mbps switch for intra-rack communication, one rack has a 500 Mbps switch and another rack has a 250 Mbps switch. Connecting these three racks is a 1 Gbps switch for inter-rack communication. Each node on the network runs four reducers, which execute on separate cores with identical characteristics. To emulate this environment, switches in the simulator processed tuples at 750, 500 and 250 tuples per second (tps).

4.2 Experiment results

To evaluate the performance of the proposed technique, we implemented SmartJoin and the comparison methods and tested them using a set of randomly generated input data on the simulated MapReduce environment. We then evaluated SmartJoin under different CPU speeds, by changing the number of nodes and reducers on the network, and by changing the workloads executed on the network.

The performance of the SmartJoin was compared against systems with no data redistribution, with random reducer selection, and with worst-case reducer selection. Random reducer selection distributes data from the sender with the largest workload to any available receiver picked at random; random runs were repeated multiple times and the results averaged. In worst-case reducer selection (WorstJoin), SmartJoin was modified to pair senders and receivers based on the greatest network distance instead of the least.

Each reducer on the network was loaded with a number of tuples to process. The number of tuples on each reducer initially ranged from a thousand tuples to a million tuples. The initial processing capability of the CPU used in the simulation was then set to 100 tps. The performance of the SmartJoin was then tested and the results recorded. To investigate the efficacy of the SmartJoin at redistributing tuples with different workloads, the minimum load of the reducers was increased and the test was rerun. The loading of the reducers on the network for these tests is shown in Fig. 5.

Fig. 5 Workload distribution on reducers: (a) 1K-1M tuples, (b) 100K-1M tuples, (c) 500K-1M tuples, (d) 800K-1M tuples

As shown in Fig. 6, the efficacy of SmartJoin declines as the difference between the minimum and maximum load decreases, regardless of CPU speed. In Fig. 6a there is a case where a SmartJoin takes longer to process the data than a WorstJoin. This is because no free receivers were available when the last unregistered reducer finished its primary workload and became a sender. Later, when a receiver did become available, it was on a different rack from the sender. The WorstJoin algorithm happened to avoid this situation by offloading workload onto different racks first, which meant the only receiver available to the last sender was one on the same rack. In most cases, however, the SmartJoin outperforms the other methods.

Fig. 6 The effect of workload on SmartJoin performance: (a) CPU 100 tps, (b) CPU 150 tps, (c) CPU 200 tps

The performance of the SmartJoin was then compared on networks with different numbers of nodes. The simulation used workloads ranging from a thousand tuples to a million tuples, with an initial average of 380,000 tuples per reducer for a network containing 10 nodes per rack. The number of nodes on each rack was then increased from 10 to 20 and finally to 30. During these tests, the workload was redistributed amongst the nodes so that the total number of tuples used in the test remained unchanged (approximately 46 million tuples).

As shown in Fig. 7, the efficacy of SmartJoin seems to increase as the number of nodes increases. As shown in Table 3, SmartJoin's best performance occurs when there are 30 nodes per rack and the CPU is executing at 200 tps.

Fig. 7 SmartJoin performance after increasing the number of nodes per rack: (a) CPU 100 tps, (b) CPU 150 tps, (c) CPU 200 tps

Table 3 SmartJoin performance: different number of nodes per rack

The performance of the SmartJoin was then compared on networks with different numbers of reducers per node. The simulation used workloads ranging from a thousand tuples to a million tuples, with an initial average of 380,000 tuples per reducer for a network containing 10 nodes per rack. The number of reducers on each node was then increased from 4 to 8 and finally to 12, with each reducer assumed to run on a separate processing core. During these tests, the workload was redistributed amongst the reducers so that the total number of tuples used in the test remained unchanged (approximately 46 million tuples).

As shown in Fig. 8, the efficacy of SmartJoin seems to increase as the number of reducers increases. As shown in Table 4, SmartJoin's best performance occurs when there are 12 reducers per node and the CPU is executing at 200 tps.

Fig. 8 SmartJoin performance after increasing the number of reducers per node: (a) CPU 100 tps, (b) CPU 150 tps, (c) CPU 200 tps

Table 4 SmartJoin performance: different number of reducers per node

Overall, the performance of SmartJoin is better than other tuple redistribution methods. By having more nodes, SmartJoin has more opportunities to match a receiver and sender on the same rack. By having more reducers on each node, SmartJoin has more opportunities to match a receiver and sender on the same node. Consequently, the performance of SmartJoin markedly improves as one adds more nodes and more reducers.

5 Related work

Joins have been studied in detail by many sources, and investigations and descriptions of various join algorithms have been collated in other works [9, 10]. A recent investigation into hash joins [11] has shown that the hash join is an efficient algorithm for performing joins on modern multicore processors in main-memory environments. That study showed that a simple hash join without partitioning of its input relations often outperforms more complex partitioning-based alternatives. In addition, the relative performance of this simple hash join rapidly improves with increasing skew, and it outperforms every other algorithm in the presence of even small amounts of skew. Overall, both [9] and [11] indicate that the hash join is an efficient method for handling joins on a processor; however, it is limited by the memory available to store the hash table.

Data skew has a big impact on overall performance in MapReduce, and it is for this reason that SmartJoin redistributes its tuples amongst reducers. An alternative to tuple redistribution is to preempt the overloading of any particular reducer in the first place. For this purpose, a Skew hANDling join (SAND join) is proposed in [12]. The SAND join is a two-way join that replaces the hash partitioning approach used by reduce-side joins with its own range partitioning approach and samples data before partitioning. This helps reduce skew amongst the reducers. SmartJoin's approach to workload balancing via tuple redistribution could work in conjunction with this approach.

The use of a Bloom filter [13] can reduce the amount of work processed by a join. A Bloom filter is used to filter out redundant intermediate records; by filtering out tuples that are not matched in the join, it reduces the workload and consequently improves the efficiency of the join. This is an orthogonal approach to MapReduce joins and can be used alongside SmartJoin.

Another work that discusses handling joins using a mediator over a network is presented in [14]. This system employs a balanced network utilization metric to optimize the use of all network paths in a global-scale database federation. It uses a metric that allows algorithms to exploit excess capacity in the network while avoiding narrow, long-haul paths. Another work similar to ours that handles tuple redistribution in multiway joins is presented in [8], but it uses a distributed queue [15] rather than direct peer-to-peer connections and does not take network distance into account when redistributing tuples.

Awareness of where data is located is an issue that needs to be considered in MapReduce, because the physical location of nodes, processes and data on the network affects MapReduce performance. SmartJoin takes advantage of the physical location of nodes when it distributes tuples between reducers. To take advantage of data locality between mappers and reducers, the Purlieus resource allocation system [16] uses locality awareness in both the map and reduce stages, thereby reducing job execution time and network contention within the data center. Data locality is also considered by the Mesos platform [17], a resource allocation system that shares resources in a fine-grained manner amongst different frameworks such as Hadoop and MPI; Mesos allows each framework to achieve data locality by taking turns reading data stored on each machine. As an extension to the Mesos system, an alternative resource allocation scheme is presented in [18], which attempts to distribute resources fairly in a system containing different resource types and applications with different resource requirements.

A topology-aware resource allocation (TARA) system [19] also takes the physical location of resources on the network into account in order to optimize the allocation of resources for Infrastructure-as-a-Service (IaaS) cloud systems. The purpose of TARA is to overcome deficiencies in current IaaS systems, which do not consider the resource requirements of their hosted applications and allocate resources independently of those needs. TARA incorporates a prediction engine into the resource allocation system, which prevents IaaS providers from having to rely on clients to provide possibly flawed resource requirements. The prediction engine is based on a lightweight simulator that estimates the performance of a specific resource allocation and a genetic algorithm that searches for an optimal solution.

Research in recent years on joins and the MapReduce programming model has resulted in new programming models that improve join performance in various scenarios. Research by Wang et al. [20] investigates how to perform an equi-join on large datasets on a ring-architecture distributed system rather than the master-slave architecture used by MapReduce platforms like Hadoop. Meanwhile, Jiang et al. [21] extend the MapReduce model to a MapJoinReduce model, which performs filtering-join-aggregation tasks in two successive MapReduce jobs. This approach allows multiple datasets to be joined in one pass and avoids frequent checkpointing and shuffling of intermediate results. These methods differ from the proposed SmartJoin, which builds on top of the pre-existing framework rather than changing the programming model or working environment.

In order to improve joins over multiple datasets, a framework called Llama [22] was created, with a distributed file system that stores data in both row-wise and column-wise format. The developers of Llama noted that the MapReduce model needs to process multiple joins using multiple jobs; since this requires storing the intermediate results of consecutive jobs in a file system like HDFS (Hadoop Distributed File System), it incurs a very high I/O cost. Llama's file system allows it to handle joins more efficiently using join algorithms designed to take advantage of its hybrid storage format.

Researchers have also studied MapReduce joins for specific applications. Research into matrix multiplication [23] showed how multiway joins can be beneficial when multiplying large matrices on MapReduce, as they reduce the number of binary multiplications; matrix multiplications were performed in that work by translating a multiplication into a join operation on a database system. Research such as this could benefit from SmartJoin. Other research into MapReduce joins includes similarity joins, such as a distance-based similarity self-join [24, 25] that can process large vector datasets, and V-SMART-Join [26], which improves performance of all-pair similarity joins for multisets and vectors.

As shown in the literature [21-26], there has been much research into MapReduce joins. Since the join operation is a common data processing task, it is an attractive target for optimization. The need for such optimization will likely continue as researchers try to marry traditional database technologies like SQL with MapReduce [27-32].

6 Conclusion and future work

In this paper, a network aware multiway MapReduce join called SmartJoin is presented that redistributes the workload in a MapReduce job. The simulation results show that SmartJoin can significantly improve tuple redistribution for multiway joins in MapReduce applications. SmartJoin has shown up to 39 % improvement compared to the non-redistribution method, with up to 26.8 % improvement over the random redistribution method and up to 27.6 % improvement over the WorstJoin redistribution method.

SmartJoin is designed for users who intend to perform multiway joins between two large datasets and several smaller datasets with MapReduce. In future work, we plan to explore how this system could be extended to handle more than two large datasets and how to improve its performance on different network topologies and hardware configurations.