diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 59b4cf1e53..765b5a5bcb 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -348,7 +348,14 @@ class EventFederationStore(SQLBaseStore):
# We want to make the cache more effective, so we clamp to the last
# change before the given ordering.
last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id)
- stream_ordering = min(last_change, stream_ordering)
+
+ # We don't always have a full stream_to_exterm_id table, e.g. after
+ # the upgrade that introduced it, so we make sure we never ask for a
+ # try and pin to a stream_ordering from before a restart
+ last_change = max(self._stream_order_on_start, last_change)
+
+ if last_change > self.stream_ordering_month_ago:
+ stream_ordering = min(last_change, stream_ordering)
return self._get_forward_extremeties_for_room(room_id, stream_ordering)
@@ -386,9 +393,19 @@ class EventFederationStore(SQLBaseStore):
def _delete_old_forward_extrem_cache(self):
def _delete_old_forward_extrem_cache_txn(txn):
+ sql = ("""
+ DELETE FROM stream_ordering_to_exterm
+ WHERE
+ (
+ SELECT max(stream_ordering) AS stream_ordering
+ FROM stream_ordering_to_exterm
+ WHERE room_id = stream_ordering_to_exterm.room_id
+ ) > ?
+ AND stream_ordering < ?
+ """)
txn.execute(
- "DELETE FROM stream_ordering_to_exterm WHERE stream_ordering < ?",
- (self.stream_ordering_month_ago,)
+ sql,
+ (self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
)
return self.runInteraction(
"_delete_old_forward_extrem_cache",
|