A practical look at Databend's lock-free channel
When a SQL query runs in Databend, it becomes a directed graph of operators—TableScan, Filter, Join, Aggregation—running in parallel and passing data blocks. This post describes the lock-free channel that connects those operators: a simple design that stores three state flags in the unused bits of a pointer.
This isn't a performance victory lap. It's a look at one practical choice: moving large blocks between stages with minimal allocation and without traditional locks—why we use it, how it works, and when it helps.
Part 1: Background and Context
What Problem Are We Solving?
Databend processes SQL queries using a pipeline architecture inspired by the "Morsel-Driven Parallelism" paper from TU Munich. The core idea is simple: break query execution into small, cache-friendly chunks (called morsels in the paper, DataBlocks in our implementation) and process them through a pipeline of operators.
Consider this query:
SELECT customer_type, SUM(amount)
FROM orders
WHERE date > '2024-01-01'
GROUP BY customer_type
Internally, this becomes a pipeline:
TableScan → Filter → PartialAggregate → Exchange → FinalAggregate
Each operator processes DataBlocks—typically containing 8,192 to 65,536 rows of columnar data, ranging from 64KB to 10MB in size. The challenge? These operators run at different speeds. A TableScan might read from fast NVMe storage, while a complex aggregation might be CPU-bound. We need a way for them to communicate that:
- Provides backpressure: A fast producer (like a TableScan) must wait for a slow consumer (like a complex Filter) instead of sending data that the consumer can't handle, which would waste memory. We need a way to signal "I'm full, please wait."
- Minimizes allocations: Creating and destroying memory blocks for every message is slow. Since we're moving gigabytes of data, we want to reuse memory as much as possible.
- Avoids lock contention: When multiple threads try to access the same data, they often use locks to prevent errors. But if many threads are waiting for the same lock, they form a queue, and performance suffers. This is lock contention.
- Enables status checks: An operator needs to peek at the channel's state—is there data for me? is the consumer ready?—without having to stop and wait (block).
Design Constraints
Our solution is tailored to specific constraints:
- Single producer, single consumer (SPSC): Each connection has exactly one sender and one receiver. This is a huge simplification compared to general-purpose channels that might need to support multiple senders (MPSC) or multiple receivers (MPMC). By narrowing our scope to SPSC, we can create a much simpler and faster design.
- Large data blocks: We're moving 64KB-10MB chunks, not individual messages.
- Direct handoff: No buffering between operators.
- Shared memory: All operators run in the same process.
These constraints are different from general-purpose channels, which typically support multiple producers/consumers, small messages, and cross-process communication. By accepting these limitations, we can optimize for our specific use case.
Part 2: The Architecture
Pipeline Structure
Here’s how Databend structures its pipeline:
pub struct Node {
    pub proc: ProcessorPtr,            // The actual operator
    pub inputs: Vec<Arc<InputPort>>,   // Where data comes from
    pub outputs: Vec<Arc<OutputPort>>, // Where data goes to
}

pub struct Edge {
    pub input_index: usize,
    pub output_index: usize,
}
The pipeline is a directed graph where:
- Nodes are processors (operators that transform data)
- Edges are port connections (data flow paths)
Each edge connects exactly one OutputPort to one InputPort, creating a dedicated communication channel between two operators.
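To make that concrete, here is a hedged sketch of what wiring one edge could look like. SharedStatus::create and set_shared are illustrative names for this sketch, not necessarily the real API; the point is that both ends of an edge share the same status word.

// Sketch: connecting one edge. Both ports end up holding the same
// Arc<SharedStatus>, so every atomic swap by the producer is instantly
// visible to the consumer.
pub fn connect(input: &InputPort, output: &OutputPort) {
    let shared = Arc::new(SharedStatus::create()); // illustrative constructor
    input.set_shared(shared.clone());              // illustrative setter
    output.set_shared(shared);
}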
The Event-Driven Model
Processors don't run continuously. Instead, they implement a state machine that the scheduler queries. From src/query/pipeline/core/src/processors/processor.rs:
pub enum Event {
    NeedData,    // Waiting for input
    NeedConsume, // Has output ready, waiting for consumer
    Sync,        // Can process synchronously now
    Async,       // Needs to do async I/O
    Finished,    // Stream completed
}

// `Send` is a marker trait indicating it's safe to move this processor
// across thread boundaries.
pub trait Processor: Send {
    fn event(&mut self) -> Result<Event>;
    fn process(&mut self) -> Result<()>;
}
The scheduler repeatedly calls event(). When it returns Event::Sync, the scheduler invokes process(); the other states tell it to wait for a port, hand the processor to the async runtime, or shut it down.
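Here is a minimal sketch of one scheduling turn, just to show the shape of that loop. The real scheduler is far more elaborate (worker queues, an async executor), so treat this as an illustration of the state machine, not Databend's actual dispatch code.

// Sketch: one scheduling turn for a single processor.
fn schedule_turn(processor: &mut dyn Processor) -> Result<()> {
    loop {
        match processor.event()? {
            // The processor can do CPU work right now.
            Event::Sync => processor.process()?,
            // Needs async I/O: hand it off to the async runtime.
            Event::Async => break,
            // Blocked on a port: park it until an update trigger fires.
            Event::NeedData | Event::NeedConsume => break,
            // Stream complete: remove it from the graph.
            Event::Finished => break,
        }
    }
    Ok(())
}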
Part 3: The Port Implementation
Now we get to the heart of the system. Let's examine how ports actually work.
Core Data Structure
From src/query/pipeline/core/src/processors/port.rs:
#[repr(align(8))]
pub struct SharedData(pub Result<DataBlock>);

pub struct SharedStatus {
    data: AtomicPtr<SharedData>,
}
The entire communication state between an InputPort and an OutputPort is this one AtomicPtr.
Pointer Tagging Technique
On 64-bit systems, memory is often accessed in chunks of 8 bytes. When we ask the system to allocate an 8-byte aligned structure, the memory address it gives us will always be a multiple of 8. In binary, a number that's a multiple of 8 always ends in 000.
This gives us a clever opportunity. Since the last three bits of the pointer's address are guaranteed to be zero, they are unused. We can borrow them to store our own data—in this case, three boolean flags. This is called pointer tagging.
const HAS_DATA: usize = 0b001; // Bit 0: Data is available
const NEED_DATA: usize = 0b010; // Bit 1: Consumer is waiting
const IS_FINISHED: usize = 0b100; // Bit 2: Stream has ended
const FLAGS_MASK: usize = 0b111;
const UNSET_FLAGS_MASK: usize = !FLAGS_MASK;
Here’s what that looks like visually:
A 64-bit pointer address:
[ 63 ......................... 3 ][ 2 ][ 1 ][ 0 ]
                |                   |    |    |
                |                   |    |    `-- HAS_DATA flag    (0b001)
                |                   |    `------- NEED_DATA flag   (0b010)
                |                   `------------ IS_FINISHED flag (0b100)
                `-------------------------------- The actual memory address
                                                  (always ends in 000)
The #[repr(align(8))] attribute on SharedData is what guarantees this. #[repr(...)] lets us control a type's memory layout, and align(8) forces every SharedData to sit on an 8-byte boundary, so the low three bits of its address are always zero.
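The arithmetic itself is tiny. A small illustration of the round trip (DataBlock::empty() here is a stand-in constructor for the sake of the example):

// Sketch: pointer and flags share one machine word, losslessly.
fn tagging_roundtrip() {
    let ptr = Box::into_raw(Box::new(SharedData(Ok(DataBlock::empty()))));
    let tagged = (ptr as usize) | HAS_DATA | NEED_DATA; // OR the flags in
    assert_eq!(tagged & FLAGS_MASK, 0b011);             // read just the flags
    let recovered = (tagged & UNSET_FLAGS_MASK) as *mut SharedData;
    assert_eq!(recovered, ptr);                         // the address survives untouched
    unsafe { drop(Box::from_raw(recovered)) };          // clean up
}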
The Atomic Swap Operation
The core of this lock-free mechanism is an atomic compare-and-swap (CAS) operation, which is a fundamental building block in concurrent programming. Think of it like updating a public whiteboard. You can't just erase and write; someone else might be writing at the same time.
Instead, you:
- Read the current value (expected).
- Prepare your new value (desired).
- Atomically tell the system: "If the value is still expected, change it to desired. Otherwise, tell me the new current value."
This all happens in a single, indivisible step. We use compare_exchange_weak inside a loop, retrying until the exchange succeeds:
pub fn swap(
    &self,
    data: *mut SharedData, // The new pointer we want to set.
    set_flags: usize,      // The flags we want to turn ON.
    unset_flags: usize,    // The flags we want to turn OFF.
) -> *mut SharedData {
    // We start by assuming the current state is null. This is just an initial guess.
    let mut expected = std::ptr::null_mut();
    // This is the new state we want to set. It combines the new pointer with the new flags.
    let mut desired = (data as usize | set_flags) as *mut SharedData;

    loop {
        // This is the atomic CAS operation.
        match self.data.compare_exchange_weak(
            expected,          // IF the current value is this...
            desired,           // ...THEN set it to this.
            Ordering::SeqCst,  // Use the strictest memory ordering for correctness.
            Ordering::Relaxed, // If the CAS fails, we don't need strict ordering on the read.
        ) {
            // The CAS succeeded! The state was `expected`, and we changed it to `desired`.
            Ok(old_value) => {
                // Return the old pointer, stripped of its flags, so the caller can deallocate it.
                let old_ptr = old_value as usize;
                return (old_ptr & UNSET_FLAGS_MASK) as *mut SharedData;
            }
            // The CAS failed: another thread changed the value since we last checked.
            // The hardware gives us the `current` value that caused the failure.
            Err(current) => {
                // Update our `expected` value to the `current` value we just received.
                expected = current;
                let current_address = current as usize;
                let desired_address = desired as usize & UNSET_FLAGS_MASK;
                // Recalculate our `desired` state: preserve the flags the *other thread*
                // set, while applying our own changes.
                let new_flags = (current_address & FLAGS_MASK & !unset_flags) | set_flags;
                desired = (desired_address | new_flags) as *mut SharedData;
                // The loop repeats with the updated `expected` and `desired` values.
            }
        }
    }
}
This function atomically:
- Swaps the pointer value
- Sets specified flags
- Unsets other flags
- Returns the old pointer
All in a single atomic operation that either succeeds completely or retries with the updated state.
Part 4: How Data Flows
Let's trace through how data actually moves from producer to consumer.
Push Operation (Producer Side)
When a producer has a DataBlock ready, it calls push_data. Note the unsafe code: Box::into_raw turns the boxed block into a raw *mut SharedData that can live inside the atomic pointer. There is no Mutex and no general-purpose Channel here, just one pointer swap:
pub fn push_data(&self, data: Result<DataBlock>) {
    // Step 1: Box the DataBlock (the only allocation in the system).
    let data = Box::into_raw(Box::new(SharedData(data)));

    // Step 2: Atomically swap the pointer in and set the HAS_DATA flag.
    let old = self.shared.swap(data, HAS_DATA, HAS_DATA);

    // Step 3: Clean up old data if present (shouldn't occur in normal flow).
    if !old.is_null() {
        unsafe { drop(Box::from_raw(old)); }
    }

    // Step 4: Wake up the scheduler.
    UpdateTrigger::update_output(&self.update_trigger);
}
The key insight is that we aren't copying the DataBlock; we only move a pointer to it, so the handoff costs the same whether the block is 64KB or 10MB.
Pull Operation (Consumer Side)
When a consumer needs data, it calls pull_data, the mirror image of push_data. If the returned ptr is non-null, unsafe { Box::from_raw(ptr) } reverses the earlier into_raw: the raw pointer becomes an owned Box again, and once that Box is consumed, Rust's normal ownership rules free the memory exactly once.
pub fn pull_data(&self) -> Option<Result<DataBlock>> {
    // Atomically take the pointer and clear the HAS_DATA | NEED_DATA flags.
    let ptr = self.shared.swap(std::ptr::null_mut(), 0, HAS_DATA | NEED_DATA);

    if ptr.is_null() {
        None // No data available.
    } else {
        // Take ownership of the Box and extract the DataBlock.
        let boxed = unsafe { Box::from_raw(ptr) };
        Some(boxed.0)
    }
}
A single atomic operation transfers ownership of the DataBlock from producer to consumer.
Handling Errors
You might have noticed that the data being sent is not a bare DataBlock but a Result<DataBlock>. If an operator fails, it can push a Result::Err through the very same channel. Downstream operators will receive this Err and propagate it, so errors flow through the pipeline exactly like data.
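For example, a source operator that fails mid-read might do something like the following sketch, where read_next_block is a hypothetical helper standing in for whatever produces the next block:

// Sketch: errors take the same path as data.
match self.read_next_block() {
    Ok(block) => self.output.push_data(Ok(block)),
    Err(cause) => {
        self.output.push_data(Err(cause)); // the Err rides the channel
        self.output.finish();              // no more data will follow
    }
}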
Non-Blocking Status Checks
Operators can check port status without blocking:
pub fn has_data(&self) -> bool {
    (self.shared.get_flags() & HAS_DATA) != 0
}

pub fn can_push(&self) -> bool {
    let flags = self.shared.get_flags();
    (flags & NEED_DATA) != 0 && (flags & HAS_DATA) == 0
}

pub fn is_finished(&self) -> bool {
    let flags = self.shared.get_flags();
    (flags & IS_FINISHED) != 0 && (flags & HAS_DATA) == 0
}
These are simple atomic loads—no locks, no waiting, just reading the current state.
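get_flags itself isn't shown above. A minimal sketch consistent with the rest of the design is a plain atomic load with the flag mask applied, plus a CAS loop (the same shape as swap) for flag-only updates. The names and orderings here are assumptions mirroring the swap code, not the verified source:

impl SharedStatus {
    // Sketch: read the current flag bits without touching the pointer.
    pub fn get_flags(&self) -> usize {
        self.data.load(Ordering::SeqCst) as usize & FLAGS_MASK
    }

    // Sketch: update only the flag bits, preserving whatever pointer is stored.
    pub fn set_flags(&self, set_flags: usize, unset_flags: usize) {
        let mut expected = std::ptr::null_mut::<SharedData>();
        loop {
            let address = expected as usize;
            let flags = (address & FLAGS_MASK & !unset_flags) | set_flags;
            let desired = ((address & UNSET_FLAGS_MASK) | flags) as *mut SharedData;
            match self.data.compare_exchange_weak(
                expected, desired, Ordering::SeqCst, Ordering::Relaxed,
            ) {
                Ok(_) => return,
                Err(current) => expected = current, // retry with fresh state
            }
        }
    }
}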
Graceful Shutdown: The IS_FINISHED Flag
How does a consumer know when the producer is done and no more data will ever arrive? This is handled by the IS_FINISHED flag.
When a producer has sent all its data, it calls a finish() method, which does two things:
- It sets the IS_FINISHED flag.
- It wakes up the consumer one last time.
The consumer can then check is_finished(). Note the condition it uses: (flags & IS_FINISHED) != 0 && (flags & HAS_DATA) == 0. The IS_FINISHED flag alone isn't enough; the consumer must first drain any final DataBlock still sitting in the channel before treating the stream as complete.
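Putting those two steps together, finish() plausibly looks like this. This is a sketch built on the set_flags helper sketched earlier, not the verified source:

// Sketch: mark the stream finished and wake the consumer one last time.
pub fn finish(&self) {
    self.shared.set_flags(IS_FINISHED, IS_FINISHED);
    UpdateTrigger::update_output(&self.update_trigger);
}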
Putting It All Together: A Lifecycle Example
Let's walk through a simple lifecycle to see how these pieces connect:
- Initial State: The channel is empty. The pointer is null, and all flags are 0.
- Consumer Needs Data: The consumer's processor runs event(). It sees no data is available (has_data() is false). It calls set_need_data(), which atomically sets the NEED_DATA flag. The state is now 0b010.
- Producer Has Data: The producer's processor runs and generates a DataBlock. It calls push_data().
  - The swap operation sees the NEED_DATA flag is on.
  - It atomically swaps the null pointer for its new DataBlock pointer and sets the HAS_DATA flag.
  - The state becomes (pointer) | 0b001.
  - The update trigger wakes the scheduler so the consumer can run.
- Consumer Gets Data: The consumer's event() runs again. It sees has_data() is now true. It calls pull_data().
  - pull_data's swap operation atomically takes the pointer and clears the HAS_DATA flag.
  - The state returns to null and all flags are 0.
  - The consumer now has the DataBlock and can process it.
- Producer Finishes: After sending its last block, the producer calls finish(). This sets the IS_FINISHED flag. The state is now 0b100.
- Consumer Finishes: The consumer's event() runs one last time. It sees is_finished() is true and there's no data. It knows the stream is complete and can shut down.
This entire exchange happens with no locks and only one heap allocation per DataBlock.
Part 5: Integration with Processors
Here is a simplified Filter processor showing how the ports drive its state machine:
impl Processor for FilterTransform {
    fn event(&mut self) -> Result<Event> {
        // The event() method is a state machine that checks conditions in order:

        // 1. Is the consumer ready for us to send data? If not, signal backpressure.
        if !self.output.can_push() {
            return Ok(Event::NeedConsume);
        }

        // 2. Do we have data we've already pulled but haven't processed yet?
        if self.pending_data.is_some() {
            return Ok(Event::Sync); // Ready to process.
        }

        // 3. Is there new data waiting at the input port?
        if self.input.has_data() {
            self.pending_data = self.input.pull_data();
            return Ok(Event::Sync); // Got data, ready to process.
        }

        // 4. Is the input stream finished, with no more data coming?
        if self.input.is_finished() {
            self.output.finish();
            return Ok(Event::Finished);
        }

        // 5. If none of the above, we need data. Ask the producer.
        self.input.set_need_data();
        Ok(Event::NeedData)
    }

    fn process(&mut self) -> Result<()> {
        if let Some(block) = self.pending_data.take() {
            let filtered = self.apply_filter(block)?;
            self.output.push_data(Ok(filtered));
        }
        Ok(())
    }
}
The processor uses ports to:
- Check for backpressure (can_push)
- Pull input data (pull_data)
- Push output data (push_data)
- Signal completion (finish)

All without any additional synchronization code.
Part 6: Performance Analysis
Why it’s lighter than general channels
Let's compare the operations:
Traditional Channel (simplified):
// Send operation
1. Lock mutex
2. Allocate queue node
3. Update head/tail pointers
4. Signal condition variable
5. Unlock mutex
// Receive operation
1. Lock mutex
2. Check queue
3. Remove node
4. Update pointers
5. Unlock mutex
6. Deallocate node
Port Operation:
// Push operation
1. Single atomic CAS
// Pull operation
1. Single atomic CAS
Operationally, channels perform multiple memory operations, allocations, and synchronizations per transfer. Ports use a single atomic operation.
Memory Characteristics
- Cache efficiency: A CPU doesn't read memory byte by byte; it pulls in chunks called cache lines (commonly 64 bytes). Since our entire SharedStatus (an AtomicPtr) fits in a single cache line, a CPU can read the full state in one go. There's no need to fetch multiple, separate pieces of memory.
- No false sharing: This is a subtle but serious performance killer in concurrent code. If two threads are modifying two different variables that happen to live in the same cache line, the CPUs will constantly invalidate each other's caches, even though the threads aren't touching the same data. By giving the InputPort and OutputPort their own separate status pointers, we ensure they don't cause false sharing.
- Allocation pattern: We only allocate when a new DataBlock is created, not for the communication itself. The channel just passes pointers around.
- Memory ordering: SeqCst (Sequentially Consistent) is the strictest—and easiest to reason about—memory ordering. It guarantees that all threads see all atomic operations happen in the same, single global order. While faster but more complex orderings like Acquire and Release exist, SeqCst is the safest starting point.
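If two hot atomics ever did have to live side by side in one struct, the standard fix for false sharing is to pad each onto its own cache line. A small sketch of that pattern, not something the port needs today:

// Sketch: force each value onto its own 64-byte cache line so writers on
// neighboring fields can't invalidate each other's caches.
#[repr(align(64))]
struct CacheLinePadded<T>(T);

struct BothEnds {
    producer_side: CacheLinePadded<std::sync::atomic::AtomicUsize>,
    consumer_side: CacheLinePadded<std::sync::atomic::AtomicUsize>,
}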
Limitations and Trade-offs
This design makes specific trade-offs:
Advantages:
- Minimal synchronization overhead
- Zero allocations for communication
- Direct backpressure signaling
- Cache-efficient for large transfers
Disadvantages:
- Only supports 1:1 connections (no broadcast, no multiple consumers)
- No buffering (can’t smooth out speed differences)
- Platform-specific (requires 64‑bit pointers, 8‑byte alignment)
- Pointer tagging can confuse debuggers and tooling
Part 7: Testing and Verification
Memory Safety
The implementation carefully handles memory ownership:
impl Drop for SharedStatus {
    fn drop(&mut self) {
        drop_guard(move || unsafe {
            // Ensure we clean up any pending data.
            let address = self.swap(std::ptr::null_mut(), 0, HAS_DATA);
            if !address.is_null() {
                drop(Box::from_raw(address));
            }
        })
    }
}
The drop_guard wrapper ensures this cleanup closure runs safely during drop, so any DataBlock still sitting in the channel is freed rather than leaked.
Concurrency Testing
Databend uses multiple approaches to verify correctness:
- Loom for deterministic concurrency testing (see the sketch after this list)
- Stress tests with millions of operations
- MIRI for detecting undefined behavior
- Production validation with real workloads
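As an illustration of the Loom style (this is a sketch, not Databend's actual test suite), a model that checks concurrent flag updates can never lose bits might look like this. Loom re-runs the closure under every possible thread interleaving:

#[cfg(test)]
mod loom_tests {
    use loom::sync::atomic::{AtomicUsize, Ordering};
    use loom::sync::Arc;
    use loom::thread;

    const HAS_DATA: usize = 0b001;
    const NEED_DATA: usize = 0b010;

    #[test]
    fn concurrent_flag_sets_are_never_lost() {
        loom::model(|| {
            let state = Arc::new(AtomicUsize::new(0));

            let s1 = state.clone();
            let producer = thread::spawn(move || {
                s1.fetch_or(HAS_DATA, Ordering::SeqCst); // producer sets its bit
            });

            let s2 = state.clone();
            let consumer = thread::spawn(move || {
                s2.fetch_or(NEED_DATA, Ordering::SeqCst); // consumer sets its bit
            });

            producer.join().unwrap();
            consumer.join().unwrap();

            // Whatever the schedule, neither flag update may be lost.
            assert_eq!(state.load(Ordering::SeqCst), HAS_DATA | NEED_DATA);
        });
    }
}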
Part 8: Practical Considerations
When to Use This Pattern
This lock-free port design works well when:
- You have pipeline-style processing with stages
- Data chunks are large (overhead amortized)
- You need backpressure without queuing
- Performance is critical
It's not suitable when:
- You need multiple producers or consumers
- Messages are small and frequent
- You need buffering between stages
- You're communicating across process boundaries
Alternative Approaches
For different requirements, consider:
- Channels for many-to-many communication
- Ring buffers for fixed-size buffering
- Lock-free queues for multiple producers
- Shared memory IPC for cross-process communication
Conclusion
The lock‑free port design in Databend shows how matching design to requirements can cut overhead. By accepting constraints (1:1 connections, no buffering) and leveraging platform capabilities (pointer alignment, atomic operations), we built a mechanism that:
- Eliminates allocations in the communication path
- Reduces synchronization to a single atomic operation
- Provides direct backpressure signaling
- Maintains memory safety despite using unsafe code
This isn’t a universal solution—it’s a specialized tool. When the job is moving gigabytes of data through a query pipeline with minimal overhead, it fits well.
The key lesson isn’t pointer tagging or lock‑free programming. It’s understanding the problem domain and using specialized solutions when general‑purpose tools don’t fit.
Further Reading
- Morsel-Driven Parallelism: A NUMA-Aware Query Evaluation Framework for the Many-Core Age
- Getting Started with Databend: Explore Databend and try it for yourself.
- Join our Slack: Connect with other data professionals and stay updated on the latest trends and best practices.