diff --git a/synapse/storage/data_stores/__init__.py b/synapse/storage/data_stores/__init__.py
index 791961b296..599ee470d4 100644
--- a/synapse/storage/data_stores/__init__.py
+++ b/synapse/storage/data_stores/__init__.py
@@ -66,9 +66,9 @@ class DataStores(object):
self.main = main_store_class(database, db_conn, hs)
- # If we're on a process that can persist events (currently
- # master), also instantiate a `PersistEventsStore`
- if hs.config.worker.worker_app is None:
+ # If we're on a process that can persist events also
+ # instantiate a `PersistEventsStore`
+ if hs.config.worker.writers.events == hs.get_instance_name():
self.persist_events = PersistEventsStore(
hs, database, self.main
)
diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py
index 0e8378714a..417ac8dc7c 100644
--- a/synapse/storage/data_stores/main/devices.py
+++ b/synapse/storage/data_stores/main/devices.py
@@ -689,6 +689,25 @@ class DeviceWorkerStore(SQLBaseStore):
desc="make_remote_user_device_cache_as_stale",
)
+ def mark_remote_user_device_list_as_unsubscribed(self, user_id):
+ """Mark that we no longer track device lists for remote user.
+ """
+
+ def _mark_remote_user_device_list_as_unsubscribed_txn(txn):
+ self.db.simple_delete_txn(
+ txn,
+ table="device_lists_remote_extremeties",
+ keyvalues={"user_id": user_id},
+ )
+ self._invalidate_cache_and_stream(
+ txn, self.get_device_list_last_stream_id_for_remote, (user_id,)
+ )
+
+ return self.db.runInteraction(
+ "mark_remote_user_device_list_as_unsubscribed",
+ _mark_remote_user_device_list_as_unsubscribed_txn,
+ )
+
class DeviceBackgroundUpdateStore(SQLBaseStore):
def __init__(self, database: Database, db_conn, hs):
@@ -969,17 +988,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
desc="update_device",
)
- @defer.inlineCallbacks
- def mark_remote_user_device_list_as_unsubscribed(self, user_id):
- """Mark that we no longer track device lists for remote user.
- """
- yield self.db.simple_delete(
- table="device_lists_remote_extremeties",
- keyvalues={"user_id": user_id},
- desc="mark_remote_user_device_list_as_unsubscribed",
- )
- self.get_device_list_last_stream_id_for_remote.invalidate((user_id,))
-
def update_remote_device_list_cache_entry(
self, user_id, device_id, content, stream_id
):
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index a97f8b3934..a6572571b4 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -138,10 +138,10 @@ class PersistEventsStore:
self._backfill_id_gen = self.store._backfill_id_gen # type: StreamIdGenerator
self._stream_id_gen = self.store._stream_id_gen # type: StreamIdGenerator
- # This should only exist on master for now
+ # This should only exist on instances that are configured to write
assert (
- hs.config.worker.worker_app is None
- ), "Can only instantiate PersistEventsStore on master"
+ hs.config.worker.writers.events == hs.get_instance_name()
+ ), "Can only instantiate EventsStore on master"
@_retry_on_integrity_error
@defer.inlineCallbacks
@@ -1590,3 +1590,31 @@ class PersistEventsStore:
if not ev.internal_metadata.is_outlier()
],
)
+
+ async def locally_reject_invite(self, user_id: str, room_id: str) -> int:
+ """Mark the invite has having been rejected even though we failed to
+ create a leave event for it.
+ """
+
+ sql = (
+ "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE"
+ " room_id = ? AND invitee = ? AND locally_rejected is NULL"
+ " AND replaced_by is NULL"
+ )
+
+ def f(txn, stream_ordering):
+ txn.execute(sql, (stream_ordering, True, room_id, user_id))
+
+ # We also clear this entry from `local_current_membership`.
+ # Ideally we'd point to a leave event, but we don't have one, so
+ # nevermind.
+ self.db.simple_delete_txn(
+ txn,
+ table="local_current_membership",
+ keyvalues={"room_id": room_id, "user_id": user_id},
+ )
+
+ with self._stream_id_gen.get_next() as stream_ordering:
+ await self.db.runInteraction("locally_reject_invite", f, stream_ordering)
+
+ return stream_ordering
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index b880a71782..213d69100a 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -76,7 +76,7 @@ class EventsWorkerStore(SQLBaseStore):
def __init__(self, database: Database, db_conn, hs):
super(EventsWorkerStore, self).__init__(database, db_conn, hs)
- if hs.config.worker_app is None:
+ if hs.config.worker.writers.events == hs.get_instance_name():
# We are the process in charge of generating stream ids for events,
# so instantiate ID generators based on the database
self._stream_id_gen = StreamIdGenerator(
diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py
index 7c5ca81ae0..137ebac833 100644
--- a/synapse/storage/data_stores/main/roommember.py
+++ b/synapse/storage/data_stores/main/roommember.py
@@ -1046,31 +1046,6 @@ class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore):
def __init__(self, database: Database, db_conn, hs):
super(RoomMemberStore, self).__init__(database, db_conn, hs)
- @defer.inlineCallbacks
- def locally_reject_invite(self, user_id, room_id):
- sql = (
- "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE"
- " room_id = ? AND invitee = ? AND locally_rejected is NULL"
- " AND replaced_by is NULL"
- )
-
- def f(txn, stream_ordering):
- txn.execute(sql, (stream_ordering, True, room_id, user_id))
-
- # We also clear this entry from `local_current_membership`.
- # Ideally we'd point to a leave event, but we don't have one, so
- # nevermind.
- self.db.simple_delete_txn(
- txn,
- table="local_current_membership",
- keyvalues={"room_id": room_id, "user_id": user_id},
- )
-
- with self._stream_id_gen.get_next() as stream_ordering:
- yield self.db.runInteraction("locally_reject_invite", f, stream_ordering)
-
- return stream_ordering
-
def forget(self, user_id, room_id):
"""Indicate that user_id wishes to discard history for room_id."""
|