diff --git a/changelog.d/14873.bugfix b/changelog.d/14873.bugfix
new file mode 100644
index 0000000000..9b058576cd
--- /dev/null
+++ b/changelog.d/14873.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug where the `populate_room_stats` background job could fail on broken rooms.
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 24127d0364..f42af34a2f 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -110,6 +110,10 @@ event_fetch_ongoing_gauge = Gauge(
)
+class InvalidEventError(Exception):
+ """The event retrieved from the database is invalid and cannot be used."""
+
+
@attr.s(slots=True, auto_attribs=True)
class EventCacheEntry:
event: EventBase
@@ -1310,7 +1314,7 @@ class EventsWorkerStore(SQLBaseStore):
# invites, so just accept it for all membership events.
#
if d["type"] != EventTypes.Member:
- raise Exception(
+ raise InvalidEventError(
"Room %s for event %s is unknown" % (d["room_id"], event_id)
)
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index 356d4ca788..0c1cbd540d 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -29,6 +29,7 @@ from synapse.storage.database import (
LoggingDatabaseConnection,
LoggingTransaction,
)
+from synapse.storage.databases.main.events_worker import InvalidEventError
from synapse.storage.databases.main.state_deltas import StateDeltasStore
from synapse.types import JsonDict
from synapse.util.caches.descriptors import cached
@@ -554,7 +555,17 @@ class StatsStore(StateDeltasStore):
"get_initial_state_for_room", _fetch_current_state_stats
)
- state_event_map = await self.get_events(event_ids, get_prev_content=False) # type: ignore[attr-defined]
+ try:
+ state_event_map = await self.get_events(event_ids, get_prev_content=False) # type: ignore[attr-defined]
+ except InvalidEventError as e:
+ # If fetching the events fails then the room is broken;
+ # skip processing it so the background update doesn't get stuck on it.
+ logger.warning(
+ "Failed to fetch events for room %s, skipping stats calculation: %r.",
+ room_id,
+ e,
+ )
+ return
room_state: Dict[str, Union[None, bool, str]] = {
"join_rules": None,
diff --git a/tests/storage/databases/main/test_room.py b/tests/storage/databases/main/test_room.py
index 7d961fac64..3108ca3444 100644
--- a/tests/storage/databases/main/test_room.py
+++ b/tests/storage/databases/main/test_room.py
@@ -40,9 +40,23 @@ class RoomBackgroundUpdateStoreTestCase(HomeserverTestCase):
self.token = self.login("foo", "pass")
def _generate_room(self) -> str:
- room_id = self.helper.create_room_as(self.user_id, tok=self.token)
+ """Create a room and return the room ID."""
+ return self.helper.create_room_as(self.user_id, tok=self.token)
- return room_id
+ def run_background_updates(self, update_name: str) -> None:
+ """Insert and run the background update."""
+ self.get_success(
+ self.store.db_pool.simple_insert(
+ "background_updates",
+ {"update_name": update_name, "progress_json": "{}"},
+ )
+ )
+
+ # ... and tell the DataStore that it hasn't finished all updates yet
+ self.store.db_pool.updates._all_done = False
+
+ # Now let's actually drive the updates to completion
+ self.wait_for_background_updates()
def test_background_populate_rooms_creator_column(self) -> None:
"""Test that the background update to populate the rooms creator column
@@ -71,22 +85,7 @@ class RoomBackgroundUpdateStoreTestCase(HomeserverTestCase):
)
self.assertEqual(room_creator_before, None)
- # Insert and run the background update.
- self.get_success(
- self.store.db_pool.simple_insert(
- "background_updates",
- {
- "update_name": _BackgroundUpdates.POPULATE_ROOMS_CREATOR_COLUMN,
- "progress_json": "{}",
- },
- )
- )
-
- # ... and tell the DataStore that it hasn't finished all updates yet
- self.store.db_pool.updates._all_done = False
-
- # Now let's actually drive the updates to completion
- self.wait_for_background_updates()
+ self.run_background_updates(_BackgroundUpdates.POPULATE_ROOMS_CREATOR_COLUMN)
# Make sure the background update filled in the room creator
room_creator_after = self.get_success(
@@ -137,22 +136,7 @@ class RoomBackgroundUpdateStoreTestCase(HomeserverTestCase):
)
)
- # Insert and run the background update
- self.get_success(
- self.store.db_pool.simple_insert(
- "background_updates",
- {
- "update_name": _BackgroundUpdates.ADD_ROOM_TYPE_COLUMN,
- "progress_json": "{}",
- },
- )
- )
-
- # ... and tell the DataStore that it hasn't finished all updates yet
- self.store.db_pool.updates._all_done = False
-
- # Now let's actually drive the updates to completion
- self.wait_for_background_updates()
+ self.run_background_updates(_BackgroundUpdates.ADD_ROOM_TYPE_COLUMN)
# Make sure the background update filled in the room type
room_type_after = self.get_success(
@@ -164,3 +148,39 @@ class RoomBackgroundUpdateStoreTestCase(HomeserverTestCase):
)
)
self.assertEqual(room_type_after, RoomTypes.SPACE)
+
+ def test_populate_stats_broken_rooms(self) -> None:
+ """Ensure that re-populating room stats skips broken rooms."""
+
+ # Create a good room.
+ good_room_id = self._generate_room()
+
+ # Create a second room, then break it by clearing its room version.
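+ # With no room version, fetching the room's non-membership state events
+ # raises InvalidEventError, so the stats job is expected to skip this room.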
+ room_id = self._generate_room()
+ self.get_success(
+ self.store.db_pool.simple_update(
+ table="rooms",
+ keyvalues={"room_id": room_id},
+ updatevalues={"room_version": None},
+ desc="test",
+ )
+ )
+
+ # Nuke any current stats in the database.
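+ # (keyvalues={"1": 1} translates to a `WHERE 1 = 1` clause, i.e. every row.)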
+ self.get_success(
+ self.store.db_pool.simple_delete(
+ table="room_stats_state", keyvalues={"1": 1}, desc="test"
+ )
+ )
+
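+ # Re-run the stats job; with the fix it should log a warning for the
+ # broken room and continue, rather than raising and aborting the update.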
+ self.run_background_updates("populate_stats_process_rooms")
+
+ # Only the good room should appear in the stats table.
+ results = self.get_success(
+ self.store.db_pool.simple_select_onecol(
+ table="room_stats_state",
+ keyvalues={},
+ retcol="room_id",
+ )
+ )
+ self.assertEqual(results, [good_room_id])