Skip to content

Common: Add thread-safe CircularQueue template and make Mixer use it. #13635

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

jordan-woyak
Copy link
Member

@jordan-woyak jordan-woyak commented May 6, 2025

I really don't like looking at the manual circular buffer atomic head/tail math in Mixer.

I've created a CircularQueue class template with some unit tests.

There's an atomic-using thread-safe SPSCCircularQueue version and a WaitableSPSCCircularQueue version that can block for data or room for pushing data.

I plan to also use this for Wii Speak Microphone after the merge of #12769 which currently has a mutex around a manual circular buffer.

I have slightly altered the logic of fill audio gaps (off by one stuff).
It seems somewhat cleaner this way, but hopefully @Sam-Belliveau can tell me if that should be changed back.

@jordan-woyak jordan-woyak force-pushed the circular-queue branch 3 times, most recently from ba5723d to dd84380 Compare May 6, 2025 19:33
//
// Capacity() gives size of the underlying buffer.
//
// Push'ing Size() beyond Capacity() is unchecked and legal.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only true if the queue is used by a single thread; with separate threads if Size() > Capacity() then you can have a data race.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I will make that more clear in the comment.

Comment on lines +14 to +16
// Push/Pop increases/decrease the Size()
//
// Capacity() gives size of the underlying buffer.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Push/Pop increases/decrease the Size()
//
// Capacity() gives size of the underlying buffer.
// Push/Pop increases/decreases the Size()
//
// Capacity() gives the size of the underlying buffer.

Comment on lines +39 to +40
static_assert(std::is_trivially_copyable_v<ElementType>,
"Really only sane for trivially copyable types.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What goes wrong if it's not trivially copyable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern is the Size() > Capacity() race that you've noticed. Restricting the types to trivially copyable ones makes the race "less terrible". I suppose I could just state that Size exceeding Capacity isn't strictly legal for the SPSC versions. Users should "know what they are doing".

{
}

// "Push" functions are only safe from the "producer" thread.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// "Push" functions are only safe from the "producer" thread.
// "Write" and "Push" functions are only safe from the "producer" thread.

AdjustSize(count, order);
}

// "Pop" functions are only safe from the "consumer" thread.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// "Pop" functions are only safe from the "consumer" thread.
// "Read" and "Pop" functions are only safe from the "consumer" thread.

void AdvanceReadHead(std::size_t count)
{
m_read_index = (m_read_index + count) % Capacity();
AdjustSize(0 - count, std::memory_order_relaxed);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0 - count underflows here, which is cancelled out by m_size += count (or fetch_add) overflowing in AdjustSize. The math works, but I feel like it'd be cleaner to have a bit of code duplication with separate IncreaseSize and DecreaseSize functions.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dunno, that's 10 lines of duplicate code just for being afraid of some unsigned integer underflow. :D

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unsigned underflow is well defined, so I'd say take advantage of it.

Comment on lines +146 to +148
// Used by the "consumer" to move the read position backwards.
// Also increases the size by the given amount.
void RewindReadHead(std::size_t count)
Copy link
Contributor

@Dentomologist Dentomologist May 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the consumer thread uses this at the same time as the producer thread uses AdvanceWriteHead that could cause Size to exceed Capacity even if both threads check if there's enough space beforehand, potentially causing a data race.

I'd suggest disabling this when UseAtomicSize is true to maintain the invariant of Only the producer thread can increase the Size, and only the consumer thread can decrease it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I would also rather this function didn't exist at all for all the reasons you mention.

Unfortunately "Mixer.cpp" is already using this racey technique, and I've begrudgingly used it there. The changes are in this PR: e04e98f

Maybe I can add a comment about the consequences of RewindReadHead and to not use it and that it is pending removal? I don't know. Fixing Mixer to not do this doesn't look straightforward.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

3 participants