In my previous blog posts about LSM Trees, we explored how log-structured storage engines handle write-heavy workloads efficiently. Today we'll dive deeper into the logging patterns that form the backbone of distributed storage systems. The adage "You can't build a distributed system without logs" rings particularly true as we examine these essential patterns.
Why Logs Matter in Distributed Systems
Logs are more than just append-only records – they're the source of truth that enables:
- Crash recovery and system reliability
- Consistent state replication across nodes
- Clear ordering of operations
- Point-in-time system state recovery
Let's explore the four patterns that make these properties possible: the write-ahead log, the low-water mark, the replication log, and the high-water mark.
Write-Ahead Log (WAL): The Guardian of Durability
Write-Ahead Logging is the first line of defense against data loss. Before any modification hits your actual data structures, it must be recorded in the WAL.
How WAL Works
1. An incoming write operation arrives
2. The system appends the operation to the WAL
3. The system acknowledges the write only after the WAL has been synced to disk
4. The change is applied to the main data structure (the memtable), either inline or by a background apply process
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
#[derive(Clone, Debug)]
struct WalEntry {
operation: String,
key: String,
value: String,
sequence_number: u64,
}
struct StorageEngine {
wal: Arc<Mutex<Vec<WalEntry>>>,
memtable: Arc<Mutex<HashMap<String, String>>>,
next_sequence: Arc<Mutex<u64>>,
}
impl StorageEngine {
async fn write(&self, key: String, value: String) -> Result<u64, Box<dyn std::error::Error>> {
let sequence = {
let mut seq = self.next_sequence.lock().await;
*seq += 1;
*seq
};
// First, log the operation
let wal_entry = WalEntry {
operation: "PUT".to_string(),
key: key.clone(),
value: value.clone(),
sequence_number: sequence,
};
// Acquire lock and append to WAL
{
let mut wal = self.wal.lock().await;
wal.push(wal_entry);
// In a real implementation we'd fsync the WAL file here before acknowledging the write
}
// Update memtable
{
let mut memtable = self.memtable.lock().await;
memtable.insert(key, value);
}
// Return the sequence so callers (e.g. the replication layer) can track this write
Ok(sequence)
}
}
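Crash recovery is the flip side of this pattern: because every acknowledged write is in the log, a restart can rebuild the in-memory state by replaying it. Here's a minimal sketch against the same `StorageEngine` (a hypothetical `recover` method, not part of the engine above); a real engine would read the entries back from the log file on disk rather than from an in-memory `Vec`.
impl StorageEngine {
    // Hypothetical recovery path: rebuild the memtable by replaying the WAL
    async fn recover(&self) -> Result<(), Box<dyn std::error::Error>> {
        let wal = self.wal.lock().await;
        let mut memtable = self.memtable.lock().await;
        let mut next_sequence = self.next_sequence.lock().await;
        // Replay in sequence order so that later writes to the same key win
        let mut entries = wal.clone();
        entries.sort_by_key(|e| e.sequence_number);
        memtable.clear();
        for entry in &entries {
            if entry.operation == "PUT" {
                memtable.insert(entry.key.clone(), entry.value.clone());
            }
            *next_sequence = entry.sequence_number;
        }
        Ok(())
    }
}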
Low-Water Mark (LWM): Managing Storage Cleanup
The Low-Water Mark drives garbage collection and keeps log storage bounded. It marks the oldest log position that any part of the system might still need; everything before it can be safely deleted.
Purpose of Low-Water Mark
- Determines which log entries can be safely deleted
- Prevents resource leaks from accumulated logs
- Facilitates efficient storage management
#[derive(Debug)]
struct LogCleaner {
low_water_mark: u64,
segments: Vec<LogSegment>,
}
#[derive(Debug)]
struct LogSegment {
start_sequence: u64,
end_sequence: u64,
entries: Vec<WalEntry>,
}
impl LogCleaner {
fn new() -> Self {
LogCleaner {
low_water_mark: 0,
segments: Vec::new(),
}
}
fn update_low_water_mark(&mut self, oldest_needed_sequence: u64) {
self.low_water_mark = oldest_needed_sequence;
}
fn garbage_collect(&mut self) -> Result<(), Box<dyn std::error::Error>> {
// Remove segments that are entirely before the low water mark
self.segments.retain(|segment| {
segment.end_sequence >= self.low_water_mark
});
// Truncate partial segments if needed
if let Some(first_segment) = self.segments.first_mut() {
if first_segment.start_sequence < self.low_water_mark {
first_segment.entries.retain(|entry| {
entry.sequence_number >= self.low_water_mark
});
first_segment.start_sequence = self.low_water_mark;
}
}
Ok(())
}
}
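How far the low-water mark can advance depends on who still needs the log: unflushed memtable data, snapshots in progress, or followers that are still catching up. A rough sketch of a caller is below; the checkpoint and follower-progress inputs are assumptions about the surrounding system, not code from the cleaner above.
// Hypothetical caller: the low-water mark must not pass any sequence that is
// still needed, so take the minimum of all consumers before collecting garbage.
fn run_cleanup(
    cleaner: &mut LogCleaner,
    checkpoint_sequence: u64,       // last sequence already flushed durably
    slowest_follower_sequence: u64, // lowest sequence acknowledged by any replica
) -> Result<(), Box<dyn std::error::Error>> {
    let oldest_needed = checkpoint_sequence.min(slowest_follower_sequence);
    cleaner.update_low_water_mark(oldest_needed);
    cleaner.garbage_collect()
}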
Replication Log: Distributed Consensus and State Machine Replication
The replication log is the backbone of distributed consensus, enabling multiple nodes to maintain consistent state. While WAL ensures durability on a single node, the replication log ensures consistency across the entire cluster.
Components of Replication Log
use serde::{Serialize, Deserialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
struct ReplicationLogEntry {
sequence: u64,
term: u64, // For leader election/consensus
data: Vec<u8>,
checksum: u32,
}
#[derive(Debug)]
struct ReplicationLog {
entries: Vec<ReplicationLogEntry>,
committed_index: u64,
last_applied: u64,
}
impl ReplicationLog {
fn new() -> Self {
ReplicationLog {
entries: Vec::new(),
committed_index: 0,
last_applied: 0,
}
}
fn append(&mut self, entry: ReplicationLogEntry) -> Result<(), Box<dyn std::error::Error>> {
// Verify entry sequence is next in line
if let Some(last) = self.entries.last() {
if entry.sequence != last.sequence + 1 {
return Err("Sequence gap detected".into());
}
}
// Verify checksum
if !self.verify_checksum(&entry) {
return Err("Checksum validation failed".into());
}
self.entries.push(entry);
Ok(())
}
fn verify_checksum(&self, entry: &ReplicationLogEntry) -> bool {
// Placeholder: a real implementation would recompute a checksum (e.g. CRC32)
// over entry.data and compare it against entry.checksum
true
}
async fn replicate_to_follower(
&self,
follower_id: String,
last_sequence: u64,
) -> Result<(), Box<dyn std::error::Error>> {
// Find entries that follower needs
let new_entries: Vec<_> = self.entries
.iter()
.filter(|e| e.sequence > last_sequence)
.cloned()
.collect();
if new_entries.is_empty() {
return Ok(());
}
// Send entries to follower
self.send_entries(follower_id, new_entries).await
}
async fn send_entries(
&self,
follower_id: String,
entries: Vec<ReplicationLogEntry>,
) -> Result<(), Box<dyn std::error::Error>> {
// Placeholder for the network call that ships the entries to the follower
Ok(())
}
}
// Leader's replication management
struct ReplicationCoordinator {
log: ReplicationLog,
followers: HashMap<String, FollowerState>,
}
#[derive(Debug)]
struct FollowerState {
next_index: u64,
match_index: u64,
last_heartbeat: std::time::Instant,
}
impl ReplicationCoordinator {
async fn replicate_to_followers(&mut self) -> Result<(), Box<dyn std::error::Error>> {
let mut futures = Vec::new();
for (follower_id, state) in &mut self.followers {
let future = self.log.replicate_to_follower(
follower_id.clone(),
state.match_index,
);
futures.push(future);
}
// Wait for replication to complete
futures::future::try_join_all(futures).await?;
Ok(())
}
fn update_commit_index(&mut self) {
// Find the highest match_index that has been replicated to a majority. The
// leader itself holds every entry, so the median follower match_index plus
// the leader forms a majority of the cluster.
let mut match_indices: Vec<u64> = self.followers
.values()
.map(|state| state.match_index)
.collect();
match_indices.sort_unstable();
if let Some(new_commit_index) = match_indices.get(self.followers.len() / 2) {
// The commit index must never move backwards
self.log.committed_index = self.log.committed_index.max(*new_commit_index);
}
}
}
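To make the majority calculation concrete: suppose there are four followers with sorted match indices [3, 5, 7, 9]. `followers.len() / 2` selects index 2, i.e. 7. Entries up to sequence 7 are present on two followers plus the leader itself, which is three out of five nodes, so they can safely be committed.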
Key Features of Replication Log
- Ordered Entry Tracking
  - Each entry has a unique sequence number
  - Maintains strict ordering of operations
  - Includes term numbers for leader election
- Consistency Checking
  - Checksum verification for data integrity
  - Sequence gap detection
  - Term validation for leader changes (sketched below)
- Replication Management
  - Tracks follower progress
  - Handles catch-up scenarios
  - Manages commit index updates
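The term check itself is not shown in `ReplicationLog::append` above. A minimal sketch of what it might look like, assuming the last accepted entry's term is the point of comparison (a full consensus protocol such as Raft would also truncate conflicting suffixes, which is omitted here):
impl ReplicationLog {
    // Hypothetical term validation: reject entries sent by a stale leader
    fn validate_term(&self, entry: &ReplicationLogEntry) -> Result<(), Box<dyn std::error::Error>> {
        if let Some(last) = self.entries.last() {
            if entry.term < last.term {
                return Err("Entry from a stale leader term rejected".into());
            }
        }
        Ok(())
    }
}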
High-Water Mark: Ensuring Replication Consistency
The High-Water Mark is fundamental to maintaining consistency in replicated systems. It marks the most recent log position confirmed to be safely replicated across the cluster; entries beyond it are not yet considered committed.
Role in Replication
- Tracks confirmed writes across the cluster
- Ensures consistency between primary and replicas
- Controls when writes become visible to reads
use tokio::sync::mpsc;
use std::collections::{HashMap, HashSet};
#[derive(Debug)]
struct ReplicationManager {
high_water_mark: u64,
followers: HashSet<String>,
follower_progress: HashMap<String, u64>,
quorum_size: usize,
}
impl ReplicationManager {
fn new(quorum_size: usize) -> Self {
ReplicationManager {
high_water_mark: 0,
followers: HashSet::new(),
follower_progress: HashMap::new(),
quorum_size,
}
}
async fn replicate_entry(&mut self, entry: ReplicationLogEntry) -> Result<(), Box<dyn std::error::Error>> {
let (tx, mut rx) = mpsc::channel(self.followers.len().max(1));
// Send to all followers
for follower_id in &self.followers {
let tx = tx.clone();
let entry = entry.clone();
let follower_id = follower_id.clone();
tokio::spawn(async move {
// Simulate sending to the follower and receiving its acknowledgment
tx.send((follower_id, entry.sequence)).await.unwrap();
});
}
// Drop the original sender so the channel closes once all follower tasks finish
drop(tx);
// Track acknowledgments
let mut acks = 1; // Count the primary itself
while let Some((follower_id, sequence)) = rx.recv().await {
self.follower_progress.insert(follower_id, sequence);
acks += 1;
if acks >= self.quorum_size {
// Update high water mark when quorum is reached
self.update_high_water_mark(sequence);
return Ok(());
}
}
Err("Failed to achieve quorum".into())
}
fn update_high_water_mark(&mut self, _sequence: u64) {
// Recompute from per-follower progress rather than trusting a single ack
let mut progress: Vec<u64> = self.follower_progress.values().cloned().collect();
progress.sort_unstable();
// The high water mark is the highest sequence replicated to a quorum. The
// primary always holds every entry, so quorum_size - 1 followers must have
// acknowledged; with `progress` sorted ascending that is the entry at index
// len - (quorum_size - 1).
if self.quorum_size >= 2 && progress.len() >= self.quorum_size - 1 {
let candidate = progress[progress.len() - (self.quorum_size - 1)];
// The high water mark must never move backwards
self.high_water_mark = self.high_water_mark.max(candidate);
}
}
fn is_write_visible(&self, sequence: u64) -> bool {
sequence <= self.high_water_mark
}
}
Putting Them All Together
struct DistributedStorage {
storage_engine: Arc<StorageEngine>,
log_cleaner: Arc<Mutex<LogCleaner>>,
replication_log: Arc<Mutex<ReplicationLog>>,
replication_coordinator: Arc<Mutex<ReplicationCoordinator>>,
high_water_mark_manager: Arc<Mutex<ReplicationManager>>,
}
impl DistributedStorage {
async fn write(&self, key: String, value: String) -> Result<(), Box<dyn std::error::Error>> {
// 1. Write to WAL and get sequence number
let sequence = self.storage_engine.write(key.clone(), value.clone()).await?;
// 2. Append to replication log
// (current_term, serialize_operation and calculate_checksum are helpers
// assumed to exist elsewhere in the system)
let data = serialize_operation(key, value)?;
let checksum = calculate_checksum(&data);
let log_entry = ReplicationLogEntry {
sequence,
term: self.current_term(),
data,
checksum,
};
{
let mut rep_log = self.replication_log.lock().await;
rep_log.append(log_entry.clone())?;
}
// 3. Replicate to followers
{
let mut coordinator = self.replication_coordinator.lock().await;
coordinator.replicate_to_followers().await?;
coordinator.update_commit_index();
}
// 4. Update high water mark
{
let mut hwm_manager = self.high_water_mark_manager.lock().await;
hwm_manager.update_high_water_mark(sequence);
}
// 5. Periodic cleanup using low water mark
if sequence % 1000 == 0 {
let mut log_cleaner = self.log_cleaner.lock().await;
let oldest_needed = self.calculate_oldest_needed_sequence().await?; // assumed helper (see the low-water mark section)
log_cleaner.update_low_water_mark(oldest_needed);
log_cleaner.garbage_collect()?;
}
Ok(())
}
async fn read(&self, key: String) -> Result<Option<String>, Box<dyn std::error::Error>> {
let replication_manager = self.high_water_mark_manager.lock().await;
let storage_engine = self.storage_engine.clone();
// Only return data that's confirmed replicated
// (read_with_sequence is an assumed helper on StorageEngine that returns the
// value together with the sequence number at which it was written)
if let Some((value, sequence)) = storage_engine.read_with_sequence(&key).await? {
if replication_manager.is_write_visible(sequence) {
return Ok(Some(value));
}
}
Ok(None)
}
}
Conclusion
Understanding these four fundamental patterns is crucial for building robust distributed storage systems. While they often work together, each serves a specific purpose:
- WAL provides durability and recovery
- Replication Log ensures distributed consensus
- Low-Water Mark manages storage cleanup
- High-Water Mark ensures consistency and visibility
By properly implementing these patterns, we can build systems that are not only reliable and consistent but also efficient in their resource usage. The separation of concerns between these patterns allows for better system maintenance, debugging, and scaling.
In the next post, we'll explore how to handle partition tolerance and network failures in such systems. Stay tuned!