diff options
Diffstat (limited to 'synapse/replication/http')
-rw-r--r-- | synapse/replication/http/__init__.py | 3 | ||||
-rw-r--r-- | synapse/replication/http/federation.py | 245 |
2 files changed, 247 insertions, 1 deletions
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index 589ee94c66..19f214281e 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -14,7 +14,7 @@ # limitations under the License. from synapse.http.server import JsonResource -from synapse.replication.http import membership, send_event +from synapse.replication.http import federation, membership, send_event REPLICATION_PREFIX = "/_synapse/replication" @@ -27,3 +27,4 @@ class ReplicationRestResource(JsonResource): def register_servlets(self, hs): send_event.register_servlets(hs, self) membership.register_servlets(hs, self) + federation.register_servlets(hs, self) diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py new file mode 100644 index 0000000000..3fa7bd64c7 --- /dev/null +++ b/synapse/replication/http/federation.py @@ -0,0 +1,245 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.api.constants import EventTypes, Membership +from synapse.events import FrozenEvent +from synapse.events.snapshot import EventContext +from synapse.http.servlet import parse_json_object_from_request +from synapse.replication.http._base import ReplicationEndpoint +from synapse.types import UserID +from synapse.util.logcontext import run_in_background +from synapse.util.metrics import Measure + +logger = logging.getLogger(__name__) + + +class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): + """Handles events newly received from federation, including persisting and + notifying. + + The API looks like: + + POST /_synapse/replication/fed_send_events/:txn_id + + { + "events": [{ + "event": { .. serialized event .. }, + "internal_metadata": { .. serialized internal_metadata .. }, + "rejected_reason": .., // The event.rejected_reason field + "context": { .. serialized event context .. }, + }], + "backfilled": false + """ + + NAME = "fed_send_events" + PATH_ARGS = () + + def __init__(self, hs): + super(ReplicationFederationSendEventsRestServlet, self).__init__(hs) + + self.store = hs.get_datastore() + self.clock = hs.get_clock() + self.is_mine_id = hs.is_mine_id + self.notifier = hs.get_notifier() + self.pusher_pool = hs.get_pusherpool() + + @staticmethod + @defer.inlineCallbacks + def _serialize_payload(store, event_and_contexts, backfilled): + """ + Args: + store + event_and_contexts (list[tuple[FrozenEvent, EventContext]]) + backfilled (bool): Whether or not the events are the result of + backfilling + """ + event_payloads = [] + for event, context in event_and_contexts: + serialized_context = yield context.serialize(event, store) + + event_payloads.append({ + "event": event.get_pdu_json(), + "internal_metadata": event.internal_metadata.get_dict(), + "rejected_reason": event.rejected_reason, + "context": serialized_context, + }) + + payload = { + "events": event_payloads, + "backfilled": backfilled, + } + + defer.returnValue(payload) + + @defer.inlineCallbacks + def _handle_request(self, request): + with Measure(self.clock, "repl_fed_send_events_parse"): + content = parse_json_object_from_request(request) + + backfilled = content["backfilled"] + + event_payloads = content["events"] + + event_and_contexts = [] + for event_payload in event_payloads: + event_dict = event_payload["event"] + internal_metadata = event_payload["internal_metadata"] + rejected_reason = event_payload["rejected_reason"] + event = FrozenEvent(event_dict, internal_metadata, rejected_reason) + + context = yield EventContext.deserialize( + self.store, event_payload["context"], + ) + + event_and_contexts.append((event, context)) + + logger.info( + "Got %d events from federation", + len(event_and_contexts), + ) + + max_stream_id = yield self.store.persist_events( + event_and_contexts, + backfilled=backfilled + ) + + if not backfilled: + for event, _ in event_and_contexts: + self._notify_persisted_event(event, max_stream_id) + + defer.returnValue((200, {})) + + def _notify_persisted_event(self, event, max_stream_id): + extra_users = [] + if event.type == EventTypes.Member: + target_user_id = event.state_key + + # We notify for memberships if its an invite for one of our + # users + if event.internal_metadata.is_outlier(): + if event.membership != Membership.INVITE: + if not self.is_mine_id(target_user_id): + return + + target_user = UserID.from_string(target_user_id) + extra_users.append(target_user) + elif event.internal_metadata.is_outlier(): + return + + event_stream_id = event.internal_metadata.stream_ordering + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=extra_users + ) + + run_in_background( + self.pusher_pool.on_new_notifications, + event_stream_id, max_stream_id, + ) + + +class ReplicationFederationSendEduRestServlet(ReplicationEndpoint): + """Handles EDUs newly received from federation, including persisting and + notifying. + """ + + NAME = "fed_send_edu" + PATH_ARGS = ("edu_type",) + + def __init__(self, hs): + super(ReplicationFederationSendEduRestServlet, self).__init__(hs) + + self.store = hs.get_datastore() + self.clock = hs.get_clock() + self.registry = hs.get_federation_registry() + + @staticmethod + def _serialize_payload(edu_type, origin, content): + return { + "origin": origin, + "content": content, + } + + @defer.inlineCallbacks + def _handle_request(self, request, edu_type): + with Measure(self.clock, "repl_fed_send_edu_parse"): + content = parse_json_object_from_request(request) + + origin = content["origin"] + edu_content = content["content"] + + logger.info( + "Got %r edu from $s", + edu_type, origin, + ) + + result = yield self.registry.on_edu(edu_type, origin, edu_content) + + defer.returnValue((200, result)) + + +class ReplicationGetQueryRestServlet(ReplicationEndpoint): + """Handle responding to queries from federation. + """ + + NAME = "fed_query" + PATH_ARGS = ("query_type",) + + # This is a query, so let's not bother caching + CACHE = False + + def __init__(self, hs): + super(ReplicationGetQueryRestServlet, self).__init__(hs) + + self.store = hs.get_datastore() + self.clock = hs.get_clock() + self.registry = hs.get_federation_registry() + + @staticmethod + def _serialize_payload(query_type, args): + """ + Args: + query_type (str) + args (dict): The arguments received for the given query type + """ + return { + "args": args, + } + + @defer.inlineCallbacks + def _handle_request(self, request, query_type): + with Measure(self.clock, "repl_fed_query_parse"): + content = parse_json_object_from_request(request) + + args = content["args"] + + logger.info( + "Got %r query", + query_type, + ) + + result = yield self.registry.on_query(query_type, args) + + defer.returnValue((200, result)) + + +def register_servlets(hs, http_server): + ReplicationFederationSendEventsRestServlet(hs).register(http_server) + ReplicationFederationSendEduRestServlet(hs).register(http_server) + ReplicationGetQueryRestServlet(hs).register(http_server) |