<p>This may not be the canonical approach, but the example shows several workers performing a series of tasks simultaneously and synchronizing with each other before starting the next task.</p>
<p>Each worker has several tasks to complete. In this example, the tasks are big sums. A worker can be idle during one of the tasks. The tasks are arranged so that we get a list of the first task of every worker, then a list of the second task of every worker, and so on; idle workers are skipped. The tasks are taken out of the Task data type and returned as plain values, so the notion of a worker vanishes at this point. Defining the tasks per worker still lets us keep track of what each worker actually performs.</p>
<p>The function "runTasks" takes one of those groups of tasks, executes them in parallel, and gathers the result of each task. It doesn't return until all tasks are finished, and it doesn't know what each worker is doing.</p>
<p>Once the first group of tasks is finished, a function is applied to the results (in the example, the results are simply added together).</p>
<p>Then the second group of tasks is passed to "runTasks", and so on.</p>
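<p>The round-by-round processing described above can be sketched in isolation. This is a minimal variant under stated assumptions, not the program's own "runTasks": the names runAll and processGroups are hypothetical, and par and pseq are imported from GHC.Conc so that the sketch only needs the base library (Control.Parallel exports functions with the same names). Each task in a group is sparked with par and then forced with pseq, so the group's results are evaluated before the combining function sees them.</p>

```haskell
import GHC.Conc (par, pseq)

-- Hypothetical helper: spark every task of one group, then force each
-- task to weak head normal form before handing the list back.
runAll :: [a] -> [a]
runAll xs = sparkAll xs `pseq` forceAll xs `pseq` xs
  where
    -- create one spark per element (a hint that it may run in parallel)
    sparkAll = foldr (\x acc -> x `par` acc) ()
    -- wait for every element; this is the checkpoint of the group
    forceAll = foldr (\x acc -> x `pseq` acc) ()

-- Hypothetical helper: run the groups one after another and apply the
-- combining function to the results of each group.
processGroups :: ([a] -> b) -> [[a]] -> [b]
processGroups combine = map (combine . runAll)
```

<p>For instance, processGroups sum [[1,2],[3],[4,5,6]] yields [3,3,15]. Forcing to weak head normal form is enough for numbers such as Integer; nested data structures would need a deeper evaluation.</p>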
<p>The workers can have any number of tasks, and their task lists can contain idle phases. That way, a worker doesn't need to join at the first task and may skip tasks. The function "groupTasks" takes care of idle states.</p>
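<p>To illustrate how tasks are grouped by round and how idle phases are skipped, here is a small sketch that swaps in a different technique: Maybe stands in for the Task type (Nothing plays the role of Idle) and the grouping is done with transpose from Data.List. The name groupByRound is hypothetical. Like the recursive "groupTasks" of the program, it stops at the first round in which no worker has anything to do.</p>

```haskell
import Data.List (transpose)
import Data.Maybe (catMaybes)

-- Hypothetical alternative to groupTasks: each worker is a list of
-- optional tasks, and Nothing marks an idle phase.
groupByRound :: [[Maybe a]] -> [[a]]
groupByRound =
    takeWhile (not . null)  -- stop at the first round without any work
  . map catMaybes           -- drop idle workers from each round
  . transpose               -- round i collects the i-th task of every worker
```

<p>With the workers [[Just 1, Just 2, Just 3], [Nothing, Just 20], [Just 100]], the result is [[1,100],[2,20],[3]]: the second worker is idle in the first round, and the third worker has only one task.</p>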
<li>A group of tasks must return values of the same type.</li>
<li>If each group of tasks should return values of a different type, you have to define groups of workers for each different task instead of defining workers with several tasks each.</li>
<li>More flexibility can be achieved with the use of custom data types.</li>
<li>Because the example uses parallel computation, only pure functions (without side effects) can be executed. Moreover, only a few Haskell compilers (GHC among them) support parallel computation. The program must be compiled with the -threaded and -rtsopts options and run with the +RTS -N command-line option for the computations to actually be performed in parallel.</li>
<li>For effectful computations, you should use concurrent threads (forkIO and MVar from the module Control.Concurrent), software transactional memory (STM) or alternatives provided by other modules.</li>
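<p>For the effectful case mentioned in the last item, a checkpoint can be sketched with forkIO and MVar. This is a minimal sketch under stated assumptions, not part of the program on this page: the names runRound and runRounds are hypothetical, every task is an IO action, and exceptions are not handled (a task that throws would leave the round waiting on its MVar).</p>

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM)

-- Hypothetical helper: start every action of one round in its own
-- thread, then block until each thread has delivered its result.
-- The results keep the order of the input list.
runRound :: [IO a] -> IO [a]
runRound actions = do
  boxes <- forM actions $ \act -> do
    box <- newEmptyMVar
    _ <- forkIO (act >>= putMVar box)
    return box
  -- taking from every MVar is the checkpoint: nothing continues
  -- until all workers of this round are done
  mapM takeMVar boxes

-- Hypothetical helper: run the rounds one after another and combine
-- the results of each round.
runRounds :: ([a] -> b) -> [[IO a]] -> IO [b]
runRounds combine = mapM (fmap combine . runRound)
```

<p>A call such as runRounds sum [[return 1, return 2], [return 3]] produces [3,3]; the second round is only started after both actions of the first round have finished.</p>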
<lang Haskell>import Control.Parallel
data Task a = Idle | Make a
type TaskList a = [a]
type Results a = [a]
type TaskGroups a = [TaskList a]
type WorkerList a = [Worker a]
type Worker a = [Task a]
-- run tasks in parallel and collect their results
-- the function doesn't return until all tasks are done, therefore
-- finished threads wait for the others to finish.
runTasks :: TaskList a -> Results a
runTasks [] = []
runTasks (x:[]) = x : []
runTasks (x:y:[]) = y `par` x : y : []
runTasks (x:y:ys) = y `par` x : y : runTasks ys
-- take a list of workers with different numbers of tasks and group
-- them: first the first task of each worker, then the second one etc.
groupTasks :: WorkerList a -> TaskGroups a
groupTasks [] = []
groupTasks xs
    | allWorkersIdle xs = []
    | otherwise =
        concatMap extractTask xs : groupTasks (map removeTask xs)
-- return a task as a plain value
extractTask :: Worker a -> [a]
extractTask [] = []
extractTask (Idle:_) = []
extractTask (Make a:_) = [a]
-- remove the foremost task of each worker
removeTask :: Worker a -> Worker a
removeTask = drop 1
-- checks whether all workers are idle in this task
allWorkersIdle :: WorkerList a -> Bool
allWorkersIdle = all null . map extractTask
-- the workers must calculate big sums. the first sum of each worker
-- belongs to the first task, and so on.
-- because of laziness, nothing is computed yet.
-- worker1 has 5 tasks to do
worker1 :: Worker Integer
worker1 = map Make [ sum [1..n*1000000] | n <- [1..5] ]
-- worker2 has 4 tasks to do
worker2 :: Worker Integer
worker2 = map Make [ sum [1..n*100000] | n <- [1..4] ]
-- worker3 has 3 tasks to do
worker3 :: Worker Integer
worker3 = map Make [ sum [1..n*1000000] | n <- [1..3] ]
-- worker4 has 5 tasks to do
worker4 :: Worker Integer
worker4 = map Make [ sum [1..n*300000] | n <- [1..5] ]
-- worker5 has 4 tasks to do, but starts at the second task.
worker5 :: Worker Integer
worker5 = [Idle] ++ map Make [ sum [1..n*400000] | n <- [1..4] ]
-- group the workers' tasks
tasks :: TaskGroups Integer
tasks = groupTasks [worker1, worker2, worker3, worker4, worker5]
-- a workshop: take a function that combines the results and the groups of
-- tasks, run each group in turn, report the progress and print the combined result
workshop :: (Show a, Num a, Show b, Num b) => ([a] -> b) -> [[a]] -> IO ()
workshop func a = mapM_ doWork $ zip [1..length a] a
    where
        -- x is the number of the task group, y the tasks of that group
        doWork (x, y) = do
            putStrLn $ "Doing task " ++ show x ++ "."
            putStrLn $ "There are " ++ show (length y) ++ " workers for this task."
            putStrLn "Waiting for all workers..."
            print $ func $ runTasks y
            putStrLn $ "Task " ++ show x ++ " done."
main = workshop sum tasks