Synchronous concurrency

From Rosetta Code
Revision as of 18:05, 20 November 2009 by 83.132.111.76 (talk)

The goal of this task is to create two concurrent activities ("threads" or "tasks", not processes) that share data synchronously. Your language may provide syntax or libraries to perform concurrency. Different languages provide different implementations of concurrency, often with different names: some use the term threads, others tasks, and others co-processes. This task should not be implemented using fork, spawn, or the Linux/UNIX/Win32 pipe command, as communication should be between threads, not processes.

One of the concurrent units will read from a file named "input.txt" and send the contents of that file, one line at a time, to the other concurrent unit, which will print the line it receives to standard output. The printing unit must count the number of lines it prints. After the concurrent unit reading the file sends its last line to the printing unit, the reading unit will request the number of lines printed by the printing unit. The reading unit will then print the number of lines printed by the printing unit.

This task requires two-way communication between the concurrent units. All concurrent units must cleanly terminate at the end of the program.
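
As a language-neutral illustration of the protocol described above, here is a minimal Python sketch. It is not one of the task solutions: the function name `run`, the queue names, and the `COUNT_REQUEST` sentinel are all assumptions made for this example. The calling thread plays the reading unit, and lines are passed in as an iterable so the sketch does not depend on a particular file.

```python
import queue
import threading

def run(lines, out=print):
    """Send each line to a printer thread, then ask it for its count."""
    to_printer = queue.Queue(maxsize=1)   # reader -> printer
    to_reader = queue.Queue(maxsize=1)    # printer -> reader
    COUNT_REQUEST = object()              # hypothetical sentinel: "how many lines?"

    def printer():
        count = 0
        while True:
            item = to_printer.get()
            if item is COUNT_REQUEST:
                to_reader.put(count)      # answer the request, then finish
                return
            out(item)
            count += 1

    worker = threading.Thread(target=printer)
    worker.start()
    for line in lines:
        to_printer.put(line)              # blocks while the printer is busy
    to_printer.put(COUNT_REQUEST)
    printed = to_reader.get()
    worker.join()                         # both units terminate cleanly
    return printed
```

To use it with the file named in the task, one could open "input.txt", pass `(line.rstrip("\n") for line in f)` as `lines`, and print the returned count.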

Ada

This Ada example starts by creating a package defining a single instance of a printing task. Ada requires packages to be separated into two parts. The package specification defines the interface to all public members of the package.
<lang Ada>package Synchronous_Concurrent is

  task Printer is
     entry Put(Item : in String);
     entry Get_Count(Count : out Natural);
  end Printer;

end Synchronous_Concurrent;</lang>
The package body contains the implementation of all the subprograms and tasks defined in the specification.
<lang Ada>with Ada.Text_Io; use Ada.Text_Io;
with Ada.Strings.Unbounded; use Ada.Strings.Unbounded;

package body Synchronous_Concurrent is

  task body Printer is
     Num_Iter : Natural := 0;
     Line     : Unbounded_String;
  begin
     loop
        select
           accept Put(Item : in String) do
              Line := To_Unbounded_String(Item);
           end Put;
           Put_Line(To_String(Line));
           Num_Iter := Num_Iter + 1;
        or
           accept Get_Count(Count : out Natural) do
              Count := Num_Iter;
           end Get_Count;
        or terminate;
        end select;
     end loop;
  end Printer;

end Synchronous_Concurrent;</lang>
Note that the task body contains an accept block for each entry defined in the task specification. When some other task calls an entry in the Printer task the communication between the tasks is synchronized.

This example uses an infinite loop in the printer task, because there is no way to know ahead of time how many lines the printer task will need to print. Each iteration through the loop causes the task to execute a selective accept: it can accept a call on either the Put entry or the Get_Count entry. The terminate option is executed only when the program contains no more tasks that can call the entries in the Printer task. If no task has called either entry, the Printer task will wait for a task to call one of the entries, or for the terminate option to apply.
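
For readers who do not know Ada, the selective accept can be loosely approximated in Python as a server thread that takes one request at a time and dispatches on its kind. This is only a rough sketch: the `Printer` class, its method names, and the explicit `close()` call are assumptions for illustration. Python has no counterpart to `or terminate`, so the caller must ask the server to stop.

```python
import queue
import threading

class Printer:
    """Server thread that accepts one request at a time."""

    def __init__(self, out=print):
        self._requests = queue.Queue()
        self._out = out
        self._thread = threading.Thread(target=self._serve)
        self._thread.start()

    def _serve(self):
        count = 0
        while True:
            kind, payload, reply = self._requests.get()
            if kind == "put":
                reply.put(None)        # release the caller, as `end Put` does
                self._out(payload)     # printing happens after the rendezvous
                count += 1
            elif kind == "get_count":
                reply.put(count)
            else:                      # "terminate": stands in for `or terminate`
                reply.put(None)
                return

    def _call(self, kind, payload=None):
        reply = queue.Queue(maxsize=1)
        self._requests.put((kind, payload, reply))
        return reply.get()             # block until the server accepts the call

    def put(self, item):
        self._call("put", item)

    def get_count(self):
        return self._call("get_count")

    def close(self):
        self._call("terminate")
        self._thread.join()
```

Because the server handles requests strictly in order, a `get_count()` issued after the last `put()` sees every line counted, which mirrors the behaviour of the Ada task.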

The next file contains the main procedure for this program. The main or entry-point procedure for a program always runs in the environment task. For this program, the environment task takes on the role of the file-reading concurrent unit while the Printer task takes on the role of the printing concurrent unit.
<lang Ada>with Synchronous_Concurrent; use Synchronous_Concurrent;
with Ada.Text_Io; use Ada.Text_Io;

procedure Synchronous_Concurrent_Main is

  Num_Strings : Natural;
  The_File : File_Type;
  Line : String(1..255);
  Length : Natural;

begin

  Open(File => The_File, Mode => In_File, Name => "input.txt");
  while not End_Of_File(The_File) loop
     Get_Line(File => The_File, Item => Line, Last => Length);
     Printer.Put(Line(1..Length));
  end loop;
  Close(The_File);
  Printer.Get_Count(Num_Strings);
  New_Line;
  Put_Line("The task wrote" & Natural'Image(Num_Strings) & " strings.");

end Synchronous_Concurrent_Main;</lang>
In this example only the environment task can call the entries on the Printer task. When the environment task completes the terminate option of the Printer task applies, terminating the Printer task. The environment task completes right after printing the number of lines sent through the Printer task. Because of the terminate option, the Printer task terminates just after the environment task prints the count.

ALGOL 68

(
  STRING line;
  INT count := 0, errno;
  BOOL input complete := FALSE;
  SEMA output throttle = LEVEL 0, input throttle = LEVEL 1;
 
  FILE input txt;
  errno := open(input txt, "input.txt", stand in channel);
 
  PROC call back done = (REF FILE f) BOOL: ( input complete := TRUE );
  on logical file end(input txt, call back done);
 
  PAR (
    WHILE
      DOWN input throttle;
      get(input txt,(line, new line));
      UP output throttle;
      NOT input complete
    DO
      count+:=1
    OD
  ,
    WHILE
      DOWN output throttle;
      NOT input complete
    DO
      print((line, new line));
      UP input throttle
    OD
  );
  print((count))
)

Clojure

The writer executes as an agent on a thread from a thread pool. The state of the agent is the count of written lines, initialized to 0. The reader will send the writer calls to the write-line function, which the agent will execute asynchronously. The state argument is the agent's state at the start of the call, and the last expression becomes the agent's new state.

<lang clojure>(import '(java.io FileReader BufferedReader)) ;used in the reader

(def writer (agent 0))

(defn write-line [state line]

 (println line)
 (inc state))

</lang>

The reader executes on the main thread. It passes each line to the writer (the send call returns immediately), waits until the writer has finished writing all the lines, gets the line count and prints it, and then shuts down the agent thread pool.

<lang clojure> (doseq [line (-> "input.txt" FileReader. BufferedReader. line-seq)]

 (send writer write-line line))

(await writer)
(println "lines written:" @writer)
(shutdown-agents)</lang>
That's it!
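
The agent idea (a piece of state that is only changed by functions sent to it, applied one at a time on another thread) can be sketched in Python as follows. This is an illustrative approximation, not Clojure's implementation: the `Agent` class, `await_all`, `shutdown`, and the three-argument `write_line` are names invented for this example.

```python
import queue
import threading

class Agent:
    """Holds a state that is updated by queued functions, one at a time."""

    def __init__(self, state):
        self.state = state
        self._actions = queue.Queue()
        self._thread = threading.Thread(target=self._run)
        self._thread.start()

    def _run(self):
        while True:
            action = self._actions.get()
            if action is None:              # shutdown marker
                self._actions.task_done()
                return
            fn, args = action
            self.state = fn(self.state, *args)   # result becomes the new state
            self._actions.task_done()

    def send(self, fn, *args):
        self._actions.put((fn, args))       # returns immediately

    def await_all(self):
        self._actions.join()                # block until queued actions have run

    def shutdown(self):
        self._actions.put(None)
        self._thread.join()

def write_line(state, line, out=print):
    out(line)
    return state + 1
```

A reader would call `send(write_line, line)` for each line, then `await_all()`, read `state` for the count, and finally call `shutdown()`.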

Common Lisp

First, implement message-passing:

<lang lisp>(defvar *self*)

(defclass queue ()

 ((condition :initform (make-condition-variable)
             :reader condition-of) 
  (mailbox :initform '()
           :accessor mailbox-of)
  (lock :initform (make-lock)
        :reader lock-of)))

(defun message (recipient name &rest message)

 (with-lock-held ((lock-of recipient))
   ;; it would have been better to implement tail-consing or a LIFO
   (setf (mailbox-of recipient)
         (nconc (mailbox-of recipient)
                (list (list* name message))))
   (condition-notify (condition-of recipient)))
 message)

(defun mklist (x)

 (if (listp x)
     x
     (list x)))

(defun slurp-message ()

 (with-lock-held ((lock-of *self*))
   (if (not (endp (mailbox-of *self*)))
       (pop (mailbox-of *self*))
       (progn (condition-wait (condition-of *self*)
                              (lock-of *self*))
              (assert (not (endp (mailbox-of *self*))))
              (pop (mailbox-of *self*))))))

(defmacro receive-message (&body cases)

 (let ((msg-name (gensym "MESSAGE")) 
       (block-name (gensym "BLOCK")))
   `(let ((,msg-name (slurp-message))) 
      (block ,block-name
        ,@(loop for i in cases
                for ((name . case) . body) = (cons (mklist (car i))
                                                   (cdr i))
                when (typep i '(or (cons (eql quote)
                                         t)
                                   (cons (cons (eql quote) t)
                                         t)))
                  do (warn "~S is a quoted form" i)
                collect `(when ,(if (null name)
                                    't
                                    `(eql ',name (car ,msg-name)))
                           (destructuring-bind ,case
                               (cdr ,msg-name)
                             (return-from ,block-name
                               (progn ,@body)))))
        (error "Unknown message: ~S" ,msg-name)))))

(defmacro receive-one-message (message &body body)

 `(receive-message (,message . ,body)))

(defun queue () (make-instance 'queue))</lang>

Should be easy from now on:

<lang lisp>(defun reader (pathname writer)

 (with-open-file (stream pathname)
   (loop for line = (read-line stream nil)
         while line
         do (message writer '|here's a line for you| line) 
         finally
      (message writer '|how many lines?|)
      (receive-one-message (|line count| count)
         (format t "line count: ~D~%" count))
      (message writer '|looks like i've got no more lines|))))

(defun writer (stream reader)

 ;; that would work better with ITERATE
 (loop with line-count = 0 do
   (receive-message
    ((|here's a line for you| line)
     (write-line line stream)
     (incf line-count))
    (|looks like i've got no more lines|
     (return))
    (|how many lines?|
     (message reader '|line count| line-count)))))

(defmacro thread (queue &body body)

 `(make-thread (lambda (&aux (*self* ,queue))
                 ,@body)))

(defun synchronous-concurrency (&key (pathname "input.txt"))

 (let ((reader (queue))
       (writer (queue)))
   (thread reader (reader pathname writer))
   (thread writer (writer *standard-output* reader)))
 (values))</lang>

And now an example:

<lang lisp>CL-USER> (synchronous-concurrency :pathname "/tmp/input.txt")
foo
bar
baz
xenu 666
line count: 4

No value</lang>

Note that to run the example from the SLIME REPL you need to put:

 (setq swank:*globally-redirect-io* t)

in your ~/.swank.lisp

D

Library: tools

<lang d>import tools.threads, std.stdio, std.stream, std.thread;

void main() {

 // line or EOF
 struct InputLine {
   string data;
   bool eof;
   static InputLine opCall(string s) { InputLine res; res.data = s; return res; }
   static InputLine EOF() { InputLine res; res.eof = true; return res; }
 }
 auto LineCh = new MessageChannel!(InputLine),
   ResultCh = new MessageChannel!(int);
 auto printer = new Thread({
   int count;
   while (true) {
     auto line = LineCh.get();
     if (line.eof) break;
     count ++;
     writefln(count, ": ", line.data);
   }
   ResultCh.put(count);
   return 0;
 });
 printer.start;
 auto file = new File("input.txt");
 while (!file.eof()) {
   auto line = file.readLine();
   LineCh.put(InputLine(line));
 }
 LineCh.put(InputLine.EOF());
 writefln("Count: ", ResultCh.get());

} </lang>

E

def printer := {
    var count := 0
    def printer {
        to run(item) {
            count += 1
            println(item)
        }
        to getCount() {
            return count 
        }
    }
}
def sender(lines) {
    switch (lines) {
        match [] {
            when (def count := printer <- getCount()) -> {
                println(`$count lines were printed.`)
            }
        }
        match [line] + rest {
            when (printer <- run(line)) -> {
                sender(rest)
            }
        }
    }
}
# Stream IO in E is not finished yet, so this example just uses a list.
sender(<file:input.txt>.getText().split("\n"))

Erlang

-module(cc).
-export([start/0, reader/2]).

start() ->
   Pid = spawn(cc,reader,[self(), 0]),
   case file:open("input.txt", read) of
       {error, Any} -> io:fwrite("Error ~p~n",[Any]);
       {ok, Io} ->
           process(Io, Pid),
           file:close(Io)
   end,
   ok.

process(Io, Pid) ->
   case io:get_line(Io,"") of
       eof ->
           Pid ! count,
           wait();
       Any ->
           Pid ! Any,
           process(Io, Pid)
   end.

wait() ->
   receive
       I -> io:fwrite("Count:~p~n", [I])
   end.

reader(Pid, C) ->
   receive
       count -> Pid ! C;
       Any ->
           io:fwrite("~s", [Any]),
           reader(Pid, C+1)
   end.

Haskell

The following Haskell code uses simple MVars for thread communication. While the GHC libraries for concurrency give quite a wide design space for thread communication, I felt that the following was fairly reasonable.

For those who are unaware of MVars, they are essentially mutable cells which may be empty or hold a single value, and which have the following important properties:

  • takeMVar will get the contents of an MVar when it is full, emptying it.
  • takeMVar will block if the MVar is empty, until it has been filled by another thread.
  • putMVar will fill an empty MVar with a given value.
  • putMVar will block until the MVar is empty if it is full.

So MVars are essentially bounded channels which hold a maximum of one element at a time.
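
For readers more familiar with locks and condition variables, the four properties listed above can be sketched in Python. This is only an approximation for illustration, not how GHC implements MVars, and the class and method names are assumptions chosen to mirror takeMVar and putMVar.

```python
import threading

class MVar:
    """A cell that is either empty or holds exactly one value."""

    def __init__(self):
        self._cond = threading.Condition()
        self._full = False
        self._value = None

    def put(self, value):
        with self._cond:
            while self._full:               # block until the cell is empty
                self._cond.wait()
            self._value, self._full = value, True
            self._cond.notify_all()

    def take(self):
        with self._cond:
            while not self._full:           # block until the cell is filled
                self._cond.wait()
            value, self._value, self._full = self._value, None, False
            self._cond.notify_all()
            return value
```

With one such cell for lines and another for the count, a producer calling `put` cannot run ahead of a consumer calling `take` by more than one element.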

The code below defines various signals in terms of takeMVar and putMVar and then passes those to the parts of the code which should be permitted to use them. Note that this way, it is impossible for the reader process to take the current line, for example.

import Control.Concurrent
import Control.Concurrent.MVar

main =
    do lineVar <- newEmptyMVar
       countVar <- newEmptyMVar

       let takeLine  = takeMVar lineVar
           putLine   = putMVar lineVar . Just
           putEOF    = putMVar lineVar Nothing
           takeCount = takeMVar countVar
           putCount  = putMVar countVar

       forkIO $ writer takeLine putCount
       reader putLine putEOF takeCount

The reader simply reads the file lazily, applying putLine to each of the lines in turn, which blocks until the writer has taken the line. It then signals that it is finished with putEOF, and then takes the count and prints it.

reader putLine putEOF takeCount =
    do ls <- fmap lines (readFile "input.txt")
       mapM putLine ls
       putEOF
       n <- takeCount
       print n

The writer gets the lines in a loop with takeLine until it receives Nothing, at which point it uses putCount to tell the reader how many lines there were.

writer takeLine putCount = loop 0
  where loop n = do l <- takeLine
                    case l of 
                       Just x  -> do putStrLn x
                                     loop (n+1)
                       Nothing -> putCount n

Icon

Using Co-routines

procedure main()
   local prod, cons
   prod := create producer("input.txt")
   cons := create consumer(prod)
   @cons
end

procedure producer(fname)
   local f
   f := open(fname) | stop("Unable to open ", fname)
   # send what we read [read(f)] to the consumer (&source)
   while read(f) @ &source
   # send it 'null' which we use as a signal to request count
   write("count = ", &null @ &source)
end

procedure consumer(p)
   local value, i
   i := 0
   value := @p
   while \value do {
      write("=> ",value)
      value := @ &source
      i := i + 1
   }
   # send producer our count
   i @ &source
end
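
The same co-routine hand-off can be sketched with Python generators, which likewise run in a single thread and pass control back and forth explicitly. This is an illustrative analogue rather than a translation: the function names and the use of `None` as the count request (standing in for Icon's `&null`) are assumptions.

```python
def consumer(out=print):
    """Coroutine: prints each value sent to it; answers None with its count."""
    count = 0
    reply = None
    while True:
        value = yield reply
        if value is None:        # None plays the role of the count request
            reply = count
        else:
            out(value)
            count += 1
            reply = None

def producer(lines, out=print):
    cons = consumer(out)
    next(cons)                   # prime the coroutine so it waits at `yield`
    for line in lines:
        cons.send(line)
    count = cons.send(None)      # request the count
    cons.close()
    return count
```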

OCaml

Using only the standard library

We use the built-in Event module to provide communication channels between threads. <lang ocaml> open Event </lang>

The reader is a simple loop. It starts by opening a file, then reads lines from that file until there is nothing left to read. After each line, it sends Some v on channel lines_dest, where v is the contents of the line. Once there are no more lines, exception End_of_file is raised and we send None on that same channel. After that, it's just a matter of waiting for one message on count_source, printing the result and closing the file:
<lang ocaml>let reader count_source lines_dest =
  let file = open_in "input.txt" in
  let rec aux () =
    let line = try  Some (input_line file)
               with End_of_file -> None    in
      sync (send lines_dest line);
      match line with
        | Some _ -> aux ()
        | None   -> let printed = sync (receive count_source) in
                    Printf.printf "The task wrote %i strings\n" printed;
                    close_in file
  in aux ()

</lang>

The printer is also a simple loop. It keeps receiving messages on lines_source. If a message has structure Some v, then v is a line: print it and increment the counter. Otherwise, the message has structure None, which means that we're done, so just send the number of lines on count_target:
<lang ocaml>let printer lines_source count_target =

 let rec aux i =
   match sync (receive lines_source) with
     | Some line -> print_endline line; aux ( i + 1 )
     | None      -> sync (send count_target i)
 in aux 0

</lang>

Finally, our main program creates both communication channels and backgrounds treatment of printer:
<lang ocaml>let _ =

 let count = new_channel ()
 and lines = new_channel ()
 in
 let _ = Thread.create (printer lines) count
 in reader count lines

</lang>

Note that, had we decided to background treatment of reader instead, an additional synchronization would have been necessary to prevent the program from leaving once the main thread is over.

Perl

<lang perl>use threads;
use Thread::Queue qw();

my $q1 = Thread::Queue->new;
my $q2 = Thread::Queue->new;

my $reader = threads->create(sub {

    my $q1 = shift;
    my $q2 = shift;

    open my $fh, '<', 'input.txt';
    $q1->enqueue($_) while <$fh>;
    close $fh;
    $q1->enqueue(undef);

    print $q2->dequeue;

}, $q1, $q2);

my $printer = threads->create(sub {

    my $q1 = shift;
    my $q2 = shift;

    my $count = 0;
    # Test definedness so that a final line of "0" does not end the loop early
    while (defined(my $line = $q1->dequeue)) {
        print $line;
        $count++;
    };

    $q2->enqueue($count);

}, $q1, $q2);

$reader->join;
$printer->join;</lang>

Python

Notes: instead of hardcoding the input and output files in the units, each unit is created with a file object and reads from or writes to that file.

<lang python>import sys
from Queue import Queue
from threading import Thread

lines = Queue(1)
count = Queue(1)

def read(file):

   try:
       for line in file:
           lines.put(line)
   finally:
       lines.put(None)
   print count.get()

def write(file):

   n = 0
   while 1:
       line = lines.get()
       if line is None:
           break
       file.write(line)
       n += 1
   count.put(n)

reader = Thread(target=read, args=(open('input.txt'),))
writer = Thread(target=write, args=(sys.stdout,))
reader.start()
writer.start()
reader.join()
writer.join()</lang>

Raven

'input.txt' as src_file

class Queue

    new list  as items
    condition as ready

    define item_put
        items push ready notify

    define item_get
        items empty if ready wait
        items shift

Queue as lines
Queue as count

thread reader
    "file://r:%(src_file)s" open each lines.item_put
    NULL lines.item_put count.item_get "reader: %d\n" print

thread writer
    0 repeat lines.item_get dup while
        "writer: %s" print 1+
    drop count.item_put

reader as r
writer as w

Scala

A possible implementation using Actors

<lang scala>import scala.actors.Actor
import scala.actors.Actor._

case class HowMany(asker: Actor)

val printer = actor {
  var count = 0
  while (true) {
    receive {
      case line: String => print(line); count = count + 1
      case HowMany(asker: Actor) => asker ! count; exit()
    }
  }
}

def reader(printer: Actor) {
  scala.io.Source.fromFile("c:\\input.txt").getLines foreach { printer ! _ }
  printer ! HowMany(
    actor {
      receive {
        case count: Int => println("line count = " + count)
      }
    })
}

reader(printer)</lang>

Tcl

Uses the Thread package.
<lang tcl>package require Thread

# Define the input thread
set input [thread::create {
    proc readFile {filename receiver} {
        set f [open $filename]
        while {[gets $f line] >= 0} {
            thread::send $receiver [list line $line]
        }
        close $f
        thread::send $receiver lineCount lines
        puts "got $lines lines"
    }
    thread::wait
}]

# Define the output thread
set output [thread::create {
    set lines 0
    proc line {string} {
        puts $string
        incr ::lines
    }
    proc lineCount {} {return $::lines}
    thread::wait
}]

# Connect everything together and start the processing
thread::send $input [list readFile "input.txt" $output]</lang>

UnixPipes

This task has been flagged for clarification. Code on this page in its current state may be flagged incorrect once this task has been clarified. See this page's Talk page for discussion.

The main process is the one started by cat file. The subshell created by >(...) is the secondary process. The main pipeline waits for the secondary process to finish, then collects and prints the count. This falls afoul of the requirement that fork should not be used, as everything in pipes is done using forks.

rm -f node ; mkfifo node
cat file | tee >(wc -l > node ) | cat - node

Visual Basic .NET

This can be improved by adding a blocking Dequeue instead of spinning on TryDequeue.

Imports System.Threading

Module Module1

   Sub Main()
       Dim t1 As New Thread(AddressOf Reader)
       Dim t2 As New Thread(AddressOf Writer)
       t1.Start()
       t2.Start()
       t1.Join()
       t2.Join()
   End Sub

   Sub Reader()
       For Each line In IO.File.ReadAllLines("input.txt")
           m_WriterQueue.Enqueue(line)
       Next
       m_WriterQueue.Enqueue(Nothing)

       Dim result As Integer
       Do Until m_ReaderQueue.TryDequeue(result)
           Thread.Sleep(10)
       Loop

       Console.WriteLine(result)

   End Sub

   Sub Writer()
       Dim count = 0
       Dim line As String = Nothing
       Do
           Do Until m_WriterQueue.TryDequeue(line)
               Thread.Sleep(10)
           Loop
           If line IsNot Nothing Then
               Console.WriteLine(line)
               count += 1
           Else
               m_ReaderQueue.Enqueue(count)
               Exit Do
           End If
       Loop
   End Sub

   Private m_WriterQueue As New SafeQueue(Of String)
   Private m_ReaderQueue As New SafeQueue(Of Integer)

End Module
Class SafeQueue(Of T)
   Private m_list As New Queue(Of T)
   Public Function TryDequeue(ByRef result As T) As Boolean
       SyncLock m_list
           If m_list.Count = 0 Then Return False
           result = m_list.Dequeue
           Return True
       End SyncLock
   End Function
   Public Sub Enqueue(ByVal value As T)
       SyncLock m_list
           m_list.Enqueue(value)
       End SyncLock
   End Sub
End Class
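
The improvement mentioned at the top of this section, a blocking dequeue instead of polling with a sleep, can be sketched as follows. The sketch is in Python rather than Visual Basic .NET, and the `BlockingQueue` class and its method names are assumptions for illustration. The consumer waits on a condition variable and is woken only when an item arrives.

```python
import threading
from collections import deque

class BlockingQueue:
    """Queue whose dequeue() sleeps until an item is available."""

    def __init__(self):
        self._items = deque()
        self._not_empty = threading.Condition()

    def enqueue(self, value):
        with self._not_empty:
            self._items.append(value)
            self._not_empty.notify()        # wake one waiting consumer

    def dequeue(self):
        with self._not_empty:
            while not self._items:          # re-check after every wake-up
                self._not_empty.wait()
            return self._items.popleft()
```

With such a queue, the inner `Do Until ... TryDequeue ... Thread.Sleep(10)` loops would collapse into a single blocking call.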