diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 2f59245058..e4f2201c92 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -21,7 +21,7 @@ from twisted.internet.interfaces import IAddress, IConnector
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.python.failure import Failure
-from synapse.api.constants import EventTypes, ReceiptTypes
+from synapse.api.constants import EventTypes, Membership, ReceiptTypes
from synapse.federation import send_queue
from synapse.federation.sender import FederationSender
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
@@ -219,6 +219,21 @@ class ReplicationDataHandler:
membership=row.data.membership,
)
+ # If this event is a join, make a note of it so we have an accurate
+ # cross-worker room rate limit.
+ # TODO: Erik said we should exclude rows that came from ex_outliers
+ # here, but I don't see how we can determine that. I guess we could
+ # add a flag to row.data?
+ if (
+ row.data.type == EventTypes.Member
+ and row.data.membership == Membership.JOIN
+ and not row.data.outlier
+ ):
+ # TODO retrieve the previous state, and exclude join -> join transitions
+ self.notifier.notify_user_joined_room(
+ row.data.event_id, row.data.room_id
+ )
+
await self._presence_handler.process_replication_rows(
stream_name, instance_name, token, rows
)
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index 26f4fa7cfd..14b6705862 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -98,6 +98,7 @@ class EventsStreamEventRow(BaseEventsStreamRow):
relates_to: Optional[str]
membership: Optional[str]
rejected: bool
+ outlier: bool
@attr.s(slots=True, frozen=True, auto_attribs=True)
|