From e26dbd82ef5f1d755be9a62165556ebce041af10 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 25 Jul 2018 16:32:05 +0100 Subject: Add replication APIs for persisting federation events --- synapse/handlers/federation.py | 44 +++++++++++++++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 9 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 533b82c783..0524dec942 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -44,6 +44,8 @@ from synapse.crypto.event_signing import ( compute_event_signature, ) from synapse.events.validator import EventValidator +from synapse.replication.http.federation import send_federation_events_to_master +from synapse.replication.http.membership import notify_user_membership_change from synapse.state import resolve_events_with_factory from synapse.types import UserID, get_domain_from_id from synapse.util import logcontext, unwrapFirstError @@ -86,6 +88,8 @@ class FederationHandler(BaseHandler): self.spam_checker = hs.get_spam_checker() self.event_creation_handler = hs.get_event_creation_handler() self._server_notices_mxid = hs.config.server_notices_mxid + self.config = hs.config + self.http_client = hs.get_simple_http_client() # When joining a room we need to queue any events for that room up self.room_queues = {} @@ -2288,7 +2292,7 @@ class FederationHandler(BaseHandler): for revocation. """ try: - response = yield self.hs.get_simple_http_client().get_json( + response = yield self.http_client.get_json( url, {"public_key": public_key} ) @@ -2313,14 +2317,25 @@ class FederationHandler(BaseHandler): Returns: Deferred """ - max_stream_id = yield self.store.persist_events( - event_and_contexts, - backfilled=backfilled, - ) + if self.config.worker_app: + yield send_federation_events_to_master( + clock=self.hs.get_clock(), + store=self.store, + client=self.http_client, + host=self.config.worker_replication_host, + port=self.config.worker_replication_http_port, + event_and_contexts=event_and_contexts, + backfilled=backfilled + ) + else: + max_stream_id = yield self.store.persist_events( + event_and_contexts, + backfilled=backfilled, + ) - if not backfilled: # Never notify for backfilled events - for event, _ in event_and_contexts: - self._notify_persisted_event(event, max_stream_id) + if not backfilled: # Never notify for backfilled events + for event, _ in event_and_contexts: + self._notify_persisted_event(event, max_stream_id) def _notify_persisted_event(self, event, max_stream_id): """Checks to see if notifier/pushers should be notified about the @@ -2359,9 +2374,20 @@ class FederationHandler(BaseHandler): ) def _clean_room_for_join(self, room_id): + # TODO move this out to master return self.store.clean_room_for_join(room_id) def user_joined_room(self, user, room_id): """Called when a new user has joined the room """ - return user_joined_room(self.distributor, user, room_id) + if self.config.worker_app: + return notify_user_membership_change( + client=self.http_client, + host=self.config.worker_replication_host, + port=self.config.worker_replication_http_port, + room_id=room_id, + user_id=user.to_string(), + change="joined", + ) + else: + return user_joined_room(self.distributor, user, room_id) -- cgit 1.4.1 From a3f5bf79a0fc0ea6d59069945f53717a3e9c6581 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Jul 2018 11:44:22 +0100 Subject: Add EDU/query handling over replication --- synapse/federation/federation_server.py | 43 +++++++++++++++++++++++++++++++++ synapse/handlers/federation.py | 24 +++++++++--------- synapse/replication/http/federation.py | 2 +- synapse/server.py | 6 ++++- 4 files changed, 62 insertions(+), 13 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index bf89d568af..941e30a596 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -33,6 +33,10 @@ from synapse.federation.federation_base import FederationBase, event_from_pdu_js from synapse.federation.persistence import TransactionActions from synapse.federation.units import Edu, Transaction from synapse.http.endpoint import parse_server_name +from synapse.replication.http.federation import ( + ReplicationFederationSendEduRestServlet, + ReplicationGetQueryRestServlet, +) from synapse.types import get_domain_from_id from synapse.util import async from synapse.util.caches.response_cache import ResponseCache @@ -745,6 +749,8 @@ class FederationHandlerRegistry(object): if edu_type in self.edu_handlers: raise KeyError("Already have an EDU handler for %s" % (edu_type,)) + logger.info("Registering federation EDU handler for %r", edu_type) + self.edu_handlers[edu_type] = handler def register_query_handler(self, query_type, handler): @@ -763,6 +769,8 @@ class FederationHandlerRegistry(object): "Already have a Query handler for %s" % (query_type,) ) + logger.info("Registering federation query handler for %r", query_type) + self.query_handlers[query_type] = handler @defer.inlineCallbacks @@ -785,3 +793,38 @@ class FederationHandlerRegistry(object): raise NotFoundError("No handler for Query type '%s'" % (query_type,)) return handler(args) + + +class ReplicationFederationHandlerRegistry(FederationHandlerRegistry): + def __init__(self, hs): + self.config = hs.config + self.http_client = hs.get_simple_http_client() + self.clock = hs.get_clock() + + self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs) + self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs) + + super(ReplicationFederationHandlerRegistry, self).__init__() + + def on_edu(self, edu_type, origin, content): + handler = self.edu_handlers.get(edu_type) + if handler: + return super(ReplicationFederationHandlerRegistry, self).on_edu( + edu_type, origin, content, + ) + + return self._send_edu( + edu_type=edu_type, + origin=origin, + content=content, + ) + + def on_query(self, query_type, args): + handler = self.query_handlers.get(query_type) + if handler: + return handler(args) + + return self._get_query_client( + query_type=query_type, + args=args, + ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0524dec942..d2cbb12df3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -44,8 +44,10 @@ from synapse.crypto.event_signing import ( compute_event_signature, ) from synapse.events.validator import EventValidator -from synapse.replication.http.federation import send_federation_events_to_master -from synapse.replication.http.membership import notify_user_membership_change +from synapse.replication.http.federation import ( + ReplicationFederationSendEventsRestServlet, +) +from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet from synapse.state import resolve_events_with_factory from synapse.types import UserID, get_domain_from_id from synapse.util import logcontext, unwrapFirstError @@ -91,6 +93,13 @@ class FederationHandler(BaseHandler): self.config = hs.config self.http_client = hs.get_simple_http_client() + self._send_events_to_master = ( + ReplicationFederationSendEventsRestServlet.make_client(hs) + ) + self._notify_user_membership_change = ( + ReplicationUserJoinedLeftRoomRestServlet.make_client(hs) + ) + # When joining a room we need to queue any events for that room up self.room_queues = {} self._room_pdu_linearizer = Linearizer("fed_room_pdu") @@ -2318,12 +2327,8 @@ class FederationHandler(BaseHandler): Deferred """ if self.config.worker_app: - yield send_federation_events_to_master( - clock=self.hs.get_clock(), + yield self._send_events_to_master( store=self.store, - client=self.http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, event_and_contexts=event_and_contexts, backfilled=backfilled ) @@ -2381,10 +2386,7 @@ class FederationHandler(BaseHandler): """Called when a new user has joined the room """ if self.config.worker_app: - return notify_user_membership_change( - client=self.http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, + return self._notify_user_membership_change( room_id=room_id, user_id=user.to_string(), change="joined", diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index f39aaa89be..3fa7bd64c7 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -59,8 +59,8 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): self.notifier = hs.get_notifier() self.pusher_pool = hs.get_pusherpool() - @defer.inlineCallbacks @staticmethod + @defer.inlineCallbacks def _serialize_payload(store, event_and_contexts, backfilled): """ Args: diff --git a/synapse/server.py b/synapse/server.py index 140be9ebe8..26228d8c72 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -36,6 +36,7 @@ from synapse.federation.federation_client import FederationClient from synapse.federation.federation_server import ( FederationHandlerRegistry, FederationServer, + ReplicationFederationHandlerRegistry, ) from synapse.federation.send_queue import FederationRemoteSendQueue from synapse.federation.transaction_queue import TransactionQueue @@ -423,7 +424,10 @@ class HomeServer(object): return RoomMemberMasterHandler(self) def build_federation_registry(self): - return FederationHandlerRegistry() + if self.config.worker_app: + return ReplicationFederationHandlerRegistry(self) + else: + return FederationHandlerRegistry() def build_server_notices_manager(self): if self.config.worker_app: -- cgit 1.4.1 From 360ba89c50ea5cbf824e54f04d536b89b57f3304 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Aug 2018 11:54:55 +0100 Subject: Don't fail requests to unbind 3pids for non supporting ID servers Older identity servers may not support the unbind 3pid request, so we shouldn't fail the requests if we received one of 400/404/501. The request still fails if we receive e.g. 500 responses, allowing clients to retry requests on transient identity server errors that otherwise do support the API. Fixes #3661 --- changelog.d/3661.bugfix | 1 + synapse/handlers/auth.py | 20 +++++++++++++++++--- synapse/handlers/deactivate_account.py | 13 +++++++++++-- synapse/handlers/identity.py | 30 +++++++++++++++++++++--------- synapse/rest/client/v1/admin.py | 11 +++++++++-- synapse/rest/client/v2_alpha/account.py | 22 ++++++++++++++++++---- 6 files changed, 77 insertions(+), 20 deletions(-) create mode 100644 changelog.d/3661.bugfix (limited to 'synapse/handlers') diff --git a/changelog.d/3661.bugfix b/changelog.d/3661.bugfix new file mode 100644 index 0000000000..f2b4703d80 --- /dev/null +++ b/changelog.d/3661.bugfix @@ -0,0 +1 @@ +Fix bug on deleting 3pid when using identity servers that don't support unbind API diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 184eef09d0..da17e73fdd 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -828,12 +828,26 @@ class AuthHandler(BaseHandler): @defer.inlineCallbacks def delete_threepid(self, user_id, medium, address): + """Attempts to unbind the 3pid on the identity servers and deletes it + from the local database. + + Args: + user_id (str) + medium (str) + address (str) + + Returns: + Deferred[bool]: Returns True if successfully unbound the 3pid on + the identity server, False if identity server doesn't support the + unbind API. + """ + # 'Canonicalise' email addresses as per above if medium == 'email': address = address.lower() identity_handler = self.hs.get_handlers().identity_handler - yield identity_handler.unbind_threepid( + result = yield identity_handler.try_unbind_threepid( user_id, { 'medium': medium, @@ -841,10 +855,10 @@ class AuthHandler(BaseHandler): }, ) - ret = yield self.store.user_delete_threepid( + yield self.store.user_delete_threepid( user_id, medium, address, ) - defer.returnValue(ret) + defer.returnValue(result) def _save_session(self, session): # TODO: Persistent storage diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index b3c5a9ee64..b078df4a76 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -51,7 +51,8 @@ class DeactivateAccountHandler(BaseHandler): erase_data (bool): whether to GDPR-erase the user's data Returns: - Deferred + Deferred[bool]: True if identity server supports removing + threepids, otherwise False. """ # FIXME: Theoretically there is a race here wherein user resets # password using threepid. @@ -60,16 +61,22 @@ class DeactivateAccountHandler(BaseHandler): # leave the user still active so they can try again. # Ideally we would prevent password resets and then do this in the # background thread. + + # This will be set to false if the identity server doesn't support + # unbinding + identity_server_supports_unbinding = True + threepids = yield self.store.user_get_threepids(user_id) for threepid in threepids: try: - yield self._identity_handler.unbind_threepid( + result = yield self._identity_handler.try_unbind_threepid( user_id, { 'medium': threepid['medium'], 'address': threepid['address'], }, ) + identity_server_supports_unbinding &= result except Exception: # Do we want this to be a fatal error or should we carry on? logger.exception("Failed to remove threepid from ID server") @@ -103,6 +110,8 @@ class DeactivateAccountHandler(BaseHandler): # parts users from rooms (if it isn't already running) self._start_user_parting() + defer.returnValue(identity_server_supports_unbinding) + def _start_user_parting(self): """ Start the process that goes through the table of users diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 1d36d967c3..a0f5fecc96 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -137,15 +137,19 @@ class IdentityHandler(BaseHandler): defer.returnValue(data) @defer.inlineCallbacks - def unbind_threepid(self, mxid, threepid): - """ - Removes a binding from an identity server + def try_unbind_threepid(self, mxid, threepid): + """Removes a binding from an identity server + Args: mxid (str): Matrix user ID of binding to be removed threepid (dict): Dict with medium & address of binding to be removed + Raises: + SynapseError: If we failed to contact the identity server + Returns: - Deferred[bool]: True on success, otherwise False + Deferred[bool]: True on success, otherwise False if the identity + server doesn't support unbinding """ logger.debug("unbinding threepid %r from %s", threepid, mxid) if not self.trusted_id_servers: @@ -175,11 +179,19 @@ class IdentityHandler(BaseHandler): content=content, destination_is=id_server, ) - yield self.http_client.post_json_get_json( - url, - content, - headers, - ) + try: + yield self.http_client.post_json_get_json( + url, + content, + headers, + ) + except HttpResponseException as e: + if e.code in (400, 404, 501,): + # The remote server probably doesn't support unbinding (yet) + defer.returnValue(False) + else: + raise SynapseError(502, "Failed to contact identity server") + defer.returnValue(True) @defer.inlineCallbacks diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 80d625eecc..ad536ab570 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -391,10 +391,17 @@ class DeactivateAccountRestServlet(ClientV1RestServlet): if not is_admin: raise AuthError(403, "You are not a server admin") - yield self._deactivate_account_handler.deactivate_account( + result = yield self._deactivate_account_handler.deactivate_account( target_user_id, erase, ) - defer.returnValue((200, {})) + if result: + id_server_unbind_result = "success" + else: + id_server_unbind_result = "no-support" + + defer.returnValue((200, { + "id_server_unbind_result": id_server_unbind_result, + })) class ShutdownRoomRestServlet(ClientV1RestServlet): diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index eeae466d82..372648cafd 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -209,10 +209,17 @@ class DeactivateAccountRestServlet(RestServlet): yield self.auth_handler.validate_user_via_ui_auth( requester, body, self.hs.get_ip_from_request(request), ) - yield self._deactivate_account_handler.deactivate_account( + result = yield self._deactivate_account_handler.deactivate_account( requester.user.to_string(), erase, ) - defer.returnValue((200, {})) + if result: + id_server_unbind_result = "success" + else: + id_server_unbind_result = "no-support" + + defer.returnValue((200, { + "id_server_unbind_result": id_server_unbind_result, + })) class EmailThreepidRequestTokenRestServlet(RestServlet): @@ -364,7 +371,7 @@ class ThreepidDeleteRestServlet(RestServlet): user_id = requester.user.to_string() try: - yield self.auth_handler.delete_threepid( + ret = yield self.auth_handler.delete_threepid( user_id, body['medium'], body['address'] ) except Exception: @@ -374,7 +381,14 @@ class ThreepidDeleteRestServlet(RestServlet): logger.exception("Failed to remove threepid") raise SynapseError(500, "Failed to remove threepid") - defer.returnValue((200, {})) + if ret: + id_server_unbind_result = "success" + else: + id_server_unbind_result = "no-support" + + defer.returnValue((200, { + "id_server_unbind_result": id_server_unbind_result, + })) class WhoamiRestServlet(RestServlet): -- cgit 1.4.1 From bf7598f582cdfbd7db0fed55afce28bcc51a4801 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 9 Aug 2018 10:09:56 +0100 Subject: Log when we 3pid/unbind request fails --- synapse/handlers/identity.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index a0f5fecc96..5feb3f22a6 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -188,8 +188,10 @@ class IdentityHandler(BaseHandler): except HttpResponseException as e: if e.code in (400, 404, 501,): # The remote server probably doesn't support unbinding (yet) + logger.warn("Received %d response while unbinding threepid", e.code) defer.returnValue(False) else: + logger.error("Failed to unbind threepid on identity server: %s", e) raise SynapseError(502, "Failed to contact identity server") defer.returnValue(True) -- cgit 1.4.1 From b179537f2a51f4de52e2625939cc32eeba75cd6b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 9 Aug 2018 10:29:48 +0100 Subject: Move clean_room_for_join to master --- synapse/handlers/federation.py | 16 ++++++++++++++-- synapse/replication/http/federation.py | 35 ++++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 2 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 37d2307d0a..acabca1d25 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -50,6 +50,7 @@ from synapse.crypto.event_signing import ( ) from synapse.events.validator import EventValidator from synapse.replication.http.federation import ( + ReplicationCleanRoomRestServlet, ReplicationFederationSendEventsRestServlet, ) from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet @@ -104,6 +105,9 @@ class FederationHandler(BaseHandler): self._notify_user_membership_change = ( ReplicationUserJoinedLeftRoomRestServlet.make_client(hs) ) + self._clean_room_for_join_client = ( + ReplicationCleanRoomRestServlet.make_client(hs) + ) # When joining a room we need to queue any events for that room up self.room_queues = {} @@ -2388,8 +2392,16 @@ class FederationHandler(BaseHandler): ) def _clean_room_for_join(self, room_id): - # TODO move this out to master - return self.store.clean_room_for_join(room_id) + """Called to clean up any data in DB for a given room, ready for the + server to join the room. + + Args: + room_id (str) + """ + if self.config.worker_app: + return self._clean_room_for_join_client(room_id) + else: + return self.store.clean_room_for_join(room_id) def user_joined_room(self, user, room_id): """Called when a new user has joined the room diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index 3e6cbbf5a1..7b0b1cd32e 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -256,7 +256,42 @@ class ReplicationGetQueryRestServlet(ReplicationEndpoint): defer.returnValue((200, result)) +class ReplicationCleanRoomRestServlet(ReplicationEndpoint): + """Called to clean up any data in DB for a given room, ready for the + server to join the room. + + Request format: + + POST /_synapse/replication/fed_query/:fed_cleanup_room/:txn_id + + {} + """ + + NAME = "fed_cleanup_room" + PATH_ARGS = ("room_id",) + + def __init__(self, hs): + super(ReplicationCleanRoomRestServlet, self).__init__(hs) + + self.store = hs.get_datastore() + + @staticmethod + def _serialize_payload(room_id, args): + """ + Args: + room_id (str) + """ + return {} + + @defer.inlineCallbacks + def _handle_request(self, request, room_id): + yield self.store.clean_room_for_join(room_id) + + defer.returnValue((200, {})) + + def register_servlets(hs, http_server): ReplicationFederationSendEventsRestServlet(hs).register(http_server) ReplicationFederationSendEduRestServlet(hs).register(http_server) ReplicationGetQueryRestServlet(hs).register(http_server) + ReplicationCleanRoomRestServlet(hs).register(http_server) -- cgit 1.4.1 From c74c71128d164ce7aa6b50538ab960ec85f7a8da Mon Sep 17 00:00:00 2001 From: Neil Johnson Date: Tue, 14 Aug 2018 15:06:24 +0100 Subject: remove blank line --- synapse/handlers/register.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 54e3434928..f03ee1476b 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -534,4 +534,3 @@ class RegistrationHandler(BaseHandler): remote_room_hosts=remote_room_hosts, action="join", ) - -- cgit 1.4.1 From 488ffe6fdb36c7479052096489c683777597c2aa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 15 Aug 2018 14:17:18 +0100 Subject: Use federation handler function rather than duplicate This involves renaming _persist_events to be a public function. --- synapse/handlers/federation.py | 14 +++++------ synapse/replication/http/federation.py | 44 +++------------------------------- 2 files changed, 10 insertions(+), 48 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index acabca1d25..9a37d627ca 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1175,7 +1175,7 @@ class FederationHandler(BaseHandler): ) context = yield self.state_handler.compute_event_context(event) - yield self._persist_events([(event, context)]) + yield self.persist_events_and_notify([(event, context)]) defer.returnValue(event) @@ -1206,7 +1206,7 @@ class FederationHandler(BaseHandler): ) context = yield self.state_handler.compute_event_context(event) - yield self._persist_events([(event, context)]) + yield self.persist_events_and_notify([(event, context)]) defer.returnValue(event) @@ -1449,7 +1449,7 @@ class FederationHandler(BaseHandler): event, context ) - yield self._persist_events( + yield self.persist_events_and_notify( [(event, context)], backfilled=backfilled, ) @@ -1487,7 +1487,7 @@ class FederationHandler(BaseHandler): ], consumeErrors=True, )) - yield self._persist_events( + yield self.persist_events_and_notify( [ (ev_info["event"], context) for ev_info, context in zip(event_infos, contexts) @@ -1575,7 +1575,7 @@ class FederationHandler(BaseHandler): raise events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR - yield self._persist_events( + yield self.persist_events_and_notify( [ (e, events_to_context[e.event_id]) for e in itertools.chain(auth_events, state) @@ -1586,7 +1586,7 @@ class FederationHandler(BaseHandler): event, old_state=state ) - yield self._persist_events( + yield self.persist_events_and_notify( [(event, new_event_context)], ) @@ -2327,7 +2327,7 @@ class FederationHandler(BaseHandler): raise AuthError(403, "Third party certificate was invalid") @defer.inlineCallbacks - def _persist_events(self, event_and_contexts, backfilled=False): + def persist_events_and_notify(self, event_and_contexts, backfilled=False): """Persists events and tells the notifier/pushers about them, if necessary. diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index 7b0b1cd32e..2ddd18f73b 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -17,13 +17,10 @@ import logging from twisted.internet import defer -from synapse.api.constants import EventTypes, Membership from synapse.events import FrozenEvent from synapse.events.snapshot import EventContext from synapse.http.servlet import parse_json_object_from_request from synapse.replication.http._base import ReplicationEndpoint -from synapse.types import UserID -from synapse.util.logcontext import run_in_background from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -55,9 +52,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): self.store = hs.get_datastore() self.clock = hs.get_clock() - self.is_mine_id = hs.is_mine_id - self.notifier = hs.get_notifier() - self.pusher_pool = hs.get_pusherpool() + self.federation_handler = hs.get_handlers().federation_handler @staticmethod @defer.inlineCallbacks @@ -114,45 +109,12 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): len(event_and_contexts), ) - max_stream_id = yield self.store.persist_events( - event_and_contexts, - backfilled=backfilled + yield self.federation_handler.persist_events_and_notify( + event_and_contexts, backfilled, ) - if not backfilled: - for event, _ in event_and_contexts: - self._notify_persisted_event(event, max_stream_id) - defer.returnValue((200, {})) - def _notify_persisted_event(self, event, max_stream_id): - extra_users = [] - if event.type == EventTypes.Member: - target_user_id = event.state_key - - # We notify for memberships if its an invite for one of our - # users - if event.internal_metadata.is_outlier(): - if event.membership != Membership.INVITE: - if not self.is_mine_id(target_user_id): - return - - target_user = UserID.from_string(target_user_id) - extra_users.append(target_user) - elif event.internal_metadata.is_outlier(): - return - - event_stream_id = event.internal_metadata.stream_ordering - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=extra_users - ) - - run_in_background( - self.pusher_pool.on_new_notifications, - event_stream_id, max_stream_id, - ) - class ReplicationFederationSendEduRestServlet(ReplicationEndpoint): """Handles EDUs newly received from federation, including persisting and -- cgit 1.4.1