Synchronous concurrency: Difference between revisions

Content added Content deleted
(Rename Perl 6 -> Raku, alphabetize, minor clean-up)
Line 409: Line 409:
return 0;
return 0;
}</lang>
}</lang>

=={{header|C sharp}}==
<lang csharp>using System;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.IO;

namespace SynchronousConcurrency
{
class Program
{
static void Main(string[] args)
{
BlockingCollection<string> toWriterTask = new BlockingCollection<string>();
BlockingCollection<int> fromWriterTask = new BlockingCollection<int>();
Task writer = Task.Factory.StartNew(() => ConsoleWriter(toWriterTask, fromWriterTask));
Task reader = Task.Factory.StartNew(() => FileReader(fromWriterTask, toWriterTask));
Task.WaitAll(writer, reader);
}
static void ConsoleWriter(BlockingCollection<string> input, BlockingCollection<int> output)
{
int nLines = 0;
string line;
while ((line = input.Take()) != null)
{
Console.WriteLine(line);
++nLines;
}
output.Add(nLines);
}
static void FileReader(BlockingCollection<int> input, BlockingCollection<string> output)
{
StreamReader file = new StreamReader("input.txt"); // TODO: check exceptions
string line;
while ((line = file.ReadLine()) != null)
{
output.Add(line);

}
output.Add(null); // EOF
Console.WriteLine("line count: " + input.Take());
}
}
}</lang>
{{out}}
<pre>
foo
bar
baz
xenu 666
line count: 4
</pre>


=={{header|C++}}==
=={{header|C++}}==
Line 486: Line 538:


Printed 9 lines
Printed 9 lines
</pre>

=={{header|C sharp}}==
<lang csharp>using System;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.IO;

namespace SynchronousConcurrency
{
class Program
{
static void Main(string[] args)
{
BlockingCollection<string> toWriterTask = new BlockingCollection<string>();
BlockingCollection<int> fromWriterTask = new BlockingCollection<int>();
Task writer = Task.Factory.StartNew(() => ConsoleWriter(toWriterTask, fromWriterTask));
Task reader = Task.Factory.StartNew(() => FileReader(fromWriterTask, toWriterTask));
Task.WaitAll(writer, reader);
}
static void ConsoleWriter(BlockingCollection<string> input, BlockingCollection<int> output)
{
int nLines = 0;
string line;
while ((line = input.Take()) != null)
{
Console.WriteLine(line);
++nLines;
}
output.Add(nLines);
}
static void FileReader(BlockingCollection<int> input, BlockingCollection<string> output)
{
StreamReader file = new StreamReader("input.txt"); // TODO: check exceptions
string line;
while ((line = file.ReadLine()) != null)
{
output.Add(line);

}
output.Add(null); // EOF
Console.WriteLine("line count: " + input.Take());
}
}
}</lang>
{{out}}
<pre>
foo
bar
baz
xenu 666
line count: 4
</pre>
</pre>


Line 1,127: Line 1,127:
task_yield()
task_yield()
end while</lang>
end while</lang>

=={{header|F_Sharp|F#}}==

This code will read lines from the file on one thread, and print them to the console on one or more other
threads from the ThreadPool, using a MailboxProcessor for lock-free communication between threads and tracking the line count without the use of mutable state.

<lang fsharp>
open System.IO

type Msg =
| PrintLine of string
| GetCount of AsyncReplyChannel<int>

let printer =
MailboxProcessor.Start(fun inbox ->
let rec loop count =
async {
let! msg = inbox.Receive()
match msg with
| PrintLine(s) ->
printfn "%s" s
return! loop (count + 1)
| GetCount(reply) ->
reply.Reply(count)
return! loop count
}
loop 0
)

let reader (printAgent:MailboxProcessor<Msg>) file =
File.ReadLines(file)
|> Seq.iter (fun line -> PrintLine line |> printAgent.Post)
printAgent.PostAndReply(fun reply -> GetCount(reply))
|> printfn "Lines written: %i"

reader printer @"c:\temp\input.txt"
</lang>


=={{header|Forth}}==
=={{header|Forth}}==
Line 1,227: Line 1,264:
LINES: 59
LINES: 59
</pre>
</pre>

=={{header|F_Sharp|F#}}==

This code will read lines from the file on one thread, and print them to the console on one or more other
threads from the ThreadPool, using a MailboxProcessor for lock-free communication between threads and tracking the line count without the use of mutable state.

<lang fsharp>
open System.IO

type Msg =
| PrintLine of string
| GetCount of AsyncReplyChannel<int>

let printer =
MailboxProcessor.Start(fun inbox ->
let rec loop count =
async {
let! msg = inbox.Receive()
match msg with
| PrintLine(s) ->
printfn "%s" s
return! loop (count + 1)
| GetCount(reply) ->
reply.Reply(count)
return! loop count
}
loop 0
)

let reader (printAgent:MailboxProcessor<Msg>) file =
File.ReadLines(file)
|> Seq.iter (fun line -> PrintLine line |> printAgent.Post)
printAgent.PostAndReply(fun reply -> GetCount(reply))
|> printfn "Lines written: %i"

reader printer @"c:\temp\input.txt"
</lang>


=={{header|Go}}==
=={{header|Go}}==
Line 1,985: Line 1,985:
$reader->join;
$reader->join;
$printer->join;</lang>
$printer->join;</lang>

=={{header|Perl 6}}==
{{works with|rakudo|2013-02-27}}
<lang perl6>sub MAIN ($infile) {
$infile.IO.lines ==> printer() ==> my $count;
say "printed $count lines";
}

sub printer(*@lines) {
my $lines;
for @lines {
.say;
++$lines;
}
$lines;
}</lang>
Concurrent units are established by use of the feed operator, which works much like an in-process object pipe. Since the final feed goes to a variable declaration that belongs to the outer thread, it serves as a backchannel from the printer thread. In this case the outer thread signals the desire for a line count by terminating the pipe to the printing thread.
(Note: soon these will be implemented with real threads in Perl 6, but this is currently emulated with coroutine semantics, aka lazy lists.)


=={{header|Phix}}==
=={{header|Phix}}==
Line 2,108: Line 2,090:
(inc 'Cnt) ) # Increment count
(inc 'Cnt) ) # Increment count
(yield Cnt 'unit1) ) ) # Send count to 'unit1'</lang>
(yield Cnt 'unit1) ) ) # Send count to 'unit1'</lang>



=={{header|Pony}}==
=={{header|Pony}}==
Line 2,145: Line 2,126:


Actors are scheduled asynchronously, but messages (implemented via the behaviours) are guaranteed to arrive in causal ordering.
Actors are scheduled asynchronously, but messages (implemented via the behaviours) are guaranteed to arrive in causal ordering.



=={{header|PureBasic}}==
=={{header|PureBasic}}==
Line 2,311: Line 2,291:
(list printer-thread reader-thread))
(list printer-thread reader-thread))
</lang>
</lang>

=={{header|Raku}}==
(formerly Perl 6)
{{works with|rakudo|2013-02-27}}
<lang perl6>sub MAIN ($infile) {
$infile.IO.lines ==> printer() ==> my $count;
say "printed $count lines";
}

sub printer(*@lines) {
my $lines;
for @lines {
.say;
++$lines;
}
$lines;
}</lang>
Concurrent units are established by use of the feed operator, which works much like an in-process object pipe. Since the final feed goes to a variable declaration that belongs to the outer thread, it serves as a backchannel from the printer thread. In this case the outer thread signals the desire for a line count by terminating the pipe to the printing thread.
(Note: soon these will be implemented with real threads in Perl 6, but this is currently emulated with coroutine semantics, aka lazy lists.)


=={{header|Raven}}==
=={{header|Raven}}==