summary refs log tree commit diff
path: root/synapse/storage/databases/main/stream.py
diff options
context:
space:
mode:
authorShay <hillerys@element.io>2022-10-11 11:18:45 -0700
committerGitHub <noreply@github.com>2022-10-11 11:18:45 -0700
commita86b2f6837f0a067b0a014fbf5140e8773b8da2e (patch)
tree688048da772b1e5cd56070ce5fda8220af4c225f /synapse/storage/databases/main/stream.py
parentHandle `gottestfmt` repository move (#14144) (diff)
downloadsynapse-a86b2f6837f0a067b0a014fbf5140e8773b8da2e.tar.xz
Fix a bug where redactions were not being sent over federation if we did not have the original event. (#13813)
Diffstat (limited to 'synapse/storage/databases/main/stream.py')
-rw-r--r--synapse/storage/databases/main/stream.py28
1 files changed, 13 insertions, 15 deletions
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 530f04e149..ffeb2b3683 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -1024,28 +1024,31 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             "after": {"event_ids": events_after, "token": end_token},
         }
 
-    async def get_all_new_events_stream(
-        self, from_id: int, current_id: int, limit: int, get_prev_content: bool = False
-    ) -> Tuple[int, List[EventBase], Dict[str, Optional[int]]]:
+    async def get_all_new_event_ids_stream(
+        self,
+        from_id: int,
+        current_id: int,
+        limit: int,
+    ) -> Tuple[int, Dict[str, Optional[int]]]:
         """Get all new events
 
-        Returns all events with from_id < stream_ordering <= current_id.
+        Returns all event ids with from_id < stream_ordering <= current_id.
 
         Args:
             from_id:  the stream_ordering of the last event we processed
             current_id:  the stream_ordering of the most recently processed event
             limit: the maximum number of events to return
-            get_prev_content: whether to fetch previous event content
 
         Returns:
-            A tuple of (next_id, events, event_to_received_ts), where `next_id`
+            A tuple of (next_id, event_to_received_ts), where `next_id`
             is the next value to pass as `from_id` (it will either be the
             stream_ordering of the last returned event, or, if fewer than `limit`
             events were found, the `current_id`). The `event_to_received_ts` is
-            a dictionary mapping event ID to the event `received_ts`.
+            a dictionary mapping event ID to the event `received_ts`, sorted by ascending
+            stream_ordering.
         """
 
-        def get_all_new_events_stream_txn(
+        def get_all_new_event_ids_stream_txn(
             txn: LoggingTransaction,
         ) -> Tuple[int, Dict[str, Optional[int]]]:
             sql = (
@@ -1070,15 +1073,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             return upper_bound, event_to_received_ts
 
         upper_bound, event_to_received_ts = await self.db_pool.runInteraction(
-            "get_all_new_events_stream", get_all_new_events_stream_txn
-        )
-
-        events = await self.get_events_as_list(
-            event_to_received_ts.keys(),
-            get_prev_content=get_prev_content,
+            "get_all_new_event_ids_stream", get_all_new_event_ids_stream_txn
         )
 
-        return upper_bound, events, event_to_received_ts
+        return upper_bound, event_to_received_ts
 
     async def get_federation_out_pos(self, typ: str) -> int:
         if self._need_to_reset_federation_stream_positions: