1 Introduction

The past decade has witnessed remarkable growth in Internet communication technology, especially mobile Internet and sensor networks for perceiving and obtaining information. Organizations in industry, government, and academia possess and store large quantities of data that contain tremendous value. The potential value of big data [1], however, cannot be unearthed by simple collection or statistical analysis. Advanced big data analytics and applications require special technologies to cope efficiently with massive amounts of data. Data mining techniques [2] are therefore drawing attention from practitioners across all data-related industries. The aim of data mining is to explore data in search of unforeseen trends or patterns between variables, and then to verify the results by applying the detected patterns to new subsets of the data.

Since data gathered from a variety of sources are often isolated from one another, correlation analysis naturally becomes an important foundation for data mining and big data science [3]. Association rule mining [4, 5] was proposed to discover interesting correlation relationships among the itemsets of the data, and frequent itemset mining [6] is an essential step in the process of association rule mining. Well-known conventional algorithms for frequent itemset mining on single computers, including Apriori [7], FP-growth [8], and matrix-based algorithms [9], have shown good performance on small amounts of data. Nevertheless, conventional approaches face significant challenges in the big data era, where computing power and memory space are limited. Several attempts have therefore been made to mine frequent itemsets from massive data using parallel computing technologies [10–12].

Parallel programming frameworks can be classified into two main categories: memory-sharing and distributed (shared-nothing) architectures. Although it is easier to parallelize algorithms on a memory-sharing framework, the scalability of such frameworks is not satisfactory [13]. The message passing interface (MPI) [14, 15], a common framework for scientific distributed computing, takes advantage of memory locality, and some studies thus apply MPI to mine frequent itemsets [16, 17]. Despite certain advantages in iterative computation, MPI suffers from a high communication load due to data exchanges between computer nodes and from a lack of fault tolerance. MapReduce [18], a framework embedded in Apache Hadoop to process large amounts of distributed data in parallel, was designed to support distributed computing in a cloud computing paradigm and has turned out to be an efficient platform for parallel data mining of large-scale datasets [19]. However, the MapReduce framework is not appropriate for iterative computation, because repeated read-write operations to the Hadoop distributed file system (HDFS) lead to high I/O load and time cost.

To overcome the above problem, this paper adopts the Spark platform [20], a memory-based distributed framework, as the solution architecture. We propose a distributed Apriori-like algorithm, called DFIMA, which significantly improves the performance of frequent itemset mining. The Apriori algorithm [21] is an iterative process comprising candidate itemset generation and frequent itemset generation. DFIMA reduces the number of candidate itemsets by using a matrix-based pruning approach. Moreover, to further improve the efficiency of iterative computation, Spark is applied to make the algorithm distributed. This paper presents extensive experiments comparing the proposed method with an existing algorithm, parallel FP-growth (PFP) [22], on the Spark 1.3 platform. The results show that DFIMA is better in terms of speed and scalability. In addition, a case study is conducted to validate the feasibility of the proposed algorithm, which is also compared with PFP. Beyond the accompanying-car recognition described in this study, the proposed approach has several other potential applications, such as mining frequent events in online social networks [23, 24], understanding user patterns in E-Business [25], and identifying correlated sensors in wireless ad hoc sensor networks [26]. The main contributions of this paper can be summarized as follows. First, a matrix-based pruning technique is adopted in DFIMA, which greatly reduces the number of candidate itemsets and the frequency of database scans. Second, the proposed algorithm has been implemented on Apache Spark 1.3, a fast and general engine for large-scale data processing, which provides a solution for big data analytics.

The rest of the paper is organized as follows: Sect. 2 reviews research on parallel and distributed frequent itemset mining. Section 3 gives a brief introduction to frequent itemset mining and Apache Spark. Section 4 illustrates the proposed algorithm in detail. Section 5 presents the experimental results. Finally, Sect. 6 provides conclusions and discusses future work.

2 Literature review

A number of research efforts have addressed the problem of frequent itemset mining in parallel and distributed environments. Solutions in the literature generally aim either at improving PFP or at adapting Apriori to distributed settings.

PFP is a parallel form of the classical FP-growth [27]. Li et al. [28] introduced PFP for query recommendation; it splits a large-scale mining task into independent, parallel tasks. A load-balancing FP-Tree algorithm was developed by Yu et al. [29], in which the itemset for mining is divided by evaluating the tree's width and depth, and a function for calculating the loading degree is given. In the work of [30], a single prefix path compression technique was developed to identify frequent closed itemsets, and a partition-based projection mechanism was established to make the mining efficient and scalable for large databases. Chen et al. [31] proposed a parallel FP-growth algorithm running on a computer cluster; a projection method was used to find all the conditional pattern bases of frequent items so as to avoid memory overflow. In summary, the main principle of PFP is to group the items and then distribute the conditional databases to the mappers, which is efficient in neither memory nor speed.

Research efforts have also been made to improve Apriori-like or Apriori-based algorithms and convert them into distributed versions, mostly under the MapReduce environment. Lin et al. [12] developed three versions of the Apriori algorithm (SPC, FPC, and DPC) on the MapReduce framework: SPC is a straightforward algorithm, FPC aims at reducing the number of scheduling invocations, and DPC dynamically combines candidates of various lengths. Farzanyar et al. [32] presented a pruning technique, based on the MapReduce model, to decrease the number of partial frequent itemsets in the Map phase. The MrAdam algorithm [33] was proposed to obtain approximate collections of frequent itemsets; it combines a Chernoff bound-based approach with the MapReduce framework. Moens et al. [34] introduced two approaches for frequent itemset mining: Dist-Eclat focuses on speed, whereas BigFIM is optimized to run on very large datasets. By storing metadata in the form of transaction identifiers, Yu et al. [35] proposed a distributed parallel Apriori algorithm (DPA). Ozkural et al. [36] proposed a divide-and-conquer strategy to parallelize the frequent itemset mining (FIM) task, using a top-down data partitioning scheme with selective replication. Aouad et al. [37] studied the performance of distributed Apriori-like frequent itemset mining and found that the intermediate communication steps and remote support-count computation restrict the global performance of classic distributed schemes. To avoid generating many candidate sets and scanning the transaction database repeatedly, Chen et al. [38] proposed the BE-Apriori algorithm based on the classic Apriori algorithm, applying a pruned optimization strategy to reduce the generation of frequent itemsets and a transaction reduction strategy to reduce the scale of the transaction database. In general, the performance of these approaches may be unsatisfactory when confronted with large-scale datasets, owing to the bottleneck of iterative computation. Therefore, in this paper, a distributed frequent itemset mining algorithm (DFIMA) is proposed to improve and speed up the process of frequent itemset mining.

3 Preliminaries

3.1 Frequent itemset mining

Suppose that \(I=\{I_{1},I_{2},\ldots ,I_{m}\}\) is an itemset composed of m items. A database D consists of a series of transactions. Each transaction is a subset of I and has a unique label denoted by TID. A set of items is referred to as an itemset, and an itemset that contains k items is a k-itemset. For instance, the set {beer, diaper} is a 2-itemset. The occurrence frequency of an itemset is the number of transactions that contain it. Given an itemset X, the support number of X is the number of transactions in D that contain X. If the support number of X is greater than or equal to a specified minimum support threshold (abbreviated MinSup), then X is labeled a frequent itemset. The purpose of frequent itemset mining is to find all frequent itemsets in a given database.
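To make these definitions concrete, the following minimal Scala sketch (with a hypothetical five-transaction database, not from the paper) computes the support number of a 2-itemset and tests it against MinSup:

object SupportExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical database D of five transactions.
    val db: Seq[Set[String]] = Seq(
      Set("beer", "diaper"),
      Set("beer", "diaper", "milk"),
      Set("milk"),
      Set("beer", "diaper"),
      Set("diaper", "milk"))
    val minSup = 3
    val x = Set("beer", "diaper") // the 2-itemset {beer, diaper}
    // Support number of X = number of transactions in D that contain X.
    val support = db.count(t => x.subsetOf(t))
    println(s"support = $support, frequent = ${support >= minSup}") // support = 3, frequent = true
  }
}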

3.2 Apache Spark

Conceptually, Apache Spark is an open-source in-memory data analytics cluster computing framework, developed in the AMPLab at UC Berkeley. As a MapReduce-like cluster computing engine, Spark possesses the same good characteristics as MapReduce, such as scalability and fault tolerance. The main abstraction of Spark is the resilient distributed dataset (RDD) [39], which makes Spark well suited to iterative jobs such as the PageRank algorithm [40] and the K-means algorithm. RDDs are unique to Spark and differentiate it from conventional MapReduce engines.

In addition, on the basis of RDDs, applications on Spark can keep data in memory across queries and automatically reconstruct data lost during failures [41]. An RDD is a read-only data collection, which can be either a file stored in an external storage system, such as HDFS, or a derived dataset generated from other RDDs. An RDD stores metadata such as its partitions and its set of dependencies on parent RDDs, called its lineage. With the help of the lineage, Spark recovers lost data quickly and effectively. Spark performs well on iterative computation because it can reuse intermediate results and keep data in memory across multiple parallel operations.
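As a brief illustration (a minimal sketch; the HDFS path and application name are placeholders, not from the paper), the following Scala snippet builds an RDD from a file, derives a new RDD through its lineage, and caches it so that multiple actions reuse the in-memory copy:

import org.apache.spark.{SparkConf, SparkContext}

object RddSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("rdd-sketch"))
    // A derived RDD: each transformation extends the lineage, from
    // which Spark can recompute lost partitions after a failure.
    val transactions = sc.textFile("hdfs:///path/to/transactions")
      .map(_.trim.split("\\s+").toSet) // one transaction per line
      .cache()                         // keep in memory across actions
    println(s"transactions: ${transactions.count()}")                      // reads HDFS once
    println(s"single-item:  ${transactions.filter(_.size == 1).count()}")  // served from memory
    sc.stop()
  }
}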

4 Distributed frequent itemset mining algorithm (DFIMA)

This section introduces the proposed distributed frequent itemset mining algorithm, which is Apriori-like. DFIMA is a breadth-first search algorithm that exploits the apriori property (see Property 1), a support-based technique first introduced by Agrawal et al. [4]. In general, the Apriori algorithm can be viewed as a two-step process. In the first step, scan the database and count the occurrences of each item, i.e., calculate the support number of each item, then pick out all items whose support number is not less than MinSup to form the frequent 1-itemsets. In the second step, repeatedly generate the candidate (k + 1)-itemsets from the complete set of frequent k-itemsets and scan the database to determine which candidates are frequent, until no candidate (k + 1)-itemset remains.
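The following compact single-machine sketch in Scala (unoptimized, for small in-memory databases only) mirrors this two-step process; the candidate join here is a naive union-based one, not the paper's matrix-based pruning scheme:

object AprioriSketch {
  def frequentItemsets(db: Seq[Set[String]], minSup: Int): Seq[Set[String]] = {
    def support(x: Set[String]) = db.count(t => x.subsetOf(t))
    // Step 1: frequent 1-itemsets.
    var level: Seq[Set[String]] =
      db.flatten.distinct.map(Set(_)).filter(support(_) >= minSup)
    var all = level
    // Step 2: generate candidate (k+1)-itemsets from frequent
    // k-itemsets, then rescan the database to keep the frequent ones.
    while (level.nonEmpty) {
      val candidates =
        (for { a <- level; b <- level; c = a union b; if c.size == a.size + 1 } yield c).distinct
      level = candidates.filter(support(_) >= minSup)
      all = all ++ level
    }
    all
  }
}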

Property 1

If X is a frequent itemset, then any subset of X is a frequent itemset. In particular, if X is a frequent (k + 1)-itemset, then the number of frequent k-itemsets is at least \(k+1\).

4.1 Matrix-based pruning algorithm

This subsection illustrates the main principles of the matrix-based pruning algorithm for frequent itemset mining. The key idea is to acquire a Boolean vector for each item of a given database and to calculate the 2-itemset matrix, by which the number of candidate itemsets is reduced.

Assume that database D includes n transactions \(T=\{T_{1},T_{2},\ldots ,T_{n}\}\) and m different items \(I=\{I_{1},I_{2},\ldots ,I_{m}\}\), and that the value of MinSup is set to Min_sup. The main steps are as follows:

Step 1: Obtain the Boolean vector for each item of the frequent 1-itemsets.

Scan the database D to obtain, for each item \(I_{i} \in I\), an n-dimensional Boolean vector \(V_{i}=(b_{1},b_{2},\ldots ,b_{j},\ldots ,b_{n})^{T}\), where \(b_j =\begin{cases} 1, & I_i \in T_j \\ 0, & I_i \notin T_j \end{cases}\) \((j=1,2,\ldots ,n)\). The support number of \(I_{i}\) thus equals the number of nonzero elements in \(V_{i}\). Record the items, and their corresponding Boolean vectors, whose support number is not less than Min_sup. After all frequent 1-itemsets are obtained, sort them in ascending order of support number.
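A minimal Scala sketch of this step, assuming the database fits in memory as an indexed sequence of transactions, might look as follows:

def booleanVectors(db: IndexedSeq[Set[String]],
                   minSup: Int): Seq[(String, Array[Boolean])] = {
  val n = db.length
  val items = db.flatten.distinct
  items.map { item =>
    // b_j = 1 iff item I_i appears in transaction T_j
    (item, Array.tabulate(n)(j => db(j).contains(item)))
  }.filter { case (_, v) => v.count(identity) >= minSup } // support >= Min_sup
   .sortBy { case (_, v) => v.count(identity) }           // ascending support number
}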

Step 2: Calculate the 2-itemset matrix according to the Boolean vectors of the entire frequent 1-itemset.

Figure 1 shows the process of generating the 2-itemset matrix M, whose elements are produced from the frequent 1-itemsets sorted in ascending order of support number. Specifically, if the number of frequent 1-itemsets is k, then M is a \(k \times k\) matrix. Let the Boolean vectors \(V_{i}\) and \(V_{j}\) correspond to the ith and jth items of the sorted frequent 1-itemsets, respectively. The number of nonzero elements of the vector generated by a logical AND operation between \(V_{i}\) and \(V_{j}\) is recorded as the value of the matrix element \(M_{ij}\) (\(i \ne j\)). In particular, the diagonal element \(M_{jj}\) equals the number of entries \(M_{ij} \ge \) Min_sup (\(i=0,1,\ldots ,j-1\)) in column j of M. Since M is symmetric, only the values \(M_{ij}\) with \(i \le j\) need to be calculated. Finally, all frequent 2-itemsets are acquired on the basis of the matrix M; a code sketch follows Fig. 1.

Fig. 1 The process to generate the 2-itemset matrix M
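A sketch of this computation in Scala (in-memory, over the sorted frequent 1-itemset vectors from Step 1) is given below; only the upper triangle is computed because M is symmetric, and the diagonal is filled afterwards:

def twoItemsetMatrix(vecs: IndexedSeq[Array[Boolean]], minSup: Int): Array[Array[Int]] = {
  val k = vecs.length
  val m = Array.ofDim[Int](k, k)
  for (i <- 0 until k; j <- i + 1 until k) {
    // M_ij = support of the 2-itemset: nonzero count of V_i AND V_j
    val sup = vecs(i).zip(vecs(j)).count { case (a, b) => a && b }
    m(i)(j) = sup
    m(j)(i) = sup
  }
  for (j <- 0 until k) // diagonal: entries of column j reaching Min_sup
    m(j)(j) = (0 until j).count(i => m(i)(j) >= minSup)
  m
}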

Step 3: Obtain the frequent (k + 1)-itemsets from the frequent k-itemsets (\(k \ge 2\)).

Let \(L_{k}\) be the set of all frequent k-itemsets. First, consider the number of frequent k-itemsets: if it is at least \(k+1\), then frequent (k + 1)-itemsets may exist according to Property 1. For each frequent k-itemset F, whose items are sorted in ascending order of support number, let the first and last items of F be written as \(I_{first}\) and \(I_{last}\), respectively; \(I_{first}\) has the minimum support number and \(I_{last}\) the maximum. Assume that the rows of M corresponding to \(I_{first}\) and \(I_{last}\) are u and v, respectively (u, v = 0, 1, ..., \(k-1\)). Then search the columns of row v of M in turn. For each column w (\(w < v\)), if the conditions \(M_{vw} \ge \) Min_sup, \(M_{vw} \ge k\), and \(M_{uw} \ge \) Min_sup are all met, let the item corresponding to column w be \(I_{new}\); a candidate (k + 1)-itemset is then obtained by combining the frequent k-itemset F with the item \(I_{new}\). By Definition 1, the support number of the candidate (k + 1)-itemset can be computed; if it is not less than Min_sup, the candidate (k + 1)-itemset is frequent.

Definition 1

Suppose that \(V_{r}\) is the Boolean vector of item \(I_{r}\). Then the support number of an itemset \(Q=\{I_{o},I_{p},\ldots ,I_{q}\}\) is equal to the number of nonzero elements in the vector generated by the logical AND operation over the Boolean vectors of all the items in Q.
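Definition 1 translates directly into code. The Scala sketch below (with vectors standing for a map from each frequent 1-item to its Boolean vector, as in Sect. 4.2) computes the support number of a candidate itemset and tests whether it is frequent:

def supportNumber(q: Set[String], vectors: Map[String, Array[Boolean]]): Int =
  q.toSeq
    .map(vectors)                                             // Boolean vector of each item in Q
    .reduce((a, b) => a.zip(b).map { case (x, y) => x && y }) // AND them all together
    .count(identity)                                          // nonzero elements = support number

def isFrequent(candidate: Set[String],
               vectors: Map[String, Array[Boolean]], minSup: Int): Boolean =
  supportNumber(candidate, vectors) >= minSup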

4.2 Implementation over Spark

This subsection describes in detail the Spark-based distributed implementation of the single-machine algorithm above. To reduce memory usage, our program uses a HashMap to save the Boolean vector of each frequent 1-itemset and all the frequent 2-itemsets, instead of saving the 2-itemset matrix M itself.

Step 1: Obtain the Boolean vectors of all frequent 1-itemsets, then produce all frequent 2-itemsets based on these Boolean vectors.

As illustrated in Fig. 2, the input dataset is first read from HDFS to create a HadoopRDD. Next, the HadoopRDD calls the flatMap function to produce a FlatMappedRDD containing all items of the input data. The map function is applied to the FlatMappedRDD to transform each item into a tuple (item, 1), yielding a MappedRDD. The support number of each item in the MappedRDD is then counted with the reduceByKey function, and the items whose support number is not less than Min_sup are picked out. All frequent 1-itemsets are sorted in ascending order of support number and saved in an array arr. For each frequent 1-itemset in arr, the corresponding Boolean vector is obtained and saved as an \(<\)item, vector\(>\) key/value pair in a HashMap. Finally, the frequent 2-itemsets are obtained using the array arr and these Boolean vectors, and are saved in a nested HashMap whose keys are frequent 1-itemsets. For instance, if G is a key, its value is another HashMap, whose keys are the frequent 1-itemsets that form a frequent 2-itemset together with G and whose values are the support numbers of those 2-itemsets. A code sketch of this pipeline follows Fig. 2.

Fig. 2 The process of step 1 on Spark
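A Scala-on-Spark sketch of this pipeline is shown below. The path and variable names are illustrative rather than the authors' code, and for brevity the Boolean vectors are built by collecting the transactions to the driver, which assumes they fit in driver memory:

import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable

object Step1Sketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("dfima-step1"))
    val minSup = 3
    val lines = sc.textFile("hdfs:///path/to/transactions").cache() // HadoopRDD, reused below
    val freq1 = lines
      .flatMap(_.split("\\s+"))                  // all items (FlatMappedRDD)
      .map(item => (item, 1))                    // tuple (item, 1) (MappedRDD)
      .reduceByKey(_ + _)                        // support number per item
      .filter { case (_, sup) => sup >= minSup } // keep frequent 1-itemsets
      .collect()
      .sortBy(_._2)                              // ascending support number
    val arr = freq1.map(_._1)
    // <item, vector> pairs: Boolean vector of each frequent 1-itemset.
    val txns = lines.map(_.split("\\s+").toSet).collect()
    val vectors = mutable.HashMap(arr.map(it => it -> txns.map(_.contains(it))): _*)
    sc.stop()
  }
}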

Step 2: Obtain all frequent (k + 1)-itemsets from the frequent k-itemsets (\(k \ge 2\)).

As shown in Fig. 3, the procedure of this step is similar to Step 3 of Sect. 4.1. To begin with, the RDDs that contain the frequent k-itemsets call the map function to produce all candidate (k + 1)-itemsets. Second, a Spark broadcast variable is used to hold the Boolean vectors of all frequent 1-itemsets. Definition 1 is then applied to determine whether a candidate (k + 1)-itemset is frequent. Consequently, all frequent (k + 1)-itemsets are obtained. A code sketch of this step follows Fig. 3.

Fig. 3 The process of step 2 on Spark
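The iteration of this step can be sketched as follows (illustrative code, not the authors'); for brevity the candidate generation extends each frequent k-itemset with every other frequent item, omitting the matrix-based pruning conditions of Sect. 4.1, and Definition 1 is evaluated against the broadcast Boolean vectors:

import org.apache.spark.rdd.RDD
import org.apache.spark.broadcast.Broadcast

def nextLevel(freqK: RDD[Set[String]],
              bcVectors: Broadcast[Map[String, Array[Boolean]]],
              minSup: Int): RDD[Set[String]] = {
  freqK
    .flatMap { f =>
      // candidate (k+1)-itemsets: extend F with one more frequent item
      bcVectors.value.keys.filterNot(f.contains).map(item => f + item)
    }
    .distinct()
    .filter { cand =>
      // Definition 1: AND the Boolean vectors, count nonzero elements
      val anded = cand.toSeq
        .map(bcVectors.value)
        .reduce((a, b) => a.zip(b).map { case (x, y) => x && y })
      anded.count(identity) >= minSup
    }
}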

It is worth noting that some RDDs are used repeatedly throughout the job; for example, the HadoopRDD holding the input data is reused to obtain the Boolean vectors of all frequent 1-itemsets, and the RDDs holding the frequent k-itemsets are reused to obtain all frequent (k + 1)-itemsets. Therefore, the cache function is applied to store these RDDs on the distributed workers of the cluster and thereby achieve in-memory computation. In this way, the input data needs to be loaded only once, which improves the performance of the entire job.

In addition, some variables stored in the memory of the Spark cluster should be shared across multiple parallel operations, for example, the variables that save the Boolean vectors of the frequent 1-itemsets. For efficiency, we apply the broadcast function, which allows programmers to send shared variables to all workers of the cluster only once, instead of delivering a copy of the shared variables for every task.
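Putting the two optimizations together, the driver loop might look like the following sketch (continuing the illustrative names from the snippets above; initialFrequent2Itemsets is a hypothetical helper producing the frequent 2-itemsets of Step 1):

val bcVectors = sc.broadcast(vectors.toMap)      // shipped to each worker once, not per task
var freqK = initialFrequent2Itemsets(sc).cache() // hypothetical helper
while (freqK.count() > 0) {
  // each level is cached because it is reused to build level k+1
  freqK = nextLevel(freqK, bcVectors, minSup).cache()
}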

5 Experimental results

In this section, DFIMA and PFP are compared to evaluate their speed and scalability. Both algorithms were implemented on Spark 1.3. After the performance evaluation, a case study is given to discuss the practicality of the two algorithms.

5.1 Performance evaluation

In this subsection, the datasets T10I4D100K and T40I10D100K are used for the experiments. These two benchmark datasets were presented at the first IEEE ICDM Workshop on Frequent Itemset Mining Implementations (FIMI '03) [42]. Table 1 shows the details of the two datasets.

Table 1 The properties of datasets used in experiment

The experiments were conducted on a cluster of 3 computer nodes with CentOS 7.0 installed. Every node had the same physical configuration: Intel Core i5-4440M CPU at 3.1 GHz, 16 GB memory, and a 1 TB disk.

The first comparison estimates speed by analyzing the running time of DFIMA and PFP (Spark 1.3). In this case, the support degree varies while the number of computer nodes remains 3. Table 2 shows the running time (in seconds) of DFIMA and PFP for the datasets T10I4D100K and T40I10D100K, with the support degree set to 0.01, 0.03, and 0.05, respectively. We can see from Table 2 that both algorithms become more efficient as the support degree increases. Note that the running time of DFIMA is clearly less than that of PFP for both datasets. For example, on T40I10D100K with a support degree of 0.05, the running time of PFP is 250 s, nearly four times that of DFIMA (67 s).

The running times for different support degrees on the datasets T10I4D100K and T40I10D100K are shown in Figs. 4 and 5, respectively; the x-axis denotes the support degree and the y-axis the running time. As the support degree grows from 0.1 to 0.5%, Fig. 4 shows that the running time of PFP is consistently higher than that of DFIMA. Figure 5 shows that the superiority of DFIMA in running time is even clearer on the bigger dataset: the difference between the two algorithms is larger than that observed in Fig. 4.

Table 2 The running time comparison on T10I4D100K and T40I10D100K
Fig. 4 The running time on T10I4D100K

Fig. 5 The running time on T40I10D100K

In addition, another speed comparison between DFIMA and PFP (running on Spark 1.3) was conducted on enlarged "T40I10D100K" datasets; these "large" versions of the original dataset were generated manually to increase its scale. Table 3 shows that DFIMA significantly outperforms PFP for all settings of the support degree. When the support degree is small, the time costs of both algorithms increase more quickly.

The following experiment evaluates the scalability of DFIMA, again measured by running time (Spark 1.3), using the dataset T10I4D100K. The number of cluster nodes ranges from 2 to 8 while the support degree is fixed at 0.5%. In Fig. 6, the x-axis denotes the number of computer nodes of the Spark cluster and the y-axis the running time of DFIMA. With more computer nodes, DFIMA needs less execution time, and its curve declines nearly linearly, demonstrating near-linear scalability.

Table 3 The running time comparison on "T40I10D100K" databases

5.2 Case study

In this subsection, a case study is provided to evaluate the practicality and effectiveness of DFIMA, again in comparison with PFP (Spark 1.3).

Fig. 6 The running time with different computer nodes

An automatic license plate recognition database was generated to obtain a dataset, called the Car dataset, to simulate the application of accompanying-car recognition, which is used to identify suspected criminal gangs. It is common for cars to move in a queue in dynamic traffic flows, but when such accompanying occurs with high frequency, the cars involved are regarded as accompanying cars and suspected of gang crime.

We simulate a database covering more than 40 intelligent vehicle monitoring and recording systems on highways. The information collected includes the license plate, passing time, speed, etc. of each passing vehicle. For each monitoring and recording system, data is gathered and saved as one record every five minutes. Based on nearly 6 months of recording, we obtain a Car dataset (2.1 GB) consisting of 29,204,016 records. Considering that cars moving in a queue is a normal phenomenon during rush hours, the simulation identifies accompanying cars by defining a certain number of "accompanying times", namely the support number.

Table 4 shows the execution times of the two algorithms with the support number set to 100, 150, and 200, respectively. We observe that both DFIMA and PFP perform well on Spark in this simulation; however, the time cost of PFP is more than twice that of DFIMA in all cases. The case study therefore also confirms that the running time of the proposed approach is significantly less than that of PFP (running on Spark 1.3).

Table 4 The results of PFP-Growth and DFIMA on Car dataset

The above experimental results indicate that DFIMA efficiently reduces the number of candidate itemsets and can serve as a reliable, high-performance approach for frequent itemset mining.

6 Conclusion and future work

The performance bottleneck caused by the repeated database scanning of the Apriori algorithm, together with the cost of iterative computation on the MapReduce framework, hinders frequent itemset mining from massive data. This paper presents a novel distributed algorithm, called DFIMA, in which a matrix-based pruning approach is introduced to reduce the number of candidate itemsets. Moreover, to further improve the efficiency of iterative computation, we have implemented our approach on the Spark platform. The experimental results indicate that the proposed algorithm offers better efficiency and scalability than PFP (running on Spark 1.3). We also noticed in the experiments that our method performs especially well when the support degree is relatively high. Further optimization will be considered to improve DFIMA and make it suitable for more mining cases.