summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_base.py27
-rw-r--r--synapse/federation/federation_client.py60
-rw-r--r--synapse/federation/federation_server.py46
-rw-r--r--synapse/federation/transaction_queue.py18
-rw-r--r--synapse/federation/transport/client.py3
-rw-r--r--synapse/federation/transport/server.py9
6 files changed, 98 insertions, 65 deletions
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index a0f5d40eb3..7918d3e442 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -16,7 +16,9 @@ import logging
 
 from synapse.api.errors import SynapseError
 from synapse.crypto.event_signing import check_event_content_hash
+from synapse.events import FrozenEvent
 from synapse.events.utils import prune_event
+from synapse.http.servlet import assert_params_in_request
 from synapse.util import unwrapFirstError, logcontext
 from twisted.internet import defer
 
@@ -169,3 +171,28 @@ class FederationBase(object):
             )
 
         return deferreds
+
+
+def event_from_pdu_json(pdu_json, outlier=False):
+    """Construct a FrozenEvent from an event json received over federation
+
+    Args:
+        pdu_json (object): pdu as received over federation
+        outlier (bool): True to mark this event as an outlier
+
+    Returns:
+        FrozenEvent
+
+    Raises:
+        SynapseError: if the pdu is missing required fields
+    """
+    # we could probably enforce a bunch of other fields here (room_id, sender,
+    # origin, etc etc)
+    assert_params_in_request(pdu_json, ('event_id', 'type'))
+    event = FrozenEvent(
+        pdu_json
+    )
+
+    event.internal_metadata.outlier = outlier
+
+    return event
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index b8f02f5391..813907f7f2 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -14,29 +14,29 @@
 # limitations under the License.
 
 
+import copy
+import itertools
+import logging
+import random
+
 from twisted.internet import defer
 
-from .federation_base import FederationBase
 from synapse.api.constants import Membership
-
 from synapse.api.errors import (
-    CodeMessageException, HttpResponseException, SynapseError,
+    CodeMessageException, HttpResponseException, SynapseError, FederationDeniedError
 )
-from synapse.util import unwrapFirstError, logcontext
+from synapse.events import builder
+from synapse.federation.federation_base import (
+    FederationBase,
+    event_from_pdu_json,
+)
+import synapse.metrics
+from synapse.util import logcontext, unwrapFirstError
 from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.logutils import log_function
 from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
-from synapse.events import FrozenEvent, builder
-import synapse.metrics
-
+from synapse.util.logutils import log_function
 from synapse.util.retryutils import NotRetryingDestination
 
-import copy
-import itertools
-import logging
-import random
-
-
 logger = logging.getLogger(__name__)
 
 
@@ -184,7 +184,7 @@ class FederationClient(FederationBase):
         logger.debug("backfill transaction_data=%s", repr(transaction_data))
 
         pdus = [
-            self.event_from_pdu_json(p, outlier=False)
+            event_from_pdu_json(p, outlier=False)
             for p in transaction_data["pdus"]
         ]
 
@@ -244,7 +244,7 @@ class FederationClient(FederationBase):
                 logger.debug("transaction_data %r", transaction_data)
 
                 pdu_list = [
-                    self.event_from_pdu_json(p, outlier=outlier)
+                    event_from_pdu_json(p, outlier=outlier)
                     for p in transaction_data["pdus"]
                 ]
 
@@ -266,6 +266,9 @@ class FederationClient(FederationBase):
             except NotRetryingDestination as e:
                 logger.info(e.message)
                 continue
+            except FederationDeniedError as e:
+                logger.info(e.message)
+                continue
             except Exception as e:
                 pdu_attempts[destination] = now
 
@@ -336,11 +339,11 @@ class FederationClient(FederationBase):
         )
 
         pdus = [
-            self.event_from_pdu_json(p, outlier=True) for p in result["pdus"]
+            event_from_pdu_json(p, outlier=True) for p in result["pdus"]
         ]
 
         auth_chain = [
-            self.event_from_pdu_json(p, outlier=True)
+            event_from_pdu_json(p, outlier=True)
             for p in result.get("auth_chain", [])
         ]
 
@@ -441,7 +444,7 @@ class FederationClient(FederationBase):
         )
 
         auth_chain = [
-            self.event_from_pdu_json(p, outlier=True)
+            event_from_pdu_json(p, outlier=True)
             for p in res["auth_chain"]
         ]
 
@@ -570,12 +573,12 @@ class FederationClient(FederationBase):
                 logger.debug("Got content: %s", content)
 
                 state = [
-                    self.event_from_pdu_json(p, outlier=True)
+                    event_from_pdu_json(p, outlier=True)
                     for p in content.get("state", [])
                 ]
 
                 auth_chain = [
-                    self.event_from_pdu_json(p, outlier=True)
+                    event_from_pdu_json(p, outlier=True)
                     for p in content.get("auth_chain", [])
                 ]
 
@@ -650,7 +653,7 @@ class FederationClient(FederationBase):
 
         logger.debug("Got response to send_invite: %s", pdu_dict)
 
-        pdu = self.event_from_pdu_json(pdu_dict)
+        pdu = event_from_pdu_json(pdu_dict)
 
         # Check signatures are correct.
         pdu = yield self._check_sigs_and_hash(pdu)
@@ -740,7 +743,7 @@ class FederationClient(FederationBase):
         )
 
         auth_chain = [
-            self.event_from_pdu_json(e)
+            event_from_pdu_json(e)
             for e in content["auth_chain"]
         ]
 
@@ -788,7 +791,7 @@ class FederationClient(FederationBase):
             )
 
             events = [
-                self.event_from_pdu_json(e)
+                event_from_pdu_json(e)
                 for e in content.get("events", [])
             ]
 
@@ -805,15 +808,6 @@ class FederationClient(FederationBase):
 
         defer.returnValue(signed_events)
 
-    def event_from_pdu_json(self, pdu_json, outlier=False):
-        event = FrozenEvent(
-            pdu_json
-        )
-
-        event.internal_metadata.outlier = outlier
-
-        return event
-
     @defer.inlineCallbacks
     def forward_third_party_invite(self, destinations, room_id, event_dict):
         for destination in destinations:
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index a2327f24b6..9849953c9b 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -12,25 +12,24 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from twisted.internet import defer
+import logging
 
-from .federation_base import FederationBase
-from .units import Transaction, Edu
+import simplejson as json
+from twisted.internet import defer
 
+from synapse.api.errors import AuthError, FederationError, SynapseError
+from synapse.crypto.event_signing import compute_event_signature
+from synapse.federation.federation_base import (
+    FederationBase,
+    event_from_pdu_json,
+)
+from synapse.federation.units import Edu, Transaction
+import synapse.metrics
+from synapse.types import get_domain_from_id
 from synapse.util import async
+from synapse.util.caches.response_cache import ResponseCache
 from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 from synapse.util.logutils import log_function
-from synapse.util.caches.response_cache import ResponseCache
-from synapse.events import FrozenEvent
-from synapse.types import get_domain_from_id
-import synapse.metrics
-
-from synapse.api.errors import AuthError, FederationError, SynapseError
-
-from synapse.crypto.event_signing import compute_event_signature
-
-import simplejson as json
-import logging
 
 # when processing incoming transactions, we try to handle multiple rooms in
 # parallel, up to this limit.
@@ -172,7 +171,7 @@ class FederationServer(FederationBase):
                 p["age_ts"] = request_time - int(p["age"])
                 del p["age"]
 
-            event = self.event_from_pdu_json(p)
+            event = event_from_pdu_json(p)
             room_id = event.room_id
             pdus_by_room.setdefault(room_id, []).append(event)
 
@@ -346,7 +345,7 @@ class FederationServer(FederationBase):
 
     @defer.inlineCallbacks
     def on_invite_request(self, origin, content):
-        pdu = self.event_from_pdu_json(content)
+        pdu = event_from_pdu_json(content)
         ret_pdu = yield self.handler.on_invite_request(origin, pdu)
         time_now = self._clock.time_msec()
         defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)}))
@@ -354,7 +353,7 @@ class FederationServer(FederationBase):
     @defer.inlineCallbacks
     def on_send_join_request(self, origin, content):
         logger.debug("on_send_join_request: content: %s", content)
-        pdu = self.event_from_pdu_json(content)
+        pdu = event_from_pdu_json(content)
         logger.debug("on_send_join_request: pdu sigs: %s", pdu.signatures)
         res_pdus = yield self.handler.on_send_join_request(origin, pdu)
         time_now = self._clock.time_msec()
@@ -374,7 +373,7 @@ class FederationServer(FederationBase):
     @defer.inlineCallbacks
     def on_send_leave_request(self, origin, content):
         logger.debug("on_send_leave_request: content: %s", content)
-        pdu = self.event_from_pdu_json(content)
+        pdu = event_from_pdu_json(content)
         logger.debug("on_send_leave_request: pdu sigs: %s", pdu.signatures)
         yield self.handler.on_send_leave_request(origin, pdu)
         defer.returnValue((200, {}))
@@ -411,7 +410,7 @@ class FederationServer(FederationBase):
         """
         with (yield self._server_linearizer.queue((origin, room_id))):
             auth_chain = [
-                self.event_from_pdu_json(e)
+                event_from_pdu_json(e)
                 for e in content["auth_chain"]
             ]
 
@@ -586,15 +585,6 @@ class FederationServer(FederationBase):
     def __str__(self):
         return "<ReplicationLayer(%s)>" % self.server_name
 
-    def event_from_pdu_json(self, pdu_json, outlier=False):
-        event = FrozenEvent(
-            pdu_json
-        )
-
-        event.internal_metadata.outlier = outlier
-
-        return event
-
     @defer.inlineCallbacks
     def exchange_third_party_invite(
             self,
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 7a3c9cbb70..a141ec9953 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -19,8 +19,8 @@ from twisted.internet import defer
 from .persistence import TransactionActions
 from .units import Transaction, Edu
 
-from synapse.api.errors import HttpResponseException
-from synapse.util import logcontext
+from synapse.api.errors import HttpResponseException, FederationDeniedError
+from synapse.util import logcontext, PreserveLoggingContext
 from synapse.util.async import run_on_reactor
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
 from synapse.util.metrics import measure_func
@@ -42,6 +42,8 @@ sent_edus_counter = client_metrics.register_counter("sent_edus")
 
 sent_transactions_counter = client_metrics.register_counter("sent_transactions")
 
+events_processed_counter = client_metrics.register_counter("events_processed")
+
 
 class TransactionQueue(object):
     """This class makes sure we only have one transaction in flight at
@@ -146,7 +148,6 @@ class TransactionQueue(object):
         else:
             return not destination.startswith("localhost")
 
-    @defer.inlineCallbacks
     def notify_new_events(self, current_id):
         """This gets called when we have some new events we might want to
         send out to other servers.
@@ -156,6 +157,13 @@ class TransactionQueue(object):
         if self._is_processing:
             return
 
+        # fire off a processing loop in the background. It's likely it will
+        # outlast the current request, so run it in the sentinel logcontext.
+        with PreserveLoggingContext():
+            self._process_event_queue_loop()
+
+    @defer.inlineCallbacks
+    def _process_event_queue_loop(self):
         try:
             self._is_processing = True
             while True:
@@ -199,6 +207,8 @@ class TransactionQueue(object):
 
                     self._send_pdu(event, destinations)
 
+                events_processed_counter.inc_by(len(events))
+
                 yield self.store.update_federation_out_pos(
                     "events", next_token
                 )
@@ -480,6 +490,8 @@ class TransactionQueue(object):
                     (e.retry_last_ts + e.retry_interval) / 1000.0
                 ),
             )
+        except FederationDeniedError as e:
+            logger.info(e)
         except Exception as e:
             logger.warn(
                 "TX [%s] Failed to send transaction: %s",
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 1f3ce238f6..5488e82985 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -212,6 +212,9 @@ class TransportLayerClient(object):
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
             to retry this server.
+
+            Fails with ``FederationDeniedError`` if the remote destination
+            is not in our federation whitelist
         """
         valid_memberships = {Membership.JOIN, Membership.LEAVE}
         if membership not in valid_memberships:
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 2b02b021ec..06c16ba4fa 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -16,7 +16,7 @@
 from twisted.internet import defer
 
 from synapse.api.urls import FEDERATION_PREFIX as PREFIX
-from synapse.api.errors import Codes, SynapseError
+from synapse.api.errors import Codes, SynapseError, FederationDeniedError
 from synapse.http.server import JsonResource
 from synapse.http.servlet import (
     parse_json_object_from_request, parse_integer_from_args, parse_string_from_args,
@@ -81,6 +81,7 @@ class Authenticator(object):
         self.keyring = hs.get_keyring()
         self.server_name = hs.hostname
         self.store = hs.get_datastore()
+        self.federation_domain_whitelist = hs.config.federation_domain_whitelist
 
     # A method just so we can pass 'self' as the authenticator to the Servlets
     @defer.inlineCallbacks
@@ -92,6 +93,12 @@ class Authenticator(object):
             "signatures": {},
         }
 
+        if (
+            self.federation_domain_whitelist is not None and
+            self.server_name not in self.federation_domain_whitelist
+        ):
+            raise FederationDeniedError(self.server_name)
+
         if content is not None:
             json_request["content"] = content