diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index e27ee216f0..2618eb1e53 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -141,21 +141,25 @@ class ReplicationDataHandler:
if row.type != EventsStreamEventRow.TypeId:
continue
assert isinstance(row, EventsStreamRow)
+ assert isinstance(row.data, EventsStreamEventRow)
- event = await self.store.get_event(
- row.data.event_id, allow_rejected=True
- )
- if event.rejected_reason:
+ if row.data.rejected:
continue
extra_users = () # type: Tuple[UserID, ...]
- if event.type == EventTypes.Member:
- extra_users = (UserID.from_string(event.state_key),)
+ if row.data.type == EventTypes.Member and row.data.state_key:
+ extra_users = (UserID.from_string(row.data.state_key),)
max_token = self.store.get_room_max_token()
event_pos = PersistedEventPosition(instance_name, token)
- self.notifier.on_new_room_event(
- event, event_pos, max_token, extra_users
+ self.notifier.on_new_room_event_args(
+ event_pos=event_pos,
+ max_room_stream_token=max_token,
+ extra_users=extra_users,
+ room_id=row.data.room_id,
+ event_type=row.data.type,
+ state_key=row.data.state_key,
+ membership=row.data.membership,
)
# Notify any waiting deferreds. The list is ordered by position so we
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index 82e9e0d64e..86a62b71eb 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -15,12 +15,15 @@
# limitations under the License.
import heapq
from collections.abc import Iterable
-from typing import List, Tuple, Type
+from typing import TYPE_CHECKING, List, Optional, Tuple, Type
import attr
from ._base import Stream, StreamUpdateResult, Token
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
"""Handling of the 'events' replication stream
This stream contains rows of various types. Each row therefore contains a 'type'
@@ -81,12 +84,14 @@ class BaseEventsStreamRow:
class EventsStreamEventRow(BaseEventsStreamRow):
TypeId = "ev"
- event_id = attr.ib() # str
- room_id = attr.ib() # str
- type = attr.ib() # str
- state_key = attr.ib() # str, optional
- redacts = attr.ib() # str, optional
- relates_to = attr.ib() # str, optional
+ event_id = attr.ib(type=str)
+ room_id = attr.ib(type=str)
+ type = attr.ib(type=str)
+ state_key = attr.ib(type=Optional[str])
+ redacts = attr.ib(type=Optional[str])
+ relates_to = attr.ib(type=Optional[str])
+ membership = attr.ib(type=Optional[str])
+ rejected = attr.ib(type=bool)
@attr.s(slots=True, frozen=True)
@@ -113,7 +118,7 @@ class EventsStream(Stream):
NAME = "events"
- def __init__(self, hs):
+ def __init__(self, hs: "HomeServer"):
self._store = hs.get_datastore()
super().__init__(
hs.get_instance_name(),
|