Skip to content

Commit 5cd2414

Browse files
Mikolaj Maleckimaxsharabayko
authored andcommitted
[core] Fix stalled connection that should break after rogue NAK/ACK reception
1 parent a6b9959 commit 5cd2414

File tree

2 files changed

+194
-18
lines changed

2 files changed

+194
-18
lines changed

scripts/srt-proxy.py

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
#!/usr/bin/python3
2+
3+
# This script is a testing tool that inserts a packet interceptor for
4+
# a bond between two local ports and modify a packet that is sent over
5+
# the UDP link used by the SRT connection.
6+
7+
# Provided by:
8+
# https://github.com/FelixSodermanNeti
9+
10+
import socket, select, argparse
11+
12+
class UDPProxy:
13+
def __init__(self, caller_host, caller_port, listener_host, listener_port, break_at_pkt_NAK, break_at_pkt_ACK):
14+
15+
# Listening socket configuration
16+
self.caller_host = caller_host
17+
self.caller_port = caller_port
18+
self.caller_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
19+
self.caller_socket.bind((self.caller_host, self.caller_port))
20+
21+
# Forwarding destination configuration
22+
self.listener_host = listener_host
23+
self.listener_port = listener_port
24+
25+
# Client address tracking
26+
self.client_address = None
27+
28+
# Proxy state
29+
self.running = False
30+
31+
# Break SRT
32+
self.sentPacketCounter = 0
33+
self.recivedAckCounter = 0
34+
self.PACKETBREAK_NAK = break_at_pkt_NAK
35+
self.PACKETBREAK_ACK = break_at_pkt_ACK
36+
37+
def start(self):
38+
try:
39+
self.running = True
40+
print(
41+
f"UDP Proxy started. Listening on {self.caller_host}:{self.caller_port}. "
42+
f"Forwarding to {self.listener_host}:{self.listener_port}"
43+
)
44+
45+
# Create forward socket
46+
listener_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
47+
48+
# Use select for non-blocking I/O
49+
while self.running:
50+
readable, _, _ = select.select([self.caller_socket, listener_socket], [], [], 1)
51+
52+
for sock in readable:
53+
if sock == self.caller_socket:
54+
# Receive from client
55+
data, addr = sock.recvfrom(65535)
56+
self.client_address = addr
57+
58+
#Modify the NAK
59+
processedData = data
60+
# NAK
61+
if(data[1] == 0x03 and len(data) == 24 and self.PACKETBREAK_NAK != 0): # IS A NAK, This should check for a 1 in the first byte aswell.
62+
print("Modifying NAK.")
63+
newBytes = b"\xff\xff\xff\xff"
64+
processedData = data[:20] + newBytes + data[20 + len(newBytes):]
65+
"""
66+
print("NAK BEFORE:")
67+
print(string_bytearray_in_rows(data=data))
68+
print("\n\nModifying NAK.\n\n")
69+
print("ŃAK AFTER:")
70+
print(string_bytearray_in_rows(data=processedData))
71+
"""
72+
73+
# ACK
74+
if(data[1] == 0x02 and len(data) == 44): # IS A ACK
75+
self.recivedAckCounter += 1
76+
if (self.PACKETBREAK_ACK != 0 and self.recivedAckCounter == self.PACKETBREAK_ACK):
77+
print("Modifying ACK.")
78+
newBytes = b"\x7f\xff\xff\xff"
79+
offset = 16
80+
processedData = data[:offset] + newBytes + data[offset + len(newBytes):]
81+
"""
82+
print("ACK BEFORE:")
83+
print(string_bytearray_in_rows(data=data))
84+
print("\n\nModifying ACK.\n\n")
85+
print("ACK AFTER:")
86+
print(string_bytearray_in_rows(data=processedData))
87+
"""
88+
listener_socket.sendto(
89+
processedData,
90+
(self.listener_host, self.listener_port)
91+
)
92+
93+
elif sock == listener_socket:
94+
# Receive from server
95+
data, addr = sock.recvfrom(65535)
96+
97+
# Drop a sequence of data packets (as to atleast 3 to trigger an NAK with a range)
98+
if len(data) == 1332: # Packet with data
99+
self.sentPacketCounter += 1
100+
if (self.PACKETBREAK_NAK != 0 and self.sentPacketCounter in range(self.PACKETBREAK_NAK, self.PACKETBREAK_NAK+5)):
101+
print("Discarding packet.")
102+
continue
103+
104+
if not self.client_address:
105+
print("No client to send data to!")
106+
continue
107+
108+
self.caller_socket.sendto(
109+
data,
110+
self.client_address
111+
)
112+
113+
except Exception as e:
114+
print(f"Proxy error: {e}")
115+
116+
finally:
117+
self.stop()
118+
119+
def stop(self):
120+
self.running = False
121+
self.caller_socket.close()
122+
print("UDP Proxy stopped.")
123+
124+
125+
def string_bytearray_in_rows(data, bytes_per_row=4):
126+
tmpString = "\n"
127+
for i in range(0, len(data), bytes_per_row):
128+
row = data[i:i+bytes_per_row]
129+
tmpString += f"{i}: "
130+
tmpString += (" ".join(f"{byte:02x}" for byte in row))
131+
tmpString += "\n"
132+
tmpString += "\n"
133+
return tmpString
134+
135+
def main():
136+
# Parse command-line arguments
137+
parser = argparse.ArgumentParser(description='UDP Proxy')
138+
parser.add_argument('--caller-host', default='127.0.0.1',
139+
help='Host to listen on (srt caller) (default: 127.0.0.1)')
140+
parser.add_argument('--caller-port', type=int, required=True,
141+
help='Port to listen on (srt caller)')
142+
parser.add_argument('--listener-host', default='127.0.0.1',
143+
help='Destination host to forward packets (srt listener)')
144+
parser.add_argument('--listener-port', type=int, required=True,
145+
help='Destination port to forward packets (srt listener)')
146+
parser.add_argument('--break-at-pkt-NAK', type=int, default=0,
147+
help='At what datapacket should the NAK -1 be sent (0=never)')
148+
parser.add_argument('--break-at-pkt-ACK', type=int, default=0,
149+
help='At what datapacket should the ACK -1 be sent (0=never)')
150+
args = parser.parse_args()
151+
152+
# Create and start proxy
153+
proxy = UDPProxy(
154+
args.caller_host,
155+
args.caller_port,
156+
args.listener_host,
157+
args.listener_port,
158+
args.break_at_pkt_NAK,
159+
args.break_at_pkt_ACK
160+
)
161+
162+
try:
163+
proxy.start()
164+
except KeyboardInterrupt:
165+
proxy.stop()
166+
167+
if __name__ == '__main__':
168+
main()

srtcore/core.cpp

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8483,39 +8483,44 @@ void srt::CUDT::processCtrlAck(const CPacket &ctrlpkt, const steady_clock::time_
84838483

84848484
// Protect packet retransmission
84858485
{
8486-
ScopedLock ack_lock(m_RecvAckLock);
8486+
UniqueLock ack_lock(m_RecvAckLock);
84878487

84888488
// Check the validation of the ack
84898489
if (CSeqNo::seqcmp(ackdata_seqno, CSeqNo::incseq(m_iSndCurrSeqNo)) > 0)
84908490
{
8491+
ack_lock.unlock();
8492+
84918493
// this should not happen: attack or bug
84928494
LOGC(gglog.Error,
84938495
log << CONID() << "ATTACK/IPE: incoming ack seq " << ackdata_seqno << " exceeds current "
84948496
<< m_iSndCurrSeqNo << " by " << (CSeqNo::seqoff(m_iSndCurrSeqNo, ackdata_seqno) - 1) << "!");
84958497
m_bBroken = true;
84968498
m_iBrokenCounter = 0;
8499+
8500+
updateBrokenConnection();
8501+
completeBrokenConnectionDependencies(SRT_ESECFAIL); // LOCKS!
84978502
return;
84988503
}
84998504

8500-
if (CSeqNo::seqcmp(ackdata_seqno, m_iSndLastAck) >= 0)
8501-
{
8502-
const int cwnd1 = std::min<int>(m_iFlowWindowSize, m_iCongestionWindow);
8503-
const bool bWasStuck = cwnd1<= getFlightSpan();
8504-
// Update Flow Window Size, must update before and together with m_iSndLastAck
8505-
m_iFlowWindowSize = ackdata[ACKD_BUFFERLEFT];
8506-
m_iSndLastAck = ackdata_seqno;
8507-
m_tsLastRspAckTime = currtime;
8508-
m_iReXmitCount = 1; // Reset re-transmit count since last ACK
8509-
8510-
const int cwnd = std::min<int>(m_iFlowWindowSize, m_iCongestionWindow);
8511-
if (bWasStuck && cwnd > getFlightSpan())
8505+
if (CSeqNo::seqcmp(ackdata_seqno, m_iSndLastAck) >= 0)
85128506
{
8513-
m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE);
8514-
HLOGC(gglog.Debug,
8515-
log << CONID() << "processCtrlAck: could reschedule SND. iFlowWindowSize " << m_iFlowWindowSize
8516-
<< " SPAN " << getFlightSpan() << " ackdataseqno %" << ackdata_seqno);
8507+
const int cwnd1 = std::min<int>(m_iFlowWindowSize, m_iCongestionWindow);
8508+
const bool bWasStuck = cwnd1<= getFlightSpan();
8509+
// Update Flow Window Size, must update before and together with m_iSndLastAck
8510+
m_iFlowWindowSize = ackdata[ACKD_BUFFERLEFT];
8511+
m_iSndLastAck = ackdata_seqno;
8512+
m_tsLastRspAckTime = currtime;
8513+
m_iReXmitCount = 1; // Reset re-transmit count since last ACK
8514+
8515+
const int cwnd = std::min<int>(m_iFlowWindowSize, m_iCongestionWindow);
8516+
if (bWasStuck && cwnd > getFlightSpan())
8517+
{
8518+
m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE);
8519+
HLOGC(gglog.Debug,
8520+
log << CONID() << "processCtrlAck: could reschedule SND. iFlowWindowSize " << m_iFlowWindowSize
8521+
<< " SPAN " << getFlightSpan() << " ackdataseqno %" << ackdata_seqno);
8522+
}
85178523
}
8518-
}
85198524

85208525
/*
85218526
* We must not ignore full ack received by peer
@@ -8924,6 +8929,9 @@ void srt::CUDT::processCtrlLossReport(const CPacket& ctrlpkt)
89248929
// this should not happen: attack or bug
89258930
m_bBroken = true;
89268931
m_iBrokenCounter = 0;
8932+
8933+
updateBrokenConnection();
8934+
completeBrokenConnectionDependencies(SRT_ESECFAIL); // LOCKS!
89278935
return;
89288936
}
89298937

0 commit comments

Comments
 (0)