diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index e33a8cfe97..1a59e0b5a8 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -1264,12 +1264,76 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return None
+ async def get_last_event_pos_in_room(
+ self,
+ room_id: str,
+ event_types: Optional[StrCollection] = None,
+ ) -> Optional[Tuple[str, PersistedEventPosition]]:
+ """
+ Returns the ID and event position of the last event in a room.
+
+ Based on `get_last_event_pos_in_room_before_stream_ordering(...)`
+
+ Args:
+ room_id
+ event_types: Optional allowlist of event types to filter by
+
+ Returns:
+ The ID of the most recent event and it's position, or None if there are no
+ events in the room that match the given event types.
+ """
+
+ def _get_last_event_pos_in_room_txn(
+ txn: LoggingTransaction,
+ ) -> Optional[Tuple[str, PersistedEventPosition]]:
+ event_type_clause = ""
+ event_type_args: List[str] = []
+ if event_types is not None and len(event_types) > 0:
+ event_type_clause, event_type_args = make_in_list_sql_clause(
+ txn.database_engine, "type", event_types
+ )
+ event_type_clause = f"AND {event_type_clause}"
+
+ sql = f"""
+ SELECT event_id, stream_ordering, instance_name
+ FROM events
+ LEFT JOIN rejections USING (event_id)
+ WHERE room_id = ?
+ {event_type_clause}
+ AND NOT outlier
+ AND rejections.event_id IS NULL
+ ORDER BY stream_ordering DESC
+ LIMIT 1
+ """
+
+ txn.execute(
+ sql,
+ [room_id] + event_type_args,
+ )
+
+ row = cast(Optional[Tuple[str, int, str]], txn.fetchone())
+ if row is not None:
+ event_id, stream_ordering, instance_name = row
+
+ return event_id, PersistedEventPosition(
+ # If instance_name is null we default to "master"
+ instance_name or "master",
+ stream_ordering,
+ )
+
+ return None
+
+ return await self.db_pool.runInteraction(
+ "get_last_event_pos_in_room",
+ _get_last_event_pos_in_room_txn,
+ )
+
@trace
async def get_last_event_pos_in_room_before_stream_ordering(
self,
room_id: str,
end_token: RoomStreamToken,
- event_types: Optional[Collection[str]] = None,
+ event_types: Optional[StrCollection] = None,
) -> Optional[Tuple[str, PersistedEventPosition]]:
"""
Returns the ID and event position of the last event in a room at or before a
|