一個常見的說法是,線程可以做到 async/await
所能做的一切,且更簡單。那么,為什么大家選擇 async/await
呢?
Rust 是一種低級語言,它不會隱藏協程的復雜性。這與像 Go 這樣的語言相反,在 Go 中,異步是默認發生的,程序員甚至不需要考慮它。
聰明的程序員試圖避免復雜性。因此,他們看到 async/await
中的額外復雜性,并質疑為什么需要它。當考慮到存在一個合理的替代方案——操作系統線程時,這個問題尤其相關。
讓我們通過 async
來進行一次思維之旅吧。
背景概覽
通常,代碼是線性的;一件事情在另一件事情之后運行。它看起來像這樣:
fn main() {
foo();
bar();
baz();
}
很簡單,對吧?然而,有時你會想同時運行很多事情。這方面的典型例子是 web 服務器。考慮以下用線性代碼編寫的:
fn main() -> io::Result<()> {
let socket = TcpListener::bind("0.0.0.0:80")?;
loop {
let (client, _) = socket.accept()?;
handle_client(client)?;
}
}
想象一下,如果 handle_client
需要幾毫秒,并且兩個客戶端同時嘗試連接到你的 web 服務器。你會遇到一個嚴重的問題!
客戶端 #1 連接到 web 服務器,并被 accept()
函數接受。它開始運行 handle_client()
。
客戶端 #2 連接到 web 服務器。然而,由于 accept()
當前沒有運行,我們必須等待 handle_client()
完成客戶端 #1 的運行。
幾毫秒后,我們回到 accept()
。客戶端 #2 可以連接。
現在想象一下,如果有兩百萬個同時客戶端。在隊列的末尾,你必須等待幾分鐘,web 服務器才能幫助你。它很快就會變得不可擴展。
顯然,初期的 web 試圖解決這個問題。最初的解決方案是引入線程。通過將一些寄存器的值和程序的棧保存到內存中,操作系統可以停止一個程序,用另一個程序替換它,然后再后繼續運行那個程序。本質上,它允許多個例程(或“線程”,或“進程”)在同一個 CPU 上運行。
使用線程,我們可以將上述代碼重寫如下:
fn main() -> io::Result<()> {
let socket = TcpListener::bind("0.0.0.0:80")?;
loop {
let (client, _) = socket.accept()?;
thread::spawn(move || handle_client(client));
}
}
現在,客戶端由一個與處理新連接等待不同的線程處理。太棒了!通過允許并發線程訪問,這避免了問題。
客戶端 #1 被服務器接受。服務器生成一個調用 handle_client
的線程。
最終,handle_client
在某處阻塞。操作系統保存處理客戶端 #1 的線程,并將主線程帶回來。
主線程接受客戶端 #2。它生成一個單獨的線程來處理客戶端 #2。在只有幾微秒的延遲后,客戶端 #1 和客戶端 #2 并行運行。
線程在考慮到生產級 web 服務器擁有幾十個 CPU 核心時特別好用。不僅僅是操作系統可以給人一種所有這些線程同時運行的錯覺;實際上,操作系統可以讓它們真正同時運行。
最終,出于我稍后將詳細說明的原因,程序員希望將這種并發性從操作系統空間帶到用戶空間。用戶空間并發性有許多不同的模型。有事件驅動編程、actor
和協程。Rust 選擇的是 async/await
。
簡單來說,你將程序編譯成一個狀態機的集合,這些狀態機可以獨立于彼此運行。Rust 本身提供了一種創建狀態機的機制;async
和 await
的機制。使用 smol
編寫的上述程序將如下所示:
#[apply(smol_macros::main!)]
async fn main(ex: &smol::Executor) -> io::Result<()> {
let socket = TcpListener::bind("0.0.0.0:80").await?;
loop {
let (client, _) = socket.accept().await?;
ex.spawn(async move {
handle_client(client).await;
}).detach();
}
}
主函數前面有 async
關鍵字。這意味著它不是一個傳統函數,而是一個返回狀態機的函數。大致上,函數的內容對應于該狀態機。
await
包括另一個狀態機作為當前運行狀態機的一部分。對于 accept()
,這意味著狀態機將把它作為一個步驟包含在內。
最終,一個內部函數將會產生結果,或者放棄控制。例如,當 accept()
等待新連接時。在這一點上,整個狀態機將把執行權交給更高級別的執行器。對我們來說,那是 smol::Executor
。
一旦執行被產生,執行器將用另一個正在并發運行的狀態機替換當前狀態機,該狀態機是通過 spawn
函數生成的。
我們將一個異步塊傳遞給 spawn
函數。這個塊代表一個完全新的狀態機,獨立于由 main
函數創建的狀態機。這個狀態機所做的一切都是運行 handle_client
函數。
一旦 main
產生結果,就選擇一個客戶端來代替它運行。一旦那個客戶端產生結果,循環就會重復。
你現在可以處理數百萬的并發客戶端。
當然,像這樣的用戶空間并發性引入了復雜性的提升。當你使用線程時,你不必處理執行器、任務和狀態機等。
如果你是一個理智的人,你可能會問:“我們為什么需要做所有這些事情?線程工作得很好;對于 99% 的程序,我們不需要涉及任何用戶空間并發性。引入新復雜性是技術債務,技術債務會花費我們的時間和金錢。”
“那么,我們為什么不使用線程呢?”
超時問題
也許 Rust 最大的優勢之一是可組合性。它提供了一組可以嵌套、構建、組合和擴展的抽象。
我記得讓我堅持使用 Rust 的是 Iterator trait
。它可以讓我將某個東西變成 Iterator
,應用一些不同的組合器,然后將結果 Iterator
傳遞給任何接受 Iterator
的函數,這讓我大開眼界。
它繼續給我留下深刻印象。假設你想從另一個線程接收一列表整數,只取那些立即可用的整數,丟棄任何不是偶數的整數,給它們全部加一,然后將它們推到一個新列表上。
在某些其他語言中,這將是五十行代碼和一個輔助函數。在 Rust 中,可以用五行完成:
let (send, recv) = mpsc::channel();
my_list.extend(
recv.try_iter()
.filter(|x| x & 1 == 0)
.map(|x| x + 1)
);
async/await
最好的事情是,它允許你將這種可組合性應用于 I/O 限制函數。假設你有一個新的客戶端要求;你想在上面的函數中添加一個超時。假設我們的 handle_client
函數看起來像這樣:
async fn handle_client(client: TcpStream) -> io::Result<()> {
let mut data = vec![];
client.read_to_end(&mut data).await?;
let response = do_something_with_data(data).await?;
client.write_all(&response).await?;
Ok(())
}
如果我們想添加一個三秒鐘的超時,我們可以組合兩個組合器來做到這一點:
race
函數同時運行兩個 future
。
Timer future
等待一段時間后返回。
最終的代碼看起來像這樣:
async fn handle_client(client: TcpStream) -> io::Result<()> {
// 處理實際連接的 future
let driver = async move {
let mut data = vec![];
client.read_to_end(&mut data).await?;
let response = do_something_with_data(data).await?;
client.write_all(&response).await?;
Ok(())
};
// 處理等待超時的 future
let timeout = async {
Timer::after(Duration::from_secs(3)).await;
// 我們剛剛超時了!返回一個錯誤。
Err(io::ErrorKind::TimedOut.into())
};
// 并行運行兩者
driver.race(timeout).await
}
我發現這是一個非常簡單的過程。你所要做的就是將你的現有代碼包裝在一個異步塊中,然后將其與另一個 future 競速。
這種方法的額外好處是,它適用于任何類型的流。在這里,我們使用 TcpStream
。然而,我們可以很容易地將其替換為任何實現 impl AsyncRead + AsyncWrite
的東西。async
可以輕松地適應你需要的任何模式。
用線程實現
如果我們想在我們的線程示例中實現這一點呢?
fn handle_client(client: TcpStream) -> io::Result<()> {
let mut data = vec![];
client.read_to_end(&mut data)?;
let response = do_something_with_data(data)?;
client.write_all(&response)?;
Ok(())
}
這并不容易。通常,你不能在阻塞代碼中中斷 read
或 write
系統調用,除非做一些災難性的事情,比如關閉文件描述符(在 Rust 中無法做到)。
幸運的是,TcpStream
有兩個函數 set_read_timeout
和 set_write_timeout
,可以用來分別設置讀寫超時。然而,我們不能天真地使用它。想象一個客戶端每 2.9 秒發送一個字節,只是為了重置超時。
所以我們需要在這里稍微防御性地編程。由于 Rust 組合器的強大,我們可以編寫自己的類型,包裝 TcpStream
來編程超時。
// `TcpStream` 的截止日期感知包裝器
struct DeadlineStream {
tcp: TcpStream,
deadline: Instant,
}
impl DeadlineStream {
// 創建一個新的 `DeadlineStream`,經過一段時間后過期
fn new(tcp: TcpStream, timeout: Duration) -> Self {
Self {
tcp,
deadline: Instant::now() + timeout,
}
}
}
impl io::Read for DeadlineStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
// 設置截止日期
let time_left = self.deadline.saturating_duration_since(Instant::now());
self.tcp.set_read_timeout(Some(time_left))?;
// 從流中讀取
self.tcp.read(buf)
}
}
impl io::Write for DeadlineStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
// 設置截止日期
let time_left = self.deadline.saturating_duration_since(Instant::now());
self.tcp.set_write_timeout(Some(time_left))?;
// 從流中讀取
self.tcp.write(buf)
}
}
// 創建包裝器
let client = DeadlineStream::new(client, Duration::from_secs(3));
let mut data = vec![];
client.read_to_end(&mut data)?;
let response = do_something_with_data(data)?;
client.write_all(&response)?;
Ok(())
一方面,可以認為這是優雅的。我們使用 Rust 的能力用一個相對簡單的組合器解決了問題。我相信它會運行得很好。
另一方面,這絕對是 hacky。
我們鎖定了自己使用 TcpStream
。Rust 中沒有特質來抽象使用 set_read_timeout
和 set_write_timeout
類型。所以如果要使用任何類型的寫入器,需要額外的工作。
這涉及到設置超時的額外系統調用。
我認為這種類型對于 web 服務器要求的實際邏輯來說,使用起來要笨重得多。
異步成功案例
這就是為什么 HTTP 生態系統采用 async/await
作為其主要運行機制的原因,即使是客戶端也是如此。你可以取任何進行 HTTP 調用的函數,并使其適應你想要的任何用例。
tower
可能是我能想到的這種現象最好的例子,這也是讓我意識到 async/await
可以有多強大的東西。如果你將你的服務實現為一個異步函數,你會得到超時、速率限制、負載均衡、對沖和背壓處理。所有這些都是無負擔實現的。
不管你使用的是什么運行時,或者你的服務實際上在做什么。你可以將它扔給 tower
,使其更加健壯。
macroquad
是一個小型 Rust 游戲引擎,旨在使游戲開發盡可能簡單。它的主函數使用 async/await
來運行其引擎。這是因為 async/await
確實是在 Rust 中表達需要停下來等待其他事情的線性函數的最佳方式。
在實踐中,這可能非常強大。想象一下,同時輪詢你的游戲服務器和你的 GUI 框架的網絡連接,在同一線程上。可能性是無限的。
提升異步的形象
我認為問題不在于有人認為線程比異步更好。我認為問題是異步的好處沒有被廣泛傳播。這導致一些人對異步的好處有誤解。
如果這是一個教育問題,我認為值得看一下教育材料。這是 Rust Async Book 在比較 async/await
和操作系統線程時所說的:
操作系統線程不需要對編程模型做任何改變,這使得并發表達非常容易。然而,線程間的同步可能會很困難,性能開銷也很大。線程池可以緩解這些成本,但不足以支持大規模的 I/O 密集型工作負載。
—— Rust Async Book
我認為這是整個異步社區的一個一貫問題。當有人問“為什么我們想用這個而不是操作系統線程”時,人們傾向于揮揮手說“異步開銷更小。除此之外,其他都一樣。”
這就是 web 服務器作者轉向 async/await
的原因。這就是他們如何解決 C10k
問題的。但這不會是其他人轉向 async/await 的原因。
c10k 問題:https://en.wikipedia.org/wiki/C10k_problem
性能提升是不穩定的,可能會在錯誤的情況下消失。有很多情況下,線程工作流程可以比等效的異步工作流程更快(主要是在 CPU 密集型任務的情況下)。可能以前我們過分強調了異步 Rust 的短暫性能優勢,但低估了它的語義優勢。
在最壞的情況下,這會導致人們對 async/await
置之不理,認為它是“你為小眾用例而求助的奇怪事物”。它應該被視為一個強大的編程模型,讓你能夠簡潔地表達在同步 Rust 中無法表達的模式,而不需要數十個線程和通道。
有一種趨勢是試圖使異步 Rust “就像同步 Rust 一樣”,這種方式鼓勵了負面比較。當我說到“趨勢”時,我的意思是這是 Rust 項目的明確路線圖,即“編寫異步 Rust 代碼應該像編寫同步代碼一樣容易,除了偶爾的 async 和 await 關鍵字。”
我拒絕這種框架,因為它根本不可能。這就像試圖在一個滑雪坡上舉辦披薩派對。我們不應該試圖將我們的模型強行塞入不友好的慣用法,以迎合拒絕采用另一種模式的程序員。我們應該努力突出 Rust 的 async/await
生態系統的優勢;它的可組合性和它的能力。我們應該努力使 async/await
成為程序員達到并發性時的默認選擇。我們不應該試圖使同步 Rust 和異步 Rust 相同,我們應該接受差異。
該文章在 2024/4/28 21:30:25 編輯過