Skip to content

Commit 924bf87

Browse files
committed
gateware.usb2.endpoints: add tests for isochronous endpoint streams
1 parent cef1921 commit 924bf87

File tree

1 file changed

+226
-0
lines changed

1 file changed

+226
-0
lines changed

tests/test_usb2_endpoints.py

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
#
2+
# This file is part of LUNA.
3+
#
4+
# Copyright (c) 2025 Great Scott Gadgets <[email protected]>
5+
# SPDX-License-Identifier: BSD-3-Clause
6+
7+
from luna.gateware.test import (
8+
LunaUSBGatewareTestCase,
9+
LunaSSGatewareTestCase,
10+
ss_domain_test_case,
11+
usb_domain_test_case,
12+
)
13+
14+
from luna.usb2 import (
15+
USBIsochronousStreamInEndpoint,
16+
USBIsochronousStreamOutEndpoint,
17+
)
18+
19+
20+
MAX_PACKET_SIZE = 512
21+
22+
class USBIsochronousStreamInEndpointTest(LunaUSBGatewareTestCase):
23+
FRAGMENT_UNDER_TEST = USBIsochronousStreamInEndpoint
24+
FRAGMENT_ARGUMENTS = {'endpoint_number': 1, 'max_packet_size': MAX_PACKET_SIZE}
25+
26+
def initialize_signals(self):
27+
# Configure the endpoint.
28+
yield self.dut.bytes_in_frame.eq(MAX_PACKET_SIZE)
29+
30+
# Pretend that our host is always targeting our endpoint.
31+
yield self.dut.interface.tokenizer.endpoint.eq(self.dut._endpoint_number)
32+
yield self.dut.interface.tokenizer.is_in.eq(1)
33+
34+
@usb_domain_test_case
35+
def test_single_packet_in(self):
36+
dut = self.dut
37+
38+
producer = dut.stream
39+
consumer = dut.interface.tx
40+
data = [b % 0xff for b in range(1, MAX_PACKET_SIZE + 1)]
41+
sent = []
42+
43+
# Before we see any data, our streams should all be invalid
44+
self.assertEqual((yield consumer.first), 0)
45+
self.assertEqual((yield consumer.last), 0)
46+
self.assertEqual((yield consumer.payload), 0)
47+
self.assertEqual((yield consumer.ready), 0)
48+
self.assertEqual((yield consumer.valid), 0)
49+
self.assertEqual((yield producer.payload), 0)
50+
self.assertEqual((yield producer.ready), 0)
51+
self.assertEqual((yield producer.valid), 0)
52+
53+
# Once we start a new frame ...
54+
yield dut.interface.tokenizer.new_frame.eq(1)
55+
yield
56+
57+
# ... but the host hasn't yet requested data from our endpoint;
58+
# our stream should still be at rest.
59+
self.assertEqual((yield consumer.first), 0)
60+
self.assertEqual((yield dut.data_requested), 0)
61+
62+
# When the host requests data ...
63+
yield dut.interface.tokenizer.ready_for_response.eq(1)
64+
yield
65+
66+
# ... we go out of State(IDLE) and can check that data_requested is strobed.
67+
self.assertEqual((yield dut.data_requested), 1)
68+
69+
# Then one cycle later...
70+
yield
71+
72+
# ... we will be in State(SEND_DATA) and our consumer stream becomes valid.
73+
self.assertEqual((yield consumer.first), 1)
74+
self.assertEqual((yield consumer.last), 0)
75+
self.assertEqual((yield consumer.valid), 1)
76+
77+
# Once the producer has data available ...
78+
yield producer.valid.eq(1)
79+
yield producer.payload.eq(data[0])
80+
81+
# ... but we haven't advanced yet ...
82+
self.assertEqual((yield producer.ready), 0)
83+
self.assertEqual((yield consumer.ready), 0)
84+
self.assertEqual((yield consumer.payload), 0x00)
85+
86+
# ... until our data is accepted.
87+
yield consumer.ready.eq(1)
88+
yield
89+
sent.append((yield consumer.payload))
90+
91+
# Now we can chack that the transmitter has the first byte ...
92+
self.assertEqual((yield producer.ready), 1)
93+
self.assertEqual((yield consumer.payload), data[0])
94+
self.assertEqual((yield consumer.first), 1)
95+
96+
# ... before sending the rest of the packet.
97+
clocks = 0
98+
for byte in data[1:]:
99+
clocks += 1
100+
yield producer.payload.eq(byte)
101+
yield
102+
sent.append((yield consumer.payload))
103+
self.assertEqual((yield consumer.payload), byte)
104+
105+
# Finally, we can check that we have received the correct
106+
# amount of data and that this was the last byte.
107+
self.assertEqual(sent, data)
108+
self.assertEqual((yield consumer.last), 1)
109+
self.assertEqual(clocks, len(data) - 1)
110+
111+
112+
class USBIsochronousStreamOutEndpointTest(LunaUSBGatewareTestCase):
113+
FRAGMENT_UNDER_TEST = USBIsochronousStreamOutEndpoint
114+
FRAGMENT_ARGUMENTS = {'endpoint_number': 1, 'max_packet_size': MAX_PACKET_SIZE}
115+
116+
def initialize_signals(self):
117+
# Pretend that our host is always targeting our endpoint.
118+
yield self.dut.interface.tokenizer.endpoint.eq(self.dut._endpoint_number)
119+
yield self.dut.interface.tokenizer.is_out.eq(1)
120+
121+
122+
@usb_domain_test_case
123+
def test_single_packet_out(self):
124+
dut = self.dut
125+
126+
producer = dut.interface.rx
127+
consumer = dut.stream
128+
data = [b % 0xff for b in range(1, MAX_PACKET_SIZE + 1)]
129+
received = []
130+
131+
# Before we see any data, our streams should all be invalid.
132+
self.assertEqual((yield consumer.p.data), 0)
133+
self.assertEqual((yield consumer.p.first), 0)
134+
self.assertEqual((yield consumer.p.last), 0)
135+
self.assertEqual((yield consumer.ready), 0)
136+
self.assertEqual((yield consumer.valid), 0)
137+
self.assertEqual((yield producer.next), 0)
138+
self.assertEqual((yield producer.payload), 0)
139+
self.assertEqual((yield producer.valid), 0)
140+
141+
# Once the producer sends the first byte ...
142+
yield producer.valid.eq(1)
143+
yield producer.next.eq(1)
144+
yield producer.payload.eq(data[0])
145+
yield
146+
147+
# ... and only the first byte ...
148+
yield producer.next.eq(0)
149+
yield
150+
151+
# ... we shouldn't see anything in the consumer stream ...
152+
self.assertEqual((yield consumer.p.first), 0)
153+
self.assertEqual((yield consumer.p.last), 0)
154+
self.assertEqual((yield consumer.p.data), 0)
155+
156+
# ... but even if we were to mark the consumer's stream as ready ...
157+
yield consumer.ready.eq(1)
158+
yield
159+
160+
# ... the consumer stream will still not be valid because
161+
# we're using a TransactionalizedFIFO that will only commit
162+
# once the entire packet has been received.
163+
self.assertEqual((yield consumer.valid), 0)
164+
self.assertEqual((yield consumer.p.first), 0)
165+
self.assertEqual((yield consumer.p.last), 0)
166+
self.assertEqual((yield consumer.p.data), 0)
167+
168+
# So let's send the rest of the packet ...
169+
yield producer.next.eq(1)
170+
clocks = 0
171+
for byte in data[1:]:
172+
clocks += 1
173+
yield producer.payload.eq(byte)
174+
yield
175+
176+
# ... which should have taken len(data) - 1 cycles because we already sent the first byte.
177+
self.assertEqual(clocks, len(data) - 1)
178+
self.assertEqual((yield producer.payload), data[-1])
179+
180+
# By now the consumer stream would also have picked up the first byte ...
181+
self.assertEqual((yield consumer.valid), 0)
182+
self.assertEqual((yield consumer.p.first), 1)
183+
self.assertEqual((yield consumer.p.data), data[0])
184+
185+
# ... but the stream still won't advance ...
186+
yield
187+
self.assertEqual((yield consumer.valid), 0)
188+
self.assertEqual((yield consumer.p.first), 1)
189+
self.assertEqual((yield consumer.p.data), data[0])
190+
191+
# ... until we finally mark the packet as complete and invalidate the producer stream.
192+
yield dut.interface.rx_complete.eq(1)
193+
yield producer.valid.eq(0)
194+
yield producer.next.eq(0)
195+
yield
196+
197+
# After three clock cycles delay our stream goes finally valid ...
198+
yield
199+
yield
200+
yield
201+
self.assertEqual((yield consumer.valid), 1)
202+
203+
# ... and we can now receive the packet.
204+
clocks = 0
205+
while (yield consumer.valid) and (yield consumer.p.last) == 0:
206+
clocks += 1
207+
received.append((yield consumer.p.data))
208+
yield
209+
210+
self.assertEqual(received, data)
211+
self.assertEqual((yield consumer.p.last), 1)
212+
self.assertEqual((yield consumer.p.data), data[-1])
213+
self.assertEqual(clocks, len(data))
214+
215+
# Finally, let's invalidate the consumer ...
216+
yield consumer.ready.eq(0)
217+
yield
218+
219+
# ... and everything should be over.
220+
self.assertEqual((yield producer.valid), 0)
221+
self.assertEqual((yield producer.next), 0)
222+
self.assertEqual((yield consumer.ready), 0)
223+
self.assertEqual((yield consumer.valid), 0)
224+
self.assertEqual((yield consumer.p.first), 0)
225+
self.assertEqual((yield consumer.p.last), 0)
226+
self.assertEqual((yield consumer.p.data), 0)

0 commit comments

Comments
 (0)