Skip to content

mirrord-protocol update (part of passthrough mirroring prep) #3302

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Apr 29, 2025
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions changelog.d/+incoming-protocol-extended.internal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Adjusted mirrord-protocol in preparation for passthrough mirroring.
10 changes: 5 additions & 5 deletions mirrord/agent/src/sniffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ mod test {

use api::TcpSnifferApi;
use mirrord_protocol::{
tcp::{DaemonTcp, LayerTcp, NewTcpConnection, TcpClose, TcpData},
tcp::{DaemonTcp, LayerTcp, NewTcpConnectionV1, TcpClose, TcpData},
ConnectionId, LogLevel,
};
use rstest::rstest;
Expand Down Expand Up @@ -525,7 +525,7 @@ mod test {
let (message, log) = api.recv().await.unwrap();
assert_eq!(
message,
DaemonTcp::NewConnection(NewTcpConnection {
DaemonTcp::NewConnectionV1(NewTcpConnectionV1 {
connection_id: 0,
remote_address: "1.1.1.1".parse().unwrap(),
destination_port: 80,
Expand Down Expand Up @@ -681,7 +681,7 @@ mod test {
let (message, log) = api.recv().await.unwrap();
assert_eq!(
message,
DaemonTcp::NewConnection(NewTcpConnection {
DaemonTcp::NewConnectionV1(NewTcpConnectionV1 {
connection_id: 0,
remote_address: session_id.source_addr.into(),
destination_port: session_id.dest_port,
Expand Down Expand Up @@ -784,7 +784,7 @@ mod test {
assert_eq!(log, None);
assert_eq!(
msg,
DaemonTcp::NewConnection(NewTcpConnection {
DaemonTcp::NewConnectionV1(NewTcpConnectionV1 {
connection_id: i as ConnectionId,
remote_address: source_addr.into(),
destination_port: 80,
Expand Down Expand Up @@ -817,7 +817,7 @@ mod test {
assert_eq!(log, None);
assert_eq!(
msg,
DaemonTcp::NewConnection(NewTcpConnection {
DaemonTcp::NewConnectionV1(NewTcpConnectionV1 {
connection_id: TcpSnifferApi::CONNECTION_CHANNEL_SIZE as ConnectionId,
remote_address: source_addr.into(),
destination_port: 80,
Expand Down
4 changes: 2 additions & 2 deletions mirrord/agent/src/sniffer/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::ops::RangeInclusive;

use futures::{stream::FuturesUnordered, StreamExt};
use mirrord_protocol::{
tcp::{DaemonTcp, LayerTcp, NewTcpConnection, TcpClose, TcpData},
tcp::{DaemonTcp, LayerTcp, NewTcpConnectionV1, TcpClose, TcpData},
ConnectionId, LogMessage, Port,
};
use tokio::sync::{
Expand Down Expand Up @@ -111,7 +111,7 @@ impl TcpSnifferApi {
self.connections.insert(id, StreamNotifyClose::new(BroadcastStream::new(conn.data)));

Ok((
DaemonTcp::NewConnection(NewTcpConnection {
DaemonTcp::NewConnectionV1(NewTcpConnectionV1 {
connection_id: id,
remote_address: conn.session_id.source_addr.into(),
local_address: conn.session_id.dest_addr.into(),
Expand Down
2 changes: 1 addition & 1 deletion mirrord/agent/src/sniffer/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub(crate) struct SnifferCommand {
/// New TCP connection picked up by [`TcpConnectionSniffer`](super::TcpConnectionSniffer).
pub(crate) struct SniffedConnection {
/// Parameters of this connection's TCP session.
/// Can be used to create [`NewTcpConnection`](mirrord_protocol::tcp::NewTcpConnection).
/// Can be used to create [`NewTcpConnectionV1`](mirrord_protocol::tcp::NewTcpConnectionV1).
pub session_id: TcpSessionDirectionId,
/// For receiving data from this connection.
pub data: broadcast::Receiver<Vec<u8>>,
Expand Down
14 changes: 7 additions & 7 deletions mirrord/agent/src/steal/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use mirrord_protocol::{
tcp::{
ChunkedRequest, ChunkedRequestBodyV1, ChunkedRequestErrorV1, ChunkedRequestErrorV2,
ChunkedRequestStartV1, ChunkedRequestStartV2, DaemonTcp, HttpRequest, HttpRequestMetadata,
HttpRequestTransportType, InternalHttpBody, InternalHttpBodyFrame, InternalHttpBodyNew,
IncomingTrafficTransportType, InternalHttpBody, InternalHttpBodyFrame, InternalHttpBodyNew,
InternalHttpRequest, StealType, TcpClose, TcpData, HTTP_CHUNKED_REQUEST_V2_VERSION,
HTTP_CHUNKED_REQUEST_VERSION, HTTP_FRAMED_VERSION,
},
Expand Down Expand Up @@ -49,7 +49,7 @@ struct MatchedHttpRequest {
request_id: RequestId,
request: Request<Incoming>,
metadata: HttpRequestMetadata,
transport: HttpRequestTransportType,
transport: IncomingTrafficTransportType,
}

impl MatchedHttpRequest {
Expand All @@ -58,7 +58,7 @@ impl MatchedHttpRequest {
request_id: RequestId,
request: Request<Incoming>,
metadata: HttpRequestMetadata,
transport: HttpRequestTransportType,
transport: IncomingTrafficTransportType,
) -> Self {
HTTP_REQUEST_IN_PROGRESS_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

Expand Down Expand Up @@ -517,7 +517,7 @@ impl TcpConnectionStealer {

let _ = client
.tx
.send(StealerMessage::TcpSteal(DaemonTcp::NewConnection(
.send(StealerMessage::TcpSteal(DaemonTcp::NewConnectionV1(
connection,
)))
.await;
Expand Down Expand Up @@ -785,7 +785,7 @@ mod test {
use hyper_util::rt::TokioIo;
use mirrord_protocol::tcp::{
ChunkedRequest, DaemonTcp, Filter, HttpFilter, HttpRequestMetadata,
HttpRequestTransportType, InternalHttpBodyFrame, StealType,
IncomingTrafficTransportType, InternalHttpBodyFrame, StealType,
};
use rstest::rstest;
use tokio::{
Expand Down Expand Up @@ -896,7 +896,7 @@ mod test {
source: "1.3.3.7:1337".parse().unwrap(),
destination: "2.1.3.7:80".parse().unwrap(),
},
transport: HttpRequestTransportType::Tcp,
transport: IncomingTrafficTransportType::Tcp,
request_id: 0,
request,
});
Expand Down Expand Up @@ -972,7 +972,7 @@ mod test {
source: "1.3.3.7:1337".parse().unwrap(),
destination: "2.1.3.7:80".parse().unwrap(),
},
transport: HttpRequestTransportType::Tcp,
transport: IncomingTrafficTransportType::Tcp,
request_id: 0,
request,
});
Expand Down
10 changes: 5 additions & 5 deletions mirrord/agent/src/steal/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{

use hyper::{body::Incoming, Request, Response};
use mirrord_protocol::{
tcp::{HttpRequestMetadata, HttpRequestTransportType, NewTcpConnection},
tcp::{HttpRequestMetadata, IncomingTrafficTransportType, NewTcpConnectionV1},
ConnectionId, LogMessage, RequestId,
};
use original_destination::OriginalDestination;
Expand Down Expand Up @@ -133,12 +133,12 @@ pub enum ConnectionMessageOut {
request: Request<Incoming>,
id: RequestId,
metadata: HttpRequestMetadata,
transport: HttpRequestTransportType,
transport: IncomingTrafficTransportType,
},
/// Subscribed the client to a new TCP connection.
///
/// This variant translates to
/// [`DaemonTcp::NewConnection`](mirrord_protocol::tcp::DaemonTcp::NewConnection).
/// [`DaemonTcp::NewConnectionV1`](mirrord_protocol::tcp::DaemonTcp::NewConnectionV1).
///
/// # Note
///
Expand All @@ -147,7 +147,7 @@ pub enum ConnectionMessageOut {
/// should follow.
SubscribedTcp {
client_id: ClientId,
connection: NewTcpConnection,
connection: NewTcpConnectionV1,
},
/// Subscribed the client to a new filtered HTTP connection.
///
Expand Down Expand Up @@ -489,7 +489,7 @@ impl ConnectionTask {
self.tx
.send(ConnectionMessageOut::SubscribedTcp {
client_id,
connection: NewTcpConnection {
connection: NewTcpConnectionV1 {
connection_id: self.connection_id,
remote_address: source.ip(),
source_port: source.port(),
Expand Down
26 changes: 13 additions & 13 deletions mirrord/agent/src/steal/connections/filtered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use hyper::{
use hyper_util::rt::{TokioExecutor, TokioIo};
use mirrord_protocol::{
tcp::{
HttpRequestMetadata, HttpRequestTransportType, HTTP_CHUNKED_REQUEST_V2_VERSION,
HttpRequestMetadata, IncomingTrafficTransportType, HTTP_CHUNKED_REQUEST_V2_VERSION,
HTTP_FILTERED_UPGRADE_VERSION,
},
ConnectionId, LogMessage, RequestId,
Expand Down Expand Up @@ -712,11 +712,11 @@ where
self.next_request_id += 1;

let transport = match self.original_destination.connector() {
Some(connector) => HttpRequestTransportType::Tls {
Some(connector) => IncomingTrafficTransportType::Tls {
alpn_protocol: connector.alpn_protocol().map(Vec::from),
server_name: connector.server_name().map(|name| name.to_str().into()),
},
None => HttpRequestTransportType::Tcp,
None => IncomingTrafficTransportType::Tcp,
};

tx.send(ConnectionMessageOut::Request {
Expand Down Expand Up @@ -1327,7 +1327,7 @@ mod test {
connection_id: TestSetup::CONNECTION_ID,
id,
metadata: HttpRequestMetadata::V1 { destination, .. },
transport: HttpRequestTransportType::Tcp,
transport: IncomingTrafficTransportType::Tcp,
..
} => {
assert_eq!(received_client_id, client_id);
Expand Down Expand Up @@ -1424,7 +1424,7 @@ mod test {
connection_id: TestSetup::CONNECTION_ID,
id,
metadata: HttpRequestMetadata::V1 { destination, .. },
transport: HttpRequestTransportType::Tcp,
transport: IncomingTrafficTransportType::Tcp,
..
} => {
assert_eq!(received_client_id, subscribed_client_id);
Expand Down Expand Up @@ -1511,7 +1511,7 @@ mod test {
connection_id: TestSetup::CONNECTION_ID,
id,
metadata: HttpRequestMetadata::V1 { destination, .. },
transport: HttpRequestTransportType::Tcp,
transport: IncomingTrafficTransportType::Tcp,
..
} => {
assert_eq!(destination, setup.original_address);
Expand Down Expand Up @@ -1551,7 +1551,7 @@ mod test {
connection_id: TestSetup::CONNECTION_ID,
id,
metadata: HttpRequestMetadata::V1 { destination, .. },
transport: HttpRequestTransportType::Tcp,
transport: IncomingTrafficTransportType::Tcp,
..
} => {
assert_eq!(destination, setup.original_address);
Expand Down Expand Up @@ -1617,7 +1617,7 @@ mod test {
connection_id: TestSetup::CONNECTION_ID,
id,
metadata: HttpRequestMetadata::V1 { destination, .. },
transport: HttpRequestTransportType::Tcp,
transport: IncomingTrafficTransportType::Tcp,
..
} => {
assert_eq!(destination, setup.original_address);
Expand Down Expand Up @@ -1702,7 +1702,7 @@ mod test {
connection_id: TestSetup::CONNECTION_ID,
id,
metadata: HttpRequestMetadata::V1 { destination, .. },
transport: HttpRequestTransportType::Tcp,
transport: IncomingTrafficTransportType::Tcp,
..
} => {
assert_eq!(destination, setup.original_address);
Expand Down Expand Up @@ -1755,7 +1755,7 @@ mod test {
id,
metadata: HttpRequestMetadata::V1 { destination, .. },
request,
transport: HttpRequestTransportType::Tcp,
transport: IncomingTrafficTransportType::Tcp,
..
} => {
assert_eq!(destination, setup.original_address);
Expand Down Expand Up @@ -1888,7 +1888,7 @@ mod test {
connection_id: TestSetup::CONNECTION_ID,
id,
metadata: HttpRequestMetadata::V1 { destination, .. },
transport: HttpRequestTransportType::Tcp,
transport: IncomingTrafficTransportType::Tcp,
..
} => {
assert_eq!(destination, setup.original_address);
Expand Down Expand Up @@ -2136,7 +2136,7 @@ mod test {
connection_id: 0,
id: 0,
transport:
HttpRequestTransportType::Tls {
IncomingTrafficTransportType::Tls {
alpn_protocol,
server_name,
},
Expand Down Expand Up @@ -2221,7 +2221,7 @@ mod test {
connection_id: TestSetup::CONNECTION_ID,
id,
metadata: HttpRequestMetadata::V1 { destination, .. },
transport: HttpRequestTransportType::Tcp,
transport: IncomingTrafficTransportType::Tcp,
..
} => {
assert_eq!(received_client_id, 0);
Expand Down
18 changes: 9 additions & 9 deletions mirrord/cli/src/port_forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1039,7 +1039,7 @@ mod test {
},
tcp::{
DaemonTcp, Filter, HttpRequest, HttpResponse, InternalHttpBody, InternalHttpBodyFrame,
InternalHttpRequest, InternalHttpResponse, LayerTcp, LayerTcpSteal, NewTcpConnection,
InternalHttpRequest, InternalHttpResponse, LayerTcp, LayerTcpSteal, NewTcpConnectionV1,
StealType, TcpClose, TcpData,
},
ClientMessage, DaemonMessage,
Expand Down Expand Up @@ -1354,8 +1354,8 @@ mod test {

// send new connection from agent and some data
test_connection
.send(DaemonMessage::Tcp(DaemonTcp::NewConnection(
NewTcpConnection {
.send(DaemonMessage::Tcp(DaemonTcp::NewConnectionV1(
NewTcpConnectionV1 {
connection_id: 1,
remote_address,
destination_port,
Expand Down Expand Up @@ -1429,8 +1429,8 @@ mod test {

// send new connection from agent and some data
test_connection
.send(DaemonMessage::TcpSteal(DaemonTcp::NewConnection(
NewTcpConnection {
.send(DaemonMessage::TcpSteal(DaemonTcp::NewConnectionV1(
NewTcpConnectionV1 {
connection_id: 1,
remote_address,
destination_port,
Expand Down Expand Up @@ -1526,8 +1526,8 @@ mod test {

// send new connections from agent and some data
test_connection
.send(DaemonMessage::Tcp(DaemonTcp::NewConnection(
NewTcpConnection {
.send(DaemonMessage::Tcp(DaemonTcp::NewConnectionV1(
NewTcpConnectionV1 {
connection_id: 1,
remote_address,
destination_port: destination_port_1,
Expand All @@ -1539,8 +1539,8 @@ mod test {
let mut stream_1 = listener_1.accept().await.unwrap().0;

test_connection
.send(DaemonMessage::Tcp(DaemonTcp::NewConnection(
NewTcpConnection {
.send(DaemonMessage::Tcp(DaemonTcp::NewConnectionV1(
NewTcpConnectionV1 {
connection_id: 2,
remote_address,
destination_port: destination_port_2,
Expand Down
Loading
Loading