如何在 Rust 中實現 HTTP 長輪詢

實時通信顧名思義就是儘可能快地傳播新數據。在討論實時數據時,有兩種工作負載:

今天,我們將研究後者,因爲它是你在開發 Web 應用程序時最常遇到的工作負載。

短輪詢

短輪詢

實時通信的第一種方法是短輪詢。

在這種情況下,客戶端向服務器發送請求,服務器立即回覆。如果沒有新數據,則響應爲空。而大多數時候,情況就是這樣的。所以大多數時候,服務器的響應是空的,這本來是可以避免的。

因此,短輪詢在網絡傳輸和 CPU 方面都是浪費的,因爲每次都需要解析和編碼請求。

唯一的優點就是簡單。

服務器發送事件 (SSE)

SSE

與 WebSockets 相反,SSE 流是單向的:只有服務器可以將數據發送回客戶端。此外,自動重新連接的機制(通常)內置於客戶端。

缺點是實現服務器端並不容易。

長輪詢

長輪詢

最後是長輪詢:客戶端發出請求,並指示它擁有的最後一條數據,服務器僅在有新數據可用或達到一定時間時纔將響應發送回去。

它的優點是實現起來極其簡單,因爲它不是一個流,而是一個簡單的請求—響應方案,因此非常健壯,不需要複雜的自動重連算法,並且可以優雅地處理網絡錯誤。此外,與短輪詢相反,長輪詢在資源使用方面的浪費較少。

唯一的缺點是,長輪詢的延遲不如 WebSockets 好,但在大多數情況下並不重要。

長輪詢在 Rust 中非常有效:多虧有 async,使得每個打開的連接使用的資源(一個簡單的任務)很少,而許多語言使用整個操作系統線程。

最後,由於長輪詢是簡單的 HTTP 請求,因此這種技術更有可能不被某種激進的防火牆或網絡設備阻止。

Rust 中的長輪詢

我們將使用由 tokio 團隊 [1] 開發的新 Web 框架:axum[2]。它的性能和簡單性在 Rust 界中是無與倫比的。另外,請注意,將此代碼移植到另一個 Web 框架很容易。

我們將實現一個簡單的聊天服務器,因爲聊天是從長輪詢中獲益最多的教科書應用程序。

有 3 個技巧可以使這個實現起來更高效,可以關注一下。

聊天服務

聊天服務是一個封裝了我們所有業務邏輯的對象。爲了使示例簡單,我們將只進行數據庫調用。

這是我們的第一個技巧:爲了啓用消息排序,我們不使用 UUIDv4。相反,我們使用轉換爲 UUID 的 ULID[3],因此序列化 / 反序列化它沒有問題:Uuid = Ulid::new().into()

chat.rs

impl ChatService {
    pub fn new(db: DB) -> Self {
        ChatService { db }
    }

    pub async fn create_message(&self, body: String) -> Result<Message, Error> {
        if body.len() > 10_000 {
            return Err(Error::InvalidArgument("Message is too large".to_string()));
        }

        let created_at = chrono::Utc::now();
        let id: Uuid = Ulid::new().into();

        let query = "INSERT INTO messages
            (id, created_at, body)
            VALUES ($1$2$3)";

        sqlx::query(query)
            .bind(id)
            .bind(created_at)
            .bind(&body)
            .execute(&self.db)
            .await?;

        Ok(Message {
            id,
            created_at,
            body,
        })
    }
}

這是我們的第二個技巧:注意 after.unwrap_or(Uuid::nil()) 返回 “0” UUID ( 00000000-0000-0000-0000-000000000000)  。有了 WHERE id > $1 它,我們就可以返回所有消息(如果 afternone)。

例如,恢復客戶端的整個狀態是很有用的。

    pub async fn find_messages(&self, after: Option<Uuid>) -> Result<Vec<Message>, Error> {
        let query = "SELECT *
            FROM messages
            WHERE id > $1";

        let messages: Vec<Message> = sqlx::query_as::<_, Message>(query)
            .bind(after.unwrap_or(Uuid::nil()))
            .fetch_all(&self.db)
            .await?;

        Ok(messages)
    }
}

網絡服務器

接下來,運行 Web 服務器的樣板。

由於 .layer(AddExtensionLayer::new(ctx))ServerContext 被注入到所有路由中,因此我們可以調用 ChatService 的方法。

struct ServerContext {
    chat_service: chat::ChatService,
}

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    std::env::set_var("RUST_LOG""rust_long_polling=info");
    env_logger::init();

    let database_url = std::env::var("DATABASE_URL")
        .map_err(|_| Error::BadConfig("DATABASE_URL env var is missing".to_string()))?;

    let db = db::connect(&database_url).await?;
    db::migrate(&db).await?;

    let chat_service = chat::ChatService::new(db);
    let ctx = Arc::new(ServerContext::new(chat_service));

    let app = Router::new()
        .route(
            "/messages",
            get(handler_find_messages).post(handler_create_message),
        )
        .or(handler_404.into_service())
        .layer(AddExtensionLayer::new(ctx));

    log::info!("Starting server on 0.0.0.0:8080");
    axum::Server::bind(
        &"0.0.0.0:8080"
            .parse()
            .expect("parsing server's bind address"),
    )
    .serve(app.into_make_service())
    .await
    .expect("running server");

    Ok(())
}

長輪詢

最後,我們的第三個技巧:長輪詢是一個簡單的循環 tokio::time::sleep

通過使用 tokio::time::sleep,活動連接在等待時幾乎不會使用任何資源。

如果找到新數據,我們立即返回新數據。否則,我們再等一秒鐘。

10 秒後,我們返回空數據。

main.rs

async fn handler_find_messages(
    Extension(ctx): Extension<Arc<ServerContext>>,
    query_params: Query<FindMessagesQueryParameters>,
) -> Result<Json<Vec<Message>>, Error> {
    let sleep_for = Duration::from_secs(1);

    // long polling: 10 secs
    for _ in 0..10u64 {
        let messages = ctx.chat_service.find_messages(query_params.after).await?;
        if messages.len() != 0 {
            return Ok(messages.into());
        }

        tokio::time::sleep(sleep_for).await;
    }

    // return an empty response
    Ok(Vec::new().into())
}

代碼在 GitHub 上

像往常一樣,你可以在 GitHub 上找到代碼:github.com/skerkour/kerkour.com[4]。

參考資料

[1]

tokio 團隊: https://github.com/tokio-rs

[2]

axum: https://github.com/tokio-rs/axum

[3]

ULID: https://github.com/ulid/spec

[4]

github.com/skerkour/kerkour.com: https://github.com/skerkour/kerkour.com/tree/main/2021/rust_long_polling

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/OJchpQzSGSDRvUTu5R0RsA