diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index d34376b8df..be81025355 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -1178,6 +1178,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
self,
room_id: str,
end_token: RoomStreamToken,
+ event_types: Optional[Collection[str]] = None,
) -> Optional[Tuple[str, PersistedEventPosition]]:
"""
Returns the ID and event position of the last event in a room at or before a
@@ -1186,6 +1187,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
Args:
room_id
end_token: The token used to stream from
+ event_types: Optional allowlist of event types to filter by
Returns:
The ID of the most recent event and it's position, or None if there are no
@@ -1207,9 +1209,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
min_stream = end_token.stream
max_stream = end_token.get_max_stream_pos()
- # We use `union all` because we don't need any of the deduplication logic
- # (`union` is really a union + distinct). `UNION ALL` does preserve the
- # ordering of the operand queries but there is no actual gurantee that it
+ event_type_clause = ""
+ event_type_args: List[str] = []
+ if event_types is not None and len(event_types) > 0:
+ event_type_clause, event_type_args = make_in_list_sql_clause(
+ txn.database_engine, "type", event_types
+ )
+ event_type_clause = f"AND {event_type_clause}"
+
+ # We use `UNION ALL` because we don't need any of the deduplication logic
+ # (`UNION` is really a `UNION` + `DISTINCT`). `UNION ALL` does preserve the
+ # ordering of the operand queries but there is no actual guarantee that it
# has this behavior in all scenarios so we need the extra `ORDER BY` at the
# bottom.
sql = """
@@ -1218,6 +1228,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
FROM events
LEFT JOIN rejections USING (event_id)
WHERE room_id = ?
+ %s
AND ? < stream_ordering AND stream_ordering <= ?
AND NOT outlier
AND rejections.event_id IS NULL
@@ -1229,6 +1240,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
FROM events
LEFT JOIN rejections USING (event_id)
WHERE room_id = ?
+ %s
AND stream_ordering <= ?
AND NOT outlier
AND rejections.event_id IS NULL
@@ -1236,16 +1248,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
LIMIT 1
) AS b
ORDER BY stream_ordering DESC
- """
+ """ % (
+ event_type_clause,
+ event_type_clause,
+ )
txn.execute(
sql,
- (
- room_id,
- min_stream,
- max_stream,
- room_id,
- min_stream,
- ),
+ [room_id]
+ + event_type_args
+ + [min_stream, max_stream, room_id]
+ + event_type_args
+ + [min_stream],
)
for instance_name, stream_ordering, topological_ordering, event_id in txn:
|