Synchronous concurrency: Difference between revisions
solution in Racket |
|||
Line 1,486: | Line 1,486: | ||
=={{header|Racket}}== |
=={{header|Racket}}== |
||
<lang racket> |
<lang racket> |
||
(define reader->printer-channel (make-channel)) |
(define reader->printer-channel (make-channel)) |
||
(define printer->reader-channel (make-channel)) |
(define printer->reader-channel (make-channel)) |
||
Line 1,516: | Line 1,515: | ||
(sync-line-counter "input.txt") |
(sync-line-counter "input.txt") |
||
</lang> |
</lang> |
||
=={{header|Raven}}== |
=={{header|Raven}}== |
||
<lang raven>'input.txt' as src_file |
<lang raven>'input.txt' as src_file |
Revision as of 15:58, 30 July 2012
You are encouraged to solve this task according to the task description, using any language you may know.
The goal of this task is to create two concurrent activities ("Threads" or "Tasks", not processes.) that share data synchronously. Your language may provide syntax or libraries to perform concurrency. Different languages provide different implementations of concurrency, often with different names. Some languages use the term threads, others use the term tasks, while others use co-processes. This task should not be implemented using fork, spawn, or the Linux/UNIX/Win32 pipe command, as communication should be between threads, not processes.
One of the concurrent units will read from a file named "input.txt" and send the contents of that file, one line at a time, to the other concurrent unit, which will print the line it receives to standard output. The printing unit must count the number of lines it prints. After the concurrent unit reading the file sends its last line to the printing unit, the reading unit will request the number of lines printed by the printing unit. The reading unit will then print the number of lines printed by the printing unit.
This task requires two-way communication between the concurrent units. All concurrent units must cleanly terminate at the end of the program.
Ada
This Ada example starts by creating a package defining a single instance of a printing task. Ada requires packages to be separated into two parts. The package specification defines the interface to all public members of the package. <lang Ada>package Synchronous_Concurrent is
task Printer is entry Put(Item : in String); entry Get_Count(Count : out Natural); end Printer;
end Synchronous_Concurrent;</lang> The package body contains the implementation of all the subprograms and tasks defined in the specification. <lang Ada>with Ada.Text_Io; use Ada.Text_Io; with Ada.Strings.Unbounded; use Ada.Strings.Unbounded;
package body Synchronous_Concurrent is
task body Printer is Num_Iter : Natural := 0; Line : Unbounded_String; begin loop select accept Put(Item : in String) do Line := To_Unbounded_String(Item); end Put; Put_Line(To_String(Line)); Num_Iter := Num_Iter + 1; or accept Get_Count(Count : out Natural) do Count := Num_Iter; end Get_Count; or terminate; end select; end loop; end Printer;
end Synchronous_Concurrent;</lang> Note that the task body contains an accept block for each entry defined in the task specification. When some other task calls an entry in the Printer task the communication between the tasks is synchronized.
This example uses an infinite loop in the printer task. There is no way to know ahead of time how many lines the printer task will need to print. Each iteration through the loop causes the task to execute a selective accept. That means that it can either accept a call on the Put entry, or it can accept a call on the Get_Count entry. The terminate option is execute only when the program contains no more tasks that can call the entries in the Printer task. If no task has called either entry the Printer task will wait for a task to call one of the entries, or for the terminate option to apply.
The next file contains the main procedure for this program. The main or entry-point procedure for a program always runs in the environment task. For this program, the environment task is takes on the role of the file reading concurrent unit while the Printer task takes on the role of the printing concurrent unit. <lang Ada>with Synchronous_Concurrent; use Synchronous_Concurrent; with Ada.Text_Io; use Ada.Text_Io;
procedure Synchronous_Concurrent_Main is
Num_Strings : Natural; The_File : File_Type; Line : String(1..255); Length : Natural;
begin
Open(File => The_File, Mode => In_File, Name => "input.txt"); while not End_Of_File(The_File) loop Get_Line(File => The_File, Item => Line, Last => Length); Printer.Put(Line(1..Length)); end loop; Close(The_File); Printer.Get_Count(Num_Strings); New_Line; Put_Line("The task wrote" & Natural'Image(Num_Strings) & " strings.");
end Synchronous_Concurrent_Main;</lang> In this example only the environment task can call the entries on the Printer task. When the environment task completes the terminate option of the Printer task applies, terminating the Printer task. The environment task completes right after printing the number of lines sent through the Printer task. Because of the terminate option, the Printer task terminates just after the environment task prints the count.
Aikido
Aikido supports threads and monitors natively (built in to the language). There is also support for closures, but this example will use threads: <lang aikido> monitor Queue {
var items = [] public function put (item) { items.append (item) notify() }
public function get() { while (items.size() == 0) { wait() } var item = items[0] items <<= 1 return item }
public function close { items.append (none) }
}
thread reader (queue) {
var numlines = 0 for (;;) { var line = queue.get() if (typeof(line) == "none") { break } print (line) numlines++ } println ("Number of lines: " + numlines)
}
thread writer (queue, lines) {
foreach line lines { queue.put (line) } queue.close()
}
var queue = new Queue() var lines = readfile ("input.txt") var r = reader(queue) var w = writer(queue, lines)
join (r) join (w)</lang>
ALGOL 68
<lang algol68>(
STRING line; INT count := 0, errno; BOOL input complete := FALSE; SEMA output throttle = LEVEL 0, input throttle = LEVEL 1;
�
FILE input txt; errno := open(input txt, "input.txt", stand in channel);
�
PROC call back done = (REF FILE f) BOOL: ( input complete := TRUE ); on logical file end(input txt, call back done);
�
PAR ( WHILE DOWN input throttle; get(input txt,(line, new line)); UP output throttle; NOT input complete DO count+:=1 OD , WHILE DOWN output throttle; NOT input complete DO print((line, new line)); UP input throttle OD ); print((count))
)</lang>
BCPL
<lang BCPL>// This is a BCPL implementation of the Rosettacode synchronous // concurrency test using BCPL coroutines and a coroutine implementation // of a Occum-style channels. // BCPL is freely available from www.cl.cam.ac.uk/users/mr10
SECTION "coinout"
GET "libhdr.h"
GLOBAL {
tracing: ug
}
LET start() = VALOF { LET argv = VEC 50
LET in_co, out_co = 0, 0 LET channel = 0 LET filename = "input.txt"
UNLESS rdargs("-f,-t/S", argv, 50) DO { writef("Bad arguments for coinout*n") RESULTIS 0 }
IF argv!0 DO filename := argv!0 // -f the source file tracing := argv!1 // -t/S tracing option
in_co := initco(infn, 500, @channel) out_co := initco(outfn, 500, @channel)
UNLESS in_co & out_co DO { writef("Trouble creating the coroutines*n") GOTO fin }
IF tracing DO writef("*nBoth in and out coroutines created*n*n")
callco(in_co, filename)
fin:
IF in_co DO deleteco(in_co) IF out_co DO deleteco(out_co)
IF tracing DO writef("Both in and out coroutines deleted*n*n")
RESULTIS 0
}
AND readline(line) = VALOF { LET ch, i = 0, 0
line%0 := 0
{ ch := rdch() IF ch=endstreamch RESULTIS FALSE i := i+1 line%0, line%i := i, ch IF ch='*n' | i=255 BREAK } REPEAT
RESULTIS TRUE
}
AND infn(args) BE { LET channelptr = args!0
LET name = cowait() // Get the file name LET instream = findinput(name) LET line = VEC 256/bytesperword
UNLESS instream DO { writef("*nTrouble with file: %s*n", name) RETURN }
selectinput(instream)
{ LET ok = readline(line) UNLESS ok BREAK IF tracing DO writef("inco: Sending a line to outco*n") cowrite(channelptr, line) } REPEAT
IF tracing DO writef("inco: Sending zero to outco*n")
writef("*nNumber of lines written was %n*n", cowrite(channelptr, 0))
endstream(instream)
}
AND outfn(args) BE { LET channelptr = args!0
LET linecount = 0
{ LET line = coread(channelptr) UNLESS line BREAK IF tracing DO writef("outfn: Received a line*n") writes(line) linecount := linecount + 1 } REPEAT
IF tracing DO writef("outfn: Received zero, so sent count=%n back to inco*n", linecount)
cowait(linecount)
}
// The following functions are a implementation of Occum-style channels // using coroutines.
// The first coroutine to request a transfer through a channel becomes // suspended and the second causes the data to be transfers and then allows // both coroutines to resume (in some order). The channel word is either // zero or points to a suspended (read or write) cocoutine.
// The use of resumeco in coread is somewhat subtle!
AND coread(ptr) = VALOF { LET cptr = !ptr
TEST cptr THEN { !ptr := 0 // Clear the channel word RESULTIS resumeco(cptr, currco) } ELSE { !ptr := currco // Set channel word to this coroutine RESULTIS cowait() // Wait for value from cowrite }
}
AND cowrite(ptr, val) BE { LET cptr = !ptr
TEST cptr THEN { !ptr := 0 callco(cptr, val) // Send val to coread } ELSE { !ptr := currco callco(cowait(), val) }
} </lang>
C
<lang c>#include <stdlib.h> /* malloc(), realloc(), free() */
- include <stdio.h> /* fopen(), fgetc(), fwrite(), printf() */
- include <libco.h> /* co_create(), co_switch() */
void fail(const char *message) { perror(message); exit(1); }
/*
* These are global variables of this process. All cothreads of this * process will share these global variables. */
cothread_t reader; cothread_t printer; struct { char *buf; /* Not a C string. No terminating '\0'. */ size_t len; /* Length of line in buffer. */ size_t cap; /* Maximum capacity of buffer. */ } line; size_t count; /* Number of lines printed. */
/*
* The reader cothread reads every line of an input file, passes each * line to the printer cothread, and reports the number of lines. */
void reader_entry(void) { FILE *input; size_t newcap; int c, eof, eol; char *newbuf;
input = fopen("input.txt", "r"); if (input == NULL) fail("fopen");
line.buf = malloc(line.cap = 4096); /* New buffer. */ if (line.buf == NULL) fail("malloc"); line.len = 0; /* Start with an empty line. */
do { c = fgetc(input); /* Read next character. */ if (ferror(input)) fail("fgetc");
eof = (c == EOF); if (eof) { /* * End of file is also end of line, ` * unless the line would be empty. */ eol = (line.len > 0); } else { /* Append c to the buffer. */ if (line.len == line.cap) { /* Need a bigger buffer! */ newcap = line.cap * 2; newbuf = realloc(line.buf, newcap); if (newbuf == NULL) fail("realloc"); line.buf = newbuf; line.cap = newcap; } line.buf[line.len++] = c;
/* '\n' is end of line. */ eol = (c == '\n'); }
if (eol) { /* Pass our complete line to the printer. */ co_switch(printer); line.len = 0; /* Destroy our line. */ } } while (!eof);
free(line.buf); line.buf = NULL; /* Stops a loop in the printer. */
printf("Printed %zu lines.\n", count); co_switch(printer); }
/*
* The printer cothread starts the reader cothread, prints every line * line from the reader cothread, and counts the number of lines. */
int main() { reader = co_create(4096, reader_entry); printer = co_active(); count = 0;
for (;;) { co_switch(reader); if (line.buf == NULL) break;
/* Print this line. Count it. */ fwrite(line.buf, 1, line.len, stdout); count++; }
co_delete(reader); return 0; }</lang>
Clojure
The writer executes as an agent on a thread from a thread pool. The state of the agent is the count of written lines, initialized to 0. The reader will send the writer calls to the write-line function, which the agent will execute asynchronously. The state argument is the agent's state at the start of the call, and the last expression becomes the agent's new state.
<lang clojure>(use '[clojure.java.io :as io])
(def writer (agent 0))
(defn write-line [state line]
(println line) (inc state))</lang>
The reader executes on the main thread. It passes each line to the writer -- the send call returns immediately, waits until the writer has finished writing all the lines, gets the line count & prints it, and terminates the writer.
<lang clojure>(with-open [r (io/reader "input.txt")]
(doseq [line (line-seq r)] (send writer write-line line)))
(await writer) (println "lines written:" @writer) (shutdown-agents)</lang> That's it!
Common Lisp
First, implement message-passing:
<lang lisp>(defvar *self*)
(defclass queue ()
((condition :initform (make-condition-variable) :reader condition-of) (mailbox :initform '() :accessor mailbox-of) (lock :initform (make-lock) :reader lock-of)))
(defun message (recipient name &rest message)
(with-lock-held ((lock-of recipient)) ;; it would have been better to implement tail-consing or a LIFO (setf (mailbox-of recipient) (nconc (mailbox-of recipient) (list (list* name message)))) (condition-notify (condition-of recipient))) message)
(defun mklist (x)
(if (listp x) x (list x)))
(defun slurp-message ()
(with-lock-held ((lock-of *self*)) (if (not (endp (mailbox-of *self*))) (pop (mailbox-of *self*)) (progn (condition-wait (condition-of *self*) (lock-of *self*)) (assert (not (endp (mailbox-of *self*)))) (pop (mailbox-of *self*))))))
(defmacro receive-message (&body cases)
(let ((msg-name (gensym "MESSAGE")) (block-name (gensym "BLOCK"))) `(let ((,msg-name (slurp-message))) (block ,block-name ,@(loop for i in cases for ((name . case) . body) = (cons (mklist (car i)) (cdr i)) when (typep i '(or (cons (eql quote) t) (cons (cons (eql quote) t) t))) do (warn "~S is a quoted form" i) collect `(when ,(if (null name) 't `(eql ',name (car ,msg-name))) (destructuring-bind ,case (cdr ,msg-name) (return-from ,block-name (progn ,@body))))) (error "Unknown message: ~S" ,msg-name)))))
(defmacro receive-one-message (message &body body)
`(receive-message (,message . ,body)))
(defun queue () (make-instance 'queue))</lang>
Should be easy from now on:
<lang lisp>(defun reader (pathname writer)
(with-open-file (stream pathname) (loop for line = (read-line stream nil) while line do (message writer '|here's a line for you| line) finally (message writer '|how many lines?|) (receive-one-message (|line count| count) (format t "line count: ~D~%" count)) (message writer '|looks like i've got no more lines|))))
(defun writer (stream reader)
;; that would work better with ITERATE (loop with line-count = 0 do (receive-message ((|here's a line for you| line) (write-line line stream) (incf line-count)) (|looks like i've got no more lines| (return)) (|how many lines?| (message reader '|line count| line-count)))))
(defmacro thread (queue &body body)
`(make-thread (lambda (&aux (*self* ,queue)) ,@body)))
(defun synchronous-concurrency (&key (pathname "input.txt"))
(let ((reader (queue)) (writer (queue))) (thread reader (reader pathname writer)) (thread writer (writer *standard-output* reader))) (values))</lang>
And now an example:
<lang lisp>CL-USER> (synchronous-concurrency :pathname "/tmp/input.txt") foo bar baz xenu 666 line count: 4
- No value</lang>
Note that to run the example from the SLIME REPL you need to put:
(setq swank:*globally-redirect-io* t)
in your ~/.swank.lisp
D
<lang d>import tools.threads, std.stdio, std.stream, std.thread;
void main() {
// line or EOF struct InputLine { string data; bool eof; static InputLine opCall(string s) { InputLine res; res.data = s; return res; } static InputLine EOF() { InputLine res; res.eof = true; return res; } } auto LineCh = new MessageChannel!(InputLine), ResultCh = new MessageChannel!(int); auto printer = new Thread({ int count; while (true) { auto line = LineCh.get(); if (line.eof) break; count ++; writefln(count, ": ", line.data); } ResultCh.put(count); return 0; }); printer.start; auto file = new File("input.txt"); while (!file.eof()) { auto line = file.readLine(); LineCh.put(InputLine(line)); } LineCh.put(InputLine.EOF()); writefln("Count: ", ResultCh.get());
}</lang>
Delphi
<lang Delphi> program Project2;
{$APPTYPE CONSOLE}
uses
SysUtils, Classes, Windows;
type
EThreadStackFinalized = class(Exception);
PLine = ^TLine; TLine = record Text: string; end;
TThreadQueue = class private FFinalized: Boolean; FQueue: THandle; public constructor Create; destructor Destroy; override; procedure Finalize; procedure Push(Data: Pointer); function Pop(var Data: Pointer): Boolean; property Finalized: Boolean read FFinalized; end;
TPrintThread = class(TThread) private FCount: Integer; FTreminateEvent: THandle; FDoneEvent: THandle; FQueue: TThreadQueue; public constructor Create(aTreminateEvent, aDoneEvent: THandle; aQueue: TThreadQueue); procedure Execute; override;
property Count: Integer read FCount; end;
{ TThreadQueue }
constructor TThreadQueue.Create; begin
FQueue := CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0); FFinalized := False;
end;
destructor TThreadQueue.Destroy; begin
if FQueue <> 0 then CloseHandle(FQueue); inherited;
end;
procedure TThreadQueue.Finalize; begin
PostQueuedCompletionStatus(FQueue, 0, 0, Pointer($FFFFFFFF)); FFinalized := True;
end;
function TThreadQueue.Pop(var Data: Pointer): Boolean; var
A: Cardinal; OL: POverLapped;
begin
Result := True; if not FFinalized then GetQueuedCompletionStatus(FQueue, A, Cardinal(Data), OL, INFINITE);
if FFinalized or (OL = Pointer($FFFFFFFF)) then begin Data := nil; Result := False; Finalize; end;
end;
procedure TThreadQueue.Push(Data: Pointer); begin
if FFinalized then raise EThreadStackFinalized.Create('Stack is finalized');
PostQueuedCompletionStatus(FQueue, 0, Cardinal(Data), nil);
end;
{ TPrintThread }
constructor TPrintThread.Create(aTreminateEvent, aDoneEvent: THandle; aQueue: TThreadQueue); begin
inherited Create(True); FCount := 0; FreeOnTerminate := True; FTreminateEvent := aTreminateEvent; FDoneEvent := aDoneEvent; FQueue := aQueue;
end;
procedure TPrintThread.Execute; var
data: Pointer; line: PLine;
begin
repeat if FQueue.Pop(data) then begin line := data; try Writeln(line^.Text); if line^.Text = #0 then SetEvent(FDoneEvent); Inc(FCount); finally Dispose(line); end; end;
until False; WaitForSingleObject(FTreminateEvent, INFINITE);
end;
var
PrintThread: TPrintThread; Queue: TThreadQueue; lines: TStrings; i: Integer; line: PLine; TreminateEvent, DoneEvent: THandle;
begin
Queue := TThreadQueue.Create; try TreminateEvent := CreateEvent(nil, False, False, 'TERMINATE_EVENT'); DoneEvent := CreateEvent(nil, False, False, 'DONE_EVENT'); try PrintThread := TPrintThread.Create(TreminateEvent, DoneEvent, Queue); PrintThread.Start; lines := TStringList.Create; try lines.LoadFromFile('input.txt'); for i := 0 to lines.Count - 1 do begin New(line); line^.Text := lines[i]; Queue.Push(line); end;
New(line); line^.Text := #0; Queue.Push(line);
WaitForSingleObject(DoneEvent, INFINITE);
New(line); line^.Text := IntToStr(PrintThread.Count); Queue.Push(line);
SetEvent(TreminateEvent); finally lines.Free; end; finally CloseHandle(TreminateEvent); CloseHandle(DoneEvent) end;
Readln; finally Queue.Free; end;
end. </lang>
E
<lang e>def printer := {
var count := 0 def printer { to run(item) { count += 1 println(item) } to getCount() { return count } }
}
def sender(lines) {
switch (lines) { match [] { when (def count := printer <- getCount()) -> { println(`$count lines were printed.`) } } match [line] + rest { when (printer <- run(line)) -> { sender(rest) } } }
}
- Stream IO in E is not finished yet, so this example just uses a list.
sender(<file:input.txt>.getText().split("\n"))</lang>
Erlang
<lang erlang>-module(cc). -export([start/0, reader/2]).
start() ->
Pid = spawn(cc,reader,[self(), 0]), case file:open("input.txt", read) of {error, Any} -> io:fwrite("Error ~p~n",[Any]); {ok, Io} -> process(Io, Pid), file:close(Io) end, ok.
process(Io, Pid) ->
case io:get_line(Io,"") of eof -> Pid ! count, wait(); Any -> Pid ! Any, process(Io, Pid) end.
wait() ->
receive I -> io:fwrite("Count:~p~n", [I]) end.
reader(Pid, C) ->
receive count -> Pid ! C; Any -> io:fwrite("~s", [Any]), reader(Pid, C+1) end.</lang>
Euphoria
<lang euphoria>sequence lines sequence count lines = {} count = {}
procedure read(integer fn)
object line while 1 do line = gets(fn) if atom(line) then exit else lines = append(lines, line) task_yield() end if end while lines = append(lines,0) while length(count) = 0 do task_yield() end while printf(1,"Count: %d\n",count[1])
end procedure
procedure write(integer fn)
integer n object line n = 0 while 1 do while length(lines) = 0 do task_yield() end while line = lines[1] lines = lines[2..$] if atom(line) then exit else puts(fn,line) n += 1 end if end while count = append(count,n)
end procedure
integer fn atom reader, writer constant stdout = 1 fn = open("input.txt","r") reader = task_create(routine_id("read"),{fn}) writer = task_create(routine_id("write"),{stdout}) task_schedule(writer,1) task_schedule(reader,1)
while length(task_list()) > 1 do
task_yield()
end while</lang>
F#
This code will read lines from the file on one thread, and print them to the console on one or more other threads from the ThreadPool, using a MailboxProcessor for lock-free communication between threads and tracking the line count without the use of mutable state.
<lang fsharp> open System.IO
type Msg =
| PrintLine of string | GetCount of AsyncReplyChannel<int>
let printer =
MailboxProcessor.Start(fun inbox -> let rec loop count = async { let! msg = inbox.Receive() match msg with | PrintLine(s) -> printfn "%s" s return! loop (count + 1) | GetCount(reply) -> reply.Reply(count) return! loop count } loop 0 )
let reader (printAgent:MailboxProcessor<Msg>) file =
File.ReadLines(file) |> Seq.iter (fun line -> PrintLine line |> printAgent.Post) printAgent.PostAndReply(fun reply -> GetCount(reply)) |> printfn "Lines written: %i"
reader printer @"c:\temp\input.txt" </lang>
Go
<lang go>package main
import (
"fmt" "bufio" "io" "os"
)
// main, one of two goroutines used, will function as the "reading unit" func main() {
// get file open first f, err := os.Open("input.txt") if err != nil { fmt.Println(err) return } defer f.Close() lr := bufio.NewReader(f)
// that went ok, now create communication channels, // and start second goroutine as the "printing unit" lines := make(chan string) count := make(chan int) go printer(lines, count)
for { line, prefix, err := lr.ReadLine() switch { case err == io.EOF: case err != nil: fmt.Println(err) case prefix: fmt.Println("unexpected long line") default: lines <- string(line) continue } break } // this represents the request for the printer to send the count close(lines) // wait for the count from the printer, then print it, then exit fmt.Println("Number of lines:", <-count)
}
func printer(in <-chan string, count chan<- int) {
c := 0 // loop as long as in channel stays open for s := range in { fmt.Println(s) c++ } // make count available on count channel, then return (terminate goroutine) count <- c
}</lang>
Haskell
The following Haskell code uses simple MVars for thread communication. While the GHC libraries for concurrency give quite a wide design space for thread communication, I felt that the following was fairly reasonable.
For those who are unaware of MVars, they are essentially mutable cells which may be empty or hold a single value, and which have the following important properties:
- takeMVar will get the contents of an MVar when it is full, emptying it.
- takeMVar will block if the MVar is empty, until it has been filled by another thread.
- putMVar will fill an empty MVar with a given value.
- putMVar will block until the MVar is empty if it is full.
So MVars are essentially bounded channels which hold a maximum of one element at a time.
The code below defines various signals in terms of takeMVar and putMVar and then passes those to the parts of the code which should be permitted to use them. Note that this way, it is impossible for the reader process to take the current line, for example.
<lang haskell>import Control.Concurrent import Control.Concurrent.MVar
main =
do lineVar <- newEmptyMVar countVar <- newEmptyMVar
let takeLine = takeMVar lineVar putLine = putMVar lineVar . Just putEOF = putMVar lineVar Nothing takeCount = takeMVar countVar putCount = putMVar countVar
forkIO $ writer takeLine putCount reader putLine putEOF takeCount</lang>
The reader simply reads the file lazily, applying putLine to each of the lines in turn, which blocks until the writer has taken the line. It then signals that it is finished with putEOF, and then takes the count and prints it.
<lang haskell>reader putLine putEOF takeCount =
do ls <- fmap lines (readFile "input.txt") mapM putLine ls putEOF n <- takeCount print n</lang>
The writer gets the lines in a loop with takeLine until it receives Nothing, at which point it uses putCount to tell the reader how many lines there were.
<lang haskell>writer takeLine putCount = loop 0
where loop n = do l <- takeLine case l of Just x -> do putStrLn x loop (n+1) Nothing -> putCount n</lang>
Icon and Unicon
Using Co-routines <lang icon>procedure main()
local prod, cons prod := create producer("input.txt") cons := create consumer(prod) @cons
end
procedure producer(fname)
local f f := open(fname) | stop("Unable to open ", fname) # send what we read [read(f)] to the consumer (&source) while read(f) @ &source # send it 'null' which we use as a signal to request count write("count = ", &null @ &source)
end
procedure consumer(p)
local value, i i := 1 value := @p while \value do { write("=> ",value) value := @ &source i := i + 1 } # send producer our count i @ &source
end</lang>
Java
<lang java>import java.io.BufferedReader; import java.io.FileReader; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
class SynchronousConcurrency {
public static void main(String[] args) throws Exception { final AtomicLong lineCount = new AtomicLong(0); final BlockingQueue<String> queue = new LinkedBlockingQueue<String>(); final String EOF = new String(); final Thread writerThread = new Thread(new Runnable() { public void run() { long linesWrote = 0; while (true) { try { String line = queue.take(); // Reference equality if (line == EOF) break; System.out.println(line); linesWrote++; } catch (InterruptedException ie) { } } lineCount.set(linesWrote); } } ); writerThread.start(); // No need to start a third thread for the reader, just use this thread BufferedReader br = new BufferedReader(new FileReader("input.txt")); String line; while ((line = br.readLine()) != null) queue.put(line); br.close(); queue.put(EOF); writerThread.join(); // AtomicLong is not needed here due to memory barrier created by thread join, but still need a mutable long since lineCount must be final to access it from an anonymous class System.out.println("Line count: " + lineCount.get()); return; }
} </lang>
Lua
<lang lua>function ReadFile()
local fp = io.open( "input.txt" ) assert( fp ~= nil )
for line in fp:lines() do
coroutine.yield( line )
end
fp:close()
end
co = coroutine.create( ReadFile )
while true do
local status, val = coroutine.resume( co ) if coroutine.status( co ) == "dead" then break end print( val )
end </lang>
OCaml
Using only the standard library
We use the built-in Event module to provide communication channels between threads. <lang ocaml>open Event</lang>
The reader is a simple loop. It starts by opening a file, then reads lines from that file until there is nothing left to read. After each line, it sends Some v on channel lines_dest, where v is the contents of the line. Once there are no lines anymore, exception End_of_file is raised and we send None on that same channel. After that, it's just the matter of waiting for one message on count_source, closing the file and printing the result: <lang ocaml>let reader count_source count_source =
let file = open_in "input.txt" in let rec aux () = let line = try Some (input_line file) with End_of_file -> None in sync (send count_source line); match line with
| Some _ -> aux () | None -> let printed = sync (receive count_source) in Printf.printf "The task wrote %i strings\n" printed; close_in file
in aux ()</lang>
The printer is also a simple loop. It keeps receiving messages on lines_source. If a message has structure Some v, then v is a line, print it and increment the counter. Otherwise, the message has structure None, which means that we're done, just send the number of lines on count_dest: <lang ocaml>let printer lines_source count_target =
let rec aux i = match sync (receive lines_source) with | Some line -> print_endline line; aux ( i + 1 ) | None -> sync (send count_target i) in aux 0</lang>
Finally, our main program creates both communication channels and backgrounds treatment of printer: <lang ocaml>let _ =
let count = new_channel () and lines = new_channel () in let _ = Thread.create (printer lines) count in reader count lines</lang>
Note that, had we decided to background treatment of reader instead, an additional synchronization would have been necessary to prevent the program from leaving once the main thread is over.
ooRexx
<lang ooRexx> queue = .workqueue~new input = .stream~new("jabberwocky.txt") output = .output
reader = .filereader~new(input, queue) writer = .filewriter~new(output, queue)
- class workQueue
- method init
expose queue stopped actionpending queue = .queue~new stopped = .false actionPending = .false
-- add an item to the work queue. This is a -- guarded method, which means this is a synchronized access
- method addItem guarded
expose queue actionPending use arg item -- add the item to the queue queue~queue(item) -- indicate there's something new. This is a condition variable -- that any will wake up any thread that's waiting on access. They'll -- be able to get access once we exit actionPending = .true
-- another method for coordinating access with the other thread. This indicates -- it is time to shut down
- method stop guarded
expose actionPending stopped -- indicate this has been stopped and also flip the condition variable to -- wake up any waiters stopped = .true actionPending = .true
-- read the next item off of the queue. .nil indicates we've reached -- the last item on the queue. This is also a guarded method, but we'll use -- the GUARD ON instruction to wait for work if the queue is currently empty
- method nextItem
expose queue stopped actionPending -- we might need to loop a little to get an item do forever -- if there's something on the queue, pull the front item and return if \queue~isEmpty then return queue~pull -- if the other thread says it is done sending is stuff, time to shut down if stopped then return .nil -- nothing on the queue, not stopped yet, so release the guard and wait until -- there's something pending to work on. guard on when actionPending end
-- one half of the synchronization effort. This will read lines and -- add them to the work queue. The thread will terminate once we hit end-of-file
- class filereader
- method init
-- accept a generic stream...the data source need not be a file use arg stream, queue
reply -- now multithreaded
signal on notready loop forever queue~addItem(stream~linein) end -- we come here on an EOF condition. Indicate we're done and terminate -- the thread notready: queue~stop
-- the other end of this. This class will read lines from a work queue -- and write it to a stream
- class filewriter
- method init
-- accept a generic stream...the data source need not be a file use arg stream, queue
reply -- now multithreaded
loop forever item = queue~nextItem -- .nil means last item received if item == .nil then return -- write to the stream stream~lineout(item) end
</lang>
Oz
<lang oz>declare
%% Helper function to read a file lazily. %% Returns a lazy list of lines. fun {ReadLines FN} F = {New class $ from Open.file Open.text end init(name:FN)} fun lazy {ReadNext} case {F getS($)} of false then nil [] Line then Line|{ReadNext} end end in %% close file when handle becomes unreachable {Finalize.register F proc {$ F} {F close} end} {ReadNext} end Count %% Will receive the number of lines PrinterPort
in
%% Printer thread thread Stream Counter = {NewCell 0} %% mutable variable in PrinterPort = {NewPort ?Stream} for Line in Stream do case Line of eof then Count = @Counter else {System.showInfo Line} Counter := @Counter + 1 end end end %% Send all lines to printer thread; make sure that eof is sent. try for Line in {ReadLines "input.txt"} do {Send PrinterPort Line} end finally {Send PrinterPort eof} end %% Sync on Count and print its value. {Wait Count} {Show Count}</lang>
Perl
<lang perl>use threads; use Thread::Queue qw();
my $q1 = Thread::Queue->new; my $q2 = Thread::Queue->new;
my $reader = threads->create(sub {
my $q1 = shift; my $q2 = shift; open my $fh, '<', 'input.txt'; $q1->enqueue($_) while <$fh>; close $fh; $q1->enqueue(undef); print $q2->dequeue;
}, $q1, $q2);
my $printer = threads->create(sub {
my $q1 = shift; my $q2 = shift; my $count; while (my $line = $q1->dequeue) { print $line; $count++; }; $q2->enqueue($count);
}, $q1, $q2);
$reader->join; $printer->join;</lang>
PicoLisp
PicoLisp has no threads, but synchronous background tasks and asynchronous signal handlers, or coroutines.
Using background tasks and signals
The following two tasks communicate via UDP, so in fact they don't need to run within the same process and not even the same machine. "input.txt" would rather be a device (like a named pipe or socket) than a plain file. <lang PicoLisp># Reading task (synchronous) (task (open "input.txt")
(let Fd @ (if (in Fd (line T)) # More lines? (udp "localhost" 4444 @) # Yes: Send next line (task (port T 4445) # Else install handler (prinl (udp @) " lines") # to receive and print count (task (close @)) ) (udp "localhost" 4444 T) # Send 'T' for "Done" (task (close Fd)) ) ) ) # Stop the task
- Printing task (asynchronous)
(sigio (setq "Sock" (port T 4444))
(job '((Cnt . 0)) (let? X (udp "Sock") (if (=T X) # Done? (prog (udp "localhost" 4445 Cnt) # Yes: Send count (sigio (close "Sock")) ) # and stop the task (println X) # Else print line to stdout (inc 'Cnt) ) ) ) ) # and increment count</lang>
If the two cases of 'sigio' in the printing task are replaced with 'task', that task would also be synchronous. The resulting behavior is the same.
Using coroutines
Coroutines are available only in the 64-bit version. <lang PicoLisp>(co 'unit1
(yield) # Allow 'unit2' to start (in "input.txt" # Read the file (while (line T) # Send each line (yield @ 'unit2) ) ) # to 'unit2' (prinl (yield NIL 'unit2) # Send 'NIL' for "Done", receive count " lines" ) )
(co 'unit2
(let Cnt 0 # Init counter (while (yield NIL 'unit1) # Receive line (println @) # Print it (inc 'Cnt) ) # Increment count (yield Cnt 'unit1) ) ) # Send count to 'unit1'</lang>
PureBasic
PureBasic uses Semaphores and Mutex's to coordinate threads. <lang PureBasic>Enumeration
#Write #Done
EndEnumeration
Structure commblock
txtline.s Order.i
EndStructure
Global MessageSent=CreateSemaphore() Global LineWritten=CreateSemaphore() Global LinesWritten, com.commblock
Procedure Writer(arg)
Repeat WaitSemaphore(MessageSent) If com\Order=#Write PrintN(com\txtline) LinesWritten+1 EndIf SignalSemaphore(LineWritten) Until com\Order=#Done
EndProcedure
Procedure Reader(arg)
Protected File=ReadFile(#PB_Any,OpenFileRequester("","input.txt","",0)) While file And Not Eof(file) com\txtline=ReadString(File) com\Order=#Write SignalSemaphore(MessageSent) WaitSemaphore(LineWritten) Wend com\Order=#Done SignalSemaphore(MessageSent) WaitSemaphore(LineWritten) PrintN(Str(LinesWritten)+" lines written.")
EndProcedure
If OpenConsole()
Define Thread1=CreateThread(@Reader(),0) Define Thread2=CreateThread(@Writer(),0) WaitThread(Thread1) And WaitThread(Thread2) Print("Press Enter to exit"):Input()
EndIf</lang>
Python
Notes: instead of hardcoding the input and output files in the units, each unit is created with a file and read or write the given file.
<lang python>import sys from Queue import Queue from threading import Thread
lines = Queue(1) count = Queue(1)
def read(file):
try: for line in file: lines.put(line) finally: lines.put(None) print count.get()
def write(file):
n = 0 while 1: line = lines.get() if line is None: break file.write(line) n += 1 count.put(n)
reader = Thread(target=read, args=(open('input.txt'),)) writer = Thread(target=write, args=(sys.stdout,)) reader.start() writer.start() reader.join() writer.join()</lang>
Racket
<lang racket> (define reader->printer-channel (make-channel)) (define printer->reader-channel (make-channel))
(define (sync-line-counter filename)
(define (reader) (define file-port (open-input-file filename)) (let loop ([line (read-line file-port)]) (when (not (eof-object? line)) (begin (channel-put reader->printer-channel line) (loop (read-line file-port))))) (channel-put reader->printer-channel eof) (let ([num-lines (channel-get printer->reader-channel)]) (printf "Number of lines printed = ~a~%" num-lines))) (define (printer) (define count 0) (let loop ([line (channel-get reader->printer-channel)]) (when (not (eof-object? line)) (begin (printf "~a~%" line) (set! count (add1 count)) (loop (channel-get reader->printer-channel))))) (channel-put printer->reader-channel count)) (thread reader) (thread printer))
(sync-line-counter "input.txt") </lang>
Raven
<lang raven>'input.txt' as src_file
class Queue
new list as items condition as ready
define item_put items push ready notify
define item_get items empty if ready wait items shift
Queue as lines Queue as count
thread reader
"file://r:%(src_file)s" open each lines.item_put NULL lines.item_put count.item_get "reader: %d\n" print
thread writer
0 repeat lines.item_get dup while "writer: %s" print 1+ drop count.item_put
reader as r writer as w</lang>
Ruby
The task is to refactor this program to use two concurrent units.
<lang ruby>count = 0 IO.foreach("input.txt") { |line| print line; count += 1 } puts "Printed #{count} lines."</lang>
The above program has no concurrency, because the printer is a block { |line| print line; count += 1 } that can print only one line. (The reader calls block more than one time.) After the refactoring, the printer will print all lines, and the program will jump between the reader and the printer. For the refactoring, Fibers might be best, Continuations work if our Ruby is too old for Fibers, and Threads are another alternative.
Fibers. Ruby 1.9 gives Fiber to us. Fibers provide concurrency. Each fiber has its own stack (to hold nested function calls). A pair of fibers has a boss-vassal relationship: the boss fiber calls vassal.resume to jump to the vassal, and the vassal fiber calls Fiber.yield to jump to the boss.
<lang ruby>count = 0 reader = Fiber.new do
IO.foreach("input.txt") { |line| Fiber.yield line } puts "Printed #{count} lines." nil
end
- printer
while line = reader.resume
print line count += 1
end</lang>
- We create a fiber as the reader, and we use the current fiber as the printer.
- Our reader is a generator, because reader.resume generates a line, and Fiber.yield passes the generated value.
- This generator allows Fiber.yield inside nested function calls (because a fiber has its own stack). We nested three calls: we have Fiber.yield inside our line block, inside IO.foreach, inside our fiber block.
- Because the reader and printer are in the same process, they can share the count variable. An alternate way is to use reader.resume(count) to pass the count, so Fiber.yield would return the count.
- If IO.foreach raises an IO error, then the reader dies, and reader.resume raises the same error in the printer. This is what we want. If we run this program with no input.txt file, then we see the error.
Continuations. Ruby 1.8 gives Continuation to us. (Ruby 1.9 still gives Continuation if we require 'continuation' from the standard library.) The trick is that you can continue a function call after you leave it. Continuations can provide concurrency. The problem is that continuations make spaghetti code with very confusing control flow.
<lang ruby>require 'continuation' unless defined? Continuation
count = 0 reader = proc do |cont|
IO.foreach("input.txt") { |line| cont = callcc { |cc| cont[cc, line] }} puts "Printed #{count} lines." cont[nil]
end
- printer
while array = callcc { |cc| reader[cc] }
reader, line = array print line count += 1
end</lang>
- The above program uses continuations almost like fibers. The reader continues the printer, and the printer continues the reader.
- The first call to reader[c] uses Proc#[] to start the reader; but later calls to reader[c] use Continuation#[] to continue the reader.
- The reader and printer share the same stack. The control flow when IO.foreach raises an IO error is very strange. The reader dies, and the original call to Proc#[] raises the same error in the printer.
Threads. Both Ruby 1.8 and Ruby 1.9 give Thread to us. Threads provide preemptive concurrency. The scheduler preempts threads and switches between threads, seemingly at random. Threads seem worse than continuations, because threads have unpredictable control flow, but we can use a Queue to restore some order. We use Thread with Queue.
<lang ruby>require 'thread'
counts = Queue.new lines = Queue.new reader = Thread.new do
begin File.foreach("input.txt") { |line| lines << line } lines << :EOF puts "Printed #{counts.pop} lines." ensure lines << nil end
end
- writer
count = 0 while line = lines.pop
case line when String print line count += 1 when :EOF counts << count end
end reader.join</lang>
- We create a thread as the reader, and use the current thread as the writer.
- If a thread tries to pop an empty queue, then the thread waits until some other thread queues something.
- The queue of lines can become long; the worst case allows the reader to read the entire file before the printer pops the first line! If you wanted to prevent a long queue, a SizedQueue.new(5) would hold only 5 elements.
- If IO.reader raises an IO error, then the reader dies. The writer would deadlock on an empty queue after the reader dies. To prevent this deadlock, the reader ensures to queue a final nil before it dies. The writer uses this nil to break its loop and call reader.join. If the reader dies with an IO error, then reader.join raises the same error.
Scala
A possible implementation using Actors
<lang scala>case class HowMany(asker:Actor)
val printer = actor {
var count = 0 while(true){ receive{
case line:String => print(line); count = count + 1
case HowMany(asker:Actor) => asker ! count ; exit() } }
}
def reader(printer:Actor) {
scala.io.Source.fromFile("c:\\input.txt").getLines foreach {printer ! _ } printer ! HowMany( actor { receive{
case count:Int => println("line count = "+count)
} })
}
reader(printer)</lang>
SystemVerilog
<lang SystemVerilog>program main;
mailbox#(bit) p2c_cmd = new; mailbox#(string) p2c_data = new; mailbox#(int) c2p_data = new;
initial begin int fh = $fopen("input.txt", "r"); string line; int count; while ($fgets(line, fh)) begin p2c_cmd.put(0); p2c_data.put(line); end p2c_cmd.put(1); c2p_data.get(count); $display( "COUNT: %0d", count ); end
initial begin bit done; int count; while (!done) begin p2c_cmd.get(done); if (done) begin c2p_data.put(count); end else begin string line; p2c_data.get(line); $display( "LINE: %s", line); count++; end end end
endprogram</lang>
Tcl
Uses the Thread package. <lang tcl>package require Thread
- Define the input thread
set input [thread::create {
proc readFile {filename receiver} {
set f [open $filename] while {[gets $f line] >= 0} { thread::send $receiver [list line $line] } close $f thread::send $receiver lineCount lines puts "got $lines lines"
} thread::wait
}]
- Define the output thread
set output [thread::create {
set lines 0 proc line {string} {
puts $string incr ::lines
} proc lineCount {} {return $::lines} thread::wait
}]
- Connect everything together and start the processing
thread::send $input [list readFile "input.txt" $output]</lang>
UnixPipes
the main process is the one started by cat file. The subshell created by >(...) is the secondary process the main pipeline waits for the secondary process to finish, collects and prints the count. This falls a-foul of the requirement that fork should not be used as every thing in pipes is done using forks.
<lang bash>rm -f node ; mkfifo node cat file | tee >(wc -l > node ) | cat - node</lang>
Visual Basic .NET
This can be improved by adding a blocking Dequeue instead of spinning on TryDequeue.
<lang vbnet>Imports System.Threading
Module Module1
Sub Main() Dim t1 As New Thread(AddressOf Reader) Dim t2 As New Thread(AddressOf Writer) t1.Start() t2.Start() t1.Join() t2.Join() End Sub
Sub Reader() For Each line In IO.File.ReadAllLines("input.txt") m_WriterQueue.Enqueue(line) Next m_WriterQueue.Enqueue(Nothing)
Dim result As Integer Do Until m_ReaderQueue.TryDequeue(result) Thread.Sleep(10) Loop
Console.WriteLine(result)
End Sub
Sub Writer() Dim count = 0 Dim line As String = Nothing Do Do Until m_WriterQueue.TryDequeue(line) Thread.Sleep(10) Loop If line IsNot Nothing Then Console.WriteLine(line) count += 1 Else m_ReaderQueue.Enqueue(count) Exit Do End If Loop End Sub
Private m_WriterQueue As New SafeQueue(Of String) Private m_ReaderQueue As New SafeQueue(Of Integer)
End Module
Class SafeQueue(Of T)
Private m_list As New Queue(Of T) Public Function TryDequeue(ByRef result As T) As Boolean SyncLock m_list If m_list.Count = 0 Then Return False result = m_list.Dequeue Return True End SyncLock End Function Public Sub Enqueue(ByVal value As T) SyncLock m_list m_list.Enqueue(value) End SyncLock End Sub
End Class</lang>
- Programming Tasks
- Concurrency
- Ada
- Aikido
- ALGOL 68
- BCPL
- C
- Libco
- Clojure
- Common Lisp
- Bordeaux Threads
- D
- Tools
- Delphi
- E
- Erlang
- Euphoria
- F Sharp
- Go
- Haskell
- Icon
- Unicon
- Java
- Lua
- OCaml
- OoRexx
- Oz
- Perl
- PicoLisp
- PureBasic
- Python
- Racket
- Raven
- Ruby
- Continuation
- Thread
- Scala
- SystemVerilog
- Tcl
- UnixPipes
- Clarify task
- Visual Basic .NET
- AWK/Omit
- GUISS/Omit
- TI-83 BASIC/Omit
- TI-89 BASIC/Omit
- M4/Omit
- PARI/GP/Omit
- Retro/Omit
- ZX Spectrum Basic/Omit