LSM Trees Unveiled - Part 3 - Completing the Rust Implementation

Welcome to the final part of our series on Log-Structured Merge (LSM) trees. In Part 1, we explored the core concepts of LSM trees. Part 2 dove into the implementation of key components: the MemTable and SSTable. Now, we'll bring everything together to create a complete, efficient LSM tree implementation in Rust.

The Main LSM Tree Implementation

We'll use a "fat" struct approach for our LSM Tree: a single struct holds every component (the MemTable, the SSTable list, and the WAL) together with the channels that drive it. Callers interact with the tree only by sending commands over those channels, which keeps the public API small and puts all state changes in one place.

Let's start with the main struct and its implementation:

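use std::collections::VecDeque;
use std::fs::{read_dir, File, OpenOptions};
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;

use crossbeam_channel::{unbounded, Receiver, Sender};
use im::OrdMap; // assumption: OrdMap (used in compaction) comes from the `im` crate
use log::{error, info}; // assumption: the logging macros come from the `log` crate
use parking_lot::Mutex;

// MemTable and SSTable are the types built in Part 2.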

enum Command {
    Insert(String, String),
    Get(String, Sender<Option<String>>),
    Flush,
}

struct LSMTree {
    memtable: Arc<Mutex<MemTable>>,
    sstables: Arc<Mutex<VecDeque<SSTable>>>,
    wal: Arc<Mutex<File>>,
    command_sender: Sender<Command>,
    command_receiver: Receiver<Command>,
    flush_sender: Sender<()>,
    flush_receiver: Receiver<()>,
    memtable_size_threshold: usize,
    data_dir: PathBuf,
}

impl LSMTree {
    fn new(memtable_size_threshold: usize, data_dir: PathBuf) -> io::Result<Self> {
        std::fs::create_dir_all(&data_dir)?;

        let wal = Arc::new(Mutex::new(
            OpenOptions::new()
                .create(true)
                .append(true)
                .open(data_dir.join("wal.log"))?,
        ));

        let (command_sender, command_receiver) = unbounded();
        let (flush_sender, flush_receiver) = unbounded();

        let mut sstables = VecDeque::new();
        LSMTree::load_existing_sstables(&data_dir, &mut sstables)?;

        Ok(LSMTree {
            memtable: Arc::new(Mutex::new(MemTable::new())),
            sstables: Arc::new(Mutex::new(sstables)),
            wal,
            command_sender,
            command_receiver,
            flush_sender,
            flush_receiver,
            memtable_size_threshold,
            data_dir,
        })
    }

    fn load_existing_sstables(
        data_dir: &Path,
        sstables: &mut VecDeque<SSTable>,
    ) -> io::Result<()> {
        for entry in read_dir(data_dir)? {
            let path = entry?.path();
            // Only regular files named sstable_<id>.dat are SSTables
            if !path.is_file() || path.extension().and_then(|s| s.to_str()) != Some("dat") {
                continue;
            }
            let id = path
                .file_stem()
                .and_then(|s| s.to_str())
                .and_then(|stem| stem.strip_prefix("sstable_"))
                .and_then(|id_str| id_str.parse::<usize>().ok());
            if let Some(id) = id {
                let sstable = SSTable::new(id, data_dir)?;
                info!("Loaded SSTable {:?}", sstable.id);
                sstables.push_back(sstable);
            }
        }
        // read_dir yields entries in no particular order; sort by id so that
        // reads, which scan from the back, see the newest SSTable first.
        sstables.make_contiguous().sort_by_key(|s| s.id);
        Ok(())
    }

    fn insert(&self, key: String, value: String) -> io::Result<()> {
        self.command_sender
            .send(Command::Insert(key, value))
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
    }

    fn get(&self, key: String) -> io::Result<Option<String>> {
        let (sender, receiver) = unbounded();
        self.command_sender
            .send(Command::Get(key, sender))
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
        receiver
            .recv()
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
    }

    fn flush(&self) -> io::Result<()> {
        self.command_sender
            .send(Command::Flush)
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
    }

    fn run(&self) -> io::Result<()> {
        loop {
            crossbeam_channel::select! {
                recv(self.command_receiver) -> cmd => match cmd {
                    Ok(Command::Insert(key, value)) => self.handle_insert(key, value)?,
                    Ok(Command::Get(key, response_sender)) => self.handle_get(key, response_sender)?,
                    Ok(Command::Flush) => self.handle_flush()?,
                    Err(_) => break, // All senders dropped; cannot happen while `self` holds command_sender
                },
                recv(self.flush_receiver) -> _ => self.perform_flush()?,
            }
        }
        Ok(())
    }

    fn handle_insert(&self, key: String, value: String) -> io::Result<()> {
        // Write to WAL
        {
            let mut wal = self.wal.lock();
            writeln!(&mut *wal, "{}:{}", key, value)?;
            wal.flush()?;
        }

        // Insert into memtable
        let memtable = self.memtable.lock();
        memtable.insert(key, value);

        // Check if memtable size exceeds threshold
        if memtable.size() > self.memtable_size_threshold {
            drop(memtable); // Release the lock before sending flush signal
            self.flush_sender.send(()).unwrap();
        }

        Ok(())
    }

    fn handle_get(&self, key: String, response_sender: Sender<Option<String>>) -> io::Result<()> {
        // Check memtable first
        let memtable = self.memtable.lock();
        if let Some(value) = memtable.get(&key) {
            response_sender.send(Some(value)).unwrap();
            return Ok(());
        }
        drop(memtable);

        // Check SSTables in reverse order
        let sstables = self.sstables.lock();
        for sstable in sstables.iter().rev() {
            if let Some(value) = sstable.get(&key)? {
                response_sender.send(Some(value)).unwrap();
                return Ok(());
            }
        }

        response_sender.send(None).unwrap();
        Ok(())
    }

    fn handle_flush(&self) -> io::Result<()> {
        self.flush_sender.send(()).unwrap();
        Ok(())
    }

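    fn perform_flush(&self) -> io::Result<()> {
        let mut sstables = self.sstables.lock();
        // Derive the next id from the highest existing one: compaction shrinks the
        // list, so sstables.len() could reuse the id of an SSTable that is still live.
        let next_id = sstables.iter().map(|s| s.id).max().map_or(0, |id| id + 1);
        let mut new_sstable = SSTable::new(next_id, &self.data_dir)?;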

        let memtable = self.memtable.lock();
        let data_to_flush = memtable.drain();
        drop(memtable);

        for (key, value) in data_to_flush {
            new_sstable.write(&key, &value)?;
        }
        sstables.push_back(new_sstable);

        // Reset memtable
        *self.memtable.lock() = MemTable::new();

        // Trigger compaction if necessary
        if sstables.len() > 2 {
            let sstables_to_compact = sstables.drain(..2).collect::<Vec<_>>();
            let data_dir = self.data_dir.clone();
            std::thread::spawn(move || {
                if let Err(e) = compact_sstables(sstables_to_compact, &data_dir) {
                    error!("Error during compaction: {:?}", e);
                }
            });
        }

        Ok(())
    }
}

This implementation runs every operation on a dedicated thread: the public methods only send commands over a channel, and run processes them one at a time. Because a single thread mutates the tree's state, the Mutex wrappers see little contention and no complex locking scheme is needed.
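One gap in the code above is recovery: handle_insert appends key:value lines to wal.log, but new never reads them back, so writes that were not yet flushed would not survive a restart. Here is a minimal sketch of replaying the log, assuming the same line format. The replay_wal helper is hypothetical and not part of the implementation above, and a plain BTreeMap stands in for the MemTable.

```rust
use std::collections::BTreeMap;
use std::io::{self, BufRead};

/// Hypothetical helper: rebuilds in-memory state from WAL lines of the form `key:value`.
fn replay_wal<R: BufRead>(reader: R) -> io::Result<BTreeMap<String, String>> {
    let mut recovered = BTreeMap::new();
    for line in reader.lines() {
        let line = line?;
        // Split on the first ':' only, so values may contain colons (keys may not).
        match line.split_once(':') {
            // Later records overwrite earlier ones, matching insert order.
            Some((key, value)) => {
                recovered.insert(key.to_string(), value.to_string());
            }
            // A line without a separator is treated as a torn write and skipped.
            None => continue,
        }
    }
    Ok(recovered)
}
```

Because the split happens on the first colon, a key that itself contains a colon would be misparsed. A real log would likely need length-prefixed or escaped records, and it would also be truncated once a flush succeeds.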

Compaction Process

The compaction process is crucial for maintaining the performance of our LSM tree. Let's implement it:

fn compact_sstables(sstables: Vec<SSTable>, data_dir: &Path) -> io::Result<()> {
    // Tables arrive oldest first, so a later insert overwrites the older value for a key
    let mut merged_data = OrdMap::new();

    for sstable in sstables {
        for (key, _) in sstable.index.iter() {
            // Propagate read errors instead of silently dropping the entry
            if let Some(value) = sstable.get(key)? {
                merged_data.insert(key.clone(), value);
            }
        }
    }

    // `?` keeps the underlying I/O error rather than replacing it with a generic one
    let mut new_sstable = SSTable::new(0, data_dir)?;
    for (key, value) in merged_data {
        new_sstable.write(&key, &value)?;
    }

    Ok(())
}

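This compaction process merges multiple SSTables into a single new SSTable. The tables are visited oldest first, so when a key appears in more than one table, the newer value replaces the outdated one. This version has no delete operation, so there are no deleted entries to drop yet. Also, the merged table is not added back to the in-memory sstables list and the input files are not removed; a complete implementation would do both.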
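The merge rule itself can be shown without any file I/O. The following is a sketch under stated assumptions rather than the article's code: each run is a plain vector of pairs, None stands for a hypothetical tombstone marking a deleted key (the implementation above has no delete), and merge_runs is an illustrative name.

```rust
use std::collections::BTreeMap;

/// One flushed run of (key, value) pairs; `None` is a hypothetical tombstone.
type Run = Vec<(String, Option<String>)>;

/// Merges runs given oldest first. Newer entries replace older ones, and keys
/// whose newest entry is a tombstone are dropped from the output.
fn merge_runs(runs_oldest_first: &[Run]) -> Vec<(String, String)> {
    let mut latest: BTreeMap<&str, &Option<String>> = BTreeMap::new();
    for run in runs_oldest_first {
        for (key, value) in run {
            // Inserting in age order leaves the newest entry for each key.
            latest.insert(key.as_str(), value);
        }
    }
    // BTreeMap iterates in key order, so the merged output stays sorted.
    latest
        .into_iter()
        .filter_map(|(key, value)| value.as_ref().map(|v| (key.to_string(), v.clone())))
        .collect()
}
```

Dropping a tombstone is only safe when the merge includes the oldest data for that key. Otherwise an older value in a table that was not part of the merge would become visible again.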

Key Features of This Implementation

  1. Lock-Free MemTable: Using SkipMap allows for highly concurrent access to the in-memory data.
  2. Copy-on-Write SSTable Index: Provides lock-free reads and efficient updates to the SSTable index.
  3. Message-Passing Architecture: Reduces contention and simplifies the concurrency model.
  4. Background Compaction: Compaction is performed in a separate thread, preventing blocking of other operations.
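To illustrate the copy-on-write idea from the list above, here is a small sketch using only the standard library. It is not the index from Part 2: CowIndex and its methods are hypothetical names, and a short Mutex guards the pointer swap, so taking a snapshot is not strictly lock-free. A crate such as arc-swap could replace the Mutex with an atomic swap.

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};

/// Hypothetical copy-on-write index mapping keys to file offsets.
struct CowIndex {
    current: Mutex<Arc<BTreeMap<String, u64>>>,
}

impl CowIndex {
    fn new() -> Self {
        CowIndex { current: Mutex::new(Arc::new(BTreeMap::new())) }
    }

    /// Readers take a cheap snapshot; lookups on it never block writers.
    fn snapshot(&self) -> Arc<BTreeMap<String, u64>> {
        let guard = self.current.lock().unwrap();
        Arc::clone(&*guard)
    }

    /// Writers copy the map, modify the copy, then publish it with a pointer swap.
    fn insert(&self, key: &str, offset: u64) {
        let mut guard = self.current.lock().unwrap();
        let mut next: BTreeMap<String, u64> = (**guard).clone();
        next.insert(key.to_string(), offset);
        *guard = Arc::new(next);
    }
}
```

A snapshot taken before an update keeps pointing at the old map, so readers always see a consistent index. The cost is a full copy of the map on every write, which suits an index that is read far more often than it changes.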

Using the LSM Tree

Here's an example of how to use this LSM tree implementation:

fn main() -> io::Result<()> {
    let data_dir = PathBuf::from("lsm_data");
    let lsm_tree = Arc::new(LSMTree::new(1000, data_dir)?);

    // Start the main processing loop in a separate thread
    let lsm_tree_clone = Arc::clone(&lsm_tree);
    std::thread::spawn(move || {
        if let Err(e) = lsm_tree_clone.run() {
            error!("LSM tree processing loop error: {:?}", e);
        }
    });

    // Use the LSM tree
    lsm_tree.insert("key1".to_string(), "value1".to_string())?;
    lsm_tree.insert("key2".to_string(), "value2".to_string())?;

    println!("Value for key1: {:?}", lsm_tree.get("key1".to_string())?);
    println!("Value for key2: {:?}", lsm_tree.get("key2".to_string())?);

    lsm_tree.flush()?;

    // In a real application, you'd want to have a way to gracefully shut down the processing loop

    Ok(())
}

This example demonstrates how to initialize the LSM tree, start its processing loop in a separate thread, and perform basic operations like insert, get, and flush. Check all of the code in this GitHub repo:
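The example leaves graceful shutdown open. One possible approach is an explicit shutdown command combined with joining the worker thread. The sketch below assumes a few things that differ from the code above: it uses std::sync::mpsc instead of crossbeam_channel so that it is self-contained, a HashMap stands in for the tree's state, and the Shutdown variant and spawn_worker function are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

enum Command {
    Insert(String, String),
    Get(String, Sender<Option<String>>),
    Shutdown, // hypothetical variant, not in the article's enum
}

/// Spawns the worker that owns all state; returns the command sender and a join handle.
fn spawn_worker() -> (Sender<Command>, JoinHandle<usize>) {
    let (tx, rx) = channel::<Command>();
    let handle = thread::spawn(move || {
        let mut store = HashMap::new();
        // recv() fails once every Sender is dropped, which also ends the loop.
        while let Ok(cmd) = rx.recv() {
            match cmd {
                Command::Insert(key, value) => {
                    store.insert(key, value);
                }
                Command::Get(key, reply) => {
                    // Ignore the error if the caller stopped waiting for the reply.
                    let _ = reply.send(store.get(&key).cloned());
                }
                Command::Shutdown => break,
            }
        }
        // A real loop would flush the memtable here before exiting.
        store.len()
    });
    (tx, handle)
}
```

The caller sends Command::Shutdown and then calls join on the handle, which guarantees that every command queued before the shutdown has been processed before the program exits. In the example above, main can return before the flush command has been handled.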

Conclusion

In this series, we've explored the core concepts of LSM trees and implemented an efficient version in Rust. Our implementation leverages modern concurrency techniques like lock-free data structures, Copy-on-Write, and message passing to achieve high performance and scalability.

While this implementation provides a solid foundation, production-grade LSM trees include additional features such as:

  • Bloom filters to reduce unnecessary disk reads
  • More sophisticated compaction strategies
  • Robust error handling and recovery mechanisms
  • Configurable options for performance tuning
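As an illustration of the first item, here is a minimal Bloom filter sketch. It is not tuned for production use: the BloomFilter type, its parameters, and the seeded use of DefaultHasher are all assumptions made for this example. A real engine would size the bit array from the expected key count and target false-positive rate.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Minimal Bloom filter sketch; sizes and hash count are illustrative, not tuned.
struct BloomFilter {
    bits: Vec<bool>,
    num_hashes: u64,
}

impl BloomFilter {
    fn new(num_bits: usize, num_hashes: u64) -> Self {
        BloomFilter { bits: vec![false; num_bits.max(1)], num_hashes }
    }

    /// Derives one bit position by hashing the seed together with the key.
    fn position(&self, key: &str, seed: u64) -> usize {
        let mut hasher = DefaultHasher::new();
        seed.hash(&mut hasher);
        key.hash(&mut hasher);
        (hasher.finish() % self.bits.len() as u64) as usize
    }

    fn insert(&mut self, key: &str) {
        for seed in 0..self.num_hashes {
            let pos = self.position(key, seed);
            self.bits[pos] = true;
        }
    }

    /// `false` means the key is definitely absent; `true` means it may be present.
    fn may_contain(&self, key: &str) -> bool {
        (0..self.num_hashes).all(|seed| self.bits[self.position(key, seed)])
    }
}
```

With one filter per SSTable, a read could skip any table whose filter returns false for the key and only touch disk for tables that might contain it.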

The concepts and techniques we've explored offer valuable insights into the design of modern, high-performance database systems. Whether you're working on key-value stores, time-series databases, or large-scale analytical systems, the principles of LSM trees we've discussed and implemented are likely at play behind the scenes.

By understanding both the theoretical underpinnings and practical implementation details of LSM trees, developers can make more informed decisions when choosing database systems, optimizing performance, or even building their own storage engines.

As data continues to grow in volume and velocity, efficient data structures like LSM trees will play an increasingly crucial role in managing and processing information at scale. I hope this series has equipped you with the knowledge to appreciate and leverage the power of LSM trees in your future projects and explorations in the world of database systems.