Checkpoint synchronization: Difference between revisions

Content deleted Content added
Ce (talk | contribs)
requires concurrency
→‎E: new example
Line 428:
Thread #1 is done.
Press ENTER to exit</pre>
 
=={{header|E}}==
 
The problem as stated is somewhat unnatural in E. We would prefer to define the control flow in association with the data flow; for example, such that the workers return values that are combined at the checkpoint; the availability of that result value naturally defines when the workers should proceed with the next round.
 
That said, here is an implementation of the task as stated. We start by defining a 'flag set' data structure (which is hopefully also useful for other problems), which allows us to express the checkpoint algorithm straightforwardly while being protected against the possibility of a task calling <code>deliver</code> or <code>leave</code> too many times. Note also that each task gets its own reference denoting its membership in the checkpoint group; thus it can only speak for itself and not break any global invariants.
 
<lang e>/** A flagSet solves this problem: There are N things, each in a true or false
* state, and we want to know whether they are all true (or all false), and be
* able to bulk-change all of them, and all this without allowing double-
* counting -- setting a flag twice is idempotent.
*/
def makeFlagSet() {
# Each flag object is either in the true set or the false set.
def trues := [].asSet().diverge()
def falses := [].asSet().diverge()
return def flagSet {
/** Add a flag to the set. */
to join() {
def flag {
/** Get the value of this flag. */
to get() :boolean {
}
/** Set the value of this flag. */
to put(v :boolean) {
def [del,add] := if (v) { [falses,trues] } else { [trues,falses] }
if (del.contains(flag)) {
del.remove(flag)
add.addElement(flag)
}
}
/** Remove this flag from the set. */
to leave() :void {
trues.remove(flag)
falses.remove(flag)
}
}
falses.addElement(flag)
return flag
}
/** Are all the flags true (none false)? */
to allTrue() { return falses.size().isZero() }
/** Are all the flags false (none true)? */
to allFalse() { return trues.size().isZero() }
/** Set all the flags to the same value. */
to setAll(v :boolean) {
def [del,add] := if (v) { [falses,trues] } else { [trues,falses] }
add.addAll(del)
del.removeAll(del)
}
}
}
 
def makeCheckpoint() {
def [var continueSignal, var continueRes] := Ref.promise()
def readies := makeFlagSet()
/** Check whether all tasks have reached the checkpoint, and if so send the
* signal and go to the next round. */
def check() {
if (readies.allTrue()) {
readies.setAll(false)
continueRes.resolve(null) # send the continue signal
def [p, r] := Ref.promise() # prepare a new continue signal
continueSignal := p
continueRes := r
}
}
return def checkpoint {
to join() {
def &flag := readies.join()
return def membership {
to leave() {
(&flag).leave()
check <- ()
}
to deliver() {
flag := true
check <- ()
return continueSignal
}
}
}
}
}
 
def makeWorker(piece, checkpoint) {
def stops := timer.now() + 3000 + entropy.nextInt(2000)
var count := 0
def checkpointMember := checkpoint <- join()
def stopped
def run() {
# Pretend to do something lengthy; up to 1000 ms.
timer.whenPast(timer.now() + entropy.nextInt(1000), fn {
if (timer.now() >= stops) {
checkpointMember <- leave()
bind stopped := true
} else {
count += 1
println(`Delivering $piece#$count`)
when (checkpointMember <- deliver()) -> {
println(`Delivered $piece#$count`)
run()
}
}
})
}
run()
return stopped
}
 
def checkpoint := makeCheckpoint()
var waits := []
for piece in 1..5 {
waits with= makeWorker(piece, checkpoint)
}
interp.waitAtTop(promiseAllFulfilled(waits))</lang>
 
=={{header|Tcl}}==