Rust:深入瞭解線程池
在某些情況下,你需要併發地執行許多短期任務。創建和銷燬執行這些任務線程的開銷可能會抑制程序的性能。解決這個問題的一個辦法是建立一個任務池,並在需要時將它們從這個任務池中取出。
任務池的另一個優點是,可用線程的數量可以根據可用的計算資源進行調整,即處理器內核的數量或內存量。
這些任務的約束之一是它們不是相互依賴的,也就是說,一個任務的結果不依賴於前一個任務的結果,或者下一個任務不應依賴於當前任務的結果。這使任務保持隔離,並且易於存儲在池中。
典型的用例包括:
Web 服務和 api 服務: 請求通常非常小且生命週期很短,因此非常適合於線程池,實際上許多 web 服務器都實現了線程池。
批量處理圖像、視頻文件或音頻文件: 例如,調整圖像大小也是非常適合線程池的小型且定義良好的任務。
數據處理管道: 數據處理管道中的每個階段都可以由線程池處理。如前所述,任務不應該相互依賴,以提高線程池的效率。
使用 Rust 實現線程池
在這個例子中,我們將構建一個簡單的線程池,但這可以很容易地擴展到一個真正的線程池。
在開始之前,需要添加一個庫到 Cargo.toml 文件中:
[dependencies]
fstrings = "0.2.3"
我們將使用這個 crate 以類似 python 的方式格式化字符串。
接下來在 src/main.rs 文件中添加以下幾行:
use std::sync::{Arc, Mutex};
use std::thread;
#[macro_use]
extern crate fstrings;
-
使用 Arc 和 Mutex 來保證線程池線程安全
-
使用 std::thread 可以生成新的線程
定義任務
在 main.rs 中定義一個 WebRequest 結構體:
struct WebRequest {
work: Box<dyn FnOnce(&str) + Send + Sync>,
}
impl WebRequest {
fn new<F>(f: F) -> Self
where
F: FnOnce(&str) + Send + Sync + 'static,
{
WebRequest { work: Box::new(f) }
}
}
在這段代碼中,WebRequest 包含一個字段 work,它是一個 Box 封裝的閉包。爲什麼要使用 Box?因爲閉包的大小是動態的,換句話說,它的大小在編譯時是未知的,所以我們需要將它存儲在像 Box 這樣的堆分配容器中。Send 和 Sync 特性向編譯器表明,這個特定的閉包可以安全地跨多個線程發送和訪問。
構造函數接受閉包作爲它的唯一參數。當然,它必須滿足與結構體中字段相同的約束。靜態生命週期是必需的,因爲閉包可能比定義它的作用域活得更長。
實現線程池
在 main.rs 中定義一個 ThreadPool 結構體:
struct ThreadPool {
workers: Vec<thread::JoinHandle<()>>,
queue: Arc<Mutex<Vec<WebRequest>>>,
}
-
workers 向量,它表示工作線程集合。每個元素都是一個線程的句柄。我們需要持有這個句柄,以便等待線程的完成。
-
queue,這是一個任務隊列,每個任務由一個工作線程執行。Arc 允許在多個線程之間共享,並且我們可以通過使用 Mutex 結構體確保線程可以安全的訪問隊列。
現在我們看一下實現。首先是構造函數:
impl ThreadPool {
fn new(num_threads: usize) -> ThreadPool {
let mut workers = Vec::with_capacity(num_threads);
let queue = Arc::new(Mutex::new(Vec::<WebRequest>::new()));
for i in 0..num_threads {
let number = f!("Request {i}");
let queue_clone = Arc::clone(&queue);
workers.push(thread::spawn(move || loop {
let task = queue_clone.lock().unwrap().pop();
if let Some(task) = task {
(task.work)(&number);
} else {
break;
}
}));
}
ThreadPool { workers, queue }
}
}
此方法使用指定的線程數初始化池,創建隊列。之後,構造函數生成工作線程。這些線程進入一個循環,彈出隊列的任務,並執行它們。如果隊列恰好爲空,則工作線程中斷循環。
然後,我們看一下 execute() 方法:
impl ThreadPool {
......
fn execute<F>(&self, f: F)
where
F: FnOnce(&str) + Send + Sync + 'static,
{
let task = WebRequest::new(f);
self.queue.lock().unwrap().push(task);
}
}
這個方法只是用指定的閉包創建一個新的 WebRequest,並將其 push 到任務隊列中。
接下來,我們看一下 join() 方法:
impl ThreadPool {
......
fn join(self) {
for worker in self.workers {
worker.join().unwrap();
}
}
}
這是一個阻塞操作,等待線程完成。
測試
使用如下代碼測試線程池:
fn main() {
let pool = ThreadPool::new(6);
for i in 0..6 {
pool.execute(move |message| {
println!("Task: {} prints {}",i, message);
});
}
pool.join();
}
執行結果如下:
Task: 3 prints Request 3
Task: 1 prints Request 3
Task: 5 prints Request 1
Task: 2 prints Request 5
Task: 0 prints Request 3
Task: 4 prints Request 4
總結
正如你所看到的,這種模式非常靈活,但是使用時,請考慮以下影響性能和資源因素:
-
如果程序銷燬線程的速度太慢,或者銷燬被阻塞,這可能會使其他線程缺乏資源。
-
如果創建太多線程,創建未使用的線程會浪費資源和時間。
-
如果線程創建時間過長,則會影響性能。
-
銷燬太多的線程可能會在以後需要重新創建它們時耗費時間。
總而言之,找到正確的線程數有時可能非常清楚,有時需要使用一些嘗試和錯誤來找到最佳數量。更高級的做法是可以根據需求動態地增加和減少可用線程的數量。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/YW_6g5gVf0RqO0vSxjZ_Lg