Checkpoint synchronization: Difference between revisions

Content deleted Content added
Removed version with concurrency because it wasn't working correctly
Added Haskell version with concurrency, including workers joining in and leaving the workshop.
Line 633:
main = workshop sum tasks
</lang>
<p>The following version works with the concurrency model provided by the module Control.Concurrent</p>
<p>A workshop is an MVar that holds three values: the number of workers doing something, the number of workers ready for the next task and the total number of workers at the moment.</p>
<p>A worker takes a list of actions. Before executing the actions, he joins the workshop and the total number of workers is increased. Then, he reports that he has started an action and the number of active workers is increased. Next, the worker carries out an action. After that, he reports that he is ready for the next action. The number of active and ready workers is updated. Then he enters the check point loop, where he stays until all other workers have reported being ready. Then he goes into active state again and executes the next action, and so on. After the last action, he leaves the workshop and the total number of workers is decreased.</p>
<p>The checkPoint function keeps reading the values of the MVar and looping until there are 0 or less active workers and the number of ready workers is equal to the total number of workers. At this point, the MVar is reset. In order to avoid race conditions, if there are zero active and ready workers, the function returns immediately as to allow the worker to start an action.</p>
<p>The example code prints some useful messages to the screen, such as when a check point is reached and what each worker is currently doing (it also shows his thread ID).</p>
<p>The "shop" function forks the worker threads and returns their ID's, so the threads can be killed easily from the "main" function when the user hits a key.</p>
<p>The "main" function launches three workers first, and after 5 seconds it launches two workers more. It then waits for a key press and kills all threads, if they're still active.</p>
<p>Other than the parallel version above, this code runs in the IO Monad and makes it possible to perform IO actions such as accessing the hardware. However, all actions must have the return type IO (). If the workers must return some useful values, the MVar should be extended with the necessary fields and the workers should use those fields to store the results they produce.</p>
<p>Note: This code has been tested on GHC 7.6.1 and will most probably not run under other Haskell implementations due to the use of some functions from the module Control.Concurrent. If run from GHCi or without using the -threaded compiler switch, the threads won't run in parallel.</p>
<lang Haskell>import Control.Concurrent
import Control.Monad -- needed for "forM", "forM_"
 
-- (workers working, workers done, workers total)
type Workshop = MVar (Int, Int, Int)
-- list of IO actions to be performed by one worker
type Actions = [IO ()]
 
newWorkshop :: IO Workshop
newWorkshop = newMVar (0, 0, 0)
 
-- check point: workers wait here for the other workers to
-- finish, before resuming execution/restarting
checkPoint :: Workshop -> IO ()
checkPoint w = do
(working, done, count) <- takeMVar w
-- all workers are done: reset counters and return (threads
-- resume execution or restart)
if working <= 0 && done == count
then do
putStrLn "---- Check Point"
putMVar w (0, 0, count)
-- mvar was just initialized: do nothing, just return.
-- otherwise, a race condition may arise
else if working == 0 && done == 0
then putMVar w (working, done, count)
-- workers are still working: wait for them (loop)
else do
putMVar w (working, done, count)
checkPoint w
 
-- join the workshop
addWorker :: Workshop -> ThreadId -> IO ()
addWorker w i = do
(working, done, count) <- takeMVar w
putStrLn $ "Worker " ++ show i ++ " has joined the group."
putMVar w (working, done, count + 1)
 
-- leave the workshop
removeWorker :: Workshop -> ThreadId -> IO ()
removeWorker w i = do
(working, done, count) <- takeMVar w
putStrLn $ "Worker " ++ show i ++ " has left the group."
putMVar w (working, done, count - 1)
-- increase the number of workers doing something.
-- optionally, print a message using the thread's ID
startWork :: Workshop -> ThreadId -> IO ()
startWork w i = do
(working, done, count) <- takeMVar w
putStrLn $ "Worker " ++ show i ++ " has started."
putMVar w (working + 1, done, count)
 
-- decrease the number of workers doing something and increase the
-- number of workers done. optionally, print a message using
-- the thread's ID
finishWork :: Workshop -> ThreadId -> IO ()
finishWork w i = do
(working, done, count) <- takeMVar w
putStrLn $ "Worker " ++ show i ++ " is ready."
putMVar w (working - 1, done + 1, count)
 
-- put a worker to do his tasks. the steps are:
-- 1. join the workshop "w"
-- 2. report that the worker has started an action
-- 3. perform one action
-- 4. report that the worker is ready for the next action
-- 5. wait for the other workers to finish
-- 6. repeat from 2 until the worker has nothing more to do
-- 7. leave the workshop
worker :: Workshop -> Actions -> IO ()
worker w actions = do
i <- myThreadId
addWorker w i
forM_ actions $ \action -> do
startWork w i
action
finishWork w i
checkPoint w
removeWorker w i
 
-- launch several worker threads. their thread ID's are returned
shop :: Workshop -> [Actions] -> IO [ThreadId]
shop w actions = do
forM actions $ \x -> forkIO (worker w x)
 
main = do
-- make a workshop
w <- newWorkshop
-- the workers won't be doing anything special, just wait for n
-- regular intervals. pids gathers the ID's of the threads
-- this are the first workers joining the workshop
pids1 <- shop w
[replicate 5 $ threadDelay 1300000
,replicate 10 $ threadDelay 759191
,replicate 7 $ threadDelay 965300]
-- wait for 5 secs before the next workers join
threadDelay 5000000
-- these are other workers that join the workshop later
pids2 <- shop w
[replicate 6 $ threadDelay 380000
,replicate 4 $ threadDelay 250000]
-- wait for a key press
getChar
-- kill all worker threads before exit, if they're still running
forM_ (pids1 ++ pids2) killThread</lang>
'''Output:'''
<pre style="height: 200px;overflow:scroll">
Worker ThreadId 30 has joined the group.
Worker ThreadId 31 has joined the group.
Worker ThreadId 32 has joined the group.
Worker ThreadId 30 has started.
Worker ThreadId 31 has started.
Worker ThreadId 32 has started.
Worker ThreadId 31 is ready.
Worker ThreadId 32 is ready.
Worker ThreadId 30 is ready.
---- Check Point
Worker ThreadId 32 has started.
Worker ThreadId 31 has started.
Worker ThreadId 30 has started.
Worker ThreadId 31 is ready.
Worker ThreadId 32 is ready.
Worker ThreadId 30 is ready.
---- Check Point
Worker ThreadId 32 has started.
Worker ThreadId 31 has started.
Worker ThreadId 30 has started.
Worker ThreadId 31 is ready.
Worker ThreadId 32 is ready.
Worker ThreadId 30 is ready.
---- Check Point
Worker ThreadId 32 has started.
Worker ThreadId 31 has started.
Worker ThreadId 30 has started.
Worker ThreadId 31 is ready.
Worker ThreadId 32 is ready.
Worker ThreadId 33 has joined the group.
Worker ThreadId 34 has joined the group.
Worker ThreadId 33 has started.
Worker ThreadId 34 has started.
Worker ThreadId 30 is ready.
Worker ThreadId 34 is ready.
Worker ThreadId 33 is ready.
---- Check Point
Worker ThreadId 32 has started.
Worker ThreadId 34 has started.
Worker ThreadId 31 has started.
Worker ThreadId 30 has started.
Worker ThreadId 33 has started.
Worker ThreadId 34 is ready.
Worker ThreadId 33 is ready.
Worker ThreadId 31 is ready.
Worker ThreadId 32 is ready.
Worker ThreadId 30 is ready.
---- Check Point
Worker ThreadId 31 has started.
Worker ThreadId 32 has started.
Worker ThreadId 34 has started.
Worker ThreadId 33 has started.
Worker ThreadId 30 has left the group.
Worker ThreadId 34 is ready.
Worker ThreadId 33 is ready.
Worker ThreadId 31 is ready.
Worker ThreadId 32 is ready.
---- Check Point
Worker ThreadId 34 has started.
Worker ThreadId 33 has started.
Worker ThreadId 31 has started.
Worker ThreadId 32 has started.
Worker ThreadId 34 is ready.
Worker ThreadId 33 is ready.
Worker ThreadId 31 is ready.
Worker ThreadId 32 is ready.
---- Check Point
Worker ThreadId 31 has started.
Worker ThreadId 33 has started.
Worker ThreadId 34 has left the group.
Worker ThreadId 32 has left the group.
Worker ThreadId 33 is ready.
Worker ThreadId 31 is ready.
---- Check Point
Worker ThreadId 33 has started.
Worker ThreadId 31 has started.
Worker ThreadId 33 is ready.
Worker ThreadId 31 is ready.
---- Check Point
Worker ThreadId 33 has left the group.
Worker ThreadId 31 has started.
Worker ThreadId 31 is ready.
---- Check Point
Worker ThreadId 31 has left the group.
</pre>
 
=={{header|J}}==