Introduction

Similarity search is a principal operation not only in databases but also in interdisciplinary fields such as machine learning, recommendation systems, biology, and data analytics. Its main goal is to find the objects similar to a given pivot, known as the query. Unfortunately, computing similarity suffers from high overheads, because a similarity metric must be evaluated for every pair of objects [18]. These overheads grow even larger as data become bigger and higher-dimensional. The challenge, therefore, is how to speed up the similarity search process so that its benefits can be enjoyed.

There are several approaches to achieve this goal, for example, improving indexing [3, 15], hashing [13], filtering [16], or processing in parallel [1]. Among them, parallel processing in a distributed environment attracts much attention from researchers worldwide [6, 9, 10, 14]. This trend keeps motivating both academia and industry due to the ever-growing amount of data, since the other approaches may fail to scale when processing massive datasets. We therefore follow this trend by employing MapReduce, a large-scale processing paradigm [4], to enhance the performance of similarity search.

Even though MapReduce helps us process an enormous amount of data, it suffers heavy overheads when processing large but unnecessary or irrelevant objects. The scenario becomes even worse when those objects are involved in the similarity search, because each redundant object is combined with every other object to evaluate their similarity pair by pair. Moreover, a MapReduce-based process is strictly bound by I/O costs, so processing irrelevant or unnecessary data incurs an extra penalty.

Meanwhile, the recent literature deals only with a single similarity query at a time. Consequently, when given a query batch, each query is processed one by one, which slows down the overall performance of batch processing. In fact, we observe that queries in a batch may share their search space. It is therefore better to search the shared search space once for all queries in the batch rather than to determine the search space for one query and then repeat the work for every other query.

In our work, we therefore take batch processing into account rather than single-query processing. Furthermore, we propose strategies to make our MapReduce-based solution more effective. Specifically, our main contributions are as follows:

  • We present a query batch processing scheme that not only handles sets of queries but also performs similarity search incrementally.

  • We introduce a simple but efficient method that supports quick pruning in similarity search by means of sorted inverted indexes.

  • We propose an indexing scheme with metadata so that we can reduce duplicate data and thus build lightweight indexes.

  • We perform empirical experiments on real datasets. The results verify the efficiency of our proposed solution when performing similarity search with query batches.

It is worth noting that this paper is an extended version of our work [8], with new content added throughout the following sections.

The rest of the paper is organized as follows. Sect. “Related Work” presents related work. Sect. “Preliminaries” then introduces the background on similarity search and the MapReduce paradigm. In Sect. “Our Proposed Solution”, we propose our solution for similarity search in general and for query batches in particular. After that, we conduct our experiments in Sect. “Empirical Experiments and Evaluations” before making our concluding remarks in Sect. “Conclusion and Future Work”.

Related Work

Metwally and Faloutsos introduce a method for all-pair similarity joins of multisets and vectors [6]. Their method is composed of two main phases, each comprising two MapReduce jobs. Since a MapReduce job is costly, the more jobs a method uses, the higher its costs will be. Additionally, their method does not consider duplicate data items during the execution of the MapReduce jobs, which usually adds an extra penalty. Besides, Tang et al. present the so-called HA index to speed up Hamming-distance-based computing for range queries [14], and redundancy during the computation is also eliminated. Their whole process spans three phases with two MapReduce jobs. Nevertheless, the costs of data pre-processing and post-processing are excluded from the MapReduce jobs.

Gao et al. propose efficient and scalable metric similarity joins with MapReduce [5]. They focus on load balancing and on avoiding unnecessary object pairs with their filtering methods, including range-object filtering, double-pivot filtering, pivot filtering, and plane sweeping, so that better query performance can be achieved. In the meantime, Phan et al. propose an efficient hybrid similarity search with MapReduce [10]. Their basic idea is to first cluster similar objects and then define upper and lower boundaries to shrink the search space before looking for similar pairs. The method is deployed in a hybrid MapReduce-based architecture that deals with the challenges of big data, and their empirical studies show that it is efficient in terms of data processing and storage. Although their method works well with batch processing, each query in a batch is processed sequentially under their search scheme, whereas we provide an indexing strategy for query batches that supports quick similarity search.

Nguyen et al. build a VP-tree algorithm on top of the MapReduce framework to achieve good performance, scalability, and fault tolerance for similarity search over large datasets in a distributed environment [7]. Moreover, their method reduces the amount of data that needs to be scanned during the search phase. In contrast, our approach favors MapReduce-based, scheme-driven algorithms that are independent of the underlying MapReduce framework. By doing so, we gain two main advantages: (1) no internal or additional changes to the framework are required; and (2) the top-level algorithms and the underlying framework support each other.

Preliminaries

Similarity Search

A corpus, denoted as \(\varOmega \), consists of a set of document objects \(\hbox {D}_p\), formally represented as \(\varOmega = \{{D}_1, {D}_2, {D}_3, \ldots , {D}_n\}\). Each document object \(\hbox {D}_p\) is composed of a set of words, shown as \({D}_p = \{{word}_1, {word}_2, {word}_3, \ldots , {word}_w\}\). Similarity search is the operation that retrieves all objects in the corpus \(\varOmega \) that satisfy required constraints, which may differ from query to query. In this section, we present some fundamental similarity queries as follows:

  • Pairwise similarity query. For each document object in the corpus \(\varOmega \), the similarity search computes the similarity between that object and every one of the (n − 1) other objects in the corpus.

  • Query-by-example or pivot query. The similarity search looks for similar objects in the corpus \(\varOmega \) when given a query object called a pivot. Given a pivot \(\hbox {Q}_j\), the similarity search computes how similar the pair (\(\hbox {Q}_j\), \(\hbox {D}_p\)) is for every document object \(\hbox {D}_p\) in the corpus.

  • Range query. Given a pre-defined range threshold \(\epsilon \), sometimes known as a similarity threshold, the similarity search retrieves all objects in the corpus \(\varOmega \) whose similarity scores are greater than or equal to the threshold \(\epsilon \).

  • K-nearest neighbor query. Given a pre-defined parameter k, the similarity search retrieves the top-k similar objects in the corpus \(\varOmega \); in other words, the k objects that are the most similar to a query object.

To evaluate how similar a pair is, a similarity measure such as the Euclidean distance, Cosine similarity, Hamming distance, or Jaccard coefficient is used to quantify its similarity [18]. The similarity score is usually normalized into the interval [0, 1], where a pair is more similar when its score is close to 1 and less similar when its score is near 0. Moreover, a similarity threshold, such as 80% similarity, may be provided to keep only those pairs whose similarity scores are greater than or equal to 0.8.

In the meantime, we observe that modeling the content of a document as a set of words does not reflect well how similar a pair really is, because two identical words in different objects do not necessarily carry the same meaning. To capture part of the semantic similarity, the concept of K-shingles [11], where a K-shingle is any substring of length K found in the document, is used instead. As a consequence, each document object \(\hbox {D}_p\) is composed of a set of K-shingles, shown as \({D}_p = \{{S}_1, {S}_2, {S}_3, \ldots , {S}_z\}\).
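For concreteness, the following minimal Python sketch shows how a document can be decomposed into character-level K-shingles; the function name and parameters are ours for illustration, not part of [11].

```python
def shingles(text: str, k: int = 4) -> set:
    """Return the set of character-level k-shingles of a document.

    A set is used because duplicate shingles do not contribute to the
    set-based similarity measures discussed above.
    """
    # Slide a window of length k over the text; texts shorter than k
    # yield the empty set.
    return {text[i:i + k] for i in range(len(text) - k + 1)}

# Example: two near-identical documents share most of their 4-shingles.
d1 = shingles("similarity search")
d2 = shingles("similarity searches")
print(sorted(d1 & d2))
```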

Furthermore, to speed up the similarity search process, different types of indexing are employed. One of the most well-known index structures supporting similarity search is the inverted index, a popular data structure in information retrieval systems [2, 10]. In our work, we build a variant of the inverted index known as the sorted inverted index, so that we can skip unnecessary computation for elements absent from either the document or the query objects when searching for candidate pairs, as discussed later in Sect. “MapReduce-based Similarity Search”.

In general, a similarity search process consists of two main phases as follows:

  1. Candidate generation phase. This is the phase where two objects are identified as a candidate pair.

  2. Candidate verification phase. This is the phase where the pair is evaluated for its similarity score.

MapReduce

MapReduce is a parallel paradigm for large-scale processing [4]. The philosophy behind it is to apply a “divide-and-conquer” strategy to data. A large dataset is split into smaller data chunks, which are then processed on various machines, and the intermediate results generated by each machine are aggregated into the final result. To implement this strategy, a MapReduce job is composed of a Map task and a Reduce task, where the former is specified by a Map function and the latter by a Reduce function. When a MapReduce job is executed on a cluster of commodity machines, the machines assigned Map tasks are called mappers, whereas those assigned Reduce tasks are known as reducers. A Map task emits intermediate key-value pairs, while a Reduce task writes the final key-value pairs into the distributed file system. Moreover, there is a shuffle phase between the Map task and the Reduce task, which re-distributes data based on the keys output by the mappers.

Suppose that there are M mappers and R reducers; a single MapReduce job is then described as follows:

  1. Input data are loaded into the distributed file system and then divided into partitions based on their data size.

  2. Mappers read their data partitions, perform the Map function, and emit intermediate results in the form of key-value pairs \([k_i, v_j]\). These intermediate key-value pairs are stored locally at the mappers.

  3. The shuffle process aggregates the intermediate key-value pairs \([k_i, [v_j]]\) into R data partitions based on their key values.

  4. Reducers retrieve the intermediate key-value pairs \([k_i, [v_j]]\) from the R data partitions and perform the Reduce function. The final output is written back to the distributed file system.

One of the most well-known open-source frameworks that implement the idea of MapReduce is Hadoop,Footnote 1 which is designed to perform scale-out computing with massive datasets. In our work, we employ Hadoop to deploy our MapReduce-based algorithm for efficient batch similarity processing.
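To make the four steps above concrete, here is a minimal Hadoop Streaming word-count sketch in Python; it is a generic illustration of the paradigm, not our similarity search algorithm. Hadoop pipes file splits into the mapper on stdin, shuffles and sorts the emitted pairs by key, and pipes the grouped stream into the reducer.

```python
# mapper.py -- emits an intermediate [k_i, v_j] pair per word occurrence.
import sys

for line in sys.stdin:
    for word in line.split():
        print(f"{word}\t1")
```

```python
# reducer.py -- input arrives sorted by key, so equal keys are adjacent.
import sys

current, count = None, 0
for line in sys.stdin:
    key, value = line.rstrip("\n").split("\t")
    if key != current:
        if current is not None:
            print(f"{current}\t{count}")  # final key-value pair for this key
        current, count = key, 0
    count += int(value)
if current is not None:
    print(f"{current}\t{count}")
```

Such scripts are typically wired together with Hadoop Streaming's -mapper and -reducer options; our own jobs follow the same pattern, with the Map and Reduce functions described in Sect. “Our Proposed Solution”.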

Our Proposed Solution

Query Batch Processing Scheme

In this paper, we introduce our general similarity search scheme as illustrated in Fig. 1. Data and query objects are indexed into the data and query pools, respectively. We employ the inverted index as the index data structure, and the indexes are organized in an ordered way to serve our quick pruning strategy later on. In general, our basic idea is to separate the data preparation phase from the similarity search phase. By doing so, we can perform incremental computing when data or query objects change: we do not need to pre-process existing or unchanged data and query objects, only those that are new or have changed. To index data objects, we use one MapReduce job. For instance, a set of data objects \(\{\hbox {D}_p\}\) is indexed into the data pool in the form of a sorted inverted index (SII), denoted SII(\(\hbox {D}_p\)). Similarly, a set of query objects \(\{\hbox {Q}_j\}\) is indexed into the query pool as SII(\(\hbox {Q}_j\)). After this indexing phase, both data and queries are ready for similarity search. Later on, another MapReduce job computes the similarity (SIM) of queries against data and produces the final result in the form SIM(\(\hbox {Q}_j\), \(\hbox {D}_p\)). In our work, we employ the Jaccard coefficient, a well-known metric for fast set-based similarity [6, 10, 12, 17], to derive the similarity score of a pair, as described in Eq. 1 below.

$$\begin{aligned} SIM(Q_j, D_p) = \frac{\parallel Q_j \cap D_p \parallel }{\parallel Q_j \cup D_p \parallel } \end{aligned}$$
(1)

Here, \({\parallel Q_j \cap D_p \parallel }\) is the cardinality of the intersection of \(\hbox {Q}_j\) and \(\hbox {D}_p\), while \(\parallel Q_j \cup D_p \parallel \) is the cardinality of their union.
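As a reference point, Eq. 1 over shingle sets can be computed directly in a few lines of Python (a sketch of the measure itself, not of the distributed computation):

```python
def jaccard(q: set, d: set) -> float:
    """Jaccard coefficient of Eq. 1: ||Q ∩ D|| / ||Q ∪ D||."""
    if not q and not d:
        return 0.0  # convention for two empty sets
    return len(q & d) / len(q | d)

# Example with small shingle sets: 2 shared out of 4 distinct shingles.
print(jaccard({"simi", "imil", "mila"}, {"simi", "imil", "sear"}))  # 0.5
```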

With the proposed scheme, we can perform similarity search in an incremental manner, because both data and query objects are available in the data and query pools in the form of SII. Hence, if there are new data or query objects, the pools simply include them in the form of SII, while the existing data and query objects remain unchanged, since they are already in the data and query pools, respectively. A MapReduce job for similarity search can then be configured to compute similarity scores from any required set of SII in the pools. Furthermore, the general scheme applies not only to single query processing but also to query batches. Figure 2 shows three cases of incremental computing with our proposed scheme, as follows.

  • Case 1. New data objects appear. Assume that there is a set of new data objects \(\{D_\mathrm{new}\}\). A MapReduce job processes it to produce SII(\(D_\mathrm{new}\)) in the data pool. It is worth noting that SII(\(D_\mathrm{new}\)) is now ready in the data pool and is independent of the SII of the existing data objects (i.e., SII(\(D_a\)) ... SII(\(D_p\)) ... SII(\(D_z\))). Consequently, when doing a similarity search for a particular query set (e.g., SII(\(Q_i\)) ... SII(\(Q_j\)) ... SII(\(Q_m\))), another MapReduce job takes the relevant inputs from the data pool, which already includes the new data objects.

  • Case 2. New query objects appear. Assume that there is a set of new query objects \(\{Q_\mathrm{new}\}\). A MapReduce job processes it to produce SII(\(Q_\mathrm{new}\)). It is worth noting that SII(\(Q_\mathrm{new}\)) is now available in the query pool and is independent of the SII of the existing query objects (i.e., SII(\(Q_i\)) ... SII(\(Q_j\)) ... SII(\(Q_m\))). As a consequence, when doing a similarity search over a particular dataset (e.g., SII(\(D_a\)) ... SII(\(D_p\)) ... SII(\(D_z\))), another MapReduce job takes the relevant queries from the query pool, which already includes the new query objects.

  • Case 3. Both new data and query objects appear. This is the combination of Cases 1 and 2: both new data and query objects emerge together at similarity search time. As in the other two cases, if we did not take the new data and query objects into account, the similarity search would produce an out-of-date result. Therefore, new data and query objects should be processed so that they are available in the data and query pools, respectively. When conducting a similarity search, another MapReduce job pulls the related data and query inputs to compute up-to-date similarity scores.

Fig. 1 Query batch processing scheme [8]

Fig. 2 Incremental processing scheme

MapReduce-Based Similarity Search

Our MapReduce-based similarity search following the above scheme consists of two main phases: (1) indexing and (2) searching. In the former phase, we index data and query objects into the pools in the form of SII, while in the latter phase we perform the similarity search with the Jaccard measure. Last but not least, we model our documents as bags of 4-shingles rather than sets of words [10, 11].

As illustrated in Fig. 3, at Phase 1 (indexing), Map-1 takes original objects as its input; the objects mentioned here include both data and query objects. Map-1 then processes the input and emits intermediate key-value pairs of the form [Element, URL], where Element is a shingle of an object and URL is the uniform resource locator of that object in the distributed environment. We do not use object identifiers here because we want to know exactly where an object resides in the distributed system, so that we can retrieve it after the similarity search. Next, Reduce-1 aggregates the intermediate key-value pairs emitted by Map-1 according to their key values. Moreover, Reduce-1 sorts the list of key-value pairs into the form [Element, \([URL]]_{ord}\). At Phase 2 (searching), Map-2 starts considering candidate pairs for their similarity from the objects already indexed in Phase 1. If a query object and a data object share the same element, Map-2 generates a candidate pair of the form \([URL_D-URL_Q, ||D \cup Q||]\). After that, Reduce-2 aggregates identical candidate pairs, computes their similarity scores, and outputs the final result in the form \([URL_D-URL_Q, SIM(D, Q)]\).

Fig. 3 MapReduce flow chart

Fig. 4 Map-1 algorithm

Fig. 5 Reduce-1 algorithm

Fig. 6 Map-2 algorithm

Fig. 7 Reduce-2 algorithm

To better understand our proposed method, we present our algorithms for each MapReduce job. Figure 4 illustrates the Map-1 algorithm, which takes documents \(\hbox {D}_i\) as input and produces intermediate key-value pairs of the form \([SH_k, URL_i@NOS_i]\). First, the necessary variables are initialized in steps 1–3. Next, we generate shingles from the input document \(\hbox {D}_i\) in step 4. Each shingle is inserted into a \(\hbox {shingleList}_i\) for the document \(\hbox {D}_i\) in steps 5–7. We then filter duplicate shingles in step 8, because duplicates do not contribute to the overall similarity scores under the Jaccard measure. Besides, we get the number of shingles in step 9 and the \(\hbox {URL}_i\) of the corresponding document \(\hbox {D}_i\) in step 10. Finally, the mappers emit the intermediate key-value pairs in steps 11–12.
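The following Python sketch mirrors our reading of the Map-1 steps above; `emit` stands in for the framework's output collector and is an assumption of this sketch.

```python
def map1(doc_url: str, text: str, k: int = 4, emit=print):
    """Map-1 sketch: emit [SH_k, URL_i@NOS_i] for each distinct shingle."""
    # Steps 4-8: generate the shingles of D_i and filter duplicates, since
    # duplicates do not affect the Jaccard-based similarity score.
    shingle_list = {text[i:i + k] for i in range(len(text) - k + 1)}
    nos = len(shingle_list)            # step 9: number of shingles NOS_i
    value = f"{doc_url}@{nos}"         # step 10: URL_i combined with NOS_i
    for sh in shingle_list:            # steps 11-12: emit intermediate pairs
        emit(f"{sh}\t{value}")
```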

Figure 5 illustrates the Reduce-1 algorithm, which takes the output of Map-1 of the form \([SH_k, URL_i@NOS_i]\) and builds a sorted inverted index of the form \([SH_k, [URL_i@NOS_i]]_{ord}\). First, the reducers read the input data in step 1, while the necessary variables are initialized in steps 2–5. Next, steps 6–17 aggregate the documents that share the same shingle, keeping track of them with a two-dimensional matrix. Then, we sort the matrix to create a sorted list of key values (i.e., shingle values) in step 18. Finally, the reducers emit the key-value pairs in steps 19–20.
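A matching Reduce-1 sketch, again under our reading of Fig. 5; we use a Python dictionary of lists where the algorithm uses a two-dimensional matrix.

```python
from collections import defaultdict

def reduce1(pairs, emit=print):
    """Reduce-1 sketch: build [SH_k, [URL_i@NOS_i]]_ord from Map-1 output.

    `pairs` is an iterable of (shingle, "URL_i@NOS_i") tuples.
    """
    index = defaultdict(list)
    for sh, url_at_nos in pairs:       # steps 6-17: group postings by shingle
        index[sh].append(url_at_nos)
    for sh in sorted(index):           # step 18: sort by shingle value
        emit(f"{sh}\t{index[sh]}")     # steps 19-20: emit the sorted index
```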

Figure 6 illustrates the Map-2 algorithm, which takes as input a set of sorted inverted indexes SII(\(\hbox {D}_i\)) of document objects and a set of sorted inverted indexes SII(\(\hbox {Q}_j\)) of query objects and produces candidate pairs of the form \([URL_D-URL_Q, NOS_{D \cup Q}]\). First, the mappers read the data and query inputs in steps 1–2. Besides, we get the query number information from the query set in step 3. For each document and query, we examine whether there is any intersection between \(\hbox {D}_i\) and \(\hbox {Q}_j\) in steps 4–11. If so, we get the necessary information, namely \(\hbox {URL}_i\) in step 8, \(\hbox {URL}_j\) in step 9, and the total number of shingles of \(\hbox {D}_i\) and \(\hbox {Q}_j\) in step 10, and the mappers emit the candidate pairs in step 11.

Figure 7 illustrates the Reduce-2 algorithm, which takes the candidate pairs \([URL_D-URL_Q, NOS_{D \cup Q}]\) output by Map-2 and produces similar pairs of the form \([URL_D-URL_Q, SIM(D, Q)]\). First, the reducers read the input data in step 1, while the necessary variables are initialized in steps 2–5. Next, steps 6–17 aggregate identical candidate pairs, count the number of shingles shared by each pair, and compute the similarity score of each pair.
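Under our reading of Fig. 7, each occurrence of a candidate-pair key corresponds to one shared shingle, while the value carries the total \(S_D + S_Q\) (cf. Fig. 10), so the Jaccard score follows by inclusion-exclusion. A minimal sketch, assuming exactly that input convention:

```python
from collections import defaultdict

def reduce2(candidates, emit=print):
    """Reduce-2 sketch: turn candidate pairs into [URL_D-URL_Q, SIM(D, Q)].

    `candidates` yields (pair_key, S_D + S_Q) tuples, one tuple per shared
    shingle, so the occurrence count of a key equals ||D ∩ Q||.
    """
    shared, total = defaultdict(int), {}
    for pair_key, s_sum in candidates:   # steps 6-17: aggregate equal keys
        shared[pair_key] += 1
        total[pair_key] = s_sum
    for pair_key, c in shared.items():
        # Inclusion-exclusion: ||D ∪ Q|| = S_D + S_Q - ||D ∩ Q||.
        emit(f"{pair_key}\t{c / (total[pair_key] - c):.4f}")
```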

Furthermore, Fig. 8 shows an example of data indexing by a MapReduce job. Assume that we have three data documents \(\hbox {D}_p = [D_1, D_2, D_3]\) with their corresponding shingle-based contents. The Map task emits intermediate key-value pairs in the form of \([SH_p, URL_p]\), where \(\hbox {SH}_p\) is a shingle of a document \(\hbox {D}_p\) and \(\hbox {URL}_p\) is the path location of \(\hbox {D}_p\) in the distributed environment. It is worth noting that duplicate shingles from the same document are discarded, because they do not contribute to the similarity scores under the Jaccard measure; we thus filter them in this Map task to avoid additional overheads later. The Reduce task then produces SII(\(\hbox {D}_p\)) in the form of \([SH_u, [URL_v@S_v]]\). Note that we need to keep the size S of each document so that we can derive the similarity scores later on. Likewise, Fig. 9 shows an example of query indexing by a MapReduce job. Assume that we have three query documents \(\hbox {Q}_j = [Q_1, Q_2, Q_3]\) with their corresponding shingle-based contents. The Map task emits intermediate key-value pairs in the form of \([SH_j, URL_j]\), whereas the Reduce task produces SII(\(\hbox {Q}_j\)) in the form of \([SH_u, [URL_v@S_v]]\).

The similarity search phase is done by one MapReduce job. Figure 10 illustrates the Map task with SII(\(\hbox {D}_p\)) and SII(\(\hbox {Q}_j\)). It compares the indexes key by key and emits a pair in the form of \([URL_j-URL_p, (S_j + S_p)]\) whenever they share the same shingle. To discard unnecessary pairs sooner, we apply our quick pruning strategy during the comparison to speed up the search. Because SII(\(\hbox {D}_p\)) and SII(\(\hbox {Q}_j\)) are already organized in an ordered way, we obtain the following two advantages for quick pruning during the comparison (a sketch follows the list):

  1. We can stop the comparison halfway whenever \(\hbox {SH}_p > \hbox {SH}_j\).

  2. We can discard those shingles from the comparing set SII(\(\hbox {D}_p\)) whenever \(\hbox {SH}_p < \hbox {SH}_j\). By doing so, we reduce the size of the comparing set SII(\(\hbox {D}_p\)) during the comparison.
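One plausible two-pointer realization of these two rules over ascending shingle keys (a sketch, not the exact Map-2 of Fig. 6):

```python
def shared_shingles(data_keys, query_keys):
    """Yield the shingles present in both sorted key lists, applying the
    two pruning rules above in a single forward scan."""
    i, j = 0, 0
    while i < len(data_keys) and j < len(query_keys):
        if data_keys[i] < query_keys[j]:
            i += 1               # rule 2: SH_p < SH_j, discard SH_p for good
        elif data_keys[i] > query_keys[j]:
            j += 1               # rule 1: SH_p > SH_j, stop scanning for this SH_j
        else:
            yield data_keys[i]   # shared shingle: a candidate-pair hit
            i += 1
            j += 1

print(list(shared_shingles(["A", "C", "D", "F"], ["B", "C", "F"])))  # ['C', 'F']
```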

Finally, the Reduce task aggregates the pairs emitted by the Map task and computes their similarity scores. As shown in Fig. 11, we obtain eight pairs with their corresponding similarity scores.

Fig. 8 Example of data indexing [8]

Fig. 9 Example of query indexing [8]

Fig. 10 Example of Map task [8]

Fig. 11 Example of Reduce task [8]

Lightweight Indexing

From observation, we see that there are duplicate values in the indexes: whenever a pair shares the same shingle, a value of the form \(URL_v@S_v\) appears. In a distributed environment, a URL may be long due to its location path, so the repeated values make the indexes bigger. As a result, they add to the heavy cost of MapReduce-based processing, which is strictly bound by I/O costs. To minimize this redundancy and speed up the MapReduce-based search process, we propose a metadata-based approach as follows.

  1. We build a list L of document URLs as metadata in the form of \(\{URL_v@S_v\}\). The metadata is placed as the header of each file produced by the Reduce task.

  2. For each value in the inverted index, we replace it with its position in L with regard to the corresponding document URL (see the sketch below).
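A minimal sketch of this substitution (our own helper, matching the example of Figs. 14 and 15 discussed later):

```python
def build_metadata(index):
    """Replace each URL_v@S_v value in an inverted index by its position
    in a metadata list L, which becomes the file header.

    `index` maps a shingle to its list of URL_v@S_v strings.
    """
    metadata, position, light = [], {}, {}
    for shingle, postings in index.items():
        ids = []
        for value in postings:
            if value not in position:      # first sighting: append to L
                position[value] = len(metadata)
                metadata.append(value)
            ids.append(position[value])    # store the index into L instead
        light[shingle] = ids
    return metadata, light

meta, light = build_metadata({"A": ["URL_D1@8", "URL_D2@6", "URL_D3@7"]})
print(meta)   # ['URL_D1@8', 'URL_D2@6', 'URL_D3@7'] -- the header
print(light)  # {'A': [0, 1, 2]} -- the lightweight posting list
```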

Fig. 12 Reduce-1 algorithm with metadata

Fig. 13 Map-2 algorithm with metadata

To implement our idea, we change the behaviors of Reduce-1 and Map-2: Reduce-1 builds lightweight indexes with metadata, while Map-2 performs the similarity search with the metadata generated by Reduce-1. Figure 12 illustrates the Reduce-1 algorithm with metadata, which takes the output of Map-1 of the form \([SH_k, URL_i@NOS_i]\) and builds a sorted inverted index \([SH_k, [URL_i@NOS_i]]_{ord}\) with embedded metadata. First, the reducers read the input data in step 1, while the necessary variables are initialized in steps 2–8. More specifically, the variable metaDataList (step 6) keeps the list of URLs, the variable m_index (step 7) keeps the last index of the metaDataList, and the variable current_m_index (step 8) keeps the current index of a document \(D_i\). Steps 9–32 proceed almost as in the original Reduce-1: documents that share the same shingle are aggregated, and they are tracked with a two-dimensional matrix. The difference is that we check in step 11 whether the document under examination is already in the metaDataList. If not, we generate the next index of the metaDataList and associate it with that object in steps 12–15; otherwise, we associate the object with its current index in the metaDataList in steps 16–17. Moreover, the reducers output the metadata in metaDataList before the sorted inverted index list, in steps 34–35.

Due to the changes in the data structure after Reduce-1, we also need to modify the Map-2 algorithm to match the new data layout. Figure 13 illustrates the Map-2 algorithm with metadata, which still takes as input a set of sorted inverted indexes SII(\(\hbox {D}_i\)) of document objects and a set of sorted inverted indexes SII(\(\hbox {Q}_j\)) of query objects and produces candidate pairs of the form \([URL_D-URL_Q, NOS_{D \cup Q}]\). First, the mappers read the data and query inputs in steps 1–2. Next, we retrieve the shingle list from the query and store it in queryShingleList (step 3), as well as the metadata list from the query, stored in queryMetaDataList. Besides, we use the variable shingle (step 5) to store the key element and value element. Moreover, we use the variable metaDataFlag (step 6) to separate the metadata processing (steps 8–10) from the data processing (steps 11–24). If metaDataFlag is true, we read the metadata information and store it in metaDataList. Otherwise, we start processing the data and let the mappers emit candidate pairs. When comparing shingles from data and query, if the key value of the data shingle is greater than the key value of the query shingle and queryShingleList is not empty, we pop that query shingle out of queryShingleList (steps 12–13), because the key values of both data and query indexes are sorted in advance and we only need to find the shingles shared by both data and query at this phase. Otherwise, we retrieve the query list and the document list (steps 14–16). Next, for each query in the query list and each data item in the data list, we get the necessary information: the metadata index of the query under examination (step 18), the URL of the query (step 19), the metadata index of the data item under examination (step 21), the URL of the data item (step 22), and the total number of shingles of the document object D and the query object Q (step 23). Finally, the mappers emit the candidate pairs in step 24.
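As a rough illustration of this flow, the sketch below reads one Reduce-1 output file whose header lines carry the metadata list; the leading '#' marker for header lines and the comma-separated positions are our assumptions about the serialization, and `query_index`/`query_meta` are the already-parsed query-side structures.

```python
def map2_with_metadata(lines, query_index, query_meta, emit=print):
    """Map-2-with-metadata sketch: translate metadata positions back to
    URL@S values and emit candidate pairs [URL_D-URL_Q, NOS_{D ∪ Q}].

    `query_index` maps a shingle to metadata positions on the query side;
    `query_meta` is the query metadata list of "URL@S" strings.
    """
    meta = []                                   # metaDataList
    for line in lines:
        if line.startswith("#"):                # metaDataFlag: header region
            meta.append(line[1:].strip())       # collect URL_v@S_v entries
            continue
        shingle, ids = line.strip().split("\t")
        if shingle not in query_index:          # pruned: no shared shingle
            continue
        for q_pos in query_index[shingle]:
            url_q, s_q = query_meta[q_pos].rsplit("@", 1)
            for d_pos in map(int, ids.split(",")):
                url_d, s_d = meta[d_pos].rsplit("@", 1)
                emit(f"{url_d}-{url_q}\t{int(s_d) + int(s_q)}")
```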

Figure 14 shows an example of metadata for the dataset, while Fig. 15 gives an example of metadata for the query set. For instance, we build the metadata from the dataset as \(\{\hbox {URL}_{{D1}}@8, \hbox {URL}_{{D2}}@6, \hbox {URL}_{{D3}}@7\}\). Consequently, the pair [A, \([\hbox {URL}_{{D1}}@8, \hbox {URL}_{{D2}}@6, \hbox {URL}_{{D3}}@7]]\) is replaced by [A, [0, 1, 2]]. Meanwhile, we build the metadata from the query set as \(\{\hbox {URL}_{{Q1}}@4, \hbox {URL}_{{Q2}}@5, \hbox {URL}_{{Q3}}@5\}\). Consequently, the pair [N, [\(\hbox {URL}_{{Q1}}@4\), \(\hbox {URL}_{{Q2}}@5\), \(\hbox {URL}_{{Q3}}@5\)]] is replaced by [N, [0, 1, 2]]. When the data input is large, the indexes with our metadata are much more lightweight than those without it. Our experiments in Sect. “Empirical Experiments and Evaluations” show how lightweight they are and how much they contribute to speeding up the search process.

Fig. 14 Metadata of dataset [8]

Fig. 15 Metadata of query set [8]

Towards Other Similarity Queries

Our presentation so far has covered query by example, which looks for similar objects when given either a query or a set of queries. However, our proposed approach can be adapted to the other fundamental kinds of similarity queries as follows.

  • Pairwise similarity query. In this context, we would like to compute the similarity between each object and the others in the dataset. In other words, if we have N objects in the dataset, we need to calculate how similar each object is to the (N − 1) other objects. To do so, we modify Algorithm 3 (Map-2), shown in Fig. 6, or Algorithm 6 (Map-2 with metadata), shown in Fig. 13. In this case, we replace the set of query objects with the set of data objects itself. Moreover, we need to add a condition in step 6 of Algorithm 3 and in step 12 of Algorithm 6 so that the two objects of an examined pair are never identical.

  • Range query. Given a threshold \(\epsilon \), we would like to retrieve only those objects whose similarity scores are greater than or equal to \(\epsilon \). To do so, we modify Algorithm 4 (Reduce-2), shown in Fig. 7. After either step 11 or step 16, we check whether the similarity score is greater than or equal to the given \(\epsilon \); if so, the reducers output that pair. Furthermore, if there are different similarity thresholds for different queries, we check each similarity score against the corresponding threshold.

  • K-nearest neighbor query. Given a parameter k, we would like to retrieve only the k objects that are the most similar to a query object. To do so, we aggregate the similarity scores after either step 11 or step 16 of Algorithm 4 (Reduce-2), shown in Fig. 7, and rank them. After that, we output the k pairs that have the highest similarity scores with regard to their query objects. When processing query batches, we may have different k parameters for different queries; in this scenario, we rank the similarity scores grouped by their queries (see the sketch below). It is worth noting that our modified method may return a superset of the real top-k result.
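For the per-query grouping just mentioned, here is a minimal post-processing sketch (our own helper, using a bounded heap per query; the query-specific k values are assumed to be given):

```python
import heapq
from collections import defaultdict

def topk_per_query(scored_pairs, k_of):
    """Keep the k highest-scoring data objects per query, where k may
    differ across queries.

    `scored_pairs` yields (url_q, url_d, sim) triples from Reduce-2;
    `k_of` maps url_q to its k parameter.
    """
    heaps = defaultdict(list)
    for url_q, url_d, sim in scored_pairs:
        heap = heaps[url_q]
        heapq.heappush(heap, (sim, url_d))
        if len(heap) > k_of[url_q]:
            heapq.heappop(heap)   # evict the current lowest score
    return {q: sorted(h, reverse=True) for q, h in heaps.items()}
```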

Empirical Experiments and Evaluations

Environmental Setting

We deploy a two-node Hadoop cluster on a PC, in which each node has 2.00 GB RAM and a 50 GB HDD. The PC has an Intel® Core i5-4460 3.20 GHz CPU, 8.00 GB RAM, a 500 GB HDD, and a 64-bit operating system. The Hadoop version is 2.7.3,Footnote 2 run with its default settings, except that we set the number of reducers to 4, matching the four CPU cores.

Dataset

We employ datasets retrieved from the Gutenberg Project,Footnote 3 an online data store with over 56,000 free e-books, for our experiments. The datasets are randomly chosen from the store and organized into different data and query packages, as illustrated in Table 1. The data type serves as the data input for similarity search, while the query type is the pivot input used to look for similar documents in the data type. For the data type, we organize five data packages, D5, D100, D300, D500, and D1K, with 5, 100, 300, 500, and 1000 files, respectively. As the files were randomly chosen, their sizes range from 1 to 102 KB. In the meantime, the query type is organized into four query packages, Q1, Q10, Q100, and Q1K, with 1, 10, 100, and 1000 files, respectively. In addition, the query file sizes span a range of 59 KB.

Table 1 Data organization

Measurement

We compare the following two methods:

  • Sorted inverted index (SII). This method follows our proposed similarity search scheme to build sorted inverted indexes for both data sources and query batches. In addition, the method performs query processing with our pruning strategy.

  • Sorted inverted index with metadata (SIIWMD). This method follows our proposed similarity search scheme to build sorted inverted indexes with metadata organization. Additionally, the method performs query processing with our pruning strategy.

Evaluation

In our first experiment, we measure the performance of the two methods, SIIWMD and SII, for indexing query batches. Figure 16a shows the query indexing time when the sorted inverted indexes are built for query batches. In general, the processing times of the two methods do not differ much for Q10 and Q100. In fact, SII tends to need less query indexing time than SIIWMD when the number of queries increases sharply, because it does not incur the overhead of metadata organization; for example, the gap is around 4.55% with Q1K. Meanwhile, Fig. 16b shows the query indexing sizes of the two methods. Generally, SIIWMD generates much lighter sorted inverted indexes than SII: when the query batch size increases from Q10 and Q100 to Q1K, SIIWMD produces on average nearly 12 times less output data than SII. As a result, SIIWMD generates much more lightweight indexes than SII.

Fig. 16 Query and data indexing [8]

Fig. 17 Query processing [8]

To examine index building with larger datasets, we repeat the same experiment for the data packages. As illustrated in Fig. 16c, the indexing times of SIIWMD and SII do not differ much when the dataset is small, as with D100, D300, and D500. Nevertheless, when the dataset is large, as with D1K, the gap is around 10.37%. In the meantime, the result in Fig. 16d confirms that SIIWMD produces much less output data when building indexes than SII: on average, SIIWMD saves nearly 6 times the data output of SII.

In terms of query processing, we run our next experiments with different query and data packages. Figure 17a shows the MapReduce performance with D5. With a small number of queries, such as Q10, the query processing time of SIIWMD is nearly 33% shorter than that of SII. When the query size increases rapidly, however, the performance gap becomes much bigger: it rises sharply to 85.7% with Q100 and reaches 90.28% with Q1K. Consequently, SIIWMD processes query batches on average 6 times faster than SII. In parallel, Fig. 17b displays the MapReduce performance with Q100 when the dataset changes. Generally, SIIWMD gives much shorter query processing times than SII. When the dataset is small, as with D100, SII processes Q100 nearly 2 times slower than SIIWMD. In addition, the query processing time of SII rises sharply as the dataset grows larger: the performance gap reaches 79% with D300 and 82.5% with D500. Furthermore, the query processing time of SIIWMD increases only slightly, while that of SII rises dramatically as the dataset grows rapidly. As a consequence, SIIWMD processes query batches on average 4 times faster than SII.

Conclusion and Future Work

In this paper, we propose a general MapReduce-based similarity search scheme that not only works effectively for single query processing but also deals efficiently with batch processing in an incremental fashion. Additionally, we build ordered inverted indexes for both datasets and query sets so that we can benefit from our quick pruning strategy, which discards inessential accesses during the similarity search. Moreover, we embed metadata inside the index structures to generate much more lightweight indexes, which consequently reduces I/O costs as well as the extra overheads caused by redundant data. Putting these together, we improve the performance of our MapReduce-based similarity search while keeping the index size small, especially when the dataset grows larger. Finally, the results of our empirical experiments verify the efficiency of our proposed solution: our indexes with metadata are much lighter than the baseline inverted indexes, while their building time is only marginally higher, and our method saves much more processing time than the baseline method.

In future work, we are going to apply our proposed method to the variants of similarity queries. Moreover, we plan to experiment with larger datasets as well as larger cluster sizes for large-scale similarity processing. Last but not least, we will consider the load-balancing problem, whose solution would further improve the overall performance of MapReduce-based similarity search.