Metered concurrency: Difference between revisions

From Rosetta Code
Content added Content deleted
No edit summary
(→‎{{header|Ada}}: syntax highlighting)
Line 6: Line 6:


The interface for the counting semaphore is defined in an Ada package specification:
The interface for the counting semaphore is defined in an Ada package specification:
package Semaphores is
<Ada> package Semaphores is
protected type Counting_Semaphore(Max : Positive) is
protected type Counting_Semaphore(Max : Positive) is
entry Acquire;
entry Acquire;
Line 14: Line 14:
Lock_Count : Natural := 0;
Lock_Count : Natural := 0;
end Counting_Semaphore;
end Counting_Semaphore;
end Semaphores;
end Semaphores;</Ada>
The ''Acquire'' entry has a condition associated with it. A task can only execute the ''Acquire'' entry when ''Lock_Count'' is less than ''Max''. This is the key to making this structure behave as a counting semaphore. This condition, and all the other aspects of ''Counting_Semaphore'' are contained in the package body.
The ''Acquire'' entry has a condition associated with it. A task can only execute the ''Acquire'' entry when ''Lock_Count'' is less than ''Max''. This is the key to making this structure behave as a counting semaphore. This condition, and all the other aspects of ''Counting_Semaphore'' are contained in the package body.
package body Semaphores is
<Ada> package body Semaphores is
------------------------
------------------------
Line 55: Line 55:
end Counting_Semaphore;
end Counting_Semaphore;
end Semaphores;
end Semaphores;</Ada>
We now need a set of tasks to properly call an instance of ''Counting_Semaphore''.
We now need a set of tasks to properly call an instance of ''Counting_Semaphore''.
with Semaphores;
<Ada> with Semaphores;
with Ada.Text_Io; use Ada.Text_Io;
with Ada.Text_Io; use Ada.Text_Io;
Line 93: Line 93:
Crew(I).Start(2.0, I);
Crew(I).Start(2.0, I);
end loop;
end loop;
end Semaphores_Main;
end Semaphores_Main;</ada>


=={{header|ALGOL 68}}==
=={{header|ALGOL 68}}==

Revision as of 15:28, 4 October 2008

Task
Metered concurrency
You are encouraged to solve this task according to the task description, using any language you may know.

The goal of this task is to create a counting semaphore used to control the execution of a set of concurrent units. This task intends to demonstrate coordination of active concurrent units through the use of a passive concurrent unit. The operations for a counting semaphore are acquire, release, and count. Each active concurrent unit should attempt to acquire the counting semaphore before executing its assigned duties. In this case the active concurrent unit should report that it has acquired the semaphore. It should sleep for 2 seconds and then release the semaphore.

Ada

Works with: GNAT version GPL 2006

The interface for the counting semaphore is defined in an Ada package specification: <Ada> package Semaphores is

   protected type Counting_Semaphore(Max : Positive) is
      entry Acquire;
      procedure Release;
      function Count return Natural;
   private
      Lock_Count : Natural := 0;
   end Counting_Semaphore;
end Semaphores;</Ada>

The Acquire entry has a condition associated with it. A task can only execute the Acquire entry when Lock_Count is less than Max. This is the key to making this structure behave as a counting semaphore. This condition, and all the other aspects of Counting_Semaphore are contained in the package body. <Ada> package body Semaphores is

   ------------------------
   -- Counting_Semaphore --
   ------------------------ 

   protected body Counting_Semaphore is

      -------------
      -- Acquire --
      -------------

      entry Acquire when Lock_Count < Max is
      begin
         Lock_Count := Lock_Count + 1;
      end Acquire;

      -----------
      -- Count --
      -----------

      function Count return Natural is
      begin
         return Lock_Count;
      end Count;

      -------------
      -- Release --
      -------------

      procedure Release is
      begin
         if Lock_Count > 0 then
            Lock_Count := Lock_Count - 1;
         end if;
      end Release;

   end Counting_Semaphore;

end Semaphores;</Ada>

We now need a set of tasks to properly call an instance of Counting_Semaphore. <Ada> with Semaphores;

with Ada.Text_Io; use Ada.Text_Io;

procedure Semaphores_Main is
   -- Create an instance of a Counting_Semaphore with Max set to 3
   Lock : Semaphores.Counting_Semaphore(3);

   -- Define a task type to interact with the Lock object declared above
   task type Worker is
      entry Start (Sleep : in Duration; Id : in Positive);
   end Worker;

   task body Worker is
      Sleep_Time : Duration;
      My_Id : Positive;
   begin
      accept Start(Sleep : in Duration; Id : in Positive) do
         My_Id := Id;
         Sleep_Time := Sleep;
      end Start;
      --Acquire the lock. The task will suspend until the Acquire call completes
      Lock.Acquire;
      Put_Line("Task #" & Positive'Image(My_Id) & " acquired the lock.");
      -- Suspend the task for Sleep_Time seconds
      delay Sleep_Time;
      -- Release the lock. Release is unconditional and happens without suspension
      Lock.Release;
   end Worker;

   -- Create an array of 5 Workers
   type Staff is array(Positive range 1..5) of Worker;
   Crew : Staff;
begin
   for I in Crew'range loop
      Crew(I).Start(2.0, I);
   end loop;
end Semaphores_Main;</ada>

ALGOL 68

SEMA sem = LEVEL 1;

PROC job = (INT n)VOID: (
   printf(($" Job "d" acquired Semaphore ..."$,n));
   TO 10000000 DO SKIP OD;
   printf(($" Job "d" releasing Semaphore"l$,n))
);

PAR (
  ( DOWN sem ; job(1) ; UP sem ) ,
  ( DOWN sem ; job(2) ; UP sem ) ,
  ( DOWN sem ; job(3) ; UP sem )
)

Output:

Job 1 acquired Semaphore ... Job 1 releasing Semaphore
Job 3 acquired Semaphore ... Job 3 releasing Semaphore
Job 2 acquired Semaphore ... Job 2 releasing Semaphore

D

<d>module meteredconcurrency ; import std.stdio ; import std.thread ; import std.c.time ;

class Semaphore {

 private int lockCnt, maxCnt ;
 this(int count) { maxCnt = lockCnt = count ;}
 void acquire() {
   if(lockCnt < 0 || maxCnt <= 0)
     throw new Exception("Negative Lock or Zero init. Lock") ;
   while(lockCnt == 0)
     Thread.getThis.yield ; // let other threads release lock
   synchronized lockCnt-- ;  
 }
 void release() {
   synchronized 
     if (lockCnt < maxCnt)
       lockCnt++ ;
     else
       throw new Exception("Release lock before acquire") ;    
 }
 int getCnt() { synchronized return lockCnt ; }

}

class Worker : Thread {

 private static int Id = 0 ;
 private Semaphore lock ;
 private int myId ;
 this (Semaphore l) { super() ; lock = l ; myId = Id++ ; }
 override int run() {
   lock.acquire ;  
   writefln("Worker %d got a lock(%d left).", myId, lock.getCnt) ;
   msleep(2000) ;  // wait 2.0 sec
   lock.release ; 
   writefln("Worker %d released a lock(%d left).", myId, lock.getCnt) ;
   return 0 ;
 } 

}

void main() {

 Worker[10] crew ;
 Semaphore lock = new Semaphore(4) ;
 
 foreach(inout c ; crew)
   (c = new Worker(lock)).start ;
 foreach(inout c ; crew)
   c.wait ;

}</d>

Phobos with tools

Using the scrapple.tools extension library for Phobos .. <d> module metered;

import tools.threads, tools.log, tools.time, tools.threadpool;

void main() {

 log_threads = false;
 auto done = new Semaphore, lock = new Semaphore(4);
 auto tp = new Threadpool(10);
 for (int i = 0; i < 10; ++i) {
   tp.addTask(i /apply/ (int i) {
     scope(exit) done.release;
     lock.acquire;
     logln(i, ": lock acquired");
     sleep(2.0);
     lock.release;
     logln(i, ": lock released");
   });
 }
 for (int i = 0; i < 10; ++i)
   done.acquire;

} </d>

E

This semaphore slightly differs from the task description; the release operation is not on the semaphore itself but given out with each acquisition, and cannot be invoked too many times.

def makeSemaphore(maximum :(int > 0)) {
    var current := 0
    def waiters := <elib:vat.makeQueue>()
    def notify() {
        while (current < maximum && waiters.hasMoreElements()) {
            current += 1
            waiters.optDequeue().resolve(def released)
            when (released) -> {
                current -= 1
                notify()
            }
        }
    }
    def semaphore {
        to acquire() {
            waiters.enqueue(def response)
            notify()
            return response
        }
        to count() { return current }
    }
    return semaphore
}

def work(label, interval, semaphore, timer, println) {
    when (def releaser := semaphore <- acquire()) -> {
        println(`$label: I have acquired the lock.`)
        releaser.resolve(
            timer.whenPast(timer.now() + interval, fn {
                println(`$label: I will have released the lock.`)
            })
        )
    }
}

def semaphore := makeSemaphore(3)
for i in 1..5 {
    work(i, 2000, semaphore, timer, println)
}

Java

public class CountingSemaphore
{
   private int lockCount = 0;
   private int maxCount;

   CountingSemaphore(int Max)
   {
      maxCount = Max;
   }
  
   public synchronized void acquire() throws InterruptedException
   {
      while( lockCount >= maxCount)
      {
         wait();
      }
      lockCount++;
   }
   public synchronized void release()
   {
      if (lockCount > 0)
      {
         lockCount--;
         notifyAll();
      }
   }
   public synchronized int getCount()
   {
      return lockCount;
   }
}
public class Worker extends Thread
{
   private CountingSemaphore lock;
   private int id;

   Worker(CountingSemaphore coordinator, int num)
   {
      lock = coordinator;
      id = num;
   }
   Worker()
   {
   }
   public void run()
   {
      try
      {
         lock.acquire();
         System.out.println("Worker " + id + " has acquired the lock.");
         sleep(2000);
      }
      catch (InterruptedException e)
      {
      }
      finally
      {
         lock.release();
      }
   }
   public static void main(String[] args)
   {
      CountingSemaphore lock = new CountingSemaphore(3);
      Worker crew[];
      crew = new Worker[5];
      for (int i = 0; i < 5; i++)
      {
         crew[i] = new Worker(lock, i);
         crew[i].start();
      }

   }
}

Perl

See Coro::Semaphore.

Python

Python threading module includes a semaphore implementation. This code show how to use it.

import time
import threading

# Only 4 workers can run in the same time
sem = threading.Semaphore(4)

workers = []
running = 1


def worker():
    me = threading.currentThread()
    while 1:
        sem.acquire()
        try:
            if not running:
                break
            print '%s acquired semaphore' % me.getName()
            time.sleep(2.0)
        finally:
            sem.release()
        time.sleep(0.01) # Let others acquire

# Start 10 workers
for i in range(10):
    t = threading.Thread(name=str(i), target=worker)
    workers.append(t)
    t.start()

# Main loop
try:
    while 1:
        time.sleep(0.1)
except KeyboardInterrupt:
    running = 0
    for t in workers:
        t.join()

Raven

Counting semaphores are built in:

# four workers may be concurrent
4 semaphore as sem

thread worker
    5 each as i
        sem acquire
        # tid is thread id
        tid "%d acquired semaphore\n" print
        2000 ms
        sem release
        # let others acquire
        100 ms

# start 10 threads
group
    10 each drop worker
list as workers

Thread joining is automatic by default.

UnixPipes

The number of concurrent jobs can be set by issuing that many echo '1s at the begining to sem.

rm -f sem ; mkfifo sem
acquire() {
   x=;while test -z "$x"; do read x; done;
}
release() {
   echo '1'
}
job() {
   n=$1; echo "Job $n acquired Semaphore">&2 ; sleep 2; echo "Job $n released Semaphore">&2 ;
}
( acquire < sem ; job 1 ; release > sem ) &
( acquire < sem ; job 2 ; release > sem ) &
( acquire < sem ; job 3 ; release > sem ) &
echo 'Initialize Jobs' >&2 ; echo '1' > sem