Rusty Circuit Breaker 🦀

Photo by Greg Bulla / Unsplash

In this blog, let us have fun implementing a software wheel, Circuit Breaker, in Rust.

What is Circuit Breaker?

I've talked about Circuit Breaker in one of my previous posts, Microservice Governance - Resilience Patterns - Part 2. If you are unfamiliar with its basic concept, please check it out first.

It's crucial to make transmissions between states of Circuit Breaker LSM clear before digging into the implementations. So I will quote some parts about the LSM in my previous post here.

We could use an LSM (Limited State Machine) to illustrate the transmissions between statuses, as below figure.
State - Closed: The circuit breaker is closed, and the target service can be accessed. The circuit breaker maintains a counter of request failures. If it encounters a failed request, the counter will increase 1.
State - Open: The circuit breaker is open, and the target service can not be accessed. A request to the target service will fail quickly.
State - Half-Open: The circuit breaker is half-open. It is allowed to try to access the target service. If the request can be accessed successfully, it means that the service is back to normal. Otherwise, the service is still performing poorly.

Let's Get Rusty!

The following sections will try to implement a Circuit Breaker in Rust gradually from a naive solution to a production-ready one. Let's get rusty! 🦀

A Straw Man Solution

This is our most naive solution without taking thread-safe and non-blocking into consideration. We will only implement the following basic functionality requirements in this solution:

  1. Support customizing the maximum failures threshold and trip timeout duration in the initialization of a Circuit Breaker
  2. Delegate function calls with Fail-fast protection
  3. Allowing a limited number of requests to pass through after the trip timeout duration has elapsed.
use std::time::{Duration, Instant};
use std::thread;

#[derive(Debug)]
enum State {
    // The circuit breaker is closed and allowing requests
    // to pass through
    Closed,
    // The circuit breaker is open and blocking requests
    Open,
    // The circuit breaker is half-open and allowing a limited
    // number of requests to pass through
    HalfOpen,
}

struct CircuitBreaker {
    state: State,
    // The duration to wait before transitioning from the
    // open state to the half-open state
    trip_timeout: Duration,
    // The maximum number of requests allowed through in
    // the closed state
    max_failures: usize,
    // The number of consecutive failures in the closed
    // state
    consecutive_failures: usize,
    // The time when the circuit breaker transitioned to the
    // open state
    opened_at: Option<Instant>,
}

impl CircuitBreaker {
    pub fn new(max_failures: usize, trip_timeout: Duration) -> CircuitBreaker {
        CircuitBreaker {
            state: State::Closed,
            max_failures,
            trip_timeout,
            consecutive_failures: 0,
            opened_at: None,
        }
    }

    pub fn call<F, T, E>(&mut self, f: F) -> Option<Result<T, E>>
    where
        F: FnOnce() -> Result<T, E>,
    {
        match self.state {
            // If the circuit breaker is closed, try the request
            // and track the result
            State::Closed => {
                if self.consecutive_failures < self.max_failures {
                    let result = f();
                    if let Err(_) = result {
                        self.record_failure();
                    }
                    Some(result)
                } else {
                    self.opened_at = Some(Instant::now());
                    self.state = State::Open;
                    self.consecutive_failures = 0;
                    None
                }
            }
            // If the circuit breaker is open, check if it's time
            // to transition to the half-open state
            State::Open => {
                if let Some(opened_at) = self.opened_at {
                    let elapsed = Instant::now().duration_since(opened_at);
                    if elapsed >= self.trip_timeout {
                        self.state = State::HalfOpen;
                        self.opened_at = None;
                    }
                }
                None
            }
            // If the circuit breaker is half-open, attempt a request to pass through
            State::HalfOpen => {
                let result = f();
                if let Err(_) = result {
                    self.state = State::Open;
                } else {
                    self.state = State::Closed;
                }
                Some(result)
            }
        }
    }

    fn record_failure(&mut self) {
        match self.state {
            State::Closed => self.consecutive_failures += 1,
            State::Open => (),
            State::HalfOpen => self.consecutive_failures += 1,
        }
    }
}

fn request(dice: u32) -> Result<u32, String> {
    if dice > 6 {
        Err("400: Bad request.".to_string())
    } else {
        Ok(dice)
    }
}

fn main() {

    let mut cb = CircuitBreaker::new(3, Duration::from_secs(10));
    println!("Circuit Breaker has been set with");
    println!("    * 3 as maximum consecutive failures");
    println!("    * 10 seconds as the trip timeout");
    println!("");

    println!("Circuit Breaker is in the initial state, which is closed.");
    // The circuit breaker is in the closed state, so the function
    // will be executed
    let result = cb.call(|| request(5));
    println!("Result for request_dice(5): {:?}", result);

    println!("Circuit Breaker is encounting 3 errors in a row ...");
    // The function returns an error 3 times in a row, so the circuit
    // breaker transitions to the open state
    cb.call(|| request(10));
    cb.call(|| request(10));
    cb.call(|| request(10));

    // The circuit breaker is in the open state, so the function is
    // not executed
    let result = cb.call(|| request(2));
    println!("Result for request_dice(2): {:?}", result);

    // The circuit breaker is in the half-open state after trip_timeout
    // seconds, so the function will be executed
    println!("Let's have fun by doing nothing in 20 seconds :)");
    println!("...");
    thread::sleep(Duration::from_secs(20));
    let result = cb.call(|| request(5));
    println!("Result for request_dice(5): {:?}", result);
    let result = cb.call(|| request(6));
    println!("Result for request_dice(6): {:?}", result);
}

Timer-driven State Transitioning and Thread-safe

The first issue of last solution is the delay of state transitioning. If you run the naive solution, you will find that the first request right after trip_timeout elapsed are not allow to pass through. Because the above implementation passively reset the state to HalfOpen by waiting for an additional request, which is not perfect. We can resolve this issue by enabling a Timer in a separate thread to help us reset the state to HalfOpen after the trip timed out.

And also the naive solution can not be used in multiple threads environment, such as a Service Mesh Proxy. Because the state and consecutive_failures will be running in Race Condition when multiple threads are trying to access a same Circuit Breaker. But fortunately, Rust has a lot of handy multiple-thread primitives to make our application thread-safe.

Let's figure out how to resolve these issues in practice. Please see the code comments to find out the difference than the last solution.

use std::time::Duration;
use std::thread;
use std::sync::{mpsc, Arc, RwLock};
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Debug)]
struct Timer {
    duration: Duration,
    tx: mpsc::SyncSender<State>,
}

impl Timer {
    fn new(duration: Duration, tx: mpsc::SyncSender<State>) -> Self {
        Timer { duration, tx }
    }
	
    /// Timer will spawn a seperate thread to count down the
    /// trip timeout, and then send a state update to the
    /// Circuit Breaker.
    fn start(&self) {
        let duration = self.duration;
        let tx = self.tx.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            tx.send(State::HalfOpen).unwrap();
        });
    }
}

#[derive(Debug)]
enum State {
    // Nothing changed.
    ...
}

struct CircuitBreaker {
    // We use `RwLock` to guard the value retrieve and update,
    // `Arc` to deal with multiple references 
    state: Arc<RwLock<State>>,
    // We add a transition sender of state update channel to 
    // our solution
    state_tx: mpsc::SyncSender<State>,
    // We change our trip_timeout from Duration to a Timer.
    // The timer will wait for trip_timeout duration before
    // transitioning from the open state to the half-open state
    trip_timer: Timer,
    // Nothing changed for `max_failures`.
    // The maximum number of requests allowed through in
    // the closed state
    max_failures: usize,
    // We make consecutive_failures thread-safe by changing it to 
    // an `Atomic` variable and wrapping it with `Arc`
    // The number of consecutive failures in the closed
    // state
    consecutive_failures: Arc<AtomicUsize>,
}

impl CircuitBreaker {
    pub fn new(max_failures: usize, trip_timeout: Duration) -> CircuitBreaker {
        // It's needed to create a synchronous channel to handle
        // states updates in Circuit Breaker initialization.
        let (state_tx, state_rx) = mpsc::sync_channel(0);
        let timer_state_tx = state_tx.clone();
        let cb = CircuitBreaker {
            state: Arc::new(RwLock::new(State::Closed)),
            state_tx,
            max_failures,
            trip_timer: Timer::new(trip_timeout, timer_state_tx),
            consecutive_failures: Arc::new(AtomicUsize::new(0)),
        };
        // We spawn a seperate thread to receive the state updates from
        // the sync channel and reset the state of our Circuit Breaker.
        cb.spawn_state_update(state_rx);
        cb
    }

    pub fn call<F, T, E>(&mut self, f: F) -> Option<Result<T, E>>
    where
        F: FnOnce() -> Result<T, E>,
    {
        let state = self.state.read().unwrap();
        match *state {
            // If the circuit breaker is closed, try the request
            // and track the result
            State::Closed => {
                if self.consecutive_failures.load(Ordering::Relaxed) < self.max_failures {
                    let result = f();
                    if let Err(_) = result {
                        self.record_failure();
                    }
                    Some(result)
                } else {
                    // Update state by sending new state.
                    self.state_tx.send(State::Open).unwrap();
                    // Update the atomic variable to count consecutive
                    // failures.
                    self.consecutive_failures.store(0, Ordering::Relaxed);
                    // Start our trip timer right after sending out
                    // the state update.
                    self.trip_timer.start();
                    None
                }
            }
            // In this solution, we do nothing and fail-fast in the
            // Open state.
            State::Open => {
                None
            }
            // We only changed the way of updating states in this part.
            // If the circuit breaker is half-open, attempt a request
            // to pass through.
            State::HalfOpen => {
                let result = f();
                if let Err(_) = result {
                    self.state_tx.send(State::Open).unwrap();
                    self.trip_timer.start();
                } else {
                    self.state_tx.send(State::Closed).unwrap();
                }
                Some(result)
            }
        }
    }

    fn spawn_state_update(&self, state_rx: mpsc::Receiver<State>) {
        let state_lock = self.state.clone();
        // Run a loop receiving each update and reseting the state.
        thread::spawn(move || {
            while let Ok(new_state) = state_rx.recv() {
                let mut state = state_lock.write().unwrap();
                *state = new_state;
            };
        });
    }

    fn record_failure(&self) -> usize {
        let state = self.state.read().unwrap();
        // No logic change here, only bump up the atomic counter.
        match *state {
            State::Closed => self.consecutive_failures.fetch_add(1, Ordering::Relaxed),
            State::Open => 0,
            State::HalfOpen => self.consecutive_failures.fetch_add(1, Ordering::Relaxed),
        }
    }
}

fn request(dice: u32) -> Result<u32, String> {
    ...
}

fn main() {
    ...
}

Actually this is still far from a production-ready circuit breaker. We could do more to improve our solution, such as making it asynchronous using Tokio, enabling a Service-awarness Pool to reuse existing Circuit Breakers, or decreasing the sensitiveness of HalfOpen checking by attempt more but limited times. If you want to know how a battle-proved Circuit Breaker is implemented, you could check the source code of Envoy proxy here.

Summary

Firstly, we discussed a straw man solution to show what are the basic requirements of a circuit breaker in this article.

And then we talked about the issues of the naive solution, which are:

  • Delay of state transitioning,
  • State Race Condition

The source code of our second solution showcases how to use Timer and Rust multi-thread primitives to resolve them accordingly.

Though there are more work to do to make it production-ready, this is a good practice for learning how a circuit breaker works and Rust programming.

It's the last working day of 2022 as this blog post writing, it's been a great journey for me to dig into Service Mesh and Kubernetes in my daily work and learning Rust programming in my spare time across this whole year. I learned a lot about distributed system and system programming. I will try to have more articles about them in 2023. Thanks a lot, I hope you have a great New Year's Eve tomorrow and a SHINY new year in 2023. 👋🏼

Haili Zhang

Haili Zhang

Comments

Sign in or become a SoftWheel member to read and leave comments.