summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-07-27 14:10:53 +0100
committerGitHub <noreply@github.com>2020-07-27 14:10:53 +0100
commit84d099ae1192af0f38d26f9a32e38bd4c0ad304e (patch)
tree525ead94bf5b196a855948896964b0242f6cecfa /synapse
parentRemove hacky error handling for inlineDeferreds. (#7950) (diff)
downloadsynapse-84d099ae1192af0f38d26f9a32e38bd4c0ad304e.tar.xz
Fix typing replication not being handled on master (#7959)
Handling of incoming typing stream updates from replication was not
hooked up on master, effecting set ups where typing was handled on a
different worker.

This is really only a problem if the master process is also handling
sync requests, which is unlikely for those that are at the stage of
moving typing off.

The other observable effect is that if a worker restarts or a
replication connect drops then the typing worker will issue a
`POSITION typing`, triggering master process to try and stream *all*
typing updates from position 0.

Fixes #7907
Diffstat (limited to '')
-rw-r--r--synapse/app/generic_worker.py7
-rw-r--r--synapse/replication/tcp/client.py8
-rw-r--r--synapse/server.pyi3
3 files changed, 11 insertions, 7 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index c1b76d827b..ec0dbddb8c 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -87,7 +87,6 @@ from synapse.replication.tcp.streams import (
     ReceiptsStream,
     TagAccountDataStream,
     ToDeviceStream,
-    TypingStream,
 )
 from synapse.rest.admin import register_servlets_for_media_repo
 from synapse.rest.client.v1 import events
@@ -644,7 +643,6 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
         super(GenericWorkerReplicationHandler, self).__init__(hs)
 
         self.store = hs.get_datastore()
-        self.typing_handler = hs.get_typing_handler()
         self.presence_handler = hs.get_presence_handler()  # type: GenericWorkerPresence
         self.notifier = hs.get_notifier()
 
@@ -681,11 +679,6 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
                 await self.pusher_pool.on_new_receipts(
                     token, token, {row.room_id for row in rows}
                 )
-            elif stream_name == TypingStream.NAME:
-                self.typing_handler.process_replication_rows(token, rows)
-                self.notifier.on_new_event(
-                    "typing_key", token, rooms=[row.room_id for row in rows]
-                )
             elif stream_name == ToDeviceStream.NAME:
                 entities = [row.entity for row in rows if row.entity.startswith("@")]
                 if entities:
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 4985e40b1f..fcf8ebf1e7 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -24,6 +24,7 @@ from twisted.internet.protocol import ReconnectingClientFactory
 from synapse.api.constants import EventTypes
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
 from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
+from synapse.replication.tcp.streams import TypingStream
 from synapse.replication.tcp.streams.events import (
     EventsStream,
     EventsStreamEventRow,
@@ -104,6 +105,7 @@ class ReplicationDataHandler:
         self._clock = hs.get_clock()
         self._streams = hs.get_replication_streams()
         self._instance_name = hs.get_instance_name()
+        self._typing_handler = hs.get_typing_handler()
 
         # Map from stream to list of deferreds waiting for the stream to
         # arrive at a particular position. The lists are sorted by stream position.
@@ -127,6 +129,12 @@ class ReplicationDataHandler:
         """
         self.store.process_replication_rows(stream_name, instance_name, token, rows)
 
+        if stream_name == TypingStream.NAME:
+            self._typing_handler.process_replication_rows(token, rows)
+            self.notifier.on_new_event(
+                "typing_key", token, rooms=[row.room_id for row in rows]
+            )
+
         if stream_name == EventsStream.NAME:
             # We shouldn't get multiple rows per token for events stream, so
             # we don't need to optimise this for multiple rows.
diff --git a/synapse/server.pyi b/synapse/server.pyi
index 90a673778f..1aba408c21 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -31,6 +31,7 @@ import synapse.server_notices.server_notices_sender
 import synapse.state
 import synapse.storage
 from synapse.events.builder import EventBuilderFactory
+from synapse.handlers.typing import FollowerTypingHandler
 from synapse.replication.tcp.streams import Stream
 
 class HomeServer(object):
@@ -150,3 +151,5 @@ class HomeServer(object):
         pass
     def should_send_federation(self) -> bool:
         pass
+    def get_typing_handler(self) -> FollowerTypingHandler:
+        pass