diff --git a/changelog.d/7959.bugfix b/changelog.d/7959.bugfix
new file mode 100644
index 0000000000..1982049a52
--- /dev/null
+++ b/changelog.d/7959.bugfix
@@ -0,0 +1 @@
+Add experimental support for moving typing off master.
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index c1b76d827b..ec0dbddb8c 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -87,7 +87,6 @@ from synapse.replication.tcp.streams import (
ReceiptsStream,
TagAccountDataStream,
ToDeviceStream,
- TypingStream,
)
from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.client.v1 import events
@@ -644,7 +643,6 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
super(GenericWorkerReplicationHandler, self).__init__(hs)
self.store = hs.get_datastore()
- self.typing_handler = hs.get_typing_handler()
self.presence_handler = hs.get_presence_handler() # type: GenericWorkerPresence
self.notifier = hs.get_notifier()
@@ -681,11 +679,6 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
await self.pusher_pool.on_new_receipts(
token, token, {row.room_id for row in rows}
)
- elif stream_name == TypingStream.NAME:
- self.typing_handler.process_replication_rows(token, rows)
- self.notifier.on_new_event(
- "typing_key", token, rooms=[row.room_id for row in rows]
- )
elif stream_name == ToDeviceStream.NAME:
entities = [row.entity for row in rows if row.entity.startswith("@")]
if entities:
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 4985e40b1f..fcf8ebf1e7 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -24,6 +24,7 @@ from twisted.internet.protocol import ReconnectingClientFactory
from synapse.api.constants import EventTypes
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
+from synapse.replication.tcp.streams import TypingStream
from synapse.replication.tcp.streams.events import (
EventsStream,
EventsStreamEventRow,
@@ -104,6 +105,7 @@ class ReplicationDataHandler:
self._clock = hs.get_clock()
self._streams = hs.get_replication_streams()
self._instance_name = hs.get_instance_name()
+ self._typing_handler = hs.get_typing_handler()
# Map from stream to list of deferreds waiting for the stream to
# arrive at a particular position. The lists are sorted by stream position.
@@ -127,6 +129,12 @@ class ReplicationDataHandler:
"""
self.store.process_replication_rows(stream_name, instance_name, token, rows)
+ if stream_name == TypingStream.NAME:
+ self._typing_handler.process_replication_rows(token, rows)
+ self.notifier.on_new_event(
+ "typing_key", token, rooms=[row.room_id for row in rows]
+ )
+
if stream_name == EventsStream.NAME:
# We shouldn't get multiple rows per token for events stream, so
# we don't need to optimise this for multiple rows.
diff --git a/synapse/server.pyi b/synapse/server.pyi
index 90a673778f..1aba408c21 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -31,6 +31,7 @@ import synapse.server_notices.server_notices_sender
import synapse.state
import synapse.storage
from synapse.events.builder import EventBuilderFactory
+from synapse.handlers.typing import FollowerTypingHandler
from synapse.replication.tcp.streams import Stream
class HomeServer(object):
@@ -150,3 +151,5 @@ class HomeServer(object):
pass
def should_send_federation(self) -> bool:
pass
+ def get_typing_handler(self) -> FollowerTypingHandler:
+ pass
|