Synonyms

DAG scheduling; Workflow scheduling

Definition

Task Graph Scheduling is the activity of mapping a task graph onto a target platform. The task graph represents the application: Nodes denote computational tasks, and edges model precedence constraints between tasks. For each task, an assignment (choose the processor that will execute the task) and a schedule (decide when to start the execution) are determined. The goal is to obtain an efficient execution of the application, which translates into optimizing some objective function, most often the total execution time.

Discussion

Introduction

Task Graph Scheduling is the activity of mapping a task graph onto a target platform. The task graph is given as input to the scheduler. Hence, scheduling algorithms are completely independent of the models and methods used to derive task graphs. However, it is insightful to start with a discussion of how these task graphs are constructed.

Consider an application that is decomposed into a set of computational entities, called tasks. These tasks are linked by precedence constraints. For instance, if some task T produces some data that is used (read) by another task T′, then the execution of T′ cannot start before the completion of T. It is therefore natural to represent the application as a task graph: The task graph is a DAG (Directed Acyclic Graph), whose nodes are the tasks and whose edges are the precedence constraints between tasks.

The decomposition of the application into tasks is given to the scheduler as input. Note that the task graph may be directly provided by the user, but it can also be determined by some parallelizing compiler from the application program. Consider the following algorithm to solve the linear system Ax = b, where A is an \(n \times n\) nonsingular lower triangular matrix and b is a vector with n components:

$$\begin{array}{l} {\bf for}\,i = 1\,to\,n\,{\bf do} \\ \,\,\,\,{\mathop{\rm Task}\nolimits} \,T_{i,i} :x_i \leftarrow b_i /a_{i,i} \\ \,\,\,\,{\bf for}\,j = i + 1\,to\,n\,{\bf do} \\ \,\,\,\,\,\,\,\,\,{\mathop{\rm Task}\nolimits} \,T_{i,j} :b_j \leftarrow b_j - a_{j,i} \times x_i \\ \,\,\,\,{\bf end} \\ {\bf end} \\ \end{array}$$
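For concreteness, here is a minimal Python transcription of this algorithm (an illustrative sketch, using 0-based indexing and nested lists for the matrix); each loop body corresponds to one task of the decomposition.

```python
def solve_lower_triangular(a, b):
    """Forward substitution for a lower triangular system A x = b.
    Each loop body corresponds to one task of the task graph:
    the body computing x[i] is task T_{i,i}, the body updating b[j] is T_{i,j}."""
    n = len(b)
    b = list(b)                      # work on a copy of b
    x = [0.0] * n
    for i in range(n):
        # Task T_{i,i}: x_i <- b_i / a_{i,i}
        x[i] = b[i] / a[i][i]
        for j in range(i + 1, n):
            # Task T_{i,j}: b_j <- b_j - a_{j,i} * x_i
            b[j] -= a[j][i] * x[i]
    return x

# Example: solve_lower_triangular([[2, 0], [1, 4]], [2, 6]) returns [1.0, 1.25].
```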

For a given value of i, \(1 \leq i \leq n\), each task \({T}_{i,\ast}\) represents some computations executed during the i-th iteration of the outer loop. The computation of \(x_i\) is performed first (task \({T}_{i,i}\)). Then, each component \(b_j\) of vector b such that \(j > i\) is updated (task \({T}_{i,j}\)). In the original sequential program, there is a total precedence order between tasks. Write \(T {<}_{seq} T'\) if task T is executed before task T′ in the sequential code. Then:

$$\begin{array}{rcl} & & {T}_{1,1}\! {< }_{seq}{T}_{1,2}\! {< }_{seq}{T}_{1,3}\! {< }_{seq}\cdots \! {< }_{seq}{T}_{1,n}\! {< }_{seq} \\ & & {T}_{2,2}\! {< }_{seq}{T}_{2,3}\! {< }_{seq}\cdots \! {< }_{seq}{T}_{n,n}.\end{array}$$

However, there are independent tasks that can be executed in parallel. Intuitively, independent tasks are tasks whose execution orders can be interchanged without modifying the result of the program execution. A necessary condition for tasks to be independent is that they do not update the same variable. They can read the same value, but they cannot write into the same memory location (otherwise there would be a race condition and the result would be nondeterministic). For instance, tasks \({T}_{1,2}\) and \({T}_{1,3}\) both read \({x}_{1}\) but modify distinct components of b, hence they are independent.

This notion of independence can be expressed more formally. Each task T has an input set \({\rm In}(T)\) (read values) and an output set \({\rm Out}(T)\) (written values). In the example, \({\rm In}({T}_{i,i}) =\{ {b}_{i},{a}_{i,i}\}\) and \({\rm Out}({T}_{i,i}) =\{ {x}_{i}\}\). For j > i, \({\rm In}({T}_{i,j}) =\{ {b}_{j},{a}_{j,i},{x}_{i}\}\) and \({\rm Out}({T}_{i,j}) =\{ {b}_{j}\}\). Two tasks T and T′ are not independent (write \(T\perp T'\)) if they share some written variable:

$$T\perp T' \Leftrightarrow \left \{\begin{array}{lll} & {\rm In}(T) \cap {\rm Out}(T') &\neq \emptyset \\ \text{ or}&{\rm Out}(T) \cap {\rm In}(T') &\neq \emptyset \\ \text{ or}&{\rm Out}(T) \cap {\rm Out}(T')&\neq \emptyset \end{array} \right.$$

For instance, tasks \({T}_{1,1}\) and \({T}_{1,2}\) are not independent because \({\rm Out}({T}_{1,1}) \cap {\rm In}({T}_{1,2}) =\{ {x}_{1}\}\); therefore \({T}_{1,1}\perp {T}_{1,2}\). Similarly, \({\rm Out}({T}_{1,3}) \cap {\rm Out}({T}_{2,3}) =\{ {b}_{3}\}\), and hence \({T}_{1,3}\) and \({T}_{2,3}\) are not independent; hence \({T}_{1,3}\perp {T}_{2,3}\).
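This dependence test is easy to automate for the example. The sketch below is illustrative: the helper names (in_set, out_set, dependent) and the string encoding of the variables are choices made here, not part of the original formulation.

```python
def in_set(i, j):
    """Read variables of task T_{i,j} of the triangular solver (1 <= i <= j)."""
    return {f"b{i}", f"a{i},{i}"} if i == j else {f"b{j}", f"a{j},{i}", f"x{i}"}

def out_set(i, j):
    """Written variable of task T_{i,j}."""
    return {f"x{i}"} if i == j else {f"b{j}"}

def dependent(t, tp):
    """T and T' are NOT independent iff they share a written variable."""
    return bool(in_set(*t) & out_set(*tp)
                or out_set(*t) & in_set(*tp)
                or out_set(*t) & out_set(*tp))

assert dependent((1, 1), (1, 2))        # they share x_1
assert dependent((1, 3), (2, 3))        # they share b_3
assert not dependent((1, 2), (1, 3))    # distinct components of b: independent
```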

Given the dependence relation \(\perp \), a partial order \(\prec \) can be extracted from the total order \({< }_{seq}\) induced by the sequential execution of the program. If two tasks T and T′ are dependent, that is, \(T\perp T'\), they are ordered according to the sequential execution: \(T \prec T'\) if both \(T\perp T'\) and \(T {< }_{seq}T'\). The precedence relation \(\prec \) represents the dependences that must be satisfied to preserve the semantics of the original program: if \(T \prec T'\), then T was executed before T′ in the sequential code, and it has to be executed before T′ even with infinitely many resources, because T and T′ share a written variable. In terms of order relations, \(\prec \) is defined more precisely as the transitive closure of the intersection of \(\perp \) and \({< }_{seq}\), and it captures the intrinsic sequentiality of the original program. The transitive closure is needed to track dependence chains. In the example, \({T}_{2,4}\perp {T}_{4,4}\) and \({T}_{4,4}\perp {T}_{4,5}\), hence there is a path of dependences from \({T}_{2,4}\) to \({T}_{4,5}\), even though \({T}_{2,4}\perp {T}_{4,5}\) does not hold.

A directed graph is drawn to represent the dependence constraints that need to be enforced. The vertices of the graph denote the tasks, while the edges express the dependence constraints. An edge \(e : T \rightarrow T'\) in the graph means that the execution of T′ can begin only after the end of the execution of T, whatever the number of available processors. Transitivity edges are not drawn, as they represent redundant information; only predecessor edges are shown. T is a predecessor of T′ if \(T \prec T'\) and if there is no task T″ in between, that is, no T″ such that \(T \prec T''\) and \(T'' \prec T'\). In the example, predecessor relationships are as follows (see Fig. 1):

  • \({T}_{i,i} \prec {T}_{i,j}\) for \(1 \leq i < j \leq n\) (the computation of x i must be done before updating b j at step i of the outer loop).

  • \({T}_{i,j} \prec {T}_{i+1,j}\) for \(1 \leq i < j \leq n\) (updating b j at step i of the outer loop is done before reading it at step i + 1).

Task Graph Scheduling. Fig. 1

Task graph for the triangular system (n = 6)

In summary, this example shows how an application program can be decomposed into a task graph, either manually by the user, or with the help of a parallelizing compiler.

Fundamental Results

Traditional scheduling assumes that the target platform is a set of p identical processors, and that no communication cost is paid. Fundamental results are presented in this section, but only two proofs are provided, that of Theorem 1, an easy result on the efficiency of a schedule, and that of Theorem 4, Graham’s bound on list scheduling.

Definitions

Definition 1

A task graph is a directed acyclic vertex-weighted graph \(G = (V,E,w)\), where:

  • The set V of vertices represents the tasks (note that V is finite).

  • The set E of edges represents precedence constraints between tasks: e = (u, v) ∈ E if and only if u ≺ v.

  • The weight function \(w : V \rightarrow {\mathbb{N}}^{{_\ast}}\) gives the weight (or duration) of each task. Task weights are assumed to be positive integers.

For the triangular system (Fig. 1), it can be assumed that all tasks have equal weight: \(w({T}_{i,j})\ =\ 1\) for \(1 \leq i \leq j \leq n\). Alternatively, a division could be considered more costly than a multiply-add, leading to a larger weight for the diagonal tasks \({T}_{i,i}\).

A schedule \(\sigma \) of a task graph is a function that assigns a start time to each task.

Definition 2

A schedule of a task graph \(G = (V,E,w)\) is a function \(\sigma : V \rightarrow {\mathbb{N}}^{{_\ast}}\) such that \(\sigma (u) + w(u) \leq \sigma (v)\) whenever \(e = (u,v) \in E\).

In other words, a schedule must preserve the dependence constraints induced by the precedence relation \(\prec \) and embodied by the edges of the dependence graph; if \(u \prec v\), then the execution of u begins at time \(\sigma (u)\) and requires \(w(u)\) units of time, and the execution of v at time \(\sigma (v)\) must start after the end of the execution of u. Obviously, if there was a cycle in the task graph, no schedule could exist, hence the restriction to acyclic graphs (DAGs).

There are other constraints that must be met by schedules, namely, resource constraints. When there is an infinite number of processors (in fact, when there are as many processors as tasks), the problem is with unlimited processors, and denoted \(\text{ Pb}(\infty )\). When there is only a fixed number p of available processors, the problem is with limited processors, and denoted \(\text{ Pb}(p)\). In the latter case, an allocation function \(\text{ alloc} : V \rightarrow \mathcal{P}\) is required, where \(\mathcal{P} =\{ 1,\ldots, p\}\) denotes the set of available processors. This function assigns a target processor to each task. The resource constraints simply specify that no processor can be allocated more than one task at the same time. This translates into the following conditions:

$$\text{ alloc}(T) = \text{ alloc}(T') \Rightarrow \left \{\!\!\begin{array}{ll} &\sigma (T) + w(T) \leq \sigma (T')\\ \text{ or} &\sigma (T') + w(T') \leq \sigma (T). \end{array} \right.$$

This condition expresses the fact that if two tasks T and T′ are allocated to the same processor, then their executions cannot overlap in time.
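These constraints can be checked mechanically. The following sketch assumes a dictionary-based representation (task weights, start times, and processor assignments as Python dicts); the function name is illustrative.

```python
def is_valid_schedule(tasks, edges, w, sigma, alloc=None):
    """Return True if sigma respects all precedence constraints and, when an
    allocation is given, the resource constraints (no overlap on a processor)."""
    for (u, v) in edges:                       # precedence constraints
        if sigma[u] + w[u] > sigma[v]:
            return False
    if alloc is None:                          # Pb(infinity): nothing else to check
        return True
    tasks = list(tasks)
    for k, u in enumerate(tasks):              # resource constraints
        for v in tasks[k + 1:]:
            if alloc[u] == alloc[v]:
                disjoint = (sigma[u] + w[u] <= sigma[v]
                            or sigma[v] + w[v] <= sigma[u])
                if not disjoint:
                    return False
    return True
```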

Definition 3

Let \(G = (V,E,w)\) be a task graph.

  1. Let \(\sigma \) be a schedule for G. Assume \(\sigma \) uses at most p processors (let p = ∞ if the processors are unlimited). The makespan \(\text{ MS}(\sigma, p)\) of \(\sigma \) is its total execution time:

     $$\text{ MS}(\sigma, p) = \max_{v\in V }\{\sigma (v) + w(v)\} - \min_{v\in V }\{\sigma (v)\}.$$
  2. Pb(p) is the problem of determining a schedule \(\sigma \) of minimal makespan \(\text{ MS}(\sigma, p)\) assuming p processors (let \(p = \infty \) if the processors are unlimited). Let \({MS}_{opt}(p)\) be the value of the makespan of an optimal schedule with p processors:

     $${MS}_{opt}(p) = \min_{\sigma }{MS}(\sigma, p).$$

If the first task is scheduled at time 0, which is a common assumption, the expression of the makespan can be reduced to \(\text{ MS}(\sigma, p) = \max_{v\in V }\{\sigma (v) + w(v)\}\). Weights extend to paths in G as usual; if \(\Phi = ({T}_{1},{T}_{2},\ldots, {T}_{n})\) denotes a path in G, then \(w(\Phi ) ={ \sum \nolimits }_{i=1}^{n}w({T}_{i})\). Because schedules respect dependences, the following easy bound on the makespan is readily obtained:

Proposition 1

Let \(G = (V,E,w)\) be a task graph and \(\sigma \) a schedule for G with p processors. Then, \(\text{ MS}(\sigma, p) \geq w(\Phi )\) for all paths \(\Phi \) in G.

The last definition introduces the notions of speedup and efficiency for schedules.

Definition 4

Let G = (V, E, w) be a task graph and \(\sigma \) a schedule for G with p processors:

  1. The speedup is the ratio \(s(\sigma, p) = \frac{\text{ Seq}} {\text{ MS} (\sigma, p)}\), where \(\text{ Seq} ={ \sum \nolimits }_{v\in V }\;w(v)\) is the sum of all task weights (\(\text{ Seq}\) is the optimal execution time \({\text{ MS}}_{opt}(1)\) of a schedule with a single processor).

  2. The efficiency is the ratio \(e(\sigma, p) = \frac{s(\sigma, p)} {p} = \frac{\text{ Seq}} {p\times \text{ MS}(\sigma, p)}\).
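These quantities are straightforward to compute once a schedule is known. The sketch below assumes start times and weights stored in Python dictionaries (illustrative names, not a standard API).

```python
def makespan(sigma, w):
    """MS(sigma, p): latest completion time minus earliest start time."""
    return max(sigma[v] + w[v] for v in sigma) - min(sigma.values())

def speedup(sigma, w):
    seq = sum(w.values())             # Seq = MS_opt(1), the sequential time
    return seq / makespan(sigma, w)

def efficiency(sigma, w, p):
    """By Theorem 1 below, this value always lies between 0 and 1."""
    return speedup(sigma, w) / p
```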

Theorem 1

Let \(G = (V,E,w)\) be a task graph. For any schedule \(\sigma \) with p processors,

$$0 \leq e(\sigma, p) \leq 1.$$

Proof

Consider the execution of \(\sigma \) as illustrated in Fig. 2 (this is a fictitious example, not related to the triangular system example). At any time during execution, some processors are active, and some are idle. At the end, all tasks have been processed. Let Idle denote the cumulated idle time of the p processors during the whole execution. Because Seq is the sum of all task weights, the quantity Seq + Idle is equal to the area of the rectangle in Fig. 2, that is, the product of the number of processors by the makespan of the schedule: \(\text{ Seq} + \text{ Idle} = p \times \text{ MS}(\sigma, p)\). Hence, \(e(\sigma, p) = \frac{\text{ Seq}} {p\,\times \,\text{ MS} (\sigma, p)} \leq 1\).

Task Graph Scheduling. Fig. 2

Active and idle processors during execution

Solving Pb(∞)

Let \(G = (V,E,w)\) be a given task graph and assume unlimited processors. Remember that a schedule \(\sigma \) for G is said to be optimal if its makespan \(\text{ MS}(\sigma, \infty )\) is minimal, that is, if \(\text{ MS}(\sigma, \infty ) ={ \text{ MS}}_{opt}(\infty )\).

Definition 5

Let \(G = (V,E,w)\) be a task graph.

  1. For v ∈ V, PRED(v) denotes the set of all immediate predecessors of v, and SUCC(v) the set of all its immediate successors.

  2. v ∈ V is an entry (top) vertex if and only if \(\text{ PRED}(v) = \emptyset \).

  3. v ∈ V is an exit (bottom) vertex if and only if \(\text{ SUCC}(v) = \emptyset \).

  4. For v ∈ V, the top level tl(v) is the largest weight of a path from an entry vertex to v, excluding the weight of v.

  5. For v ∈ V, the bottom level bl(v) is the largest weight of a path from v to an exit vertex, including the weight of v.

In the example of the triangular system, there is a single entry vertex, \({T}_{1,1}\), and a single exit vertex, \({T}_{n,n}\). The top level of \({T}_{1,1}\) is 0, and \(tl({T}_{1,2}) = tl({T}_{1,1}) + w({T}_{1,1}) = 1\). The top level of \({T}_{2,3}\) is

$$tl({T}_{2,3}) =\max \{w({T}_{1,1}) + w({T}_{1,2}) + w({T}_{2,2}),\; w({T}_{1,1}) + w({T}_{1,3})\} = 3,$$

because there are two paths from the entry vertex to \({T}_{2,3}\).

The top level of a vertex can be computed by a traversal of the DAG; the top level of an entry vertex is 0, while the top level of a non-entry vertex v is

$$tl(v) =\max \{ tl(u) + w(u);u \in \text{ PRED}(v)\}.$$

Similarly, \(bl(v) =\max \{ bl(u);u \in \text{ SUCC}(v)\} + w(v)\) (and \(bl(v) = w(v)\) for an exit vertex v). The top level of a vertex is the earliest possible time at which it can be executed, while its bottom level represents a lower bound of the remaining execution time once starting its execution. This can be stated more formally as follows.
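Both traversals can be implemented with one topological sort. The sketch below uses an illustrative representation (a list of tasks, a list of edges, and a weight dictionary) and returns the two dictionaries tl and bl.

```python
from collections import defaultdict, deque

def top_bottom_levels(tasks, edges, w):
    """Compute top levels and bottom levels of a DAG by a forward and a
    backward sweep over a topological order (Kahn's algorithm)."""
    succ, pred = defaultdict(list), defaultdict(list)
    indeg = {t: 0 for t in tasks}
    for u, v in edges:
        succ[u].append(v)
        pred[v].append(u)
        indeg[v] += 1
    order = []
    queue = deque(t for t in tasks if indeg[t] == 0)
    while queue:
        u = queue.popleft()
        order.append(u)
        for v in succ[u]:
            indeg[v] -= 1
            if indeg[v] == 0:
                queue.append(v)
    tl = {v: 0 for v in tasks}
    for v in order:                            # entry vertices keep tl = 0
        for u in pred[v]:
            tl[v] = max(tl[v], tl[u] + w[u])
    bl = {v: w[v] for v in tasks}
    for v in reversed(order):                  # exit vertices keep bl = w(v)
        for s in succ[v]:
            bl[v] = max(bl[v], w[v] + bl[s])
    return tl, bl
```

As stated in the theorem that follows, scheduling every task v at time tl(v) is optimal when processors are unlimited.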

Theorem 2

Let G = (V,E,w) be a task graph and define σ free as follows:

$$\forall v \in V,\;{\sigma }_{free}(v) = tl(v).$$

Then, σ free is an optimal schedule for G.

From Theorem 2:

$${\text{ MS}}_{opt}(\infty ) = \text{ MS}({\sigma }_{free},\infty ) = \max_{v\in V }\{tl(v) + w(v)\}.$$

Hence, \({\text{ MS}}_{opt}(\infty)\) is simply the maximal weight of a path in the graph. Note that \({\sigma }_{free}\) is not the only optimal schedule.

Corollary 1

Let \(G = (V,E,w)\) be a directed acyclic graph. \(\text{ Pb}(\infty )\) can be solved in time \(O(\vert V \vert + \vert E\vert )\).

Going back to the triangular system (Fig. 1), because all tasks have weight 1, the weight of a path is equal to its length plus 1. The longest path is

$${T}_{1,1} \rightarrow {T}_{1,2} \rightarrow {T}_{2,2} \rightarrow \cdots \rightarrow {T}_{n-1,n-1} \rightarrow {T}_{n-1,n} \rightarrow {T}_{n,n},$$

whose weight is 2n − 1. Not as many processors as tasks are needed to achieve execution within 2n − 1 time units. For example, n − 1 processors suffice. Let \(1 \leq i \leq n\); at time 2i − 2, processor \(P_1\) starts the execution of task \({T}_{i,i}\), while at time 2i − 1, the first n − i processors \(P_1, P_2, \ldots, {P}_{n-i}\) execute the tasks \({T}_{i,j}\), \(i + 1 \leq j \leq n\).
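This construction is easy to generate and check programmatically. The sketch below (unit weights, tasks identified by pairs (i, j), 1-based indices) follows the assignment just described.

```python
def triangular_schedule(n):
    """Schedule of the triangular-system task graph on n - 1 processors:
    T_{i,i} starts at time 2i - 2 on processor 1, and T_{i,j} (j > i)
    starts at time 2i - 1 on processor j - i."""
    sigma, alloc = {}, {}
    for i in range(1, n + 1):
        sigma[(i, i)], alloc[(i, i)] = 2 * i - 2, 1
        for j in range(i + 1, n + 1):
            sigma[(i, j)], alloc[(i, j)] = 2 * i - 1, j - i
    return sigma, alloc

sigma, alloc = triangular_schedule(6)
assert max(s + 1 for s in sigma.values()) == 2 * 6 - 1    # makespan 2n - 1
assert max(alloc.values()) == 6 - 1                       # n - 1 processors used
```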

NP-completeness of Pb(p)

Definition 6

The decision problem Dec(p) associated with Pb(p) is as follows. Given a task graph G = (V, E, w), a number of processors p ≥ 1, and an execution bound \(K \in {\mathbb{N}}^{{_\ast}}\), does there exist a schedule σ for G using at most p processors, such that MS(σ, p) ≤ K? The restriction of Dec(p) to independent tasks (no dependence, that is, when \(E = \emptyset \)) is denoted Indep-tasks(p). In both problems, p is arbitrary (it is part of the problem instance). When p is fixed a priori, say p = 2, problems are denoted as Dec(2) and Indep-tasks(2).

Well-known complexity results are summarized in the following theorem.

Theorem 3

  • Indep-tasks(2) is NP-complete but can be solved by a pseudo-polynomial algorithm. Moreover, \(\forall \,\epsilon> 0\) , Indep-tasks(2) admits a \((1 + \epsilon )\) -approximation whose complexity is polynomial in \(\frac{1} {\epsilon }\).

  • Indep-tasks(p) is NP-complete in the strong sense.

  • Dec(2) (and hence Dec(p)) is NP-complete in the strong sense.

List Scheduling Heuristics

Because Pb(p) is NP-complete, heuristics are used to schedule task graphs with limited processors. The most natural idea is to use greedy strategies: At each instant, try to schedule as many tasks as possible onto available processors. Strategies that never deliberately keep a processor idle are called list scheduling algorithms. Of course, there are different possible strategies to decide which tasks are given priority in the (frequent) case where there are more free tasks than available processors. But a key result due to Graham [10] is that any list algorithm achieves a makespan at most twice the optimal.

Definition 7

Let G = (V, E, w) be a task graph and let σ be a schedule for G. A task v ∈ V is free at time t (written v ∈ FREE(σ, t)) if and only if its execution has not yet started \((\sigma (v) \geq t)\) but all its predecessors have been executed \((\forall \,u \in \text{ PRED}(v),\ \sigma (u) + w(u) \leq t)\).

A list schedule is a schedule such that no processor is deliberately left idle; at each time t, if | FREE(σ, t) | = r ≥ 1, and if q processors are available, then min(r, q) free tasks start executing.

Theorem 4

Let G = (V,E,w) be a task graph and assume there are p available processors. Let σ be any list schedule of G. Let MS opt (p) be the makespan of an optimal schedule. Then,

$$\text{ MS}(\sigma, p) \leq \left (2 -\frac{1} {p}\right ){\text{ MS}}_{opt}(p).$$

It is important to point out that Theorem 4 holds for any list schedule, regardless of the strategy to choose among free tasks when there are more free tasks than available processors.

Lemma 1

There exists a dependence path Φ in G whose weight w(Φ) satisfies

$${\rm Idle} \leq (p - 1) \times {\rm w}(\Phi ),$$

where Idle is the cumulated idle time of the p processors during the whole execution of the list schedule.

Proof

Define the ancestors of a task as its predecessors, the predecessors of its predecessors, and so on. Let \({T}_{{i}_{1}}\) be a task whose execution terminates at the end of the schedule:

$$\sigma ({T}_{{i}_{1}}) + w({T}_{{i}_{1}}) = \text{ MS}(\sigma, p).$$

Let t 1 be the largest time smaller than \(\sigma ({T}_{{i}_{1}})\) and such that there exists an idle processor during the time interval \([{t}_{1},{t}_{1} + 1]\) (let \({t}_{1} = 0\) if such a time does not exist). Why is this processor idle? Because \(\sigma \) is a list schedule, no task is free at \({t}_{1}\), otherwise the idle processor would start executing a free task. Therefore, there must be a task \({T}_{{i}_{2}}\) that is an ancestor of \({T}_{{i}_{1}}\) and that is being executed at time \({t}_{1}\); otherwise \({T}_{{i}_{1}}\) would have been started at time \({t}_{1}\) by the idle processor. Because of the definition of \({t}_{1}\), it is known that all processors are active between the end of the execution of \({T}_{{i}_{2}}\) and the beginning of the execution of \({T}_{{i}_{1}}\).

Then, start the construction again from \({T}_{{i}_{2}}\) so as to obtain a task \({T}_{{i}_{3}}\) such that all processors are active between the end of \({T}_{{i}_{3}}\) and the beginning of \({T}_{{i}_{2}}\). Iterating the process, one ends up with r tasks \({T}_{{i}_{r}},{T}_{{i}_{r-1}},\ldots, {T}_{{i}_{1}}\) that belong to a dependence path \(\Phi \) of G and such that all processors are active except perhaps during their execution. In other words, the idleness of some processors can only occur during the execution of these r tasks, during which at least one processor is active (the one that executes the task). Hence, \(\text{ Idle} \leq (p - 1) \times {\sum \nolimits }_{j=1}^{r}w({T}_{{i}_{j}}) \leq (p - 1) \times w(\Phi )\).

Proof

Going back to the proof of Theorem 4, recall that p ×MS(σ, p) = Idle + Seq, where Seq = ∑ vV w(v) is the sequential time, that is, the sum of all task weights (see Fig. 2). Now take the dependence path Φ constructed in Lemma 1: w(Φ) ≤ MS opt (p), because the makespan of any schedule is greater than the weight of all dependence paths in G (simply because dependence constraints are met). Furthermore, Seq ≤ p ×MS opt (p) (with equality only if all p processors are active all the time). Putting this together:

$$\begin{array}{lll} p \times \text{ MS}(\sigma, p)& =&\text{ Idle} + \text{ Seq} \leq (p - 1)w(\Phi ) + \text{ Seq} \\ & & \leq (p - 1){\text{ MS}}_{opt}(p) + p{\text{ MS}}_{opt}(p) \\ & =&(2p - 1){\text{ MS}}_{opt}(p), \end{array}$$

which proves the theorem.

Fundamentally, Theorem 4 says that the makespan of any list schedule is at most twice the optimal. In other words, list scheduling is guaranteed to achieve at least half of the best possible performance, regardless of the strategy used to choose among free tasks.

Proposition 2

Let MS list (p) be the shortest possible makespan produced by a list scheduling algorithm. The bound

$${{ MS}}_{list}(p) \leq \frac{2p - 1} {p}{ { MS}}_{opt}(p)$$

is tight.

Note that implementing a list scheduling algorithm is not difficult, but it is somewhat lengthy to describe in full detail; see Casanova et al. [3].
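As an illustration only, here is a compact Python sketch of a list scheduler for p identical processors without communication costs (a simplified version, not the exact algorithm of [3]). It is parameterized by a priority function used to break ties among free tasks; critical-path scheduling, described next, simply uses the bottom level as the priority.

```python
import heapq

def list_schedule(tasks, edges, w, p, priority):
    """Greedy list scheduling: whenever a processor is idle and a task is
    free, start the free task with the highest priority.
    Returns sigma (start times) and alloc (processor indices)."""
    succ = {t: [] for t in tasks}
    n_pred = {t: 0 for t in tasks}
    for u, v in edges:
        succ[u].append(v)
        n_pred[v] += 1
    free = [(-priority(t), t) for t in tasks if n_pred[t] == 0]
    heapq.heapify(free)                 # highest priority first (ties: task id)
    running = []                        # heap of (finish_time, processor, task)
    idle_procs = list(range(p))
    sigma, alloc, now = {}, {}, 0
    while free or running:
        while free and idle_procs:      # start as many free tasks as possible
            _, v = heapq.heappop(free)
            q = idle_procs.pop()
            sigma[v], alloc[v] = now, q
            heapq.heappush(running, (now + w[v], q, v))
        now = running[0][0]             # advance time to the next completion
        while running and running[0][0] == now:
            _, q, v = heapq.heappop(running)
            idle_procs.append(q)
            for s in succ[v]:           # release successors that become free
                n_pred[s] -= 1
                if n_pred[s] == 0:
                    heapq.heappush(free, (-priority(s), s))
    return sigma, alloc
```

For instance, with bl computed as in the earlier top/bottom-level sketch, passing priority=lambda v: bl[v] gives a critical-path list schedule; by Theorem 4, its makespan is at most (2 - 1/p) times the optimal.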

Critical Path Scheduling

A widely used list scheduling technique is critical path scheduling. The selection criterion for free tasks is based on the value of their bottom level. Intuitively, the larger the bottom level, the more “urgent” the task. The critical path of a task is defined as its bottom level and is used to assign priority levels to tasks. Critical path scheduling is list scheduling where the priority level of a task is given by the value of its critical path. Ties are broken arbitrarily.

Consider the task graph shown in Fig. 3. There are eight tasks, whose weights and critical paths are listed in Table 1. Assume there are p = 3 available processors and let \(\mathcal{Q}\) be the priority queue of free tasks. At t = 0, \(\mathcal{Q}\) is initialized as \(\mathcal{Q} = ({T}_{3},{T}_{2},{T}_{1})\). These three tasks are executed. At t = 1, T 8 is added to the queue: \(\mathcal{Q} = ({T}_{8})\). There is one processor available, which starts the execution of T 8. At t = 2, the four successors of T 2 are added to the queue: \(\mathcal{Q} = ({T}_{5},{T}_{6},{T}_{4},{T}_{7})\). Note that ties have been broken arbitrarily (using task indices in this case). The available processor picks the first task T 5 in \(\mathcal{Q}\). Following this scheme, the execution goes on up to t = 10, as summarized in Fig. 4.

Task Graph Scheduling. Fig. 3

A small example

Task Graph Scheduling. Table 1 Weights and critical paths for the task graph in Fig. 3
Task Graph Scheduling. Fig. 4

Critical path schedule for the example in Fig. 3

Note that it is possible to schedule the graph in only 9 time units, as shown in Fig. 5. The trick is to leave a processor idle at time t = 1 deliberately; although it has the highest critical path, T 8 can be delayed by two time units. T 5 and T 6 are given preference to achieve a better load balance between processors. The schedule shown in Fig. 5 is optimal, because Seq = 26, so that three processors require at least \(\left\lceil\frac{26} {3}\right\rceil = 9\) time units. This small example illustrates the difficulty of scheduling with a limited number of processors.

Task Graph Scheduling. Fig. 5

Optimal schedule for the example in Fig. 3

Taking Communication Costs into Account

The Macro-Dataflow Model

Communication costs were introduced in the scheduling literature some thirty years ago. Because the performance of network communication is difficult to model in a way that is both precise and conducive to understanding the performance of algorithms, the vast majority of results hold for a very simple model, which is as follows.

The target platform consists of p identical processors that form a fully connected clique. All interconnection links have the same bandwidth. If a task T communicates data to a successor task T′, the cost is modeled as

$$\text{ cost}(T,T') = \left \{\begin{array}{ll} 0 &\text{ if }\text{ alloc}(T) = \text{ alloc}(T')\\ c(T, T') &\text{ otherwise,} \end{array} \right.$$

where alloc(T) denotes the processor that executes task T, and c(T, T′) is defined by the application specification. The time for communication between two tasks running on the same processor is negligible. This so-called macro-dataflow model makes two main assumptions: (i) communication can occur as soon as data are available and (ii) there is no contention for network links. Assumption (i) is reasonable as communication can overlap with (independent) computations in most modern computers. Assumption (ii) is much more questionable. Indeed, there is no physical device capable of sending, say, 1,000 messages to 1,000 distinct processors, at the same speed as if there were a single message. In the worst case, it would take 1,000 times longer (serializing all messages). In the best case, the output bandwidth of the network card of the sender would be a limiting factor. In other words, assumption (ii) amounts to assuming infinite network resources. Nevertheless, this assumption is omnipresent in the traditional scheduling literature.

Definition 8

A communication task graph (or commTG) is a directed acyclic graph \(G = (V,E,w,c)\), where vertices represent tasks and edges represent precedence constraints. The computation weight function is \(w : V \rightarrow {\mathbb{N}}^{{_\ast}}\) and the communication cost function is \(c : E\rightarrow {\mathbb{N}}^{{_\ast}}\). A schedule \(\sigma \) must preserve dependences, which is written as

$$\forall \,e = (T,T') \in E,\quad \left \{\begin{array}{ll} \sigma (T) + w(T) \leq \sigma (T') &\text{ if }\text{ alloc}(T) = \text{ alloc}(T'),\\ \sigma (T) + w(T) + c(T,T') \leq \sigma (T')&\text{ otherwise.} \end{array} \right.$$

The expression of resource constraints is the same as in the no-communication case.

Complexity and List Heuristics with Communications

Including communication costs in the model makes everything difficult, including solving Pb\((\infty )\). The intuitive reason is that a trade-off must be found between allocating tasks to either many processors (hence balancing the load but communicating intensively) or few processors (leading to less communication but less parallelism as well). Here is a small example, borrowed from [9].

Consider the commTG in Fig. 6. Task weights are indicated close to the tasks within parentheses, and communication costs are shown along the edges, underlined. For the sake of this example, two non-integer communication costs are used: c(T 4, T 6) = c(T 5, T 6) = 1.5. Of course, every weight w and cost c could be scaled to have only integer values. Observe the following:

  • On the one hand, if all tasks are assigned to the same processor, the makespan will be equal to the sum of all task weights, that is, 13.

  • On the other hand, with unlimited processors (no more than seven processors are needed because there are seven tasks), each task can be assigned to a different processor. Then, the makespan of the ASAP schedule is equal to 14. To see this, it is important to point out that once the allocation of tasks to processors is given, the makespan is computed easily: For each edge e : TT′, add a virtual node of weight c(T, T′) if the edge links two different processors (alloc(T)≠alloc(T′)), and do nothing otherwise. Then, consider the new graph as a DAG (without communications) and traverse it to compute the length of the longest path. Here, because all tasks are allocated to different processors, a virtual node is added on each edge. The longest path is \({T}_{1} \rightarrow {T}_{2} \rightarrow {T}_{7}\), whose length is \(w({T}_{1}) + c({T}_{1},{T}_{2}) + w({T}_{2}) + c({T}_{2},{T}_{7}) + w({T}_{7}) = 14\).
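The longest-path computation described in the last bullet can be sketched as follows (dictionary-based inputs, illustrative names). It ignores intra-processor serialization, so it gives the exact makespan only when resource constraints are not binding, as in the one-task-per-processor allocation of this example.

```python
from collections import defaultdict, deque

def asap_makespan(tasks, edges, w, c, alloc):
    """Longest path in the commTG once the allocation is fixed: an edge
    (u, v) contributes c[(u, v)] only when alloc[u] != alloc[v]."""
    succ, pred = defaultdict(list), defaultdict(list)
    indeg = {t: 0 for t in tasks}
    for u, v in edges:
        succ[u].append(v)
        pred[v].append(u)
        indeg[v] += 1
    order = []
    queue = deque(t for t in tasks if indeg[t] == 0)
    while queue:                                 # topological order
        u = queue.popleft()
        order.append(u)
        for v in succ[u]:
            indeg[v] -= 1
            if indeg[v] == 0:
                queue.append(v)
    start = {t: 0 for t in tasks}
    for v in order:                              # earliest start times
        for u in pred[v]:
            comm = 0 if alloc[u] == alloc[v] else c[(u, v)]
            start[v] = max(start[v], start[u] + w[u] + comm)
    return max(start[v] + w[v] for v in tasks)
```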

Task Graph Scheduling. Fig. 6

An example commTG

There is a difficult trade-off between executing tasks in parallel (hence with several distinct processors) and minimizing communication costs. In the example, it turns out that the best solution is to use two processors, according to the schedule in Fig. 7, whose makespan is equal to 9. Using more processors does not always lead to a shorter execution time. Note that dependence constraints are satisfied in Fig. 7. For example, T 2 can start at time 1 on processor P 1 because this processor executes T 1, hence there is no need to pay the communication cost \(c({T}_{1},{T}_{2})\). By contrast, T 3 is executed on processor P 2, hence it cannot be started before time 2 even though P 2 is idle: σ(T 1) + w(T 1) + c(T 1, T 3) = 0 + 1 + 1 = 2.

Task Graph Scheduling. Fig. 7

An optimal schedule for the example

With unlimited processors, the optimization problem becomes difficult: Pb(∞) is NP-complete in the strong sense. Even the problem in which all task weights and communication costs have the same (unit) value, the so-called UET-UCT problem (unit execution time-unit communication time), is NP-hard [13].

With limited processors, list heuristics can be extended to take communication costs into account, but Graham’s bound does not hold any longer. For instance, the Modified Critical Path (MCP) algorithm proceeds as follows. First, bottom levels are computed using a pessimistic evaluation of the longest path, accounting for each potential communication (this corresponds to the allocation where there is a different processor per task). These bottom levels are used to determine the priority of free tasks. Then each free task is assigned to the processor that allows its earliest execution, given previous task allocation decisions. It is important to explain further what “previous task allocation decisions” means. Free tasks from the queue are processed one after the other. At any moment, it is known which processors are available and which ones are busy. Moreover, for the busy processors, it is known when they will finish computing their currently allocated tasks. Hence, it is always possible to select the processor that can begin executing the task soonest. It may well be the case that a currently busy processor is selected.

Extension to Heterogeneous Platforms

This section explains how to extend list scheduling techniques to heterogeneous platforms, that is, to platforms that consist of processors with different speeds and interconnection links with different bandwidths. Key differences with the homogeneous case are outlined.

Given a commTG with n tasks \(T_1, \ldots, T_n\), the goal is to schedule it on a platform with p heterogeneous processors \(P_1, \ldots, P_p\). There are now many parameters to instantiate:

  • Computation costs: The execution cost of \(T_i\) on \(P_q\) is modeled as \(w_{iq}\). Therefore, an n × p matrix of values is needed to specify all computation costs. This matrix comes directly from the specific scheduling problem at hand. However, when attempting to evaluate competing scheduling heuristics over a large number of synthetic scenarios, one must generate this matrix. One can distinguish two approaches. In the first approach, one generates a consistent (or uniform) matrix with \(w_{iq} = w_i \times \gamma_q\), where \(w_i\) represents the number of operations required by \(T_i\) and \(\gamma_q\) is the inverse of the speed of \(P_q\) (in operations per second). With this definition, the relative speed of the processors does not depend on the particular task they execute. If instead some processors are faster than others for some tasks but slower for other tasks, one speaks of an inconsistent (or nonuniform) matrix. This corresponds to the case in which some processors are specialized for some tasks (e.g., specialized hardware or software).

  • Communication costs: Just as processors have different speeds, communication links may have different bandwidths. However, while the speed of a processor may depend upon the nature of the computation it performs, the bandwidth of a link does not depend on the nature of the bytes it transmits. It is therefore natural to assume consistent (or uniform) links. If there is a dependence \(e_{ij} : T_i \rightarrow T_j\), and if \(T_i\) is executed on \(P_q\) and \(T_j\) on \(P_r\), then the communication time is modeled as

    $$\text{ comm}(i,j,q,r) = \text{ data}(i,j) \times {v}_{qr},$$

    where data(i, j) is the data volume associated to e ij and v qr is the communication time for a unit-size message from P q to P r (i.e., the inverse of the bandwidth). Like in the homogeneous case, let v qr = 0 if q = r, that is, if both tasks are assigned the same processor. If one wishes to generate synthetic scenarios to evaluate competing scheduling heuristics, one then must generate two matrices: one of size n ×n for data and one of size p ×p for v qr .

The main list scheduling principle is unchanged. As before, the priority of each task needs to be computed, so as to decide which one to execute first when there are more free tasks than available processors. The most natural idea is to compute averages of computation and communication times, and use these to compute priority levels exactly as in the homogeneous case:

  • \(\bar w_i = \frac{{\sum\nolimits_{q = 1}^p {w_{iq} } }}{p}\), the average execution time of T i .

  • \(\overline {{\mathop{\rm comm}\nolimits} _{ij} } = {\mathop{\rm data}\nolimits} (i,j) \times \frac{{\sum\nolimits_{1 \le q,r \le p,q \ne r} {v_{qr} } }}{{p(p - 1)}}\), the average communication cost for edge e ij : T i T j .

The last (but important) modification concerns the way in which tasks are assigned to processors: Instead of assigning the current task to the processor that will start its execution first (given all already taken decisions), one should assign it to the processor that will complete its execution first (given all already taken decisions). Both choices are equivalent with homogeneous processors, but intuitively the latter is likely to be more efficient in the heterogeneous case. Altogether, this leads to the list heuristic called HEFT, for Heterogeneous Earliest Finish Time [19].
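A compact sketch of a HEFT-like heuristic is given below. It follows the principle just described (ranking tasks by a bottom level computed with average costs, then assigning each task to the processor that finishes it earliest) but simplifies the algorithm of [19], in particular by omitting its insertion-based slot search. The data layout, with w[i][q], data[(i, j)], and v[q][r], is an assumption made for illustration, and at least two processors are assumed.

```python
def heft(tasks, edges, w, data, v, p):
    """Simplified HEFT-style heuristic (assumes p >= 2 processors).
    w[i][q]: execution time of task i on processor q; data[(i, j)]: data
    volume on edge (i, j); v[q][r]: time per data unit from P_q to P_r
    (v[q][q] = 0). Returns sigma (start times) and alloc (processors)."""
    succ = {t: [] for t in tasks}
    pred = {t: [] for t in tasks}
    for i, j in edges:
        succ[i].append(j)
        pred[j].append(i)
    w_avg = {i: sum(w[i]) / p for i in tasks}
    v_avg = sum(v[q][r] for q in range(p) for r in range(p) if q != r) / (p * (p - 1))
    c_avg = {e: data[e] * v_avg for e in edges}
    rank = {}                                   # "upward rank": bottom level with average costs
    def upward_rank(i):
        if i not in rank:
            rank[i] = w_avg[i] + max((c_avg[(i, j)] + upward_rank(j)
                                      for j in succ[i]), default=0.0)
        return rank[i]
    proc_free = [0.0] * p
    sigma, alloc, finish = {}, {}, {}
    for i in sorted(tasks, key=upward_rank, reverse=True):
        best = None                             # (finish_time, start_time, processor)
        for q in range(p):
            ready = max((finish[j] + (0 if alloc[j] == q
                                      else data[(j, i)] * v[alloc[j]][q])
                         for j in pred[i]), default=0.0)
            start = max(ready, proc_free[q])
            if best is None or start + w[i][q] < best[0]:
                best = (start + w[i][q], start, q)
        finish[i], sigma[i], alloc[i] = best
        proc_free[best[2]] = best[0]
    return sigma, alloc
```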

Workflow Scheduling

This section discusses workflow scheduling, that is, the problem of scheduling a (large) collection of identical task graphs rather than a single one. The main idea is to pipeline the execution of successive instances. Think of a sequence of video images that must be processed in a pipelined fashion: Each image enters the platform and follows the same processing chain, and a new image can enter the system while previous ones are still being executed. This section is intended to give a flavor of the optimization problems to be solved in such a context, and the discussion is restricted to simple problem instances.

Consider “chains,” that is, applications structured as a sequence of stages. Each stage corresponds to a different computational task. The application must process a large number of data sets, each of which must go through all stages. Each stage has its own communication and computation requirements: It reads an input from the previous stage, processes the data, and outputs a result to the next stage. Initial data are input to the first stage and final results are obtained as the output from the last stage. The pipeline operates in synchronous mode: After some initialization delay, a new task is completed every period. The period is defined as the longest “cycle-time” to operate a stage, and it is the inverse of the throughput that can be achieved.

For simplicity, it is assumed that each stage is assigned to a single processor, which is in charge of processing all instances (all data sets) for that stage. Each pipeline stage can be viewed as a sequential task that may write some global data structure, to disk or to memory, for each processed data set. In this case, tasks must always be processed in a sequential order within a stage. Moreover, due to possible local updates, each stage must be mapped onto a single processor. For a given stage, one cannot process half of the tasks on one processor and the remaining half on another without maintaining global information, which might be costly and difficult to implement. In other words, a processor that is assigned a stage will execute the operations required by this stage (input, computation, and output) for all the tasks fed into the pipeline.

Of course, other assumptions are possible: some stages could be replicated, or even data-parallelized. The reader is referred to the bibliographical notes at the end of this entry for such extensions.

Objective Functions

An important metric for parallel applications that consist of many individual computations is the throughput. The throughput measures the aggregate rate of data processing; it is the rate at which data sets can enter the system. Equivalently, the inverse of the throughput, defined as the period, is the time interval required between the beginning of the execution of two consecutive data sets. The period minimization problem can be stated informally as follows: Which stage should be assigned to which processor so that the largest cycle-time over all processors is kept minimal?

Another important metric is derived from makespan minimization, but it must be adapted. With a large number of data sets, the total execution time is less relevant, but the execution time for each data set remains important, in particular for real-time applications. One talks of latency rather than of makespan, in order to avoid confusion. The latency is the time elapsed between the beginning and the end of the execution of a given data set, hence it measures the response time of the system to process the data set entirely.

Minimizing the latency is antagonistic to maximizing the throughput. In fact, assigning all application stages to the fastest processor (thus working in a fully sequential way) would suppress all communications and accelerate computations, thereby minimizing the latency, but achieving a very bad throughput. Conversely, mapping each stage to a different processor is likely to decrease the period, hence increase the throughput (work in a fully pipelined manner), but the resulting latency will be high, because all interstage communications must be accounted for in this latter mapping. Trade-offs will have to be found between these criteria.

How should several objective functions be dealt with? In traditional approaches, one would form a linear combination of the different objectives and treat the result as the new objective to optimize. But it is not natural for the user to maximize a quantity like 0.7T + 0.3L, where T is the throughput and L the latency. Instead, one is more likely to fix a throughput T and to search for the best latency that can be achieved while enforcing T: A single criterion is optimized, under the condition that a threshold is enforced on the other one.

Period and Latency

Consider a pipeline with n stages \({\mathcal{S}}_{k}\), 1 ≤ k ≤ n, as illustrated in Fig. 8. Tasks are fed into the pipeline and processed from stage to stage, until they exit the pipeline after the last stage. The k-th stage \({\mathcal{S}}_{k}\) receives an input from the previous stage, of size \({\text{ b}}_{k-1}\), performs a number of \({\text{ w}}_{k}\) operations, and outputs data of size \({\text{ b}}_{k}\) to the next stage. The first stage \({\mathcal{S}}_{1}\) receives an initial input of size \({\text{ b}}_{0}\), while the last stage \({\mathcal{S}}_{n}\) returns a final result of size \({\text{ b}}_{n}\).

Task Graph Scheduling. Fig. 8

The application pipeline

The target platform is a clique with p processors \(P_u\), 1 ≤ u ≤ p, that are fully interconnected (see Fig. 9). There is a bidirectional link \(\text{link}_{u,v} : P_u \leftrightarrow P_v\) with bandwidth \(B_{u,v}\) between each pair of processors \(P_u\) and \(P_v\). The literature often enforces more realistic communication models for workflow scheduling than for Task Graph Scheduling. For the sake of simplicity, a very strict model is enforced here: A given processor can be involved in a single communication at any time unit, either a send or a receive. Note that independent communications between distinct processor pairs can take place simultaneously. Finally, there is no overlap between communications and computations, so that all the operations of a given processor are fully sequentialized. The speed of processor \(P_u\) is denoted as \(W_u\), and it takes \(X/W_u\) time units for \(P_u\) to execute X operations. It takes \(X/B_{u,v}\) time units to send (respectively, receive) a message of size X to (respectively, from) \(P_v\).

Task Graph Scheduling. Fig. 9

The target platform

The mapping problem consists in assigning application stages to processors. For one-to-one mappings, it is required that each stage \({\mathcal{S}}_{k}\) of the application pipeline be mapped onto a distinct processor \({P}_{\text{ alloc}(k)}\) (which is possible only if np). The function alloc associates a processor index to each stage index. For convenience, two fictitious stages \({\mathcal{S}}_{0}\) and \({\mathcal{S}}_{n+1}\) are created, assigning \({\mathcal{S}}_{0}\) to \({P}_{\text{ in}}\) and \({\mathcal{S}}_{n+1}\) to \({P}_{\text{ out}}\).

What is the period of \({P}_{\text{ alloc}(k)}\), that is, the minimum delay between the processing of two consecutive tasks? To answer this question, one needs to know to which processors the previous and next stages are assigned. Let t = alloc(k − 1), u = alloc(k), and v = alloc(k + 1). \(P_u\) needs \({\text{ b}}_{k-1}/{B}_{t,u}\) time units to receive the input data from \(P_t\), \({\text{ w}}_{k}/{W}_{u}\) time units to process it, and \({\text{ b}}_{k}/{B}_{u,v}\) time units to send the result to \(P_v\), hence a cycle-time of \({\text{ b}}_{k-1}/{B}_{t,u} + {\text{ w}}_{k}/{W}_{u} + {\text{ b}}_{k}/{B}_{u,v}\) time units for \(P_u\). These three steps are serialized (see Fig. 10 for an illustration). The period achieved with the mapping is the maximum of the cycle-times of the processors, which corresponds to the rate at which the pipeline can be activated.

Task Graph Scheduling. Fig. 10

An example of one-to-one mapping with three stages and three processors. Each processor periodically receives input data from its predecessor, performs some computation, and outputs data to its successor. Note that these operations are shifted in time from one processor to another. The cycle-time of P 1 and P 2 is 5 while that of P 3 is 4, hence T period = 5

In this simple instance, the optimization problem can be stated as follows: Determine a one-to-one allocation function alloc : [1, n] → [1, p] (augmented with alloc(0) = in and alloc(n + 1) = out) such that

$${T}_{\text{ period}} =\max\limits_{1\leq k\leq n}\left\{ \frac{{\text{ b}}_{k-1}}{{B}_{\text{ alloc}(k-1),\text{ alloc}(k)}} + \frac{{\text{ w}}_{k}}{{W}_{\text{ alloc}(k)}} + \frac{{\text{ b}}_{k}}{{B}_{\text{ alloc}(k),\text{ alloc}(k+1)}}\right\}$$

is minimized.
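With the data stored in Python dictionaries (an illustrative layout: b and w indexed by stage, W by processor, B by processor pair, and the fictitious boundary processors named "in" and "out"), the period of a one-to-one mapping can be evaluated directly.

```python
def period_one_to_one(b, w, W, B, alloc):
    """T_period of a one-to-one mapping of an n-stage pipeline.
    b[k]: output size of stage k (b[0] is the initial input), w[k]: work of
    stage k, W[u]: speed of processor u, B[(u, v)]: bandwidth between u and v,
    alloc[k]: processor of stage k, with alloc[0] = "in", alloc[n+1] = "out"."""
    n = len(w)
    return max(
        b[k - 1] / B[(alloc[k - 1], alloc[k])]      # receive from previous stage
        + w[k] / W[alloc[k]]                        # compute
        + b[k] / B[(alloc[k], alloc[k + 1])]        # send to next stage
        for k in range(1, n + 1)
    )
```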

Natural extensions are interval mappings, in which each participating processor is assigned an interval of consecutive stages. Note that when p < n interval mappings are mandatory. Intuitively, assigning several consecutive tasks to the same processor will increase its computational load, but will also decrease communication. The best interval mapping may turn out to be a one-to-one mapping, or instead may utilize only a very small number of fast computing processors interconnected by high-speed links. The optimization problem associated to interval mappings is formally expressed as follows. The intervals achieve a partition of the original set of stages \({\mathcal{S}}_{1}\) to \({\mathcal{S}}_{n}\). One searches for a partition of \([1,\ldots,n]\) into m intervals \({I}_{j} = [{d}_{j},{e}_{j}]\) such that \({d}_{j} \leq {e}_{j}\) for \(1 \leq j \leq m\), \({d}_{1} = 1\), \({d}_{j+1} = {e}_{j} + 1\) for \(1 \leq j \leq m - 1\), and e m = n. Recall that the function alloc: \([1,n] \rightarrow [1,p]\) associates a processor index to each stage index. In a one-to-one mapping, this function was a one-to-one assignment. In an interval mapping, for \(1 \leq j \leq m\), the whole interval \({I}_{j}\) is mapped onto the same processor \({P}_{\text{ alloc}({d}_{ j})}\), that is, for \({d}_{j} \leq i \leq {e}_{j}\), \(\text{ alloc}(i) = \text{ alloc}({d}_{j})\). Also, two intervals cannot be mapped to the same processor, that is, for \(1 \leq j,j' \leq m\), \(j\neq j'\), \(\text{ alloc}({d}_{j})\neq \text{ alloc}({d}_{j'})\). The period is expressed as

$${T}_{\text{ period}} =\max\limits_{1\leq j\leq m}\left\{\frac{{\text{ b}}_{{d}_{j}-1}}{{B}_{\text{ alloc}({d}_{j}-1),\text{ alloc}({d}_{j})}} + \frac{{\sum }_{i={d}_{j}}^{{e}_{j}}{\text{ w}}_{i}}{{W}_{\text{ alloc}({d}_{j})}} + \frac{{\text{ b}}_{{e}_{j}}}{{B}_{\text{ alloc}({d}_{j}),\text{ alloc}({e}_{j}+1)}}\right\}$$

Note that \(\text{ alloc}({d}_{j} - 1) = \text{ alloc}({e}_{j-1}) = \text{ alloc}({d}_{j-1})\) for j > 1 and d 1 − 1 = 0. Also, e j + 1 = d j + 1 for j < m, and e m + 1 = n + 1. It is still assumed that \(\text{ alloc}(0) = \text{ in}\) and \(\text{ alloc}(n + 1) = \text{ out}\). The optimization problem is then to determine the mapping that minimizes \({T}_{\text{ period}}\), over all possible partitions into intervals, and over all mappings of these intervals to the processors.

The latency of an interval mapping is computed as follows. Each data set traverses all stages, but only communications between two stages mapped on the same processors take zero time units. Overall, the latency is expressed as

$${T}_{\text{ latency}} ={ \sum }_{1\leq j\leq m}\left\{ \frac{{\text{ b}}_{{d}_{j}-1}}{{B}_{\text{ alloc}({d}_{j}-1),\text{ alloc}({d}_{j})}} + \frac{{\sum }_{i={d}_{j}}^{{e}_{j}}{\text{ w}}_{i}}{{W}_{\text{ alloc}({d}_{j})}} \right\} + \frac{{\text{ b}}_{n}}{{B}_{\text{ alloc}(n),\text{ alloc}(n+1)}}$$

The latency for a one-to-one mapping obeys the same formula (with the restriction that each interval has length 1). Just as for the period, there are two minimization problems for the latency, with one-to-one and interval mappings.
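Both formulas can be evaluated together for a given interval mapping. The sketch below reuses the dictionary layout of the previous sketch; the interval representation (a list of (d_j, e_j) pairs and a parallel list of processors) is an assumption made for illustration.

```python
def interval_period_latency(intervals, procs, b, w, W, B, n):
    """T_period and T_latency of an interval mapping. 'intervals' is a list
    of (d_j, e_j) pairs partitioning [1, n]; procs[j] is the processor of the
    j-th interval; "in" and "out" denote the fictitious boundary processors."""
    owner = {0: "in", n + 1: "out"}                 # stage index -> processor
    for (d, e), p in zip(intervals, procs):
        for i in range(d, e + 1):
            owner[i] = p
    cycle_times, latency = [], 0.0
    for (d, e), p in zip(intervals, procs):
        t_in = b[d - 1] / B[(owner[d - 1], p)]      # receive from previous interval
        t_cmp = sum(w[i] for i in range(d, e + 1)) / W[p]
        t_out = b[e] / B[(p, owner[e + 1])]         # send to next interval
        cycle_times.append(t_in + t_cmp + t_out)
        latency += t_in + t_cmp
    latency += b[n] / B[(owner[n], "out")]          # final output
    return max(cycle_times), latency
```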

It goes beyond the scope of this entry to assess the complexity of these period/latency optimization problems, and of their bi-criteria counterparts. The aim was to provide the reader with a quick primer on workflow scheduling, an activity that borrows several concepts from Task Graph Scheduling, while using more realistic platform models, and different objective functions.

Related Entries

Loop Pipelining

Modulo Scheduling and Loop Pipelining

Scheduling Algorithms

Recommended Reading

Without communication costs, pioneering work includes the book by Coffman [5]. The book by El-Rewini et al. [7] and the IEEE compilation of papers [15] provide additional material. On the theoretical side, Appendix A5 of Garey and Johnson [8] provides a list of NP-complete scheduling problems. Also, the book by Brucker [2] offers a comprehensive overview of many complexity results.

The literature with communication costs is more recent. See the survey paper by Chrétienne and Picouleau [4]. See also the book by Darte et al. [6], where many heuristics are surveyed. The book by Sinnen [16] provides a thorough discussion on communication models. In particular, it describes several extensions for modeling and accounting for communication contention.

Workflow scheduling is quite a hot topic with the advent of large-scale computing platforms. A few representative papers are [1, 11, 17, 18].

Modern scheduling encompasses a wide spectrum of techniques: divisible load scheduling, cyclic scheduling, steady-state scheduling, online scheduling, job scheduling, and so on. A comprehensive survey is available in the book [14]. See also the handbook [12].

Most of the material presented in this entry is excerpted from the book by Casanova et al. [3].