Introduction

Smart water networks, smart homes, smart cities, smart health, smart grids, and intelligent transportation are infrastructure systems that connect our world more closely than we ever thought possible. These systems are usually associated with a single concept, the Internet of Things (IoT) [1]. The idea of IoT developed in parallel with wireless sensor networks (WSNs), and WSNs are among the main components that enable IoT [2].

A wireless sensor network (WSN) is a wireless network of spatially distributed autonomous devices that use sensors to monitor physical or environmental conditions. A WSN is usually deployed with static sensor nodes that monitor a region of interest, but it can also include mobile sensor nodes that monitor different, ad hoc selected locations of interest. WSNs may be homogeneous or heterogeneous: homogeneous sensors send only one type of information (e.g., water temperature), while heterogeneous sensors send more than one type (e.g., temperature and dissolved oxygen). All these sensors send observational data, referred to as sensor stream data, to a remote server. Sensory data therefore come from multiple sensors of different modalities at distributed locations [3].

Furthermore, sensor stream data are made available on the web through the Sensor Web (SW). Extending the SW with Semantic Web technologies yields the Semantic Sensor Web (SSW) [4]. Adding semantic annotations to sensor stream data, using concept definitions from domain knowledge (e.g., ontologies), allows sensor data and metadata to be interpreted and understood. Integrating semantics into sensor data in real time, as dynamic data, is defined as real-time semantic annotation; storing sensor data in a repository (data store) as static data and integrating semantics afterwards is defined as non-real-time semantic annotation [5]. Organizations such as the Open Geospatial Consortium (OGC) and the World Wide Web Consortium (W3C) have proposed industry standards, such as Sensor Web Enablement (SWE), aimed at providing unified standards [6].

The paper is organized as follows: “Literature Review” discusses related work on semantic annotation of sensor stream data. “Background” gives an overview of data stream models, annotated sensor data streams, and the technologies used. “Proposed Model” presents the proposed management model of integrated semantic annotations to the sensor stream data for the IoT. A prototype implementation of the proposed model is presented in “Implementation of Proposed Model”, which includes (A) “Received Sensor Stream Data Format”, (B) “Integration and Interpretation of Semantic Annotations into the Sensor Stream Data”, and (C) “System Outputs”. Finally, “Conclusion and Future Work” concludes the paper and outlines some future perspectives of the proposed model.

Literature Review

A sensor data stream is a (potentially unbounded) sequence of tuples. Each tuple consists of a set of attributes, similar to a row in a database table. The rate at which sensor stream data are received is very important for their processing: the higher the rate, the harder the data are to manage and process [7]. For some sensors, such as water quality, air quality, and agricultural monitoring sensors, the data transmission rate is not very high. Their data can therefore be managed and processed differently from high-rate streams, because they can be stored or even archived. Raw sensor stream data are of little use unless properly annotated. Determining how to integrate semantic annotations into sensor stream data and make them machine-interpretable is therefore an important issue.

Several recent studies have investigated the semantic enrichment of sensor stream data.

Sejdiu et al. [8] presented a management model for the real-time integration of semantic annotations into heterogeneous sensor stream data with context in the IoT, using OGC SOS standards. Lin et al. [9] proposed a semantic annotation method for IoT sensor data that uses K-means clustering for knowledge discovery. The mechanism is semiautomatic: clustering yields only clusters without knowledge, and the knowledge associated with each cluster is defined by people. Duy et al. [10] proposed a lightweight data model for representing environment data using semantic middleware based on the Semantic Sensor Network (SSN) ontology, for managing and querying environment events. The model can combine measurement data with other sensor metadata such as geolocation, station, and sensor information. Xiaomin et al. [11] proposed an ontology modeling approach for evaluating river water quality and the related processing knowledge; their model consists of a data acquisition layer, a diagnosis layer, and a decision support layer. Rasyid et al. [12] implemented a Semantic Sensor Web (SSW) with an existing gas sensor that takes readings from the environment and stores them in a database; they also deployed an API that allows users to monitor and access gas sensor data. Sejdiu et al. [3] presented a system named IoTSAS, which enables real-time integration and interpretation of semantics into observed WSN data in two IoT domains: air quality monitoring and weather alert monitoring.

This study also identified other solutions [13,14,15] for semantically annotating sensor stream data. These solutions use non-real-time semantic annotation: the sensor stream data are first stored in a data store or ontology as static data and only then integrated with semantics. How to advance techniques and models for integrating and interpreting semantic annotations in sensor stream data in real time therefore remains an open issue.

The main contributions of our approach are presented as follows:

  • A data stream management model of WSNs for real-time IoT monitoring systems has been developed, which supports real-time integration and interpretation of data from heterogeneous sensors with semantic annotations.

  • To validate the proposed model, an IoT system for real-time water quality monitoring is built, which enables real-time integration and interpretation of semantic annotations to the observational data on water quality coming from wireless sensors.

  • By incorporating OGC SOS standards, the model hides the heterogeneity of sensor data from arbitrary sources and provides a real-time service interface for publishing sensor stream data enriched with semantic annotations, for display in IoT real-time monitoring systems.

  • The developed system applies international water quality regulations, such as the Water Framework Directive (WFD) and those of the United Nations Economic Commission for Europe (UNECE).

Background

The Data Stream Models

Depending on their characteristics, source of transmission, and storage, data streams can be modeled in various ways: real-time data streams, stream items, and window models. Each is described below:

Real-time data stream—a real-time data stream is a sequence of data that may or may not arrive in order and may or may not be pre-processed, which yields four possible models [16]: Unordered cash register (data from various domains that arrive in no specific order and without pre-processing), Ordered cash register (individual data from various domains that are not pre-processed but arrive in some well-known order), Unordered aggregate (pre-processed individual data from the same domain, with only one data item per domain arriving, in no particular order), or Ordered aggregate (pre-processed individual data from the same domain, with only one data item arriving, in a well-known order).

Stream items—since data are received in streams, they can be modeled as a list of elements, which can take the form of relational rows or object instances [17].

Window models—in many cases, only a fragment of the streaming data is of interest at any time, which motivates window models. These can be classified into three types: Fixed sliding window (contains only the most recent data within a given time frame), Landmark window (a reference point in time is fixed and all data from that point on are included; this type is used less often because the amount of data in the window keeps growing), and Adaptive window (the window changes dynamically based on the input data and user-specified values).
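The difference between the first two window models can be seen in a minimal sketch. It assumes readings arrive in timestamp order and that the fixed sliding window is time-based (the class names and the 30-minute span are illustrative, not taken from the paper); the adaptive window is omitted because its resizing rule depends on user-specified values.

```python
from collections import deque
from datetime import datetime, timedelta


class TimeSlidingWindow:
    """Fixed sliding window: keeps only items newer than `span` relative to the newest item."""

    def __init__(self, span: timedelta):
        self.span = span
        self.items = deque()  # (timestamp, value); assumes arrival in time order

    def add(self, ts: datetime, value: float) -> None:
        self.items.append((ts, value))
        # Evict items that fell out of the time frame ending at the newest timestamp.
        while self.items and self.items[0][0] <= ts - self.span:
            self.items.popleft()

    def values(self):
        return [v for _, v in self.items]


class LandmarkWindow:
    """Landmark window: keeps every item from a fixed reference time onward."""

    def __init__(self, landmark: datetime):
        self.landmark = landmark
        self.items = []

    def add(self, ts: datetime, value: float) -> None:
        if ts >= self.landmark:
            self.items.append((ts, value))  # never evicts, so it grows without bound

    def values(self):
        return [v for _, v in self.items]
```

With readings every 10 minutes, a 30-minute sliding window ends up holding the last three values, while a landmark window keeps accumulating everything after its reference time.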

Annotated Sensor Data Streams

WSNs consist of small-scale devices that observe various physical phenomena and provide sensor data in raw format. Typically, classical IoT applications cannot interpret the sensor data or understand their context. This makes it nearly impossible to obtain high-level information about events and to infer additional knowledge for situational awareness [18].

To give raw data meaning (semantics), sensor data streams must be annotated. Annotated sensor data become more meaningful and understandable, enabling end users to get high-level details about real-world situations instead of raw sensor data. This approach is known as the Semantic Sensor Web (SSW). These annotations provide more meaningful descriptions of, and enhanced access to, sensor data than SWE alone, and they act as a linking mechanism that bridges the gap between the primarily syntactic XML-based metadata standards of Sensor Web Enablement (SWE) and the RDF/OWL-based metadata standards of the Semantic Web [19].

To encode semantic annotations and the data gathered by sensors, this paper uses SWE, specifically version 2.0 of the Sensor Observation Service (SOS) standard, which relies on the OGC Observations and Measurements (O&M) model.

Technologies

The proposed management model of real-time integrated semantic annotations to the sensor stream data for the IoT utilizes:

  • Spark Streaming—is an extension of Apache Spark for building scalable, fault-tolerant IoT applications. Studies report that Spark Streaming performs best, with high throughput, when the incoming data volume is large [20]. Data can be ingested from many sources, such as Apache Kafka, TCP sockets, Flume, and Twitter, and processed using complex algorithms expressed with high-level functions such as map, reduce, join, and window. Finally, processed data can be pushed out to file systems, databases, and live dashboards.

  • Apache Kafka—is a distributed streaming platform which has capabilities to publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.

  • Apache Cassandra database—is a distributed store for structured data that scales out on inexpensive commodity hardware. It is designed to handle large amounts of data.

Proposed Model

Fig. 1 presents an overview of the model architecture for processing and managing sensor stream data for IoT real-time monitoring systems, such as water quality monitoring and air quality monitoring.

Fig. 1 An overview of the model architecture

The WSNs are deployed in different locations. They produce a continuous stream of data and transmit it to Apache Kafka in various formats (e.g., binary, JSON, and XML). Kafka is used to transform these data into a specific format that Spark Streaming processes in real time and in parallel. Spark Streaming integrates semantics into the sensor stream data in real time using association rules, mining data streams, WSNs metadata, and archival data streams, with concept definitions from ontologies or other semantic sources. This provides understanding and more meaningful descriptions, enabling IoT application areas to become much more intelligent.
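The normalization step described above, where messages arriving in different formats are turned into one common shape before stream processing, might look like the following minimal sketch. The paper does not give the message schemas, so the field names (`sid`, `parameter`, `value`, `lat`, `long`, `timestamp`) and the flat XML layout are assumptions, and binary payloads are not handled.

```python
import json
import xml.etree.ElementTree as ET

# Hypothetical common record layout used downstream (field names are assumptions).
FIELDS = ("sid", "parameter", "value", "lat", "long", "timestamp")


def normalize(payload: bytes, fmt: str) -> dict:
    """Convert a JSON or flat-XML sensor message into one common record shape."""
    if fmt == "json":
        raw = json.loads(payload)
    elif fmt == "xml":
        root = ET.fromstring(payload)
        # Assumes one child element per field, e.g. <value>17.15</value>.
        raw = {child.tag: child.text for child in root}
    else:
        raise ValueError(f"unsupported format: {fmt}")
    record = {name: raw.get(name) for name in FIELDS}
    record["sid"] = int(record["sid"])
    # Numeric fields are coerced to float; a missing field stays None.
    for name in ("value", "lat", "long"):
        record[name] = float(record[name]) if record[name] is not None else None
    return record
```

Because both branches end in the same dictionary, the downstream processor only has to handle one record shape regardless of what the sensors send.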

The sensor stream data enriched with semantic annotations are stored in the Cassandra database and displayed in IoT real-time monitoring systems in SOS O&M format, using annotation techniques such as XLink (without XPath). A fragment of an example output is presented in Fig. 2.

Fig. 2 A general case of integrated semantic annotations to the sensor stream data

As shown in Fig. 1, the proposed data stream management model supports real-time integration of heterogeneous sensor data with semantic annotations, continuous queries on streaming data, outlier validation of streaming data, ad hoc queries, and archiving of stream data together with their semantic annotations for applications that must answer queries from the archival store (persistent stored data). The proposed model consists of five main components: (A) Input Data Stream, (B) Stream Processor, (C) Data Modeling, (D) OGC standards, and (E) Ontology.

Each component of the model is described in detail as follows:

A. Input Data Stream—is implemented in Apache Kafka and accepts, in real time, the streaming data sent by the WSNs. The streams arrive unordered and without any pre-processing, i.e., as an unordered cash register. Each stream can deliver elements on its own schedule; streams need not have the same data rates or data types, and the time between elements of one stream need not be uniform.

B. Stream Processor is developed in Spark Streaming and contains the Outlier Stream Validator and Classificator, Query Processor, Ad hoc Queries, and Semantic Annotation Stream Processor:

  • Outlier Stream Validator and Classificator—is the part of the stream processor responsible for validating sensor streaming data in real time; it marks each stream element with one of two statuses, ‘valid’ or ‘outlier’. Valid data continue to further processing, while invalid data are stored in Invalid Data Streams (IDS). A data stream object is considered an outlier if it does not conform to expected behavior, which corresponds to either noise or an anomaly. For example, if the pH sensor reports ‘-2’ or ‘NULL’, the reading is classified as an outlier because pH values range from 0 to 14. Outliers can arise for different reasons, such as mechanical faults, other changes in the system, fraudulent behavior, instrument error, human error, or natural deviation [21]. The Outlier Stream Validator and Classificator therefore helps ensure data quality for IoT real-time monitoring systems. In addition, this component notifies system users when sensors send outlier (invalid) data.

  • Query Processor—supports continuous queries over streaming data, which are evaluated continuously as data streams keep arriving. The answer to a continuous query is produced over time and always reflects the stream data seen so far. The answers of our Query Processor can include semantically annotated data.

  • Ad hoc Queries—queries issued ad hoc by users, i.e., a question asked once about the current state of one or more streams. Users can also specify ad hoc queries that combine streaming data with persistent data stored in Working Data Streams, WSNs Metadata, or Archival Data Streams. The results of ad hoc queries can also include semantically annotated data.

  • Semantic Annotation Stream Processor—is the part of the Spark Streaming processor that integrates semantic annotations into heterogeneous sensor stream data with context in the Internet of Things in real time. This component can use sensor metadata, archival data streams, mining data streams, association rules (with concepts from ontologies), or other semantic sources to add semantic annotations to the sensor stream data. The semantic annotations of the sensor stream data are stored in Working Data Stream Annotations.
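A continuous query of the kind handled by the Query Processor can be illustrated with a running per-sensor aggregate whose answer is updated on every arriving element rather than recomputed over stored data. This is a minimal sketch; the paper does not list the queries it supports, so the choice of count, minimum, maximum, and mean, as well as the class names, are assumptions.

```python
import math
from dataclasses import dataclass
from typing import Dict


@dataclass
class RunningStats:
    """Answer to 'count/min/max/mean per sensor so far', updated incrementally."""
    count: int = 0
    total: float = 0.0
    minimum: float = math.inf
    maximum: float = -math.inf

    def update(self, value: float) -> None:
        self.count += 1
        self.total += value
        self.minimum = min(self.minimum, value)
        self.maximum = max(self.maximum, value)

    @property
    def mean(self) -> float:
        return self.total / self.count if self.count else math.nan


class ContinuousQuery:
    """Re-evaluated on every arriving element instead of once over stored data."""

    def __init__(self) -> None:
        self.per_sensor: Dict[int, RunningStats] = {}

    def on_arrival(self, sid: int, value: float) -> RunningStats:
        stats = self.per_sensor.setdefault(sid, RunningStats())
        stats.update(value)
        return stats  # current answer, reflecting every element seen so far for this sensor
```

Each update costs constant time and memory per sensor, which is what makes the answer cheap to keep current as readings stream in.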

C. Data Modeling is developed in the Apache Cassandra database and contains Processor Data Streams, Working Data Streams, Working Data Stream Annotations, Archival Data Streams, Archival Data Stream Annotations, Invalid Data Streams, and WSNs Metadata:

  • Processor Data Streams (PDS)—contains a summary of streaming data for the Stream Processor, which can be used for answering queries. For each deployed sensor, only one row is saved, containing the information described in Table 1.

  • Working Data Streams (WDS)—contains streaming data for the operation of the Stream Processor; its size is configurable, and it can be used for answering queries. It is therefore a Fixed Sliding Window that contains the most recent sensor streaming data (e.g., the 15 most recent measured values; this number is configurable). For each measured value, the information described in Table 2 is stored.

  • Working Data Stream Annotations (WDSA)—stores semantic annotations of sensor streaming data. The Semantic Annotation Stream Processor component tags the sensor data stream with semantic annotations in real time. One measurement stored in Working Data Streams can have several semantic annotations, each containing the information described in Table 3.

  • Archival Data Streams (ADS)—archives data streams for generating reports and different statistics. The data model of ADS is the same as that of WDS.

  • Archival Data Stream Annotations (ADSA)—archives semantic annotations of sensor stream data for generating reports and different statistics. The data model of ADSA is the same as that of WDSA.

  • WSNs Metadata (WMD)—data describing the wireless sensor network itself, its devices, and the corresponding site allocation. These static data describe the WSN in the field and its configuration, which may involve node types (sensing nodes, gateway nodes, central monitoring node), descriptions of sensors as devices (sensor name, serial number, manufacturer, and type), and data about the deployment sites, such as the sensors’ location and, for water system monitoring, the river basins and the municipalities the rivers belong to.

  • Invalid Data Streams (IDS)—archives invalid sensor stream data classified as outliers by the Outlier Stream Validator and Classificator. Storing data in IDS is optional and depends on the system requirements.

Table 1 Processor data streams (PDS) model
Table 2 Working data streams (WDS) model
Table 3 Working data stream annotations (WDSA) model

D. OGC Standards—as mentioned above, the sensor stream data enriched with semantic annotations are published to IoT real-time applications in OGC standard format, specifically version 2.0 of the SOS O&M standard.

E. Ontology—an ontology named ‘ont-core.owl’ is created, shown in Fig. 3. This ontology contains semantic annotations for each IoT domain, for example the Water Quality Monitoring, Air Quality Monitoring, and Weather Alerts Monitoring domains. In our case, the semantic annotations developed for Water Quality Monitoring are used. More details are presented in “Integration and Interpretation of Semantic Annotations into the Sensor Stream Data”.

Fig. 3 ‘ont-core.owl’ ontology

The working cycle of the model is as follows. Streaming data are sent by the wireless sensor networks to the Input Data Stream (Apache Kafka). Each sensor stream element is a record of mixed types containing the sensor ID (SId), the parameter name, the measured value, the geographical position (latitude and longitude), and a timestamp, for example:

‘SId: 1; Parameter: Temperature; Value: 17.15; Lat: 42.706703186; Long: 21.038431; Timestamp: 20200312165213’
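A stream element in the format above can be parsed into a typed record with a short sketch like the following. The parser and record names are hypothetical; reading the timestamp as yyyyMMddHHmmss is an assumption that matches the example, and a ‘NULL’ value is kept as `None` so that validation can flag it later.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass(frozen=True)
class SensorReading:
    sid: int
    parameter: str
    value: Optional[float]  # None when the sensor reports NULL
    lat: float
    long: float
    timestamp: datetime


def parse_reading(line: str) -> SensorReading:
    """Parse one 'Key: value; Key: value; ...' stream element into a typed record."""
    fields = {}
    for part in line.split(";"):
        key, _, raw = part.partition(":")
        fields[key.strip()] = raw.strip()
    value = fields["Value"]
    return SensorReading(
        sid=int(fields["SId"]),
        parameter=fields["Parameter"],
        value=None if value.upper() == "NULL" else float(value),
        lat=float(fields["Lat"]),
        long=float(fields["Long"]),
        # Assumed layout yyyyMMddHHmmss: 20200312165213 -> 2020-03-12 16:52:13.
        timestamp=datetime.strptime(fields["Timestamp"], "%Y%m%d%H%M%S"),
    )
```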

The stream data elements are then validated by the Outlier Stream Validator and Classificator, which assigns every sensor stream element a validity status (true if the data are valid, false if they are an outlier). Data with validation status ‘true’ are passed on to the Semantic Annotation Stream Processor, which integrates semantic annotations into the stream data in real time. The sensor stream data enriched with semantic annotations are then stored in WDS and WDSA and transformed into SOS O&M format for display in IoT real-time monitoring systems.
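The validation step can be sketched as a range check that returns true or false per reading and routes outliers to the Invalid Data Streams. Only the pH range of 0 to 14 comes from the text; the `VALID_RANGES` table, the function names, and the decision to treat a parameter without a configured range as invalid are assumptions for illustration.

```python
import math
from typing import Iterable, List, Optional, Tuple

# Accepted value ranges per parameter. Only the pH range (0 to 14) is given in the text;
# ranges for other parameters would have to be configured (assumption).
VALID_RANGES = {"pH": (0.0, 14.0)}


def is_valid(parameter: str, value: Optional[float]) -> bool:
    """True if the reading is valid, False if it is an outlier."""
    if value is None or math.isnan(value):
        return False  # NULL or missing measurement
    bounds = VALID_RANGES.get(parameter)
    if bounds is None:
        return False  # no range configured, so the reading cannot be validated
    low, high = bounds
    return low <= value <= high


def split_stream(readings: Iterable[dict]) -> Tuple[List[dict], List[dict]]:
    """Valid readings continue to annotation; outliers go to Invalid Data Streams (IDS)."""
    valid, invalid = [], []
    for r in readings:
        (valid if is_valid(r["parameter"], r["value"]) else invalid).append(r)
    return valid, invalid
```

Rejecting readings for parameters without a configured range is a conservative choice; a deployment could instead pass them through unvalidated.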

When a new value measured by a sensor arrives in WDS (and its semantic annotations in WDSA) once the window is full, the oldest value is removed and moved to ADS (respectively, ADSA) for archiving. ADS and ADSA therefore hold archived data used for generating reports and statistics over longer time frames.
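The WDS-to-ADS hand-off described above can be sketched with an in-memory stand-in for the Cassandra tables. The class and attribute names are hypothetical, and the stored tuples are simplified, since the actual columns (Tables 2 and 3) are not reproduced here. A measurement and its annotations are evicted together, mirroring WDS to ADS and WDSA to ADSA.

```python
from collections import deque
from typing import Any, Deque, Dict, List, Tuple


class WorkingStore:
    """Per-sensor fixed-size window (WDS/WDSA) that archives its oldest entry (ADS/ADSA)."""

    def __init__(self, size: int = 15):
        self.size = size
        self.wds: Dict[int, Deque[Tuple[Any, List[str]]]] = {}
        self.ads: List[Tuple[int, Any, List[str]]] = []

    def insert(self, sid: int, measurement: Any, annotations: List[str]) -> None:
        window = self.wds.setdefault(sid, deque())
        window.append((measurement, annotations))
        if len(window) > self.size:
            # Measurement and annotations move to the archive together.
            old_measurement, old_annotations = window.popleft()
            self.ads.append((sid, old_measurement, old_annotations))
```

An explicit `popleft` is used instead of `deque(maxlen=...)` because a bounded deque silently discards the evicted element, whereas here it must be captured for archiving.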

Implementation of Proposed Model

To validate the proposed model, we implemented a prototype for a water quality monitoring use case, named the Water Quality Monitoring System (WQMS). This system measures water quality in real time using current technologies such as wireless sensor networks, which provide continuous monitoring and consist of nodes, referred to as motes, that sense the environment in which they are deployed.

Received Sensor Stream Data Format

WQMS monitors water parameters such as temperature, potential of hydrogen (pH), and dissolved oxygen (DO). The type, range, and unit of these parameters are shown in Table 4. Each sensor reading is a real number, e.g., 16.5 °C for a temperature measurement. The sensors are configured so that each node sends data every 10 min in XML format. The data rate is therefore low, and storing these data with the proposed model poses no problem for current storage technologies.
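As a rough check on the data volume, the 10-minute sending interval gives the following message count per node:

```latex
\frac{24\ \text{h/day} \times 60\ \text{min/h}}{10\ \text{min/message}} = 144\ \text{messages/day},
\qquad 144 \times 365 = 52\,560\ \text{messages/year}
```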

Table 4 Specification of water parameters

The WQMS system architecture is presented in Fig. 4. It mainly consists of static wireless sensing nodes (located in Plemetin at 42.70670318, 21.03843116; Fig. 5a), mobile wireless sensing nodes (which can observe data at different locations; Fig. 5b), a gateway node, and the monitoring node. Static wireless sensing nodes are installed at a fixed position and continuously transmit sensed data through the gateway node to the central monitoring node, while mobile wireless sensing nodes can move from one position to another to measure water parameters.

Fig. 4 System architecture: implementation of proposed data stream management model in WQMS

Fig. 5 System implementation in Plemetin: (a) static sensor nodes (left) and (b) mobile sensor nodes (right)

Sensor data are transmitted to the central monitoring node via 3G/GPRS using web services. The WQMS software, which implements the proposed management model, is installed on the central monitoring node.

Integration and Interpretation of Semantic Annotations into the Sensor Stream Data

As mentioned above, an ontology named ‘ont-core.owl’ is created. As shown in Fig. 6, it contains semantic annotations for international water quality regulations, such as:

  • Water Framework Directive (WFD), with the water status annotations:

      • High,

      • Good,

      • Moderate,

      • Poor, and

      • Bad.

  • United Nations Economic Commission for Europe (UNECE), with the water status annotations:

      • Class I,

      • Class II,

      • Class III,

      • Class IV, and

      • Class V.

Fig. 6 ‘ont-core.owl’ ontology for Water Quality Monitoring

An example of annotation for the conductivity parameter (µS/cm) is given in Fig. 7. If the value observed by the conductivity sensor is in the range 0–500 µS/cm, the water status is categorized as high and the semantic annotation result is #High. If the observed value is between 500 and 700 µS/cm, the system creates the annotation #Good. Both of these statuses are acceptable in terms of water quality under the WFD. The remaining annotations, #Moderate (700–1000 µS/cm), #Poor (1000–2000 µS/cm), and #Bad (2000–5000 µS/cm), indicate a failure to achieve good status (unacceptable; does not meet WFD goals).
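The conductivity thresholds above translate directly into a lookup table. In this minimal sketch, the lower bound of each range is inclusive and the upper bound exclusive, which is an assumption since the text does not say which class a boundary value such as 500 µS/cm belongs to; values outside 0–5000 µS/cm are left unannotated. The function name is hypothetical.

```python
from typing import Optional

# WFD water-status annotations for conductivity (µS/cm), using the ranges in the text.
CONDUCTIVITY_CLASSES = [
    (0.0, 500.0, "#High"),
    (500.0, 700.0, "#Good"),
    (700.0, 1000.0, "#Moderate"),
    (1000.0, 2000.0, "#Poor"),
    (2000.0, 5000.0, "#Bad"),
]

# Statuses that meet WFD goals.
ACCEPTABLE = {"#High", "#Good"}


def annotate_conductivity(value: float) -> Optional[str]:
    """Return the WFD status annotation for a conductivity reading, or None if out of range."""
    for low, high, label in CONDUCTIVITY_CLASSES:
        if low <= value < high:  # assumed: lower bound inclusive, upper bound exclusive
            return label
    return None
```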

Fig. 7 Water status annotation

The sensor stream data may arrive at the Kafka server in different formats (XML in our case), and Kafka transforms them into a specific format for processing by Spark Streaming. Spark Streaming then semantically annotates the sensor data stream based on the measured values and converts it into SOS O&M format. A fragment of an example of semantic annotations integrated into the SOS O&M format using annotation techniques such as XLink and Embedded is presented in Fig. 8.
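How an XLink-style annotation could be attached to an observation is shown in the following deliberately simplified sketch. It is not schema-valid O&M 2.0: real observations also require elements such as `phenomenonTime` and `featureOfInterest` and use a richer structure. The ontology base IRI, the `sensor/{sid}` procedure reference, and placing the annotation as an `xlink:href` on `om:parameter` are all assumptions, since Fig. 8 is not reproduced here.

```python
import xml.etree.ElementTree as ET

OM = "http://www.opengis.net/om/2.0"
XLINK = "http://www.w3.org/1999/xlink"
ET.register_namespace("om", OM)
ET.register_namespace("xlink", XLINK)

# Hypothetical base IRI; the real IRI of ont-core.owl is not given in the text.
ONTOLOGY_BASE = "http://example.org/ont-core.owl"


def to_om(sid: int, parameter: str, value: float, uom: str,
          timestamp_iso: str, annotation: str) -> str:
    """Build a simplified O&M-style observation with the annotation attached by xlink:href."""
    href = f"{{{XLINK}}}href"
    obs = ET.Element(f"{{{OM}}}OM_Observation")
    ET.SubElement(obs, f"{{{OM}}}resultTime").text = timestamp_iso
    ET.SubElement(obs, f"{{{OM}}}procedure", {href: f"sensor/{sid}"})
    # The semantic annotation links to an ontology concept instead of embedding text.
    ET.SubElement(obs, f"{{{OM}}}parameter", {href: f"{ONTOLOGY_BASE}{annotation}"})
    ET.SubElement(obs, f"{{{OM}}}observedProperty", {href: parameter})
    ET.SubElement(obs, f"{{{OM}}}result", {"uom": uom}).text = str(value)
    return ET.tostring(obs, encoding="unicode")
```

Linking by reference keeps the observation small and lets clients dereference the ontology concept only when they need its meaning.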

Fig. 8 An example of integrated semantic annotations to the IoT water quality monitoring sensor stream data

After semantics have been integrated into the heterogeneous water sensor stream data in real time, the sensor stream data are interpreted in real time to give a better understanding and to infer new knowledge from the water sensor stream data. The following interpretation pattern was developed for the water quality IoT domain:

Now (@[#timestamp]) in location [#location(lat, long)] is detected ‘[#ParamValue] [#ParamUnit]’ [#ParamName], ‘[#WaterStatus]’ water status, which [#WaterStatus_Indicates].

As shown in Fig. 8, the interpretation of semantic annotations contains the data about timestamp, location (including latitude and longitude) of sensing node, the phenomenon with the measured value which has caused the water status, and meaning of the current water status:

“Now (@2020-04-03 16:40) in location 'Plemetin (42.70670318, 21.03843116)' is detected '47.20%' dissolved oxygen, which indicates a ‘High’ water status with no or very low human pressure and the water is not polluted at all.”
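Filling the interpretation pattern is a template substitution. In this minimal sketch, only the meaning text for ‘High’ comes from the example above; the texts for the other statuses would come from the ontology, so the `STATUS_MEANING` table is an assumption, and an unknown status raises `KeyError`. The function name and its parameters are hypothetical.

```python
from datetime import datetime

# Meaning of each water status. Only the 'High' text appears in the example above;
# the others would be taken from the ontology (assumption).
STATUS_MEANING = {
    "High": "indicates no or very low human pressure and the water is not polluted at all",
}


def interpret(ts: datetime, place: str, lat: float, lon: float,
              value: float, unit: str, name: str, status: str) -> str:
    """Fill the pattern: Now (@ts) in location ... is detected 'value unit' name, 'status' water status, which ..."""
    meaning = STATUS_MEANING[status]
    return (f"Now (@{ts:%Y-%m-%d %H:%M}) in location '{place} ({lat}, {lon})' "
            f"is detected '{value:.2f} {unit}' {name}, '{status}' water status, which {meaning}.")


print(interpret(datetime(2020, 4, 3, 16, 40), "Plemetin", 42.70670318, 21.03843116,
                47.2, "%", "dissolved oxygen", "High"))
```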

System Outputs (UI)

To display the sensor stream data enriched with semantic annotations and interpretations, a web-based IoT real-time application is developed. The software interface, shown in Figs. 9 and 10, enables real-time water quality monitoring through static and mobile sensors. The software includes modules for system administration (managing users, user groups, and user rights, and changing passwords), definition of continuous queries, execution of ad hoc queries, and configuration of the WSNs metadata.

Fig. 9 Outputs of proposed model in WQMS

Fig. 10 Interpretation of semantic annotations

The WQMS software executes the continuous queries of the proposed model to display information. The information displayed in the textboxes for each parameter is obtained from Processor Data Streams through continuous queries. The information displayed in the charts is obtained from Working Data Streams, while the semantic annotations that indicate the water status are obtained from Working Data Stream Annotations. As mentioned above, Working Data Streams form a fixed sliding window whose size is preset in WQMS, here to 15. The charts therefore show the last 15 measurements for each sensor. As soon as data from the WSNs reach the system, the continuous queries are triggered.

The system outputs (UI) are developed in ASP.NET Core MVC (.NET 5.0, C#), a cross-platform, high-performance, open-source framework for building modern, cloud-based, internet-connected applications. To read data from the Apache Cassandra database, the DataStax C# Driver for Apache Cassandra is used.

Conclusion and Future Work

This research described the proposed management model of integrated semantic annotations into sensor stream data for the Internet of Things.

This model supports managing stream data of heterogeneous sensors, real-time integration of semantic annotations into sensor stream data, continuous queries on streaming data, ad hoc queries, outlier validation of streaming data, and archiving of stream data with semantic annotations for applications that must answer queries from the archival store (persistent stored data).

The proposed model supports standards such as Sensor Web Enablement (SWE), specifically version 2.0 of the Open Geospatial Consortium (OGC) Sensor Observation Service (SOS) standard, which relies on Observations and Measurements (O&M), to encode semantic annotations and the data observed by sensors.

To validate the proposed conceptual model, a water quality monitoring system, named Water Quality Monitoring System (WQMS), is implemented using Internet of Things technologies such as WSNs. The implemented WQMS enables real-time water quality monitoring.

Among several extensions of the proposed model under way, the following should be considered in future work:

  1. Advancing annotation techniques, such as XPath, for integrating and interpreting semantic annotations in real time into heterogeneous sensor observation data and metadata with context in the Internet of Things.

  2. Implementing other international water quality regulations.

  3. Evaluating the system performance and comparing the proposed model with other existing, similar management schemes.