diff options
author | Brendan Abolivier <babolivier@matrix.org> | 2020-01-15 18:56:18 +0000 |
---|---|---|
committer | Brendan Abolivier <babolivier@matrix.org> | 2020-01-15 18:56:18 +0000 |
commit | 855af069a494f826ef941d722c811287b3fc4a8c (patch) | |
tree | 88a4f9218c5867dcb0a2da82066b345430c5b88f | |
parent | Process EDUs in parallel with PDUs. (#6697) (diff) | |
download | synapse-855af069a494f826ef941d722c811287b3fc4a8c.tar.xz |
Fix instantiation of message retention purge jobs
When figuring out which topological token to start a purge job at, we need to do the following: 1. Figure out a timestamp before which events will be purged 2. Select the first stream ordering after that timestamp 3. Select info about the first event after that stream ordering 4. Build a topological token from that info In some situations (e.g. quiet rooms with a short max_lifetime), there might not be an event after the stream ordering at step 3, therefore we abort the purge with the error `No event found`. To mitigate that, this patch fetches the first event _before_ the stream ordering, instead of after.
-rw-r--r-- | synapse/handlers/pagination.py | 2 | ||||
-rw-r--r-- | synapse/storage/data_stores/main/stream.py | 59 |
2 files changed, 48 insertions, 13 deletions
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 00a6afc963..3ee6a091c5 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -156,7 +156,7 @@ class PaginationHandler(object): stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts) - r = yield self.store.get_room_event_after_stream_ordering( + r = yield self.store.get_room_event_before_stream_ordering( room_id, stream_ordering, ) if not r: diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py index 140da8dad6..223ce7fedb 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py @@ -536,20 +536,55 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): Deferred[(int, int, str)]: (stream ordering, topological ordering, event_id) """ + return self.db.runInteraction( + "get_room_event_after_stream_ordering", + self.get_room_event_around_stream_ordering_txn, + room_id, stream_ordering, "f", + ) - def _f(txn): - sql = ( - "SELECT stream_ordering, topological_ordering, event_id" - " FROM events" - " WHERE room_id = ? AND stream_ordering >= ?" - " AND NOT outlier" - " ORDER BY stream_ordering" - " LIMIT 1" - ) - txn.execute(sql, (room_id, stream_ordering)) - return txn.fetchone() + def get_room_event_before_stream_ordering(self, room_id, stream_ordering): + """Gets details of the first event in a room at or before a stream ordering + + Args: + room_id (str): + stream_ordering (int): + + Returns: + Deferred[(int, int, str)]: + (stream ordering, topological ordering, event_id) + """ + return self.db.runInteraction( + "get_room_event_before_stream_ordering", + self.get_room_event_around_stream_ordering_txn, + room_id, stream_ordering, "f", + ) + + def get_room_event_around_stream_ordering_txn( + self, txn, room_id, stream_ordering, dir="f" + ): + """Gets details of the first event in a room at or either after or before a + stream ordering, depending on the provided direction. + + Args: + room_id (str): + stream_ordering (int): + dir (str): Direction in which we're looking towards in the room's history, + either "f" (forward) or "b" (backward). - return self.db.runInteraction("get_room_event_after_stream_ordering", _f) + Returns: + Deferred[(int, int, str)]: + (stream ordering, topological ordering, event_id) + """ + sql = ( + "SELECT stream_ordering, topological_ordering, event_id" + " FROM events" + " WHERE room_id = ? AND stream_ordering %s ?" + " AND NOT outlier" + " ORDER BY stream_ordering" + " LIMIT 1" + ) % ("<=" if dir == "b" else ">=",) + txn.execute(sql, (room_id, stream_ordering)) + return txn.fetchone() @defer.inlineCallbacks def get_room_events_max_id(self, room_id=None): |