diff --git a/changelog.d/6340.feature b/changelog.d/6340.feature
new file mode 100644
index 0000000000..78a187a1dc
--- /dev/null
+++ b/changelog.d/6340.feature
@@ -0,0 +1 @@
+Implement label-based filtering on `/sync` and `/messages` ([MSC2326](https://github.com/matrix-org/matrix-doc/pull/2326)).
diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py
index 616ef91d4e..8780fdd989 100644
--- a/synapse/storage/data_stores/main/stream.py
+++ b/synapse/storage/data_stores/main/stream.py
@@ -871,14 +871,38 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
args.append(int(limit))
- sql = (
- "SELECT DISTINCT event_id, topological_ordering, stream_ordering"
- " FROM events"
- " LEFT JOIN event_labels USING (event_id, room_id, topological_ordering)"
- " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
- " ORDER BY topological_ordering %(order)s,"
- " stream_ordering %(order)s LIMIT ?"
- ) % {"bounds": bounds, "order": order}
+ select_keywords = "SELECT"
+ join_clause = ""
+ if event_filter and event_filter.labels:
+ # If we're not filtering on a label, then joining on event_labels will
+ # return as many rows for a single event as the number of labels it has. To
+ # avoid this, only join if we're filtering on at least one label.
+ join_clause = """
+ LEFT JOIN event_labels
+ USING (event_id, room_id, topological_ordering)
+ """
+ if len(event_filter.labels) > 1:
+ # DISTINCT is expensive here: the engine has to sort the entire (not
+ # limited) result set, i.e. potentially the whole events table. We only
+ # need it when filtering on more than one label, as that's the only
+ # case where the same event ID can come back multiple times. The
+ # leading space matters: the keyword is appended straight after
+ # "SELECT" and must not fuse with it into "SELECTDISTINCT".
+ select_keywords += " DISTINCT"
+
+ sql = """
+ %(select_keywords)s event_id, topological_ordering, stream_ordering
+ FROM events
+ %(join_clause)s
+ WHERE outlier = ? AND room_id = ? AND %(bounds)s
+ ORDER BY topological_ordering %(order)s,
+ stream_ordering %(order)s LIMIT ?
+ """ % {
+ "select_keywords": select_keywords,
+ "join_clause": join_clause,
+ "bounds": bounds,
+ "order": order,
+ }
txn.execute(sql, args)
|