PingCAP 故障注入利器 fail-rs

年初分享過聊聊 Go failpoint 使用,感興趣的可以看看看這篇文章

Failpoints 是一個允許在運行時注入錯誤或是其它行爲的工具,主要用於測試目的,包括 ut 單測,集成壓測等等。測試的內容包括狀態機錯誤,磁盤錯誤,網絡 IO 延遲

可以注入的行爲有:panic, early returns, sleeping 等等,注入的行爲可以通過環境變量或代碼進行控制。一般推薦用 http 或集成公司的配置平臺,觸發規則可以是次數,概率或是兩種的結合

入門案例

首先配置依賴,Cargo.toml

[dependencies]
fail = "0.4"

我們依賴 0.4 版本

use fail::{fail_point, FailScenario};

fn do_fallible_work() {
    fail_point!("read-dir");
    println!("mock working now");
}

fn main() {
    let scenario = FailScenario::setup();
    do_fallible_work();
    scenario.teardown();
    println!("done");
}

do_fallible_work 函數只做兩件事情,執行 read-dir 注入點,打印消息用於模擬函數處理請求

FAILPOINTS=read-dir="panic" cargo run
mock working now
done

通過環境變量注入 panic 語句,條件編譯默認沒有開啓,所以正常輸出

FAILPOINTS=read-dir="panic" cargo run --features fail/failpoints
mock working now
thread 'main' panicked at 'failpoint read-dir panic', /Users/zerun.dong/.cargo/registry/src/github.com-1ecc6299db9ec823/fail-0.4.0/src/lib.rs:488:25
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

cargo 指定 --features fail/failpoints, 發生 Panic 符合預期

FAILPOINTS=read-dir="sleep(2000)" cargo run --features fail/failpoints

當然我們也可以指定其它行爲,比如 sleep(2000) 休眠 2 秒

use fail::{fail_point, FailScenario};
use std::io;

fn do_fallible_work() -> io::Result<()>{
    println!("mock working now");
    fail_point!("read-dir"|_| {
        Err(io::Error::new(io::ErrorKind::PermissionDenied, "error"))
    });
    Ok(())
}

fn main() -> io::Result<(){
    let scenario = FailScenario::setup();
    do_fallible_work()?;
    do_fallible_work()?;
    scenario.teardown();
    println!("done");
    Ok(())
}

這是測試提前返回 early return 的案例,需要使用閉包來封裝 error

FAILPOINTS=read-dir=return cargo run --features fail/failpoints
mock working now
Error: Custom { kind: PermissionDenied, error: "error" }

上面是普通用法,也可以指定多個 action

FAILPOINTS=read-dir="1*sleep(2000)->return" cargo run --features fail/failpoints
mock working now
mock working now
Error: Custom { kind: PermissionDenied, error: "error" }

"1*sleep(2000)->return" 表示第一次休眠 2 秒,然後第二次時提前返回。關於更多高級用法,請參考官網 https://docs.rs/fail

零性能消耗

最重要的要求是:集成 Failpoint 的代碼,在線上正式環境運行時,要做到零性能消耗

func test() {
    failpoint.Inject("testValue", func(v failpoint.Value) {
        fmt.Println(v)
    })
}

這是 go 測試代碼,failpoint.Injectmarker 函數,參數是名稱和閉包

// failpoint.Inject("fail-point-name", func(_ failpoint.Value) (...){}
func Inject(fpname string, fpbody interface{}) {}

由於 Inject 是空函數體,編譯時會被優化掉,所以運行時零性能消耗。當線下測試時,需要執行 failpoint-ctl 將所有 marker 函數轉化成注入函數

func test() {
 if v, _err_ := failpoint.Eval(_curpkg_("testValue")); _err_ == nil {
  fmt.Println(v)
 }
}

上面是轉換後的代碼,原理不難,解析 AST 替換語法樹。那麼 rust 如何實現呢?答案是 macro 宏 + 條件編譯

/// Define a fail point (disabled, see `failpoints` feature).
#[macro_export]
#[cfg(not(feature = "failpoints"))]
macro_rules! fail_point {
    ($name:expr, $e:expr) ={{}};
    ($name:expr) ={{}};
    ($name:expr, $cond:expr, $e:expr) ={{}};
}

當 cargo build 編譯時未指定 failpints feature, fail_point 宏對應空實現

#[cfg(feature = "failpoints")]
macro_rules! fail_point {
    ($name:expr) ={{
        $crate::eval($name|_| {
            panic!("Return is not supported for the fail point \"{}\""$name);
        });
    }};
    ($name:expr, $e:expr) ={{
        if let Some(res) = $crate::eval($name$e) {
            return res;
        }
    }};
    ($name:expr, $cond:expr, $e:expr) ={{
        if $cond {
            fail_point!($name$e);
        }
    }};
}

指定 feature 時,對應上面的宏實現,編譯期展開成相應的邏輯代碼。fail_point 宏有三種形式,模式匹配到不同的參數表達式 (designators) 對應不同代碼塊

實現原理

1. 註冊中心

/// Registry with failpoints configuration.
type Registry = HashMap<String, Arc<FailPoint>>;

#[derive(Debug, Default)]
struct FailPointRegistry {
    // TODO: remove rwlock or store *mut FailPoint
    registry: RwLock<Registry>,
}

lazy_static::lazy_static! {
    static ref REGISTRY: FailPointRegistry = FailPointRegistry::default();
    static ref SCENARIO: Mutex<&'static FailPointRegistry> = Mutex::new(®ISTRY);
}

註冊中心 Registry 是 HashMap 類型,key 是上面測試例子的 name, value 是 Arc<Failpoint> 類型,Arc 用於併發環境下共享所有權

struct FailPoint {
    pause: Mutex<bool>,
    pause_notifier: Condvar,
    actions: RwLock<Vec<Action>>,
    actions_str: RwLock<String>,
}

pause 表示是否暫停,pause_notifier 用於暫停通知,actions 是一個數組,因爲一個 fail_point 注入可以有多個動作,actions_str 是表示任務的字符串,通過 from_str 轉化成 action 結構體

2. 生成任務

FailScenario::setup() 通過獲取 FAILPOINTS 環境變量來初始化注入動作,暫時不支持通過 http 方式

解析後通過 set 函數將多個注入動作解析,註冊到上文提到的 Registry

fn set(
    registry: &mut HashMap<String, Arc<FailPoint>>,
    name: String,
    actions: &str,
) -> Result<(), String> {
    let actions_str = actions;
    // `actions` are in the format of `failpoint[->failpoint...]`.
    let actions = actions
        .split("->")
        .map(Action::from_str)
        .collect::<Result<_, _>>()?;
    // Please note that we can't figure out whether there is a failpoint named `name`,
    // so we may insert a failpoint that doesn't exist at all.
    let p = registry
        .entry(name)
        .or_insert_with(|| Arc::new(FailPoint::new()));
    p.set_actions(actions_str, actions);
    Ok(())
}

這裏面用 Action::from_str 將字符串解析成 Action

#[derive(Clone, Debug, PartialEq)]
enum Task {
    /// Do nothing.
    Off,
    /// Return the value.
    Return(Option<String>),
    /// Sleep for some milliseconds.
    Sleep(u64),
    /// Panic with the message.
    Panic(Option<String>),
    /// Print the message.
    Print(Option<String>),
    /// Sleep until other action is set.
    Pause,
    /// Yield the CPU.
    Yield,
    /// Busy waiting for some milliseconds.
    Delay(u64),
    /// Call callback function.
    Callback(SyncCallback),
}

#[derive(Debug)]
struct Action {
    task: Task,
    freq: f32,
    count: Option<AtomicUsize>,
}

Action 類型都不一樣,freq 控制頻率,count 控制觸發次數

3. 觸發任務

大前提肯定是條件編譯打開了 failpoint, 直接看 macro 實現

pub fn eval<R, F: FnOnce(Option<String>) -> R>(name: &str, f: F) -> Option<R> {
    let p = {
        let registry = REGISTRY.registry.read().unwrap();
        match registry.get(name) {
            None =return None,
            Some(p) => p.clone(),
        }
    };
    p.eval(name).map(f)
}

邏輯比較簡單,從 Registry 註冊中心 map 找到對應 failpoint, 然後調用 failpoint.eval 函數,並且針對所有返回值執行閉包 f (如果有值)

#[cfg_attr(feature = "cargo-clippy", allow(clippy::option_option))]
fn eval(&self, name: &str) -> Option<Option<String>> {
    let task = {
        let actions = self.actions.read().unwrap();
        match actions.iter().filter_map(Action::get_task).next() {
            Some(Task::Pause) ={
                let mut guard = self.pause.lock().unwrap();
                *guard = true;
                loop {
                    guard = self.pause_notifier.wait(guard).unwrap();
                    if !*guard {
                        break;
                    }
                }
                return None;
            }
            Some(t) => t,
            None =return None,
        }
    };

    match task {
        Task::Off ={}
        Task::Return(s) =return Some(s),
        Task::Sleep(t) => thread::sleep(Duration::from_millis(t)),
        Task::Panic(msg) => match msg {
            Some(ref msg) => panic!("{}", msg),
            None => panic!("failpoint {} panic", name),
        },
        Task::Print(msg) => match msg {
            Some(ref msg) => log::info!("{}", msg),
            None => log::info!("failpoint {} executed.", name),
        },
        Task::Pause => unreachable!(),
        Task::Yield => thread::yield_now(),
        Task::Delay(t) ={
            let timer = Instant::now();
            let timeout = Duration::from_millis(t);
            while timer.elapsed() < timeout {}
        }
        Task::Callback(f) ={
            f.run();
        }
    }
    None
}

eval 函數不難,首先調用 get_task 獲取要執行的 Action, 這裏 Pause 動作單獨處理,其它的通過 match 模式匹配。同時也能看到,如果 Return 不指定閉包 f, 那麼返回值是 Some(""), 觸發 macro 的默認 panic 閉包

fn get_task(&self) -> Option<Task> {
  use rand::Rng;

  if let Some(ref cnt) = self.count {
      let c = cnt.load(Ordering::Acquire);
      if c == 0 {
          return None;
      }
  }
  if self.freq < 1f32 && !rand::thread_rng().gen_bool(f64::from(self.freq)) {
      return None;
  }
  if let Some(ref ref_cnt) = self.count {
      let mut cnt = ref_cnt.load(Ordering::Acquire);
      loop {
          if cnt == 0 {
              return None;
          }
          let new_cnt = cnt - 1;
          match ref_cnt.compare_exchange_weak(
              cnt,
              new_cnt,
              Ordering::AcqRel,
              Ordering::Acquire,
          ) {
              Ok(_) => break,
              Err(c) =cnt = c,
          }
      }
  }
  Some(self.task.clone())
}

get_task 先判斷執行次數,如果爲 0 返回空。然後判斷頻率,如果沒有觸發返回空,最後再判斷一次計數,並 cas 更新。這裏 count 計數字段類型是 Option<AtomicUsize>, 如果不指定次數默認無限制

小結

寫文章不容易,如果對大家有所幫助和啓發,請大家幫忙點擊在看點贊分享 三連

關於 Failpoint 大家有什麼看法,歡迎留言一起討論,大牛多留言 ^_^

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/bOzSMrvkhquelEWEugtrmw