summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
author: Brendan Abolivier <babolivier@matrix.org> 2019-06-05 13:38:01 +0100
committer: Brendan Abolivier <babolivier@matrix.org> 2019-06-05 13:38:01 +0100
commit: 40596aec0ec6da1e8918255b75eb5329292901ab (patch)
tree: a918b5ebfc4b48f8996b247df7ca75cd7cbb5d0f /synapse/handlers
parent: Lint (diff)
parent: Clean up debug logging (#5347) (diff)
download: synapse-40596aec0ec6da1e8918255b75eb5329292901ab.tar.xz
Merge branch 'develop' into m-heroes-empty-room-name
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- synapse/handlers/_base.py | 4
-rw-r--r-- synapse/handlers/directory.py | 23
-rw-r--r-- synapse/handlers/events.py | 11
-rw-r--r-- synapse/handlers/federation.py | 308
-rw-r--r-- synapse/handlers/initial_sync.py | 44
-rw-r--r-- synapse/handlers/message.py | 56
-rw-r--r-- synapse/handlers/pagination.py | 22
-rw-r--r-- synapse/handlers/presence.py | 104
-rw-r--r-- synapse/handlers/profile.py | 56
-rw-r--r-- synapse/handlers/register.py | 13
-rw-r--r-- synapse/handlers/room.py | 12
-rw-r--r-- synapse/handlers/room_member.py | 22
-rw-r--r-- synapse/handlers/search.py | 42
-rw-r--r-- synapse/handlers/stats.py | 325
-rw-r--r-- synapse/handlers/sync.py | 44
15 files changed, 832 insertions, 254 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index ac09d03ba9..dca337ec61 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -90,8 +90,8 @@ class BaseHandler(object):
             messages_per_second = override.messages_per_second
             burst_count = override.burst_count
         else:
-            messages_per_second = self.hs.config.rc_messages_per_second
-            burst_count = self.hs.config.rc_message_burst_count
+            messages_per_second = self.hs.config.rc_message.per_second
+            burst_count = self.hs.config.rc_message.burst_count
 
         allowed, time_allowed = self.ratelimiter.can_do_action(
             user_id, time_now,
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 27bd06df5d..a12f9508d8 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -19,7 +19,7 @@ import string
 
 from twisted.internet import defer
 
-from synapse.api.constants import EventTypes
+from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes
 from synapse.api.errors import (
     AuthError,
     CodeMessageException,
@@ -43,8 +43,10 @@ class DirectoryHandler(BaseHandler):
         self.state = hs.get_state_handler()
         self.appservice_handler = hs.get_application_service_handler()
         self.event_creation_handler = hs.get_event_creation_handler()
+        self.store = hs.get_datastore()
         self.config = hs.config
         self.enable_room_list_search = hs.config.enable_room_list_search
+        self.require_membership = hs.config.require_membership_for_aliases
 
         self.federation = hs.get_federation_client()
         hs.get_federation_registry().register_query_handler(
@@ -83,7 +85,7 @@ class DirectoryHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def create_association(self, requester, room_alias, room_id, servers=None,
-                           send_event=True):
+                           send_event=True, check_membership=True):
         """Attempt to create a new alias
 
         Args:
@@ -93,6 +95,8 @@ class DirectoryHandler(BaseHandler):
             servers (list[str]|None): List of servers that others servers
                 should try and join via
             send_event (bool): Whether to send an updated m.room.aliases event
+            check_membership (bool): Whether to check if the user is in the room
+                before the alias can be set (if the server's config requires it).
 
         Returns:
             Deferred
@@ -100,6 +104,13 @@ class DirectoryHandler(BaseHandler):
 
         user_id = requester.user.to_string()
 
+        if len(room_alias.to_string()) > MAX_ALIAS_LENGTH:
+            raise SynapseError(
+                400,
+                "Can't create aliases longer than %s characters" % MAX_ALIAS_LENGTH,
+                Codes.INVALID_PARAM,
+            )
+
         service = requester.app_service
         if service:
             if not service.is_interested_in_alias(room_alias.to_string()):
@@ -108,6 +119,14 @@ class DirectoryHandler(BaseHandler):
                     " this kind of alias.", errcode=Codes.EXCLUSIVE
                 )
         else:
+            if self.require_membership and check_membership:
+                rooms_for_user = yield self.store.get_rooms_for_user(user_id)
+                if room_id not in rooms_for_user:
+                    raise AuthError(
+                        403,
+                        "You must be in the room to create an alias for it",
+                    )
+
             if not self.spam_checker.user_may_create_room_alias(user_id, room_alias):
                 raise AuthError(
                     403, "This user is not permitted to create this alias",
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 1b4d8c74ae..eb525070cf 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -21,7 +21,6 @@ from twisted.internet import defer
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError, SynapseError
 from synapse.events import EventBase
-from synapse.events.utils import serialize_event
 from synapse.types import UserID
 from synapse.util.logutils import log_function
 from synapse.visibility import filter_events_for_client
@@ -50,6 +49,7 @@ class EventStreamHandler(BaseHandler):
         self.notifier = hs.get_notifier()
         self.state = hs.get_state_handler()
         self._server_notices_sender = hs.get_server_notices_sender()
+        self._event_serializer = hs.get_event_client_serializer()
 
     @defer.inlineCallbacks
     @log_function
@@ -120,9 +120,12 @@ class EventStreamHandler(BaseHandler):
 
             time_now = self.clock.time_msec()
 
-            chunks = [
-                serialize_event(e, time_now, as_client_event) for e in events
-            ]
+            chunks = yield self._event_serializer.serialize_events(
+                events, time_now, as_client_event=as_client_event,
+                # We don't bundle "live" events, as otherwise clients
+                # will end up double counting annotations.
+                bundle_aggregations=False,
+            )
 
             chunk = {
                 "chunk": chunks,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 0684778882..cf4fad7de0 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1916,6 +1916,11 @@ class FederationHandler(BaseHandler):
                     event.room_id, latest_event_ids=extrem_ids,
                 )
 
+            logger.debug(
+                "Doing soft-fail check for %s: state %s",
+                event.event_id, current_state_ids,
+            )
+
             # Now check if event pass auth against said current state
             auth_types = auth_types_for_event(event)
             current_state_ids = [
@@ -1932,7 +1937,7 @@ class FederationHandler(BaseHandler):
                 self.auth.check(room_version, event, auth_events=current_auth_events)
             except AuthError as e:
                 logger.warn(
-                    "Failed current state auth resolution for %r because %s",
+                    "Soft-failing %r because %s",
                     event, e,
                 )
                 event.internal_metadata.soft_failed = True
@@ -2008,15 +2013,44 @@ class FederationHandler(BaseHandler):
 
         Args:
             origin (str):
-            event (synapse.events.FrozenEvent):
+            event (synapse.events.EventBase):
             context (synapse.events.snapshot.EventContext):
-            auth_events (dict[(str, str)->str]):
+            auth_events (dict[(str, str)->synapse.events.EventBase]):
+                Map from (event_type, state_key) to event
+
+                What we expect the event's auth_events to be, based on the event's
+                position in the DAG. (NB: this description is unverified.)
+
+                Also NB that this function adds entries to it.
+        Returns:
+            defer.Deferred[None]
+        """
+        room_version = yield self.store.get_room_version(event.room_id)
+
+        yield self._update_auth_events_and_context_for_auth(
+            origin, event, context, auth_events
+        )
+        try:
+            self.auth.check(room_version, event, auth_events=auth_events)
+        except AuthError as e:
+            logger.warn("Failed auth resolution for %r because %s", event, e)
+            raise e
+
+    @defer.inlineCallbacks
+    def _update_auth_events_and_context_for_auth(
+        self, origin, event, context, auth_events
+    ):
+        """Helper for do_auth. See there for docs.
+
+        Args:
+            origin (str):
+            event (synapse.events.EventBase):
+            context (synapse.events.snapshot.EventContext):
+            auth_events (dict[(str, str)->synapse.events.EventBase]):
 
         Returns:
             defer.Deferred[None]
         """
-        # Check if we have all the auth events.
-        current_state = set(e.event_id for e in auth_events.values())
         event_auth_events = set(event.auth_event_ids())
 
         if event.is_state():
@@ -2024,11 +2058,21 @@ class FederationHandler(BaseHandler):
         else:
             event_key = None
 
-        if event_auth_events - current_state:
+        # if the event's auth_events refers to events which are not in our
+        # calculated auth_events, we need to fetch those events from somewhere.
+        #
+        # we start by fetching them from the store, and then try calling /event_auth/.
+        missing_auth = event_auth_events.difference(
+            e.event_id for e in auth_events.values()
+        )
+
+        if missing_auth:
             # TODO: can we use store.have_seen_events here instead?
             have_events = yield self.store.get_seen_events_with_rejections(
-                event_auth_events - current_state
+                missing_auth
             )
+            logger.debug("Got events %s from store", have_events)
+            missing_auth.difference_update(have_events.keys())
         else:
             have_events = {}
 
@@ -2037,13 +2081,12 @@ class FederationHandler(BaseHandler):
             for e in auth_events.values()
         })
 
-        seen_events = set(have_events.keys())
-
-        missing_auth = event_auth_events - seen_events - current_state
-
         if missing_auth:
-            logger.info("Missing auth: %s", missing_auth)
             # If we don't have all the auth events, we need to get them.
+            logger.info(
+                "auth_events contains unknown events: %s",
+                missing_auth,
+            )
             try:
                 remote_auth_chain = yield self.federation_client.get_event_auth(
                     origin, event.room_id, event.event_id
@@ -2084,145 +2127,168 @@ class FederationHandler(BaseHandler):
                 have_events = yield self.store.get_seen_events_with_rejections(
                     event.auth_event_ids()
                 )
-                seen_events = set(have_events.keys())
             except Exception:
                 # FIXME:
                 logger.exception("Failed to get auth chain")
 
+        if event.internal_metadata.is_outlier():
+            logger.info("Skipping auth_event fetch for outlier")
+            return
+
         # FIXME: Assumes we have and stored all the state for all the
         # prev_events
-        current_state = set(e.event_id for e in auth_events.values())
-        different_auth = event_auth_events - current_state
+        different_auth = event_auth_events.difference(
+            e.event_id for e in auth_events.values()
+        )
 
-        room_version = yield self.store.get_room_version(event.room_id)
+        if not different_auth:
+            return
 
-        if different_auth and not event.internal_metadata.is_outlier():
-            # Do auth conflict res.
-            logger.info("Different auth: %s", different_auth)
-
-            different_events = yield logcontext.make_deferred_yieldable(
-                defer.gatherResults([
-                    logcontext.run_in_background(
-                        self.store.get_event,
-                        d,
-                        allow_none=True,
-                        allow_rejected=False,
-                    )
-                    for d in different_auth
-                    if d in have_events and not have_events[d]
-                ], consumeErrors=True)
-            ).addErrback(unwrapFirstError)
-
-            if different_events:
-                local_view = dict(auth_events)
-                remote_view = dict(auth_events)
-                remote_view.update({
-                    (d.type, d.state_key): d for d in different_events if d
-                })
+        logger.info(
+            "auth_events refers to events which are not in our calculated auth "
+            "chain: %s",
+            different_auth,
+        )
 
-                new_state = yield self.state_handler.resolve_events(
-                    room_version,
-                    [list(local_view.values()), list(remote_view.values())],
-                    event
+        room_version = yield self.store.get_room_version(event.room_id)
+
+        different_events = yield logcontext.make_deferred_yieldable(
+            defer.gatherResults([
+                logcontext.run_in_background(
+                    self.store.get_event,
+                    d,
+                    allow_none=True,
+                    allow_rejected=False,
                 )
+                for d in different_auth
+                if d in have_events and not have_events[d]
+            ], consumeErrors=True)
+        ).addErrback(unwrapFirstError)
+
+        if different_events:
+            local_view = dict(auth_events)
+            remote_view = dict(auth_events)
+            remote_view.update({
+                (d.type, d.state_key): d for d in different_events if d
+            })
 
-                auth_events.update(new_state)
+            new_state = yield self.state_handler.resolve_events(
+                room_version,
+                [list(local_view.values()), list(remote_view.values())],
+                event
+            )
 
-                current_state = set(e.event_id for e in auth_events.values())
-                different_auth = event_auth_events - current_state
+            logger.info(
+                "After state res: updating auth_events with new state %s",
+                {
+                    (d.type, d.state_key): d.event_id for d in new_state.values()
+                    if auth_events.get((d.type, d.state_key)) != d
+                },
+            )
 
-                yield self._update_context_for_auth_events(
-                    event, context, auth_events, event_key,
-                )
+            auth_events.update(new_state)
 
-        if different_auth and not event.internal_metadata.is_outlier():
-            logger.info("Different auth after resolution: %s", different_auth)
+            different_auth = event_auth_events.difference(
+                e.event_id for e in auth_events.values()
+            )
 
-            # Only do auth resolution if we have something new to say.
-            # We can't rove an auth failure.
-            do_resolution = False
+            yield self._update_context_for_auth_events(
+                event, context, auth_events, event_key,
+            )
 
-            provable = [
-                RejectedReason.NOT_ANCESTOR, RejectedReason.NOT_ANCESTOR,
-            ]
+        if not different_auth:
+            # we're done
+            return
 
-            for e_id in different_auth:
-                if e_id in have_events:
-                    if have_events[e_id] in provable:
-                        do_resolution = True
-                        break
+        logger.info(
+            "auth_events still refers to events which are not in the calculated auth "
+            "chain after state resolution: %s",
+            different_auth,
+        )
 
-            if do_resolution:
-                prev_state_ids = yield context.get_prev_state_ids(self.store)
-                # 1. Get what we think is the auth chain.
-                auth_ids = yield self.auth.compute_auth_events(
-                    event, prev_state_ids
-                )
-                local_auth_chain = yield self.store.get_auth_chain(
-                    auth_ids, include_given=True
-                )
+        # Only do auth resolution if we have something new to say.
+        # We can't prove an auth failure.
+        do_resolution = False
 
-                try:
-                    # 2. Get remote difference.
-                    result = yield self.federation_client.query_auth(
-                        origin,
-                        event.room_id,
-                        event.event_id,
-                        local_auth_chain,
-                    )
+        for e_id in different_auth:
+            if e_id in have_events:
+                if have_events[e_id] == RejectedReason.NOT_ANCESTOR:
+                    do_resolution = True
+                    break
 
-                    seen_remotes = yield self.store.have_seen_events(
-                        [e.event_id for e in result["auth_chain"]]
-                    )
+        if not do_resolution:
+            logger.info(
+                "Skipping auth resolution due to lack of provable rejection reasons"
+            )
+            return
 
-                    # 3. Process any remote auth chain events we haven't seen.
-                    for ev in result["auth_chain"]:
-                        if ev.event_id in seen_remotes:
-                            continue
+        logger.info("Doing auth resolution")
 
-                        if ev.event_id == event.event_id:
-                            continue
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
 
-                        try:
-                            auth_ids = ev.auth_event_ids()
-                            auth = {
-                                (e.type, e.state_key): e
-                                for e in result["auth_chain"]
-                                if e.event_id in auth_ids
-                                or event.type == EventTypes.Create
-                            }
-                            ev.internal_metadata.outlier = True
+        # 1. Get what we think is the auth chain.
+        auth_ids = yield self.auth.compute_auth_events(
+            event, prev_state_ids
+        )
+        local_auth_chain = yield self.store.get_auth_chain(
+            auth_ids, include_given=True
+        )
 
-                            logger.debug(
-                                "do_auth %s different_auth: %s",
-                                event.event_id, e.event_id
-                            )
+        try:
+            # 2. Get remote difference.
+            result = yield self.federation_client.query_auth(
+                origin,
+                event.room_id,
+                event.event_id,
+                local_auth_chain,
+            )
 
-                            yield self._handle_new_event(
-                                origin, ev, auth_events=auth
-                            )
+            seen_remotes = yield self.store.have_seen_events(
+                [e.event_id for e in result["auth_chain"]]
+            )
 
-                            if ev.event_id in event_auth_events:
-                                auth_events[(ev.type, ev.state_key)] = ev
-                        except AuthError:
-                            pass
+            # 3. Process any remote auth chain events we haven't seen.
+            for ev in result["auth_chain"]:
+                if ev.event_id in seen_remotes:
+                    continue
 
-                except Exception:
-                    # FIXME:
-                    logger.exception("Failed to query auth chain")
+                if ev.event_id == event.event_id:
+                    continue
 
-                # 4. Look at rejects and their proofs.
-                # TODO.
+                try:
+                    auth_ids = ev.auth_event_ids()
+                    auth = {
+                        (e.type, e.state_key): e
+                        for e in result["auth_chain"]
+                        if e.event_id in auth_ids
+                        or event.type == EventTypes.Create
+                    }
+                    ev.internal_metadata.outlier = True
+
+                    logger.debug(
+                        "do_auth %s different_auth: %s",
+                        event.event_id, e.event_id
+                    )
 
-                yield self._update_context_for_auth_events(
-                    event, context, auth_events, event_key,
-                )
+                    yield self._handle_new_event(
+                        origin, ev, auth_events=auth
+                    )
 
-        try:
-            self.auth.check(room_version, event, auth_events=auth_events)
-        except AuthError as e:
-            logger.warn("Failed auth resolution for %r because %s", event, e)
-            raise e
+                    if ev.event_id in event_auth_events:
+                        auth_events[(ev.type, ev.state_key)] = ev
+                except AuthError:
+                    pass
+
+        except Exception:
+            # FIXME:
+            logger.exception("Failed to query auth chain")
+
+        # 4. Look at rejects and their proofs.
+        # TODO.
+
+        yield self._update_context_for_auth_events(
+            event, context, auth_events, event_key,
+        )
 
     @defer.inlineCallbacks
     def _update_context_for_auth_events(self, event, context, auth_events,
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 7dfae78db0..aaee5db0b7 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -19,7 +19,6 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError, Codes, SynapseError
-from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.handlers.presence import format_user_presence_state
 from synapse.streams.config import PaginationConfig
@@ -43,6 +42,7 @@ class InitialSyncHandler(BaseHandler):
         self.clock = hs.get_clock()
         self.validator = EventValidator()
         self.snapshot_cache = SnapshotCache()
+        self._event_serializer = hs.get_event_client_serializer()
 
     def snapshot_all_rooms(self, user_id=None, pagin_config=None,
                            as_client_event=True, include_archived=False):
@@ -138,7 +138,9 @@ class InitialSyncHandler(BaseHandler):
                 d["inviter"] = event.sender
 
                 invite_event = yield self.store.get_event(event.event_id)
-                d["invite"] = serialize_event(invite_event, time_now, as_client_event)
+                d["invite"] = yield self._event_serializer.serialize_event(
+                    invite_event, time_now, as_client_event,
+                )
 
             rooms_ret.append(d)
 
@@ -185,18 +187,21 @@ class InitialSyncHandler(BaseHandler):
                 time_now = self.clock.time_msec()
 
                 d["messages"] = {
-                    "chunk": [
-                        serialize_event(m, time_now, as_client_event)
-                        for m in messages
-                    ],
+                    "chunk": (
+                        yield self._event_serializer.serialize_events(
+                            messages, time_now=time_now,
+                            as_client_event=as_client_event,
+                        )
+                    ),
                     "start": start_token.to_string(),
                     "end": end_token.to_string(),
                 }
 
-                d["state"] = [
-                    serialize_event(c, time_now, as_client_event)
-                    for c in current_state.values()
-                ]
+                d["state"] = yield self._event_serializer.serialize_events(
+                    current_state.values(),
+                    time_now=time_now,
+                    as_client_event=as_client_event
+                )
 
                 account_data_events = []
                 tags = tags_by_room.get(event.room_id)
@@ -337,11 +342,15 @@ class InitialSyncHandler(BaseHandler):
             "membership": membership,
             "room_id": room_id,
             "messages": {
-                "chunk": [serialize_event(m, time_now) for m in messages],
+                "chunk": (yield self._event_serializer.serialize_events(
+                    messages, time_now,
+                )),
                 "start": start_token.to_string(),
                 "end": end_token.to_string(),
             },
-            "state": [serialize_event(s, time_now) for s in room_state.values()],
+            "state": (yield self._event_serializer.serialize_events(
+                room_state.values(), time_now,
+            )),
             "presence": [],
             "receipts": [],
         })
@@ -355,10 +364,9 @@ class InitialSyncHandler(BaseHandler):
 
         # TODO: These concurrently
         time_now = self.clock.time_msec()
-        state = [
-            serialize_event(x, time_now)
-            for x in current_state.values()
-        ]
+        state = yield self._event_serializer.serialize_events(
+            current_state.values(), time_now,
+        )
 
         now_token = yield self.hs.get_event_sources().get_current_token()
 
@@ -425,7 +433,9 @@ class InitialSyncHandler(BaseHandler):
         ret = {
             "room_id": room_id,
             "messages": {
-                "chunk": [serialize_event(m, time_now) for m in messages],
+                "chunk": (yield self._event_serializer.serialize_events(
+                    messages, time_now,
+                )),
                 "start": start_token.to_string(),
                 "end": end_token.to_string(),
             },
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 224d34ef3a..0b02469ceb 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -22,7 +22,7 @@ from canonicaljson import encode_canonical_json, json
 from twisted.internet import defer
 from twisted.internet.defer import succeed
 
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import EventTypes, Membership, RelationTypes
 from synapse.api.errors import (
     AuthError,
     Codes,
@@ -32,7 +32,6 @@ from synapse.api.errors import (
 )
 from synapse.api.room_versions import RoomVersions
 from synapse.api.urls import ConsentURIBuilder
-from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.replication.http.send_event import ReplicationSendEventRestServlet
 from synapse.storage.state import StateFilter
@@ -57,6 +56,7 @@ class MessageHandler(object):
         self.clock = hs.get_clock()
         self.state = hs.get_state_handler()
         self.store = hs.get_datastore()
+        self._event_serializer = hs.get_event_client_serializer()
 
     @defer.inlineCallbacks
     def get_room_data(self, user_id=None, room_id=None,
@@ -164,9 +164,13 @@ class MessageHandler(object):
                 room_state = room_state[membership_event_id]
 
         now = self.clock.time_msec()
-        defer.returnValue(
-            [serialize_event(c, now) for c in room_state.values()]
+        events = yield self._event_serializer.serialize_events(
+            room_state.values(), now,
+            # We don't bother bundling aggregations in when asked for state
+            # events, as clients won't use them.
+            bundle_aggregations=False,
         )
+        defer.returnValue(events)
 
     @defer.inlineCallbacks
     def get_joined_members(self, requester, room_id):
@@ -228,6 +232,7 @@ class EventCreationHandler(object):
         self.ratelimiter = hs.get_ratelimiter()
         self.notifier = hs.get_notifier()
         self.config = hs.config
+        self.require_membership_for_aliases = hs.config.require_membership_for_aliases
 
         self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs)
 
@@ -336,6 +341,35 @@ class EventCreationHandler(object):
             prev_events_and_hashes=prev_events_and_hashes,
         )
 
+        # In an ideal world we wouldn't need the second part of this condition. However,
+        # this behaviour isn't spec'd yet, meaning we should be able to deactivate this
+        # behaviour. Another reason is that this code is also evaluated each time a new
+        # m.room.aliases event is created, which includes hitting a /directory route.
+        # Therefore not including this condition here would render the similar one in
+        # synapse.handlers.directory pointless.
+        if builder.type == EventTypes.Aliases and self.require_membership_for_aliases:
+            # Ideally we'd do the membership check in event_auth.check(), which
+            # describes a spec'd algorithm for authenticating events received over
+            # federation as well as those created locally. As of room v3, aliases events
+            # can be created by users that are not in the room, therefore we have to
+            # tolerate them in event_auth.check().
+            prev_state_ids = yield context.get_prev_state_ids(self.store)
+            prev_event_id = prev_state_ids.get((EventTypes.Member, event.sender))
+            prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
+            if not prev_event or prev_event.membership != Membership.JOIN:
+                logger.warning(
+                    ("Attempt to send `m.room.aliases` in room %s by user %s but"
+                     " membership is %s"),
+                    event.room_id,
+                    event.sender,
+                    prev_event.membership if prev_event else None,
+                )
+
+                raise AuthError(
+                    403,
+                    "You must be in the room to create an alias for it",
+                )
+
         self.validator.validate_new(event)
 
         defer.returnValue((event, context))
@@ -570,6 +604,20 @@ class EventCreationHandler(object):
 
         self.validator.validate_new(event)
 
+        # If this event is an annotation then we check that the sender
+        # can't annotate the same way twice (e.g. stops users from liking an
+        # event multiple times).
+        relation = event.content.get("m.relates_to", {})
+        if relation.get("rel_type") == RelationTypes.ANNOTATION:
+            relates_to = relation["event_id"]
+            aggregation_key = relation["key"]
+
+            already_exists = yield self.store.has_user_annotated_event(
+                relates_to, event.type, aggregation_key, event.sender,
+            )
+            if already_exists:
+                raise SynapseError(400, "Can't send same reaction twice")
+
         logger.debug(
             "Created event %s",
             event.event_id,
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index e4fdae9266..8f811e24fe 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -20,7 +20,6 @@ from twisted.python.failure import Failure
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import SynapseError
-from synapse.events.utils import serialize_event
 from synapse.storage.state import StateFilter
 from synapse.types import RoomStreamToken
 from synapse.util.async_helpers import ReadWriteLock
@@ -78,6 +77,7 @@ class PaginationHandler(object):
         self._purges_in_progress_by_room = set()
         # map from purge id to PurgeStatus
         self._purges_by_id = {}
+        self._event_serializer = hs.get_event_client_serializer()
 
     def start_purge_history(self, room_id, token,
                             delete_local_events=False):
@@ -278,18 +278,22 @@ class PaginationHandler(object):
         time_now = self.clock.time_msec()
 
         chunk = {
-            "chunk": [
-                serialize_event(e, time_now, as_client_event)
-                for e in events
-            ],
+            "chunk": (
+                yield self._event_serializer.serialize_events(
+                    events, time_now,
+                    as_client_event=as_client_event,
+                )
+            ),
             "start": pagin_config.from_token.to_string(),
             "end": next_token.to_string(),
         }
 
         if state:
-            chunk["state"] = [
-                serialize_event(e, time_now, as_client_event)
-                for e in state
-            ]
+            chunk["state"] = (
+                yield self._event_serializer.serialize_events(
+                    state, time_now,
+                    as_client_event=as_client_event,
+                )
+            )
 
         defer.returnValue(chunk)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index bd1285b15c..6209858bbb 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -182,17 +182,27 @@ class PresenceHandler(object):
         # Start a LoopingCall in 30s that fires every 5s.
         # The initial delay is to allow disconnected clients a chance to
         # reconnect before we treat them as offline.
+        def run_timeout_handler():
+            return run_as_background_process(
+                "handle_presence_timeouts", self._handle_timeouts
+            )
+
         self.clock.call_later(
             30,
             self.clock.looping_call,
-            self._handle_timeouts,
+            run_timeout_handler,
             5000,
         )
 
+        def run_persister():
+            return run_as_background_process(
+                "persist_presence_changes", self._persist_unpersisted_changes
+            )
+
         self.clock.call_later(
             60,
             self.clock.looping_call,
-            self._persist_unpersisted_changes,
+            run_persister,
             60 * 1000,
         )
 
@@ -229,6 +239,7 @@ class PresenceHandler(object):
         )
 
         if self.unpersisted_users_changes:
+
             yield self.store.update_presence([
                 self.user_to_current_state[user_id]
                 for user_id in self.unpersisted_users_changes
@@ -240,30 +251,18 @@ class PresenceHandler(object):
         """We periodically persist the unpersisted changes, as otherwise they
         may stack up and slow down shutdown times.
         """
-        logger.info(
-            "Performing _persist_unpersisted_changes. Persisting %d unpersisted changes",
-            len(self.unpersisted_users_changes)
-        )
-
         unpersisted = self.unpersisted_users_changes
         self.unpersisted_users_changes = set()
 
         if unpersisted:
+            logger.info(
+                "Persisting %d upersisted presence updates", len(unpersisted)
+            )
             yield self.store.update_presence([
                 self.user_to_current_state[user_id]
                 for user_id in unpersisted
             ])
 
-        logger.info("Finished _persist_unpersisted_changes")
-
-    @defer.inlineCallbacks
-    def _update_states_and_catch_exception(self, new_states):
-        try:
-            res = yield self._update_states(new_states)
-            defer.returnValue(res)
-        except Exception:
-            logger.exception("Error updating presence")
-
     @defer.inlineCallbacks
     def _update_states(self, new_states):
         """Updates presence of users. Sets the appropriate timeouts. Pokes
@@ -338,45 +337,41 @@ class PresenceHandler(object):
         logger.info("Handling presence timeouts")
         now = self.clock.time_msec()
 
-        try:
-            with Measure(self.clock, "presence_handle_timeouts"):
-                # Fetch the list of users that *may* have timed out. Things may have
-                # changed since the timeout was set, so we won't necessarily have to
-                # take any action.
-                users_to_check = set(self.wheel_timer.fetch(now))
-
-                # Check whether the lists of syncing processes from an external
-                # process have expired.
-                expired_process_ids = [
-                    process_id for process_id, last_update
-                    in self.external_process_last_updated_ms.items()
-                    if now - last_update > EXTERNAL_PROCESS_EXPIRY
-                ]
-                for process_id in expired_process_ids:
-                    users_to_check.update(
-                        self.external_process_last_updated_ms.pop(process_id, ())
-                    )
-                    self.external_process_last_update.pop(process_id)
+        # Fetch the list of users that *may* have timed out. Things may have
+        # changed since the timeout was set, so we won't necessarily have to
+        # take any action.
+        users_to_check = set(self.wheel_timer.fetch(now))
+
+        # Check whether the lists of syncing processes from an external
+        # process have expired.
+        expired_process_ids = [
+            process_id for process_id, last_update
+            in self.external_process_last_updated_ms.items()
+            if now - last_update > EXTERNAL_PROCESS_EXPIRY
+        ]
+        for process_id in expired_process_ids:
+            users_to_check.update(
+                self.external_process_last_updated_ms.pop(process_id, ())
+            )
+            self.external_process_last_update.pop(process_id)
 
-                states = [
-                    self.user_to_current_state.get(
-                        user_id, UserPresenceState.default(user_id)
-                    )
-                    for user_id in users_to_check
-                ]
+        states = [
+            self.user_to_current_state.get(
+                user_id, UserPresenceState.default(user_id)
+            )
+            for user_id in users_to_check
+        ]
 
-                timers_fired_counter.inc(len(states))
+        timers_fired_counter.inc(len(states))
 
-                changes = handle_timeouts(
-                    states,
-                    is_mine_fn=self.is_mine_id,
-                    syncing_user_ids=self.get_currently_syncing_users(),
-                    now=now,
-                )
+        changes = handle_timeouts(
+            states,
+            is_mine_fn=self.is_mine_id,
+            syncing_user_ids=self.get_currently_syncing_users(),
+            now=now,
+        )
 
-            run_in_background(self._update_states_and_catch_exception, changes)
-        except Exception:
-            logger.exception("Exception in _handle_timeouts loop")
+        return self._update_states(changes)
 
     @defer.inlineCallbacks
     def bump_presence_active_time(self, user):
@@ -828,6 +823,11 @@ class PresenceHandler(object):
             if typ != EventTypes.Member:
                 continue
 
+            if event_id is None:
+                # state has been deleted, so this is not a join. We only care about
+                # joins.
+                continue
+
             event = yield self.store.get_event(event_id)
             if event.content.get("membership") != Membership.JOIN:
                 # We only care about joins
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index a65c98ff5c..a5fc6c5dbf 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -31,6 +31,9 @@ from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
+MAX_DISPLAYNAME_LEN = 100
+MAX_AVATAR_URL_LEN = 1000
+
 
 class BaseProfileHandler(BaseHandler):
     """Handles fetching and updating user profile information.
@@ -53,6 +56,7 @@ class BaseProfileHandler(BaseHandler):
     @defer.inlineCallbacks
     def get_profile(self, user_id):
         target_user = UserID.from_string(user_id)
+
         if self.hs.is_mine(target_user):
             try:
                 displayname = yield self.store.get_profile_displayname(
@@ -161,6 +165,11 @@ class BaseProfileHandler(BaseHandler):
         if not by_admin and target_user != requester.user:
             raise AuthError(400, "Cannot set another user's displayname")
 
+        if len(new_displayname) > MAX_DISPLAYNAME_LEN:
+            raise SynapseError(
+                400, "Displayname is too long (max %i)" % (MAX_DISPLAYNAME_LEN, ),
+            )
+
         if new_displayname == '':
             new_displayname = None
 
@@ -216,6 +225,11 @@ class BaseProfileHandler(BaseHandler):
         if not by_admin and target_user != requester.user:
             raise AuthError(400, "Cannot set another user's avatar_url")
 
+        if len(new_avatar_url) > MAX_AVATAR_URL_LEN:
+            raise SynapseError(
+                400, "Avatar URL is too long (max %i)" % (MAX_AVATAR_URL_LEN, ),
+            )
+
         yield self.store.set_profile_avatar_url(
             target_user.localpart, new_avatar_url
         )
@@ -283,6 +297,48 @@ class BaseProfileHandler(BaseHandler):
                     room_id, str(e)
                 )
 
+    @defer.inlineCallbacks
+    def check_profile_query_allowed(self, target_user, requester=None):
+        """Checks whether a profile query is allowed. If the
+        'require_auth_for_profile_requests' config flag is set to True and a
+        'requester' is provided, the query is only allowed if the two users
+        share a room.
+
+        Args:
+            target_user (UserID): The owner of the queried profile.
+            requester (None|UserID): The user querying for the profile.
+
+        Raises:
+            SynapseError(403): The two users share no room, or one user couldn't
+                be found to be in any room the server is in, and therefore the query
+                is denied.
+        """
+        # Implementation of MSC1301: don't allow looking up profiles if the
+        # requester isn't in the same room as the target. We expect requester to
+        # be None when this function is called outside of a profile query, e.g.
+        # when building a membership event. In this case, we must allow the
+        # lookup.
+        if not self.hs.config.require_auth_for_profile_requests or not requester:
+            return
+
+        try:
+            requester_rooms = yield self.store.get_rooms_for_user(
+                requester.to_string()
+            )
+            target_user_rooms = yield self.store.get_rooms_for_user(
+                target_user.to_string(),
+            )
+
+            # Check if the room lists have no elements in common.
+            if requester_rooms.isdisjoint(target_user_rooms):
+                raise SynapseError(403, "Profile isn't available", Codes.FORBIDDEN)
+        except StoreError as e:
+            if e.code == 404:
+                # This likely means that one of the users doesn't exist,
+                # so we act as if we couldn't find the profile.
+                raise SynapseError(403, "Profile isn't available", Codes.FORBIDDEN)
+            raise
+
 
 class MasterProfileHandler(BaseProfileHandler):
     PROFILE_UPDATE_MS = 60 * 1000
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index a51d11a257..9a388ea013 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -19,7 +19,7 @@ import logging
 from twisted.internet import defer
 
 from synapse import types
-from synapse.api.constants import LoginType
+from synapse.api.constants import MAX_USERID_LENGTH, LoginType
 from synapse.api.errors import (
     AuthError,
     Codes,
@@ -123,6 +123,15 @@ class RegistrationHandler(BaseHandler):
 
         self.check_user_id_not_appservice_exclusive(user_id)
 
+        if len(user_id) > MAX_USERID_LENGTH:
+            raise SynapseError(
+                400,
+                "User ID may not be longer than %s characters" % (
+                    MAX_USERID_LENGTH,
+                ),
+                Codes.INVALID_USERNAME
+            )
+
         users = yield self.store.get_users_by_id_case_insensitive(user_id)
         if users:
             if not guest_access_token:
@@ -522,6 +531,8 @@ class RegistrationHandler(BaseHandler):
             A tuple of (user_id, access_token).
         Raises:
             RegistrationError if there was a problem registering.
+
+        NB this is only used in tests. TODO: move it to the test package!
         """
         if localpart is None:
             raise SynapseError(400, "Request must include user id")
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 17628e2684..4a17911a87 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -27,7 +27,7 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
 from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
-from synapse.api.room_versions import DEFAULT_ROOM_VERSION, KNOWN_ROOM_VERSIONS
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.storage.state import StateFilter
 from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
 from synapse.util import stringutils
@@ -70,6 +70,7 @@ class RoomCreationHandler(BaseHandler):
         self.spam_checker = hs.get_spam_checker()
         self.event_creation_handler = hs.get_event_creation_handler()
         self.room_member_handler = hs.get_room_member_handler()
+        self.config = hs.config
 
         # linearizer to stop two upgrades happening at once
         self._upgrade_linearizer = Linearizer("room_upgrade_linearizer")
@@ -402,7 +403,7 @@ class RoomCreationHandler(BaseHandler):
                 yield directory_handler.create_association(
                     requester, RoomAlias.from_string(alias),
                     new_room_id, servers=(self.hs.hostname, ),
-                    send_event=False,
+                    send_event=False, check_membership=False,
                 )
                 logger.info("Moved alias %s to new room", alias)
             except SynapseError as e:
@@ -475,7 +476,11 @@ class RoomCreationHandler(BaseHandler):
         if ratelimit:
             yield self.ratelimit(requester)
 
-        room_version = config.get("room_version", DEFAULT_ROOM_VERSION.identifier)
+        room_version = config.get(
+            "room_version",
+            self.config.default_room_version.identifier,
+        )
+
         if not isinstance(room_version, string_types):
             raise SynapseError(
                 400,
@@ -538,6 +543,7 @@ class RoomCreationHandler(BaseHandler):
                 room_alias=room_alias,
                 servers=[self.hs.hostname],
                 send_event=False,
+                check_membership=False,
             )
 
         preset_config = config.get(
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 024d6db27a..93ac986c86 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 # Copyright 2016 OpenMarket Ltd
 # Copyright 2018 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -33,6 +34,8 @@ from synapse.types import RoomID, UserID
 from synapse.util.async_helpers import Linearizer
 from synapse.util.distributor import user_joined_room, user_left_room
 
+from ._base import BaseHandler
+
 logger = logging.getLogger(__name__)
 
 id_server_scheme = "https://"
@@ -71,6 +74,12 @@ class RoomMemberHandler(object):
         self.spam_checker = hs.get_spam_checker()
         self._server_notices_mxid = self.config.server_notices_mxid
         self._enable_lookup = hs.config.enable_3pid_lookup
+        self.allow_per_room_profiles = self.config.allow_per_room_profiles
+
+        # This is only used to get at the ratelimit function and
+        # maybe_kick_guest_users. It's fine for there to be multiple of these
+        # as it doesn't store state.
+        self.base_handler = BaseHandler(hs)
 
     @abc.abstractmethod
     def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
@@ -350,6 +359,13 @@ class RoomMemberHandler(object):
             # later on.
             content = dict(content)
 
+        if not self.allow_per_room_profiles:
+            # Strip profile data, knowing that new profile data will be added to the
+            # event's content in event_creation_handler.create_event() using the target's
+            # global profile.
+            content.pop("displayname", None)
+            content.pop("avatar_url", None)
+
         effective_membership_state = action
         if action in ["kick", "unban"]:
             effective_membership_state = "leave"
@@ -703,6 +719,10 @@ class RoomMemberHandler(object):
                     Codes.FORBIDDEN,
                 )
 
+        # We need to rate limit *before* we send out any 3PID invites, so we
+        # can't just rely on the standard ratelimiting of events.
+        yield self.base_handler.ratelimit(requester)
+
         invitee = yield self._lookup_3pid(
             id_server, medium, address
         )
@@ -924,7 +944,7 @@ class RoomMemberHandler(object):
         }
 
         if self.config.invite_3pid_guest:
-            guest_access_token, guest_user_id = yield self.get_or_register_3pid_guest(
+            guest_user_id, guest_access_token = yield self.get_or_register_3pid_guest(
                 requester=requester,
                 medium=medium,
                 address=address,
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 49c439313e..9bba74d6c9 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -23,7 +23,6 @@ from twisted.internet import defer
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import SynapseError
 from synapse.api.filtering import Filter
-from synapse.events.utils import serialize_event
 from synapse.storage.state import StateFilter
 from synapse.visibility import filter_events_for_client
 
@@ -36,6 +35,7 @@ class SearchHandler(BaseHandler):
 
     def __init__(self, hs):
         super(SearchHandler, self).__init__(hs)
+        self._event_serializer = hs.get_event_client_serializer()
 
     @defer.inlineCallbacks
     def get_old_rooms_from_upgraded_room(self, room_id):
@@ -401,14 +401,16 @@ class SearchHandler(BaseHandler):
         time_now = self.clock.time_msec()
 
         for context in contexts.values():
-            context["events_before"] = [
-                serialize_event(e, time_now)
-                for e in context["events_before"]
-            ]
-            context["events_after"] = [
-                serialize_event(e, time_now)
-                for e in context["events_after"]
-            ]
+            context["events_before"] = (
+                yield self._event_serializer.serialize_events(
+                    context["events_before"], time_now,
+                )
+            )
+            context["events_after"] = (
+                yield self._event_serializer.serialize_events(
+                    context["events_after"], time_now,
+                )
+            )
 
         state_results = {}
         if include_state:
@@ -422,14 +424,13 @@ class SearchHandler(BaseHandler):
         # We're now about to serialize the events. We should not make any
         # blocking calls after this. Otherwise the 'age' will be wrong
 
-        results = [
-            {
+        results = []
+        for e in allowed_events:
+            results.append({
                 "rank": rank_map[e.event_id],
-                "result": serialize_event(e, time_now),
+                "result": (yield self._event_serializer.serialize_event(e, time_now)),
                 "context": contexts.get(e.event_id, {}),
-            }
-            for e in allowed_events
-        ]
+            })
 
         rooms_cat_res = {
             "results": results,
@@ -438,10 +439,13 @@ class SearchHandler(BaseHandler):
         }
 
         if state_results:
-            rooms_cat_res["state"] = {
-                room_id: [serialize_event(e, time_now) for e in state]
-                for room_id, state in state_results.items()
-            }
+            s = {}
+            for room_id, state in state_results.items():
+                s[room_id] = yield self._event_serializer.serialize_events(
+                    state, time_now,
+                )
+
+            rooms_cat_res["state"] = s
 
         if room_groups and "room_id" in group_keys:
             rooms_cat_res.setdefault("groups", {})["room_id"] = room_groups
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
new file mode 100644
index 0000000000..0e92b405ba
--- /dev/null
+++ b/synapse/handlers/stats.py
@@ -0,0 +1,325 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.handlers.state_deltas import StateDeltasHandler
+from synapse.metrics import event_processing_positions
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import UserID
+from synapse.util.metrics import Measure
+
+logger = logging.getLogger(__name__)
+
+
+class StatsHandler(StateDeltasHandler):
+    """Handles keeping the *_stats tables updated with a simple time-series of
+    information about the users, rooms and media on the server, such that admins
+    have some idea of who is consuming their resources.
+
+    Heavily derived from UserDirectoryHandler
+    """
+
+    def __init__(self, hs):
+        super(StatsHandler, self).__init__(hs)
+        self.hs = hs
+        self.store = hs.get_datastore()
+        self.state = hs.get_state_handler()
+        self.server_name = hs.hostname
+        self.clock = hs.get_clock()
+        self.notifier = hs.get_notifier()
+        self.is_mine_id = hs.is_mine_id
+        self.stats_bucket_size = hs.config.stats_bucket_size
+
+        # The current position in the current_state_delta stream
+        self.pos = None
+
+        # Guard to ensure we only process deltas one at a time
+        self._is_processing = False
+
+        if hs.config.stats_enabled:
+            self.notifier.add_replication_callback(self.notify_new_event)
+
+            # We kick this off so that we don't have to wait for a change before
+            # we start populating stats
+            self.clock.call_later(0, self.notify_new_event)
+
+    def notify_new_event(self):
+        """Called when there may be more deltas to process
+        """
+        if not self.hs.config.stats_enabled:
+            return
+
+        if self._is_processing:
+            return
+
+        @defer.inlineCallbacks
+        def process():
+            try:
+                yield self._unsafe_process()
+            finally:
+                self._is_processing = False
+
+        self._is_processing = True
+        run_as_background_process("stats.notify_new_event", process)
+
+    @defer.inlineCallbacks
+    def _unsafe_process(self):
+        # If self.pos is None then that means we haven't fetched it from the DB
+        if self.pos is None:
+            self.pos = yield self.store.get_stats_stream_pos()
+
+        # If still None then the initial background update hasn't happened yet
+        if self.pos is None:
+            defer.returnValue(None)
+
+        # Loop round handling deltas until we're up to date
+        while True:
+            with Measure(self.clock, "stats_delta"):
+                deltas = yield self.store.get_current_state_deltas(self.pos)
+                if not deltas:
+                    return
+
+                logger.info("Handling %d state deltas", len(deltas))
+                yield self._handle_deltas(deltas)
+
+                self.pos = deltas[-1]["stream_id"]
+                yield self.store.update_stats_stream_pos(self.pos)
+
+                event_processing_positions.labels("stats").set(self.pos)
+
+    @defer.inlineCallbacks
+    def _handle_deltas(self, deltas):
+        """
+        Called with the state deltas to process
+        """
+        for delta in deltas:
+            typ = delta["type"]
+            state_key = delta["state_key"]
+            room_id = delta["room_id"]
+            event_id = delta["event_id"]
+            stream_id = delta["stream_id"]
+            prev_event_id = delta["prev_event_id"]
+
+            logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
+
+            token = yield self.store.get_earliest_token_for_room_stats(room_id)
+
+            # If the earliest token to begin from is larger than our current
+            # stream ID, skip processing this delta.
+            if token is not None and token >= stream_id:
+                logger.debug(
+                    "Ignoring: %s as earlier than this room's initial ingestion event",
+                    event_id,
+                )
+                continue
+
+            if event_id is None and prev_event_id is None:
+                # Errr...
+                continue
+
+            event_content = {}
+
+            if event_id is not None:
+                event_content = (yield self.store.get_event(event_id)).content or {}
+
+            # quantise time to the nearest bucket
+            now = yield self.store.get_received_ts(event_id)
+            now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
+
+            if typ == EventTypes.Member:
+                # we could use _get_key_change here but it's a bit inefficient
+                # given we're not testing for a specific result; might as well
+                # just grab the prev_membership and membership strings and
+                # compare them.
+                prev_event_content = {}
+                if prev_event_id is not None:
+                    prev_event_content = (
+                        yield self.store.get_event(prev_event_id)
+                    ).content
+
+                membership = event_content.get("membership", Membership.LEAVE)
+                prev_membership = prev_event_content.get("membership", Membership.LEAVE)
+
+                if prev_membership == membership:
+                    continue
+
+                if prev_membership == Membership.JOIN:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "joined_members", -1
+                    )
+                elif prev_membership == Membership.INVITE:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "invited_members", -1
+                    )
+                elif prev_membership == Membership.LEAVE:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "left_members", -1
+                    )
+                elif prev_membership == Membership.BAN:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "banned_members", -1
+                    )
+                else:
+                    err = "%s is not a valid prev_membership" % (repr(prev_membership),)
+                    logger.error(err)
+                    raise ValueError(err)
+
+                if membership == Membership.JOIN:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "joined_members", +1
+                    )
+                elif membership == Membership.INVITE:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "invited_members", +1
+                    )
+                elif membership == Membership.LEAVE:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "left_members", +1
+                    )
+                elif membership == Membership.BAN:
+                    yield self.store.update_stats_delta(
+                        now, "room", room_id, "banned_members", +1
+                    )
+                else:
+                    err = "%s is not a valid membership" % (repr(membership),)
+                    logger.error(err)
+                    raise ValueError(err)
+
+                user_id = state_key
+                if self.is_mine_id(user_id):
+                    # update user_stats as it's one of our users
+                    public = yield self._is_public_room(room_id)
+
+                    if membership == Membership.LEAVE:
+                        yield self.store.update_stats_delta(
+                            now,
+                            "user",
+                            user_id,
+                            "public_rooms" if public else "private_rooms",
+                            -1,
+                        )
+                    elif membership == Membership.JOIN:
+                        yield self.store.update_stats_delta(
+                            now,
+                            "user",
+                            user_id,
+                            "public_rooms" if public else "private_rooms",
+                            +1,
+                        )
+
+            elif typ == EventTypes.Create:
+                # Newly created room. Add it with all blank portions.
+                yield self.store.update_room_state(
+                    room_id,
+                    {
+                        "join_rules": None,
+                        "history_visibility": None,
+                        "encryption": None,
+                        "name": None,
+                        "topic": None,
+                        "avatar": None,
+                        "canonical_alias": None,
+                    },
+                )
+
+            elif typ == EventTypes.JoinRules:
+                yield self.store.update_room_state(
+                    room_id, {"join_rules": event_content.get("join_rule")}
+                )
+
+                is_public = yield self._get_key_change(
+                    prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
+                )
+                if is_public is not None:
+                    yield self.update_public_room_stats(now, room_id, is_public)
+
+            elif typ == EventTypes.RoomHistoryVisibility:
+                yield self.store.update_room_state(
+                    room_id,
+                    {"history_visibility": event_content.get("history_visibility")},
+                )
+
+                is_public = yield self._get_key_change(
+                    prev_event_id, event_id, "history_visibility", "world_readable"
+                )
+                if is_public is not None:
+                    yield self.update_public_room_stats(now, room_id, is_public)
+
+            elif typ == EventTypes.Encryption:
+                yield self.store.update_room_state(
+                    room_id, {"encryption": event_content.get("algorithm")}
+                )
+            elif typ == EventTypes.Name:
+                yield self.store.update_room_state(
+                    room_id, {"name": event_content.get("name")}
+                )
+            elif typ == EventTypes.Topic:
+                yield self.store.update_room_state(
+                    room_id, {"topic": event_content.get("topic")}
+                )
+            elif typ == EventTypes.RoomAvatar:
+                yield self.store.update_room_state(
+                    room_id, {"avatar": event_content.get("url")}
+                )
+            elif typ == EventTypes.CanonicalAlias:
+                yield self.store.update_room_state(
+                    room_id, {"canonical_alias": event_content.get("alias")}
+                )
+
+    @defer.inlineCallbacks
+    def update_public_room_stats(self, ts, room_id, is_public):
+        """
+        Increment/decrement a user's number of public rooms when a room they are
+        in changes to/from public visibility.
+
+        Args:
+            ts (int): Timestamp in seconds
+            room_id (str)
+            is_public (bool)
+        """
+        # For now, blindly iterate over all local users in the room so that
+        # we can handle the whole problem of copying buckets over as needed
+        user_ids = yield self.store.get_users_in_room(room_id)
+
+        for user_id in user_ids:
+            if self.hs.is_mine(UserID.from_string(user_id)):
+                yield self.store.update_stats_delta(
+                    ts, "user", user_id, "public_rooms", +1 if is_public else -1
+                )
+                yield self.store.update_stats_delta(
+                    ts, "user", user_id, "private_rooms", -1 if is_public else +1
+                )
+
+    @defer.inlineCallbacks
+    def _is_public_room(self, room_id):
+        join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules)
+        history_visibility = yield self.state.get_current_state(
+            room_id, EventTypes.RoomHistoryVisibility
+        )
+
+        if (join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC) or (
+            (
+                history_visibility
+                and history_visibility.content.get("history_visibility")
+                == "world_readable"
+            )
+        ):
+            defer.returnValue(True)
+        else:
+            defer.returnValue(False)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 7cf757f65a..72997d6d04 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -934,7 +934,7 @@ class SyncHandler(object):
         res = yield self._generate_sync_entry_for_rooms(
             sync_result_builder, account_data_by_room
         )
-        newly_joined_rooms, newly_joined_users, _, _ = res
+        newly_joined_rooms, newly_joined_or_invited_users, _, _ = res
         _, _, newly_left_rooms, newly_left_users = res
 
         block_all_presence_data = (
@@ -943,7 +943,7 @@ class SyncHandler(object):
         )
         if self.hs_config.use_presence and not block_all_presence_data:
             yield self._generate_sync_entry_for_presence(
-                sync_result_builder, newly_joined_rooms, newly_joined_users
+                sync_result_builder, newly_joined_rooms, newly_joined_or_invited_users
             )
 
         yield self._generate_sync_entry_for_to_device(sync_result_builder)
@@ -951,7 +951,7 @@ class SyncHandler(object):
         device_lists = yield self._generate_sync_entry_for_device_list(
             sync_result_builder,
             newly_joined_rooms=newly_joined_rooms,
-            newly_joined_users=newly_joined_users,
+            newly_joined_or_invited_users=newly_joined_or_invited_users,
             newly_left_rooms=newly_left_rooms,
             newly_left_users=newly_left_users,
         )
@@ -1036,7 +1036,8 @@ class SyncHandler(object):
     @measure_func("_generate_sync_entry_for_device_list")
     @defer.inlineCallbacks
     def _generate_sync_entry_for_device_list(self, sync_result_builder,
-                                             newly_joined_rooms, newly_joined_users,
+                                             newly_joined_rooms,
+                                             newly_joined_or_invited_users,
                                              newly_left_rooms, newly_left_users):
         user_id = sync_result_builder.sync_config.user.to_string()
         since_token = sync_result_builder.since_token
@@ -1050,7 +1051,7 @@ class SyncHandler(object):
             # share a room with?
             for room_id in newly_joined_rooms:
                 joined_users = yield self.state.get_current_users_in_room(room_id)
-                newly_joined_users.update(joined_users)
+                newly_joined_or_invited_users.update(joined_users)
 
             for room_id in newly_left_rooms:
                 left_users = yield self.state.get_current_users_in_room(room_id)
@@ -1058,7 +1059,7 @@ class SyncHandler(object):
 
             # TODO: Check that these users are actually new, i.e. either they
             # weren't in the previous sync *or* they left and rejoined.
-            changed.update(newly_joined_users)
+            changed.update(newly_joined_or_invited_users)
 
             if not changed and not newly_left_users:
                 defer.returnValue(DeviceLists(
@@ -1176,7 +1177,7 @@ class SyncHandler(object):
 
     @defer.inlineCallbacks
     def _generate_sync_entry_for_presence(self, sync_result_builder, newly_joined_rooms,
-                                          newly_joined_users):
+                                          newly_joined_or_invited_users):
         """Generates the presence portion of the sync response. Populates the
         `sync_result_builder` with the result.
 
@@ -1184,8 +1185,9 @@ class SyncHandler(object):
             sync_result_builder(SyncResultBuilder)
             newly_joined_rooms(list): List of rooms that the user has joined
                 since the last sync (or empty if an initial sync)
-            newly_joined_users(list): List of users that have joined rooms
-                since the last sync (or empty if an initial sync)
+            newly_joined_or_invited_users(list): List of users that have joined
+                or been invited to rooms since the last sync (or empty if an initial
+                sync)
         """
         now_token = sync_result_builder.now_token
         sync_config = sync_result_builder.sync_config
@@ -1211,7 +1213,7 @@ class SyncHandler(object):
             "presence_key", presence_key
         )
 
-        extra_users_ids = set(newly_joined_users)
+        extra_users_ids = set(newly_joined_or_invited_users)
         for room_id in newly_joined_rooms:
             users = yield self.state.get_current_users_in_room(room_id)
             extra_users_ids.update(users)
@@ -1243,7 +1245,8 @@ class SyncHandler(object):
 
         Returns:
             Deferred(tuple): Returns a 4-tuple of
-            `(newly_joined_rooms, newly_joined_users, newly_left_rooms, newly_left_users)`
+            `(newly_joined_rooms, newly_joined_or_invited_users,
+            newly_left_rooms, newly_left_users)`
         """
         user_id = sync_result_builder.sync_config.user.to_string()
         block_all_room_ephemeral = (
@@ -1314,8 +1317,8 @@ class SyncHandler(object):
 
         sync_result_builder.invited.extend(invited)
 
-        # Now we want to get any newly joined users
-        newly_joined_users = set()
+        # Now we want to get any newly joined or invited users
+        newly_joined_or_invited_users = set()
         newly_left_users = set()
         if since_token:
             for joined_sync in sync_result_builder.joined:
@@ -1324,19 +1327,22 @@ class SyncHandler(object):
                 )
                 for event in it:
                     if event.type == EventTypes.Member:
-                        if event.membership == Membership.JOIN:
-                            newly_joined_users.add(event.state_key)
+                        if (
+                            event.membership == Membership.JOIN or
+                            event.membership == Membership.INVITE
+                        ):
+                            newly_joined_or_invited_users.add(event.state_key)
                         else:
                             prev_content = event.unsigned.get("prev_content", {})
                             prev_membership = prev_content.get("membership", None)
                             if prev_membership == Membership.JOIN:
                                 newly_left_users.add(event.state_key)
 
-        newly_left_users -= newly_joined_users
+        newly_left_users -= newly_joined_or_invited_users
 
         defer.returnValue((
             newly_joined_rooms,
-            newly_joined_users,
+            newly_joined_or_invited_users,
             newly_left_rooms,
             newly_left_users,
         ))
@@ -1381,7 +1387,7 @@ class SyncHandler(object):
             where:
                 room_entries is a list [RoomSyncResultBuilder]
                 invited_rooms is a list [InvitedSyncResult]
-                newly_joined rooms is a list[str] of room ids
+                newly_joined_rooms is a list[str] of room ids
                 newly_left_rooms is a list[str] of room ids
         """
         user_id = sync_result_builder.sync_config.user.to_string()
@@ -1422,7 +1428,7 @@ class SyncHandler(object):
             if room_id in sync_result_builder.joined_room_ids and non_joins:
                 # Always include if the user (re)joined the room, especially
                 # important so that device list changes are calculated correctly.
-                # If there are non join member events, but we are still in the room,
+                # If there are non-join member events, but we are still in the room,
                 # then the user must have left and joined
                 newly_joined_rooms.append(room_id)