
1 Introduction

Data stream processing has been widely studied in recent years [6, 12], and many industrial systems now rely on it [13], with applications such as network monitoring, marketing, transportation, manufacturing, and IoT systems. Retrieving useful insights from this continuously produced data has hence become a key issue. A common way to process such streams is to aggregate events with respect to the instant they occurred or arrived in the system.

Time is a first-class dimension for stream events, as it determines how they are aggregated, but up to now a strong assumption has been made about point events, leaving spanning events aside. Let us consider a network monitoring system where we want to evaluate the load of an antenna, with spanning transactions, e.g., phone calls, happening continuously. In a classical streaming system, the load would be based either on the start or on the end time of each event. With a spanning event stream, the full event duration would be taken into account.

Fig. 1. End time vs. full-time event aggregation in a window-based stream system.

Figure 1 models a series of calls: events \(a_i\) show their reception time, while the \(b_i\) show the full call duration. We want to analyse the load of the antenna every 5 min, as given by the windows \(w_i\). With spanning events, the stream intervals \(b_3\) and \(b_5\) span windows \(\{w_1,w_2\}\) and \(\{w_2,w_3\}\), respectively, while their matching timestamps \(a_3\) and \(a_5\) are assigned only to \(w_2\) and \(w_3\), respectively. Natively modeling event duration also makes it possible to detect events that have no bound inside a window, like event \(b_7\), which crosses windows \(w_2\) and \(w_3\). A spanning event stream (SES) hence provides not only the (dis)connection instants but also the full connection information, yielding more accurate results than a point event stream (PES).

To provide such results, interval comparison predicates inspired by Allen’s algebra [1] must be properly set up to assign spanning events to windows. Furthermore, as a side effect of spanning events, past windows can be affected by fresh events, even when the stream has no delay: in Fig. 1, \(b_7\) impacts not only \(w_4\) but also the past windows \(w_1\), \(w_2\), and \(w_3\). These observations call for a close review of the many window types and for a new window-based aggregation framework that addresses SES.

The rest of this paper is organized as follows: Sect. 2 focuses on the formal requirements for building the framework. Section 3 discusses the adaptation of common windows to spanning events. In Sect. 4, we propose a straightforward implementation of the framework. Finally, we review prior work in Sect. 5, before concluding in Sect. 6.

2 Problem Analysis and Definitions

2.1 About Time

Following the dominant viewpoint, we define the time domain as an infinite, totally ordered, discrete set \((\mathbb {T}, \prec _\mathbb {T})\), whose units are called chronons [2]. As in temporal databases [2], two dimensions can be used: valid time and transaction time, which are respectively the lifespan of an event in the real world and in the system. In streaming systems, transaction time is usually reduced to the start time point.

In this bi-temporal model, intervals are required to represent valid time. As sequences of adjacent time points in \(\mathbb {T}\), they are entirely defined by their lower and upper bounds, as pairs \((\ell ,\textit{u})\in \mathbb {T}\times \mathbb {T}\) with \(\ell \prec _\mathbb {T} \textit{u}\). We denote by \(\mathbb {I} \subset \mathbb {T}\times \mathbb {T}\) the set of time intervals. For any t in \(\mathbb {I}\), \(\ell (t) \in t\) and \(\textit{u}(t)\notin t\) are respectively the lower and upper bounds of interval t; intervals are thus half-open, and a chronon c is represented by \([c, c+1)\). Two intervals can be compared with Allen’s 13 predicates [1], which define precisely how their bounds compare to each other.
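To make the half-open convention concrete, the following Python sketch encodes an interval by its two bounds, assuming integer chronons, and classifies a pair of intervals into one of Allen’s 13 relations using bound comparisons only. The names `Interval`, `chronon` and `allen` are hypothetical and are not taken from the paper.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Interval:
    """Half-open interval [lo, up) over integer chronons: lo belongs to it, up does not."""
    lo: int
    up: int

    def __post_init__(self):
        if not self.lo < self.up:
            raise ValueError("an interval requires lo < up")

    def __contains__(self, c: int) -> bool:
        return self.lo <= c < self.up


def chronon(c: int) -> Interval:
    """A single chronon c is the interval [c, c + 1)."""
    return Interval(c, c + 1)


def allen(a: Interval, b: Interval) -> str:
    """Name of the Allen relation of a with respect to b, decided from the bounds only."""
    if a.up < b.lo:
        return "before"
    if a.up == b.lo:
        return "meets"
    if b.up < a.lo:
        return "after"
    if b.up == a.lo:
        return "met-by"
    if a.lo == b.lo and a.up == b.up:
        return "equals"
    if a.lo == b.lo:
        return "starts" if a.up < b.up else "started-by"
    if a.up == b.up:
        return "finishes" if a.lo > b.lo else "finished-by"
    if b.lo < a.lo and a.up < b.up:
        return "during"
    if a.lo < b.lo and b.up < a.up:
        return "contains"
    # Remaining case: the intervals intersect without nesting or sharing a bound.
    return "overlaps" if a.lo < b.lo else "overlapped-by"


print(allen(Interval(0, 5), Interval(5, 10)))   # meets
print(allen(Interval(0, 5), Interval(3, 8)))    # overlaps
print(3 in chronon(3), 4 in chronon(3))         # True False
```

Because upper bounds are excluded, "meets" corresponds to equal bounds (a.up == b.lo), while "before" requires at least one chronon between the two intervals.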

2.2 Spanning Event Stream

In this article, we extend the stream concept to incorporate spanning events, whose lifespan is their valid time. As a consequence, the valid time (an interval) and the transaction time (a point) of an event must be distinguished. Since data stream applications require near-real-time computation, valid and transaction time should ideally be connected, e.g., a phone call is recorded as soon as it ends.

We define an SES in Eq. 1, where \(\varOmega \) is any composite type that carries the content x of each event \(e\in S\), t is the valid time interval, and \(\tau \) is the transaction timestamp of e. We denote these values by t(e) and \(\tau (e)\) for an event e.

$$\begin{aligned} S = (e_i)_{i\in \mathbb {N}}\quad \text {with}\ e_i = (x, t, \tau ) \in \varOmega \times \mathbb {I} \times \mathbb {T} \end{aligned}$$
(1)

We assume that events are recorded when they finish, in a no-delay stream setting, such that \(\textit{u}(t(e)) = \tau (e)\). In the following, we denote by S(.) any projection of stream S on one or more of its three components. For instance, S(t) refers to the sequence of valid time intervals of all the events. We also denote by \(\tau _i\) the transaction time of the i-th event in S, i.e., \(\tau _i = \tau (e_i)\).
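A minimal sketch of an SES event under the no-delay assumption, in Python, assuming integer chronons. `Event`, `make_event`, `project_t`, `project_tau` and `is_no_delay` are illustrative names that do not come from the paper; they only mirror the triple \((x, t, \tau )\), the constraint \(\textit{u}(t(e)) = \tau (e)\), and the projections S(t) and \(S(\tau )\).

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Event:
    x: Any      # payload (the Omega component)
    lo: int     # l(t(e)): first chronon of the valid time
    up: int     # u(t(e)): excluded upper bound of the valid time
    tau: int    # transaction time tau(e)


def make_event(x, lo, up):
    """Record an event as soon as it ends (no-delay stream), so tau(e) = u(t(e))."""
    if not lo < up:
        raise ValueError("valid time requires lo < up")
    return Event(x, lo, up, tau=up)


def project_t(stream):
    """S(t): the sequence of valid time intervals."""
    return [(e.lo, e.up) for e in stream]


def project_tau(stream):
    """S(tau): the sequence of transaction times."""
    return [e.tau for e in stream]


def is_no_delay(stream):
    """Check u(t(e)) = tau(e) for every event and non-decreasing transaction times."""
    taus = project_tau(stream)
    return all(e.tau == e.up for e in stream) and taus == sorted(taus)


S = [make_event("call-1", 0, 4), make_event("call-2", 2, 7)]
print(project_t(S), project_tau(S), is_no_delay(S))   # [(0, 4), (2, 7)] [4, 7] True
```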

2.3 Temporal Windowing

In data stream processing, data are transient and queries persistent, meaning that queries are continuously re-evaluated as data arrive. Blocking operators, such as aggregates, are a challenge for these continuous queries, as they must scan the whole data set before producing their first answer [6]. A popular way to bypass this issue is to operate on a bounded sub-stream given by a window [6]. Indeed, closing a window unblocks the operation, and an answer can be given with respect to the events “inside” the window. A common practice is to define an infinite family of windows, such as “each hour”. Many window flavors exist [4, 6, 12], and in this paper we propose a new categorisation of these windows.

Measures. Measures are used to set up the shape and/or frequency of windows in a window family. Each measure is time-related, so as to provide finite window bounds. We denote the measure set by \(\mathcal {M} = \{\mathbb {T}\} \cup \{S(t)\} \cup \{S(\tau )\} \cup \{S(x, \tau )\}\) and consider it as extensive. The measures are the following:

  • Stream-independent, with wall-clock time \(\mathbb {T}\): a system clock is used, so opening and closing windows is independent of the stream.

  • Stream dependent with a stream projection:

    • Valid-time S(t) uses the valid time of events, e.g., sessions where bounds depend on the traffic flow.

    • Shape \(S(\tau )\) uses the transaction time of the events; the most common example is count-based windows, which count the events arriving in the stream.

    • Data \(S(x, \tau )\) uses the data of events and orders them by their transaction time. The most common types are delta-based windows, which use a data attribute [6], e.g., a transaction counter, and punctuation-based windows, where window bounds are sent in the stream as special events [6].

Formal Definition. Formally, a temporal window is a regular time interval. A family of windows is \(W=(w_k)_{k \in \mathbb {N}}\), with \(w_k \in \mathbb {I}\), ordered by increasing \(\ell (w_k)\) or \(\textit{u}(w_k)\), and \(\mathcal {W}\) denotes the set of window families, so that \(W \in \mathcal {W}\). We propose a pair \((F_{bounds}, P_{insert})\) to compute any window family.

\(F_{bounds}^n: \mathcal {M}^n \rightarrow \mathcal {W}\) defines the bounds of the windows. It uses one or several measures and outputs a family of intervals (as shown in Table 1).

Table 1. Examples of windows created with \(F_{bounds}\) and \(F_{bounds}^2\)

\(P_{insert}: \mathbb {I}^2 \rightarrow \mathcal {B}\) determines whether an event belongs to a window using an Allen-like predicate. It takes two intervals as input, the window bounds and the event valid time, and outputs a Boolean. We define two specializations:

  • \(\mathrm{P}_{\triangle }\): true if \(\ell (w) \le \textit{u}(e) < \textit{u}(w)\), false otherwise; it deals with point events by comparing a chronon with a window interval.

  • \(\mathrm{P}_{\cap }\): true if \(\textit{u}(e) > \ell (w) \wedge \ell (e) < \textit{u}(w)\), false otherwise; it asserts whether an event has at least one chronon in a window.
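The two specializations can be written directly from their definitions. The Python sketch below represents windows and valid times as `(lower, upper)` pairs of integer chronons; the 5-chronon windows and the events `b3` and `b7` are made-up values chosen to mimic the situation of Fig. 1, not values read off the figure, and `p_point` and `p_cap` are hypothetical names for \(\mathrm{P}_{\triangle }\) and \(\mathrm{P}_{\cap }\).

```python
def p_point(w, e):
    """P_triangle: true iff l(w) <= u(e) < u(w); only the end bound of e is used."""
    (w_lo, w_up), (_, e_up) = w, e
    return w_lo <= e_up < w_up


def p_cap(w, e):
    """P_cap: true iff u(e) > l(w) and l(e) < u(w), i.e. e shares a chronon with w."""
    (w_lo, w_up), (e_lo, e_up) = w, e
    return e_up > w_lo and e_lo < w_up


windows = [(0, 5), (5, 10), (10, 15), (15, 20)]   # illustrative w1..w4
b3, b7 = (3, 7), (4, 17)                          # illustrative valid times

for name, e in (("b3", b3), ("b7", b7)):
    print(name, [w for w in windows if p_point(w, e)], [w for w in windows if p_cap(w, e)])
# b3 [(5, 10)] [(0, 5), (5, 10)]
# b7 [(15, 20)] [(0, 5), (5, 10), (10, 15), (15, 20)]
```

With these values, the point view assigns each event to a single window, whereas \(\mathrm{P}_{\cap }\) assigns `b3` to two windows and `b7` to all four, which is the duplication of assignments that SES introduce.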

Common Windows Review. We now review window types that are often used in the literature [6, 12]. In PES, the event valid time is a point, and hence \(P_{insert} = \mathrm{P}_{\triangle }\) for all these window types.

Sliding window: a sliding window is defined by its range (or size) \(\omega \) and its step \(\beta \), which sets the delay between two successive windows. The most common measures are wall-clock time, \(F_{bounds}(\mathbb {T}) = \big( (i, i + \omega ) : i \bmod \beta = 0\big) _{i \in \mathbb {T}}\), and count-based, \(F_{bounds}(S(\tau )) = \big( (\tau _i, \tau _{i + \omega }) : i \bmod \beta = 0\big) _{i \in \mathbb {N}}\).

Tumbling window: tumbling windows can be seen as a specialization of sliding windows where \(\omega = \beta \), meaning that only one window is open at a time.
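Both sliding-window definitions translate into short Python functions. In this sketch (hypothetical names `time_sliding` and `count_sliding`; integer chronons starting at 0 are assumed), the wall-clock family is infinite and consumed lazily, whereas the count-based family is computed from a finite list of transaction times. A tumbling family is obtained with `omega == beta`.

```python
from itertools import count, islice


def time_sliding(omega, beta):
    """Wall-clock F_bounds(T): windows (i, i + omega) for every chronon i with i mod beta == 0."""
    for i in count(0, beta):          # i = 0, beta, 2 * beta, ...
        yield (i, i + omega)


def count_sliding(taus, omega, beta):
    """Count-based F_bounds(S(tau)): windows (tau_i, tau_{i+omega}) for i mod beta == 0.

    Only windows whose upper bound tau_{i+omega} has already been observed are returned."""
    return [(taus[i], taus[i + omega]) for i in range(0, len(taus) - omega, beta)]


print(list(islice(time_sliding(10, 5), 3)))      # sliding:  [(0, 10), (5, 15), (10, 20)]
print(list(islice(time_sliding(5, 5), 3)))       # tumbling: [(0, 5), (5, 10), (10, 15)]
print(count_sliding([1, 3, 4, 8, 9, 12], 2, 2))  # [(1, 4), (4, 9)]
```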

Session window: a session is defined as a period of activity followed by a period of inactivity. Parameter \(\varepsilon \) gives the inactivity threshold time range. Session window family is .
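The following Python sketch illustrates how point events can be grouped into sessions. It rests on an assumption of ours: an event at chronon c joins the current session w when it falls before the end of the inactivity interval, i.e. \(c < \textit{u}(w) + \varepsilon \), and otherwise opens a new session. `pes_sessions` is a hypothetical name.

```python
def pes_sessions(chronons, eps):
    """Group point events (given by their chronon) into sessions (l(w), u(w)).

    Assumption: an event at chronon c extends the current session w when
    c < u(w) + eps; otherwise it opens a new session (c, c + 1)."""
    sessions = []
    for c in sorted(chronons):
        if sessions and c < sessions[-1][1] + eps:
            sessions[-1] = (sessions[-1][0], c + 1)   # extend the upper bound
        else:
            sessions.append((c, c + 1))               # inactivity >= eps: new session
    return sessions


print(pes_sessions([1, 2, 3, 10, 11, 30], 5))   # [(1, 4), (10, 12), (30, 31)]
```

With point events, the lower bound of a session never changes once it is opened, which is precisely what no longer holds with spanning events.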

3 Assigning Spanning Events to Temporal Windows

3.1 Adaptation of Point Events Windows to Spanning Events

We now detail the modifications that SES imply for the previously defined windows. Their impact depends on the measures used, and we review all common windows in a mono-measure context, \(F_{bounds}: \mathcal {M} \rightarrow \mathcal {W}\).

In PES, we saw that \(\mathrm{P}_{\triangle }\) can always be used for \(P_{insert}\). With SES, intervals must be taken into account, and \(P_{insert}\) depends on the window bounds definition \(F_{bounds}\): if \(F_{bounds}\) uses only transaction time, \(\mathrm{P}_{\triangle }\) can be kept; otherwise, it must be replaced by an Allen-like predicate (see Table 2).

Table 2. Most common spanning event window definition with \((F_{bounds}, P_{insert})\)

Hence, we claim that, among the most popular windows, shape- and data-based sliding/tumbling windows can be used with SES as is, whereas time-based sliding/tumbling and session windows must be extended.

3.2 Time-to-Postpone

When dealing with an SES where events are released only once ended, event lifespans raise two problems: (1) the system should be able to wait for (expected) event completion before closing any window; and (2) long-standing events may be assigned to multiple windows. Figure 1 illustrates this duration constraint with event \(b_7\), which is released in window \(w_4\) but also assigned to \(w_1\), \(w_2\), and \(w_3\). Therefore, working on valid time intervals requires postponing the release of the aggregates in order to accept events that started in or before the window. We call this waiting time the Time-To-Postpone (TTP).

The TTP is, of course, a workaround for the limits of exact aggregate computation over temporal windows, and it can lead to approximate results: an event may arrive after the aggregates of some of its windows have already been released, and it is then ignored for those windows. More advanced TTP techniques deserve to be explored in future work to mitigate these approximations.

3.3 Time-Based Sliding and Tumbling Windows

Event assignment to a window depends on how their lifespans compare: an event e is assigned to window w if \(P_{Allen}(t(e), w)\) holds, where \(P_{Allen}\) is any Allen-like predicate that checks whether the event lies in the window. The release time \(\tau _R\) of the window depends on the TTP parameter \(\delta \) and satisfies \(\tau _R \ge \textit{u}(w)+\delta \). A full overview of the changes needed to deal with SES is presented in Table 3.
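For time-based sliding and tumbling windows with the \(\mathrm{P}_{\cap }\) predicate, the windows an event overlaps can be computed arithmetically rather than by scanning. The Python sketch below assumes windows \((k\beta , k\beta + \omega )\) for \(k \ge 0\), and assumes that a window is released once the clock passes \(\textit{u}(w) + \delta \), so that an event arriving at \(\tau (e)\) is still accepted by w iff \(\tau (e) \le \textit{u}(w) + \delta \). The functions `overlapping_windows` and `assign` are hypothetical.

```python
def overlapping_windows(lo, up, omega, beta):
    """Indices k >= 0 of windows (k*beta, k*beta + omega) sharing a chronon with (lo, up), i.e. P_cap."""
    k_min = max(0, (lo - omega) // beta + 1)   # smallest k with k*beta + omega > lo
    k_max = -(-up // beta) - 1                 # largest k with k*beta < up
    return list(range(k_min, k_max + 1))


def assign(lo, up, tau, omega, beta, delta):
    """Split the overlapped windows into those still accepting the event and those already released.

    Assumption: window k is released once the clock passes u(w_k) + delta,
    so an event arriving at tau is accepted iff tau <= k*beta + omega + delta."""
    accepted, missed = [], []
    for k in overlapping_windows(lo, up, omega, beta):
        (accepted if tau <= k * beta + omega + delta else missed).append(k)
    return accepted, missed


print(assign(4, 17, 17, omega=5, beta=5, delta=12))   # ([0, 1, 2, 3], [])
print(assign(4, 17, 17, omega=5, beta=5, delta=5))    # ([2, 3], [0, 1])
print(overlapping_windows(3, 7, omega=10, beta=5))    # [0, 1]
```

With the illustrative event (4, 17) released at \(\tau = 17\) over 5-chronon tumbling windows, \(\delta = 12\) keeps all four assignments, whereas \(\delta = 5\) loses the first two windows: a larger TTP reduces such errors at the price of a later release.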

Fig. 2. Three strategies for session windows.

Table 3. Comparison between PES and SES for sliding and session windows

3.4 Session Windows

In session windows, each received event either enters the current window or creates a new one. The upper bound of a session depends only on the end of the assigned events, \(\textit{u}(w)= \max \{ \textit{u}(t(e)) \}_{e\in S_w}\). The lower bound, however, must be chosen carefully. With PES, the start of a session window can be fixed definitively when a fresh event arrives. With SES, this lower bound must be adjusted on the fly, since an instant has to be derived from a set of spanning events.

We model this problem as \(\ell (w)= \min \{ \lambda (e) \}_{e\in S_w}\), where \(\lambda :S \rightarrow \mathbb {T}\) is a choice function that gives a reference point for an event. For a PES, this function is \(\lambda (e)=\textit{u}(t(e))-1\), which uses only the end bound, as shown by strategy (1) in Fig. 2. When the lifespan of the event is considered, a first estimate of the lower bound is \(\lambda (e)=\ell (t(e))\), such that the event starts in, and is fully covered by, the session. However, this strategy can lead to problems, as illustrated by strategy (2), where the last event either re-opens a session or creates a new one; both situations are impossible, since the aggregate has already been released and session overlaps are not allowed. To overcome this problem, we apply the TTP parameter \(\delta \) to restrict the back-propagation of the update, with \(\lambda (e) = \max (\ell (t(e)),\tau (e) - \delta )\), as in strategy (3), which makes the long-standing event problem disappear.
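The three choice functions can be compared on a toy session. In this Python sketch, events are `(lo, up, tau)` triples of integer chronons with `tau == up`; the function names and the values, including the long-standing event (0, 30), are illustrative assumptions rather than data from Fig. 2.

```python
def lam_pes(lo, up, tau, delta):
    """Strategy (1): point-event view, only the end bound is used."""
    return up - 1


def lam_full(lo, up, tau, delta):
    """Strategy (2): the whole lifespan, so the event starts in the session."""
    return lo


def lam_ttp(lo, up, tau, delta):
    """Strategy (3): lifespan truncated by the TTP, never earlier than tau - delta."""
    return max(lo, tau - delta)


def session_lower_bound(events, lam, delta):
    """l(w) = min over the session's events of lambda(e)."""
    return min(lam(lo, up, tau, delta) for lo, up, tau in events)


session = [(20, 24, 24), (0, 30, 30)]   # a short event and a long-standing one
print({lam.__name__: session_lower_bound(session, lam, 5)
       for lam in (lam_pes, lam_full, lam_ttp)})
# {'lam_pes': 23, 'lam_full': 0, 'lam_ttp': 20}
```

Under strategy (2), the long event drags the session start back to chronon 0, whereas strategy (3) lets it reach back only to \(\tau (e) - \delta = 25\), so the session keeps its start at 20.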

The release of a session depends on the minimum inactivity period \(\varepsilon \) and the TTP \(\delta \), with \(\tau _R \ge \textit{u}(w) + \varepsilon + \delta \). Several sessions can be active at the same time, and long events can cause sessions to merge. Table 3 details the adaptation of sessions from PES to SES. We use \(\varepsilon (w) = (\textit{u}(w), \textit{u}(w) + \varepsilon )\) as the inactivity interval, and \(\varLambda (e) = (\lambda (e), \textit{u}(t(e)))\) as the event reconsidered with the TTP.
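Putting these rules together gives a small session assembler. This Python sketch is an illustration under our own assumptions, not the authors’ implementation: `ses_sessions` is a hypothetical name, events are `(lo, up, tau)` triples in arrival order with `tau == up`, the arrival time of each event serves as the clock, and each session is summarised by its bounds and event count only.

```python
def ses_sessions(events, eps, delta):
    """Sessions over a spanning event stream, returned as (l(w), u(w), count).

    events: (lo, up, tau) triples in arrival order, with tau == up (no-delay stream).
    Assumptions: the arrival time tau of each event is used as the clock, and each
    event is reconsidered as Lambda(e) = (lambda(e), up) with lambda(e) = max(lo, tau - delta)."""
    open_sessions, released = [], []
    for lo, up, tau in events:
        # Release every session whose deadline u(w) + eps + delta has been reached.
        still_open = []
        for s in open_sessions:
            (released if tau >= s[1] + eps + delta else still_open).append(s)
        open_sessions = still_open
        lam = max(lo, tau - delta)
        # Open sessions whose span or inactivity interval (u(w), u(w) + eps) meets Lambda(e).
        touched = [s for s in open_sessions if lam < s[1] + eps and up > s[0]]
        merged = (min([lam] + [s[0] for s in touched]),
                  max([up] + [s[1] for s in touched]),
                  1 + sum(s[2] for s in touched))
        open_sessions = [s for s in open_sessions if s not in touched] + [merged]
    return sorted(released + open_sessions)   # remaining sessions are flushed at the end


print(ses_sessions([(0, 2, 2), (3, 4, 4), (10, 12, 12), (1, 20, 20)], eps=3, delta=5))
# [(0, 4, 2), (10, 12, 1), (15, 20, 1)]
print(ses_sessions([(0, 2, 2), (6, 7, 7), (1, 9, 9)], eps=3, delta=10))
# [(0, 9, 3)]
```

In the first call, the long-standing event (1, 20) is truncated to (15, 20), so it cannot touch the sessions already released at \(\tau = 20\). In the second call, a long event bridges two open sessions, which are merged. Note that any session released at \(\tau \) satisfies \(\tau - \delta \ge \textit{u}(w) + \varepsilon \), so \(\lambda (e)\) can never reach it, which is why strategy (3) avoids re-opening released sessions.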

4 Experiments

Experimental Setup. In this series of experiments, data are not received at specific instants given by the machine clock, but rather “as fast as possible.”

Data Set. We use two kinds of data sets. The generated data set allows fine-grained synthesis of SES with configurable parameters: event size, session duration, and inactivity. For each chronon, an event is created, which can be cancelled by the session creation. Each event size is drawn from a normal distribution (\(\mu = 100\), \(\sigma =10\)) around the event size parameter. The generated set contains 200K events. The SS7 data set replays real-world-like data from a telephony network, covering 1 min of communication with 3.2M events.

Aggregates. Aggregation in all experiments is a multi-measure of three aggregate functions: count, sum, and max.

Setup. All experiments were executed on an Intel(R) Core(TM) i7-8650U CPU @ 1.90 GHz with 16 GB RAM, running Linux Debian 10. The implementation is written in modern C++ and uses a single core.

Implementation. The implementation follows an event-at-a-time execution model. For PES windows, a single FIFO queue is used: new events are appended and old ones are removed each time a window is released. With SES, such an implementation is not possible, since an event may belong to several windows, including windows that ended before it arrived. Instead, event pointers are stored in one bucket per window.
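The bucket-per-window design can be sketched in a few lines. This Python version is a hypothetical illustration (the class name `TumblingSES` and its methods are ours), not the authors’ C++ code: it handles tumbling windows of size \(\omega \) only, appends a reference to the same event tuple to every bucket the event overlaps (\(\mathrm{P}_{\cap }\)), and emits the count, sum and max of a numeric payload when a window is released. It assumes that window k is released as soon as an event arrives strictly after \(\textit{u}(w_k) + \delta \).

```python
from collections import defaultdict


class TumblingSES:
    """Event-at-a-time tumbling windows (k*omega, (k+1)*omega) with a TTP delta.

    Events are (lo, up, tau, value) tuples; each window keeps a bucket of
    references to the events it shares at least one chronon with (P_cap)."""

    def __init__(self, omega, delta):
        self.omega, self.delta = omega, delta
        self.buckets = defaultdict(list)   # window index -> events assigned to it
        self.next_release = 0              # first window not yet released
        self.output = []                   # (window index, count, sum, max)

    def push(self, event):
        lo, up, tau, _ = event
        # Release every window whose deadline u(w) + delta is strictly before tau.
        while (self.next_release + 1) * self.omega + self.delta < tau:
            self._release(self.next_release)
        # Windows overlapping (lo, up); released ones are skipped, so the event is lost there.
        first = max(lo // self.omega, self.next_release)
        last = (up - 1) // self.omega
        for k in range(first, last + 1):
            self.buckets[k].append(event)   # the same tuple is shared, not copied

    def flush(self):
        """Release all remaining windows, e.g. at the end of a finite stream."""
        for k in range(self.next_release, max(self.buckets, default=-1) + 1):
            self._release(k)

    def _release(self, k):
        values = [e[3] for e in self.buckets.pop(k, [])]
        self.output.append((k, len(values), sum(values), max(values, default=None)))
        self.next_release = k + 1


op = TumblingSES(omega=5, delta=12)
for ev in [(0, 3, 3, 10), (4, 17, 17, 7), (18, 19, 19, 1)]:
    op.push(ev)
op.flush()
print(op.output)   # [(0, 2, 17, 10), (1, 1, 7, 7), (2, 1, 7, 7), (3, 2, 8, 7)]
```

With `delta=0` instead, the long event (4, 17) only reaches window 3, since windows 0 to 2 are released before it arrives. The shared event references also show where the duplication cost comes from: one event can sit in many buckets at once.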

Fig. 3. Temporal window metrics.

Results. All the scenarios chosen in this series of experiments have been motivated by industrial requirements, especially in the field of telecommunication.

Time-Based Windows. The predicate used for event assignment is \(\mathrm{P}_{\cap }\). As expected, the error rate between PES and SES increases with the event size and decreases with the window range (see Fig. 3a). This validates the soundness of using PES, but also shows that the window range must be chosen wisely. Using an oracle that knows the whole stream, we can confirm the need for a TTP with SES, which should be chosen according to the event size (see Fig. 3b). Regarding throughput, the TTP has a limited impact as the window range evolves (see Fig. 3c), which is not the case as the event size increases. SES lead to many duplicates among windows, which comes at a cost in throughput: as duplication increases, throughput goes down, but it stays roughly the same as the window range increases with the same duplication rate (see Fig. 3d). Duplication induced by overlapping sliding windows also has a strong impact on throughput (see Fig. 3e). For real-world applications, with a naïve implementation, SES processing is only 30% slower than PES, while the error rate is still around 20% for 1 min windows, as shown in Fig. 3f.

Session Windows. As shown in Figs. 3g and 3h, reducing the window size has a negative impact on throughput. This is consistent with time-based windows and relates to how often aggregates must be computed. Figure 3g highlights that the inactivity duration has quite a high impact on throughput. When the inactivity period is kept at the same level, the TTP has only a small influence on throughput (see Fig. 3h).

Summary. This series of experiments shows that our framework is consistent with all the required assumptions for window-based aggregation on SES, in particular the TTP. It therefore deserves to be pushed further in order to gain efficiency and fully meet industrial requirements.

5 Related Work

The work done in this paper elaborates on previous work on data stream processing and temporal databases.

Window Aggregation in Data Stream Processing. Windowing is a common technique, and a common categorization of window characteristics is given in [4, 12] with CF, FCF and FCA classes. Depending on those characteristics, several optimisation techniques have been proposed, such as sub-aggregating the input stream and using aggregate tree indexes [4, 12]. These methods were studied in the context of PES, and we believe that extending them would be of great interest to speed up window-based aggregation of SES. Nevertheless, the window approach has been criticized for its inability to take delayed or out-of-order streams into account. Several methods have been proposed to fix the delay issue, including an allowed waiting time (the TTP in this paper), the use of punctuation in the stream [7], and the generation of heartbeats [11].

Temporal Databases. Queries in a temporal database can take various forms [2]. Among them, sequenced queries, where the query spans a time range, are close to our temporal window-based aggregates. However, pure sequenced queries are resource-demanding and can hardly be evaluated with a one-pass algorithm [8]. Several methods have been proposed to evaluate sequenced queries, mainly with graphs or indexes [2, 9], but this remains an open issue, and only a few commercial databases implement them [3, 5, 10]. Temporal aggregates on spanning events have not been widely studied. In [14], the authors combine windows and full history with a fine-to-coarse granularity along the timeline, using an SB-tree structure to index events and evaluate the queries. However, this approach is outdated with respect to recent advances in window-based stream processing and temporal databases.

6 Conclusion and Future Work

This paper introduced a brand new consideration in stream data: the integration of spanning events. To do so, we first introduced notions from temporal databases, with a valid time range and a transaction time point for events in a bi-temporal model. Then, since windowing is a common solution to the infinite stream problem raised by blocking operators, we conducted a careful review that yielded a new categorization of usual measures and a (function, predicate) pattern that defines every popular window family as well as forthcoming ones. We showed that, among the real-life window families, only time-based sliding/tumbling and session windows need to be adapted to handle spanning events.

Among those changes, we introduced pairwise interval comparison, as in Allen’s algebra, for event assignment to windows. We also defined a Time-To-Postpone parameter that allows long-standing events to be properly assigned to past windows. In the experiments, we showed that spanning events can be processed by a stream system and that our framework is effective at fixing PES errors. We also pointed out some behaviors, such as the duplication of event assignments across windows, which is a great challenge for real-life applications.

As future work, we anticipate that the implementation should use more advanced techniques to share parts of the computations among windows. Delay should also be studied in more detail. Finally, extensions of the aggregation, such as new operations like grouping or filtering, should be considered, with the ultimate goal of making the system fully operational in real-world conditions.