diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/handlers/device.py | 10 | ||||
-rw-r--r-- | synapse/storage/databases/main/event_federation.py | 52 |
2 files changed, 52 insertions, 10 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 9ded6389ac..d2063d4435 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -215,6 +215,16 @@ class DeviceWorkerHandler: possibly_changed = set(changed) possibly_left = set() for room_id in rooms_changed: + # Check if the forward extremities have changed. If not then we know + # the current state won't have changed, and so we can skip this room. + try: + if not await self.store.have_room_forward_extremities_changed_since( + room_id, stream_ordering + ): + continue + except errors.StoreError: + pass + current_state_ids = await self._state_storage.get_current_state_ids( room_id, await_full_state=False ) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index a19ba88bf8..9e6011e8ea 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1171,6 +1171,38 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas return int(min_depth) if min_depth is not None else None + async def have_room_forward_extremities_changed_since( + self, + room_id: str, + stream_ordering: int, + ) -> bool: + """Check if the forward extremities in a room have changed since the + given stream ordering + + Throws a StoreError if we have since purged the index for + stream_orderings from that point. + """ + + if stream_ordering <= self.stream_ordering_month_ago: # type: ignore[attr-defined] + raise StoreError(400, f"stream_ordering too old {stream_ordering}") + + sql = """ + SELECT 1 FROM stream_ordering_to_exterm + WHERE stream_ordering > ? AND room_id = ? + LIMIT 1 + """ + + def have_room_forward_extremities_changed_since_txn( + txn: LoggingTransaction, + ) -> bool: + txn.execute(sql, (stream_ordering, room_id)) + return txn.fetchone() is not None + + return await self.db_pool.runInteraction( + "have_room_forward_extremities_changed_since", + have_room_forward_extremities_changed_since_txn, + ) + @cancellable async def get_forward_extremities_for_room_at_stream_ordering( self, room_id: str, stream_ordering: int @@ -1232,10 +1264,17 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas txn.execute(sql, (stream_ordering, room_id)) return [event_id for event_id, in txn] - return await self.db_pool.runInteraction( + event_ids = await self.db_pool.runInteraction( "get_forward_extremeties_for_room", get_forward_extremeties_for_room_txn ) + # If we didn't find any IDs, then we must have cleared out the + # associated `stream_ordering_to_exterm`. + if not event_ids: + raise StoreError(400, "stream_ordering too old %s" % (stream_ordering,)) + + return event_ids + def _get_connected_batch_event_backfill_results_txn( self, txn: LoggingTransaction, insertion_event_id: str, limit: int ) -> List[BackfillQueueNavigationItem]: @@ -1664,19 +1703,12 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas @wrap_as_background_process("delete_old_forward_extrem_cache") async def _delete_old_forward_extrem_cache(self) -> None: def _delete_old_forward_extrem_cache_txn(txn: LoggingTransaction) -> None: - # Delete entries older than a month, while making sure we don't delete - # the only entries for a room. sql = """ DELETE FROM stream_ordering_to_exterm - WHERE - room_id IN ( - SELECT room_id - FROM stream_ordering_to_exterm - WHERE stream_ordering > ? - ) AND stream_ordering < ? + WHERE stream_ordering < ? """ txn.execute( - sql, (self.stream_ordering_month_ago, self.stream_ordering_month_ago) # type: ignore[attr-defined] + sql, (self.stream_ordering_month_ago) # type: ignore[attr-defined] ) await self.db_pool.runInteraction( |