Skip to content

Commit 14e3f9d

Browse files
committed
fixed #1242 - added streamlocal-forward support (remote UNIX socket forwarding)
1 parent d6b9864 commit 14e3f9d

File tree

6 files changed

+234
-37
lines changed

6 files changed

+234
-37
lines changed

warpgate-core/src/recordings/traffic.rs

+44-21
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,16 @@ pub struct TrafficRecorder {
1717
}
1818

1919
#[derive(Debug)]
20-
pub struct TrafficConnectionParams {
21-
pub src_addr: Ipv4Addr,
22-
pub src_port: u16,
23-
pub dst_addr: Ipv4Addr,
24-
pub dst_port: u16,
20+
pub enum TrafficConnectionParams {
21+
Tcp {
22+
src_addr: Ipv4Addr,
23+
src_port: u16,
24+
dst_addr: Ipv4Addr,
25+
dst_port: u16,
26+
},
27+
Socket {
28+
socket_path: String,
29+
},
2530
}
2631

2732
impl TrafficRecorder {
@@ -136,41 +141,59 @@ impl ConnectionRecorder {
136141
where
137142
F: FnOnce(packet::ip::v4::Builder) -> Result<Bytes>,
138143
{
139-
f(packet::ip::v4::Builder::default()
140-
.protocol(packet::ip::Protocol::Tcp)?
141-
.source(self.params.src_addr)?
142-
.destination(self.params.dst_addr)?)
144+
match self.params {
145+
TrafficConnectionParams::Socket { .. } => f(packet::ip::v4::Builder::default()
146+
.protocol(packet::ip::Protocol::Tcp)?
147+
.source(Ipv4Addr::UNSPECIFIED)?
148+
.destination(Ipv4Addr::BROADCAST)?),
149+
TrafficConnectionParams::Tcp {
150+
src_addr, dst_addr, ..
151+
} => f(packet::ip::v4::Builder::default()
152+
.protocol(packet::ip::Protocol::Tcp)?
153+
.source(src_addr)?
154+
.destination(dst_addr)?),
155+
}
143156
}
144157

145158
fn ip_packet_rx<F>(&self, f: F) -> Result<Bytes>
146159
where
147160
F: FnOnce(packet::ip::v4::Builder) -> Result<Bytes>,
148161
{
149-
f(packet::ip::v4::Builder::default()
150-
.protocol(packet::ip::Protocol::Tcp)?
151-
.source(self.params.dst_addr)?
152-
.destination(self.params.src_addr)?)
162+
match self.params {
163+
TrafficConnectionParams::Socket { .. } => f(packet::ip::v4::Builder::default()
164+
.protocol(packet::ip::Protocol::Tcp)?
165+
.source(Ipv4Addr::BROADCAST)?
166+
.destination(Ipv4Addr::UNSPECIFIED)?),
167+
TrafficConnectionParams::Tcp {
168+
src_addr, dst_addr, ..
169+
} => f(packet::ip::v4::Builder::default()
170+
.protocol(packet::ip::Protocol::Tcp)?
171+
.source(dst_addr)?
172+
.destination(src_addr)?),
173+
}
153174
}
154175

155176
fn tcp_packet_tx<F>(&self, f: F) -> Result<Bytes>
156177
where
157178
F: FnOnce(packet::tcp::Builder) -> Result<Bytes>,
158179
{
159-
self.ip_packet_tx(|b| {
160-
f(b.tcp()?
161-
.source(self.params.src_port)?
162-
.destination(self.params.dst_port)?)
180+
self.ip_packet_tx(|b| match self.params {
181+
TrafficConnectionParams::Socket { .. } => f(b.tcp()?.source(0)?.destination(0)?),
182+
TrafficConnectionParams::Tcp {
183+
src_port, dst_port, ..
184+
} => f(b.tcp()?.source(src_port)?.destination(dst_port)?),
163185
})
164186
}
165187

166188
fn tcp_packet_rx<F>(&self, f: F) -> Result<Bytes>
167189
where
168190
F: FnOnce(packet::tcp::Builder) -> Result<Bytes>,
169191
{
170-
self.ip_packet_rx(|b| {
171-
f(b.tcp()?
172-
.source(self.params.dst_port)?
173-
.destination(self.params.src_port)?)
192+
self.ip_packet_rx(|b| match self.params {
193+
TrafficConnectionParams::Socket { .. } => f(b.tcp()?.source(0)?.destination(0)?),
194+
TrafficConnectionParams::Tcp {
195+
src_port, dst_port, ..
196+
} => f(b.tcp()?.source(dst_port)?.destination(src_port)?),
174197
})
175198
}
176199

warpgate-protocol-ssh/src/client/handler.rs

+16-1
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,14 @@ use warpgate_common::{SessionId, TargetSSHOptions};
88
use warpgate_core::Services;
99

1010
use crate::known_hosts::{KnownHostValidationResult, KnownHosts};
11-
use crate::{ConnectionError, ForwardedTcpIpParams};
11+
use crate::{ConnectionError, ForwardedStreamlocalParams, ForwardedTcpIpParams};
1212

1313
#[derive(Debug)]
1414
pub enum ClientHandlerEvent {
1515
HostKeyReceived(PublicKey),
1616
HostKeyUnknown(PublicKey, oneshot::Sender<bool>),
1717
ForwardedTcpIp(Channel<Msg>, ForwardedTcpIpParams),
18+
ForwardedStreamlocal(Channel<Msg>, ForwardedStreamlocalParams),
1819
X11(Channel<Msg>, String, u32),
1920
Disconnect,
2021
}
@@ -146,6 +147,20 @@ impl russh::client::Handler for ClientHandler {
146147
));
147148
Ok(())
148149
}
150+
151+
async fn server_channel_open_forwarded_streamlocal(
152+
&mut self,
153+
channel: Channel<Msg>,
154+
socket_path: &str,
155+
_session: &mut Session,
156+
) -> Result<(), Self::Error> {
157+
let socket_path = socket_path.to_string();
158+
let _ = self.event_tx.send(ClientHandlerEvent::ForwardedStreamlocal(
159+
channel,
160+
ForwardedStreamlocalParams { socket_path },
161+
));
162+
Ok(())
163+
}
149164
}
150165

151166
impl Drop for ClientHandler {

warpgate-protocol-ssh/src/client/mod.rs

+50-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use warpgate_core::Services;
2929
use self::handler::ClientHandlerEvent;
3030
use super::{ChannelOperation, DirectTCPIPParams};
3131
use crate::client::handler::ClientHandlerError;
32-
use crate::{load_all_usable_private_keys, ForwardedTcpIpParams};
32+
use crate::{load_all_usable_private_keys, ForwardedStreamlocalParams, ForwardedTcpIpParams};
3333

3434
#[derive(Debug, thiserror::Error)]
3535
pub enum ConnectionError {
@@ -91,6 +91,7 @@ pub enum RCEvent {
9191
HostKeyReceived(PublicKey),
9292
HostKeyUnknown(PublicKey, oneshot::Sender<bool>),
9393
ForwardedTcpIp(Uuid, ForwardedTcpIpParams),
94+
ForwardedStreamlocal(Uuid, ForwardedStreamlocalParams),
9495
X11(Uuid, String, u32),
9596
}
9697

@@ -102,6 +103,8 @@ pub enum RCCommand {
102103
Channel(Uuid, ChannelOperation),
103104
ForwardTCPIP(String, u32),
104105
CancelTCPIPForward(String, u32),
106+
StreamlocalForward(String),
107+
CancelStreamlocalForward(String),
105108
Disconnect,
106109
}
107110

@@ -126,6 +129,7 @@ pub struct RemoteClient {
126129
channel_pipes: Arc<Mutex<HashMap<Uuid, UnboundedSender<ChannelOperation>>>>,
127130
pending_ops: Vec<(Uuid, ChannelOperation)>,
128131
pending_forwards: Vec<(String, u32)>,
132+
pending_streamlocal_forwards: Vec<String>,
129133
state: RCState,
130134
abort_rx: UnboundedReceiver<()>,
131135
inner_event_rx: UnboundedReceiver<InnerEvent>,
@@ -155,6 +159,7 @@ impl RemoteClient {
155159
channel_pipes: Arc::new(Mutex::new(HashMap::new())),
156160
pending_ops: vec![],
157161
pending_forwards: vec![],
162+
pending_streamlocal_forwards: vec![],
158163
state: RCState::NotInitialized,
159164
inner_event_rx,
160165
inner_event_tx: inner_event_tx.clone(),
@@ -309,6 +314,11 @@ impl RemoteClient {
309314
let id = self.setup_server_initiated_channel(channel).await?;
310315
let _ = self.tx.send(RCEvent::ForwardedTcpIp(id, params));
311316
}
317+
ClientHandlerEvent::ForwardedStreamlocal(channel, params) => {
318+
info!("New forwarded socket connection: {params:?}");
319+
let id = self.setup_server_initiated_channel(channel).await?;
320+
let _ = self.tx.send(RCEvent::ForwardedStreamlocal(id, params));
321+
}
312322
ClientHandlerEvent::X11(channel, originator_address, originator_port) => {
313323
info!("New X11 connection from {originator_address}:{originator_port:?}");
314324
let id = self.setup_server_initiated_channel(channel).await?;
@@ -355,10 +365,19 @@ impl RemoteClient {
355365
for (id, op) in ops {
356366
self.apply_channel_op(id, op).await?;
357367
}
368+
358369
let forwards = self.pending_forwards.drain(..).collect::<Vec<_>>();
359370
for (address, port) in forwards {
360371
self.tcpip_forward(address, port).await?;
361372
}
373+
374+
let forwards = self
375+
.pending_streamlocal_forwards
376+
.drain(..)
377+
.collect::<Vec<_>>();
378+
for socket_path in forwards {
379+
self.streamlocal_forward(socket_path).await?;
380+
}
362381
}
363382
Err(e) => {
364383
debug!("Connect error: {}", e);
@@ -376,6 +395,12 @@ impl RemoteClient {
376395
RCCommand::CancelTCPIPForward(address, port) => {
377396
self.cancel_tcpip_forward(address, port).await?;
378397
}
398+
RCCommand::StreamlocalForward(socket_path) => {
399+
self.streamlocal_forward(socket_path).await?;
400+
}
401+
RCCommand::CancelStreamlocalForward(socket_path) => {
402+
self.cancel_streamlocal_forward(socket_path).await?;
403+
}
379404
RCCommand::Disconnect => {
380405
self.disconnect().await;
381406
return Ok(true);
@@ -635,6 +660,30 @@ impl RemoteClient {
635660
Ok(())
636661
}
637662

663+
async fn streamlocal_forward(&mut self, socket_path: String) -> Result<(), SshClientError> {
664+
if let Some(session) = &self.session {
665+
let mut session = session.lock().await;
666+
session.streamlocal_forward(socket_path).await?;
667+
} else {
668+
self.pending_streamlocal_forwards.push(socket_path);
669+
}
670+
Ok(())
671+
}
672+
673+
async fn cancel_streamlocal_forward(
674+
&mut self,
675+
socket_path: String,
676+
) -> Result<(), SshClientError> {
677+
if let Some(session) = &self.session {
678+
let session = session.lock().await;
679+
session.cancel_streamlocal_forward(socket_path).await?;
680+
} else {
681+
self.pending_streamlocal_forwards
682+
.retain(|x| x != &socket_path);
683+
}
684+
Ok(())
685+
}
686+
638687
async fn disconnect(&mut self) {
639688
if let Some(session) = &mut self.session {
640689
let _ = session

warpgate-protocol-ssh/src/common.rs

+5
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ pub struct ForwardedTcpIpParams {
3838
pub originator_port: u32,
3939
}
4040

41+
#[derive(Clone, Debug)]
42+
pub struct ForwardedStreamlocalParams {
43+
pub socket_path: String,
44+
}
45+
4146
#[derive(Clone, Debug)]
4247
pub struct X11Request {
4348
pub single_conection: bool,

warpgate-protocol-ssh/src/server/russh_handler.rs

+39
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ pub enum ServerHandlerEvent {
4747
X11Request(ServerChannelId, X11Request, oneshot::Sender<()>),
4848
TcpIpForward(String, u32, oneshot::Sender<bool>),
4949
CancelTcpIpForward(String, u32, oneshot::Sender<bool>),
50+
StreamlocalForward(String, oneshot::Sender<bool>),
51+
CancelStreamlocalForward(String, oneshot::Sender<bool>),
5052
Disconnect,
5153
}
5254

@@ -477,6 +479,43 @@ impl russh::server::Handler for ServerHandler {
477479
}
478480
Ok(allowed)
479481
}
482+
483+
async fn streamlocal_forward(
484+
&mut self,
485+
socket_path: &str,
486+
session: &mut Session,
487+
) -> Result<bool, Self::Error> {
488+
let socket_path = socket_path.to_string();
489+
let (tx, rx) = oneshot::channel();
490+
self.send_event(ServerHandlerEvent::StreamlocalForward(socket_path, tx))?;
491+
let allowed = rx.await.unwrap_or(false);
492+
if allowed {
493+
session.request_success()
494+
} else {
495+
session.request_failure()
496+
}
497+
Ok(allowed)
498+
}
499+
500+
async fn cancel_streamlocal_forward(
501+
&mut self,
502+
socket_path: &str,
503+
session: &mut Session,
504+
) -> Result<bool, Self::Error> {
505+
let socket_path = socket_path.to_string();
506+
let (tx, rx) = oneshot::channel();
507+
self.send_event(ServerHandlerEvent::CancelStreamlocalForward(
508+
socket_path,
509+
tx,
510+
))?;
511+
let allowed = rx.await.unwrap_or(false);
512+
if allowed {
513+
session.request_success()
514+
} else {
515+
session.request_failure()
516+
}
517+
Ok(allowed)
518+
}
480519
}
481520

482521
impl Drop for ServerHandler {

0 commit comments

Comments
 (0)