summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-05-22 16:11:35 +0100
committerGitHub <noreply@github.com>2020-05-22 16:11:35 +0100
commite5c67d04dbe5ed45d659e826a5dfcd5044a4e374 (patch)
tree0ee1f865349d9fb3a6b215001f3c7ac3b7c0552b /synapse/handlers
parentReturn 200 OK for all OPTIONS requests (#7534) (diff)
downloadsynapse-e5c67d04dbe5ed45d659e826a5dfcd5044a4e374.tar.xz
Add option to move event persistence off master (#7517)
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/federation.py16
-rw-r--r--synapse/handlers/message.py12
-rw-r--r--synapse/handlers/presence.py6
-rw-r--r--synapse/handlers/room.py7
-rw-r--r--synapse/handlers/room_member.py39
5 files changed, 61 insertions, 19 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index e354c803db..75ec90d267 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -126,11 +126,10 @@ class FederationHandler(BaseHandler):
         self._server_notices_mxid = hs.config.server_notices_mxid
         self.config = hs.config
         self.http_client = hs.get_simple_http_client()
+        self._instance_name = hs.get_instance_name()
         self._replication = hs.get_replication_data_handler()
 
-        self._send_events_to_master = ReplicationFederationSendEventsRestServlet.make_client(
-            hs
-        )
+        self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs)
         self._notify_user_membership_change = ReplicationUserJoinedLeftRoomRestServlet.make_client(
             hs
         )
@@ -1243,6 +1242,10 @@ class FederationHandler(BaseHandler):
 
             content: The event content to use for the join event.
         """
+        # TODO: We should be able to call this on workers, but the upgrading of
+        # room stuff after join currently doesn't work on workers.
+        assert self.config.worker.worker_app is None
+
         logger.debug("Joining %s to %s", joinee, room_id)
 
         origin, event, room_version_obj = await self._make_and_verify_event(
@@ -1314,7 +1317,7 @@ class FederationHandler(BaseHandler):
             #
             # TODO: Currently the events stream is written to from master
             await self._replication.wait_for_stream_position(
-                "master", "events", max_stream_id
+                self.config.worker.writers.events, "events", max_stream_id
             )
 
             # Check whether this room is the result of an upgrade of a room we already know
@@ -2854,8 +2857,9 @@ class FederationHandler(BaseHandler):
             backfilled: Whether these events are a result of
                 backfilling or not
         """
-        if self.config.worker_app:
-            result = await self._send_events_to_master(
+        if self.config.worker.writers.events != self._instance_name:
+            result = await self._send_events(
+                instance_name=self.config.worker.writers.events,
                 store=self.store,
                 event_and_contexts=event_and_contexts,
                 backfilled=backfilled,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index f445e2aa2a..ea25f0515a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -366,10 +366,11 @@ class EventCreationHandler(object):
         self.notifier = hs.get_notifier()
         self.config = hs.config
         self.require_membership_for_aliases = hs.config.require_membership_for_aliases
+        self._instance_name = hs.get_instance_name()
 
         self.room_invite_state_types = self.hs.config.room_invite_state_types
 
-        self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs)
+        self.send_event = ReplicationSendEventRestServlet.make_client(hs)
 
         # This is only used to get at ratelimit function, and maybe_kick_guest_users
         self.base_handler = BaseHandler(hs)
@@ -835,8 +836,9 @@ class EventCreationHandler(object):
         success = False
         try:
             # If we're a worker we need to hit out to the master.
-            if self.config.worker_app:
-                result = await self.send_event_to_master(
+            if self.config.worker.writers.events != self._instance_name:
+                result = await self.send_event(
+                    instance_name=self.config.worker.writers.events,
                     event_id=event.event_id,
                     store=self.store,
                     requester=requester,
@@ -902,9 +904,9 @@ class EventCreationHandler(object):
         """Called when we have fully built the event, have already
         calculated the push actions for the event, and checked auth.
 
-        This should only be run on master.
+        This should only be run on the instance in charge of persisting events.
         """
-        assert not self.config.worker_app
+        assert self.config.worker.writers.events == self._instance_name
 
         if ratelimit:
             # We check if this is a room admin redacting an event so that we
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 9ea11c0754..3594f3b00f 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -193,6 +193,12 @@ class BasePresenceHandler(abc.ABC):
     ) -> None:
         """Set the presence state of the user. """
 
+    @abc.abstractmethod
+    async def bump_presence_active_time(self, user: UserID):
+        """We've seen the user do something that indicates they're interacting
+        with the app.
+        """
+
 
 class PresenceHandler(BasePresenceHandler):
     def __init__(self, hs: "synapse.server.HomeServer"):
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 2698a129ca..61db3ccc43 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -89,6 +89,8 @@ class RoomCreationHandler(BaseHandler):
         self.room_member_handler = hs.get_room_member_handler()
         self.config = hs.config
 
+        self._replication = hs.get_replication_data_handler()
+
         # linearizer to stop two upgrades happening at once
         self._upgrade_linearizer = Linearizer("room_upgrade_linearizer")
 
@@ -752,6 +754,11 @@ class RoomCreationHandler(BaseHandler):
         if room_alias:
             result["room_alias"] = room_alias.to_string()
 
+        # Always wait for room creation to progate before returning
+        await self._replication.wait_for_stream_position(
+            self.hs.config.worker.writers.events, "events", last_stream_id
+        )
+
         return result, last_stream_id
 
     async def _send_events_for_new_room(
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 691b6705b2..0f7af982f0 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -26,6 +26,9 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError, Codes, SynapseError
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
+from synapse.replication.http.membership import (
+    ReplicationLocallyRejectInviteRestServlet,
+)
 from synapse.types import Collection, Requester, RoomAlias, RoomID, UserID
 from synapse.util.async_helpers import Linearizer
 from synapse.util.distributor import user_joined_room, user_left_room
@@ -44,11 +47,6 @@ class RoomMemberHandler(object):
     __metaclass__ = abc.ABCMeta
 
     def __init__(self, hs):
-        """
-
-        Args:
-            hs (synapse.server.HomeServer):
-        """
         self.hs = hs
         self.store = hs.get_datastore()
         self.auth = hs.get_auth()
@@ -71,6 +69,17 @@ class RoomMemberHandler(object):
         self._enable_lookup = hs.config.enable_3pid_lookup
         self.allow_per_room_profiles = self.config.allow_per_room_profiles
 
+        self._event_stream_writer_instance = hs.config.worker.writers.events
+        self._is_on_event_persistence_instance = (
+            self._event_stream_writer_instance == hs.get_instance_name()
+        )
+        if self._is_on_event_persistence_instance:
+            self.persist_event_storage = hs.get_storage().persistence
+        else:
+            self._locally_reject_client = ReplicationLocallyRejectInviteRestServlet.make_client(
+                hs
+            )
+
         # This is only used to get at ratelimit function, and
         # maybe_kick_guest_users. It's fine there are multiple of these as
         # it doesn't store state.
@@ -121,6 +130,22 @@ class RoomMemberHandler(object):
         """
         raise NotImplementedError()
 
+    async def locally_reject_invite(self, user_id: str, room_id: str) -> int:
+        """Mark the invite has having been rejected even though we failed to
+        create a leave event for it.
+        """
+        if self._is_on_event_persistence_instance:
+            return await self.persist_event_storage.locally_reject_invite(
+                user_id, room_id
+            )
+        else:
+            result = await self._locally_reject_client(
+                instance_name=self._event_stream_writer_instance,
+                user_id=user_id,
+                room_id=room_id,
+            )
+            return result["stream_id"]
+
     @abc.abstractmethod
     async def _user_joined_room(self, target: UserID, room_id: str) -> None:
         """Notifies distributor on master process that the user has joined the
@@ -1015,9 +1040,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
             #
             logger.warning("Failed to reject invite: %s", e)
 
-            stream_id = await self.store.locally_reject_invite(
-                target.to_string(), room_id
-            )
+            stream_id = await self.locally_reject_invite(target.to_string(), room_id)
             return None, stream_id
 
     async def _user_joined_room(self, target: UserID, room_id: str) -> None: