diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index acabca1d25..9a37d627ca 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1175,7 +1175,7 @@ class FederationHandler(BaseHandler):
)
context = yield self.state_handler.compute_event_context(event)
- yield self._persist_events([(event, context)])
+ yield self.persist_events_and_notify([(event, context)])
defer.returnValue(event)
@@ -1206,7 +1206,7 @@ class FederationHandler(BaseHandler):
)
context = yield self.state_handler.compute_event_context(event)
- yield self._persist_events([(event, context)])
+ yield self.persist_events_and_notify([(event, context)])
defer.returnValue(event)
@@ -1449,7 +1449,7 @@ class FederationHandler(BaseHandler):
event, context
)
- yield self._persist_events(
+ yield self.persist_events_and_notify(
[(event, context)],
backfilled=backfilled,
)
@@ -1487,7 +1487,7 @@ class FederationHandler(BaseHandler):
], consumeErrors=True,
))
- yield self._persist_events(
+ yield self.persist_events_and_notify(
[
(ev_info["event"], context)
for ev_info, context in zip(event_infos, contexts)
@@ -1575,7 +1575,7 @@ class FederationHandler(BaseHandler):
raise
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
- yield self._persist_events(
+ yield self.persist_events_and_notify(
[
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
@@ -1586,7 +1586,7 @@ class FederationHandler(BaseHandler):
event, old_state=state
)
- yield self._persist_events(
+ yield self.persist_events_and_notify(
[(event, new_event_context)],
)
@@ -2327,7 +2327,7 @@ class FederationHandler(BaseHandler):
raise AuthError(403, "Third party certificate was invalid")
@defer.inlineCallbacks
- def _persist_events(self, event_and_contexts, backfilled=False):
+ def persist_events_and_notify(self, event_and_contexts, backfilled=False):
"""Persists events and tells the notifier/pushers about them, if
necessary.
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 7b0b1cd32e..2ddd18f73b 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -17,13 +17,10 @@ import logging
from twisted.internet import defer
-from synapse.api.constants import EventTypes, Membership
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
-from synapse.types import UserID
-from synapse.util.logcontext import run_in_background
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
@@ -55,9 +52,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
self.store = hs.get_datastore()
self.clock = hs.get_clock()
- self.is_mine_id = hs.is_mine_id
- self.notifier = hs.get_notifier()
- self.pusher_pool = hs.get_pusherpool()
+ self.federation_handler = hs.get_handlers().federation_handler
@staticmethod
@defer.inlineCallbacks
@@ -114,45 +109,12 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
len(event_and_contexts),
)
- max_stream_id = yield self.store.persist_events(
- event_and_contexts,
- backfilled=backfilled
+ yield self.federation_handler.persist_events_and_notify(
+ event_and_contexts, backfilled,
)
- if not backfilled:
- for event, _ in event_and_contexts:
- self._notify_persisted_event(event, max_stream_id)
-
defer.returnValue((200, {}))
- def _notify_persisted_event(self, event, max_stream_id):
- extra_users = []
- if event.type == EventTypes.Member:
- target_user_id = event.state_key
-
- # We notify for memberships if its an invite for one of our
- # users
- if event.internal_metadata.is_outlier():
- if event.membership != Membership.INVITE:
- if not self.is_mine_id(target_user_id):
- return
-
- target_user = UserID.from_string(target_user_id)
- extra_users.append(target_user)
- elif event.internal_metadata.is_outlier():
- return
-
- event_stream_id = event.internal_metadata.stream_ordering
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=extra_users
- )
-
- run_in_background(
- self.pusher_pool.on_new_notifications,
- event_stream_id, max_stream_id,
- )
-
class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
"""Handles EDUs newly received from federation, including persisting and
|