Skip to content

Commit e0c440a

Browse files
authored
Improved listen_ports integration test (#3306)
1 parent 801994a commit e0c440a

File tree

4 files changed

+101
-70
lines changed

4 files changed

+101
-70
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Improved `listen_ports` integration test to reduce flakes.
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,33 @@
1+
//! Test application that listens on a sequence of localhost ports.
2+
//! On each port, it accepts one connection and expects to receive the string "HELLO" and a
3+
//! shutdown.
4+
//!
5+
//! Reads the semicolon-separated port list from the `APP_PORTS` environment variable.
6+
17
use std::{
28
io::Read,
3-
net::{SocketAddr, TcpListener},
9+
net::{Ipv4Addr, SocketAddr, TcpListener},
410
};
511

12+
const PORTS_ENV: &str = "APP_PORTS";
13+
614
fn main() {
7-
let mut buf = [0_u8; 5];
15+
let ports = std::env::var(PORTS_ENV)
16+
.unwrap()
17+
.split(';')
18+
.map(|port| port.parse::<u16>().unwrap())
19+
.collect::<Vec<_>>();
20+
21+
let mut buf = String::new();
822

9-
let addr: SocketAddr = "127.0.0.1:80".parse().unwrap();
10-
// Check `listen_ports` work
11-
let listener = TcpListener::bind(addr).unwrap();
12-
// verify user could connect on port 51222
13-
let (mut conn, _) = listener.accept().unwrap();
14-
conn.read_exact(buf.as_mut_slice()).unwrap();
15-
assert_eq!(buf.as_slice(), b"HELLO");
23+
for port in ports {
24+
let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), port);
25+
let listener = TcpListener::bind(addr).unwrap();
1626

17-
// Check specific port but not in listen_ports works
18-
let addr: SocketAddr = "127.0.0.1:40000".parse().unwrap();
19-
let listener = TcpListener::bind(addr).unwrap();
20-
// verify user could connect on port 40000
21-
let (mut conn, _) = listener.accept().unwrap();
22-
conn.read_exact(buf.as_mut_slice()).unwrap();
23-
assert_eq!(buf.as_slice(), b"HELLO");
27+
let (mut conn, _) = listener.accept().unwrap();
28+
conn.read_to_string(&mut buf).unwrap();
29+
assert_eq!(buf, "HELLO");
2430

25-
// Checking that random port works is hard because we need to make libc return an error
26-
// so we miss that for now.
31+
buf.clear();
32+
}
2733
}

mirrord/layer/tests/configs/listen_ports.json

-10
This file was deleted.

mirrord/layer/tests/listen_ports.rs

+76-42
Original file line numberDiff line numberDiff line change
@@ -2,75 +2,109 @@
22
#![feature(assert_matches)]
33
#![warn(clippy::indexing_slicing)]
44

5-
use std::{assert_matches::assert_matches, path::Path, time::Duration};
5+
use std::{
6+
assert_matches::assert_matches,
7+
io::Write,
8+
net::{Ipv4Addr, SocketAddr},
9+
path::Path,
10+
time::Duration,
11+
};
612

713
use mirrord_protocol::{
814
tcp::{DaemonTcp, LayerTcpSteal, StealType},
915
ClientMessage, DaemonMessage,
1016
};
1117
use rstest::rstest;
12-
use tokio::io::AsyncWriteExt;
18+
use tokio::{io::AsyncWriteExt, net::TcpListener};
1319

1420
mod common;
1521

1622
pub use common::*;
1723
use tokio::net::TcpStream;
1824

19-
/// Start an application (and load the layer into it) that listens on a port that is configured to
20-
/// be ignored, and verify that no messages are sent to the agent.
25+
/// Verifies that the layer respects `feature.network.incoming.listen_ports` mapping.
2126
#[rstest]
2227
#[tokio::test]
2328
#[timeout(Duration::from_secs(60))]
2429
async fn listen_ports(
2530
#[values(Application::RustListenPorts)] application: Application,
2631
dylib_path: &Path,
27-
config_dir: &Path,
2832
) {
29-
let config_path = config_dir.join("listen_ports.json");
33+
// We need to know ports on which the application listens,
34+
// because we want to make the connections from the test code
35+
// (to verify that the layer respects `feature.network.incoming.listen_ports` mapping).
36+
// If we just use const ports, this test is flaky in the CI, as the app tends to fail with
37+
// EADDRINUSE. Here we bind to random ports, drop the sockets, and hope that the ports
38+
// remain free for the app to use.
39+
let port_1 = TcpListener::bind(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0))
40+
.await
41+
.unwrap()
42+
.local_addr()
43+
.unwrap()
44+
.port();
45+
let port_2 = TcpListener::bind(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0))
46+
.await
47+
.unwrap()
48+
.local_addr()
49+
.unwrap()
50+
.port();
51+
52+
let config = serde_json::json!({
53+
"feature": {
54+
"network": {
55+
"incoming": {
56+
"mode": "steal",
57+
"listen_ports": [[80, port_1]]
58+
}
59+
},
60+
"fs": "local"
61+
}
62+
});
63+
// Ports for the application code to use.
64+
// Due to the port mapping, the application should effectively listen on `port_1;port_2`.
65+
let app_ports = format!("80;{port_2}");
66+
67+
let mut config_file = tempfile::NamedTempFile::with_suffix(".json").unwrap();
68+
config_file
69+
.as_file_mut()
70+
.write_all(serde_json::to_string(&config).unwrap().as_bytes())
71+
.unwrap();
72+
3073
let (mut test_process, mut intproxy) = application
3174
.start_process_with_layer(
3275
dylib_path,
33-
vec![
34-
("RUST_LOG", "mirrord=trace"),
35-
("MIRRORD_FILE_MODE", "local"),
36-
],
37-
Some(&config_path),
76+
vec![("RUST_LOG", "mirrord=trace"), ("APP_PORTS", &app_ports)],
77+
Some(config_file.path()),
3878
)
3979
.await;
4080

41-
assert_matches!(
42-
intproxy.recv().await,
43-
ClientMessage::TcpSteal(LayerTcpSteal::PortSubscribe(StealType::All(80)))
44-
);
45-
intproxy
46-
.send(DaemonMessage::TcpSteal(DaemonTcp::SubscribeResult(Ok(80))))
47-
.await;
48-
let mut stream = TcpStream::connect("127.0.0.1:51222").await.unwrap();
49-
println!("connected to listener at port 51222");
50-
stream.write_all(b"HELLO").await.unwrap();
51-
52-
assert_matches!(
53-
intproxy.recv().await,
54-
ClientMessage::TcpSteal(LayerTcpSteal::PortSubscribe(StealType::All(40000)))
55-
);
56-
intproxy
57-
.send(DaemonMessage::TcpSteal(DaemonTcp::SubscribeResult(Ok(
58-
40000,
59-
))))
60-
.await;
61-
let mut stream = TcpStream::connect("127.0.0.1:40000").await.unwrap();
62-
println!("connected to listener at port 40000");
63-
stream.write_all(b"HELLO").await.unwrap();
64-
65-
loop {
66-
match intproxy.try_recv().await {
67-
Some(ClientMessage::TcpSteal(LayerTcpSteal::PortUnsubscribe(40000))) => {}
68-
Some(ClientMessage::TcpSteal(LayerTcpSteal::PortUnsubscribe(80))) => {}
69-
None => break,
70-
other => panic!("unexpected message: {:?}", other),
71-
}
81+
for (subscription_port, local_port) in [(80, port_1), (port_2, port_2)] {
82+
assert_matches!(
83+
intproxy.recv().await,
84+
ClientMessage::TcpSteal(LayerTcpSteal::PortSubscribe(StealType::All(port)))
85+
if port == subscription_port,
86+
);
87+
intproxy
88+
.send(DaemonMessage::TcpSteal(DaemonTcp::SubscribeResult(Ok(
89+
subscription_port,
90+
))))
91+
.await;
92+
let mut stream =
93+
TcpStream::connect(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), local_port))
94+
.await
95+
.unwrap();
96+
println!("connected to the application on port {local_port} (application code used {subscription_port})");
97+
stream.write_all(b"HELLO").await.unwrap();
98+
stream.shutdown().await.unwrap();
99+
assert_matches!(
100+
intproxy.recv().await,
101+
ClientMessage::TcpSteal(LayerTcpSteal::PortUnsubscribe(port))
102+
if port == subscription_port,
103+
);
72104
}
105+
73106
assert_eq!(intproxy.try_recv().await, None);
107+
74108
test_process.wait_assert_success().await;
75109
test_process.assert_no_error_in_stderr().await;
76110
test_process.assert_no_error_in_stdout().await;

0 commit comments

Comments
 (0)