summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2019-07-19 16:30:33 +0100
committerGitHub <noreply@github.com>2019-07-19 16:30:33 +0100
commit5c07c97c0960ec1e8bb3a36a449ec94e14c3e65d (patch)
treed1390d0e188e567c9e0f8393c79bdce02ca9d0d0 /synapse
parentDon't accept opentracing data from clients. (#5715) (diff)
parentLoggingTransaction accepts None for callback lists. (diff)
downloadsynapse-5c07c97c0960ec1e8bb3a36a449ec94e14c3e65d.tar.xz
Merge pull request #5706 from matrix-org/erikj/add_memberships_to_current_state
Add membership column to current_state_events table
Diffstat (limited to 'synapse')
-rw-r--r--synapse/storage/_base.py18
-rw-r--r--synapse/storage/event_push_actions.py2
-rw-r--r--synapse/storage/events.py26
-rw-r--r--synapse/storage/prepare_database.py2
-rw-r--r--synapse/storage/roommember.py154
-rw-r--r--synapse/storage/schema/delta/56/current_state_events_membership.sql25
-rw-r--r--synapse/storage/user_directory.py8
7 files changed, 194 insertions, 41 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 2f940dbae6..a7c93efa46 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -86,7 +86,21 @@ _CURRENT_STATE_CACHE_NAME = "cs_cache_fake"
 class LoggingTransaction(object):
     """An object that almost-transparently proxies for the 'txn' object
     passed to the constructor. Adds logging and metrics to the .execute()
-    method."""
+    method.
+
+    Args:
+        txn: The database transcation object to wrap.
+        name (str): The name of this transactions for logging.
+        database_engine (Sqlite3Engine|PostgresEngine)
+        after_callbacks(list|None): A list that callbacks will be appended to
+            that have been added by `call_after` which should be run on
+            successful completion of the transaction. None indicates that no
+            callbacks should be allowed to be scheduled to run.
+        exception_callbacks(list|None): A list that callbacks will be appended
+            to that have been added by `call_on_exception` which should be run
+            if transaction ends with an error. None indicates that no callbacks
+            should be allowed to be scheduled to run.
+    """
 
     __slots__ = [
         "txn",
@@ -97,7 +111,7 @@ class LoggingTransaction(object):
     ]
 
     def __init__(
-        self, txn, name, database_engine, after_callbacks, exception_callbacks
+        self, txn, name, database_engine, after_callbacks=None, exception_callbacks=None
     ):
         object.__setattr__(self, "txn", txn)
         object.__setattr__(self, "name", name)
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index eca77069fd..dcfb67e029 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -79,8 +79,6 @@ class EventPushActionsWorkerStore(SQLBaseStore):
             db_conn.cursor(),
             name="_find_stream_orderings_for_times_txn",
             database_engine=self.database_engine,
-            after_callbacks=[],
-            exception_callbacks=[],
         )
         self._find_stream_orderings_for_times_txn(cur)
         cur.close()
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index b486ca50eb..b70457bfc6 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -918,8 +918,6 @@ class EventsStore(
         min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering
         max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
 
-        self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)
-
         self._update_forward_extremities_txn(
             txn,
             new_forward_extremities=new_forward_extremeties,
@@ -993,6 +991,10 @@ class EventsStore(
             backfilled=backfilled,
         )
 
+        # We call this last as it assumes we've inserted the events into
+        # room_memberships, where applicable.
+        self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)
+
     def _update_current_state_txn(self, txn, state_delta_by_room, stream_id):
         for room_id, current_state_tuple in iteritems(state_delta_by_room):
             to_delete, to_insert = current_state_tuple
@@ -1062,16 +1064,16 @@ class EventsStore(
                 ),
             )
 
-            self._simple_insert_many_txn(
-                txn,
-                table="current_state_events",
-                values=[
-                    {
-                        "event_id": ev_id,
-                        "room_id": room_id,
-                        "type": key[0],
-                        "state_key": key[1],
-                    }
+            # We include the membership in the current state table, hence we do
+            # a lookup when we insert. This assumes that all events have already
+            # been inserted into room_memberships.
+            txn.executemany(
+                """INSERT INTO current_state_events
+                    (room_id, type, state_key, event_id, membership)
+                VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
+                """,
+                [
+                    (room_id, key[0], key[1], ev_id, ev_id)
                     for key, ev_id in iteritems(to_insert)
                 ],
             )
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 7c4e1dc7ec..d20eacda59 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -27,7 +27,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 55
+SCHEMA_VERSION = 56
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 32cfd010a5..257bcdb2f8 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -24,6 +24,8 @@ from canonicaljson import json
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage._base import LoggingTransaction
 from synapse.storage.events_worker import EventsWorkerStore
 from synapse.types import get_domain_from_id
 from synapse.util.async_helpers import Linearizer
@@ -53,9 +55,51 @@ ProfileInfo = namedtuple("ProfileInfo", ("avatar_url", "display_name"))
 MemberSummary = namedtuple("MemberSummary", ("members", "count"))
 
 _MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update"
+_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership"
 
 
 class RoomMemberWorkerStore(EventsWorkerStore):
+    def __init__(self, db_conn, hs):
+        super(RoomMemberWorkerStore, self).__init__(db_conn, hs)
+
+        # Is the current_state_events.membership up to date? Or is the
+        # background update still running?
+        self._current_state_events_membership_up_to_date = False
+
+        txn = LoggingTransaction(
+            db_conn.cursor(),
+            name="_check_safe_current_state_events_membership_updated",
+            database_engine=self.database_engine,
+        )
+        self._check_safe_current_state_events_membership_updated_txn(txn)
+        txn.close()
+
+    def _check_safe_current_state_events_membership_updated_txn(self, txn):
+        """Checks if it is safe to assume the new current_state_events
+        membership column is up to date
+        """
+
+        pending_update = self._simple_select_one_txn(
+            txn,
+            table="background_updates",
+            keyvalues={"update_name": _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME},
+            retcols=["update_name"],
+            allow_none=True,
+        )
+
+        self._current_state_events_membership_up_to_date = not pending_update
+
+        # If the update is still running, reschedule to run.
+        if pending_update:
+            self._clock.call_later(
+                15.0,
+                run_as_background_process,
+                "_check_safe_current_state_events_membership_updated",
+                self.runInteraction,
+                "_check_safe_current_state_events_membership_updated",
+                self._check_safe_current_state_events_membership_updated_txn,
+            )
+
     @cachedInlineCallbacks(max_entries=100000, iterable=True, cache_context=True)
     def get_hosts_in_room(self, room_id, cache_context):
         """Returns the set of all hosts currently in the room
@@ -69,14 +113,23 @@ class RoomMemberWorkerStore(EventsWorkerStore):
     @cached(max_entries=100000, iterable=True)
     def get_users_in_room(self, room_id):
         def f(txn):
-            sql = (
-                "SELECT m.user_id FROM room_memberships as m"
-                " INNER JOIN current_state_events as c"
-                " ON m.event_id = c.event_id "
-                " AND m.room_id = c.room_id "
-                " AND m.user_id = c.state_key"
-                " WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?"
-            )
+            # If we can assume current_state_events.membership is up to date
+            # then we can avoid a join, which is a Very Good Thing given how
+            # frequently this function gets called.
+            if self._current_state_events_membership_up_to_date:
+                sql = """
+                    SELECT state_key FROM current_state_events
+                    WHERE type = 'm.room.member' AND room_id = ? AND membership = ?
+                """
+            else:
+                sql = """
+                    SELECT state_key FROM room_memberships as m
+                    INNER JOIN current_state_events as c
+                    ON m.event_id = c.event_id
+                    AND m.room_id = c.room_id
+                    AND m.user_id = c.state_key
+                    WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
+                """
 
             txn.execute(sql, (room_id, Membership.JOIN))
             return [to_ascii(r[0]) for r in txn]
@@ -98,15 +151,26 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             # first get counts.
             # We do this all in one transaction to keep the cache small.
             # FIXME: get rid of this when we have room_stats
-            sql = """
-                SELECT count(*), m.membership FROM room_memberships as m
-                 INNER JOIN current_state_events as c
-                 ON m.event_id = c.event_id
-                 AND m.room_id = c.room_id
-                 AND m.user_id = c.state_key
-                 WHERE c.type = 'm.room.member' AND c.room_id = ?
-                 GROUP BY m.membership
-            """
+
+            # If we can assume current_state_events.membership is up to date
+            # then we can avoid a join, which is a Very Good Thing given how
+            # frequently this function gets called.
+            if self._current_state_events_membership_up_to_date:
+                sql = """
+                    SELECT count(*), membership FROM current_state_events
+                    WHERE type = 'm.room.member' AND room_id = ?
+                    GROUP BY membership
+                """
+            else:
+                sql = """
+                    SELECT count(*), m.membership FROM room_memberships as m
+                    INNER JOIN current_state_events as c
+                    ON m.event_id = c.event_id
+                    AND m.room_id = c.room_id
+                    AND m.user_id = c.state_key
+                    WHERE c.type = 'm.room.member' AND c.room_id = ?
+                    GROUP BY m.membership
+                """
 
             txn.execute(sql, (room_id,))
             res = {}
@@ -224,7 +288,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         results = []
         if membership_list:
             where_clause = "user_id = ? AND (%s) AND forgotten = 0" % (
-                " OR ".join(["membership = ?" for _ in membership_list]),
+                " OR ".join(["m.membership = ?" for _ in membership_list]),
             )
 
             args = [user_id]
@@ -453,8 +517,8 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
         sql = """
             SELECT state_key FROM current_state_events AS c
-            INNER JOIN room_memberships USING (event_id)
-            WHERE membership = 'join'
+            INNER JOIN room_memberships AS m USING (event_id)
+            WHERE m.membership = 'join'
                 AND type = 'm.room.member'
                 AND c.room_id = ?
                 AND state_key LIKE ?
@@ -602,6 +666,10 @@ class RoomMemberStore(RoomMemberWorkerStore):
         self.register_background_update_handler(
             _MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile
         )
+        self.register_background_update_handler(
+            _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME,
+            self._background_current_state_membership,
+        )
 
     def _store_room_members_txn(self, txn, events, backfilled):
         """Store a room member in the database.
@@ -781,6 +849,52 @@ class RoomMemberStore(RoomMemberWorkerStore):
 
         defer.returnValue(result)
 
+    @defer.inlineCallbacks
+    def _background_current_state_membership(self, progress, batch_size):
+        """Update the new membership column on current_state_events.
+        """
+
+        if "rooms" not in progress:
+            rooms = yield self._simple_select_onecol(
+                table="current_state_events",
+                keyvalues={},
+                retcol="DISTINCT room_id",
+                desc="_background_current_state_membership_get_rooms",
+            )
+            progress["rooms"] = rooms
+
+        rooms = progress["rooms"]
+
+        def _background_current_state_membership_txn(txn):
+            processed = 0
+            while rooms and processed < batch_size:
+                sql = """
+                    UPDATE current_state_events AS c
+                    SET membership = (
+                        SELECT membership FROM room_memberships
+                        WHERE event_id = c.event_id
+                    )
+                    WHERE room_id = ?
+                """
+                txn.execute(sql, (rooms.pop(),))
+                processed += txn.rowcount
+
+            self._background_update_progress_txn(
+                txn, _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME, progress
+            )
+
+            return processed
+
+        result = yield self.runInteraction(
+            "_background_current_state_membership_update",
+            _background_current_state_membership_txn,
+        )
+
+        if not rooms:
+            yield self._end_background_update(_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME)
+
+        defer.returnValue(result)
+
 
 class _JoinedHostsCache(object):
     """Cache for joined hosts in a room that is optimised to handle updates
diff --git a/synapse/storage/schema/delta/56/current_state_events_membership.sql b/synapse/storage/schema/delta/56/current_state_events_membership.sql
new file mode 100644
index 0000000000..b2e08cd85d
--- /dev/null
+++ b/synapse/storage/schema/delta/56/current_state_events_membership.sql
@@ -0,0 +1,25 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- We add membership to current state so that we don't need to join against
+-- room_memberships, which can be surprisingly costly (we do such queries
+-- very frequently).
+-- This will be null for non-membership events and the content.membership key
+-- for membership events. (Will also be null for membership events until the
+-- background update job has finished).
+ALTER TABLE current_state_events ADD membership TEXT;
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+  ('current_state_events_membership', '{}');
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index 83466e25d9..7fd16fe65e 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -618,15 +618,15 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
         sql = """
             SELECT room_id FROM (
                 SELECT c.room_id FROM current_state_events AS c
-                INNER JOIN room_memberships USING (event_id)
+                INNER JOIN room_memberships AS m USING (event_id)
                 WHERE type = 'm.room.member'
-                    AND membership = 'join'
+                    AND m.membership = 'join'
                     AND state_key = ?
             ) AS f1 INNER JOIN (
                 SELECT c.room_id FROM current_state_events AS c
-                INNER JOIN room_memberships USING (event_id)
+                INNER JOIN room_memberships AS m USING (event_id)
                 WHERE type = 'm.room.member'
-                    AND membership = 'join'
+                    AND m.membership = 'join'
                     AND state_key = ?
             ) f2 USING (room_id)
         """