From a7b726ad181935da16ede550e1202b77632823f3 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Sat, 30 Dec 2017 18:40:19 +0000 Subject: federation_client: clean up imports --- synapse/federation/federation_client.py | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index b8f02f5391..0f754f933f 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -14,29 +14,26 @@ # limitations under the License. +import copy +import itertools +import logging +import random + from twisted.internet import defer -from .federation_base import FederationBase from synapse.api.constants import Membership - from synapse.api.errors import ( CodeMessageException, HttpResponseException, SynapseError, ) -from synapse.util import unwrapFirstError, logcontext -from synapse.util.caches.expiringcache import ExpiringCache -from synapse.util.logutils import log_function -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.events import FrozenEvent, builder +from synapse.federation.federation_base import FederationBase import synapse.metrics - +from synapse.util import logcontext, unwrapFirstError +from synapse.util.caches.expiringcache import ExpiringCache +from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logutils import log_function from synapse.util.retryutils import NotRetryingDestination -import copy -import itertools -import logging -import random - - logger = logging.getLogger(__name__) -- cgit 1.5.1 From 65abc90fb6f10a91d72bc4518a8d155251d23a6f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Sat, 30 Dec 2017 18:40:19 +0000 Subject: federation_server: clean up imports --- synapse/federation/federation_server.py | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index a2327f24b6..5fdfbbeee5 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -12,25 +12,22 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer +import logging -from .federation_base import FederationBase -from .units import Transaction, Edu +import simplejson as json +from twisted.internet import defer +from synapse.api.errors import AuthError, FederationError, SynapseError +from synapse.crypto.event_signing import compute_event_signature +from synapse.events import FrozenEvent +from synapse.federation.federation_base import FederationBase +from synapse.federation.units import Edu, Transaction +import synapse.metrics +from synapse.types import get_domain_from_id from synapse.util import async +from synapse.util.caches.response_cache import ResponseCache from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.util.logutils import log_function -from synapse.util.caches.response_cache import ResponseCache -from synapse.events import FrozenEvent -from synapse.types import get_domain_from_id -import synapse.metrics - -from synapse.api.errors import AuthError, FederationError, SynapseError - -from synapse.crypto.event_signing import compute_event_signature - -import simplejson as json -import logging # when processing incoming transactions, we try to handle multiple rooms in # parallel, up to this limit. -- cgit 1.5.1 From 3079f80d4a67714294660e27214cd2a55bb7ecf1 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Sat, 30 Dec 2017 18:40:19 +0000 Subject: Factor out `event_from_pdu_json` turns out we have two copies of this, and neither needs to be an instance method --- synapse/federation/federation_base.py | 20 ++++++++++++++++++ synapse/federation/federation_client.py | 36 ++++++++++++++------------------- synapse/federation/federation_server.py | 25 +++++++++-------------- 3 files changed, 44 insertions(+), 37 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index a0f5d40eb3..6476cea895 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -16,6 +16,7 @@ import logging from synapse.api.errors import SynapseError from synapse.crypto.event_signing import check_event_content_hash +from synapse.events import FrozenEvent from synapse.events.utils import prune_event from synapse.util import unwrapFirstError, logcontext from twisted.internet import defer @@ -169,3 +170,22 @@ class FederationBase(object): ) return deferreds + + +def event_from_pdu_json(pdu_json, outlier=False): + """Construct a FrozenEvent from an event json received over federation + + Args: + pdu_json (object): pdu as received over federation + outlier (bool): True to mark this event as an outlier + + Returns: + FrozenEvent + """ + event = FrozenEvent( + pdu_json + ) + + event.internal_metadata.outlier = outlier + + return event diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 0f754f933f..b1fe03f702 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -25,8 +25,11 @@ from synapse.api.constants import Membership from synapse.api.errors import ( CodeMessageException, HttpResponseException, SynapseError, ) -from synapse.events import FrozenEvent, builder -from synapse.federation.federation_base import FederationBase +from synapse.events import builder +from synapse.federation.federation_base import ( + FederationBase, + event_from_pdu_json, +) import synapse.metrics from synapse.util import logcontext, unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache @@ -181,7 +184,7 @@ class FederationClient(FederationBase): logger.debug("backfill transaction_data=%s", repr(transaction_data)) pdus = [ - self.event_from_pdu_json(p, outlier=False) + event_from_pdu_json(p, outlier=False) for p in transaction_data["pdus"] ] @@ -241,7 +244,7 @@ class FederationClient(FederationBase): logger.debug("transaction_data %r", transaction_data) pdu_list = [ - self.event_from_pdu_json(p, outlier=outlier) + event_from_pdu_json(p, outlier=outlier) for p in transaction_data["pdus"] ] @@ -333,11 +336,11 @@ class FederationClient(FederationBase): ) pdus = [ - self.event_from_pdu_json(p, outlier=True) for p in result["pdus"] + event_from_pdu_json(p, outlier=True) for p in result["pdus"] ] auth_chain = [ - self.event_from_pdu_json(p, outlier=True) + event_from_pdu_json(p, outlier=True) for p in result.get("auth_chain", []) ] @@ -438,7 +441,7 @@ class FederationClient(FederationBase): ) auth_chain = [ - self.event_from_pdu_json(p, outlier=True) + event_from_pdu_json(p, outlier=True) for p in res["auth_chain"] ] @@ -567,12 +570,12 @@ class FederationClient(FederationBase): logger.debug("Got content: %s", content) state = [ - self.event_from_pdu_json(p, outlier=True) + event_from_pdu_json(p, outlier=True) for p in content.get("state", []) ] auth_chain = [ - self.event_from_pdu_json(p, outlier=True) + event_from_pdu_json(p, outlier=True) for p in content.get("auth_chain", []) ] @@ -647,7 +650,7 @@ class FederationClient(FederationBase): logger.debug("Got response to send_invite: %s", pdu_dict) - pdu = self.event_from_pdu_json(pdu_dict) + pdu = event_from_pdu_json(pdu_dict) # Check signatures are correct. pdu = yield self._check_sigs_and_hash(pdu) @@ -737,7 +740,7 @@ class FederationClient(FederationBase): ) auth_chain = [ - self.event_from_pdu_json(e) + event_from_pdu_json(e) for e in content["auth_chain"] ] @@ -785,7 +788,7 @@ class FederationClient(FederationBase): ) events = [ - self.event_from_pdu_json(e) + event_from_pdu_json(e) for e in content.get("events", []) ] @@ -802,15 +805,6 @@ class FederationClient(FederationBase): defer.returnValue(signed_events) - def event_from_pdu_json(self, pdu_json, outlier=False): - event = FrozenEvent( - pdu_json - ) - - event.internal_metadata.outlier = outlier - - return event - @defer.inlineCallbacks def forward_third_party_invite(self, destinations, room_id, event_dict): for destination in destinations: diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 5fdfbbeee5..9849953c9b 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -19,8 +19,10 @@ from twisted.internet import defer from synapse.api.errors import AuthError, FederationError, SynapseError from synapse.crypto.event_signing import compute_event_signature -from synapse.events import FrozenEvent -from synapse.federation.federation_base import FederationBase +from synapse.federation.federation_base import ( + FederationBase, + event_from_pdu_json, +) from synapse.federation.units import Edu, Transaction import synapse.metrics from synapse.types import get_domain_from_id @@ -169,7 +171,7 @@ class FederationServer(FederationBase): p["age_ts"] = request_time - int(p["age"]) del p["age"] - event = self.event_from_pdu_json(p) + event = event_from_pdu_json(p) room_id = event.room_id pdus_by_room.setdefault(room_id, []).append(event) @@ -343,7 +345,7 @@ class FederationServer(FederationBase): @defer.inlineCallbacks def on_invite_request(self, origin, content): - pdu = self.event_from_pdu_json(content) + pdu = event_from_pdu_json(content) ret_pdu = yield self.handler.on_invite_request(origin, pdu) time_now = self._clock.time_msec() defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)})) @@ -351,7 +353,7 @@ class FederationServer(FederationBase): @defer.inlineCallbacks def on_send_join_request(self, origin, content): logger.debug("on_send_join_request: content: %s", content) - pdu = self.event_from_pdu_json(content) + pdu = event_from_pdu_json(content) logger.debug("on_send_join_request: pdu sigs: %s", pdu.signatures) res_pdus = yield self.handler.on_send_join_request(origin, pdu) time_now = self._clock.time_msec() @@ -371,7 +373,7 @@ class FederationServer(FederationBase): @defer.inlineCallbacks def on_send_leave_request(self, origin, content): logger.debug("on_send_leave_request: content: %s", content) - pdu = self.event_from_pdu_json(content) + pdu = event_from_pdu_json(content) logger.debug("on_send_leave_request: pdu sigs: %s", pdu.signatures) yield self.handler.on_send_leave_request(origin, pdu) defer.returnValue((200, {})) @@ -408,7 +410,7 @@ class FederationServer(FederationBase): """ with (yield self._server_linearizer.queue((origin, room_id))): auth_chain = [ - self.event_from_pdu_json(e) + event_from_pdu_json(e) for e in content["auth_chain"] ] @@ -583,15 +585,6 @@ class FederationServer(FederationBase): def __str__(self): return "" % self.server_name - def event_from_pdu_json(self, pdu_json, outlier=False): - event = FrozenEvent( - pdu_json - ) - - event.internal_metadata.outlier = outlier - - return event - @defer.inlineCallbacks def exchange_third_party_invite( self, -- cgit 1.5.1 From bd91857028e0b7adf046a379a0eee030a92c1249 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Sat, 30 Dec 2017 18:40:19 +0000 Subject: Check missing fields in event_from_pdu_json Return a 400 rather than a 500 when somebody messes up their send_join --- synapse/federation/federation_base.py | 7 +++++++ 1 file changed, 7 insertions(+) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 6476cea895..7918d3e442 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -18,6 +18,7 @@ from synapse.api.errors import SynapseError from synapse.crypto.event_signing import check_event_content_hash from synapse.events import FrozenEvent from synapse.events.utils import prune_event +from synapse.http.servlet import assert_params_in_request from synapse.util import unwrapFirstError, logcontext from twisted.internet import defer @@ -181,7 +182,13 @@ def event_from_pdu_json(pdu_json, outlier=False): Returns: FrozenEvent + + Raises: + SynapseError: if the pdu is missing required fields """ + # we could probably enforce a bunch of other fields here (room_id, sender, + # origin, etc etc) + assert_params_in_request(pdu_json, ('event_id', 'type')) event = FrozenEvent( pdu_json ) -- cgit 1.5.1 From a027c2af8d348554cad4855094a6f46ef21bfad7 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 15 Jan 2018 18:20:30 +0000 Subject: Metrics for events processed in appservice and fed sender More metrics I wished I'd had --- synapse/federation/transaction_queue.py | 4 ++++ synapse/handlers/appservice.py | 7 +++++++ 2 files changed, 11 insertions(+) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 3e7809b04f..9d39f46583 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -42,6 +42,8 @@ sent_edus_counter = client_metrics.register_counter("sent_edus") sent_transactions_counter = client_metrics.register_counter("sent_transactions") +events_processed_counter = client_metrics.register_counter("events_processed") + class TransactionQueue(object): """This class makes sure we only have one transaction in flight at @@ -205,6 +207,8 @@ class TransactionQueue(object): self._send_pdu(event, destinations) + events_processed_counter.inc_by(len(events)) + yield self.store.update_federation_out_pos( "events", next_token ) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index feca3e4c10..3dd3fa2a27 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -15,6 +15,7 @@ from twisted.internet import defer +import synapse from synapse.api.constants import EventTypes from synapse.util.metrics import Measure from synapse.util.logcontext import make_deferred_yieldable, preserve_fn @@ -23,6 +24,10 @@ import logging logger = logging.getLogger(__name__) +metrics = synapse.metrics.get_metrics_for(__name__) + +events_processed_counter = metrics.register_counter("events_processed") + def log_failure(failure): logger.error( @@ -103,6 +108,8 @@ class ApplicationServicesHandler(object): service, event ) + events_processed_counter.inc_by(len(events)) + yield self.store.set_appservice_last_pos(upper_bound) finally: self.is_processing = False -- cgit 1.5.1 From ab9f844aaf3662a64dbc4c56077e9fa37bc7d5d0 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 22 Jan 2018 19:11:18 +0100 Subject: Add federation_domain_whitelist option (#2820) Add federation_domain_whitelist gives a way to restrict which domains your HS is allowed to federate with. useful mainly for gracefully preventing a private but internet-connected HS from trying to federate to the wider public Matrix network --- synapse/api/errors.py | 26 ++++++++++++++++++++++++++ synapse/config/server.py | 22 ++++++++++++++++++++++ synapse/federation/federation_client.py | 5 ++++- synapse/federation/transaction_queue.py | 4 +++- synapse/federation/transport/client.py | 3 +++ synapse/federation/transport/server.py | 9 ++++++++- synapse/handlers/device.py | 4 ++++ synapse/handlers/e2e_keys.py | 8 +++++++- synapse/handlers/federation.py | 4 ++++ synapse/http/matrixfederationclient.py | 28 +++++++++++++++++++++++++++- synapse/rest/key/v2/remote_key_resource.py | 8 ++++++++ synapse/rest/media/v1/media_repository.py | 19 +++++++++++++++++-- synapse/util/retryutils.py | 12 ++++++++++++ tests/utils.py | 1 + 14 files changed, 146 insertions(+), 7 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/api/errors.py b/synapse/api/errors.py index 46b0d7b34c..aa15f73f36 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -141,6 +141,32 @@ class RegistrationError(SynapseError): pass +class FederationDeniedError(SynapseError): + """An error raised when the server tries to federate with a server which + is not on its federation whitelist. + + Attributes: + destination (str): The destination which has been denied + """ + + def __init__(self, destination): + """Raised by federation client or server to indicate that we are + are deliberately not attempting to contact a given server because it is + not on our federation whitelist. + + Args: + destination (str): the domain in question + """ + + self.destination = destination + + super(FederationDeniedError, self).__init__( + code=403, + msg="Federation denied with %s." % (self.destination,), + errcode=Codes.FORBIDDEN, + ) + + class InteractiveAuthIncompleteError(Exception): """An error raised when UI auth is not yet complete diff --git a/synapse/config/server.py b/synapse/config/server.py index 436dd8a6fe..8f0b6d1f28 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -55,6 +55,17 @@ class ServerConfig(Config): "block_non_admin_invites", False, ) + # FIXME: federation_domain_whitelist needs sytests + self.federation_domain_whitelist = None + federation_domain_whitelist = config.get( + "federation_domain_whitelist", None + ) + # turn the whitelist into a hash for speed of lookup + if federation_domain_whitelist is not None: + self.federation_domain_whitelist = {} + for domain in federation_domain_whitelist: + self.federation_domain_whitelist[domain] = True + if self.public_baseurl is not None: if self.public_baseurl[-1] != '/': self.public_baseurl += '/' @@ -210,6 +221,17 @@ class ServerConfig(Config): # (except those sent by local server admins). The default is False. # block_non_admin_invites: True + # Restrict federation to the following whitelist of domains. + # N.B. we recommend also firewalling your federation listener to limit + # inbound federation traffic as early as possible, rather than relying + # purely on this application-layer restriction. If not specified, the + # default is to whitelist everything. + # + # federation_domain_whitelist: + # - lon.example.com + # - nyc.example.com + # - syd.example.com + # List of ports that Synapse should listen on, their purpose and their # configuration. listeners: diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index b1fe03f702..813907f7f2 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -23,7 +23,7 @@ from twisted.internet import defer from synapse.api.constants import Membership from synapse.api.errors import ( - CodeMessageException, HttpResponseException, SynapseError, + CodeMessageException, HttpResponseException, SynapseError, FederationDeniedError ) from synapse.events import builder from synapse.federation.federation_base import ( @@ -266,6 +266,9 @@ class FederationClient(FederationBase): except NotRetryingDestination as e: logger.info(e.message) continue + except FederationDeniedError as e: + logger.info(e.message) + continue except Exception as e: pdu_attempts[destination] = now diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 9d39f46583..a141ec9953 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -19,7 +19,7 @@ from twisted.internet import defer from .persistence import TransactionActions from .units import Transaction, Edu -from synapse.api.errors import HttpResponseException +from synapse.api.errors import HttpResponseException, FederationDeniedError from synapse.util import logcontext, PreserveLoggingContext from synapse.util.async import run_on_reactor from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter @@ -490,6 +490,8 @@ class TransactionQueue(object): (e.retry_last_ts + e.retry_interval) / 1000.0 ), ) + except FederationDeniedError as e: + logger.info(e) except Exception as e: logger.warn( "TX [%s] Failed to send transaction: %s", diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 1f3ce238f6..5488e82985 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -212,6 +212,9 @@ class TransportLayerClient(object): Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. + + Fails with ``FederationDeniedError`` if the remote destination + is not in our federation whitelist """ valid_memberships = {Membership.JOIN, Membership.LEAVE} if membership not in valid_memberships: diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 2b02b021ec..06c16ba4fa 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.api.urls import FEDERATION_PREFIX as PREFIX -from synapse.api.errors import Codes, SynapseError +from synapse.api.errors import Codes, SynapseError, FederationDeniedError from synapse.http.server import JsonResource from synapse.http.servlet import ( parse_json_object_from_request, parse_integer_from_args, parse_string_from_args, @@ -81,6 +81,7 @@ class Authenticator(object): self.keyring = hs.get_keyring() self.server_name = hs.hostname self.store = hs.get_datastore() + self.federation_domain_whitelist = hs.config.federation_domain_whitelist # A method just so we can pass 'self' as the authenticator to the Servlets @defer.inlineCallbacks @@ -92,6 +93,12 @@ class Authenticator(object): "signatures": {}, } + if ( + self.federation_domain_whitelist is not None and + self.server_name not in self.federation_domain_whitelist + ): + raise FederationDeniedError(self.server_name) + if content is not None: json_request["content"] = content diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 2152efc692..0e83453851 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -14,6 +14,7 @@ # limitations under the License. from synapse.api import errors from synapse.api.constants import EventTypes +from synapse.api.errors import FederationDeniedError from synapse.util import stringutils from synapse.util.async import Linearizer from synapse.util.caches.expiringcache import ExpiringCache @@ -513,6 +514,9 @@ class DeviceListEduUpdater(object): # This makes it more likely that the device lists will # eventually become consistent. return + except FederationDeniedError as e: + logger.info(e) + return except Exception: # TODO: Remember that we are now out of sync and try again # later diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 5af8abf66b..9aa95f89e6 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -19,7 +19,9 @@ import logging from canonicaljson import encode_canonical_json from twisted.internet import defer -from synapse.api.errors import SynapseError, CodeMessageException +from synapse.api.errors import ( + SynapseError, CodeMessageException, FederationDeniedError, +) from synapse.types import get_domain_from_id, UserID from synapse.util.logcontext import preserve_fn, make_deferred_yieldable from synapse.util.retryutils import NotRetryingDestination @@ -140,6 +142,10 @@ class E2eKeysHandler(object): failures[destination] = { "status": 503, "message": "Not ready for retry", } + except FederationDeniedError as e: + failures[destination] = { + "status": 403, "message": "Federation Denied", + } except Exception as e: # include ConnectionRefused and other errors failures[destination] = { diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ac70730885..677532c87b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -22,6 +22,7 @@ from ._base import BaseHandler from synapse.api.errors import ( AuthError, FederationError, StoreError, CodeMessageException, SynapseError, + FederationDeniedError, ) from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.events.validator import EventValidator @@ -782,6 +783,9 @@ class FederationHandler(BaseHandler): except NotRetryingDestination as e: logger.info(e.message) continue + except FederationDeniedError as e: + logger.info(e) + continue except Exception as e: logger.exception( "Failed to backfill from %s because %s", diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 833496b72d..9145405cb0 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -27,7 +27,7 @@ import synapse.metrics from canonicaljson import encode_canonical_json from synapse.api.errors import ( - SynapseError, Codes, HttpResponseException, + SynapseError, Codes, HttpResponseException, FederationDeniedError, ) from signedjson.sign import sign_json @@ -123,11 +123,22 @@ class MatrixFederationHttpClient(object): Fails with ``HTTPRequestException``: if we get an HTTP response code >= 300. + Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. + + Fails with ``FederationDeniedError`` if this destination + is not on our federation whitelist + (May also fail with plenty of other Exceptions for things like DNS failures, connection failures, SSL failures.) """ + if ( + self.hs.config.federation_domain_whitelist and + destination not in self.hs.config.federation_domain_whitelist + ): + raise FederationDeniedError(destination) + limiter = yield synapse.util.retryutils.get_retry_limiter( destination, self.clock, @@ -308,6 +319,9 @@ class MatrixFederationHttpClient(object): Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. + + Fails with ``FederationDeniedError`` if this destination + is not on our federation whitelist """ if not json_data_callback: @@ -368,6 +382,9 @@ class MatrixFederationHttpClient(object): Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. + + Fails with ``FederationDeniedError`` if this destination + is not on our federation whitelist """ def body_callback(method, url_bytes, headers_dict): @@ -422,6 +439,9 @@ class MatrixFederationHttpClient(object): Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. + + Fails with ``FederationDeniedError`` if this destination + is not on our federation whitelist """ logger.debug("get_json args: %s", args) @@ -475,6 +495,9 @@ class MatrixFederationHttpClient(object): Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. + + Fails with ``FederationDeniedError`` if this destination + is not on our federation whitelist """ response = yield self._request( @@ -518,6 +541,9 @@ class MatrixFederationHttpClient(object): Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. + + Fails with ``FederationDeniedError`` if this destination + is not on our federation whitelist """ encoded_args = {} diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py index cc2842aa72..17e6079cba 100644 --- a/synapse/rest/key/v2/remote_key_resource.py +++ b/synapse/rest/key/v2/remote_key_resource.py @@ -93,6 +93,7 @@ class RemoteKey(Resource): self.store = hs.get_datastore() self.version_string = hs.version_string self.clock = hs.get_clock() + self.federation_domain_whitelist = hs.config.federation_domain_whitelist def render_GET(self, request): self.async_render_GET(request) @@ -137,6 +138,13 @@ class RemoteKey(Resource): logger.info("Handling query for keys %r", query) store_queries = [] for server_name, key_ids in query.items(): + if ( + self.federation_domain_whitelist is not None and + server_name not in self.federation_domain_whitelist + ): + logger.debug("Federation denied with %s", server_name) + continue + if not key_ids: key_ids = (None,) for key_id in key_ids: diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 4f56bcf577..485db8577a 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -32,8 +32,9 @@ from .media_storage import MediaStorage from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.util.stringutils import random_string -from synapse.api.errors import SynapseError, HttpResponseException, \ - NotFoundError +from synapse.api.errors import ( + SynapseError, HttpResponseException, NotFoundError, FederationDeniedError, +) from synapse.util.async import Linearizer from synapse.util.stringutils import is_ascii @@ -75,6 +76,8 @@ class MediaRepository(object): self.recently_accessed_remotes = set() self.recently_accessed_locals = set() + self.federation_domain_whitelist = hs.config.federation_domain_whitelist + # List of StorageProviders where we should search for media and # potentially upload to. storage_providers = [] @@ -216,6 +219,12 @@ class MediaRepository(object): Deferred: Resolves once a response has successfully been written to request """ + if ( + self.federation_domain_whitelist is not None and + server_name not in self.federation_domain_whitelist + ): + raise FederationDeniedError(server_name) + self.mark_recently_accessed(server_name, media_id) # We linearize here to ensure that we don't try and download remote @@ -250,6 +259,12 @@ class MediaRepository(object): Returns: Deferred[dict]: The media_info of the file """ + if ( + self.federation_domain_whitelist is not None and + server_name not in self.federation_domain_whitelist + ): + raise FederationDeniedError(server_name) + # We linearize here to ensure that we don't try and download remote # media multiple times concurrently key = (server_name, media_id) diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 1adedbb361..47b0bb5eb3 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -26,6 +26,18 @@ logger = logging.getLogger(__name__) class NotRetryingDestination(Exception): def __init__(self, retry_last_ts, retry_interval, destination): + """Raised by the limiter (and federation client) to indicate that we are + are deliberately not attempting to contact a given server. + + Args: + retry_last_ts (int): the unix ts in milliseconds of our last attempt + to contact the server. 0 indicates that the last attempt was + successful or that we've never actually attempted to connect. + retry_interval (int): the time in milliseconds to wait until the next + attempt. + destination (str): the domain in question + """ + msg = "Not retrying server %s." % (destination,) super(NotRetryingDestination, self).__init__(msg) diff --git a/tests/utils.py b/tests/utils.py index 44e5f75093..3116047892 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -57,6 +57,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): config.worker_app = None config.email_enable_notifs = False config.block_non_admin_invites = False + config.federation_domain_whitelist = None # disable user directory updates, because they get done in the # background, which upsets the test runner. -- cgit 1.5.1 From c3f79c9da56931453ab86a4c726da5a02f18fe1e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Mar 2018 16:17:08 +0000 Subject: Split out edu/query registration to a separate class --- synapse/federation/federation_server.py | 117 +++++++++++++++++++------------- synapse/handlers/device.py | 6 +- synapse/handlers/devicemessage.py | 2 +- synapse/handlers/directory.py | 2 +- synapse/handlers/e2e_keys.py | 2 +- synapse/handlers/presence.py | 10 +-- synapse/handlers/profile.py | 2 +- synapse/handlers/receipts.py | 2 +- synapse/handlers/typing.py | 2 +- synapse/server.py | 5 ++ 10 files changed, 90 insertions(+), 60 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 9849953c9b..5b1914f2f4 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -17,7 +17,7 @@ import logging import simplejson as json from twisted.internet import defer -from synapse.api.errors import AuthError, FederationError, SynapseError +from synapse.api.errors import AuthError, FederationError, SynapseError, NotFoundError from synapse.crypto.event_signing import compute_event_signature from synapse.federation.federation_base import ( FederationBase, @@ -56,6 +56,8 @@ class FederationServer(FederationBase): self._server_linearizer = async.Linearizer("fed_server") self._transaction_linearizer = async.Linearizer("fed_txn_handler") + self.registry = hs.get_federation_registry() + # We cache responses to state queries, as they take a while and often # come in waves. self._state_resp_cache = ResponseCache(hs, timeout_ms=30000) @@ -67,35 +69,6 @@ class FederationServer(FederationBase): """ self.handler = handler - def register_edu_handler(self, edu_type, handler): - if edu_type in self.edu_handlers: - raise KeyError("Already have an EDU handler for %s" % (edu_type,)) - - self.edu_handlers[edu_type] = handler - - def register_query_handler(self, query_type, handler): - """Sets the handler callable that will be used to handle an incoming - federation Query of the given type. - - Args: - query_type (str): Category name of the query, which should match - the string used by make_query. - handler (callable): Invoked to handle incoming queries of this type - - handler is invoked as: - result = handler(args) - - where 'args' is a dict mapping strings to strings of the query - arguments. It should return a Deferred that will eventually yield an - object to encode as JSON. - """ - if query_type in self.query_handlers: - raise KeyError( - "Already have a Query handler for %s" % (query_type,) - ) - - self.query_handlers[query_type] = handler - @defer.inlineCallbacks @log_function def on_backfill_request(self, origin, room_id, versions, limit): @@ -229,16 +202,7 @@ class FederationServer(FederationBase): @defer.inlineCallbacks def received_edu(self, origin, edu_type, content): received_edus_counter.inc() - - if edu_type in self.edu_handlers: - try: - yield self.edu_handlers[edu_type](origin, content) - except SynapseError as e: - logger.info("Failed to handle edu %r: %r", edu_type, e) - except Exception as e: - logger.exception("Failed to handle edu %r", edu_type) - else: - logger.warn("Received EDU of type %s with no handler", edu_type) + yield self.registry.on_edu(edu_type, origin, content) @defer.inlineCallbacks @log_function @@ -328,14 +292,8 @@ class FederationServer(FederationBase): @defer.inlineCallbacks def on_query_request(self, query_type, args): received_queries_counter.inc(query_type) - - if query_type in self.query_handlers: - response = yield self.query_handlers[query_type](args) - defer.returnValue((200, response)) - else: - defer.returnValue( - (404, "No handler for Query type '%s'" % (query_type,)) - ) + resp = yield self.registry.on_query(query_type, args) + defer.returnValue((200, resp)) @defer.inlineCallbacks def on_make_join_request(self, room_id, user_id): @@ -607,3 +565,66 @@ class FederationServer(FederationBase): origin, room_id, event_dict ) defer.returnValue(ret) + + +class FederationHandlerRegistry(object): + """Allows classes to register themselves as handlers for a given EDU or + query type for incoming federation traffic. + """ + def __init__(self): + self.edu_handlers = {} + self.query_handlers = {} + + def register_edu_handler(self, edu_type, handler): + """Sets the handler callable that will be used to handle an incoming + federation EDU of the given type. + + Args: + edu_type (str): The type of the incoming EDU to register handler for + handler (Callable[str, dict]): A callable invoked on incoming EDU + of the given type. The arguments are the origin server name and + the EDU contents. + """ + if edu_type in self.edu_handlers: + raise KeyError("Already have an EDU handler for %s" % (edu_type,)) + + self.edu_handlers[edu_type] = handler + + def register_query_handler(self, query_type, handler): + """Sets the handler callable that will be used to handle an incoming + federation query of the given type. + + Args: + query_type (str): Category name of the query, which should match + the string used by make_query. + handler (Callable[dict] -> Deferred[dict]): Invoked to handle + incoming queries of this type. The return will be yielded + on and the result used as the response to the query request. + """ + if query_type in self.query_handlers: + raise KeyError( + "Already have a Query handler for %s" % (query_type,) + ) + + self.query_handlers[query_type] = handler + + @defer.inlineCallbacks + def on_edu(self, edu_type, origin, content): + handler = self.edu_handlers.get(edu_type) + if not handler: + logger.warn("No handler registered for EDU type %s", edu_type) + + try: + yield handler(origin, content) + except SynapseError as e: + logger.info("Failed to handle edu %r: %r", edu_type, e) + except Exception as e: + logger.exception("Failed to handle edu %r", edu_type) + + def on_query(self, query_type, args): + handler = self.query_handlers.get(query_type) + if not handler: + logger.warn("No handler registered for query type %s", query_type) + raise NotFoundError("No handler for Query type '%s'" % (query_type,)) + + return handler(args) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 0e83453851..9e58dbe64e 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -41,10 +41,12 @@ class DeviceHandler(BaseHandler): self._edu_updater = DeviceListEduUpdater(hs, self) - self.federation.register_edu_handler( + federation_registry = hs.get_federation_registry() + + federation_registry.register_edu_handler( "m.device_list_update", self._edu_updater.incoming_device_list_update, ) - self.federation.register_query_handler( + federation_registry.register_query_handler( "user_devices", self.on_federation_query_user_devices, ) diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index d996aa90bb..f147a20b73 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -37,7 +37,7 @@ class DeviceMessageHandler(object): self.is_mine = hs.is_mine self.federation = hs.get_federation_sender() - hs.get_replication_layer().register_edu_handler( + hs.get_federation_registry().register_edu_handler( "m.direct_to_device", self.on_direct_to_device_edu ) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 8580ada60a..e955cb1f3c 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -37,7 +37,7 @@ class DirectoryHandler(BaseHandler): self.event_creation_handler = hs.get_event_creation_handler() self.federation = hs.get_replication_layer() - self.federation.register_query_handler( + hs.get_federation_registry().register_query_handler( "directory", self.on_directory_query ) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 9aa95f89e6..57f50a4e27 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -40,7 +40,7 @@ class E2eKeysHandler(object): # doesn't really work as part of the generic query API, because the # query request requires an object POST, but we abuse the # "query handler" interface. - self.federation.register_query_handler( + hs.get_federation_registry().register_query_handler( "client_keys", self.on_federation_query_client_keys ) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index cb158ba962..b11ae78350 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -98,24 +98,26 @@ class PresenceHandler(object): self.state = hs.get_state_handler() - self.replication.register_edu_handler( + federation_registry = hs.get_federation_registry() + + federation_registry.register_edu_handler( "m.presence", self.incoming_presence ) - self.replication.register_edu_handler( + federation_registry.register_edu_handler( "m.presence_invite", lambda origin, content: self.invite_presence( observed_user=UserID.from_string(content["observed_user"]), observer_user=UserID.from_string(content["observer_user"]), ) ) - self.replication.register_edu_handler( + federation_registry.register_edu_handler( "m.presence_accept", lambda origin, content: self.accept_presence( observed_user=UserID.from_string(content["observed_user"]), observer_user=UserID.from_string(content["observer_user"]), ) ) - self.replication.register_edu_handler( + federation_registry.register_edu_handler( "m.presence_deny", lambda origin, content: self.deny_presence( observed_user=UserID.from_string(content["observed_user"]), diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index c9c2879038..c386c79bbd 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -32,7 +32,7 @@ class ProfileHandler(BaseHandler): super(ProfileHandler, self).__init__(hs) self.federation = hs.get_replication_layer() - self.federation.register_query_handler( + hs.get_federation_registry().register_query_handler( "profile", self.on_profile_query ) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 0525765272..3f215c2b4e 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -35,7 +35,7 @@ class ReceiptsHandler(BaseHandler): self.store = hs.get_datastore() self.hs = hs self.federation = hs.get_federation_sender() - hs.get_replication_layer().register_edu_handler( + hs.get_federation_registry().register_edu_handler( "m.receipt", self._received_remote_receipt ) self.clock = self.hs.get_clock() diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 82dedbbc99..77c0cf146f 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -56,7 +56,7 @@ class TypingHandler(object): self.federation = hs.get_federation_sender() - hs.get_replication_layer().register_edu_handler("m.typing", self._recv_edu) + hs.get_federation_registry().register_edu_handler("m.typing", self._recv_edu) hs.get_distributor().observe("user_left_room", self.user_left_room) diff --git a/synapse/server.py b/synapse/server.py index 5b6effbe31..1bc8d6f702 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -34,6 +34,7 @@ from synapse.events.builder import EventBuilderFactory from synapse.events.spamcheck import SpamChecker from synapse.federation import initialize_http_replication from synapse.federation.send_queue import FederationRemoteSendQueue +from synapse.federation.federation_server import FederationHandlerRegistry from synapse.federation.transport.client import TransportLayerClient from synapse.federation.transaction_queue import TransactionQueue from synapse.handlers import Handlers @@ -147,6 +148,7 @@ class HomeServer(object): 'groups_attestation_renewer', 'spam_checker', 'room_member_handler', + 'federation_registry', ] def __init__(self, hostname, **kwargs): @@ -387,6 +389,9 @@ class HomeServer(object): def build_room_member_handler(self): return RoomMemberHandler(self) + def build_federation_registry(self): + return FederationHandlerRegistry() + def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) -- cgit 1.5.1 From e05bf34117de19705b36a4803085ea93f7381928 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Mar 2018 14:07:39 +0000 Subject: Move property setting from ReplicationLayer to FederationBase --- synapse/federation/federation_base.py | 6 ++++++ synapse/federation/federation_client.py | 1 + synapse/federation/federation_server.py | 6 ++++++ synapse/federation/replication.py | 22 ---------------------- 4 files changed, 13 insertions(+), 22 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 7918d3e442..79eaa31031 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -27,7 +27,13 @@ logger = logging.getLogger(__name__) class FederationBase(object): def __init__(self, hs): + self.hs = hs + + self.server_name = hs.hostname + self.keyring = hs.get_keyring() self.spam_checker = hs.get_spam_checker() + self.store = hs.get_datastore() + self._clock = hs.get_clock() @defer.inlineCallbacks def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False, diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 813907f7f2..38440da5b5 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -58,6 +58,7 @@ class FederationClient(FederationBase): self._clear_tried_cache, 60 * 1000, ) self.state = hs.get_state_handler() + self.transport_layer = hs.get_federation_transport_client() def _clear_tried_cache(self): """Clear pdu_destination_tried cache""" diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 5b1914f2f4..dd73fc50b2 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -23,6 +23,8 @@ from synapse.federation.federation_base import ( FederationBase, event_from_pdu_json, ) + +from synapse.federation.persistence import TransactionActions from synapse.federation.units import Edu, Transaction import synapse.metrics from synapse.types import get_domain_from_id @@ -56,6 +58,10 @@ class FederationServer(FederationBase): self._server_linearizer = async.Linearizer("fed_server") self._transaction_linearizer = async.Linearizer("fed_txn_handler") + self.transaction_actions = TransactionActions(self.store) + + self.handler = None + self.registry = hs.get_federation_registry() # We cache responses to state queries, as they take a while and often diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 62d865ec4b..b8b3a3f933 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -20,8 +20,6 @@ a given transport. from .federation_client import FederationClient from .federation_server import FederationServer -from .persistence import TransactionActions - import logging @@ -47,26 +45,6 @@ class ReplicationLayer(FederationClient, FederationServer): """ def __init__(self, hs, transport_layer): - self.server_name = hs.hostname - - self.keyring = hs.get_keyring() - - self.transport_layer = transport_layer - - self.federation_client = self - - self.store = hs.get_datastore() - - self.handler = None - self.edu_handlers = {} - self.query_handlers = {} - - self._clock = hs.get_clock() - - self.transaction_actions = TransactionActions(self.store) - - self.hs = hs - super(ReplicationLayer, self).__init__(hs) def __str__(self): -- cgit 1.5.1 From 265b993b8afd2501b2aa3a50670f39d6d97eddb7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Mar 2018 14:34:31 +0000 Subject: Split replication layer into two --- synapse/app/homeserver.py | 2 +- synapse/federation/federation_server.py | 10 +--------- synapse/federation/transport/server.py | 2 +- synapse/handlers/device.py | 3 +-- synapse/handlers/directory.py | 2 +- synapse/handlers/e2e_keys.py | 2 +- synapse/handlers/federation.py | 4 +--- synapse/handlers/presence.py | 1 - synapse/handlers/profile.py | 2 +- synapse/handlers/room_list.py | 2 +- synapse/handlers/room_member.py | 3 +-- synapse/server.py | 13 +++++++++---- 12 files changed, 19 insertions(+), 27 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index e375f2bbcf..503f461ab4 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -348,7 +348,7 @@ def setup(config_options): hs.get_state_handler().start_caching() hs.get_datastore().start_profiling() hs.get_datastore().start_doing_background_updates() - hs.get_replication_layer().start_get_pdu_cache() + hs.get_replication_client().start_get_pdu_cache() register_memory_metrics(hs) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index dd73fc50b2..740ef96280 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -54,27 +54,19 @@ class FederationServer(FederationBase): super(FederationServer, self).__init__(hs) self.auth = hs.get_auth() + self.handler = hs.get_handlers().federation_handler self._server_linearizer = async.Linearizer("fed_server") self._transaction_linearizer = async.Linearizer("fed_txn_handler") self.transaction_actions = TransactionActions(self.store) - self.handler = None - self.registry = hs.get_federation_registry() # We cache responses to state queries, as they take a while and often # come in waves. self._state_resp_cache = ResponseCache(hs, timeout_ms=30000) - def set_handler(self, handler): - """Sets the handler that the replication layer will use to communicate - receipt of new PDUs from other home servers. The required methods are - documented on :py:class:`.ReplicationHandler`. - """ - self.handler = handler - @defer.inlineCallbacks @log_function def on_backfill_request(self, origin, room_id, versions, limit): diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 06c16ba4fa..04b83e691a 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -1190,7 +1190,7 @@ GROUP_ATTESTATION_SERVLET_CLASSES = ( def register_servlets(hs, resource, authenticator, ratelimiter): for servletclass in FEDERATION_SERVLET_CLASSES: servletclass( - handler=hs.get_replication_layer(), + handler=hs.get_replication_server(), authenticator=authenticator, ratelimiter=ratelimiter, server_name=hs.hostname, diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 9e58dbe64e..fcf41630d6 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -37,7 +37,6 @@ class DeviceHandler(BaseHandler): self.state = hs.get_state_handler() self._auth_handler = hs.get_auth_handler() self.federation_sender = hs.get_federation_sender() - self.federation = hs.get_replication_layer() self._edu_updater = DeviceListEduUpdater(hs, self) @@ -432,7 +431,7 @@ class DeviceListEduUpdater(object): def __init__(self, hs, device_handler): self.store = hs.get_datastore() - self.federation = hs.get_replication_layer() + self.federation = hs.get_replication_client() self.clock = hs.get_clock() self.device_handler = device_handler diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index e955cb1f3c..dfe04eb1c1 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -36,7 +36,7 @@ class DirectoryHandler(BaseHandler): self.appservice_handler = hs.get_application_service_handler() self.event_creation_handler = hs.get_event_creation_handler() - self.federation = hs.get_replication_layer() + self.federation = hs.get_replication_client() hs.get_federation_registry().register_query_handler( "directory", self.on_directory_query ) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 57f50a4e27..0ca8d036ee 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -32,7 +32,7 @@ logger = logging.getLogger(__name__) class E2eKeysHandler(object): def __init__(self, hs): self.store = hs.get_datastore() - self.federation = hs.get_replication_layer() + self.federation = hs.get_replication_client() self.device_handler = hs.get_device_handler() self.is_mine = hs.is_mine self.clock = hs.get_clock() diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 520612683e..cfd4379160 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -68,7 +68,7 @@ class FederationHandler(BaseHandler): self.hs = hs self.store = hs.get_datastore() - self.replication_layer = hs.get_replication_layer() + self.replication_layer = hs.get_replication_client() self.state_handler = hs.get_state_handler() self.server_name = hs.hostname self.keyring = hs.get_keyring() @@ -78,8 +78,6 @@ class FederationHandler(BaseHandler): self.spam_checker = hs.get_spam_checker() self.event_creation_handler = hs.get_event_creation_handler() - self.replication_layer.set_handler(self) - # When joining a room we need to queue any events for that room up self.room_queues = {} self._room_pdu_linearizer = Linearizer("fed_room_pdu") diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index b11ae78350..a5e501897c 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -93,7 +93,6 @@ class PresenceHandler(object): self.store = hs.get_datastore() self.wheel_timer = WheelTimer() self.notifier = hs.get_notifier() - self.replication = hs.get_replication_layer() self.federation = hs.get_federation_sender() self.state = hs.get_state_handler() diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index c386c79bbd..0cfac60d74 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -31,7 +31,7 @@ class ProfileHandler(BaseHandler): def __init__(self, hs): super(ProfileHandler, self).__init__(hs) - self.federation = hs.get_replication_layer() + self.federation = hs.get_replication_client() hs.get_federation_registry().register_query_handler( "profile", self.on_profile_query ) diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index dfa09141ed..f79bd8902f 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -409,7 +409,7 @@ class RoomListHandler(BaseHandler): def _get_remote_list_cached(self, server_name, limit=None, since_token=None, search_filter=None, include_all_networks=False, third_party_instance_id=None,): - repl_layer = self.hs.get_replication_layer() + repl_layer = self.hs.get_replication_client() if search_filter: # We can't cache when asking for search return repl_layer.get_public_rooms( diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index ed3b97730d..e2f0527712 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -55,7 +55,6 @@ class RoomMemberHandler(object): self.registration_handler = hs.get_handlers().registration_handler self.profile_handler = hs.get_profile_handler() self.event_creation_hander = hs.get_event_creation_handler() - self.replication_layer = hs.get_replication_layer() self.member_linearizer = Linearizer(name="member") @@ -212,7 +211,7 @@ class RoomMemberHandler(object): # if this is a join with a 3pid signature, we may need to turn a 3pid # invite into a normal invite before we can handle the join. if third_party_signed is not None: - yield self.replication_layer.exchange_third_party_invite( + yield self.federation_handler.exchange_third_party_invite( third_party_signed["sender"], target.to_string(), room_id, diff --git a/synapse/server.py b/synapse/server.py index 1bc8d6f702..894e9c2acf 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -32,7 +32,8 @@ from synapse.appservice.scheduler import ApplicationServiceScheduler from synapse.crypto.keyring import Keyring from synapse.events.builder import EventBuilderFactory from synapse.events.spamcheck import SpamChecker -from synapse.federation import initialize_http_replication +from synapse.federation.federation_client import FederationClient +from synapse.federation.federation_server import FederationServer from synapse.federation.send_queue import FederationRemoteSendQueue from synapse.federation.federation_server import FederationHandlerRegistry from synapse.federation.transport.client import TransportLayerClient @@ -100,7 +101,8 @@ class HomeServer(object): DEPENDENCIES = [ 'http_client', 'db_pool', - 'replication_layer', + 'replication_client', + 'replication_server', 'handlers', 'v1auth', 'auth', @@ -197,8 +199,11 @@ class HomeServer(object): def get_ratelimiter(self): return self.ratelimiter - def build_replication_layer(self): - return initialize_http_replication(self) + def build_replication_client(self): + return FederationClient(self) + + def build_replication_server(self): + return FederationServer(self) def build_handlers(self): return Handlers(self) -- cgit 1.5.1 From ea7b3c4b1b829703a780418e6bb6b7860a5b5451 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Mar 2018 11:00:04 +0000 Subject: Remove unused ReplicationLayer --- synapse/federation/__init__.py | 8 ------ synapse/federation/replication.py | 51 --------------------------------------- 2 files changed, 59 deletions(-) delete mode 100644 synapse/federation/replication.py (limited to 'synapse/federation') diff --git a/synapse/federation/__init__.py b/synapse/federation/__init__.py index 2e32d245ba..f5f0bdfca3 100644 --- a/synapse/federation/__init__.py +++ b/synapse/federation/__init__.py @@ -15,11 +15,3 @@ """ This package includes all the federation specific logic. """ - -from .replication import ReplicationLayer - - -def initialize_http_replication(hs): - transport = hs.get_federation_transport_client() - - return ReplicationLayer(hs, transport) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py deleted file mode 100644 index b8b3a3f933..0000000000 --- a/synapse/federation/replication.py +++ /dev/null @@ -1,51 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014-2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""This layer is responsible for replicating with remote home servers using -a given transport. -""" - -from .federation_client import FederationClient -from .federation_server import FederationServer - -import logging - - -logger = logging.getLogger(__name__) - - -class ReplicationLayer(FederationClient, FederationServer): - """This layer is responsible for replicating with remote home servers over - the given transport. I.e., does the sending and receiving of PDUs to - remote home servers. - - The layer communicates with the rest of the server via a registered - ReplicationHandler. - - In more detail, the layer: - * Receives incoming data and processes it into transactions and pdus. - * Fetches any PDUs it thinks it might have missed. - * Keeps the current state for contexts up to date by applying the - suitable conflict resolution. - * Sends outgoing pdus wrapped in transactions. - * Fills out the references to previous pdus/transactions appropriately - for outgoing data. - """ - - def __init__(self, hs, transport_layer): - super(ReplicationLayer, self).__init__(hs) - - def __str__(self): - return "" % self.server_name -- cgit 1.5.1 From f43b6d6d9b7e6f6a94ba3e1886cec6ca864fad43 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Mar 2018 11:29:17 +0000 Subject: Fix docstring types --- synapse/federation/federation_server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 5b1914f2f4..90302c9537 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -581,7 +581,7 @@ class FederationHandlerRegistry(object): Args: edu_type (str): The type of the incoming EDU to register handler for - handler (Callable[str, dict]): A callable invoked on incoming EDU + handler (Callable[[str, dict]]): A callable invoked on incoming EDU of the given type. The arguments are the origin server name and the EDU contents. """ @@ -597,7 +597,7 @@ class FederationHandlerRegistry(object): Args: query_type (str): Category name of the query, which should match the string used by make_query. - handler (Callable[dict] -> Deferred[dict]): Invoked to handle + handler (Callable[[dict], Deferred[dict]]): Invoked to handle incoming queries of this type. The return will be yielded on and the result used as the response to the query request. """ -- cgit 1.5.1 From cea462e2857d3af2007521f131adb46e4f5ce6fe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Mar 2018 13:22:21 +0000 Subject: s/replication_server/federation_server --- synapse/federation/transport/server.py | 2 +- synapse/server.py | 4 ++-- tests/handlers/test_profile.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 04b83e691a..a66a6b0692 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -1190,7 +1190,7 @@ GROUP_ATTESTATION_SERVLET_CLASSES = ( def register_servlets(hs, resource, authenticator, ratelimiter): for servletclass in FEDERATION_SERVLET_CLASSES: servletclass( - handler=hs.get_replication_server(), + handler=hs.get_federation_server(), authenticator=authenticator, ratelimiter=ratelimiter, server_name=hs.hostname, diff --git a/synapse/server.py b/synapse/server.py index 894e9c2acf..802a793848 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -102,7 +102,7 @@ class HomeServer(object): 'http_client', 'db_pool', 'replication_client', - 'replication_server', + 'federation_server', 'handlers', 'v1auth', 'auth', @@ -202,7 +202,7 @@ class HomeServer(object): def build_replication_client(self): return FederationClient(self) - def build_replication_server(self): + def build_federation_server(self): return FederationServer(self) def build_handlers(self): diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index c690437682..f9f828471a 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -52,7 +52,7 @@ class ProfileTestCase(unittest.TestCase): handlers=None, resource_for_federation=Mock(), replication_client=self.mock_federation, - replication_server=Mock(), + federation_server=Mock(), federation_registry=self.mock_registry, ratelimiter=NonCallableMock(spec_set=[ "send_message", -- cgit 1.5.1