1 Introduction

Over the past decades, Internet and information technology have developed at a remarkable pace, and the amount of data has grown rapidly, ushering in the era of big data [1,2,3,4,5]. There are many definitions of big data, and the 3V definition is one of the most representative: big data must satisfy three characteristics, namely volume, variety, and velocity [6, 7]. In addition, some researchers argue that big data should also have the characteristic of value [8, 9]. This value can become an opportunity when practical problems involving big data are addressed, and the potential information discovered from big data can be a powerful impetus for social, economic, and scientific development [10,11,12,13,14,15,16,17]. As Hal Varian, Google's chief economist, has noted, data are widely available; what is scarce is the ability to extract knowledge from them. What we genuinely need is the ability to discover valuable information from big data. Big data mining technologies therefore came into being [18,19,20,21,22,23,24,25,26], and among them the clustering algorithm is one of the most important branches.

As an unsupervised learning method in data mining, a clustering algorithm groups similar data objects into the same class based on their relationships and characteristics [27,28,29,30]. Moreover, clustering can discover potential distribution patterns and relations in a data set. Clustering algorithms therefore have wide applications in areas such as image identification, information retrieval, and anomaly detection [31,32,33,34,35].

There are five kinds of traditional clustering algorithms: partitioning-based clustering, hierarchical clustering, density-based clustering, model-based clustering, and grid-based clustering [36]. Among them, the partitioning-based K-means algorithm [37], which uses distance as its similarity measure, is easy to implement and runs fast when the data volume is small, so K-means has been widely used since it was proposed.

However, when the data volume is huge, storing it on a single machine becomes difficult, and the traditional K-means algorithm cannot process such a large amount of data in parallel [38, 39], so it no longer performs well. Fortunately, Google proposed the distributed computing framework MapReduce [40, 41], whose open-source implementations such as Hadoop [42,43,44] have been widely adopted, which makes it possible to combine MapReduce with traditional clustering methods to cluster data in parallel. Jain et al. [45] first proposed an approximation algorithm combining big data analysis with K-means. The algorithm calculates the attributes of interest and fixes the number of iterations, thereby reducing the final time complexity. However, it does not take into account the sensitivity to the initial centers when clustering the data. Based on Jain's work, Yin et al. [46] applied an improved K-means under the MapReduce framework to cluster big data sets, using agglomerative hierarchical clustering to obtain the initial cluster centers. Li et al. [47] designed a parallel K-Medoids clustering algorithm for the cloud computing environment, which uses the maximum principle to obtain optimized initial clustering centers. Even though these algorithms improve on the traditional random selection of center points, they do not completely solve the sensitivity to the initial centers.

The communication overhead between nodes is also a critical issue when mining data in parallel. To reduce it, many researchers partition the data by designing similarity functions; however, this approach can skew the data across partitions. Therefore, Zhou et al. [48] designed a MapReduce-based load-balancing strategy. The strategy collects partition information from all Mappers and then uses it to change the partition function dynamically. The modified partition function adds the corresponding index shift value of each Reducer, so every Reducer receives approximately the same amount of data, which avoids the problem of data skewing.

To a certain extent, the above algorithms still have not fully solved the randomness of the initial clustering centers. Besides, the new problems introduced by the MapReduce framework, such as how to improve the efficiency of the MapReduce cluster and how to reduce the communication overhead among MapReduce nodes, also need to be solved urgently.

The MR-PGDLSH algorithm is proposed in this paper. Its main contributions are as follows:

  • A GDS is designed to select relatively reasonable initial clustering centers and avoid falling into a local optimal solution.

  • A DP-LSH data partition strategy is proposed to reduce communication overhead among MapReduce nodes.

  • An AGS is presented to solve the problem of data skew in the data partitions on MapReduce, which improves the performance of the MapReduce cluster.

  • The MR-PGDLSH is applied to real data sets, and the final experimental results show that the MR-PGDLSH consistently obtains the best performance.

The novelty of this paper is that it first uses the GDS strategy to effectively avoid the sensitivity to the initial center points and greatly improve clustering performance. Then, a similarity function is designed to process the data partitions, which reduces the communication overhead among nodes and indirectly improves the parallelization efficiency of clustering. Finally, an adaptive grouping strategy is designed to solve the data skewing problem of each data partition and effectively improve the efficiency of the cluster.

2 Preliminary

2.1 Locality-sensitive hash function

In computer science, locality-sensitive hashing (LSH) is a class of algorithms that hash similar data objects into the same data "bucket", thereby reducing the dimensionality of the data. LSH is built on the following definitions.

Definition 2.1

Consider a data set O with \(n\) data objects and \(m\) attributes, and transform it into an \(m \times n\) feature matrix \(M\), where the columns represent data objects and the rows represent attribute values. Then, the MinHash functions \(h_{\min }\) are used to reduce the dimension of \(M\) and obtain the \(l \times n\) signature matrix [49].

Definition 2.2

For sets \(A\) and \(B\), their Jaccard similarity is defined as \(\left| {A \cap B} \right|/\left| {A \cup B} \right|\). Accordingly, the Jaccard distance between them is \(d\left( {A,B} \right) = 1 - \left| {A \cap B} \right|/\left| {A \cup B} \right|\).
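As a small, hedged illustration of Definition 2.2 (not part of the original paper, using arbitrary toy sets), the Jaccard similarity and distance can be computed as follows:

```python
def jaccard_similarity(a: set, b: set) -> float:
    """|A ∩ B| / |A ∪ B| as in Definition 2.2."""
    if not a and not b:
        return 1.0  # convention: two empty sets are identical
    return len(a & b) / len(a | b)

def jaccard_distance(a: set, b: set) -> float:
    """d(A, B) = 1 - |A ∩ B| / |A ∪ B|."""
    return 1.0 - jaccard_similarity(a, b)

# Toy example: A = {1, 2, 3}, B = {2, 3, 4} gives d(A, B) = 1 - 2/4 = 0.5
print(jaccard_distance({1, 2, 3}, {2, 3, 4}))
```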

Definition 2.3

A family of functions \(H = \left\{ {h:S \to U} \right\}\) is called \(\left( {R,cR,P_{1} ,P_{2} } \right)\)-sensitive if, for any two points \(p\) and \(q\) in the space \(R^{d}\), the following two conditions are met:

  • \(If\;\left\| {p - q} \right\| \le R\quad {\text{then}}\quad \Pr_{H} (h\left( p \right) = h\left( q \right)) \ge P_{1}\)

  • \(If\;\left\| {p - q} \right\| \ge cR\quad {\text{then}}\quad \Pr_{H} (h\left( p \right) = h\left( q \right)) \le P_{2}\)
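For intuition only (this family is not the one used later in the paper, which relies on MinHash), a classic example of a locality-sensitive family is random-hyperplane hashing, where nearby vectors fall on the same side of a random hyperplane with higher probability. A minimal sketch:

```python
import numpy as np

rng = np.random.default_rng(42)

def make_hyperplane_hash(dim):
    """Draw one random hyperplane; h_a(p) = 1 if a . p >= 0 else 0."""
    a = rng.standard_normal(dim)
    return lambda p: int(np.dot(a, p) >= 0)

h = make_hyperplane_hash(dim=3)
# Two nearby points tend to collide under h; a distant one often does not.
print(h([1.0, 0.2, 0.1]), h([0.9, 0.3, 0.0]), h([-1.0, 0.0, 0.5]))
```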

2.2 Grid density-based clustering

The grid density-based clustering (GDC) method focuses on clustering high-dimensional data and can be applied to data of various shapes [50]. The algorithm first partitions the data into many data units and then works on the units instead of individual data points. A given minimum density threshold is essential, because the density of each unit is compared with this threshold: a data unit whose density is less than the threshold is discarded, and the remaining units are used to extend clusters. In general, GDC consists of three steps:

Step 1 Map each data point into the proper data unit. Then use a k-d tree to store information about the units, such as position and density.

Step 2 Use the density-reachable and density-connected strategies to cluster the data. GDC uses a breadth-first search to find clusters: it selects an unvisited data unit as the initial unit. If an adjacent unit of the initial unit has not been clustered, it is added to the current cluster and to the cluster set; if the adjacent unit already belongs to an existing cluster, it is skipped and GDC looks for another adjacent unit. Finally, the current unit is used as the next initial unit, and the search is repeated until all reachable units have been visited.

Step 3 After the unit clusters are obtained, the data points are mapped to their corresponding clusters.
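A compact, hedged sketch of these three steps (it stores the grid in a hash map keyed by cell indices rather than a k-d tree and assumes a uniform cell size, so it is only an illustration of the idea):

```python
import numpy as np
from collections import defaultdict, deque

def gdc(points, cell_size, min_density):
    """Grid density-based clustering sketch: map points to grid cells,
    drop sparse cells, then grow clusters over axis-adjacent dense cells."""
    cells = defaultdict(list)
    for idx, p in enumerate(points):                       # Step 1: map points to units
        cells[tuple((np.asarray(p) // cell_size).astype(int))].append(idx)
    dense = {c for c, members in cells.items() if len(members) >= min_density}

    def neighbors(cell):
        for dim in range(len(cell)):
            for step in (-1, 1):
                n = list(cell); n[dim] += step
                yield tuple(n)

    labels, cluster_id = {}, 0
    for start in dense:                                    # Step 2: BFS over dense units
        if start in labels:
            continue
        cluster_id += 1
        labels[start] = cluster_id
        queue = deque([start])
        while queue:
            for n in neighbors(queue.popleft()):
                if n in dense and n not in labels:
                    labels[n] = cluster_id
                    queue.append(n)
    # Step 3: map each point back to the cluster of its cell
    return {i: labels[c] for c, members in cells.items() if c in labels for i in members}
```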

3 The MR-PGDLSH

The MR-PGDLSH mainly consists of three stages: selecting initial cluster centers, partitioning the data, and clustering in parallel. In the first stage, the original data set is input and the initial cluster centers are obtained by the GDS. In the data partitioning stage, the DP-LSH maps data objects with high association into the same sub-data set to obtain the initial partitions, and then the data are divided reasonably by the AGS, which yields the final data partitions. In the parallel clustering stage, the initial cluster centers and the data partitioning results are input, and MapReduce performs parallel clustering to obtain the final clustering result. The whole operation process of the MR-PGDLSH is shown in Fig. 1.

Fig. 1 The flow of the MR-PGDLSH

3.1 Initial cluster centers

It is essential for a partitioning-based clustering algorithm to select good initial cluster centers, which significantly influence the final clustering results. Therefore, the MR-PGDLSH obtains the initial cluster centers by applying the GDS. The GDS is derived from the core concept of GDC: divide the data set into grid units and compare their densities with a density threshold. The main steps of the GDS are as follows:

Step 1 Suppose there is an \(m\)-dimensional data space; divide it into \(2^{m}\) initial data grid units. Let \(Q_{i}\) denote the data set of the \(i\)-th dimension. Sort the data points of \(Q_{i}\) by value, divide them into \(k\) shares, and obtain the maximum value \(Q_{\max }^{i}\) and minimum value \(Q_{\min }^{i}\) of the data points of \(Q_{i}\). Finally, the side length \(G_{{{\text{LQ}}}}^{i}\) along dimension \(Q_{i}\) is calculated as \(G_{{{\text{LQ}}}}^{i} = \left( {Q_{\max }^{i} - Q_{\min }^{i} } \right)/\left( {k + 1} \right)\).

Step 2 The data grid units obtained in Step 1 allow the density of each grid to be calculated. Each grid's density is then compared with the grid density threshold \(MinS\) obtained from Eq. (1), and grid units whose density is less than \(MinS\) are not used in the following step.

Step 3 The grid units remaining after Step 2 are used to obtain the initial cluster centers. That is, each dimension of the \(m\)-dimensional data space was divided into \(k\) shares in Step 1, so the average value of each share can be calculated and represented as \(\left\{ {P_{q1} ,P_{q2} ,P_{q3} , \ldots ,P_{qk} } \right\}\). These average values are then combined into the initial cluster centers \(O = \left\{ {O_{1} ,O_{2} ,O_{3} , \ldots ,O_{k} } \right\}\), where \(O_{k} = \left\{ {P_{1k} ,P_{2k} , \ldots ,P_{mk} } \right\}\).

Theorem 3.1

(Grid density threshold formula) Suppose \(O_{i}\) and \(O_{j}\) are two data objects in a data space, the data space is made up of the grid unit set \(S\), and \(v\) is the number of data objects in the smallest grid unit of \(S\). Then the grid unit density threshold \(MinS\) can be calculated as follows:

$$MinS = v \times \min \left( {\frac{1}{n}\sum\limits_{i = 1,i \ne j}^{n} {\left\| {O_{i} - O_{j} } \right\|} } \right)$$
(1)

Proof

Suppose the side length of the smallest grid unit is \(L\); then the distance \(\left\| {O_{i} - O_{j} } \right\|\) between any two data objects in this grid unit is smaller than \(\frac{L}{2}\).

$$\therefore \sum\limits_{i = 1,i \ne j}^{n} {\left\| {O_{i} - O_{j} } \right\|} < n \cdot \frac{L}{2},\quad \frac{1}{n}\sum\limits_{i = 1,i \ne j}^{n} {\left\| {O_{i} - O_{j} } \right\|} < \frac{L}{2};$$
$$\because \min \left( {\frac{1}{n}\sum\limits_{i = 1,i \ne j}^{n} {\left\| {O_{i} - O_{j} } \right\|} } \right) \le \frac{1}{n}\sum\limits_{i = 1,i \ne j}^{n} {\left\| {O_{i} - O_{j} } \right\|} ;$$
$$\therefore \min \left( {\frac{1}{n}\sum\limits_{i = 1,i \ne j}^{n} {\left\| {O_{i} - O_{j} } \right\|} } \right) < \frac{L}{2};$$
$$\because v > 0,\quad \therefore v \cdot \min \left( {\frac{1}{n}\sum\limits_{i = 1,i \ne j}^{n} {\left\| {O_{i} - O_{j} } \right\|} } \right) < v \cdot \frac{L}{2};$$
$$\therefore MinS < v \cdot \frac{L}{2}.$$

Clearly, after the GDS is adopted, the initial cluster centers are farther apart and more dispersed, so the GDS achieves the expected effect. The pseudocode of the GDS is shown in Algorithm 1.

Algorithm 1 The pseudocode of the GDS
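Since Algorithm 1 is only available as a figure, the following is a minimal, hedged Python sketch of the center-selection part of the GDS (Steps 1 and 3 above); the density filtering with \(MinS\) is omitted for brevity, and the function name is ours, not the paper's:

```python
import numpy as np

def gds_initial_centers(X, k):
    """Per dimension: sort the values, split them into k shares, and use the
    share means as the coordinates of the k initial cluster centers."""
    X = np.asarray(X, dtype=float)
    n, m = X.shape
    # Side length of each dimension as in Step 1: (max - min) / (k + 1)
    side_lengths = (X.max(axis=0) - X.min(axis=0)) / (k + 1)
    centers = np.empty((k, m))
    for dim in range(m):
        shares = np.array_split(np.sort(X[:, dim]), k)   # k shares per dimension
        centers[:, dim] = [share.mean() for share in shares]
    return centers, side_lengths

# Usage with random toy data
centers, sides = gds_initial_centers(np.random.rand(1000, 4), k=5)
```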

3.2 Data partitioning

For a partitioning-based clustering algorithm, the way data objects are partitioned across nodes strongly affects performance: a careless partitioning leads to poor performance when MapReduce is used to cluster massive data. Therefore, the MR-PGDLSH uses the DP-LSH and the AGS for data partitioning. Their principles and implementations are described below.

3.2.1 DP-LSH

The DP-LSH is derived from the principle of LSH and is used to map data objects with high association to the same sub-data set in order to reduce communication overhead. The main stages of the DP-LSH are bucket array building, similar bucket merging, and data partitioning. These stages are introduced as follows:

  1. Bucket array building. First, obtain the signature matrix by MinHash and divide it into \(b\) shares to build the bucket array, where each sub-matrix is made up of \(r\) rows. Then judge whether two data objects \(O_{i}\) and \(O_{j}\) in the data space are similar. The criterion is: if one or more of their data segments are hashed to the same bucket, then \(O_{i}\) and \(O_{j}\) are similar. After obtaining similar data objects, put them in the \(i{\text{-th}}\) bucket array if their \(i{\text{-th}}\) data segments are the same. Repeating the judging and placing operations \(b\) times maps all data objects into the \(b\) buckets of the bucket array and generates \(\mathop \sum \limits_{i = 1}^{b} K_{i}\) new buckets. Next, let the vector \(G\left( {O_{i} } \right) = \left( {v_{1} \left( {O_{i} } \right),v_{2} \left( {O_{i} } \right), \ldots ,v_{b} \left( {O_{i} } \right)} \right)\) represent \(O_{i}\), where \(v_{n} \left( {O_{i} } \right)\) is the position of data object \(O_{i}\) in the \(n{\text{-th}}\) bucket. Finally, Eq. (2) is used to measure the similarity of the data objects mapped into the same bucket.

  2. Similar bucket merging. In the previous stage, the vector \(G\) is used to represent data objects from different buckets, and these objects are mapped into \(b\) buckets, which generates many repeated buckets. Therefore, LSH and the Jaccard distance [49] are used together with a linear hash mapping \(H\left( {O_{i} } \right)\) to merge these repeated buckets. \(H\left( {O_{i} } \right)\) is obtained by a simple calculation: \(H\left( {O_{i} } \right) = (a_{1} *v_{1} \left( {O_{i} } \right) + a_{2} *v_{2} \left( {O_{i} } \right) + \cdots + a_{b} *v_{b} \left( {O_{i} } \right))\bmod M\), where \(M\) is the number of data partitions and each \(a_{i}\) is a value between \(0\) and \(M - 1\).

  3. Data partitioning. The results obtained in the above stages are placed into MapReduce nodes in the form of data partitions. This is very helpful for reducing the communication overhead among MapReduce nodes, because the data objects in the same bucket are mapped into one data partition.

Theorem 3.2

(Similarity measure formula) Suppose \(b_{i}\) represents a bucket in the bucket array, \(O_{i}\) and \(O_{j}\) are data objects in the bucket, and \(P\left( {O_{i} ,O_{j} } \right)\) represents the association between them. Then the similarity can be calculated as follows:

$$s\left( {O_{i} ,b_{i} } \right) = \frac{{\mathop \sum \nolimits_{{O_{j} \in b_{i} }} P\left( {O_{i} ,O_{j} } \right)}}{{\left| {b_{i} } \right|\left( {\left| {b_{i} } \right| - 1} \right)/2}}$$
(2)

Proof

  1. For any \(O_{i} ,b_{i}\), \(\mathop \sum \limits_{{O_{j} \in b_{i} }} P\left( {O_{i} ,O_{j} } \right) \ge 0\) and \(\left| {b_{i} } \right|\left( {\left| {b_{i} } \right| - 1} \right)/2 > 0\), so \(s\left( {O_{i} ,b_{i} } \right) \ge 0\) and non-negativity is met.

  2. For any \(O_{i} ,b_{i}\), \(s\left( {O_{i} ,b_{i} } \right) = s\left( {b_{i} ,O_{i} } \right)\), so symmetry is met.

  3. For any \(O_{i} ,b_{i}\) and any \(z\), \(s\left( {O_{i} ,z} \right) + s\left( {z,b_{i} } \right) \ge s\left( {O_{i} ,b_{i} } \right)\), so the triangle inequality is met.

Equation (2) satisfies all three properties, so it is a valid similarity measure.

Based on the DP-LSH, the specific implementation is given in Algorithm 2.

Algorithm 2 The pseudocode of the DP-LSH
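Because Algorithm 2 is also presented as a figure, the sketch below illustrates the DP-LSH idea under simplifying assumptions (integer set elements, a hand-rolled MinHash, and hypothetical function names): it bands an \(l \times n\) signature matrix into \(b\) bands, records each object's bucket position per band, and combines the positions with the linear hash \(H(O_{i})\) from stage 2.

```python
import numpy as np

def minhash_signatures(sets, num_hashes, seed=0):
    """MinHash: signature[j] = min over elements e of ((a_j * e + b_j) mod prime)."""
    rng = np.random.default_rng(seed)
    prime = 4294967311                       # prime larger than the element universe
    a = rng.integers(1, prime, size=num_hashes)
    b = rng.integers(0, prime, size=num_hashes)
    sigs = []
    for s in sets:
        elems = np.fromiter(s, dtype=np.int64)
        sigs.append(((a[:, None] * elems[None, :] + b[:, None]) % prime).min(axis=1))
    return np.vstack(sigs)                   # shape: (n objects, l hash functions)

def dp_lsh_partition(sigs, b, M, seed=0):
    """Band the signatures into b bands, record bucket positions v_i(O), and
    map each object to a partition with H(O) = (sum a_i * v_i(O)) mod M."""
    n, l = sigs.shape
    r = l // b                               # rows per band
    rng = np.random.default_rng(seed)
    coeffs = rng.integers(0, M, size=b)      # the a_i in H(O_i)
    positions = np.empty((n, b), dtype=np.int64)
    for band in range(b):
        buckets = {}                         # identical band slices share a bucket
        for obj in range(n):
            key = tuple(sigs[obj, band * r:(band + 1) * r])
            positions[obj, band] = buckets.setdefault(key, len(buckets))
    return (positions @ coeffs) % M          # partition id per object
```

Objects whose bucket positions agree across the bands receive the same partition id, which is what keeps highly associated objects in the same sub-data set.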

3.2.2 AGS

The data partitions have been obtained by the DP-LSH. However, data skewing might appear when the partitions are input to the Map tasks. The AGS aims to solve the data skewing problem and improve the parallelization efficiency of the MapReduce cluster, so it is applied in the MR-PGDLSH. The principle of the AGS is as follows:

First, a counting function \({\text{Count}}\left( \right)\) and a counting array \(\left\{ {{\text{count}}_{0} ,{\text{count}}_{1} , \ldots , {\text{count}}_{b - 1} } \right\}\) are used to record the data volume of the \(b\) buckets while the bucket array is generated in the DP-LSH. The data volumes of the \(m\) merged buckets are then updated to \(\left\{ {{\text{count}}_{0} ,{\text{count}}_{1} , \ldots ,{\text{count}}_{m - 1} } \right\}\), and the \(m\) buckets are distributed to the \(r\) MapReduce nodes randomly. Finally, Eq. (3) is used to evaluate whether the data volume on each node is relatively balanced [51]. If relative balance is achieved, the data skewing problem is solved and the performance of the MapReduce cluster is improved.

Theorem 3.3

(Sum of squared errors formula) Let \(r\) be the number of MapReduce nodes, \(R_{i}\) be the data volume of the \(i{\text{-th}}\) node, and \({\text{EX}}\) be the average data volume over the nodes. The sum of squared errors \({\text{SSE}}\) can be calculated as follows:

$${\text{SSE}} = \mathop \sum \limits_{i = 0}^{r - 1} \left| {R_{i} - {\text{EX}}} \right|^{2}$$
(3)

Proof

$$\because \sum\limits_{i = 0}^{r - 1} {\left( {R_{i} - {\text{EX}}} \right)^{2} } = \sum\limits_{i = 0}^{r - 1} {\left[ {\left( {R_{i} - \hat{R}_{i} } \right) + \left( {\hat{R}_{i} - {\text{EX}}} \right)} \right]^{2} }$$
$$\therefore \sum\limits_{i = 0}^{r - 1} {\left( {R_{i} - {\text{EX}}} \right)^{2} } = \sum\limits_{i = 0}^{r - 1} {\left( {\hat{R}_{i} - {\text{EX}}} \right)^{2} } + 2\sum\limits_{i = 0}^{r - 1} {\varepsilon_{i} \left( {\hat{R}_{i} - {\text{EX}}} \right)} + \sum\limits_{i = 0}^{r - 1} {\left( {R_{i} - \hat{R}_{i} } \right)^{2} } ,\quad {\text{where}}\;\varepsilon_{i} = R_{i} - \hat{R}_{i} ;$$
$$\because \;{\text{the covariance of}}\;\varepsilon_{i} \;{\text{and}}\;\hat{R}_{i} \;{\text{is}}\;0,\;{\text{which means}}\;\sum\limits_{i = 0}^{r - 1} {\varepsilon_{i} \left( {\hat{R}_{i} - {\text{EX}}} \right)} = 0,$$
$$\therefore \sum\limits_{i = 0}^{r - 1} {\left( {R_{i} - {\text{EX}}} \right)^{2} } = \sum\limits_{i = 0}^{r - 1} {\left( {\hat{R}_{i} - {\text{EX}}} \right)^{2} } + \sum\limits_{i = 0}^{r - 1} {\left( {R_{i} - \hat{R}_{i} } \right)^{2} }$$

The implementation of the AGS is given in Algorithm 3:

Algorithm 3 The pseudocode of the AGS
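With Algorithm 3 only available as a figure, here is a hedged sketch of the AGS idea (the greedy placement of buckets onto the lightest node and the acceptance threshold are simplifying assumptions of ours; the paper distributes buckets randomly and regroups until Eq. (3) indicates balance):

```python
import numpy as np

def ags_assign(bucket_counts, r, tol=0.05):
    """Place each bucket on the currently lightest of r nodes, then check
    the balance with the sum of squared errors of Eq. (3)."""
    loads = np.zeros(r)
    assignment = {}
    # Placing the largest buckets first makes the greedy assignment more even.
    for bucket, count in sorted(enumerate(bucket_counts), key=lambda kv: -kv[1]):
        node = int(np.argmin(loads))
        assignment[bucket] = node
        loads[node] += count
    ex = loads.mean()                            # EX in Eq. (3)
    sse = float(np.sum((loads - ex) ** 2))       # SSE = sum_i |R_i - EX|^2
    balanced = sse <= r * (tol * ex) ** 2        # heuristic acceptance criterion
    return assignment, loads, sse, balanced
```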

3.3 Cluster in parallel

The DP-LSH and the AGS generate the data partitions that are input to the Map function. Therefore, MapReduce can be applied to cluster the data in parallel. The clustering procedure mainly consists of the Mapper stage and the Reducer stage, as follows:

  1. The Mapper stage. The initial cluster centers and the data partitioning results are input into the Map function, which calculates the distance between each data object and the cluster centers using the Euclidean distance formula [52]. Each data object is then assigned to the proper cluster, which yields the local clustering results. Finally, these results are partitioned and merged in the Shuffle phase.

  2. The Reducer stage. The results generated by the Shuffle phase become the input of the Reduce function, which merges the local clusters and produces the output. The cost function \(E\) [53] is then used to judge whether the result generated by the Reduce function is the global optimal solution. That is, when \(E_{\min }\) is reached, the result is optimal and is output as the final clustering result; otherwise, a new MapReduce job is started to iterate until \(E_{\min }\) is reached.

The whole flow of the Mapper and Reducer stages is shown in Fig. 2.

Fig. 2 Running flow of the parallel clustering

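As a hedged, single-machine simulation of the Mapper and Reducer stages described above (it is not the actual Hadoop implementation, and the stopping tolerance is an assumption of ours), the iteration can be sketched as:

```python
import numpy as np

def mapper(partition, centers):
    """Mapper: assign each point of a partition to its nearest center (Euclidean)."""
    dists = np.linalg.norm(partition[:, None, :] - centers[None, :, :], axis=2)
    return dists.argmin(axis=1)

def reducer(partition, labels, k):
    """Reducer-side merge: per-cluster sums and counts for this partition."""
    sums = np.zeros((k, partition.shape[1]))
    counts = np.zeros(k)
    for j in range(k):
        mask = labels == j
        sums[j] = partition[mask].sum(axis=0)
        counts[j] = mask.sum()
    return sums, counts

def parallel_kmeans(partitions, centers, max_iter=50, eps=1e-4):
    """Iterate MapReduce-style jobs until the cost E stops improving (E_min)."""
    centers = np.asarray(centers, dtype=float)
    k = len(centers)
    prev_cost = np.inf
    for _ in range(max_iter):
        sums, counts, cost = np.zeros_like(centers), np.zeros(k), 0.0
        for part in partitions:                          # one Map task per partition
            labels = mapper(part, centers)
            s, c = reducer(part, labels, k)
            sums += s; counts += c
            cost += ((part - centers[labels]) ** 2).sum()
        centers = np.where(counts[:, None] > 0,
                           sums / np.maximum(counts, 1)[:, None], centers)
        if prev_cost - cost < eps:                       # E_min reached
            break
        prev_cost = cost
    return centers, cost
```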

3.4 Algorithm complexity

3.4.1 Time complexity

The time complexity of the MR-PGDLSH is mainly composed of the three stages above: initial cluster center generation, data partitioning, and parallel clustering. The time complexity of these stages is as follows:

  1. Initial cluster center generation stage. Let \(n\) be the number of data points. Pairwise comparisons over these points are needed when obtaining the initial cluster centers, so the time complexity of obtaining the initial cluster centers with the GDS is \(O\left( {n^{2} } \right)\).

  2. Data partitioning stage. Let \(M\) be the number of data partitions. Because the hash function needs to map similar objects into buckets for data partitioning, the time complexity of constructing the data partitions with the DP-LSH is \(O\left( {M^{2} } \right)\). The AGS is then used to balance the data of each node and complete a reasonable data division. Suppose there are \(r\) computing nodes performing the MapReduce task; the corresponding time complexity is \(O\left( {\left( {n + M} \right)/r} \right)\). Therefore, the total time complexity of data partitioning is \(O\left( {M^{2} + \left( {n + M} \right)/r} \right)\).

  3. Parallel clustering stage. Multiple MapReduce jobs need to be performed and each data cluster needs to be updated in parallel, so the time complexity of generating \(k\) clusters by parallel clustering is \(O\left( {r\log n} \right)\).

Therefore, the total time complexity of the MR-PGDLSH algorithm is \(O\left( {n^{2} + M^{2} + \left( {n + M} \right)/r + r\log n} \right)\). Since \(M < n\) and \(r \ll n\), the time complexity of the MR-PGDLSH is approximately \(O\left( {n^{2} } \right)\).

3.4.2 Space complexity

The space complexity of the MR-PGDLSH is mainly made up of two parts: the memory usage of generating the initial cluster centers and the memory usage of load balancing. The space complexity of these two parts is analyzed as follows:

  1. The GDS strategy is applied to obtain \(k\) cluster centers from a data space with \(n\) points. Therefore, the space complexity of this part is \(O\left( {k\log n} \right)\).

  2. In the load balancing part, the algorithm needs to run iteratively for \(e\) rounds. Suppose there are \(r\) MapReduce nodes. Therefore, the total space complexity of load balancing is \(O\left( {\mathop \sum \limits_{i = 1}^{e} r^{2} + \mathop \sum \limits_{i = 1}^{k} \left( {r + n} \right)} \right)\).

So, the total space complexity of MR-PGDLSH is \(O\left( {k\log n + \mathop \sum \limits_{i = 1}^{e} r^{2} + \mathop \sum \limits_{i = 1}^{k} \left( {r + n} \right)} \right)\).

4 Evaluation

4.1 Experiment settings

In this paper, the experiments were performed on a Hadoop cluster consisting of one Master node and three Slave nodes. The nodes are connected by a 1 Gb/s network, and all four nodes have the same configuration: 1 TB hard disk, 16 GB memory, an Intel Core i7-9750H processor, the CentOS 6.0 operating system, Hadoop 2.7.7, and JDK 1.8.0. Table 1 shows the detailed configurations of these nodes.

Table 1 The detailed configurations of each node

4.2 Experiment data sets

To verify the rationality of the MR-PGDLSH algorithm on data sets with different characteristics, the following four data sets were selected:

In the experiments, the data sets (\(Thyroid\), \(Dolbanb\), \(Knad\), and \(Balancescale\)) come from the real data source of the UCI repository [54]:

  • \(Thyroid\), from the Garvan Institute and donated by Ross Quinlan, consists of 1,503,530 records, and each record includes 5 properties and 3 labels. The data set has 2800 training records and 972 test records, and it is characterized by a large amount of data and few attributes.

  • \(Dolbanb\), with a large amount of data and a moderate number of attributes, is real traffic data gathered from 9 commercial IoT devices. It consists of 7,062,606 records, and each record contains 35 properties and 5 labels.

  • \(Knad\), with a large amount of data and a moderate number of attributes, contains nine different network attacks. It consists of 27,170,754 records, and each record contains 45 properties and 8 labels.

  • \(Balancescale\) was generated to model the psychological experiments reported by Siegler. It consists of 49,288,288 records, and each record contains 4 properties and 3 labels. The data set is characterized by a large amount of data and few attributes.

The specific information of the data sets is shown in Table 2:

Table 2 Experiment data sets

4.3 Performance evaluation

4.3.1 Comparison of speedup

The speedup is an important indicator for evaluating the performance of parallelized algorithms, because it represents the ability of a clustering algorithm to run clustering tasks in parallel. Generally, the speedup can be calculated as follows:

$$S_{{\text{p}}} = T_{s} /T_{{\text{p}}}$$
(4)

where \(T_{{\text{s}}}\) represents the running time of the algorithm on a single node, \(T_{{\text{p}}}\) represents the running time of the parallel computation, and \(S_{{\text{p}}}\) is the speedup.
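As a purely hypothetical illustration of Eq. (4) (these numbers are not measured values from the experiments), a job that takes \(T_{{\text{s}}} = 600\) s on a single node and \(T_{{\text{p}}} = 120\) s on the cluster has a speedup of
$$S_{{\text{p}}} = 600/120 = 5.$$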

In Eq. (4), a larger \(S_{{\text{p}}}\) means that less time is spent in the parallel computation, and hence the efficiency of the MapReduce cluster is higher. Figure 3 compares the speedup of the MR-PGDLSH with three algorithms, namely the NGKCA, the MRGAK-Medoids, and the MR-KMS, on data sets of different scales.

Fig. 3 Speedup of four algorithms on four data sets

Figure 3 shows that, as the number of nodes increases, the speedup of each algorithm follows an increasing trend. When the number of nodes is small, the speedup of the MR-PGDLSH algorithm is not significantly different from that of the other three algorithms, and is sometimes even smaller. This is because the MR-PGDLSH algorithm incurs time overhead in cluster operation, task scheduling, node storage, and other aspects, which reduces its computation speed. However, when the number of nodes exceeds 4, the speedup of each algorithm on the \(Knad\) data set rises sharply, which reflects the feasibility of each algorithm on a large data set. When the number of nodes reaches 6, the speedup of the MR-PGDLSH reaches 7.5 on the \(Balancescale\) data set, which is higher than that of the NGKCA, MRGAK-Medoids, and MR-KMS algorithms by 2.3, 3.1, and 2.1, respectively. In addition, comparing across the data sets, the speedup of the four algorithms on the \(Thyroid\) data set is lower than that on the other three data sets. As the number of nodes increases to 6, the speedups of the four algorithms on the \(Thyroid\) data set are 2.4, 2.9, 5.1, and 6.5, respectively, while on the \({\text{Knad}}\) data set they are 5.3, 4.8, 5.7, and 7.2, respectively. In particular, the value of the MR-PGDLSH algorithm exceeds 7, which is the highest among the four algorithms. This result further suggests that distributing a small data set to many nodes increases the processing time, so the speedup hits a bottleneck. However, when the MR-PGDLSH algorithm processes a massive data set such as \(Balancescale\), it maps the relevant data objects to the same sub-data set so that the data partitions for the Map tasks are obtained, and the similarity measurement function designed to process the data partitions greatly reduces the communication overhead among nodes. Therefore, the MR-PGDLSH algorithm is suitable for processing large-scale data sets, and as the number of nodes increases, its parallelization efficiency becomes higher.

4.3.2 Normalized mutual information

NMI (normalized mutual information) is an information-theoretic measure of the agreement between two label assignments. Generally, the NMI is defined based on Eqs. (5) and (6).

$${\text{MI}}(X,Y) = \sum\limits_{i = 1}^{|X|} {\sum\limits_{j = 1}^{|Y|} {P(i,j)\log \left( {\frac{P(i,j)}{{P(i)P(j)}}} \right)} }$$
(5)
$${\text{NMI}}(X,Y) = \frac{2MI(X,Y)}{{H(X) + H(Y)}}$$
(6)

where \(P\left( i \right)\) is the probability distribution function of \(i\), \(P\left( j \right)\) is the probability distribution function of \(j\), and \(H\left( X \right)\) and \(H\left( Y \right)\) are the entropies of \(X\) and \(Y\), respectively. The NMI value reflects the clustering accuracy.
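For reference (a hedged example with toy labels, not the paper's experimental data), the NMI of Eq. (6) can be computed with scikit-learn:

```python
# Toy label vectors; normalized_mutual_info_score uses the arithmetic-mean
# normalization by default, i.e. 2*MI / (H(X) + H(Y)) as in Eq. (6).
from sklearn.metrics import normalized_mutual_info_score

true_labels      = [0, 0, 1, 1, 2, 2]
predicted_labels = [0, 0, 1, 2, 2, 2]
print(normalized_mutual_info_score(true_labels, predicted_labels))
```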

Clearly, the larger the \({\text{NMI}}\), the better the performance of the clustering algorithm. On \(Thyroid\), \(Dolbanb\), \(Knad\), and \(Balancescale\), the NMI of the MR-PGDLSH is compared with that of the NGKCA, the MRGAK-Medoids, and the MR-KMS. The comparison results are shown in Fig. 4.

Fig. 4 NMI of four algorithms on four data sets

As shown in Fig. 4, compared with the other three algorithms, the MR-PGDLSH algorithm obtains the largest NMI on all four data sets, which means that it has better clustering performance. When processing the \({\text{Thyroid}}\) data set, the NMI of the MR-PGDLSH algorithm is higher than that of the NGKCA, MRGAK-Medoids, and MR-KMS algorithms by 0.01, 0.04, and 0.05, respectively. Interestingly, as the volume of data increases, the advantage of the MR-PGDLSH algorithm becomes more obvious. Note that the NMI of the MR-PGDLSH algorithm exceeds 0.95 on the latter two data sets, while the NMI of the other three algorithms even decreases. This is because the GDS strategy first calculates the side length of each dimension of the grid and the density of each data point; then, to filter the data points, the grid density threshold is calculated according to the minimum average distance among the points. At the same time, to avoid the impact of initial center sensitivity, the average value of each data segment is calculated and set as an initial center point. In other words, the MR-PGDLSH solves the random selection of initial cluster centers by applying the GDS. Therefore, the MR-PGDLSH algorithm can significantly improve clustering performance in a big data environment.

4.3.3 Comparison of runtime

It is necessary to record and compare the runtime of the MR-PGDLSH, NGKCA, MRGAK-Medoids, and MR-KMS to evaluate algorithm performance. Each of the four algorithms was run ten times on \(Thyroid\), \(Dolbanb\), \(Knad\), and \(Balancescale\), and the average runtime is shown in Fig. 5.

Fig. 5 The runtime of four algorithms on each data set

As shown in Fig. 5, the MR-PGDLSH algorithm always spends the least time on each data set. In detail, the running time of the MR-PGDLSH algorithm is 37.41% of that of the NGKCA, 43.25% of that of the MRGAK-Medoids, and 52.3% of that of the MR-KMS when processing the \(Thyroid\) data set. As the amount of data increases, the running time of the MR-PGDLSH grows gently, while that of the other three algorithms grows geometrically. In particular, when processing a massive data set such as \(Balancescale\), the running time of the MR-PGDLSH algorithm is 25.67% of that of the NGKCA, 33.25% of that of the MRGAK-Medoids, and 42.3% of that of the MR-KMS. The chief reason is that similar data objects are mapped into the same bucket by the DP-LSH when the MR-PGDLSH algorithm computes the data partitions. At the same time, the similarity measurement formula is designed to evaluate the partitioning results, which reduces frequent communication overhead among MapReduce nodes. The buckets are then divided by the AGS to improve node utilization and eliminate data skewing, which further reduces the runtime of the MR-PGDLSH algorithm. In a word, thanks to these measures, the MR-PGDLSH greatly improves the efficiency of parallelization.

4.3.4 Comparison of memory usage

When comparing memory usage, according to the existing hardware configuration, four computational nodes are used to perform the parallel computing task. On the \(Thyroid\), \(Dolbanb\), \(Knad\), and \(Balancescale\) data sets, the average memory usage is obtained by running each of the MR-PGDLSH, NGKCA, MRGAK-Medoids, and MR-KMS algorithms ten times. Figure 6 shows the comparison of the memory usage of the four algorithms on the different data sets.

Fig. 6 The memory usage of four algorithms

As shown in Fig. 6, the MR-PGDLSH has the lowest memory usage on the different data sets, and as the scale of the data increases, this advantage continues to grow. When processing \(Thyroid\), the memory usage of the MR-PGDLSH is 52.31% of that of the NGKCA, 49.13% of that of the MRGAK-Medoids, and 42.36% of that of the MR-KMS. Note that the memory usage of the MR-PGDLSH increases slowly, while the memory usage of the remaining algorithms increases sharply: when processing the \(Knad\) data set, the memory usage of the compared algorithms increases by 18.31%, 39.13%, and 32.36%, respectively. This is mainly due to two reasons. On the one hand, the MR-PGDLSH obtains better cluster centers by applying the GDS, thereby avoiding the high memory usage caused by too many unnecessary iterations. On the other hand, the AGS performs load balancing of the MapReduce nodes, which keeps the number of computing tasks on each node relatively balanced. This not only speeds up the running of the algorithm but also reduces the final memory usage.

5 Conclusion

To combat the shortcomings of K-means clustering, namely randomly selected cluster centers, expensive communication overhead, and data skewing in the data partitions, this paper proposes the parallel clustering algorithm MR-PGDLSH. The algorithm can be applied in many fields. For example, in business it helps market analysts find different customer groups in a customer database and characterize them by their purchase patterns; in biology it can be used to derive classifications of plants and animals, classify genes, and gain an understanding of the inherent structure of populations. Moreover, the MR-PGDLSH has many advantages over other algorithms and overcomes their limitations. Firstly, the cluster centers are obtained by applying the GDS, thereby effectively avoiding the problem of randomly selecting the initial cluster centers. Secondly, the DP-LSH is proposed to map similar data objects to the same bucket to obtain the data partitions for the Map tasks, thereby reducing the frequent communication overhead between MapReduce nodes. Thirdly, the AGS is designed to avoid data skewing in the partitions. Finally, the cluster centers are mined in parallel under the MapReduce framework to generate the final clustering results. In addition, experiments were conducted to verify that the MR-PGDLSH has better performance. Four data sets (\(Thyroid\), \(Dolbanb\), \(Knad\), and \(Balancescale\)) were used, and the results of the MR-PGDLSH were compared with those of the NGKCA, the MRGAK-Medoids, and the MR-KMS. The final experimental results show that the MR-PGDLSH has the best clustering performance when processing large-scale data sets, and its parallel efficiency is also greatly improved. Both the theoretical analysis and the experimental results prove that the MR-PGDLSH is superior.

This work still has limitations: the dispersion coefficient of the data can be large when dividing the data, and the initialization of the centroids can still fall into a local optimum. Therefore, in the future we plan to use the Pearson correlation coefficient instead of the similarity function for data division, which can divide the data set better, to use a Gaussian kernel function to obtain the number of centroids, and to use an optimized swarm intelligence algorithm to achieve global centroid initialization, which can further improve clustering accuracy.