summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2019-03-04 11:54:58 +0000
committerErik Johnston <erik@matrix.org>2019-03-04 11:54:58 +0000
commitfbc047f2a5f12ee934e5ccbe7274100aa72166b5 (patch)
tree2eabc4f13032883ff61fc635d0be43292a5ad131 /synapse
parentUpdate newsfile to have a full stop (diff)
parentUpdate test_typing to use HomeserverTestCase. (#4771) (diff)
downloadsynapse-fbc047f2a5f12ee934e5ccbe7274100aa72166b5.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/stop_fed_not_in_room
Diffstat (limited to 'synapse')
-rw-r--r--synapse/__init__.py2
-rw-r--r--synapse/app/client_reader.py2
-rw-r--r--synapse/app/federation_reader.py6
-rw-r--r--synapse/app/frontend_proxy.py15
-rwxr-xr-xsynapse/app/homeserver.py3
-rw-r--r--synapse/config/captcha.py2
-rw-r--r--synapse/config/tls.py10
-rw-r--r--synapse/crypto/keyring.py72
-rw-r--r--synapse/federation/federation_client.py18
-rw-r--r--synapse/federation/federation_server.py18
-rw-r--r--synapse/federation/transport/server.py7
-rw-r--r--synapse/groups/groups_server.py6
-rw-r--r--synapse/handlers/federation.py34
-rw-r--r--synapse/handlers/message.py7
-rw-r--r--synapse/handlers/pagination.py8
-rw-r--r--synapse/handlers/receipts.py68
-rw-r--r--synapse/handlers/register.py4
-rw-r--r--synapse/handlers/room_list.py68
-rw-r--r--synapse/http/federation/matrix_federation_agent.py6
-rw-r--r--synapse/http/server.py8
-rw-r--r--synapse/push/httppusher.py38
-rw-r--r--synapse/push/pusher.py2
-rw-r--r--synapse/push/pusherpool.py18
-rw-r--r--synapse/replication/slave/storage/_base.py7
-rw-r--r--synapse/replication/slave/storage/presence.py7
-rw-r--r--synapse/replication/tcp/client.py22
-rw-r--r--synapse/replication/tcp/commands.py5
-rw-r--r--synapse/replication/tcp/protocol.py34
-rw-r--r--synapse/rest/client/v2_alpha/auth.py2
-rw-r--r--synapse/rest/media/v1/_base.py96
-rw-r--r--synapse/server.pyi2
-rw-r--r--synapse/static/client/register/index.html2
-rw-r--r--synapse/storage/_base.py63
-rw-r--r--synapse/storage/engines/postgres.py25
-rw-r--r--synapse/storage/engines/sqlite.py9
-rw-r--r--synapse/storage/registration.py66
36 files changed, 571 insertions, 191 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 2004375f98..25c10244d3 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -27,4 +27,4 @@ try:
 except ImportError:
     pass
 
-__version__ = "0.99.1.1"
+__version__ = "0.99.2"
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 043b48f8f3..5070094cad 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -48,6 +48,7 @@ from synapse.rest.client.v1.room import (
     RoomMemberListRestServlet,
     RoomStateRestServlet,
 )
+from synapse.rest.client.v2_alpha.account import ThreepidRestServlet
 from synapse.rest.client.v2_alpha.register import RegisterRestServlet
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
@@ -96,6 +97,7 @@ class ClientReaderServer(HomeServer):
                     RoomEventContextServlet(self).register(resource)
                     RegisterRestServlet(self).register(resource)
                     LoginRestServlet(self).register(resource)
+                    ThreepidRestServlet(self).register(resource)
 
                     resources.update({
                         "/_matrix/client/r0": resource,
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index b116c17669..7da79dc827 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -21,7 +21,7 @@ from twisted.web.resource import NoResource
 
 import synapse
 from synapse import events
-from synapse.api.urls import FEDERATION_PREFIX
+from synapse.api.urls import FEDERATION_PREFIX, SERVER_KEY_V2_PREFIX
 from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
@@ -44,6 +44,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.key.v2 import KeyApiV2Resource
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
@@ -99,6 +100,9 @@ class FederationReaderServer(HomeServer):
                         ),
                     })
 
+                if name in ["keys", "federation"]:
+                    resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self)
+
         root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index d5b954361d..8479fee738 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -21,7 +21,7 @@ from twisted.web.resource import NoResource
 
 import synapse
 from synapse import events
-from synapse.api.errors import SynapseError
+from synapse.api.errors import HttpResponseException, SynapseError
 from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
@@ -66,10 +66,15 @@ class PresenceStatusStubServlet(ClientV1RestServlet):
         headers = {
             "Authorization": auth_headers,
         }
-        result = yield self.http_client.get_json(
-            self.main_uri + request.uri.decode('ascii'),
-            headers=headers,
-        )
+
+        try:
+            result = yield self.http_client.get_json(
+                self.main_uri + request.uri.decode('ascii'),
+                headers=headers,
+            )
+        except HttpResponseException as e:
+            raise e.to_synapse_error()
+
         defer.returnValue((200, result))
 
     @defer.inlineCallbacks
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 05a97979ec..e8b6cc3114 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -555,6 +555,9 @@ def run(hs):
                 stats["memory_rss"] += process.memory_info().rss
                 stats["cpu_average"] += int(process.cpu_percent(interval=None))
 
+        stats["database_engine"] = hs.get_datastore().database_engine_name
+        stats["database_server_version"] = hs.get_datastore().get_server_version()
+
         logger.info("Reporting stats to matrix.org: %s" % (stats,))
         try:
             yield hs.get_simple_http_client().put_json(
diff --git a/synapse/config/captcha.py b/synapse/config/captcha.py
index 4064891ffb..d25196be08 100644
--- a/synapse/config/captcha.py
+++ b/synapse/config/captcha.py
@@ -47,5 +47,5 @@ class CaptchaConfig(Config):
         #captcha_bypass_secret: "YOUR_SECRET_HERE"
 
         # The API endpoint to use for verifying m.login.recaptcha responses.
-        recaptcha_siteverify_api: "https://www.google.com/recaptcha/api/siteverify"
+        recaptcha_siteverify_api: "https://www.recaptcha.net/recaptcha/api/siteverify"
         """
diff --git a/synapse/config/tls.py b/synapse/config/tls.py
index 8d5d287357..40045de7ac 100644
--- a/synapse/config/tls.py
+++ b/synapse/config/tls.py
@@ -19,6 +19,8 @@ import warnings
 from datetime import datetime
 from hashlib import sha256
 
+import six
+
 from unpaddedbase64 import encode_base64
 
 from OpenSSL import crypto
@@ -36,9 +38,11 @@ class TlsConfig(Config):
             acme_config = {}
 
         self.acme_enabled = acme_config.get("enabled", False)
-        self.acme_url = acme_config.get(
+
+        # hyperlink complains on py2 if this is not a Unicode
+        self.acme_url = six.text_type(acme_config.get(
             "url", u"https://acme-v01.api.letsencrypt.org/directory"
-        )
+        ))
         self.acme_port = acme_config.get("port", 80)
         self.acme_bind_addresses = acme_config.get("bind_addresses", ['::', '0.0.0.0'])
         self.acme_reprovision_threshold = acme_config.get("reprovision_threshold", 30)
@@ -55,7 +59,7 @@ class TlsConfig(Config):
                 )
             if not self.tls_private_key_file:
                 raise ConfigError(
-                    "tls_certificate_path must be specified if TLS-enabled listeners are "
+                    "tls_private_key_path must be specified if TLS-enabled listeners are "
                     "configured."
                 )
 
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index cce40fdd2d..7474fd515f 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -17,6 +17,7 @@
 import logging
 from collections import namedtuple
 
+from six import raise_from
 from six.moves import urllib
 
 from signedjson.key import (
@@ -35,7 +36,12 @@ from unpaddedbase64 import decode_base64
 
 from twisted.internet import defer
 
-from synapse.api.errors import Codes, RequestSendFailed, SynapseError
+from synapse.api.errors import (
+    Codes,
+    HttpResponseException,
+    RequestSendFailed,
+    SynapseError,
+)
 from synapse.util import logcontext, unwrapFirstError
 from synapse.util.logcontext import (
     LoggingContext,
@@ -44,6 +50,7 @@ from synapse.util.logcontext import (
     run_in_background,
 )
 from synapse.util.metrics import Measure
+from synapse.util.retryutils import NotRetryingDestination
 
 logger = logging.getLogger(__name__)
 
@@ -367,13 +374,18 @@ class Keyring(object):
                     server_name_and_key_ids, perspective_name, perspective_keys
                 )
                 defer.returnValue(result)
+            except KeyLookupError as e:
+                logger.warning(
+                    "Key lookup failed from %r: %s", perspective_name, e,
+                )
             except Exception as e:
                 logger.exception(
                     "Unable to get key from %r: %s %s",
                     perspective_name,
                     type(e).__name__, str(e),
                 )
-                defer.returnValue({})
+
+            defer.returnValue({})
 
         results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
@@ -421,21 +433,30 @@ class Keyring(object):
         # TODO(mark): Set the minimum_valid_until_ts to that needed by
         # the events being validated or the current time if validating
         # an incoming request.
-        query_response = yield self.client.post_json(
-            destination=perspective_name,
-            path="/_matrix/key/v2/query",
-            data={
-                u"server_keys": {
-                    server_name: {
-                        key_id: {
-                            u"minimum_valid_until_ts": 0
-                        } for key_id in key_ids
+        try:
+            query_response = yield self.client.post_json(
+                destination=perspective_name,
+                path="/_matrix/key/v2/query",
+                data={
+                    u"server_keys": {
+                        server_name: {
+                            key_id: {
+                                u"minimum_valid_until_ts": 0
+                            } for key_id in key_ids
+                        }
+                        for server_name, key_ids in server_names_and_key_ids
                     }
-                    for server_name, key_ids in server_names_and_key_ids
-                }
-            },
-            long_retries=True,
-        )
+                },
+                long_retries=True,
+            )
+        except (NotRetryingDestination, RequestSendFailed) as e:
+            raise_from(
+                KeyLookupError("Failed to connect to remote server"), e,
+            )
+        except HttpResponseException as e:
+            raise_from(
+                KeyLookupError("Remote server returned an error"), e,
+            )
 
         keys = {}
 
@@ -502,11 +523,20 @@ class Keyring(object):
             if requested_key_id in keys:
                 continue
 
-            response = yield self.client.get_json(
-                destination=server_name,
-                path="/_matrix/key/v2/server/" + urllib.parse.quote(requested_key_id),
-                ignore_backoff=True,
-            )
+            try:
+                response = yield self.client.get_json(
+                    destination=server_name,
+                    path="/_matrix/key/v2/server/" + urllib.parse.quote(requested_key_id),
+                    ignore_backoff=True,
+                )
+            except (NotRetryingDestination, RequestSendFailed) as e:
+                raise_from(
+                    KeyLookupError("Failed to connect to remote server"), e,
+                )
+            except HttpResponseException as e:
+                raise_from(
+                    KeyLookupError("Remote server returned an error"), e,
+                )
 
             if (u"signatures" not in response
                     or server_name not in response[u"signatures"]):
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 4e4f58b418..58e04d81ab 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -33,6 +33,7 @@ from synapse.api.constants import (
 )
 from synapse.api.errors import (
     CodeMessageException,
+    Codes,
     FederationDeniedError,
     HttpResponseException,
     SynapseError,
@@ -792,10 +793,25 @@ class FederationClient(FederationBase):
             defer.returnValue(content)
         except HttpResponseException as e:
             if e.code in [400, 404]:
+                err = e.to_synapse_error()
+
+                # If we receive an error response that isn't a generic error, we
+                # assume that the remote understands the v2 invite API and this
+                # is a legitimate error.
+                if err.errcode != Codes.UNKNOWN:
+                    raise err
+
+                # Otherwise, we assume that the remote server doesn't understand
+                # the v2 invite API.
+
                 if room_version in (RoomVersions.V1, RoomVersions.V2):
                     pass  # We'll fall through
                 else:
-                    raise Exception("Remote server is too old")
+                    raise SynapseError(
+                        400,
+                        "User's homeserver does not support this room version",
+                        Codes.UNSUPPORTED_ROOM_VERSION,
+                    )
             elif e.code == 403:
                 raise e.to_synapse_error()
             else:
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 3da86d4ba6..81f3b4b1ff 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -25,9 +25,10 @@ from twisted.internet import defer
 from twisted.internet.abstract import isIPAddress
 from twisted.python import failure
 
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership
 from synapse.api.errors import (
     AuthError,
+    Codes,
     FederationError,
     IncompatibleRoomVersionError,
     NotFoundError,
@@ -239,8 +240,9 @@ class FederationServer(FederationBase):
                         f = failure.Failure()
                         pdu_results[event_id] = {"error": str(e)}
                         logger.error(
-                            "Failed to handle PDU %s: %s",
-                            event_id, f.getTraceback().rstrip(),
+                            "Failed to handle PDU %s",
+                            event_id,
+                            exc_info=(f.type, f.value, f.getTracebackObject()),
                         )
 
         yield concurrently_execute(
@@ -386,6 +388,13 @@ class FederationServer(FederationBase):
 
     @defer.inlineCallbacks
     def on_invite_request(self, origin, content, room_version):
+        if room_version not in KNOWN_ROOM_VERSIONS:
+            raise SynapseError(
+                400,
+                "Homeserver does not support this room version",
+                Codes.UNSUPPORTED_ROOM_VERSION,
+            )
+
         format_ver = room_version_to_event_format(room_version)
 
         pdu = event_from_pdu_json(content, format_ver)
@@ -877,6 +886,9 @@ class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
     def on_edu(self, edu_type, origin, content):
         """Overrides FederationHandlerRegistry
         """
+        if not self.config.use_presence and edu_type == "m.presence":
+            return
+
         handler = self.edu_handlers.get(edu_type)
         if handler:
             return super(ReplicationFederationHandlerRegistry, self).on_edu(
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index a2396ab466..ebb81be377 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -393,7 +393,7 @@ class FederationStateServlet(BaseFederationServlet):
         return self.handler.on_context_state_request(
             origin,
             context,
-            parse_string_from_args(query, "event_id", None),
+            parse_string_from_args(query, "event_id", None, required=True),
         )
 
 
@@ -404,7 +404,7 @@ class FederationStateIdsServlet(BaseFederationServlet):
         return self.handler.on_state_ids_request(
             origin,
             room_id,
-            parse_string_from_args(query, "event_id", None),
+            parse_string_from_args(query, "event_id", None, required=True),
         )
 
 
@@ -736,7 +736,8 @@ class PublicRoomList(BaseFederationServlet):
 
         data = yield self.handler.get_local_public_room_list(
             limit, since_token,
-            network_tuple=network_tuple
+            network_tuple=network_tuple,
+            from_federation=True,
         )
         defer.returnValue((200, data))
 
diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py
index 633c865ed8..a7eaead56b 100644
--- a/synapse/groups/groups_server.py
+++ b/synapse/groups/groups_server.py
@@ -113,8 +113,7 @@ class GroupsServerHandler(object):
             room_id = room_entry["room_id"]
             joined_users = yield self.store.get_users_in_room(room_id)
             entry = yield self.room_list_handler.generate_room_entry(
-                room_id, len(joined_users),
-                with_alias=False, allow_private=True,
+                room_id, len(joined_users), with_alias=False, allow_private=True,
             )
             entry = dict(entry)  # so we don't change whats cached
             entry.pop("room_id", None)
@@ -544,8 +543,7 @@ class GroupsServerHandler(object):
 
             joined_users = yield self.store.get_users_in_room(room_id)
             entry = yield self.room_list_handler.generate_room_entry(
-                room_id, len(joined_users),
-                with_alias=False, allow_private=True,
+                room_id, len(joined_users), with_alias=False, allow_private=True,
             )
 
             if not entry:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index de839ca527..0425380e55 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -770,10 +770,26 @@ class FederationHandler(BaseHandler):
             set(auth_events.keys()) | set(state_events.keys())
         )
 
+        # We now have a chunk of events plus associated state and auth chain to
+        # persist. We do the persistence in two steps:
+        #   1. Auth events and state get persisted as outliers, plus the
+        #      backward extremities get persisted (as non-outliers).
+        #   2. The rest of the events in the chunk get persisted one by one, as
+        #      each one depends on the previous event for its state.
+        #
+        # The important thing is that events in the chunk get persisted as
+        # non-outliers, including when those events are also in the state or
+        # auth chain. Caution must therefore be taken to ensure that they are
+        # not accidentally marked as outliers.
+
+        # Step 1a: persist auth events that *don't* appear in the chunk
         ev_infos = []
         for a in auth_events.values():
-            if a.event_id in seen_events:
+            # We only want to persist auth events as outliers that we haven't
+            # seen and aren't about to persist as part of the backfilled chunk.
+            if a.event_id in seen_events or a.event_id in event_map:
                 continue
+
             a.internal_metadata.outlier = True
             ev_infos.append({
                 "event": a,
@@ -785,14 +801,21 @@ class FederationHandler(BaseHandler):
                 }
             })
 
+        # Step 1b: persist the events in the chunk we fetched state for (i.e.
+        # the backwards extremities) as non-outliers.
         for e_id in events_to_state:
+            # For paranoia we ensure that these events are marked as
+            # non-outliers
+            ev = event_map[e_id]
+            assert(not ev.internal_metadata.is_outlier())
+
             ev_infos.append({
-                "event": event_map[e_id],
+                "event": ev,
                 "state": events_to_state[e_id],
                 "auth_events": {
                     (auth_events[a_id].type, auth_events[a_id].state_key):
                     auth_events[a_id]
-                    for a_id in event_map[e_id].auth_event_ids()
+                    for a_id in ev.auth_event_ids()
                     if a_id in auth_events
                 }
             })
@@ -802,12 +825,17 @@ class FederationHandler(BaseHandler):
             backfilled=True,
         )
 
+        # Step 2: Persist the rest of the events in the chunk one by one
         events.sort(key=lambda e: e.depth)
 
         for event in events:
             if event in events_to_state:
                 continue
 
+            # For paranoia we ensure that these events are marked as
+            # non-outliers
+            assert(not event.internal_metadata.is_outlier())
+
             # We store these one at a time since each event depends on the
             # previous to work out the state.
             # TODO: We can probably do something more clever here.
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 3981fe69ce..c762b58902 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -436,10 +436,11 @@ class EventCreationHandler(object):
 
         if event.is_state():
             prev_state = yield self.deduplicate_state_event(event, context)
-            logger.info(
-                "Not bothering to persist duplicate state event %s", event.event_id,
-            )
             if prev_state is not None:
+                logger.info(
+                    "Not bothering to persist state event %s duplicated by %s",
+                    event.event_id, prev_state.event_id,
+                )
                 defer.returnValue(prev_state)
 
         yield self.handle_new_client_event(
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 9d257ecf31..e4fdae9266 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -136,7 +136,11 @@ class PaginationHandler(object):
             logger.info("[purge] complete")
             self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
         except Exception:
-            logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
+            f = Failure()
+            logger.error(
+                "[purge] failed",
+                exc_info=(f.type, f.value, f.getTracebackObject()),
+            )
             self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
         finally:
             self._purges_in_progress_by_room.discard(room_id)
@@ -254,7 +258,7 @@ class PaginationHandler(object):
             })
 
         state = None
-        if event_filter and event_filter.lazy_load_members():
+        if event_filter and event_filter.lazy_load_members() and len(events) > 0:
             # TODO: remove redundant members
 
             # FIXME: we also care about invite targets etc.
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 4c2690ba26..696469732c 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -16,8 +16,8 @@ import logging
 
 from twisted.internet import defer
 
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import get_domain_from_id
-from synapse.util import logcontext
 
 from ._base import BaseHandler
 
@@ -59,7 +59,9 @@ class ReceiptsHandler(BaseHandler):
         if is_new:
             # fire off a process in the background to send the receipt to
             # remote servers
-            self._push_remotes([receipt])
+            run_as_background_process(
+                'push_receipts_to_remotes', self._push_remotes, receipt
+            )
 
     @defer.inlineCallbacks
     def _received_remote_receipt(self, origin, content):
@@ -125,44 +127,42 @@ class ReceiptsHandler(BaseHandler):
 
         defer.returnValue(True)
 
-    @logcontext.preserve_fn   # caller should not yield on this
     @defer.inlineCallbacks
-    def _push_remotes(self, receipts):
-        """Given a list of receipts, works out which remote servers should be
+    def _push_remotes(self, receipt):
+        """Given a receipt, works out which remote servers should be
         poked and pokes them.
         """
         try:
-            # TODO: Some of this stuff should be coallesced.
-            for receipt in receipts:
-                room_id = receipt["room_id"]
-                receipt_type = receipt["receipt_type"]
-                user_id = receipt["user_id"]
-                event_ids = receipt["event_ids"]
-                data = receipt["data"]
-
-                users = yield self.state.get_current_user_in_room(room_id)
-                remotedomains = set(get_domain_from_id(u) for u in users)
-                remotedomains = remotedomains.copy()
-                remotedomains.discard(self.server_name)
-
-                logger.debug("Sending receipt to: %r", remotedomains)
-
-                for domain in remotedomains:
-                    self.federation.send_edu(
-                        destination=domain,
-                        edu_type="m.receipt",
-                        content={
-                            room_id: {
-                                receipt_type: {
-                                    user_id: {
-                                        "event_ids": event_ids,
-                                        "data": data,
-                                    }
+            # TODO: optimise this to move some of the work to the workers.
+            room_id = receipt["room_id"]
+            receipt_type = receipt["receipt_type"]
+            user_id = receipt["user_id"]
+            event_ids = receipt["event_ids"]
+            data = receipt["data"]
+
+            users = yield self.state.get_current_user_in_room(room_id)
+            remotedomains = set(get_domain_from_id(u) for u in users)
+            remotedomains = remotedomains.copy()
+            remotedomains.discard(self.server_name)
+
+            logger.debug("Sending receipt to: %r", remotedomains)
+
+            for domain in remotedomains:
+                self.federation.send_edu(
+                    destination=domain,
+                    edu_type="m.receipt",
+                    content={
+                        room_id: {
+                            receipt_type: {
+                                user_id: {
+                                    "event_ids": event_ids,
+                                    "data": data,
                                 }
-                            },
+                            }
                         },
-                        key=(room_id, receipt_type, user_id),
-                    )
+                    },
+                    key=(room_id, receipt_type, user_id),
+                )
         except Exception:
             logger.exception("Error pushing receipts to remote servers")
 
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 24a4cb5a83..c0e06929bd 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -460,7 +460,7 @@ class RegistrationHandler(BaseHandler):
         lines = response.split('\n')
         json = {
             "valid": lines[0] == 'true',
-            "error_url": "http://www.google.com/recaptcha/api/challenge?" +
+            "error_url": "http://www.recaptcha.net/recaptcha/api/challenge?" +
                          "error=%s" % lines[1]
         }
         defer.returnValue(json)
@@ -471,7 +471,7 @@ class RegistrationHandler(BaseHandler):
         Used only by c/s api v1
         """
         data = yield self.captcha_client.post_urlencoded_get_raw(
-            "http://www.google.com:80/recaptcha/api/verify",
+            "http://www.recaptcha.net:80/recaptcha/api/verify",
             args={
                 'privatekey': private_key,
                 'remoteip': ip_addr,
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 13e212d669..afa508d729 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -50,16 +50,17 @@ class RoomListHandler(BaseHandler):
 
     def get_local_public_room_list(self, limit=None, since_token=None,
                                    search_filter=None,
-                                   network_tuple=EMPTY_THIRD_PARTY_ID,):
+                                   network_tuple=EMPTY_THIRD_PARTY_ID,
+                                   from_federation=False):
         """Generate a local public room list.
 
         There are multiple different lists: the main one plus one per third
         party network. A client can ask for a specific list or to return all.
 
         Args:
-            limit (int)
-            since_token (str)
-            search_filter (dict)
+            limit (int|None)
+            since_token (str|None)
+            search_filter (dict|None)
             network_tuple (ThirdPartyInstanceID): Which public list to use.
                 This can be (None, None) to indicate the main list, or a particular
                 appservice and network id to use an appservice specific one.
@@ -87,14 +88,30 @@ class RoomListHandler(BaseHandler):
         return self.response_cache.wrap(
             key,
             self._get_public_room_list,
-            limit, since_token, network_tuple=network_tuple,
+            limit, since_token,
+            network_tuple=network_tuple, from_federation=from_federation,
         )
 
     @defer.inlineCallbacks
     def _get_public_room_list(self, limit=None, since_token=None,
                               search_filter=None,
                               network_tuple=EMPTY_THIRD_PARTY_ID,
+                              from_federation=False,
                               timeout=None,):
+        """Generate a public room list.
+        Args:
+            limit (int|None): Maximum amount of rooms to return.
+            since_token (str|None)
+            search_filter (dict|None): Dictionary to filter rooms by.
+            network_tuple (ThirdPartyInstanceID): Which public list to use.
+                This can be (None, None) to indicate the main list, or a particular
+                appservice and network id to use an appservice specific one.
+                Setting to None returns all public rooms across all lists.
+            from_federation (bool): Whether this request originated from a
+                federating server or a client. Used for room filtering.
+            timeout (int|None): Amount of seconds to wait for a response before
+                timing out.
+        """
         if since_token and since_token != "END":
             since_token = RoomListNextBatch.from_token(since_token)
         else:
@@ -217,7 +234,8 @@ class RoomListHandler(BaseHandler):
             yield concurrently_execute(
                 lambda r: self._append_room_entry_to_chunk(
                     r, rooms_to_num_joined[r],
-                    chunk, limit, search_filter
+                    chunk, limit, search_filter,
+                    from_federation=from_federation,
                 ),
                 batch, 5,
             )
@@ -288,23 +306,51 @@ class RoomListHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def _append_room_entry_to_chunk(self, room_id, num_joined_users, chunk, limit,
-                                    search_filter):
+                                    search_filter, from_federation=False):
         """Generate the entry for a room in the public room list and append it
         to the `chunk` if it matches the search filter
+
+        Args:
+            room_id (str): The ID of the room.
+            num_joined_users (int): The number of joined users in the room.
+            chunk (list)
+            limit (int|None): Maximum amount of rooms to display. Function will
+                return if length of chunk is greater than limit + 1.
+            search_filter (dict|None)
+            from_federation (bool): Whether this request originated from a
+                federating server or a client. Used for room filtering.
         """
         if limit and len(chunk) > limit + 1:
             # We've already got enough, so lets just drop it.
             return
 
         result = yield self.generate_room_entry(room_id, num_joined_users)
+        if not result:
+            return
+
+        if from_federation and not result.get("m.federate", True):
+            # This is a room that other servers cannot join. Do not show them
+            # this room.
+            return
 
-        if result and _matches_room_entry(result, search_filter):
+        if _matches_room_entry(result, search_filter):
             chunk.append(result)
 
     @cachedInlineCallbacks(num_args=1, cache_context=True)
     def generate_room_entry(self, room_id, num_joined_users, cache_context,
                             with_alias=True, allow_private=False):
         """Returns the entry for a room
+
+        Args:
+            room_id (str): The room's ID.
+            num_joined_users (int): Number of users in the room.
+            cache_context: Information for cached responses.
+            with_alias (bool): Whether to return the room's aliases in the result.
+            allow_private (bool): Whether invite-only rooms should be shown.
+
+        Returns:
+            Deferred[dict|None]: Returns a room entry as a dictionary, or None if this
+            room was determined not to be shown publicly.
         """
         result = {
             "room_id": room_id,
@@ -318,6 +364,7 @@ class RoomListHandler(BaseHandler):
         event_map = yield self.store.get_events([
             event_id for key, event_id in iteritems(current_state_ids)
             if key[0] in (
+                EventTypes.Create,
                 EventTypes.JoinRules,
                 EventTypes.Name,
                 EventTypes.Topic,
@@ -334,12 +381,17 @@ class RoomListHandler(BaseHandler):
         }
 
         # Double check that this is actually a public room.
+
         join_rules_event = current_state.get((EventTypes.JoinRules, ""))
         if join_rules_event:
             join_rule = join_rules_event.content.get("join_rule", None)
             if not allow_private and join_rule and join_rule != JoinRules.PUBLIC:
                 defer.returnValue(None)
 
+        # Return whether this room is open to federation users or not
+        create_event = current_state.get((EventTypes.Create, ""))
+        result["m.federate"] = create_event.content.get("m.federate", True)
+
         if with_alias:
             aliases = yield self.store.get_aliases_for_room(
                 room_id, on_invalidate=cache_context.invalidate
diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py
index 384d8a37a2..1334c630cc 100644
--- a/synapse/http/federation/matrix_federation_agent.py
+++ b/synapse/http/federation/matrix_federation_agent.py
@@ -68,9 +68,13 @@ class MatrixFederationAgent(object):
             TLS policy to use for fetching .well-known files. None to use a default
             (browser-like) implementation.
 
-        srv_resolver (SrvResolver|None):
+        _srv_resolver (SrvResolver|None):
             SRVResolver impl to use for looking up SRV records. None to use a default
             implementation.
+
+        _well_known_cache (TTLCache|None):
+            TTLCache impl for storing cached well-known lookups. None to use a default
+            implementation.
     """
 
     def __init__(
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 6c67a25a11..16fb7935da 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -169,18 +169,18 @@ def _return_html_error(f, request):
             )
         else:
             logger.error(
-                "Failed handle request %r: %s",
+                "Failed handle request %r",
                 request,
-                f.getTraceback().rstrip(),
+                exc_info=(f.type, f.value, f.getTracebackObject()),
             )
     else:
         code = http_client.INTERNAL_SERVER_ERROR
         msg = "Internal server error"
 
         logger.error(
-            "Failed handle request %r: %s",
+            "Failed handle request %r",
             request,
-            f.getTraceback().rstrip(),
+            exc_info=(f.type, f.value, f.getTracebackObject()),
         )
 
     body = HTML_ERROR_TEMPLATE.format(
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 98d8d9560b..e65f8c63d3 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -32,9 +32,25 @@ if six.PY3:
 
 logger = logging.getLogger(__name__)
 
-http_push_processed_counter = Counter("synapse_http_httppusher_http_pushes_processed", "")
+http_push_processed_counter = Counter(
+    "synapse_http_httppusher_http_pushes_processed",
+    "Number of push notifications successfully sent",
+)
 
-http_push_failed_counter = Counter("synapse_http_httppusher_http_pushes_failed", "")
+http_push_failed_counter = Counter(
+    "synapse_http_httppusher_http_pushes_failed",
+    "Number of push notifications which failed",
+)
+
+http_badges_processed_counter = Counter(
+    "synapse_http_httppusher_badge_updates_processed",
+    "Number of badge updates successfully sent",
+)
+
+http_badges_failed_counter = Counter(
+    "synapse_http_httppusher_badge_updates_failed",
+    "Number of badge updates which failed",
+)
 
 
 class HttpPusher(object):
@@ -81,6 +97,11 @@ class HttpPusher(object):
             pusherdict['pushkey'],
         )
 
+        if self.data is None:
+            raise PusherConfigException(
+                "data can not be null for HTTP pusher"
+            )
+
         if 'url' not in self.data:
             raise PusherConfigException(
                 "'url' required in data for HTTP pusher"
@@ -346,6 +367,10 @@ class HttpPusher(object):
 
     @defer.inlineCallbacks
     def _send_badge(self, badge):
+        """
+        Args:
+            badge (int): number of unread messages
+        """
         logger.info("Sending updated badge count %d to %s", badge, self.name)
         d = {
             'notification': {
@@ -366,14 +391,11 @@ class HttpPusher(object):
             }
         }
         try:
-            resp = yield self.http_client.post_json_get_json(self.url, d)
+            yield self.http_client.post_json_get_json(self.url, d)
+            http_badges_processed_counter.inc()
         except Exception as e:
             logger.warning(
                 "Failed to send badge count to %s: %s %s",
                 self.name, type(e), e,
             )
-            defer.returnValue(False)
-        rejected = []
-        if 'rejected' in resp:
-            rejected = resp['rejected']
-        defer.returnValue(rejected)
+            http_badges_failed_counter.inc()
diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py
index 368d5094be..b33f2a357b 100644
--- a/synapse/push/pusher.py
+++ b/synapse/push/pusher.py
@@ -56,7 +56,7 @@ class PusherFactory(object):
         f = self.pusher_types.get(kind, None)
         if not f:
             return None
-        logger.info("creating %s pusher for %r", kind, pusherdict)
+        logger.debug("creating %s pusher for %r", kind, pusherdict)
         return f(self.hs, pusherdict)
 
     def _create_email_pusher(self, _hs, pusherdict):
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 5a4e73ccd6..abf1a1a9c1 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -19,6 +19,7 @@ import logging
 from twisted.internet import defer
 
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.push import PusherConfigException
 from synapse.push.pusher import PusherFactory
 
 logger = logging.getLogger(__name__)
@@ -140,6 +141,10 @@ class PusherPool:
 
     @defer.inlineCallbacks
     def on_new_notifications(self, min_stream_id, max_stream_id):
+        if not self.pushers:
+            # nothing to do here.
+            return
+
         try:
             users_affected = yield self.store.get_push_action_users_in_range(
                 min_stream_id, max_stream_id
@@ -155,6 +160,10 @@ class PusherPool:
 
     @defer.inlineCallbacks
     def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
+        if not self.pushers:
+            # nothing to do here.
+            return
+
         try:
             # Need to subtract 1 from the minimum because the lower bound here
             # is not inclusive
@@ -214,6 +223,15 @@ class PusherPool:
         """
         try:
             p = self.pusher_factory.create_pusher(pusherdict)
+        except PusherConfigException as e:
+            logger.warning(
+                "Pusher incorrectly configured user=%s, appid=%s, pushkey=%s: %s",
+                pusherdict.get('user_name'),
+                pusherdict.get('app_id'),
+                pusherdict.get('pushkey'),
+                e,
+            )
+            return
         except Exception:
             logger.exception("Couldn't start a pusher: caught Exception")
             return
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index 1353a32d00..817d1f67f9 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -59,12 +59,7 @@ class BaseSlavedStore(SQLBaseStore):
                     members_changed = set(row.keys[1:])
                     self._invalidate_state_caches(room_id, members_changed)
                 else:
-                    try:
-                        getattr(self, row.cache_func).invalidate(tuple(row.keys))
-                    except AttributeError:
-                        # We probably haven't pulled in the cache in this worker,
-                        # which is fine.
-                        pass
+                    self._attempt_to_invalidate_cache(row.cache_func, tuple(row.keys))
 
     def _invalidate_cache_and_stream(self, txn, cache_func, keys):
         txn.call_after(cache_func.invalidate, keys)
diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py
index 92447b00d4..9e530defe0 100644
--- a/synapse/replication/slave/storage/presence.py
+++ b/synapse/replication/slave/storage/presence.py
@@ -54,8 +54,11 @@ class SlavedPresenceStore(BaseSlavedStore):
 
     def stream_positions(self):
         result = super(SlavedPresenceStore, self).stream_positions()
-        position = self._presence_id_gen.get_current_token()
-        result["presence"] = position
+
+        if self.hs.config.use_presence:
+            position = self._presence_id_gen.get_current_token()
+            result["presence"] = position
+
         return result
 
     def process_replication_rows(self, stream_name, token, rows):
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 586dddb40b..e558f90e1a 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -39,7 +39,7 @@ class ReplicationClientFactory(ReconnectingClientFactory):
     Accepts a handler that will be called when new data is available or data
     is required.
     """
-    maxDelay = 5  # Try at least once every N seconds
+    maxDelay = 30  # Try at least once every N seconds
 
     def __init__(self, hs, client_name, handler):
         self.client_name = client_name
@@ -54,7 +54,6 @@ class ReplicationClientFactory(ReconnectingClientFactory):
 
     def buildProtocol(self, addr):
         logger.info("Connected to replication: %r", addr)
-        self.resetDelay()
         return ClientReplicationStreamProtocol(
             self.client_name, self.server_name, self._clock, self.handler
         )
@@ -90,15 +89,18 @@ class ReplicationClientHandler(object):
         # Used for tests.
         self.awaiting_syncs = {}
 
+        # The factory used to create connections.
+        self.factory = None
+
     def start_replication(self, hs):
         """Helper method to start a replication connection to the remote server
         using TCP.
         """
         client_name = hs.config.worker_name
-        factory = ReplicationClientFactory(hs, client_name, self)
+        self.factory = ReplicationClientFactory(hs, client_name, self)
         host = hs.config.worker_replication_host
         port = hs.config.worker_replication_port
-        hs.get_reactor().connectTCP(host, port, factory)
+        hs.get_reactor().connectTCP(host, port, self.factory)
 
     def on_rdata(self, stream_name, token, rows):
         """Called when we get new replication data. By default this just pokes
@@ -140,6 +142,7 @@ class ReplicationClientHandler(object):
             args["account_data"] = user_account_data
         elif room_account_data:
             args["account_data"] = room_account_data
+
         return args
 
     def get_currently_syncing_users(self):
@@ -204,3 +207,14 @@ class ReplicationClientHandler(object):
             for cmd in self.pending_commands:
                 connection.send_command(cmd)
             self.pending_commands = []
+
+    def finished_connecting(self):
+        """Called when we have successfully subscribed and caught up to all
+        streams we're interested in.
+        """
+        logger.info("Finished connecting to server")
+
+        # We don't reset the delay any earlier as otherwise if there is a
+        # problem during start up we'll end up tight looping connecting to the
+        # server.
+        self.factory.resetDelay()
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 327556f6a1..2098c32a77 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -127,8 +127,11 @@ class RdataCommand(Command):
 
 
 class PositionCommand(Command):
-    """Sent by the client to tell the client the stream postition without
+    """Sent by the server to tell the client the stream postition without
     needing to send an RDATA.
+
+    Sent to the client after all missing updates for a stream have been sent
+    to the client and they're now up to date.
     """
     NAME = "POSITION"
 
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 0b3fe6cbf5..49ae5b3355 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -268,7 +268,17 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
         if "\n" in string:
             raise Exception("Unexpected newline in command: %r", string)
 
-        self.sendLine(string.encode("utf-8"))
+        encoded_string = string.encode("utf-8")
+
+        if len(encoded_string) > self.MAX_LENGTH:
+            raise Exception(
+                "Failed to send command %s as too long (%d > %d)" % (
+                    cmd.NAME,
+                    len(encoded_string), self.MAX_LENGTH,
+                )
+            )
+
+        self.sendLine(encoded_string)
 
         self.last_sent_command = self.clock.time_msec()
 
@@ -361,6 +371,11 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
     def id(self):
         return "%s-%s" % (self.name, self.conn_id)
 
+    def lineLengthExceeded(self, line):
+        """Called when we receive a line that is above the maximum line length
+        """
+        self.send_error("Line length exceeded")
+
 
 class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
     VALID_INBOUND_COMMANDS = VALID_CLIENT_COMMANDS
@@ -511,6 +526,11 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
         self.server_name = server_name
         self.handler = handler
 
+        # Set of stream names that have been subscribe to, but haven't yet
+        # caught up with. This is used to track when the client has been fully
+        # connected to the remote.
+        self.streams_connecting = set()
+
         # Map of stream to batched updates. See RdataCommand for info on how
         # batching works.
         self.pending_batches = {}
@@ -533,6 +553,10 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
         # We've now finished connecting to so inform the client handler
         self.handler.update_connection(self)
 
+        # This will happen if we don't actually subscribe to any streams
+        if not self.streams_connecting:
+            self.handler.finished_connecting()
+
     def on_SERVER(self, cmd):
         if cmd.data != self.server_name:
             logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data)
@@ -562,6 +586,12 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
             return self.handler.on_rdata(stream_name, cmd.token, rows)
 
     def on_POSITION(self, cmd):
+        # When we get a `POSITION` command it means we've finished getting
+        # missing updates for the given stream, and are now up to date.
+        self.streams_connecting.discard(cmd.stream_name)
+        if not self.streams_connecting:
+            self.handler.finished_connecting()
+
         return self.handler.on_position(cmd.stream_name, cmd.token)
 
     def on_SYNC(self, cmd):
@@ -578,6 +608,8 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
             self.id(), stream_name, token
         )
 
+        self.streams_connecting.add(stream_name)
+
         self.send_command(ReplicateCommand(stream_name, token))
 
     def on_connection_closed(self):
diff --git a/synapse/rest/client/v2_alpha/auth.py b/synapse/rest/client/v2_alpha/auth.py
index f7bb710642..ac035c7735 100644
--- a/synapse/rest/client/v2_alpha/auth.py
+++ b/synapse/rest/client/v2_alpha/auth.py
@@ -33,7 +33,7 @@ RECAPTCHA_TEMPLATE = """
 <title>Authentication</title>
 <meta name='viewport' content='width=device-width, initial-scale=1,
     user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'>
-<script src="https://www.google.com/recaptcha/api.js"
+<script src="https://www.recaptcha.net/recaptcha/api.js"
     async defer></script>
 <script src="//code.jquery.com/jquery-1.11.2.min.js"></script>
 <link rel="stylesheet" href="/_matrix/static/client/register/style.css">
diff --git a/synapse/rest/media/v1/_base.py b/synapse/rest/media/v1/_base.py
index efe42a429d..fece1ef0b8 100644
--- a/synapse/rest/media/v1/_base.py
+++ b/synapse/rest/media/v1/_base.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2019 New Vector Ltd.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -133,8 +134,15 @@ def respond_with_responder(request, responder, media_type, file_size, upload_nam
 
     logger.debug("Responding to media request with responder %s")
     add_file_headers(request, media_type, file_size, upload_name)
-    with responder:
-        yield responder.write_to_consumer(request)
+    try:
+        with responder:
+            yield responder.write_to_consumer(request)
+    except Exception as e:
+        # The majority of the time this will be due to the client having gone
+        # away. Unfortunately, Twisted simply throws a generic exception at us
+        # in that case.
+        logger.warning("Failed to write to consumer: %s %s", type(e), e)
+
     finish_request(request)
 
 
@@ -206,8 +214,7 @@ def get_filename_from_headers(headers):
     Content-Disposition HTTP header.
 
     Args:
-        headers (twisted.web.http_headers.Headers): The HTTP
-            request headers.
+        headers (dict[bytes, list[bytes]]): The HTTP request headers.
 
     Returns:
         A Unicode string of the filename, or None.
@@ -218,23 +225,12 @@ def get_filename_from_headers(headers):
     if not content_disposition[0]:
         return
 
-    # dict of unicode: bytes, corresponding to the key value sections of the
-    # Content-Disposition header.
-    params = {}
-    parts = content_disposition[0].split(b";")
-    for i in parts:
-        # Split into key-value pairs, if able
-        # We don't care about things like `inline`, so throw it out
-        if b"=" not in i:
-            continue
-
-        key, value = i.strip().split(b"=")
-        params[key.decode('ascii')] = value
+    _, params = _parse_header(content_disposition[0])
 
     upload_name = None
 
     # First check if there is a valid UTF-8 filename
-    upload_name_utf8 = params.get("filename*", None)
+    upload_name_utf8 = params.get(b"filename*", None)
     if upload_name_utf8:
         if upload_name_utf8.lower().startswith(b"utf-8''"):
             upload_name_utf8 = upload_name_utf8[7:]
@@ -260,12 +256,68 @@ def get_filename_from_headers(headers):
 
     # If there isn't check for an ascii name.
     if not upload_name:
-        upload_name_ascii = params.get("filename", None)
+        upload_name_ascii = params.get(b"filename", None)
         if upload_name_ascii and is_ascii(upload_name_ascii):
-            # Make sure there's no %-quoted bytes. If there is, reject it as
-            # non-valid ASCII.
-            if b"%" not in upload_name_ascii:
-                upload_name = upload_name_ascii.decode('ascii')
+            upload_name = upload_name_ascii.decode('ascii')
 
     # This may be None here, indicating we did not find a matching name.
     return upload_name
+
+
+def _parse_header(line):
+    """Parse a Content-type like header.
+
+    Cargo-culted from `cgi`, but works on bytes rather than strings.
+
+    Args:
+        line (bytes): header to be parsed
+
+    Returns:
+        Tuple[bytes, dict[bytes, bytes]]:
+            the main content-type, followed by the parameter dictionary
+    """
+    parts = _parseparam(b';' + line)
+    key = next(parts)
+    pdict = {}
+    for p in parts:
+        i = p.find(b'=')
+        if i >= 0:
+            name = p[:i].strip().lower()
+            value = p[i + 1:].strip()
+
+            # strip double-quotes
+            if len(value) >= 2 and value[0:1] == value[-1:] == b'"':
+                value = value[1:-1]
+                value = value.replace(b'\\\\', b'\\').replace(b'\\"', b'"')
+            pdict[name] = value
+
+    return key, pdict
+
+
+def _parseparam(s):
+    """Generator which splits the input on ;, respecting double-quoted sequences
+
+    Cargo-culted from `cgi`, but works on bytes rather than strings.
+
+    Args:
+        s (bytes): header to be parsed
+
+    Returns:
+        Iterable[bytes]: the split input
+    """
+    while s[:1] == b';':
+        s = s[1:]
+
+        # look for the next ;
+        end = s.find(b';')
+
+        # if there is an odd number of " marks between here and the next ;, skip to the
+        # next ; instead
+        while end > 0 and (s.count(b'"', 0, end) - s.count(b'\\"', 0, end)) % 2:
+            end = s.find(b';', end + 1)
+
+        if end < 0:
+            end = len(s)
+        f = s[:end]
+        yield f.strip()
+        s = s[end:]
diff --git a/synapse/server.pyi b/synapse/server.pyi
index 06cd083a74..fb8df56cd5 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -7,9 +7,9 @@ import synapse.handlers.auth
 import synapse.handlers.deactivate_account
 import synapse.handlers.device
 import synapse.handlers.e2e_keys
+import synapse.handlers.message
 import synapse.handlers.room
 import synapse.handlers.room_member
-import synapse.handlers.message
 import synapse.handlers.set_password
 import synapse.rest.media.v1.media_repository
 import synapse.server_notices.server_notices_manager
diff --git a/synapse/static/client/register/index.html b/synapse/static/client/register/index.html
index 886f2edd1f..6edc4deb03 100644
--- a/synapse/static/client/register/index.html
+++ b/synapse/static/client/register/index.html
@@ -4,7 +4,7 @@
 <meta name='viewport' content='width=device-width, initial-scale=1, user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'> 
 <link rel="stylesheet" href="style.css">
 <script src="js/jquery-2.1.3.min.js"></script>
-<script src="https://www.google.com/recaptcha/api/js/recaptcha_ajax.js"></script>
+<script src="https://www.recaptcha.net/recaptcha/api/js/recaptcha_ajax.js"></script>
 <script src="register_config.js"></script>
 <script src="js/register.js"></script>
 </head>
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 3d895da43c..a0333d5309 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -30,6 +30,7 @@ from synapse.api.errors import StoreError
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.types import get_domain_from_id
+from synapse.util import batch_iter
 from synapse.util.caches.descriptors import Cache
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.stringutils import exception_to_unicode
@@ -1327,10 +1328,16 @@ class SQLBaseStore(object):
         """
         txn.call_after(self._invalidate_state_caches, room_id, members_changed)
 
-        keys = itertools.chain([room_id], members_changed)
-        self._send_invalidation_to_replication(
-            txn, _CURRENT_STATE_CACHE_NAME, keys,
-        )
+        # We need to be careful that the size of the `members_changed` list
+        # isn't so large that it causes problems sending over replication, so we
+        # send them in chunks.
+        # Max line length is 16K, and max user ID length is 255, so 50 should
+        # be safe.
+        for chunk in batch_iter(members_changed, 50):
+            keys = itertools.chain([room_id], chunk)
+            self._send_invalidation_to_replication(
+                txn, _CURRENT_STATE_CACHE_NAME, keys,
+            )
 
     def _invalidate_state_caches(self, room_id, members_changed):
         """Invalidates caches that are based on the current state, but does
@@ -1342,15 +1349,43 @@ class SQLBaseStore(object):
                 changed
         """
         for member in members_changed:
-            self.get_rooms_for_user_with_stream_ordering.invalidate((member,))
+            self._attempt_to_invalidate_cache(
+                "get_rooms_for_user_with_stream_ordering", (member,),
+            )
 
         for host in set(get_domain_from_id(u) for u in members_changed):
-            self.is_host_joined.invalidate((room_id, host))
-            self.was_host_joined.invalidate((room_id, host))
+            self._attempt_to_invalidate_cache(
+                "is_host_joined", (room_id, host,),
+            )
+            self._attempt_to_invalidate_cache(
+                "was_host_joined", (room_id, host,),
+            )
+
+        self._attempt_to_invalidate_cache(
+            "get_users_in_room", (room_id,),
+        )
+        self._attempt_to_invalidate_cache(
+            "get_room_summary", (room_id,),
+        )
+        self._attempt_to_invalidate_cache(
+            "get_current_state_ids", (room_id,),
+        )
+
+    def _attempt_to_invalidate_cache(self, cache_name, key):
+        """Attempts to invalidate the cache of the given name, ignoring if the
+        cache doesn't exist. Mainly used for invalidating caches on workers,
+        where they may not have the cache.
 
-        self.get_users_in_room.invalidate((room_id,))
-        self.get_room_summary.invalidate((room_id,))
-        self.get_current_state_ids.invalidate((room_id,))
+        Args:
+            cache_name (str)
+            key (tuple)
+        """
+        try:
+            getattr(self, cache_name).invalidate(key)
+        except AttributeError:
+            # We probably haven't pulled in the cache in this worker,
+            # which is fine.
+            pass
 
     def _send_invalidation_to_replication(self, txn, cache_name, keys):
         """Notifies replication that given cache has been invalidated.
@@ -1568,6 +1603,14 @@ class SQLBaseStore(object):
 
         return cls.cursor_to_dict(txn)
 
+    @property
+    def database_engine_name(self):
+        return self.database_engine.module.__name__
+
+    def get_server_version(self):
+        """Returns a string describing the server version number"""
+        return self.database_engine.server_version
+
 
 class _RollbackButIsFineException(Exception):
     """ This exception is used to rollback a transaction without implying
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 4004427c7b..dc3238501c 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -23,6 +23,7 @@ class PostgresEngine(object):
         self.module = database_module
         self.module.extensions.register_type(self.module.extensions.UNICODE)
         self.synchronous_commit = database_config.get("synchronous_commit", True)
+        self._version = None   # unknown as yet
 
     def check_database(self, txn):
         txn.execute("SHOW SERVER_ENCODING")
@@ -87,3 +88,27 @@ class PostgresEngine(object):
         """
         txn.execute("SELECT nextval('state_group_id_seq')")
         return txn.fetchone()[0]
+
+    @property
+    def server_version(self):
+        """Returns a string giving the server version. For example: '8.1.5'
+
+        Returns:
+            string
+        """
+        # note that this is a bit of a hack because it relies on on_new_connection
+        # having been called at least once. Still, that should be a safe bet here.
+        numver = self._version
+        assert numver is not None
+
+        # https://www.postgresql.org/docs/current/libpq-status.html#LIBPQ-PQSERVERVERSION
+        if numver >= 100000:
+            return "%i.%i" % (
+                numver / 10000, numver % 10000,
+            )
+        else:
+            return "%i.%i.%i" % (
+                numver / 10000,
+                (numver % 10000) / 100,
+                numver % 100,
+            )
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index 059ab81055..1bcd5b99a4 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -70,6 +70,15 @@ class Sqlite3Engine(object):
             self._current_state_group_id += 1
             return self._current_state_group_id
 
+    @property
+    def server_version(self):
+        """Gets a string giving the server version. For example: '3.22.0'
+
+        Returns:
+            string
+        """
+        return "%i.%i.%i" % self.module.sqlite_version_info
+
 
 # Following functions taken from: https://github.com/coleifer/peewee
 
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 9b9572890b..9b6c28892c 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -295,6 +295,39 @@ class RegistrationWorkerStore(SQLBaseStore):
             return ret['user_id']
         return None
 
+    @defer.inlineCallbacks
+    def user_add_threepid(self, user_id, medium, address, validated_at, added_at):
+        yield self._simple_upsert("user_threepids", {
+            "medium": medium,
+            "address": address,
+        }, {
+            "user_id": user_id,
+            "validated_at": validated_at,
+            "added_at": added_at,
+        })
+
+    @defer.inlineCallbacks
+    def user_get_threepids(self, user_id):
+        ret = yield self._simple_select_list(
+            "user_threepids", {
+                "user_id": user_id
+            },
+            ['medium', 'address', 'validated_at', 'added_at'],
+            'user_get_threepids'
+        )
+        defer.returnValue(ret)
+
+    def user_delete_threepid(self, user_id, medium, address):
+        return self._simple_delete(
+            "user_threepids",
+            keyvalues={
+                "user_id": user_id,
+                "medium": medium,
+                "address": address,
+            },
+            desc="user_delete_threepids",
+        )
+
 
 class RegistrationStore(RegistrationWorkerStore,
                         background_updates.BackgroundUpdateStore):
@@ -633,39 +666,6 @@ class RegistrationStore(RegistrationWorkerStore,
         defer.returnValue(res if res else False)
 
     @defer.inlineCallbacks
-    def user_add_threepid(self, user_id, medium, address, validated_at, added_at):
-        yield self._simple_upsert("user_threepids", {
-            "medium": medium,
-            "address": address,
-        }, {
-            "user_id": user_id,
-            "validated_at": validated_at,
-            "added_at": added_at,
-        })
-
-    @defer.inlineCallbacks
-    def user_get_threepids(self, user_id):
-        ret = yield self._simple_select_list(
-            "user_threepids", {
-                "user_id": user_id
-            },
-            ['medium', 'address', 'validated_at', 'added_at'],
-            'user_get_threepids'
-        )
-        defer.returnValue(ret)
-
-    def user_delete_threepid(self, user_id, medium, address):
-        return self._simple_delete(
-            "user_threepids",
-            keyvalues={
-                "user_id": user_id,
-                "medium": medium,
-                "address": address,
-            },
-            desc="user_delete_threepids",
-        )
-
-    @defer.inlineCallbacks
     def save_or_get_3pid_guest_access_token(
             self, medium, address, access_token, inviter_user_id
     ):