
feat(BA-903): Implement Kafka Message Queue #3922


Draft
racheliee wants to merge 13 commits into main

Conversation

racheliee (Contributor)

resolves #3887 (BA-903)

About

  • Implements Kafka support for producing and dispatching event messages.
  • Introduces an abstract message queue class that allows either Redis or Kafka to be used as the message queue backend (sketched below).
  • Adds Kafka and Zookeeper services to docker-compose.
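
For orientation, here is a minimal sketch of what such an abstraction could look like. The class and method names, and the EventProducer signatures, are simplified placeholders rather than the exact interface introduced in this PR:

from abc import ABC, abstractmethod
from typing import AsyncIterator, Mapping

Message = Mapping[bytes, bytes]


class AbstractMessageQueue(ABC):
    """Backend-agnostic queue interface used by the event layer."""

    @abstractmethod
    async def send(self, payload: Message) -> None: ...

    @abstractmethod
    def receive_group(self) -> AsyncIterator[Message]: ...


class EventProducer:
    # The producer only depends on the abstraction, so a Redis-streams or
    # a Kafka-backed implementation can be injected without changing this code.
    def __init__(self, message_queue: AbstractMessageQueue) -> None:
        self.message_queue = message_queue

    async def produce_event(self, payload: Message) -> None:
        await self.message_queue.send(payload)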

Checklist: (if applicable)

  • Milestone metadata specifying the target backport version
  • Mention of the original issue
  • Installer updates including:
    • Fixtures for db schema changes
    • New mandatory config options
  • Update of end-to-end CLI integration tests in ai.backend.test
  • API server-client counterparts (e.g., manager API -> client SDK)
  • Test case(s) to:
    • Demonstrate the difference between before and after
    • Demonstrate the flow of abstract/conceptual models with a concrete implementation
  • Documentation
    • Contents in the docs directory
    • docstrings in public interfaces and type annotations

@github-actions bot added labels size:XL (500~ LoC) and comp:common (Related to Common component) on Mar 7, 2025
        raise ValueError(f"Unsupported serialization format: {format}")

    @classmethod
    def deserialize(cls, data: bytes, format: str = "msgpack") -> Self:
Member

Consider using typing.Literal here for better type safety and readability. See the official documentation for more details: https://docs.python.org/3/library/typing.html#typing.Literal.

Suggested change
-    def deserialize(cls, data: bytes, format: str = "msgpack") -> Self:
+    def deserialize(cls, data: bytes, format: Literal["msgpack", "json"] = "msgpack") -> Self:

Contributor Author

Thanks for the feedback!
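
For context, a rough sketch of how the Literal-typed pair could look, assuming msgpack and the standard json module; the EventMessage class and its fields are placeholders, not the actual classes in this diff:

import json
from typing import Literal, Self

import msgpack


class EventMessage:
    """Placeholder event type to illustrate the Literal-typed (de)serialization."""

    def __init__(self, name: str, payload: dict) -> None:
        self.name = name
        self.payload = payload

    def serialize(self, format: Literal["msgpack", "json"] = "msgpack") -> bytes:
        data = {"name": self.name, "payload": self.payload}
        if format == "msgpack":
            return msgpack.packb(data)
        if format == "json":
            return json.dumps(data).encode("utf-8")
        raise ValueError(f"Unsupported serialization format: {format}")

    @classmethod
    def deserialize(cls, data: bytes, format: Literal["msgpack", "json"] = "msgpack") -> Self:
        if format == "msgpack":
            decoded = msgpack.unpackb(data)
        elif format == "json":
            decoded = json.loads(data)
        else:
            raise ValueError(f"Unsupported serialization format: {format}")
        return cls(decoded["name"], decoded["payload"])

With Literal, a type checker flags an unsupported value such as format="yaml" at analysis time, while the runtime ValueError remains as a safety net.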

Comment on lines 1332 to 1338
 async with aclosing(
-    redis_helper.read_stream_by_group(
-        self.redis_client,
+    await self.message_queue.receive_group(
         self._stream_key,
         self._consumer_group,
         self._consumer_name,
     )
 ) as agen:
Collaborator

It would be nice if aclosing was also handled inside the message queue.
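
One way to do that (a sketch that assumes the Redis backend wraps its own async generator; the method names here are illustrative) is to apply aclosing inside the queue and hand callers an already-managed iterator:

from contextlib import aclosing, asynccontextmanager
from typing import AsyncIterator


class RedisMQ:
    async def _read_group(self) -> AsyncIterator[bytes]:
        # Hypothetical inner async generator that wraps XREADGROUP polling.
        yield b"..."

    @asynccontextmanager
    async def receive_group(self):
        # aclosing is applied once here, so every call site gets proper
        # cleanup of the underlying async generator without wrapping it itself.
        async with aclosing(self._read_group()) as agen:
            yield agen


# A call site would then shrink to:
#     async with mq.receive_group() as agen:
#         async for msg in agen:
#             ...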

@@ -132,7 +140,7 @@ def scb(context: object, source: AgentId, event: DummyEvent) -> None:
     assert "ZeroDivisionError" in exception_log
     assert "OverflowError" in exception_log

-    await redis_helper.execute(producer.redis_client, lambda r: r.flushdb())
+    await redis_helper.execute(producer.message_queue.connection_info, lambda r: r.flushdb())
Collaborator

If possible, avoid logic that passes internal fields out to external code.
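
For example (an illustrative sketch, not code from this PR), the queue could expose its own cleanup helper so the test never has to reach into connection_info:

from ai.backend.common import redis_helper  # module already used in this diff


class RedisMQ:
    def __init__(self, connection_info) -> None:
        # connection_info stays private to the queue implementation.
        self._connection_info = connection_info

    async def flush(self) -> None:
        # Hypothetical teardown helper: wipe the backing store without
        # exposing the Redis connection object to callers.
        await redis_helper.execute(self._connection_info, lambda r: r.flushdb())


# The test teardown could then read:
#     await producer.message_queue.flush()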

T = TypeVar("T", bound=BaseConnectionInfo)

class AbstractMessageQueue(ABC, Generic[T]):
    connection_info: T
Collaborator

Why should connection_info be a common field?

Contributor Author

connection_info is accessed as a field. Should I remove it from being a common field and add a getter that returns it instead?

Collaborator

My guess is that each subclass has its own connection_info, so it doesn't seem necessary to have it as a common field in the ABC.
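
To illustrate the suggestion (a sketch with placeholder names, not the final code): the ABC would declare only behaviour, and each backend would keep its own connection details:

from abc import ABC, abstractmethod


class AbstractMessageQueue(ABC):
    # No connection_info here; the ABC only declares behaviour.
    @abstractmethod
    async def send(self, payload: bytes) -> None: ...


class RedisMQ(AbstractMessageQueue):
    def __init__(self, connection_info) -> None:
        self._connection_info = connection_info  # Redis-specific detail

    async def send(self, payload: bytes) -> None: ...


class KafkaMQ(AbstractMessageQueue):
    def __init__(self, bootstrap_servers: str) -> None:
        self._bootstrap_servers = bootstrap_servers  # Kafka-specific detail

    async def send(self, payload: bytes) -> None: ...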

    task_id: Final[uuid.UUID]
    total_progress: Union[int, float]
    current_progress: Union[int, float]

    def __init__(
        self,
        # redis_config: RedisConfig,
Collaborator

redis_connection is not set up in the constructor.

Development

Successfully merging this pull request may close these issues.

Replace redis_client with AbstractMessageQueue in EventDispatcher, EventProducer.