
1 Introduction

Nowadays, time series are pervasive in a wide spectrum of applications involving data-intensive analytics, e.g. climate modelling [1], economic forecasting [21], medical monitoring [6], etc. Many time series analytical problems require subsequence similarity search as a subtask, which is stated as follows: given a query subsequence and a longer time series, find the subsequence of the time series whose similarity to the query is the maximum among all subsequences.

Currently, Dynamic Time Warping (DTW) [3] is considered the best similarity measure in most domains [5]. Since the computation of DTW is time-consuming, parallel algorithms for FPGA [25] and GPU [17] have been proposed.

Our research [10,11,12,13] addresses the task of accelerating similarity search with the Intel Xeon Phi many-core system, which can be considered an attractive alternative to FPGA and GPU. Phi provides a large number of compute cores with 512-bit wide vector processing units (VPUs). Phi is based on the Intel x86 architecture and supports the same programming methods and tools as a regular Intel Xeon. The first-generation product, Knights Corner (KNC) [4], is a coprocessor with up to 61 cores, which supports native applications and offloading of calculations from a host CPU. The second-generation product, Knights Landing (KNL) [20], is a bootable processor with up to 72 cores, which runs applications only in native mode. In [11,12,13], we proposed a CPU+Phi computational scheme for subsequence similarity search on Phi KNC. In [10], we adapted this approach to Phi KNL by implementing an advanced data layout and computational scheme, which allow computations to be vectorized efficiently.

This paper is a revised and extended version of [10]. We consider the more complicated case of very large time series, where a computing cluster of Phi KNL nodes is utilized for the similarity search. We propose an advanced parallel algorithm, called PhiBestMatch, which parallelizes computations both among cluster nodes (through the MPI technology) and within a single cluster node (through the OpenMP technology). We performed an additional series of experiments, which showed good scalability of PhiBestMatch.

The rest of the paper is organized as follows. Section 2 discusses related work. Section 3 gives a formal statement of the problem. In Sect. 4, we present the proposed algorithm. We describe the experimental evaluation of our algorithm in Sect. 5. Finally, Sect. 6 concludes the paper.

2 Related Work

In the recent decade, parallel and distributed algorithms for subsequence similarity search under the DTW measure have been extensively developed for various hardware platforms.

In [26], a GPU-based implementation was proposed. The warping matrix is generated in parallel, but the warping path is searched serially. Since the matrix generation step and the path search step are split into two kernels, this leads to overheads for storing and transmitting the warping matrix for each DTW calculation.

In [17], GPU and FPGA implementations of subsequence similarity search were presented. The GPU implementation is based on the same ideas as [26]. The FPGA system consists of two modules, namely Normalizer (z-normalization of subsequences) and Warper (DTW calculation), and is generated by a C-to-VHDL tool, which exploits the fine-grained parallelism of the DTW. However, this implementation lacks flexibility, i.e. it must be recompiled whenever the query length changes. In [25], the authors proposed a framework for FPGA-based subsequence similarity search, which utilizes the data reusability of continuous DTW calculations to reduce the bandwidth requirements and exploit coarse-grained parallelism.

In [22], the authors proposed subsequence similarity search on a CPU cluster. Subsequences starting from different positions of the time series are sent to different nodes, and each node calculates DTW in the naïve way. In [23], the authors accelerated subsequence similarity search with an SMP system. They distribute different queries to different cores, and each subsequence is sent to different cores to be compared with different patterns in the naïve way. In both implementations, data transfer becomes the bottleneck. In [18], the authors proposed an approach to subsequence similarity search on an Apache Spark cluster. The time series is split into fragments, which are shared among cluster nodes as files under HDFS (Hadoop Distributed File System). Each node processes in parallel as many fragments as it has CPU cores, where each core runs the UCR-DTW algorithm [15].

3 Notation and Problem Background

3.1 Definitions and Notation

A time series \( T \) is a sequence of real-valued elements: \( T = (t_{1}, t_{2}, \ldots, t_{m}) \). The length of a time series \( T \) is denoted by \( \left| T \right| \).

Given two time series, \( X = \left( {x_{1} ,x_{2} , \ldots ,x_{m} } \right) \) and \( Y = (y_{1} ,y_{2} , \ldots ,y_{m} ) \), the Dynamic Time Warping (DTW) distance between \( X \) and \( Y \) is denoted by \( DTW\left( {X,Y} \right) \) and defined as below.

$$ \begin{aligned} DTW\left( X, Y \right) &= d\left( m, m \right), \\ d\left( i, j \right) &= \left( x_{i} - y_{j} \right)^{2} + \min \left\{ d\left( i-1, j \right),\; d\left( i, j-1 \right),\; d\left( i-1, j-1 \right) \right\}, \\ d\left( 0, 0 \right) &= 0, \quad d\left( i, 0 \right) = d\left( 0, j \right) = +\infty, \quad 1 \le i \le m,\; 1 \le j \le m. \end{aligned} $$
(1)

In the formulas above, \( \left( d_{ij} \right) \in \mathbb{R}^{m \times m} \) is considered the warping matrix for the alignment of the two respective time series. A warping path is a contiguous set of warping matrix elements that defines a mapping between the two time series. The warping path must start and finish in diagonally opposite corner cells of the warping matrix, its steps are restricted to adjacent cells, and its points must be monotonically spaced in time.
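To make the recurrence concrete, below is a minimal sketch of computing (1) by dynamic programming over the full warping matrix (our own illustration; practical implementations restrict the computation to a band around the diagonal, as discussed below, and keep only two matrix rows):

```cpp
#include <algorithm>
#include <limits>
#include <vector>

// A minimal sketch of Eq. (1): DTW(X, Y) via dynamic programming over the
// full (m+1) x (m+1) warping matrix. We assume |X| = |Y| = m.
double dtw(const std::vector<double>& x, const std::vector<double>& y) {
    const std::size_t m = x.size();
    const double INF = std::numeric_limits<double>::infinity();
    std::vector<std::vector<double>> d(m + 1, std::vector<double>(m + 1, INF));
    d[0][0] = 0.0;  // d(0,0) = 0; the borders stay at infinity
    for (std::size_t i = 1; i <= m; ++i)
        for (std::size_t j = 1; j <= m; ++j) {
            const double cost = (x[i - 1] - y[j - 1]) * (x[i - 1] - y[j - 1]);
            d[i][j] = cost + std::min({d[i - 1][j], d[i][j - 1], d[i - 1][j - 1]});
        }
    return d[m][m];  // squared DTW distance, cf. the "Squared Distances" note
}
```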

A subsequence \( T_{i,k} \) of a time series \( T \) is a contiguous subset of \( k \) of its elements, which starts from position \( i \): \( T_{i,k} = (t_{i}, t_{i+1}, \ldots, t_{i+k-1}) \), \( 1 \le i \le m - k + 1 \). The set of all subsequences of \( T \) with length \( n \) is denoted by \( S_{T}^{n} \). Let \( N = \left| T \right| - n + 1 = m - n + 1 \) denote the number of subsequences in \( S_{T}^{n} \).

Given a time series \( T \) and a time series \( Q \) as a user-specified query, where \( m = \left| T \right| \gg \left| Q \right| = n \), the best matching subsequence \( T_{i,n} \) satisfies the property

$$ \exists\, T_{i,n} \in S_{T}^{n} \;\; \forall k:\; DTW\left( Q, T_{i,n} \right) \le DTW\left( Q, T_{k,n} \right), \quad 1 \le i, k \le m - n + 1. $$
(2)

In what follows, where there is no ambiguity, we refer to a subsequence \( T_{i,n} \) as \( C \), a candidate match to the query \( Q \).

3.2 The UCR-DTW Serial Algorithm

Currently, UCR-DTW [15] is the fastest serial algorithm for subsequence similarity search; it integrates a large number of algorithmic speedup techniques. Since our algorithm is based on UCR-DTW, we briefly describe its basic features.

Squared Distances.

The Euclidean distance (ED) between two subsequences \( Q \) and \( C \), where \( \left| Q \right| = \left| C \right| \), is defined as below.

$$ ED\left( Q, C \right) = \sqrt{ \sum\nolimits_{i=1}^{n} \left( q_{i} - c_{i} \right)^{2} }. $$
(3)

Instead of using the square root in the DTW and ED distance calculations, it is possible to use their squares, since this does not change the relative ranking of subsequences.

Z-normalization.

Both the query subsequence and each subsequence of the time series need to be z-normalized before the comparison [24]. The z-normalization of a time series \( T \) is defined as a time series \( {\hat{T}} = \left( {\hat{t}_{1} ,\hat{t}_{2} , \ldots ,\hat{t}_{m} } \right) \) where

$$ \hat{t}_{i} = \frac{t_{i} - \mu}{\sigma}, \quad \mu = \frac{1}{m}\sum\nolimits_{i=1}^{m} t_{i}, \quad \sigma^{2} = \frac{1}{m}\sum\nolimits_{i=1}^{m} t_{i}^{2} - \mu^{2}. $$
(4)
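For illustration, a direct implementation of (4) might look as follows (a minimal sketch with our own naming; in practice such loops are written so that the compiler can vectorize them):

```cpp
#include <cmath>
#include <vector>

// A sketch of Eq. (4): z-normalize a series in place, so that it has
// zero mean and unit standard deviation.
void znormalize(std::vector<double>& t) {
    const std::size_t m = t.size();
    double sum = 0.0, sum2 = 0.0;
    for (double v : t) { sum += v; sum2 += v * v; }
    const double mu = sum / m;
    const double sigma = std::sqrt(sum2 / m - mu * mu);
    for (double& v : t) v = (v - mu) / sigma;
}
```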

Cascading Lower Bounds.

A lower bound (LB) is a cheaply computable underestimate of the DTW distance, used to identify and prune clearly dissimilar subsequences [5]. The pruning threshold is called the best-so-far distance (or \( bsf \) for brevity). If the LB exceeds \( bsf \), the DTW distance will exceed \( bsf \) as well, and the respective subsequence is assumed to be clearly dissimilar and is pruned without calculating DTW. UCR-DTW initializes \( bsf \) as \( +\infty \), then scans the time series with a sliding window and updates \( bsf \) on the \( i \)-th step as follows:

$$ bsf_{(i)} = \min \left( bsf_{(i-1)},\; \begin{cases} +\infty, & LB\left( Q, T_{i,n} \right) > bsf_{(i-1)} \\ DTW\left( Q, T_{i,n} \right), & \text{otherwise} \end{cases} \right). $$
(5)

UCR-DTW exploits three LBs, namely LBKimFL [15], LBKeoghEC, and LBKeoghEQ [8], applying them in a cascade.

The LBKimFL lower bound uses the distances between the first and the last pairs of points of \( C \) and \( Q \), and is defined as below.

$$ LB_{Kim}FL\left( Q, C \right) := ED\left( \hat{q}_{1}, \hat{c}_{1} \right) + ED\left( \hat{q}_{n}, \hat{c}_{n} \right). $$
(6)

The LBKeoghEC lower bound is the distance from a candidate subsequence to the closer of the two so-called envelopes of the query, and is defined as below.

$$ LB_{Keogh}EC\left( Q, C \right) = \sum\limits_{i=1}^{n} \begin{cases} \left( \hat{c}_{i} - u_{i} \right)^{2}, & \text{if } \hat{c}_{i} > u_{i} \\ \left( \hat{c}_{i} - \ell_{i} \right)^{2}, & \text{if } \hat{c}_{i} < \ell_{i} \\ 0, & \text{otherwise.} \end{cases} $$
(7)

In the equation above, the sequences \( U = (u_{1}, \ldots, u_{n}) \) and \( L = \left( \ell_{1}, \ldots, \ell_{n} \right) \) are the upper envelope and lower envelope of the query, respectively, and are defined as below.

$$ u_{i} = \max\limits_{i-r \le k \le i+r} \hat{q}_{k}, \quad \ell_{i} = \min\limits_{i-r \le k \le i+r} \hat{q}_{k}, $$
(8)

where the parameter \( r \) \( \left( 1 \le r \le n \right) \) denotes the Sakoe–Chiba band constraint [16], which states that the warping path cannot deviate by more than \( r \) cells from the diagonal of the warping matrix.
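For illustration, a naive sketch of computing the envelopes per (8) is given below (our own code; it restates the definition in \( O(n \cdot r) \) time, whereas streaming min/max algorithms achieve \( O(n) \)):

```cpp
#include <algorithm>
#include <vector>

// A naive sketch of Eq. (8): upper/lower envelopes of the z-normalized
// query q under the Sakoe-Chiba band constraint r.
void envelopes(const std::vector<double>& q, int r,
               std::vector<double>& u, std::vector<double>& l) {
    const int n = static_cast<int>(q.size());
    u.assign(n, 0.0);
    l.assign(n, 0.0);
    for (int i = 0; i < n; ++i) {
        // clamp the window [i-r, i+r] to the valid index range
        const int lo = std::max(0, i - r), hi = std::min(n - 1, i + r);
        u[i] = *std::max_element(q.begin() + lo, q.begin() + hi + 1);
        l[i] = *std::min_element(q.begin() + lo, q.begin() + hi + 1);
    }
}
```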

The LBKeoghEQ lower bound is the distance from the query to the closer of the two envelopes of a candidate subsequence (i.e. the roles of the query and the candidate subsequence are reversed with respect to LBKeoghEC).

$$ LB_{Keogh} EQ\left( {Q,C} \right)\;{:=}\;LB_{Keogh} EC\left( {C,Q} \right) . $$
(9)

First, UCR-DTW calculates the z-normalized version of the query and its envelopes, and \( bsf \) is set to infinity. Then the algorithm scans the input time series, applying the cascade of LBs to the current subsequence. If the subsequence is not pruned, the DTW distance is calculated. Next, \( bsf \) is updated if it is greater than the calculated DTW distance. By doing so, in the end, UCR-DTW finds the best matching subsequence of the given time series.
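To summarize the control flow, the following is a minimal sketch of the UCR-DTW scan (our own illustration, not the authors' code); znormalized, lb_kim_fl, lb_keogh_ec, lb_keogh_eq, and dtw are hypothetical helpers assumed to implement the z-normalization (4), the bounds (6), (7), (9), and the distance (1):

```cpp
#include <cstddef>
#include <limits>
#include <vector>

using Series = std::vector<double>;

// Hypothetical helpers (declarations only), assumed to implement
// Eqs. (4), (6), (7), (9), and (1) over z-normalized subsequences.
Series znormalized(const Series& T, std::size_t i, std::size_t n);
double lb_kim_fl(const Series& Q, const Series& c);
double lb_keogh_ec(const Series& Q, const Series& c);
double lb_keogh_eq(const Series& Q, const Series& c);
double dtw(const Series& Q, const Series& c);

// A sketch of the UCR-DTW scan: the cascade prunes a subsequence as soon
// as any lower bound exceeds the best-so-far distance, per Eq. (5).
std::size_t ucr_dtw_scan(const Series& T, const Series& Q) {
    const std::size_t n = Q.size();
    double bsf = std::numeric_limits<double>::infinity();
    std::size_t best = 0;
    for (std::size_t i = 0; i + n <= T.size(); ++i) {
        Series c = znormalized(T, i, n);
        if (lb_kim_fl(Q, c) > bsf) continue;   // cheapest bound first
        if (lb_keogh_ec(Q, c) > bsf) continue;
        if (lb_keogh_eq(Q, c) > bsf) continue;
        const double d = dtw(Q, c);            // exact (squared) distance
        if (d < bsf) { bsf = d; best = i; }    // update best-so-far
    }
    return best;                               // start of the best match
}
```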

4 The PhiBestMatch Parallel Algorithm

In this section, we present PhiBestMatch, a novel parallel algorithm for subsequence similarity search in very long time series on a computing cluster of Phi KNL nodes. PhiBestMatch is based on the following ideas.

Computations are parallelized at two levels, namely across cluster nodes and within a single cluster node. The time series is divided into equal-length partitions and distributed among the cluster nodes. During the search in its own partition, each node communicates with the other nodes via functions of the MPI standard to improve the local \( bsf \) and reduce the amount of computations.

Within a single cluster node, computations are performed through thread-level parallelism and the OpenMP technology. In addition, data structures are aligned in main memory, and computations are organized with as many vectorizable loops as possible. Vectorization means a compiler's ability to transform loops into sequences of vector operations [2] executed by the VPUs. Unaligned memory access should be avoided, since it can cause inefficient vectorization due to the timing overhead of loop peeling [2]. Within a single cluster node, the algorithm involves additional data structures and redundant computations [10].
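As an example of the loop shape we aim for, the following sketch (our illustration, assuming 64-byte aligned buffers) accumulates a squared Euclidean distance with unit-stride accesses, which the compiler can map onto 512-bit vector instructions:

```cpp
#include <cstddef>

// An example of a vectorizable loop: unit-stride access over 64-byte
// aligned buffers lets the compiler emit 512-bit vector code without
// loop peeling. (Illustrative only; q and c are assumed 64-byte aligned.)
float ed_squared(const float* q, const float* c, std::size_t len) {
    float sum = 0.0f;
    #pragma omp simd reduction(+ : sum) aligned(q, c : 64)
    for (std::size_t i = 0; i < len; ++i) {
        const float d = q[i] - c[i];
        sum += d * d;
    }
    return sum;
}
```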

4.1 Partitioning of the Time Series

We partition the time series among the cluster nodes as follows. Let \( F \) be the number of partitions and \( T^{\left( k \right)} \) be the \( k \)-th \( \left( 0 \le k \le F - 1 \right) \) partition of \( T \); then \( T^{\left( k \right)} \) is defined as a subsequence \( T_{start,len} \) as below.

$$ start = k \cdot \left\lfloor \frac{N}{F} \right\rfloor + 1, \quad len = \begin{cases} \left\lfloor \frac{N}{F} \right\rfloor + \left( N \bmod F \right) + n - 1, & k = F - 1 \\ \left\lfloor \frac{N}{F} \right\rfloor + n - 1, & \text{otherwise.} \end{cases} $$
(10)

This means that the head part of every partition except the first overlaps the tail part of the previous partition in \( n - 1 \) data points, where \( n \) is the query length. This technique prevents the loss of resulting subsequences at the junctions of two neighboring partitions.
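A minimal sketch of computing (10) for the \( k \)-th node is given below (our own helper, using 1-based positions as in the definitions above):

```cpp
#include <cstddef>

// A sketch of Eq. (10): start position (1-based) and length of the k-th
// partition, for N = m - n + 1 subsequences split over F nodes. Every
// partition except the first overlaps its predecessor by n - 1 points.
void partition_bounds(std::size_t N, std::size_t F, std::size_t n,
                      std::size_t k, std::size_t& start, std::size_t& len) {
    const std::size_t chunk = N / F;   // floor(N / F)
    start = k * chunk + 1;
    len = chunk + n - 1;
    if (k == F - 1) len += N % F;      // the last node takes the remainder
}
```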

4.2 Data Layout

We propose a data layout that aims to organize computations over aligned data with as many auto-vectorizable loops as possible.

Given a subsequence \( C \) and the VPU width \( w \), we denote the pad length by \( pad = w - \left( n \bmod w \right) \) and define the aligned subsequence \( \tilde{T}_{i,n} \) as below:

$$ \tilde{T}_{i,n} = \begin{cases} ( t_{i}, t_{i+1}, \ldots, t_{i+n-1}, \underbrace{0, 0, \ldots, 0}_{pad} ), & \text{if } n \bmod w > 0 \\ ( t_{i}, t_{i+1}, \ldots, t_{i+n-1} ), & \text{otherwise.} \end{cases} $$
(11)

According to (1), for all \( Q, C \) with \( \left| Q \right| = \left| C \right| \), it holds that \( DTW\left( Q, C \right) = DTW( \tilde{Q}, \tilde{C} ) \). Thus, in what follows, we assume the aligned versions of the query and of the subsequences of the input time series.

Next, we store all (aligned) subsequences of a time series in the subsequence matrix \( S_{T}^{n} \in {\mathbb{R}}^{{N \times \left( {n + pad} \right)}} \), which is defined as below.

$$ S_{T}^{n}\left( i, j \right) := \tilde{t}_{i+j-1}. $$
(12)
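The following sketch illustrates building the subsequence matrix per (11) and (12) from aligned, zero-padded rows (our own code under stated assumptions: float data, \( w = 16 \) lanes for a 512-bit VPU, and Intel-style _mm_malloc for 64-byte alignment):

```cpp
#include <cstddef>
#include <cstring>
#include <mm_malloc.h>   // Intel-style _mm_malloc / _mm_free

// A sketch of Eqs. (11)-(12): build the N x (n + pad) subsequence matrix
// of aligned, zero-padded rows. We assume float data and w = 16 lanes
// (512-bit VPU / 32-bit floats); 64 is the alignment in bytes.
float* build_subsequence_matrix(const float* t, std::size_t m, std::size_t n) {
    const std::size_t w = 16;
    const std::size_t pad = (n % w > 0) ? w - n % w : 0;
    const std::size_t N = m - n + 1, row = n + pad;
    float* s = static_cast<float*>(_mm_malloc(N * row * sizeof(float), 64));
    for (std::size_t i = 0; i < N; ++i) {
        std::memcpy(s + i * row, t + i, n * sizeof(float));    // T(i, n)
        std::memset(s + i * row + n, 0, pad * sizeof(float));  // zero padding
    }
    return s;   // the caller releases the matrix with _mm_free(s)
}
```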

Let us denote by \( lb_{max} \) \( \left( lb_{max} \ge 1 \right) \) the number of LBs exploited by the algorithm, and denote these LBs by \( LB_{1}, LB_{2}, \ldots, LB_{lb_{max}} \), enumerated according to their order in the lower bounding cascade. Given a time series \( T \), we define the LB-matrix \( L_{T}^{n} \in \mathbb{R}^{N \times lb_{max}} \) of all subsequences of length \( n \) from \( T \) as below.

$$ L_{T}^{n} \left( {i,j} \right)\;{:=}\;LB_{j} \left( {T_{i,n} ,Q} \right) . $$
(13)

The bitmap matrix is a column vector \( B_{T}^{n} \in \mathbb{B}^{N} \), which, for every subsequence of length \( n \) from \( T \), stores the conjunction of the comparisons of each LB with \( bsf \):

$$ B_{T}^{n} \left( i \right)\;{:=}\;\mathop {\bigwedge }\limits_{j = 1}^{{lb_{max} }} \left( {L_{T}^{n} \left( {i,j} \right) < bsf} \right). $$
(14)

We establish the candidate matrix to store those subsequences from the \( S_{T}^{n} \) matrix that have not been pruned by the lower bounding. The candidate matrix is processed in parallel by calculating the DTW distance between each of its rows and the query. The minimum of these DTW distances is then used to update \( bsf \).

To provide parallel processing of the candidate matrix, we denote the segment size by \( s \in \mathbb{N} \) \( (s \le \frac{N}{p} \), where \( p \) is the number of threads employed by the parallel algorithm) and define the candidate matrix \( C_{T}^{n} \in \mathbb{R}^{\left( s \cdot p \right) \times \left( n + pad \right)} \) as below.

$$ C_{T}^{n}\left( i, \cdot \right) := S_{T}^{n}\left( k, \cdot \right) : B_{T}^{n}\left( k \right) = TRUE. $$
(15)

In the experiments below, we take the segment size \( s = 100 \).
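The DTW phase over the candidate matrix can be sketched as follows (illustrative code, not the authors' exact implementation; dtw_banded is a hypothetical helper computing the banded distance of (1)):

```cpp
#include <algorithm>
#include <cstddef>

// Hypothetical helper: Sakoe-Chiba banded DTW per Eq. (1).
double dtw_banded(const float* q, const float* c, std::size_t n, std::size_t r);

// A sketch of the candidate-matrix phase: the DTW distance between the
// query and every non-pruned candidate (Eqs. (14)-(15)) is computed by
// the OpenMP threads, and a min-reduction yields the improved bsf.
double process_candidates(const float* q, const float* cand, std::size_t cnt,
                          std::size_t row, std::size_t n, std::size_t r,
                          double bsf) {
    #pragma omp parallel for schedule(dynamic) reduction(min : bsf)
    for (std::size_t i = 0; i < cnt; ++i) {
        const double d = dtw_banded(q, cand + i * row, n, r);
        bsf = std::min(bsf, d);   // each thread refines its private copy
    }
    return bsf;                   // combined minimum over all threads
}
```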

4.3 Computational Scheme

Figure 1 depicts the pseudo-code of PhiBestMatch, and Fig. 2 shows the data structures of the algorithm. At initialization, each process obtains its number, \( myrank \), via the MPI library. In what follows, each process deals with the subsequence matrix \( S_{T^{\left( myrank \right)}}^{n} \) of the partition \( T^{\left( myrank \right)} \). The variable \( bsf \) is initialized with the DTW distance between the query and a random subsequence of the partition.

Fig. 1. PhiBestMatch pseudo-code

Fig. 2. Data flow of PhiBestMatch

Then we perform preprocessing by forming the subsequence matrix of aligned subsequences, z-normalizing each subsequence, and calculating every LB of the lower bounding cascade. Strictly speaking, the latter step brings redundant calculations: UCR-DTW, in contrast, calculates the next LB in the cascade only if the current subsequence has not already been found clearly dissimilar by the previous LB. However, we perform these precomputations once and parallelize them, keeping in mind that they can be efficiently vectorized by the compiler, since the LB calculations have no data dependencies.

After that, the algorithm improves the \( bsf \) threshold in the following loop until every node has completed its partition. At first, the bitmap matrix is calculated in parallel based on the pre-calculated LB-matrix. Then each subsequence with TRUE in the respective element of the bitmap matrix is added to the candidate matrix. After the candidate matrix is filled, we calculate in parallel the DTW distance between each candidate and the query and find the minimum distance. If the minimum distance is less than \( bsf \), then \( bsf \) is updated. Then, we find the minimum value of \( bsf \) among all the partitions by the MPI_Allreduce global reduction operation. Finally, the same operation is used to check whether every node has completed its partition.
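The inter-node exchange reduces to a single collective call, sketched below (our illustration; the termination check uses an analogous reduction over per-node completion flags):

```cpp
#include <mpi.h>

// A sketch of the inter-node step: every node offers its local bsf, and
// the global minimum is returned to all of them by MPI_Allreduce.
double exchange_bsf(double local_bsf) {
    double global_bsf;
    MPI_Allreduce(&local_bsf, &global_bsf, 1, MPI_DOUBLE, MPI_MIN,
                  MPI_COMM_WORLD);
    return global_bsf;
}
```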

5 Experiments

In order to evaluate the developed algorithm, we performed experiments on two platforms, namely a single cluster node and a whole cluster system.

5.1 Experimental Setup

Objectives. In the experiments on a single cluster node, we studied the performance and scalability of the algorithm with respect to the warping constraint \( r \) and the query length \( n \). In the experiments on the cluster system, we studied the algorithm's scaled speedup with respect to the query length. Finally, we compared the performance of PhiBestMatch with the analogous algorithm from [18].

Measures.

In the experiments, we investigated the algorithm's performance (measuring the run time after deduction of the I/O time) and scalability. The speedup and parallel efficiency of a parallel algorithm employing \( k \) threads are calculated, respectively, as

$$ s\left( k \right) = \frac{{t_{1} }}{{t_{k} }},e\left( k \right) = \frac{s\left( k \right)}{k}, $$
(16)

where \( t_{1} \) and \( t_{k} \) are run times of the algorithm when one and \( k \) threads are employed, respectively.

In the experiments on the cluster system, we investigated the scaled speedup of the parallel algorithm, which refers to increasing the problem size in proportion to the number of computational nodes added to the system, and is calculated as follows:

$$ s_{scaled} = \frac{p \cdot m}{{t_{{p\left( {p \cdot m} \right)}} }} , $$
(17)

where \( p \) is the number of nodes, \( m \) is the problem size, and \( t_{p\left( p \cdot m \right)} \) is the algorithm's run time when a problem of size \( p \cdot m \) is processed on \( p \) nodes.

Hardware.

We performed our experiments on two supercomputers, namely Tornado SUSU [9] and NKS-1P [19] with the characteristics summarized in Table 1.

Table 1. Specifications of hardware

For the experiments on a single node, we used the simplified version of PhiBestMatch [10], which treats the time series as a single partition.

Datasets.

In the experiments, we used the datasets summarized in Table 2. RW-SN, RW-CS, and RW-SH are datasets generated according to the Random Walk model [14]. The EPG (Electrical Penetration Graph) dataset is a series of signals used by entomologists to study the behavior of the Aster leafhopper (Macrosteles quadrilineatus) [17]. The ECG dataset [7] represents electrocardiogram signals digitized at 128 Hz.

Table 2. Datasets used in experiments

5.2 Evaluation on a Single Cluster Node

Figures 3 and 4 depict the performance of PhiBestMatch depending on \( r \) and \( n \), respectively. As we can see, at lower values of the parameters (approximately, \( 0 < r \le 0.5n \) and \( n < 512 \)), the algorithm runs slightly faster on, or about as fast on, a two-socket Intel Xeon host as on Intel Xeon Phi. At high values of the parameters (\( 0.5n < r \le n \) and \( n \ge 512 \)), the algorithm is faster on Intel Xeon Phi. This means that PhiBestMatch better utilizes the vectorization capabilities of Intel Xeon Phi under greater computational load.

Fig. 3. PhiBestMatch performance w.r.t. the warping constraint

Fig. 4. PhiBestMatch performance w.r.t. the query length

Figures 5 and 6 depict the experimental results on the synthetic (RW-SN) and the real (EPG) datasets, respectively. As we can see, PhiBestMatch shows speedup close to linear and efficiency close to 100% when the number of threads matches the number of physical cores the algorithm is running on. When more than one thread per physical core is used, the speedup becomes sub-linear, and the parallel efficiency decreases accordingly. The best speedup and efficiency are achieved when the parameter \( r \) ranges from \( 0.8n \) to \( n \).

Fig. 5. PhiBestMatch speedup and parallel efficiency on synthetic data (RW-SN dataset)

Fig. 6. PhiBestMatch speedup and parallel efficiency on real data (EPG dataset, \( n = 360 \))

5.3 Evaluation on a Cluster System

In the experiments studying the scaled speedup of PhiBestMatch, we utilized from 16 to 128 nodes of the Tornado SUSU supercomputer. We varied the query length while setting the parameter \( r = 0.8n \). Figures 7 and 8 depict the performance of PhiBestMatch on synthetic and real data, respectively.

Fig. 7. PhiBestMatch scaled speedup on synthetic data (RW-CS dataset, \( r = 0.8n \))

Fig. 8. PhiBestMatch scaled speedup on real data (ECG-CS dataset, \( r = 0.8n \))

As we can see, PhiBestMatch shows close to linear scaled speedup. At the same time, the search for a query of greater length demonstrates a higher scaled speedup, since it provides a greater amount of computation on a single node.

5.4 Comparison with Analogue

In [18], Shabib et al. presented a hybrid search algorithm, which exploits an Apache Spark cluster of multi-core nodes. The algorithm was evaluated on six cluster nodes, each with an Intel Xeon E3-1200 CPU (4 cores at 3.1 GHz) onboard, on the RW-SH dataset with the parameter \( r = 0.05n \). We compared the performance of the Shabib et al. algorithm with that of PhiBestMatch on six nodes of Tornado SUSU for the same dataset and parameter \( r \). Table 4 depicts the results.

Table 4. Performance of PhiBestMatch in comparison with Shabib et al. algorithm

6 Conclusion

In this paper, we presented PhiBestMatch, a novel parallel algorithm for subsequence similarity search in very large time series on a computing cluster of modern Intel Xeon Phi Knights Landing (Phi KNL) nodes. Phi KNL is a many-core system with 512-bit wide vector processing units, which supports the same programming methods and tools as a regular Intel Xeon and can be considered an alternative to FPGA and GPU.

PhiBestMatch performs parallel computations at two levels, namely across cluster nodes and within a single cluster node. The time series is divided into equal-length partitions and distributed among the cluster nodes. During the search in its own partition, each node communicates with the other nodes via functions of the MPI standard to improve the local best-so-far similarity threshold and reduce the amount of computations. Within a single cluster node, PhiBestMatch exploits thread-level parallelism through the OpenMP technology. The algorithm involves additional data structures, which are aligned in main memory, and redundant computations. Computations are organized with as many vectorizable loops as possible to achieve the highest performance on Phi KNL.

We performed experiments on synthetic and real-world datasets, which showed good scalability of PhiBestMatch. Within a single cluster node, the algorithm demonstrates close to linear speedup when the number of threads matches the number of Phi KNL physical cores the algorithm is running on. On the whole cluster system, PhiBestMatch showed close to linear scaled speedup. The algorithm better utilizes the vectorization capabilities of Phi KNL under greater computational load (i.e. with a longer query and a greater value of the Sakoe–Chiba band constraint).