diff options
author | Brendan Abolivier <babolivier@matrix.org> | 2019-11-01 17:04:45 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-01 17:04:45 +0000 |
commit | f496d2587723b0e802c49c210476266a19733f55 (patch) | |
tree | 9a2ea1817fdddec2f719e4ffe5d3150bf6f8599c /synapse/storage/data_stores | |
parent | Factor out an _AsyncEventContextImpl (#6298) (diff) | |
parent | Incorporate review (diff) | |
download | synapse-f496d2587723b0e802c49c210476266a19733f55.tar.xz |
Merge pull request #6301 from matrix-org/babolivier/msc2326
Implement MSC2326 (label based filtering)
Diffstat (limited to 'synapse/storage/data_stores')
-rw-r--r-- | synapse/storage/data_stores/main/events.py | 36 | ||||
-rw-r--r-- | synapse/storage/data_stores/main/schema/delta/56/event_labels.sql | 30 | ||||
-rw-r--r-- | synapse/storage/data_stores/main/stream.py | 11 |
3 files changed, 75 insertions, 2 deletions
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index aafc2007d3..301f8ea128 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -29,7 +29,7 @@ from prometheus_client import Counter from twisted.internet import defer import synapse.metrics -from synapse.api.constants import EventTypes +from synapse.api.constants import EventContentFields, EventTypes from synapse.api.errors import SynapseError from synapse.events import EventBase # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401 @@ -932,6 +932,13 @@ class EventsStore( self._handle_event_relations(txn, event) + # Store the labels for this event. + labels = event.content.get(EventContentFields.LABELS) + if labels: + self.insert_labels_for_event_txn( + txn, event.event_id, labels, event.room_id, event.depth + ) + # Insert into the room_memberships table. self._store_room_members_txn( txn, @@ -1917,6 +1924,33 @@ class EventsStore( get_all_updated_current_state_deltas_txn, ) + def insert_labels_for_event_txn( + self, txn, event_id, labels, room_id, topological_ordering + ): + """Store the mapping between an event's ID and its labels, with one row per + (event_id, label) tuple. + + Args: + txn (LoggingTransaction): The transaction to execute. + event_id (str): The event's ID. + labels (list[str]): A list of text labels. + room_id (str): The ID of the room the event was sent to. + topological_ordering (int): The position of the event in the room's topology. + """ + return self._simple_insert_many_txn( + txn=txn, + table="event_labels", + values=[ + { + "event_id": event_id, + "label": label, + "room_id": room_id, + "topological_ordering": topological_ordering, + } + for label in labels + ], + ) + AllNewEventsResult = namedtuple( "AllNewEventsResult", diff --git a/synapse/storage/data_stores/main/schema/delta/56/event_labels.sql b/synapse/storage/data_stores/main/schema/delta/56/event_labels.sql new file mode 100644 index 0000000000..5e29c1da19 --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/56/event_labels.sql @@ -0,0 +1,30 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- room_id and topoligical_ordering are denormalised from the events table in order to +-- make the index work. +CREATE TABLE IF NOT EXISTS event_labels ( + event_id TEXT, + label TEXT, + room_id TEXT NOT NULL, + topological_ordering BIGINT NOT NULL, + PRIMARY KEY(event_id, label) +); + + +-- This index enables an event pagination looking for a particular label to index the +-- event_labels table first, which is much quicker than scanning the events table and then +-- filtering by label, if the label is rarely used relative to the size of the room. +CREATE INDEX event_labels_room_id_label_idx ON event_labels(room_id, label, topological_ordering); diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py index 263999dfca..616ef91d4e 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py @@ -229,6 +229,14 @@ def filter_to_clause(event_filter): clauses.append("contains_url = ?") args.append(event_filter.contains_url) + # We're only applying the "labels" filter on the database query, because applying the + # "not_labels" filter via a SQL query is non-trivial. Instead, we let + # event_filter.check_fields apply it, which is not as efficient but makes the + # implementation simpler. + if event_filter.labels: + clauses.append("(%s)" % " OR ".join("label = ?" for _ in event_filter.labels)) + args.extend(event_filter.labels) + return " AND ".join(clauses), args @@ -864,8 +872,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): args.append(int(limit)) sql = ( - "SELECT event_id, topological_ordering, stream_ordering" + "SELECT DISTINCT event_id, topological_ordering, stream_ordering" " FROM events" + " LEFT JOIN event_labels USING (event_id, room_id, topological_ordering)" " WHERE outlier = ? AND room_id = ? AND %(bounds)s" " ORDER BY topological_ordering %(order)s," " stream_ordering %(order)s LIMIT ?" |