summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-05-22 16:11:35 +0100
committerGitHub <noreply@github.com>2020-05-22 16:11:35 +0100
commite5c67d04dbe5ed45d659e826a5dfcd5044a4e374 (patch)
tree0ee1f865349d9fb3a6b215001f3c7ac3b7c0552b /synapse/storage
parentReturn 200 OK for all OPTIONS requests (#7534) (diff)
downloadsynapse-e5c67d04dbe5ed45d659e826a5dfcd5044a4e374.tar.xz
Add option to move event persistence off master (#7517)
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/data_stores/__init__.py6
-rw-r--r--synapse/storage/data_stores/main/devices.py30
-rw-r--r--synapse/storage/data_stores/main/events.py34
-rw-r--r--synapse/storage/data_stores/main/events_worker.py2
-rw-r--r--synapse/storage/data_stores/main/roommember.py25
-rw-r--r--synapse/storage/persist_events.py6
6 files changed, 60 insertions, 43 deletions
diff --git a/synapse/storage/data_stores/__init__.py b/synapse/storage/data_stores/__init__.py
index 791961b296..599ee470d4 100644
--- a/synapse/storage/data_stores/__init__.py
+++ b/synapse/storage/data_stores/__init__.py
@@ -66,9 +66,9 @@ class DataStores(object):
 
                     self.main = main_store_class(database, db_conn, hs)
 
-                    # If we're on a process that can persist events (currently
-                    # master), also instantiate a `PersistEventsStore`
-                    if hs.config.worker.worker_app is None:
+                    # If we're on a process that can persist events also
+                    # instantiate a `PersistEventsStore`
+                    if hs.config.worker.writers.events == hs.get_instance_name():
                         self.persist_events = PersistEventsStore(
                             hs, database, self.main
                         )
diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py
index 0e8378714a..417ac8dc7c 100644
--- a/synapse/storage/data_stores/main/devices.py
+++ b/synapse/storage/data_stores/main/devices.py
@@ -689,6 +689,25 @@ class DeviceWorkerStore(SQLBaseStore):
             desc="make_remote_user_device_cache_as_stale",
         )
 
+    def mark_remote_user_device_list_as_unsubscribed(self, user_id):
+        """Mark that we no longer track device lists for remote user.
+        """
+
+        def _mark_remote_user_device_list_as_unsubscribed_txn(txn):
+            self.db.simple_delete_txn(
+                txn,
+                table="device_lists_remote_extremeties",
+                keyvalues={"user_id": user_id},
+            )
+            self._invalidate_cache_and_stream(
+                txn, self.get_device_list_last_stream_id_for_remote, (user_id,)
+            )
+
+        return self.db.runInteraction(
+            "mark_remote_user_device_list_as_unsubscribed",
+            _mark_remote_user_device_list_as_unsubscribed_txn,
+        )
+
 
 class DeviceBackgroundUpdateStore(SQLBaseStore):
     def __init__(self, database: Database, db_conn, hs):
@@ -969,17 +988,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
             desc="update_device",
         )
 
-    @defer.inlineCallbacks
-    def mark_remote_user_device_list_as_unsubscribed(self, user_id):
-        """Mark that we no longer track device lists for remote user.
-        """
-        yield self.db.simple_delete(
-            table="device_lists_remote_extremeties",
-            keyvalues={"user_id": user_id},
-            desc="mark_remote_user_device_list_as_unsubscribed",
-        )
-        self.get_device_list_last_stream_id_for_remote.invalidate((user_id,))
-
     def update_remote_device_list_cache_entry(
         self, user_id, device_id, content, stream_id
     ):
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index a97f8b3934..a6572571b4 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -138,10 +138,10 @@ class PersistEventsStore:
         self._backfill_id_gen = self.store._backfill_id_gen  # type: StreamIdGenerator
         self._stream_id_gen = self.store._stream_id_gen  # type: StreamIdGenerator
 
-        # This should only exist on master for now
+        # This should only exist on instances that are configured to write
         assert (
-            hs.config.worker.worker_app is None
-        ), "Can only instantiate PersistEventsStore on master"
+            hs.config.worker.writers.events == hs.get_instance_name()
+        ), "Can only instantiate EventsStore on master"
 
     @_retry_on_integrity_error
     @defer.inlineCallbacks
@@ -1590,3 +1590,31 @@ class PersistEventsStore:
                 if not ev.internal_metadata.is_outlier()
             ],
         )
+
+    async def locally_reject_invite(self, user_id: str, room_id: str) -> int:
+        """Mark the invite has having been rejected even though we failed to
+        create a leave event for it.
+        """
+
+        sql = (
+            "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE"
+            " room_id = ? AND invitee = ? AND locally_rejected is NULL"
+            " AND replaced_by is NULL"
+        )
+
+        def f(txn, stream_ordering):
+            txn.execute(sql, (stream_ordering, True, room_id, user_id))
+
+            # We also clear this entry from `local_current_membership`.
+            # Ideally we'd point to a leave event, but we don't have one, so
+            # nevermind.
+            self.db.simple_delete_txn(
+                txn,
+                table="local_current_membership",
+                keyvalues={"room_id": room_id, "user_id": user_id},
+            )
+
+        with self._stream_id_gen.get_next() as stream_ordering:
+            await self.db.runInteraction("locally_reject_invite", f, stream_ordering)
+
+        return stream_ordering
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index b880a71782..213d69100a 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -76,7 +76,7 @@ class EventsWorkerStore(SQLBaseStore):
     def __init__(self, database: Database, db_conn, hs):
         super(EventsWorkerStore, self).__init__(database, db_conn, hs)
 
-        if hs.config.worker_app is None:
+        if hs.config.worker.writers.events == hs.get_instance_name():
             # We are the process in charge of generating stream ids for events,
             # so instantiate ID generators based on the database
             self._stream_id_gen = StreamIdGenerator(
diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py
index 7c5ca81ae0..137ebac833 100644
--- a/synapse/storage/data_stores/main/roommember.py
+++ b/synapse/storage/data_stores/main/roommember.py
@@ -1046,31 +1046,6 @@ class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore):
     def __init__(self, database: Database, db_conn, hs):
         super(RoomMemberStore, self).__init__(database, db_conn, hs)
 
-    @defer.inlineCallbacks
-    def locally_reject_invite(self, user_id, room_id):
-        sql = (
-            "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE"
-            " room_id = ? AND invitee = ? AND locally_rejected is NULL"
-            " AND replaced_by is NULL"
-        )
-
-        def f(txn, stream_ordering):
-            txn.execute(sql, (stream_ordering, True, room_id, user_id))
-
-            # We also clear this entry from `local_current_membership`.
-            # Ideally we'd point to a leave event, but we don't have one, so
-            # nevermind.
-            self.db.simple_delete_txn(
-                txn,
-                table="local_current_membership",
-                keyvalues={"room_id": room_id, "user_id": user_id},
-            )
-
-        with self._stream_id_gen.get_next() as stream_ordering:
-            yield self.db.runInteraction("locally_reject_invite", f, stream_ordering)
-
-        return stream_ordering
-
     def forget(self, user_id, room_id):
         """Indicate that user_id wishes to discard history for room_id."""
 
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 12e1ffb9a2..f159400a87 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -786,3 +786,9 @@ class EventsPersistenceStorage(object):
 
         for user_id in left_users:
             await self.main_store.mark_remote_user_device_list_as_unsubscribed(user_id)
+
+    async def locally_reject_invite(self, user_id: str, room_id: str) -> int:
+        """Mark the invite has having been rejected even though we failed to
+        create a leave event for it.
+        """
+        return await self.persist_events_store.locally_reject_invite(user_id, room_id)