summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/crypto/keyring.py69
-rw-r--r--synapse/rest/client/v1/room.py2
-rw-r--r--synapse/storage/events_worker.py7
-rw-r--r--synapse/storage/roommember.py33
-rw-r--r--synapse/storage/schema/delta/54/stats2.sql28
-rw-r--r--synapse/storage/stats.py130
6 files changed, 141 insertions, 128 deletions
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 194867db03..f4918d1bc6 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -15,12 +15,12 @@
 # limitations under the License.
 
 import logging
-from collections import namedtuple
 
 import six
 from six import raise_from
 from six.moves import urllib
 
+import attr
 from signedjson.key import (
     decode_verify_key_bytes,
     encode_verify_key_base64,
@@ -57,22 +57,32 @@ from synapse.util.retryutils import NotRetryingDestination
 logger = logging.getLogger(__name__)
 
 
-VerifyKeyRequest = namedtuple(
-    "VerifyRequest", ("server_name", "key_ids", "json_object", "deferred")
-)
-"""
-A request for a verify key to verify a JSON object.
+@attr.s(slots=True, cmp=False)
+class VerifyKeyRequest(object):
+    """
+    A request for a verify key to verify a JSON object.
+
+    Attributes:
+        server_name(str): The name of the server to verify against.
 
-Attributes:
-    server_name(str): The name of the server to verify against.
-    key_ids(set(str)): The set of key_ids to that could be used to verify the
-        JSON object
-    json_object(dict): The JSON object to verify.
-    deferred(Deferred[str, str, nacl.signing.VerifyKey]):
-        A deferred (server_name, key_id, verify_key) tuple that resolves when
-        a verify key has been fetched. The deferreds' callbacks are run with no
-        logcontext.
-"""
+        key_ids(set[str]): The set of key_ids to that could be used to verify the
+            JSON object
+
+        json_object(dict): The JSON object to verify.
+
+        deferred(Deferred[str, str, nacl.signing.VerifyKey]):
+            A deferred (server_name, key_id, verify_key) tuple that resolves when
+            a verify key has been fetched. The deferreds' callbacks are run with no
+            logcontext.
+
+            If we are unable to find a key which satisfies the request, the deferred
+            errbacks with an M_UNAUTHORIZED SynapseError.
+    """
+
+    server_name = attr.ib()
+    key_ids = attr.ib()
+    json_object = attr.ib()
+    deferred = attr.ib()
 
 
 class KeyLookupError(ValueError):
@@ -772,31 +782,8 @@ def _handle_key_deferred(verify_request):
         SynapseError if there was a problem performing the verification
     """
     server_name = verify_request.server_name
-    try:
-        with PreserveLoggingContext():
-            _, key_id, verify_key = yield verify_request.deferred
-    except KeyLookupError as e:
-        logger.warn(
-            "Failed to download keys for %s: %s %s",
-            server_name,
-            type(e).__name__,
-            str(e),
-        )
-        raise SynapseError(
-            502, "Error downloading keys for %s" % (server_name,), Codes.UNAUTHORIZED
-        )
-    except Exception as e:
-        logger.exception(
-            "Got Exception when downloading keys for %s: %s %s",
-            server_name,
-            type(e).__name__,
-            str(e),
-        )
-        raise SynapseError(
-            401,
-            "No key for %s with id %s" % (server_name, verify_request.key_ids),
-            Codes.UNAUTHORIZED,
-        )
+    with PreserveLoggingContext():
+        _, key_id, verify_key = yield verify_request.deferred
 
     json_object = verify_request.json_object
 
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 255a85c588..b92c6a9a9c 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -475,6 +475,8 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
         if filter_bytes:
             filter_json = urlparse.unquote(filter_bytes.decode("UTF-8"))
             event_filter = Filter(json.loads(filter_json))
+            if event_filter.filter_json.get("event_format", "client") == "federation":
+                as_client_event = False
         else:
             event_filter = None
         msgs = yield self.pagination_handler.get_messages(
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index b56c83e460..1782428048 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -618,7 +618,12 @@ class EventsWorkerStore(SQLBaseStore):
         """
         See get_total_state_event_counts.
         """
-        sql = "SELECT COUNT(*) FROM state_events WHERE room_id=?"
+        # We join against the events table as that has an index on room_id
+        sql = """
+            SELECT COUNT(*) FROM state_events
+            INNER JOIN events USING (room_id, event_id)
+            WHERE room_id=?
+        """
         txn.execute(sql, (room_id,))
         row = txn.fetchone()
         return row[0] if row else 0
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 4bd1669458..7617913326 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -142,26 +142,9 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
         return self.runInteraction("get_room_summary", _get_room_summary_txn)
 
-    def _get_user_count_in_room_txn(self, txn, room_id, membership):
+    def _get_user_counts_in_room_txn(self, txn, room_id):
         """
-        See get_user_count_in_room.
-        """
-        sql = (
-            "SELECT count(*) FROM room_memberships as m"
-            " INNER JOIN current_state_events as c"
-            " ON m.event_id = c.event_id "
-            " AND m.room_id = c.room_id "
-            " AND m.user_id = c.state_key"
-            " WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?"
-        )
-
-        txn.execute(sql, (room_id, membership))
-        row = txn.fetchone()
-        return row[0]
-
-    def get_user_count_in_room(self, room_id, membership):
-        """
-        Get the user count in a room with a particular membership.
+        Get the user count in a room by membership.
 
         Args:
             room_id (str)
@@ -170,9 +153,15 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         Returns:
             Deferred[int]
         """
-        return self.runInteraction(
-            "get_users_in_room", self._get_user_count_in_room_txn, room_id, membership
-        )
+        sql = """
+        SELECT m.membership, count(*) FROM room_memberships as m
+            INNER JOIN current_state_events as c USING(event_id)
+            WHERE c.type = 'm.room.member' AND c.room_id = ?
+            GROUP BY m.membership
+        """
+
+        txn.execute(sql, (room_id,))
+        return {row[0]: row[1] for row in txn}
 
     @cached()
     def get_invited_rooms_for_user(self, user_id):
diff --git a/synapse/storage/schema/delta/54/stats2.sql b/synapse/storage/schema/delta/54/stats2.sql
new file mode 100644
index 0000000000..3b2d48447f
--- /dev/null
+++ b/synapse/storage/schema/delta/54/stats2.sql
@@ -0,0 +1,28 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This delta file gets run after `54/stats.sql` delta.
+
+-- We want to add some indices to the temporary stats table, so we re-insert
+-- 'populate_stats_createtables' if we are still processing the rooms update.
+INSERT INTO background_updates (update_name, progress_json)
+    SELECT 'populate_stats_createtables', '{}'
+    WHERE
+        'populate_stats_process_rooms' IN (
+            SELECT update_name FROM background_updates
+        )
+        AND 'populate_stats_createtables' NOT IN (  -- don't insert if already exists
+            SELECT update_name FROM background_updates
+        );
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index eb0ced5b5e..1c0b183a56 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -18,6 +18,7 @@ import logging
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
+from synapse.storage.prepare_database import get_statements
 from synapse.storage.state_deltas import StateDeltasStore
 from synapse.util.caches.descriptors import cached
 
@@ -69,12 +70,25 @@ class StatsStore(StateDeltasStore):
 
         # Get all the rooms that we want to process.
         def _make_staging_area(txn):
-            sql = (
-                "CREATE TABLE IF NOT EXISTS "
-                + TEMP_TABLE
-                + "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)"
-            )
-            txn.execute(sql)
+            # Create the temporary tables
+            stmts = get_statements("""
+                -- We just recreate the table, we'll be reinserting the
+                -- correct entries again later anyway.
+                DROP TABLE IF EXISTS {temp}_rooms;
+
+                CREATE TABLE IF NOT EXISTS {temp}_rooms(
+                    room_id TEXT NOT NULL,
+                    events BIGINT NOT NULL
+                );
+
+                CREATE INDEX {temp}_rooms_events
+                    ON {temp}_rooms(events);
+                CREATE INDEX {temp}_rooms_id
+                    ON {temp}_rooms(room_id);
+            """.format(temp=TEMP_TABLE).splitlines())
+
+            for statement in stmts:
+                txn.execute(statement)
 
             sql = (
                 "CREATE TABLE IF NOT EXISTS "
@@ -83,15 +97,16 @@ class StatsStore(StateDeltasStore):
             )
             txn.execute(sql)
 
-            # Get rooms we want to process from the database
+            # Get rooms we want to process from the database, only adding
+            # those that we haven't (i.e. those not in room_stats_earliest_token)
             sql = """
-                SELECT room_id, count(*) FROM current_state_events
-                GROUP BY room_id
-            """
+                INSERT INTO %s_rooms (room_id, events)
+                SELECT c.room_id, count(*) FROM current_state_events AS c
+                LEFT JOIN room_stats_earliest_token AS t USING (room_id)
+                WHERE t.room_id IS NULL
+                GROUP BY c.room_id
+            """ % (TEMP_TABLE,)
             txn.execute(sql)
-            rooms = [{"room_id": x[0], "events": x[1]} for x in txn.fetchall()]
-            self._simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms)
-            del rooms
 
         new_pos = yield self.get_max_stream_id_in_current_state_deltas()
         yield self.runInteraction("populate_stats_temp_build", _make_staging_area)
@@ -179,46 +194,39 @@ class StatsStore(StateDeltasStore):
 
             current_state_ids = yield self.get_current_state_ids(room_id)
 
-            join_rules = yield self.get_event(
-                current_state_ids.get((EventTypes.JoinRules, "")), allow_none=True
-            )
-            history_visibility = yield self.get_event(
-                current_state_ids.get((EventTypes.RoomHistoryVisibility, "")),
-                allow_none=True,
-            )
-            encryption = yield self.get_event(
-                current_state_ids.get((EventTypes.RoomEncryption, "")), allow_none=True
-            )
-            name = yield self.get_event(
-                current_state_ids.get((EventTypes.Name, "")), allow_none=True
-            )
-            topic = yield self.get_event(
-                current_state_ids.get((EventTypes.Topic, "")), allow_none=True
-            )
-            avatar = yield self.get_event(
-                current_state_ids.get((EventTypes.RoomAvatar, "")), allow_none=True
+            join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
+            history_visibility_id = current_state_ids.get(
+                (EventTypes.RoomHistoryVisibility, "")
             )
-            canonical_alias = yield self.get_event(
-                current_state_ids.get((EventTypes.CanonicalAlias, "")), allow_none=True
-            )
-
-            def _or_none(x, arg):
-                if x:
-                    return x.content.get(arg)
+            encryption_id = current_state_ids.get((EventTypes.RoomEncryption, ""))
+            name_id = current_state_ids.get((EventTypes.Name, ""))
+            topic_id = current_state_ids.get((EventTypes.Topic, ""))
+            avatar_id = current_state_ids.get((EventTypes.RoomAvatar, ""))
+            canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, ""))
+
+            state_events = yield self.get_events([
+                join_rules_id, history_visibility_id, encryption_id, name_id,
+                topic_id, avatar_id, canonical_alias_id,
+            ])
+
+            def _get_or_none(event_id, arg):
+                event = state_events.get(event_id)
+                if event:
+                    return event.content.get(arg)
                 return None
 
             yield self.update_room_state(
                 room_id,
                 {
-                    "join_rules": _or_none(join_rules, "join_rule"),
-                    "history_visibility": _or_none(
-                        history_visibility, "history_visibility"
+                    "join_rules": _get_or_none(join_rules_id, "join_rule"),
+                    "history_visibility": _get_or_none(
+                        history_visibility_id, "history_visibility"
                     ),
-                    "encryption": _or_none(encryption, "algorithm"),
-                    "name": _or_none(name, "name"),
-                    "topic": _or_none(topic, "topic"),
-                    "avatar": _or_none(avatar, "url"),
-                    "canonical_alias": _or_none(canonical_alias, "alias"),
+                    "encryption": _get_or_none(encryption_id, "algorithm"),
+                    "name": _get_or_none(name_id, "name"),
+                    "topic": _get_or_none(topic_id, "topic"),
+                    "avatar": _get_or_none(avatar_id, "url"),
+                    "canonical_alias": _get_or_none(canonical_alias_id, "alias"),
                 },
             )
 
@@ -233,18 +241,9 @@ class StatsStore(StateDeltasStore):
                 current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
 
                 current_state_events = len(current_state_ids)
-                joined_members = self._get_user_count_in_room_txn(
-                    txn, room_id, Membership.JOIN
-                )
-                invited_members = self._get_user_count_in_room_txn(
-                    txn, room_id, Membership.INVITE
-                )
-                left_members = self._get_user_count_in_room_txn(
-                    txn, room_id, Membership.LEAVE
-                )
-                banned_members = self._get_user_count_in_room_txn(
-                    txn, room_id, Membership.BAN
-                )
+
+                membership_counts = self._get_user_counts_in_room_txn(txn, room_id)
+
                 total_state_events = self._get_total_state_event_counts_txn(
                     txn, room_id
                 )
@@ -257,10 +256,10 @@ class StatsStore(StateDeltasStore):
                     {
                         "bucket_size": self.stats_bucket_size,
                         "current_state_events": current_state_events,
-                        "joined_members": joined_members,
-                        "invited_members": invited_members,
-                        "left_members": left_members,
-                        "banned_members": banned_members,
+                        "joined_members": membership_counts.get(Membership.JOIN, 0),
+                        "invited_members": membership_counts.get(Membership.INVITE, 0),
+                        "left_members": membership_counts.get(Membership.LEAVE, 0),
+                        "banned_members": membership_counts.get(Membership.BAN, 0),
                         "state_events": total_state_events,
                     },
                 )
@@ -270,10 +269,13 @@ class StatsStore(StateDeltasStore):
                     {"room_id": room_id, "token": current_token},
                 )
 
+                # We've finished a room. Delete it from the table.
+                self._simple_delete_one_txn(
+                    txn, TEMP_TABLE + "_rooms", {"room_id": room_id},
+                )
+
             yield self.runInteraction("update_room_stats", _fetch_data)
 
-            # We've finished a room. Delete it from the table.
-            yield self._simple_delete_one(TEMP_TABLE + "_rooms", {"room_id": room_id})
             # Update the remaining counter.
             progress["remaining"] -= 1
             yield self.runInteraction(