Diffstat (limited to 'synapse/storage/data_stores')
 synapse/storage/data_stores/main/__init__.py                                   |   9
 synapse/storage/data_stores/main/devices.py                                    | 109
 synapse/storage/data_stores/main/end_to_end_keys.py                            |  24
 synapse/storage/data_stores/main/event_push_actions.py                         |   2
 synapse/storage/data_stores/main/events.py                                     |  44
 synapse/storage/data_stores/main/events_bg_updates.py                          |   2
 synapse/storage/data_stores/main/group_server.py                               |   4
 synapse/storage/data_stores/main/monthly_active_users.py                       |   2
 synapse/storage/data_stores/main/push_rule.py                                  |   2
 synapse/storage/data_stores/main/registration.py                               |   2
 synapse/storage/data_stores/main/roommember.py                                 |   2
 synapse/storage/data_stores/main/schema/delta/56/event_labels.sql              |  30
 synapse/storage/data_stores/main/schema/delta/56/hidden_devices_fix.sql.sqlite |  42
 synapse/storage/data_stores/main/search.py                                     |   2
 synapse/storage/data_stores/main/state.py                                      |  20
 synapse/storage/data_stores/main/stats.py                                      |   4
 synapse/storage/data_stores/main/stream.py                                     |  11
17 files changed, 258 insertions, 53 deletions
diff --git a/synapse/storage/data_stores/main/__init__.py b/synapse/storage/data_stores/main/__init__.py
index b185ba0b3e..10c940df1e 100644
--- a/synapse/storage/data_stores/main/__init__.py
+++ b/synapse/storage/data_stores/main/__init__.py
@@ -139,7 +139,10 @@ class DataStore(
             db_conn, "public_room_list_stream", "stream_id"
         )
         self._device_list_id_gen = StreamIdGenerator(
-            db_conn, "device_lists_stream", "stream_id"
+            db_conn,
+            "device_lists_stream",
+            "stream_id",
+            extra_tables=[("user_signature_stream", "stream_id")],
         )
         self._cross_signing_id_gen = StreamIdGenerator(
             db_conn, "e2e_cross_signing_keys", "stream_id"
@@ -317,7 +320,7 @@ class DataStore(
             ) u
         """
         txn.execute(sql, (time_from,))
-        count, = txn.fetchone()
+        (count,) = txn.fetchone()
         return count
 
     def count_r30_users(self):
@@ -396,7 +399,7 @@ class DataStore(
 
             txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
 
-            count, = txn.fetchone()
+            (count,) = txn.fetchone()
             results["all"] = count
 
             return results
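Note on the `__init__.py` hunks above: `extra_tables` presumably makes the generator seed its current position from the maximum stream_id across all of the listed tables, so that rows written to `user_signature_stream` (by the signature code further down) advance the same device-list token. A minimal sketch of that idea, not Synapse's actual implementation:

    # Sketch only: shows how extra_tables plausibly folds into the starting
    # token. The real StreamIdGenerator also tracks in-flight allocations.
    class StreamIdGenerator(object):
        def __init__(self, db_conn, table, column, extra_tables=()):
            cur = db_conn.cursor()
            current = 1
            for tbl, col in [(table, column)] + list(extra_tables):
                # table/column names come from trusted callers, so string
                # interpolation is acceptable here
                cur.execute("SELECT COALESCE(MAX(%s), 1) FROM %s" % (col, tbl))
                (max_id,) = cur.fetchone()
                current = max(current, max_id)
            cur.close()
            self._current = current

        def get_current_token(self):
            return self._current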
diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py
index f7a3542348..71f62036c0 100644
--- a/synapse/storage/data_stores/main/devices.py
+++ b/synapse/storage/data_stores/main/devices.py
@@ -37,6 +37,7 @@ from synapse.storage._base import (
     make_in_list_sql_clause,
 )
 from synapse.storage.background_updates import BackgroundUpdateStore
+from synapse.types import get_verify_key_from_cross_signing_key
 from synapse.util import batch_iter
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
 
@@ -90,13 +91,18 @@ class DeviceWorkerStore(SQLBaseStore):
 
     @trace
     @defer.inlineCallbacks
-    def get_devices_by_remote(self, destination, from_stream_id, limit):
-        """Get stream of updates to send to remote servers
+    def get_device_updates_by_remote(self, destination, from_stream_id, limit):
+        """Get a stream of device updates to send to the given remote server.
 
+        Args:
+            destination (str): The host the device updates are intended for
+            from_stream_id (int): The minimum stream_id to filter updates by, exclusive
+            limit (int): Maximum number of device updates to return
         Returns:
-            Deferred[tuple[int, list[dict]]]:
+            Deferred[tuple[int, list[tuple[str, dict]]]]:
                 current stream id (ie, the stream id of the last update included in the
-                response), and the list of updates
+                response), and the list of updates, where each update is a pair of EDU
+                type and EDU contents
         """
         now_stream_id = self._device_list_id_gen.get_current_token()
 
@@ -117,8 +123,8 @@ class DeviceWorkerStore(SQLBaseStore):
         # stream_id; the rationale being that such a large device list update
         # is likely an error.
         updates = yield self.runInteraction(
-            "get_devices_by_remote",
-            self._get_devices_by_remote_txn,
+            "get_device_updates_by_remote",
+            self._get_device_updates_by_remote_txn,
             destination,
             from_stream_id,
             now_stream_id,
@@ -129,6 +135,37 @@ class DeviceWorkerStore(SQLBaseStore):
         if not updates:
             return now_stream_id, []
 
+        # get the cross-signing keys of the users in the list, so that we can
+        # determine which of the device changes were cross-signing keys
+        users = set(r[0] for r in updates)
+        master_key_by_user = {}
+        self_signing_key_by_user = {}
+        for user in users:
+            cross_signing_key = yield self.get_e2e_cross_signing_key(user, "master")
+            if cross_signing_key:
+                key_id, verify_key = get_verify_key_from_cross_signing_key(
+                    cross_signing_key
+                )
+                # verify_key is a VerifyKey from signedjson, which uses
+                # .version to denote the portion of the key ID after the
+                # algorithm and colon, which is the device ID
+                master_key_by_user[user] = {
+                    "key_info": cross_signing_key,
+                    "device_id": verify_key.version,
+                }
+
+            cross_signing_key = yield self.get_e2e_cross_signing_key(
+                user, "self_signing"
+            )
+            if cross_signing_key:
+                key_id, verify_key = get_verify_key_from_cross_signing_key(
+                    cross_signing_key
+                )
+                self_signing_key_by_user[user] = {
+                    "key_info": cross_signing_key,
+                    "device_id": verify_key.version,
+                }
+
         # if we have exceeded the limit, we need to exclude any results with the
         # same stream_id as the last row.
         if len(updates) > limit:
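The `verify_key.version` trick above relies on signedjson key IDs having the form `<algorithm>:<version>`; for cross-signing keys the version half doubles as a pseudo device ID. A rough sketch of what `get_verify_key_from_cross_signing_key` does with such a key dict (the decoding details here are an assumption, not copied from synapse/types.py):

    from signedjson.key import decode_verify_key_bytes
    from unpaddedbase64 import decode_base64

    def get_verify_key_from_cross_signing_key(key_info):
        # key_info["keys"] holds a single entry, e.g.
        # {"ed25519:nqOvz...": "<unpadded base64 public key>"}
        key_id, key_data = next(iter(key_info["keys"].items()))
        verify_key = decode_verify_key_bytes(key_id, decode_base64(key_data))
        # verify_key.version is everything after "ed25519:", which is the
        # part the device-list code treats as the device ID
        return key_id, verify_key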
@@ -153,20 +190,33 @@ class DeviceWorkerStore(SQLBaseStore):
         # context which created the Edu.
 
         query_map = {}
-        for update in updates:
-            if stream_id_cutoff is not None and update[2] >= stream_id_cutoff:
+        cross_signing_keys_by_user = {}
+        for user_id, device_id, update_stream_id, update_context in updates:
+            if stream_id_cutoff is not None and update_stream_id >= stream_id_cutoff:
                 # Stop processing updates
                 break
 
-            key = (update[0], update[1])
-
-            update_context = update[3]
-            update_stream_id = update[2]
+            if (
+                user_id in master_key_by_user
+                and device_id == master_key_by_user[user_id]["device_id"]
+            ):
+                result = cross_signing_keys_by_user.setdefault(user_id, {})
+                result["master_key"] = master_key_by_user[user_id]["key_info"]
+            elif (
+                user_id in self_signing_key_by_user
+                and device_id == self_signing_key_by_user[user_id]["device_id"]
+            ):
+                result = cross_signing_keys_by_user.setdefault(user_id, {})
+                result["self_signing_key"] = self_signing_key_by_user[user_id][
+                    "key_info"
+                ]
+            else:
+                key = (user_id, device_id)
 
-            previous_update_stream_id, _ = query_map.get(key, (0, None))
+                previous_update_stream_id, _ = query_map.get(key, (0, None))
 
-            if update_stream_id > previous_update_stream_id:
-                query_map[key] = (update_stream_id, update_context)
+                if update_stream_id > previous_update_stream_id:
+                    query_map[key] = (update_stream_id, update_context)
 
         # If we didn't find any updates with a stream_id lower than the cutoff, it
         # means that there are more than limit updates all of which have the same
@@ -176,16 +226,22 @@ class DeviceWorkerStore(SQLBaseStore):
         # devices, in which case E2E isn't going to work well anyway. We'll just
         # skip that stream_id and return an empty list, and continue with the next
         # stream_id next time.
-        if not query_map:
+        if not query_map and not cross_signing_keys_by_user:
             return stream_id_cutoff, []
 
         results = yield self._get_device_update_edus_by_remote(
             destination, from_stream_id, query_map
         )
 
+        # add the updated cross-signing keys to the results list
+        for user_id, result in iteritems(cross_signing_keys_by_user):
+            result["user_id"] = user_id
+            # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
+            results.append(("org.matrix.signing_key_update", result))
+
         return now_stream_id, results
 
-    def _get_devices_by_remote_txn(
+    def _get_device_updates_by_remote_txn(
         self, txn, destination, from_stream_id, now_stream_id, limit
     ):
         """Return device update information for a given remote destination
@@ -200,6 +256,7 @@ class DeviceWorkerStore(SQLBaseStore):
         Returns:
             List: List of device updates
         """
+        # get the list of device updates that need to be sent
         sql = """
             SELECT user_id, device_id, stream_id, opentracing_context FROM device_lists_outbound_pokes
             WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
@@ -225,12 +282,16 @@ class DeviceWorkerStore(SQLBaseStore):
             List[Dict]: List of objects representing a device update EDU
 
         """
-        devices = yield self.runInteraction(
-            "_get_e2e_device_keys_txn",
-            self._get_e2e_device_keys_txn,
-            query_map.keys(),
-            include_all_devices=True,
-            include_deleted_devices=True,
+        devices = (
+            yield self.runInteraction(
+                "_get_e2e_device_keys_txn",
+                self._get_e2e_device_keys_txn,
+                query_map.keys(),
+                include_all_devices=True,
+                include_deleted_devices=True,
+            )
+            if query_map
+            else {}
         )
 
         results = []
@@ -262,7 +323,7 @@ class DeviceWorkerStore(SQLBaseStore):
                 else:
                     result["deleted"] = True
 
-                results.append(result)
+                results.append(("m.device_list_update", result))
 
         return results
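With this change, every entry in the returned list is an `(edu_type, content)` pair rather than a bare dict, so a single stream can carry both `m.device_list_update` and `org.matrix.signing_key_update` EDUs. A hedged sketch of a caller unpacking the new shape (`sender` is a stand-in for whatever actually builds and queues the EDUs):

    from twisted.internet import defer

    @defer.inlineCallbacks
    def send_device_updates(store, sender, destination, from_stream_id):
        now_stream_id, updates = yield store.get_device_updates_by_remote(
            destination, from_stream_id, limit=100
        )
        for edu_type, content in updates:
            # edu_type is "m.device_list_update" or
            # "org.matrix.signing_key_update"
            sender.build_and_send_edu(destination, edu_type, content)
        return now_stream_id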
 
diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py
index a0bc6f2d18..073412a78d 100644
--- a/synapse/storage/data_stores/main/end_to_end_keys.py
+++ b/synapse/storage/data_stores/main/end_to_end_keys.py
@@ -315,6 +315,30 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
             from_user_id,
         )
 
+    def get_all_user_signature_changes_for_remotes(self, from_key, to_key):
+        """Return a list of changes from the user signature stream to notify remotes.
+        Note that the user signature stream represents when a user signs their
+        device with their user-signing key, which is not published to other
+        users or servers, so no `destination` is needed in the returned
+        list. However, the stream is still needed to poke workers.
+
+        Args:
+            from_key (int): the stream ID to start at (exclusive)
+            to_key (int): the stream ID to end at (inclusive)
+
+        Returns:
+            Deferred[list[(int, str)]]: a list of `(stream_id, user_id)` tuples
+        """
+        sql = """
+            SELECT MAX(stream_id) AS stream_id, from_user_id AS user_id
+            FROM user_signature_stream
+            WHERE ? < stream_id AND stream_id <= ?
+            GROUP BY user_id
+        """
+        return self._execute(
+            "get_all_user_signature_changes_for_remotes", None, sql, from_key, to_key
+        )
+
 
 class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
     def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys):
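The `MAX(stream_id) ... GROUP BY user_id` query above collapses multiple signature uploads by the same user into a single row at the latest stream position, which is all a worker needs in order to catch up. A sketch of the resulting row shape, assuming a plain DB-API cursor:

    # Equivalent query against a raw connection, to show the (stream_id,
    # user_id) rows the replication code will iterate over.
    cur = db_conn.cursor()
    cur.execute(
        """
        SELECT MAX(stream_id) AS stream_id, from_user_id AS user_id
        FROM user_signature_stream
        WHERE ? < stream_id AND stream_id <= ?
        GROUP BY user_id
        """,
        (from_key, to_key),
    )
    rows = cur.fetchall()  # one (stream_id, user_id) row per user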
diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py
index 22025effbc..04ce21ac66 100644
--- a/synapse/storage/data_stores/main/event_push_actions.py
+++ b/synapse/storage/data_stores/main/event_push_actions.py
@@ -863,7 +863,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
         )
         stream_row = txn.fetchone()
         if stream_row:
-            offset_stream_ordering, = stream_row
+            (offset_stream_ordering,) = stream_row
             rotate_to_stream_ordering = min(
                 self.stream_ordering_day_ago, offset_stream_ordering
             )
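This hunk, like the matching ones in `__init__.py`, `events.py`, `state.py` and the other files below, is purely mechanical: `x, = t` and `(x,) = t` are the same single-element tuple unpacking, and the parenthesised spelling appears to be what newer releases of the Black formatter emit. For example:

    row = (42,)
    count, = row    # old spelling
    (count,) = row  # new spelling; identical semantics, harder to misread
    assert count == 42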
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index a71d7346d2..68f27078c4 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -30,7 +30,7 @@ from prometheus_client import Counter
 from twisted.internet import defer
 
 import synapse.metrics
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventContentFields, EventTypes
 from synapse.api.errors import SynapseError
 from synapse.events import EventBase  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
@@ -933,6 +933,13 @@ class EventsStore(
 
             self._handle_event_relations(txn, event)
 
+            # Store the labels for this event.
+            labels = event.content.get(EventContentFields.LABELS)
+            if labels:
+                self.insert_labels_for_event_txn(
+                    txn, event.event_id, labels, event.room_id, event.depth
+                )
+
         # Insert into the room_memberships table.
         self._store_room_members_txn(
             txn,
@@ -1126,7 +1133,7 @@ class EventsStore(
                 AND stream_ordering > ?
             """
             txn.execute(sql, (self.stream_ordering_day_ago,))
-            count, = txn.fetchone()
+            (count,) = txn.fetchone()
             return count
 
         ret = yield self.runInteraction("count_messages", _count_messages)
@@ -1147,7 +1154,7 @@ class EventsStore(
             """
 
             txn.execute(sql, (like_clause, self.stream_ordering_day_ago))
-            count, = txn.fetchone()
+            (count,) = txn.fetchone()
             return count
 
         ret = yield self.runInteraction("count_daily_sent_messages", _count_messages)
@@ -1162,7 +1169,7 @@ class EventsStore(
                 AND stream_ordering > ?
             """
             txn.execute(sql, (self.stream_ordering_day_ago,))
-            count, = txn.fetchone()
+            (count,) = txn.fetchone()
             return count
 
         ret = yield self.runInteraction("count_daily_active_rooms", _count)
@@ -1596,7 +1603,7 @@ class EventsStore(
         """,
             (room_id,),
         )
-        min_depth, = txn.fetchone()
+        (min_depth,) = txn.fetchone()
 
         logger.info("[purge] updating room_depth to %d", min_depth)
 
@@ -1905,6 +1912,33 @@ class EventsStore(
             get_all_updated_current_state_deltas_txn,
         )
 
+    def insert_labels_for_event_txn(
+        self, txn, event_id, labels, room_id, topological_ordering
+    ):
+        """Store the mapping between an event's ID and its labels, with one row per
+        (event_id, label) tuple.
+
+        Args:
+            txn (LoggingTransaction): The transaction to execute.
+            event_id (str): The event's ID.
+            labels (list[str]): A list of text labels.
+            room_id (str): The ID of the room the event was sent to.
+            topological_ordering (int): The position of the event in the room's topology.
+        """
+        return self._simple_insert_many_txn(
+            txn=txn,
+            table="event_labels",
+            values=[
+                {
+                    "event_id": event_id,
+                    "label": label,
+                    "room_id": room_id,
+                    "topological_ordering": topological_ordering,
+                }
+                for label in labels
+            ],
+        )
+
 
 AllNewEventsResult = namedtuple(
     "AllNewEventsResult",
diff --git a/synapse/storage/data_stores/main/events_bg_updates.py b/synapse/storage/data_stores/main/events_bg_updates.py
index 31ea6f917f..51352b9966 100644
--- a/synapse/storage/data_stores/main/events_bg_updates.py
+++ b/synapse/storage/data_stores/main/events_bg_updates.py
@@ -438,7 +438,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
             if not rows:
                 return 0
 
-            upper_event_id, = rows[-1]
+            (upper_event_id,) = rows[-1]
 
             # Update the redactions with the received_ts.
             #
diff --git a/synapse/storage/data_stores/main/group_server.py b/synapse/storage/data_stores/main/group_server.py
index aeae5a2b28..b3a2771f1b 100644
--- a/synapse/storage/data_stores/main/group_server.py
+++ b/synapse/storage/data_stores/main/group_server.py
@@ -249,7 +249,7 @@ class GroupServerStore(SQLBaseStore):
                 WHERE group_id = ? AND category_id = ?
             """
             txn.execute(sql, (group_id, category_id))
-            order, = txn.fetchone()
+            (order,) = txn.fetchone()
 
         if existing:
             to_update = {}
@@ -509,7 +509,7 @@ class GroupServerStore(SQLBaseStore):
                 WHERE group_id = ? AND role_id = ?
             """
             txn.execute(sql, (group_id, role_id))
-            order, = txn.fetchone()
+            (order,) = txn.fetchone()
 
         if existing:
             to_update = {}
diff --git a/synapse/storage/data_stores/main/monthly_active_users.py b/synapse/storage/data_stores/main/monthly_active_users.py
index e6ee1e4aaa..b41c3d317a 100644
--- a/synapse/storage/data_stores/main/monthly_active_users.py
+++ b/synapse/storage/data_stores/main/monthly_active_users.py
@@ -171,7 +171,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
             sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users"
 
             txn.execute(sql)
-            count, = txn.fetchone()
+            (count,) = txn.fetchone()
             return count
 
         return self.runInteraction("count_users", _count_users)
diff --git a/synapse/storage/data_stores/main/push_rule.py b/synapse/storage/data_stores/main/push_rule.py
index cd95f1ce60..b520062d84 100644
--- a/synapse/storage/data_stores/main/push_rule.py
+++ b/synapse/storage/data_stores/main/push_rule.py
@@ -143,7 +143,7 @@ class PushRulesWorkerStore(
                     " WHERE user_id = ? AND ? < stream_id"
                 )
                 txn.execute(sql, (user_id, last_id))
-                count, = txn.fetchone()
+                (count,) = txn.fetchone()
                 return bool(count)
 
             return self.runInteraction(
diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py
index 6c5b29288a..f70d41ecab 100644
--- a/synapse/storage/data_stores/main/registration.py
+++ b/synapse/storage/data_stores/main/registration.py
@@ -459,7 +459,7 @@ class RegistrationWorkerStore(SQLBaseStore):
                 WHERE appservice_id IS NULL
             """
             )
-            count, = txn.fetchone()
+            (count,) = txn.fetchone()
             return count
 
         ret = yield self.runInteraction("count_users", _count_users)
diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py
index bc04bfd7d4..2af24a20b7 100644
--- a/synapse/storage/data_stores/main/roommember.py
+++ b/synapse/storage/data_stores/main/roommember.py
@@ -927,7 +927,7 @@ class RoomMemberBackgroundUpdateStore(BackgroundUpdateStore):
                 if not row or not row[0]:
                     return processed, True
 
-                next_room, = row
+                (next_room,) = row
 
                 sql = """
                     UPDATE current_state_events
diff --git a/synapse/storage/data_stores/main/schema/delta/56/event_labels.sql b/synapse/storage/data_stores/main/schema/delta/56/event_labels.sql
new file mode 100644
index 0000000000..5e29c1da19
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/56/event_labels.sql
@@ -0,0 +1,30 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- room_id and topological_ordering are denormalised from the events table in order to
+-- make the index work.
+CREATE TABLE IF NOT EXISTS event_labels (
+    event_id TEXT,
+    label TEXT,
+    room_id TEXT NOT NULL,
+    topological_ordering BIGINT NOT NULL,
+    PRIMARY KEY(event_id, label)
+);
+
+
+-- This index enables a pagination query filtering on a particular label to use the
+-- event_labels table first, which is much quicker than scanning the events table and
+-- then filtering by label, if the label is rarely used relative to the size of the room.
+CREATE INDEX event_labels_room_id_label_idx ON event_labels(room_id, label, topological_ordering);
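The `(room_id, label, topological_ordering)` index exists for exactly the query shape that `filter_to_clause` in stream.py (below) produces: a label-filtered pagination can walk the small event_labels table in topological order instead of scanning events. A hypothetical query of that shape, with invented parameters:

    # txn is a DB-API cursor, as elsewhere in this diff
    txn.execute(
        """
        SELECT DISTINCT event_id, topological_ordering, stream_ordering
        FROM events
        LEFT JOIN event_labels USING (event_id, room_id, topological_ordering)
        WHERE outlier = ? AND room_id = ? AND (label = ?)
        ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?
        """,
        (False, "!room:example.org", "#work", 10),
    )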
diff --git a/synapse/storage/data_stores/main/schema/delta/56/hidden_devices_fix.sql.sqlite b/synapse/storage/data_stores/main/schema/delta/56/hidden_devices_fix.sql.sqlite
new file mode 100644
index 0000000000..e8b1fd35d8
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/56/hidden_devices_fix.sql.sqlite
@@ -0,0 +1,42 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* Change the hidden column from a default value of FALSE to a default value of
+ * 0, because sqlite3 prior to 3.23.0 caused the hidden column to contain the
+ * string 'FALSE', which is truthy.
+ *
+ * Since sqlite doesn't allow us to just change the default value, we have to
+ * recreate the table, copy the data, fix the rows that have incorrect data, and
+ * replace the old table with the new table.
+ */
+
+CREATE TABLE IF NOT EXISTS devices2 (
+    user_id TEXT NOT NULL,
+    device_id TEXT NOT NULL,
+    display_name TEXT,
+    last_seen BIGINT,
+    ip TEXT,
+    user_agent TEXT,
+    hidden BOOLEAN DEFAULT 0,
+    CONSTRAINT device_uniqueness UNIQUE (user_id, device_id)
+);
+
+INSERT INTO devices2 SELECT * FROM devices;
+
+UPDATE devices2 SET hidden = 0 WHERE hidden = 'FALSE';
+
+DROP TABLE devices;
+
+ALTER TABLE devices2 RENAME TO devices;
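The bug being fixed here: on affected SQLite versions the `FALSE` default was stored as the string 'FALSE', and any non-empty string is truthy by the time it reaches Python, so every device looked hidden. A small reproduction of the failure mode (the string default is written out explicitly, since a modern SQLite understands the FALSE keyword):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE devices (device_id TEXT, hidden BOOLEAN DEFAULT 'FALSE')"
    )
    conn.execute("INSERT INTO devices (device_id) VALUES ('ABCDEFGH')")
    (hidden,) = conn.execute("SELECT hidden FROM devices").fetchone()
    print(bool(hidden))  # True: the string 'FALSE' is truthy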
diff --git a/synapse/storage/data_stores/main/search.py b/synapse/storage/data_stores/main/search.py
index a59b8331e1..d1d7c6863d 100644
--- a/synapse/storage/data_stores/main/search.py
+++ b/synapse/storage/data_stores/main/search.py
@@ -672,7 +672,7 @@ class SearchStore(SearchBackgroundUpdateStore):
                     )
                 )
                 txn.execute(query, (value, search_query))
-                headline, = txn.fetchall()[0]
+                (headline,) = txn.fetchall()[0]
 
                # Now we need to pick the possible highlights out of the headline
                 # result.
diff --git a/synapse/storage/data_stores/main/state.py b/synapse/storage/data_stores/main/state.py
index bf6de4ca22..e1d3041c7c 100644
--- a/synapse/storage/data_stores/main/state.py
+++ b/synapse/storage/data_stores/main/state.py
@@ -725,16 +725,18 @@ class StateGroupWorkerStore(
         member_filter, non_member_filter = state_filter.get_member_split()
 
         # Now we look them up in the member and non-member caches
-        non_member_state, incomplete_groups_nm, = (
-            yield self._get_state_for_groups_using_cache(
-                groups, self._state_group_cache, state_filter=non_member_filter
-            )
+        (
+            non_member_state,
+            incomplete_groups_nm,
+        ) = yield self._get_state_for_groups_using_cache(
+            groups, self._state_group_cache, state_filter=non_member_filter
         )
 
-        member_state, incomplete_groups_m, = (
-            yield self._get_state_for_groups_using_cache(
-                groups, self._state_group_members_cache, state_filter=member_filter
-            )
+        (
+            member_state,
+            incomplete_groups_m,
+        ) = yield self._get_state_for_groups_using_cache(
+            groups, self._state_group_members_cache, state_filter=member_filter
         )
 
         state = dict(non_member_state)
@@ -1106,7 +1108,7 @@ class StateBackgroundUpdateStore(
                     " WHERE id < ? AND room_id = ?",
                     (state_group, room_id),
                 )
-                prev_group, = txn.fetchone()
+                (prev_group,) = txn.fetchone()
                 new_last_state_group = state_group
 
                 if prev_group:
diff --git a/synapse/storage/data_stores/main/stats.py b/synapse/storage/data_stores/main/stats.py
index 4d59b7833f..45b3de7d56 100644
--- a/synapse/storage/data_stores/main/stats.py
+++ b/synapse/storage/data_stores/main/stats.py
@@ -773,7 +773,7 @@ class StatsStore(StateDeltasStore):
                 (room_id,),
             )
 
-            current_state_events_count, = txn.fetchone()
+            (current_state_events_count,) = txn.fetchone()
 
             users_in_room = self.get_users_in_room_txn(txn, room_id)
 
@@ -863,7 +863,7 @@ class StatsStore(StateDeltasStore):
                 """,
                 (user_id,),
             )
-            count, = txn.fetchone()
+            (count,) = txn.fetchone()
             return count, pos
 
         joined_rooms, pos = yield self.runInteraction(
diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py
index 263999dfca..616ef91d4e 100644
--- a/synapse/storage/data_stores/main/stream.py
+++ b/synapse/storage/data_stores/main/stream.py
@@ -229,6 +229,14 @@ def filter_to_clause(event_filter):
         clauses.append("contains_url = ?")
         args.append(event_filter.contains_url)
 
+    # We only apply the "labels" filter in the database query, because applying the
+    # "not_labels" filter via SQL is non-trivial. Instead, we let
+    # event_filter.check_fields apply it, which is less efficient but keeps the
+    # implementation simpler.
+    if event_filter.labels:
+        clauses.append("(%s)" % " OR ".join("label = ?" for _ in event_filter.labels))
+        args.extend(event_filter.labels)
+
     return " AND ".join(clauses), args
 
 
@@ -864,8 +872,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         args.append(int(limit))
 
         sql = (
-            "SELECT event_id, topological_ordering, stream_ordering"
+            "SELECT DISTINCT event_id, topological_ordering, stream_ordering"
             " FROM events"
+            " LEFT JOIN event_labels USING (event_id, room_id, topological_ordering)"
             " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
             " ORDER BY topological_ordering %(order)s,"
             " stream_ordering %(order)s LIMIT ?"