use std::io;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::ready;
use std::task::Context;
use std::task::Poll;
use bytes::Bytes;
use futures::Stream;
use futures::StreamExt;
use crate::raw::*;
use crate::*;
pub struct FuturesBytesStream {
stream: BufferStream,
buf: Buffer,
}
unsafe impl Sync for FuturesBytesStream {}
impl FuturesBytesStream {
#[inline]
pub(crate) fn new(ctx: Arc<ReadContext>, range: Range<u64>) -> Self {
let stream = BufferStream::new(ctx, range);
FuturesBytesStream {
stream,
buf: Buffer::new(),
}
}
}
impl Stream for FuturesBytesStream {
type Item = io::Result<Bytes>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
loop {
if let Some(bs) = Iterator::next(&mut this.buf) {
return Poll::Ready(Some(Ok(bs)));
}
this.buf = match ready!(this.stream.poll_next_unpin(cx)) {
Some(Ok(buf)) => buf,
Some(Err(err)) => return Poll::Ready(Some(Err(format_std_io_error(err)))),
None => return Poll::Ready(None),
};
}
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use bytes::Bytes;
use futures::TryStreamExt;
use pretty_assertions::assert_eq;
use super::*;
#[test]
fn test_trait() -> Result<()> {
let acc = Operator::via_map(Scheme::Memory, HashMap::default())?.into_inner();
let ctx = Arc::new(ReadContext::new(
acc,
"test".to_string(),
OpRead::new(),
OpReader::new(),
));
let v = FuturesBytesStream::new(ctx, 4..8);
let _: Box<dyn Unpin + MaybeSend + Sync + 'static> = Box::new(v);
Ok(())
}
#[tokio::test]
async fn test_futures_bytes_stream() -> Result<()> {
let op = Operator::via_map(Scheme::Memory, HashMap::default())?;
op.write(
"test",
Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]),
)
.await?;
let acc = op.into_inner();
let ctx = Arc::new(ReadContext::new(
acc,
"test".to_string(),
OpRead::new(),
OpReader::new(),
));
let s = FuturesBytesStream::new(ctx, 4..8);
let bufs: Vec<Bytes> = s.try_collect().await.unwrap();
assert_eq!(&bufs[0], "o".as_bytes());
assert_eq!(&bufs[1], "Wor".as_bytes());
Ok(())
}
#[tokio::test]
async fn test_futures_bytes_stream_with_concurrent() -> Result<()> {
let op = Operator::via_map(Scheme::Memory, HashMap::default())?;
op.write(
"test",
Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]),
)
.await?;
let acc = op.into_inner();
let ctx = Arc::new(ReadContext::new(
acc,
"test".to_string(),
OpRead::new(),
OpReader::new().with_concurrent(3).with_chunk(1),
));
let s = FuturesBytesStream::new(ctx, 4..8);
let bufs: Vec<Bytes> = s.try_collect().await.unwrap();
assert_eq!(&bufs[0], "o".as_bytes());
assert_eq!(&bufs[1], "W".as_bytes());
assert_eq!(&bufs[2], "o".as_bytes());
assert_eq!(&bufs[3], "r".as_bytes());
Ok(())
}
}