diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 7b0b1cd32e..2ddd18f73b 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -17,13 +17,10 @@ import logging
from twisted.internet import defer
-from synapse.api.constants import EventTypes, Membership
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
-from synapse.types import UserID
-from synapse.util.logcontext import run_in_background
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
@@ -55,9 +52,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
self.store = hs.get_datastore()
self.clock = hs.get_clock()
- self.is_mine_id = hs.is_mine_id
- self.notifier = hs.get_notifier()
- self.pusher_pool = hs.get_pusherpool()
+ self.federation_handler = hs.get_handlers().federation_handler
@staticmethod
@defer.inlineCallbacks
@@ -114,45 +109,12 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
len(event_and_contexts),
)
- max_stream_id = yield self.store.persist_events(
- event_and_contexts,
- backfilled=backfilled
+ yield self.federation_handler.persist_events_and_notify(
+ event_and_contexts, backfilled,
)
- if not backfilled:
- for event, _ in event_and_contexts:
- self._notify_persisted_event(event, max_stream_id)
-
defer.returnValue((200, {}))
- def _notify_persisted_event(self, event, max_stream_id):
- extra_users = []
- if event.type == EventTypes.Member:
- target_user_id = event.state_key
-
- # We notify for memberships if its an invite for one of our
- # users
- if event.internal_metadata.is_outlier():
- if event.membership != Membership.INVITE:
- if not self.is_mine_id(target_user_id):
- return
-
- target_user = UserID.from_string(target_user_id)
- extra_users.append(target_user)
- elif event.internal_metadata.is_outlier():
- return
-
- event_stream_id = event.internal_metadata.stream_ordering
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=extra_users
- )
-
- run_in_background(
- self.pusher_pool.on_new_notifications,
- event_stream_id, max_stream_id,
- )
-
class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
"""Handles EDUs newly received from federation, including persisting and
|