Synchronous concurrency: Difference between revisions

m
→‎{{header|Wren}}: Changed to Wren S/H
m (fixed a refcount issue)
m (→‎{{header|Wren}}: Changed to Wren S/H)
 
(30 intermediate revisions by 15 users not shown)
Line 8:
=={{header|Ada}}==
This Ada example starts by creating a package defining a single instance of a printing task. Ada requires packages to be separated into two parts. The package specification defines the interface to all public members of the package.
<langsyntaxhighlight Adalang="ada">package Synchronous_Concurrent is
task Printer is
entry Put(Item : in String);
entry Get_Count(Count : out Natural);
end Printer;
end Synchronous_Concurrent;</langsyntaxhighlight>
The package body contains the implementation of all the subprograms and tasks defined in the specification.
<langsyntaxhighlight Adalang="ada">with Ada.Text_Io; use Ada.Text_Io;
with Ada.Strings.Unbounded; use Ada.Strings.Unbounded;
Line 40:
end Printer;
end Synchronous_Concurrent;</langsyntaxhighlight>
Note that the task body contains an ''accept'' block for each ''entry'' defined in the task specification. When some other task calls an entry in the Printer task the communication between the tasks is synchronized.
 
Line 46:
 
The next file contains the ''main'' procedure for this program. The main or entry-point procedure for a program always runs in the ''environment'' task. For this program, the ''environment'' task is takes on the role of the file reading concurrent unit while the Printer task takes on the role of the printing concurrent unit.
<langsyntaxhighlight Adalang="ada">with Synchronous_Concurrent; use Synchronous_Concurrent;
with Ada.Text_Io; use Ada.Text_Io;
Line 64:
New_Line;
Put_Line("The task wrote" & Natural'Image(Num_Strings) & " strings.");
end Synchronous_Concurrent_Main;</langsyntaxhighlight>
In this example only the environment task can call the entries on the Printer task. When the environment task completes the ''terminate'' option of the Printer task applies, terminating the Printer task. The environment task completes right after printing the number of lines sent through the Printer task. Because of the ''terminate'' option, the Printer task terminates just after the environment task prints the count.
 
=={{header|Aikido}}==
Aikido supports threads and monitors natively (built in to the language). There is also support for closures, but this example will use threads:
<langsyntaxhighlight lang="aikido">
monitor Queue {
var items = []
Line 117:
 
join (r)
join (w)</langsyntaxhighlight>
 
=={{header|ALGOL 68}}==
<langsyntaxhighlight lang="algol68">(
STRING line;
INT count := 0, errno;
Line 151:
);
print((count))
)</langsyntaxhighlight>
 
=={{header|BCPL}}==
<langsyntaxhighlight BCPLlang="bcpl">// This is a BCPL implementation of the Rosettacode synchronous
// concurrency test using BCPL coroutines and a coroutine implementation
// of a Occum-style channels.
Line 293:
}
}
</syntaxhighlight>
</lang>
 
=={{header|C}}==
{{libheader|libco}}
<langsyntaxhighlight lang="c">#include <stdlib.h> /* malloc(), realloc(), free() */
#include <stdio.h> /* fopen(), fgetc(), fwrite(), printf() */
 
Line 408:
co_delete(reader);
return 0;
}</langsyntaxhighlight>
 
=={{header|C sharp}}==
<syntaxhighlight lang="csharp">using System;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.IO;
 
namespace SynchronousConcurrency
{
class Program
{
static void Main(string[] args)
{
BlockingCollection<string> toWriterTask = new BlockingCollection<string>();
BlockingCollection<int> fromWriterTask = new BlockingCollection<int>();
Task writer = Task.Factory.StartNew(() => ConsoleWriter(toWriterTask, fromWriterTask));
Task reader = Task.Factory.StartNew(() => FileReader(fromWriterTask, toWriterTask));
Task.WaitAll(writer, reader);
}
static void ConsoleWriter(BlockingCollection<string> input, BlockingCollection<int> output)
{
int nLines = 0;
string line;
while ((line = input.Take()) != null)
{
Console.WriteLine(line);
++nLines;
}
output.Add(nLines);
}
static void FileReader(BlockingCollection<int> input, BlockingCollection<string> output)
{
StreamReader file = new StreamReader("input.txt"); // TODO: check exceptions
string line;
while ((line = file.ReadLine()) != null)
{
output.Add(line);
 
}
output.Add(null); // EOF
Console.WriteLine("line count: " + input.Take());
}
}
}</syntaxhighlight>
{{out}}
<pre>
foo
bar
baz
xenu 666
line count: 4
</pre>
 
=={{header|C++}}==
{{works with|C++11}}
<langsyntaxhighlight lang="cpp">#include <future>
#include <iostream>
#include <fstream>
Line 472 ⟶ 524:
std::thread t2(printer, std::move(promise), std::ref(queue));
t1.join(); t2.join();
}</langsyntaxhighlight>
{{out}}
<pre>
Line 486 ⟶ 538:
 
Printed 9 lines
</pre>
 
=={{header|C sharp}}==
<lang csharp>using System;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.IO;
 
namespace SynchronousConcurrency
{
class Program
{
static void Main(string[] args)
{
BlockingCollection<string> toWriterTask = new BlockingCollection<string>();
BlockingCollection<int> fromWriterTask = new BlockingCollection<int>();
Task writer = Task.Factory.StartNew(() => ConsoleWriter(toWriterTask, fromWriterTask));
Task reader = Task.Factory.StartNew(() => FileReader(fromWriterTask, toWriterTask));
Task.WaitAll(writer, reader);
}
static void ConsoleWriter(BlockingCollection<string> input, BlockingCollection<int> output)
{
int nLines = 0;
string line;
while ((line = input.Take()) != null)
{
Console.WriteLine(line);
++nLines;
}
output.Add(nLines);
}
static void FileReader(BlockingCollection<int> input, BlockingCollection<string> output)
{
StreamReader file = new StreamReader("input.txt"); // TODO: check exceptions
string line;
while ((line = file.ReadLine()) != null)
{
output.Add(line);
 
}
output.Add(null); // EOF
Console.WriteLine("line count: " + input.Take());
}
}
}</lang>
{{out}}
<pre>
foo
bar
baz
xenu 666
line count: 4
</pre>
 
Line 545:
The ''state'' argument is the agent's state at the start of the call, and the last expression becomes the agent's new state.
 
<langsyntaxhighlight lang="clojure">(use '[clojure.java.io :as io])
 
(def writer (agent 0))
Line 551:
(defn write-line [state line]
(println line)
(inc state))</langsyntaxhighlight>
 
The reader executes on the main thread. It passes each line to the writer -- the ''send'' call returns immediately, waits until the writer has finished writing all the lines, gets the line count & prints it, and terminates the writer.
 
<langsyntaxhighlight lang="clojure">(with-open [r (io/reader "input.txt")]
(doseq [line (line-seq r)]
(send writer write-line line)))
(await writer)
(println "lines written:" @writer)
(shutdown-agents)</langsyntaxhighlight>
That's it!
 
Line 569:
First, implement message-passing:
 
<langsyntaxhighlight lang="lisp">(defvar *self*)
 
(defclass queue ()
Line 627:
`(receive-message (,message . ,body)))
 
(defun queue () (make-instance 'queue))</langsyntaxhighlight>
 
Should be easy from now on:
 
<langsyntaxhighlight lang="lisp">(defun reader (pathname writer)
(with-open-file (stream pathname)
(loop for line = (read-line stream nil)
Line 663:
(thread reader (reader pathname writer))
(thread writer (writer *standard-output* reader)))
(values))</langsyntaxhighlight>
 
And now an example:
 
<langsyntaxhighlight lang="lisp">CL-USER> (synchronous-concurrency :pathname "/tmp/input.txt")
foo
bar
Line 673:
xenu 666
line count: 4
; No value</langsyntaxhighlight>
 
Note that to run the example from the SLIME REPL you need to put:
Line 680:
 
in your <code>~/.swank.lisp</code>
 
=={{header|Crystal}}==
 
<syntaxhighlight lang="ruby">File.write("input.txt", "a\nb\nc")
lines = Channel(String).new
spawn do
File.each_line("input.txt") do |line|
lines.send(line)
end
lines.close
end
while line = lines.receive?
puts line
end
 
File.delete("input.txt")</syntaxhighlight>
 
<pre>
a
b
c
</pre>
 
=={{header|D}}==
<langsyntaxhighlight lang="d">import std.algorithm, std.concurrency, std.stdio;
 
void main() {
Line 702 ⟶ 727:
);
}
</syntaxhighlight>
</lang>
 
=={{header|Delphi}}==
<syntaxhighlight lang="delphi">
<lang Delphi>
program Project2;
 
Line 874 ⟶ 899:
end;
end.
</syntaxhighlight>
</lang>
 
=={{header|E}}==
<langsyntaxhighlight lang="e">def printer := {
var count := 0
def printer {
Line 906 ⟶ 931:
 
# Stream IO in E is not finished yet, so this example just uses a list.
sender(<file:input.txt>.getText().split("\n"))</langsyntaxhighlight>
 
=={{header|EchoLisp}}==
This task uses two processes, reader/writer, synchronized by a semaphore S and its queue of messages. The reader sends write or reader-count-please messages. The writer responds with ack or count messages.
<langsyntaxhighlight lang="scheme">
(require 'sequences)
(require 'tasks)
Line 946 ⟶ 971:
count)
 
</syntaxhighlight>
</lang>
{{out}}
<pre>
Line 981 ⟶ 1,006:
=={{header|Elixir}}==
{{trans|Erlang}}
<langsyntaxhighlight lang="elixir">defmodule RC do
def start do
my_pid = self
Line 1,011 ⟶ 1,036:
end
 
RC.start</langsyntaxhighlight>
 
=={{header|Erlang}}==
 
<langsyntaxhighlight lang="erlang">-module(cc).
 
-export([start/0]).
Line 1,041 ⟶ 1,066:
io:fwrite("~s", [Any]),
reader(Pid, C+1)
end.</langsyntaxhighlight>
 
=={{header|Euphoria}}==
<langsyntaxhighlight lang="euphoria">sequence lines
sequence count
lines = {}
Line 1,101 ⟶ 1,126:
while length(task_list()) > 1 do
task_yield()
end while</langsyntaxhighlight>
 
=={{header|F_Sharp|F#}}==
Line 1,108 ⟶ 1,133:
threads from the ThreadPool, using a MailboxProcessor for lock-free communication between threads and tracking the line count without the use of mutable state.
 
<langsyntaxhighlight lang="fsharp">
open System.IO
 
Line 1,138 ⟶ 1,163:
 
reader printer @"c:\temp\input.txt"
</syntaxhighlight>
</lang>
 
=={{header|Forth}}==
Tested in GForth 0.7.3. First, 'co.fs' - the coroutines library, then the output of the application on himself,
result of the command-line: 'cat synco.fs | gforth synco.fs'.
<syntaxhighlight lang="forth">\
\ co.fs Coroutines by continuations.
\
\ * Circular Queue. Capacity is power of 2.
\
VARIABLE HEAD VARIABLE TAIL
128 CELLS CONSTANT CQ#
\ * align by queue capacity
HERE DUP
CQ# 1- INVERT AND CQ# +
SWAP - ALLOT
\
HERE CQ# ALLOT CONSTANT START
\
: ADJUST ( -- ) [ CQ# 1- ]L AND START + ;
: PUT ( n-- ) TAIL @ TUCK ! CELL+ ADJUST TAIL ! ;
: TAKE ( --n ) HEAD @ DUP @ SWAP CELL+ ADJUST HEAD ! ;
: 0CQ ( -- ) START DUP HEAD ! TAIL ! ; 0CQ
: NOEMPTY? ( --f ) HEAD @ TAIL @ <> ;
: ;CO ( -- ) TAKE >R ;
\
\ * COROUTINES LEXEME
\
: CO: ( -- ) R> PUT ; \ Register continuation as coroutine. Exit.
: CO ( -- ) R> PUT TAKE >R ; \ Co-route.
: GO ( -- ) BEGIN NOEMPTY? WHILE ;CO REPEAT ; \ :-)
\
\ * CHANNELS LEXEME
\
: CHAN? ( a--f ) 2@ XOR ;
</syntaxhighlight>
{{out}}
<pre>
\ synco.fs Synchronous concurrency for RosettaCode
include co.fs
 
: STRING@ DUP CELL + SWAP @ ;
 
2VARIABLE CHAN
 
\
\ * READER LEXEME
\
4096 CONSTANT L#
CREATE Line 0 , L# ALLOT
: READER
CO:
BEGIN
Line cell+ L# STDIN read-line THROW
WHILE
Line !
0 Line CHAN 2!
CO
REPEAT DROP
Line DUP CHAN 2!
 
\ -- Wait for report back
BEGIN CO CHAN CHAN? UNTIL
 
\ -- Have it, show and go
CR S" -------" TYPE
CR S" LINES: " TYPE CHAN @ ?
;
\
\ * WRITER LEXEME
\
VARIABLE X
: WRITER
CO:
BEGIN
CHAN CHAN?
WHILE
CHAN @ STRING@ TYPE CR
1 X +!
CO
REPEAT
 
\ -- Chance to stop other writers
CO
 
\ -- First of writers reports back
\ -- the shared global counter
CHAN CHAN? 0=
IF
0 X CHAN 2!
CO
THEN
;
\
\ * RUNNER
\
0 X ! READER WRITER ( WRITER WRITER :-) GO CR BYE
 
-------
LINES: 59
</pre>
 
=={{header|Go}}==
<langsyntaxhighlight lang="go">package main
 
import (
Line 1,172 ⟶ 1,297:
close(lines)
fmt.Println("Number of lines:", <-count)
}</langsyntaxhighlight>
 
=={{header|Haskell}}==
Line 1,188 ⟶ 1,313:
The code below defines various signals in terms of takeMVar and putMVar and then passes those to the parts of the code which should be permitted to use them. Note that this way, it is impossible for the reader process to take the current line, for example.
 
<langsyntaxhighlight lang="haskell">import Control.Concurrent
import Control.Concurrent.MVar
 
Line 1,202 ⟶ 1,327:
 
forkIO $ writer takeLine putCount
reader putLine putEOF takeCount</langsyntaxhighlight>
 
The reader simply reads the file lazily, applying putLine to each of the lines in turn, which blocks until the writer has taken the line. It then signals that it is finished with putEOF, and then takes the count and prints it.
 
<langsyntaxhighlight lang="haskell">reader putLine putEOF takeCount =
do ls <- fmap lines (readFile "input.txt")
mapM putLine ls
putEOF
n <- takeCount
print n</langsyntaxhighlight>
 
The writer gets the lines in a loop with takeLine until it receives Nothing, at which point it uses putCount to tell the reader how many lines there were.
 
<langsyntaxhighlight lang="haskell">writer takeLine putCount = loop 0
where loop n = do l <- takeLine
case l of
Just x -> do putStrLn x
loop (n+1)
Nothing -> putCount n</langsyntaxhighlight>
 
=={{header|Icon}} and {{header|Unicon}}==
Line 1,230 ⟶ 1,355:
Unicon supports concurrent programming. A Unicon solution is:
 
<langsyntaxhighlight lang="unicon">procedure main(A)
fName := A[1]|"index.txt"
p := thread produce(fName)
Line 1,248 ⟶ 1,373:
# (blocking until each message arrives)
i@>>p # put row count into p's inbox
end</langsyntaxhighlight>
 
Sample output:
Line 1,265 ⟶ 1,390:
 
Implementation:
<langsyntaxhighlight Jlang="j">input=: 1 :0
nlines=: 0
u;._2@fread 'input.txt'
Line 1,274 ⟶ 1,399:
nlines=: nlines+1
smoutput y
)</langsyntaxhighlight>
 
Usage:
 
<syntaxhighlight lang J="j"> output input</langsyntaxhighlight>
 
Note that we are not using OS threads here, but instead - as specified by this task - are using two concurrent activities which share data synchronously.
Line 1,284 ⟶ 1,409:
=={{header|Java}}==
 
<langsyntaxhighlight lang="java">import java.io.BufferedReader;
import java.io.FileReader;
import java.util.concurrent.atomic.AtomicBoolean;
Line 1,336 ⟶ 1,461:
}
}
</syntaxhighlight>
</lang>
 
=={{header|Julia}}==
<syntaxhighlight lang="julia">
function inputlines(txtfile, iochannel)
for line in readlines(txtfile)
Base.put!(iochannel, line)
end
Base.put!(iochannel, nothing)
println("The other task printed $(take!(iochannel)) lines.")
end
 
function outputlines(iochannel)
totallines = 0
while (line = Base.take!(iochannel)) != nothing
totallines += 1
println(line)
end
Base.put!(iochannel, totallines)
end
 
c = Channel(0)
@async inputlines("filename.txt", c)
outputlines(c)
</syntaxhighlight>
 
=={{header|Kotlin}}==
 
===Using threads===
 
<syntaxhighlight lang="kotlin">import java.util.concurrent.SynchronousQueue
import kotlin.concurrent.thread
import java.io.File
 
const val EOT = "\u0004" // end of transmission
 
fun main(args: Array<String>) {
val queue = SynchronousQueue<String>()
 
val work = thread {
var count = 0
 
while (true) {
val line = queue.take()
if (line == EOT) {
queue.put(count.toString())
break
}
println(line)
count++
}
}
 
File("input.txt").forEachLine { line -> queue.put(line) }
queue.put(EOT)
work.join()
 
val count = queue.take().toInt()
println("\nNumber of lines printed = $count")
}</syntaxhighlight>
 
Content of input.txt:
<pre>
line1
line2
line3
line4
</pre>
 
{{out}}
<pre>
line1
line2
line3
line4
 
Number of lines printed = 4
</pre>
 
===Using coroutines===
Uses experimental features and will change in the future. Same output as the threads version.
<syntaxhighlight lang="kotlin">// version 1.3.20 with kotlinx-coroutines-core version 1.1.1
 
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.sumBy
import kotlinx.coroutines.coroutineScope
import java.io.File
 
suspend fun main() {
coroutineScope {
val lines = Channel<String>()
 
val count = async {
lines.sumBy { line ->
println(line)
1
}
}
 
File("input.txt").bufferedReader().forEachLine { line -> lines.send(line) }
println("\nNumber of lines printed = ${count.await()}")
}
}</syntaxhighlight>
 
=={{header|Logtalk}}==
The following example can be found in the Logtalk distribution and is used here with permission. The "input.txt" file contains the terms "a(0)..a(9)", one per line. Works when using SWI-Prolog, XSB, or YAP as the backend compiler.
<langsyntaxhighlight lang="logtalk">
:- object(team).
 
Line 1,373 ⟶ 1,601:
 
:- end_object.
</syntaxhighlight>
</lang>
 
Output:
 
<langsyntaxhighlight lang="text">
| ?- ?- team::start.
a(0)
Line 1,391 ⟶ 1,619:
Number of lines: 10
true.
</syntaxhighlight>
</lang>
 
=={{header|Lua}}==
<langsyntaxhighlight lang="lua">function ReadFile()
local fp = io.open( "input.txt" )
assert( fp ~= nil )
Line 1,412 ⟶ 1,640:
print( val )
end
</syntaxhighlight>
</lang>
 
=={{header|Mercury}}==
<langsyntaxhighlight Mercurylang="mercury">:- module synchronous_concurrency.
:- interface.
:- import_module io.
Line 1,473 ⟶ 1,701:
LineOrStop = stop,
mvar.put(MVar, Count, !IO)
).</langsyntaxhighlight>
 
=={{header|Nim}}==
Compile with <code>nim --threads:on c sync.nim</code>:
 
{{trans|Python}}
Two threads are spun up that communicate over a channel.
 
<syntaxhighlight lang="nim">var msgs: Channel[string]
var count: Channel[int]
 
const FILE = "input.txt"
 
proc read() {.thread.} =
for line in FILE.lines:
msgs.send(line)
msgs.send("")
echo count.recv()
count.close()
 
proc print() {.thread.} =
var n = 0
while true:
var msg = msgs.recv()
if msg.len == 0:
break
echo msg
n += 1
msgs.close()
count.send(n)
 
var reader_thread = Thread[void]()
var printer_thread = Thread[void]()
 
msgs.open()
count.open()
createThread(reader_thread, read)
createThread(printer_thread, print)
joinThreads(reader_thread, printer_thread)</syntaxhighlight>
 
=={{header|OCaml}}==
===Using only the standard library===
We use the built-in Event module to provide communication channels between threads.
<syntaxhighlight lang ="ocaml">open Event</langsyntaxhighlight>
 
The reader is a simple loop. It starts by opening a file, then reads lines from that file until there is nothing left to read. After each line, it sends <tt>Some v</tt> on channel <tt>lines_dest</tt>, where <tt>v</tt> is the contents of the line. Once there are no lines anymore,
exception <tt>End_of_file</tt> is raised and we send <tt>None</tt> on that same channel. After that, it's just the matter of waiting for one message on <tt>count_source</tt>, closing the file and printing the result:
<langsyntaxhighlight lang="ocaml">let reader count_source lines_dest =
let file = open_in "input.txt" in
let rec aux () =
Line 1,493 ⟶ 1,759:
Printf.printf "The task wrote %i strings\n" printed;
close_in file
in aux ()</langsyntaxhighlight>
 
The printer is also a simple loop. It keeps receiving messages on <tt>lines_source</tt>. If a message has structure <tt>Some v</tt>, then <tt>v</tt> is a line, print it and increment the counter. Otherwise, the message has structure <tt>None</tt>, which means that we're done, just send the number of lines on <tt>count_dest</tt>:
<langsyntaxhighlight lang="ocaml">let printer lines_source count_target =
let rec aux i =
match sync (receive lines_source) with
| Some line -> print_endline line; aux ( i + 1 )
| None -> sync (send count_target i)
in aux 0</langsyntaxhighlight>
 
Finally, our main program creates both communication channels and backgrounds treatment of <tt>printer</tt>:
<langsyntaxhighlight lang="ocaml">let _ =
let count = new_channel ()
and lines = new_channel ()
in
let _ = Thread.create (printer lines) count
in reader count lines</langsyntaxhighlight>
 
Note that, had we decided to background treatment of <tt>reader</tt> instead, an additional synchronization would have been necessary to prevent the program from leaving once the main thread is over.
Line 1,519 ⟶ 1,785:
When the file has been read and sent to channel chPrint, this channel is closed. All tasks waiting for an object in this channel are released and receive null. Then printing task returns number of printed lines into chCount channel for the main task.
 
<langsyntaxhighlight Oforthlang="oforth">import: parallel
 
: printing(chPrint, chCount)
Line 1,534 ⟶ 1,800:
aFileName File new forEach: line [ chPrint send(line) drop ]
chPrint close
chCount receive "Number of lines printed : " print println ;</langsyntaxhighlight>
 
=={{header|Ol}}==
We use lazy file reading to not a load full file content into memory.
Additionally, we send just a "#true" into coroutines as a message placeholder because we do not analyze (in this sample) a message content.
 
Source code:
<syntaxhighlight lang="scheme">
(import (owl parse))
 
(coroutine 'reader (lambda ()
; lazy line-by-line file reader
(define (not-a-newline x) (not (eq? x #\newline)))
(define parser (let-parse*
((line (greedy* (byte-if not-a-newline)))
(newline (imm #\newline)))
(bytes->string line)))
(define file (file->bytestream "input.txt"))
 
(let loop ((in (try-parse parser file #false)))
(cond
((not in) ; file is ended
(define envelope (wait-mail)) ; wait for a request
(mail (ref envelope 1) #eof)) ; send an end-of-file to caller
((pair? in) ; new line is read
(define envelope (wait-mail)) ; wait for a request
(mail (ref envelope 1) (car in)) ; send a line to caller
(loop (try-parse parser (cdr in) #false)))
(else ; just a lazy read, let's repeat
(loop (force in)))))
 
(print "total lines read: " (await (mail 'writer #t)))
))
 
(coroutine 'writer (lambda ()
(let loop ((n 0))
(define line (await (mail 'reader #t)))
 
(if (eof? line)
then
(define envelope (wait-mail)) ; wait for a request
(mail (ref envelope 1) n) ; send a lines count to caller
else
(print "read line: " line)
(loop (+ n 1))))))
 
</syntaxhighlight>
 
An input.txt:
<pre>The world was young, the mountains green,
No stain yet on the Moon was seen,
No words were laid on stream or stone
When Durin woke and walked alone.
He named the nameless hills and dells;
He drank from yet untasted wells;
He stooped and looked in Mirrormere,
And saw a crown of stars appear,
As gems upon a silver thread,
Above the shadows of his head.
</pre>
 
{{Out}}
<pre>read line: The world was young, the mountains green,
read line: No stain yet on the Moon was seen,
read line: No words were laid on stream or stone
read line: When Durin woke and walked alone.
read line: He named the nameless hills and dells;
read line: He drank from yet untasted wells;
read line: He stooped and looked in Mirrormere,
read line: And saw a crown of stars appear,
read line: As gems upon a silver thread,
read line: Above the shadows of his head.
total lines read: 10
</pre>
 
=={{header|ooRexx}}==
<syntaxhighlight lang="oorexx">
<lang ooRexx>
queue = .workqueue~new
input = .stream~new("jabberwocky.txt")
Line 1,623 ⟶ 1,962:
stream~lineout(item)
end
</syntaxhighlight>
</lang>
 
=={{header|Oz}}==
<langsyntaxhighlight lang="oz">declare
%% Helper function to read a file lazily.
%% Returns a lazy list of lines.
Line 1,674 ⟶ 2,013:
%% Sync on Count and print its value.
{Wait Count}
{Show Count}</langsyntaxhighlight>
 
=={{header|Perl}}==
<langsyntaxhighlight lang="perl">use threads;
use Thread::Queue qw();
Line 1,709 ⟶ 2,048:
$reader->join;
$printer->join;</langsyntaxhighlight>
 
=={{header|Perl 6}}==
{{works with|rakudo|2013-02-27}}
<lang perl6>sub MAIN ($infile) {
$infile.IO.lines ==> printer() ==> my $count;
say "printed $count lines";
}
 
sub printer(*@lines) {
my $lines;
for @lines {
.say;
++$lines;
}
$lines;
}</lang>
Concurrent units are established by use of the feed operator, which works much like an in-process object pipe. Since the final feed goes to a variable declaration that belongs to the outer thread, it serves as a backchannel from the printer thread. In this case the outer thread signals the desire for a line count by terminating the pipe to the printing thread.
(Note: soon these will be implemented with real threads in Perl 6, but this is currently emulated with coroutine semantics, aka lazy lists.)
 
=={{header|Phix}}==
Busy wait version (using threads) - the distribution file also contains single step-lock and queue-less versions of this code.
<!--<syntaxhighlight lang="phix">(notonline)-->
This program is included in the distribution as demo\rosetta\Synchronous_concurrency.exw, which also contains single step-lock and queue-less versions of this code.
<span style="color: #000080;font-style:italic;">-- demo\rosetta\Synchronous_concurrency.exw</span>
<lang Phix>atom frThread, -- file reader thread
<span style="color: #008080;">without</span> <span style="color: #008080;">js</span> <span style="color: #000080;font-style:italic;">-- threads, file i/o, command_line()</span>
lcThread -- line counter thread
<span style="color: #004080;">string</span> <span style="color: #000000;">filename</span> <span style="color: #0000FF;">=</span> <span style="color: #7060A8;">substitute</span><span style="color: #0000FF;">(</span><span style="color: #7060A8;">command_line</span><span style="color: #0000FF;">()[</span><span style="color: #000000;">2</span><span style="color: #0000FF;">],</span><span style="color: #008000;">".exe"</span><span style="color: #0000FF;">,</span><span style="color: #008000;">".exw"</span><span style="color: #0000FF;">)</span>
 
sequence queue = {}
<span style="color: #004080;">atom</span> <span style="color: #000000;">frThread</span><span style="color: #0000FF;">,</span> <span style="color: #000080;font-style:italic;">-- file reader thread</span>
integer qlock = init_cs()
<span style="color: #000000;">lcThread</span> <span style="color: #000080;font-style:italic;">-- line counter thread</span>
 
integer linecount = 1
<span style="color: #004080;">sequence</span> <span style="color: #000000;">queue</span> <span style="color: #0000FF;">=</span> <span style="color: #0000FF;">{}</span>
 
<span style="color: #004080;">integer</span> <span style="color: #000000;">qlock</span> <span style="color: #0000FF;">=</span> <span style="color: #7060A8;">init_cs</span><span style="color: #0000FF;">(),</span>
procedure readfile()
<span style="color: #000000;">linecount</span> <span style="color: #0000FF;">=</span> <span style="color: #000000;">1</span>
object line
integer fn = open("input.txt","r")
<span style="color: #008080;">procedure</span> <span style="color: #000000;">readfile</span><span style="color: #0000FF;">()</span>
while 1 do
<span style="color: #004080;">integer</span> <span style="color: #000000;">fn</span> <span style="color: #0000FF;">=</span> <span style="color: #7060A8;">open</span><span style="color: #0000FF;">(</span><span style="color: #000000;">filename</span><span style="color: #0000FF;">,</span><span style="color: #008000;">"r"</span><span style="color: #0000FF;">)</span>
line = gets(fn)
<span style="color: #008080;">while</span> <span style="color: #000000;">1</span> <span style="color: #008080;">do</span>
enter_cs(qlock)
<span style="color: #004080;">object</span> <span style="color: #000000;">line</span> <span style="color: #0000FF;">=</span> <span style="color: #7060A8;">gets</span><span style="color: #0000FF;">(</span><span style="color: #000000;">fn</span><span style="color: #0000FF;">)</span>
queue = append(queue,line)
<span style="color: #7060A8;">enter_cs</span><span style="color: #0000FF;">(</span><span style="color: #000000;">qlock</span><span style="color: #0000FF;">)</span>
line = atom(line) -- kill refcount!
<span style="color: #000000;">queue</span> <span style="color: #0000FF;">=</span> <span style="color: #7060A8;">append</span><span style="color: #0000FF;">(</span><span style="color: #000000;">queue</span><span style="color: #0000FF;">,</span><span style="color: #000000;">line</span><span style="color: #0000FF;">)</span>
leave_cs(qlock)
<span style="color: #000000;">line</span> <span style="color: #0000FF;">=</span> <span style="color: #004080;">atom</span><span style="color: #0000FF;">(</span><span style="color: #000000;">line</span><span style="color: #0000FF;">)</span> <span style="color: #000080;font-style:italic;">-- kill refcount!</span>
if line then exit end if
<span style="color: #7060A8;">leave_cs</span><span style="color: #0000FF;">(</span><span style="color: #000000;">qlock</span><span style="color: #0000FF;">)</span>
end while
<span style="color: #008080;">if</span> <span style="color: #000000;">line</span> <span style="color: #008080;">then</span> <span style="color: #008080;">exit</span> <span style="color: #008080;">end</span> <span style="color: #008080;">if</span>
close(fn)
<span style="color: #008080;">end</span> <span style="color: #008080;">while</span>
wait_thread(lcThread)
<span style="color: #7060A8;">close</span><span style="color: #0000FF;">(</span><span style="color: #000000;">fn</span><span style="color: #0000FF;">)</span>
printf(1,"Lines read: %d\n",linecount)
<span style="color: #7060A8;">wait_thread</span><span style="color: #0000FF;">(</span><span style="color: #000000;">lcThread</span><span style="color: #0000FF;">)</span>
exit_thread(0)
<span style="color: #7060A8;">printf</span><span style="color: #0000FF;">(</span><span style="color: #000000;">1</span><span style="color: #0000FF;">,</span><span style="color: #008000;">"Lines read: %d\n"</span><span style="color: #0000FF;">,</span><span style="color: #000000;">linecount</span><span style="color: #0000FF;">)</span>
end procedure
<span style="color: #7060A8;">exit_thread</span><span style="color: #0000FF;">(</span><span style="color: #000000;">0</span><span style="color: #0000FF;">)</span>
 
<span style="color: #008080;">end</span> <span style="color: #008080;">procedure</span>
procedure countlines()
object line
<span style="color: #008080;">procedure</span> <span style="color: #000000;">countlines</span><span style="color: #0000FF;">()</span>
linecount = 0
<span style="color: #000000;">linecount</span> <span style="color: #0000FF;">=</span> <span style="color: #000000;">0</span>
while 1 do
<span style="color: #008080;">while</span> <span style="color: #000000;">1</span> <span style="color: #008080;">do</span>
enter_cs(qlock)
<span style="color: #7060A8;">enter_cs</span><span style="color: #0000FF;">(</span><span style="color: #000000;">qlock</span><span style="color: #0000FF;">)</span>
if length(queue)=0 then
<span style="color: #008080;">if</span> <span style="color: #7060A8;">length</span><span style="color: #0000FF;">(</span><span style="color: #000000;">queue</span><span style="color: #0000FF;">)=</span><span style="color: #000000;">0</span> <span style="color: #008080;">then</span>
leave_cs(qlock)
<span style="color: #7060A8;">leave_cs</span><span style="color: #0000FF;">(</span><span style="color: #000000;">qlock</span><span style="color: #0000FF;">)</span>
-- sleep(0.1)
<span style="color: #000080;font-style:italic;">-- sleep(0.1)</span>
else
<span line style="color: queue[1]#008080;">else</span>
<span style="color: #004080;">object</span> <span style="color: #000000;">line</span> <span style="color: #0000FF;">=</span> <span style="color: #000000;">queue</span><span style="color: #0000FF;">[</span><span style="color: #000000;">1</span><span style="color: #0000FF;">]</span>
queue = queue[2..$]
<span style="color: #000000;">queue</span> <span style="color: #0000FF;">=</span> <span style="color: #000000;">queue</span><span style="color: #0000FF;">[</span><span style="color: #000000;">2</span><span style="color: #0000FF;">..$]</span>
leave_cs(qlock)
<span style="color: #7060A8;">leave_cs</span><span style="color: #0000FF;">(</span><span style="color: #000000;">qlock</span><span style="color: #0000FF;">)</span>
if atom(line) then exit end if
<span style="color: #008080;">if</span> <span style="color: #004080;">atom</span><span style="color: #0000FF;">(</span><span style="color: #000000;">line</span><span style="color: #0000FF;">)</span> <span style="color: #008080;">then</span> <span style="color: #008080;">exit</span> <span style="color: #008080;">end</span> <span style="color: #008080;">if</span>
-- ?line
<span style="color: #000080;font-style:italic;">-- ?line</span>
linecount += 1
<span style="color: #000000;">linecount</span> <span style="color: #0000FF;">+=</span> <span style="color: #000000;">1</span>
end if
<span style="color: #008080;">end</span> <span style="color: #008080;">if</span>
end while
<span style="color: #008080;">end</span> <span style="color: #008080;">while</span>
exit_thread(0)
<span style="color: #7060A8;">exit_thread</span><span style="color: #0000FF;">(</span><span style="color: #000000;">0</span><span style="color: #0000FF;">)</span>
end procedure
<span style="color: #008080;">end</span> <span style="color: #008080;">procedure</span>
 
frThread = create_thread(routine_id("readfile"),{})
<span style="color: #000000;">lcThread</span> <span style="color: #0000FF;">=</span> <span style="color: #7060A8;">create_thread</span><span style="color: #0000FF;">(</span><span style="color: #000000;">countlines</span><span style="color: #0000FF;">,{})</span>
lcThread = create_thread(routine_id("countlines"),{})
<span style="color: #000000;">frThread</span> <span style="color: #0000FF;">=</span> <span style="color: #7060A8;">create_thread</span><span style="color: #0000FF;">(</span><span style="color: #000000;">readfile</span><span style="color: #0000FF;">,{})</span>
 
wait_thread(frThread)
<span style="color: #7060A8;">wait_thread</span><span style="color: #0000FF;">(</span><span style="color: #000000;">frThread</span><span style="color: #0000FF;">)</span>
puts(1,"done")
<span style="color: #7060A8;">puts</span><span style="color: #0000FF;">(</span><span style="color: #000000;">1</span><span style="color: #0000FF;">,</span><span style="color: #008000;">"done"</span><span style="color: #0000FF;">)</span>
{} = wait_key()</lang>
<span style="color: #0000FF;">{}</span> <span style="color: #0000FF;">=</span> <span style="color: #7060A8;">wait_key</span><span style="color: #0000FF;">()</span>
<!--</syntaxhighlight>-->
 
=={{header|PicoLisp}}==
Line 1,792 ⟶ 2,116:
within the same process and not even the same machine. "input.txt" would rather
be a device (like a named pipe or socket) than a plain file.
<langsyntaxhighlight PicoLisplang="picolisp"># Reading task (synchronous)
(task (open "input.txt")
(let Fd @
Line 1,812 ⟶ 2,136:
(sigio (close "Sock")) ) # and stop the task
(println X) # Else print line to stdout
(inc 'Cnt) ) ) ) ) # and increment count</langsyntaxhighlight>
If the two cases of 'sigio' in the printing task are replaced with 'task',
that task would also be synchronous. The resulting behavior is the same.
Line 1,818 ⟶ 2,142:
===Using coroutines===
Coroutines are available only in the 64-bit version.
<langsyntaxhighlight PicoLisplang="picolisp">(co 'unit1
(yield) # Allow 'unit2' to start
(in "input.txt" # Read the file
Line 1,832 ⟶ 2,156:
(println @) # Print it
(inc 'Cnt) ) # Increment count
(yield Cnt 'unit1) ) ) # Send count to 'unit1'</langsyntaxhighlight>
 
=={{header|Pony}}==
Pony has concurrency baked into the language in the form of the actor model:
<syntaxhighlight lang="pony">use "files"
actor Main
let _env: Env // The environment contains stdout, so we save it here
 
new create(env: Env) =>
_env = env
let printer: Printer tag = Printer(env)
try
let path = FilePath(env.root as AmbientAuth, "input.txt")? // this may fail, hence the ?
let file = File.open(path)
for line in FileLines(file) do
printer(line) // sugar for "printer.apply(line)"
end
end
printer.done(this)
 
be finish(count: USize) =>
_env.out.print("Printed: " + count.string() + " lines")
 
actor Printer
let _env: Env
var _count: USize = 0
new create(env: Env) => _env = env
 
be apply(line: String) =>
_count = _count + 1
_env.out.print(line)
 
be done(main: Main tag) => main.finish(_count)</syntaxhighlight>
 
Actors are scheduled asynchronously, but messages (implemented via the behaviours) are guaranteed to arrive in causal ordering.
 
=={{header|PureBasic}}==
PureBasic uses Semaphores and Mutex's to coordinate threads.
<langsyntaxhighlight PureBasiclang="purebasic">Enumeration
#Write
#Done
Line 1,880 ⟶ 2,240:
WaitThread(Thread1) And WaitThread(Thread2)
Print("Press Enter to exit"):Input()
EndIf</langsyntaxhighlight>
 
=={{header|Python}}==
Notes: instead of hardcoding the input and output files in the units, each unit is created with a file and read or write the given file.
 
<langsyntaxhighlight lang="python">import sys
from Queue import Queue
from threading import Thread
Line 1,915 ⟶ 2,275:
writer.start()
reader.join()
writer.join()</langsyntaxhighlight>
 
===Using generators===
{{trans|Ruby}}
<langsyntaxhighlight lang="python">count = 0
 
def reader():
Line 1,930 ⟶ 2,290:
for line in r:
print(line)
count += 1</langsyntaxhighlight>
 
Note that the above accesses a variable from both paths of execution. To be more in the spirit of the task, we can actually communicate the count from the printer to the reader:
{{works with|Python|2.5+}}
<langsyntaxhighlight lang="python">def reader():
for line in open('input.txt'):
yield line.rstrip()
Line 1,950 ⟶ 2,310:
r.send(count)
except StopIteration:
pass</langsyntaxhighlight>
 
=={{header|Racket}}==
Using thread mailboxes for communication between threads:
<lang racket>
<syntaxhighlight lang="racket">
(define reader->printer-channel (make-channel))
(define printer->reader-channel (make-channel)reader)
(for ([line (in-lines (open-input-file "input.txt"))])
(thread-send printer-thread line))
(thread-send printer-thread eof)
(printf "Number of lines: ~a\n" (thread-receive)))
 
(define (sync-line-counter filenameprinter)
(definethread-send (reader)-thread
(for/sum ([line (in-producer thread-receive eof)])
(define file-port (open-input-file filename))
(let loop ([line (read-linedisplayln file-port)]line)
(when (not (eof-object? line 1)))
(begin
(channel-put reader->printer-channel line)
(loop (read-line file-port)))))
(channel-put reader->printer-channel eof)
(let ([num-lines (channel-get printer->reader-channel)])
(printf "Number of lines printed = ~a~%" num-lines)))
(define (printer)
(define count 0)
(let loop ([line (channel-get reader->printer-channel)])
(when (not (eof-object? line))
(begin
(printf "~a~%" line)
(set! count (add1 count))
(loop (channel-get reader->printer-channel)))))
(channel-put printer->reader-channel count))
(thread reader)
(thread printer))
 
(define printer-thread (thread printer))
(sync-line-counter "input.txt")
(define reader-thread (thread reader))
</lang>
 
(for-each thread-wait
(list printer-thread reader-thread))
 
</syntaxhighlight>
 
 
Using synchronous channels for communication between threads:
<syntaxhighlight lang="racket">
(define (reader out-ch result-ch)
(for ([line (in-lines (open-input-file "input.txt"))])
(channel-put out-ch line))
(channel-put out-ch eof)
(printf "Number of lines: ~a\n" (channel-get result-ch)))
 
(define (printer in-ch result-ch)
(channel-put result-ch
(for/sum ([line (in-producer channel-get eof in-ch)])
(displayln line)
1)))
 
(define lines-ch (make-channel))
(define result-ch (make-channel))
(define printer-thread (thread (lambda () (printer lines-ch result-ch))))
(define reader-thread (thread (lambda () (reader lines-ch result-ch))))
 
(for-each thread-wait
(list printer-thread reader-thread))
</syntaxhighlight>
 
=={{header|Raku}}==
(formerly Perl 6)
{{works with|rakudo|2013-02-27}}
<syntaxhighlight lang="raku" line>sub MAIN ($infile) {
$infile.IO.lines ==> printer() ==> my $count;
say "printed $count lines";
}
 
sub printer(*@lines) {
my $lines;
for @lines {
.say;
++$lines;
}
$lines;
}</syntaxhighlight>
Concurrent units are established by use of the feed operator, which works much like an in-process object pipe. Since the final feed goes to a variable declaration that belongs to the outer thread, it serves as a backchannel from the printer thread. In this case the outer thread signals the desire for a line count by terminating the pipe to the printing thread.
(Note: soon these will be implemented with real threads in Raku, but this is currently emulated with coroutine semantics, aka lazy lists.)
 
=={{header|Raven}}==
<langsyntaxhighlight lang="raven">'input.txt' as src_file
 
class Queue
Line 2,013 ⟶ 2,406:
 
reader as r
writer as w</langsyntaxhighlight>
 
=={{header|Ruby}}==
The task is to refactor this program to use two concurrent units.
 
<langsyntaxhighlight lang="ruby">count = 0
IO.foreach("input.txt") { |line| print line; count += 1 }
puts "Printed #{count} lines."</langsyntaxhighlight>
 
The above program has no concurrency, because the printer is a block <tt>{ |line| print line; count += 1 }</tt> that can print only one line. (The reader calls block more than one time.) After the refactoring, the printer will print all lines, and the program will jump between the reader and the printer. For the refactoring, Fibers might be best, Continuations work if our Ruby is too old for Fibers, and Threads are another alternative.
Line 2,029 ⟶ 2,422:
 
{{works with|Ruby|1.9}}
<langsyntaxhighlight lang="ruby">count = 0
reader = Fiber.new do
IO.foreach("input.txt") { |line| Fiber.yield line }
Line 2,040 ⟶ 2,433:
print line
count += 1
end</langsyntaxhighlight>
 
* We create a fiber as the reader, and we use the current fiber as the printer.
Line 2,053 ⟶ 2,446:
 
{{libheader|continuation}}
<langsyntaxhighlight lang="ruby">require 'continuation' unless defined? Continuation
 
count = 0
Line 2,067 ⟶ 2,460:
print line
count += 1
end</langsyntaxhighlight>
 
* The above program uses continuations almost like fibers. The reader continues the printer, and the printer continues the reader.
Line 2,078 ⟶ 2,471:
 
{{libheader|thread}}
<langsyntaxhighlight lang="ruby">require 'thread'
 
counts = Queue.new
Line 2,103 ⟶ 2,496:
end
end
reader.join</langsyntaxhighlight>
 
* We create a thread as the reader, and use the current thread as the writer.
Line 2,112 ⟶ 2,505:
=={{header|Rust}}==
 
{{works with|rustc 1.446.0-nightly|f84d53ca0 2015(04488afe3 2020-0908-0624)|}}
 
<langsyntaxhighlight lang="rust">use std::fs::File;
use std::io::BufReader;
use std::io::BufRead;
use std::io::BufReader;
 
use std::threadsync::spawnmpsc::{channel, sync_channel};
use std::thread;
use std::sync::mpsc::{SyncSender, Receiver, sync_channel};
 
fn main() {
// The reader sends lines to the writer via an async channel, so the reader is never blocked.
let (tx, rx): (SyncSender<String>, Receiver<String>) = sync_channel::<String>(0);
let (reader_send, writer_recv) = channel();
 
// The writer sends the final count via a blocking channel with bound 0,
// Reader thread.
// meaning the buffer is exactly the size of the result.
spawn(move || {
let (writer_send, reader_recv) = sync_channel(0);
let file = File::open("input.txt").unwrap();
 
// Define the work the reader will do.
let reader_work = move || {
let file = File::open("input.txt").expect("Failed to open input.txt");
let reader = BufReader::new(file);
 
for line in reader.lines() {
match line {
Ok(msg) => tx.send(msg).unwrap(),reader_send
Err(e) => println!("{}", e .send(msg)
.expect("Failed to send via the channel"),
Err(e) => println!("{}", e),
}
}
 
// Dropping the sender disconnects it and tells the receiver the connection is closed.
drop(tx);
} drop(reader_send);
 
// Now that we've sent all the lines,
// Writer thread.
// block until the writer gives us the final count.
spawn(move || {
let mut loop_count: u16count = 0;reader_recv
.recv()
.expect("Failed to receive count from printer.");
 
loop println!("{}", count);
};
let recvd = rx.recv();
 
// Define the work the writer will match recvd {do.
let writer_work = move || {
let mut line_count = 0;
 
loop {
match writer_recv.recv() {
Ok(msg) => {
println!("{}", msg);
loop_countline_count += 1;
},
Err(_) => break, // rx.recv()indicates willthe onlyconnection errhas whenbeen txclosed isby closedthe sender.
}
}
 
println!("Line// Send the final count: {}",back to the loop_count);reader.
writer_send
}).join().unwrap();
.send(line_count)
}</lang>
.expect("Failed to send line count from writer.");
 
drop(writer_send);
};
 
// Spawn each as a thread.
let reader_handle = thread::spawn(reader_work);
thread::spawn(writer_work);
 
reader_handle
.join()
.expect("Failed to join the reader thread.");
}</syntaxhighlight>
 
=={{header|Scala}}==
[[Category:Scala examples needing attention]]
A possible implementation using Actors
<langsyntaxhighlight lang="scala">case class HowMany(asker: Actor)
 
val printer = actor {
Line 2,185 ⟶ 2,605:
}
 
reader(printer)</langsyntaxhighlight>
 
=={{header|Swift}}==
Using Grand Central Dispatch and NSNotificationCenter<br><br>
'''Reader.swift'''
<syntaxhighlight lang="swift">//
<lang Swift>//
// Reader.swift
//
Line 2,233 ⟶ 2,653:
}
}
}</langsyntaxhighlight>
'''Printer.swift'''
<syntaxhighlight lang="swift">//
<lang Swift>//
// Printer.swift
//
Line 2,272 ⟶ 2,692:
}
}
}</langsyntaxhighlight>
'''main.swift'''
<syntaxhighlight lang="swift">//
<lang Swift>//
// main.swift
//
Line 2,291 ⟶ 2,711:
 
CFRunLoopRun()
</syntaxhighlight>
</lang>
 
=={{header|SystemVerilog}}==
 
<langsyntaxhighlight SystemVeriloglang="systemverilog">program main;
 
mailbox#(bit) p2c_cmd = new;
Line 2,331 ⟶ 2,751:
end
 
endprogram</langsyntaxhighlight>
 
=={{header|Tcl}}==
Uses the Thread package.
<langsyntaxhighlight lang="tcl">package require Thread
 
# Define the input thread
Line 2,362 ⟶ 2,782:
 
# Connect everything together and start the processing
thread::send $input [list readFile "input.txt" $output]</langsyntaxhighlight>
 
=={{header|TXR}}==
Line 2,372 ⟶ 2,792:
To get things going, we resume the producer via <code>pro.(resume)</code>, because we started that in a suspended state. This is actually not necessary; if we remove the <code>suspended t</code> from the <code>new</code> expression which instantiates the producer, we can remove this line. However, this means that the body of the <code>let</code> doesn't receive control. The producer finishes producing and then the <code>pro</code> variable is bound, and the final <code>(put-line ...)</code> expression evaluates. Starting the producer suspended lets us insert some logic prior to dispatching the producer. We implicitly start the consumer, though, because it immediately suspends to wait for an item, saving its context in a continuation and relinquishing control.
 
<langsyntaxhighlight lang="txrlisp">(defstruct thread nil
suspended
cont
Line 2,404 ⟶ 2,824:
(pro (new producer suspended t consumer con)))
pro.(resume)
(put-line `count = @{con.count}`))</langsyntaxhighlight>
 
=={{header|UnixPipes}}==
Line 2,415 ⟶ 2,835:
forks.
 
<langsyntaxhighlight lang="bash">rm -f node ; mkfifo node
cat file | tee >(wc -l > node ) | cat - node</langsyntaxhighlight>
 
=={{header|Visual Basic .NET}}==
This can be improved by adding a blocking Dequeue instead of spinning on TryDequeue.
 
<langsyntaxhighlight lang="vbnet">Imports System.Threading
 
Module Module1
Line 2,485 ⟶ 2,905:
End SyncLock
End Sub
End Class</langsyntaxhighlight>
 
=={{header|Wren}}==
<syntaxhighlight lang="wren">import "io" for File
var EOT = "\x04"
 
var readLines = Fiber.new { |fileName|
var file = File.open(fileName)
var offset = 0
var line = ""
while (true) {
var b = file.readBytes(1, offset)
offset = offset + 1
if (b == "\n") {
Fiber.yield(line)
line = "" // reset line variable
} else if (b == "\r") { // Windows
// wait for following "\n"
} else if (b == "") { // end of stream
var numLines = Fiber.yield(EOT)
System.print("Number of lines read = %(numLines)")
break
} else {
line = line + b
}
}
file.close()
}
 
var numLines = 0
while(true) {
var line = readLines.call("input.txt")
if (line != EOT) {
System.print(line)
numLines = numLines + 1
} else {
readLines.call(numLines)
break
}
}</syntaxhighlight>
 
{{out}}
input.txt contains the following 4 lines:
<pre>
line 1
line 2
line 3
line 4
Number of lines read = 4
</pre>
 
=={{header|zkl}}==
<langsyntaxhighlight lang="zkl">fcn reader(fileName,out){
n:=0; foreach line in (File(fileName)) { out.write(line); n+=1; }
out.close(); // signal done
Line 2,502 ⟶ 2,972:
p:=Thread.Pipe(); // NOT Unix pipes, thread safe channel between threads
reader.launch("input.txt",p);
writer.future(p).noop(); // noop forces eval, ie sleep until writer finished</langsyntaxhighlight>
Light on error handling, easily fixed.
{{out}}
9,476

edits