1 Introduction

With the recent proliferation of rich social media across a variety of personal devices, considerable attention has shifted to the challenge of adaptively distributing and streaming multimedia content over the Internet. Among the emergent technologies, cloud-based media streaming, transcoding, and distributed storage have been the most noteworthy and influential. Before the advent of cloud computing technology, multimedia services employed traditional distributed and cluster-based computing approaches for media transcoding and streaming. However, these traditional approaches make it difficult to support quality of service (QoS), owing to the explosive growth in mobile services [1] and in multimedia traffic caused by the high resolution and capacity of recent multimedia formats.

In the areas of traditional multimedia transcoding and streaming, much research has focused on distributed and cluster-based video media approaches [2–6] that reduce the transcoding time and distribute streaming jobs, thereby facilitating the delivery of multimedia to end users under limited network capacity. However, these approaches have several problems and limitations. First, these transcoding approaches focus on obtaining computing resources for multimedia transcoding simply by increasing the number of cluster machines in a parallel and distributed computing environment. Second, during the recovery phase of a QoS-guaranteed streaming system, these approaches do not include load balancing, fault tolerance, or a data replication method that ensures data protection and expedites recovery [7]. Thus, the absence of an automated recovery policy to prevent the loss of multimedia content and system faults in traditional streaming systems means that these approaches cannot guarantee the reliability of a system when providing rich media services. Finally, most of the systems that use these approaches do not include or consider a media transcoding function for streaming services, so vendors and developers find it difficult to simultaneously construct and develop transcoding and streaming modules for distributed and cluster environments.

To overcome these limitations and problems, many researchers and developers have adopted cloud computing technologies [8–12] for multimedia services owing to advantages such as a flexible dynamic IT infrastructure, QoS-guaranteed computing environments, cloud programming models [13], and configurable software services [14]. Cloud-based media streaming services that are distributed over the Internet have been released: iCloud (Apple), Cloud Player and Cloud Drive (Amazon), Azure Media Services (Microsoft), and Netflix. Cloud-based transcoding services such as Amazon Elastic Transcoder, Zencoder, and Ankoder have also been released [15].

Cloud-based technologies have emerged owing to the features of recent multimedia services: the heterogeneity of media, QoS, networks, and devices [16]. To support such features, media streaming, transcoding, and distribution must depend on massive—and massively scalable—computational resources: i.e., CPUs, memory, network bandwidth, and storage. Although cloud computing can provide these resources, in doing so, it also introduces a heavy burden on existing Internet infrastructure and cloud resources as well as a host of new challenges (e.g., cluster rebalancing, namespace management, data distribution/replication [17], auto-recovery, and fault tolerance), which are intensified by the massive swings in traffic associated with rich media streaming. Developers and service vendors have both found that these challenges are difficult to resolve, and they continue to hinder current media delivery systems.

To address these challenges, we propose a cloud-based distributed multimedia streaming service (CloudDMSS) system based on Hadoop [18, 19], which is designed to run on the current cloud computing infrastructure. The capabilities of CloudDMSS include the following:

  1. Transcoding of large volumes of media into the MPEG-4 video format for delivery to a variety of devices, including PCs, smart pads, and phones;

  2. Reduction of the transcoding time by incorporating the Hadoop distributed file system (HDFS) for the storage of multimedia data and MapReduce for distributed parallel processing;

  3. Reduced content delay and traffic bottlenecks through a streaming job distribution algorithm;

  4. Improved overall performance through dual-Hadoop clustering on each physical cluster;

  5. Efficient content distribution and improved scalability by adhering to Hadoop policies;

  6. Automatic execution of the workflow of sequential tasks during streaming service deployment.

In this study, we describe the design of the CloudDMSS architecture with an HDFS-based [20, 21] distribution and storage function for source media files, a batch processing function to transcode a large number of media files, an automatic migration function that moves transcoded content to content servers based on HDFS, and a streaming job distribution strategy. We designed and defined a set of systematic cloud-based multimedia streaming processing workflows. We focused on using CloudDMSS to conduct a workflow based on a dual-Hadoop cluster for each physical cluster using a variety of open-source tools, and we developed a Web-based dashboard that provides user interfaces and monitors cloud resources. Experimental evaluations of CloudDMSS were conducted to verify its performance.

The remainder of this paper is organized as follows. Section 2 discusses relevant research on cloud-based streaming services. Section 3 describes the core architecture of CloudDMSS and the workflow of the streaming service deployment process, as well as presenting four robust algorithms that satisfy the design requirements of our service architecture. Section 4 explains the prototype of the proposed system and its configuration. In Sect. 5, we discuss the results of several experiments conducted using a 28-node cluster over a local testbed. In Sect. 6, we discuss performance evaluations conducted in an actual cloud environment. Section 7 presents our concluding remarks and plans for future work.

2 Related works

In recent years, many researchers have applied cloud computing technologies to rich media services in response to the explosion in demand. We consider the three aspects that are most relevant to our CloudDMSS system: Hadoop, media transcoding, and multimedia cloud computing.

2.1 Hadoop

Hadoop, inspired by Google's MapReduce and Google File System [22], is a software framework that supports data-intensive distributed applications capable of handling thousands of nodes and petabytes of data. Hadoop facilitates the scalable and timely analytical processing of large datasets to extract useful information. Hadoop comprises two important frameworks: (1) HDFS, a distributed, scalable, and portable file system written in Java, modeled on the Google file system (GFS); and (2) MapReduce, a framework first developed by Google for processing large datasets. The MapReduce framework provides a specific programming model and a runtime system for processing and creating large datasets that are suitable for various real-world tasks [5]. This framework also handles automatic scheduling, communication, and synchronization during the processing of huge datasets, and it provides fault tolerance. The MapReduce programming model is executed in two main steps called mapping and reducing, which are defined by mapper and reducer functions. Each phase takes a list of key/value pairs as its input and output. In the mapping step, MapReduce receives the input dataset and feeds each data element to the mapper in the form of key/value pairs. In the reducing step, all of the outputs from the mapper are processed, and the final result is generated by the reducer through a merging process.
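To make the key/value flow of the two phases concrete, the canonical word-count job can be expressed with the Hadoop Java API as follows; this is a generic illustration of the programming model, not code from CloudDMSS:

```java
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount {
  // Mapping step: each input line is tokenized and emitted as (word, 1) pairs.
  public static class TokenMapper extends Mapper<Object, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();
    @Override
    protected void map(Object key, Text value, Context ctx)
        throws IOException, InterruptedException {
      StringTokenizer it = new StringTokenizer(value.toString());
      while (it.hasMoreTokens()) {
        word.set(it.nextToken());
        ctx.write(word, ONE);
      }
    }
  }

  // Reducing step: all values for the same key are merged into a final count.
  public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context ctx)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable v : values) sum += v.get();
      ctx.write(key, new IntWritable(sum));
    }
  }
}
```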

2.2 Media transcoding

The term media transcoding has been defined in many previous studies, such as [23, 24]. According to [14], multimedia information must be adapted to bring multimedia content and services to numerous heterogeneous client devices while retaining the capacity for mobile usage; this adaptation is referred to as media transcoding technology.

Figure 1 shows the architecture of a legacy transcoding system. First, the client requests a transcoding function from a transcoding server. The transcoding server reads the original media data from the media server and then transcodes the data according to the user-requested resolution, bit rate, and frame rate. The transcoding server then sends the transcoded media data to the client [25]. However, this media transcoding process imposes a heavy burden on the existing Internet infrastructure and computing resources because recent media files, such as video and image files, have shifted to high capacity and high definition.

Fig. 1 Architecture of a legacy transcoding system [25]

Therefore, many researchers have applied distributed and parallel computing to media transcoding methods. For example, Guo et al. [4] proposed a cluster-based multimedia web server, where they designed and implemented a media cluster that dynamically generates video units in order to satisfy the bit rate requested by many clients, as well as proposing seven load-balancing scheduling schemes for the MPEG transcoding service. Sambe et al. [26] designed and implemented a distributed video transcoding system with the capacity to transcode an MPEG-2 video file into diverse video formats at different rates. The key feature of this system is that the transcoder chunks the MPEG-2 video file into small segments along the time axis and then transcodes them in a parallel and distributed manner.

Tian et al. [27] described a cluster-based transcoder that transcodes MPEG-2 format video files into MPEG-4 and H.264 format video files at a faster transcoding speed. This system comprises a master node and a number of worker nodes. The master node has six threads: a splitter, a merger, a sender, a receiver, a scheduler, and an audio transcoder.

2.3 Multimedia cloud computing

Multimedia traffic over the Internet has increased dramatically since the release of various personal devices and the shift of video and image content to high capacity and high definition. To support a high QoS for heterogeneous devices such as smartphones, personal computers, smart televisions, and smart pads, many researchers and developers have tried to apply cloud computing technologies to multimedia services. In this section, we introduce the basic concept of multimedia cloud computing and consider some recent studies.

Zhu et al. [16] first introduced the principal concept of the multimedia cloud computing model, addressing multimedia cloud computing from multimedia-aware cloud (media cloud) and cloud-aware multimedia (cloud media) perspectives. Figure 2 shows the relationship between the media cloud and cloud media services. The multimedia-aware cloud perspective focuses on how the cloud can provide QoS for multimedia applications and services. The cloud-aware multimedia perspective focuses on how multimedia can perform content storage, processing, adaptation, rendering, and other functions in the cloud to best utilize cloud computing resources, thereby delivering a high quality of experience for multimedia services.

Fig. 2 Relationship between media cloud and cloud media services [16]

To the best of our knowledge, there have been few previous reports on multimedia cloud computing. Hui et al. [28] proposed MediaCloud, a layered architecture that defines a new paradigm for dealing with multimedia applications and services. The architecture comprises three layers (a Media Service Layer, a Media Overlay Layer, and a Resource Management Layer) and addresses key challenges such as heterogeneity, scalability, and QoS provisioning. However, this architecture operates mainly at the conceptual level, and it leaves most of the challenges of real-world implementation for future work [28]. By contrast, Luo et al. addressed the implementation challenge of QoS delivery over a virtualized infrastructure by presenting a practical architecture and mechanism for a private media cloud [29]. They described their system in terms of four major components: monitoring, load balancing, traffic management, and security. In the context of cloud-based streaming, Lee et al. [15] proposed a configuration scheme for connectivity-aware P2P networks based on algorithms for connectivity-aware mobile P2P network configuration and connectivity-aware P2P network reconfiguration. Chang et al. [30] described a cloud-based media streaming architecture that dynamically adjusts streaming services in response to mobile device resources, multimedia codec features, and the network environment. They also presented a design for a stream dispatcher component, including the real-time adaptation of codecs in response to client device profiling and a dynamic adjustment algorithm for multimedia streaming. Huang et al. [31] presented CloudStream, a cloud-based video proxy that can deliver high-quality video streams by transcoding the original video in real time to a scalable codec, thereby allowing the stream to adapt to network dynamics. They also proposed a multi-level transcoding parallelization framework with two mapping options: hash-based mapping and lateness-first mapping.

3 Architecture and workflow of CloudDMSS

In this section, we describe the design strategy of the CloudDMSS system, including the overall workflow from uploading original multimedia content to supporting a streaming service for end users, without content delay and traffic bottlenecks. Figure 3 shows the fundamental concept of our service model.

Fig. 3 Fundamental concept of the cloud-based distributed multimedia streaming service model (CloudDMSS)

Personal media data such as movies, music videos, and animations are distributed and stored on a transcoding Hadoop cluster. Users and administrators upload media data for transcoding in order to share the data with other users. After uploading, the media data are transcoded into a standard format (MPEG-4) that is suitable for streaming to heterogeneous devices. To reduce the transcoding time, our model applies a transcoding function that utilizes the MapReduce framework. The transcoded contents are then migrated automatically and stored on the content servers of a streaming Hadoop cluster. The migrated contents are streamed to end users with guaranteed QoS by controlling the streaming servers that run on the streaming Hadoop cluster. To reduce content delays and traffic bottlenecks, our service model utilizes a streaming job distribution algorithm, which balances and distributes the load of the streaming servers.

3.1 Overall system architecture

Our proposed CloudDMSS is designed to run on a Hadoop cluster for heterogeneous devices in a distributed manner. The overall system architecture is shown in Fig. 4. CloudDMSS has three main components: Hadoop-based distributed multimedia transcoding (HadoopDMT), Hadoop-based distributed multimedia streaming (HadoopDMS), and cloud multimedia management (CMM). The characteristics of our system are as follows. (1) Our system transcodes large amounts of media content into the MPEG-4 video format for delivery to a variety of devices, including PCs, smart pads, and phones. (2) It reduces the transcoding time by using HDFS for multimedia data storage and MapReduce for distributed parallel processing. (3) CloudDMSS controls streaming servers to reduce content delay and traffic bottlenecks using a streaming job distribution algorithm. (4) Dual-Hadoop clustering per physical cluster is used to improve the overall performance and distribute job tasks between transcoding and streaming. (5) CloudDMSS provides efficient content distribution and improved scalability by adhering to Hadoop policies. (6) It automatically conducts the workflow of sequential tasks for a streaming service deployment process.

Fig. 4 Overall architecture of the cloud-based distributed multimedia streaming service system

3.2 CloudDMSS system architectural components

3.2.1 HadoopDMT

The main role of HadoopDMT is to transcode the various multimedia data stored on a transcoding Hadoop cluster into MPEG-4, the standard format for media streaming to a variety of devices. HadoopDMT improves quality and speed by using HDFS [32] to store video data from many sources, MapReduce [32] for distributed parallel processing of these data, and Xuggler [33] to transcode the data. The capabilities of HadoopDMT are as follows. (1) HadoopDMT comprises a codec transcoding function with a configurable display size, codec method, and container format. (2) It focuses mainly on the batch processing of large video files collected over a fixed period of time, rather than processing small video files collected in real time. (3) HDFS is used to avoid the high cost of communicating video files during data transfer for distributed processing. HDFS is also used because its large chunk size (64 MB) policy is suitable for processing video files and for a user-level distributed system. (4) HadoopDMT incorporates the load balancing, fault tolerance, and merging and splitting policies provided by MapReduce for distributed processing.
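As a rough sketch of what the Xuggler-based transcoding step looks like, the Xuggler mediatool API can re-encode a file into a new container in a few lines. The file names and target format below are illustrative, and resizing would additionally require a video resampler:

```java
import com.xuggle.mediatool.IMediaReader;
import com.xuggle.mediatool.IMediaWriter;
import com.xuggle.mediatool.ToolFactory;

public class XugglerTranscode {
  public static void main(String[] args) {
    // Decode the source file and re-encode it into an MPEG-4 container.
    IMediaReader reader = ToolFactory.makeReader("input.avi");
    IMediaWriter writer = ToolFactory.makeWriter("output.mp4", reader);
    reader.addListener(writer);           // pipe decoded media into the writer
    while (reader.readPacket() == null) ; // pump packets until end of file
  }
}
```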

HadoopDMT [34] is divided into four main domains: video data collection domain (VDCD), HDFS-based splitting and merging domain (HbSMD), MapReduce-based transcoding domain (MbTD), and cloud-based infrastructure service domain (CbISD). Figure 5 shows the detailed structure of HadoopDMT.

Fig. 5 Detailed structure of HadoopDMT

First, the main contribution of VDCD is to collect the different types of original encoded video files created by media creators such as SNS providers, media sharing services, and personal users, and to store these files on the HDFS of the transcoding Hadoop cluster. It also collects the transcoded video datasets converted to a target format by the MapReduce-based transcoding step in MbTD and stores them on the HDFS. The period over which the original encoded video datasets are collected can be set by administrators and users, depending on the dataset size and acquisition time.

Second, the main role of HbSMD, which runs on HDFS, is to split the collection of original video datasets into blocks of a configured size and to automatically distribute all of the blocks over the cluster. In HbSMD, the default block size is set to 64 MB, but it can be changed by administrators and users to various other values, such as 16, 32, 128, or 256 MB. When a block is distributed, it is replicated at three data nodes according to the Hadoop distribution policy, thereby complying with the overall distributed processing procedure and enabling recovery from a system failure caused by data loss. The number of block replicas can likewise be changed to 1, 2, 4, 5, and so on. The other role of HbSMD is to merge the blocks transcoded by transcoders in MbTD into target video files and to transmit those video files to VDCD.

Third, MbTD performs the tasks that transcode the distributed blocks in each data node, using a MapReduce-based distributed transcoder module with Xuggler. A single MapReduce-based distributed transcoding task is managed as one MapReduce job and is scheduled by JobTracker in the name node and TaskTracker in each data node. JobTracker schedules and monitors the overall jobs registered in HadoopDMT. If a transcoding job is submitted to HadoopDMT, one JobTracker task is run. JobTracker calculates the number of distributed tasks (Map tasks) from the number of blocks. When a 10-GB dataset, i.e., \(50 \times 200\) MB files, is transcoded with the default block size option of 64 MB, 200 Map tasks are generated. After the number of tasks has been determined, JobTracker assigns Map tasks to the TaskTrackers, and each TaskTracker performs its assigned tasks, as sketched below. As blocks are transcoded, each TaskTracker periodically sends heartbeat messages to the JobTracker to maintain robustness against task failures. If a Map task fails, JobTracker detects the situation via the heartbeat and recovers from the task failure by rescheduling and reassigning the remaining tasks, including the failed tasks. Each data node and its transcoder (e.g., data node 1 and transcoder 1) are located in the same physical machine. The transcoders first implement the decoding step. Next, the resizing step is implemented if the users and administrators require a change in the resolution of a video file; otherwise, the transcoders skip this step. Finally, the transcoders encode the decoded blocks into a target file based on the requirements of the user.

Finally, CbISD offers infrastructure services in a cloud computing environment via server, storage, CPU, and network virtualization techniques. Because of the massive storage space and enormous computing resources that such systems require, small service vendors cannot afford the cost of building them. When users require logical computing resources to build and implement this system, CbISD automatically deploys a virtualized cluster environment. CbISD allows users to select a specific configuration of memory, CPU, storage, and the number of clusters. In addition, it provides an easy installation and configuration environment for HDFS and MapReduce, requiring little effort from the user. In this study, we present the idea and concept of CbISD, but its implementation is not considered.
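A Map task of the kind MbTD runs could be sketched as follows. This is a simplified illustration under our own assumptions (each input record names a video block already staged to local storage; Xuggler performs the conversion, as in the earlier snippet), not the authors' production mapper:

```java
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import com.xuggle.mediatool.IMediaReader;
import com.xuggle.mediatool.IMediaWriter;
import com.xuggle.mediatool.ToolFactory;

public class TranscodeMapper extends Mapper<Object, Text, Text, NullWritable> {
  @Override
  protected void map(Object key, Text blockPath, Context context)
      throws IOException, InterruptedException {
    String in = blockPath.toString();     // local path of one staged block
    String out = in + ".mp4";             // target MPEG-4 block
    IMediaReader reader = ToolFactory.makeReader(in);
    IMediaWriter writer = ToolFactory.makeWriter(out, reader);
    reader.addListener(writer);           // decode -> (optional resize) -> encode
    while (reader.readPacket() == null) ; // transcode the whole block
    // Report the transcoded block so HbSMD can merge it into the target file.
    context.write(new Text(out), NullWritable.get());
  }
}
```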

The other role of HadoopDMT is to automatically migrate the transcoded media content stored on the HDFS of the transcoding Hadoop cluster to the HDFS of a streaming Hadoop cluster in HadoopDMS. In addition, HadoopDMT extracts the thumbnail images of transcoded content and transmits the extracted thumbnail images to HadoopDMS. After the content migration and thumbnail extraction tasks have been completed, HadoopDMT deletes the original media content and transcoded content stored on the HDFS of the transcoding Hadoop cluster in order to release storage space for the next transcoding task.

3.2.2 HadoopDMS

HadoopDMS, which runs on the streaming Hadoop cluster, comprises HDFS-based data nodes that act as content storage servers, a name node that acts as a namespace server, and streaming servers. Our system uses a dual Hadoop cluster on one physical cluster, comprising the transcoding Hadoop cluster in HadoopDMT and the streaming Hadoop cluster in HadoopDMS, which balances task loads such as transcoding and streaming. The quality of the streaming service cannot be guaranteed if a transcoding task, which requires intensive computing resources, and a streaming task are conducted at the same time.

The first role of HadoopDMS is to store the transcoded contents migrated from HadoopDMT on the data nodes of the streaming Hadoop cluster in a distributed manner and to provide the capacity for content replication and recovery when data node failures or loss of content occur. The most important factor in providing a streaming service is maintaining a seamless stream, which requires that users never notice data node failures or loss of content. A streaming service system that uses the traditional approach does not include content replication management and automated recovery policies, so system reliability and a seamless streaming service cannot be guaranteed. To overcome these problems, HadoopDMS includes intelligent content replication and recovery policies.

Algorithm 1 is the content replication policy. First, HadoopDMS splits the content into blocks of the configured size (default \(= 64\) MB) and then creates two replicas of each original block. In HadoopDMS, the default block replication is set to 3 (one original block and two replicas). However, this can be changed by administrators and users because the block replication factor affects the system performance, depending on the system specifications and physical system configuration. In the performance evaluation section (Sect. 5), we describe the optimal Hadoop options, including the block replication factors and block size options. In particular, we experimentally verify that the performance is better when the block replication factor value is set to \(\ge \)3 (3, 4, 5). The worst performance is obtained when the value is set to 1, because problem blocks must be copied and transferred to a new data node on HDFS whenever task failures, disk failures, or data loss occur. To maintain the robustness of the system against task failures, CloudDMSS uses two replicas as the default option. CloudDMSS is designed to be deployed on low-cost commodity hardware, so our system focuses on high fault tolerance rather than incurring the cost of building numerous hard disks for replicas. After the replicas have been created, they are distributed on the data nodes in compliance with the Hadoop storage policy. The name node of HadoopDMS then collects metadata, including the status information for data nodes, the location information of stored content, and the content file names required to maintain and control the streaming Hadoop cluster and multimedia content.

Algorithm 1 Content replication policy
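A minimal sketch of the placement idea behind Algorithm 1 is shown below, assuming a fixed 64 MB block size and a replication factor of 3; the random node choice stands in for the actual Hadoop storage policy:

```java
import java.util.*;

public class ReplicationSketch {
  static final long BLOCK_SIZE = 64L * 1024 * 1024; // default 64 MB blocks
  static final int REPLICATION = 3;                 // one original + two replicas

  // Return, for every block index, the distinct data nodes that hold a copy.
  static Map<Long, List<String>> placeBlocks(long fileSize, List<String> dataNodes) {
    long numBlocks = (fileSize + BLOCK_SIZE - 1) / BLOCK_SIZE; // ceiling division
    Map<Long, List<String>> placement = new HashMap<>();
    for (long b = 0; b < numBlocks; b++) {
      List<String> nodes = new ArrayList<>(dataNodes);
      Collections.shuffle(nodes); // stand-in for the Hadoop placement policy
      placement.put(b, new ArrayList<>(nodes.subList(0, Math.min(REPLICATION, nodes.size()))));
    }
    return placement; // recorded by the name node as block information (bi)
  }
}
```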

When data loss and data node failures occur, the name node in HadoopDMS detects the situation based on the status information, which it collects periodically, and performs an automatic recovery scheme using Algorithm 2. Algorithm 2 facilitates system recovery in the event of data loss and data node failures. First, the name node (nn) sends a check packet (cp) to all of the data nodes to detect data loss, then compares the number and names of the blocks in the block information (bi) of the name node with those in the check packet (cp) generated by each data node. If the name node and a data node have different bi, a new content replication task is performed by Algorithm 1 on each node. After checking for data loss in each data node, nn sends a heartbeat (hb) to all of the data nodes to detect any failures. If nn receives the hb generated by each data node within 5 min, there is no system failure on the node. Otherwise, the policy for recovery from system failure is executed. The recovery policy performed by nn proceeds as follows. (1) nn reconfigures the cluster with the data nodes, excluding the failed node, and conducts a new content replication task. (2) The new bi is updated in nn. (3) nn automatically detects the cause of the system failure and recovers the failed node automatically via remote inspection. (4) After the completion of recovery, nn restarts the overall system and reconfigures the cluster to include the recovered data node. (5) nn repeats (1) and (2).

Algorithm 2 Automatic recovery policy
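The detection loop of Algorithm 2 can be summarized in the following hedged sketch, where the block information (bi), check packets (cp), and heartbeats (hb) are modeled as simple maps and the recovery actions are stubs:

```java
import java.util.Map;
import java.util.Set;

public class RecoverySketch {
  static final long HB_TIMEOUT_MS = 5 * 60 * 1000; // heartbeat timeout: 5 min

  static void checkCluster(Map<String, Set<String>> cp,  // blocks each node reports
                           Map<String, Set<String>> bi,  // blocks the name node expects
                           Map<String, Long> lastHb) {   // last heartbeat per node
    long now = System.currentTimeMillis();
    for (String node : bi.keySet()) {
      // Data loss: the node's reported blocks differ from the expected ones.
      if (!bi.get(node).equals(cp.getOrDefault(node, Set.of()))) {
        rereplicate(node); // re-run the Algorithm 1 replication task
      }
      // Node failure: no heartbeat received within the timeout.
      if (now - lastHb.getOrDefault(node, 0L) > HB_TIMEOUT_MS) {
        excludeAndRecover(node); // reconfigure the cluster, then recover the node
      }
    }
  }
  static void rereplicate(String node) { /* re-create lost replicas */ }
  static void excludeAndRecover(String node) { /* remote inspection, restart, rejoin */ }
}
```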

The second role of HadoopDMS is to provide users with the thumbnail images extracted from HadoopDMT so that they can easily search and select media content on the Web interface. These thumbnail images are stored on a specific data node and registered on a thumbnail DB of the DB server in CMM to facilitate their management.

3.2.3 CMM

As a core part of CloudDMSS, CMM controls and manages the various tasks created during each phase of the streaming service deployment process, and it also monitors the overall system usage and status information related to streaming processes, such as uploading, transcoding, and content migration, for users and administrators.

Our CMM comprises a Web-based dashboard module, a management server module, and a database server module. The Web-based dashboard module provides an interface that allows users to select the options required for transcoding tasks, such as the resolution, bit rate, and frame rate, as well as to monitor the usage of the dual Hadoop cluster and the streaming servers in HadoopDMS (CPU, RAM, and streaming transmission rate). The management server controls and schedules the overall process, including transcoding, migration, extraction of thumbnail images, registration of the images and information related to the transcoded content in the database server module of CMM, and job distribution. The management server performs this scheduling using Algorithm 3.

Algorithm 3 Management server scheduling
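At its core, the scheduling performed by the management server is a sequential pipeline over the deployment steps; the following sketch conveys that ordering, with all method names being our own placeholders rather than the actual Algorithm 3 interface:

```java
public class ManagementScheduler {
  // Drive the streaming service deployment steps in order; each step
  // starts only after the previous one has completed successfully.
  public void deploy(String contentId) throws Exception {
    transcode(contentId);         // MapReduce-based distributed transcoding (MbTD)
    extractThumbnails(contentId); // thumbnail image extractor in HadoopDMT
    migrate(contentId);           // move transcoded content to HadoopDMS
    cleanupSource(contentId);     // free HDFS space on the transcoding cluster
    registerToDb(contentId);      // thumbnail DB and content DB in CMM
  }
  void transcode(String id) throws Exception { /* submit the MapReduce job */ }
  void extractThumbnails(String id) throws Exception { /* run the extractor */ }
  void migrate(String id) throws Exception { /* HDFS-to-HDFS copy over SSH */ }
  void cleanupSource(String id) throws Exception { /* delete source content */ }
  void registerToDb(String id) throws Exception { /* insert metadata rows */ }
}
```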

The core function of the management server is to effectively balance and distribute rapidly increasing streaming tasks while maintaining the simultaneous connections of multiple users. Streaming job distribution is carried out by round robin (RR) [35], least connection (LC) [35], and streaming resource-based connection (SRC) algorithms. We describe the SRC algorithm in detail because RR and LC have been described in many previous studies. RR streams media content by selecting streaming servers in a prescribed order after video streaming requests are received from users. In LC, the media content is streamed by selecting the streaming server with the lowest number of streaming tasks. However, systems that apply RR and LC do not consider the CPU utilization rate and network transmission rate, so they are limited in that they impose a heavy burden on the current Internet infrastructure and streaming servers. Thus, we introduce an SRC scheduling algorithm that considers the CPU, RAM, and streaming transmission rate usage generated by each streaming server, which resolves the limitations of the RR and LC distribution methods. The Linux command mpstat is used to generate statistics about the CPU usage of the servers, the free command is used to inquire about the RAM usage, and /proc/net/dev is used to inquire about the streaming transmission rate. Algorithm 4 shows how SRC is used in CloudDMSS.

The database server module manages and registers the thumbnail images in the thumbnail DB and the content information, including the file name and location, in the content DB. Users can easily access streaming services by utilizing the DB information on our Web interface.

Algorithm 4 SRC-based streaming job distribution
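The selection rule at the heart of SRC can be sketched as picking the server with the lowest combined load; the probe values would come from mpstat, free, and /proc/net/dev as described above, while the weights here are hypothetical:

```java
import java.util.List;

public class SrcSelector {
  // Normalized usage readings in [0, 1] for one streaming server.
  record ServerStatus(String host, double cpu, double ram, double net) {}

  static String select(List<ServerStatus> servers) {
    String best = null;
    double bestScore = Double.MAX_VALUE;
    for (ServerStatus s : servers) {
      // Hypothetical weighting of CPU, RAM, and network transmission load.
      double score = 0.4 * s.cpu() + 0.2 * s.ram() + 0.4 * s.net();
      if (score < bestScore) { bestScore = score; best = s.host(); }
    }
    return best; // the streaming request is dispatched to this server
  }
}
```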

3.3 Workflow of CloudDMSS

In this section, we define the workflow of the sequential tasks in CloudDMSS during the streaming service deployment process in the overall system. Figure 6 shows the overall workflow of CloudDMSS. The CloudDMSS workflow is divided into two parts: content transcoding processing and distributed streaming processing.

Fig. 6 Overall workflow of CloudDMSS

3.3.1 Content transcoding processing workflow

We now introduce the workflow required for content transcoding processing to stream content to heterogeneous devices. Figure 7 shows a diagram of the workflow of content transcoding processing. Users and administrators access our user interface via CMM to upload their own original multimedia content. After an upload task is complete, they select the requested options (resolution, frame rate, bit rate, codec, and container) for transcoding and then request a content transcoding task, which CMM delegates to the MapReduce-based distributed transcoder. Immediately after the task is requested, CMM prepares a preprocessing procedure for the task. The preprocessing procedure comprises two steps. First, CMM creates an SSH shell in both HadoopDMT and CMM to allow remote command execution and secure content migration between the two components. Second, because the file names of the transcoded and original contents are the same, CMM creates a new folder to store the transcoded contents to avoid any content loss due to file redundancy. After the preprocessing procedure is complete, the management server module in CMM performs the transcoding task by operating the MapReduce-based distributed transcoder in a distributed manner. Subsequently, the management server module instructs the thumbnail image extractor in HadoopDMT to perform a thumbnail extraction task, and the extractor extracts thumbnail images of the transcoded content using the FFmpeg support supplied by the Xuggler [33] libraries, as sketched below. In the next step, the transcoded content and extracted thumbnail images are migrated to HadoopDMS via a content migrator in HadoopDMT. After migration, the original multimedia content, transcoded content, and thumbnail images are deleted by the management server, which then registers the thumbnail images in a thumbnail database on a CMM database server, as well as the content information, including the file name and location, in a content database on the same server.
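For illustration, a thumbnail extraction step of this kind can be realized by invoking an ffmpeg binary (the paper uses the FFmpeg support bundled with Xuggler); the seek offset, output size, and paths below are assumptions:

```java
import java.io.IOException;

public class ThumbnailExtractor {
  // Grab a single frame five seconds into the video as a 320-px-wide image.
  static void extract(String videoPath, String thumbPath)
      throws IOException, InterruptedException {
    Process p = new ProcessBuilder(
        "ffmpeg", "-ss", "5",      // seek before decoding
        "-i", videoPath,           // transcoded source video
        "-vframes", "1",           // emit exactly one frame
        "-vf", "scale=320:-1",     // scale width to 320, keep aspect ratio
        thumbPath)
        .inheritIO().start();
    if (p.waitFor() != 0) throw new IOException("ffmpeg failed for " + videoPath);
  }
}
```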

Fig. 7 Diagram showing the workflow of content transcoding processing

3.3.2 Distributed streaming processing workflow

The distributed streaming processing workflow with a streaming job distribution scheme is the most important part of the overall processing flow. Figure 8 illustrates the distributed processing workflow. Thumbnail images are updated periodically in the Web interface, and the metadata of the thumbnail images are registered each time a transcoding task is completed. When users select and request media content by clicking a thumbnail image, CMM obtains the location and file name of the selected media via the content database. CMM then requests the status of the streaming servers via a system status collector and selects the optimal streaming server from those in HadoopDMS based on the SRC algorithm, as described in Sect. 3.2.3. The content streaming service begins after the optimal streaming server is selected.

Fig. 8 Diagram showing the workflow of distributed streaming processing

4 Implementation and prototype

We designed a robust CloudDMSS based on Hadoop clustering to provide a seamless streaming service with adequate QoS over the current Internet. We focused on developing a system that automatically conducts the entire workflow of streaming tasks, including uploading, transcoding, migration, thumbnail extraction, content registration, and streaming job distribution, by a single click of the content upload button on the Web interface provided. We implemented a prototype of the overall system architecture running on a real cloud cluster to validate the feasibility of our service architecture according to the design issues described above.

4.1 Cluster configuration and implementation

Figure 9 shows the hardware configuration of CloudDMSS. In our prototype implementation of CloudDMSS, we constructed our own cluster servers in a cloud computing environment, comprising 28 nodes in total. Each node ran Linux OS (Ubuntu 10.04 LTS) on two Intel Xeon quad-core 2.13 GHz processors with 4 GB of registered ECC DDR memory and 1 TB of SATA-2 disk storage. All of the nodes were interconnected via 100 Mbps Ethernet adapters. To distribute the loads of the transcoding and streaming tasks, we divided the physical cluster into a dual Hadoop cluster: a transcoding Hadoop cluster with 13 nodes and a streaming Hadoop cluster with 10 nodes. To implement the CMM component, one node was designated as our management server running Tomcat 7.0, and a second node was designated as a database server running MySQL, which included a content database and a thumbnail database. The management server used JSP to implement the Web-based dashboard, the swfupload 2.2.0.1 libraries [36] for the content upload function, a Java library and bash shell script to run the SRC algorithm, and the Google chart APIs to generate the graphs showing the monitored system status. The HadoopDMT component comprised one name node and 12 data nodes running on HDFS in the transcoding Hadoop cluster. The component used Hadoop 1.0.4 to store original multimedia content, Java 1.6.0_39 (64-bit), and Xuggler 3.4.1012 (64-bit) to implement the MapReduce-based distributed transcoder. The HadoopDMS component had 13 nodes: three streaming server nodes running on NginX [37] and 10 content storage servers. The content storage servers comprised one name node and nine data nodes running on HDFS in the streaming Hadoop cluster. Our software specifications for HadoopDMS included an H.264 streaming module 2.2.7 for implementing the streaming function, NginX 1.2.7 for the streaming servers, and fuse_dfs 0.1.0 to allow HDFS to be mounted on the UNIX system as a standard file system.

Fig. 9 Detailed cluster configuration of CloudDMSS

4.2 Prototype

The output from the CloudDMSS prototype is shown in Figs. 10, 11 and 12. The interface provided by the Web-based dashboard module of CMM was designed and implemented for use with various Web browsers, such as IE, Firefox, Chrome, and Safari, rather than requiring a specific browser. Figure 10 shows the front page of the Web-based dashboard for streaming transcoded content in a PC-based Web browser (left) and a mobile-based Web browser (right); it presents thumbnail images extracted from the transcoded content for users to select, along with menus for uploading, transcoding, and monitoring via the Web interface.

Fig. 10 Front page of the Web-based dashboard for streaming transcoded content in a PC-based Web browser (left) and a mobile-based Web browser (right)

Fig. 11 Web-based dashboard for transcoding tasks and resources

Fig. 12 Web-based dashboard used to monitor the status of cluster servers

Figure 11 shows a screenshot of the web page used to manage transcoding tasks. Users and administrators can use this page to upload original content, select transcoding options (e.g., resolution, format, and codec), and stream the content to other users. Users can also monitor the progress of ongoing transcoding tasks.

Figure 12 shows a screenshot of the web page used to monitor the status of cluster servers. Users and administrators can use this page to monitor HDFS storage usage in each cluster and the streaming server usage (CPU, RAM, and network traffic) of HadoopDMS in real time.

5 Performance evaluation in a private cloud computing environment

5.1 Experimental environment description

We designed the CloudDMSS architecture so that it can be deployed in a private cloud environment. We deployed 28 nodes using a local testbed, i.e., a physical cluster that comprised one management server, one database server, three streaming servers, 13 transcoding servers running on the transcoding Hadoop cluster, and 10 content storage servers running on the streaming Hadoop cluster. Section 4.1 describes the cluster configuration and software specifications in more detail. We conducted four different performance tests to validate the performance of the proposed CloudDMSS. First, to verify the transcoding performance of the transcoding Hadoop cluster, we measured the total transcoding time required to transcode large video files into target files. We compared our Hadoop-based transcoding approach with a traditional parallel processing approach, the Media Encoding Cluster [38]. Second, to validate the performance of the SRC algorithm, we compared our system with RR- and LC-based systems in terms of the network transmission rate across the three streaming servers. Third, we measured the total execution time required for each task in the streaming service deployment process, from uploading the original content files to service deployment, as reflected in our Web interface; we divided the streaming service deployment process into 10 steps. Finally, to demonstrate the reliability and robustness of CloudDMSS, we performed a set of experiments to evaluate the behavior of the transcoding process when data node and TaskTracker (Map task) failures occurred.

5.2 Media transcoding performance

In the first experiment, we measured the total transcoding time required to complete the transcoding of a video dataset into a specific target format. We present the results of several experiments conducted using our transcoding Hadoop cluster with 13 computational nodes (one master node and 12 data nodes) and describe the optimal Hadoop options for our system configuration. The parameters for each original and target transcoded video file are listed in Table 1. In the transcoding test, we used six types of video datasets, including several 200 MB video files, which are listed in Table 2.

Table 1 Parameters for each original and transcoded video file
Table 2 Video datasets used in the performance evaluations

The following default Hadoop options were used in the transcoding experiment: (1) the number of block replications was set to 3; (2) the block size was set to 64 MB. To verify the efficiency of our system, we conducted three sets of experiments to test: (1) the effects of changes in cluster size on the performance speedup; (2) the effects of different Hadoop options with various block sizes, i.e., 32, 64, 128, 256, and 512 MB; and (3) the effects of different Hadoop options with various block replication factors, i.e., 1, 2, 3, 4, and 5.

The objective of the first set of experiments was to measure the total transcoding time and speedup with various cluster sizes, i.e., 1, 2, 4, 8, 10, and 12 data nodes, using the Hadoop default options. The speedup (SU) is used to evaluate the effects of parallelism and is defined as \(SU(n) = \text{transcoding time on 1 node} / \text{transcoding time on } n \text{ nodes}\).

Table 3 shows the transcoding time and speedup results with various cluster sizes. Figures 13 and 14 also show the transcoding time and speedup as functions of the cluster size. According to Table 3, our system obtained excellent transcoding times with very large video files. For example, with 12 data nodes, our system required approximately 2,624 s (ca. 44 min) to complete the transcoding process for a 20 GB video dataset with the default Hadoop options.

Fig. 13 Transcoding time versus the cluster size

Fig. 14 Speedup versus the cluster size

Table 3 Total transcoding time (s) and speedup (SU (n)) with various cluster sizes

The SU results indicate the following: (1) our system delivered excellent performance in terms of its parallel and distributed characteristics; (2) the SU performance was better with the 8, 10, and 20 GB datasets than with the 1, 2, and 4 GB datasets, which suggests that the performance of our system increases with the size of the dataset.

In the second set of experiments, we measured the total time elapsed using different Hadoop options for the block size (default: 64 MB) and block replication factor (default: 3). Hadoop processes large portions of datasets in a parallel and distributed manner after the datasets are split into block sizes of 64 MB. However, users and programmers can change the block size options to improve the data processing performance, depending on the size and type of unstructured data. Furthermore, when large portions of datasets are stored in HDFS, HDFS splits the dataset into fixed size blocks to facilitate rapid searching and processing. With the default Hadoop option for block replication, the replicated data is stored on three data nodes of HDFS to rebalance, move copies around, and continue data replication if system errors occur, such as disk failures or network connection problems. Thus, to verify whether block replication and size factors affected the performance, we measured the time required to complete the media transcoding processing. Five block size options were used in the experiments, i.e., 32, 64, 128, 256, and 512 MB. Five block replication factor values were used, i.e., 1, 2, 3, 4 and 5. Table 4 lists the transcoding times (in s) required with different block sizes and block replication factor values.

Table 4 Total transcoding time required for various block sizes and block replications (s)

Table 4 and Figs. 15 and 16 clearly show that the performance of our system was best when the block size was set to 256 or 512 MB, or when the block replication factor was set to 3. Thus, it can be concluded that the block size option should be set to a value greater than or close to the original file size to ensure the best distributed video transcoding performance in our system. Each video file had a size of 200 MB, so the 256 and 512 MB block sizes provided better performance during transcoding processing. The performance of the transcoding process increased as the block size approached 512 MB because of the correlation between the file size and the block size. HadoopDMT split the input file into blocks of the configured size when the file size was greater than the block size; if the file size was equal to or less than the block size, a single block equal to the file size was generated. Thus, seven, four, and two blocks were created when the block size was set to 32, 64, and 128 MB, respectively, and one file-sized block was generated when the block size was set to 256 or 512 MB. The number of tasks was determined by the number of divided blocks, as illustrated below. The total transcoding time increased with the number of blocks because the scheduling time required to process the generated tasks was higher. In addition, the performance was degraded by the increased delay required to merge the transcoded blocks after transcoding each block. Furthermore, the results showed that the block replication factor should be set to 3, which provided the best performance and allowed the distributed system to process massive media files in a reliable and robust manner, in terms of recovery from system failures when data loss occurred and when a node failed.
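The block counts quoted above follow directly from ceiling division of the 200 MB file size by the block size, and the number of Map tasks per file equals this block count; a quick check:

```java
public class BlockCount {
  // Number of HDFS blocks (and hence Map tasks) for a file of fileMb megabytes.
  static long blocks(long fileMb, long blockMb) {
    return (fileMb + blockMb - 1) / blockMb; // ceiling division
  }
  public static void main(String[] args) {
    for (long b : new long[] {32, 64, 128, 256, 512})
      System.out.println(b + " MB -> " + blocks(200, b) + " block(s)");
    // Prints 7, 4, 2, 1, 1 blocks, matching the counts stated in the text.
  }
}
```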

Fig. 15 Total transcoding times (in s) with various block size factors

Fig. 16 Total transcoding times (in s) with various block replication factors

In the third set of experiments, we compared the performance of our Hadoop-based transcoding system with that of the Media Encoding Cluster [38], a traditional frame-based parallel transcoding approach. The Media Encoding Cluster is written in C and C++, and it was the first open-source project for frame-based encoding in a distributed and parallel manner that used commodity hardware to reduce the encoding time for a file. To encode original media files into target files, our Hadoop-based transcoding approach splits media files into fixed-size blocks, whereas the Media Encoding Cluster splits media files into frame units.

We tested and compared both approaches using the same video datasets in the same cluster environment. First, we tested the Hadoop-based transcoding approach with 13 nodes. Second, we tested the Media Encoding Cluster approach by configuring that method in the same cluster environment (13 nodes) after shutting down the system configuration of the Hadoop-based transcoding approach. Table 5 lists the total transcoding times obtained with the two transcoding approaches. According to Table 5, the Hadoop-based transcoding approach delivered better performance than the Media Encoding Cluster in terms of the execution time for all datasets. The Media Encoding Cluster delivered lower performance than our module because it incurred high overheads in the splitting and merging steps applied to the original media files. Our approach split the original video files into 64 MB blocks, which were merged after the transcoding process in MbTD, whereas the Media Encoding Cluster split the original video files into a significantly larger number of frame units than the Hadoop-based transcoding module's blocks and merged the chunked frames into target video files. For a 1 GB dataset (200 MB files, 29 fps, 3 min 19 s each), our module created 20 chunked blocks of 64 MB, whereas the Media Encoding Cluster produced approximately 29,000 chunked frames.

Table 5 Total transcoding times (in s) for both transcoding tasks

5.3 Streaming job distribution performance experiment

We proposed the SRC algorithm to balance and distribute the streaming tasks of the streaming servers. We evaluated its performance using 10 content storage servers running on the streaming Hadoop cluster, three streaming servers running on NginX, and one management server. Each streaming server had a bandwidth of 100 Mbps. The management server for the Web interface and the SRC algorithm were implemented with JSP in a Tomcat 7 environment, and the streaming performance testing tool used to calculate the average transmission rate per streaming server was developed in Java. The dataset comprised 4 MB MP4 files transcoded by our transcoding task. In the performance evaluation, we simulated 600 virtual users that accessed three streaming systems, each applying one of three algorithms: RR, LC, and SRC. We calculated the overall transmission rate of each streaming server and the average transmission rate per second. Table 6 shows the performance results. The SRC algorithm delivered the best performance in terms of the average transmission rate compared with both the RR and LC algorithms.

Table 6 Comparison of the average transmission rate per streaming server using each algorithm-based system

5.4 Streaming service deployment with workflow experiment

We defined and described a workflow based on sequential tasks for the streaming service deployment process. To measure the total time required for the deployment process, we divided the workflow of a practical streaming service into 10 steps, from uploading to the deployment of the streaming content on the Web interface. Three datasets were used in this experiment: 1, 2, and 4 GB. Table 7 shows the time required for each task in the deployment process. The total times required for the 1, 2, and 4 GB datasets were 288.58 s (approximately 5 min), 546.67 s (9 min), and 997.19 s (16 min), respectively. According to Table 7, the transcoding task required the most time, followed by the uploading task. However, even as the size of the dataset increased, there were no significant differences in the execution times of the streaming tasks. Thus, we can reduce the total execution time of the deployment process by improving the performance of the uploading and transcoding tasks.

Table 7 Time required for each task in the streaming service deployment process

5.5 Evaluation of the robustness of CloudDMSS

In this study, we proposed a robust Hadoop-based multimedia streaming service architecture that handles node failures (Sect. 3.2.2) and task failures (Sect. 3.2.1). To demonstrate the reliability and robustness of CloudDMSS, we performed a set of experiments to evaluate the behavior of the transcoding process when data node and job (Map task) failures occurred. We experimentally verified the robustness of CloudDMSS in our local testbed (13 nodes), using two datasets (10 and 20 GB) with the default Hadoop options described in Sects. 5.1 and 5.2.

First, we measured the lost computing time by artificially injecting data node failures while the normal transcoding process was being performed. The lost computing time was calculated by subtracting the normal transcoding time (i.e., the transcoding time with zero node failures) from the transcoding time with node failures. If a data node failure occurred, the name node in CloudDMSS detected the situation and performed automatic recovery, including the system reconfiguration and block relocation processes of Algorithms 1 and 2. Table 8 shows the lost computing time caused by data node failures. The normal times required to transcode the 10 and 20 GB datasets were 1,311 and 2,544 s, respectively. When a data node failure was generated among the 13 nodes, the losses were 48 of 600 blocks for the 10 GB dataset and 94 of 1,200 blocks for the 20 GB dataset, including replicas. Thus, the lost computing times for the 10 and 20 GB datasets were calculated as 149 and 336 s, respectively, with one data node failure. With four data node failures, i.e., roughly one-third of the 13 nodes, the recovery times for the 10 and 20 GB datasets using CloudDMSS were calculated as approximately 10 and 17 min. Based on this evaluation of data node failures, we experimentally verified that our system delivered excellent performance and was robust against node failures.

Table 8 Lost computing time caused by data node failures in the local testbed

Second, we measured the lost computing time after artificially injecting TaskTracker failures. Our system performed a MapReduce-based distributed transcoding process scheduled by JobTracker and TaskTracker, as described in Sect. 3.2.1. When a 10 GB dataset comprising \(50 \times 200\) MB files was transcoded using the default block size option (64 MB), 200 Map tasks were generated. After determining the number of tasks, JobTracker assigned the Map tasks to TaskTrackers, and each TaskTracker performed its assigned tasks. If a TaskTracker failed, CloudDMSS compensated for the task failures by rescheduling and reassigning the remaining tasks, including the failed tasks. Table 9 shows the lost computing time caused by TaskTracker failures in the local testbed. According to Table 9, when four TaskTracker failures were generated, the lost computing times for the 10 and 20 GB datasets were 291 and 439 s, respectively. Thus, the recovery times for task failures with the 10 and 20 GB datasets were approximately 5 and 7 min, respectively, in our local testbed. According to this performance evaluation based on TaskTracker failures, our system was robust against task failures.

Table 9 Amount of lost computing time caused by job (TaskTracker) failures in the local testbed

6 Performance evaluation in a public cloud computing environment

6.1 Experimental environment description

The performance evaluations described above used our local testbed in a LAN environment, which did not reflect the unpredictable and unstable network traffic generated by commercial public cloud computing environments such as Amazon EC2 and Rackspace. Thus, to demonstrate the validity of CloudDMSS in commercial public cloud computing environments over a WAN, we performed several performance evaluations in an actual cloud environment, Cloudit 2.0 [39], which is operated by Innogrid. Table 10 compares the overall system configuration of CloudDMSS in Cloudit 2.0 with that of our local testbed.

Table 10 Comparison of the overall system configurations in Cloudit 2.0 and the local testbed

We deployed 50 virtual machines (VMs) provided by Cloudit 2.0, i.e., one management server, one database server, 10 streaming servers, 28 transcoding servers, and 10 content storage servers. Each VM ran Linux OS (Ubuntu 12.04 LTS) on four virtual cores, equivalent to Intel Xeon quad-core 2.13 GHz processors, with 8 GB of memory and 100 GB of disk space on a shared hard drive. Cloudit 2.0 utilized Xen [40] virtualization software. The rest of the environment, including the software specifications (see Sect. 4) and the algorithms, was the same as in the local testbed. We used seven types of datasets, i.e., the six datasets listed in Table 2 and an additional 50 GB dataset.

6.2 Performance evaluations

The first set of experiments was similar to the performance evaluation conducted in the first part of Sect. 5.2. We measured the total transcoding time and SU with various cluster sizes, i.e., 1, 4, 8, 12, 16, 20, 24, and 28 data nodes, using the default Hadoop options. Table 11 and Fig. 17 show the total transcoding times with various cluster sizes. The SU results indicated the following. First, although the SU results differed slightly depending on the size of the dataset, our system delivered good performance in terms of its parallel and distributed characteristics up to 12 nodes; however, the performance improved only slightly from 16 to 28 nodes. For example, the results with 4, 8, and 12 nodes and the 20 GB dataset showed that the performance improved by approximately 4, 8, and 9 times compared with one node, whereas the results with 16, 20, 24, and 28 nodes improved only slightly, to about 9.8, 10.6, 11.8, and 12.4 times, respectively. Second, our system had a higher capacity for effective distributed processing as the size of the dataset increased: there were performance improvements of about 57 % for 4 GB, 78 % for 10 GB, and 98 % for 50 GB compared with the transcoding of 1 GB using 28 nodes. Finally, although commercial cloud computing services run on unpredictable and unstable networks with VM I/O traffic, all of the performance evaluations showed that CloudDMSS provided a stable and effective distributed transcoding service in an actual cloud computing environment.

Table 11 Total transcoding time (s) and speedup with various cluster sizes
Fig. 17 Speedup versus cluster size

In the second experiment, we verified the performance of streaming job distribution using the same software specifications and simulation conditions described in Sect. 5.3; the only difference from the previous evaluation was the system configuration. To demonstrate the scalability of our system, we tested the performance using a streaming Hadoop cluster that comprised 10 streaming servers and 10 content storage servers, with each streaming server having a bandwidth of 100 Mbps. We calculated the overall transmission rate for each streaming server and the average transmission rate per second. Table 12 shows that the performance levels were similar to the results in Table 6: the SRC algorithm delivered a better average transmission rate than the RR and LC algorithms. The total transmission rate with the three streaming servers on the local testbed was about 10 MB/s, while the rate with the 10 servers of Cloudit 2.0 was about 20 MB/s. Thus, CloudDMSS effectively distributed the streaming jobs requested by numerous users in an actual cloud computing environment, with performance similar to that on the local testbed.

Table 12 Comparison of the average transmission rates of each streaming server using each algorithm-based system

In the final experiment, we measured the total time required for the workflow of sequential tasks, as defined and described in Sect. 3.3. We again used three datasets, i.e., 1, 2, and 4 GB. The overall system configuration was the same as that of the local testbed, except for the transcoding servers. To facilitate a comparison with the results obtained in the same environment, as described in Sect. 5.4, we deployed 13 of the 28 VMs running on the transcoding Hadoop cluster as transcoding servers in CloudDMSS. According to Table 13, the total times required for the 1, 2, and 4 GB datasets in the overall deployment process were 357.93 s (approximately 6 min), 590.23 s (10 min), and 1,022.14 s (17 min), respectively. These results are very similar to those reported in Sect. 5.4, which shows that CloudDMSS also performs well in an actual public cloud environment. The time required for the deployment process differed by approximately 1 min for each of the three datasets compared with the results obtained using the local testbed. These differences in execution time were attributable to the delays caused by the upload and transcoding tasks. The upload task required more time than on the local testbed because of unexpected network bottlenecks in the real public cloud environment, and the transcoding task required more time because of an I/O virtualization bottleneck due to the sharing of physical resources by many users.

Table 13 Time required for each task during the streaming service deployment process

The three different experiments conducted using Cloudit 2.0 demonstrate that CloudDMSS performs well in terms of the transcoding process, the average transmission rate, and the streaming service deployment process, not only in the local testbed environment but also in a commercial public cloud computing environment over a WAN with unpredictable and unstable network traffic.

7 Conclusion and future work

In this study, we focused on designing a robust cloud-based distributed multimedia streaming service with transcoding, which we call CloudDMSS. CloudDMSS is based on Hadoop clustering and it provides a seamless streaming service with suitable QoS over the current Internet. Our system was developed to perform the workflow of the streaming service deployment process automatically, including uploading, transcoding, migration, thumbnail extraction, content registration, and streaming job distribution, after a single click of a content upload button on the provided Web interface. In this study, we describe the design of CloudDMSS and its workflow, as well as four important algorithms for content replication, system recovery on HadoopDMS, management for CMM, and SRC for streaming job distribution. We determined a suitable hardware configuration for the current cluster environment based on a dual Hadoop cluster and we report on the implementation of our system.

To validate the performance of the proposed CloudDMSS with the job distribution scheme using the SRC algorithm, we conducted four different tests on a local testbed to evaluate the transcoding task, streaming job distribution conducted by SRC, streaming service deployment, and the robustness to data node and task failures. We demonstrated the excellent performance of our system and identified the optimal Hadoop options for media transcoding. When the block size option was set to a value greater than or close to the original file size and the block replication factor value was set to 3, our system delivered good performance for media transcoding processes. We also confirmed that the SRC algorithm had a better average transmission rate than the RR and LC algorithms for the streaming job distribution task. Finally, the streaming service deployment experiment showed that the total execution time required for the deployment process can be decreased by reducing the uploading and transcoding execution times. To demonstrate the validity and scalability of our system in a commercial cloud computing environment, we also conducted several experiments in Cloudit 2.0, which were similar to the experiments conducted on the local testbed. These experiments verified that CloudDMSS performed well in transcoding, streaming job distribution using SRC, and streaming service deployment.

In future research, we plan to improve the transcoding performance by developing improved content splitting and merging algorithms on the Hadoop cluster. In addition, we plan to implement our system on commercial cloud services such as Amazon EC2, Rackspace, and KT ucloud.