
Implementing a Lock-Free Channel in Rust for Databend’s Query Pipeline

ZhiHanZ · Aug 20, 2025

A practical look at Databend's lock-free channel

When a SQL query runs in Databend, it becomes a directed graph of operators—TableScan, Filter, Join, Aggregation—running in parallel and passing data blocks. This post describes the lock-free channel that connects those operators: a simple design that stores three state flags in the unused bits of a pointer.

This isn't a performance victory lap. It's a look at one practical choice: moving large blocks between stages with minimal allocation and without traditional locks—why we use it, how it works, and when it helps.

Part 1: Background and Context

What Problem Are We Solving?

Databend processes SQL queries using a pipeline architecture inspired by the "Morsel-Driven Parallelism" paper from TU Munich. The core idea is simple: break query execution into small, cache-friendly chunks (called morsels in the paper, DataBlocks in our implementation) and process them through a pipeline of operators.

Consider this query:

SELECT customer_type, SUM(amount) 
FROM orders
WHERE date > '2024-01-01'
GROUP BY customer_type

Internally, this becomes a pipeline:

TableScan → Filter → PartialAggregate → Exchange → FinalAggregate

Each operator processes DataBlocks—typically containing 8,192 to 65,536 rows of columnar data, ranging from 64KB to 10MB in size. The challenge? These operators run at different speeds. A TableScan might read from fast NVMe storage, while a complex aggregation might be CPU-bound. We need a communication mechanism between them that:

  1. Provides backpressure: A fast producer (like a TableScan) must wait for a slow consumer (like a complex Filter) instead of sending data that the consumer can't handle, which would waste memory. We need a way to signal "I'm full, please wait."
  2. Minimizes allocations: Creating and destroying memory blocks for every message is slow. Since we're moving gigabytes of data, we want to reuse memory as much as possible.
  3. Avoids lock contention: When multiple threads try to access the same data, they often use locks to prevent errors. But if many threads are waiting for the same lock, they form a queue, and performance suffers. This is lock contention.
  4. Enables status checks: An operator needs to peek at the channel's state—is there data for me? is the consumer ready?—without having to stop and wait (block).

Design Constraints

Our solution is tailored to specific constraints:

  • Single producer, single consumer (SPSC): Each connection has exactly one sender and one receiver. This is a huge simplification compared to general-purpose channels that might need to support multiple senders (MPSC) or multiple receivers (MPMC). By narrowing our scope to SPSC, we can create a much simpler and faster design.
  • Large data blocks: We're moving 64KB-10MB chunks, not individual messages.
  • Direct handoff: No buffering between operators.
  • Shared memory: All operators run in the same process.

These constraints are different from general-purpose channels, which typically support multiple producers/consumers, small messages, and cross-process communication. By accepting these limitations, we can optimize for our specific use case.

Part 2: The Architecture

Pipeline Structure

Here’s how Databend structures its pipeline:

pub struct Node {
    pub proc: ProcessorPtr,             // The actual operator
    pub inputs: Vec<Arc<InputPort>>,    // Where data comes from
    pub outputs: Vec<Arc<OutputPort>>,  // Where data goes to
}

pub struct Edge {
    pub input_index: usize,
    pub output_index: usize,
}

The pipeline is a directed graph where:

  • Nodes are processors (operators that transform data)
  • Edges are port connections (data flow paths)

Each edge connects exactly one OutputPort to one InputPort, creating a dedicated communication channel between two operators.
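
To make that wiring concrete, here is a hedged sketch of what connecting an edge could look like; SharedStatus::create and set_shared are illustrative names rather than Databend's exact API, and SharedStatus itself is introduced in Part 3. The point is simply that both ports end up holding the same shared state:

pub fn connect(input: &InputPort, output: &OutputPort) {
    // Both ports share one status word (see Part 3).
    let status = Arc::new(SharedStatus::create()); // assumed constructor
    input.set_shared(status.clone());              // assumed setter
    output.set_shared(status);
}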

The Event-Driven Model

Processors don't run continuously. Instead, they implement a state machine that the scheduler queries. From src/query/pipeline/core/src/processors/processor.rs:

pub enum Event {
    NeedData,    // Waiting for input
    NeedConsume, // Has output ready, waiting for consumer
    Sync,        // Can process synchronously now
    Async,       // Needs to do async I/O
    Finished,    // Stream completed
}

// `Send` is a marker trait indicating it's safe to move this processor across thread boundaries.
pub trait Processor: Send {
    fn event(&mut self) -> Result<Event>;
    fn process(&mut self) -> Result<()>;
}

The scheduler repeatedly calls event() to ask a processor for its status. If the processor returns Event::Sync, the scheduler knows it's ready to work and calls process(). This pull-based model lets processors signal what they need without busy-waiting.
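
To illustrate the contract, here is a deliberately simplified, single-threaded driver loop. Databend's real scheduler is multi-threaded: it parks processors that return NeedData or NeedConsume and wakes them via port triggers, rather than spinning like this sketch does:

fn drive(processor: &mut dyn Processor) -> Result<()> {
    loop {
        match processor.event()? {
            Event::Sync => processor.process()?, // ready: do the work now
            Event::Async => { /* a real scheduler would await the async work here */ }
            Event::NeedData | Event::NeedConsume => {
                // A real scheduler parks the processor until a port event fires.
                std::thread::yield_now();
            }
            Event::Finished => return Ok(()),
        }
    }
}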

Part 3: The Port Implementation

Now we get to the heart of the system. Let's examine how ports actually work.

Core Data Structure

From src/query/pipeline/core/src/processors/port.rs:

#[repr(align(8))]
pub struct SharedData(pub Result<DataBlock>);

pub struct SharedStatus {
data: AtomicPtr<SharedData>,
}

The entire communication state between an InputPort and OutputPort is a single AtomicPtr—one 64‑bit word. There’s a small but useful trick here.

Pointer Tagging Technique

On 64-bit systems, memory is often accessed in chunks of 8 bytes. When we ask the system to allocate an 8-byte aligned structure, the memory address it gives us will always be a multiple of 8. In binary, a number that's a multiple of 8 always ends in 000.

This gives us a clever opportunity. Since the last three bits of the pointer's address are guaranteed to be zero, they are unused. We can borrow them to store our own data—in this case, three boolean flags. This is called pointer tagging.

const HAS_DATA: usize = 0b001;    // Bit 0: Data is available
const NEED_DATA: usize = 0b010;   // Bit 1: Consumer is waiting
const IS_FINISHED: usize = 0b100; // Bit 2: Stream has ended

const FLAGS_MASK: usize = 0b111;
const UNSET_FLAGS_MASK: usize = !FLAGS_MASK;

Here’s what that looks like visually:

A 64-bit pointer address:

[ 63 ... 3 ][ 2 ][ 1 ][ 0 ]
      |       |    |    |
      |       |    |    `-- HAS_DATA flag     (0b001)
      |       |    `------- NEED_DATA flag    (0b010)
      |       `------------ IS_FINISHED flag  (0b100)
      `-------------------- The actual memory address (always ends in 000)

The #[repr(align(8))] attribute is a direct instruction to the Rust compiler. By default, Rust is free to reorder a struct's fields to save space, but #[repr(...)] lets us control the memory layout. In this case, align(8) forces the SharedData struct to start at a memory address that is a multiple of 8. This is what guarantees the lower three bits of its pointer are always zero, making them available for our tagging trick.
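
To see the trick in isolation, here is a tiny self-contained program using the same constants but a dummy payload type; nothing in it is Databend-specific:

const HAS_DATA: usize = 0b001;
const FLAGS_MASK: usize = 0b111;
const UNSET_FLAGS_MASK: usize = !FLAGS_MASK;

#[repr(align(8))]
struct Payload(u64);

fn main() {
    let raw = Box::into_raw(Box::new(Payload(42)));
    assert_eq!(raw as usize & FLAGS_MASK, 0);  // alignment: the low three bits are zero
    let tagged = raw as usize | HAS_DATA;      // borrow bit 0 as a flag
    assert_ne!(tagged & HAS_DATA, 0);          // the flag reads back
    let restored = (tagged & UNSET_FLAGS_MASK) as *mut Payload;
    assert_eq!(restored, raw);                 // the address survives intact
    unsafe { drop(Box::from_raw(restored)) };  // reclaim the allocation
}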

The Atomic Swap Operation

The core of this lock-free mechanism is an atomic compare-and-swap (CAS) operation, which is a fundamental building block in concurrent programming. Think of it like updating a public whiteboard. You can't just erase and write; someone else might be writing at the same time.

Instead, you:

  1. Read the current value (expected).
  2. Prepare your new value (desired).
  3. Atomically tell the system: "If the value is still expected, change it to desired. Otherwise, tell me the new current value."

This all happens in a single, indivisible step. We use compare_exchange_weak inside a loop because it can sometimes fail even if the value hasn't changed (a "spurious failure"). The loop ensures we retry until the swap succeeds.

pub fn swap(
    &self,
    data: *mut SharedData, // The new pointer we want to set.
    set_flags: usize,      // The flags we want to turn ON.
    unset_flags: usize,    // The flags we want to turn OFF.
) -> *mut SharedData {
    // We start by assuming the current state is null. This is just an initial guess.
    let mut expected = std::ptr::null_mut();

    // This is the new state we want to set. It combines the new pointer with the new flags.
    let mut desired = (data as usize | set_flags) as *mut SharedData;

    loop {
        // This is the atomic CAS operation.
        match self.data.compare_exchange_weak(
            expected,          // IF the current value is this...
            desired,           // ...THEN set it to this.
            Ordering::SeqCst,  // Use the strictest memory ordering for correctness.
            Ordering::Relaxed, // If the CAS fails, we don't need strict ordering on the read.
        ) {
            // The CAS succeeded: the state was `expected`, and we changed it to `desired`.
            Ok(old_value) => {
                // Return the old pointer, stripped of its flags, so the caller can deallocate it.
                let old_ptr = old_value as usize;
                return (old_ptr & UNSET_FLAGS_MASK) as *mut SharedData;
            }
            // The CAS failed: another thread changed the value since we last checked.
            // The hardware gives us the `current` value that caused the failure.
            Err(current) => {
                // Update our `expected` value to the `current` value we just received.
                expected = current;
                let current_address = current as usize;
                let desired_address = desired as usize & UNSET_FLAGS_MASK;

                // Recalculate our `desired` state: preserve the flags that the *other thread*
                // set, while applying our own changes.
                let new_flags = (current_address & FLAGS_MASK & !unset_flags) | set_flags;
                desired = (desired_address | new_flags) as *mut SharedData;

                // The loop now repeats with the updated `expected` and `desired` values.
            }
        }
    }
}

This function atomically:

  1. Swaps the pointer value
  2. Sets specified flags
  3. Unsets other flags
  4. Returns the old pointer

All in a single atomic operation that either succeeds completely or retries with the updated state.
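
The same loop, minus the pointer replacement, yields a flag-only update. Here is a hedged sketch of how set_need_data (used by consumers later in this post) might look as a SharedStatus method; the real code has a dedicated set_flags helper, but the idea is the same: update the flag bits while preserving whatever pointer is stored.

pub fn set_need_data(&self) {
    let mut expected = self.data.load(Ordering::Relaxed);
    loop {
        // Keep the current pointer and flags; just add NEED_DATA.
        let desired = (expected as usize | NEED_DATA) as *mut SharedData;
        match self.data.compare_exchange_weak(
            expected, desired, Ordering::SeqCst, Ordering::Relaxed,
        ) {
            Ok(_) => return,
            Err(current) => expected = current, // raced with the other side; retry
        }
    }
}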

Part 4: How Data Flows

Let's trace through how data actually moves from producer to consumer.

Push Operation (Producer Side)

When a producer has a DataBlock ready, it calls push_data. This is where we see some unsafe Rust.

Box::into_raw takes ownership of the data on the heap and gives us a raw pointer (*mut SharedData). This tells the Rust compiler: "I will manage this memory manually from now on." This is necessary because we are about to hand this pointer over to another thread (the consumer) without a traditional ownership transfer mechanism like a Mutex or Channel.

pub fn push_data(&self, data: Result<DataBlock>) {
    // Step 1: Box the DataBlock (the only allocation in the system).
    let data = Box::into_raw(Box::new(SharedData(data)));

    // Step 2: Atomically swap the pointer in and set the HAS_DATA flag.
    let old = self.shared.swap(data, HAS_DATA, HAS_DATA);

    // Step 3: Clean up old data if present (shouldn't occur in normal flow).
    if !old.is_null() {
        unsafe { drop(Box::from_raw(old)); }
    }

    // Step 4: Wake up the scheduler.
    UpdateTrigger::update_output(&self.update_trigger);
}

The key insight is that we aren't copying the DataBlock or placing it in a queue. We are just swapping a pointer and setting a flag in a single atomic operation.

Pull Operation (Consumer Side)

When a consumer needs data, it calls pull_data. This function complements push_data by safely taking back ownership of the pointer.

If the ptr is not null, it means the producer sent us data. unsafe { Box::from_raw(ptr) } does the reverse of into_raw: it takes the raw pointer and gives us back a Box, restoring Rust's memory safety guarantees. The Box will now automatically deallocate the memory when it goes out of scope.

pub fn pull_data(&self) -> Option<Result<DataBlock>> {
    // Atomically take the pointer and clear the HAS_DATA and NEED_DATA flags.
    let ptr = self.shared.swap(std::ptr::null_mut(), 0, HAS_DATA | NEED_DATA);

    if ptr.is_null() {
        None // No data available.
    } else {
        // Take ownership of the Box and extract the DataBlock.
        let boxed = unsafe { Box::from_raw(ptr) };
        Some(boxed.0)
    }
}

A single atomic operation transfers ownership of the DataBlock from producer to consumer.

Handling Errors

You might have noticed that the data being sent is not a DataBlock but a Result<DataBlock>. This is a powerful Rust pattern. If an operator encounters an error while processing (e.g., a division by zero or a parsing error), it doesn't crash the pipeline. Instead, it can wrap the error in a Result::Err and push it into the channel just like regular data.

Downstream operators will receive this Err, and they can stop processing and propagate the error up to the query coordinator. This allows the pipeline to shut down gracefully when something goes wrong.
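
Here is a hedged sketch of how an operator could forward a failure explicitly (the simplified Filter in Part 5 instead propagates errors with ? and lets the framework handle them; apply_filter and pending_data come from that example):

fn process(&mut self) -> Result<()> {
    if let Some(block) = self.pending_data.take() {
        match self.apply_filter(block) {
            // On success, the filtered block flows downstream as normal.
            Ok(filtered) => self.output.push_data(Ok(filtered)),
            // On failure, the error itself becomes the message.
            Err(e) => {
                self.output.push_data(Err(e)); // downstream receives the Err as "data"
                self.output.finish();          // and knows no more blocks will follow
            }
        }
    }
    Ok(())
}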

Non-Blocking Status Checks

Operators can check port status without blocking:

pub fn has_data(&self) -> bool {
    (self.shared.get_flags() & HAS_DATA) != 0
}

pub fn can_push(&self) -> bool {
    let flags = self.shared.get_flags();
    (flags & NEED_DATA) != 0 && (flags & HAS_DATA) == 0
}

pub fn is_finished(&self) -> bool {
    let flags = self.shared.get_flags();
    (flags & IS_FINISHED) != 0 && (flags & HAS_DATA) == 0
}

These are simple atomic loads—no locks, no waiting, just reading the current state.
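
get_flags itself is presumably nothing more than an atomic load plus a mask; a sketch:

pub fn get_flags(&self) -> usize {
    // One atomic load: the low three bits are the flags, the rest is the pointer.
    self.data.load(Ordering::SeqCst) as usize & FLAGS_MASK
}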

Graceful Shutdown: The IS_FINISHED Flag

How does a consumer know when the producer is done and no more data will ever arrive? This is handled by the IS_FINISHED flag.

When a producer has sent all its data, it calls a finish() method on the port. This method does two things:

  1. It sets the IS_FINISHED flag.
  2. It wakes up the consumer one last time.

The consumer can then check is_finished(). The check (flags & IS_FINISHED) != 0 && (flags & HAS_DATA) == 0 is important: it ensures the consumer only considers the stream finished after it has consumed the very last piece of data. This prevents a race condition where the consumer might see the IS_FINISHED flag and shut down before pulling the final DataBlock.
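
Putting the two steps together, finish() can be sketched as a flag-only update plus a wake-up, mirroring push_data's trigger step (the flag-only set_flags helper is assumed here, as in the earlier set_need_data sketch):

pub fn finish(&self) {
    // Raise IS_FINISHED without disturbing any pointer still in the slot.
    self.shared.set_flags(IS_FINISHED, 0); // assumed flag-only helper
    // One last wake-up so the consumer observes the flag.
    UpdateTrigger::update_output(&self.update_trigger);
}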

Putting It All Together: A Lifecycle Example

Let's walk through a simple lifecycle to see how these pieces connect:

  1. Initial State: The channel is empty. The pointer is null, and all flags are 0.
  2. Consumer Needs Data: The consumer's processor runs event(). It sees no data is available (has_data() is false). It calls set_need_data(), which atomically sets the NEED_DATA flag. The state is now 0b010.
  3. Producer Has Data: The producer's processor runs and generates a DataBlock. It calls push_data().
     • The swap operation sees the NEED_DATA flag is on.
     • It atomically swaps the null pointer for its new DataBlock pointer and sets the HAS_DATA flag.
     • The state becomes (pointer) | 0b001.
  4. Consumer Gets Data: The consumer's event() runs again. It sees has_data() is now true. It calls pull_data().
     • pull_data's swap operation atomically takes the pointer and clears the HAS_DATA flag.
     • The state returns to null and all flags are 0.
     • The consumer now has the DataBlock and can process it.
  5. Producer Finishes: After sending its last block, the producer calls finish(). This sets the IS_FINISHED flag. The state is now 0b100.
  6. Consumer Finishes: The consumer's event() runs one last time. It sees is_finished() is true and there's no data. It knows the stream is complete and can shut down.

This entire exchange happens with no locks and only one heap allocation per DataBlock.

Part 5: Integration with Processors

Here is a simplified Filter processor to show how operators use these ports.

impl Processor for FilterTransform {
    fn event(&mut self) -> Result<Event> {
        // The event() method is a state machine that checks conditions in order:
        // 1. Is the consumer ready for us to send data? If not, signal backpressure.
        if !self.output.can_push() {
            return Ok(Event::NeedConsume);
        }

        // 2. Do we have data we've already pulled but haven't processed yet?
        if self.pending_data.is_some() {
            return Ok(Event::Sync); // Ready to process.
        }

        // 3. Is there new data waiting at the input port?
        if self.input.has_data() {
            self.pending_data = self.input.pull_data();
            return Ok(Event::Sync); // Got data, ready to process.
        }

        // 4. Is the input stream finished, with no more data coming?
        if self.input.is_finished() {
            self.output.finish();
            return Ok(Event::Finished);
        }

        // 5. If none of the above, we need data. Ask the producer.
        self.input.set_need_data();
        Ok(Event::NeedData)
    }

    fn process(&mut self) -> Result<()> {
        if let Some(block) = self.pending_data.take() {
            let filtered = self.apply_filter(block)?;
            self.output.push_data(Ok(filtered));
        }
        Ok(())
    }
}

The processor uses ports to:

  1. Check for backpressure (can_push)
  2. Pull input data (pull_data)
  3. Push output data (push_data)
  4. Signal completion (finish)

All without any additional synchronization code.

Part 6: Performance Analysis

Why It’s Lighter Than General Channels

Let's compare the operations:

Traditional Channel (simplified):

// Send operation
1. Lock mutex
2. Allocate queue node
3. Update head/tail pointers
4. Signal condition variable
5. Unlock mutex

// Receive operation
1. Lock mutex
2. Check queue
3. Remove node
4. Update pointers
5. Unlock mutex
6. Deallocate node

Port Operation:

// Push operation
1. Single atomic CAS

// Pull operation
1. Single atomic CAS

Operationally, channels perform multiple memory operations, allocations, and synchronizations per transfer. Ports use a single atomic operation.

Memory Characteristics

  • Cache efficiency: A CPU doesn't read memory byte by byte; it pulls in chunks called cache lines (commonly 64 bytes). Since our entire SharedStatus (an AtomicPtr) fits in a single cache line, a CPU can read the full state in one go. There's no need to fetch multiple, separate pieces of memory.
  • No false sharing: This is a subtle but serious performance killer in concurrent code. If two threads are modifying two different variables that happen to live in the same cache line, the CPUs will constantly invalidate each other's caches, even though the threads aren't touching the same data. By giving the InputPort and OutputPort their own separate status pointers, we ensure they don't cause false sharing.
  • Allocation pattern: We only allocate when a new DataBlock is created, not for the communication itself. The channel just passes pointers around.
  • Memory ordering: SeqCst (Sequentially Consistent) is the strictest—and easiest to reason about—memory ordering. It guarantees that all threads see all atomic operations happen in the same, single global order. While faster but more complex orderings like Acquire and Release exist, SeqCst is the safest starting point; a sketch of the relaxed variant follows this list.
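
For the curious, here is what that relaxed pattern looks like on a bare AtomicPtr: the producer publishes with Release, the consumer observes with Acquire. This is a sketch of the general pattern only, not what Databend ships:

use std::sync::atomic::{AtomicPtr, Ordering};

fn publish(slot: &AtomicPtr<u64>, value: Box<u64>) {
    // Release: everything written to *value happens-before the consumer's Acquire.
    // (A real port would swap and reclaim any previous value rather than overwrite.)
    slot.store(Box::into_raw(value), Ordering::Release);
}

fn consume(slot: &AtomicPtr<u64>) -> Option<Box<u64>> {
    // Acquire: pairs with the Release store above.
    let ptr = slot.swap(std::ptr::null_mut(), Ordering::Acquire);
    if ptr.is_null() { None } else { Some(unsafe { Box::from_raw(ptr) }) }
}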

Limitations and Trade-offs

This design makes specific trade-offs:

Advantages:

  • Minimal synchronization overhead
  • Zero allocations for communication
  • Direct backpressure signaling
  • Cache-efficient for large transfers

Disadvantages:

  • Only supports 1:1 connections (no broadcast, no multiple consumers)
  • No buffering (can’t smooth out speed differences)
  • Platform-specific (requires 64‑bit pointers, 8‑byte alignment)
  • Pointer tagging can confuse debuggers and tooling

Part 7: Testing and Verification

Memory Safety

The implementation carefully handles memory ownership:

impl Drop for SharedStatus {
    fn drop(&mut self) {
        drop_guard(move || unsafe {
            // Ensure we clean up any pending data.
            let address = self.swap(std::ptr::null_mut(), 0, HAS_DATA);
            if !address.is_null() {
                drop(Box::from_raw(address));
            }
        })
    }
}

The drop_guard ensures cleanup happens even if a panic occurs during drop.
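
Conceptually, drop_guard is a catch_unwind wrapper: a panic escaping a Drop that runs during an unwind would abort the process, so the guard swallows it. A sketch of the idea, not Databend's exact helper:

use std::panic::{catch_unwind, AssertUnwindSafe};

fn drop_guard<F: FnOnce()>(f: F) {
    // Swallow any panic so it cannot escape Drop and abort the process.
    let _ = catch_unwind(AssertUnwindSafe(f));
}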

Concurrency Testing

Databend uses multiple approaches to verify correctness:

  1. Loom for deterministic concurrency testing
  2. Stress tests with millions of operations
  3. MIRI for detecting undefined behavior
  4. Production validation with real workloads
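
To make the stress-test shape concrete, here is a hedged sketch; connect_ports_for_test is a hypothetical helper, and DataBlock::empty() is assumed:

#[test]
fn spsc_handoff_stress() {
    let (input, output) = connect_ports_for_test(); // hypothetical test helper
    let producer = std::thread::spawn(move || {
        for _ in 0..1_000_000 {
            while !output.can_push() {
                std::hint::spin_loop(); // wait for the consumer to raise NEED_DATA
            }
            output.push_data(Ok(DataBlock::empty())); // assumed constructor
        }
        output.finish();
    });

    let mut received = 0usize;
    loop {
        input.set_need_data();
        if let Some(block) = input.pull_data() {
            block.unwrap(); // would be an Err if the producer had failed
            received += 1;
        } else if input.is_finished() {
            break;
        }
    }
    producer.join().unwrap();
    assert_eq!(received, 1_000_000);
}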

Part 8: Practical Considerations

When to Use This Pattern

This lock-free port design works well when:

  • You have pipeline-style processing with stages
  • Data chunks are large (overhead amortized)
  • You need backpressure without queuing
  • Performance is critical

It's not suitable when:

  • You need multiple producers or consumers
  • Messages are small and frequent
  • You need buffering between stages
  • You're communicating across process boundaries

Alternative Approaches

For different requirements, consider:

  • Channels for many-to-many communication
  • Ring buffers for fixed-size buffering
  • Lock-free queues for multiple producers
  • Shared memory IPC for cross-process communication

Conclusion

The lock‑free port design in Databend shows how matching design to requirements can cut overhead. By accepting constraints (1:1 connections, no buffering) and leveraging platform capabilities (pointer alignment, atomic operations), we built a mechanism that:

  1. Eliminates allocations in the communication path
  2. Reduces synchronization to a single atomic operation
  3. Provides direct backpressure signaling
  4. Maintains memory safety despite using unsafe code

This isn’t a universal solution—it’s a specialized tool. When the job is moving gigabytes of data through a query pipeline with minimal overhead, it fits well.

The key lesson isn’t pointer tagging or lock‑free programming. It’s understanding the problem domain and using specialized solutions when general‑purpose tools don’t fit.

