summary refs log tree commit diff
diff options
context:
space:
mode:
author Richard van der Hoff <1389908+richvdh@users.noreply.github.com> 2022-05-03 21:27:52 +0100
committer GitHub <noreply@github.com> 2022-05-03 21:27:52 +0100
commit 96e0cdbc5af0563ee805ec4e588e1df14899af66 (patch)
tree 16778bcb0ad585075158c154fa462cbd7eb8222d
parent Bump Synapse minimum Python version to 3.7.1 (#12613) (diff)
download synapse-96e0cdbc5af0563ee805ec4e588e1df14899af66.tar.xz
Add a consistency check on events read from the database (#12620)
I've seen a few errors which can only plausibly be explained by the calculated
event id for an event being different from the ID of the event in the
database. It should be cheap to check this, so let's do so and raise an
exception.
-rw-r--r-- changelog.d/12620.misc 1
-rw-r--r-- synapse/storage/databases/main/events_worker.py 12
-rw-r--r-- tests/storage/databases/main/test_events_worker.py 59
3 files changed, 50 insertions, 22 deletions
diff --git a/changelog.d/12620.misc b/changelog.d/12620.misc
new file mode 100644
index 0000000000..63f8e540c3
--- /dev/null
+++ b/changelog.d/12620.misc
@@ -0,0 +1 @@
+Add a consistency check on events which we read from the database.
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index c31fc00eaa..0a48e5d29f 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1094,6 +1094,18 @@ class EventsWorkerStore(SQLBaseStore):
             original_ev.internal_metadata.stream_ordering = row.stream_ordering
             original_ev.internal_metadata.outlier = row.outlier
 
+            # Consistency check: if the content of the event has been modified in the
+            # database, then the calculated event ID will not match the event id in the
+            # database.
+            if original_ev.event_id != event_id:
+                # it's difficult to see what to do here. Pretty much all bets are off
+                # if Synapse cannot rely on the consistency of its database.
+                raise RuntimeError(
+                    f"Database corruption: Event {event_id} in room {d['room_id']} "
+                    f"from the database appears to have been modified (calculated "
+                    f"event id {original_ev.event_id})"
+                )
+
             event_map[event_id] = original_ev
 
         # finally, we can decide whether each one needs redacting, and build
diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py
index bf6374f93d..c237a8c7e2 100644
--- a/tests/storage/databases/main/test_events_worker.py
+++ b/tests/storage/databases/main/test_events_worker.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 import json
 from contextlib import contextmanager
-from typing import Generator, Tuple
+from typing import Generator, List, Tuple
 from unittest import mock
 
 from twisted.enterprise.adbapi import ConnectionPool
@@ -21,6 +21,7 @@ from twisted.internet.defer import CancelledError, Deferred, ensureDeferred
 from twisted.test.proto_helpers import MemoryReactor
 
 from synapse.api.room_versions import EventFormatVersions, RoomVersions
+from synapse.events import make_event_from_dict
 from synapse.logging.context import LoggingContext
 from synapse.rest import admin
 from synapse.rest.client import login, room
@@ -49,23 +50,28 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
                 )
             )
 
-        for idx, (rid, eid) in enumerate(
+        self.event_ids: List[str] = []
+        for idx, rid in enumerate(
             (
-                ("room1", "event10"),
-                ("room1", "event11"),
-                ("room1", "event12"),
-                ("room2", "event20"),
+                "room1",
+                "room1",
+                "room1",
+                "room2",
             )
         ):
+            event_json = {"type": f"test {idx}", "room_id": rid}
+            event = make_event_from_dict(event_json, room_version=RoomVersions.V4)
+            event_id = event.event_id
+
             self.get_success(
                 self.store.db_pool.simple_insert(
                     "events",
                     {
-                        "event_id": eid,
+                        "event_id": event_id,
                         "room_id": rid,
                         "topological_ordering": idx,
                         "stream_ordering": idx,
-                        "type": "test",
+                        "type": event.type,
                         "processed": True,
                         "outlier": False,
                     },
@@ -75,21 +81,22 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
                 self.store.db_pool.simple_insert(
                     "event_json",
                     {
-                        "event_id": eid,
+                        "event_id": event_id,
                         "room_id": rid,
-                        "json": json.dumps({"type": "test", "room_id": rid}),
+                        "json": json.dumps(event_json),
                         "internal_metadata": "{}",
                         "format_version": 3,
                     },
                 )
             )
+            self.event_ids.append(event_id)
 
     def test_simple(self):
         with LoggingContext(name="test") as ctx:
             res = self.get_success(
-                self.store.have_seen_events("room1", ["event10", "event19"])
+                self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
             )
-            self.assertEqual(res, {"event10"})
+            self.assertEqual(res, {self.event_ids[0]})
 
             # that should result in a single db query
             self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
@@ -97,19 +104,21 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
         # a second lookup of the same events should cause no queries
         with LoggingContext(name="test") as ctx:
             res = self.get_success(
-                self.store.have_seen_events("room1", ["event10", "event19"])
+                self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
             )
-            self.assertEqual(res, {"event10"})
+            self.assertEqual(res, {self.event_ids[0]})
             self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
 
     def test_query_via_event_cache(self):
         # fetch an event into the event cache
-        self.get_success(self.store.get_event("event10"))
+        self.get_success(self.store.get_event(self.event_ids[0]))
 
         # looking it up should now cause no db hits
         with LoggingContext(name="test") as ctx:
-            res = self.get_success(self.store.have_seen_events("room1", ["event10"]))
-            self.assertEqual(res, {"event10"})
+            res = self.get_success(
+                self.store.have_seen_events("room1", [self.event_ids[0]])
+            )
+            self.assertEqual(res, {self.event_ids[0]})
             self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
 
 
@@ -167,7 +176,6 @@ class DatabaseOutageTestCase(unittest.HomeserverTestCase):
         self.store: EventsWorkerStore = hs.get_datastores().main
 
         self.room_id = f"!room:{hs.hostname}"
-        self.event_ids = [f"event{i}" for i in range(20)]
 
         self._populate_events()
 
@@ -190,8 +198,14 @@ class DatabaseOutageTestCase(unittest.HomeserverTestCase):
             )
         )
 
-        self.event_ids = [f"event{i}" for i in range(20)]
-        for idx, event_id in enumerate(self.event_ids):
+        self.event_ids: List[str] = []
+        for idx in range(20):
+            event_json = {
+                "type": f"test {idx}",
+                "room_id": self.room_id,
+            }
+            event = make_event_from_dict(event_json, room_version=RoomVersions.V4)
+            event_id = event.event_id
             self.get_success(
                 self.store.db_pool.simple_upsert(
                     "events",
@@ -201,7 +215,7 @@ class DatabaseOutageTestCase(unittest.HomeserverTestCase):
                         "room_id": self.room_id,
                         "topological_ordering": idx,
                         "stream_ordering": idx,
-                        "type": "test",
+                        "type": event.type,
                         "processed": True,
                         "outlier": False,
                     },
@@ -213,12 +227,13 @@ class DatabaseOutageTestCase(unittest.HomeserverTestCase):
                     {"event_id": event_id},
                     {
                         "room_id": self.room_id,
-                        "json": json.dumps({"type": "test", "room_id": self.room_id}),
+                        "json": json.dumps(event_json),
                         "internal_metadata": "{}",
                         "format_version": EventFormatVersions.V3,
                     },
                 )
             )
+            self.event_ids.append(event_id)
 
     @contextmanager
     def _outage(self) -> Generator[None, None, None]: