Checkpoint synchronization: Difference between revisions

From Rosetta Code
Content added Content deleted
No edit summary
Line 187: Line 187:
D ends shift
D ends shift
</pre>
</pre>

=={{header|C}}==
Using OpenMP. Compiled with <code>gcc -Wall -fopenmp</code>.
<lang C>#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <omp.h>

int main()
{
int jobs = 41, tid;
omp_set_num_threads(5);

#pragma omp parallel shared(jobs) private(tid)
{
tid = omp_get_thread_num();
while (jobs > 0) {
/* this is the checkpoint */
#pragma omp barrier
if (!jobs) break;

printf("%d: taking job %d\n", tid, jobs--);
usleep(100000 + rand() / (double) RAND_MAX * 3000000);
printf("%d: done job\n", tid);
}

printf("[%d] leaving\n", tid);

/* this stops jobless thread from exiting early and killing workers */
#pragma omp barrier
}

return 0;
}</lang>


=={{header|E}}==
=={{header|E}}==
Line 308: Line 342:
}
}
interp.waitAtTop(promiseAllFulfilled(waits))</lang>
interp.waitAtTop(promiseAllFulfilled(waits))</lang>

=={{header|Go}}==
=={{header|Go}}==
As of February 2011, Go has checkpoint synchronization in the standard library, with a type called WaitGroup in the package sync. Code below uses this feature and completes the task with the workshop scenario, including workers joining and leaving.
As of February 2011, Go has checkpoint synchronization in the standard library, with a type called WaitGroup in the package sync. Code below uses this feature and completes the task with the workshop scenario, including workers joining and leaving.

Revision as of 22:10, 18 June 2011

Task
Checkpoint synchronization
You are encouraged to solve this task according to the task description, using any language you may know.

The checkpoint synchronization is a problem of synchronizing multiple tasks. Consider a workshop where several workers (tasks) assembly details of some mechanism. When each of them completes his work they put the details together. There is no store, so a worker who finished its part first must wait for others before starting another one. Putting details together is the checkpoint at which tasks synchronize themselves before going their paths apart.

The task

Implement checkpoint synchronization in your language.

Make sure that the solution is race condition-free. Note that a straightforward solution based on events is exposed to race condition. Let two tasks A and B need to be synchronized at a checkpoint. Each signals its event (EA and EB correspondingly), then waits for the AND-combination of the events (EA&EB) and resets its event. Consider the following scenario: A signals EA first and gets blocked waiting for EA&EB. Then B signals EB and loses the processor. Then A is released (both events are signaled) and resets EA. Now if B returns and enters waiting for EA&EB, it gets lost.

When a worker is ready it shall not continue before others finish. A typical implementation bug is when a worker is counted twice within one working cycle causing its premature completion. This happens when the quickest worker serves its cycle two times while the laziest one is lagging behind.

If you can, implement workers joining and leaving.

Ada

<lang Ada>with Ada.Calendar; use Ada.Calendar; with Ada.Numerics.Float_Random; with Ada.Text_IO; use Ada.Text_IO;

procedure Test_Checkpoint is

  package FR renames Ada.Numerics.Float_Random;
  No_Of_Cubicles: constant Positive := 3;
    -- That many workers can work in parallel
  No_Of_Workers: constant Positive := 6;
    -- That many workers are potentially available
    -- some will join the team when others quit the job
  type Activity_Array is array(Character) of Boolean;
    -- we want to know who is currently working
  protected Checkpoint is
     entry Deliver;
     entry Join (Label : out Character; Tolerance: out Float);
     entry Leave(Label : in Character);
  private
     Signaling     : Boolean   := False;
     Ready_Count   : Natural   := 0;
     Worker_Count  : Natural   := 0;
     Unused_Label  : Character := 'A';
     Likelyhood_To_Quit: Float := 1.0;
     Active        : Activity_Array := (others => false);
     entry Lodge;
  end Checkpoint;
  protected body Checkpoint is
     entry Join (Label : out Character; Tolerance: out Float)
     when not Signaling and Worker_Count < No_Of_Cubicles is
     begin
        Label        := Unused_Label;
        Active(Label):= True;
        Unused_Label := Character'Succ (Unused_Label);
        Worker_Count := Worker_Count + 1;
        Likelyhood_To_Quit := Likelyhood_To_Quit / 2.0;
        Tolerance    := Likelyhood_To_Quit;
     end Join;
     entry Leave(Label: in Character) when not Signaling is
     begin
        Worker_Count  := Worker_Count - 1;
        Active(Label) := False;
     end Leave;
     entry Deliver when not Signaling is
     begin
        Ready_Count := Ready_Count + 1;
        requeue Lodge;
     end Deliver;
     entry Lodge when Ready_Count = Worker_Count or Signaling is
     begin
        if Ready_Count = Worker_Count then
           Put("---Sync Point [");
           for C in Character loop
              if Active(C) then
                 Put(C);
              end if;
           end loop;
           Put_Line("]---");
        end if;
        Ready_Count := Ready_Count - 1;
        Signaling   := Ready_Count /= 0;
     end Lodge;
  end Checkpoint;
  task type Worker;
  task body Worker is
     Dice      : FR.Generator;
     Label     : Character;
     Tolerance : Float;
     Shift_End : Time := Clock + 2.0;
       -- Trade unions are hard!
  begin
     FR.Reset (Dice);
     Checkpoint.Join (Label, Tolerance);
     Put_Line(Label & " joins the team");
     loop
        Put_Line (Label & " is working");
        delay Duration (FR.Random (Dice) * 0.500);
        Put_Line (Label & " is ready");
        Checkpoint.Deliver;
        if FR.Random(Dice) < Tolerance then
           Put_Line(Label & " leaves the team");
           exit;
        elsif Clock >= Shift_End then
           Put_Line(Label & " ends shift");
           exit;
        end if;
     end loop;
     Checkpoint.Leave(Label);
  end Worker;
  Set : array (1..No_Of_Workers) of Worker;

begin

  null; -- Nothing to do here

end Test_Checkpoint;

</lang> Sample output:

A joins the team
A is working
B joins the team
B is working
C joins the team
C is working
B is ready
C is ready
A is ready
---Sync Point [ABC]---
A is working
C is working
B is working
C is ready
B is ready
A is ready
---Sync Point [ABC]---
A is working
B is working
C is working
B is ready
C is ready
A is ready
---Sync Point [ABC]---
A leaves the team
D joins the team
D is working
C is working
B is working
C is ready
B is ready
D is ready
---Sync Point [BCD]---
D is working
C is working
B is working
C is ready
B is ready
D is ready
---Sync Point [BCD]---
D is working
C is working
B is working
D is ready
B is ready
C is ready
---Sync Point [BCD]---
C leaves the team
E joins the team
E is working
D is working
B leaves the team
F joins the team
F is working
D is ready
E is ready
F is ready
---Sync Point [DEF]---
D is working
F is working
E is working
D is ready
F is ready
E is ready
---Sync Point [DEF]---
E ends shift
F ends shift
D ends shift

C

Using OpenMP. Compiled with gcc -Wall -fopenmp. <lang C>#include <stdio.h>

  1. include <stdlib.h>
  2. include <unistd.h>
  3. include <omp.h>

int main() {

       int jobs = 41, tid;
       omp_set_num_threads(5);
       #pragma omp parallel shared(jobs) private(tid)
       {
               tid = omp_get_thread_num();
               while (jobs > 0) {
                       /* this is the checkpoint */
                       #pragma omp barrier
                       if (!jobs) break;
                       printf("%d: taking job %d\n", tid, jobs--);
                       usleep(100000 + rand() / (double) RAND_MAX * 3000000);
                       printf("%d: done job\n", tid);
               }
               printf("[%d] leaving\n", tid);
               /* this stops jobless thread from exiting early and killing workers */
               #pragma omp barrier
       }
       return 0;

}</lang>

E

The problem as stated is somewhat unnatural in E. We would prefer to define the control flow in association with the data flow; for example, such that the workers return values that are combined at the checkpoint; the availability of that result value naturally defines when the workers should proceed with the next round.

That said, here is an implementation of the task as stated. We start by defining a 'flag set' data structure (which is hopefully also useful for other problems), which allows us to express the checkpoint algorithm straightforwardly while being protected against the possibility of a task calling deliver or leave too many times. Note also that each task gets its own reference denoting its membership in the checkpoint group; thus it can only speak for itself and not break any global invariants.

<lang e>/** A flagSet solves this problem: There are N things, each in a true or false

 * state, and we want to know whether they are all true (or all false), and be
 * able to bulk-change all of them, and all this without allowing double-
 * counting -- setting a flag twice is idempotent.
 */

def makeFlagSet() {

 # Each flag object is either in the true set or the false set.
 def trues := [].asSet().diverge()
 def falses := [].asSet().diverge()
 return def flagSet {
   /** Add a flag to the set. */
   to join() {
     def flag {
       /** Get the value of this flag. */
       to get() :boolean {
         
       }
       /** Set the value of this flag. */
       to put(v :boolean) {
         def [del,add] := if (v) { [falses,trues] } else { [trues,falses] }
         if (del.contains(flag)) {
           del.remove(flag)
           add.addElement(flag)
         }
       }
       /** Remove this flag from the set. */
       to leave() :void {
         trues.remove(flag)
         falses.remove(flag)
       }
     }
     falses.addElement(flag)
     return flag
   }
   /** Are all the flags true (none false)? */
   to allTrue() { return falses.size().isZero() }
   /** Are all the flags false (none true)? */
   to allFalse() { return trues.size().isZero() }
   /** Set all the flags to the same value. */
   to setAll(v :boolean) {
     def [del,add] := if (v) { [falses,trues] } else { [trues,falses] }
     add.addAll(del)
     del.removeAll(del)
   }
 }

}

def makeCheckpoint() {

 def [var continueSignal, var continueRes] := Ref.promise()
 def readies := makeFlagSet()
 
 /** Check whether all tasks have reached the checkpoint, and if so send the
   * signal and go to the next round. */
 def check() {
   if (readies.allTrue()) {
     readies.setAll(false)
     
     continueRes.resolve(null)    # send the continue signal
     
     def [p, r] := Ref.promise()  # prepare a new continue signal
     continueSignal := p
     continueRes := r
   }
 }
 
 return def checkpoint {
   to join() {
     def &flag := readies.join()
     return def membership {
       to leave() {
         (&flag).leave()
         check <- ()
       }
       to deliver() {
         flag := true
         check <- ()
         return continueSignal
       }
     }
   }
 }

}

def makeWorker(piece, checkpoint) {

 def stops := timer.now() + 3000 + entropy.nextInt(2000)
 var count := 0
 def checkpointMember := checkpoint <- join()
 def stopped
 def run() {
   # Pretend to do something lengthy; up to 1000 ms.
   timer.whenPast(timer.now() + entropy.nextInt(1000), fn {
     if (timer.now() >= stops) {
       checkpointMember <- leave()
       bind stopped := true
     } else {
       count += 1
       println(`Delivering $piece#$count`)
       when (checkpointMember <- deliver()) -> {
         println(`Delivered $piece#$count`)
         run()
       }
     }
   })
 }
 run()
 return stopped

}

def checkpoint := makeCheckpoint() var waits := [] for piece in 1..5 {

 waits with= makeWorker(piece, checkpoint)

} interp.waitAtTop(promiseAllFulfilled(waits))</lang>

Go

As of February 2011, Go has checkpoint synchronization in the standard library, with a type called WaitGroup in the package sync. Code below uses this feature and completes the task with the workshop scenario, including workers joining and leaving.

Also see the Go solution(s) to concurrent computing That is a much simpler task, and shown there are two different implementations of checkpoint synchronization. <lang go>package main

import (

   "log"
   "os"
   "rand"
   "sync"
   "time"

)

const nMech = 5 const detailsPerMech = 4

var l = log.New(os.Stdout, "", 0)

func main() {

   assemble := make(chan int)
   var complete sync.WaitGroup
   go solicit(assemble, &complete, nMech*detailsPerMech)
   for i := 1; i <= nMech; i++ {
       complete.Add(detailsPerMech)
       for j := 0; j < detailsPerMech; j++ {
           assemble <- 0
       }
       // Go checkpoint feature
       complete.Wait()
       // checkpoint reached
       l.Println("mechanism", i, "completed")
   }

}

func solicit(a chan int, c *sync.WaitGroup, nDetails int) {

   rand.Seed(time.Nanoseconds())
   var id int // worker id, for output
   for nDetails > 0 {
       time.Sleep(5e8 + rand.Int63n(5e8)) // some random time to find a worker
       id++
       // contract to assemble a certain number of details
       contract := rand.Intn(5) + 1
       if contract > nDetails {
           contract = nDetails
       }
       dword := "details"
       if contract == 1 {
           dword = "detail"
       }
       l.Println("worker", id, "contracted to assemble", contract, dword)
       go worker(a, c, contract, id)
       nDetails -= contract
   }

}

func worker(a chan int, c *sync.WaitGroup, contract, id int) {

   // some random time it takes for this worker to assemble a detail
   assemblyTime := 5e8 + rand.Int63n(5e8)
   l.Println("worker", id, "enters shop")
   for i := 0; i < contract; i++ {
       <-a
       l.Println("worker", id, "assembling")
       time.Sleep(assemblyTime)
       l.Println("worker", id, "completed detail")
       c.Done()
   }
   l.Println("worker", id, "leaves shop")

}</lang> Output:

worker 1 contracted to assemble 2 details
worker 1 enters shop
worker 1 assembling
worker 2 contracted to assemble 5 details
worker 2 enters shop
worker 2 assembling
worker 1 completed detail
worker 1 assembling
worker 2 completed detail
worker 2 assembling
worker 3 contracted to assemble 1 detail
worker 3 enters shop
worker 1 completed detail
worker 1 leaves shop
worker 2 completed detail
mechanism 1 completed
worker 3 assembling
worker 2 assembling

...

worker 5 completed detail
worker 7 completed detail
worker 7 leaves shop
mechanism 4 completed
worker 6 assembling
worker 5 assembling
worker 6 completed detail
worker 6 assembling
worker 5 completed detail
worker 5 leaves shop
worker 6 completed detail
worker 6 assembling
worker 6 completed detail
worker 6 leaves shop
mechanism 5 completed

J

The current implementations of J are all single threaded. However, the language definition offers a lot of parallelism which I imagine will eventually be supported, once performance gains significantly better than a factor of 2 on common problems become economically viable.

For example in 1 2 3 + 4 5 6, we have three addition operations which are specified to be carried out in parallel, and this kind of parallelism pervades the language definition.

Java

<lang Java>import java.util.Scanner; import java.util.Random;

public class CheckpointSync{ public static void main(String[] args){ System.out.print("Enter number of workers to use: "); Scanner in = new Scanner(System.in); Worker.nWorkers = in.nextInt(); System.out.print("Enter number of tasks to complete:"); runTasks(in.nextInt()); }

/* * Informs that workers started working on the task and * starts running threads. Prior to proceeding with next * task syncs using static Worker.checkpoint() method. */ private static void runTasks(int nTasks){ for(int i = 0; i < nTasks; i++){ System.out.println("Starting task number " + (i+1) + "."); runThreads(); Worker.checkpoint(); } }

/* * Creates a thread for each worker and runs it. */ private static void runThreads(){ for(int i = 0; i < Worker.nWorkers; i ++){ new Thread(new Worker(i+1)).start(); } }

/* * Worker inner static class. */ public static class Worker implements Runnable{ public Worker(int threadID){ this.threadID = threadID; } public void run(){ work(); }

/* * Notifies that thread started running for 100 to 1000 msec. * Once finished increments static counter 'nFinished' * that counts number of workers finished their work. */ private synchronized void work(){ try { int workTime = rgen.nextInt(900) + 100; System.out.println("Worker " + threadID + " will work for " + workTime + " msec."); Thread.sleep(workTime); //work for 'workTime' nFinished++; //increases work finished counter System.out.println("Worker " + threadID + " is ready"); } catch (InterruptedException e) { System.err.println("Error: thread execution interrupted"); e.printStackTrace(); } }

/* * Used to synchronize Worker threads using 'nFinished' static integer. * Waits (with step of 10 msec) until 'nFinished' equals to 'nWorkers'. * Once they are equal resets 'nFinished' counter. */ public static synchronized void checkpoint(){ while(nFinished != nWorkers){ try { Thread.sleep(10); } catch (InterruptedException e) { System.err.println("Error: thread execution interrupted"); e.printStackTrace(); } } nFinished = 0; }

/* inner class instance variables */ private int threadID;

/* static variables */ private static Random rgen = new Random(); private static int nFinished = 0; public static int nWorkers = 0; } }</lang> Output:

Enter number of workers to use: 5
Enter number of tasks to complete:3
Starting task number 1.
Worker 1 will work for 882 msec.
Worker 2 will work for 330 msec.
Worker 3 will work for 618 msec.
Worker 4 will work for 949 msec.
Worker 5 will work for 805 msec.
Worker 2 is ready
Worker 3 is ready
Worker 5 is ready
Worker 1 is ready
Worker 4 is ready
Starting task number 2.
Worker 1 will work for 942 msec.
Worker 2 will work for 247 msec.
Worker 3 will work for 545 msec.
Worker 4 will work for 850 msec.
Worker 5 will work for 888 msec.
Worker 2 is ready
Worker 3 is ready
Worker 4 is ready
Worker 5 is ready
Worker 1 is ready
Starting task number 3.
Worker 2 will work for 976 msec.
Worker 1 will work for 194 msec.
Worker 4 will work for 532 msec.
Worker 3 will work for 515 msec.
Worker 5 will work for 326 msec.
Worker 1 is ready
Worker 5 is ready
Worker 3 is ready
Worker 4 is ready
Worker 2 is ready
Works with: Java version 1.5+

<lang java5>import java.util.Random; import java.util.concurrent.CountDownLatch;

public class Sync { static class Worker implements Runnable { private final CountDownLatch doneSignal; private int threadID;

public Worker(int id, CountDownLatch doneSignal) { this.doneSignal = doneSignal; threadID = id; }

public void run() { doWork(); doneSignal.countDown(); }

void doWork() { try { int workTime = new Random().nextInt(900) + 100; System.out.println("Worker " + threadID + " will work for " + workTime + " msec."); Thread.sleep(workTime); //work for 'workTime' System.out.println("Worker " + threadID + " is ready"); } catch (InterruptedException e) { System.err.println("Error: thread execution interrupted"); e.printStackTrace(); } } }

public static void main(String[] args) { int n = 3;//6 workers and 3 tasks for(int task = 1; task <= n; task++) { CountDownLatch latch = new CountDownLatch(n * 2); System.out.println("Starting task " + task); for(int worker = 0; worker < n * 2; worker++) { new Thread(new Worker(worker, latch)).start(); } try { latch.await();//wait for n*2 threads to signal the latch } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Task " + task + " complete"); } } }</lang> Output:

Starting task 1
Worker 0 will work for 959 msec.
Worker 1 will work for 905 msec.
Worker 3 will work for 622 msec.
Worker 2 will work for 969 msec.
Worker 4 will work for 577 msec.
Worker 5 will work for 727 msec.
Worker 4 is ready
Worker 3 is ready
Worker 5 is ready
Worker 1 is ready
Worker 0 is ready
Worker 2 is ready
Task 1 complete
Starting task 2
Worker 0 will work for 305 msec.
Worker 2 will work for 541 msec.
Worker 4 will work for 663 msec.
Worker 1 will work for 883 msec.
Worker 3 will work for 324 msec.
Worker 5 will work for 459 msec.
Worker 0 is ready
Worker 3 is ready
Worker 5 is ready
Worker 2 is ready
Worker 4 is ready
Worker 1 is ready
Task 2 complete
Starting task 3
Worker 0 will work for 554 msec.
Worker 2 will work for 727 msec.
Worker 1 will work for 203 msec.
Worker 4 will work for 249 msec.
Worker 3 will work for 612 msec.
Worker 5 will work for 723 msec.
Worker 1 is ready
Worker 4 is ready
Worker 0 is ready
Worker 3 is ready
Worker 5 is ready
Worker 2 is ready
Task 3 complete

Perl

The perlipc man page details several approaches to interprocess communication. Here's one of my favourites: socketpair and fork. I've omitted some error-checking for brevity.

<lang perl>#!/usr/bin/perl use warnings; use strict; use v5.10;

use Socket;

my $nr_items = 3;

sub short_sleep($) {

   (my $seconds) = @_;
   select undef, undef, undef, $seconds;

}

  1. This is run in a worker thread. It repeatedly waits for a character from
  2. the main thread, and sends a value back to the main thread. A short
  3. sleep introduces random timing, just to keep us honest.

sub be_worker($$) {

   my ($socket, $value) = @_;
   for (1 .. $nr_items) {
       sysread $socket, my $dummy, 1;
       short_sleep rand 0.5;
       syswrite $socket, $value;
       ++$value;
   }
   exit;

}

  1. This function forks a worker and sends it a socket on which to talk to
  2. the main thread, as well as an initial value to work with. It returns
  3. (to the main thread) a socket on which to talk to the worker.

sub fork_worker($) {

   (my $value) = @_;
   socketpair my $kidsock, my $dadsock, AF_UNIX, SOCK_STREAM, PF_UNSPEC
       or die "socketpair: $!";
   if (fork // die "fork: $!") {
       # We're the parent
       close $dadsock;
       return $kidsock;
   }
   else {
       # We're the child
       close $kidsock;
       be_worker $dadsock, $value;
       # Never returns
   }

}

  1. Fork two workers, send them start signals, retrieve the values they send
  2. back, and print them

my $alpha_sock = fork_worker 'A'; my $digit_sock = fork_worker 1;

for (1 .. $nr_items) {

   syswrite $_, 'x'   for $alpha_sock, $digit_sock;
   sysread $alpha_sock, my $alpha, 1;
   sysread $digit_sock, my $digit, 1;
   say $alpha, $digit;

}

  1. If the main thread were planning to run for a long time after the
  2. workers had terminate, it would need to reap them to avoid zombies:

wait; wait;</lang>

A sample run:

msl@64Lucid:~/perl$ ./checkpoint 
A1
B2
C3
msl@64Lucid:~/perl$ 

PicoLisp

The following solution implements each worker as a coroutine. Therefore, it works only in the 64-bit version.

'checkpoints' takes a number of projects to do, and a number of workers. Each worker is started with a random number of steps to do (between 2 and 5), and is kept in a list of 'Staff' members. Whenever a worker finishes, he is removed from that list, until it is empty and the project is done.

'worker' takes a number of steps to perform. It "works" by printing each step, and returning NIL when done. <lang PicoLisp>(de checkpoints (Projects Workers)

  (for P Projects
     (prinl "Starting project number " P ":")
     (for
        (Staff
           (mapcar
              '((I) (worker (format I) (rand 2 5)))  # Create staff of workers
              (range 1 Workers) )
           Staff                                     # While still busy
           (filter worker Staff) ) )                 # Remove finished workers
     (prinl "Project number " P " is done.") ) )

(de worker (ID Steps)

  (co ID
     (prinl "Worker " ID " has " Steps " steps to do")
     (for N Steps
        (yield ID)
        (prinl "Worker " ID " step " N) )
     NIL ) )</lang>

Output:

: (checkpoints 2 3)  # Start two projects with 3 workers
Starting project number 1:
Worker 1 has 2 steps to do
Worker 2 has 3 steps to do
Worker 3 has 5 steps to do
Worker 1 step 1
Worker 2 step 1
Worker 3 step 1
Worker 1 step 2
Worker 2 step 2
Worker 3 step 2
Worker 2 step 3
Worker 3 step 3
Worker 3 step 4
Worker 3 step 5
Project number 1 is done.
Starting project number 2:
Worker 1 has 4 steps to do
Worker 2 has 3 steps to do
Worker 3 has 2 steps to do
Worker 1 step 1
Worker 2 step 1
Worker 3 step 1
Worker 1 step 2
Worker 2 step 2
Worker 3 step 2
Worker 1 step 3
Worker 2 step 3
Worker 1 step 4
Project number 2 is done.

PureBasic

PureBasic normally uses Semaphores and Mutex’s to synchronize parallel systems. This system only relies on semaphores between each thread and the controller (CheckPoint-procedure). For exchanging data a Mutex based message stack could easily be added, both synchronized according to this specific task or non-blocking if each worker could be allowed that freedom. <lang PureBasic>#MaxWorktime=8000 ; "Workday" in msec

Structure that each thread uses

Structure MyIO

 ThreadID.i
 Semaphore_Joining.i
 Semaphore_Release.i
 Semaphore_Deliver.i
 Semaphore_Leaving.i

EndStructure

Array of used threads

Global Dim Comm.MyIO(0)

Master loop synchronizing the threads via semaphores

Procedure CheckPoint()

 Protected i, j, maxthreads=ArraySize(Comm())
 Protected Worker_count, Deliver_count
 Repeat
   For i=1 To maxthreads
     With Comm(i)
       If TrySemaphore(\Semaphore_Leaving)
         Worker_count-1
       ElseIf TrySemaphore(\Semaphore_Deliver)
         Deliver_count+1
         If Deliver_count=Worker_count
           PrintN("All Workers reported in, starting next task.")
           Deliver_count=0
           For j=1 To maxthreads
             SignalSemaphore(Comm(j)\Semaphore_Release)
           Next j
         EndIf
       ElseIf TrySemaphore(\Semaphore_Joining)
         PrintN("A new Worker joined the force.")
         Worker_count+1: SignalSemaphore(\Semaphore_Release)
       ElseIf Worker_count=0
         ProcedureReturn 
       EndIf
     Next i
   EndWith
 ForEver
 StartAll=0

EndProcedure

A worker thread, all orchestrated by the Checkpoint() routine

Procedure Worker(ID)

 Protected EndTime=ElapsedMilliseconds()+#MaxWorktime, n
 With Comm(ID)
   SignalSemaphore(\Semaphore_Joining)
   Repeat
     Repeat ; Use a non-blocking semaphore check to avoid dead-locking at shutdown.
       If ElapsedMilliseconds()>EndTime
         SignalSemaphore(\Semaphore_Leaving)
         PrintN("Thread #"+Str(ID)+" is done.")
         ProcedureReturn
       EndIf
       Delay(1)
     Until TrySemaphore(\Semaphore_Release)
     n=Random(1000)
     PrintN("Thread #"+Str(ID)+" will work for "+Str(n)+" msec.")
     Delay(n): PrintN("Thread #"+Str(ID)+" delivering")
     SignalSemaphore(\Semaphore_Deliver)
   ForEver
 EndWith

EndProcedure

User IO & init

If OpenConsole()

 Define i, j
 Repeat
   Print("Enter number of workers to use [2-2000]: ")
   j=Val(Input())
 Until j>=2 And j<=2000
 ReDim Comm(j)
 For i=1 To j
   With Comm(i)
     \Semaphore_Release =CreateSemaphore()
     \Semaphore_Joining =CreateSemaphore()
     \Semaphore_Deliver =CreateSemaphore()
     \Semaphore_Leaving =CreateSemaphore()
     \ThreadID = CreateThread(@Worker(),i)
   EndWith
 Next
 PrintN("Work started, "+Str(j)+" workers has been called.")
 CheckPoint()
 Print("Press ENTER to exit"): Input()  

EndIf</lang>

Enter number of workers to use [2-2000]: 5
Work started, 5 workers has been called.
A new Worker joined the force.
A new Worker joined the force.
A new Worker joined the force.
A new Worker joined the force.
A new Worker joined the force.
Thread #5 will work for 908 msec.
Thread #3 will work for 405 msec.
Thread #1 will work for 536 msec.
Thread #2 will work for 632 msec.
Thread #4 will work for 202 msec.
Thread #4 delivering
Thread #3 delivering
Thread #1 delivering
Thread #2 delivering
Thread #5 delivering
All Workers reported in, starting next task.
Thread #2 will work for 484 msec.
Thread #4 will work for 836 msec.
Thread #5 will work for 464 msec.
Thread #3 will work for 251 msec.
Thread #1 will work for 734 msec.
Thread #3 delivering
Thread #5 delivering
Thread #2 delivering
Thread #1 delivering
Thread #4 delivering
All Workers reported in, starting next task.
Thread #3 will work for 864 msec.
Thread #1 will work for 526 msec.
Thread #5 will work for 145 msec.
Thread #2 will work for 762 msec.
Thread #4 will work for 283 msec.
Thread #5 delivering
Thread #4 delivering
Thread #1 delivering
Thread #2 delivering
Thread #3 delivering
All Workers reported in, starting next task.
Thread #2 will work for 329 msec.
Thread #4 will work for 452 msec.
Thread #1 will work for 176 msec.
Thread #5 will work for 702 msec.
Thread #3 will work for 500 msec.
Thread #1 delivering
Thread #2 delivering
Thread #4 delivering
Thread #3 delivering
Thread #5 delivering
All Workers reported in, starting next task.
Thread #5 will work for 681 msec.
Thread #3 will work for 71 msec.
Thread #2 will work for 267 msec.
Thread #1 will work for 151 msec.
Thread #4 will work for 252 msec.
Thread #3 delivering
Thread #1 delivering
Thread #4 delivering
Thread #2 delivering
Thread #5 delivering
All Workers reported in, starting next task.
Thread #5 will work for 963 msec.
Thread #3 will work for 378 msec.
Thread #1 will work for 209 msec.
Thread #4 will work for 897 msec.
Thread #2 will work for 736 msec.
Thread #1 delivering
Thread #3 delivering
Thread #2 delivering
Thread #5 delivering
Thread #4 delivering
All Workers reported in, starting next task.
Thread #2 will work for 44 msec.
Thread #4 will work for 973 msec.
Thread #1 will work for 700 msec.
Thread #3 will work for 505 msec.
Thread #5 will work for 256 msec.
Thread #2 delivering
Thread #5 delivering
Thread #3 delivering
Thread #1 delivering
Thread #4 delivering
All Workers reported in, starting next task.
Thread #2 will work for 703 msec.
Thread #4 will work for 296 msec.
Thread #1 will work for 702 msec.
Thread #3 will work for 99 msec.
Thread #5 will work for 114 msec.
Thread #3 delivering
Thread #5 delivering
Thread #4 delivering
Thread #1 delivering
Thread #2 delivering
All Workers reported in, starting next task.
Thread #3 will work for 97 msec.
Thread #5 will work for 192 msec.
Thread #2 will work for 762 msec.
Thread #1 will work for 232 msec.
Thread #4 will work for 484 msec.
Thread #3 delivering
Thread #5 delivering
Thread #1 delivering
Thread #4 delivering
Thread #2 delivering
All Workers reported in, starting next task.
Thread #1 will work for 790 msec.
Thread #5 will work for 602 msec.
Thread #3 will work for 105 msec.
Thread #2 will work for 449 msec.
Thread #4 will work for 180 msec.
Thread #3 delivering
Thread #4 delivering
Thread #2 delivering
Thread #2 is done.
Thread #4 is done.
Thread #3 is done.
Thread #5 delivering
Thread #5 is done.
Thread #1 delivering
Thread #1 is done.
Press ENTER to exit

Ruby

This example may be incorrect.
This code might or might not do the correct task. See comment at Talk:Checkpoint synchronization.
Please verify it and remove this message. If the example does not match the requirements or does not work, replace this message with Template:incorrect or fix the code yourself.

<lang ruby>require 'socket'

  1. A Workshop runs all of its workers, then collects their results. Use
  2. Workshop#add to add workers and Workshop#work to run them.
  3. This implementation forks some processes to run the workers in
  4. parallel. Ruby must provide Kernel#fork and 'socket' library must
  5. provide UNIXSocket.
  6. Why processes and not threads? C Ruby still has a Global VM Lock,
  7. where only one thread can hold the lock. One platform, OpenBSD, still
  8. has userspace threads, with all threads on one cpu core. Multiple
  9. processes will not compete for a single Global VM Lock and can run
  10. on multiple cpu cores.

class Workshop

 # Creates a Workshop.
 def initialize
   @sockets = {}
 end
 # Adds a worker to this Workshop. Returns a worker id _wid_ for this
 # worker. The worker is a block that takes some _args_ and returns
 # some value. Workshop#work will run the block.
 #
 # This implementation forks a process for the worker. This process
 # will use Marshal with UNIXSocket to receive the _args_ and to send
 # the return value. The _wid_ is a process id. The worker also
 # inherits _IO_ objects, which might be a problem if the worker holds
 # open a pipe or socket, and the other end never reads EOF.
 def add
   child, parent = UNIXSocket.pair
   wid = fork do
     # I am the child.
     child.close
     @sockets.each_value { |sibling| sibling.close }
     # Prevent that all the children print their backtraces (to a mess
     # of mixed lines) when user presses Control-C.
     Signal.trap("INT") { exit! }
     loop do
       # Wait for a command.
       begin
         command, args = Marshal.load(parent)
       rescue EOFError
         # Parent probably died.
         break
       end
       case command
       when :work
         # Do work. Send result to parent.
         result = yield *args
         Marshal.dump(result, parent)
       when :remove
         break
       else
         fail "bad command from workshop"
       end
     end
   end
   # I am the parent.
   parent.close
   @sockets[wid] = child
   wid
 end
 # Runs all of the workers, and collects the results in a Hash. Passes
 # the same _args_ to each of the workers. Returns a Hash that pairs
 # _wid_ => _result_, where _wid_ is the worker id and _result_ is the
 # return value from the worker.
 #
 # This implementation runs the workers in parallel, and waits until
 # _all_ of the workers finish their results. Workshop provides no way
 # to start the work without waiting for the work to finish. If a
 # worker dies (for example, by raising an Exception), then
 # Workshop#work raises a RuntimeError.
 def work(*args)
   message = [:work, args]
   @sockets.each_pair do |wid, child|
     Marshal.dump(message, child)
   end
   # Checkpoint! Wait for all workers to finish.
   result = {}
   @sockets.each_pair do |wid, child|
     begin
       # This waits until the child finishes a result.
       result[wid] = Marshal.load(child)
     rescue EOFError
       fail "Worker #{wid} died"
     end
   end
   result
 end
 # Removes a worker from the Workshop, who has a worker id _wid_.
 # If there is no such worker, raises ArgumentError.
 #
 # This implementation kills and reaps the process for the worker.
 def remove(wid)
   unless child = @sockets.delete(wid)
     raise ArgumentError, "No worker #{wid}"
   else
     Marshal.dump([:remove, nil], child)
     child.close
     Process.wait(wid)
   end
 end

end


  1. First create a Workshop.

require 'pp' shop = Workshop.new wids = []

  1. Our workers must not use the same random numbers after the fork.

@fixed_rand = false def fix_rand

 unless @fixed_rand; srand; @fixed_rand = true; end

end

  1. Start with some workers.

6.times do

 wids << shop.add do |i|
   # This worker slowly calculates a Fibonacci number.
   fix_rand
   f = proc { |n| if n < 2 then n else f[n - 1] + f[n - 2] end }
   [i, f[25 + rand(10)]]
 end

end

6.times do |i|

 # Do one cycle of work, and print the result. 
 pp shop.work(i)
 # Remove a worker.
 victim = rand(wids.length)
 shop.remove wids[victim]
 wids.slice! victim
 # Add another worker.
 wids << shop.add do |j|
   # This worker slowly calculates a number from
   # the sequence 0, 1, 2, 3, 6, 11, 20, 37, 68, 125, ...
   fix_rand
   f = proc { |n| if n < 3 then n else f[n - 1] + f[n - 2] + f[n - 3] end }
   [j, i, f[20 + rand(10)]]
 end

end

  1. Remove all workers.

wids.each { |wid| shop.remove wid } pp shop.work(6)</lang>

Example of output:

{23187=>[0, 1346269],
 17293=>[0, 1346269],
 9974=>[0, 317811],
 31730=>[0, 196418],
 30156=>[0, 2178309],
 25663=>[0, 832040]}
...
{23187=>[5, 5702887],
 17293=>[5, 832040],
 31730=>[5, 514229],
 17459=>[5, 2, 24548655],
 18683=>[5, 3, 187427],
 4494=>[5, 4, 1166220]}
{}

Tcl

This implementation works by having a separate thread handle the synchronization (inter-thread message delivery already being serialized). The alternative, using a read-write mutex, is more complex and more likely to run into trouble with multi-core machines. <lang tcl>package require Tcl 8.5 package require Thread

namespace eval checkpoint {

   namespace export {[a-z]*}
   namespace ensemble create
   variable members {}
   variable waiting {}
   variable event
   # Back-end of join operation
   proc Join {id} {

variable members variable counter if {$id ni $members} { lappend members $id } return $id

   }
   # Back-end of leave operation
   proc Leave {id} {

variable members set idx [lsearch -exact $members $id] if {$idx > -1} { set members [lreplace $members $idx $idx] variable event if {![info exists event]} { set event [after idle ::checkpoint::Release] } } return

   }
   # Back-end of deliver operation
   proc Deliver {id} {

variable waiting lappend waiting $id

variable event if {![info exists event]} { set event [after idle ::checkpoint::Release] } return

   }
   # Releasing is done as an "idle" action to prevent deadlocks
   proc Release {} {

variable members variable waiting variable event unset event if {[llength $members] != [llength $waiting]} return set w $waiting set waiting {} foreach id $w { thread::send -async $id {incr ::checkpoint::Delivered} }

   }
   # Make a thread and attach it to the public API of the checkpoint
   proc makeThread Template:Script "" {

set id [thread::create thread::wait] thread::send $id { namespace eval checkpoint { namespace export {[a-z]*} namespace ensemble create

# Call to actually join the checkpoint group proc join {} { variable checkpoint thread::send $checkpoint [list \  ::checkpoint::Join [thread::id]] } # Call to actually leave the checkpoint group proc leave {} { variable checkpoint thread::send $checkpoint [list \  ::checkpoint::Leave [thread::id]] } # Call to wait for checkpoint synchronization proc deliver {} { variable checkpoint # Do this from within the [vwait] to ensure that we're already waiting after 0 [list thread::send $checkpoint [list \  ::checkpoint::Deliver [thread::id]]] vwait ::checkpoint::Delivered } } } thread::send $id [list set ::checkpoint::checkpoint [thread::id]] thread::send $id $script return $id

   }
   # Utility to help determine whether the checkpoint is in use
   proc anyJoined {} {

variable members expr {[llength $members] > 0}

   }

}</lang> Demonstration of how this works.

Translation of: Ada

<lang tcl># Build the workers foreach worker {A B C D} {

   dict set ids $worker [checkpoint makeThread {

proc task {name} { checkpoint join set deadline [expr {[clock seconds] + 2}] while {[clock seconds] <= $deadline} { puts "$name is working" after [expr {int(500 * rand())}] puts "$name is ready" checkpoint deliver } checkpoint leave thread::release; # Ask the thread to finish }

   }]

}

  1. Set them all processing in the background

dict for {name id} $ids {

   thread::send -async $id "task $name"

}

  1. Wait until all tasks are done (i.e., they have unregistered)

while 1 {

   after 100 set s 1; vwait s; # Process events for 100ms
   if {![checkpoint anyJoined]} {

break

   }

}</lang> Output:

A is working
C is working
B is working
D is working
B is ready
A is ready
D is ready
C is ready
B is working
A is working
D is working
C is working
D is ready
A is ready
C is ready
B is ready
B is working
D is working
A is working
C is working
D is ready
C is ready
B is ready
A is ready
D is working
C is working
B is working
A is working
D is ready
A is ready
C is ready
B is ready
D is working
C is working
A is working
B is working
C is ready
A is ready
B is ready
D is ready