如何在 Rust 中實現 HTTP 長輪詢
實時通信顧名思義就是儘可能快地傳播新數據。在討論實時數據時,有兩種工作負載:
-
低延遲、多向流:Websockets。
-
中等延遲、單向流:短輪詢、服務器發送事件 (SSE) 和長輪詢。
今天,我們將研究後者,因爲它是你在開發 Web 應用程序時最常遇到的工作負載。
短輪詢
短輪詢
實時通信的第一種方法是短輪詢。
在這種情況下,客戶端向服務器發送請求,服務器立即回覆。如果沒有新數據,則響應爲空。而大多數時候,情況就是這樣的。所以大多數時候,服務器的響應是空的,這本來是可以避免的。
因此,短輪詢在網絡傳輸和 CPU 方面都是浪費的,因爲每次都需要解析和編碼請求。
唯一的優點就是簡單。
服務器發送事件 (SSE)
SSE
與 WebSockets 相反,SSE 流是單向的:只有服務器可以將數據發送回客戶端。此外,自動重新連接的機制(通常)內置於客戶端。
缺點是實現服務器端並不容易。
長輪詢
長輪詢
最後是長輪詢:客戶端發出請求,並指示它擁有的最後一條數據,服務器僅在有新數據可用或達到一定時間時纔將響應發送回去。
它的優點是實現起來極其簡單,因爲它不是一個流,而是一個簡單的請求—響應方案,因此非常健壯,不需要複雜的自動重連算法,並且可以優雅地處理網絡錯誤。此外,與短輪詢相反,長輪詢在資源使用方面的浪費較少。
唯一的缺點是,長輪詢的延遲不如 WebSockets 好,但在大多數情況下並不重要。
長輪詢在 Rust 中非常有效:多虧有 async
,使得每個打開的連接使用的資源(一個簡單的任務)很少,而許多語言使用整個操作系統線程。
最後,由於長輪詢是簡單的 HTTP 請求,因此這種技術更有可能不被某種激進的防火牆或網絡設備阻止。
Rust 中的長輪詢
我們將使用由 tokio 團隊 [1] 開發的新 Web 框架:axum[2]。它的性能和簡單性在 Rust 界中是無與倫比的。另外,請注意,將此代碼移植到另一個 Web 框架很容易。
我們將實現一個簡單的聊天服務器,因爲聊天是從長輪詢中獲益最多的教科書應用程序。
有 3 個技巧可以使這個實現起來更高效,可以關注一下。
聊天服務
聊天服務是一個封裝了我們所有業務邏輯的對象。爲了使示例簡單,我們將只進行數據庫調用。
這是我們的第一個技巧:爲了啓用消息排序,我們不使用 UUIDv4
。相反,我們使用轉換爲 UUID 的 ULID[3],因此序列化 / 反序列化它沒有問題:Uuid = Ulid::new().into()
chat.rs
impl ChatService {
pub fn new(db: DB) -> Self {
ChatService { db }
}
pub async fn create_message(&self, body: String) -> Result<Message, Error> {
if body.len() > 10_000 {
return Err(Error::InvalidArgument("Message is too large".to_string()));
}
let created_at = chrono::Utc::now();
let id: Uuid = Ulid::new().into();
let query = "INSERT INTO messages
(id, created_at, body)
VALUES ($1, $2, $3)";
sqlx::query(query)
.bind(id)
.bind(created_at)
.bind(&body)
.execute(&self.db)
.await?;
Ok(Message {
id,
created_at,
body,
})
}
}
這是我們的第二個技巧:注意 after.unwrap_or(Uuid::nil())
返回 “0” UUID ( 00000000-0000-0000-0000-000000000000
) 。有了 WHERE id > $1
它,我們就可以返回所有消息(如果 after
是 none
)。
例如,恢復客戶端的整個狀態是很有用的。
pub async fn find_messages(&self, after: Option<Uuid>) -> Result<Vec<Message>, Error> {
let query = "SELECT *
FROM messages
WHERE id > $1";
let messages: Vec<Message> = sqlx::query_as::<_, Message>(query)
.bind(after.unwrap_or(Uuid::nil()))
.fetch_all(&self.db)
.await?;
Ok(messages)
}
}
網絡服務器
接下來,運行 Web 服務器的樣板。
由於 .layer(AddExtensionLayer::new(ctx))
,ServerContext
被注入到所有路由中,因此我們可以調用 ChatService
的方法。
struct ServerContext {
chat_service: chat::ChatService,
}
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
std::env::set_var("RUST_LOG", "rust_long_polling=info");
env_logger::init();
let database_url = std::env::var("DATABASE_URL")
.map_err(|_| Error::BadConfig("DATABASE_URL env var is missing".to_string()))?;
let db = db::connect(&database_url).await?;
db::migrate(&db).await?;
let chat_service = chat::ChatService::new(db);
let ctx = Arc::new(ServerContext::new(chat_service));
let app = Router::new()
.route(
"/messages",
get(handler_find_messages).post(handler_create_message),
)
.or(handler_404.into_service())
.layer(AddExtensionLayer::new(ctx));
log::info!("Starting server on 0.0.0.0:8080");
axum::Server::bind(
&"0.0.0.0:8080"
.parse()
.expect("parsing server's bind address"),
)
.serve(app.into_make_service())
.await
.expect("running server");
Ok(())
}
長輪詢
最後,我們的第三個技巧:長輪詢是一個簡單的循環 tokio::time::sleep
。
通過使用 tokio::time::sleep
,活動連接在等待時幾乎不會使用任何資源。
如果找到新數據,我們立即返回新數據。否則,我們再等一秒鐘。
10 秒後,我們返回空數據。
main.rs
async fn handler_find_messages(
Extension(ctx): Extension<Arc<ServerContext>>,
query_params: Query<FindMessagesQueryParameters>,
) -> Result<Json<Vec<Message>>, Error> {
let sleep_for = Duration::from_secs(1);
// long polling: 10 secs
for _ in 0..10u64 {
let messages = ctx.chat_service.find_messages(query_params.after).await?;
if messages.len() != 0 {
return Ok(messages.into());
}
tokio::time::sleep(sleep_for).await;
}
// return an empty response
Ok(Vec::new().into())
}
代碼在 GitHub 上
像往常一樣,你可以在 GitHub 上找到代碼:github.com/skerkour/kerkour.com[4]。
參考資料
[1]
tokio 團隊: https://github.com/tokio-rs
[2]
axum: https://github.com/tokio-rs/axum
[3]
ULID: https://github.com/ulid/spec
[4]
github.com/skerkour/kerkour.com: https://github.com/skerkour/kerkour.com/tree/main/2021/rust_long_polling
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/OJchpQzSGSDRvUTu5R0RsA