summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/data_stores/main/events_worker.py46
1 files changed, 34 insertions, 12 deletions
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index accde349a7..ce8be72bfe 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -973,8 +973,18 @@ class EventsWorkerStore(SQLBaseStore):
         return self._stream_id_gen.get_current_token()
 
     def get_all_new_forward_event_rows(self, last_id, current_id, limit):
-        if last_id == current_id:
-            return defer.succeed([])
+        """Returns new events, for the Events replication stream
+
+        Args:
+            last_id: the last stream_id from the previous batch.
+            current_id: the maximum stream_id to return up to
+            limit: the maximum number of rows to return
+
+        Returns: Deferred[List[Tuple]]
+            a list of events stream rows. Each tuple consists of a stream id as
+            the first element, followed by fields suitable for casting into an
+            EventsStreamRow.
+        """
 
         def get_all_new_forward_event_rows(txn):
             sql = (
@@ -989,13 +999,26 @@ class EventsWorkerStore(SQLBaseStore):
                 " LIMIT ?"
             )
             txn.execute(sql, (last_id, current_id, limit))
-            new_event_updates = txn.fetchall()
+            return txn.fetchall()
 
-            if len(new_event_updates) == limit:
-                upper_bound = new_event_updates[-1][0]
-            else:
-                upper_bound = current_id
+        return self.db.runInteraction(
+            "get_all_new_forward_event_rows", get_all_new_forward_event_rows
+        )
+
+    def get_ex_outlier_stream_rows(self, last_id, current_id):
+        """Returns de-outliered events, for the Events replication stream
 
+        Args:
+            last_id: the last stream_id from the previous batch.
+            current_id: the maximum stream_id to return up to
+
+        Returns: Deferred[List[Tuple]]
+            a list of events stream rows. Each tuple consists of a stream id as
+            the first element, followed by fields suitable for casting into an
+            EventsStreamRow.
+        """
+
+        def get_ex_outlier_stream_rows_txn(txn):
             sql = (
                 "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
                 " state_key, redacts, relates_to_id"
@@ -1006,15 +1029,14 @@ class EventsWorkerStore(SQLBaseStore):
                 " LEFT JOIN event_relations USING (event_id)"
                 " WHERE ? < event_stream_ordering"
                 " AND event_stream_ordering <= ?"
-                " ORDER BY event_stream_ordering DESC"
+                " ORDER BY event_stream_ordering ASC"
             )
-            txn.execute(sql, (last_id, upper_bound))
-            new_event_updates.extend(txn)
 
-            return new_event_updates
+            txn.execute(sql, (last_id, current_id))
+            return txn.fetchall()
 
         return self.db.runInteraction(
-            "get_all_new_forward_event_rows", get_all_new_forward_event_rows
+            "get_ex_outlier_stream_rows", get_ex_outlier_stream_rows_txn
         )
 
     def get_all_new_backfill_event_rows(self, last_id, current_id, limit):