我做推广找不到我的网站,wordpress 网店,重新设置wordpress,wordpress 去除表格Rust 学习笔记#xff1a;Stream Rust 学习笔记#xff1a;Stream流组合流合并流 Rust 学习笔记#xff1a;Stream
许多概念天然适合用 Stream 表示#xff1a;
队列中逐渐可用的项目文件系统中逐渐拉取的数据块网络中随时间到达的数据
流
消息传递中异步的 recv 方法会… Rust 学习笔记Stream Rust 学习笔记Stream流组合流合并流 Rust 学习笔记Stream
许多概念天然适合用 Stream 表示
队列中逐渐可用的项目文件系统中逐渐拉取的数据块网络中随时间到达的数据
流
消息传递中异步的 recv 方法会随时间产生一系列项目称为流Stream。
迭代器和异步通道接收器之间有两个不同之处。第一个区别是时间迭代器是同步的而通道接收器是异步的。第二个是 API。当直接使用 Iterator 时调用它的同步 next 方法而对于 trpl::Receiver 流我们调用异步 recv 方法。
Streams 类似于一种异步形式的迭代器。迭代器和 Rust 中的流之间的相似性意味着我们实际上可以从任何迭代器创建流。与使用迭代器一样我们可以调用流的 next 方法然后等待输出。
Stream trait 定义了一个底层接口它有效地结合了 Iterator 和 Future trait。StreamExt 在 Stream 之上提供了一组更高级的 API包括 next 方法以及其他类似于 Iterator trait 所提供的实用方法。Stream 和 StreamExt 还不是 Rust 标准库的一部分但大多数生态系统的 crate 使用相同的定义。 Ext 是扩展的缩写是 Rust 社区中用于将一个特性扩展到另一个特性的常用模式。 use trpl::StreamExt;fn main() {trpl::run(async {let values [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];let iter values.iter().map(|n| n * 2);let mut stream trpl::stream_from_iter(iter);while let Some(value) stream.next().await {println!(The value was: {value});}});
}程序从一个数字数组开始将其转换为迭代器然后调用 map 函数对所有值进行翻倍操作。使用 trpl::stream_from_iter 函数将迭代器转换为流。接下来当流中的项到达时使用 while let 循环遍历它们。
程序输出
The value was: 2
The value was: 4
The value was: 6
The value was: 8
The value was: 10
The value was: 12
The value was: 14
The value was: 16
The value was: 18
The value was: 20既然 StreamExt 已经在作用域中我们可以使用它的所有实用方法就像使用迭代器一样。
例如我们使用 filter 方法过滤除 3 和 5 的倍数以外的所有内容。
use trpl::StreamExt;fn main() {trpl::run(async {let values 1..101;let iter values.map(|n| n * 2);let stream trpl::stream_from_iter(iter);let mut filtered stream.filter(|value| value % 3 0 || value % 5 0);while let Some(value) filtered.next().await {println!(The value was: {value});}});
}组合流
因为流是 future我们可以将它们与任何其他类型的 future 一起使用并以有趣的方式组合它们。
让我们首先构建一个小消息流作为我们网络数据流的替代
use trpl::{ReceiverStream, Stream, StreamExt};fn get_messages() - impl StreamItem String {let (tx, rx) trpl::channel();let messages [a, b, c, d, e, f, g, h, i, j];for message in messages {tx.send(format!(Message: {message})).unwrap();}ReceiverStream::new(rx)
}fn main() {trpl::run(async {let mut messages get_messages();while let Some(message) messages.next().await {println!({message});}});
}首先我们创建一个名为 get_messages 的函数它返回 impl StreamItem String。为了实现它我们创建了一个异步通道并通过通道发送一系列字符串。我们还使用了一个新的类型ReceiverStream它通过 next 方法将 rx 接收器从 trpl::channel 转换为流。
回到 main 中我们使用 while let 循环打印流中的所有消息。当我们运行这段代码时我们得到了我们所期望的结果
Message: a
Message: b
Message: c
Message: d
Message: e
Message: f
Message: g
Message: h
Message: i
Message: j这些用常规的 Receiver 或 Iterator API 都能完成。让我们添加一个需要流的特性添加一个适用于流中的每个项的超时以及我们发出的项的延迟。
use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};fn get_messages() - impl StreamItem String {let (tx, rx) trpl::channel();trpl::spawn_task(async move {let messages [a, b, c, d, e, f, g, h, i, j];for (index, message) in messages.into_iter().enumerate() {let time_to_sleep if index % 2 0 { 100 } else { 300 };trpl::sleep(Duration::from_millis(time_to_sleep)).await;tx.send(format!(Message: {message})).unwrap();}});ReceiverStream::new(rx)
}fn main() {trpl::run(async {let mut messages pin!(get_messages().timeout(Duration::from_millis(200)));while let Some(result) messages.next().await {match result {Ok(message) println!({message}),Err(reason) eprintln!(Problem: {reason:?}),}}})
}我们首先使用 timeout 方法向流添加一个超时该方法来自 StreamExt trait。然后更新 while let 循环体因为流现在返回一个 Result。Ok 变体表示消息及时到达Err 变体表示在任何消息到达之前超时已经过了。我们匹配该结果并在成功接收消息时打印消息或者打印关于超时的通知。
接着为发送的消息添加一个可变延迟。在 get_messages 函数中我们对偶数索引项应用 100 ms 的延迟对奇索引项应用 300 ms 的延迟因为我们的超时是 200 ms所以这应该会影响一半的消息。
要在 get_messages 函数中的消息之间休眠而不阻塞我们需要使用 async。然而我们不能使 get_messages 本身成为一个异步函数因为那样我们将返回 FutureOutput StreamItem String 而不是 StreamItem String。因此我们将 get_messages 保留为返回流的常规函数并生成一个任务来处理异步 sleep 调用。
请记住在给定的 future 内一切都是线性发生的。并发发生在 future 之间。
调用者必须等待 get_messages 本身来访问流这将要求它在返回接收方流之前发送所有消息包括每个消息之间的睡眠延迟。因此我们设置的超时将是无用的。Stream 本身不会有延迟它们都可能在流可用之前发生。
程序输出
Message: a
Problem: Elapsed(())
Message: b
Message: c
Problem: Elapsed(())
Message: d
Message: e
Problem: Elapsed(())
Message: f
Message: g
Problem: Elapsed(())
Message: h
Message: i
Problem: Elapsed(())
Message: j在接收偶数索引项消息前都出现 Problem: Elapsed(()) 错误。
发送偶数索引项消息后需要休眠 300 msmain 函数中轮询流的间隔是 200 ms。
第一次轮询的 result 匹配到 Err(reason)于是打印错误。
超时并不会阻止消息最终到达。我们仍然获得所有原始消息因为我们的通道是无界的它可以容纳内存中可以容纳的尽可能多的消息。
第二次轮询时消息已经到达result 匹配到 Ok(message)于是打印接收到的消息。
合并流
首先让我们创建另一个流。在无限循环中每隔 1 ms 发送一个数字。对于 async只要在循环的每次迭代中至少有一个等待点就不会阻塞其他任何东西。
fn get_intervals() - impl StreamItem u32 {let (tx, rx) trpl::channel();trpl::spawn_task(async move {let mut count 0;loop {trpl::sleep(Duration::from_millis(1)).await;count 1;tx.send(count).unwrap();}});ReceiverStream::new(rx)
}返回类型将是 impl StreamItem u32。因为所有这些都被封装在由 spawn_task创 建的任务中所以所有这些包括无限循环都将随着运行时一起被清理。
回到 main 函数的 async 块我们可以尝试合并消息和间隔流
fn main() {trpl::run(async {let messages get_messages().timeout(Duration::from_millis(200));let intervals get_intervals();let merged messages.merge(intervals);})
}merge 方法将多个流合并为一个流该流在项可用时立即从任何流生成项而不强加任何特定的顺序。
但是这个 merge 调用不能编译这是因为这两个流具有不同的类型。messages 流的类型为 Timeoutimpl StreamItem String其中 Timeout 是为超时调用实现 Stream 的类型。interval 流的类型为 impl StreamItem u32。要合并这两个流我们需要转换其中一个以匹配另一个。
修改 interval 流
fn main() {trpl::run(async {let messages get_messages().timeout(Duration::from_millis(200));let intervals get_intervals().map(|count| format!(Interval: {count})).timeout(Duration::from_secs(10));let merged messages.merge(intervals);let mut stream pin!(merged);while let Some(result) stream.next().await {match result {Ok(message) println!({message}),Err(reason) eprintln!(Problem: {reason:?}),}}})
}首先我们使用 map 方法将流中的数字转换为字符串。
其次我们需要匹配 messages 流的类型 Timeout…这里我们用 timeout 创建了一个超时。
最后我们需要使流可变以便 while let 循环的 next 调用可以遍历流并将其 pin以便这样做是安全的。
运行程序将会出现两个问题。首先它永远不会停止其次来自英文字母的消息将被隐藏在所有间隔计数器消息的中间。
...
Interval: 329
Interval: 330
Interval: 331
Interval: 332
Interval: 333
Interval: 334
Interval: 335
Interval: 336
Interval: 337
Interval: 338
Message: d
Interval: 339
Interval: 340
Interval: 341
Interval: 342
Interval: 343
Interval: 344
Interval: 345
Interval: 346
...修改程序解决这两个问题
fn main() {trpl::run(async {let messages get_messages().timeout(Duration::from_millis(200));let intervals get_intervals().map(|count| format!(Interval: {count})).throttle(Duration::from_millis(100)).timeout(Duration::from_secs(10));let merged messages.merge(intervals).take(20);let mut stream pin!(merged);while let Some(result) stream.next().await {match result {Ok(message) println!({message}),Err(reason) eprintln!(Problem: {reason:?}),}}})
}首先我们在 intervals 流上使用 throttle 方法。节流是一种限制函数调用速率的方法。在这种情况下限制流轮询的频率为每 100 ms 一次这样 intervals 流就不会压倒 messages 流。
take 方法用于限制从流中接受的项的数量。我们将 take 方法应用于合并的流因为我们想限制最终的输出而不仅仅是一个流或另一个流。
现在当我们运行程序时它在从合并流中取出 20 个项后就停止。
Interval: 1
Message: a
Interval: 2
Interval: 3
Problem: Elapsed(())
Interval: 4
Message: b
Interval: 5
Message: c
Interval: 6
Interval: 7
Problem: Elapsed(())
Interval: 8
Message: d
Interval: 9
Message: e
Interval: 10
Interval: 11
Problem: Elapsed(())
Interval: 12尽管我们有一个源流可以每毫秒产生一个事件但是节流调用产生了一个新的流它包装了原始流这样原始流就只能以节流速率轮询而不是它自己的“本地”速率。我们并非忽略 intervals 流发送的消息而是从一开始就不会产生这些消息这是 Rust future 在工作中固有的“懒惰”允许我们选择我们的性能特征。
我们还需要处理最后一件事错误对于这两种基于通道的流当通道的另一端关闭时send 调用可能会失败。到目前为止我们通过调用 unwrap 忽略了这种可能性但在一个行为良好的应用程序中我们应该显式处理错误至少通过结束循环这样我们就不会再发送任何消息。
修改两个函数
fn get_messages() - impl StreamItem String {let (tx, rx) trpl::channel();trpl::spawn_task(async move {let messages [a, b, c, d, e, f, g, h, i, j];for (index, message) in messages.into_iter().enumerate() {let time_to_sleep if index % 2 0 { 100 } else { 300 };trpl::sleep(Duration::from_millis(time_to_sleep)).await;if let Err(send_error) tx.send(format!(Message: {message})) {eprintln!(Cannot send message {message}: {send_error});break;}}});ReceiverStream::new(rx)
}fn get_intervals() - impl StreamItem u32 {let (tx, rx) trpl::channel();trpl::spawn_task(async move {let mut count 0;loop {trpl::sleep(Duration::from_millis(1)).await;count 1;if let Err(send_error) tx.send(count) {eprintln!(Could not send interval {count}: {send_error});break;};}});ReceiverStream::new(rx)
}两个函数都使用了一个简单的错误策略打印问题然后跳出循环。 文章转载自: http://www.morning.wrqw.cn.gov.cn.wrqw.cn http://www.morning.xptkl.cn.gov.cn.xptkl.cn http://www.morning.pshpx.cn.gov.cn.pshpx.cn http://www.morning.qpntn.cn.gov.cn.qpntn.cn http://www.morning.rnlx.cn.gov.cn.rnlx.cn http://www.morning.tgyqq.cn.gov.cn.tgyqq.cn http://www.morning.lmjtp.cn.gov.cn.lmjtp.cn http://www.morning.mtsck.cn.gov.cn.mtsck.cn http://www.morning.bnmfq.cn.gov.cn.bnmfq.cn http://www.morning.clbzy.cn.gov.cn.clbzy.cn http://www.morning.grbgn.cn.gov.cn.grbgn.cn http://www.morning.bxdlrcz.cn.gov.cn.bxdlrcz.cn http://www.morning.xckdn.cn.gov.cn.xckdn.cn http://www.morning.tlrxp.cn.gov.cn.tlrxp.cn http://www.morning.mywmb.cn.gov.cn.mywmb.cn http://www.morning.gfpyy.cn.gov.cn.gfpyy.cn http://www.morning.mxhgy.cn.gov.cn.mxhgy.cn http://www.morning.czcbl.cn.gov.cn.czcbl.cn http://www.morning.junyaod.com.gov.cn.junyaod.com http://www.morning.zgqysw.cn.gov.cn.zgqysw.cn http://www.morning.mjxgs.cn.gov.cn.mjxgs.cn http://www.morning.phlrp.cn.gov.cn.phlrp.cn http://www.morning.lbhck.cn.gov.cn.lbhck.cn http://www.morning.mrncd.cn.gov.cn.mrncd.cn http://www.morning.jkdtz.cn.gov.cn.jkdtz.cn http://www.morning.sfphz.cn.gov.cn.sfphz.cn http://www.morning.jgnst.cn.gov.cn.jgnst.cn http://www.morning.ynlbj.cn.gov.cn.ynlbj.cn http://www.morning.rrcrs.cn.gov.cn.rrcrs.cn http://www.morning.cjcry.cn.gov.cn.cjcry.cn http://www.morning.nqwkn.cn.gov.cn.nqwkn.cn http://www.morning.dkbsq.cn.gov.cn.dkbsq.cn http://www.morning.wdprz.cn.gov.cn.wdprz.cn http://www.morning.tlpgp.cn.gov.cn.tlpgp.cn http://www.morning.mslsn.cn.gov.cn.mslsn.cn http://www.morning.zqsnj.cn.gov.cn.zqsnj.cn http://www.morning.tfzjl.cn.gov.cn.tfzjl.cn http://www.morning.lnrr.cn.gov.cn.lnrr.cn http://www.morning.ldqrd.cn.gov.cn.ldqrd.cn http://www.morning.frfpx.cn.gov.cn.frfpx.cn http://www.morning.yqmmh.cn.gov.cn.yqmmh.cn http://www.morning.zxhhy.cn.gov.cn.zxhhy.cn http://www.morning.kflpf.cn.gov.cn.kflpf.cn http://www.morning.ejknty.cn.gov.cn.ejknty.cn http://www.morning.ygztf.cn.gov.cn.ygztf.cn http://www.morning.bsrcr.cn.gov.cn.bsrcr.cn http://www.morning.kjyhh.cn.gov.cn.kjyhh.cn http://www.morning.qineryuyin.com.gov.cn.qineryuyin.com http://www.morning.kysport1102.cn.gov.cn.kysport1102.cn http://www.morning.gwgjl.cn.gov.cn.gwgjl.cn http://www.morning.huihuangwh.cn.gov.cn.huihuangwh.cn http://www.morning.mprpx.cn.gov.cn.mprpx.cn http://www.morning.mhmdx.cn.gov.cn.mhmdx.cn http://www.morning.mqgqf.cn.gov.cn.mqgqf.cn http://www.morning.cpmwg.cn.gov.cn.cpmwg.cn http://www.morning.srgnd.cn.gov.cn.srgnd.cn http://www.morning.fywqr.cn.gov.cn.fywqr.cn http://www.morning.ntffl.cn.gov.cn.ntffl.cn http://www.morning.sblgt.cn.gov.cn.sblgt.cn http://www.morning.fhjnh.cn.gov.cn.fhjnh.cn http://www.morning.kfldw.cn.gov.cn.kfldw.cn http://www.morning.nrydm.cn.gov.cn.nrydm.cn http://www.morning.tmfm.cn.gov.cn.tmfm.cn http://www.morning.gyjld.cn.gov.cn.gyjld.cn http://www.morning.knrgb.cn.gov.cn.knrgb.cn http://www.morning.sqxr.cn.gov.cn.sqxr.cn http://www.morning.tpchy.cn.gov.cn.tpchy.cn http://www.morning.cwrnr.cn.gov.cn.cwrnr.cn http://www.morning.cjxqx.cn.gov.cn.cjxqx.cn http://www.morning.ztfzm.cn.gov.cn.ztfzm.cn http://www.morning.wbxtx.cn.gov.cn.wbxtx.cn http://www.morning.prfrb.cn.gov.cn.prfrb.cn http://www.morning.blxlf.cn.gov.cn.blxlf.cn http://www.morning.sqfnx.cn.gov.cn.sqfnx.cn http://www.morning.pftjj.cn.gov.cn.pftjj.cn http://www.morning.ymjrg.cn.gov.cn.ymjrg.cn http://www.morning.kpzrf.cn.gov.cn.kpzrf.cn http://www.morning.ntyanze.com.gov.cn.ntyanze.com http://www.morning.qbjrf.cn.gov.cn.qbjrf.cn http://www.morning.nzsdr.cn.gov.cn.nzsdr.cn