Skip to content

Add stream support for isochronous endpoints. #285

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions luna/gateware/stream/future.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# This file is part of LUNA.
#
# Copyright (c) 2025 Great Scott Gadgets <[email protected]>
# SPDX-License-Identifier: BSD-3-Clause

""" Core stream definitions for supporting native Amaranth 0.5 streams. """

from amaranth import *
from amaranth.lib import data

class Packet(data.StructLayout):
def __init__(self, data_layout, first=True, last=True):
layout = (first and { "first": unsigned(1) } or {}) \
| (last and { "last": unsigned(1) } or {})
super().__init__(layout | {
"data": data_layout
})
220 changes: 220 additions & 0 deletions luna/gateware/usb/usb2/endpoints/isochronous_stream_in.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are USBIsochronousStreamInEndpointand USBIsochronousStreamOutEndpoint implemented in different files? This is a bit inconsistent with the rest of the code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd dearly love to split up the rest of the code as well, having everything in single files makes it really hard to both find and work on things.

I'm a firm believer in the "one big thing per file" principle unless it's a bunch of small things that are closely related like type definitions.

Doing a PR that does the same for the other files in luna/gateware/usb/usb2/endpoints/ should have minimal downstream impact as these are all re-exported via the top-level luna.usb2 module path, e.g.

from luna.usb2 import USBDevice, USBIsochronousInEndpoint

That said, I am open to changing this for now if we want to leave this discussion to a later time!

Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
#
# This file is part of LUNA.
#
# Copyright (c) 2020-2025 Great Scott Gadgets <[email protected]>
# SPDX-License-Identifier: BSD--3-Clause
""" Endpoint interfaces for isochronous endpoints.

These interfaces provide interfaces for connecting streams or stream-like
interfaces to hosts via isochronous pipes.
"""

from amaranth import *
from amaranth.lib import stream

from ..endpoint import EndpointInterface


class USBIsochronousStreamInEndpoint(Elaboratable):
""" Isochronous endpoint that presents a stream-like interface.

Used for repeatedly streaming data to a host from a stream-like interface.
Intended to be useful as a transport for e.g. video or audio data.

Attributes
----------
stream: amaranth.lib.stream.Interface, input stream
Full-featured stream interface that carries the data we'll transmit to the host.

interface: EndpointInterface
Communications link to our USB core.

data_requested: Signal(), output
Strobes, when a new packet starts

frame_finished: Signal(), output
Strobes immediately after the last byte in a frame has been transmitted

bytes_in_frame: Signal(range(0, 3073)), input
Specifies how many bytes will be transferred during this frame. If this is 0,
a single ZLP will be emitted; for any other value one, two, or three packets
will be generated, depending on the packet size. Latched in at the start of
each frame.

The maximum allowed value for this signal depends on the number of transfers
per (micro)frame:
- If this is a high-speed, high-throughput endpoint (descriptor indicates
maxPacketSize > 512 and multiple transfers per microframe), then this value
maxes out at (N * maxPacketSize), where N is the number of transfers per microframe.
- For all other configurations, this must be <= the maximum packet size.

Parameters
----------
endpoint_number: int
The endpoint number (not address) this endpoint should respond to.
max_packet_size: int
The maximum packet size for this endpoint. Should match the wMaxPacketSize provided in the
USB endpoint descriptor.
"""

_MAX_FRAME_DATA = 1024 * 3

def __init__(self, *, endpoint_number, max_packet_size):

self._endpoint_number = endpoint_number
self._max_packet_size = max_packet_size

#
# I/O Port
#
self.interface = EndpointInterface()
self.stream = stream.Interface(stream.Signature(unsigned(8)))
self.data_requested = Signal()
self.frame_finished = Signal()

self.bytes_in_frame = Signal(range(0, self._MAX_FRAME_DATA + 1))

def elaborate(self, platform):
m = Module()

# Shortcuts.
interface = self.interface
tx_stream = interface.tx
new_frame = interface.tokenizer.new_frame

targeting_ep_num = (interface.tokenizer.endpoint == self._endpoint_number)
targeting_us = targeting_ep_num & interface.tokenizer.is_in
data_requested = targeting_us & interface.tokenizer.ready_for_response

# Track our transmission state.
bytes_left_in_frame = Signal.like(self.bytes_in_frame)
bytes_left_in_packet = Signal(range(0, self._max_packet_size + 1), init=self._max_packet_size - 1)
next_data_pid = Signal(2)
tx_cnt = Signal(range(0, self._MAX_FRAME_DATA))
next_byte = Signal.like(tx_cnt)

m.d.comb += [
tx_stream.payload .eq(0),
interface.tx_pid_toggle .eq(next_data_pid)
]

# Reset our state at the start of each frame.
with m.If(new_frame):

m.d.usb += [
# Latch in how many bytes we'll be transmitting this frame.
bytes_left_in_frame.eq(self.bytes_in_frame),

# And start with a full packet to transmit.
bytes_left_in_packet.eq(self._max_packet_size),
]

# If it'll take more than two packets to send our data, start off with DATA2.
# We'll follow with DATA1 and DATA0.
with m.If(self.bytes_in_frame > (2 * self._max_packet_size)):
m.d.usb += next_data_pid.eq(2)

# Otherwise, if we need two, start with DATA1.
with m.Elif(self.bytes_in_frame > self._max_packet_size):
m.d.usb += next_data_pid.eq(1)

# Otherwise, we'll start (and end) with DATA0.
with m.Else():
m.d.usb += next_data_pid.eq(0)

#
# Core sequencing FSM.
#
with m.FSM(domain="usb"):
m.d.usb += self.frame_finished.eq(0)

# IDLE -- the host hasn't yet requested data from our endpoint.
with m.State("IDLE"):
m.d.comb += next_byte.eq(0)
m.d.usb += [
tx_cnt.eq(0),
tx_stream.first.eq(0),
]

# Once the host requests a packet from us...
with m.If(data_requested):
# If we have data to send, send it.
with m.If(bytes_left_in_frame):
m.d.usb += tx_stream.first.eq(1)
m.next = "SEND_DATA"

# Otherwise, we'll send a ZLP.
with m.Else():
m.next = "SEND_ZLP"

# Strobe when a new packet starts.
m.d.comb += self.data_requested.eq(1)


# SEND_DATA -- our primary data-transmission state; handles packet transmission
with m.State("SEND_DATA"):
last_byte_in_packet = (bytes_left_in_packet <= 1)
last_byte_in_frame = (bytes_left_in_frame <= 1)
byte_terminates_send = last_byte_in_packet | last_byte_in_frame

m.d.comb += [
# Our data is always valid in this state...
tx_stream.valid .eq(1),
# ... and we're terminating if we're on the last byte of the packet or frame.
tx_stream.last .eq(byte_terminates_send),
]

# Strobe frame_finished one cycle after we're on the last byte of the frame.
m.d.usb += self.frame_finished.eq(last_byte_in_frame)

# Producer has data available.
with m.If(self.stream.valid):
m.d.comb += tx_stream.payload.eq(self.stream.payload)

# Don't advance ...
m.d.comb += [
next_byte.eq(tx_cnt),
self.stream.ready.eq(0),
]
m.d.usb += tx_cnt.eq(next_byte)

# ... until our data is accepted.
with m.If(tx_stream.ready):
m.d.usb += tx_stream.first.eq(0)

# Advance to the next byte in the frame ...
m.d.comb += [
self.stream.ready.eq(1),
next_byte.eq(tx_cnt + 1)
]

# ... and mark the relevant byte as sent.
m.d.usb += [
bytes_left_in_frame .eq(bytes_left_in_frame - 1),
bytes_left_in_packet .eq(bytes_left_in_packet - 1),
]

# If we've just completed transmitting a packet, or we've
# just transmitted a full frame, end our transmission.
with m.If(byte_terminates_send):
m.d.usb += [
# Move to the next DATA pid, which is always one DATA PID less.
# [USB2.0: 5.9.2]. We'll reset this back to its maximum value when
# the next frame starts.
next_data_pid .eq(next_data_pid - 1),

# Mark our next packet as being a full one.
bytes_left_in_packet .eq(self._max_packet_size),
]
m.next = "IDLE"

# SEND_ZLP -- sends a zero-length packet, and then return to idle.
with m.State("SEND_ZLP"):
# We'll request a ZLP by strobing LAST and VALID without strobing FIRST.
m.d.comb += [
tx_stream.valid .eq(1),
tx_stream.last .eq(1),
]
m.next = "IDLE"

return m
157 changes: 157 additions & 0 deletions luna/gateware/usb/usb2/endpoints/isochronous_stream_out.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
#
# This file is part of LUNA.
#
# Copyright (c) 2020-2025 Great Scott Gadgets <[email protected]>
# SPDX-License-Identifier: BSD-3-Clause

""" Endpoint interfaces for isochronous endpoints.

These interfaces provide interfaces for connecting streams or stream-like
interfaces to hosts via isochronous pipes.
"""

from amaranth import *
from amaranth.lib import stream, wiring
from amaranth.lib .wiring import In, Out

from ..endpoint import EndpointInterface
from ...stream import USBOutStreamBoundaryDetector
from ....stream.future import Packet
from ....memory import TransactionalizedFIFO


class USBIsochronousStreamOutEndpoint(Elaboratable):
""" Endpoint interface that receives isochronous data from the host, and produces a simple data stream.

Used for repeatedly streaming data from a host to a stream or stream-like interface.
Intended to be useful as a transport for e.g. video or audio data.


Attributes
----------
stream: StreamInterface, output stream
Full-featured stream interface that carries the data we've received from the host.
interface: EndpointInterface
Communications link to our USB device.

Parameters
----------
endpoint_number: int
The endpoint number (not address) this endpoint should respond to.
max_packet_size: int, optional
The maximum packet size for this endpoint. If there isn't `max_packet_size` space in
the endpoint buffer, additional data will be silently dropped.
buffer_size: int, optional
The total amount of data we'll keep in the buffer; typically two (TODO three?) max-packet-sizes or more.
Defaults to twice (TODO three?) times the maximum packet size.
"""

def __init__(self, *, endpoint_number, max_packet_size, buffer_size=None):
self._endpoint_number = endpoint_number
self._max_packet_size = max_packet_size
# TODO self._buffer_size = buffer_size if (buffer_size is not None) else (self._max_packet_size * 3)
self._buffer_size = buffer_size if (buffer_size is not None) else (self._max_packet_size * 2)

#
# I/O port
#
self.stream = stream.Interface(
stream.Signature(
Packet(unsigned(8))
)
)
self.interface = EndpointInterface()

def elaborate(self, platform):
m = Module()

stream = self.stream
interface = self.interface
tokenizer = interface.tokenizer

#
# Internal state.
#

# Stores whether we've had a receive overflow.
overflow = Signal()

# Stores a count of received bytes in the current packet.
rx_cnt = Signal(range(self._max_packet_size))

#
# Receiver logic.
#

# Create a version of our receive stream that has added `first` and `last` signals, which we'll use
# internally as our main stream.
m.submodules.boundary_detector = boundary_detector = USBOutStreamBoundaryDetector()
m.d.comb += [
interface.rx .stream_eq(boundary_detector.unprocessed_stream),
boundary_detector.complete_in .eq(interface.rx_complete),
boundary_detector.invalid_in .eq(interface.rx_invalid),
]

rx = boundary_detector.processed_stream
rx_first = boundary_detector.first
rx_last = boundary_detector.last

# Create a Rx FIFO.
m.submodules.fifo = fifo = TransactionalizedFIFO(width=10, depth=self._buffer_size, name="rx_fifo", domain="usb")

#
# Create some basic conditionals that will help us make decisions.
#

endpoint_number_matches = (tokenizer.endpoint == self._endpoint_number)
targeting_endpoint = endpoint_number_matches & tokenizer.is_out

sufficient_space = (fifo.space_available >= self._max_packet_size)

okay_to_receive = targeting_endpoint & sufficient_space
data_is_lost = okay_to_receive & rx.next & rx.valid & fifo.full

full_packet = rx_cnt == self._max_packet_size - 1

m.d.comb += [

# We'll always populate our FIFO directly from the receive stream; but we'll also include our
# "short packet detected" signal, as this indicates that we're detecting the last byte of a transfer.
fifo.write_data[0:8] .eq(rx.payload),
fifo.write_data[8] .eq(rx_last),
fifo.write_data[9] .eq(rx_first),
fifo.write_en .eq(okay_to_receive & rx.next & rx.valid),

# We'll keep data if our packet finishes with a valid CRC; and discard it otherwise.
fifo.write_commit .eq(targeting_endpoint & boundary_detector.complete_out),
fifo.write_discard .eq(targeting_endpoint & boundary_detector.invalid_out),

# Our stream data always comes directly out of the FIFO; and is valid
# whenever our FIFO actually has data for us to read.
stream.valid .eq(~fifo.empty),
stream.p.data .eq(fifo.read_data[0:8]),

# Our `last` bit comes directly from the FIFO; and we know a `first` bit immediately
# follows a `last` one.
stream.p.last .eq(fifo.read_data[8]),
stream.p.first .eq(fifo.read_data[9]),

# Move to the next byte in the FIFO whenever our stream is advanced.
fifo.read_en .eq(stream.ready),
fifo.read_commit .eq(1)
]

# Count bytes in packet.
with m.If(fifo.write_en):
m.d.usb += rx_cnt.eq(rx_cnt + 1)

# We'll set the overflow flag if we're receiving data we don't have room for.
with m.If(data_is_lost):
m.d.usb += overflow.eq(1)

# We'll clear the overflow flag and byte counter when the packet is done.
with m.Elif(fifo.write_commit | fifo.write_discard):
m.d.usb += overflow.eq(0)
m.d.usb += rx_cnt.eq(0)

return m
Loading