summary refs log tree commit diff
path: root/synapse/storage/databases/main/stream.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/stream.py')
-rw-r--r--synapse/storage/databases/main/stream.py35
1 files changed, 24 insertions, 11 deletions
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py

index d34376b8df..be81025355 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py
@@ -1178,6 +1178,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): self, room_id: str, end_token: RoomStreamToken, + event_types: Optional[Collection[str]] = None, ) -> Optional[Tuple[str, PersistedEventPosition]]: """ Returns the ID and event position of the last event in a room at or before a @@ -1186,6 +1187,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): Args: room_id end_token: The token used to stream from + event_types: Optional allowlist of event types to filter by Returns: The ID of the most recent event and it's position, or None if there are no @@ -1207,9 +1209,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): min_stream = end_token.stream max_stream = end_token.get_max_stream_pos() - # We use `union all` because we don't need any of the deduplication logic - # (`union` is really a union + distinct). `UNION ALL` does preserve the - # ordering of the operand queries but there is no actual gurantee that it + event_type_clause = "" + event_type_args: List[str] = [] + if event_types is not None and len(event_types) > 0: + event_type_clause, event_type_args = make_in_list_sql_clause( + txn.database_engine, "type", event_types + ) + event_type_clause = f"AND {event_type_clause}" + + # We use `UNION ALL` because we don't need any of the deduplication logic + # (`UNION` is really a `UNION` + `DISTINCT`). `UNION ALL` does preserve the + # ordering of the operand queries but there is no actual guarantee that it # has this behavior in all scenarios so we need the extra `ORDER BY` at the # bottom. sql = """ @@ -1218,6 +1228,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): FROM events LEFT JOIN rejections USING (event_id) WHERE room_id = ? + %s AND ? < stream_ordering AND stream_ordering <= ? AND NOT outlier AND rejections.event_id IS NULL @@ -1229,6 +1240,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): FROM events LEFT JOIN rejections USING (event_id) WHERE room_id = ? + %s AND stream_ordering <= ? AND NOT outlier AND rejections.event_id IS NULL @@ -1236,16 +1248,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): LIMIT 1 ) AS b ORDER BY stream_ordering DESC - """ + """ % ( + event_type_clause, + event_type_clause, + ) txn.execute( sql, - ( - room_id, - min_stream, - max_stream, - room_id, - min_stream, - ), + [room_id] + + event_type_args + + [min_stream, max_stream, room_id] + + event_type_args + + [min_stream], ) for instance_name, stream_ordering, topological_ordering, event_id in txn: