summary refs log tree commit diff
path: root/synapse/storage/data_stores/main/roommember.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/data_stores/main/roommember.py')
-rw-r--r--synapse/storage/data_stores/main/roommember.py71
1 files changed, 39 insertions, 32 deletions
diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py
index fe2428a281..929f6b0d39 100644
--- a/synapse/storage/data_stores/main/roommember.py
+++ b/synapse/storage/data_stores/main/roommember.py
@@ -26,8 +26,11 @@ from twisted.internet import defer
 from synapse.api.constants import EventTypes, Membership
 from synapse.metrics import LaterGauge
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage._base import LoggingTransaction, make_in_list_sql_clause
-from synapse.storage.background_updates import BackgroundUpdateStore
+from synapse.storage._base import (
+    LoggingTransaction,
+    SQLBaseStore,
+    make_in_list_sql_clause,
+)
 from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
 from synapse.storage.engines import Sqlite3Engine
 from synapse.storage.roommember import (
@@ -116,7 +119,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             txn.execute(query)
             return list(txn)[0][0]
 
-        count = yield self.runInteraction("get_known_servers", _transact)
+        count = yield self.db.runInteraction("get_known_servers", _transact)
 
         # We always know about ourselves, even if we have nothing in
         # room_memberships (for example, the server is new).
@@ -128,7 +131,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         membership column is up to date
         """
 
-        pending_update = self.simple_select_one_txn(
+        pending_update = self.db.simple_select_one_txn(
             txn,
             table="background_updates",
             keyvalues={"update_name": _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME},
@@ -144,7 +147,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
                 15.0,
                 run_as_background_process,
                 "_check_safe_current_state_events_membership_updated",
-                self.runInteraction,
+                self.db.runInteraction,
                 "_check_safe_current_state_events_membership_updated",
                 self._check_safe_current_state_events_membership_updated_txn,
             )
@@ -161,7 +164,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
     @cached(max_entries=100000, iterable=True)
     def get_users_in_room(self, room_id):
-        return self.runInteraction(
+        return self.db.runInteraction(
             "get_users_in_room", self.get_users_in_room_txn, room_id
         )
 
@@ -269,7 +272,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
             return res
 
-        return self.runInteraction("get_room_summary", _get_room_summary_txn)
+        return self.db.runInteraction("get_room_summary", _get_room_summary_txn)
 
     def _get_user_counts_in_room_txn(self, txn, room_id):
         """
@@ -339,7 +342,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         if not membership_list:
             return defer.succeed(None)
 
-        rooms = yield self.runInteraction(
+        rooms = yield self.db.runInteraction(
             "get_rooms_for_user_where_membership_is",
             self._get_rooms_for_user_where_membership_is_txn,
             user_id,
@@ -392,7 +395,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
                 )
 
             txn.execute(sql, (user_id, *args))
-            results = [RoomsForUser(**r) for r in self.cursor_to_dict(txn)]
+            results = [RoomsForUser(**r) for r in self.db.cursor_to_dict(txn)]
 
         if do_invite:
             sql = (
@@ -412,7 +415,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
                     stream_ordering=r["stream_ordering"],
                     membership=Membership.INVITE,
                 )
-                for r in self.cursor_to_dict(txn)
+                for r in self.db.cursor_to_dict(txn)
             )
 
         return results
@@ -603,7 +606,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             to `user_id` and ProfileInfo (or None if not join event).
         """
 
-        rows = yield self.simple_select_many_batch(
+        rows = yield self.db.simple_select_many_batch(
             table="room_memberships",
             column="event_id",
             iterable=event_ids,
@@ -643,7 +646,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         # the returned user actually has the correct domain.
         like_clause = "%:" + host
 
-        rows = yield self.execute("is_host_joined", None, sql, room_id, like_clause)
+        rows = yield self.db.execute("is_host_joined", None, sql, room_id, like_clause)
 
         if not rows:
             return False
@@ -683,7 +686,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         # the returned user actually has the correct domain.
         like_clause = "%:" + host
 
-        rows = yield self.execute("was_host_joined", None, sql, room_id, like_clause)
+        rows = yield self.db.execute("was_host_joined", None, sql, room_id, like_clause)
 
         if not rows:
             return False
@@ -753,7 +756,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             rows = txn.fetchall()
             return rows[0][0]
 
-        count = yield self.runInteraction("did_forget_membership", f)
+        count = yield self.db.runInteraction("did_forget_membership", f)
         return count == 0
 
     @cached()
@@ -790,7 +793,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             txn.execute(sql, (user_id,))
             return set(row[0] for row in txn if row[1] == 0)
 
-        return self.runInteraction(
+        return self.db.runInteraction(
             "get_forgotten_rooms_for_user", _get_forgotten_rooms_for_user_txn
         )
 
@@ -805,7 +808,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             Deferred[set[str]]: Set of room IDs.
         """
 
-        room_ids = yield self.simple_select_onecol(
+        room_ids = yield self.db.simple_select_onecol(
             table="room_memberships",
             keyvalues={"membership": Membership.JOIN, "user_id": user_id},
             retcol="room_id",
@@ -820,7 +823,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         """Get user_id and membership of a set of event IDs.
         """
 
-        return self.simple_select_many_batch(
+        return self.db.simple_select_many_batch(
             table="room_memberships",
             column="event_id",
             iterable=member_event_ids,
@@ -831,17 +834,17 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         )
 
 
-class RoomMemberBackgroundUpdateStore(BackgroundUpdateStore):
+class RoomMemberBackgroundUpdateStore(SQLBaseStore):
     def __init__(self, db_conn, hs):
         super(RoomMemberBackgroundUpdateStore, self).__init__(db_conn, hs)
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             _MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile
         )
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME,
             self._background_current_state_membership,
         )
-        self.register_background_index_update(
+        self.db.updates.register_background_index_update(
             "room_membership_forgotten_idx",
             index_name="room_memberships_user_room_forgotten",
             table="room_memberships",
@@ -874,7 +877,7 @@ class RoomMemberBackgroundUpdateStore(BackgroundUpdateStore):
 
             txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
 
-            rows = self.cursor_to_dict(txn)
+            rows = self.db.cursor_to_dict(txn)
             if not rows:
                 return 0
 
@@ -909,18 +912,20 @@ class RoomMemberBackgroundUpdateStore(BackgroundUpdateStore):
                 "max_stream_id_exclusive": min_stream_id,
             }
 
-            self._background_update_progress_txn(
+            self.db.updates._background_update_progress_txn(
                 txn, _MEMBERSHIP_PROFILE_UPDATE_NAME, progress
             )
 
             return len(rows)
 
-        result = yield self.runInteraction(
+        result = yield self.db.runInteraction(
             _MEMBERSHIP_PROFILE_UPDATE_NAME, add_membership_profile_txn
         )
 
         if not result:
-            yield self._end_background_update(_MEMBERSHIP_PROFILE_UPDATE_NAME)
+            yield self.db.updates._end_background_update(
+                _MEMBERSHIP_PROFILE_UPDATE_NAME
+            )
 
         return result
 
@@ -959,7 +964,7 @@ class RoomMemberBackgroundUpdateStore(BackgroundUpdateStore):
 
                 last_processed_room = next_room
 
-            self._background_update_progress_txn(
+            self.db.updates._background_update_progress_txn(
                 txn,
                 _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME,
                 {"last_processed_room": last_processed_room},
@@ -971,14 +976,16 @@ class RoomMemberBackgroundUpdateStore(BackgroundUpdateStore):
         # string, which will compare before all room IDs correctly.
         last_processed_room = progress.get("last_processed_room", "")
 
-        row_count, finished = yield self.runInteraction(
+        row_count, finished = yield self.db.runInteraction(
             "_background_current_state_membership_update",
             _background_current_state_membership_txn,
             last_processed_room,
         )
 
         if finished:
-            yield self._end_background_update(_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME)
+            yield self.db.updates._end_background_update(
+                _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME
+            )
 
         return row_count
 
@@ -990,7 +997,7 @@ class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore):
     def _store_room_members_txn(self, txn, events, backfilled):
         """Store a room member in the database.
         """
-        self.simple_insert_many_txn(
+        self.db.simple_insert_many_txn(
             txn,
             table="room_memberships",
             values=[
@@ -1028,7 +1035,7 @@ class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore):
             is_mine = self.hs.is_mine_id(event.state_key)
             if is_new_state and is_mine:
                 if event.membership == Membership.INVITE:
-                    self.simple_insert_txn(
+                    self.db.simple_insert_txn(
                         txn,
                         table="local_invites",
                         values={
@@ -1068,7 +1075,7 @@ class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore):
             txn.execute(sql, (stream_ordering, True, room_id, user_id))
 
         with self._stream_id_gen.get_next() as stream_ordering:
-            yield self.runInteraction("locally_reject_invite", f, stream_ordering)
+            yield self.db.runInteraction("locally_reject_invite", f, stream_ordering)
 
     def forget(self, user_id, room_id):
         """Indicate that user_id wishes to discard history for room_id."""
@@ -1091,7 +1098,7 @@ class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore):
                 txn, self.get_forgotten_rooms_for_user, (user_id,)
             )
 
-        return self.runInteraction("forget_membership", f)
+        return self.db.runInteraction("forget_membership", f)
 
 
 class _JoinedHostsCache(object):