summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/stream.py65
1 files changed, 59 insertions, 6 deletions
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index cc27ec3804..63d8350530 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -801,13 +801,66 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             before this stream ordering.
         """
 
-        last_row = await self.get_room_event_before_stream_ordering(
-            room_id=room_id,
-            stream_ordering=end_token.stream,
+        def get_last_event_in_room_before_stream_ordering_txn(
+            txn: LoggingTransaction,
+        ) -> Optional[str]:
+            # We need to handle the fact that the stream tokens can be vector
+            # clocks. We do this by getting all rows between the minimum and
+            # maximum stream ordering in the token, plus one row less than the
+            # minimum stream ordering. We then filter the results against the
+            # token and return the first row that matches.
+
+            sql = """
+                SELECT * FROM (
+                    SELECT instance_name, stream_ordering, topological_ordering, event_id
+                    FROM events
+                    LEFT JOIN rejections USING (event_id)
+                    WHERE room_id = ?
+                        AND ? < stream_ordering AND stream_ordering <= ?
+                        AND NOT outlier
+                        AND rejections.event_id IS NULL
+                    ORDER BY stream_ordering DESC
+                ) AS a
+                UNION
+                SELECT * FROM (
+                    SELECT instance_name, stream_ordering, topological_ordering, event_id
+                    FROM events
+                    LEFT JOIN rejections USING (event_id)
+                    WHERE room_id = ?
+                        AND stream_ordering <= ?
+                        AND NOT outlier
+                        AND rejections.event_id IS NULL
+                    ORDER BY stream_ordering DESC
+                    LIMIT 1
+                ) AS b
+            """
+            txn.execute(
+                sql,
+                (
+                    room_id,
+                    end_token.stream,
+                    end_token.get_max_stream_pos(),
+                    room_id,
+                    end_token.stream,
+                ),
+            )
+
+            for instance_name, stream_ordering, topological_ordering, event_id in txn:
+                if _filter_results(
+                    lower_token=None,
+                    upper_token=end_token,
+                    instance_name=instance_name,
+                    topological_ordering=topological_ordering,
+                    stream_ordering=stream_ordering,
+                ):
+                    return event_id
+
+            return None
+
+        return await self.db_pool.runInteraction(
+            "get_last_event_in_room_before_stream_ordering",
+            get_last_event_in_room_before_stream_ordering_txn,
         )
-        if last_row:
-            return last_row[2]
-        return None
 
     async def get_current_room_stream_token_for_room_id(
         self, room_id: str