Metered concurrency
Revision as of 02:11, 21 November 2009
You are encouraged to solve this task according to the task description, using any language you may know.
The goal of this task is to create a counting semaphore used to control the execution of a set of concurrent units. This task intends to demonstrate coordination of active concurrent units through the use of a passive concurrent unit. The operations for a counting semaphore are acquire, release, and count. Each active concurrent unit should attempt to acquire the counting semaphore before executing its assigned duties. In this case the active concurrent unit should report that it has acquired the semaphore. It should sleep for 2 seconds and then release the semaphore.
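As a rough illustration of the three operations named above, here is a minimal Python sketch built on `threading.Condition`. The names `CountingSemaphore` and `run_workers` are hypothetical and chosen only for this example; the sketch assumes that releasing an unheld semaphore should be a silent no-op.

```python
import threading
import time


class CountingSemaphore:
    """Hypothetical helper: at most `maximum` holders at any moment."""

    def __init__(self, maximum):
        self._maximum = maximum
        self._held = 0
        self._cond = threading.Condition()

    def acquire(self):
        with self._cond:
            # Block while every slot is taken.
            while self._held >= self._maximum:
                self._cond.wait()
            self._held += 1

    def release(self):
        with self._cond:
            # Assumption: releasing an unheld semaphore is ignored.
            if self._held > 0:
                self._held -= 1
                self._cond.notify()

    def count(self):
        with self._cond:
            return self._held


def run_workers(num_workers=5, maximum=3, sleep_seconds=2.0):
    """Run the workers; return the largest holder count any worker observed."""
    sem = CountingSemaphore(maximum)
    peak = [0]
    peak_lock = threading.Lock()

    def worker(ident):
        sem.acquire()
        try:
            with peak_lock:
                peak[0] = max(peak[0], sem.count())
            print("Worker %d acquired the semaphore" % ident)
            time.sleep(sleep_seconds)
        finally:
            sem.release()

    threads = [threading.Thread(target=worker, args=(i,))
               for i in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return peak[0]
```

Calling `run_workers()` starts five workers against a semaphore of size three, so the last two workers report their acquisition only after earlier ones have slept and released.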
Ada
The interface for the counting semaphore is defined in an Ada package specification:
<lang ada>package Semaphores is
   protected type Counting_Semaphore(Max : Positive) is
      entry Acquire;
      procedure Release;
      function Count return Natural;
   private
      Lock_Count : Natural := 0;
   end Counting_Semaphore;
end Semaphores;</lang>
The Acquire entry has a condition associated with it. A task can only execute the Acquire entry when Lock_Count is less than Max. This is the key to making this structure behave as a counting semaphore. This condition, and all the other aspects of Counting_Semaphore are contained in the package body.
<lang ada>package body Semaphores is

   ------------------------
   -- Counting_Semaphore --
   ------------------------

   protected body Counting_Semaphore is

      -------------
      -- Acquire --
      -------------

      entry Acquire when Lock_Count < Max is
      begin
         Lock_Count := Lock_Count + 1;
      end Acquire;

      -----------
      -- Count --
      -----------

      function Count return Natural is
      begin
         return Lock_Count;
      end Count;

      -------------
      -- Release --
      -------------

      procedure Release is
      begin
         if Lock_Count > 0 then
            Lock_Count := Lock_Count - 1;
         end if;
      end Release;

   end Counting_Semaphore;

end Semaphores;</lang>
We now need a set of tasks to properly call an instance of Counting_Semaphore.
<lang ada>with Semaphores;
with Ada.Text_Io; use Ada.Text_Io;

procedure Semaphores_Main is
   -- Create an instance of a Counting_Semaphore with Max set to 3
   Lock : Semaphores.Counting_Semaphore(3);

   -- Define a task type to interact with the Lock object declared above
   task type Worker is
      entry Start (Sleep : in Duration; Id : in Positive);
   end Worker;

   task body Worker is
      Sleep_Time : Duration;
      My_Id      : Positive;
   begin
      accept Start(Sleep : in Duration; Id : in Positive) do
         My_Id := Id;
         Sleep_Time := Sleep;
      end Start;
      --Acquire the lock. The task will suspend until the Acquire call completes
      Lock.Acquire;
      Put_Line("Task #" & Positive'Image(My_Id) & " acquired the lock.");
      -- Suspend the task for Sleep_Time seconds
      delay Sleep_Time;
      -- Release the lock. Release is unconditional and happens without suspension
      Lock.Release;
   end Worker;

   -- Create an array of 5 Workers
   type Staff is array(Positive range 1..5) of Worker;
   Crew : Staff;
begin
   for I in Crew'range loop
      Crew(I).Start(2.0, I);
   end loop;
end Semaphores_Main;</lang>
ALGOL 68
<lang algol68>SEMA sem = LEVEL 1;

PROC job = (INT n)VOID: (
   printf(($" Job "d" acquired Semaphore ..."$,n));
   TO 10000000 DO SKIP OD;
   printf(($" Job "d" releasing Semaphore"l$,n))
);

PAR (
  ( DOWN sem ; job(1) ; UP sem ) ,
  ( DOWN sem ; job(2) ; UP sem ) ,
  ( DOWN sem ; job(3) ; UP sem )
)</lang>
Output:
<lang algol68>Job 1 acquired Semaphore ... Job 1 releasing Semaphore
Job 3 acquired Semaphore ... Job 3 releasing Semaphore
Job 2 acquired Semaphore ... Job 2 releasing Semaphore</lang>
C
<lang c>#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <pthread.h>
#include <unistd.h>
#include <signal.h>

#define MAX_LOCKS 3
#define NUM_OF_WORKERS 5

/* A single mutex guards the counter; the condition variable signals a free slot. */
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int counter = 0;

void acquire()
{
  pthread_mutex_lock(&mutex);
  /* Wait while all MAX_LOCKS slots are held. */
  while (counter >= MAX_LOCKS) {
    pthread_cond_wait(&cond, &mutex);
  }
  counter++;
  pthread_mutex_unlock(&mutex);
}

void release()
{
  pthread_mutex_lock(&mutex);
  if (counter > 0) {
    counter--;
    /* Wake one waiter now that a slot is free. */
    pthread_cond_signal(&cond);
  }
  pthread_mutex_unlock(&mutex);
}

int getcnt()
{
  int rc;
  pthread_mutex_lock(&mutex);
  rc = counter;
  pthread_mutex_unlock(&mutex);
  return rc;
}

int running = 1;

void *worker(void *d)
{
  while (running) {
    acquire();
    /* The cast assumes pthread_t is an integral type, as on Linux. */
    printf("%08lX acquired semaphore (%d)\n",
           (unsigned long)pthread_self(), getcnt());
    sleep(2);
    release();
    sleep(1);
  }
  return NULL;
}

int main()
{
  pthread_t workers[NUM_OF_WORKERS];
  int i, err;

  for (i = 0; i < NUM_OF_WORKERS; i++) {
    err = pthread_create(&workers[i], NULL, worker, NULL);
    if (err) {
      fprintf(stderr, "error creating a thread...");
      exit(1);
    }
  }
  sleep(20);
  running = 0;
  for (i = 0; i < NUM_OF_WORKERS; i++) {
    pthread_join(workers[i], NULL);
  }
  return 0;
}</lang>
D
<lang d>module meteredconcurrency ;
import std.stdio ;
import std.thread ;
import std.c.time ;

class Semaphore {
  private int lockCnt, maxCnt ;
  this(int count) { maxCnt = lockCnt = count ;}
  void acquire() {
    if(lockCnt < 0 || maxCnt <= 0)
      throw new Exception("Negative Lock or Zero init. Lock") ;
    while(lockCnt == 0)
      Thread.getThis.yield ; // let other threads release lock
    synchronized lockCnt-- ;
  }
  void release() {
    synchronized
      if (lockCnt < maxCnt)
        lockCnt++ ;
      else
        throw new Exception("Release lock before acquire") ;
  }
  int getCnt() { synchronized return lockCnt ; }
}

class Worker : Thread {
  private static int Id = 0 ;
  private Semaphore lock ;
  private int myId ;
  this (Semaphore l) { super() ; lock = l ; myId = Id++ ; }
  override int run() {
    lock.acquire ;
    writefln("Worker %d got a lock(%d left).", myId, lock.getCnt) ;
    msleep(2000) ; // wait 2.0 sec
    lock.release ;
    writefln("Worker %d released a lock(%d left).", myId, lock.getCnt) ;
    return 0 ;
  }
}

void main() {
  Worker[10] crew ;
  Semaphore lock = new Semaphore(4) ;
  foreach(inout c ; crew)
    (c = new Worker(lock)).start ;
  foreach(inout c ; crew)
    c.wait ;
}</lang>
Phobos with tools
Using the scrapple.tools extension library for Phobos:
<lang d>module metered;

import tools.threads, tools.log, tools.time, tools.threadpool;

void main() {
  log_threads = false;
  auto done = new Semaphore, lock = new Semaphore(4);
  auto tp = new Threadpool(10);
  for (int i = 0; i < 10; ++i) {
    tp.addTask(i /apply/ (int i) {
      scope(exit) done.release;
      lock.acquire;
      logln(i, ": lock acquired");
      sleep(2.0);
      lock.release;
      logln(i, ": lock released");
    });
  }
  for (int i = 0; i < 10; ++i)
    done.acquire;
}</lang>
E
This semaphore slightly differs from the task description; the release operation is not on the semaphore itself but given out with each acquisition, and cannot be invoked too many times.
<lang e>def makeSemaphore(maximum :(int > 0)) {
    var current := 0
    def waiters := <elib:vat.makeQueue>()
    def notify() {
        while (current < maximum && waiters.hasMoreElements()) {
            current += 1
            waiters.optDequeue().resolve(def released)
            when (released) -> {
                current -= 1
                notify()
            }
        }
    }
    def semaphore {
        to acquire() {
            waiters.enqueue(def response)
            notify()
            return response
        }
        to count() { return current }
    }
    return semaphore
}

def work(label, interval, semaphore, timer, println) {
    when (def releaser := semaphore <- acquire()) -> {
        println(`$label: I have acquired the lock.`)
        releaser.resolve(
            timer.whenPast(timer.now() + interval, fn {
                println(`$label: I will have released the lock.`)
            })
        )
    }
}

def semaphore := makeSemaphore(3)
for i in 1..5 {
    work(i, 2000, semaphore, timer, println)
}</lang>
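The design described for E, where release is handed out per acquisition and cannot be invoked too many times, can be approximated in a blocking, thread-based style. The following Python sketch is only an analogue under that assumption: it does not model E's promises or event loop, and the name `HandleSemaphore` is hypothetical.

```python
import threading


class HandleSemaphore:
    """Hypothetical sketch: acquire() returns a one-shot release callable."""

    def __init__(self, maximum):
        self._maximum = maximum
        self._current = 0
        self._cond = threading.Condition()

    def acquire(self):
        with self._cond:
            # Block while every slot is taken.
            while self._current >= self._maximum:
                self._cond.wait()
            self._current += 1

        released = [False]  # flag private to this one acquisition

        def release():
            with self._cond:
                if released[0]:
                    return False  # a repeated call has no effect
                released[0] = True
                self._current -= 1
                self._cond.notify()
                return True

        return release

    def count(self):
        with self._cond:
            return self._current
```

Because each release callable remembers whether it has already fired, the count can never drop below the number of outstanding acquisitions, which is the property the E version gets from resolving a promise at most once.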
Java
<lang java>public class CountingSemaphore{
   private int lockCount = 0;
   private int maxCount;

   CountingSemaphore(int Max){
      maxCount = Max;
   }

   public synchronized void acquire() throws InterruptedException{
      while( lockCount >= maxCount){
         wait();
      }
      lockCount++;
   }
   public synchronized void release(){
      if (lockCount > 0)
      {
         lockCount--;
         notifyAll();
      }
   }
   public synchronized int getCount(){
      return lockCount;
   }
}

public class Worker extends Thread{
   private CountingSemaphore lock;
   private int id;

   Worker(CountingSemaphore coordinator, int num){
      lock = coordinator;
      id = num;
   }
   Worker(){
   }
   public void run(){
      try{
         lock.acquire();
         System.out.println("Worker " + id + " has acquired the lock.");
         sleep(2000);
      }
      catch (InterruptedException e){
      }
      finally{
         lock.release();
      }
   }
   public static void main(String[] args){
      CountingSemaphore lock = new CountingSemaphore(3);
      Worker crew[];
      crew = new Worker[5];
      for (int i = 0; i < 5; i++){
         crew[i] = new Worker(lock, i);
         crew[i].start();
      }
   }
}</lang>
Perl
See Coro::Semaphore.
Python
Python's threading module includes a semaphore implementation. This code shows how to use it.
<lang python>import time
import threading

# Only 4 workers can run at the same time
sem = threading.Semaphore(4)

workers = []
running = 1


def worker():
    me = threading.currentThread()
    while 1:
        sem.acquire()
        try:
            if not running:
                break
            print('%s acquired semaphore' % me.getName())
            time.sleep(2.0)
        finally:
            sem.release()
        time.sleep(0.01) # Let others acquire

# Start 10 workers
for i in range(10):
    t = threading.Thread(name=str(i), target=worker)
    workers.append(t)
    t.start()

# Main loop
try:
    while 1:
        time.sleep(0.1)
except KeyboardInterrupt:
    running = 0
    for t in workers:
        t.join()</lang>
Raven
Counting semaphores are built in:
<lang raven># four workers may be concurrent
4 semaphore as sem

thread worker
    5 each as i
        sem acquire
        # tid is thread id
        tid "%d acquired semaphore\n" print
        2000 ms
        sem release
        # let others acquire
        100 ms

# start 10 threads
group
    10 each drop worker
list as workers</lang>
Thread joining is automatic by default.
Tcl
Uses the Thread package, which is expected to form part of the overall Tcl 8.6 release.
<lang tcl>package require Thread

# Create the global shared state of the semaphore
set handle semaphore0
tsv::set $handle mutex [thread::mutex create]
tsv::set $handle cv [thread::cond create]
tsv::set $handle count 0
tsv::set $handle max 3

# Make five worker tasks
for {set i 0} {$i<5} {incr i} {
    lappend threads [thread::create -preserved {
        # Not bothering to wrap this in an object for demonstration
        proc init {handle} {
            global mutex cv count max
            set mutex [tsv::object $handle mutex]
            set cv [tsv::object $handle cv]
            set count [tsv::object $handle count]
            set max [tsv::get $handle max]
        }
        proc acquire {} {
            global mutex cv count max
            thread::mutex lock [$mutex get]
            while {[$count get] >= $max} {
                thread::cond wait [$cv get] [$mutex get]
            }
            $count incr
            thread::mutex unlock [$mutex get]
        }
        proc release {} {
            global mutex cv count max
            thread::mutex lock [$mutex get]
            if {[$count get] > 0} {
                $count incr -1
                thread::cond notify [$cv get]
            }
            thread::mutex unlock [$mutex get]
        }

        # The core task of the worker
        proc run {handle id} {
            init $handle
            acquire
            puts "worker $id has acquired the lock"
            after 2000
            release
            puts "worker $id is done"
        }

        # Wait for further instructions from the main thread
        thread::wait
    }]
}

# Start the workers doing useful work, giving each a unique id for pretty printing
set i 0
foreach t $threads {
    puts "starting thread [incr i]"
    thread::send -async $t [list run $handle $i]
}

# Wait for all the workers to finish
foreach t $threads {
    thread::release -wait $t
}</lang>
UnixPipes
The number of concurrent jobs can be set by writing that many '1' tokens (echo '1') to sem at the beginning.
<lang bash>rm -f sem ; mkfifo sem

acquire() {
   x='';while test -z "$x"; do read x; done;
}

release() {
   echo '1'
}

job() {
   n=$1; echo "Job $n acquired Semaphore">&2 ; sleep 2; echo "Job $n released Semaphore">&2 ;
}

( acquire < sem ; job 1 ; release > sem ) &
( acquire < sem ; job 2 ; release > sem ) &
( acquire < sem ; job 3 ; release > sem ) &

echo 'Initialize Jobs' >&2 ; echo '1' > sem</lang>
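The token-passing idea behind the FIFO version can be sketched in Python with an in-process queue standing in for the named pipe. This is an illustrative analogue only: `run_jobs` and its parameters are hypothetical names, and a `queue.Queue` is assumed to be an acceptable substitute for the `sem` FIFO.

```python
import queue
import threading
import time


def run_jobs(num_jobs=3, tokens=1, sleep_seconds=2.0):
    """Run jobs gated by a token queue; return the peak number running at once."""
    sem = queue.Queue()          # plays the role of the FIFO named sem
    for _ in range(tokens):
        sem.put("1")             # like: echo '1' > sem

    active = [0]
    peak = [0]
    guard = threading.Lock()

    def job(n):
        token = sem.get()        # acquire: block until a token can be read
        try:
            with guard:
                active[0] += 1
                peak[0] = max(peak[0], active[0])
            print("Job %d acquired Semaphore" % n)
            time.sleep(sleep_seconds)
            print("Job %d released Semaphore" % n)
        finally:
            with guard:
                active[0] -= 1
            sem.put(token)       # release: write the token back

    threads = [threading.Thread(target=job, args=(n,))
               for n in range(1, num_jobs + 1)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return peak[0]
```

The number of tokens placed in the queue up front is the concurrency limit: with `tokens=1` the jobs run strictly one at a time, and raising it admits that many jobs together.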
Visual Basic .NET
This code shows how to use a local semaphore. Semaphores can also be named, in which case they are shared system-wide.
<lang vbnet>Dim sem As New Semaphore(5, 5) 'Indicates that up to 5 resources can be acquired
sem.WaitOne() 'Blocks until a resource can be acquired
Dim oldCount = sem.Release() 'Returns a resource to the pool
'oldCount has the Semaphore's count before Release was called</lang>