From 855af069a494f826ef941d722c811287b3fc4a8c Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 15 Jan 2020 18:56:18 +0000 Subject: Fix instantiation of message retention purge jobs When figuring out which topological token to start a purge job at, we need to do the following: 1. Figure out a timestamp before which events will be purged 2. Select the first stream ordering after that timestamp 3. Select info about the first event after that stream ordering 4. Build a topological token from that info In some situations (e.g. quiet rooms with a short max_lifetime), there might not be an event after the stream ordering at step 3, therefore we abort the purge with the error `No event found`. To mitigate that, this patch fetches the first event _before_ the stream ordering, instead of after. --- synapse/handlers/pagination.py | 2 +- synapse/storage/data_stores/main/stream.py | 59 ++++++++++++++++++++++++------ 2 files changed, 48 insertions(+), 13 deletions(-) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 00a6afc963..3ee6a091c5 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -156,7 +156,7 @@ class PaginationHandler(object): stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts) - r = yield self.store.get_room_event_after_stream_ordering( + r = yield self.store.get_room_event_before_stream_ordering( room_id, stream_ordering, ) if not r: diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py index 140da8dad6..223ce7fedb 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py @@ -536,20 +536,55 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): Deferred[(int, int, str)]: (stream ordering, topological ordering, event_id) """ + return self.db.runInteraction( + "get_room_event_after_stream_ordering", + self.get_room_event_around_stream_ordering_txn, + room_id, stream_ordering, "f", + ) - def _f(txn): - sql = ( - "SELECT stream_ordering, topological_ordering, event_id" - " FROM events" - " WHERE room_id = ? AND stream_ordering >= ?" - " AND NOT outlier" - " ORDER BY stream_ordering" - " LIMIT 1" - ) - txn.execute(sql, (room_id, stream_ordering)) - return txn.fetchone() + def get_room_event_before_stream_ordering(self, room_id, stream_ordering): + """Gets details of the first event in a room at or before a stream ordering + + Args: + room_id (str): + stream_ordering (int): + + Returns: + Deferred[(int, int, str)]: + (stream ordering, topological ordering, event_id) + """ + return self.db.runInteraction( + "get_room_event_before_stream_ordering", + self.get_room_event_around_stream_ordering_txn, + room_id, stream_ordering, "f", + ) + + def get_room_event_around_stream_ordering_txn( + self, txn, room_id, stream_ordering, dir="f" + ): + """Gets details of the first event in a room at or either after or before a + stream ordering, depending on the provided direction. + + Args: + room_id (str): + stream_ordering (int): + dir (str): Direction in which we're looking towards in the room's history, + either "f" (forward) or "b" (backward). - return self.db.runInteraction("get_room_event_after_stream_ordering", _f) + Returns: + Deferred[(int, int, str)]: + (stream ordering, topological ordering, event_id) + """ + sql = ( + "SELECT stream_ordering, topological_ordering, event_id" + " FROM events" + " WHERE room_id = ? AND stream_ordering %s ?" + " AND NOT outlier" + " ORDER BY stream_ordering" + " LIMIT 1" + ) % ("<=" if dir == "b" else ">=",) + txn.execute(sql, (room_id, stream_ordering)) + return txn.fetchone() @defer.inlineCallbacks def get_room_events_max_id(self, room_id=None): -- cgit 1.4.1 From 83635882379ecddb1509ea3d071eefdedefb647e Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 15 Jan 2020 19:13:22 +0000 Subject: Fix typo --- synapse/storage/data_stores/main/stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py index 223ce7fedb..9fa5e1f203 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py @@ -556,7 +556,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return self.db.runInteraction( "get_room_event_before_stream_ordering", self.get_room_event_around_stream_ordering_txn, - room_id, stream_ordering, "f", + room_id, stream_ordering, "b", ) def get_room_event_around_stream_ordering_txn( -- cgit 1.4.1 From 066b9f52b80c172eec6074ca01fb24670200fd80 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 15 Jan 2020 19:32:47 +0000 Subject: Correctly order when selecting before stream ordering --- synapse/storage/data_stores/main/stream.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py index 9fa5e1f203..451f38296b 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py @@ -580,9 +580,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): " FROM events" " WHERE room_id = ? AND stream_ordering %s ?" " AND NOT outlier" - " ORDER BY stream_ordering" + " ORDER BY stream_ordering %s" " LIMIT 1" - ) % ("<=" if dir == "b" else ">=",) + ) % ( + "<=" if dir == "b" else ">=", + "DESC" if dir == "b" else "ASC", + ) txn.execute(sql, (room_id, stream_ordering)) return txn.fetchone() -- cgit 1.4.1 From 914e73cdd9053d6fd050e5ad04910db74a7b5cd9 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 15 Jan 2020 19:36:19 +0000 Subject: Changelog --- changelog.d/6713.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/6713.bugfix diff --git a/changelog.d/6713.bugfix b/changelog.d/6713.bugfix new file mode 100644 index 0000000000..3924f1ad79 --- /dev/null +++ b/changelog.d/6713.bugfix @@ -0,0 +1 @@ +Fix a bug causing Synapse to not always purge quiet rooms with a low `max_lifetime` in their message retention policies. -- cgit 1.4.1 From 48e57a6452be3fef4372832f9e8f8f630325a648 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 15 Jan 2020 19:40:46 +0000 Subject: Rename changelog --- changelog.d/6713.bugfix | 1 - changelog.d/6714.bugfix | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) delete mode 100644 changelog.d/6713.bugfix create mode 100644 changelog.d/6714.bugfix diff --git a/changelog.d/6713.bugfix b/changelog.d/6713.bugfix deleted file mode 100644 index 3924f1ad79..0000000000 --- a/changelog.d/6713.bugfix +++ /dev/null @@ -1 +0,0 @@ -Fix a bug causing Synapse to not always purge quiet rooms with a low `max_lifetime` in their message retention policies. diff --git a/changelog.d/6714.bugfix b/changelog.d/6714.bugfix new file mode 100644 index 0000000000..3924f1ad79 --- /dev/null +++ b/changelog.d/6714.bugfix @@ -0,0 +1 @@ +Fix a bug causing Synapse to not always purge quiet rooms with a low `max_lifetime` in their message retention policies. -- cgit 1.4.1 From e601f35d3b562495b2f8b071bd4c812fd783d6a7 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 16 Jan 2020 09:55:11 +0000 Subject: Lint --- synapse/storage/data_stores/main/stream.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py index 451f38296b..652cecd59b 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py @@ -539,7 +539,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return self.db.runInteraction( "get_room_event_after_stream_ordering", self.get_room_event_around_stream_ordering_txn, - room_id, stream_ordering, "f", + room_id, + stream_ordering, + "f", ) def get_room_event_before_stream_ordering(self, room_id, stream_ordering): @@ -556,7 +558,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return self.db.runInteraction( "get_room_event_before_stream_ordering", self.get_room_event_around_stream_ordering_txn, - room_id, stream_ordering, "b", + room_id, + stream_ordering, + "b", ) def get_room_event_around_stream_ordering_txn( @@ -575,6 +579,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): Deferred[(int, int, str)]: (stream ordering, topological ordering, event_id) """ + # Figure out which comparison operation to perform and how to order the results, + # using the provided direction. + op = "<=" if dir == "b" else ">=" + order = "DESC" if dir == "b" else "ASC" + sql = ( "SELECT stream_ordering, topological_ordering, event_id" " FROM events" @@ -582,10 +591,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): " AND NOT outlier" " ORDER BY stream_ordering %s" " LIMIT 1" - ) % ( - "<=" if dir == "b" else ">=", - "DESC" if dir == "b" else "ASC", - ) + ) % (op, order) txn.execute(sql, (room_id, stream_ordering)) return txn.fetchone() -- cgit 1.4.1 From 842c2cfbf1e9f3e0d9251fa0c572eba9d6af6dbe Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 16 Jan 2020 20:24:17 +0000 Subject: Remove get_room_event_after_stream_ordering entirely --- synapse/rest/admin/__init__.py | 2 +- synapse/storage/data_stores/main/stream.py | 69 ++++++------------------------ 2 files changed, 13 insertions(+), 58 deletions(-) diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index a10b4a9b72..2932fe2123 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -107,7 +107,7 @@ class PurgeHistoryRestServlet(RestServlet): stream_ordering = await self.store.find_first_stream_ordering_after_ts(ts) - r = await self.store.get_room_event_after_stream_ordering( + r = await self.store.get_room_event_before_stream_ordering( room_id, stream_ordering ) if not r: diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py index 652cecd59b..a20c3d1012 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py @@ -525,25 +525,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return rows, token - def get_room_event_after_stream_ordering(self, room_id, stream_ordering): - """Gets details of the first event in a room at or after a stream ordering - - Args: - room_id (str): - stream_ordering (int): - - Returns: - Deferred[(int, int, str)]: - (stream ordering, topological ordering, event_id) - """ - return self.db.runInteraction( - "get_room_event_after_stream_ordering", - self.get_room_event_around_stream_ordering_txn, - room_id, - stream_ordering, - "f", - ) - def get_room_event_before_stream_ordering(self, room_id, stream_ordering): """Gets details of the first event in a room at or before a stream ordering @@ -555,45 +536,19 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): Deferred[(int, int, str)]: (stream ordering, topological ordering, event_id) """ - return self.db.runInteraction( - "get_room_event_before_stream_ordering", - self.get_room_event_around_stream_ordering_txn, - room_id, - stream_ordering, - "b", - ) - - def get_room_event_around_stream_ordering_txn( - self, txn, room_id, stream_ordering, dir="f" - ): - """Gets details of the first event in a room at or either after or before a - stream ordering, depending on the provided direction. - - Args: - room_id (str): - stream_ordering (int): - dir (str): Direction in which we're looking towards in the room's history, - either "f" (forward) or "b" (backward). - - Returns: - Deferred[(int, int, str)]: - (stream ordering, topological ordering, event_id) - """ - # Figure out which comparison operation to perform and how to order the results, - # using the provided direction. - op = "<=" if dir == "b" else ">=" - order = "DESC" if dir == "b" else "ASC" + def _f(txn): + sql = ( + "SELECT stream_ordering, topological_ordering, event_id" + " FROM events" + " WHERE room_id = ? AND stream_ordering <= ?" + " AND NOT outlier" + " ORDER BY stream_ordering DESC" + " LIMIT 1" + ) + txn.execute(sql, (room_id, stream_ordering)) + return txn.fetchone() - sql = ( - "SELECT stream_ordering, topological_ordering, event_id" - " FROM events" - " WHERE room_id = ? AND stream_ordering %s ?" - " AND NOT outlier" - " ORDER BY stream_ordering %s" - " LIMIT 1" - ) % (op, order) - txn.execute(sql, (room_id, stream_ordering)) - return txn.fetchone() + return self.db.runInteraction("get_room_event_before_stream_ordering", _f) @defer.inlineCallbacks def get_room_events_max_id(self, room_id=None): -- cgit 1.4.1 From dac148341ba2638cc9486cf0b00005932dab939d Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 16 Jan 2020 20:25:09 +0000 Subject: Fixup diff --- synapse/storage/data_stores/main/stream.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py index a20c3d1012..056b25b13a 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py @@ -536,14 +536,15 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): Deferred[(int, int, str)]: (stream ordering, topological ordering, event_id) """ + def _f(txn): sql = ( - "SELECT stream_ordering, topological_ordering, event_id" - " FROM events" - " WHERE room_id = ? AND stream_ordering <= ?" - " AND NOT outlier" - " ORDER BY stream_ordering DESC" - " LIMIT 1" + "SELECT stream_ordering, topological_ordering, event_id" + " FROM events" + " WHERE room_id = ? AND stream_ordering <= ?" + " AND NOT outlier" + " ORDER BY stream_ordering DESC" + " LIMIT 1" ) txn.execute(sql, (room_id, stream_ordering)) return txn.fetchone() -- cgit 1.4.1 From 4fb3cb208a17ba36a5da050b19e3997cf4808f9a Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 16 Jan 2020 20:27:07 +0000 Subject: Precise changelog --- changelog.d/6714.bugfix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/6714.bugfix b/changelog.d/6714.bugfix index 3924f1ad79..410516694f 100644 --- a/changelog.d/6714.bugfix +++ b/changelog.d/6714.bugfix @@ -1 +1 @@ -Fix a bug causing Synapse to not always purge quiet rooms with a low `max_lifetime` in their message retention policies. +Fix a bug causing Synapse to not always purge quiet rooms with a low `max_lifetime` in their message retention policies when running the automated purge jobs. -- cgit 1.4.1