summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/__init__.py2
-rw-r--r--synapse/federation/transaction_queue.py56
-rw-r--r--synapse/federation/transport/client.py128
-rw-r--r--synapse/handlers/message.py104
-rw-r--r--synapse/rest/client/v1/room.py7
-rw-r--r--synapse/storage/__init__.py16
6 files changed, 186 insertions, 127 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index a9d5198aba..7f6090baf8 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.27.2"
+__version__ = "0.27.3-rc2"
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index a141ec9953..5b0b798e57 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -169,7 +169,7 @@ class TransactionQueue(object):
             while True:
                 last_token = yield self.store.get_federation_out_pos("events")
                 next_token, events = yield self.store.get_all_new_events_stream(
-                    last_token, self._last_poked_id, limit=20,
+                    last_token, self._last_poked_id, limit=100,
                 )
 
                 logger.debug("Handling %s -> %s", last_token, next_token)
@@ -177,24 +177,33 @@ class TransactionQueue(object):
                 if not events and next_token >= self._last_poked_id:
                     break
 
-                for event in events:
+                @defer.inlineCallbacks
+                def handle_event(event):
                     # Only send events for this server.
                     send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
                     is_mine = self.is_mine_id(event.event_id)
                     if not is_mine and send_on_behalf_of is None:
-                        continue
-
-                    # Get the state from before the event.
-                    # We need to make sure that this is the state from before
-                    # the event and not from after it.
-                    # Otherwise if the last member on a server in a room is
-                    # banned then it won't receive the event because it won't
-                    # be in the room after the ban.
-                    destinations = yield self.state.get_current_hosts_in_room(
-                        event.room_id, latest_event_ids=[
-                            prev_id for prev_id, _ in event.prev_events
-                        ],
-                    )
+                        return
+
+                    try:
+                        # Get the state from before the event.
+                        # We need to make sure that this is the state from before
+                        # the event and not from after it.
+                        # Otherwise if the last member on a server in a room is
+                        # banned then it won't receive the event because it won't
+                        # be in the room after the ban.
+                        destinations = yield self.state.get_current_hosts_in_room(
+                            event.room_id, latest_event_ids=[
+                                prev_id for prev_id, _ in event.prev_events
+                            ],
+                        )
+                    except Exception:
+                        logger.exception(
+                            "Failed to calculate hosts in room for event: %s",
+                            event.event_id,
+                        )
+                        return
+
                     destinations = set(destinations)
 
                     if send_on_behalf_of is not None:
@@ -207,6 +216,23 @@ class TransactionQueue(object):
 
                     self._send_pdu(event, destinations)
 
+                @defer.inlineCallbacks
+                def handle_room_events(events):
+                    for event in events:
+                        yield handle_event(event)
+
+                events_by_room = {}
+                for event in events:
+                    events_by_room.setdefault(event.room_id, []).append(event)
+
+                yield logcontext.make_deferred_yieldable(defer.gatherResults(
+                    [
+                        logcontext.run_in_background(handle_room_events, evs)
+                        for evs in events_by_room.itervalues()
+                    ],
+                    consumeErrors=True
+                ))
+
                 events_processed_counter.inc_by(len(events))
 
                 yield self.store.update_federation_out_pos(
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 50a967a7ec..6db8efa6dd 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -21,6 +21,7 @@ from synapse.api.urls import FEDERATION_PREFIX as PREFIX
 from synapse.util.logutils import log_function
 
 import logging
+import urllib
 
 
 logger = logging.getLogger(__name__)
@@ -50,7 +51,7 @@ class TransportLayerClient(object):
         logger.debug("get_room_state dest=%s, room=%s",
                      destination, room_id)
 
-        path = PREFIX + "/state/%s/" % room_id
+        path = _create_path(PREFIX, "/state/%s/", room_id)
         return self.client.get_json(
             destination, path=path, args={"event_id": event_id},
         )
@@ -72,7 +73,7 @@ class TransportLayerClient(object):
         logger.debug("get_room_state_ids dest=%s, room=%s",
                      destination, room_id)
 
-        path = PREFIX + "/state_ids/%s/" % room_id
+        path = _create_path(PREFIX, "/state_ids/%s/", room_id)
         return self.client.get_json(
             destination, path=path, args={"event_id": event_id},
         )
@@ -94,7 +95,7 @@ class TransportLayerClient(object):
         logger.debug("get_pdu dest=%s, event_id=%s",
                      destination, event_id)
 
-        path = PREFIX + "/event/%s/" % (event_id, )
+        path = _create_path(PREFIX, "/event/%s/", event_id)
         return self.client.get_json(destination, path=path, timeout=timeout)
 
     @log_function
@@ -120,7 +121,7 @@ class TransportLayerClient(object):
             # TODO: raise?
             return
 
-        path = PREFIX + "/backfill/%s/" % (room_id,)
+        path = _create_path(PREFIX, "/backfill/%s/", room_id)
 
         args = {
             "v": event_tuples,
@@ -158,9 +159,11 @@ class TransportLayerClient(object):
         # generated by the json_data_callback.
         json_data = transaction.get_dict()
 
+        path = _create_path(PREFIX, "/send/%s/", transaction.transaction_id)
+
         response = yield self.client.put_json(
             transaction.destination,
-            path=PREFIX + "/send/%s/" % transaction.transaction_id,
+            path=path,
             data=json_data,
             json_data_callback=json_data_callback,
             long_retries=True,
@@ -178,7 +181,7 @@ class TransportLayerClient(object):
     @log_function
     def make_query(self, destination, query_type, args, retry_on_dns_fail,
                    ignore_backoff=False):
-        path = PREFIX + "/query/%s" % query_type
+        path = _create_path(PREFIX, "/query/%s", query_type)
 
         content = yield self.client.get_json(
             destination=destination,
@@ -223,7 +226,7 @@ class TransportLayerClient(object):
                 "make_membership_event called with membership='%s', must be one of %s" %
                 (membership, ",".join(valid_memberships))
             )
-        path = PREFIX + "/make_%s/%s/%s" % (membership, room_id, user_id)
+        path = _create_path(PREFIX, "/make_%s/%s/%s", membership, room_id, user_id)
 
         ignore_backoff = False
         retry_on_dns_fail = False
@@ -249,7 +252,7 @@ class TransportLayerClient(object):
     @defer.inlineCallbacks
     @log_function
     def send_join(self, destination, room_id, event_id, content):
-        path = PREFIX + "/send_join/%s/%s" % (room_id, event_id)
+        path = _create_path(PREFIX, "/send_join/%s/%s", room_id, event_id)
 
         response = yield self.client.put_json(
             destination=destination,
@@ -262,7 +265,7 @@ class TransportLayerClient(object):
     @defer.inlineCallbacks
     @log_function
     def send_leave(self, destination, room_id, event_id, content):
-        path = PREFIX + "/send_leave/%s/%s" % (room_id, event_id)
+        path = _create_path(PREFIX, "/send_leave/%s/%s", room_id, event_id)
 
         response = yield self.client.put_json(
             destination=destination,
@@ -281,7 +284,7 @@ class TransportLayerClient(object):
     @defer.inlineCallbacks
     @log_function
     def send_invite(self, destination, room_id, event_id, content):
-        path = PREFIX + "/invite/%s/%s" % (room_id, event_id)
+        path = _create_path(PREFIX, "/invite/%s/%s", room_id, event_id)
 
         response = yield self.client.put_json(
             destination=destination,
@@ -323,7 +326,7 @@ class TransportLayerClient(object):
     @defer.inlineCallbacks
     @log_function
     def exchange_third_party_invite(self, destination, room_id, event_dict):
-        path = PREFIX + "/exchange_third_party_invite/%s" % (room_id,)
+        path = _create_path(PREFIX, "/exchange_third_party_invite/%s", room_id,)
 
         response = yield self.client.put_json(
             destination=destination,
@@ -336,7 +339,7 @@ class TransportLayerClient(object):
     @defer.inlineCallbacks
     @log_function
     def get_event_auth(self, destination, room_id, event_id):
-        path = PREFIX + "/event_auth/%s/%s" % (room_id, event_id)
+        path = _create_path(PREFIX, "/event_auth/%s/%s", room_id, event_id)
 
         content = yield self.client.get_json(
             destination=destination,
@@ -348,7 +351,7 @@ class TransportLayerClient(object):
     @defer.inlineCallbacks
     @log_function
     def send_query_auth(self, destination, room_id, event_id, content):
-        path = PREFIX + "/query_auth/%s/%s" % (room_id, event_id)
+        path = _create_path(PREFIX, "/query_auth/%s/%s", room_id, event_id)
 
         content = yield self.client.post_json(
             destination=destination,
@@ -410,7 +413,7 @@ class TransportLayerClient(object):
         Returns:
             A dict containg the device keys.
         """
-        path = PREFIX + "/user/devices/" + user_id
+        path = _create_path(PREFIX, "/user/devices/%s", user_id)
 
         content = yield self.client.get_json(
             destination=destination,
@@ -460,7 +463,7 @@ class TransportLayerClient(object):
     @log_function
     def get_missing_events(self, destination, room_id, earliest_events,
                            latest_events, limit, min_depth, timeout):
-        path = PREFIX + "/get_missing_events/%s" % (room_id,)
+        path = _create_path(PREFIX, "/get_missing_events/%s", room_id,)
 
         content = yield self.client.post_json(
             destination=destination,
@@ -480,7 +483,7 @@ class TransportLayerClient(object):
     def get_group_profile(self, destination, group_id, requester_user_id):
         """Get a group profile
         """
-        path = PREFIX + "/groups/%s/profile" % (group_id,)
+        path = _create_path(PREFIX, "/groups/%s/profile", group_id,)
 
         return self.client.get_json(
             destination=destination,
@@ -499,7 +502,7 @@ class TransportLayerClient(object):
             requester_user_id (str)
             content (dict): The new profile of the group
         """
-        path = PREFIX + "/groups/%s/profile" % (group_id,)
+        path = _create_path(PREFIX, "/groups/%s/profile", group_id,)
 
         return self.client.post_json(
             destination=destination,
@@ -513,7 +516,7 @@ class TransportLayerClient(object):
     def get_group_summary(self, destination, group_id, requester_user_id):
         """Get a group summary
         """
-        path = PREFIX + "/groups/%s/summary" % (group_id,)
+        path = _create_path(PREFIX, "/groups/%s/summary", group_id,)
 
         return self.client.get_json(
             destination=destination,
@@ -526,7 +529,7 @@ class TransportLayerClient(object):
     def get_rooms_in_group(self, destination, group_id, requester_user_id):
         """Get all rooms in a group
         """
-        path = PREFIX + "/groups/%s/rooms" % (group_id,)
+        path = _create_path(PREFIX, "/groups/%s/rooms", group_id,)
 
         return self.client.get_json(
             destination=destination,
@@ -539,7 +542,7 @@ class TransportLayerClient(object):
                           content):
         """Add a room to a group
         """
-        path = PREFIX + "/groups/%s/room/%s" % (group_id, room_id,)
+        path = _create_path(PREFIX, "/groups/%s/room/%s", group_id, room_id,)
 
         return self.client.post_json(
             destination=destination,
@@ -553,7 +556,10 @@ class TransportLayerClient(object):
                              config_key, content):
         """Update room in group
         """
-        path = PREFIX + "/groups/%s/room/%s/config/%s" % (group_id, room_id, config_key,)
+        path = _create_path(
+            PREFIX, "/groups/%s/room/%s/config/%s",
+            group_id, room_id, config_key,
+        )
 
         return self.client.post_json(
             destination=destination,
@@ -566,7 +572,7 @@ class TransportLayerClient(object):
     def remove_room_from_group(self, destination, group_id, requester_user_id, room_id):
         """Remove a room from a group
         """
-        path = PREFIX + "/groups/%s/room/%s" % (group_id, room_id,)
+        path = _create_path(PREFIX, "/groups/%s/room/%s", group_id, room_id,)
 
         return self.client.delete_json(
             destination=destination,
@@ -579,7 +585,7 @@ class TransportLayerClient(object):
     def get_users_in_group(self, destination, group_id, requester_user_id):
         """Get users in a group
         """
-        path = PREFIX + "/groups/%s/users" % (group_id,)
+        path = _create_path(PREFIX, "/groups/%s/users", group_id,)
 
         return self.client.get_json(
             destination=destination,
@@ -592,7 +598,7 @@ class TransportLayerClient(object):
     def get_invited_users_in_group(self, destination, group_id, requester_user_id):
         """Get users that have been invited to a group
         """
-        path = PREFIX + "/groups/%s/invited_users" % (group_id,)
+        path = _create_path(PREFIX, "/groups/%s/invited_users", group_id,)
 
         return self.client.get_json(
             destination=destination,
@@ -605,7 +611,10 @@ class TransportLayerClient(object):
     def accept_group_invite(self, destination, group_id, user_id, content):
         """Accept a group invite
         """
-        path = PREFIX + "/groups/%s/users/%s/accept_invite" % (group_id, user_id)
+        path = _create_path(
+            PREFIX, "/groups/%s/users/%s/accept_invite",
+            group_id, user_id,
+        )
 
         return self.client.post_json(
             destination=destination,
@@ -618,7 +627,7 @@ class TransportLayerClient(object):
     def join_group(self, destination, group_id, user_id, content):
         """Attempts to join a group
         """
-        path = PREFIX + "/groups/%s/users/%s/join" % (group_id, user_id)
+        path = _create_path(PREFIX, "/groups/%s/users/%s/join", group_id, user_id)
 
         return self.client.post_json(
             destination=destination,
@@ -631,7 +640,7 @@ class TransportLayerClient(object):
     def invite_to_group(self, destination, group_id, user_id, requester_user_id, content):
         """Invite a user to a group
         """
-        path = PREFIX + "/groups/%s/users/%s/invite" % (group_id, user_id)
+        path = _create_path(PREFIX, "/groups/%s/users/%s/invite", group_id, user_id)
 
         return self.client.post_json(
             destination=destination,
@@ -647,7 +656,7 @@ class TransportLayerClient(object):
         invited.
         """
 
-        path = PREFIX + "/groups/local/%s/users/%s/invite" % (group_id, user_id)
+        path = _create_path(PREFIX, "/groups/local/%s/users/%s/invite", group_id, user_id)
 
         return self.client.post_json(
             destination=destination,
@@ -661,7 +670,7 @@ class TransportLayerClient(object):
                                user_id, content):
         """Remove a user fron a group
         """
-        path = PREFIX + "/groups/%s/users/%s/remove" % (group_id, user_id)
+        path = _create_path(PREFIX, "/groups/%s/users/%s/remove", group_id, user_id)
 
         return self.client.post_json(
             destination=destination,
@@ -678,7 +687,7 @@ class TransportLayerClient(object):
         kicked from the group.
         """
 
-        path = PREFIX + "/groups/local/%s/users/%s/remove" % (group_id, user_id)
+        path = _create_path(PREFIX, "/groups/local/%s/users/%s/remove", group_id, user_id)
 
         return self.client.post_json(
             destination=destination,
@@ -693,7 +702,7 @@ class TransportLayerClient(object):
         the attestations
         """
 
-        path = PREFIX + "/groups/%s/renew_attestation/%s" % (group_id, user_id)
+        path = _create_path(PREFIX, "/groups/%s/renew_attestation/%s", group_id, user_id)
 
         return self.client.post_json(
             destination=destination,
@@ -708,11 +717,12 @@ class TransportLayerClient(object):
         """Update a room entry in a group summary
         """
         if category_id:
-            path = PREFIX + "/groups/%s/summary/categories/%s/rooms/%s" % (
+            path = _create_path(
+                PREFIX, "/groups/%s/summary/categories/%s/rooms/%s",
                 group_id, category_id, room_id,
             )
         else:
-            path = PREFIX + "/groups/%s/summary/rooms/%s" % (group_id, room_id,)
+            path = _create_path(PREFIX, "/groups/%s/summary/rooms/%s", group_id, room_id,)
 
         return self.client.post_json(
             destination=destination,
@@ -728,11 +738,12 @@ class TransportLayerClient(object):
         """Delete a room entry in a group summary
         """
         if category_id:
-            path = PREFIX + "/groups/%s/summary/categories/%s/rooms/%s" % (
+            path = _create_path(
+                PREFIX + "/groups/%s/summary/categories/%s/rooms/%s",
                 group_id, category_id, room_id,
             )
         else:
-            path = PREFIX + "/groups/%s/summary/rooms/%s" % (group_id, room_id,)
+            path = _create_path(PREFIX, "/groups/%s/summary/rooms/%s", group_id, room_id,)
 
         return self.client.delete_json(
             destination=destination,
@@ -745,7 +756,7 @@ class TransportLayerClient(object):
     def get_group_categories(self, destination, group_id, requester_user_id):
         """Get all categories in a group
         """
-        path = PREFIX + "/groups/%s/categories" % (group_id,)
+        path = _create_path(PREFIX, "/groups/%s/categories", group_id,)
 
         return self.client.get_json(
             destination=destination,
@@ -758,7 +769,7 @@ class TransportLayerClient(object):
     def get_group_category(self, destination, group_id, requester_user_id, category_id):
         """Get category info in a group
         """
-        path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,)
+        path = _create_path(PREFIX, "/groups/%s/categories/%s", group_id, category_id,)
 
         return self.client.get_json(
             destination=destination,
@@ -772,7 +783,7 @@ class TransportLayerClient(object):
                               content):
         """Update a category in a group
         """
-        path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,)
+        path = _create_path(PREFIX, "/groups/%s/categories/%s", group_id, category_id,)
 
         return self.client.post_json(
             destination=destination,
@@ -787,7 +798,7 @@ class TransportLayerClient(object):
                               category_id):
         """Delete a category in a group
         """
-        path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,)
+        path = _create_path(PREFIX, "/groups/%s/categories/%s", group_id, category_id,)
 
         return self.client.delete_json(
             destination=destination,
@@ -800,7 +811,7 @@ class TransportLayerClient(object):
     def get_group_roles(self, destination, group_id, requester_user_id):
         """Get all roles in a group
         """
-        path = PREFIX + "/groups/%s/roles" % (group_id,)
+        path = _create_path(PREFIX, "/groups/%s/roles", group_id,)
 
         return self.client.get_json(
             destination=destination,
@@ -813,7 +824,7 @@ class TransportLayerClient(object):
     def get_group_role(self, destination, group_id, requester_user_id, role_id):
         """Get a roles info
         """
-        path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,)
+        path = _create_path(PREFIX, "/groups/%s/roles/%s", group_id, role_id,)
 
         return self.client.get_json(
             destination=destination,
@@ -827,7 +838,7 @@ class TransportLayerClient(object):
                           content):
         """Update a role in a group
         """
-        path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,)
+        path = _create_path(PREFIX, "/groups/%s/roles/%s", group_id, role_id,)
 
         return self.client.post_json(
             destination=destination,
@@ -841,7 +852,7 @@ class TransportLayerClient(object):
     def delete_group_role(self, destination, group_id, requester_user_id, role_id):
         """Delete a role in a group
         """
-        path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,)
+        path = _create_path(PREFIX, "/groups/%s/roles/%s", group_id, role_id,)
 
         return self.client.delete_json(
             destination=destination,
@@ -856,11 +867,12 @@ class TransportLayerClient(object):
         """Update a users entry in a group
         """
         if role_id:
-            path = PREFIX + "/groups/%s/summary/roles/%s/users/%s" % (
+            path = _create_path(
+                PREFIX, "/groups/%s/summary/roles/%s/users/%s",
                 group_id, role_id, user_id,
             )
         else:
-            path = PREFIX + "/groups/%s/summary/users/%s" % (group_id, user_id,)
+            path = _create_path(PREFIX, "/groups/%s/summary/users/%s", group_id, user_id,)
 
         return self.client.post_json(
             destination=destination,
@@ -875,7 +887,7 @@ class TransportLayerClient(object):
                               content):
         """Sets the join policy for a group
         """
-        path = PREFIX + "/groups/%s/settings/m.join_policy" % (group_id,)
+        path = _create_path(PREFIX, "/groups/%s/settings/m.join_policy", group_id,)
 
         return self.client.put_json(
             destination=destination,
@@ -891,11 +903,12 @@ class TransportLayerClient(object):
         """Delete a users entry in a group
         """
         if role_id:
-            path = PREFIX + "/groups/%s/summary/roles/%s/users/%s" % (
+            path = _create_path(
+                PREFIX, "/groups/%s/summary/roles/%s/users/%s",
                 group_id, role_id, user_id,
             )
         else:
-            path = PREFIX + "/groups/%s/summary/users/%s" % (group_id, user_id,)
+            path = _create_path(PREFIX, "/groups/%s/summary/users/%s", group_id, user_id,)
 
         return self.client.delete_json(
             destination=destination,
@@ -918,3 +931,22 @@ class TransportLayerClient(object):
             data=content,
             ignore_backoff=True,
         )
+
+
+def _create_path(prefix, path, *args):
+    """Creates a path from the prefix, path template and args. Ensures that
+    all args are url encoded.
+
+    Example:
+
+        _create_path(PREFIX, "/event/%s/", event_id)
+
+    Args:
+        prefix (str)
+        path (str): String template for the path
+        args: ([str]): Args to insert into path. Each arg will be url encoded
+
+    Returns:
+        str
+    """
+    return prefix + path % tuple(urllib.quote(arg, "") for arg in args)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 6de6e13b7b..54cd691f91 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -454,40 +454,39 @@ class EventCreationHandler(object):
         """
         builder = self.event_builder_factory.new(event_dict)
 
-        with (yield self.limiter.queue(builder.room_id)):
-            self.validator.validate_new(builder)
-
-            if builder.type == EventTypes.Member:
-                membership = builder.content.get("membership", None)
-                target = UserID.from_string(builder.state_key)
-
-                if membership in {Membership.JOIN, Membership.INVITE}:
-                    # If event doesn't include a display name, add one.
-                    profile = self.profile_handler
-                    content = builder.content
-
-                    try:
-                        if "displayname" not in content:
-                            content["displayname"] = yield profile.get_displayname(target)
-                        if "avatar_url" not in content:
-                            content["avatar_url"] = yield profile.get_avatar_url(target)
-                    except Exception as e:
-                        logger.info(
-                            "Failed to get profile information for %r: %s",
-                            target, e
-                        )
+        self.validator.validate_new(builder)
+
+        if builder.type == EventTypes.Member:
+            membership = builder.content.get("membership", None)
+            target = UserID.from_string(builder.state_key)
+
+            if membership in {Membership.JOIN, Membership.INVITE}:
+                # If event doesn't include a display name, add one.
+                profile = self.profile_handler
+                content = builder.content
+
+                try:
+                    if "displayname" not in content:
+                        content["displayname"] = yield profile.get_displayname(target)
+                    if "avatar_url" not in content:
+                        content["avatar_url"] = yield profile.get_avatar_url(target)
+                except Exception as e:
+                    logger.info(
+                        "Failed to get profile information for %r: %s",
+                        target, e
+                    )
 
-            if token_id is not None:
-                builder.internal_metadata.token_id = token_id
+        if token_id is not None:
+            builder.internal_metadata.token_id = token_id
 
-            if txn_id is not None:
-                builder.internal_metadata.txn_id = txn_id
+        if txn_id is not None:
+            builder.internal_metadata.txn_id = txn_id
 
-            event, context = yield self.create_new_client_event(
-                builder=builder,
-                requester=requester,
-                prev_event_ids=prev_event_ids,
-            )
+        event, context = yield self.create_new_client_event(
+            builder=builder,
+            requester=requester,
+            prev_event_ids=prev_event_ids,
+        )
 
         defer.returnValue((event, context))
 
@@ -557,27 +556,34 @@ class EventCreationHandler(object):
 
         See self.create_event and self.send_nonmember_event.
         """
-        event, context = yield self.create_event(
-            requester,
-            event_dict,
-            token_id=requester.access_token_id,
-            txn_id=txn_id
-        )
 
-        spam_error = self.spam_checker.check_event_for_spam(event)
-        if spam_error:
-            if not isinstance(spam_error, basestring):
-                spam_error = "Spam is not permitted here"
-            raise SynapseError(
-                403, spam_error, Codes.FORBIDDEN
+        # We limit the number of concurrent event sends in a room so that we
+        # don't fork the DAG too much. If we don't limit then we can end up in
+        # a situation where event persistence can't keep up, causing
+        # extremities to pile up, which in turn leads to state resolution
+        # taking longer.
+        with (yield self.limiter.queue(event_dict["room_id"])):
+            event, context = yield self.create_event(
+                requester,
+                event_dict,
+                token_id=requester.access_token_id,
+                txn_id=txn_id
             )
 
-        yield self.send_nonmember_event(
-            requester,
-            event,
-            context,
-            ratelimit=ratelimit,
-        )
+            spam_error = self.spam_checker.check_event_for_spam(event)
+            if spam_error:
+                if not isinstance(spam_error, basestring):
+                    spam_error = "Spam is not permitted here"
+                raise SynapseError(
+                    403, spam_error, Codes.FORBIDDEN
+                )
+
+            yield self.send_nonmember_event(
+                requester,
+                event,
+                context,
+                ratelimit=ratelimit,
+            )
         defer.returnValue(event)
 
     @measure_func("create_new_client_event")
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index d06cbdc35e..2ad0e5943b 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -165,17 +165,12 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
                 content=content,
             )
         else:
-            event, context = yield self.event_creation_hander.create_event(
+            event = yield self.event_creation_hander.create_and_send_nonmember_event(
                 requester,
                 event_dict,
-                token_id=requester.access_token_id,
                 txn_id=txn_id,
             )
 
-            yield self.event_creation_hander.send_nonmember_event(
-                requester, event, context,
-            )
-
         ret = {}
         if event:
             ret = {"event_id": event.event_id}
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 4800584b59..8cdfd50f90 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -266,16 +266,16 @@ class DataStore(RoomMemberStore, RoomStore,
     def count_r30_users(self):
         """
         Counts the number of 30 day retained users, defined as:-
-         * Users who have created their accounts more than 30 days
+         * Users who have created their accounts more than 30 days ago
          * Where last seen at most 30 days ago
-         * Where account creation and last_seen are > 30 days
+         * Where account creation and last_seen are > 30 days apart
 
          Returns counts globaly for a given user as well as breaking
          by platform
         """
         def _count_r30_users(txn):
             thirty_days_in_secs = 86400 * 30
-            now = int(self._clock.time_msec())
+            now = int(self._clock.time())
             thirty_days_ago_in_secs = now - thirty_days_in_secs
 
             sql = """
@@ -289,11 +289,11 @@ class DataStore(RoomMemberStore, RoomStore,
                          user_id,
                          last_seen,
                          CASE
-                             WHEN user_agent LIKE '%Android%' THEN 'android'
-                             WHEN user_agent LIKE '%iOS%' THEN 'ios'
-                             WHEN user_agent LIKE '%Electron%' THEN 'electron'
-                             WHEN user_agent LIKE '%Mozilla%' THEN 'web'
-                             WHEN user_agent LIKE '%Gecko%' THEN 'web'
+                             WHEN user_agent LIKE '%%Android%%' THEN 'android'
+                             WHEN user_agent LIKE '%%iOS%%' THEN 'ios'
+                             WHEN user_agent LIKE '%%Electron%%' THEN 'electron'
+                             WHEN user_agent LIKE '%%Mozilla%%' THEN 'web'
+                             WHEN user_agent LIKE '%%Gecko%%' THEN 'web'
                              ELSE 'unknown'
                          END
                          AS platform