diff options
-rw-r--r-- | changelog.d/12620.misc | 1 | ||||
-rw-r--r-- | synapse/storage/databases/main/events_worker.py | 12 | ||||
-rw-r--r-- | tests/storage/databases/main/test_events_worker.py | 59 |
3 files changed, 50 insertions, 22 deletions
diff --git a/changelog.d/12620.misc b/changelog.d/12620.misc new file mode 100644 index 0000000000..63f8e540c3 --- /dev/null +++ b/changelog.d/12620.misc @@ -0,0 +1 @@ +Add a consistency check on events which we read from the database. diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index c31fc00eaa..0a48e5d29f 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1094,6 +1094,18 @@ class EventsWorkerStore(SQLBaseStore): original_ev.internal_metadata.stream_ordering = row.stream_ordering original_ev.internal_metadata.outlier = row.outlier + # Consistency check: if the content of the event has been modified in the + # database, then the calculated event ID will not match the event id in the + # database. + if original_ev.event_id != event_id: + # it's difficult to see what to do here. Pretty much all bets are off + # if Synapse cannot rely on the consistency of its database. + raise RuntimeError( + f"Database corruption: Event {event_id} in room {d['room_id']} " + f"from the database appears to have been modified (calculated " + f"event id {original_ev.event_id})" + ) + event_map[event_id] = original_ev # finally, we can decide whether each one needs redacting, and build diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py index bf6374f93d..c237a8c7e2 100644 --- a/tests/storage/databases/main/test_events_worker.py +++ b/tests/storage/databases/main/test_events_worker.py @@ -13,7 +13,7 @@ # limitations under the License. import json from contextlib import contextmanager -from typing import Generator, Tuple +from typing import Generator, List, Tuple from unittest import mock from twisted.enterprise.adbapi import ConnectionPool @@ -21,6 +21,7 @@ from twisted.internet.defer import CancelledError, Deferred, ensureDeferred from twisted.test.proto_helpers import MemoryReactor from synapse.api.room_versions import EventFormatVersions, RoomVersions +from synapse.events import make_event_from_dict from synapse.logging.context import LoggingContext from synapse.rest import admin from synapse.rest.client import login, room @@ -49,23 +50,28 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase): ) ) - for idx, (rid, eid) in enumerate( + self.event_ids: List[str] = [] + for idx, rid in enumerate( ( - ("room1", "event10"), - ("room1", "event11"), - ("room1", "event12"), - ("room2", "event20"), + "room1", + "room1", + "room1", + "room2", ) ): + event_json = {"type": f"test {idx}", "room_id": rid} + event = make_event_from_dict(event_json, room_version=RoomVersions.V4) + event_id = event.event_id + self.get_success( self.store.db_pool.simple_insert( "events", { - "event_id": eid, + "event_id": event_id, "room_id": rid, "topological_ordering": idx, "stream_ordering": idx, - "type": "test", + "type": event.type, "processed": True, "outlier": False, }, @@ -75,21 +81,22 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase): self.store.db_pool.simple_insert( "event_json", { - "event_id": eid, + "event_id": event_id, "room_id": rid, - "json": json.dumps({"type": "test", "room_id": rid}), + "json": json.dumps(event_json), "internal_metadata": "{}", "format_version": 3, }, ) ) + self.event_ids.append(event_id) def test_simple(self): with LoggingContext(name="test") as ctx: res = self.get_success( - self.store.have_seen_events("room1", ["event10", "event19"]) + self.store.have_seen_events("room1", [self.event_ids[0], "event19"]) ) - self.assertEqual(res, {"event10"}) + self.assertEqual(res, {self.event_ids[0]}) # that should result in a single db query self.assertEqual(ctx.get_resource_usage().db_txn_count, 1) @@ -97,19 +104,21 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase): # a second lookup of the same events should cause no queries with LoggingContext(name="test") as ctx: res = self.get_success( - self.store.have_seen_events("room1", ["event10", "event19"]) + self.store.have_seen_events("room1", [self.event_ids[0], "event19"]) ) - self.assertEqual(res, {"event10"}) + self.assertEqual(res, {self.event_ids[0]}) self.assertEqual(ctx.get_resource_usage().db_txn_count, 0) def test_query_via_event_cache(self): # fetch an event into the event cache - self.get_success(self.store.get_event("event10")) + self.get_success(self.store.get_event(self.event_ids[0])) # looking it up should now cause no db hits with LoggingContext(name="test") as ctx: - res = self.get_success(self.store.have_seen_events("room1", ["event10"])) - self.assertEqual(res, {"event10"}) + res = self.get_success( + self.store.have_seen_events("room1", [self.event_ids[0]]) + ) + self.assertEqual(res, {self.event_ids[0]}) self.assertEqual(ctx.get_resource_usage().db_txn_count, 0) @@ -167,7 +176,6 @@ class DatabaseOutageTestCase(unittest.HomeserverTestCase): self.store: EventsWorkerStore = hs.get_datastores().main self.room_id = f"!room:{hs.hostname}" - self.event_ids = [f"event{i}" for i in range(20)] self._populate_events() @@ -190,8 +198,14 @@ class DatabaseOutageTestCase(unittest.HomeserverTestCase): ) ) - self.event_ids = [f"event{i}" for i in range(20)] - for idx, event_id in enumerate(self.event_ids): + self.event_ids: List[str] = [] + for idx in range(20): + event_json = { + "type": f"test {idx}", + "room_id": self.room_id, + } + event = make_event_from_dict(event_json, room_version=RoomVersions.V4) + event_id = event.event_id self.get_success( self.store.db_pool.simple_upsert( "events", @@ -201,7 +215,7 @@ class DatabaseOutageTestCase(unittest.HomeserverTestCase): "room_id": self.room_id, "topological_ordering": idx, "stream_ordering": idx, - "type": "test", + "type": event.type, "processed": True, "outlier": False, }, @@ -213,12 +227,13 @@ class DatabaseOutageTestCase(unittest.HomeserverTestCase): {"event_id": event_id}, { "room_id": self.room_id, - "json": json.dumps({"type": "test", "room_id": self.room_id}), + "json": json.dumps(event_json), "internal_metadata": "{}", "format_version": EventFormatVersions.V3, }, ) ) + self.event_ids.append(event_id) @contextmanager def _outage(self) -> Generator[None, None, None]: |