Stopping a Rust Worker
This is a small post about a specific pattern for cancellation in the Rust programming language. The pattern is simple and elegant, but it’s rather difficult to come up with it by yourself.
Introducing a worker
To be able to stop a worker, we need to have one in the first place! So, let’s implement a model program.
The task is to read the output line-by-line, sending these lines to another thread for processing (echoing the line back, with ❤️). My solution looks like this:
use std::io::BufRead;
use std::sync::mpsc::{Sender, channel};
use std::thread;
fn main() {
let worker = spawn_worker();
let stdin = ::std::io::stdin();
for line in stdin.lock().lines() {
let line = line.unwrap();
worker.send(Msg::Echo(line))
.unwrap();
}
println!("Bye!");
}
enum Msg {
Echo(String),
}
fn spawn_worker() -> Sender<Msg> {
let (tx, rx) = channel();
thread::spawn(move || {
loop {
let msg = rx.recv().unwrap();
match msg {
Msg::Echo(msg) => println!("{} ❤️", msg),
}
}
});
tx
}
The program seems to work:
$ cargo r
Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
Running `target/debug/worker`
hello
hello ❤️
world
world ❤️
Bye!
Stopping the worker, the obvious way
Now that we have a worker, let’s add a new requirement.
When the user types stop
, the worker (but not the program itself) should be halted.
How can we do this? The most obvious way is to add a new variant, Stop
, to the Msg
enum, and break out of the worker’s loop:
use std::io::BufRead;
use std::sync::mpsc::{Sender, channel};
use std::thread;
fn main() {
let worker = spawn_worker();
let stdin = ::std::io::stdin();
for line in stdin.lock().lines() {
let line = line.unwrap();
let msg = if line == "stop" {
Msg::Stop
} else {
Msg::Echo(line)
};
worker.send(msg)
.unwrap();
}
println!("Bye!");
}
enum Msg {
Echo(String),
Stop,
}
fn spawn_worker() -> Sender<Msg> {
let (tx, rx) = channel();
thread::spawn(move || {
loop {
let msg = rx.recv().unwrap();
match msg {
Msg::Echo(msg) => println!("{} ❤️", msg),
Msg::Stop => break,
}
}
println!("The worker has stopped!");
});
tx
}
This works, but only partially:
$ cargo r
Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
Running `target/debug/worker`
hello
hello ❤️
stop
The worker has stopped!
world
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: "SendError(..)"', /checkout/src/libcore/result.rs:916:5
note: Run with `RUST_BACKTRACE=1` for a backtrace.
We can add more code to fix the panic, but let’s stop for a moment and try to invent a more elegant way to stop the worker. The answer will be below this beautiful Ukiyo-e print :-)

Dropping the microphone
The answer is: the cleanest way to cancel something in Rust is to drop it.
For our task, we can stop the worker by dropping the Sender
:
use std::io::BufRead;
use std::sync::mpsc::{Sender, channel};
use std::thread;
fn main() {
let mut worker = Some(spawn_worker());
let stdin = ::std::io::stdin();
for line in stdin.lock().lines() {
let line = line.unwrap();
if line == "stop" {
drop(worker.take());
continue
};
if let Some(ref worker) = worker {
worker.send(Msg::Echo(line)).unwrap();
} else {
println!("The worker has been stopped!");
};
}
println!("Bye!");
}
enum Msg {
Echo(String),
}
fn spawn_worker() -> Sender<Msg> {
let (tx, rx) = channel();
thread::spawn(move || {
while let Ok(msg) = rx.recv() {
match msg {
Msg::Echo(msg) => println!("{} ❤️", msg),
}
}
println!("The worker has stopped!");
});
tx
}
Note the interesting parts of the solution:
- no need to invent an additional message type,
-
the
Sender
is stored inside anOption
, so that we can drop it with the.take
method, -
the
Option
forces us to check if the worker is alive before sending a message.
More generally, previously the worker had two paths for termination: a normal
termination via the Stop
message and an abnormal termination after a panic
in recv
(which might happen if the parent thread panics and drops the Sender
).
Now there is a single code path for both cases. That means we can be surer that if
something somewhere dies with a panic then the shutdown will proceed in an
orderly fashion, it is not a special case anymore.
The only thing left to make this ultimately neat is to replace a hand-written while let
with a for
loop:
for msg in rx {
match msg {
Msg::Echo(msg) => println!("{} ❤️", msg),
}
}
Am I awaited?
It’s interesting to see that the same pattern applies to the async version of the solution as well.
Async baseline:
extern crate futures; // [dependencies] futures = "0.1"
use std::io::BufRead;
use std::thread;
use futures::sync::mpsc::{Sender, channel};
use futures::{Future, Stream, Sink};
fn main() {
let mut worker = spawn_worker();
let stdin = ::std::io::stdin();
for line in stdin.lock().lines() {
let line = line.unwrap();
worker = worker.send(Msg::Echo(line)).wait().unwrap();
}
println!("Bye!");
}
enum Msg {
Echo(String),
}
fn spawn_worker() -> Sender<Msg> {
let (tx, rx) = channel(1);
thread::spawn(move || {
rx.for_each(|msg| {
match msg {
Msg::Echo(msg) => println!("{} ❤️", msg),
}
Ok(())
}).wait().unwrap()
});
tx
}
Async with a termination message:
extern crate futures; // [dependencies] futures = "0.1"
use std::io::BufRead;
use std::thread;
use futures::sync::mpsc::{Sender, channel};
use futures::{Future, Stream, Sink};
fn main() {
let mut worker = spawn_worker();
let stdin = ::std::io::stdin();
for line in stdin.lock().lines() {
let line = line.unwrap();
let msg = if line == "stop" {
Msg::Stop
} else {
Msg::Echo(line)
};
worker = worker.send(msg).wait().unwrap();
}
println!("Bye!");
}
enum Msg {
Echo(String),
Stop,
}
fn spawn_worker() -> Sender<Msg> {
let (tx, rx) = channel(1);
thread::spawn(move || {
let _ = rx.for_each(|msg| {
match msg {
Msg::Echo(msg) => {
println!("{} ❤️", msg);
Ok(())
},
Msg::Stop => Err(()),
}
}).then(|result| {
println!("The worker has stopped!");
result
}).wait();
});
tx
}
$ cargo r
Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
Running `target/debug/worker`
hello
hello ❤️
stop
The worker has stopped!
world
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: SendError("...")', /checkout/src/libcore/result.rs:916:5
note: Run with `RUST_BACKTRACE=1` for a backtrace.
Async with drop:
extern crate futures; // [dependencies] futures = "0.1"
use std::io::BufRead;
use std::thread;
use futures::sync::mpsc::{Sender, channel};
use futures::{Future, Stream, Sink};
fn main() {
let mut worker = Some(spawn_worker());
let stdin = ::std::io::stdin();
for line in stdin.lock().lines() {
let line = line.unwrap();
if line == "stop" {
drop(worker.take());
continue;
};
if let Some(w) = worker {
worker = Some(w.send(Msg::Echo(line)).wait().unwrap())
} else {
println!("The worker has been stopped!");
}
}
println!("Bye!");
}
enum Msg {
Echo(String),
}
fn spawn_worker() -> Sender<Msg> {
let (tx, rx) = channel(1);
thread::spawn(move || {
rx.for_each(|msg| {
match msg {
Msg::Echo(msg) => println!("{} ❤️", msg),
}
Ok(())
}).map(|()| {
println!("The worker has stopped!");
}).wait().unwrap();
});
tx
}
$ cargo r
Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
Running `target/debug/worker`
hello
hello ❤️
stop
The worker has stopped!
world
The worker has been stopped!
Bye!
Conclusion
So, yeah, this all was written just to say “in Rust, cancellation is drop
” :-)
Discussion on /r/rust.