summary refs log tree commit diff
diff options
context:
space:
mode:
author Eric Eastwood <eric.eastwood@beta.gouv.fr> 2024-06-13 11:32:50 -0500
committer GitHub <noreply@github.com> 2024-06-13 11:32:50 -0500
commit 8c58eb7f17bdc697e653c7920edab42ee36f975b (patch)
tree 1bbaa63d7ef1d7dcc11ae57bec89b2246a8369ee
parent Fix `get_last_event_in_room_before_stream_ordering(...)` finding the wrong la... (diff)
download synapse-8c58eb7f17bdc697e653c7920edab42ee36f975b.tar.xz
Add `event.internal_metadata.instance_name` (#17300)
Add `event.internal_metadata.instance_name` (the worker instance that persisted the event) to go alongside the existing `event.internal_metadata.stream_ordering`.

`instance_name` is useful to properly compare and query for events with a token since you need to compare both the `stream_ordering` and `instance_name` against the vector clock/`instance_map` in the `RoomStreamToken`.

This is prerequisite work and may be used in https://github.com/element-hq/synapse/pull/17293

Adding `event.internal_metadata.instance_name` was first mentioned in the initial Sliding Sync PR while pairing with @erikjohnston, see https://github.com/element-hq/synapse/pull/17187/commits/09609cb0dbca3a4cfd9fbf90cc962e765ec469c0#diff-5cd773fb307aa754bd3948871ba118b1ef0303f4d72d42a2d21e38242bf4e096R405-R410
-rw-r--r-- changelog.d/17300.misc 1
-rw-r--r-- rust/src/events/internal_metadata.rs 3
-rw-r--r-- synapse/events/utils.py 2
-rw-r--r-- synapse/handlers/message.py 1
-rw-r--r-- synapse/storage/databases/main/events.py 1
-rw-r--r-- synapse/storage/databases/main/events_worker.py 16
-rw-r--r-- synapse/synapse_rust/events.pyi 2
-rw-r--r-- tests/events/test_utils.py 3
-rw-r--r-- tests/replication/storage/test_events.py 10
-rw-r--r-- tests/storage/test_event_chain.py 1
10 files changed, 31 insertions, 9 deletions
diff --git a/changelog.d/17300.misc b/changelog.d/17300.misc
new file mode 100644
index 0000000000..cdc40bb2e5
--- /dev/null
+++ b/changelog.d/17300.misc
@@ -0,0 +1 @@
+Expose the worker instance that persisted the event on `event.internal_metadata.instance_name`.
diff --git a/rust/src/events/internal_metadata.rs b/rust/src/events/internal_metadata.rs
index 63774fbd54..ad87825f16 100644
--- a/rust/src/events/internal_metadata.rs
+++ b/rust/src/events/internal_metadata.rs
@@ -204,6 +204,8 @@ pub struct EventInternalMetadata {
     /// The stream ordering of this event. None, until it has been persisted.
     #[pyo3(get, set)]
     stream_ordering: Option<NonZeroI64>,
+    #[pyo3(get, set)]
+    instance_name: Option<String>,
 
     /// whether this event is an outlier (ie, whether we have the state at that
     /// point in the DAG)
@@ -232,6 +234,7 @@ impl EventInternalMetadata {
         Ok(EventInternalMetadata {
             data,
             stream_ordering: None,
+            instance_name: None,
             outlier: false,
         })
     }
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index 0772472312..b997d82d71 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -90,6 +90,7 @@ def prune_event(event: EventBase) -> EventBase:
     pruned_event.internal_metadata.stream_ordering = (
         event.internal_metadata.stream_ordering
     )
+    pruned_event.internal_metadata.instance_name = event.internal_metadata.instance_name
     pruned_event.internal_metadata.outlier = event.internal_metadata.outlier
 
     # Mark the event as redacted
@@ -116,6 +117,7 @@ def clone_event(event: EventBase) -> EventBase:
     new_event.internal_metadata.stream_ordering = (
         event.internal_metadata.stream_ordering
     )
+    new_event.internal_metadata.instance_name = event.internal_metadata.instance_name
     new_event.internal_metadata.outlier = event.internal_metadata.outlier
 
     return new_event
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index de5bd44a5f..721ef04f41 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1551,6 +1551,7 @@ class EventCreationHandler:
                     # stream_ordering entry manually (as it was persisted on
                     # another worker).
                     event.internal_metadata.stream_ordering = stream_id
+                    event.internal_metadata.instance_name = writer_instance
 
                 return event
 
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index f1bd85aa27..66428e6c8e 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -207,6 +207,7 @@ class PersistEventsStore:
         async with stream_ordering_manager as stream_orderings:
             for (event, _), stream in zip(events_and_contexts, stream_orderings):
                 event.internal_metadata.stream_ordering = stream
+                event.internal_metadata.instance_name = self._instance_name
 
             await self.db_pool.runInteraction(
                 "persist_events",
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index c06c44deb1..e264d36f02 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -156,6 +156,7 @@ class _EventRow:
 
     event_id: str
     stream_ordering: int
+    instance_name: str
     json: str
     internal_metadata: str
     format_version: Optional[int]
@@ -1354,6 +1355,7 @@ class EventsWorkerStore(SQLBaseStore):
                 rejected_reason=rejected_reason,
             )
             original_ev.internal_metadata.stream_ordering = row.stream_ordering
+            original_ev.internal_metadata.instance_name = row.instance_name
             original_ev.internal_metadata.outlier = row.outlier
 
             # Consistency check: if the content of the event has been modified in the
@@ -1439,6 +1441,7 @@ class EventsWorkerStore(SQLBaseStore):
                 SELECT
                   e.event_id,
                   e.stream_ordering,
+                  e.instance_name,
                   ej.internal_metadata,
                   ej.json,
                   ej.format_version,
@@ -1462,13 +1465,14 @@ class EventsWorkerStore(SQLBaseStore):
                 event_dict[event_id] = _EventRow(
                     event_id=event_id,
                     stream_ordering=row[1],
-                    internal_metadata=row[2],
-                    json=row[3],
-                    format_version=row[4],
-                    room_version_id=row[5],
-                    rejected_reason=row[6],
+                    instance_name=row[2],
+                    internal_metadata=row[3],
+                    json=row[4],
+                    format_version=row[5],
+                    room_version_id=row[6],
+                    rejected_reason=row[7],
                     redactions=[],
-                    outlier=bool(row[7]),  # This is an int in SQLite3
+                    outlier=bool(row[8]),  # This is an int in SQLite3
                 )
 
             # check for redactions
diff --git a/synapse/synapse_rust/events.pyi b/synapse/synapse_rust/events.pyi
index 69837617f5..1682d0d151 100644
--- a/synapse/synapse_rust/events.pyi
+++ b/synapse/synapse_rust/events.pyi
@@ -19,6 +19,8 @@ class EventInternalMetadata:
 
     stream_ordering: Optional[int]
     """the stream ordering of this event. None, until it has been persisted."""
+    instance_name: Optional[str]
+    """the instance name of the server that persisted this event. None, until it has been persisted."""
 
     outlier: bool
     """whether this event is an outlier (ie, whether we have the state at that
diff --git a/tests/events/test_utils.py b/tests/events/test_utils.py
index d5ac66a6ed..30f8787758 100644
--- a/tests/events/test_utils.py
+++ b/tests/events/test_utils.py
@@ -625,6 +625,8 @@ class CloneEventTestCase(stdlib_unittest.TestCase):
         )
         original.internal_metadata.stream_ordering = 1234
         self.assertEqual(original.internal_metadata.stream_ordering, 1234)
+        original.internal_metadata.instance_name = "worker1"
+        self.assertEqual(original.internal_metadata.instance_name, "worker1")
 
         cloned = clone_event(original)
         cloned.unsigned["b"] = 3
@@ -632,6 +634,7 @@ class CloneEventTestCase(stdlib_unittest.TestCase):
         self.assertEqual(original.unsigned, {"a": 1, "b": 2})
         self.assertEqual(cloned.unsigned, {"a": 1, "b": 3})
         self.assertEqual(cloned.internal_metadata.stream_ordering, 1234)
+        self.assertEqual(cloned.internal_metadata.instance_name, "worker1")
         self.assertEqual(cloned.internal_metadata.txn_id, "txn")
 
 
diff --git a/tests/replication/storage/test_events.py b/tests/replication/storage/test_events.py
index 4e41a1c912..a56f1e2d5d 100644
--- a/tests/replication/storage/test_events.py
+++ b/tests/replication/storage/test_events.py
@@ -141,6 +141,7 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
         self.persist(type="m.room.create", key="", creator=USER_ID)
         self.check("get_invited_rooms_for_local_user", [USER_ID_2], [])
         event = self.persist(type="m.room.member", key=USER_ID_2, membership="invite")
+        assert event.internal_metadata.instance_name is not None
         assert event.internal_metadata.stream_ordering is not None
 
         self.replicate()
@@ -155,7 +156,7 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
                     "invite",
                     event.event_id,
                     PersistedEventPosition(
-                        self.hs.get_instance_name(),
+                        event.internal_metadata.instance_name,
                         event.internal_metadata.stream_ordering,
                     ),
                     RoomVersions.V1.identifier,
@@ -232,11 +233,12 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
         j2 = self.persist(
             type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
         )
+        assert j2.internal_metadata.instance_name is not None
         assert j2.internal_metadata.stream_ordering is not None
         self.replicate()
 
         expected_pos = PersistedEventPosition(
-            "master", j2.internal_metadata.stream_ordering
+            j2.internal_metadata.instance_name, j2.internal_metadata.stream_ordering
         )
         self.check(
             "get_rooms_for_user_with_stream_ordering",
@@ -288,6 +290,7 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
         msg, msgctx = self.build_event()
         self.get_success(self.persistance.persist_events([(j2, j2ctx), (msg, msgctx)]))
         self.replicate()
+        assert j2.internal_metadata.instance_name is not None
         assert j2.internal_metadata.stream_ordering is not None
 
         event_source = RoomEventSource(self.hs)
@@ -329,7 +332,8 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
             # joined_rooms list.
             if membership_changes:
                 expected_pos = PersistedEventPosition(
-                    "master", j2.internal_metadata.stream_ordering
+                    j2.internal_metadata.instance_name,
+                    j2.internal_metadata.stream_ordering,
                 )
                 self.assertEqual(
                     joined_rooms,
diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py
index 27d5b0125f..81feb3ec29 100644
--- a/tests/storage/test_event_chain.py
+++ b/tests/storage/test_event_chain.py
@@ -431,6 +431,7 @@ class EventChainStoreTestCase(HomeserverTestCase):
 
         for e in events:
             e.internal_metadata.stream_ordering = self._next_stream_ordering
+            e.internal_metadata.instance_name = self.hs.get_instance_name()
             self._next_stream_ordering += 1
 
         def _persist(txn: LoggingTransaction) -> None: