summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-08-15 14:17:18 +0100
committerErik Johnston <erik@matrix.org>2018-08-15 14:17:18 +0100
commit488ffe6fdb36c7479052096489c683777597c2aa (patch)
tree5e56469548925f551b6a2d385b4ca33e3d1f0421 /synapse/replication
parentRename slave TransactionStore to SlaveTransactionStore (diff)
downloadsynapse-488ffe6fdb36c7479052096489c683777597c2aa.tar.xz
Use federation handler function rather than duplicate
This involves renaming _persist_events to be a public function.
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/http/federation.py44
1 files changed, 3 insertions, 41 deletions
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 7b0b1cd32e..2ddd18f73b 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -17,13 +17,10 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.api.constants import EventTypes, Membership
 from synapse.events import FrozenEvent
 from synapse.events.snapshot import EventContext
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
-from synapse.types import UserID
-from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
@@ -55,9 +52,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
 
         self.store = hs.get_datastore()
         self.clock = hs.get_clock()
-        self.is_mine_id = hs.is_mine_id
-        self.notifier = hs.get_notifier()
-        self.pusher_pool = hs.get_pusherpool()
+        self.federation_handler = hs.get_handlers().federation_handler
 
     @staticmethod
     @defer.inlineCallbacks
@@ -114,45 +109,12 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
             len(event_and_contexts),
         )
 
-        max_stream_id = yield self.store.persist_events(
-            event_and_contexts,
-            backfilled=backfilled
+        yield self.federation_handler.persist_events_and_notify(
+            event_and_contexts, backfilled,
         )
 
-        if not backfilled:
-            for event, _ in event_and_contexts:
-                self._notify_persisted_event(event, max_stream_id)
-
         defer.returnValue((200, {}))
 
-    def _notify_persisted_event(self, event, max_stream_id):
-        extra_users = []
-        if event.type == EventTypes.Member:
-            target_user_id = event.state_key
-
-            # We notify for memberships if its an invite for one of our
-            # users
-            if event.internal_metadata.is_outlier():
-                if event.membership != Membership.INVITE:
-                    if not self.is_mine_id(target_user_id):
-                        return
-
-            target_user = UserID.from_string(target_user_id)
-            extra_users.append(target_user)
-        elif event.internal_metadata.is_outlier():
-            return
-
-        event_stream_id = event.internal_metadata.stream_ordering
-        self.notifier.on_new_room_event(
-            event, event_stream_id, max_stream_id,
-            extra_users=extra_users
-        )
-
-        run_in_background(
-            self.pusher_pool.on_new_notifications,
-            event_stream_id, max_stream_id,
-        )
-
 
 class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
     """Handles EDUs newly received from federation, including persisting and