summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-08-15 14:17:18 +0100
committerErik Johnston <erik@matrix.org>2018-08-15 14:17:18 +0100
commit488ffe6fdb36c7479052096489c683777597c2aa (patch)
tree5e56469548925f551b6a2d385b4ca33e3d1f0421 /synapse
parentRename slave TransactionStore to SlaveTransactionStore (diff)
downloadsynapse-488ffe6fdb36c7479052096489c683777597c2aa.tar.xz
Use federation handler function rather than duplicate
This involves renaming _persist_events to be a public function.
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/federation.py14
-rw-r--r--synapse/replication/http/federation.py44
2 files changed, 10 insertions, 48 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index acabca1d25..9a37d627ca 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1175,7 +1175,7 @@ class FederationHandler(BaseHandler):
         )
 
         context = yield self.state_handler.compute_event_context(event)
-        yield self._persist_events([(event, context)])
+        yield self.persist_events_and_notify([(event, context)])
 
         defer.returnValue(event)
 
@@ -1206,7 +1206,7 @@ class FederationHandler(BaseHandler):
         )
 
         context = yield self.state_handler.compute_event_context(event)
-        yield self._persist_events([(event, context)])
+        yield self.persist_events_and_notify([(event, context)])
 
         defer.returnValue(event)
 
@@ -1449,7 +1449,7 @@ class FederationHandler(BaseHandler):
                     event, context
                 )
 
-            yield self._persist_events(
+            yield self.persist_events_and_notify(
                 [(event, context)],
                 backfilled=backfilled,
             )
@@ -1487,7 +1487,7 @@ class FederationHandler(BaseHandler):
             ], consumeErrors=True,
         ))
 
-        yield self._persist_events(
+        yield self.persist_events_and_notify(
             [
                 (ev_info["event"], context)
                 for ev_info, context in zip(event_infos, contexts)
@@ -1575,7 +1575,7 @@ class FederationHandler(BaseHandler):
                     raise
                 events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
 
-        yield self._persist_events(
+        yield self.persist_events_and_notify(
             [
                 (e, events_to_context[e.event_id])
                 for e in itertools.chain(auth_events, state)
@@ -1586,7 +1586,7 @@ class FederationHandler(BaseHandler):
             event, old_state=state
         )
 
-        yield self._persist_events(
+        yield self.persist_events_and_notify(
             [(event, new_event_context)],
         )
 
@@ -2327,7 +2327,7 @@ class FederationHandler(BaseHandler):
             raise AuthError(403, "Third party certificate was invalid")
 
     @defer.inlineCallbacks
-    def _persist_events(self, event_and_contexts, backfilled=False):
+    def persist_events_and_notify(self, event_and_contexts, backfilled=False):
         """Persists events and tells the notifier/pushers about them, if
         necessary.
 
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 7b0b1cd32e..2ddd18f73b 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -17,13 +17,10 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.api.constants import EventTypes, Membership
 from synapse.events import FrozenEvent
 from synapse.events.snapshot import EventContext
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
-from synapse.types import UserID
-from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
@@ -55,9 +52,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
 
         self.store = hs.get_datastore()
         self.clock = hs.get_clock()
-        self.is_mine_id = hs.is_mine_id
-        self.notifier = hs.get_notifier()
-        self.pusher_pool = hs.get_pusherpool()
+        self.federation_handler = hs.get_handlers().federation_handler
 
     @staticmethod
     @defer.inlineCallbacks
@@ -114,45 +109,12 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
             len(event_and_contexts),
         )
 
-        max_stream_id = yield self.store.persist_events(
-            event_and_contexts,
-            backfilled=backfilled
+        yield self.federation_handler.persist_events_and_notify(
+            event_and_contexts, backfilled,
         )
 
-        if not backfilled:
-            for event, _ in event_and_contexts:
-                self._notify_persisted_event(event, max_stream_id)
-
         defer.returnValue((200, {}))
 
-    def _notify_persisted_event(self, event, max_stream_id):
-        extra_users = []
-        if event.type == EventTypes.Member:
-            target_user_id = event.state_key
-
-            # We notify for memberships if its an invite for one of our
-            # users
-            if event.internal_metadata.is_outlier():
-                if event.membership != Membership.INVITE:
-                    if not self.is_mine_id(target_user_id):
-                        return
-
-            target_user = UserID.from_string(target_user_id)
-            extra_users.append(target_user)
-        elif event.internal_metadata.is_outlier():
-            return
-
-        event_stream_id = event.internal_metadata.stream_ordering
-        self.notifier.on_new_room_event(
-            event, event_stream_id, max_stream_id,
-            extra_users=extra_users
-        )
-
-        run_in_background(
-            self.pusher_pool.on_new_notifications,
-            event_stream_id, max_stream_id,
-        )
-
 
 class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
     """Handles EDUs newly received from federation, including persisting and