使用 Pingora 構建反向代理

# 使用 Pingora 構建反向代理 | Rust

Carlos Armando Marcano Vargas  
2024年11月

![Photo by Jezael Melgoza on Unsplash](https://unsplash.com/photos/...)

本教程將重點介紹如何使用 Pingora 庫創建一個負載均衡器。如果你已經閱讀過 Pingora 的文檔,那麼本文對你來說可能沒有新內容,除了我們將從文檔中的示例中添加一個速率限制器。需要明確的是,本文僅用於學習目的,我自己也在學習這個主題。本文不會提供一個可用於生產環境的反向代理。

## 環境要求

- 已安裝 Rust
- 對於 Windows 用戶,已安裝 WSL

## 什麼是反向代理?

根據 Salman Ravoof 撰寫的文章《如何設置反向代理(Nginx 和 Apache 的逐步指南)》,反向代理是位於 Web 服務器前面的服務器,它在請求到達源服務器之前接收所有請求。它的工作方式類似於正向代理,但在這種情況下,是 Web 服務器使用代理而不是用戶或客戶端。反向代理通常用於增強 Web 服務器的性能、安全性和可靠性。

Pingora 是一個用於構建快速、可靠和可編程網絡系統的 Rust 框架。Pingora 已經過實戰測試,處理超過 4000 萬次每秒的互聯網請求已有數年之久。

## 創建項目

我們首先使用 cargo 創建一個新的 Rust 項目:

```bash
cargo new loadbalancer

然後,在cargo.toml 文件中添加以下依賴項:

async-trait = "0.1" pingora = { version = "0.3", features = ["lb"] } pingora-core = "0.3" pingora-load-balancing = "0.3" pingora-proxy = "0.3"

現在,通過在main.rs 文件中添加以下代碼來創建一個 Pingora 服務器:

use async_trait::async_trait;
use pingora::prelude::*;
use std::sync::Arc;

fn main() {
    let mut my_server = Server::new(None).unwrap();
    my_server.bootstrap();
    my_server.run_forever();
}

根據 Pingora 文檔指南,Pingora 服務器是一個可以託管一個或多個服務的進程。Pingora 服務器負責配置和 CLI 參數解析、守護進程化、信號處理以及優雅的重啓或關閉。在上面的代碼片段中,我們在main() 函數中初始化服務器,並使用run_forever() 函數來生成所有運行時線程,並阻塞主線程直到服務器需要退出。根據文檔,run_forever() 可能會進行進程的守護進程化,因此在調用此函數之前創建的任何附加線程將丟失給任何服務邏輯。

創建負載均衡代理

use async_trait::async_trait;
use pingora::prelude::*;
use std::sync::Arc;
use pingora_load_balancing::{selection::RoundRobin, LoadBalancer};

pub struct LB(Arc<LoadBalancer<RoundRobin>>);

fn main() {
    let mut my_server = Server::new(None).unwrap();
    my_server.bootstrap();
    my_server.run_forever();
}

在上面的代碼中,我們導入了pingora_load_balancing crate,它提供了LoadBalancer 結構體和選擇算法。我們爲我們的負載均衡器使用了輪詢算法。

然後,我們創建了一個結構體LB,它使用Arc 類型封裝了使用輪詢策略的LoadBalancer

爲了使我們的服務器代理化,我們必須實現ProxyHttp 特性。

use pingora_core::upstreams::peer::HttpPeer;
use pingora_core::Result;
use pingora_proxy::{ProxyHttp, Session};

pub struct LB(Arc<LoadBalancer<RoundRobin>>);

#[async_trait]
impl ProxyHttp for LB {
    type CTX = ();
    fn new_ctx(&self) -> () {
        ()
    }
    async fn uptream_peer(&self, _session: &mut Session, _ctx: &mut ()) -> Result<Box<HttpPeer>> {
        let upstream = self.0
            .select(b"", 256)
            .unwrap();
        println!("upstream peer is: {upstream:?}");
        let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()));
        Ok(peer)
    }
}

任何實現ProxyHttp 特性的對象都定義了請求在代理中的處理方式。ProxyHttp 特性中唯一需要的方法是upstream_peer(),它返回請求應被代理到的地址。

upstream_peer() 的方法體中,我們使用LoadBalancerselect() 方法在上游 IP 之間進行輪詢。在這個例子中,我們使用 HTTPS 連接到後端,因此在構建我們的對象時,我們還需要指定use_tls 並設置 SNI。

async fn upstream_request_filter(
    &self,
    _session: &mut Session,
    upstream_request: &mut RequestHeader,
    _ctx: &mut Self::CTX,
) -> Result<()> {
    upstream_request.insert_header("Host", "one.one.one.one").unwrap();
    Ok(())
}

爲了讓 1.1.1.1 後端接受我們的請求,必須存在一個主機頭。可以通過upstream_request_filter() 回調添加此頭,該回調在連接到後端後並在請求頭髮送之前修改請求頭。

現在,根據 Pingora 指南,我們必須創建一個遵循負載均衡器指令的代理服務。

fn main() {
    let mut my_server = Server::new(None).unwrap();
    my_server.bootstrap();
    let upstream =
        LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap();
    let mut lb = http_proxy_service(&my_server.configuration, LB(Arc::new(upstream)));
    lb.add_tcp("0.0.0.0:6188");
    my_server.add_service(lb);
    my_server.run_forever();
}

Pingora 服務監聽一個或多個(TCP 或 Unix 域套接字)端點。當建立新連接時,服務將連接交給其 “應用程序”。pingora-proxy 就是這樣一個應用程序,它將 HTTP 請求代理到上面配置的給定後端。

在下面的示例中,我們創建了一個包含兩個後端1.1.1.1:4431.0.0.1:443LB 實例。我們通過http_proxy_service() 調用將該LB 實例放入代理服務中,然後告訴我們的服務器託管該代理服務。

現在,我們運行命令cargo run 啓動我們的代理。然後,我們向代理發送請求以查看負載均衡器是否正常工作。

要測試代理,我們運行以下命令:

curl 127.0.0.1:6188 -svo /dev/null

我將運行上述命令三次。我們應該在代理的命令行中看到以下消息:

添加健康檢查

首先,我們需要在main.rs 文件中導入background_servicetime::Duration 包。

use pingora_core::services::background::background_service;
use std::{sync::Arc, time::Duration};

現在,讓我們開發健康檢查功能。

fn main() {
    let mut my_server = Server::new(None).unwrap();
    my_server.bootstrap();
    let mut upstreams =
        LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443", "127.0.0.1:343"]).unwrap();
    let hc = TcpHealthCheck::new();
    upstreams.set_health_check(hc);
    upstreams.health_check_frequency = Some(std::time::Duration::from_secs(1));
    let background = background_service("health check", upstreams);
    let upstreams = background.task();
    let mut lb = http_proxy_service(&my_server.configuration, LB(upstreams));
    lb.add_tcp("0.0.0.0:6188");
    my_server.add_service(background);
    my_server.add_service(lb);
    my_server.run_forever();
}

通過健康檢查,我們確保負載均衡器將請求代理到正在運行的後端。否則,當服務嘗試將連接代理到127.0.0.1:343 後端時,我們會收到 502: Bad Gateway 狀態碼。

添加速率限制器

根據 CloudFlare 網站上發佈的文章《什麼是速率限制?| 速率限制和機器人》,速率限制是一種限制網絡流量的策略。它限制某人在特定時間內重複某個操作的頻率,例如嘗試登錄一個賬戶。速率限制可以幫助阻止某些類型的惡意機器人活動,也可以減輕 Web 服務器的壓力。

Pingora 提供了pingora-limits crate,允許我們爲我們的反向代理創建一個速率限制器。

我們需要將以下依賴項添加到我們的項目中:

pingora-limits = "0.3.0" once_cell = "1.19.0"

現在,開始創建我們的速率限制器。首先,我們導入RateLazy。然後爲LB 結構體實現get_request_appid() 方法。

use once_cell::sync::Lazy;
use pingora_limits::rate::Rate;

pub struct LB(Arc<LoadBalancer<RoundRobin>>);

impl LB {
    pub fn get_request_appid(&self, session: &mut Session) -> Option<Option<String>> {
        match session
            .req_header()
            .headers
            .get("appid")
            .map(|v| v.to_string())
        {
            None => None,
            Some(v) => match v {
                Ok(v) => Some(v.to_string()),
                Err(_) => None,
            },
        }
    }
}

static RATE_LIMITER: Lazy<Rate> = Lazy::new(|| Rate::new(Duration::from_secs(1)));
static MAX_REQ_PER_SEC: isize = 1;

pingora-limits crate 中的Rate 結構體是一個穩定的速率估算器,它報告過去時間間隔內事件的速率。根據文檔,它返回在interval * 2interval 之間的平均速率,同時收集在interval 和現在之間發生的事件。

文檔還指出,估算器會忽略少於每個時間間隔發生一次的事件。

Lazy 用於在第一次訪問時初始化值。此類型是線程安全的,可以在靜態變量中使用。

接下來,ProxyHttp 特性具有request_filter 方法,我們必須重寫它以實現速率限制器。

根據文檔,request_filter 方法負責處理傳入請求。它還說明了以下內容:

在此階段,用戶可以解析、驗證、速率限制、執行訪問控制和 / 或爲此請求返回響應。如果用戶已經爲此請求發送了響應,則應返回Ok(true) 以便代理退出。當返回Ok(false) 時,代理繼續到下一個階段。

默認情況下,此過濾器不執行任何操作並返回Ok(false)

async fn request_filter(
    &self,
    session: &mut Session,
    _ctx: &mut Self::CTX
) -> Result<bool>
where
    Self::CTX: Send + Sync,
{
    let appid = match self.get_request_appid(session) {
        None => return Ok(false),
        Some(addr) => addr,
    };
    let curr_window_request = RATE_LIMITER.observe(&appid, 1);
    if curr_window_request > MAX_REQ_PER_SEC {
        let mut header = ResponseHeader::build(429, None).unwrap();
        header
            .insert_header("X-Rate-Limit-Limit", MAX_REQ_PER_SEC.to_string())
            .unwrap();
        header.insert_header("X-Rate-Limit-Remaining", "0").unwrap();
        header.insert_header("X-Rate-Limit-Reset", "1").unwrap();
        session.set_keepalive(None);
        session
            .write_response_header(Box::new(header), true)
            .await?;
        return Ok(true);
    }
    Ok(false)
}

讓我們通過運行以下命令來測試速率限制器:

curl localhost:6188 -H "appid:1" -vi

我們應該在命令行中看到以下消息:

結論

這是我第一次構建反向代理。我知道在這個主題上還有很多需要了解的內容。我必須說 Pingora 的文檔非常好。讓我喜歡的是,它的文檔中有一些例子,對於沒有反向代理構建經驗的人來說很容易跟隨。

感謝您花時間閱讀這篇文章。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/v5A2QweeFMRhawb1ZrNMRA