1 Introduction

Interactions between people on social media, telephone calls between phone users, collaborations between scientists and several other social interactions correlate significantly with the behavior of people. Decisions taken by people are influenced by their social interactions. A network is a natural way of modeling these social interactions: the social actors are modeled as nodes and the dyadic relationships between them are modeled as edges. Analyses of these social networks have helped us understand several social phenomena (Carrington et al 2005) such as information spread/epidemic spread through the network (Lerman & Ghosh 2010), and the stability of the network and the formation of new links (Linyuan & Tao 2011). They have also improved personalized recommendations and personalized web searches.

Finding important nodes/edges in a network is one of the chief challenges addressed by social network analysis. The measure of importance of nodes/edges, known as centrality (Bonacich 1987), varies depending on the context. Generally, a centrality measure which is optimal in one context will be sub-optimal in a different context. The popular centrality measures are degree centrality, closeness centrality, betweenness centrality and page rank centrality. Degree centrality of a node is the number of links incident upon that node. It is useful in the context of finding a node that is likely to be affected by the diffusion of any information in the network, since a node with high degree centrality can be reached from many sources. Closeness centrality of a node is the sum of the inverses of the shortest distances from the node to every other node in the network. In applications such as package delivery, a node with high closeness centrality can be considered as the central point. Betweenness centrality of a node (or edge) is the fraction of shortest paths between all pairs of nodes in the network that pass through that node (or edge). A node with high betweenness centrality will typically be one which acts as a bridge between many pairs of nodes. Page rank centrality of a node depends on the number and the quality of the neighbors who have links to the node. One of the popular applications of page rank centrality is ranking web pages by relevance.
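All four of these classical measures are available in standard network analysis libraries. The short sketch below computes them on a small example graph using the networkx package; note that networkx reports normalized values (e.g., degree centrality is degree/(n-1)), so the raw counts described above are rescaled.

```python
# Classical centrality measures on a small example graph, computed with the
# networkx library (pip install networkx). networkx normalizes the values,
# e.g. degree centrality is degree / (n - 1).
import networkx as nx

G = nx.Graph([(1, 2), (2, 3), (2, 4), (2, 5), (3, 5)])

print(nx.degree_centrality(G))       # node -> degree / (n - 1)
print(nx.closeness_centrality(G))    # node -> normalized closeness
print(nx.betweenness_centrality(G))  # node -> fraction of shortest paths through it
print(nx.pagerank(G))                # node -> PageRank score
```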

Information cascades are another important context for measuring centrality. The behavior of the neighbors influences the behavior of an individual to a large extent. This is useful in several applications such as viral marketing, where the initial set of influencers determines the success (Bass 1969; Brown & Reingen 1987; Domingos & Richardson 2001; Richardson & Domingos 2002), identification of critical nodes in power systems, where the failure of a critical node may cause a cascading failure leading to failure of the entire network (Asavathiratham et al 2001), modeling epidemic spread, and finding critical nodes in computer networks. The traditional centrality measures do not consider the collective importance of nodes and hence fail to identify optimal nodes in the context of information diffusion. A greedy algorithm was introduced and was shown to model the context of information cascades better than the traditional centrality measures (Kempe et al 2003). But the greedy algorithm is computationally expensive, since each step requires extensive simulation of the diffusion process, and hence it cannot be applied to large networks. Game theoretic centrality algorithms model the importance of nodes when combined with other nodes in the network (Narayanam & Narahari 2008, 2011), which makes them suitable for the context of information diffusion. Polynomial time algorithms to compute game theoretic centrality were introduced by Michalak et al (2013).

Huge volumes of data are being generated by online social media, online businesses, scientific research, etc. (McKinsey Global Institute 2011). As of 2013, Netflix stored 3.14 PB of videos and Facebook stored 240 billion photos. The capacity of the world to exchange information was 281 PB in 1986, 471 PB in 1993, 2200 PB in 2000 and 65000 PB in 2007 (Hilbert & López 2011). Further, the availability of easy tracking mechanisms for these interactions has led to the development of huge networks. With the advent of new storage technologies, we are able to store these very large amounts of data. Network sizes have grown by leaps and bounds, so that even small order polynomial time algorithms need to be parallelized to have better scaling behavior. In this article, we introduce techniques to parallelize the game theoretic centrality algorithms.

There are many parallelization frameworks available for graph algorithms. We chose the map-reduce technique to parallelize our algorithms. As of 2013, more than half of the Fortune 50 companies use map-reduce. Facebook uses a map-reduce cluster to process 100 PB of data, the largest such cluster in the world as of 2013. Map-reduce is also available commercially in the cloud under the names Elastic MapReduce and Azure HDInsight, offered by Amazon and Microsoft, respectively. Apart from being widely used, it has other advantages such as ease of use, fault tolerance and high scalability.

The rest of the paper is organized as follows. In section 2, we give a brief overview of the map-reduce programming model and Hadoop, and discuss the challenges in parallelization using Hadoop. In section 3, we present the five game-theoretic centrality algorithms proposed in Michalak et al (2013) and the techniques to parallelize these algorithms in Hadoop. In section 4, we evaluate the performance of our parallelized algorithms using synthetic and real world data sets. Conclusions are provided in section 5.

2 Map-reduce and Hadoop

Map-reduce (Dean & Ghemawat 2004) is a parallel programming model for data intensive applications. Every map-reduce job starts with a map phase. The input to and output of this phase are key-value pairs. One key-value pair is read at a time and one or more key-value pairs are written to the output.

$$\left( Key1, Value1 \right) \rightarrow \left( list(Key2,Value2) \right) $$

An optional combiner can be used at the end of the map phase to reduce the size of the map output. This reduces the amount of data to be transferred over the network, which otherwise would be a bottleneck. This phase is followed by the sort and shuffle phase: the output of the map (or combiner) is sorted and sent to the respective reducers based on a partition algorithm. The default partition algorithm uses the hash value of the key to make sure that all pairs with the same key go to a single reducer; it can also be overridden by a custom partitioning algorithm. Once the sort and shuffle phase ends, the reduce phase begins. The input to this phase is a key and the list of values corresponding to that key. For each key, one or more key-value pairs are written to the output by the reducer.

$$\left( Key2, list(Value2) \right) \rightarrow \left( list(Key3,Value3) \right) $$
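To make the data flow concrete, the following single-process Python sketch mimics the map, sort-and-shuffle and reduce phases on the familiar word-count example. It is only an in-memory stand-in for the distributed execution that Hadoop performs.

```python
# A minimal in-memory illustration of the two signatures above, using word
# count as the running example. In Hadoop, the map and reduce calls would run
# on different machines over chunks of the input.
from collections import defaultdict
from typing import Callable, Iterable, List, Tuple

def run_mapreduce(records: Iterable[Tuple[str, str]],
                  mapper: Callable[[str, str], List[Tuple[str, int]]],
                  reducer: Callable[[str, List[int]], List[Tuple[str, int]]]):
    grouped = defaultdict(list)                  # stand-in for sort and shuffle
    for k1, v1 in records:
        for k2, v2 in mapper(k1, v1):            # (Key1,Value1) -> list(Key2,Value2)
            grouped[k2].append(v2)
    out = []
    for k2, values in sorted(grouped.items()):
        out.extend(reducer(k2, values))          # (Key2,list(Value2)) -> list(Key3,Value3)
    return out

lines = [("0", "the cat sat"), ("1", "the cat ran")]
word_mapper = lambda offset, line: [(w, 1) for w in line.split()]
word_reducer = lambda word, counts: [(word, sum(counts))]
print(run_mapreduce(lines, word_mapper, word_reducer))
# [('cat', 2), ('ran', 1), ('sat', 1), ('the', 2)]
```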

Hadoop (Apache Software Foundation 2011) is an open source implementation of the map-reduce programming model. It follows a master–slave architecture where the master (Job Tracker) takes up a job, assigns parts of the job (tasks) to the slaves (Task Trackers) and tracks the progress of the job from the reports sent by the slaves to the master. Generally, the slaves are the data nodes on which the input data is stored and processed, but the master can also act as a data node and do computation. The input file is broken into several chunks of equal size (except the last chunk) which are distributed among the data nodes in the Hadoop distributed file system (HDFS) (Borthakur 2007). Each block or chunk is replicated and stored on several machines, providing fault tolerance. Every machine can run a configurable number of map or reduce tasks at a time (figure 1). This framework is used to parallelize the five centrality algorithms proposed in Michalak et al (2013) to run them on big data.

Figure 1. Map-reduce model with 3 mappers and 2 reducers.

2.1 Challenges in parallelization

The challenges in the parallelization of graph algorithms include representation, the programming perspective and the optimization of running time.

Representation: In general, large graphs are represented in the edgelist format. In this format, each line of the input file is an edge. For example, the edge AB is represented by “A [space] B”, where A and B are the nodes forming the edge.

Input format for the graph represented in figure 2 could be:

$$\begin{array}{@{}rcl@{}} 1 &\quad & 2 \\ 2 &\quad & 3 \\ 2 &\quad & 4 \\ 2 &\quad & 5 \\ 3 &\quad & 5 \end{array} $$
Figure 2. An example network which is unweighted.

Input format for the graph represented in figure 3 could be:

$$\begin{array}{@{}rcl@{}} 1 &\quad & 2 \\ 1 &\quad & 2 \\ 2 &\quad & 3\\ 2 &\quad & 4 \\ 2 &\quad & 5 \\ 2 &\quad & 5 \\ 5 &\quad & 2 \\ 3 &\quad & 5 \\ 5 &\quad & 3 \end{array} $$
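The repeated lines in this listing encode edge weights: as described later in section 3.4, the number of times a neighbor occurs in a node's list is taken as the weight of that edge. A minimal (non-map-reduce) Python sketch of loading such a file into a weighted adjacency structure, assuming that occurrences in either orientation accumulate weight:

```python
# Read an edgelist in which repeated lines encode integer edge weights
# (e.g. "1 2" appearing twice means the edge 1-2 has weight 2).
from collections import defaultdict

def load_weighted_adjacency(lines):
    adj = defaultdict(lambda: defaultdict(int))   # node -> neighbor -> weight
    for line in lines:
        u, v = line.split()
        adj[u][v] += 1
        adj[v][u] += 1        # the graph is undirected
    return adj

edgelist = ["1 2", "1 2", "2 3", "2 4", "2 5", "2 5", "5 2", "3 5", "5 3"]
adj = load_weighted_adjacency(edgelist)
print(dict(adj["2"]))   # {'1': 2, '3': 1, '4': 1, '5': 3} under this convention
```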

Programming perspective: There are two challenges from the programming perspective.

  (i) The input file is partitioned into equal chunks which are distributed across the machines in the cluster. So, the information related to a particular node may be stored partially on many machines. For example, the edges “1 2” and “1 3” may be present on two different machines. In this case, we say that the information about node 1 is partially available on each of the two machines.

  (ii) The mapper reads its input chunk (or input file) sequentially, one line at a time. In our case, since each line corresponds to an edge, the map function processes one edge at a time. Essentially, map has knowledge only about the edge being processed at that time and has no information about the other edges in the network. Similarly, the reducer processes one key at a time and has information only about that particular key and its values.

Figure 3. An example network which is weighted.

Optimization of running time: In the map-reduce setting, the running time of an algorithm can be optimized in only two areas. One is to reduce the number of map-reduce phases: as the number of map-reduce phases decreases, the amount of computation and job overhead decreases, and hence the running time of the algorithm decreases. The other is to reduce the amount of intermediate data, i.e., the output of the map phase. This intermediate data has to be shuffled, sorted and sent to the reducers over the network; if it is large, this transfer becomes a bottleneck and the algorithm slows down.

3 Algorithms

In this section, we describe the five game theoretic centrality algorithms proposed in Michalak et al (2013) and the techniques to parallelize them. The class of centrality measures proposed in Michalak et al (2013) is defined using cooperative games on networks. The following definitions describe the general methodology used for characterizing these measures.

Definition: A cooperative game is characterized by a set of players N and a payoff function \(\nu :2^{N}\rightarrow \mathcal {R}\), that assigns a real value to any subset of players, or coalition.

Definition: The Shapley value of a node in a cooperative game is the expected marginal contribution of the node to any coalition. This is determined by computing the expected increase in the value of a randomly chosen coalition when this node is added to it.

The notion of Shapley value is one of the central concepts of cooperative game theory and is typically used to apportion payoffs to members of a coalition in a stable way. For a more formal definition of the Shapley value, see Gibbons (1992). What is of importance in this context is that the Shapley value considers a node’s contribution in relation to groups of other nodes and does not look at a node in isolation. This allows us to define newer notions of centrality as explored in Michalak et al (2013).
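For completeness, the standard closed form of the Shapley value of a player i in a game (N, ν) is

$$ SV_{i}(\nu) = \sum_{C \subseteq N \setminus \{i\}} \frac{|C|!\,\left(|N|-|C|-1\right)!}{|N|!} \left[ \nu\left(C \cup \{i\}\right) - \nu(C) \right], $$

i.e., the marginal contribution ν(C ∪ {i}) − ν(C) of player i is averaged over all coalitions C not containing i, weighted by the probability that C is exactly the set of players preceding i in a uniformly random ordering of N.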

In the context of networks, the nodes of the network are considered as agents in a cooperative game that models the process of influence propagation. For different models of propagation, we get different definitions of games. In particular, Michalak et al (2013) introduce the notion of the fringe of a coalition: the set of nodes influenced by the coalition at least to some extent. By changing the definition of the fringe we can model different influence propagation mechanisms. The Shapley value of a node is now the marginal contribution that the node makes to the fringe across all coalitions that the node is a part of. The Shapley value can then be used as a centrality measure in the context of information diffusion. It has been shown (Michalak et al 2013; Narayanam & Narahari 2008, 2011) that Shapley value based centrality measures are equally effective, if not better, than classical notions of centrality for choosing the most influential users under appropriate diffusion models. More importantly, these centrality measures are more efficient to compute than classical measures. For more details of the approach, refer to Michalak et al (2013).

3.1 An example: Degree centrality

Degree centrality is the easiest of the centralities in terms of computation. The degree centrality of a node is defined as the degree of the node. Let the input file be in the edgelist format. The degree centrality can be calculated in one map-reduce phase as in algorithm 1.

figure a

The input file gets partitioned into several chunks and every machine in the cluster will have a few chunks of this file. In the map phase, a mapper is started on every machine and each mapper processes one chunk of the input file. Each chunk comprises a portion of the edgelist and all the chunks together form the edgelist of the network. The mapper processes one line at a time, i.e., the map function gets called for every input line. In this case, since each line is an edge of the network, we can say that it processes one edge at a time. The map function tokenizes the input line into tokens. The tokens are the two nodes (say A and B) which form the edge. For every edge, two (key,value) pairs are written to the output. They are:

$$\begin{array}{@{}rcl@{}} A &\quad & 1 \\ B &\quad & 1 \end{array} $$

Once every mapper on all the machines has completed processing its chunk, the output of all the mappers goes to the sort and shuffle phase. In this phase, all the map outputs are sorted and a partitioner sends them to the reducers. The partitioner uses a hash function and makes sure that all the (key,value) pairs which have the same key go to the same reducer. In the reducer, the values corresponding to a single key are put together in a list. The reduce function gets called once for every key with its list of values, and it aggregates the values in that list. Each key here is a node in the network and the aggregated sum is the degree of that node. So, the output of the reducer is just the pair (node, degree). The output files from all the machines are concatenated, which gives the degree centrality of all nodes in the network.
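The following self-contained Python sketch simulates algorithm 1 on the edgelist of figure 2. The map and reduce functions mirror the description above, while the shuffle is simulated with an in-memory grouping; Hadoop would of course run these functions distributed over chunks.

```python
# A single-process sketch of algorithm 1 (degree centrality) in the
# map-reduce style described above.
from collections import defaultdict

def map_phase(edge_line):
    # one edge "A B" -> two (key, value) pairs: (A, 1) and (B, 1)
    a, b = edge_line.split()
    return [(a, 1), (b, 1)]

def reduce_phase(node, values):
    # the degree of the node is the sum of the emitted 1s
    return node, sum(values)

edgelist = ["1 2", "2 3", "2 4", "2 5", "3 5"]   # the graph of figure 2

# shuffle/sort: group map outputs by key, as Hadoop does between the phases
grouped = defaultdict(list)
for line in edgelist:
    for key, value in map_phase(line):
        grouped[key].append(value)

degrees = dict(reduce_phase(k, v) for k, v in sorted(grouped.items()))
print(degrees)   # {'1': 1, '2': 4, '3': 2, '4': 1, '5': 2}
```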

3.2 Game 1: Number of agents at-most 1 hop away

The first game theoretic centrality algorithm computes centrality by considering the fringe as the set of all nodes that are reached in at-most one hop. Algorithm 2 gives the steps to calculate this centrality. It has a running time of O(V+E).

figure b

From algorithm 2, we observe that in order to compute the centrality of a node, we need to find the degree of the node and of its neighbors. The intuition behind this algorithm is that the centrality of a node is high not only when the node itself has a high degree but also when its neighbors have low degree. We parallelize this algorithm using two map-reduce phases.

In the first map-reduce phase, we calculate the degrees of all the nodes and their neighbors in a parallel fashion as described in algorithm 3. The map phase of this stage does not do any computation but reorganizes the input edges in such a way that the degree of each node can be calculated from the edgelist. For every edge in the input file, two lines are written to the output. Let “A B” be the line being processed by the mapper. The first line of the output will have A as the key and B as the value, indicating that B is a neighbor of A. Since the graph is undirected, A is also a neighbor of B, so the second line of the output will have B as the key and A as the value. The combiner is not needed in this phase since we are not interested in merging the values corresponding to the same key. We use the default partitioner, which makes sure that all the (key, value) pairs with the same key go to the same reducer.

The reducer in this stage receives nodes as keys and their neighbors as values. We calculate the degree of each node by counting the number of its neighbors. Note that if a weighted graph is given as input, duplicate representations of the same neighbor will appear. We detect the duplicates using the hash values of the neighbors and ignore them while counting the number of neighbors.

figure c
figure d

Since we are interested in the marginal contribution of each node, which is \(\frac {1}{1+degree}\) in this case, we write the marginal contributions of the nodes to the output instead of their degrees. Every node needs its own marginal contribution in order to calculate its centrality, so the reducer writes the node as the key and its marginal contribution as the value. All the neighbors also need the marginal contribution of the node in order to calculate their centrality, so the reducer additionally writes each neighbor as key and the marginal contribution of the node as value.

In the second map-reduce phase, the centrality values of all the nodes are calculated. The mapper in this phase does not do any computation: it reads the input from the file written by the reducer of the first phase, tokenizes it into (key,value) pairs and writes them to the output. The reducer in this phase gets nodes as keys and, as values, the marginal contributions of the node itself and of all its neighbors. The reducer aggregates the values corresponding to each node, which gives the centrality of that node. A combiner is used in this phase which does the same computation as the reducer.
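Putting the two phases together, the following single-process sketch computes the game 1 centralities on the unweighted graph of figure 2; it is an in-memory stand-in for the two Hadoop jobs, not a transcription of them.

```python
# A sketch of the two map-reduce phases for game 1, assuming an unweighted
# edgelist. Phase 1 groups neighbors per node and emits the marginal
# contribution 1/(1+degree) to the node itself and to each of its neighbors;
# phase 2 sums the contributions per node to obtain its centrality.
from collections import defaultdict

edgelist = [("1", "2"), ("2", "3"), ("2", "4"), ("2", "5"), ("3", "5")]

# Phase 1 map + shuffle: node -> set of neighbors
neighbors = defaultdict(set)
for a, b in edgelist:
    neighbors[a].add(b)
    neighbors[b].add(a)

# Phase 1 reduce: emit (node, contribution) pairs
contributions = []
for node, nbrs in neighbors.items():
    mc = 1.0 / (1 + len(nbrs))          # marginal contribution of `node`
    contributions.append((node, mc))    # the node needs its own contribution
    for u in nbrs:                      # every neighbor needs it as well
        contributions.append((u, mc))

# Phase 2 reduce: aggregate contributions per node = game 1 centrality
centrality = defaultdict(float)
for node, mc in contributions:
    centrality[node] += mc
print(dict(centrality))   # e.g. node 2: 1/5 + 1/2 + 1/3 + 1/2 + 1/3 ~= 1.87
```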

3.3 Game 2: Number of agents with at least k neighbors in C

The second game theoretic centrality algorithm computes centrality by considering the fringe as the set of all nodes that are either in the coalition or that are adjacent to at least k nodes which are already in the coalition. Algorithm 5 gives the steps to calculate this centrality. It has a running time of O(V+E).

figure e

From algorithm 5, we observe that in order to compute the centrality of a node, we need to find the degree of the node and of its neighbors. The intuition behind this algorithm is that a node will be influenced only when k of its neighbors are already influenced. This k is a node-specific threshold which varies from 1 to degree(node)+1; a threshold of degree(node)+1 indicates that the node cannot be influenced even when all its neighbors are influenced. We parallelize this algorithm using two map-reduce phases.

The first map-reduce phase of this algorithm is given in algorithm 6. The only difference between the first phases of the game 1 and game 2 algorithms is the way in which the marginal contributions are calculated, so the mapper does the same job as in algorithm 3. In the reduce phase, the marginal contribution of a node to itself is calculated by the formula \(\frac {k}{1+degree}\) and its marginal contribution to each of its neighbors is calculated by the formula \(\frac {degree-k+1}{degree(1+degree)}\).

figure f

The second map-reduce phase of this algorithm is essentially the same as that of the second phase of game 1 (algorithm 4) which aggregates the marginal contributions to obtain the centrality.

figure g
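A compact, single-process sketch of the game 2 contribution formulas quoted above, assuming an unweighted graph; the thresholds k(v) below are hypothetical values chosen only for illustration, and phase 2 is the same aggregation as in game 1.

```python
# Game 2 marginal contributions (first map-reduce phase) followed by the
# aggregation, for an unweighted graph with node-specific thresholds
# k(v) in {1, ..., degree(v)+1}.
from collections import defaultdict

edgelist = [("1", "2"), ("2", "3"), ("2", "4"), ("2", "5"), ("3", "5")]
k = {"1": 1, "2": 2, "3": 1, "4": 1, "5": 2}     # hypothetical thresholds

neighbors = defaultdict(set)
for a, b in edgelist:
    neighbors[a].add(b)
    neighbors[b].add(a)

centrality = defaultdict(float)
for v, nbrs in neighbors.items():
    deg = len(nbrs)
    centrality[v] += k[v] / (1 + deg)                      # contribution of v to itself
    for u in nbrs:                                         # contribution of v to each neighbor
        centrality[u] += (deg - k[v] + 1) / (deg * (1 + deg))
print(dict(centrality))
```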

3.4 Game 3: Number of agents at-most \(d_{cutoff}\) away

The third game theoretic centrality algorithm computes centrality by considering the fringe as the set of all nodes that are within a distance of \(d_{cutoff}\) from the node. Algorithm 7 gives the steps to calculate this centrality. It has a running time of \(O(VE + V^{2}\log V)\).

figure h

From algorithm 7, we observe that in order to compute the centrality of a node, we need to find the extDegree and extNeighbors of every node. This algorithm is an extension of game 1 to weighted networks. The intuition behind this algorithm is that a node can be influenced by an influencer only when the distance between the node and the influencer is not more than \(d_{cutoff}\). This \(d_{cutoff}\) is generally fixed to a constant value and we have fixed it as 2 in our parallelization. For an unweighted graph, all the nodes which are one or two hops away form the extNeighbors, but for a weighted graph, it depends on the weights of the edges. We parallelize this algorithm using four map-reduce phases.

The first map-reduce phase of this algorithm is given in algorithm 8. The map phase does the same job as the map phase of the first map-reduce phase of the game 1 and game 2 algorithms. The reducer in this phase gets nodes as keys and lists of their neighbors as values. These neighbors are one-hop neighbors, i.e., they are connected to the node by a single edge. The number of times a neighbor appears in the list is the weight of the edge between the node and that neighbor. So, the number of occurrences of each neighbor is calculated and stored in a map named neighbor2weight, whose key is the neighbor and whose value is the weight of the edge between the node and the neighbor. Using this map, we create another map named weight2neighbors, where the key is the weight of an edge between the node and a neighbor and the value is the list of neighbors connected to the node with that weight.

figure i
figure j

Two-hop neighbors are the neighbors that are reachable in at most two hops. Let A and B be two nodes which are two hops away, i.e., A and B are not connected directly but are connected through another node, say C. Then A and B are one-hop neighbors of C; similarly, every pair of nodes which are two hops apart will have a common neighbor from which both of them are one hop away. So, each node in the list of neighbors (the input to the reducer of this stage) is at most two hops away from every other node in the same list. Also, some of the nodes which are two hops away might as well be connected by a single edge; in this case, the shortest distance between them is 1 and not 2. So, whenever a reducer encounters a neighbor having two different values as the weight for the same edge, it always chooses the least value. Also, the edges may have weights, so the reducer also has to check whether the sum of the weights of the edges is within \(d_{cutoff}\). The reducer in this phase essentially finds the neighbors which are one hop further than what it has received as input: in the current map-reduce phase, the reducer has received the neighbors which are one hop away and has found the neighbors which are two hops away. This can be extended further for higher hops in the same way.

The reducer constructs a neighborString which contains the neighbors that are one more hop away and whose path length is less than or equal to \(d_{cutoff}\). So, for every neighbor in the list of neighbors, the reducer writes the neighbor as key and a neighborString as value, depending on the weight between the neighbor and the node (the input to the reducer of this phase).

The second map-reduce phase of this algorithm makes the neighborhood grow by one more hop. The mapper in this phase is given in algorithm 9. It reads the output of the previous phase, breaks each line into (key,value) pairs and sends them to the reducer. The reducer in this phase is essentially the same as that of the previous map-reduce phase. This map-reduce phase is run iteratively until all the neighbors which are within \(d_{cutoff}\) are visited.

The third map-reduce phase of this algorithm is given in algorithm 10. The mapper in this phase reads the input from the file and splits it into (key, value) pairs. The reducer in this stage receives nodes as keys and their extNeighbors as values. We calculate the extDegree of each node by removing the duplicates and then counting the number of extNeighbors. Once the extDegree is calculated, the marginal contribution is calculated by the formula \(\frac {1}{1+extDegree}\). The reducer writes the marginal contributions to the output similar to game 1.

The fourth and final map-reduce phase of this algorithm is essentially the same as the second phase of game 1 (algorithm 4). The mapper in this phase does not do any computation. The reducer in this phase gets nodes as keys and, as values, the marginal contributions of the node itself and of all its extended neighbors. It aggregates these marginal contributions to obtain the centrality of the node.

We note that the game 3 algorithm can also be applied to weighted networks. The sum of weights on the path between nodes u and v is calculated as Distance(u) from v, which is compared with \(d_{cutoff}\) to determine whether node u can be included in the extended neighborhood of node v. In unweighted networks, the weight of every edge is assumed to be 1.
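For reference, the following compact serial sketch computes what the four phases compute together: extended neighborhoods within \(d_{cutoff}\) via Dijkstra, then the same 1/(1+extDegree) aggregation pattern as in game 1. It is a stand-in for, not a transcription of, the distributed phases.

```python
# A serial reference sketch of game 3 on a weighted adjacency structure.
import heapq
from collections import defaultdict

def ext_neighbors(adj, src, d_cutoff):
    # all nodes (other than src) whose shortest distance from src <= d_cutoff
    dist = {src: 0.0}
    heap = [(0.0, src)]
    while heap:
        d, u = heapq.heappop(heap)
        if d > dist[u] or d > d_cutoff:
            continue
        for v, w in adj[u].items():
            nd = d + w
            if nd <= d_cutoff and nd < dist.get(v, float("inf")):
                dist[v] = nd
                heapq.heappush(heap, (nd, v))
    return {v for v in dist if v != src}

# node -> {neighbor: edge weight}; weight 1 everywhere if the graph is unweighted
adj = {"1": {"2": 1}, "2": {"1": 1, "3": 1, "4": 1, "5": 1},
       "3": {"2": 1, "5": 1}, "4": {"2": 1}, "5": {"2": 1, "3": 1}}

d_cutoff = 2
ext = {v: ext_neighbors(adj, v, d_cutoff) for v in adj}
centrality = defaultdict(float)
for v in adj:
    mc = 1.0 / (1 + len(ext[v]))      # marginal contribution of v
    centrality[v] += mc
    for u in ext[v]:
        centrality[u] += mc
# on this toy graph every extended neighborhood covers all other nodes,
# so each centrality comes out as 1.0
print(dict(centrality))
```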

3.5 Game 4: Number of agents in the network

The fourth game theoretic centrality algorithm computes centrality by considering the fringe as the set of all nodes in the network. Algorithm 11 gives the steps to calculate this centrality. It has a running time of \(O(VE + V^{2}\log V)\).

figure k

From algorithm 11, we observe that in order to compute the centrality of a node, we need to find the extNeighbors of every node and the distances to them. This algorithm is an extension of game 3. The intuition behind this algorithm is that the power of influence decreases as the distance increases. In general, the extNeighbors are all the nodes in the network for this game, but the contribution of neighbors that are farther away is not significant, so in our parallelization we have fixed \(d_{cutoff}\) to be 2. We parallelize this algorithm using four map-reduce phases.

The first and second phases of this algorithm are essentially the same as in game 3 (algorithms 8 and 9). In these phases, the neighbors of each node are found, and each map-reduce phase extends the neighborhood by one hop. This process is repeated iteratively until there are no more unvisited neighbors within \(d_{cutoff}\).

The third map-reduce phase of this algorithm is given in algorithm 12. The mapper in this phase reads the input from the file and splits it into (key, value) pairs such that every node in the graph is a key and the list of its neighbors and weights are the values. The weight here represents the distance from the node to the neighbor. Once the neighbors and weights are known, the marginal contributions are calculated according to lines 3 to 17 of algorithm 11. The reducer writes the nodes and the marginal contributions to the output.

The fourth and the final map-reduce phase of this algorithm is essentially the same as that of the second phase of game 1 (algorithm 4) which aggregates the marginal contributions to obtain the centrality.

figure l

3.6 Game 5: Number of agents with \(\sum \)(weights inside C) ≥ \(W_{cutoff}\)(agent)

The fifth game theoretic centrality algorithm computes centrality by considering the fringe as the set of all nodes whose agent-specific threshold does not exceed the sum of influences on the node from the nodes that are already in the coalition. Algorithm 13 gives the steps to calculate this centrality. It has a running time of \(O(V + E^{2})\).

From algorithm 13, we observe that in order to compute the centrality of a node, we need to find the degree of the node and its neighbors. This algorithm is an extension of game 2 (algorithm 5) to weighted networks. The intuition behind this algorithm is that a node will be influenced only when the sum of the weights to all its active neighbors reaches the cut-off of the node. Let \(\alpha_{v}\) be the sum of the weights of the edges to all the neighbors of node v and \(\beta_{v}\) be the sum of the squares of these weights. Then, the results of the analysis done in Michalak et al (2013) for this algorithm are as follows:

$$\begin{array}{@{}rcl@{}} \mu(X^{\mathit{vv}}_{m}) &=& \frac{m}{deg(\mathit{v})} \alpha_{\mathit{v}}\\ \sigma(X^{\mathit{vv}}_{m}) &=& \frac{m(deg(\mathit{v})-m)}{deg(\mathit{v})(deg(\mathit{v})-1)} \left( \beta_{\mathit{v}} - \frac{{\alpha_{\mathit{v}}^{2}}}{deg(\mathit{v})}\right)\\ \mu(X^{u\mathit{v}}_{m}) &=& \frac{m}{deg(\mathit{v})-1} (\alpha_{\mathit{v}}-w(u,\mathit{v})) \end{array} $$
$$\begin{array}{@{}rcl@{}} \sigma(X^{u\mathit{v}}_{m}) &=& \frac{m(deg(\mathit{v})-1-m)}{(deg(\mathit{v})-1)(deg(\mathit{v})-2)} \left( \beta_{\mathit{v}} - w(u,\mathit{v})^{2} - \frac{(\alpha_{v}-\mathit{w}(u,\mathit{v}))^{2}}{deg(\mathit{v})-1} \right) \\ Z^{u\mathit{v}}_{m} &=& \frac{1}{2} \left[{}erf \left( \frac{W_{\text{cutoff}}(\mathit{v})-\mu(X^{u\mathit{v}}_{m})}{\sqrt{2}\sigma(X^{u\mathit{v}}_{m})} \right) - erf \left( \frac{W_{\text{cutoff}}(\mathit{v})- \mathit{w}(u,\mathit{v}) -\mu(X^{u\mathit{v}}_{m})}{\sqrt{2}\sigma(X^{u\mathit{v}}_{m})} \right){}\right] \end{array} $$
figure m

We parallelize this algorithm using two map-reduce phases.

The first map-reduce phase of this algorithm is given in algorithm 14. The only difference between the first phases of the game 1 and game 5 algorithms is the way in which the marginal contributions are calculated, so the mapper does the same job as in algorithm 3. The input to the reducer is nodes and their neighbors. Since weighted graphs are the input for game 5, we count the occurrences of each neighbor, which gives the weight of the edge between the node and that neighbor. The marginal contribution of a node to itself is calculated as per lines 5–11 of algorithm 13 (lines 5–6 of the reduce phase of algorithm 14), and the marginal contribution of a node to its neighbors as per lines 12–21 of algorithm 13 (lines 7–10 of the reduce phase of algorithm 14). Once the marginal contributions are calculated, the reducer writes the nodes and the corresponding marginal contributions to the output. The second map-reduce phase of this algorithm is essentially the same as the second phase of game 1 (algorithm 4), which aggregates the marginal contributions to obtain the centrality.

figure n
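The Python helpers below transcribe the displayed statistics for game 5 verbatim; they are only a sketch, since the assembly of the full marginal contributions (lines 5–21 of algorithm 13) and the handling of degenerate degrees are not reproduced here.

```python
# Statistics used in the game 5 reducer. alpha_v is the sum of edge weights at
# node v, beta_v the sum of squared edge weights, m the size of the sampled
# sub-coalition, w_uv the weight of edge (u, v) and w_cutoff_v the threshold
# of v. Degenerate degrees (deg_v <= 2) need the special handling of the
# original algorithm and are not covered here.
import math

def mu_vv(m, deg_v, alpha_v):
    return m / deg_v * alpha_v

def sigma_vv(m, deg_v, alpha_v, beta_v):
    return m * (deg_v - m) / (deg_v * (deg_v - 1)) * (beta_v - alpha_v**2 / deg_v)

def mu_uv(m, deg_v, alpha_v, w_uv):
    return m / (deg_v - 1) * (alpha_v - w_uv)

def sigma_uv(m, deg_v, alpha_v, beta_v, w_uv):
    return (m * (deg_v - 1 - m) / ((deg_v - 1) * (deg_v - 2))
            * (beta_v - w_uv**2 - (alpha_v - w_uv)**2 / (deg_v - 1)))

def z_uv(m, deg_v, alpha_v, beta_v, w_uv, w_cutoff_v):
    mu = mu_uv(m, deg_v, alpha_v, w_uv)
    sigma = sigma_uv(m, deg_v, alpha_v, beta_v, w_uv)
    return 0.5 * (math.erf((w_cutoff_v - mu) / (math.sqrt(2) * sigma))
                  - math.erf((w_cutoff_v - w_uv - mu) / (math.sqrt(2) * sigma)))
```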

4 Experimental results

The experiments in this section are designed to show (i) that the game-theoretic centrality measures perform on par with the greedy algorithm in the context of information diffusion, (ii) the scalability of the parallelized algorithms with respect to the size of the input graph, and (iii) the scalability of the algorithms with respect to the number of machines in the Hadoop cluster.

We have verified the results of the parallelized algorithms on several small synthetic networks generated using the E-R model and found them to be correct, in that they returned the analytically computed Shapley values for all the nodes.

4.1 Cascade experiment

Influence spread has been explored in the literature in two different contexts. One is the top-k problem and the other is the λ-coverage problem. In the top-k problem, a set of k nodes that maximizes the influence spread in the network has to be selected. In the λ-coverage problem, the minimum value of k needed to obtain a desired influence spread has to be found. We have chosen the top-k problem for our experiment. We ran the cascade experiment to find the influence spread of the top k nodes given by the five game theoretic centrality algorithms and the greedy algorithm. We used the linear threshold model to find the influence spread.

Influence model: In this model, every node can influence its neighbor nodes and every neighbor can influence the node. Consider the edge (u,v) from node u to node v in a graph. Let \(d_{v}\) be the sum of the weights of the incoming edges of node v; in an unweighted network, \(d_{v}\) is the degree of node v. Let \(C_{u,v}\) be the weight of the edge from u to v. Then, node u has an influence of \(\frac {C_{u,v}}{d_{v}}\) on node v. Note that the sum of influences on any node is 1. A node can influence its neighbors only if it is influenced already. There is an initial set of influencers who are assumed to be influenced by external forces. The model assumes that every node in the graph has a threshold associated with it, and a node is said to be influenced if the sum of influences on that node from its influenced neighbors exceeds its threshold. Neighbors of a newly influenced node might in turn get influenced based on the above criterion. The influence spread stops when there is no more node in the graph which could be influenced. The thresholds assumed by this model cannot be determined in real-world scenarios, so a number of different sets of thresholds are sampled from an appropriate probability distribution, the influence spread is run for every set of thresholds, and the average over all these runs gives the number of nodes influenced.
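A minimal simulation of this model, assuming thresholds sampled uniformly at random (a standard choice for the linear threshold model), is sketched below.

```python
# A sketch of the linear threshold cascade described above. `adj` maps each
# node to {neighbor: edge weight}; thresholds are re-sampled on every run and
# the spread is averaged over the runs to cancel out their effect.
import random

def linear_threshold_spread(adj, seeds, runs=100, seed=0):
    rng = random.Random(seed)
    total = 0
    for _ in range(runs):
        theta = {v: rng.random() for v in adj}           # sampled node thresholds
        active = set(seeds)
        newly_active = set(seeds)
        while newly_active:
            newly_active = set()
            for v in adj:
                if v in active:
                    continue
                d_v = sum(adj[v].values())               # sum of incoming edge weights
                influence = sum(w for u, w in adj[v].items() if u in active)
                if d_v > 0 and influence / d_v > theta[v]:
                    newly_active.add(v)
            active |= newly_active
        total += len(active)
    return total / runs                                  # average number influenced

# unit-weight adjacency of a small example graph: node -> {neighbor: weight}
adj = {"1": {"2": 1}, "2": {"1": 1, "3": 1, "4": 1, "5": 1},
       "3": {"2": 1, "5": 1}, "4": {"2": 1}, "5": {"2": 1, "3": 1}}
print(linear_threshold_spread(adj, seeds={"2"}))
```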

Greedy algorithm: Consider a network with n nodes. The greedy algorithm runs the cascade spread n times, once with each node as the initial influencer. The node which is able to influence the most nodes in the network is taken as the top-1 influencer. It then considers the remaining n−1 nodes and runs the cascade experiment n−1 times, each time with the top-1 influencer and one of the remaining nodes as initial influencers. The node which, when combined with the top-1 influencer, influences the most nodes is added to the set of top influencers, giving the top-2 influencers. This process is repeated k times to find the top-k influencers of the network. The problem of finding the exact top-k nodes is hard; the greedy algorithm is the best known approximation for this problem (Kempe et al 2003).
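A sketch of this selection procedure, reusing the linear_threshold_spread function and the adj dictionary from the previous snippet:

```python
# Greedy top-k selection: each round adds the node whose inclusion yields the
# largest simulated spread under the linear threshold model.
def greedy_top_k(adj, k, runs=100):
    seeds = []
    for _ in range(k):
        best_node, best_spread = None, -1.0
        for v in adj:
            if v in seeds:
                continue
            spread = linear_threshold_spread(adj, set(seeds) | {v}, runs=runs)
            if spread > best_spread:
                best_node, best_spread = v, spread
        seeds.append(best_node)
    return seeds

print(greedy_top_k(adj, k=2))   # top-2 influencers of the small example graph
```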

We ran the cascade experiment on the collaboration network data. The network data can be obtained from http://snap.stanford.edu/data/ca-GrQc.html. This network captures collaborations between authors: nodes in the network correspond to authors, and if an author u has collaborated with an author v on at least one paper, an undirected edge is established between nodes u and v. The network consists of 5242 nodes and 14496 edges. Figure 4 shows the plot of the number of nodes influenced against the number of initial influencers in this network. We observe that the greedy algorithm performs well when the number of initial influencers is very small and that the performance of the game theoretic centrality algorithms improves as the number of initial influencers increases. At k = 30, Game 4 and Game 5 perform marginally better than the greedy algorithm, while Game 1 performs close to the greedy algorithm. Though the Game 2 and Game 3 algorithms (dotted lines in figure 4) give slightly lower performance than the greedy algorithm, the difference is not large. We also note that the greedy algorithm does not scale well with the network size or the number of initial influencers. A detailed analysis of the game theoretic centrality algorithms is given by Michalak et al (2013) and Narayanam & Narahari (2011).

Figure 4. Result of cascade experiment in collaboration network.

4.2 Scalability experiments

We generated synthetic networks to analyse the scalability of our algorithms. The synthetic networks were generated using the Barabasi–Albert model and Erdos–Renyi model.

Barabasi–Albert model: This model starts with a small seed graph. The seed graph needs to be connected, i.e., there should be a path from every node to every other node, to make sure that the final graph will be connected. The average degree of the nodes in the network can be calculated by the formula,

$$\text{Average degree } = \frac{2*\text{ Number of edges in the graph}}{\text{Number of nodes in the graph}}. $$

Let x denote half of the average degree. The number of nodes in the seed graph should be a little higher than x. For every other node in the network, x nodes are chosen from the seed graph and all of them are connected to this new node. The seed graph grows with the addition of every node, and once every node has been added, it becomes the final graph. Note that the probability with which a node in the seed graph gets selected is proportional to its degree, i.e., the higher the degree, the higher the probability of getting selected. So, the resulting graph will have a few nodes with high degree and many nodes with low degree.

Erdos–Renyi (E–R) model: This model generates a network by including each possible edge with a fixed probability. Depending on the density of edges needed in the graph, the probability of existence of an edge can be calculated as follows.

$$\text{Probability of existence of an edge, }P = \frac{\text{Number of edges in the graph}}{\text{Number of possible edges in the graph}}. $$

Every possible edge is included in the graph with probability P and discarded otherwise. We observe that the graph will have no edges if P is 0 and all the edges if P is 1.
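For experimentation, both generators are available in networkx; the sketch below is illustrative only, and the parameters (n, m, p, seed) are not the ones used in our experiments.

```python
# Generating the two kinds of synthetic networks with networkx.
import networkx as nx

n = 10_000
ba = nx.barabasi_albert_graph(n, m=5, seed=42)      # preferential attachment
er = nx.erdos_renyi_graph(n, p=0.001, seed=42)      # each edge kept with probability p

# write them in the edgelist format expected by the parallel algorithms
nx.write_edgelist(ba, "ba_edges.txt", data=False)
nx.write_edgelist(er, "er_edges.txt", data=False)
print(ba.number_of_edges(), er.number_of_edges())
```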

Scalability with respect to input size: We ran our parallelized algorithms on a Hadoop cluster of 10 machines, each with 8 cores. We varied the size of the input graph and measured the running time of the parallelized algorithms. Tables 1–6 show the running times of the parallelized versions of the Game 1, Game 2, Game 3, Game 4 and Game 5 algorithms in seconds for different input sizes and densities of the graphs.

Table 1 Running time of game 1 on E-R graphs of different densities in seconds.
Table 2 Running time of game 2 on E-R graphs of different densities in seconds.
Table 3 Running time of game 3 on E-R graphs of different densities in seconds.
Table 4 Running time of game 4 on E-R graphs of different densities in seconds.
Table 5 Running time of game 5 on E-R graphs of different densities in seconds.
Table 6 Running time of all the game theoretic algorithms on Barabasi Albert graphs of density 0.1 in seconds.

Scalability with respect to number of CPUs in synthetic networks: We ran our parallelized algorithms on a Hadoop cluster, varying the number of machines in the cluster for a fixed input size. We chose the number of edges in the input graph to be 1 million for game 3 and game 4, and 10 million for game 1, game 2 and game 5. Tables 7 and 8 show the running time of the algorithms when the number of machines in the cluster is varied. Each machine in our Hadoop cluster had an 8-core CPU, so up to 64 mappers or reducers can run at a time.

Table 7 Running time of game 1, game 2 and game 5 algorithms in seconds on a 10 million edge network.
Table 8 Running time of game 3 and game 4 algorithms in seconds on a 1 million edge network.

Scalability with respect to number of CPUs in real world graphs: We obtained the real world networks from the Stanford large network dataset collection. The sizes of the networks are given in table 9.

Table 9 Networks with ground-truth communities.

Tables 10–12 show the results on the Amazon, DBLP and Youtube networks. Increasing the number of CPUs did not have much effect on the Amazon and DBLP networks. On the other hand, a performance improvement is seen in the Youtube network for the game 3 and game 4 algorithms when we use multiple CPUs instead of 1 CPU. In the Amazon and DBLP networks, for game 1 and game 2, one CPU performs better than multiple CPUs. The reason is that when we force Hadoop to use all the CPUs when they are not required, the communication overhead becomes a bottleneck.

Table 10 Running time of all the game theoretic algorithms on Amazon network in seconds.
Table 11 Running time of all the game theoretic algorithms on DBLP network in seconds.
Table 12 Running time of all the game theoretic algorithms on Youtube network in seconds.

From the above experiments, we observe the following:

  • We can process networks with millions of edges in a few seconds using these parallelization techniques, even when the algorithm has quadratic time complexity (e.g., game 3 and game 4).

  • Game 1, game 2 and game 5 are highly scalable, as these algorithms take only a few seconds to run on networks with a few million edges.

  • The running time of the algorithms decreases as the density of edges in the graph decreases. This is due to the fact that every node in a sparse graph has fewer neighbors than a node in a dense graph.

  • Performance cannot be increased beyond a point by adding more CPUs for a given input size, as can be inferred from the results of the game 1 and game 2 algorithms on the Amazon and DBLP networks.

  • The running time decreases as the number of machines in the cluster increases, as can be inferred from the results of the game 3 and game 4 algorithms on the Youtube network.

5 Conclusions

In this work, we have presented parallel versions of several Shapley value based centrality algorithms and have shown their scaling behavior on networks with a few million edges. We were able to process most of these large networks in a few seconds. We have also presented an elegant way to construct the adjacency list from the edgelist representation of a graph. This allows us to easily handle large scale interaction data, where the data is generated one edge at a time, and it solves the basic problem of finding one-hop neighbors of a graph in a parallelized fashion. We achieved this scaling with only 64 cores; by employing more cores we can process large graphs of the orders of magnitude common in online social networks, without any modification to the algorithms proposed in this work. Currently we are conducting empirical studies on large real interaction network data in collaboration with a leading telecommunications company. In summary, this work has opened up the possibility of using complex information diffusion centrality models in large networks, which was not possible earlier due to the computational complexity and the inherent serial nature of the popular measures of centrality.