summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorBrendan Abolivier <babolivier@matrix.org>2020-01-15 18:56:18 +0000
committerBrendan Abolivier <babolivier@matrix.org>2020-01-15 18:56:18 +0000
commit855af069a494f826ef941d722c811287b3fc4a8c (patch)
tree88a4f9218c5867dcb0a2da82066b345430c5b88f /synapse
parentProcess EDUs in parallel with PDUs. (#6697) (diff)
downloadsynapse-855af069a494f826ef941d722c811287b3fc4a8c.tar.xz
Fix instantiation of message retention purge jobs
When figuring out which topological token to start a purge job at, we
need to do the following:

1. Figure out a timestamp before which events will be purged
2. Select the first stream ordering after that timestamp
3. Select info about the first event after that stream ordering
4. Build a topological token from that info

In some situations (e.g. quiet rooms with a short max_lifetime), there
might not be an event after the stream ordering at step 3, therefore we
abort the purge with the error `No event found`. To mitigate that, this
patch fetches the first event _before_ the stream ordering, instead of
after.
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/pagination.py2
-rw-r--r--synapse/storage/data_stores/main/stream.py59
2 files changed, 48 insertions, 13 deletions
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 00a6afc963..3ee6a091c5 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -156,7 +156,7 @@ class PaginationHandler(object):
 
             stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts)
 
-            r = yield self.store.get_room_event_after_stream_ordering(
+            r = yield self.store.get_room_event_before_stream_ordering(
                 room_id, stream_ordering,
             )
             if not r:
diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py
index 140da8dad6..223ce7fedb 100644
--- a/synapse/storage/data_stores/main/stream.py
+++ b/synapse/storage/data_stores/main/stream.py
@@ -536,20 +536,55 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             Deferred[(int, int, str)]:
                 (stream ordering, topological ordering, event_id)
         """
+        return self.db.runInteraction(
+            "get_room_event_after_stream_ordering",
+            self.get_room_event_around_stream_ordering_txn,
+            room_id, stream_ordering, "f",
+        )
 
-        def _f(txn):
-            sql = (
-                "SELECT stream_ordering, topological_ordering, event_id"
-                " FROM events"
-                " WHERE room_id = ? AND stream_ordering >= ?"
-                " AND NOT outlier"
-                " ORDER BY stream_ordering"
-                " LIMIT 1"
-            )
-            txn.execute(sql, (room_id, stream_ordering))
-            return txn.fetchone()
+    def get_room_event_before_stream_ordering(self, room_id, stream_ordering):
+        """Gets details of the first event in a room at or before a stream ordering
+
+        Args:
+            room_id (str):
+            stream_ordering (int):
+
+        Returns:
+            Deferred[(int, int, str)]:
+                (stream ordering, topological ordering, event_id)
+        """
+        return self.db.runInteraction(
+            "get_room_event_before_stream_ordering",
+            self.get_room_event_around_stream_ordering_txn,
+            room_id, stream_ordering, "f",
+        )
+
+    def get_room_event_around_stream_ordering_txn(
+        self, txn, room_id, stream_ordering, dir="f"
+    ):
+        """Gets details of the first event in a room at or either after or before a
+        stream ordering, depending on the provided direction.
+
+        Args:
+            room_id (str):
+            stream_ordering (int):
+            dir (str): Direction in which we're looking towards in the room's history,
+                either "f" (forward) or "b" (backward).
 
-        return self.db.runInteraction("get_room_event_after_stream_ordering", _f)
+        Returns:
+            Deferred[(int, int, str)]:
+                (stream ordering, topological ordering, event_id)
+        """
+        sql = (
+            "SELECT stream_ordering, topological_ordering, event_id"
+            " FROM events"
+            " WHERE room_id = ? AND stream_ordering %s ?"
+            " AND NOT outlier"
+            " ORDER BY stream_ordering"
+            " LIMIT 1"
+        ) % ("<=" if dir == "b" else ">=",)
+        txn.execute(sql, (room_id, stream_ordering))
+        return txn.fetchone()
 
     @defer.inlineCallbacks
     def get_room_events_max_id(self, room_id=None):