summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/4338.feature1
-rw-r--r--changelog.d/5200.bugfix1
-rw-r--r--changelog.d/5216.misc1
-rw-r--r--changelog.d/5220.feature1
-rw-r--r--changelog.d/5223.feature1
-rw-r--r--changelog.d/5227.misc1
-rw-r--r--changelog.d/5230.misc1
-rw-r--r--changelog.d/5232.misc1
-rw-r--r--changelog.d/5233.bugfix1
-rw-r--r--changelog.d/5234.misc1
-rw-r--r--changelog.d/5235.misc1
-rw-r--r--changelog.d/5236.misc1
-rw-r--r--changelog.d/5237.misc1
-rw-r--r--changelog.d/5244.misc1
-rw-r--r--changelog.d/5249.feature1
-rw-r--r--changelog.d/5250.misc1
-rw-r--r--changelog.d/5251.bugfix1
-rw-r--r--changelog.d/5256.bugfix1
-rw-r--r--changelog.d/5257.bugfix1
-rw-r--r--changelog.d/5258.bugfix1
-rw-r--r--changelog.d/5260.feature1
-rw-r--r--changelog.d/5268.bugfix1
-rw-r--r--changelog.d/5274.bugfix1
-rw-r--r--changelog.d/5275.bugfix1
-rw-r--r--changelog.d/5277.bugfix1
-rw-r--r--changelog.d/5278.bugfix1
-rw-r--r--changelog.d/5282.doc1
-rw-r--r--changelog.d/5283.misc1
-rw-r--r--changelog.d/5286.feature1
-rw-r--r--changelog.d/5287.misc1
-rw-r--r--changelog.d/5288.misc1
-rw-r--r--changelog.d/5291.bugfix1
-rw-r--r--docker/README.md2
-rw-r--r--docs/CAPTCHA_SETUP.rst1
-rw-r--r--docs/sample_config.yaml32
-rw-r--r--docs/user_directory.md10
-rw-r--r--synapse/api/constants.py1
-rw-r--r--synapse/api/room_versions.py4
-rw-r--r--synapse/api/urls.py1
-rw-r--r--synapse/app/_base.py20
-rw-r--r--synapse/app/client_reader.py2
-rw-r--r--synapse/config/homeserver.py42
-rw-r--r--synapse/config/server.py33
-rw-r--r--synapse/config/stats.py60
-rw-r--r--synapse/config/user_directory.py6
-rw-r--r--synapse/crypto/keyring.py660
-rw-r--r--synapse/events/builder.py7
-rw-r--r--synapse/events/utils.py5
-rw-r--r--synapse/federation/transport/server.py31
-rw-r--r--synapse/handlers/events.py3
-rw-r--r--synapse/handlers/federation.py301
-rw-r--r--synapse/handlers/message.py3
-rw-r--r--synapse/handlers/presence.py99
-rw-r--r--synapse/handlers/room.py9
-rw-r--r--synapse/handlers/stats.py325
-rw-r--r--synapse/http/matrixfederationclient.py4
-rw-r--r--synapse/http/servlet.py2
-rw-r--r--synapse/python_dependencies.py8
-rw-r--r--synapse/rest/admin/__init__.py12
-rw-r--r--synapse/rest/client/v1/login.py4
-rw-r--r--synapse/rest/client/v1/logout.py27
-rw-r--r--synapse/rest/client/v2_alpha/capabilities.py5
-rw-r--r--synapse/rest/client/v2_alpha/sync.py3
-rw-r--r--synapse/rest/key/v2/remote_key_resource.py6
-rw-r--r--synapse/rest/media/v1/thumbnail_resource.py4
-rw-r--r--synapse/server.py6
-rw-r--r--synapse/storage/__init__.py2
-rw-r--r--synapse/storage/events_worker.py74
-rw-r--r--synapse/storage/keys.py90
-rw-r--r--synapse/storage/roommember.py32
-rw-r--r--synapse/storage/schema/delta/54/account_validity_with_renewal.sql (renamed from synapse/storage/schema/delta/54/account_validity.sql)3
-rw-r--r--synapse/storage/schema/delta/54/add_validity_to_server_keys.sql23
-rw-r--r--synapse/storage/schema/delta/54/stats.sql80
-rw-r--r--synapse/storage/state_deltas.py12
-rw-r--r--synapse/storage/stats.py450
-rw-r--r--synapse/util/logcontext.py22
-rw-r--r--tests/crypto/test_keyring.py156
-rw-r--r--tests/federation/test_complexity.py90
-rw-r--r--tests/handlers/test_stats.py251
-rw-r--r--tests/rest/client/v1/utils.py17
-rw-r--r--tests/rest/client/v2_alpha/test_capabilities.py7
-rw-r--r--tests/storage/test_keys.py70
82 files changed, 2502 insertions, 647 deletions
diff --git a/changelog.d/4338.feature b/changelog.d/4338.feature
new file mode 100644
index 0000000000..01285e965c
--- /dev/null
+++ b/changelog.d/4338.feature
@@ -0,0 +1 @@
+Synapse now more efficiently collates room statistics.
diff --git a/changelog.d/5200.bugfix b/changelog.d/5200.bugfix
new file mode 100644
index 0000000000..f346c7b0cc
--- /dev/null
+++ b/changelog.d/5200.bugfix
@@ -0,0 +1 @@
+Fix worker registration bug caused by ClientReaderSlavedStore being unable to see get_profileinfo.
diff --git a/changelog.d/5216.misc b/changelog.d/5216.misc
new file mode 100644
index 0000000000..dbfa29475f
--- /dev/null
+++ b/changelog.d/5216.misc
@@ -0,0 +1 @@
+Synapse will now serve the experimental "room complexity" API endpoint.
diff --git a/changelog.d/5220.feature b/changelog.d/5220.feature
new file mode 100644
index 0000000000..747098c166
--- /dev/null
+++ b/changelog.d/5220.feature
@@ -0,0 +1 @@
+Add experimental support for relations (aka reactions and edits).
diff --git a/changelog.d/5223.feature b/changelog.d/5223.feature
new file mode 100644
index 0000000000..cfdf1ad41b
--- /dev/null
+++ b/changelog.d/5223.feature
@@ -0,0 +1 @@
+Ability to configure default room version.
diff --git a/changelog.d/5227.misc b/changelog.d/5227.misc
new file mode 100644
index 0000000000..32bd7b6009
--- /dev/null
+++ b/changelog.d/5227.misc
@@ -0,0 +1 @@
+Simplifications and comments in do_auth.
diff --git a/changelog.d/5230.misc b/changelog.d/5230.misc
new file mode 100644
index 0000000000..c681bc9748
--- /dev/null
+++ b/changelog.d/5230.misc
@@ -0,0 +1 @@
+Remove urllib3 pin as requests 2.22.0 has been released supporting urllib3 1.25.2.
diff --git a/changelog.d/5232.misc b/changelog.d/5232.misc
new file mode 100644
index 0000000000..1cdc71f095
--- /dev/null
+++ b/changelog.d/5232.misc
@@ -0,0 +1 @@
+Run black on synapse.crypto.keyring.
diff --git a/changelog.d/5233.bugfix b/changelog.d/5233.bugfix
new file mode 100644
index 0000000000..d71b962160
--- /dev/null
+++ b/changelog.d/5233.bugfix
@@ -0,0 +1 @@
+Fix appservice timestamp massaging.
diff --git a/changelog.d/5234.misc b/changelog.d/5234.misc
new file mode 100644
index 0000000000..43fbd6f67c
--- /dev/null
+++ b/changelog.d/5234.misc
@@ -0,0 +1 @@
+Rewrite store_server_verify_key to store several keys at once.
diff --git a/changelog.d/5235.misc b/changelog.d/5235.misc
new file mode 100644
index 0000000000..2296ad2a4f
--- /dev/null
+++ b/changelog.d/5235.misc
@@ -0,0 +1 @@
+Remove unused VerifyKey.expired and .time_added fields.
diff --git a/changelog.d/5236.misc b/changelog.d/5236.misc
new file mode 100644
index 0000000000..cb4417a9f4
--- /dev/null
+++ b/changelog.d/5236.misc
@@ -0,0 +1 @@
+Simplify Keyring.process_v2_response.
\ No newline at end of file
diff --git a/changelog.d/5237.misc b/changelog.d/5237.misc
new file mode 100644
index 0000000000..f4fe3b821b
--- /dev/null
+++ b/changelog.d/5237.misc
@@ -0,0 +1 @@
+Store key validity time in the storage layer.
diff --git a/changelog.d/5244.misc b/changelog.d/5244.misc
new file mode 100644
index 0000000000..9cc1fb869d
--- /dev/null
+++ b/changelog.d/5244.misc
@@ -0,0 +1 @@
+Refactor synapse.crypto.keyring to use a KeyFetcher interface.
diff --git a/changelog.d/5249.feature b/changelog.d/5249.feature
new file mode 100644
index 0000000000..cfdf1ad41b
--- /dev/null
+++ b/changelog.d/5249.feature
@@ -0,0 +1 @@
+Ability to configure default room version.
diff --git a/changelog.d/5250.misc b/changelog.d/5250.misc
new file mode 100644
index 0000000000..575a299a82
--- /dev/null
+++ b/changelog.d/5250.misc
@@ -0,0 +1 @@
+Simplification to Keyring.wait_for_previous_lookups.
diff --git a/changelog.d/5251.bugfix b/changelog.d/5251.bugfix
new file mode 100644
index 0000000000..9a053204b6
--- /dev/null
+++ b/changelog.d/5251.bugfix
@@ -0,0 +1 @@
+Ensure that server_keys fetched via a notary server are correctly signed.
\ No newline at end of file
diff --git a/changelog.d/5256.bugfix b/changelog.d/5256.bugfix
new file mode 100644
index 0000000000..86316ab5dd
--- /dev/null
+++ b/changelog.d/5256.bugfix
@@ -0,0 +1 @@
+Show the correct error when logging out and access token is missing.
diff --git a/changelog.d/5257.bugfix b/changelog.d/5257.bugfix
new file mode 100644
index 0000000000..8334af9b99
--- /dev/null
+++ b/changelog.d/5257.bugfix
@@ -0,0 +1 @@
+Fix error code when there is an invalid parameter on /_matrix/client/r0/publicRooms
diff --git a/changelog.d/5258.bugfix b/changelog.d/5258.bugfix
new file mode 100644
index 0000000000..fb5d44aedb
--- /dev/null
+++ b/changelog.d/5258.bugfix
@@ -0,0 +1 @@
+Fix error when downloading thumbnail with missing width/height parameter.
diff --git a/changelog.d/5260.feature b/changelog.d/5260.feature
new file mode 100644
index 0000000000..01285e965c
--- /dev/null
+++ b/changelog.d/5260.feature
@@ -0,0 +1 @@
+Synapse now more efficiently collates room statistics.
diff --git a/changelog.d/5268.bugfix b/changelog.d/5268.bugfix
new file mode 100644
index 0000000000..1a5a03bf0a
--- /dev/null
+++ b/changelog.d/5268.bugfix
@@ -0,0 +1 @@
+Fix schema update for account validity.
diff --git a/changelog.d/5274.bugfix b/changelog.d/5274.bugfix
new file mode 100644
index 0000000000..9e14d20289
--- /dev/null
+++ b/changelog.d/5274.bugfix
@@ -0,0 +1 @@
+Fix bug where we leaked extremities when we soft failed events, leading to performance degradation.
diff --git a/changelog.d/5275.bugfix b/changelog.d/5275.bugfix
new file mode 100644
index 0000000000..45a554642a
--- /dev/null
+++ b/changelog.d/5275.bugfix
@@ -0,0 +1 @@
+Fix "db txn 'update_presence' from sentinel context" log messages.
diff --git a/changelog.d/5277.bugfix b/changelog.d/5277.bugfix
new file mode 100644
index 0000000000..371aa2e7fb
--- /dev/null
+++ b/changelog.d/5277.bugfix
@@ -0,0 +1 @@
+Fix dropped logcontexts during high outbound traffic.
diff --git a/changelog.d/5278.bugfix b/changelog.d/5278.bugfix
new file mode 100644
index 0000000000..9e14d20289
--- /dev/null
+++ b/changelog.d/5278.bugfix
@@ -0,0 +1 @@
+Fix bug where we leaked extremities when we soft failed events, leading to performance degradation.
diff --git a/changelog.d/5282.doc b/changelog.d/5282.doc
new file mode 100644
index 0000000000..350e15bc03
--- /dev/null
+++ b/changelog.d/5282.doc
@@ -0,0 +1 @@
+Fix docs on resetting the user directory.
diff --git a/changelog.d/5283.misc b/changelog.d/5283.misc
new file mode 100644
index 0000000000..002721e566
--- /dev/null
+++ b/changelog.d/5283.misc
@@ -0,0 +1 @@
+Specify the type of reCAPTCHA key to use.
diff --git a/changelog.d/5286.feature b/changelog.d/5286.feature
new file mode 100644
index 0000000000..81860279a3
--- /dev/null
+++ b/changelog.d/5286.feature
@@ -0,0 +1 @@
+CAS login will now hit the r0 API, not the deprecated v1 one.
diff --git a/changelog.d/5287.misc b/changelog.d/5287.misc
new file mode 100644
index 0000000000..1286f1dd08
--- /dev/null
+++ b/changelog.d/5287.misc
@@ -0,0 +1 @@
+Remove spurious debug from MatrixFederationHttpClient.get_json.
diff --git a/changelog.d/5288.misc b/changelog.d/5288.misc
new file mode 100644
index 0000000000..fbf049ba6a
--- /dev/null
+++ b/changelog.d/5288.misc
@@ -0,0 +1 @@
+Improve logging for logcontext leaks.
diff --git a/changelog.d/5291.bugfix b/changelog.d/5291.bugfix
new file mode 100644
index 0000000000..9e14d20289
--- /dev/null
+++ b/changelog.d/5291.bugfix
@@ -0,0 +1 @@
+Fix bug where we leaked extremities when we soft failed events, leading to performance degradation.
diff --git a/docker/README.md b/docker/README.md
index b27a692d5b..df5d0151e2 100644
--- a/docker/README.md
+++ b/docker/README.md
@@ -161,7 +161,7 @@ specify values for `SYNAPSE_CONFIG_PATH`, `SYNAPSE_SERVER_NAME` and
 example:
 
 ```
-docker run -it --rm
+docker run -it --rm \
     --mount type=volume,src=synapse-data,dst=/data \
     -e SYNAPSE_CONFIG_PATH=/data/homeserver.yaml \
     -e SYNAPSE_SERVER_NAME=my.matrix.host \
diff --git a/docs/CAPTCHA_SETUP.rst b/docs/CAPTCHA_SETUP.rst
index 19a204d9ce..0c22ee4ff6 100644
--- a/docs/CAPTCHA_SETUP.rst
+++ b/docs/CAPTCHA_SETUP.rst
@@ -7,6 +7,7 @@ Requires a public/private key pair from:
 
 https://developers.google.com/recaptcha/
 
+Must be a reCAPTCHA v2 key using the "I'm not a robot" Checkbox option
 
 Setting ReCaptcha Keys
 ----------------------
diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index f658ec8ecd..edfde05a23 100644
--- a/docs/sample_config.yaml
+++ b/docs/sample_config.yaml
@@ -83,6 +83,16 @@ pid_file: DATADIR/homeserver.pid
 #
 #restrict_public_rooms_to_local_users: true
 
+# The default room version for newly created rooms.
+#
+# Known room versions are listed here:
+# https://matrix.org/docs/spec/#complete-list-of-room-versions
+#
+# For example, for room version 1, default_room_version should be set
+# to "1".
+#
+#default_room_version: "1"
+
 # The GC threshold parameters to pass to `gc.set_threshold`, if defined
 #
 #gc_thresholds: [700, 10, 10]
@@ -1093,9 +1103,9 @@ password_config:
 #
 # 'search_all_users' defines whether to search all users visible to your HS
 # when searching the user directory, rather than limiting to users visible
-# in public rooms.  Defaults to false.  If you set it True, you'll have to run
-# UPDATE user_directory_stream_pos SET stream_id = NULL;
-# on your database to tell it to rebuild the user_directory search indexes.
+# in public rooms.  Defaults to false.  If you set it True, you'll have to
+# rebuild the user_directory search indexes, see
+# https://github.com/matrix-org/synapse/blob/master/docs/user_directory.md
 #
 #user_directory:
 #  enabled: true
@@ -1153,6 +1163,22 @@ password_config:
 #
 
 
+
+# Local statistics collection. Used in populating the room directory.
+#
+# 'bucket_size' controls how large each statistics timeslice is. It can
+# be defined in a human readable short form -- e.g. "1d", "1y".
+#
+# 'retention' controls how long historical statistics will be kept for.
+# It can be defined in a human readable short form -- e.g. "1d", "1y".
+#
+#
+#stats:
+#   enabled: true
+#   bucket_size: 1d
+#   retention: 1y
+
+
 # Server Notices room configuration
 #
 # Uncomment this section to enable a room which can be used to send notices
diff --git a/docs/user_directory.md b/docs/user_directory.md
index 4c8ee44f37..e64aa453cc 100644
--- a/docs/user_directory.md
+++ b/docs/user_directory.md
@@ -7,11 +7,7 @@ who are present in a publicly viewable room present on the server.
 
 The directory info is stored in various tables, which can (typically after
 DB corruption) get stale or out of sync.  If this happens, for now the
-quickest solution to fix it is:
-
-```
-UPDATE user_directory_stream_pos SET stream_id = NULL;
-```
-
-and restart the synapse, which should then start a background task to
+solution to fix it is to execute the SQL here
+https://github.com/matrix-org/synapse/blob/master/synapse/storage/schema/delta/53/user_dir_populate.sql
+and then restart synapse. This should then start a background task to
 flush the current tables and regenerate the directory.
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 6b347b1749..ee129c8689 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -79,6 +79,7 @@ class EventTypes(object):
 
     RoomHistoryVisibility = "m.room.history_visibility"
     CanonicalAlias = "m.room.canonical_alias"
+    Encryption = "m.room.encryption"
     RoomAvatar = "m.room.avatar"
     RoomEncryption = "m.room.encryption"
     GuestAccess = "m.room.guest_access"
diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py
index b2895355a8..4085bd10b9 100644
--- a/synapse/api/room_versions.py
+++ b/synapse/api/room_versions.py
@@ -85,10 +85,6 @@ class RoomVersions(object):
     )
 
 
-# the version we will give rooms which are created on this server
-DEFAULT_ROOM_VERSION = RoomVersions.V1
-
-
 KNOWN_ROOM_VERSIONS = {
     v.identifier: v for v in (
         RoomVersions.V1,
diff --git a/synapse/api/urls.py b/synapse/api/urls.py
index 3c6bddff7a..e16c386a14 100644
--- a/synapse/api/urls.py
+++ b/synapse/api/urls.py
@@ -26,6 +26,7 @@ CLIENT_API_PREFIX = "/_matrix/client"
 FEDERATION_PREFIX = "/_matrix/federation"
 FEDERATION_V1_PREFIX = FEDERATION_PREFIX + "/v1"
 FEDERATION_V2_PREFIX = FEDERATION_PREFIX + "/v2"
+FEDERATION_UNSTABLE_PREFIX = FEDERATION_PREFIX + "/unstable"
 STATIC_PREFIX = "/_matrix/static"
 WEB_CLIENT_PREFIX = "/_matrix/client"
 CONTENT_REPO_PREFIX = "/_matrix/content"
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 08199a5e8d..8cc990399f 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -344,15 +344,21 @@ class _LimitedHostnameResolver(object):
 
     def resolveHostName(self, resolutionReceiver, hostName, portNumber=0,
                         addressTypes=None, transportSemantics='TCP'):
-        # Note this is happening deep within the reactor, so we don't need to
-        # worry about log contexts.
-
         # We need this function to return `resolutionReceiver` so we do all the
         # actual logic involving deferreds in a separate function.
-        self._resolve(
-            resolutionReceiver, hostName, portNumber,
-            addressTypes, transportSemantics,
-        )
+
+        # even though this is happening within the depths of twisted, we need to drop
+        # our logcontext before starting _resolve, otherwise: (a) _resolve will drop
+        # the logcontext if it returns an incomplete deferred; (b) _resolve will
+        # call the resolutionReceiver *with* a logcontext, which it won't be expecting.
+        with PreserveLoggingContext():
+            self._resolve(
+                resolutionReceiver,
+                hostName,
+                portNumber,
+                addressTypes,
+                transportSemantics,
+            )
 
         return resolutionReceiver
 
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 864f1eac48..a16e037f32 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -38,6 +38,7 @@ from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
+from synapse.replication.slave.storage.profile import SlavedProfileStore
 from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
@@ -81,6 +82,7 @@ class ClientReaderSlavedStore(
     SlavedApplicationServiceStore,
     SlavedRegistrationStore,
     SlavedTransactionStore,
+    SlavedProfileStore,
     SlavedClientIpStore,
     BaseSlavedStore,
 ):
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index 727fdc54d8..5c4fc8ff21 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -13,6 +13,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 from .api import ApiConfig
 from .appservice import AppServiceConfig
 from .captcha import CaptchaConfig
@@ -36,20 +37,41 @@ from .saml2_config import SAML2Config
 from .server import ServerConfig
 from .server_notices_config import ServerNoticesConfig
 from .spam_checker import SpamCheckerConfig
+from .stats import StatsConfig
 from .tls import TlsConfig
 from .user_directory import UserDirectoryConfig
 from .voip import VoipConfig
 from .workers import WorkerConfig
 
 
-class HomeServerConfig(ServerConfig, TlsConfig, DatabaseConfig, LoggingConfig,
-                       RatelimitConfig, ContentRepositoryConfig, CaptchaConfig,
-                       VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig,
-                       AppServiceConfig, KeyConfig, SAML2Config, CasConfig,
-                       JWTConfig, PasswordConfig, EmailConfig,
-                       WorkerConfig, PasswordAuthProviderConfig, PushConfig,
-                       SpamCheckerConfig, GroupsConfig, UserDirectoryConfig,
-                       ConsentConfig,
-                       ServerNoticesConfig, RoomDirectoryConfig,
-                       ):
+class HomeServerConfig(
+    ServerConfig,
+    TlsConfig,
+    DatabaseConfig,
+    LoggingConfig,
+    RatelimitConfig,
+    ContentRepositoryConfig,
+    CaptchaConfig,
+    VoipConfig,
+    RegistrationConfig,
+    MetricsConfig,
+    ApiConfig,
+    AppServiceConfig,
+    KeyConfig,
+    SAML2Config,
+    CasConfig,
+    JWTConfig,
+    PasswordConfig,
+    EmailConfig,
+    WorkerConfig,
+    PasswordAuthProviderConfig,
+    PushConfig,
+    SpamCheckerConfig,
+    GroupsConfig,
+    UserDirectoryConfig,
+    ConsentConfig,
+    StatsConfig,
+    ServerNoticesConfig,
+    RoomDirectoryConfig,
+):
     pass
diff --git a/synapse/config/server.py b/synapse/config/server.py
index f34aa42afa..e763e19e15 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -20,6 +20,7 @@ import os.path
 
 from netaddr import IPSet
 
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.http.endpoint import parse_and_validate_server_name
 from synapse.python_dependencies import DependencyException, check_requirements
 
@@ -35,6 +36,8 @@ logger = logging.Logger(__name__)
 # in the list.
 DEFAULT_BIND_ADDRESSES = ['::', '0.0.0.0']
 
+DEFAULT_ROOM_VERSION = "1"
+
 
 class ServerConfig(Config):
 
@@ -88,6 +91,22 @@ class ServerConfig(Config):
             "restrict_public_rooms_to_local_users", False,
         )
 
+        default_room_version = config.get(
+            "default_room_version", DEFAULT_ROOM_VERSION,
+        )
+
+        # Ensure room version is a str
+        default_room_version = str(default_room_version)
+
+        if default_room_version not in KNOWN_ROOM_VERSIONS:
+            raise ConfigError(
+                "Unknown default_room_version: %s, known room versions: %s" %
+                (default_room_version, list(KNOWN_ROOM_VERSIONS.keys()))
+            )
+
+        # Get the actual room version object rather than just the identifier
+        self.default_room_version = KNOWN_ROOM_VERSIONS[default_room_version]
+
         # whether to enable search. If disabled, new entries will not be inserted
         # into the search tables and they will not be indexed. Users will receive
         # errors when attempting to search for messages.
@@ -310,6 +329,10 @@ class ServerConfig(Config):
             unsecure_port = 8008
 
         pid_file = os.path.join(data_dir_path, "homeserver.pid")
+
+        # Bring DEFAULT_ROOM_VERSION into the local-scope for use in the
+        # default config string
+        default_room_version = DEFAULT_ROOM_VERSION
         return """\
         ## Server ##
 
@@ -384,6 +407,16 @@ class ServerConfig(Config):
         #
         #restrict_public_rooms_to_local_users: true
 
+        # The default room version for newly created rooms.
+        #
+        # Known room versions are listed here:
+        # https://matrix.org/docs/spec/#complete-list-of-room-versions
+        #
+        # For example, for room version 1, default_room_version should be set
+        # to "1".
+        #
+        #default_room_version: "%(default_room_version)s"
+
         # The GC threshold parameters to pass to `gc.set_threshold`, if defined
         #
         #gc_thresholds: [700, 10, 10]
diff --git a/synapse/config/stats.py b/synapse/config/stats.py
new file mode 100644
index 0000000000..80fc1b9dd0
--- /dev/null
+++ b/synapse/config/stats.py
@@ -0,0 +1,60 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import division
+
+import sys
+
+from ._base import Config
+
+
+class StatsConfig(Config):
+    """Stats Configuration
+    Configuration for the behaviour of synapse's stats engine
+    """
+
+    def read_config(self, config):
+        self.stats_enabled = True
+        self.stats_bucket_size = 86400
+        self.stats_retention = sys.maxsize
+        stats_config = config.get("stats", None)
+        if stats_config:
+            self.stats_enabled = stats_config.get("enabled", self.stats_enabled)
+            self.stats_bucket_size = (
+                self.parse_duration(stats_config.get("bucket_size", "1d")) / 1000
+            )
+            self.stats_retention = (
+                self.parse_duration(
+                    stats_config.get("retention", "%ds" % (sys.maxsize,))
+                )
+                / 1000
+            )
+
+    def default_config(self, config_dir_path, server_name, **kwargs):
+        return """
+        # Local statistics collection. Used in populating the room directory.
+        #
+        # 'bucket_size' controls how large each statistics timeslice is. It can
+        # be defined in a human readable short form -- e.g. "1d", "1y".
+        #
+        # 'retention' controls how long historical statistics will be kept for.
+        # It can be defined in a human readable short form -- e.g. "1d", "1y".
+        #
+        #
+        #stats:
+        #   enabled: true
+        #   bucket_size: 1d
+        #   retention: 1y
+        """
diff --git a/synapse/config/user_directory.py b/synapse/config/user_directory.py
index 142754a7dc..023997ccde 100644
--- a/synapse/config/user_directory.py
+++ b/synapse/config/user_directory.py
@@ -43,9 +43,9 @@ class UserDirectoryConfig(Config):
         #
         # 'search_all_users' defines whether to search all users visible to your HS
         # when searching the user directory, rather than limiting to users visible
-        # in public rooms.  Defaults to false.  If you set it True, you'll have to run
-        # UPDATE user_directory_stream_pos SET stream_id = NULL;
-        # on your database to tell it to rebuild the user_directory search indexes.
+        # in public rooms.  Defaults to false.  If you set it True, you'll have to
+        # rebuild the user_directory search indexes, see
+        # https://github.com/matrix-org/synapse/blob/master/docs/user_directory.md
         #
         #user_directory:
         #  enabled: true
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index d8ba870cca..c63f106cf3 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -17,10 +17,10 @@
 import logging
 from collections import namedtuple
 
+import six
 from six import raise_from
 from six.moves import urllib
 
-import nacl.signing
 from signedjson.key import (
     decode_verify_key_bytes,
     encode_verify_key_base64,
@@ -43,6 +43,7 @@ from synapse.api.errors import (
     RequestSendFailed,
     SynapseError,
 )
+from synapse.storage.keys import FetchKeyResult
 from synapse.util import logcontext, unwrapFirstError
 from synapse.util.logcontext import (
     LoggingContext,
@@ -56,9 +57,9 @@ from synapse.util.retryutils import NotRetryingDestination
 logger = logging.getLogger(__name__)
 
 
-VerifyKeyRequest = namedtuple("VerifyRequest", (
-    "server_name", "key_ids", "json_object", "deferred"
-))
+VerifyKeyRequest = namedtuple(
+    "VerifyRequest", ("server_name", "key_ids", "json_object", "deferred")
+)
 """
 A request for a verify key to verify a JSON object.
 
@@ -80,12 +81,13 @@ class KeyLookupError(ValueError):
 
 class Keyring(object):
     def __init__(self, hs):
-        self.store = hs.get_datastore()
         self.clock = hs.get_clock()
-        self.client = hs.get_http_client()
-        self.config = hs.get_config()
-        self.perspective_servers = self.config.perspectives
-        self.hs = hs
+
+        self._key_fetchers = (
+            StoreKeyFetcher(hs),
+            PerspectivesKeyFetcher(hs),
+            ServerKeyFetcher(hs),
+        )
 
         # map from server name to Deferred. Has an entry for each server with
         # an ongoing key download; the Deferred completes once the download
@@ -96,9 +98,7 @@ class Keyring(object):
 
     def verify_json_for_server(self, server_name, json_object):
         return logcontext.make_deferred_yieldable(
-            self.verify_json_objects_for_server(
-                [(server_name, json_object)]
-            )[0]
+            self.verify_json_objects_for_server([(server_name, json_object)])[0]
         )
 
     def verify_json_objects_for_server(self, server_and_json):
@@ -130,18 +130,15 @@ class Keyring(object):
             if not key_ids:
                 return defer.fail(
                     SynapseError(
-                        400,
-                        "Not signed by %s" % (server_name,),
-                        Codes.UNAUTHORIZED,
+                        400, "Not signed by %s" % (server_name,), Codes.UNAUTHORIZED
                     )
                 )
 
-            logger.debug("Verifying for %s with key_ids %s",
-                         server_name, key_ids)
+            logger.debug("Verifying for %s with key_ids %s", server_name, key_ids)
 
             # add the key request to the queue, but don't start it off yet.
             verify_request = VerifyKeyRequest(
-                server_name, key_ids, json_object, defer.Deferred(),
+                server_name, key_ids, json_object, defer.Deferred()
             )
             verify_requests.append(verify_request)
 
@@ -179,16 +176,12 @@ class Keyring(object):
             # any other lookups until we have finished.
             # The deferreds are called with no logcontext.
             server_to_deferred = {
-                rq.server_name: defer.Deferred()
-                for rq in verify_requests
+                rq.server_name: defer.Deferred() for rq in verify_requests
             }
 
             # We want to wait for any previous lookups to complete before
             # proceeding.
-            yield self.wait_for_previous_lookups(
-                [rq.server_name for rq in verify_requests],
-                server_to_deferred,
-            )
+            yield self.wait_for_previous_lookups(server_to_deferred)
 
             # Actually start fetching keys.
             self._get_server_verify_keys(verify_requests)
@@ -216,19 +209,16 @@ class Keyring(object):
                 return res
 
             for verify_request in verify_requests:
-                verify_request.deferred.addBoth(
-                    remove_deferreds, verify_request,
-                )
+                verify_request.deferred.addBoth(remove_deferreds, verify_request)
         except Exception:
             logger.exception("Error starting key lookups")
 
     @defer.inlineCallbacks
-    def wait_for_previous_lookups(self, server_names, server_to_deferred):
+    def wait_for_previous_lookups(self, server_to_deferred):
         """Waits for any previous key lookups for the given servers to finish.
 
         Args:
-            server_names (list): list of server_names we want to lookup
-            server_to_deferred (dict): server_name to deferred which gets
+            server_to_deferred (dict[str, Deferred]): server_name to deferred which gets
                 resolved once we've finished looking up keys for that server.
                 The Deferreds should be regular twisted ones which call their
                 callbacks with no logcontext.
@@ -241,14 +231,15 @@ class Keyring(object):
         while True:
             wait_on = [
                 (server_name, self.key_downloads[server_name])
-                for server_name in server_names
+                for server_name in server_to_deferred.keys()
                 if server_name in self.key_downloads
             ]
             if not wait_on:
                 break
             logger.info(
                 "Waiting for existing lookups for %s to complete [loop %i]",
-                [w[0] for w in wait_on], loop_count,
+                [w[0] for w in wait_on],
+                loop_count,
             )
             with PreserveLoggingContext():
                 yield defer.DeferredList((w[1] for w in wait_on))
@@ -279,13 +270,6 @@ class Keyring(object):
             verify_requests (list[VerifyKeyRequest]): list of verify requests
         """
 
-        # These are functions that produce keys given a list of key ids
-        key_fetch_fns = (
-            self.get_keys_from_store,  # First try the local store
-            self.get_keys_from_perspectives,  # Then try via perspectives
-            self.get_keys_from_server,  # Then try directly
-        )
-
         @defer.inlineCallbacks
         def do_iterations():
             with Measure(self.clock, "get_server_verify_keys"):
@@ -296,8 +280,8 @@ class Keyring(object):
                         verify_request.key_ids
                     )
 
-                for fn in key_fetch_fns:
-                    results = yield fn(missing_keys.items())
+                for f in self._key_fetchers:
+                    results = yield f.get_keys(missing_keys.items())
 
                     # We now need to figure out which verify requests we have keys
                     # for and which we don't
@@ -315,11 +299,15 @@ class Keyring(object):
                         # complete this VerifyKeyRequest.
                         result_keys = results.get(server_name, {})
                         for key_id in verify_request.key_ids:
-                            key = result_keys.get(key_id)
-                            if key:
+                            fetch_key_result = result_keys.get(key_id)
+                            if fetch_key_result:
                                 with PreserveLoggingContext():
                                     verify_request.deferred.callback(
-                                        (server_name, key_id, key)
+                                        (
+                                            server_name,
+                                            key_id,
+                                            fetch_key_result.verify_key,
+                                        )
                                     )
                                 break
                         else:
@@ -335,13 +323,14 @@ class Keyring(object):
 
                 with PreserveLoggingContext():
                     for verify_request in requests_missing_keys:
-                        verify_request.deferred.errback(SynapseError(
-                            401,
-                            "No key for %s with id %s" % (
-                                verify_request.server_name, verify_request.key_ids,
-                            ),
-                            Codes.UNAUTHORIZED,
-                        ))
+                        verify_request.deferred.errback(
+                            SynapseError(
+                                401,
+                                "No key for %s with id %s"
+                                % (verify_request.server_name, verify_request.key_ids),
+                                Codes.UNAUTHORIZED,
+                            )
+                        )
 
         def on_err(err):
             with PreserveLoggingContext():
@@ -351,17 +340,31 @@ class Keyring(object):
 
         run_in_background(do_iterations).addErrback(on_err)
 
-    @defer.inlineCallbacks
-    def get_keys_from_store(self, server_name_and_key_ids):
+
+class KeyFetcher(object):
+    def get_keys(self, server_name_and_key_ids):
         """
         Args:
-            server_name_and_key_ids (iterable(Tuple[str, iterable[str]]):
+            server_name_and_key_ids (iterable[Tuple[str, iterable[str]]]):
                 list of (server_name, iterable[key_id]) tuples to fetch keys for
+                Note that the iterables may be iterated more than once.
 
         Returns:
-            Deferred: resolves to dict[str, dict[str, VerifyKey|None]]: map from
-                server_name -> key_id -> VerifyKey
+            Deferred[dict[str, dict[str, synapse.storage.keys.FetchKeyResult|None]]]:
+                map from server_name -> key_id -> FetchKeyResult
         """
+        raise NotImplementedError
+
+
+class StoreKeyFetcher(KeyFetcher):
+    """KeyFetcher impl which fetches keys from our data store"""
+
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+
+    @defer.inlineCallbacks
+    def get_keys(self, server_name_and_key_ids):
+        """see KeyFetcher.get_keys"""
         keys_to_fetch = (
             (server_name, key_id)
             for server_name, key_ids in server_name_and_key_ids
@@ -373,8 +376,135 @@ class Keyring(object):
             keys.setdefault(server_name, {})[key_id] = key
         defer.returnValue(keys)
 
+
+class BaseV2KeyFetcher(object):
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+        self.config = hs.get_config()
+
+    @defer.inlineCallbacks
+    def process_v2_response(
+        self, from_server, response_json, time_added_ms, requested_ids=[]
+    ):
+        """Parse a 'Server Keys' structure from the result of a /key request
+
+        This is used to parse either the entirety of the response from
+        GET /_matrix/key/v2/server, or a single entry from the list returned by
+        POST /_matrix/key/v2/query.
+
+        Checks that each signature in the response that claims to come from the origin
+        server is valid, and that there is at least one such signature.
+
+        Stores the json in server_keys_json so that it can be used for future responses
+        to /_matrix/key/v2/query.
+
+        Args:
+            from_server (str): the name of the server producing this result: either
+                the origin server for a /_matrix/key/v2/server request, or the notary
+                for a /_matrix/key/v2/query.
+
+            response_json (dict): the json-decoded Server Keys response object
+
+            time_added_ms (int): the timestamp to record in server_keys_json
+
+            requested_ids (iterable[str]): a list of the key IDs that were requested.
+                We will store the json for these key ids as well as any that are
+                actually in the response
+
+        Returns:
+            Deferred[dict[str, FetchKeyResult]]: map from key_id to result object
+        """
+        ts_valid_until_ms = response_json[u"valid_until_ts"]
+
+        # start by extracting the keys from the response, since they may be required
+        # to validate the signature on the response.
+        verify_keys = {}
+        for key_id, key_data in response_json["verify_keys"].items():
+            if is_signing_algorithm_supported(key_id):
+                key_base64 = key_data["key"]
+                key_bytes = decode_base64(key_base64)
+                verify_key = decode_verify_key_bytes(key_id, key_bytes)
+                verify_keys[key_id] = FetchKeyResult(
+                    verify_key=verify_key, valid_until_ts=ts_valid_until_ms
+                )
+
+        server_name = response_json["server_name"]
+        verified = False
+        for key_id in response_json["signatures"].get(server_name, {}):
+            # each of the keys used for the signature must be present in the response
+            # json.
+            key = verify_keys.get(key_id)
+            if not key:
+                raise KeyLookupError(
+                    "Key response is signed by key id %s:%s but that key is not "
+                    "present in the response" % (server_name, key_id)
+                )
+
+            verify_signed_json(response_json, server_name, key.verify_key)
+            verified = True
+
+        if not verified:
+            raise KeyLookupError(
+                "Key response for %s is not signed by the origin server"
+                % (server_name,)
+            )
+
+        for key_id, key_data in response_json["old_verify_keys"].items():
+            if is_signing_algorithm_supported(key_id):
+                key_base64 = key_data["key"]
+                key_bytes = decode_base64(key_base64)
+                verify_key = decode_verify_key_bytes(key_id, key_bytes)
+                verify_keys[key_id] = FetchKeyResult(
+                    verify_key=verify_key, valid_until_ts=key_data["expired_ts"]
+                )
+
+        # re-sign the json with our own key, so that it is ready if we are asked to
+        # give it out as a notary server
+        signed_key_json = sign_json(
+            response_json, self.config.server_name, self.config.signing_key[0]
+        )
+
+        signed_key_json_bytes = encode_canonical_json(signed_key_json)
+
+        # for reasons I don't quite understand, we store this json for the key ids we
+        # requested, as well as those we got.
+        updated_key_ids = set(requested_ids)
+        updated_key_ids.update(verify_keys)
+
+        yield logcontext.make_deferred_yieldable(
+            defer.gatherResults(
+                [
+                    run_in_background(
+                        self.store.store_server_keys_json,
+                        server_name=server_name,
+                        key_id=key_id,
+                        from_server=from_server,
+                        ts_now_ms=time_added_ms,
+                        ts_expires_ms=ts_valid_until_ms,
+                        key_json_bytes=signed_key_json_bytes,
+                    )
+                    for key_id in updated_key_ids
+                ],
+                consumeErrors=True,
+            ).addErrback(unwrapFirstError)
+        )
+
+        defer.returnValue(verify_keys)
+
+
+class PerspectivesKeyFetcher(BaseV2KeyFetcher):
+    """KeyFetcher impl which fetches keys from the "perspectives" servers"""
+
+    def __init__(self, hs):
+        super(PerspectivesKeyFetcher, self).__init__(hs)
+        self.clock = hs.get_clock()
+        self.client = hs.get_http_client()
+        self.perspective_servers = self.config.perspectives
+
     @defer.inlineCallbacks
-    def get_keys_from_perspectives(self, server_name_and_key_ids):
+    def get_keys(self, server_name_and_key_ids):
+        """see KeyFetcher.get_keys"""
+
         @defer.inlineCallbacks
         def get_key(perspective_name, perspective_keys):
             try:
@@ -383,25 +513,26 @@ class Keyring(object):
                 )
                 defer.returnValue(result)
             except KeyLookupError as e:
-                logger.warning(
-                    "Key lookup failed from %r: %s", perspective_name, e,
-                )
+                logger.warning("Key lookup failed from %r: %s", perspective_name, e)
             except Exception as e:
                 logger.exception(
                     "Unable to get key from %r: %s %s",
                     perspective_name,
-                    type(e).__name__, str(e),
+                    type(e).__name__,
+                    str(e),
                 )
 
             defer.returnValue({})
 
-        results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
-            [
-                run_in_background(get_key, p_name, p_keys)
-                for p_name, p_keys in self.perspective_servers.items()
-            ],
-            consumeErrors=True,
-        ).addErrback(unwrapFirstError))
+        results = yield logcontext.make_deferred_yieldable(
+            defer.gatherResults(
+                [
+                    run_in_background(get_key, p_name, p_keys)
+                    for p_name, p_keys in self.perspective_servers.items()
+                ],
+                consumeErrors=True,
+            ).addErrback(unwrapFirstError)
+        )
 
         union_of_keys = {}
         for result in results:
@@ -411,33 +542,30 @@ class Keyring(object):
         defer.returnValue(union_of_keys)
 
     @defer.inlineCallbacks
-    def get_keys_from_server(self, server_name_and_key_ids):
-        results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
-            [
-                run_in_background(
-                    self.get_server_verify_key_v2_direct,
-                    server_name,
-                    key_ids,
-                )
-                for server_name, key_ids in server_name_and_key_ids
-            ],
-            consumeErrors=True,
-        ).addErrback(unwrapFirstError))
-
-        merged = {}
-        for result in results:
-            merged.update(result)
+    def get_server_verify_key_v2_indirect(
+        self, server_names_and_key_ids, perspective_name, perspective_keys
+    ):
+        """
+        Args:
+            server_names_and_key_ids (iterable[Tuple[str, iterable[str]]]):
+                list of (server_name, iterable[key_id]) tuples to fetch keys for
+            perspective_name (str): name of the notary server to query for the keys
+            perspective_keys (dict[str, VerifyKey]): map of key_id->key for the
+                notary server
 
-        defer.returnValue({
-            server_name: keys
-            for server_name, keys in merged.items()
-            if keys
-        })
+        Returns:
+            Deferred[dict[str, dict[str, synapse.storage.keys.FetchKeyResult]]]: map
+                from server_name -> key_id -> FetchKeyResult
 
-    @defer.inlineCallbacks
-    def get_server_verify_key_v2_indirect(self, server_names_and_key_ids,
-                                          perspective_name,
-                                          perspective_keys):
+        Raises:
+            KeyLookupError if there was an error processing the entire response from
+                the server
+        """
+        logger.info(
+            "Requesting keys %s from notary server %s",
+            server_names_and_key_ids,
+            perspective_name,
+        )
         # TODO(mark): Set the minimum_valid_until_ts to that needed by
         # the events being validated or the current time if validating
         # an incoming request.
@@ -448,9 +576,7 @@ class Keyring(object):
                 data={
                     u"server_keys": {
                         server_name: {
-                            key_id: {
-                                u"minimum_valid_until_ts": 0
-                            } for key_id in key_ids
+                            key_id: {u"minimum_valid_until_ts": 0} for key_id in key_ids
                         }
                         for server_name, key_ids in server_names_and_key_ids
                     }
@@ -458,240 +584,174 @@ class Keyring(object):
                 long_retries=True,
             )
         except (NotRetryingDestination, RequestSendFailed) as e:
-            raise_from(
-                KeyLookupError("Failed to connect to remote server"), e,
-            )
+            raise_from(KeyLookupError("Failed to connect to remote server"), e)
         except HttpResponseException as e:
-            raise_from(
-                KeyLookupError("Remote server returned an error"), e,
-            )
+            raise_from(KeyLookupError("Remote server returned an error"), e)
 
         keys = {}
+        added_keys = []
 
-        responses = query_response["server_keys"]
+        time_now_ms = self.clock.time_msec()
 
-        for response in responses:
-            if (u"signatures" not in response
-                    or perspective_name not in response[u"signatures"]):
+        for response in query_response["server_keys"]:
+            # do this first, so that we can give useful errors thereafter
+            server_name = response.get("server_name")
+            if not isinstance(server_name, six.string_types):
                 raise KeyLookupError(
-                    "Key response not signed by perspective server"
-                    " %r" % (perspective_name,)
+                    "Malformed response from key notary server %s: invalid server_name"
+                    % (perspective_name,)
                 )
 
-            verified = False
-            for key_id in response[u"signatures"][perspective_name]:
-                if key_id in perspective_keys:
-                    verify_signed_json(
-                        response,
-                        perspective_name,
-                        perspective_keys[key_id]
-                    )
-                    verified = True
-
-            if not verified:
-                logging.info(
-                    "Response from perspective server %r not signed with a"
-                    " known key, signed with: %r, known keys: %r",
+            try:
+                processed_response = yield self._process_perspectives_response(
                     perspective_name,
-                    list(response[u"signatures"][perspective_name]),
-                    list(perspective_keys)
+                    perspective_keys,
+                    response,
+                    time_added_ms=time_now_ms,
                 )
-                raise KeyLookupError(
-                    "Response not signed with a known key for perspective"
-                    " server %r" % (perspective_name,)
+            except KeyLookupError as e:
+                logger.warning(
+                    "Error processing response from key notary server %s for origin "
+                    "server %s: %s",
+                    perspective_name,
+                    server_name,
+                    e,
                 )
+                # we continue to process the rest of the response
+                continue
 
-            processed_response = yield self.process_v2_response(
-                perspective_name, response
+            added_keys.extend(
+                (server_name, key_id, key) for key_id, key in processed_response.items()
             )
-            server_name = response["server_name"]
-
             keys.setdefault(server_name, {}).update(processed_response)
 
-        yield logcontext.make_deferred_yieldable(defer.gatherResults(
-            [
-                run_in_background(
-                    self.store_keys,
-                    server_name=server_name,
-                    from_server=perspective_name,
-                    verify_keys=response_keys,
-                )
-                for server_name, response_keys in keys.items()
-            ],
-            consumeErrors=True
-        ).addErrback(unwrapFirstError))
+        yield self.store.store_server_verify_keys(
+            perspective_name, time_now_ms, added_keys
+        )
 
         defer.returnValue(keys)
 
+    def _process_perspectives_response(
+        self, perspective_name, perspective_keys, response, time_added_ms
+    ):
+        """Parse a 'Server Keys' structure from the result of a /key/query request
+
+        Checks that the entry is correctly signed by the perspectives server, and then
+        passes over to process_v2_response
+
+        Args:
+            perspective_name (str): the name of the notary server that produced this
+                result
+
+            perspective_keys (dict[str, VerifyKey]): map of key_id->key for the
+                notary server
+
+            response (dict): the json-decoded Server Keys response object
+
+            time_added_ms (int): the timestamp to record in server_keys_json
+
+        Returns:
+            Deferred[dict[str, FetchKeyResult]]: map from key_id to result object
+        """
+        if (
+            u"signatures" not in response
+            or perspective_name not in response[u"signatures"]
+        ):
+            raise KeyLookupError("Response not signed by the notary server")
+
+        verified = False
+        for key_id in response[u"signatures"][perspective_name]:
+            if key_id in perspective_keys:
+                verify_signed_json(response, perspective_name, perspective_keys[key_id])
+                verified = True
+
+        if not verified:
+            raise KeyLookupError(
+                "Response not signed with a known key: signed with: %r, known keys: %r"
+                % (
+                    list(response[u"signatures"][perspective_name].keys()),
+                    list(perspective_keys.keys()),
+                )
+            )
+
+        return self.process_v2_response(
+            perspective_name, response, time_added_ms=time_added_ms
+        )
+
+
+class ServerKeyFetcher(BaseV2KeyFetcher):
+    """KeyFetcher impl which fetches keys from the origin servers"""
+
+    def __init__(self, hs):
+        super(ServerKeyFetcher, self).__init__(hs)
+        self.clock = hs.get_clock()
+        self.client = hs.get_http_client()
+
+    @defer.inlineCallbacks
+    def get_keys(self, server_name_and_key_ids):
+        """see KeyFetcher.get_keys"""
+        results = yield logcontext.make_deferred_yieldable(
+            defer.gatherResults(
+                [
+                    run_in_background(
+                        self.get_server_verify_key_v2_direct, server_name, key_ids
+                    )
+                    for server_name, key_ids in server_name_and_key_ids
+                ],
+                consumeErrors=True,
+            ).addErrback(unwrapFirstError)
+        )
+
+        merged = {}
+        for result in results:
+            merged.update(result)
+
+        defer.returnValue(
+            {server_name: keys for server_name, keys in merged.items() if keys}
+        )
+
     @defer.inlineCallbacks
     def get_server_verify_key_v2_direct(self, server_name, key_ids):
-        keys = {}  # type: dict[str, nacl.signing.VerifyKey]
+        keys = {}  # type: dict[str, FetchKeyResult]
 
         for requested_key_id in key_ids:
             if requested_key_id in keys:
                 continue
 
+            time_now_ms = self.clock.time_msec()
             try:
                 response = yield self.client.get_json(
                     destination=server_name,
-                    path="/_matrix/key/v2/server/" + urllib.parse.quote(requested_key_id),
+                    path="/_matrix/key/v2/server/"
+                    + urllib.parse.quote(requested_key_id),
                     ignore_backoff=True,
                 )
             except (NotRetryingDestination, RequestSendFailed) as e:
-                raise_from(
-                    KeyLookupError("Failed to connect to remote server"), e,
-                )
+                raise_from(KeyLookupError("Failed to connect to remote server"), e)
             except HttpResponseException as e:
-                raise_from(
-                    KeyLookupError("Remote server returned an error"), e,
-                )
-
-            if (u"signatures" not in response
-                    or server_name not in response[u"signatures"]):
-                raise KeyLookupError("Key response not signed by remote server")
+                raise_from(KeyLookupError("Remote server returned an error"), e)
 
             if response["server_name"] != server_name:
-                raise KeyLookupError("Expected a response for server %r not %r" % (
-                    server_name, response["server_name"]
-                ))
+                raise KeyLookupError(
+                    "Expected a response for server %r not %r"
+                    % (server_name, response["server_name"])
+                )
 
             response_keys = yield self.process_v2_response(
                 from_server=server_name,
                 requested_ids=[requested_key_id],
                 response_json=response,
+                time_added_ms=time_now_ms,
+            )
+            yield self.store.store_server_verify_keys(
+                server_name,
+                time_now_ms,
+                ((server_name, key_id, key) for key_id, key in response_keys.items()),
             )
-
             keys.update(response_keys)
 
-        yield self.store_keys(
-            server_name=server_name,
-            from_server=server_name,
-            verify_keys=keys,
-        )
         defer.returnValue({server_name: keys})
 
-    @defer.inlineCallbacks
-    def process_v2_response(
-        self, from_server, response_json, requested_ids=[],
-    ):
-        """Parse a 'Server Keys' structure from the result of a /key request
-
-        This is used to parse either the entirety of the response from
-        GET /_matrix/key/v2/server, or a single entry from the list returned by
-        POST /_matrix/key/v2/query.
-
-        Checks that each signature in the response that claims to come from the origin
-        server is valid. (Does not check that there actually is such a signature, for
-        some reason.)
-
-        Stores the json in server_keys_json so that it can be used for future responses
-        to /_matrix/key/v2/query.
-
-        Args:
-            from_server (str): the name of the server producing this result: either
-                the origin server for a /_matrix/key/v2/server request, or the notary
-                for a /_matrix/key/v2/query.
-
-            response_json (dict): the json-decoded Server Keys response object
-
-            requested_ids (iterable[str]): a list of the key IDs that were requested.
-                We will store the json for these key ids as well as any that are
-                actually in the response
-
-        Returns:
-            Deferred[dict[str, nacl.signing.VerifyKey]]:
-                map from key_id to key object
-        """
-        time_now_ms = self.clock.time_msec()
-        response_keys = {}
-        verify_keys = {}
-        for key_id, key_data in response_json["verify_keys"].items():
-            if is_signing_algorithm_supported(key_id):
-                key_base64 = key_data["key"]
-                key_bytes = decode_base64(key_base64)
-                verify_key = decode_verify_key_bytes(key_id, key_bytes)
-                verify_key.time_added = time_now_ms
-                verify_keys[key_id] = verify_key
-
-        old_verify_keys = {}
-        for key_id, key_data in response_json["old_verify_keys"].items():
-            if is_signing_algorithm_supported(key_id):
-                key_base64 = key_data["key"]
-                key_bytes = decode_base64(key_base64)
-                verify_key = decode_verify_key_bytes(key_id, key_bytes)
-                verify_key.expired = key_data["expired_ts"]
-                verify_key.time_added = time_now_ms
-                old_verify_keys[key_id] = verify_key
-
-        server_name = response_json["server_name"]
-        for key_id in response_json["signatures"].get(server_name, {}):
-            if key_id not in response_json["verify_keys"]:
-                raise KeyLookupError(
-                    "Key response must include verification keys for all"
-                    " signatures"
-                )
-            if key_id in verify_keys:
-                verify_signed_json(
-                    response_json,
-                    server_name,
-                    verify_keys[key_id]
-                )
-
-        signed_key_json = sign_json(
-            response_json,
-            self.config.server_name,
-            self.config.signing_key[0],
-        )
-
-        signed_key_json_bytes = encode_canonical_json(signed_key_json)
-        ts_valid_until_ms = signed_key_json[u"valid_until_ts"]
-
-        updated_key_ids = set(requested_ids)
-        updated_key_ids.update(verify_keys)
-        updated_key_ids.update(old_verify_keys)
-
-        response_keys.update(verify_keys)
-        response_keys.update(old_verify_keys)
-
-        yield logcontext.make_deferred_yieldable(defer.gatherResults(
-            [
-                run_in_background(
-                    self.store.store_server_keys_json,
-                    server_name=server_name,
-                    key_id=key_id,
-                    from_server=from_server,
-                    ts_now_ms=time_now_ms,
-                    ts_expires_ms=ts_valid_until_ms,
-                    key_json_bytes=signed_key_json_bytes,
-                )
-                for key_id in updated_key_ids
-            ],
-            consumeErrors=True,
-        ).addErrback(unwrapFirstError))
-
-        defer.returnValue(response_keys)
-
-    def store_keys(self, server_name, from_server, verify_keys):
-        """Store a collection of verify keys for a given server
-        Args:
-            server_name(str): The name of the server the keys are for.
-            from_server(str): The server the keys were downloaded from.
-            verify_keys(dict): A mapping of key_id to VerifyKey.
-        Returns:
-            A deferred that completes when the keys are stored.
-        """
-        # TODO(markjh): Store whether the keys have expired.
-        return logcontext.make_deferred_yieldable(defer.gatherResults(
-            [
-                run_in_background(
-                    self.store.store_server_verify_key,
-                    server_name, server_name, key.time_added, key
-                )
-                for key_id, key in verify_keys.items()
-            ],
-            consumeErrors=True,
-        ).addErrback(unwrapFirstError))
-
 
 @defer.inlineCallbacks
 def _handle_key_deferred(verify_request):
@@ -713,17 +773,19 @@ def _handle_key_deferred(verify_request):
     except KeyLookupError as e:
         logger.warn(
             "Failed to download keys for %s: %s %s",
-            server_name, type(e).__name__, str(e),
+            server_name,
+            type(e).__name__,
+            str(e),
         )
         raise SynapseError(
-            502,
-            "Error downloading keys for %s" % (server_name,),
-            Codes.UNAUTHORIZED,
+            502, "Error downloading keys for %s" % (server_name,), Codes.UNAUTHORIZED
         )
     except Exception as e:
         logger.exception(
             "Got Exception when downloading keys for %s: %s %s",
-            server_name, type(e).__name__, str(e),
+            server_name,
+            type(e).__name__,
+            str(e),
         )
         raise SynapseError(
             401,
@@ -733,22 +795,24 @@ def _handle_key_deferred(verify_request):
 
     json_object = verify_request.json_object
 
-    logger.debug("Got key %s %s:%s for server %s, verifying" % (
-        key_id, verify_key.alg, verify_key.version, server_name,
-    ))
+    logger.debug(
+        "Got key %s %s:%s for server %s, verifying"
+        % (key_id, verify_key.alg, verify_key.version, server_name)
+    )
     try:
         verify_signed_json(json_object, server_name, verify_key)
     except SignatureVerifyException as e:
         logger.debug(
             "Error verifying signature for %s:%s:%s with key %s: %s",
-            server_name, verify_key.alg, verify_key.version,
+            server_name,
+            verify_key.alg,
+            verify_key.version,
             encode_verify_key_base64(verify_key),
             str(e),
         )
         raise SynapseError(
             401,
-            "Invalid signature for server %s with key %s:%s: %s" % (
-                server_name, verify_key.alg, verify_key.version, str(e),
-            ),
+            "Invalid signature for server %s with key %s:%s: %s"
+            % (server_name, verify_key.alg, verify_key.version, str(e)),
             Codes.UNAUTHORIZED,
         )
diff --git a/synapse/events/builder.py b/synapse/events/builder.py
index 1fe995f212..546b6f4982 100644
--- a/synapse/events/builder.py
+++ b/synapse/events/builder.py
@@ -76,6 +76,7 @@ class EventBuilder(object):
     # someone tries to get them when they don't exist.
     _state_key = attr.ib(default=None)
     _redacts = attr.ib(default=None)
+    _origin_server_ts = attr.ib(default=None)
 
     internal_metadata = attr.ib(default=attr.Factory(lambda: _EventInternalMetadata({})))
 
@@ -142,6 +143,9 @@ class EventBuilder(object):
         if self._redacts is not None:
             event_dict["redacts"] = self._redacts
 
+        if self._origin_server_ts is not None:
+            event_dict["origin_server_ts"] = self._origin_server_ts
+
         defer.returnValue(
             create_local_event_from_event_dict(
                 clock=self._clock,
@@ -209,6 +213,7 @@ class EventBuilderFactory(object):
             content=key_values.get("content", {}),
             unsigned=key_values.get("unsigned", {}),
             redacts=key_values.get("redacts", None),
+            origin_server_ts=key_values.get("origin_server_ts", None),
         )
 
 
@@ -245,7 +250,7 @@ def create_local_event_from_event_dict(clock, hostname, signing_key,
         event_dict["event_id"] = _create_event_id(clock, hostname)
 
     event_dict["origin"] = hostname
-    event_dict["origin_server_ts"] = time_now
+    event_dict.setdefault("origin_server_ts", time_now)
 
     event_dict.setdefault("unsigned", {})
     age = event_dict["unsigned"].pop("age", 0)
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index 27a2a9ef98..e2d4384de1 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -330,12 +330,13 @@ class EventClientSerializer(object):
         )
 
     @defer.inlineCallbacks
-    def serialize_event(self, event, time_now, **kwargs):
+    def serialize_event(self, event, time_now, bundle_aggregations=True, **kwargs):
         """Serializes a single event.
 
         Args:
             event (EventBase)
             time_now (int): The current time in milliseconds
+            bundle_aggregations (bool): Whether to bundle in related events
             **kwargs: Arguments to pass to `serialize_event`
 
         Returns:
@@ -350,7 +351,7 @@ class EventClientSerializer(object):
 
         # If MSC1849 is enabled then we need to look if thre are any relations
         # we need to bundle in with the event
-        if self.experimental_msc1849_support_enabled:
+        if self.experimental_msc1849_support_enabled and bundle_aggregations:
             annotations = yield self.store.get_aggregation_groups_for_event(
                 event_id,
             )
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 385eda2dca..d0efc4e0d3 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -23,7 +23,11 @@ from twisted.internet import defer
 import synapse
 from synapse.api.errors import Codes, FederationDeniedError, SynapseError
 from synapse.api.room_versions import RoomVersions
-from synapse.api.urls import FEDERATION_V1_PREFIX, FEDERATION_V2_PREFIX
+from synapse.api.urls import (
+    FEDERATION_UNSTABLE_PREFIX,
+    FEDERATION_V1_PREFIX,
+    FEDERATION_V2_PREFIX,
+)
 from synapse.http.endpoint import parse_and_validate_server_name
 from synapse.http.server import JsonResource
 from synapse.http.servlet import (
@@ -1304,6 +1308,30 @@ class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet):
         defer.returnValue((200, new_content))
 
 
+class RoomComplexityServlet(BaseFederationServlet):
+    """
+    Indicates to other servers how complex (and therefore likely
+    resource-intensive) a public room this server knows about is.
+    """
+    PATH = "/rooms/(?P<room_id>[^/]*)/complexity"
+    PREFIX = FEDERATION_UNSTABLE_PREFIX
+
+    @defer.inlineCallbacks
+    def on_GET(self, origin, content, query, room_id):
+
+        store = self.handler.hs.get_datastore()
+
+        is_public = yield store.is_room_world_readable_or_publicly_joinable(
+            room_id
+        )
+
+        if not is_public:
+            raise SynapseError(404, "Room not found", errcode=Codes.INVALID_PARAM)
+
+        complexity = yield store.get_room_complexity(room_id)
+        defer.returnValue((200, complexity))
+
+
 FEDERATION_SERVLET_CLASSES = (
     FederationSendServlet,
     FederationEventServlet,
@@ -1327,6 +1355,7 @@ FEDERATION_SERVLET_CLASSES = (
     FederationThirdPartyInviteExchangeServlet,
     On3pidBindServlet,
     FederationVersionServlet,
+    RoomComplexityServlet,
 )
 
 OPENID_SERVLET_CLASSES = (
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 6003ad9cca..eb525070cf 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -122,6 +122,9 @@ class EventStreamHandler(BaseHandler):
 
             chunks = yield self._event_serializer.serialize_events(
                 events, time_now, as_client_event=as_client_event,
+                # We don't bundle "live" events, as otherwise clients
+                # will end up double counting annotations.
+                bundle_aggregations=False,
             )
 
             chunk = {
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2202ed699a..cf4fad7de0 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -2013,15 +2013,44 @@ class FederationHandler(BaseHandler):
 
         Args:
             origin (str):
-            event (synapse.events.FrozenEvent):
+            event (synapse.events.EventBase):
             context (synapse.events.snapshot.EventContext):
-            auth_events (dict[(str, str)->str]):
+            auth_events (dict[(str, str)->synapse.events.EventBase]):
+                Map from (event_type, state_key) to event
+
+                What we expect the event's auth_events to be, based on the event's
+                position in the dag. I think? maybe??
+
+                Also NB that this function adds entries to it.
+        Returns:
+            defer.Deferred[None]
+        """
+        room_version = yield self.store.get_room_version(event.room_id)
+
+        yield self._update_auth_events_and_context_for_auth(
+            origin, event, context, auth_events
+        )
+        try:
+            self.auth.check(room_version, event, auth_events=auth_events)
+        except AuthError as e:
+            logger.warn("Failed auth resolution for %r because %s", event, e)
+            raise e
+
+    @defer.inlineCallbacks
+    def _update_auth_events_and_context_for_auth(
+        self, origin, event, context, auth_events
+    ):
+        """Helper for do_auth. See there for docs.
+
+        Args:
+            origin (str):
+            event (synapse.events.EventBase):
+            context (synapse.events.snapshot.EventContext):
+            auth_events (dict[(str, str)->synapse.events.EventBase]):
 
         Returns:
             defer.Deferred[None]
         """
-        # Check if we have all the auth events.
-        current_state = set(e.event_id for e in auth_events.values())
         event_auth_events = set(event.auth_event_ids())
 
         if event.is_state():
@@ -2029,11 +2058,21 @@ class FederationHandler(BaseHandler):
         else:
             event_key = None
 
-        if event_auth_events - current_state:
+        # if the event's auth_events refers to events which are not in our
+        # calculated auth_events, we need to fetch those events from somewhere.
+        #
+        # we start by fetching them from the store, and then try calling /event_auth/.
+        missing_auth = event_auth_events.difference(
+            e.event_id for e in auth_events.values()
+        )
+
+        if missing_auth:
             # TODO: can we use store.have_seen_events here instead?
             have_events = yield self.store.get_seen_events_with_rejections(
-                event_auth_events - current_state
+                missing_auth
             )
+            logger.debug("Got events %s from store", have_events)
+            missing_auth.difference_update(have_events.keys())
         else:
             have_events = {}
 
@@ -2042,13 +2081,12 @@ class FederationHandler(BaseHandler):
             for e in auth_events.values()
         })
 
-        seen_events = set(have_events.keys())
-
-        missing_auth = event_auth_events - seen_events - current_state
-
         if missing_auth:
-            logger.info("Missing auth: %s", missing_auth)
             # If we don't have all the auth events, we need to get them.
+            logger.info(
+                "auth_events contains unknown events: %s",
+                missing_auth,
+            )
             try:
                 remote_auth_chain = yield self.federation_client.get_event_auth(
                     origin, event.room_id, event.event_id
@@ -2089,145 +2127,168 @@ class FederationHandler(BaseHandler):
                 have_events = yield self.store.get_seen_events_with_rejections(
                     event.auth_event_ids()
                 )
-                seen_events = set(have_events.keys())
             except Exception:
                 # FIXME:
                 logger.exception("Failed to get auth chain")
 
+        if event.internal_metadata.is_outlier():
+            logger.info("Skipping auth_event fetch for outlier")
+            return
+
         # FIXME: Assumes we have and stored all the state for all the
         # prev_events
-        current_state = set(e.event_id for e in auth_events.values())
-        different_auth = event_auth_events - current_state
+        different_auth = event_auth_events.difference(
+            e.event_id for e in auth_events.values()
+        )
 
-        room_version = yield self.store.get_room_version(event.room_id)
+        if not different_auth:
+            return
 
-        if different_auth and not event.internal_metadata.is_outlier():
-            # Do auth conflict res.
-            logger.info("Different auth: %s", different_auth)
-
-            different_events = yield logcontext.make_deferred_yieldable(
-                defer.gatherResults([
-                    logcontext.run_in_background(
-                        self.store.get_event,
-                        d,
-                        allow_none=True,
-                        allow_rejected=False,
-                    )
-                    for d in different_auth
-                    if d in have_events and not have_events[d]
-                ], consumeErrors=True)
-            ).addErrback(unwrapFirstError)
-
-            if different_events:
-                local_view = dict(auth_events)
-                remote_view = dict(auth_events)
-                remote_view.update({
-                    (d.type, d.state_key): d for d in different_events if d
-                })
+        logger.info(
+            "auth_events refers to events which are not in our calculated auth "
+            "chain: %s",
+            different_auth,
+        )
+
+        room_version = yield self.store.get_room_version(event.room_id)
 
-                new_state = yield self.state_handler.resolve_events(
-                    room_version,
-                    [list(local_view.values()), list(remote_view.values())],
-                    event
+        different_events = yield logcontext.make_deferred_yieldable(
+            defer.gatherResults([
+                logcontext.run_in_background(
+                    self.store.get_event,
+                    d,
+                    allow_none=True,
+                    allow_rejected=False,
                 )
+                for d in different_auth
+                if d in have_events and not have_events[d]
+            ], consumeErrors=True)
+        ).addErrback(unwrapFirstError)
+
+        if different_events:
+            local_view = dict(auth_events)
+            remote_view = dict(auth_events)
+            remote_view.update({
+                (d.type, d.state_key): d for d in different_events if d
+            })
 
-                auth_events.update(new_state)
+            new_state = yield self.state_handler.resolve_events(
+                room_version,
+                [list(local_view.values()), list(remote_view.values())],
+                event
+            )
 
-                current_state = set(e.event_id for e in auth_events.values())
-                different_auth = event_auth_events - current_state
+            logger.info(
+                "After state res: updating auth_events with new state %s",
+                {
+                    (d.type, d.state_key): d.event_id for d in new_state.values()
+                    if auth_events.get((d.type, d.state_key)) != d
+                },
+            )
 
-                yield self._update_context_for_auth_events(
-                    event, context, auth_events, event_key,
-                )
+            auth_events.update(new_state)
+
+            different_auth = event_auth_events.difference(
+                e.event_id for e in auth_events.values()
+            )
 
-        if different_auth and not event.internal_metadata.is_outlier():
-            logger.info("Different auth after resolution: %s", different_auth)
+            yield self._update_context_for_auth_events(
+                event, context, auth_events, event_key,
+            )
 
-            # Only do auth resolution if we have something new to say.
-            # We can't rove an auth failure.
-            do_resolution = False
+        if not different_auth:
+            # we're done
+            return
 
-            provable = [
-                RejectedReason.NOT_ANCESTOR, RejectedReason.NOT_ANCESTOR,
-            ]
+        logger.info(
+            "auth_events still refers to events which are not in the calculated auth "
+            "chain after state resolution: %s",
+            different_auth,
+        )
 
-            for e_id in different_auth:
-                if e_id in have_events:
-                    if have_events[e_id] in provable:
-                        do_resolution = True
-                        break
+        # Only do auth resolution if we have something new to say.
+        # We can't prove an auth failure.
+        do_resolution = False
 
-            if do_resolution:
-                prev_state_ids = yield context.get_prev_state_ids(self.store)
-                # 1. Get what we think is the auth chain.
-                auth_ids = yield self.auth.compute_auth_events(
-                    event, prev_state_ids
-                )
-                local_auth_chain = yield self.store.get_auth_chain(
-                    auth_ids, include_given=True
-                )
+        for e_id in different_auth:
+            if e_id in have_events:
+                if have_events[e_id] == RejectedReason.NOT_ANCESTOR:
+                    do_resolution = True
+                    break
 
-                try:
-                    # 2. Get remote difference.
-                    result = yield self.federation_client.query_auth(
-                        origin,
-                        event.room_id,
-                        event.event_id,
-                        local_auth_chain,
-                    )
+        if not do_resolution:
+            logger.info(
+                "Skipping auth resolution due to lack of provable rejection reasons"
+            )
+            return
 
-                    seen_remotes = yield self.store.have_seen_events(
-                        [e.event_id for e in result["auth_chain"]]
-                    )
+        logger.info("Doing auth resolution")
 
-                    # 3. Process any remote auth chain events we haven't seen.
-                    for ev in result["auth_chain"]:
-                        if ev.event_id in seen_remotes:
-                            continue
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
 
-                        if ev.event_id == event.event_id:
-                            continue
+        # 1. Get what we think is the auth chain.
+        auth_ids = yield self.auth.compute_auth_events(
+            event, prev_state_ids
+        )
+        local_auth_chain = yield self.store.get_auth_chain(
+            auth_ids, include_given=True
+        )
 
-                        try:
-                            auth_ids = ev.auth_event_ids()
-                            auth = {
-                                (e.type, e.state_key): e
-                                for e in result["auth_chain"]
-                                if e.event_id in auth_ids
-                                or event.type == EventTypes.Create
-                            }
-                            ev.internal_metadata.outlier = True
+        try:
+            # 2. Get remote difference.
+            result = yield self.federation_client.query_auth(
+                origin,
+                event.room_id,
+                event.event_id,
+                local_auth_chain,
+            )
 
-                            logger.debug(
-                                "do_auth %s different_auth: %s",
-                                event.event_id, e.event_id
-                            )
+            seen_remotes = yield self.store.have_seen_events(
+                [e.event_id for e in result["auth_chain"]]
+            )
 
-                            yield self._handle_new_event(
-                                origin, ev, auth_events=auth
-                            )
+            # 3. Process any remote auth chain events we haven't seen.
+            for ev in result["auth_chain"]:
+                if ev.event_id in seen_remotes:
+                    continue
 
-                            if ev.event_id in event_auth_events:
-                                auth_events[(ev.type, ev.state_key)] = ev
-                        except AuthError:
-                            pass
+                if ev.event_id == event.event_id:
+                    continue
 
-                except Exception:
-                    # FIXME:
-                    logger.exception("Failed to query auth chain")
+                try:
+                    auth_ids = ev.auth_event_ids()
+                    auth = {
+                        (e.type, e.state_key): e
+                        for e in result["auth_chain"]
+                        if e.event_id in auth_ids
+                        or event.type == EventTypes.Create
+                    }
+                    ev.internal_metadata.outlier = True
+
+                    logger.debug(
+                        "do_auth %s different_auth: %s",
+                        event.event_id, e.event_id
+                    )
 
-                # 4. Look at rejects and their proofs.
-                # TODO.
+                    yield self._handle_new_event(
+                        origin, ev, auth_events=auth
+                    )
 
-                yield self._update_context_for_auth_events(
-                    event, context, auth_events, event_key,
-                )
+                    if ev.event_id in event_auth_events:
+                        auth_events[(ev.type, ev.state_key)] = ev
+                except AuthError:
+                    pass
 
-        try:
-            self.auth.check(room_version, event, auth_events=auth_events)
-        except AuthError as e:
-            logger.warn("Failed auth resolution for %r because %s", event, e)
-            raise e
+        except Exception:
+            # FIXME:
+            logger.exception("Failed to query auth chain")
+
+        # 4. Look at rejects and their proofs.
+        # TODO.
+
+        yield self._update_context_for_auth_events(
+            event, context, auth_events, event_key,
+        )
 
     @defer.inlineCallbacks
     def _update_context_for_auth_events(self, event, context, auth_events,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 792edc7579..0b02469ceb 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -166,6 +166,9 @@ class MessageHandler(object):
         now = self.clock.time_msec()
         events = yield self._event_serializer.serialize_events(
             room_state.values(), now,
+            # We don't bother bundling aggregations in when asked for state
+            # events, as clients won't use them.
+            bundle_aggregations=False,
         )
         defer.returnValue(events)
 
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 59d53f1050..6209858bbb 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -182,17 +182,27 @@ class PresenceHandler(object):
         # Start a LoopingCall in 30s that fires every 5s.
         # The initial delay is to allow disconnected clients a chance to
         # reconnect before we treat them as offline.
+        def run_timeout_handler():
+            return run_as_background_process(
+                "handle_presence_timeouts", self._handle_timeouts
+            )
+
         self.clock.call_later(
             30,
             self.clock.looping_call,
-            self._handle_timeouts,
+            run_timeout_handler,
             5000,
         )
 
+        def run_persister():
+            return run_as_background_process(
+                "persist_presence_changes", self._persist_unpersisted_changes
+            )
+
         self.clock.call_later(
             60,
             self.clock.looping_call,
-            self._persist_unpersisted_changes,
+            run_persister,
             60 * 1000,
         )
 
@@ -229,6 +239,7 @@ class PresenceHandler(object):
         )
 
         if self.unpersisted_users_changes:
+
             yield self.store.update_presence([
                 self.user_to_current_state[user_id]
                 for user_id in self.unpersisted_users_changes
@@ -240,30 +251,18 @@ class PresenceHandler(object):
         """We periodically persist the unpersisted changes, as otherwise they
         may stack up and slow down shutdown times.
         """
-        logger.info(
-            "Performing _persist_unpersisted_changes. Persisting %d unpersisted changes",
-            len(self.unpersisted_users_changes)
-        )
-
         unpersisted = self.unpersisted_users_changes
         self.unpersisted_users_changes = set()
 
         if unpersisted:
+            logger.info(
+                "Persisting %d upersisted presence updates", len(unpersisted)
+            )
             yield self.store.update_presence([
                 self.user_to_current_state[user_id]
                 for user_id in unpersisted
             ])
 
-        logger.info("Finished _persist_unpersisted_changes")
-
-    @defer.inlineCallbacks
-    def _update_states_and_catch_exception(self, new_states):
-        try:
-            res = yield self._update_states(new_states)
-            defer.returnValue(res)
-        except Exception:
-            logger.exception("Error updating presence")
-
     @defer.inlineCallbacks
     def _update_states(self, new_states):
         """Updates presence of users. Sets the appropriate timeouts. Pokes
@@ -338,45 +337,41 @@ class PresenceHandler(object):
         logger.info("Handling presence timeouts")
         now = self.clock.time_msec()
 
-        try:
-            with Measure(self.clock, "presence_handle_timeouts"):
-                # Fetch the list of users that *may* have timed out. Things may have
-                # changed since the timeout was set, so we won't necessarily have to
-                # take any action.
-                users_to_check = set(self.wheel_timer.fetch(now))
-
-                # Check whether the lists of syncing processes from an external
-                # process have expired.
-                expired_process_ids = [
-                    process_id for process_id, last_update
-                    in self.external_process_last_updated_ms.items()
-                    if now - last_update > EXTERNAL_PROCESS_EXPIRY
-                ]
-                for process_id in expired_process_ids:
-                    users_to_check.update(
-                        self.external_process_last_updated_ms.pop(process_id, ())
-                    )
-                    self.external_process_last_update.pop(process_id)
+        # Fetch the list of users that *may* have timed out. Things may have
+        # changed since the timeout was set, so we won't necessarily have to
+        # take any action.
+        users_to_check = set(self.wheel_timer.fetch(now))
+
+        # Check whether the lists of syncing processes from an external
+        # process have expired.
+        expired_process_ids = [
+            process_id for process_id, last_update
+            in self.external_process_last_updated_ms.items()
+            if now - last_update > EXTERNAL_PROCESS_EXPIRY
+        ]
+        for process_id in expired_process_ids:
+            users_to_check.update(
+                self.external_process_last_updated_ms.pop(process_id, ())
+            )
+            self.external_process_last_update.pop(process_id)
 
-                states = [
-                    self.user_to_current_state.get(
-                        user_id, UserPresenceState.default(user_id)
-                    )
-                    for user_id in users_to_check
-                ]
+        states = [
+            self.user_to_current_state.get(
+                user_id, UserPresenceState.default(user_id)
+            )
+            for user_id in users_to_check
+        ]
 
-                timers_fired_counter.inc(len(states))
+        timers_fired_counter.inc(len(states))
 
-                changes = handle_timeouts(
-                    states,
-                    is_mine_fn=self.is_mine_id,
-                    syncing_user_ids=self.get_currently_syncing_users(),
-                    now=now,
-                )
+        changes = handle_timeouts(
+            states,
+            is_mine_fn=self.is_mine_id,
+            syncing_user_ids=self.get_currently_syncing_users(),
+            now=now,
+        )
 
-            run_in_background(self._update_states_and_catch_exception, changes)
-        except Exception:
-            logger.exception("Exception in _handle_timeouts loop")
+        return self._update_states(changes)
 
     @defer.inlineCallbacks
     def bump_presence_active_time(self, user):
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index e37ae96899..4a17911a87 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -27,7 +27,7 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
 from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
-from synapse.api.room_versions import DEFAULT_ROOM_VERSION, KNOWN_ROOM_VERSIONS
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.storage.state import StateFilter
 from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
 from synapse.util import stringutils
@@ -70,6 +70,7 @@ class RoomCreationHandler(BaseHandler):
         self.spam_checker = hs.get_spam_checker()
         self.event_creation_handler = hs.get_event_creation_handler()
         self.room_member_handler = hs.get_room_member_handler()
+        self.config = hs.config
 
         # linearizer to stop two upgrades happening at once
         self._upgrade_linearizer = Linearizer("room_upgrade_linearizer")
@@ -475,7 +476,11 @@ class RoomCreationHandler(BaseHandler):
         if ratelimit:
             yield self.ratelimit(requester)
 
-        room_version = config.get("room_version", DEFAULT_ROOM_VERSION.identifier)
+        room_version = config.get(
+            "room_version",
+            self.config.default_room_version.identifier,
+        )
+
         if not isinstance(room_version, string_types):
             raise SynapseError(
                 400,
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
new file mode 100644
index 0000000000..0e92b405ba
--- /dev/null
+++ b/synapse/handlers/stats.py
@@ -0,0 +1,325 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.handlers.state_deltas import StateDeltasHandler
+from synapse.metrics import event_processing_positions
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import UserID
+from synapse.util.metrics import Measure
+
+logger = logging.getLogger(__name__)
+
+
+class StatsHandler(StateDeltasHandler):
+    """Handles keeping the *_stats tables updated with a simple time-series of
+    information about the users, rooms and media on the server, such that admins
+    have some idea of who is consuming their resources.
+
+    Heavily derived from UserDirectoryHandler
+    """
+
+    def __init__(self, hs):
+        super(StatsHandler, self).__init__(hs)
+        self.hs = hs
+        self.store = hs.get_datastore()
+        self.state = hs.get_state_handler()
+        self.server_name = hs.hostname
+        self.clock = hs.get_clock()
+        self.notifier = hs.get_notifier()
+        self.is_mine_id = hs.is_mine_id
+        self.stats_bucket_size = hs.config.stats_bucket_size
+
+        # The current position in the current_state_delta stream
+        self.pos = None
+
+        # Guard to ensure we only process deltas one at a time
+        self._is_processing = False
+
+        if hs.config.stats_enabled:
+            self.notifier.add_replication_callback(self.notify_new_event)
+
+            # We kick this off so that we don't have to wait for a change before
+            # we start populating stats
+            self.clock.call_later(0, self.notify_new_event)
+
+    def notify_new_event(self):
+        """Called when there may be more deltas to process
+        """
+        if not self.hs.config.stats_enabled:
+            return
+
+        if self._is_processing:
+            return
+
+        @defer.inlineCallbacks
+        def process():
+            try:
+                yield self._unsafe_process()
+            finally:
+                self._is_processing = False
+
+        self._is_processing = True
+        run_as_background_process("stats.notify_new_event", process)
+
+    @defer.inlineCallbacks
+    def _unsafe_process(self):
+        # If self.pos is None then means we haven't fetched it from DB
+        if self.pos is None:
+            self.pos = yield self.store.get_stats_stream_pos()
+
+        # If still None then the initial background update hasn't happened yet
+        if self.pos is None:
+            defer.returnValue(None)
+
+        # Loop round handling deltas until we're up to date
+        while True:
+            with Measure(self.clock, "stats_delta"):
+                deltas = yield self.store.get_current_state_deltas(self.pos)
+                if not deltas:
+                    return
+
+                logger.info("Handling %d state deltas", len(deltas))
+                yield self._handle_deltas(deltas)
+
+                self.pos = deltas[-1]["stream_id"]
+                yield self.store.update_stats_stream_pos(self.pos)
+
+                event_processing_positions.labels("stats").set(self.pos)
+
+    @defer.inlineCallbacks
+    def _handle_deltas(self, deltas):
+        """
+        Called with the state deltas to process
+        """
+        for delta in deltas:
+            typ = delta["type"]
+            state_key = delta["state_key"]
+            room_id = delta["room_id"]
+            event_id = delta["event_id"]
+            stream_id = delta["stream_id"]
+            prev_event_id = delta["prev_event_id"]
+
+            logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
+
+            token = yield self.store.get_earliest_token_for_room_stats(room_id)
+
+            # If the earliest token to begin from is larger than our current
+            # stream ID, skip processing this delta.
+            if token is not None and token >= stream_id:
+                logger.debug(
+                    "Ignoring: %s as earlier than this room's initial ingestion event",
+                    event_id,
+                )
+                continue
+
+            if event_id is None and prev_event_id is None:
+                # Errr...
+                continue
+
+            event_content = {}
+
+            if event_id is not None:
+                event_content = (yield self.store.get_event(event_id)).content or {}
+
+            # quantise time to the nearest bucket
+            now = yield self.store.get_received_ts(event_id)
+            now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
+
+            if typ == EventTypes.Member:
+                # we could use _get_key_change here but it's a bit inefficient
+                # given we're not testing for a specific result; might as well
+                # just grab the prev_membership and membership strings and
+                # compare them.
+                prev_event_content = {}
+                if prev_event_id is not None:
+                    prev_event_content = (
+                        yield self.store.get_event(prev_event_id)
+                    ).content
+
+                membership = event_content.get("membership", Membership.LEAVE)
+                prev_membership = prev_event_content.get("membership", Membership.LEAVE)
+
+                if prev_membership == membership:
+                    continue
+
+                if prev_membership == Membership.JOIN:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "joined_members", -1
+                    )
+                elif prev_membership == Membership.INVITE:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "invited_members", -1
+                    )
+                elif prev_membership == Membership.LEAVE:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "left_members", -1
+                    )
+                elif prev_membership == Membership.BAN:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "banned_members", -1
+                    )
+                else:
+                    err = "%s is not a valid prev_membership" % (repr(prev_membership),)
+                    logger.error(err)
+                    raise ValueError(err)
+
+                if membership == Membership.JOIN:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "joined_members", +1
+                    )
+                elif membership == Membership.INVITE:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "invited_members", +1
+                    )
+                elif membership == Membership.LEAVE:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "left_members", +1
+                    )
+                elif membership == Membership.BAN:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "banned_members", +1
+                    )
+                else:
+                    err = "%s is not a valid membership" % (repr(membership),)
+                    logger.error(err)
+                    raise ValueError(err)
+
+                user_id = state_key
+                if self.is_mine_id(user_id):
+                    # update user_stats as it's one of our users
+                    public = yield self._is_public_room(room_id)
+
+                    if membership == Membership.LEAVE:
+                        yield self.store.update_stats_delta(
+                            now,
+                            "user",
+                            user_id,
+                            "public_rooms" if public else "private_rooms",
+                            -1,
+                        )
+                    elif membership == Membership.JOIN:
+                        yield self.store.update_stats_delta(
+                            now,
+                            "user",
+                            user_id,
+                            "public_rooms" if public else "private_rooms",
+                            +1,
+                        )
+
+            elif typ == EventTypes.Create:
+                # Newly created room. Add it with all blank portions.
+                yield self.store.update_room_state(
+                    room_id,
+                    {
+                        "join_rules": None,
+                        "history_visibility": None,
+                        "encryption": None,
+                        "name": None,
+                        "topic": None,
+                        "avatar": None,
+                        "canonical_alias": None,
+                    },
+                )
+
+            elif typ == EventTypes.JoinRules:
+                yield self.store.update_room_state(
+                    room_id, {"join_rules": event_content.get("join_rule")}
+                )
+
+                is_public = yield self._get_key_change(
+                    prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
+                )
+                if is_public is not None:
+                    yield self.update_public_room_stats(now, room_id, is_public)
+
+            elif typ == EventTypes.RoomHistoryVisibility:
+                yield self.store.update_room_state(
+                    room_id,
+                    {"history_visibility": event_content.get("history_visibility")},
+                )
+
+                is_public = yield self._get_key_change(
+                    prev_event_id, event_id, "history_visibility", "world_readable"
+                )
+                if is_public is not None:
+                    yield self.update_public_room_stats(now, room_id, is_public)
+
+            elif typ == EventTypes.Encryption:
+                yield self.store.update_room_state(
+                    room_id, {"encryption": event_content.get("algorithm")}
+                )
+            elif typ == EventTypes.Name:
+                yield self.store.update_room_state(
+                    room_id, {"name": event_content.get("name")}
+                )
+            elif typ == EventTypes.Topic:
+                yield self.store.update_room_state(
+                    room_id, {"topic": event_content.get("topic")}
+                )
+            elif typ == EventTypes.RoomAvatar:
+                yield self.store.update_room_state(
+                    room_id, {"avatar": event_content.get("url")}
+                )
+            elif typ == EventTypes.CanonicalAlias:
+                yield self.store.update_room_state(
+                    room_id, {"canonical_alias": event_content.get("alias")}
+                )
+
+    @defer.inlineCallbacks
+    def update_public_room_stats(self, ts, room_id, is_public):
+        """
+        Increment/decrement a user's number of public rooms when a room they are
+        in changes to/from public visibility.
+
+        Args:
+            ts (int): Timestamp in seconds
+            room_id (str)
+            is_public (bool)
+        """
+        # For now, blindly iterate over all local users in the room so that
+        # we can handle the whole problem of copying buckets over as needed
+        user_ids = yield self.store.get_users_in_room(room_id)
+
+        for user_id in user_ids:
+            if self.hs.is_mine(UserID.from_string(user_id)):
+                yield self.store.update_stats_delta(
+                    ts, "user", user_id, "public_rooms", +1 if is_public else -1
+                )
+                yield self.store.update_stats_delta(
+                    ts, "user", user_id, "private_rooms", -1 if is_public else +1
+                )
+
+    @defer.inlineCallbacks
+    def _is_public_room(self, room_id):
+        join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules)
+        history_visibility = yield self.state.get_current_state(
+            room_id, EventTypes.RoomHistoryVisibility
+        )
+
+        if (join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC) or (
+            (
+                history_visibility
+                and history_visibility.content.get("history_visibility")
+                == "world_readable"
+            )
+        ):
+            defer.returnValue(True)
+        else:
+            defer.returnValue(False)
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 7eefc7b1fc..8197619a78 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -711,10 +711,6 @@ class MatrixFederationHttpClient(object):
             RequestSendFailed: If there were problems connecting to the
                 remote, due to e.g. DNS failures, connection timeouts etc.
         """
-        logger.debug("get_json args: %s", args)
-
-        logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
-
         request = MatrixFederationRequest(
             method="GET",
             destination=destination,
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index 528125e737..197c652850 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -55,7 +55,7 @@ def parse_integer_from_args(args, name, default=None, required=False):
             return int(args[name][0])
         except Exception:
             message = "Query parameter %r must be an integer" % (name,)
-            raise SynapseError(400, message)
+            raise SynapseError(400, message, errcode=Codes.INVALID_PARAM)
     else:
         if required:
             message = "Missing integer query parameter %r" % (name,)
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index e3f828c4bb..f64baa4d58 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -74,14 +74,6 @@ REQUIREMENTS = [
     "attrs>=17.4.0",
 
     "netaddr>=0.7.18",
-
-    # requests is a transitive dep of treq, and urlib3 is a transitive dep
-    # of requests, as well as of sentry-sdk.
-    #
-    # As of requests 2.21, requests does not yet support urllib3 1.25.
-    # (If we do not pin it here, pip will give us the latest urllib3
-    # due to the dep via sentry-sdk.)
-    "urllib3<1.25",
 ]
 
 CONDITIONAL_REQUIREMENTS = {
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 744d85594f..d6c4dcdb18 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -822,10 +822,16 @@ class AdminRestResource(JsonResource):
 
     def __init__(self, hs):
         JsonResource.__init__(self, hs, canonical_json=False)
+        register_servlets(hs, self)
 
-        register_servlets_for_client_rest_resource(hs, self)
-        SendServerNoticeServlet(hs).register(self)
-        VersionServlet(hs).register(self)
+
+def register_servlets(hs, http_server):
+    """
+    Register all the admin servlets.
+    """
+    register_servlets_for_client_rest_resource(hs, http_server)
+    SendServerNoticeServlet(hs).register(http_server)
+    VersionServlet(hs).register(http_server)
 
 
 def register_servlets_for_client_rest_resource(hs, http_server):
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 5180e9eaf1..029039c162 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -386,7 +386,7 @@ class CasRedirectServlet(RestServlet):
             b"redirectUrl": args[b"redirectUrl"][0]
         }).encode('ascii')
         hs_redirect_url = (self.cas_service_url +
-                           b"/_matrix/client/api/v1/login/cas/ticket")
+                           b"/_matrix/client/r0/login/cas/ticket")
         service_param = urllib.parse.urlencode({
             b"service": b"%s?%s" % (hs_redirect_url, client_redirect_url_param)
         }).encode('ascii')
@@ -395,7 +395,7 @@ class CasRedirectServlet(RestServlet):
 
 
 class CasTicketServlet(ClientV1RestServlet):
-    PATTERNS = client_path_patterns("/login/cas/ticket", releases=())
+    PATTERNS = client_path_patterns("/login/cas/ticket")
 
     def __init__(self, hs):
         super(CasTicketServlet, self).__init__(hs)
diff --git a/synapse/rest/client/v1/logout.py b/synapse/rest/client/v1/logout.py
index 430c692336..ba20e75033 100644
--- a/synapse/rest/client/v1/logout.py
+++ b/synapse/rest/client/v1/logout.py
@@ -17,8 +17,6 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.api.errors import AuthError
-
 from .base import ClientV1RestServlet, client_path_patterns
 
 logger = logging.getLogger(__name__)
@@ -38,23 +36,16 @@ class LogoutRestServlet(ClientV1RestServlet):
 
     @defer.inlineCallbacks
     def on_POST(self, request):
-        try:
-            requester = yield self.auth.get_user_by_req(request)
-        except AuthError:
-            # this implies the access token has already been deleted.
-            defer.returnValue((401, {
-                "errcode": "M_UNKNOWN_TOKEN",
-                "error": "Access Token unknown or expired"
-            }))
+        requester = yield self.auth.get_user_by_req(request)
+
+        if requester.device_id is None:
+            # the acccess token wasn't associated with a device.
+            # Just delete the access token
+            access_token = self._auth.get_access_token_from_request(request)
+            yield self._auth_handler.delete_access_token(access_token)
         else:
-            if requester.device_id is None:
-                # the acccess token wasn't associated with a device.
-                # Just delete the access token
-                access_token = self._auth.get_access_token_from_request(request)
-                yield self._auth_handler.delete_access_token(access_token)
-            else:
-                yield self._device_handler.delete_device(
-                    requester.user.to_string(), requester.device_id)
+            yield self._device_handler.delete_device(
+                requester.user.to_string(), requester.device_id)
 
         defer.returnValue((200, {}))
 
diff --git a/synapse/rest/client/v2_alpha/capabilities.py b/synapse/rest/client/v2_alpha/capabilities.py
index a868d06098..2b4892330c 100644
--- a/synapse/rest/client/v2_alpha/capabilities.py
+++ b/synapse/rest/client/v2_alpha/capabilities.py
@@ -16,7 +16,7 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.api.room_versions import DEFAULT_ROOM_VERSION, KNOWN_ROOM_VERSIONS
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.http.servlet import RestServlet
 
 from ._base import client_v2_patterns
@@ -36,6 +36,7 @@ class CapabilitiesRestServlet(RestServlet):
         """
         super(CapabilitiesRestServlet, self).__init__()
         self.hs = hs
+        self.config = hs.config
         self.auth = hs.get_auth()
         self.store = hs.get_datastore()
 
@@ -48,7 +49,7 @@ class CapabilitiesRestServlet(RestServlet):
         response = {
             "capabilities": {
                 "m.room_versions": {
-                    "default": DEFAULT_ROOM_VERSION.identifier,
+                    "default": self.config.default_room_version.identifier,
                     "available": {
                         v.identifier: v.disposition
                         for v in KNOWN_ROOM_VERSIONS.values()
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index c701e534e7..d3025025e3 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -358,6 +358,9 @@ class SyncRestServlet(RestServlet):
         def serialize(events):
             return self._event_serializer.serialize_events(
                 events, time_now=time_now,
+                # We don't bundle "live" events, as otherwise clients
+                # will end up double counting annotations.
+                bundle_aggregations=False,
                 token_id=token_id,
                 event_format=event_formatter,
                 only_event_fields=only_fields,
diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py
index eb8782aa6e..21c3c807b9 100644
--- a/synapse/rest/key/v2/remote_key_resource.py
+++ b/synapse/rest/key/v2/remote_key_resource.py
@@ -20,7 +20,7 @@ from twisted.web.resource import Resource
 from twisted.web.server import NOT_DONE_YET
 
 from synapse.api.errors import Codes, SynapseError
-from synapse.crypto.keyring import KeyLookupError
+from synapse.crypto.keyring import KeyLookupError, ServerKeyFetcher
 from synapse.http.server import respond_with_json_bytes, wrap_json_request_handler
 from synapse.http.servlet import parse_integer, parse_json_object_from_request
 
@@ -89,7 +89,7 @@ class RemoteKey(Resource):
     isLeaf = True
 
     def __init__(self, hs):
-        self.keyring = hs.get_keyring()
+        self.fetcher = ServerKeyFetcher(hs)
         self.store = hs.get_datastore()
         self.clock = hs.get_clock()
         self.federation_domain_whitelist = hs.config.federation_domain_whitelist
@@ -217,7 +217,7 @@ class RemoteKey(Resource):
         if cache_misses and query_remote_on_cache_miss:
             for server_name, key_ids in cache_misses.items():
                 try:
-                    yield self.keyring.get_server_verify_key_v2_direct(
+                    yield self.fetcher.get_server_verify_key_v2_direct(
                         server_name, key_ids
                     )
                 except KeyLookupError as e:
diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py
index 5305e9175f..35a750923b 100644
--- a/synapse/rest/media/v1/thumbnail_resource.py
+++ b/synapse/rest/media/v1/thumbnail_resource.py
@@ -56,8 +56,8 @@ class ThumbnailResource(Resource):
     def _async_render_GET(self, request):
         set_cors_headers(request)
         server_name, media_id, _ = parse_media_id(request)
-        width = parse_integer(request, "width")
-        height = parse_integer(request, "height")
+        width = parse_integer(request, "width", required=True)
+        height = parse_integer(request, "height", required=True)
         method = parse_string(request, "method", "scale")
         m_type = parse_string(request, "type", "image/png")
 
diff --git a/synapse/server.py b/synapse/server.py
index 80d40b9272..9229a68a8d 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -72,6 +72,7 @@ from synapse.handlers.room_list import RoomListHandler
 from synapse.handlers.room_member import RoomMemberMasterHandler
 from synapse.handlers.room_member_worker import RoomMemberWorkerHandler
 from synapse.handlers.set_password import SetPasswordHandler
+from synapse.handlers.stats import StatsHandler
 from synapse.handlers.sync import SyncHandler
 from synapse.handlers.typing import TypingHandler
 from synapse.handlers.user_directory import UserDirectoryHandler
@@ -139,6 +140,7 @@ class HomeServer(object):
         'acme_handler',
         'auth_handler',
         'device_handler',
+        'stats_handler',
         'e2e_keys_handler',
         'e2e_room_keys_handler',
         'event_handler',
@@ -191,6 +193,7 @@ class HomeServer(object):
 
     REQUIRED_ON_MASTER_STARTUP = [
         "user_directory_handler",
+        "stats_handler"
     ]
 
     # This is overridden in derived application classes
@@ -474,6 +477,9 @@ class HomeServer(object):
     def build_secrets(self):
         return Secrets()
 
+    def build_stats_handler(self):
+        return StatsHandler(self)
+
     def build_spam_checker(self):
         return SpamChecker(self)
 
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 56c434d4e8..71316f7d09 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -56,6 +56,7 @@ from .roommember import RoomMemberStore
 from .search import SearchStore
 from .signatures import SignatureStore
 from .state import StateStore
+from .stats import StatsStore
 from .stream import StreamStore
 from .tags import TagsStore
 from .transactions import TransactionStore
@@ -102,6 +103,7 @@ class DataStore(
     GroupServerStore,
     UserErasureStore,
     MonthlyActiveUsersStore,
+    StatsStore,
     RelationsStore,
 ):
     def __init__(self, db_conn, hs):
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index adc6cf26b5..b56c83e460 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from __future__ import division
+
 import itertools
 import logging
 from collections import namedtuple
@@ -610,4 +612,74 @@ class EventsWorkerStore(SQLBaseStore):
 
             return res
 
-        return self.runInteraction("get_rejection_reasons", f)
+        return self.runInteraction("get_seen_events_with_rejections", f)
+
+    def _get_total_state_event_counts_txn(self, txn, room_id):
+        """
+        See get_total_state_event_counts.
+        """
+        sql = "SELECT COUNT(*) FROM state_events WHERE room_id=?"
+        txn.execute(sql, (room_id,))
+        row = txn.fetchone()
+        return row[0] if row else 0
+
+    def get_total_state_event_counts(self, room_id):
+        """
+        Gets the total number of state events in a room.
+
+        Args:
+            room_id (str)
+
+        Returns:
+            Deferred[int]
+        """
+        return self.runInteraction(
+            "get_total_state_event_counts",
+            self._get_total_state_event_counts_txn, room_id
+        )
+
+    def _get_current_state_event_counts_txn(self, txn, room_id):
+        """
+        See get_current_state_event_counts.
+        """
+        sql = "SELECT COUNT(*) FROM current_state_events WHERE room_id=?"
+        txn.execute(sql, (room_id,))
+        row = txn.fetchone()
+        return row[0] if row else 0
+
+    def get_current_state_event_counts(self, room_id):
+        """
+        Gets the current number of state events in a room.
+
+        Args:
+            room_id (str)
+
+        Returns:
+            Deferred[int]
+        """
+        return self.runInteraction(
+            "get_current_state_event_counts",
+            self._get_current_state_event_counts_txn, room_id
+        )
+
+    @defer.inlineCallbacks
+    def get_room_complexity(self, room_id):
+        """
+        Get a rough approximation of the complexity of the room. This is used by
+        remote servers to decide whether they wish to join the room or not.
+        Higher complexity value indicates that being in the room will consume
+        more resources.
+
+        Args:
+            room_id (str)
+
+        Returns:
+            Deferred[dict[str:int]] of complexity version to complexity.
+        """
+        state_events = yield self.get_current_state_event_counts(room_id)
+
+        # Call this one "v1", so we can introduce new ones as we want to develop
+        # it.
+        complexity_v1 = round(state_events / 500, 2)
+
+        defer.returnValue({"v1": complexity_v1})
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 7036541792..5300720dbb 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -19,6 +19,7 @@ import logging
 
 import six
 
+import attr
 from signedjson.key import decode_verify_key_bytes
 
 from synapse.util import batch_iter
@@ -36,6 +37,12 @@ else:
     db_binary_type = memoryview
 
 
+@attr.s(slots=True, frozen=True)
+class FetchKeyResult(object):
+    verify_key = attr.ib()  # VerifyKey: the key itself
+    valid_until_ts = attr.ib()  # int: how long we can use this key for
+
+
 class KeyStore(SQLBaseStore):
     """Persistence for signature verification keys
     """
@@ -54,8 +61,8 @@ class KeyStore(SQLBaseStore):
                 iterable of (server_name, key-id) tuples to fetch keys for
 
         Returns:
-            Deferred: resolves to dict[Tuple[str, str], VerifyKey|None]:
-                map from (server_name, key_id) -> VerifyKey, or None if the key is
+            Deferred: resolves to dict[Tuple[str, str], FetchKeyResult|None]:
+                map from (server_name, key_id) -> FetchKeyResult, or None if the key is
                 unknown
         """
         keys = {}
@@ -65,17 +72,19 @@ class KeyStore(SQLBaseStore):
 
             # batch_iter always returns tuples so it's safe to do len(batch)
             sql = (
-                "SELECT server_name, key_id, verify_key FROM server_signature_keys "
-                "WHERE 1=0"
+                "SELECT server_name, key_id, verify_key, ts_valid_until_ms "
+                "FROM server_signature_keys WHERE 1=0"
             ) + " OR (server_name=? AND key_id=?)" * len(batch)
 
             txn.execute(sql, tuple(itertools.chain.from_iterable(batch)))
 
             for row in txn:
-                server_name, key_id, key_bytes = row
-                keys[(server_name, key_id)] = decode_verify_key_bytes(
-                    key_id, bytes(key_bytes)
+                server_name, key_id, key_bytes, ts_valid_until_ms = row
+                res = FetchKeyResult(
+                    verify_key=decode_verify_key_bytes(key_id, bytes(key_bytes)),
+                    valid_until_ts=ts_valid_until_ms,
                 )
+                keys[(server_name, key_id)] = res
 
         def _txn(txn):
             for batch in batch_iter(server_name_and_key_ids, 50):
@@ -84,38 +93,53 @@ class KeyStore(SQLBaseStore):
 
         return self.runInteraction("get_server_verify_keys", _txn)
 
-    def store_server_verify_key(
-        self, server_name, from_server, time_now_ms, verify_key
-    ):
-        """Stores a NACL verification key for the given server.
+    def store_server_verify_keys(self, from_server, ts_added_ms, verify_keys):
+        """Stores NACL verification keys for remote servers.
         Args:
-            server_name (str): The name of the server.
-            from_server (str): Where the verification key was looked up
-            time_now_ms (int): The time now in milliseconds
-            verify_key (nacl.signing.VerifyKey): The NACL verify key.
+            from_server (str): Where the verification keys were looked up
+            ts_added_ms (int): The time to record that the key was added
+            verify_keys (iterable[tuple[str, str, FetchKeyResult]]):
+                keys to be stored. Each entry is a triplet of
+                (server_name, key_id, key).
         """
-        key_id = "%s:%s" % (verify_key.alg, verify_key.version)
-
-        # XXX fix this to not need a lock (#3819)
-        def _txn(txn):
-            self._simple_upsert_txn(
-                txn,
-                table="server_signature_keys",
-                keyvalues={"server_name": server_name, "key_id": key_id},
-                values={
-                    "from_server": from_server,
-                    "ts_added_ms": time_now_ms,
-                    "verify_key": db_binary_type(verify_key.encode()),
-                },
+        key_values = []
+        value_values = []
+        invalidations = []
+        for server_name, key_id, fetch_result in verify_keys:
+            key_values.append((server_name, key_id))
+            value_values.append(
+                (
+                    from_server,
+                    ts_added_ms,
+                    fetch_result.valid_until_ts,
+                    db_binary_type(fetch_result.verify_key.encode()),
+                )
             )
             # invalidate takes a tuple corresponding to the params of
             # _get_server_verify_key. _get_server_verify_key only takes one
             # param, which is itself the 2-tuple (server_name, key_id).
-            txn.call_after(
-                self._get_server_verify_key.invalidate, ((server_name, key_id),)
-            )
-
-        return self.runInteraction("store_server_verify_key", _txn)
+            invalidations.append((server_name, key_id))
+
+        def _invalidate(res):
+            f = self._get_server_verify_key.invalidate
+            for i in invalidations:
+                f((i, ))
+            return res
+
+        return self.runInteraction(
+            "store_server_verify_keys",
+            self._simple_upsert_many_txn,
+            table="server_signature_keys",
+            key_names=("server_name", "key_id"),
+            key_values=key_values,
+            value_names=(
+                "from_server",
+                "ts_added_ms",
+                "ts_valid_until_ms",
+                "verify_key",
+            ),
+            value_values=value_values,
+        ).addCallback(_invalidate)
 
     def store_server_keys_json(
         self, server_name, key_id, from_server, ts_now_ms, ts_expires_ms, key_json_bytes
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 57df17bcc2..4bd1669458 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -142,6 +142,38 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
         return self.runInteraction("get_room_summary", _get_room_summary_txn)
 
+    def _get_user_count_in_room_txn(self, txn, room_id, membership):
+        """
+        See get_user_count_in_room.
+        """
+        sql = (
+            "SELECT count(*) FROM room_memberships as m"
+            " INNER JOIN current_state_events as c"
+            " ON m.event_id = c.event_id "
+            " AND m.room_id = c.room_id "
+            " AND m.user_id = c.state_key"
+            " WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?"
+        )
+
+        txn.execute(sql, (room_id, membership))
+        row = txn.fetchone()
+        return row[0]
+
+    def get_user_count_in_room(self, room_id, membership):
+        """
+        Get the user count in a room with a particular membership.
+
+        Args:
+            room_id (str)
+            membership (Membership)
+
+        Returns:
+            Deferred[int]
+        """
+        return self.runInteraction(
+            "get_users_in_room", self._get_user_count_in_room_txn, room_id, membership
+        )
+
     @cached()
     def get_invited_rooms_for_user(self, user_id):
         """ Get all the rooms the user is invited to
diff --git a/synapse/storage/schema/delta/54/account_validity.sql b/synapse/storage/schema/delta/54/account_validity_with_renewal.sql
index 2357626000..0adb2ad55e 100644
--- a/synapse/storage/schema/delta/54/account_validity.sql
+++ b/synapse/storage/schema/delta/54/account_validity_with_renewal.sql
@@ -13,6 +13,9 @@
  * limitations under the License.
  */
 
+-- We previously changed the schema for this table without renaming the file, which means
+-- that some databases might still be using the old schema. This ensures Synapse uses the
+-- right schema for the table.
 DROP TABLE IF EXISTS account_validity;
 
 -- Track what users are in public rooms.
diff --git a/synapse/storage/schema/delta/54/add_validity_to_server_keys.sql b/synapse/storage/schema/delta/54/add_validity_to_server_keys.sql
new file mode 100644
index 0000000000..c01aa9d2d9
--- /dev/null
+++ b/synapse/storage/schema/delta/54/add_validity_to_server_keys.sql
@@ -0,0 +1,23 @@
+/* Copyright 2019 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* When we can use this key until, before we have to refresh it. */
+ALTER TABLE server_signature_keys ADD COLUMN ts_valid_until_ms BIGINT;
+
+UPDATE server_signature_keys SET ts_valid_until_ms = (
+    SELECT MAX(ts_valid_until_ms) FROM server_keys_json skj WHERE
+        skj.server_name = server_signature_keys.server_name AND
+        skj.key_id = server_signature_keys.key_id
+);
diff --git a/synapse/storage/schema/delta/54/stats.sql b/synapse/storage/schema/delta/54/stats.sql
new file mode 100644
index 0000000000..652e58308e
--- /dev/null
+++ b/synapse/storage/schema/delta/54/stats.sql
@@ -0,0 +1,80 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE stats_stream_pos (
+    Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,  -- Makes sure this table only has one row.
+    stream_id BIGINT,
+    CHECK (Lock='X')
+);
+
+INSERT INTO stats_stream_pos (stream_id) VALUES (null);
+
+CREATE TABLE user_stats (
+    user_id TEXT NOT NULL,
+    ts BIGINT NOT NULL,
+    bucket_size INT NOT NULL,
+    public_rooms INT NOT NULL,
+    private_rooms INT NOT NULL
+);
+
+CREATE UNIQUE INDEX user_stats_user_ts ON user_stats(user_id, ts);
+
+CREATE TABLE room_stats (
+    room_id TEXT NOT NULL,
+    ts BIGINT NOT NULL,
+    bucket_size INT NOT NULL,
+    current_state_events INT NOT NULL,
+    joined_members INT NOT NULL,
+    invited_members INT NOT NULL,
+    left_members INT NOT NULL,
+    banned_members INT NOT NULL,
+    state_events INT NOT NULL
+);
+
+CREATE UNIQUE INDEX room_stats_room_ts ON room_stats(room_id, ts);
+
+-- cache of current room state; useful for the publicRooms list
+CREATE TABLE room_state (
+    room_id TEXT NOT NULL,
+    join_rules TEXT,
+    history_visibility TEXT,
+    encryption TEXT,
+    name TEXT,
+    topic TEXT,
+    avatar TEXT,
+    canonical_alias TEXT
+    -- get aliases straight from the right table
+);
+
+CREATE UNIQUE INDEX room_state_room ON room_state(room_id);
+
+CREATE TABLE room_stats_earliest_token (
+    room_id TEXT NOT NULL,
+    token BIGINT NOT NULL
+);
+
+CREATE UNIQUE INDEX room_stats_earliest_token_idx ON room_stats_earliest_token(room_id);
+
+-- Set up staging tables
+INSERT INTO background_updates (update_name, progress_json) VALUES
+    ('populate_stats_createtables', '{}');
+
+-- Run through each room and update stats
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_stats_process_rooms', '{}', 'populate_stats_createtables');
+
+-- Clean up staging tables
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_stats_cleanup', '{}', 'populate_stats_process_rooms');
diff --git a/synapse/storage/state_deltas.py b/synapse/storage/state_deltas.py
index 31a0279b18..5fdb442104 100644
--- a/synapse/storage/state_deltas.py
+++ b/synapse/storage/state_deltas.py
@@ -84,10 +84,16 @@ class StateDeltasStore(SQLBaseStore):
             "get_current_state_deltas", get_current_state_deltas_txn
         )
 
-    def get_max_stream_id_in_current_state_deltas(self):
-        return self._simple_select_one_onecol(
+    def _get_max_stream_id_in_current_state_deltas_txn(self, txn):
+        return self._simple_select_one_onecol_txn(
+            txn,
             table="current_state_delta_stream",
             keyvalues={},
             retcol="COALESCE(MAX(stream_id), -1)",
-            desc="get_max_stream_id_in_current_state_deltas",
+        )
+
+    def get_max_stream_id_in_current_state_deltas(self):
+        return self.runInteraction(
+            "get_max_stream_id_in_current_state_deltas",
+            self._get_max_stream_id_in_current_state_deltas_txn,
         )
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
new file mode 100644
index 0000000000..eb0ced5b5e
--- /dev/null
+++ b/synapse/storage/stats.py
@@ -0,0 +1,450 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018, 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, Membership
+from synapse.storage.state_deltas import StateDeltasStore
+from synapse.util.caches.descriptors import cached
+
+logger = logging.getLogger(__name__)
+
+# these fields track absolutes (e.g. total number of rooms on the server)
+ABSOLUTE_STATS_FIELDS = {
+    "room": (
+        "current_state_events",
+        "joined_members",
+        "invited_members",
+        "left_members",
+        "banned_members",
+        "state_events",
+    ),
+    "user": ("public_rooms", "private_rooms"),
+}
+
+TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
+
+TEMP_TABLE = "_temp_populate_stats"
+
+
+class StatsStore(StateDeltasStore):
+    def __init__(self, db_conn, hs):
+        super(StatsStore, self).__init__(db_conn, hs)
+
+        self.server_name = hs.hostname
+        self.clock = self.hs.get_clock()
+        self.stats_enabled = hs.config.stats_enabled
+        self.stats_bucket_size = hs.config.stats_bucket_size
+
+        self.register_background_update_handler(
+            "populate_stats_createtables", self._populate_stats_createtables
+        )
+        self.register_background_update_handler(
+            "populate_stats_process_rooms", self._populate_stats_process_rooms
+        )
+        self.register_background_update_handler(
+            "populate_stats_cleanup", self._populate_stats_cleanup
+        )
+
+    @defer.inlineCallbacks
+    def _populate_stats_createtables(self, progress, batch_size):
+
+        if not self.stats_enabled:
+            yield self._end_background_update("populate_stats_createtables")
+            defer.returnValue(1)
+
+        # Get all the rooms that we want to process.
+        def _make_staging_area(txn):
+            sql = (
+                "CREATE TABLE IF NOT EXISTS "
+                + TEMP_TABLE
+                + "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)"
+            )
+            txn.execute(sql)
+
+            sql = (
+                "CREATE TABLE IF NOT EXISTS "
+                + TEMP_TABLE
+                + "_position(position TEXT NOT NULL)"
+            )
+            txn.execute(sql)
+
+            # Get rooms we want to process from the database
+            sql = """
+                SELECT room_id, count(*) FROM current_state_events
+                GROUP BY room_id
+            """
+            txn.execute(sql)
+            rooms = [{"room_id": x[0], "events": x[1]} for x in txn.fetchall()]
+            self._simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms)
+            del rooms
+
+        new_pos = yield self.get_max_stream_id_in_current_state_deltas()
+        yield self.runInteraction("populate_stats_temp_build", _make_staging_area)
+        yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
+        self.get_earliest_token_for_room_stats.invalidate_all()
+
+        yield self._end_background_update("populate_stats_createtables")
+        defer.returnValue(1)
+
+    @defer.inlineCallbacks
+    def _populate_stats_cleanup(self, progress, batch_size):
+        """
+        Update the user directory stream position, then clean up the old tables.
+        """
+        if not self.stats_enabled:
+            yield self._end_background_update("populate_stats_cleanup")
+            defer.returnValue(1)
+
+        position = yield self._simple_select_one_onecol(
+            TEMP_TABLE + "_position", None, "position"
+        )
+        yield self.update_stats_stream_pos(position)
+
+        def _delete_staging_area(txn):
+            txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
+            txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
+
+        yield self.runInteraction("populate_stats_cleanup", _delete_staging_area)
+
+        yield self._end_background_update("populate_stats_cleanup")
+        defer.returnValue(1)
+
+    @defer.inlineCallbacks
+    def _populate_stats_process_rooms(self, progress, batch_size):
+
+        if not self.stats_enabled:
+            yield self._end_background_update("populate_stats_process_rooms")
+            defer.returnValue(1)
+
+        # If we don't have progress filed, delete everything.
+        if not progress:
+            yield self.delete_all_stats()
+
+        def _get_next_batch(txn):
+            # Only fetch 250 rooms, so we don't fetch too many at once, even
+            # if those 250 rooms have less than batch_size state events.
+            sql = """
+                SELECT room_id, events FROM %s_rooms
+                ORDER BY events DESC
+                LIMIT 250
+            """ % (
+                TEMP_TABLE,
+            )
+            txn.execute(sql)
+            rooms_to_work_on = txn.fetchall()
+
+            if not rooms_to_work_on:
+                return None
+
+            # Get how many are left to process, so we can give status on how
+            # far we are in processing
+            txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
+            progress["remaining"] = txn.fetchone()[0]
+
+            return rooms_to_work_on
+
+        rooms_to_work_on = yield self.runInteraction(
+            "populate_stats_temp_read", _get_next_batch
+        )
+
+        # No more rooms -- complete the transaction.
+        if not rooms_to_work_on:
+            yield self._end_background_update("populate_stats_process_rooms")
+            defer.returnValue(1)
+
+        logger.info(
+            "Processing the next %d rooms of %d remaining",
+            len(rooms_to_work_on), progress["remaining"],
+        )
+
+        # Number of state events we've processed by going through each room
+        processed_event_count = 0
+
+        for room_id, event_count in rooms_to_work_on:
+
+            current_state_ids = yield self.get_current_state_ids(room_id)
+
+            join_rules = yield self.get_event(
+                current_state_ids.get((EventTypes.JoinRules, "")), allow_none=True
+            )
+            history_visibility = yield self.get_event(
+                current_state_ids.get((EventTypes.RoomHistoryVisibility, "")),
+                allow_none=True,
+            )
+            encryption = yield self.get_event(
+                current_state_ids.get((EventTypes.RoomEncryption, "")), allow_none=True
+            )
+            name = yield self.get_event(
+                current_state_ids.get((EventTypes.Name, "")), allow_none=True
+            )
+            topic = yield self.get_event(
+                current_state_ids.get((EventTypes.Topic, "")), allow_none=True
+            )
+            avatar = yield self.get_event(
+                current_state_ids.get((EventTypes.RoomAvatar, "")), allow_none=True
+            )
+            canonical_alias = yield self.get_event(
+                current_state_ids.get((EventTypes.CanonicalAlias, "")), allow_none=True
+            )
+
+            def _or_none(x, arg):
+                if x:
+                    return x.content.get(arg)
+                return None
+
+            yield self.update_room_state(
+                room_id,
+                {
+                    "join_rules": _or_none(join_rules, "join_rule"),
+                    "history_visibility": _or_none(
+                        history_visibility, "history_visibility"
+                    ),
+                    "encryption": _or_none(encryption, "algorithm"),
+                    "name": _or_none(name, "name"),
+                    "topic": _or_none(topic, "topic"),
+                    "avatar": _or_none(avatar, "url"),
+                    "canonical_alias": _or_none(canonical_alias, "alias"),
+                },
+            )
+
+            now = self.hs.get_reactor().seconds()
+
+            # quantise time to the nearest bucket
+            now = (now // self.stats_bucket_size) * self.stats_bucket_size
+
+            def _fetch_data(txn):
+
+                # Get the current token of the room
+                current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
+
+                current_state_events = len(current_state_ids)
+                joined_members = self._get_user_count_in_room_txn(
+                    txn, room_id, Membership.JOIN
+                )
+                invited_members = self._get_user_count_in_room_txn(
+                    txn, room_id, Membership.INVITE
+                )
+                left_members = self._get_user_count_in_room_txn(
+                    txn, room_id, Membership.LEAVE
+                )
+                banned_members = self._get_user_count_in_room_txn(
+                    txn, room_id, Membership.BAN
+                )
+                total_state_events = self._get_total_state_event_counts_txn(
+                    txn, room_id
+                )
+
+                self._update_stats_txn(
+                    txn,
+                    "room",
+                    room_id,
+                    now,
+                    {
+                        "bucket_size": self.stats_bucket_size,
+                        "current_state_events": current_state_events,
+                        "joined_members": joined_members,
+                        "invited_members": invited_members,
+                        "left_members": left_members,
+                        "banned_members": banned_members,
+                        "state_events": total_state_events,
+                    },
+                )
+                self._simple_insert_txn(
+                    txn,
+                    "room_stats_earliest_token",
+                    {"room_id": room_id, "token": current_token},
+                )
+
+            yield self.runInteraction("update_room_stats", _fetch_data)
+
+            # We've finished a room. Delete it from the table.
+            yield self._simple_delete_one(TEMP_TABLE + "_rooms", {"room_id": room_id})
+            # Update the remaining counter.
+            progress["remaining"] -= 1
+            yield self.runInteraction(
+                "populate_stats",
+                self._background_update_progress_txn,
+                "populate_stats_process_rooms",
+                progress,
+            )
+
+            processed_event_count += event_count
+
+            if processed_event_count > batch_size:
+                # Don't process any more rooms, we've hit our batch size.
+                defer.returnValue(processed_event_count)
+
+        defer.returnValue(processed_event_count)
+
+    def delete_all_stats(self):
+        """
+        Delete all statistics records.
+        """
+
+        def _delete_all_stats_txn(txn):
+            txn.execute("DELETE FROM room_state")
+            txn.execute("DELETE FROM room_stats")
+            txn.execute("DELETE FROM room_stats_earliest_token")
+            txn.execute("DELETE FROM user_stats")
+
+        return self.runInteraction("delete_all_stats", _delete_all_stats_txn)
+
+    def get_stats_stream_pos(self):
+        return self._simple_select_one_onecol(
+            table="stats_stream_pos",
+            keyvalues={},
+            retcol="stream_id",
+            desc="stats_stream_pos",
+        )
+
+    def update_stats_stream_pos(self, stream_id):
+        return self._simple_update_one(
+            table="stats_stream_pos",
+            keyvalues={},
+            updatevalues={"stream_id": stream_id},
+            desc="update_stats_stream_pos",
+        )
+
+    def update_room_state(self, room_id, fields):
+        """
+        Args:
+            room_id (str)
+            fields (dict[str:Any])
+        """
+        return self._simple_upsert(
+            table="room_state",
+            keyvalues={"room_id": room_id},
+            values=fields,
+            desc="update_room_state",
+        )
+
+    def get_deltas_for_room(self, room_id, start, size=100):
+        """
+        Get statistics deltas for a given room.
+
+        Args:
+            room_id (str)
+            start (int): Pagination start. Number of entries, not timestamp.
+            size (int): How many entries to return.
+
+        Returns:
+            Deferred[list[dict]], where the dict has the keys of
+            ABSOLUTE_STATS_FIELDS["room"] and "ts".
+        """
+        return self._simple_select_list_paginate(
+            "room_stats",
+            {"room_id": room_id},
+            "ts",
+            start,
+            size,
+            retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]),
+            order_direction="DESC",
+        )
+
+    def get_all_room_state(self):
+        return self._simple_select_list(
+            "room_state", None, retcols=("name", "topic", "canonical_alias")
+        )
+
+    @cached()
+    def get_earliest_token_for_room_stats(self, room_id):
+        """
+        Fetch the "earliest token". This is used by the room stats delta
+        processor to ignore deltas that have been processed between the
+        start of the background task and any particular room's stats
+        being calculated.
+
+        Returns:
+            Deferred[int]
+        """
+        return self._simple_select_one_onecol(
+            "room_stats_earliest_token",
+            {"room_id": room_id},
+            retcol="token",
+            allow_none=True,
+        )
+
+    def update_stats(self, stats_type, stats_id, ts, fields):
+        table, id_col = TYPE_TO_ROOM[stats_type]
+        return self._simple_upsert(
+            table=table,
+            keyvalues={id_col: stats_id, "ts": ts},
+            values=fields,
+            desc="update_stats",
+        )
+
+    def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields):
+        table, id_col = TYPE_TO_ROOM[stats_type]
+        return self._simple_upsert_txn(
+            txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields
+        )
+
+    def update_stats_delta(self, ts, stats_type, stats_id, field, value):
+        def _update_stats_delta(txn):
+            table, id_col = TYPE_TO_ROOM[stats_type]
+
+            sql = (
+                "SELECT * FROM %s"
+                " WHERE %s=? and ts=("
+                "  SELECT MAX(ts) FROM %s"
+                "  WHERE %s=?"
+                ")"
+            ) % (table, id_col, table, id_col)
+            txn.execute(sql, (stats_id, stats_id))
+            rows = self.cursor_to_dict(txn)
+            if len(rows) == 0:
+                # silently skip as we don't have anything to apply a delta to yet.
+                # this tries to minimise any race between the initial sync and
+                # subsequent deltas arriving.
+                return
+
+            current_ts = ts
+            latest_ts = rows[0]["ts"]
+            if current_ts < latest_ts:
+                # This one is in the past, but we're just encountering it now.
+                # Mark it as part of the current bucket.
+                current_ts = latest_ts
+            elif ts != latest_ts:
+                # we have to copy our absolute counters over to the new entry.
+                values = {
+                    key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type]
+                }
+                values[id_col] = stats_id
+                values["ts"] = ts
+                values["bucket_size"] = self.stats_bucket_size
+
+                self._simple_insert_txn(txn, table=table, values=values)
+
+            # actually update the new value
+            if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]:
+                self._simple_update_txn(
+                    txn,
+                    table=table,
+                    keyvalues={id_col: stats_id, "ts": current_ts},
+                    updatevalues={field: value},
+                )
+            else:
+                sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % (
+                    table,
+                    field,
+                    field,
+                    id_col,
+                )
+                txn.execute(sql, (value, stats_id, current_ts))
+
+        return self.runInteraction("update_stats_delta", _update_stats_delta)
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 311b49e18a..fe412355d8 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -226,6 +226,8 @@ class LoggingContext(object):
             self.request = request
 
     def __str__(self):
+        if self.request:
+            return str(self.request)
         return "%s@%x" % (self.name, id(self))
 
     @classmethod
@@ -274,12 +276,10 @@ class LoggingContext(object):
         current = self.set_current_context(self.previous_context)
         if current is not self:
             if current is self.sentinel:
-                logger.warn("Expected logging context %s has been lost", self)
+                logger.warning("Expected logging context %s was lost", self)
             else:
-                logger.warn(
-                    "Current logging context %s is not expected context %s",
-                    current,
-                    self
+                logger.warning(
+                    "Expected logging context %s but found %s", self, current
                 )
         self.previous_context = None
         self.alive = False
@@ -433,10 +433,14 @@ class PreserveLoggingContext(object):
         context = LoggingContext.set_current_context(self.current_context)
 
         if context != self.new_context:
-            logger.warn(
-                "Unexpected logging context: %s is not %s",
-                context, self.new_context,
-            )
+            if context is LoggingContext.sentinel:
+                logger.warning("Expected logging context %s was lost", self.new_context)
+            else:
+                logger.warning(
+                    "Expected logging context %s but found %s",
+                    self.new_context,
+                    context,
+                )
 
         if self.current_context is not LoggingContext.sentinel:
             if not self.current_context.alive:
diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py
index 3c79d4afe7..3933ad4347 100644
--- a/tests/crypto/test_keyring.py
+++ b/tests/crypto/test_keyring.py
@@ -24,7 +24,12 @@ from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
 from synapse.crypto import keyring
-from synapse.crypto.keyring import KeyLookupError
+from synapse.crypto.keyring import (
+    KeyLookupError,
+    PerspectivesKeyFetcher,
+    ServerKeyFetcher,
+)
+from synapse.storage.keys import FetchKeyResult
 from synapse.util import logcontext
 from synapse.util.logcontext import LoggingContext
 
@@ -50,11 +55,11 @@ class MockPerspectiveServer(object):
                 key_id: {"key": signedjson.key.encode_verify_key_base64(verify_key)}
             },
         }
-        return self.get_signed_response(res)
+        self.sign_response(res)
+        return res
 
-    def get_signed_response(self, res):
+    def sign_response(self, res):
         signedjson.sign.sign_json(res, self.server_name, self.key)
-        return res
 
 
 class KeyringTestCase(unittest.HomeserverTestCase):
@@ -80,7 +85,7 @@ class KeyringTestCase(unittest.HomeserverTestCase):
         # we run the lookup in a logcontext so that the patched inlineCallbacks can check
         # it is doing the right thing with logcontexts.
         wait_1_deferred = run_in_context(
-            kr.wait_for_previous_lookups, ["server1"], {"server1": lookup_1_deferred}
+            kr.wait_for_previous_lookups, {"server1": lookup_1_deferred}
         )
 
         # there were no previous lookups, so the deferred should be ready
@@ -89,7 +94,7 @@ class KeyringTestCase(unittest.HomeserverTestCase):
         # set off another wait. It should block because the first lookup
         # hasn't yet completed.
         wait_2_deferred = run_in_context(
-            kr.wait_for_previous_lookups, ["server1"], {"server1": lookup_2_deferred}
+            kr.wait_for_previous_lookups, {"server1": lookup_2_deferred}
         )
 
         self.assertFalse(wait_2_deferred.called)
@@ -192,8 +197,18 @@ class KeyringTestCase(unittest.HomeserverTestCase):
         kr = keyring.Keyring(self.hs)
 
         key1 = signedjson.key.generate_signing_key(1)
-        r = self.hs.datastore.store_server_verify_key(
-            "server9", "", time.time() * 1000, signedjson.key.get_verify_key(key1)
+        key1_id = "%s:%s" % (key1.alg, key1.version)
+
+        r = self.hs.datastore.store_server_verify_keys(
+            "server9",
+            time.time() * 1000,
+            [
+                (
+                    "server9",
+                    key1_id,
+                    FetchKeyResult(signedjson.key.get_verify_key(key1), 1000),
+                ),
+            ],
         )
         self.get_success(r)
         json1 = {}
@@ -207,16 +222,23 @@ class KeyringTestCase(unittest.HomeserverTestCase):
         self.assertFalse(d.called)
         self.get_success(d)
 
+
+class ServerKeyFetcherTestCase(unittest.HomeserverTestCase):
+    def make_homeserver(self, reactor, clock):
+        self.http_client = Mock()
+        hs = self.setup_test_homeserver(handlers=None, http_client=self.http_client)
+        return hs
+
     def test_get_keys_from_server(self):
         # arbitrarily advance the clock a bit
         self.reactor.advance(100)
 
         SERVER_NAME = "server2"
-        kr = keyring.Keyring(self.hs)
+        fetcher = ServerKeyFetcher(self.hs)
         testkey = signedjson.key.generate_signing_key("ver1")
         testverifykey = signedjson.key.get_verify_key(testkey)
         testverifykey_id = "ed25519:ver1"
-        VALID_UNTIL_TS = 1000
+        VALID_UNTIL_TS = 200 * 1000
 
         # valid response
         response = {
@@ -239,11 +261,12 @@ class KeyringTestCase(unittest.HomeserverTestCase):
         self.http_client.get_json.side_effect = get_json
 
         server_name_and_key_ids = [(SERVER_NAME, ("key1",))]
-        keys = self.get_success(kr.get_keys_from_server(server_name_and_key_ids))
+        keys = self.get_success(fetcher.get_keys(server_name_and_key_ids))
         k = keys[SERVER_NAME][testverifykey_id]
-        self.assertEqual(k, testverifykey)
-        self.assertEqual(k.alg, "ed25519")
-        self.assertEqual(k.version, "ver1")
+        self.assertEqual(k.valid_until_ts, VALID_UNTIL_TS)
+        self.assertEqual(k.verify_key, testverifykey)
+        self.assertEqual(k.verify_key.alg, "ed25519")
+        self.assertEqual(k.verify_key.version, "ver1")
 
         # check that the perspectives store is correctly updated
         lookup_triplet = (SERVER_NAME, testverifykey_id, None)
@@ -266,15 +289,26 @@ class KeyringTestCase(unittest.HomeserverTestCase):
         # change the server name: it should cause a rejection
         response["server_name"] = "OTHER_SERVER"
         self.get_failure(
-            kr.get_keys_from_server(server_name_and_key_ids), KeyLookupError
+            fetcher.get_keys(server_name_and_key_ids), KeyLookupError
         )
 
+
+class PerspectivesKeyFetcherTestCase(unittest.HomeserverTestCase):
+    def make_homeserver(self, reactor, clock):
+        self.mock_perspective_server = MockPerspectiveServer()
+        self.http_client = Mock()
+        hs = self.setup_test_homeserver(handlers=None, http_client=self.http_client)
+        keys = self.mock_perspective_server.get_verify_keys()
+        hs.config.perspectives = {self.mock_perspective_server.server_name: keys}
+        return hs
+
     def test_get_keys_from_perspectives(self):
         # arbitrarily advance the clock a bit
         self.reactor.advance(100)
 
+        fetcher = PerspectivesKeyFetcher(self.hs)
+
         SERVER_NAME = "server2"
-        kr = keyring.Keyring(self.hs)
         testkey = signedjson.key.generate_signing_key("ver1")
         testverifykey = signedjson.key.get_verify_key(testkey)
         testverifykey_id = "ed25519:ver1"
@@ -292,9 +326,10 @@ class KeyringTestCase(unittest.HomeserverTestCase):
             },
         }
 
-        persp_resp = {
-            "server_keys": [self.mock_perspective_server.get_signed_response(response)]
-        }
+        # the response must be signed by both the origin server and the perspectives
+        # server.
+        signedjson.sign.sign_json(response, SERVER_NAME, testkey)
+        self.mock_perspective_server.sign_response(response)
 
         def post_json(destination, path, data, **kwargs):
             self.assertEqual(destination, self.mock_perspective_server.server_name)
@@ -303,17 +338,18 @@ class KeyringTestCase(unittest.HomeserverTestCase):
             # check that the request is for the expected key
             q = data["server_keys"]
             self.assertEqual(list(q[SERVER_NAME].keys()), ["key1"])
-            return persp_resp
+            return {"server_keys": [response]}
 
         self.http_client.post_json.side_effect = post_json
 
         server_name_and_key_ids = [(SERVER_NAME, ("key1",))]
-        keys = self.get_success(kr.get_keys_from_perspectives(server_name_and_key_ids))
+        keys = self.get_success(fetcher.get_keys(server_name_and_key_ids))
         self.assertIn(SERVER_NAME, keys)
         k = keys[SERVER_NAME][testverifykey_id]
-        self.assertEqual(k, testverifykey)
-        self.assertEqual(k.alg, "ed25519")
-        self.assertEqual(k.version, "ver1")
+        self.assertEqual(k.valid_until_ts, VALID_UNTIL_TS)
+        self.assertEqual(k.verify_key, testverifykey)
+        self.assertEqual(k.verify_key.alg, "ed25519")
+        self.assertEqual(k.verify_key.version, "ver1")
 
         # check that the perspectives store is correctly updated
         lookup_triplet = (SERVER_NAME, testverifykey_id, None)
@@ -330,13 +366,81 @@ class KeyringTestCase(unittest.HomeserverTestCase):
 
         self.assertEqual(
             bytes(res["key_json"]),
-            canonicaljson.encode_canonical_json(persp_resp["server_keys"][0]),
+            canonicaljson.encode_canonical_json(response),
         )
 
+    def test_invalid_perspectives_responses(self):
+        """Check that invalid responses from the perspectives server are rejected"""
+        # arbitrarily advance the clock a bit
+        self.reactor.advance(100)
+
+        SERVER_NAME = "server2"
+        testkey = signedjson.key.generate_signing_key("ver1")
+        testverifykey = signedjson.key.get_verify_key(testkey)
+        testverifykey_id = "ed25519:ver1"
+        VALID_UNTIL_TS = 200 * 1000
+
+        def build_response():
+            # valid response
+            response = {
+                "server_name": SERVER_NAME,
+                "old_verify_keys": {},
+                "valid_until_ts": VALID_UNTIL_TS,
+                "verify_keys": {
+                    testverifykey_id: {
+                        "key": signedjson.key.encode_verify_key_base64(testverifykey)
+                    }
+                },
+            }
+
+            # the response must be signed by both the origin server and the perspectives
+            # server.
+            signedjson.sign.sign_json(response, SERVER_NAME, testkey)
+            self.mock_perspective_server.sign_response(response)
+            return response
+
+        def get_key_from_perspectives(response):
+            fetcher = PerspectivesKeyFetcher(self.hs)
+            server_name_and_key_ids = [(SERVER_NAME, ("key1",))]
+
+            def post_json(destination, path, data, **kwargs):
+                self.assertEqual(destination, self.mock_perspective_server.server_name)
+                self.assertEqual(path, "/_matrix/key/v2/query")
+                return {"server_keys": [response]}
+
+            self.http_client.post_json.side_effect = post_json
+
+            return self.get_success(
+                fetcher.get_keys(server_name_and_key_ids)
+            )
+
+        # start with a valid response so we can check we are testing the right thing
+        response = build_response()
+        keys = get_key_from_perspectives(response)
+        k = keys[SERVER_NAME][testverifykey_id]
+        self.assertEqual(k.verify_key, testverifykey)
+
+        # remove the perspectives server's signature
+        response = build_response()
+        del response["signatures"][self.mock_perspective_server.server_name]
+        self.http_client.post_json.return_value = {"server_keys": [response]}
+        keys = get_key_from_perspectives(response)
+        self.assertEqual(keys, {}, "Expected empty dict with missing persp server sig")
+
+        # remove the origin server's signature
+        response = build_response()
+        del response["signatures"][SERVER_NAME]
+        self.http_client.post_json.return_value = {"server_keys": [response]}
+        keys = get_key_from_perspectives(response)
+        self.assertEqual(keys, {}, "Expected empty dict with missing origin server sig")
+
 
 @defer.inlineCallbacks
 def run_in_context(f, *args, **kwargs):
-    with LoggingContext("testctx"):
+    with LoggingContext("testctx") as ctx:
+        # we set the "request" prop to make it easier to follow what's going on in the
+        # logs.
+        ctx.request = "testctx"
         rv = yield f(*args, **kwargs)
     defer.returnValue(rv)
 
diff --git a/tests/federation/test_complexity.py b/tests/federation/test_complexity.py
new file mode 100644
index 0000000000..1e3e5aec66
--- /dev/null
+++ b/tests/federation/test_complexity.py
@@ -0,0 +1,90 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 Matrix.org Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.config.ratelimiting import FederationRateLimitConfig
+from synapse.federation.transport import server
+from synapse.rest import admin
+from synapse.rest.client.v1 import login, room
+from synapse.util.ratelimitutils import FederationRateLimiter
+
+from tests import unittest
+
+
+class RoomComplexityTests(unittest.HomeserverTestCase):
+
+    servlets = [
+        admin.register_servlets,
+        room.register_servlets,
+        login.register_servlets,
+    ]
+
+    def default_config(self, name='test'):
+        config = super(RoomComplexityTests, self).default_config(name=name)
+        config["limit_large_remote_room_joins"] = True
+        config["limit_large_remote_room_complexity"] = 0.05
+        return config
+
+    def prepare(self, reactor, clock, homeserver):
+        class Authenticator(object):
+            def authenticate_request(self, request, content):
+                return defer.succeed("otherserver.nottld")
+
+        ratelimiter = FederationRateLimiter(
+            clock,
+            FederationRateLimitConfig(
+                window_size=1,
+                sleep_limit=1,
+                sleep_msec=1,
+                reject_limit=1000,
+                concurrent_requests=1000,
+            ),
+        )
+        server.register_servlets(
+            homeserver, self.resource, Authenticator(), ratelimiter
+        )
+
+    def test_complexity_simple(self):
+
+        u1 = self.register_user("u1", "pass")
+        u1_token = self.login("u1", "pass")
+
+        room_1 = self.helper.create_room_as(u1, tok=u1_token)
+        self.helper.send_state(
+            room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
+        )
+
+        # Get the room complexity
+        request, channel = self.make_request(
+            "GET", "/_matrix/federation/unstable/rooms/%s/complexity" % (room_1,)
+        )
+        self.render(request)
+        self.assertEquals(200, channel.code)
+        complexity = channel.json_body["v1"]
+        self.assertTrue(complexity > 0, complexity)
+
+        # Artificially raise the complexity
+        store = self.hs.get_datastore()
+        store.get_current_state_event_counts = lambda x: defer.succeed(500 * 1.23)
+
+        # Get the room complexity again -- make sure it's our artificial value
+        request, channel = self.make_request(
+            "GET", "/_matrix/federation/unstable/rooms/%s/complexity" % (room_1,)
+        )
+        self.render(request)
+        self.assertEquals(200, channel.code)
+        complexity = channel.json_body["v1"]
+        self.assertEqual(complexity, 1.23)
diff --git a/tests/handlers/test_stats.py b/tests/handlers/test_stats.py
new file mode 100644
index 0000000000..249aba3d59
--- /dev/null
+++ b/tests/handlers/test_stats.py
@@ -0,0 +1,251 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from mock import Mock
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, Membership
+from synapse.rest import admin
+from synapse.rest.client.v1 import login, room
+
+from tests import unittest
+
+
+class StatsRoomTests(unittest.HomeserverTestCase):
+
+    servlets = [
+        admin.register_servlets_for_client_rest_resource,
+        room.register_servlets,
+        login.register_servlets,
+    ]
+
+    def prepare(self, reactor, clock, hs):
+
+        self.store = hs.get_datastore()
+        self.handler = self.hs.get_stats_handler()
+
+    def _add_background_updates(self):
+        """
+        Add the background updates we need to run.
+        """
+        # Ugh, have to reset this flag
+        self.store._all_done = False
+
+        self.get_success(
+            self.store._simple_insert(
+                "background_updates",
+                {"update_name": "populate_stats_createtables", "progress_json": "{}"},
+            )
+        )
+        self.get_success(
+            self.store._simple_insert(
+                "background_updates",
+                {
+                    "update_name": "populate_stats_process_rooms",
+                    "progress_json": "{}",
+                    "depends_on": "populate_stats_createtables",
+                },
+            )
+        )
+        self.get_success(
+            self.store._simple_insert(
+                "background_updates",
+                {
+                    "update_name": "populate_stats_cleanup",
+                    "progress_json": "{}",
+                    "depends_on": "populate_stats_process_rooms",
+                },
+            )
+        )
+
+    def test_initial_room(self):
+        """
+        The background updates will build the table from scratch.
+        """
+        r = self.get_success(self.store.get_all_room_state())
+        self.assertEqual(len(r), 0)
+
+        # Disable stats
+        self.hs.config.stats_enabled = False
+        self.handler.stats_enabled = False
+
+        u1 = self.register_user("u1", "pass")
+        u1_token = self.login("u1", "pass")
+
+        room_1 = self.helper.create_room_as(u1, tok=u1_token)
+        self.helper.send_state(
+            room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
+        )
+
+        # Stats disabled, shouldn't have done anything
+        r = self.get_success(self.store.get_all_room_state())
+        self.assertEqual(len(r), 0)
+
+        # Enable stats
+        self.hs.config.stats_enabled = True
+        self.handler.stats_enabled = True
+
+        # Do the initial population of the user directory via the background update
+        self._add_background_updates()
+
+        while not self.get_success(self.store.has_completed_background_updates()):
+            self.get_success(self.store.do_next_background_update(100), by=0.1)
+
+        r = self.get_success(self.store.get_all_room_state())
+
+        self.assertEqual(len(r), 1)
+        self.assertEqual(r[0]["topic"], "foo")
+
+    def test_initial_earliest_token(self):
+        """
+        Ingestion via notify_new_event will ignore tokens that the background
+        update have already processed.
+        """
+        self.reactor.advance(86401)
+
+        self.hs.config.stats_enabled = False
+        self.handler.stats_enabled = False
+
+        u1 = self.register_user("u1", "pass")
+        u1_token = self.login("u1", "pass")
+
+        u2 = self.register_user("u2", "pass")
+        u2_token = self.login("u2", "pass")
+
+        u3 = self.register_user("u3", "pass")
+        u3_token = self.login("u3", "pass")
+
+        room_1 = self.helper.create_room_as(u1, tok=u1_token)
+        self.helper.send_state(
+            room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
+        )
+
+        # Begin the ingestion by creating the temp tables. This will also store
+        # the position that the deltas should begin at, once they take over.
+        self.hs.config.stats_enabled = True
+        self.handler.stats_enabled = True
+        self.store._all_done = False
+        self.get_success(self.store.update_stats_stream_pos(None))
+
+        self.get_success(
+            self.store._simple_insert(
+                "background_updates",
+                {"update_name": "populate_stats_createtables", "progress_json": "{}"},
+            )
+        )
+
+        while not self.get_success(self.store.has_completed_background_updates()):
+            self.get_success(self.store.do_next_background_update(100), by=0.1)
+
+        # Now, before the table is actually ingested, add some more events.
+        self.helper.invite(room=room_1, src=u1, targ=u2, tok=u1_token)
+        self.helper.join(room=room_1, user=u2, tok=u2_token)
+
+        # Now do the initial ingestion.
+        self.get_success(
+            self.store._simple_insert(
+                "background_updates",
+                {"update_name": "populate_stats_process_rooms", "progress_json": "{}"},
+            )
+        )
+        self.get_success(
+            self.store._simple_insert(
+                "background_updates",
+                {
+                    "update_name": "populate_stats_cleanup",
+                    "progress_json": "{}",
+                    "depends_on": "populate_stats_process_rooms",
+                },
+            )
+        )
+
+        self.store._all_done = False
+        while not self.get_success(self.store.has_completed_background_updates()):
+            self.get_success(self.store.do_next_background_update(100), by=0.1)
+
+        self.reactor.advance(86401)
+
+        # Now add some more events, triggering ingestion. Because of the stream
+        # position being set to before the events sent in the middle, a simpler
+        # implementation would reprocess those events, and say there were four
+        # users, not three.
+        self.helper.invite(room=room_1, src=u1, targ=u3, tok=u1_token)
+        self.helper.join(room=room_1, user=u3, tok=u3_token)
+
+        # Get the deltas! There should be two -- day 1, and day 2.
+        r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
+
+        # The oldest has 2 joined members
+        self.assertEqual(r[-1]["joined_members"], 2)
+
+        # The newest has 3
+        self.assertEqual(r[0]["joined_members"], 3)
+
+    def test_incorrect_state_transition(self):
+        """
+        If the state transition is not one of (JOIN, INVITE, LEAVE, BAN) to
+        (JOIN, INVITE, LEAVE, BAN), an error is raised.
+        """
+        events = {
+            "a1": {"membership": Membership.LEAVE},
+            "a2": {"membership": "not a real thing"},
+        }
+
+        def get_event(event_id):
+            m = Mock()
+            m.content = events[event_id]
+            d = defer.Deferred()
+            self.reactor.callLater(0.0, d.callback, m)
+            return d
+
+        def get_received_ts(event_id):
+            return defer.succeed(1)
+
+        self.store.get_received_ts = get_received_ts
+        self.store.get_event = get_event
+
+        deltas = [
+            {
+                "type": EventTypes.Member,
+                "state_key": "some_user",
+                "room_id": "room",
+                "event_id": "a1",
+                "prev_event_id": "a2",
+                "stream_id": "bleb",
+            }
+        ]
+
+        f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
+        self.assertEqual(
+            f.value.args[0], "'not a real thing' is not a valid prev_membership"
+        )
+
+        # And the other way...
+        deltas = [
+            {
+                "type": EventTypes.Member,
+                "state_key": "some_user",
+                "room_id": "room",
+                "event_id": "a2",
+                "prev_event_id": "a1",
+                "stream_id": "bleb",
+            }
+        ]
+
+        f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
+        self.assertEqual(
+            f.value.args[0], "'not a real thing' is not a valid membership"
+        )
diff --git a/tests/rest/client/v1/utils.py b/tests/rest/client/v1/utils.py
index 05b0143c42..f7133fc12e 100644
--- a/tests/rest/client/v1/utils.py
+++ b/tests/rest/client/v1/utils.py
@@ -127,3 +127,20 @@ class RestHelper(object):
         )
 
         return channel.json_body
+
+    def send_state(self, room_id, event_type, body, tok, expect_code=200):
+        path = "/_matrix/client/r0/rooms/%s/state/%s" % (room_id, event_type)
+        if tok:
+            path = path + "?access_token=%s" % tok
+
+        request, channel = make_request(
+            self.hs.get_reactor(), "PUT", path, json.dumps(body).encode('utf8')
+        )
+        render(request, self.resource, self.hs.get_reactor())
+
+        assert int(channel.result["code"]) == expect_code, (
+            "Expected: %d, got: %d, resp: %r"
+            % (expect_code, int(channel.result["code"]), channel.result["body"])
+        )
+
+        return channel.json_body
diff --git a/tests/rest/client/v2_alpha/test_capabilities.py b/tests/rest/client/v2_alpha/test_capabilities.py
index f3ef977404..bce5b0cf4c 100644
--- a/tests/rest/client/v2_alpha/test_capabilities.py
+++ b/tests/rest/client/v2_alpha/test_capabilities.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import synapse.rest.admin
-from synapse.api.room_versions import DEFAULT_ROOM_VERSION, KNOWN_ROOM_VERSIONS
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.rest.client.v1 import login
 from synapse.rest.client.v2_alpha import capabilities
 
@@ -32,6 +32,7 @@ class CapabilitiesTestCase(unittest.HomeserverTestCase):
         self.url = b"/_matrix/client/r0/capabilities"
         hs = self.setup_test_homeserver()
         self.store = hs.get_datastore()
+        self.config = hs.config
         return hs
 
     def test_check_auth_required(self):
@@ -51,8 +52,10 @@ class CapabilitiesTestCase(unittest.HomeserverTestCase):
         self.assertEqual(channel.code, 200)
         for room_version in capabilities['m.room_versions']['available'].keys():
             self.assertTrue(room_version in KNOWN_ROOM_VERSIONS, "" + room_version)
+
         self.assertEqual(
-            DEFAULT_ROOM_VERSION.identifier, capabilities['m.room_versions']['default']
+            self.config.default_room_version.identifier,
+            capabilities['m.room_versions']['default'],
         )
 
     def test_get_change_password_capabilities(self):
diff --git a/tests/storage/test_keys.py b/tests/storage/test_keys.py
index 6bfaa00fe9..e07ff01201 100644
--- a/tests/storage/test_keys.py
+++ b/tests/storage/test_keys.py
@@ -17,6 +17,8 @@ import signedjson.key
 
 from twisted.internet.defer import Deferred
 
+from synapse.storage.keys import FetchKeyResult
+
 import tests.unittest
 
 KEY_1 = signedjson.key.decode_verify_key_base64(
@@ -31,23 +33,34 @@ class KeyStoreTestCase(tests.unittest.HomeserverTestCase):
     def test_get_server_verify_keys(self):
         store = self.hs.get_datastore()
 
-        d = store.store_server_verify_key("server1", "from_server", 0, KEY_1)
-        self.get_success(d)
-        d = store.store_server_verify_key("server1", "from_server", 0, KEY_2)
+        key_id_1 = "ed25519:key1"
+        key_id_2 = "ed25519:KEY_ID_2"
+        d = store.store_server_verify_keys(
+            "from_server",
+            10,
+            [
+                ("server1", key_id_1, FetchKeyResult(KEY_1, 100)),
+                ("server1", key_id_2, FetchKeyResult(KEY_2, 200)),
+            ],
+        )
         self.get_success(d)
 
         d = store.get_server_verify_keys(
-            [
-                ("server1", "ed25519:key1"),
-                ("server1", "ed25519:key2"),
-                ("server1", "ed25519:key3"),
-            ]
+            [("server1", key_id_1), ("server1", key_id_2), ("server1", "ed25519:key3")]
         )
         res = self.get_success(d)
 
         self.assertEqual(len(res.keys()), 3)
-        self.assertEqual(res[("server1", "ed25519:key1")].version, "key1")
-        self.assertEqual(res[("server1", "ed25519:key2")].version, "key2")
+        res1 = res[("server1", key_id_1)]
+        self.assertEqual(res1.verify_key, KEY_1)
+        self.assertEqual(res1.verify_key.version, "key1")
+        self.assertEqual(res1.valid_until_ts, 100)
+
+        res2 = res[("server1", key_id_2)]
+        self.assertEqual(res2.verify_key, KEY_2)
+        # version comes from the ID it was stored with
+        self.assertEqual(res2.verify_key.version, "KEY_ID_2")
+        self.assertEqual(res2.valid_until_ts, 200)
 
         # non-existent result gives None
         self.assertIsNone(res[("server1", "ed25519:key3")])
@@ -60,32 +73,51 @@ class KeyStoreTestCase(tests.unittest.HomeserverTestCase):
         key_id_1 = "ed25519:key1"
         key_id_2 = "ed25519:key2"
 
-        d = store.store_server_verify_key("srv1", "from_server", 0, KEY_1)
-        self.get_success(d)
-        d = store.store_server_verify_key("srv1", "from_server", 0, KEY_2)
+        d = store.store_server_verify_keys(
+            "from_server",
+            0,
+            [
+                ("srv1", key_id_1, FetchKeyResult(KEY_1, 100)),
+                ("srv1", key_id_2, FetchKeyResult(KEY_2, 200)),
+            ],
+        )
         self.get_success(d)
 
         d = store.get_server_verify_keys([("srv1", key_id_1), ("srv1", key_id_2)])
         res = self.get_success(d)
         self.assertEqual(len(res.keys()), 2)
-        self.assertEqual(res[("srv1", key_id_1)], KEY_1)
-        self.assertEqual(res[("srv1", key_id_2)], KEY_2)
+
+        res1 = res[("srv1", key_id_1)]
+        self.assertEqual(res1.verify_key, KEY_1)
+        self.assertEqual(res1.valid_until_ts, 100)
+
+        res2 = res[("srv1", key_id_2)]
+        self.assertEqual(res2.verify_key, KEY_2)
+        self.assertEqual(res2.valid_until_ts, 200)
 
         # we should be able to look up the same thing again without a db hit
         res = store.get_server_verify_keys([("srv1", key_id_1)])
         if isinstance(res, Deferred):
             res = self.successResultOf(res)
         self.assertEqual(len(res.keys()), 1)
-        self.assertEqual(res[("srv1", key_id_1)], KEY_1)
+        self.assertEqual(res[("srv1", key_id_1)].verify_key, KEY_1)
 
         new_key_2 = signedjson.key.get_verify_key(
             signedjson.key.generate_signing_key("key2")
         )
-        d = store.store_server_verify_key("srv1", "from_server", 10, new_key_2)
+        d = store.store_server_verify_keys(
+            "from_server", 10, [("srv1", key_id_2, FetchKeyResult(new_key_2, 300))]
+        )
         self.get_success(d)
 
         d = store.get_server_verify_keys([("srv1", key_id_1), ("srv1", key_id_2)])
         res = self.get_success(d)
         self.assertEqual(len(res.keys()), 2)
-        self.assertEqual(res[("srv1", key_id_1)], KEY_1)
-        self.assertEqual(res[("srv1", key_id_2)], new_key_2)
+
+        res1 = res[("srv1", key_id_1)]
+        self.assertEqual(res1.verify_key, KEY_1)
+        self.assertEqual(res1.valid_until_ts, 100)
+
+        res2 = res[("srv1", key_id_2)]
+        self.assertEqual(res2.verify_key, new_key_2)
+        self.assertEqual(res2.valid_until_ts, 300)