diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/app/federation_reader.py | 10 | ||||
-rw-r--r-- | synapse/federation/federation_server.py | 43 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 46 | ||||
-rw-r--r-- | synapse/replication/http/__init__.py | 3 | ||||
-rw-r--r-- | synapse/replication/http/federation.py | 245 | ||||
-rw-r--r-- | synapse/replication/slave/storage/transactions.py | 13 | ||||
-rw-r--r-- | synapse/server.py | 6 | ||||
-rw-r--r-- | synapse/storage/events.py | 82 | ||||
-rw-r--r-- | synapse/storage/events_worker.py | 83 | ||||
-rw-r--r-- | synapse/storage/room.py | 32 |
10 files changed, 443 insertions, 120 deletions
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 7af00b8bcf..0c557a8242 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -32,9 +32,14 @@ from synapse.http.site import SynapseSite from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.keys import SlavedKeyStore +from synapse.replication.slave.storage.profile import SlavedProfileStore +from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore +from synapse.replication.slave.storage.pushers import SlavedPusherStore +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.tcp.client import ReplicationClientHandler @@ -49,6 +54,11 @@ logger = logging.getLogger("synapse.app.federation_reader") class FederationReaderSlavedStore( + SlavedProfileStore, + SlavedApplicationServiceStore, + SlavedPusherStore, + SlavedPushRuleStore, + SlavedReceiptsStore, SlavedEventStore, SlavedKeyStore, RoomStore, diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 2b62f687b6..d23c1cf13b 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -39,6 +39,10 @@ from synapse.federation.federation_base import FederationBase, event_from_pdu_js from synapse.federation.persistence import TransactionActions from synapse.federation.units import Edu, Transaction from synapse.http.endpoint import parse_server_name +from synapse.replication.http.federation import ( + ReplicationFederationSendEduRestServlet, + ReplicationGetQueryRestServlet, +) from synapse.types import get_domain_from_id from synapse.util import async from synapse.util.caches.response_cache import ResponseCache @@ -760,6 +764,8 @@ class FederationHandlerRegistry(object): if edu_type in self.edu_handlers: raise KeyError("Already have an EDU handler for %s" % (edu_type,)) + logger.info("Registering federation EDU handler for %r", edu_type) + self.edu_handlers[edu_type] = handler def register_query_handler(self, query_type, handler): @@ -778,6 +784,8 @@ class FederationHandlerRegistry(object): "Already have a Query handler for %s" % (query_type,) ) + logger.info("Registering federation query handler for %r", query_type) + self.query_handlers[query_type] = handler @defer.inlineCallbacks @@ -800,3 +808,38 @@ class FederationHandlerRegistry(object): raise NotFoundError("No handler for Query type '%s'" % (query_type,)) return handler(args) + + +class ReplicationFederationHandlerRegistry(FederationHandlerRegistry): + def __init__(self, hs): + self.config = hs.config + self.http_client = hs.get_simple_http_client() + self.clock = hs.get_clock() + + self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs) + self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs) + + super(ReplicationFederationHandlerRegistry, self).__init__() + + def on_edu(self, edu_type, origin, content): + handler = self.edu_handlers.get(edu_type) + if handler: + return super(ReplicationFederationHandlerRegistry, self).on_edu( + edu_type, origin, content, + ) + + return self._send_edu( + edu_type=edu_type, + origin=origin, + content=content, + ) + + def on_query(self, query_type, args): + handler = self.query_handlers.get(query_type) + if handler: + return handler(args) + + return self._get_query_client( + query_type=query_type, + args=args, + ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0dffd44e22..37d2307d0a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -49,6 +49,10 @@ from synapse.crypto.event_signing import ( compute_event_signature, ) from synapse.events.validator import EventValidator +from synapse.replication.http.federation import ( + ReplicationFederationSendEventsRestServlet, +) +from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet from synapse.state import resolve_events_with_factory from synapse.types import UserID, get_domain_from_id from synapse.util import logcontext, unwrapFirstError @@ -91,6 +95,15 @@ class FederationHandler(BaseHandler): self.spam_checker = hs.get_spam_checker() self.event_creation_handler = hs.get_event_creation_handler() self._server_notices_mxid = hs.config.server_notices_mxid + self.config = hs.config + self.http_client = hs.get_simple_http_client() + + self._send_events_to_master = ( + ReplicationFederationSendEventsRestServlet.make_client(hs) + ) + self._notify_user_membership_change = ( + ReplicationUserJoinedLeftRoomRestServlet.make_client(hs) + ) # When joining a room we need to queue any events for that room up self.room_queues = {} @@ -2297,7 +2310,7 @@ class FederationHandler(BaseHandler): for revocation. """ try: - response = yield self.hs.get_simple_http_client().get_json( + response = yield self.http_client.get_json( url, {"public_key": public_key} ) @@ -2322,14 +2335,21 @@ class FederationHandler(BaseHandler): Returns: Deferred """ - max_stream_id = yield self.store.persist_events( - event_and_contexts, - backfilled=backfilled, - ) + if self.config.worker_app: + yield self._send_events_to_master( + store=self.store, + event_and_contexts=event_and_contexts, + backfilled=backfilled + ) + else: + max_stream_id = yield self.store.persist_events( + event_and_contexts, + backfilled=backfilled, + ) - if not backfilled: # Never notify for backfilled events - for event, _ in event_and_contexts: - self._notify_persisted_event(event, max_stream_id) + if not backfilled: # Never notify for backfilled events + for event, _ in event_and_contexts: + self._notify_persisted_event(event, max_stream_id) def _notify_persisted_event(self, event, max_stream_id): """Checks to see if notifier/pushers should be notified about the @@ -2368,9 +2388,17 @@ class FederationHandler(BaseHandler): ) def _clean_room_for_join(self, room_id): + # TODO move this out to master return self.store.clean_room_for_join(room_id) def user_joined_room(self, user, room_id): """Called when a new user has joined the room """ - return user_joined_room(self.distributor, user, room_id) + if self.config.worker_app: + return self._notify_user_membership_change( + room_id=room_id, + user_id=user.to_string(), + change="joined", + ) + else: + return user_joined_room(self.distributor, user, room_id) diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index 589ee94c66..19f214281e 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -14,7 +14,7 @@ # limitations under the License. from synapse.http.server import JsonResource -from synapse.replication.http import membership, send_event +from synapse.replication.http import federation, membership, send_event REPLICATION_PREFIX = "/_synapse/replication" @@ -27,3 +27,4 @@ class ReplicationRestResource(JsonResource): def register_servlets(self, hs): send_event.register_servlets(hs, self) membership.register_servlets(hs, self) + federation.register_servlets(hs, self) diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py new file mode 100644 index 0000000000..3fa7bd64c7 --- /dev/null +++ b/synapse/replication/http/federation.py @@ -0,0 +1,245 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.api.constants import EventTypes, Membership +from synapse.events import FrozenEvent +from synapse.events.snapshot import EventContext +from synapse.http.servlet import parse_json_object_from_request +from synapse.replication.http._base import ReplicationEndpoint +from synapse.types import UserID +from synapse.util.logcontext import run_in_background +from synapse.util.metrics import Measure + +logger = logging.getLogger(__name__) + + +class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): + """Handles events newly received from federation, including persisting and + notifying. + + The API looks like: + + POST /_synapse/replication/fed_send_events/:txn_id + + { + "events": [{ + "event": { .. serialized event .. }, + "internal_metadata": { .. serialized internal_metadata .. }, + "rejected_reason": .., // The event.rejected_reason field + "context": { .. serialized event context .. }, + }], + "backfilled": false + """ + + NAME = "fed_send_events" + PATH_ARGS = () + + def __init__(self, hs): + super(ReplicationFederationSendEventsRestServlet, self).__init__(hs) + + self.store = hs.get_datastore() + self.clock = hs.get_clock() + self.is_mine_id = hs.is_mine_id + self.notifier = hs.get_notifier() + self.pusher_pool = hs.get_pusherpool() + + @staticmethod + @defer.inlineCallbacks + def _serialize_payload(store, event_and_contexts, backfilled): + """ + Args: + store + event_and_contexts (list[tuple[FrozenEvent, EventContext]]) + backfilled (bool): Whether or not the events are the result of + backfilling + """ + event_payloads = [] + for event, context in event_and_contexts: + serialized_context = yield context.serialize(event, store) + + event_payloads.append({ + "event": event.get_pdu_json(), + "internal_metadata": event.internal_metadata.get_dict(), + "rejected_reason": event.rejected_reason, + "context": serialized_context, + }) + + payload = { + "events": event_payloads, + "backfilled": backfilled, + } + + defer.returnValue(payload) + + @defer.inlineCallbacks + def _handle_request(self, request): + with Measure(self.clock, "repl_fed_send_events_parse"): + content = parse_json_object_from_request(request) + + backfilled = content["backfilled"] + + event_payloads = content["events"] + + event_and_contexts = [] + for event_payload in event_payloads: + event_dict = event_payload["event"] + internal_metadata = event_payload["internal_metadata"] + rejected_reason = event_payload["rejected_reason"] + event = FrozenEvent(event_dict, internal_metadata, rejected_reason) + + context = yield EventContext.deserialize( + self.store, event_payload["context"], + ) + + event_and_contexts.append((event, context)) + + logger.info( + "Got %d events from federation", + len(event_and_contexts), + ) + + max_stream_id = yield self.store.persist_events( + event_and_contexts, + backfilled=backfilled + ) + + if not backfilled: + for event, _ in event_and_contexts: + self._notify_persisted_event(event, max_stream_id) + + defer.returnValue((200, {})) + + def _notify_persisted_event(self, event, max_stream_id): + extra_users = [] + if event.type == EventTypes.Member: + target_user_id = event.state_key + + # We notify for memberships if its an invite for one of our + # users + if event.internal_metadata.is_outlier(): + if event.membership != Membership.INVITE: + if not self.is_mine_id(target_user_id): + return + + target_user = UserID.from_string(target_user_id) + extra_users.append(target_user) + elif event.internal_metadata.is_outlier(): + return + + event_stream_id = event.internal_metadata.stream_ordering + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=extra_users + ) + + run_in_background( + self.pusher_pool.on_new_notifications, + event_stream_id, max_stream_id, + ) + + +class ReplicationFederationSendEduRestServlet(ReplicationEndpoint): + """Handles EDUs newly received from federation, including persisting and + notifying. + """ + + NAME = "fed_send_edu" + PATH_ARGS = ("edu_type",) + + def __init__(self, hs): + super(ReplicationFederationSendEduRestServlet, self).__init__(hs) + + self.store = hs.get_datastore() + self.clock = hs.get_clock() + self.registry = hs.get_federation_registry() + + @staticmethod + def _serialize_payload(edu_type, origin, content): + return { + "origin": origin, + "content": content, + } + + @defer.inlineCallbacks + def _handle_request(self, request, edu_type): + with Measure(self.clock, "repl_fed_send_edu_parse"): + content = parse_json_object_from_request(request) + + origin = content["origin"] + edu_content = content["content"] + + logger.info( + "Got %r edu from $s", + edu_type, origin, + ) + + result = yield self.registry.on_edu(edu_type, origin, edu_content) + + defer.returnValue((200, result)) + + +class ReplicationGetQueryRestServlet(ReplicationEndpoint): + """Handle responding to queries from federation. + """ + + NAME = "fed_query" + PATH_ARGS = ("query_type",) + + # This is a query, so let's not bother caching + CACHE = False + + def __init__(self, hs): + super(ReplicationGetQueryRestServlet, self).__init__(hs) + + self.store = hs.get_datastore() + self.clock = hs.get_clock() + self.registry = hs.get_federation_registry() + + @staticmethod + def _serialize_payload(query_type, args): + """ + Args: + query_type (str) + args (dict): The arguments received for the given query type + """ + return { + "args": args, + } + + @defer.inlineCallbacks + def _handle_request(self, request, query_type): + with Measure(self.clock, "repl_fed_query_parse"): + content = parse_json_object_from_request(request) + + args = content["args"] + + logger.info( + "Got %r query", + query_type, + ) + + result = yield self.registry.on_query(query_type, args) + + defer.returnValue((200, result)) + + +def register_servlets(hs, http_server): + ReplicationFederationSendEventsRestServlet(hs).register(http_server) + ReplicationFederationSendEduRestServlet(hs).register(http_server) + ReplicationGetQueryRestServlet(hs).register(http_server) diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py index 9c9a5eadd9..cd6416c47c 100644 --- a/synapse/replication/slave/storage/transactions.py +++ b/synapse/replication/slave/storage/transactions.py @@ -13,19 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.storage import DataStore from synapse.storage.transactions import TransactionStore from ._base import BaseSlavedStore -class TransactionStore(BaseSlavedStore): - get_destination_retry_timings = TransactionStore.__dict__[ - "get_destination_retry_timings" - ] - _get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__ - set_destination_retry_timings = DataStore.set_destination_retry_timings.__func__ - _set_destination_retry_timings = DataStore._set_destination_retry_timings.__func__ - - prep_send_transaction = DataStore.prep_send_transaction.__func__ - delivered_txn = DataStore.delivered_txn.__func__ +class TransactionStore(TransactionStore, BaseSlavedStore): + pass diff --git a/synapse/server.py b/synapse/server.py index 140be9ebe8..26228d8c72 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -36,6 +36,7 @@ from synapse.federation.federation_client import FederationClient from synapse.federation.federation_server import ( FederationHandlerRegistry, FederationServer, + ReplicationFederationHandlerRegistry, ) from synapse.federation.send_queue import FederationRemoteSendQueue from synapse.federation.transaction_queue import TransactionQueue @@ -423,7 +424,10 @@ class HomeServer(object): return RoomMemberMasterHandler(self) def build_federation_registry(self): - return FederationHandlerRegistry() + if self.config.worker_app: + return ReplicationFederationHandlerRegistry(self) + else: + return FederationHandlerRegistry() def build_server_notices_manager(self): if self.config.worker_app: diff --git a/synapse/storage/events.py b/synapse/storage/events.py index ce32e8fefd..ccfda5b0fa 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1436,88 +1436,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore ) @defer.inlineCallbacks - def have_events_in_timeline(self, event_ids): - """Given a list of event ids, check if we have already processed and - stored them as non outliers. - """ - rows = yield self._simple_select_many_batch( - table="events", - retcols=("event_id",), - column="event_id", - iterable=list(event_ids), - keyvalues={"outlier": False}, - desc="have_events_in_timeline", - ) - - defer.returnValue(set(r["event_id"] for r in rows)) - - @defer.inlineCallbacks - def have_seen_events(self, event_ids): - """Given a list of event ids, check if we have already processed them. - - Args: - event_ids (iterable[str]): - - Returns: - Deferred[set[str]]: The events we have already seen. - """ - results = set() - - def have_seen_events_txn(txn, chunk): - sql = ( - "SELECT event_id FROM events as e WHERE e.event_id IN (%s)" - % (",".join("?" * len(chunk)), ) - ) - txn.execute(sql, chunk) - for (event_id, ) in txn: - results.add(event_id) - - # break the input up into chunks of 100 - input_iterator = iter(event_ids) - for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)), - []): - yield self.runInteraction( - "have_seen_events", - have_seen_events_txn, - chunk, - ) - defer.returnValue(results) - - def get_seen_events_with_rejections(self, event_ids): - """Given a list of event ids, check if we rejected them. - - Args: - event_ids (list[str]) - - Returns: - Deferred[dict[str, str|None): - Has an entry for each event id we already have seen. Maps to - the rejected reason string if we rejected the event, else maps - to None. - """ - if not event_ids: - return defer.succeed({}) - - def f(txn): - sql = ( - "SELECT e.event_id, reason FROM events as e " - "LEFT JOIN rejections as r ON e.event_id = r.event_id " - "WHERE e.event_id = ?" - ) - - res = {} - for event_id in event_ids: - txn.execute(sql, (event_id,)) - row = txn.fetchone() - if row: - _, rejected = row - res[event_id] = rejected - - return res - - return self.runInteraction("get_rejection_reasons", f) - - @defer.inlineCallbacks def count_daily_messages(self): """ Returns an estimate of the number of messages sent in the last day. diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 9b4cfeb899..59822178ff 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import itertools import logging from collections import namedtuple @@ -442,3 +443,85 @@ class EventsWorkerStore(SQLBaseStore): self._get_event_cache.prefill((original_ev.event_id,), cache_entry) defer.returnValue(cache_entry) + + @defer.inlineCallbacks + def have_events_in_timeline(self, event_ids): + """Given a list of event ids, check if we have already processed and + stored them as non outliers. + """ + rows = yield self._simple_select_many_batch( + table="events", + retcols=("event_id",), + column="event_id", + iterable=list(event_ids), + keyvalues={"outlier": False}, + desc="have_events_in_timeline", + ) + + defer.returnValue(set(r["event_id"] for r in rows)) + + @defer.inlineCallbacks + def have_seen_events(self, event_ids): + """Given a list of event ids, check if we have already processed them. + + Args: + event_ids (iterable[str]): + + Returns: + Deferred[set[str]]: The events we have already seen. + """ + results = set() + + def have_seen_events_txn(txn, chunk): + sql = ( + "SELECT event_id FROM events as e WHERE e.event_id IN (%s)" + % (",".join("?" * len(chunk)), ) + ) + txn.execute(sql, chunk) + for (event_id, ) in txn: + results.add(event_id) + + # break the input up into chunks of 100 + input_iterator = iter(event_ids) + for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)), + []): + yield self.runInteraction( + "have_seen_events", + have_seen_events_txn, + chunk, + ) + defer.returnValue(results) + + def get_seen_events_with_rejections(self, event_ids): + """Given a list of event ids, check if we rejected them. + + Args: + event_ids (list[str]) + + Returns: + Deferred[dict[str, str|None): + Has an entry for each event id we already have seen. Maps to + the rejected reason string if we rejected the event, else maps + to None. + """ + if not event_ids: + return defer.succeed({}) + + def f(txn): + sql = ( + "SELECT e.event_id, reason FROM events as e " + "LEFT JOIN rejections as r ON e.event_id = r.event_id " + "WHERE e.event_id = ?" + ) + + res = {} + for event_id in event_ids: + txn.execute(sql, (event_id,)) + row = txn.fetchone() + if row: + _, rejected = row + res[event_id] = rejected + + return res + + return self.runInteraction("get_rejection_reasons", f) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 3147fb6827..3378fc77d1 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -41,6 +41,22 @@ RatelimitOverride = collections.namedtuple( class RoomWorkerStore(SQLBaseStore): + def get_room(self, room_id): + """Retrieve a room. + + Args: + room_id (str): The ID of the room to retrieve. + Returns: + A namedtuple containing the room information, or an empty list. + """ + return self._simple_select_one( + table="rooms", + keyvalues={"room_id": room_id}, + retcols=("room_id", "is_public", "creator"), + desc="get_room", + allow_none=True, + ) + def get_public_room_ids(self): return self._simple_select_onecol( table="rooms", @@ -215,22 +231,6 @@ class RoomStore(RoomWorkerStore, SearchStore): logger.error("store_room with room_id=%s failed: %s", room_id, e) raise StoreError(500, "Problem creating room.") - def get_room(self, room_id): - """Retrieve a room. - - Args: - room_id (str): The ID of the room to retrieve. - Returns: - A namedtuple containing the room information, or an empty list. - """ - return self._simple_select_one( - table="rooms", - keyvalues={"room_id": room_id}, - retcols=("room_id", "is_public", "creator"), - desc="get_room", - allow_none=True, - ) - @defer.inlineCallbacks def set_room_is_public(self, room_id, is_public): def set_room_is_public_txn(txn, next_id): |