Skip to content

Commit ea9b661

Browse files
committed
Show TCP read
1 parent 0982257 commit ea9b661

File tree

4 files changed

+88
-17
lines changed

4 files changed

+88
-17
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
# Unreleased
22

3+
- Show TCP read bytes instead of body size
4+
35
# 0.5.5 (2022-09-19)
46

57
- Add colors to the tui view #64

src/client.rs

+16-17
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
use futures::future::FutureExt;
22
use futures::StreamExt;
33
use rand::prelude::*;
4+
use std::sync::atomic::{AtomicUsize, Ordering};
45
use std::sync::Arc;
56
use thiserror::Error;
67

8+
use crate::tcp_stream::CustomTcpStream;
79
use crate::ConnectToEntry;
810

911
#[derive(Debug, Clone)]
@@ -128,6 +130,7 @@ impl ClientBuilder {
128130
rng: rand::rngs::StdRng::from_entropy(),
129131
},
130132
client: None,
133+
read_bytes_counter: Arc::new(AtomicUsize::new(0)),
131134
timeout: self.timeout,
132135
http_version: self.http_version,
133136
redirect_limit: self.redirect_limit,
@@ -195,6 +198,7 @@ pub struct Client {
195198
body: Option<&'static [u8]>,
196199
dns: DNS,
197200
client: Option<hyper::client::conn::SendRequest<hyper::Body>>,
201+
read_bytes_counter: Arc<AtomicUsize>,
198202
timeout: Option<std::time::Duration>,
199203
redirect_limit: usize,
200204
disable_keepalive: bool,
@@ -211,6 +215,7 @@ impl Client {
211215
} else {
212216
let stream = tokio::net::TcpStream::connect(addr).await?;
213217
stream.set_nodelay(true)?;
218+
let stream = CustomTcpStream::new(stream, self.read_bytes_counter.clone());
214219
// stream.set_keepalive(std::time::Duration::from_secs(1).into())?;
215220
let (send, conn) = hyper::client::conn::handshake(stream).await?;
216221
tokio::spawn(conn);
@@ -225,6 +230,7 @@ impl Client {
225230
) -> Result<hyper::client::conn::SendRequest<hyper::Body>, ClientError> {
226231
let stream = tokio::net::TcpStream::connect(addr).await?;
227232
stream.set_nodelay(true)?;
233+
let stream = CustomTcpStream::new(stream, self.read_bytes_counter.clone());
228234

229235
let connector = if self.insecure {
230236
native_tls::TlsConnector::builder()
@@ -251,6 +257,7 @@ impl Client {
251257
) -> Result<hyper::client::conn::SendRequest<hyper::Body>, ClientError> {
252258
let stream = tokio::net::TcpStream::connect(addr).await?;
253259
stream.set_nodelay(true)?;
260+
let stream = CustomTcpStream::new(stream, self.read_bytes_counter.clone());
254261

255262
let mut root_cert_store = rustls::RootCertStore::empty();
256263
for cert in rustls_native_certs::load_native_certs()? {
@@ -331,19 +338,17 @@ impl Client {
331338
connection_time = Some(ConnectionTime { dns_lookup, dialup });
332339
}
333340
let request = self.request(&self.url)?;
341+
self.read_bytes_counter.store(0, Ordering::Relaxed);
334342
match send_request.send_request(request).await {
335343
Ok(res) => {
336344
let (parts, mut stream) = res.into_parts();
337345
let mut status = parts.status;
338346

339-
let mut len_sum = 0;
340-
while let Some(chunk) = stream.next().await {
341-
len_sum += chunk?.len();
342-
}
347+
while stream.next().await.is_some() {}
343348

344349
if self.redirect_limit != 0 {
345350
if let Some(location) = parts.headers.get("Location") {
346-
let (send_request_redirect, new_status, len) = self
351+
let (send_request_redirect, new_status) = self
347352
.redirect(
348353
send_request,
349354
&self.url.clone(),
@@ -354,7 +359,6 @@ impl Client {
354359

355360
send_request = send_request_redirect;
356361
status = new_status;
357-
len_sum = len;
358362
}
359363
}
360364

@@ -364,7 +368,7 @@ impl Client {
364368
start,
365369
end,
366370
status,
367-
len_bytes: len_sum,
371+
len_bytes: self.read_bytes_counter.load(Ordering::Relaxed),
368372
connection_time,
369373
};
370374

@@ -404,7 +408,6 @@ impl Client {
404408
(
405409
hyper::client::conn::SendRequest<hyper::Body>,
406410
http::StatusCode,
407-
usize,
408411
),
409412
ClientError,
410413
>,
@@ -451,28 +454,25 @@ impl Client {
451454
)?,
452455
);
453456
}
457+
self.read_bytes_counter.store(0, Ordering::Relaxed);
454458
let res = send_request.send_request(request).await?;
455459
let (parts, mut stream) = res.into_parts();
456460
let mut status = parts.status;
457461

458-
let mut len_sum = 0;
459-
while let Some(chunk) = stream.next().await {
460-
len_sum += chunk?.len();
461-
}
462+
while stream.next().await.is_some() {}
462463

463464
if let Some(location) = parts.headers.get("Location") {
464-
let (send_request_redirect, new_status, len) = self
465+
let (send_request_redirect, new_status) = self
465466
.redirect(send_request, &url, location, limit - 1)
466467
.await?;
467468
send_request = send_request_redirect;
468469
status = new_status;
469-
len_sum = len;
470470
}
471471

472472
if let Some(send_request_base) = send_request_base {
473-
Ok((send_request_base, status, len_sum))
473+
Ok((send_request_base, status))
474474
} else {
475-
Ok((send_request, status, len_sum))
475+
Ok((send_request, status))
476476
}
477477
}
478478
.boxed()
@@ -546,7 +546,6 @@ pub async fn work(
546546
n_tasks: usize,
547547
n_workers: usize,
548548
) {
549-
use std::sync::atomic::{AtomicUsize, Ordering};
550549
let counter = Arc::new(AtomicUsize::new(0));
551550

552551
let futures = (0..n_workers)

src/main.rs

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ mod client;
1111
mod histogram;
1212
mod monitor;
1313
mod printer;
14+
mod tcp_stream;
1415
mod timescale;
1516

1617
use client::{ClientError, RequestResult};

src/tcp_stream.rs

+69
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Ported from https://github.com/lnx-search/rewrk/pull/6
2+
use std::pin::Pin;
3+
use std::sync::atomic::{AtomicUsize, Ordering};
4+
use std::sync::Arc;
5+
use std::task::{Context, Poll};
6+
use tokio::io::ReadBuf;
7+
use tokio::io::{AsyncRead, AsyncWrite};
8+
use tokio::net::TcpStream;
9+
10+
use std::io::{IoSlice, Result};
11+
12+
pub struct CustomTcpStream {
13+
inner: TcpStream,
14+
counter: Arc<AtomicUsize>,
15+
}
16+
17+
impl CustomTcpStream {
18+
pub fn new(stream: TcpStream, counter: Arc<AtomicUsize>) -> Self {
19+
Self {
20+
inner: stream,
21+
counter,
22+
}
23+
}
24+
}
25+
26+
impl AsyncRead for CustomTcpStream {
27+
fn poll_read(
28+
mut self: Pin<&mut Self>,
29+
cx: &mut Context<'_>,
30+
buf: &mut ReadBuf<'_>,
31+
) -> Poll<Result<()>> {
32+
let result = Pin::new(&mut self.inner).poll_read(cx, buf);
33+
34+
self.counter
35+
.fetch_add(buf.filled().len(), Ordering::Relaxed);
36+
37+
result
38+
}
39+
}
40+
41+
impl AsyncWrite for CustomTcpStream {
42+
fn poll_write(
43+
mut self: Pin<&mut Self>,
44+
cx: &mut Context<'_>,
45+
buf: &[u8],
46+
) -> Poll<Result<usize>> {
47+
Pin::new(&mut self.inner).poll_write(cx, buf)
48+
}
49+
50+
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
51+
Pin::new(&mut self.inner).poll_flush(cx)
52+
}
53+
54+
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
55+
Pin::new(&mut self.inner).poll_shutdown(cx)
56+
}
57+
58+
fn poll_write_vectored(
59+
mut self: Pin<&mut Self>,
60+
cx: &mut Context<'_>,
61+
bufs: &[IoSlice<'_>],
62+
) -> Poll<Result<usize>> {
63+
Pin::new(&mut self.inner).poll_write_vectored(cx, bufs)
64+
}
65+
66+
fn is_write_vectored(&self) -> bool {
67+
self.inner.is_write_vectored()
68+
}
69+
}

0 commit comments

Comments
 (0)