diff --git a/changelog.d/17300.misc b/changelog.d/17300.misc
new file mode 100644
index 0000000000..cdc40bb2e5
--- /dev/null
+++ b/changelog.d/17300.misc
@@ -0,0 +1 @@
+Expose the worker instance that persisted the event on `event.internal_metadata.instance_name`.
diff --git a/rust/src/events/internal_metadata.rs b/rust/src/events/internal_metadata.rs
index 63774fbd54..ad87825f16 100644
--- a/rust/src/events/internal_metadata.rs
+++ b/rust/src/events/internal_metadata.rs
@@ -204,6 +204,8 @@ pub struct EventInternalMetadata {
/// The stream ordering of this event. None, until it has been persisted.
#[pyo3(get, set)]
stream_ordering: Option<NonZeroI64>,
+ #[pyo3(get, set)]
+ instance_name: Option<String>,
/// whether this event is an outlier (ie, whether we have the state at that
/// point in the DAG)
@@ -232,6 +234,7 @@ impl EventInternalMetadata {
Ok(EventInternalMetadata {
data,
stream_ordering: None,
+ instance_name: None,
outlier: false,
})
}
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index 0772472312..b997d82d71 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -90,6 +90,7 @@ def prune_event(event: EventBase) -> EventBase:
pruned_event.internal_metadata.stream_ordering = (
event.internal_metadata.stream_ordering
)
+ pruned_event.internal_metadata.instance_name = event.internal_metadata.instance_name
pruned_event.internal_metadata.outlier = event.internal_metadata.outlier
# Mark the event as redacted
@@ -116,6 +117,7 @@ def clone_event(event: EventBase) -> EventBase:
new_event.internal_metadata.stream_ordering = (
event.internal_metadata.stream_ordering
)
+ new_event.internal_metadata.instance_name = event.internal_metadata.instance_name
new_event.internal_metadata.outlier = event.internal_metadata.outlier
return new_event
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index de5bd44a5f..721ef04f41 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1551,6 +1551,7 @@ class EventCreationHandler:
# stream_ordering entry manually (as it was persisted on
# another worker).
event.internal_metadata.stream_ordering = stream_id
+ event.internal_metadata.instance_name = writer_instance
return event
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index f1bd85aa27..66428e6c8e 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -207,6 +207,7 @@ class PersistEventsStore:
async with stream_ordering_manager as stream_orderings:
for (event, _), stream in zip(events_and_contexts, stream_orderings):
event.internal_metadata.stream_ordering = stream
+ event.internal_metadata.instance_name = self._instance_name
await self.db_pool.runInteraction(
"persist_events",
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index c06c44deb1..e264d36f02 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -156,6 +156,7 @@ class _EventRow:
event_id: str
stream_ordering: int
+ instance_name: str
json: str
internal_metadata: str
format_version: Optional[int]
@@ -1354,6 +1355,7 @@ class EventsWorkerStore(SQLBaseStore):
rejected_reason=rejected_reason,
)
original_ev.internal_metadata.stream_ordering = row.stream_ordering
+ original_ev.internal_metadata.instance_name = row.instance_name
original_ev.internal_metadata.outlier = row.outlier
# Consistency check: if the content of the event has been modified in the
@@ -1439,6 +1441,7 @@ class EventsWorkerStore(SQLBaseStore):
SELECT
e.event_id,
e.stream_ordering,
+ e.instance_name,
ej.internal_metadata,
ej.json,
ej.format_version,
@@ -1462,13 +1465,14 @@ class EventsWorkerStore(SQLBaseStore):
event_dict[event_id] = _EventRow(
event_id=event_id,
stream_ordering=row[1],
- internal_metadata=row[2],
- json=row[3],
- format_version=row[4],
- room_version_id=row[5],
- rejected_reason=row[6],
+ instance_name=row[2],
+ internal_metadata=row[3],
+ json=row[4],
+ format_version=row[5],
+ room_version_id=row[6],
+ rejected_reason=row[7],
redactions=[],
- outlier=bool(row[7]), # This is an int in SQLite3
+ outlier=bool(row[8]), # This is an int in SQLite3
)
# check for redactions
diff --git a/synapse/synapse_rust/events.pyi b/synapse/synapse_rust/events.pyi
index 69837617f5..1682d0d151 100644
--- a/synapse/synapse_rust/events.pyi
+++ b/synapse/synapse_rust/events.pyi
@@ -19,6 +19,8 @@ class EventInternalMetadata:
stream_ordering: Optional[int]
"""the stream ordering of this event. None, until it has been persisted."""
+ instance_name: Optional[str]
+ """the instance name of the server that persisted this event. None, until it has been persisted."""
outlier: bool
"""whether this event is an outlier (ie, whether we have the state at that
diff --git a/tests/events/test_utils.py b/tests/events/test_utils.py
index d5ac66a6ed..30f8787758 100644
--- a/tests/events/test_utils.py
+++ b/tests/events/test_utils.py
@@ -625,6 +625,8 @@ class CloneEventTestCase(stdlib_unittest.TestCase):
)
original.internal_metadata.stream_ordering = 1234
self.assertEqual(original.internal_metadata.stream_ordering, 1234)
+ original.internal_metadata.instance_name = "worker1"
+ self.assertEqual(original.internal_metadata.instance_name, "worker1")
cloned = clone_event(original)
cloned.unsigned["b"] = 3
@@ -632,6 +634,7 @@ class CloneEventTestCase(stdlib_unittest.TestCase):
self.assertEqual(original.unsigned, {"a": 1, "b": 2})
self.assertEqual(cloned.unsigned, {"a": 1, "b": 3})
self.assertEqual(cloned.internal_metadata.stream_ordering, 1234)
+ self.assertEqual(cloned.internal_metadata.instance_name, "worker1")
self.assertEqual(cloned.internal_metadata.txn_id, "txn")
diff --git a/tests/replication/storage/test_events.py b/tests/replication/storage/test_events.py
index 4e41a1c912..a56f1e2d5d 100644
--- a/tests/replication/storage/test_events.py
+++ b/tests/replication/storage/test_events.py
@@ -141,6 +141,7 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
self.persist(type="m.room.create", key="", creator=USER_ID)
self.check("get_invited_rooms_for_local_user", [USER_ID_2], [])
event = self.persist(type="m.room.member", key=USER_ID_2, membership="invite")
+ assert event.internal_metadata.instance_name is not None
assert event.internal_metadata.stream_ordering is not None
self.replicate()
@@ -155,7 +156,7 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
"invite",
event.event_id,
PersistedEventPosition(
- self.hs.get_instance_name(),
+ event.internal_metadata.instance_name,
event.internal_metadata.stream_ordering,
),
RoomVersions.V1.identifier,
@@ -232,11 +233,12 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
j2 = self.persist(
type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
)
+ assert j2.internal_metadata.instance_name is not None
assert j2.internal_metadata.stream_ordering is not None
self.replicate()
expected_pos = PersistedEventPosition(
- "master", j2.internal_metadata.stream_ordering
+ j2.internal_metadata.instance_name, j2.internal_metadata.stream_ordering
)
self.check(
"get_rooms_for_user_with_stream_ordering",
@@ -288,6 +290,7 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
msg, msgctx = self.build_event()
self.get_success(self.persistance.persist_events([(j2, j2ctx), (msg, msgctx)]))
self.replicate()
+ assert j2.internal_metadata.instance_name is not None
assert j2.internal_metadata.stream_ordering is not None
event_source = RoomEventSource(self.hs)
@@ -329,7 +332,8 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
# joined_rooms list.
if membership_changes:
expected_pos = PersistedEventPosition(
- "master", j2.internal_metadata.stream_ordering
+ j2.internal_metadata.instance_name,
+ j2.internal_metadata.stream_ordering,
)
self.assertEqual(
joined_rooms,
diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py
index 27d5b0125f..81feb3ec29 100644
--- a/tests/storage/test_event_chain.py
+++ b/tests/storage/test_event_chain.py
@@ -431,6 +431,7 @@ class EventChainStoreTestCase(HomeserverTestCase):
for e in events:
e.internal_metadata.stream_ordering = self._next_stream_ordering
+ e.internal_metadata.instance_name = self.hs.get_instance_name()
self._next_stream_ordering += 1
def _persist(txn: LoggingTransaction) -> None:
|