Stopping a Rust Worker

This is a small post about a specific pattern for cancellation in the Rust programming language. The pattern is simple and elegant, but it's rather difficult to come up with by yourself.

Introducing a worker

To be able to stop a worker, we need to have one in the first place! So, let's implement a model program.

The task is to read standard input line by line, sending each line to another thread for processing (echoing the line back, with ❤️). My solution looks like this:

use std::io::BufRead;
use std::sync::mpsc::{Sender, channel};
use std::thread;

fn main() {
    let worker = spawn_worker();

    let stdin = ::std::io::stdin();
    for line in stdin.lock().lines() {
        let line = line.unwrap();
        worker.send(Msg::Echo(line))
            .unwrap();
    }

    println!("Bye!");
}

enum Msg {
    Echo(String),
}

fn spawn_worker() -> Sender<Msg> {
    let (tx, rx) = channel();
    thread::spawn(move || {
        loop {
            let msg = rx.recv().unwrap();
            match msg {
                Msg::Echo(msg) => println!("{} ❤️", msg),
            }
        }
    });
    tx
}

The program seems to work:

$ cargo r
    Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
     Running `target/debug/worker`
hello
hello ❤️
world
world ❤️
Bye!

Stopping the worker, the obvious way

Now that we have a worker, let's add a new requirement.

When the user types "stop", the worker (but not the program itself) should be halted.

How can we do this? The most obvious way is to add a new variant, Stop, to the Msg enum, and break out of the worker's loop:

use std::io::BufRead;
use std::sync::mpsc::{Sender, channel};
use std::thread;

fn main() {
    let worker = spawn_worker();

    let stdin = ::std::io::stdin();
    for line in stdin.lock().lines() {
        let line = line.unwrap();
        let msg = if line == "stop" {
            Msg::Stop
        } else {
            Msg::Echo(line)
        };

        worker.send(msg)
            .unwrap();
    }

    println!("Bye!");
}

enum Msg {
    Echo(String),
    Stop,
}

fn spawn_worker() -> Sender<Msg> {
    let (tx, rx) = channel();
    thread::spawn(move || {
        loop {
            let msg = rx.recv().unwrap();
            match msg {
                Msg::Echo(msg) => println!("{} ❤️", msg),
                Msg::Stop => break,
            }
        }
        println!("The worker has stopped!");
    });
    tx
}

This works, but only partially:

$ cargo r
    Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
     Running `target/debug/worker`
hello
hello ❤️
stop
The worker has stopped!
world
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: "SendError(..)"', /checkout/src/libcore/result.rs:916:5
note: Run with `RUST_BACKTRACE=1` for a backtrace.

We can add more code to fix the panic, but let's stop for a moment and try to invent a more elegant way to stop the worker. The answer will be below this beautiful Ukiyo-e print :-)

Dropping the microphone

The answer is: the cleanest way to cancel something in Rust is to drop it. For our task, we can stop the worker by dropping the Sender:

use std::io::BufRead;
use std::sync::mpsc::{Sender, channel};
use std::thread;

fn main() {
    let mut worker = Some(spawn_worker());

    let stdin = ::std::io::stdin();
    for line in stdin.lock().lines() {
        let line = line.unwrap();
        if line == "stop" {
            drop(worker.take());
            continue;
        }

        if let Some(ref worker) = worker {
            worker.send(Msg::Echo(line)).unwrap();
        } else {
            println!("The worker has been stopped!");
        }
    }

    println!("Bye!");
}

enum Msg {
    Echo(String),
}

fn spawn_worker() -> Sender<Msg> {
    let (tx, rx) = channel();
    thread::spawn(move || {
        while let Ok(msg) = rx.recv() {
            match msg {
                Msg::Echo(msg) => println!("{} ❤️", msg),
            }
        }
        println!("The worker has stopped!");
    });
    tx
}

Note the interesting parts of the solution:

  • no need to invent an additional message type,
  • the Sender is stored inside an Option, so that we can drop it with the .take method,
  • the Option forces us to check if the worker is alive before sending a message.
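
The Option-plus-take idea from the list above can also be packaged into a small handle type. This is only a sketch: the Worker struct and its spawn, echo, and stop methods are hypothetical names invented for illustration, and joining the thread in stop is an extra assumption that the original program does not make.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

enum Msg {
    Echo(String),
}

// Hypothetical handle type, not part of the original program.
struct Worker {
    tx: Option<Sender<Msg>>,
    thread: Option<JoinHandle<()>>,
}

impl Worker {
    fn spawn() -> Worker {
        let (tx, rx) = channel();
        let thread = thread::spawn(move || {
            // recv() fails once the Sender has been dropped, ending the loop.
            while let Ok(Msg::Echo(line)) = rx.recv() {
                println!("{} ❤️", line);
            }
        });
        Worker { tx: Some(tx), thread: Some(thread) }
    }

    // Returns false if the worker has already been stopped.
    fn echo(&self, line: &str) -> bool {
        match &self.tx {
            Some(tx) => tx.send(Msg::Echo(line.to_string())).is_ok(),
            None => false,
        }
    }

    // Dropping the Sender is the stop signal; joining waits for the thread to exit.
    fn stop(&mut self) {
        drop(self.tx.take());
        if let Some(thread) = self.thread.take() {
            thread.join().unwrap();
        }
    }
}
```

With this sketch, calling stop twice is harmless, because both fields are already None the second time.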

More generally, the worker previously had two paths for termination: a normal termination via the Stop message and an abnormal termination after a panic when recv fails (which might happen if the parent thread panics and drops the Sender). Now there is a single code path for both cases. That means we can be more confident that if something somewhere dies with a panic, the shutdown will still proceed in an orderly fashion: it is not a special case anymore.

The only thing left to make this ultimately neat is to replace the hand-written while let with a for loop:
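
To make the "single code path" claim concrete, here is a minimal sketch in which the thread that owns the Sender panics. The names spawn_counting_worker and handled_before_owner_panic are hypothetical, and returning a JoinHandle with a message count is an assumption added only so that the result can be observed.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

enum Msg {
    Echo(String),
}

// Hypothetical variant of spawn_worker that also returns a JoinHandle,
// so the caller can see how many messages were handled before shutdown.
fn spawn_counting_worker() -> (Sender<Msg>, JoinHandle<usize>) {
    let (tx, rx) = channel();
    let handle = thread::spawn(move || {
        let mut handled = 0;
        // recv() returns Err once every Sender is gone, whatever the reason.
        while let Ok(msg) = rx.recv() {
            match msg {
                Msg::Echo(line) => println!("{} ❤️", line),
            }
            handled += 1;
        }
        handled
    });
    (tx, handle)
}

// The owner of the Sender panics; unwinding drops the Sender, and the
// worker exits through the same path as after a deliberate drop.
fn handled_before_owner_panic() -> usize {
    let (tx, worker) = spawn_counting_worker();
    let owner = thread::spawn(move || {
        tx.send(Msg::Echo("hello".to_string())).unwrap();
        panic!("simulated failure in the owning thread");
    });
    assert!(owner.join().is_err()); // the owner really did panic
    worker.join().unwrap() // the worker still finished without panicking
}
```

The message sent before the panic is still delivered, because the channel hands out buffered messages before it reports that it is disconnected.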

for msg in rx {
    match msg {
        Msg::Echo(msg) => println!("{} ❤️", msg),
    }
}
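
One detail worth keeping in mind with the for loop: iteration over the Receiver ends only when every Sender is gone, so cloned Senders keep the worker alive. The sketch below assumes a spawn_worker variant that collects its output and returns a JoinHandle; that return type and the run_with_two_senders helper are hypothetical and exist only to make the behaviour observable.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

enum Msg {
    Echo(String),
}

// Hypothetical variant: the worker collects its output instead of printing it.
fn spawn_worker() -> (Sender<Msg>, JoinHandle<Vec<String>>) {
    let (tx, rx) = channel();
    let handle = thread::spawn(move || {
        let mut echoed = Vec::new();
        // The loop ends when all Senders, including clones, have been dropped.
        for msg in rx {
            match msg {
                Msg::Echo(line) => echoed.push(format!("{} ❤️", line)),
            }
        }
        echoed
    });
    (tx, handle)
}

fn run_with_two_senders() -> Vec<String> {
    let (tx, worker) = spawn_worker();
    let tx2 = tx.clone();
    tx.send(Msg::Echo("hello".to_string())).unwrap();
    drop(tx); // one Sender is gone, but the channel stays open
    tx2.send(Msg::Echo("world".to_string())).unwrap(); // still succeeds
    drop(tx2); // the last Sender is gone, so the for loop ends
    worker.join().unwrap()
}
```

So "drop the Sender" really means "drop all Senders": if a clone is stashed somewhere, the worker keeps running until that clone is dropped too.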

Am I awaited?

It's interesting to see that the same pattern applies to the async version of the solution as well.

Async baseline:

extern crate futures; // [dependencies] futures = "0.1"

use std::io::BufRead;
use std::thread;

use futures::sync::mpsc::{Sender, channel};
use futures::{Future, Stream, Sink};

fn main() {
    let mut worker = spawn_worker();

    let stdin = ::std::io::stdin();
    for line in stdin.lock().lines() {
        let line = line.unwrap();
        worker = worker.send(Msg::Echo(line)).wait().unwrap();
    }

    println!("Bye!");
}

enum Msg {
    Echo(String),
}

fn spawn_worker() -> Sender<Msg> {
    let (tx, rx) = channel(1);
    thread::spawn(move || {
        rx.for_each(|msg| {
            match msg {
                Msg::Echo(msg) => println!("{} ❤️", msg),
            }
            Ok(())
        }).wait().unwrap()
    });
    tx
}

Async with a termination message:

extern crate futures; // [dependencies] futures = "0.1"

use std::io::BufRead;
use std::thread;

use futures::sync::mpsc::{Sender, channel};
use futures::{Future, Stream, Sink};

fn main() {
    let mut worker = spawn_worker();

    let stdin = ::std::io::stdin();
    for line in stdin.lock().lines() {
        let line = line.unwrap();
        let msg = if line == "stop" {
            Msg::Stop
        } else {
            Msg::Echo(line)
        };
        worker = worker.send(msg).wait().unwrap();
    }

    println!("Bye!");
}

enum Msg {
    Echo(String),
    Stop,
}

fn spawn_worker() -> Sender<Msg> {
    let (tx, rx) = channel(1);
    thread::spawn(move || {
        let _ = rx.for_each(|msg| {
            match msg {
                Msg::Echo(msg) => {
                    println!("{} ❤️", msg);
                    Ok(())
                },
                Msg::Stop => Err(()),
            }
        }).then(|result| {
            println!("The worker has stopped!");
            result
        }).wait();
    });
    tx
}

As with the synchronous version, this stops the worker but panics on the next send:

$ cargo r
    Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
     Running `target/debug/worker`
hello
hello ❤️
stop
The worker has stopped!
world
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: SendError("...")', /checkout/src/libcore/result.rs:916:5
note: Run with `RUST_BACKTRACE=1` for a backtrace.

Async with drop:

extern crate futures; // [dependencies] futures = "0.1"

use std::io::BufRead;
use std::thread;

use futures::sync::mpsc::{Sender, channel};
use futures::{Future, Stream, Sink};

fn main() {
    let mut worker = Some(spawn_worker());

    let stdin = ::std::io::stdin();
    for line in stdin.lock().lines() {
        let line = line.unwrap();
        if line == "stop" {
            drop(worker.take());
            continue;
        };

        if let Some(w) = worker {
            worker = Some(w.send(Msg::Echo(line)).wait().unwrap())
        } else {
            println!("The worker has been stopped!");
        }
    }

    println!("Bye!");
}

enum Msg {
    Echo(String),
}

fn spawn_worker() -> Sender<Msg> {
    let (tx, rx) = channel(1);
    thread::spawn(move || {
        rx.for_each(|msg| {
            match msg {
                Msg::Echo(msg) => println!("{} ❤️", msg),
            }
            Ok(())
        }).map(|()| {
            println!("The worker has stopped!");
        }).wait().unwrap();
    });
    tx
}

This time the worker stops and there is no panic:

$ cargo r
    Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
     Running `target/debug/worker`
hello
hello ❤️
stop
The worker has stopped!
world
The worker has been stopped!
Bye!

Conclusion

So, yeah, all of this was written just to say: in Rust, cancellation is drop :-)

Discussion on /r/rust.