diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 24127d0364..f42af34a2f 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -110,6 +110,10 @@ event_fetch_ongoing_gauge = Gauge(
)
+class InvalidEventError(Exception):
+ """The event retrieved from the database is invalid and cannot be used."""
+
+
@attr.s(slots=True, auto_attribs=True)
class EventCacheEntry:
event: EventBase
@@ -1310,7 +1314,7 @@ class EventsWorkerStore(SQLBaseStore):
# invites, so just accept it for all membership events.
#
if d["type"] != EventTypes.Member:
- raise Exception(
+ raise InvalidEventError(
"Room %s for event %s is unknown" % (d["room_id"], event_id)
)
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index 356d4ca788..0c1cbd540d 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -29,6 +29,7 @@ from synapse.storage.database import (
LoggingDatabaseConnection,
LoggingTransaction,
)
+from synapse.storage.databases.main.events_worker import InvalidEventError
from synapse.storage.databases.main.state_deltas import StateDeltasStore
from synapse.types import JsonDict
from synapse.util.caches.descriptors import cached
@@ -554,7 +555,17 @@ class StatsStore(StateDeltasStore):
"get_initial_state_for_room", _fetch_current_state_stats
)
- state_event_map = await self.get_events(event_ids, get_prev_content=False) # type: ignore[attr-defined]
+ try:
+ state_event_map = await self.get_events(event_ids, get_prev_content=False) # type: ignore[attr-defined]
+ except InvalidEventError as e:
+ # If an exception occurs fetching events then the room is broken;
+ # skip process it to avoid being stuck on a room.
+ logger.warning(
+ "Failed to fetch events for room %s, skipping stats calculation: %r.",
+ room_id,
+ e,
+ )
+ return
room_state: Dict[str, Union[None, bool, str]] = {
"join_rules": None,
|