Concurrency with Channels and Processes
Flix supports CSP-style concurrency with channels and processes inspired by Go and Rust.
Spawning Processes
We can spawn a process with the spawn keyword:
spawn (1 + 2)
This spawns a process that computes 1 + 2 and
throws the result away.
The spawn expression always returns Unit.
We can spawn any expression, but we typically spawn
functions to run in a new process:
def sum(x: Int32, y: Int32): Int32 = x + y
def main(): Unit \ IO = spawn sum(1, 2)
Communicating with Channels
To communicate between processes we use channels. A channel allows two or more processes to exchange data by sending immutable messages to each other.
A channel comes in one of two variants: buffered or unbuffered.
A buffered channel has a size, set at creation time, and can hold that many messages. If a process attempts to put a message into a buffered channel that is full, then the process is blocked until space becomes available. If, on the other hand, a process attempts to get a message from an empty channel, the process is blocked until a message is put into the channel.
An unbuffered channel works like a buffered channel of size zero; for a get and a put to happen both processes must rendezvous (block) until the message is passed from sender to receiver.
Here is an example of sending and receiving a message over a channel:
def main(): Int32 \ IO =
let (s, r) = Channel.unbuffered();
spawn Channel.send(42, s);
Channel.recv(r)
Here the main function creates an unbuffered
channel which returns Sender s and a Receiver r channels,
spawns the send function, and waits
for a message from the channel.
As the example shows, a channel consists of two end points:
the Sender and the Receiver. As one would expect,
messages can only be send using the Sender, and only
received using the Receiver.
Selecting on Channels
We can use the select expression to receive a
message from a collection of channels.
For example:
def meow(s: Sender[String]): Unit \ IO = Channel.send("Meow!", s)
def woof(s: Sender[String]): Unit \ IO = Channel.send("Woof!", s)
def main(): Unit \ IO =
let (s1, r1) = Channel.buffered(1);
let (s2, r2) = Channel.buffered(1);
spawn meow(s1);
spawn woof(s2);
select {
case m <- recv(r1) => m
case m <- recv(r2) => m
} |> println
Many important concurrency patterns such as
producer-consumer and load balancers can be expressed
using the select expression.
Selecting with Default
In some cases, we do not want to block until a message arrives, potentially waiting forever. Instead, we want to take some alternative action if no message is readily available. We can achieve this with a default case as shown below:
def main(): String \ IO =
let (_, r1) = Channel.buffered(1);
let (_, r2) = Channel.buffered(1);
select {
case _ <- recv(r1) => "one"
case _ <- recv(r2) => "two"
case _ => "default"
}
Here a message is never sent to r1 nor r2.
The select expression tries all cases, and if no
channel is ready, it immediately selects the default
case.
Hence using a default case prevents the select
expression from blocking forever.
Selecting with Timeouts
As an alternative to a default case, we can use
tickers and timers to wait for pre-defined
periods of time inside a select expression.
For example, here is a program that has a slow
function that takes a minute to send a message on
a channel, but the select expression relies on
Channel.timeout to only wait 5 seconds before
giving up:
def slow(s: Sender[String]): Unit \ IO =
Thread.sleep(Time/Duration.fromSeconds(60));
Channel.send("I am very slow", s)
def main(): Unit \ IO =
let (s, r) = Channel.buffered(1);
spawn slow(s);
let timeout = Channel.timeout(Time/Duration.fromSeconds(5));
select {
case m <- recv(r) => m
case _ <- recv(timeout) => "timeout"
} |> println
This program prints the string "timeout" after five
seconds.