summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_base.py21
-rw-r--r--synapse/federation/federation_client.py28
-rw-r--r--synapse/federation/federation_server.py76
-rw-r--r--synapse/federation/send_queue.py25
-rw-r--r--synapse/federation/transaction_queue.py112
-rw-r--r--synapse/federation/transport/client.py153
-rw-r--r--synapse/federation/transport/server.py63
-rw-r--r--synapse/federation/units.py2
8 files changed, 333 insertions, 147 deletions
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index 79eaa31031..4cc98a3fe8 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -14,7 +14,10 @@
 # limitations under the License.
 import logging
 
-from synapse.api.errors import SynapseError
+import six
+
+from synapse.api.constants import MAX_DEPTH
+from synapse.api.errors import SynapseError, Codes
 from synapse.crypto.event_signing import check_event_content_hash
 from synapse.events import FrozenEvent
 from synapse.events.utils import prune_event
@@ -190,11 +193,23 @@ def event_from_pdu_json(pdu_json, outlier=False):
         FrozenEvent
 
     Raises:
-        SynapseError: if the pdu is missing required fields
+        SynapseError: if the pdu is missing required fields or is otherwise
+            not a valid matrix event
     """
     # we could probably enforce a bunch of other fields here (room_id, sender,
     # origin, etc etc)
-    assert_params_in_request(pdu_json, ('event_id', 'type'))
+    assert_params_in_request(pdu_json, ('event_id', 'type', 'depth'))
+
+    depth = pdu_json['depth']
+    if not isinstance(depth, six.integer_types):
+        raise SynapseError(400, "Depth %r not an intger" % (depth, ),
+                           Codes.BAD_JSON)
+
+    if depth < 0:
+        raise SynapseError(400, "Depth too small", Codes.BAD_JSON)
+    elif depth > MAX_DEPTH:
+        raise SynapseError(400, "Depth too large", Codes.BAD_JSON)
+
     event = FrozenEvent(
         pdu_json
     )
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 38440da5b5..2761ffae07 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -19,6 +19,8 @@ import itertools
 import logging
 import random
 
+from six.moves import range
+
 from twisted.internet import defer
 
 from synapse.api.constants import Membership
@@ -30,20 +32,17 @@ from synapse.federation.federation_base import (
     FederationBase,
     event_from_pdu_json,
 )
-import synapse.metrics
 from synapse.util import logcontext, unwrapFirstError
 from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.util.logutils import log_function
 from synapse.util.retryutils import NotRetryingDestination
 
-logger = logging.getLogger(__name__)
-
+from prometheus_client import Counter
 
-# synapse.federation.federation_client is a silly name
-metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
+logger = logging.getLogger(__name__)
 
-sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"])
+sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["type"])
 
 
 PDU_RETRY_TIME_MS = 1 * 60 * 1000
@@ -106,7 +105,7 @@ class FederationClient(FederationBase):
             a Deferred which will eventually yield a JSON object from the
             response
         """
-        sent_queries_counter.inc(query_type)
+        sent_queries_counter.labels(query_type).inc()
 
         return self.transport_layer.make_query(
             destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail,
@@ -125,7 +124,7 @@ class FederationClient(FederationBase):
             a Deferred which will eventually yield a JSON object from the
             response
         """
-        sent_queries_counter.inc("client_device_keys")
+        sent_queries_counter.labels("client_device_keys").inc()
         return self.transport_layer.query_client_keys(
             destination, content, timeout
         )
@@ -135,7 +134,7 @@ class FederationClient(FederationBase):
         """Query the device keys for a list of user ids hosted on a remote
         server.
         """
-        sent_queries_counter.inc("user_devices")
+        sent_queries_counter.labels("user_devices").inc()
         return self.transport_layer.query_user_devices(
             destination, user_id, timeout
         )
@@ -152,7 +151,7 @@ class FederationClient(FederationBase):
             a Deferred which will eventually yield a JSON object from the
             response
         """
-        sent_queries_counter.inc("client_one_time_keys")
+        sent_queries_counter.labels("client_one_time_keys").inc()
         return self.transport_layer.claim_client_keys(
             destination, content, timeout
         )
@@ -394,7 +393,7 @@ class FederationClient(FederationBase):
             seen_events = yield self.store.get_events(event_ids, allow_rejected=True)
             signed_events = seen_events.values()
         else:
-            seen_events = yield self.store.have_events(event_ids)
+            seen_events = yield self.store.have_seen_events(event_ids)
             signed_events = []
 
         failed_to_fetch = set()
@@ -413,11 +412,12 @@ class FederationClient(FederationBase):
 
         batch_size = 20
         missing_events = list(missing_events)
-        for i in xrange(0, len(missing_events), batch_size):
+        for i in range(0, len(missing_events), batch_size):
             batch = set(missing_events[i:i + batch_size])
 
             deferreds = [
-                preserve_fn(self.get_pdu)(
+                run_in_background(
+                    self.get_pdu,
                     destinations=random_server_list(),
                     event_id=e_id,
                 )
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index bea7fd0b71..2d420a58a2 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -26,30 +27,32 @@ from synapse.federation.federation_base import (
 
 from synapse.federation.persistence import TransactionActions
 from synapse.federation.units import Edu, Transaction
-import synapse.metrics
 from synapse.types import get_domain_from_id
 from synapse.util import async
 from synapse.util.caches.response_cache import ResponseCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 from synapse.util.logutils import log_function
 
+from prometheus_client import Counter
+
+from six import iteritems
+
 # when processing incoming transactions, we try to handle multiple rooms in
 # parallel, up to this limit.
 TRANSACTION_CONCURRENCY_LIMIT = 10
 
 logger = logging.getLogger(__name__)
 
-# synapse.federation.federation_server is a silly name
-metrics = synapse.metrics.get_metrics_for("synapse.federation.server")
+received_pdus_counter = Counter("synapse_federation_server_received_pdus", "")
 
-received_pdus_counter = metrics.register_counter("received_pdus")
+received_edus_counter = Counter("synapse_federation_server_received_edus", "")
 
-received_edus_counter = metrics.register_counter("received_edus")
-
-received_queries_counter = metrics.register_counter("received_queries", labels=["type"])
+received_queries_counter = Counter(
+    "synapse_federation_server_received_queries", "", ["type"]
+)
 
 
 class FederationServer(FederationBase):
+
     def __init__(self, hs):
         super(FederationServer, self).__init__(hs)
 
@@ -65,7 +68,7 @@ class FederationServer(FederationBase):
 
         # We cache responses to state queries, as they take a while and often
         # come in waves.
-        self._state_resp_cache = ResponseCache(hs, timeout_ms=30000)
+        self._state_resp_cache = ResponseCache(hs, "state_resp", timeout_ms=30000)
 
     @defer.inlineCallbacks
     @log_function
@@ -129,7 +132,7 @@ class FederationServer(FederationBase):
 
         logger.debug("[%s] Transaction is new", transaction.transaction_id)
 
-        received_pdus_counter.inc_by(len(transaction.pdus))
+        received_pdus_counter.inc(len(transaction.pdus))
 
         pdus_by_room = {}
 
@@ -212,16 +215,17 @@ class FederationServer(FederationBase):
         if not in_room:
             raise AuthError(403, "Host not in room.")
 
-        result = self._state_resp_cache.get((room_id, event_id))
-        if not result:
-            with (yield self._server_linearizer.queue((origin, room_id))):
-                d = self._state_resp_cache.set(
-                    (room_id, event_id),
-                    preserve_fn(self._on_context_state_request_compute)(room_id, event_id)
-                )
-                resp = yield make_deferred_yieldable(d)
-        else:
-            resp = yield make_deferred_yieldable(result)
+        # we grab the linearizer to protect ourselves from servers which hammer
+        # us. In theory we might already have the response to this query
+        # in the cache so we could return it without waiting for the linearizer
+        # - but that's non-trivial to get right, and anyway somewhat defeats
+        # the point of the linearizer.
+        with (yield self._server_linearizer.queue((origin, room_id))):
+            resp = yield self._state_resp_cache.wrap(
+                (room_id, event_id),
+                self._on_context_state_request_compute,
+                room_id, event_id,
+            )
 
         defer.returnValue((200, resp))
 
@@ -289,7 +293,7 @@ class FederationServer(FederationBase):
 
     @defer.inlineCallbacks
     def on_query_request(self, query_type, args):
-        received_queries_counter.inc(query_type)
+        received_queries_counter.labels(query_type).inc()
         resp = yield self.registry.on_query(query_type, args)
         defer.returnValue((200, resp))
 
@@ -425,9 +429,9 @@ class FederationServer(FederationBase):
             "Claimed one-time-keys: %s",
             ",".join((
                 "%s for %s:%s" % (key_id, user_id, device_id)
-                for user_id, user_keys in json_result.iteritems()
-                for device_id, device_keys in user_keys.iteritems()
-                for key_id, _ in device_keys.iteritems()
+                for user_id, user_keys in iteritems(json_result)
+                for device_id, device_keys in iteritems(user_keys)
+                for key_id, _ in iteritems(device_keys)
             )),
         )
 
@@ -494,13 +498,33 @@ class FederationServer(FederationBase):
     def _handle_received_pdu(self, origin, pdu):
         """ Process a PDU received in a federation /send/ transaction.
 
+        If the event is invalid, then this method throws a FederationError.
+        (The error will then be logged and sent back to the sender (which
+        probably won't do anything with it), and other events in the
+        transaction will be processed as normal).
+
+        It is likely that we'll then receive other events which refer to
+        this rejected_event in their prev_events, etc.  When that happens,
+        we'll attempt to fetch the rejected event again, which will presumably
+        fail, so those second-generation events will also get rejected.
+
+        Eventually, we get to the point where there are more than 10 events
+        between any new events and the original rejected event. Since we
+        only try to backfill 10 events deep on received pdu, we then accept the
+        new event, possibly introducing a discontinuity in the DAG, with new
+        forward extremities, so normal service is approximately returned,
+        until we try to backfill across the discontinuity.
+
         Args:
             origin (str): server which sent the pdu
             pdu (FrozenEvent): received pdu
 
         Returns (Deferred): completes with None
-        Raises: FederationError if the signatures / hash do not match
-    """
+
+        Raises: FederationError if the signatures / hash do not match, or
+            if the event was unacceptable for any other reason (eg, too large,
+            too many prev_events, couldn't find the prev_events)
+        """
         # check that it's actually being sent from a valid destination to
         # workaround bug #1753 in 0.18.5 and 0.18.6
         if origin != get_domain_from_id(pdu.event_id):
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 93e5acebc1..c7ed465617 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -33,17 +33,16 @@ from .units import Edu
 
 from synapse.storage.presence import UserPresenceState
 from synapse.util.metrics import Measure
-import synapse.metrics
+from synapse.metrics import LaterGauge
 
 from blist import sorteddict
 from collections import namedtuple
 
 import logging
 
-logger = logging.getLogger(__name__)
-
+from six import itervalues, iteritems
 
-metrics = synapse.metrics.get_metrics_for(__name__)
+logger = logging.getLogger(__name__)
 
 
 class FederationRemoteSendQueue(object):
@@ -75,10 +74,8 @@ class FederationRemoteSendQueue(object):
         # lambda binds to the queue rather than to the name of the queue which
         # changes. ARGH.
         def register(name, queue):
-            metrics.register_callback(
-                queue_name + "_size",
-                lambda: len(queue),
-            )
+            LaterGauge("synapse_federation_send_queue_%s_size" % (queue_name,),
+                       "", lambda: len(queue))
 
         for queue_name in [
             "presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
@@ -122,7 +119,7 @@ class FederationRemoteSendQueue(object):
 
             user_ids = set(
                 user_id
-                for uids in self.presence_changed.itervalues()
+                for uids in itervalues(self.presence_changed)
                 for user_id in uids
             )
 
@@ -276,7 +273,7 @@ class FederationRemoteSendQueue(object):
         # stream position.
         keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}
 
-        for ((destination, edu_key), pos) in keyed_edus.iteritems():
+        for ((destination, edu_key), pos) in iteritems(keyed_edus):
             rows.append((pos, KeyedEduRow(
                 key=edu_key,
                 edu=self.keyed_edu[(destination, edu_key)],
@@ -309,7 +306,7 @@ class FederationRemoteSendQueue(object):
         j = keys.bisect_right(to_token) + 1
         device_messages = {self.device_messages[k]: k for k in keys[i:j]}
 
-        for (destination, pos) in device_messages.iteritems():
+        for (destination, pos) in iteritems(device_messages):
             rows.append((pos, DeviceRow(
                 destination=destination,
             )))
@@ -528,19 +525,19 @@ def process_rows_for_federation(transaction_queue, rows):
     if buff.presence:
         transaction_queue.send_presence(buff.presence)
 
-    for destination, edu_map in buff.keyed_edus.iteritems():
+    for destination, edu_map in iteritems(buff.keyed_edus):
         for key, edu in edu_map.items():
             transaction_queue.send_edu(
                 edu.destination, edu.edu_type, edu.content, key=key,
             )
 
-    for destination, edu_list in buff.edus.iteritems():
+    for destination, edu_list in iteritems(buff.edus):
         for edu in edu_list:
             transaction_queue.send_edu(
                 edu.destination, edu.edu_type, edu.content, key=None,
             )
 
-    for destination, failure_list in buff.failures.iteritems():
+    for destination, failure_list in iteritems(buff.failures):
         for failure in failure_list:
             transaction_queue.send_failure(destination, failure)
 
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index a141ec9953..69312ec233 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -26,23 +26,23 @@ from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
 from synapse.util.metrics import measure_func
 from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
 import synapse.metrics
+from synapse.metrics import LaterGauge
+from synapse.metrics import (
+    sent_edus_counter,
+    sent_transactions_counter,
+    events_processed_counter,
+)
+
+from prometheus_client import Counter
 
 import logging
 
 
 logger = logging.getLogger(__name__)
 
-metrics = synapse.metrics.get_metrics_for(__name__)
-
-client_metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
-sent_pdus_destination_dist = client_metrics.register_distribution(
-    "sent_pdu_destinations"
+sent_pdus_destination_dist = Counter(
+    "synapse_federation_transaction_queue_sent_pdu_destinations", ""
 )
-sent_edus_counter = client_metrics.register_counter("sent_edus")
-
-sent_transactions_counter = client_metrics.register_counter("sent_transactions")
-
-events_processed_counter = client_metrics.register_counter("events_processed")
 
 
 class TransactionQueue(object):
@@ -69,8 +69,10 @@ class TransactionQueue(object):
         # done
         self.pending_transactions = {}
 
-        metrics.register_callback(
-            "pending_destinations",
+        LaterGauge(
+            "synapse_federation_transaction_queue_pending_destinations",
+            "",
+            [],
             lambda: len(self.pending_transactions),
         )
 
@@ -94,12 +96,16 @@ class TransactionQueue(object):
         # Map of destination -> (edu_type, key) -> Edu
         self.pending_edus_keyed_by_dest = edus_keyed = {}
 
-        metrics.register_callback(
-            "pending_pdus",
+        LaterGauge(
+            "synapse_federation_transaction_queue_pending_pdus",
+            "",
+            [],
             lambda: sum(map(len, pdus.values())),
         )
-        metrics.register_callback(
-            "pending_edus",
+        LaterGauge(
+            "synapse_federation_transaction_queue_pending_edus",
+            "",
+            [],
             lambda: (
                 sum(map(len, edus.values()))
                 + sum(map(len, presence.values()))
@@ -169,7 +175,7 @@ class TransactionQueue(object):
             while True:
                 last_token = yield self.store.get_federation_out_pos("events")
                 next_token, events = yield self.store.get_all_new_events_stream(
-                    last_token, self._last_poked_id, limit=20,
+                    last_token, self._last_poked_id, limit=100,
                 )
 
                 logger.debug("Handling %s -> %s", last_token, next_token)
@@ -177,24 +183,33 @@ class TransactionQueue(object):
                 if not events and next_token >= self._last_poked_id:
                     break
 
-                for event in events:
+                @defer.inlineCallbacks
+                def handle_event(event):
                     # Only send events for this server.
                     send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
                     is_mine = self.is_mine_id(event.event_id)
                     if not is_mine and send_on_behalf_of is None:
-                        continue
-
-                    # Get the state from before the event.
-                    # We need to make sure that this is the state from before
-                    # the event and not from after it.
-                    # Otherwise if the last member on a server in a room is
-                    # banned then it won't receive the event because it won't
-                    # be in the room after the ban.
-                    destinations = yield self.state.get_current_hosts_in_room(
-                        event.room_id, latest_event_ids=[
-                            prev_id for prev_id, _ in event.prev_events
-                        ],
-                    )
+                        return
+
+                    try:
+                        # Get the state from before the event.
+                        # We need to make sure that this is the state from before
+                        # the event and not from after it.
+                        # Otherwise if the last member on a server in a room is
+                        # banned then it won't receive the event because it won't
+                        # be in the room after the ban.
+                        destinations = yield self.state.get_current_hosts_in_room(
+                            event.room_id, latest_event_ids=[
+                                prev_id for prev_id, _ in event.prev_events
+                            ],
+                        )
+                    except Exception:
+                        logger.exception(
+                            "Failed to calculate hosts in room for event: %s",
+                            event.event_id,
+                        )
+                        return
+
                     destinations = set(destinations)
 
                     if send_on_behalf_of is not None:
@@ -207,12 +222,41 @@ class TransactionQueue(object):
 
                     self._send_pdu(event, destinations)
 
-                events_processed_counter.inc_by(len(events))
+                @defer.inlineCallbacks
+                def handle_room_events(events):
+                    for event in events:
+                        yield handle_event(event)
+
+                events_by_room = {}
+                for event in events:
+                    events_by_room.setdefault(event.room_id, []).append(event)
+
+                yield logcontext.make_deferred_yieldable(defer.gatherResults(
+                    [
+                        logcontext.run_in_background(handle_room_events, evs)
+                        for evs in events_by_room.itervalues()
+                    ],
+                    consumeErrors=True
+                ))
 
                 yield self.store.update_federation_out_pos(
                     "events", next_token
                 )
 
+                if events:
+                    now = self.clock.time_msec()
+                    ts = yield self.store.get_received_ts(events[-1].event_id)
+
+                    synapse.metrics.event_processing_lag.labels(
+                        "federation_sender").set(now - ts)
+                    synapse.metrics.event_processing_last_ts.labels(
+                        "federation_sender").set(ts)
+
+                events_processed_counter.inc(len(events))
+
+                synapse.metrics.event_processing_positions.labels(
+                    "federation_sender").set(next_token)
+
         finally:
             self._is_processing = False
 
@@ -234,7 +278,7 @@ class TransactionQueue(object):
         if not destinations:
             return
 
-        sent_pdus_destination_dist.inc_by(len(destinations))
+        sent_pdus_destination_dist.inc(len(destinations))
 
         for destination in destinations:
             self.pending_pdus_by_dest.setdefault(destination, []).append(
@@ -282,6 +326,8 @@ class TransactionQueue(object):
                     break
 
                 yield self._process_presence_inner(states_map.values())
+        except Exception:
+            logger.exception("Error sending presence states to servers")
         finally:
             self._processing_pending_presence = False
 
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 5488e82985..6db8efa6dd 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -20,6 +21,7 @@ from synapse.api.urls import FEDERATION_PREFIX as PREFIX
 from synapse.util.logutils import log_function
 
 import logging
+import urllib
 
 
 logger = logging.getLogger(__name__)
@@ -49,7 +51,7 @@ class TransportLayerClient(object):
         logger.debug("get_room_state dest=%s, room=%s",
                      destination, room_id)
 
-        path = PREFIX + "/state/%s/" % room_id
+        path = _create_path(PREFIX, "/state/%s/", room_id)
         return self.client.get_json(
             destination, path=path, args={"event_id": event_id},
         )
@@ -71,7 +73,7 @@ class TransportLayerClient(object):
         logger.debug("get_room_state_ids dest=%s, room=%s",
                      destination, room_id)
 
-        path = PREFIX + "/state_ids/%s/" % room_id
+        path = _create_path(PREFIX, "/state_ids/%s/", room_id)
         return self.client.get_json(
             destination, path=path, args={"event_id": event_id},
         )
@@ -93,7 +95,7 @@ class TransportLayerClient(object):
         logger.debug("get_pdu dest=%s, event_id=%s",
                      destination, event_id)
 
-        path = PREFIX + "/event/%s/" % (event_id, )
+        path = _create_path(PREFIX, "/event/%s/", event_id)
         return self.client.get_json(destination, path=path, timeout=timeout)
 
     @log_function
@@ -119,7 +121,7 @@ class TransportLayerClient(object):
             # TODO: raise?
             return
 
-        path = PREFIX + "/backfill/%s/" % (room_id,)
+        path = _create_path(PREFIX, "/backfill/%s/", room_id)
 
         args = {
             "v": event_tuples,
@@ -157,9 +159,11 @@ class TransportLayerClient(object):
         # generated by the json_data_callback.
         json_data = transaction.get_dict()
 
+        path = _create_path(PREFIX, "/send/%s/", transaction.transaction_id)
+
         response = yield self.client.put_json(
             transaction.destination,
-            path=PREFIX + "/send/%s/" % transaction.transaction_id,
+            path=path,
             data=json_data,
             json_data_callback=json_data_callback,
             long_retries=True,
@@ -177,7 +181,7 @@ class TransportLayerClient(object):
     @log_function
     def make_query(self, destination, query_type, args, retry_on_dns_fail,
                    ignore_backoff=False):
-        path = PREFIX + "/query/%s" % query_type
+        path = _create_path(PREFIX, "/query/%s", query_type)
 
         content = yield self.client.get_json(
             destination=destination,
@@ -222,7 +226,7 @@ class TransportLayerClient(object):
                 "make_membership_event called with membership='%s', must be one of %s" %
                 (membership, ",".join(valid_memberships))
             )
-        path = PREFIX + "/make_%s/%s/%s" % (membership, room_id, user_id)
+        path = _create_path(PREFIX, "/make_%s/%s/%s", membership, room_id, user_id)
 
         ignore_backoff = False
         retry_on_dns_fail = False
@@ -248,7 +252,7 @@ class TransportLayerClient(object):
     @defer.inlineCallbacks
     @log_function
     def send_join(self, destination, room_id, event_id, content):
-        path = PREFIX + "/send_join/%s/%s" % (room_id, event_id)
+        path = _create_path(PREFIX, "/send_join/%s/%s", room_id, event_id)
 
         response = yield self.client.put_json(
             destination=destination,
@@ -261,7 +265,7 @@ class TransportLayerClient(object):
     @defer.inlineCallbacks
     @log_function
     def send_leave(self, destination, room_id, event_id, content):
-        path = PREFIX + "/send_leave/%s/%s" % (room_id, event_id)
+        path = _create_path(PREFIX, "/send_leave/%s/%s", room_id, event_id)
 
         response = yield self.client.put_json(
             destination=destination,
@@ -280,7 +284,7 @@ class TransportLayerClient(object):
     @defer.inlineCallbacks
     @log_function
     def send_invite(self, destination, room_id, event_id, content):
-        path = PREFIX + "/invite/%s/%s" % (room_id, event_id)
+        path = _create_path(PREFIX, "/invite/%s/%s", room_id, event_id)
 
         response = yield self.client.put_json(
             destination=destination,
@@ -322,7 +326,7 @@ class TransportLayerClient(object):
     @defer.inlineCallbacks
     @log_function
     def exchange_third_party_invite(self, destination, room_id, event_dict):
-        path = PREFIX + "/exchange_third_party_invite/%s" % (room_id,)
+        path = _create_path(PREFIX, "/exchange_third_party_invite/%s", room_id,)
 
         response = yield self.client.put_json(
             destination=destination,
@@ -335,7 +339,7 @@ class TransportLayerClient(object):
     @defer.inlineCallbacks
     @log_function
     def get_event_auth(self, destination, room_id, event_id):
-        path = PREFIX + "/event_auth/%s/%s" % (room_id, event_id)
+        path = _create_path(PREFIX, "/event_auth/%s/%s", room_id, event_id)
 
         content = yield self.client.get_json(
             destination=destination,
@@ -347,7 +351,7 @@ class TransportLayerClient(object):
     @defer.inlineCallbacks
     @log_function
     def send_query_auth(self, destination, room_id, event_id, content):
-        path = PREFIX + "/query_auth/%s/%s" % (room_id, event_id)
+        path = _create_path(PREFIX, "/query_auth/%s/%s", room_id, event_id)
 
         content = yield self.client.post_json(
             destination=destination,
@@ -409,7 +413,7 @@ class TransportLayerClient(object):
         Returns:
             A dict containg the device keys.
         """
-        path = PREFIX + "/user/devices/" + user_id
+        path = _create_path(PREFIX, "/user/devices/%s", user_id)
 
         content = yield self.client.get_json(
             destination=destination,
@@ -459,7 +463,7 @@ class TransportLayerClient(object):
     @log_function
     def get_missing_events(self, destination, room_id, earliest_events,
                            latest_events, limit, min_depth, timeout):
-        path = PREFIX + "/get_missing_events/%s" % (room_id,)
+        path = _create_path(PREFIX, "/get_missing_events/%s", room_id,)
 
         content = yield self.client.post_json(
             destination=destination,
@@ -479,7 +483,7 @@ class TransportLayerClient(object):
     def get_group_profile(self, destination, group_id, requester_user_id):
         """Get a group profile
         """
-        path = PREFIX + "/groups/%s/profile" % (group_id,)
+        path = _create_path(PREFIX, "/groups/%s/profile", group_id,)
 
         return self.client.get_json(
             destination=destination,
@@ -498,7 +502,7 @@ class TransportLayerClient(object):
             requester_user_id (str)
             content (dict): The new profile of the group
         """
-        path = PREFIX + "/groups/%s/profile" % (group_id,)
+        path = _create_path(PREFIX, "/groups/%s/profile", group_id,)
 
         return self.client.post_json(
             destination=destination,
@@ -512,7 +516,7 @@ class TransportLayerClient(object):
     def get_group_summary(self, destination, group_id, requester_user_id):
         """Get a group summary
         """
-        path = PREFIX + "/groups/%s/summary" % (group_id,)
+        path = _create_path(PREFIX, "/groups/%s/summary", group_id,)
 
         return self.client.get_json(
             destination=destination,
@@ -525,7 +529,7 @@ class TransportLayerClient(object):
     def get_rooms_in_group(self, destination, group_id, requester_user_id):
         """Get all rooms in a group
         """
-        path = PREFIX + "/groups/%s/rooms" % (group_id,)
+        path = _create_path(PREFIX, "/groups/%s/rooms", group_id,)
 
         return self.client.get_json(
             destination=destination,
@@ -538,7 +542,7 @@ class TransportLayerClient(object):
                           content):
         """Add a room to a group
         """
-        path = PREFIX + "/groups/%s/room/%s" % (group_id, room_id,)
+        path = _create_path(PREFIX, "/groups/%s/room/%s", group_id, room_id,)
 
         return self.client.post_json(
             destination=destination,
@@ -552,7 +556,10 @@ class TransportLayerClient(object):
                              config_key, content):
         """Update room in group
         """
-        path = PREFIX + "/groups/%s/room/%s/config/%s" % (group_id, room_id, config_key,)
+        path = _create_path(
+            PREFIX, "/groups/%s/room/%s/config/%s",
+            group_id, room_id, config_key,
+        )
 
         return self.client.post_json(
             destination=destination,
@@ -565,7 +572,7 @@ class TransportLayerClient(object):
     def remove_room_from_group(self, destination, group_id, requester_user_id, room_id):
         """Remove a room from a group
         """
-        path = PREFIX + "/groups/%s/room/%s" % (group_id, room_id,)
+        path = _create_path(PREFIX, "/groups/%s/room/%s", group_id, room_id,)
 
         return self.client.delete_json(
             destination=destination,
@@ -578,7 +585,7 @@ class TransportLayerClient(object):
     def get_users_in_group(self, destination, group_id, requester_user_id):
         """Get users in a group
         """
-        path = PREFIX + "/groups/%s/users" % (group_id,)
+        path = _create_path(PREFIX, "/groups/%s/users", group_id,)
 
         return self.client.get_json(
             destination=destination,
@@ -591,7 +598,7 @@ class TransportLayerClient(object):
     def get_invited_users_in_group(self, destination, group_id, requester_user_id):
         """Get users that have been invited to a group
         """
-        path = PREFIX + "/groups/%s/invited_users" % (group_id,)
+        path = _create_path(PREFIX, "/groups/%s/invited_users", group_id,)
 
         return self.client.get_json(
             destination=destination,
@@ -604,7 +611,23 @@ class TransportLayerClient(object):
     def accept_group_invite(self, destination, group_id, user_id, content):
         """Accept a group invite
         """
-        path = PREFIX + "/groups/%s/users/%s/accept_invite" % (group_id, user_id)
+        path = _create_path(
+            PREFIX, "/groups/%s/users/%s/accept_invite",
+            group_id, user_id,
+        )
+
+        return self.client.post_json(
+            destination=destination,
+            path=path,
+            data=content,
+            ignore_backoff=True,
+        )
+
+    @log_function
+    def join_group(self, destination, group_id, user_id, content):
+        """Attempts to join a group
+        """
+        path = _create_path(PREFIX, "/groups/%s/users/%s/join", group_id, user_id)
 
         return self.client.post_json(
             destination=destination,
@@ -617,7 +640,7 @@ class TransportLayerClient(object):
     def invite_to_group(self, destination, group_id, user_id, requester_user_id, content):
         """Invite a user to a group
         """
-        path = PREFIX + "/groups/%s/users/%s/invite" % (group_id, user_id)
+        path = _create_path(PREFIX, "/groups/%s/users/%s/invite", group_id, user_id)
 
         return self.client.post_json(
             destination=destination,
@@ -633,7 +656,7 @@ class TransportLayerClient(object):
         invited.
         """
 
-        path = PREFIX + "/groups/local/%s/users/%s/invite" % (group_id, user_id)
+        path = _create_path(PREFIX, "/groups/local/%s/users/%s/invite", group_id, user_id)
 
         return self.client.post_json(
             destination=destination,
@@ -647,7 +670,7 @@ class TransportLayerClient(object):
                                user_id, content):
         """Remove a user fron a group
         """
-        path = PREFIX + "/groups/%s/users/%s/remove" % (group_id, user_id)
+        path = _create_path(PREFIX, "/groups/%s/users/%s/remove", group_id, user_id)
 
         return self.client.post_json(
             destination=destination,
@@ -664,7 +687,7 @@ class TransportLayerClient(object):
         kicked from the group.
         """
 
-        path = PREFIX + "/groups/local/%s/users/%s/remove" % (group_id, user_id)
+        path = _create_path(PREFIX, "/groups/local/%s/users/%s/remove", group_id, user_id)
 
         return self.client.post_json(
             destination=destination,
@@ -679,7 +702,7 @@ class TransportLayerClient(object):
         the attestations
         """
 
-        path = PREFIX + "/groups/%s/renew_attestation/%s" % (group_id, user_id)
+        path = _create_path(PREFIX, "/groups/%s/renew_attestation/%s", group_id, user_id)
 
         return self.client.post_json(
             destination=destination,
@@ -694,11 +717,12 @@ class TransportLayerClient(object):
         """Update a room entry in a group summary
         """
         if category_id:
-            path = PREFIX + "/groups/%s/summary/categories/%s/rooms/%s" % (
+            path = _create_path(
+                PREFIX, "/groups/%s/summary/categories/%s/rooms/%s",
                 group_id, category_id, room_id,
             )
         else:
-            path = PREFIX + "/groups/%s/summary/rooms/%s" % (group_id, room_id,)
+            path = _create_path(PREFIX, "/groups/%s/summary/rooms/%s", group_id, room_id,)
 
         return self.client.post_json(
             destination=destination,
@@ -714,11 +738,12 @@ class TransportLayerClient(object):
         """Delete a room entry in a group summary
         """
         if category_id:
-            path = PREFIX + "/groups/%s/summary/categories/%s/rooms/%s" % (
+            path = _create_path(
+                PREFIX + "/groups/%s/summary/categories/%s/rooms/%s",
                 group_id, category_id, room_id,
             )
         else:
-            path = PREFIX + "/groups/%s/summary/rooms/%s" % (group_id, room_id,)
+            path = _create_path(PREFIX, "/groups/%s/summary/rooms/%s", group_id, room_id,)
 
         return self.client.delete_json(
             destination=destination,
@@ -731,7 +756,7 @@ class TransportLayerClient(object):
     def get_group_categories(self, destination, group_id, requester_user_id):
         """Get all categories in a group
         """
-        path = PREFIX + "/groups/%s/categories" % (group_id,)
+        path = _create_path(PREFIX, "/groups/%s/categories", group_id,)
 
         return self.client.get_json(
             destination=destination,
@@ -744,7 +769,7 @@ class TransportLayerClient(object):
     def get_group_category(self, destination, group_id, requester_user_id, category_id):
         """Get category info in a group
         """
-        path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,)
+        path = _create_path(PREFIX, "/groups/%s/categories/%s", group_id, category_id,)
 
         return self.client.get_json(
             destination=destination,
@@ -758,7 +783,7 @@ class TransportLayerClient(object):
                               content):
         """Update a category in a group
         """
-        path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,)
+        path = _create_path(PREFIX, "/groups/%s/categories/%s", group_id, category_id,)
 
         return self.client.post_json(
             destination=destination,
@@ -773,7 +798,7 @@ class TransportLayerClient(object):
                               category_id):
         """Delete a category in a group
         """
-        path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,)
+        path = _create_path(PREFIX, "/groups/%s/categories/%s", group_id, category_id,)
 
         return self.client.delete_json(
             destination=destination,
@@ -786,7 +811,7 @@ class TransportLayerClient(object):
     def get_group_roles(self, destination, group_id, requester_user_id):
         """Get all roles in a group
         """
-        path = PREFIX + "/groups/%s/roles" % (group_id,)
+        path = _create_path(PREFIX, "/groups/%s/roles", group_id,)
 
         return self.client.get_json(
             destination=destination,
@@ -799,7 +824,7 @@ class TransportLayerClient(object):
     def get_group_role(self, destination, group_id, requester_user_id, role_id):
         """Get a roles info
         """
-        path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,)
+        path = _create_path(PREFIX, "/groups/%s/roles/%s", group_id, role_id,)
 
         return self.client.get_json(
             destination=destination,
@@ -813,7 +838,7 @@ class TransportLayerClient(object):
                           content):
         """Update a role in a group
         """
-        path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,)
+        path = _create_path(PREFIX, "/groups/%s/roles/%s", group_id, role_id,)
 
         return self.client.post_json(
             destination=destination,
@@ -827,7 +852,7 @@ class TransportLayerClient(object):
     def delete_group_role(self, destination, group_id, requester_user_id, role_id):
         """Delete a role in a group
         """
-        path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,)
+        path = _create_path(PREFIX, "/groups/%s/roles/%s", group_id, role_id,)
 
         return self.client.delete_json(
             destination=destination,
@@ -842,11 +867,12 @@ class TransportLayerClient(object):
         """Update a users entry in a group
         """
         if role_id:
-            path = PREFIX + "/groups/%s/summary/roles/%s/users/%s" % (
+            path = _create_path(
+                PREFIX, "/groups/%s/summary/roles/%s/users/%s",
                 group_id, role_id, user_id,
             )
         else:
-            path = PREFIX + "/groups/%s/summary/users/%s" % (group_id, user_id,)
+            path = _create_path(PREFIX, "/groups/%s/summary/users/%s", group_id, user_id,)
 
         return self.client.post_json(
             destination=destination,
@@ -857,16 +883,32 @@ class TransportLayerClient(object):
         )
 
     @log_function
+    def set_group_join_policy(self, destination, group_id, requester_user_id,
+                              content):
+        """Sets the join policy for a group
+        """
+        path = _create_path(PREFIX, "/groups/%s/settings/m.join_policy", group_id,)
+
+        return self.client.put_json(
+            destination=destination,
+            path=path,
+            args={"requester_user_id": requester_user_id},
+            data=content,
+            ignore_backoff=True,
+        )
+
+    @log_function
     def delete_group_summary_user(self, destination, group_id, requester_user_id,
                                   user_id, role_id):
         """Delete a users entry in a group
         """
         if role_id:
-            path = PREFIX + "/groups/%s/summary/roles/%s/users/%s" % (
+            path = _create_path(
+                PREFIX, "/groups/%s/summary/roles/%s/users/%s",
                 group_id, role_id, user_id,
             )
         else:
-            path = PREFIX + "/groups/%s/summary/users/%s" % (group_id, user_id,)
+            path = _create_path(PREFIX, "/groups/%s/summary/users/%s", group_id, user_id,)
 
         return self.client.delete_json(
             destination=destination,
@@ -889,3 +931,22 @@ class TransportLayerClient(object):
             data=content,
             ignore_backoff=True,
         )
+
+
+def _create_path(prefix, path, *args):
+    """Creates a path from the prefix, path template and args. Ensures that
+    all args are url encoded.
+
+    Example:
+
+        _create_path(PREFIX, "/event/%s/", event_id)
+
+    Args:
+        prefix (str)
+        path (str): String template for the path
+        args: ([str]): Args to insert into path. Each arg will be url encoded
+
+    Returns:
+        str
+    """
+    return prefix + path % tuple(urllib.quote(arg, "") for arg in args)
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index a66a6b0692..19d09f5422 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -24,7 +25,7 @@ from synapse.http.servlet import (
 )
 from synapse.util.ratelimitutils import FederationRateLimiter
 from synapse.util.versionstring import get_version_string
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import run_in_background
 from synapse.types import ThirdPartyInstanceID, get_domain_from_id
 
 import functools
@@ -93,12 +94,6 @@ class Authenticator(object):
             "signatures": {},
         }
 
-        if (
-            self.federation_domain_whitelist is not None and
-            self.server_name not in self.federation_domain_whitelist
-        ):
-            raise FederationDeniedError(self.server_name)
-
         if content is not None:
             json_request["content"] = content
 
@@ -137,6 +132,12 @@ class Authenticator(object):
                 json_request["origin"] = origin
                 json_request["signatures"].setdefault(origin, {})[key] = sig
 
+        if (
+            self.federation_domain_whitelist is not None and
+            origin not in self.federation_domain_whitelist
+        ):
+            raise FederationDeniedError(origin)
+
         if not json_request["signatures"]:
             raise NoAuthenticationError(
                 401, "Missing Authorization headers", Codes.UNAUTHORIZED,
@@ -151,11 +152,18 @@ class Authenticator(object):
         # alive
         retry_timings = yield self.store.get_destination_retry_timings(origin)
         if retry_timings and retry_timings["retry_last_ts"]:
-            logger.info("Marking origin %r as up", origin)
-            preserve_fn(self.store.set_destination_retry_timings)(origin, 0, 0)
+            run_in_background(self._reset_retry_timings, origin)
 
         defer.returnValue(origin)
 
+    @defer.inlineCallbacks
+    def _reset_retry_timings(self, origin):
+        try:
+            logger.info("Marking origin %r as up", origin)
+            yield self.store.set_destination_retry_timings(origin, 0, 0)
+        except Exception:
+            logger.exception("Error resetting retry timings on %s", origin)
+
 
 class BaseFederationServlet(object):
     REQUIRE_AUTH = True
@@ -802,6 +810,23 @@ class FederationGroupsAcceptInviteServlet(BaseFederationServlet):
         defer.returnValue((200, new_content))
 
 
+class FederationGroupsJoinServlet(BaseFederationServlet):
+    """Attempt to join a group
+    """
+    PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/join$"
+
+    @defer.inlineCallbacks
+    def on_POST(self, origin, content, query, group_id, user_id):
+        if get_domain_from_id(user_id) != origin:
+            raise SynapseError(403, "user_id doesn't match origin")
+
+        new_content = yield self.handler.join_group(
+            group_id, user_id, content,
+        )
+
+        defer.returnValue((200, new_content))
+
+
 class FederationGroupsRemoveUserServlet(BaseFederationServlet):
     """Leave or kick a user from the group
     """
@@ -1124,6 +1149,24 @@ class FederationGroupsBulkPublicisedServlet(BaseFederationServlet):
         defer.returnValue((200, resp))
 
 
+class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet):
+    """Sets whether a group is joinable without an invite or knock
+    """
+    PATH = "/groups/(?P<group_id>[^/]*)/settings/m.join_policy$"
+
+    @defer.inlineCallbacks
+    def on_PUT(self, origin, content, query, group_id):
+        requester_user_id = parse_string_from_args(query, "requester_user_id")
+        if get_domain_from_id(requester_user_id) != origin:
+            raise SynapseError(403, "requester_user_id doesn't match origin")
+
+        new_content = yield self.handler.set_group_join_policy(
+            group_id, requester_user_id, content
+        )
+
+        defer.returnValue((200, new_content))
+
+
 FEDERATION_SERVLET_CLASSES = (
     FederationSendServlet,
     FederationPullServlet,
@@ -1163,6 +1206,7 @@ GROUP_SERVER_SERVLET_CLASSES = (
     FederationGroupsInvitedUsersServlet,
     FederationGroupsInviteServlet,
     FederationGroupsAcceptInviteServlet,
+    FederationGroupsJoinServlet,
     FederationGroupsRemoveUserServlet,
     FederationGroupsSummaryRoomsServlet,
     FederationGroupsCategoriesServlet,
@@ -1172,6 +1216,7 @@ GROUP_SERVER_SERVLET_CLASSES = (
     FederationGroupsSummaryUsersServlet,
     FederationGroupsAddRoomsServlet,
     FederationGroupsAddRoomsConfigServlet,
+    FederationGroupsSettingJoinPolicyServlet,
 )
 
 
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index 3f645acc43..01c5b8fe17 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -74,8 +74,6 @@ class Transaction(JsonEncodedObject):
         "previous_ids",
         "pdus",
         "edus",
-        "transaction_id",
-        "destination",
         "pdu_failures",
     ]