From 4a53f357379c2dc407617a3d39e6da4790dec9aa Mon Sep 17 00:00:00 2001 From: reivilibre Date: Fri, 11 Mar 2022 14:00:15 +0000 Subject: Improve code documentation for the typing stream over replication. (#12211) --- changelog.d/12211.misc | 1 + synapse/handlers/typing.py | 5 +++-- synapse/replication/tcp/handler.py | 2 +- synapse/replication/tcp/resource.py | 6 +++--- synapse/replication/tcp/streams/_base.py | 12 ++++++++++++ 5 files changed, 20 insertions(+), 6 deletions(-) create mode 100644 changelog.d/12211.misc diff --git a/changelog.d/12211.misc b/changelog.d/12211.misc new file mode 100644 index 0000000000..d11634a1ee --- /dev/null +++ b/changelog.d/12211.misc @@ -0,0 +1 @@ +Improve code documentation for the typing stream over replication. \ No newline at end of file diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 3b89126528..6854428b7c 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -160,8 +160,9 @@ class FollowerTypingHandler: """Should be called whenever we receive updates for typing stream.""" if self._latest_room_serial > token: - # The master has gone backwards. To prevent inconsistent data, just - # clear everything. + # The typing worker has gone backwards (e.g. it may have restarted). + # To prevent inconsistent data, just clear everything. + logger.info("Typing handler stream went backwards; resetting") self._reset() # Set the latest serial token to whatever the server gave us. diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index d51f045f22..b217c35f99 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -709,7 +709,7 @@ class ReplicationCommandHandler: self.send_command(RemoteServerUpCommand(server)) def stream_update(self, stream_name: str, token: Optional[int], data: Any) -> None: - """Called when a new update is available to stream to clients. + """Called when a new update is available to stream to Redis subscribers. We need to check if the client is interested in the stream or not """ diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index ab829040cd..c6870df8f9 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -67,8 +67,8 @@ class ReplicationStreamProtocolFactory(ServerFactory): class ReplicationStreamer: """Handles replication connections. - This needs to be poked when new replication data may be available. When new - data is available it will propagate to all connected clients. + This needs to be poked when new replication data may be available. + When new data is available it will propagate to all Redis subscribers. """ def __init__(self, hs: "HomeServer"): @@ -109,7 +109,7 @@ class ReplicationStreamer: def on_notifier_poke(self) -> None: """Checks if there is actually any new data and sends it to the - connections if there are. + Redis subscribers if there are. This should get called each time new data is available, even if it is currently being executed, so that nothing gets missed diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 23d631a769..495f2f0285 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -316,7 +316,19 @@ class PresenceFederationStream(Stream): class TypingStream(Stream): @attr.s(slots=True, frozen=True, auto_attribs=True) class TypingStreamRow: + """ + An entry in the typing stream. + Describes all the users that are 'typing' right now in one room. + + When a user stops typing, it will be streamed as a new update with that + user absent; you can think of the `user_ids` list as overwriting the + entire list that was there previously. + """ + + # The room that this update is for. room_id: str + + # All the users that are 'typing' right now in the specified room. user_ids: List[str] NAME = "typing" -- cgit 1.4.1