diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index accde349a7..bce9aa7fb8 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -973,8 +973,18 @@ class EventsWorkerStore(SQLBaseStore):
return self._stream_id_gen.get_current_token()
def get_all_new_forward_event_rows(self, last_id, current_id, limit):
- if last_id == current_id:
- return defer.succeed([])
+ """Returns new events, for the Events replication stream
+
+ Args:
+ last_id: the last stream_id from the previous batch.
+ current_id: the maximum stream_id to return up to
+ limit: the maximum number of rows to return
+
+ Returns: Deferred[List[Tuple]]
+ a list of events stream rows. Each tuple consists of a stream id as
+ the first element, followed by fields suitable for casting into an
+ EventsStreamRow.
+ """
def get_all_new_forward_event_rows(txn):
sql = (
@@ -989,13 +999,26 @@ class EventsWorkerStore(SQLBaseStore):
" LIMIT ?"
)
txn.execute(sql, (last_id, current_id, limit))
- new_event_updates = txn.fetchall()
+ return txn.fetchall()
- if len(new_event_updates) == limit:
- upper_bound = new_event_updates[-1][0]
- else:
- upper_bound = current_id
+ return self.db.runInteraction(
+ "get_all_new_forward_event_rows", get_all_new_forward_event_rows
+ )
+
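# --- Illustrative sketch (not part of this patch) ---
# The docstring above says each returned tuple is a stream id followed by
# fields suitable for casting into an EventsStreamRow. A caller might consume
# the Deferred like this; the EventsStreamRow field names below are assumed
# for illustration only and are not defined in this file.
from collections import namedtuple

EventsStreamRow = namedtuple(
    "EventsStreamRow",
    ("event_id", "room_id", "type", "state_key", "redacts", "relates_to"),
)

async def fetch_forward_event_rows(store, last_id, current_id, limit):
    rows = await store.get_all_new_forward_event_rows(last_id, current_id, limit)
    # Split off the leading stream id and cast the remaining fields.
    return [(row[0], EventsStreamRow(*row[1:])) for row in rows]
# --- end sketch ---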
+ def get_ex_outlier_stream_rows(self, last_id, current_id):
+ """Returns de-outliered events, for the Events replication stream
+
+ Args:
+ last_id: the last stream_id from the previous batch.
+ current_id: the maximum stream_id to return up to
+
+ Returns: Deferred[List[Tuple]]
+ a list of events stream rows. Each tuple consists of a stream id as
+ the first element, followed by fields suitable for casting into an
+ EventsStreamRow.
+ """
+
+ def get_ex_outlier_stream_rows_txn(txn):
sql = (
"SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
" state_key, redacts, relates_to_id"
@@ -1006,15 +1029,14 @@ class EventsWorkerStore(SQLBaseStore):
" LEFT JOIN event_relations USING (event_id)"
" WHERE ? < event_stream_ordering"
" AND event_stream_ordering <= ?"
- " ORDER BY event_stream_ordering DESC"
+ " ORDER BY event_stream_ordering ASC"
)
- txn.execute(sql, (last_id, upper_bound))
- new_event_updates.extend(txn)
- return new_event_updates
+ txn.execute(sql, (last_id, current_id))
+ return txn.fetchall()
return self.db.runInteraction(
- "get_all_new_forward_event_rows", get_all_new_forward_event_rows
+ "get_ex_outlier_stream_rows", get_ex_outlier_stream_rows_txn
)
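# --- Illustrative sketch (not part of this patch) ---
# Both helpers above now return rows ordered by stream id ascending, where the
# old code ran the ex-outlier query DESC and bounded it by an upper_bound
# computed from the first query. A caller that wants a single, ordered batch
# might merge the two result sets on the first tuple element, e.g.:
import heapq

async def get_combined_event_rows(store, last_id, current_id, limit):
    forward_rows = await store.get_all_new_forward_event_rows(
        last_id, current_id, limit
    )
    ex_outlier_rows = await store.get_ex_outlier_stream_rows(last_id, current_id)
    # Both lists are already ASC-ordered, so a heap merge keeps the combined
    # batch ordered by stream id without re-sorting.
    return list(heapq.merge(forward_rows, ex_outlier_rows, key=lambda row: row[0]))
# --- end sketch ---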
def get_all_new_backfill_event_rows(self, last_id, current_id, limit):
@@ -1062,15 +1084,23 @@ class EventsWorkerStore(SQLBaseStore):
"get_all_new_backfill_event_rows", get_all_new_backfill_event_rows
)
- def get_all_updated_current_state_deltas(self, from_token, to_token, limit):
+ def get_all_updated_current_state_deltas(
+ self, from_token: int, to_token: int, limit: Optional[int]
+ ):
def get_all_updated_current_state_deltas_txn(txn):
sql = """
SELECT stream_id, room_id, type, state_key, event_id
FROM current_state_delta_stream
WHERE ? < stream_id AND stream_id <= ?
- ORDER BY stream_id ASC LIMIT ?
+ ORDER BY stream_id ASC
"""
- txn.execute(sql, (from_token, to_token, limit))
+ params = [from_token, to_token]
+
+ if limit is not None:
+ sql += "LIMIT ?"
+ params.append(limit)
+
+ txn.execute(sql, params)
return txn.fetchall()
return self.db.runInteraction(
"get_all_updated_current_state_deltas", get_all_updated_current_state_deltas_txn
)
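# --- Illustrative sketch (not part of this patch) ---
# The change above makes the LIMIT clause optional: it is only appended, along
# with its bind parameter, when a limit is supplied. A standalone version of
# that query-building step (names here are illustrative) would look like:
from typing import List, Optional, Tuple

def build_deltas_query(
    from_token: int, to_token: int, limit: Optional[int]
) -> Tuple[str, List[int]]:
    sql = """
        SELECT stream_id, room_id, type, state_key, event_id
        FROM current_state_delta_stream
        WHERE ? < stream_id AND stream_id <= ?
        ORDER BY stream_id ASC
    """
    params = [from_token, to_token]
    if limit is not None:
        # The trailing newline/indentation of the triple-quoted string means
        # "LIMIT ?" can be appended directly.
        sql += "LIMIT ?"
        params.append(limit)
    return sql, params
# --- end sketch ---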