From ce428a1abe6aae25e236baf268f56b1811cba333 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 23 Apr 2020 18:19:08 +0100 Subject: Fix EventsStream raising assertions when it falls behind Figuring out how to correctly limit updates from this stream without dropping entries is far more complicated than just counting the number of rows being returned. We need to consider each query separately and, if any one query hits the limit, truncate the results from the others. I think this also fixes some potentially long-standing bugs where events or state changes could get missed if we hit the limit on either query. --- synapse/storage/data_stores/main/events_worker.py | 46 +++++++++++++++++------ 1 file changed, 34 insertions(+), 12 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index accde349a7..ce8be72bfe 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -973,8 +973,18 @@ class EventsWorkerStore(SQLBaseStore): return self._stream_id_gen.get_current_token() def get_all_new_forward_event_rows(self, last_id, current_id, limit): - if last_id == current_id: - return defer.succeed([]) + """Returns new events, for the Events replication stream + + Args: + last_id: the last stream_id from the previous batch. + current_id: the maximum stream_id to return up to + limit: the maximum number of rows to return + + Returns: Deferred[List[Tuple]] + a list of events stream rows. Each tuple consists of a stream id as + the first element, followed by fields suitable for casting into an + EventsStreamRow. + """ def get_all_new_forward_event_rows(txn): sql = ( @@ -989,13 +999,26 @@ class EventsWorkerStore(SQLBaseStore): " LIMIT ?" ) txn.execute(sql, (last_id, current_id, limit)) - new_event_updates = txn.fetchall() + return txn.fetchall() - if len(new_event_updates) == limit: - upper_bound = new_event_updates[-1][0] - else: - upper_bound = current_id + return self.db.runInteraction( + "get_all_new_forward_event_rows", get_all_new_forward_event_rows + ) + + def get_ex_outlier_stream_rows(self, last_id, current_id): + """Returns de-outliered events, for the Events replication stream + Args: + last_id: the last stream_id from the previous batch. + current_id: the maximum stream_id to return up to + + Returns: Deferred[List[Tuple]] + a list of events stream rows. Each tuple consists of a stream id as + the first element, followed by fields suitable for casting into an + EventsStreamRow. + """ + + def get_ex_outlier_stream_rows_txn(txn): sql = ( "SELECT event_stream_ordering, e.event_id, e.room_id, e.type," " state_key, redacts, relates_to_id" @@ -1006,15 +1029,14 @@ class EventsWorkerStore(SQLBaseStore): " LEFT JOIN event_relations USING (event_id)" " WHERE ? < event_stream_ordering" " AND event_stream_ordering <= ?" - " ORDER BY event_stream_ordering DESC" + " ORDER BY event_stream_ordering ASC" ) - txn.execute(sql, (last_id, upper_bound)) - new_event_updates.extend(txn) - return new_event_updates + txn.execute(sql, (last_id, current_id)) + return txn.fetchall() return self.db.runInteraction( - "get_all_new_forward_event_rows", get_all_new_forward_event_rows + "get_ex_outlier_stream_rows", get_ex_outlier_stream_rows_txn ) def get_all_new_backfill_event_rows(self, last_id, current_id, limit): -- cgit 1.5.1