diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 7cdc9fe98f..d4104462b5 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -474,7 +474,7 @@ class EventsWorkerStore(SQLBaseStore):
             return []
 
         # there may be duplicates so we cast the list to a set
-        event_entry_map = await self._get_events_from_cache_or_db(
+        event_entry_map = await self.get_unredacted_events_from_cache_or_db(
             set(event_ids), allow_rejected=allow_rejected
         )
 
@@ -509,7 +509,9 @@ class EventsWorkerStore(SQLBaseStore):
                 continue
 
             redacted_event_id = entry.event.redacts
-            event_map = await self._get_events_from_cache_or_db([redacted_event_id])
+            event_map = await self.get_unredacted_events_from_cache_or_db(
+                [redacted_event_id]
+            )
             original_event_entry = event_map.get(redacted_event_id)
             if not original_event_entry:
                 # we don't have the redacted event (or it was rejected).
@@ -588,11 +590,16 @@ class EventsWorkerStore(SQLBaseStore):
         return events
 
     @cancellable
-    async def _get_events_from_cache_or_db(
-        self, event_ids: Iterable[str], allow_rejected: bool = False
+    async def get_unredacted_events_from_cache_or_db(
+        self,
+        event_ids: Iterable[str],
+        allow_rejected: bool = False,
     ) -> Dict[str, EventCacheEntry]:
         """Fetch a bunch of events from the cache or the database.
 
+        Note that the events pulled by this function will not have any redactions
+        applied, and no guarantee is made about the ordering of the events returned.
+
         If events are pulled from the database, they will be cached for future lookups.
 
         Unknown events are omitted from the response.
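For review context, a minimal sketch of a caller of the renamed method; the `store` object and the `fetch_raw_events` helper are hypothetical stand-ins for a datastore that mixes in `EventsWorkerStore`:

```python
# Hypothetical caller of the renamed method; `store` stands in for a
# datastore that mixes in EventsWorkerStore.
async def fetch_raw_events(store, event_ids: set) -> list:
    # Per the new docstring: no redactions are applied, ordering is not
    # guaranteed, and unknown events are simply omitted from the result.
    entry_map = await store.get_unredacted_events_from_cache_or_db(event_ids)
    # Each EventCacheEntry wraps the event itself (see `entry.event.redacts`
    # in the hunk above); callers that need redactions applied should keep
    # using the higher-level get_events_as_list instead.
    return [entry.event for entry in entry_map.values()]
```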
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 530f04e149..ffeb2b3683 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -1024,28 +1024,31 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
"after": {"event_ids": events_after, "token": end_token},
}
- async def get_all_new_events_stream(
- self, from_id: int, current_id: int, limit: int, get_prev_content: bool = False
- ) -> Tuple[int, List[EventBase], Dict[str, Optional[int]]]:
+ async def get_all_new_event_ids_stream(
+ self,
+ from_id: int,
+ current_id: int,
+ limit: int,
+ ) -> Tuple[int, Dict[str, Optional[int]]]:
"""Get all new events
- Returns all events with from_id < stream_ordering <= current_id.
+ Returns all event ids with from_id < stream_ordering <= current_id.
Args:
from_id: the stream_ordering of the last event we processed
current_id: the stream_ordering of the most recently processed event
limit: the maximum number of events to return
- get_prev_content: whether to fetch previous event content
Returns:
- A tuple of (next_id, events, event_to_received_ts), where `next_id`
+ A tuple of (next_id, event_to_received_ts), where `next_id`
is the next value to pass as `from_id` (it will either be the
stream_ordering of the last returned event, or, if fewer than `limit`
events were found, the `current_id`). The `event_to_received_ts` is
- a dictionary mapping event ID to the event `received_ts`.
+ a dictionary mapping event ID to the event `received_ts`, sorted by ascending
+ stream_ordering.
"""
- def get_all_new_events_stream_txn(
+ def get_all_new_event_ids_stream_txn(
txn: LoggingTransaction,
) -> Tuple[int, Dict[str, Optional[int]]]:
sql = (
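The revised docstring pins down the `next_id` contract, which supports paging; a minimal sketch under that contract (the `drain_new_event_ids` helper is hypothetical):

```python
# Hypothetical paging loop over the documented contract: feed the returned
# next_id back in as from_id until we catch up to current_id.
async def drain_new_event_ids(store, current_id: int, limit: int = 100):
    from_id = 0
    while from_id < current_id:
        from_id, event_to_received_ts = await store.get_all_new_event_ids_stream(
            from_id, current_id, limit
        )
        # If fewer than `limit` events were found, from_id is now
        # current_id and the loop terminates.
        for event_id, received_ts in event_to_received_ts.items():
            ...  # event IDs arrive in ascending stream_ordering order
```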
@@ -1070,15 +1073,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             return upper_bound, event_to_received_ts
 
         upper_bound, event_to_received_ts = await self.db_pool.runInteraction(
-            "get_all_new_events_stream", get_all_new_events_stream_txn
-        )
-
-        events = await self.get_events_as_list(
-            event_to_received_ts.keys(),
-            get_prev_content=get_prev_content,
+            "get_all_new_event_ids_stream", get_all_new_event_ids_stream_txn
         )
 
-        return upper_bound, events, event_to_received_ts
+        return upper_bound, event_to_received_ts
 
     async def get_federation_out_pos(self, typ: str) -> int:
         if self._need_to_reset_federation_stream_positions:
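Since the store method no longer hydrates events, existing call sites have to do so themselves; a minimal sketch mirroring the `get_events_as_list` call removed above (the `stream_new_events` wrapper is hypothetical):

```python
# Hypothetical updated call site: fetch ids + received_ts from the store,
# then hydrate them into events, mirroring the code deleted above.
async def stream_new_events(store, from_id: int, current_id: int, limit: int):
    upper_bound, event_to_received_ts = await store.get_all_new_event_ids_stream(
        from_id, current_id, limit
    )
    events = await store.get_events_as_list(event_to_received_ts.keys())
    return upper_bound, events, event_to_received_ts
```

Callers that relied on the dropped `get_prev_content` flag would now pass it to `get_events_as_list` directly.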