Scalar Select Anti-Pattern

I’ve written a number of stateful services, all starting with an event loop at the core:

async for event in events_incoming:
    await process(event)

I had to refactor this loop later, every time. For an example, see the direct cause of this article, this TigerBeetle PR. Let me write the refactoring down, so that I get it right from the get-go next time!

Scalar Select

Let’s say I am implementing an LSP server for some programming language. There are going to be three main sources of events:

  • file modifications from the user typing code or from git operations,
  • requests from the client (“give me completions!”),
  • output from compilation jobs running in the background with error messages and such.

The “obvious” event loop setup for this case looks like this:

events_incoming: Stream[Event] = merge(
    events_file_system,
    events_compiler,
    events_lsp,
)

async for event in events_incoming:
    await process(event)

Here, merge is an operator that takes a number of event streams, and merges them into one. This is a

loop {
    select! {
        ...
    }
}

written with higher-order functions.
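
If you want something concrete to look at, here is roughly how such a merge could be put together on top of asyncio. This is a sketch with illustrative names, not a real library API: spawn one task per input stream that pumps events into a shared queue, and yield from that queue.

import asyncio
from typing import AsyncIterator, TypeVar

T = TypeVar("T")

async def merge(*streams: AsyncIterator[T]) -> AsyncIterator[T]:
    queue: asyncio.Queue = asyncio.Queue()

    async def pump(stream: AsyncIterator[T]) -> None:
        # Forward every event from one input into the shared queue.
        async for event in stream:
            await queue.put(event)

    pumps = [asyncio.create_task(pump(stream)) for stream in streams]
    try:
        while True:
            yield await queue.get()
    finally:
        for task in pumps:
            task.cancel()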

Key Observation

Crucially, the event streams are external to the process and are driven by outside IO. You don’t really know or control when the user is typing!

And process(event) takes time. This means that when we’ve finished processing the current event, there might be several events “ready”, already sitting in the address space of our process. Our “scalar select” will pick an arbitrary one, and that might create a lot of overhead.

Implications

Here are some specific optimizations that become possible once you stop ignoring the fact that, after the delay induced by processing the previous event, several events may be available at the same time.

Prioritization

The most obvious one: we can pick the order in which to process events. For the LSP example, if you have a code completion request and a file modification request, you want to process the file modification first. The rule-of-thumb priority is writes over reads over accepts (of new clients).
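
As a sketch (the kind field on events is my invention for illustration, not anything from the post), prioritization over a set of already-available events is just a sort with the right key:

# Lower value = processed earlier; writes, then reads, then accepts.
PRIORITY = {
    "file_modification": 0,  # writes
    "compiler_output": 1,
    "lsp_request": 2,        # reads
    "accept": 3,             # new clients
}

def prioritize(batch: list) -> list:
    # Unknown kinds sort last.
    return sorted(batch, key=lambda event: PRIORITY.get(event.kind, len(PRIORITY)))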

Selective Backpressure

As an extreme form of prioritization, you can decide to not process a particular kind of request at all, exerting backpressure against a specific input, while allowing other inputs to proceed.
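
Mechanically, backpressure against one specific input can be as simple as a bounded per-input queue that you temporarily stop draining. The names below are assumptions, not a real API:

import asyncio

# Bounded queue for one specific input.
lsp_requests: asyncio.Queue = asyncio.Queue(maxsize=16)

async def lsp_reader(requests) -> None:
    # `requests` is assumed to be an async iterator over parsed LSP requests.
    async for request in requests:
        # When the main loop stops taking from this queue, put() blocks once
        # the queue is full, this task stops reading the connection, and the
        # client is forced to slow down. That is the backpressure.
        await lsp_requests.put(request)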

Elimination

Often, a request can be dropped completely, depending on subsequent events. For example, if there’s a file modification that completely replaces a file’s text, all preceding changes to that file can be safely dropped.
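
Here is a sketch of that over a batch of file-modification events (the kind and path fields are assumptions for illustration):

def eliminate(changes: list) -> list:
    kept: list = []
    for change in changes:
        if change.kind == "replace_file_text":
            # A wholesale replacement supersedes every earlier edit to this file.
            kept = [prev for prev in kept if prev.path != change.path]
        kept.append(change)
    return kept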

Coalescing

Finally, even if it is not possible to completely eliminate the request, often it is more efficient to process several requests at the same time. For example, if we have two incremental file modification events (like “insert 'hello' at offset 92”), it makes sense to union them into one large change first. An LSP server will kick off a job to compute diagnostics after applying modifications. But if we have two modifications, we want to apply them both before starting a single diagnostics job.
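
A possible shape of such coalescing, under the same made-up event fields as above, with combined_with standing in for whatever “union of two edits” means for your data structures:

def coalesce(batch: list) -> list:
    result: list = []
    for event in batch:
        if (result
                and event.kind == "edit"
                and result[-1].kind == "edit"
                and result[-1].path == event.path):
            # Fold consecutive incremental edits to the same file into one
            # composite change, so diagnostics run once per batch.
            result[-1] = result[-1].combined_with(event)
        else:
            result.append(event)
    return result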

Data Oriented All The Things

Once you see the problem (the hard part), the solution is as expected: always be batching, forget the singulars, push the fors down, become multitude!

In other words, we want to change our scalar select, which gives us a single event at a time, into a batched one, which gives us all the events already received. Under low load, we’ll be getting singleton batches. But as the load increases, the batch size will grow, and the work we do will grow sublinearly with the number of incoming events!

So, something like this:

events_incoming: Stream[Event] = merge(
    events_file_system,
    events_compiler,
    events_lsp,
)

events_incoming_batched: Stream[List[Event]] =
    batch_stream(events_incoming)

async for event_batch in events_incoming_batched:
    batch: List[Event] = coalesce(event_batch)
    for event in batch:
        await process(event)

The secret sauce is the batch_stream function, which waits until at least one event is available but then pulls all the events that are ready:

async def batch_stream(
    stream: Stream[T]
) -> Stream[List[T]]:

    while True:
        # Block until at least one event arrives.
        event: T = await stream.next()
        batch: List[T] = [event]
        # Then greedily pull everything that is already available.
        while (event := stream.next_non_blocking()) is not None:
            batch.append(event)
        yield batch
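
For concreteness, here is the same idea expressed against an asyncio.Queue, where next_non_blocking becomes get_nowait. This is a sketch, not the code from the PR:

import asyncio
from typing import AsyncIterator, List, TypeVar

T = TypeVar("T")

async def batch_queue(queue: asyncio.Queue) -> AsyncIterator[List[T]]:
    while True:
        # Block until at least one event is available...
        batch: List[T] = [await queue.get()]
        # ...then grab everything else already sitting in the queue.
        while True:
            try:
                batch.append(queue.get_nowait())
            except asyncio.QueueEmpty:
                break
        yield batch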

Always be batching when passing messages!