summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-03-24 15:51:41 +0000
committerErik Johnston <erik@matrix.org>2020-03-24 15:54:38 +0000
commit7eec84bfbec26ffcd3835ecb2af0ac2d5b352fd6 (patch)
tree2b040ea3e63db328567babc939520e72d600aba3
parentMake ReplicationStreamer work on workers (diff)
downloadsynapse-7eec84bfbec26ffcd3835ecb2af0ac2d5b352fd6.tar.xz
Shuffle around code typing handlers
-rw-r--r--synapse/app/generic_worker.py63
-rw-r--r--synapse/config/server.py2
-rw-r--r--synapse/handlers/typing.py47
-rw-r--r--synapse/replication/tcp/handler.py71
-rw-r--r--synapse/replication/tcp/resource.py8
-rw-r--r--synapse/server.py11
6 files changed, 101 insertions, 101 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index b975fb3e1c..9eba101d0a 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -66,7 +66,7 @@ from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientFactory
 from synapse.replication.tcp.commands import ClearUserSyncsCommand
-from synapse.replication.tcp.handler import WorkerReplicationDataHandler
+from synapse.replication.tcp.handler import ReplicationDataHandler
 from synapse.replication.tcp.streams import (
     AccountDataStream,
     DeviceListsStream,
@@ -77,7 +77,6 @@ from synapse.replication.tcp.streams import (
     ReceiptsStream,
     TagAccountDataStream,
     ToDeviceStream,
-    TypingStream,
 )
 from synapse.replication.tcp.streams.events import (
     EventsStream,
@@ -381,43 +380,6 @@ class GenericWorkerPresence(object):
             return set()
 
 
-class GenericWorkerTyping(object):
-    def __init__(self, hs):
-        self._latest_room_serial = 0
-        self._reset()
-
-    def _reset(self):
-        """
-        Reset the typing handler's data caches.
-        """
-        # map room IDs to serial numbers
-        self._room_serials = {}
-        # map room IDs to sets of users currently typing
-        self._room_typing = {}
-
-    def stream_positions(self):
-        # We must update this typing token from the response of the previous
-        # sync. In particular, the stream id may "reset" back to zero/a low
-        # value which we *must* use for the next replication request.
-        return {"typing": self._latest_room_serial}
-
-    def process_replication_rows(self, token, rows):
-        if self._latest_room_serial > token:
-            # The master has gone backwards. To prevent inconsistent data, just
-            # clear everything.
-            self._reset()
-
-        # Set the latest serial token to whatever the server gave us.
-        self._latest_room_serial = token
-
-        for row in rows:
-            self._room_serials[row.room_id] = token
-            self._room_typing[row.room_id] = row.user_ids
-
-    def get_current_token(self) -> int:
-        return self._latest_room_serial
-
-
 class GenericWorkerSlavedStore(
     # FIXME(#3714): We need to add UserDirectoryStore as we write directly
     # rather than going via the correct worker.
@@ -619,17 +581,15 @@ class GenericWorkerServer(HomeServer):
     def build_presence_handler(self):
         return GenericWorkerPresence(self)
 
-    def build_typing_handler(self):
-        return GenericWorkerTyping(self)
-
     def build_replication_data_handler(self):
         return GenericWorkerReplicationHandler(self)
 
 
-class GenericWorkerReplicationHandler(WorkerReplicationDataHandler):
+class GenericWorkerReplicationHandler(ReplicationDataHandler):
     def __init__(self, hs):
+        super().__init__(hs)
+
         self.store = hs.get_datastore()
-        self.typing_handler = hs.get_typing_handler()
         # NB this is a SynchrotronPresence, not a normal PresenceHandler
         self.presence_handler = hs.get_presence_handler()
         self.notifier = hs.get_notifier()
@@ -643,14 +603,12 @@ class GenericWorkerReplicationHandler(WorkerReplicationDataHandler):
             self.send_handler = None
 
     async def on_rdata(self, stream_name, token, rows):
-        await super(GenericWorkerReplicationHandler, self).on_rdata(
-            stream_name, token, rows
-        )
+        await super().on_rdata(stream_name, token, rows)
         run_in_background(self.process_and_notify, stream_name, token, rows)
 
     def get_streams_to_replicate(self):
-        args = super(GenericWorkerReplicationHandler, self).get_streams_to_replicate()
-        args.update(self.typing_handler.stream_positions())
+        args = super().get_streams_to_replicate()
+
         if self.send_handler:
             args.update(self.send_handler.stream_positions())
         return args
@@ -698,11 +656,6 @@ class GenericWorkerReplicationHandler(WorkerReplicationDataHandler):
                 await self.pusher_pool.on_new_receipts(
                     token, token, {row.room_id for row in rows}
                 )
-            elif stream_name == TypingStream.NAME:
-                self.typing_handler.process_replication_rows(token, rows)
-                self.notifier.on_new_event(
-                    "typing_key", token, rooms=[row.room_id for row in rows]
-                )
             elif stream_name == ToDeviceStream.NAME:
                 entities = [row.entity for row in rows if row.entity.startswith("@")]
                 if entities:
@@ -938,6 +891,8 @@ def start(config_options):
         # Force the pushers to start since they will be disabled in the main config
         config.send_federation = True
 
+    config.server.handle_typing = False
+
     synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     ss = GenericWorkerServer(
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 7525765fee..0f69297cc1 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -83,6 +83,8 @@ class ServerConfig(Config):
         # "disable" federation
         self.send_federation = config.get("send_federation", True)
 
+        self.handle_typing = config.get("handle_typing", True)
+
         # Whether to enable user presence.
         self.use_presence = config.get("use_presence", True)
 
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index c7bc14c623..a6edea412b 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -21,6 +21,7 @@ from twisted.internet import defer
 
 from synapse.api.errors import AuthError, SynapseError
 from synapse.logging.context import run_in_background
+from synapse.replication.tcp.streams import TypingStream
 from synapse.types import UserID, get_domain_from_id
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.metrics import Measure
@@ -288,6 +289,52 @@ class TypingHandler(object):
         return self._latest_room_serial
 
 
+class TypingSlaveHandler(object):
+    def __init__(self, hs):
+        self.notifier = hs.get_notifier()
+
+        self._latest_room_serial = 0
+        self._reset()
+
+    def _reset(self):
+        """
+        Reset the typing handler's data caches.
+        """
+        # map room IDs to serial numbers
+        self._room_serials = {}
+        # map room IDs to sets of users currently typing
+        self._room_typing = {}
+
+    def stream_positions(self):
+        # We must update this typing token from the response of the previous
+        # sync. In particular, the stream id may "reset" back to zero/a low
+        # value which we *must* use for the next replication request.
+        return {"typing": self._latest_room_serial}
+
+    def process_replication_rows(self, stream_name, token, rows):
+        if stream_name != TypingStream.NAME:
+            return
+
+        if self._latest_room_serial > token:
+            # The master has gone backwards. To prevent inconsistent data, just
+            # clear everything.
+            self._reset()
+
+        # Set the latest serial token to whatever the server gave us.
+        self._latest_room_serial = token
+
+        for row in rows:
+            self._room_serials[row.room_id] = token
+            self._room_typing[row.room_id] = row.user_ids
+
+        self.notifier.on_new_event(
+            "typing_key", token, rooms=[row.room_id for row in rows]
+        )
+
+    def get_current_token(self) -> int:
+        return self._latest_room_serial
+
+
 class TypingNotificationEventSource(object):
     def __init__(self, hs):
         self.hs = hs
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index b620b29dfb..c197f6c26d 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -329,10 +329,17 @@ class ReplicationClientHandler:
         self.send_command(RdataCommand(stream_name, token, data))
 
 
-class DummyReplicationDataHandler:
+class ReplicationDataHandler:
     """A replication data handler that simply discards all data.
     """
 
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+        self.typing_handler = hs.get_typing_handler()
+
+        self.slaved_store = hs.config.worker_app is not None
+        self.slaved_typing = not hs.config.server.handle_typing
+
     async def on_rdata(self, stream_name: str, token: int, rows: list):
         """Called to handle a batch of replication data with a given stream token.
 
@@ -345,7 +352,11 @@ class DummyReplicationDataHandler:
             rows (list): a list of Stream.ROW_TYPE objects as returned by
                 Stream.parse_row.
         """
-        pass
+        if self.slaved_store:
+            self.store.process_replication_rows(stream_name, token, rows)
+
+        if self.slaved_typing:
+            self.typing_handler.process_replication_rows(stream_name, token, rows)
 
     def get_streams_to_replicate(self) -> Dict[str, int]:
         """Called when a new connection has been established and we need to
@@ -355,49 +366,25 @@ class DummyReplicationDataHandler:
             map from stream name to the most recent update we have for
             that stream (ie, the point we want to start replicating from)
         """
-        return {}
-
-    async def on_position(self, stream_name: str, token: int):
-        pass
-
+        args = {}  # type: Dict[str, int]
 
-class WorkerReplicationDataHandler:
-    """A replication data handler that calls slave data stores.
-    """
-
-    def __init__(self, store):
-        self.store = store
-
-    async def on_rdata(self, stream_name: str, token: int, rows: list):
-        """Called to handle a batch of replication data with a given stream token.
+        if self.slaved_store:
+            args = self.store.stream_positions()
+            user_account_data = args.pop("user_account_data", None)
+            room_account_data = args.pop("room_account_data", None)
+            if user_account_data:
+                args["account_data"] = user_account_data
+            elif room_account_data:
+                args["account_data"] = room_account_data
 
-        By default this just pokes the slave store. Can be overridden in subclasses to
-        handle more.
+        if self.slaved_typing:
+            args.update(self.typing_handler.stream_positions())
 
-        Args:
-            stream_name (str): name of the replication stream for this batch of rows
-            token (int): stream token for this batch of rows
-            rows (list): a list of Stream.ROW_TYPE objects as returned by
-                Stream.parse_row.
-        """
-        self.store.process_replication_rows(stream_name, token, rows)
-
-    def get_streams_to_replicate(self) -> Dict[str, int]:
-        """Called when a new connection has been established and we need to
-        subscribe to streams.
-
-        Returns:
-            map from stream name to the most recent update we have for
-            that stream (ie, the point we want to start replicating from)
-        """
-        args = self.store.stream_positions()
-        user_account_data = args.pop("user_account_data", None)
-        room_account_data = args.pop("room_account_data", None)
-        if user_account_data:
-            args["account_data"] = user_account_data
-        elif room_account_data:
-            args["account_data"] = room_account_data
         return args
 
     async def on_position(self, stream_name: str, token: int):
-        self.store.process_replication_rows(stream_name, token, [])
+        if self.slaved_store:
+            self.store.process_replication_rows(stream_name, token, [])
+
+        if self.slaved_typing:
+            self.typing_handler.process_replication_rows(stream_name, token, [])
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index b2a1785c08..c9d671210b 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -25,7 +25,7 @@ from twisted.internet.protocol import Factory
 
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol
-from synapse.replication.tcp.streams import STREAMS_MAP, Stream
+from synapse.replication.tcp.streams import STREAMS_MAP, Stream, TypingStream
 from synapse.replication.tcp.streams.federation import FederationStream
 from synapse.util.metrics import Measure
 
@@ -79,8 +79,14 @@ class ReplicationStreamer(object):
                     # hase been disabled on the master.
                     continue
 
+                if stream == TypingStream:
+                    continue
+
                 self.streams.append(stream(hs))
 
+        if hs.config.server.handle_typing:
+            self.streams.append(TypingStream(hs))
+
         self.streams_by_name = {stream.NAME: stream for stream in self.streams}
 
         self.notifier.add_replication_callback(self.on_notifier_poke)
diff --git a/synapse/server.py b/synapse/server.py
index 5f5d79161c..8955c32cc4 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -78,7 +78,7 @@ from synapse.handlers.room_member_worker import RoomMemberWorkerHandler
 from synapse.handlers.set_password import SetPasswordHandler
 from synapse.handlers.stats import StatsHandler
 from synapse.handlers.sync import SyncHandler
-from synapse.handlers.typing import TypingHandler
+from synapse.handlers.typing import TypingHandler, TypingSlaveHandler
 from synapse.handlers.user_directory import UserDirectoryHandler
 from synapse.http.client import InsecureInterceptableContextFactory, SimpleHttpClient
 from synapse.http.matrixfederationclient import MatrixFederationHttpClient
@@ -86,8 +86,8 @@ from synapse.notifier import Notifier
 from synapse.push.action_generator import ActionGenerator
 from synapse.push.pusherpool import PusherPool
 from synapse.replication.tcp.handler import (
-    DummyReplicationDataHandler,
     ReplicationClientHandler,
+    ReplicationDataHandler,
 )
 from synapse.replication.tcp.resource import ReplicationStreamer
 from synapse.rest.media.v1.media_repository import (
@@ -354,7 +354,10 @@ class HomeServer(object):
         return PresenceHandler(self)
 
     def build_typing_handler(self):
-        return TypingHandler(self)
+        if self.config.handle_typing:
+            return TypingHandler(self)
+        else:
+            return TypingSlaveHandler(self)
 
     def build_sync_handler(self):
         return SyncHandler(self)
@@ -555,7 +558,7 @@ class HomeServer(object):
         return ReplicationStreamer(self)
 
     def build_replication_data_handler(self):
-        return DummyReplicationDataHandler()
+        return ReplicationDataHandler(self)
 
     def remove_pusher(self, app_id, push_key, user_id):
         return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)