summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rwxr-xr-xsynapse/app/homeserver.py7
-rwxr-xr-xsynapse/app/synctl.py1
-rw-r--r--synapse/federation/transport/client.py29
-rw-r--r--synapse/federation/transport/server.py38
-rw-r--r--synapse/groups/groups_server.py124
-rw-r--r--synapse/handlers/groups_local.py43
-rw-r--r--synapse/handlers/room_member.py8
-rw-r--r--synapse/http/matrixfederationclient.py6
-rw-r--r--synapse/http/server.py7
-rw-r--r--synapse/rest/client/v1/room.py7
-rw-r--r--synapse/rest/client/v2_alpha/groups.py28
-rw-r--r--synapse/storage/__init__.py92
-rw-r--r--synapse/storage/client_ips.py7
-rw-r--r--synapse/storage/group_server.py24
-rw-r--r--synapse/storage/prepare_database.py3
-rw-r--r--synapse/storage/room.py7
-rw-r--r--synapse/storage/roommember.py6
-rw-r--r--synapse/storage/schema/delta/48/add_user_ips_last_seen_index.sql17
-rw-r--r--synapse/storage/schema/delta/48/groups_joinable.sql22
-rw-r--r--synapse/storage/search.py6
-rw-r--r--synapse/types.py7
-rw-r--r--synapse/util/caches/descriptors.py64
22 files changed, 485 insertions, 68 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index b935beb974..a0e465d644 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -48,6 +48,7 @@ from synapse.server import HomeServer
 from synapse.storage import are_all_users_on_domain
 from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
 from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
+from synapse.util.caches import CACHE_SIZE_FACTOR
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
@@ -429,8 +430,14 @@ def run(hs):
         stats["daily_active_rooms"] = yield hs.get_datastore().count_daily_active_rooms()
         stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
 
+        r30_results = yield hs.get_datastore().count_r30_users()
+        for name, count in r30_results.iteritems():
+            stats["r30_users_" + name] = count
+
         daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
         stats["daily_sent_messages"] = daily_sent_messages
+        stats["cache_factor"] = CACHE_SIZE_FACTOR
+        stats["event_cache_size"] = hs.config.event_cache_size
 
         if len(stats_process) > 0:
             stats["memory_rss"] = 0
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
index b0e1b5e66a..712dfa870e 100755
--- a/synapse/app/synctl.py
+++ b/synapse/app/synctl.py
@@ -252,6 +252,7 @@ def main():
             for running_pid in running_pids:
                 while pid_running(running_pid):
                     time.sleep(0.2)
+            write("All processes exited; now restarting...")
 
     if action == "start" or action == "restart":
         if start_stop_synapse:
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 5488e82985..50a967a7ec 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -614,6 +615,19 @@ class TransportLayerClient(object):
         )
 
     @log_function
+    def join_group(self, destination, group_id, user_id, content):
+        """Attempts to join a group
+        """
+        path = PREFIX + "/groups/%s/users/%s/join" % (group_id, user_id)
+
+        return self.client.post_json(
+            destination=destination,
+            path=path,
+            data=content,
+            ignore_backoff=True,
+        )
+
+    @log_function
     def invite_to_group(self, destination, group_id, user_id, requester_user_id, content):
         """Invite a user to a group
         """
@@ -857,6 +871,21 @@ class TransportLayerClient(object):
         )
 
     @log_function
+    def set_group_join_policy(self, destination, group_id, requester_user_id,
+                              content):
+        """Sets the join policy for a group
+        """
+        path = PREFIX + "/groups/%s/settings/m.join_policy" % (group_id,)
+
+        return self.client.put_json(
+            destination=destination,
+            path=path,
+            args={"requester_user_id": requester_user_id},
+            data=content,
+            ignore_backoff=True,
+        )
+
+    @log_function
     def delete_group_summary_user(self, destination, group_id, requester_user_id,
                                   user_id, role_id):
         """Delete a users entry in a group
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index a66a6b0692..4c94d5a36c 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -802,6 +803,23 @@ class FederationGroupsAcceptInviteServlet(BaseFederationServlet):
         defer.returnValue((200, new_content))
 
 
+class FederationGroupsJoinServlet(BaseFederationServlet):
+    """Attempt to join a group
+    """
+    PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/join$"
+
+    @defer.inlineCallbacks
+    def on_POST(self, origin, content, query, group_id, user_id):
+        if get_domain_from_id(user_id) != origin:
+            raise SynapseError(403, "user_id doesn't match origin")
+
+        new_content = yield self.handler.join_group(
+            group_id, user_id, content,
+        )
+
+        defer.returnValue((200, new_content))
+
+
 class FederationGroupsRemoveUserServlet(BaseFederationServlet):
     """Leave or kick a user from the group
     """
@@ -1124,6 +1142,24 @@ class FederationGroupsBulkPublicisedServlet(BaseFederationServlet):
         defer.returnValue((200, resp))
 
 
+class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet):
+    """Sets whether a group is joinable without an invite or knock
+    """
+    PATH = "/groups/(?P<group_id>[^/]*)/settings/m.join_policy$"
+
+    @defer.inlineCallbacks
+    def on_PUT(self, origin, content, query, group_id):
+        requester_user_id = parse_string_from_args(query, "requester_user_id")
+        if get_domain_from_id(requester_user_id) != origin:
+            raise SynapseError(403, "requester_user_id doesn't match origin")
+
+        new_content = yield self.handler.set_group_join_policy(
+            group_id, requester_user_id, content
+        )
+
+        defer.returnValue((200, new_content))
+
+
 FEDERATION_SERVLET_CLASSES = (
     FederationSendServlet,
     FederationPullServlet,
@@ -1163,6 +1199,7 @@ GROUP_SERVER_SERVLET_CLASSES = (
     FederationGroupsInvitedUsersServlet,
     FederationGroupsInviteServlet,
     FederationGroupsAcceptInviteServlet,
+    FederationGroupsJoinServlet,
     FederationGroupsRemoveUserServlet,
     FederationGroupsSummaryRoomsServlet,
     FederationGroupsCategoriesServlet,
@@ -1172,6 +1209,7 @@ GROUP_SERVER_SERVLET_CLASSES = (
     FederationGroupsSummaryUsersServlet,
     FederationGroupsAddRoomsServlet,
     FederationGroupsAddRoomsConfigServlet,
+    FederationGroupsSettingJoinPolicyServlet,
 )
 
 
diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py
index 0b995aed70..2d95b04e0c 100644
--- a/synapse/groups/groups_server.py
+++ b/synapse/groups/groups_server.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2017 Vector Creations Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -206,6 +207,28 @@ class GroupsServerHandler(object):
         defer.returnValue({})
 
     @defer.inlineCallbacks
+    def set_group_join_policy(self, group_id, requester_user_id, content):
+        """Sets the group join policy.
+
+        Currently supported policies are:
+         - "invite": an invite must be received and accepted in order to join.
+         - "open": anyone can join.
+        """
+        yield self.check_group_is_ours(
+            group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
+        )
+
+        join_policy = _parse_join_policy_from_contents(content)
+        if join_policy is None:
+            raise SynapseError(
+                400, "No value specified for 'm.join_policy'"
+            )
+
+        yield self.store.set_group_join_policy(group_id, join_policy=join_policy)
+
+        defer.returnValue({})
+
+    @defer.inlineCallbacks
     def get_group_categories(self, group_id, requester_user_id):
         """Get all categories in a group (as seen by user)
         """
@@ -381,9 +404,16 @@ class GroupsServerHandler(object):
 
         yield self.check_group_is_ours(group_id, requester_user_id)
 
-        group_description = yield self.store.get_group(group_id)
+        group = yield self.store.get_group(group_id)
+
+        if group:
+            cols = [
+                "name", "short_description", "long_description",
+                "avatar_url", "is_public",
+            ]
+            group_description = {key: group[key] for key in cols}
+            group_description["is_openly_joinable"] = group["join_policy"] == "open"
 
-        if group_description:
             defer.returnValue(group_description)
         else:
             raise SynapseError(404, "Unknown group")
@@ -655,30 +685,21 @@ class GroupsServerHandler(object):
             raise SynapseError(502, "Unknown state returned by HS")
 
     @defer.inlineCallbacks
-    def accept_invite(self, group_id, requester_user_id, content):
-        """User tries to accept an invite to the group.
+    def _add_user(self, group_id, user_id, content):
+        """Add a user to a group based on a content dict.
 
-        This is different from them asking to join, and so should error if no
-        invite exists (and they're not a member of the group)
+        See accept_invite, join_group.
         """
-
-        yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
-
-        is_invited = yield self.store.is_user_invited_to_local_group(
-            group_id, requester_user_id,
-        )
-        if not is_invited:
-            raise SynapseError(403, "User not invited to group")
-
-        if not self.hs.is_mine_id(requester_user_id):
+        if not self.hs.is_mine_id(user_id):
             local_attestation = self.attestations.create_attestation(
-                group_id, requester_user_id,
+                group_id, user_id,
             )
+
             remote_attestation = content["attestation"]
 
             yield self.attestations.verify_attestation(
                 remote_attestation,
-                user_id=requester_user_id,
+                user_id=user_id,
                 group_id=group_id,
             )
         else:
@@ -688,13 +709,53 @@ class GroupsServerHandler(object):
         is_public = _parse_visibility_from_contents(content)
 
         yield self.store.add_user_to_group(
-            group_id, requester_user_id,
+            group_id, user_id,
             is_admin=False,
             is_public=is_public,
             local_attestation=local_attestation,
             remote_attestation=remote_attestation,
         )
 
+        defer.returnValue(local_attestation)
+
+    @defer.inlineCallbacks
+    def accept_invite(self, group_id, requester_user_id, content):
+        """User tries to accept an invite to the group.
+
+        This is different from them asking to join, and so should error if no
+        invite exists (and they're not a member of the group)
+        """
+
+        yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
+
+        is_invited = yield self.store.is_user_invited_to_local_group(
+            group_id, requester_user_id,
+        )
+        if not is_invited:
+            raise SynapseError(403, "User not invited to group")
+
+        local_attestation = yield self._add_user(group_id, requester_user_id, content)
+
+        defer.returnValue({
+            "state": "join",
+            "attestation": local_attestation,
+        })
+
+    @defer.inlineCallbacks
+    def join_group(self, group_id, requester_user_id, content):
+        """User tries to join the group.
+
+        This will error if the group requires an invite/knock to join
+        """
+
+        group_info = yield self.check_group_is_ours(
+            group_id, requester_user_id, and_exists=True
+        )
+        if group_info['join_policy'] != "open":
+            raise SynapseError(403, "Group is not publicly joinable")
+
+        local_attestation = yield self._add_user(group_id, requester_user_id, content)
+
         defer.returnValue({
             "state": "join",
             "attestation": local_attestation,
@@ -835,6 +896,31 @@ class GroupsServerHandler(object):
         })
 
 
+def _parse_join_policy_from_contents(content):
+    """Given a content for a request, return the specified join policy or None
+    """
+
+    join_policy_dict = content.get("m.join_policy")
+    if join_policy_dict:
+        return _parse_join_policy_dict(join_policy_dict)
+    else:
+        return None
+
+
+def _parse_join_policy_dict(join_policy_dict):
+    """Given a dict for the "m.join_policy" config return the join policy specified
+    """
+    join_policy_type = join_policy_dict.get("type")
+    if not join_policy_type:
+        return "invite"
+
+    if join_policy_type not in ("invite", "open"):
+        raise SynapseError(
+            400, "Synapse only supports 'invite'/'open' join rule"
+        )
+    return join_policy_type
+
+
 def _parse_visibility_from_contents(content):
     """Given a content for a request parse out whether the entity should be
     public or not
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index e4d0cc8b02..977993e7d4 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2017 Vector Creations Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -90,6 +91,8 @@ class GroupsLocalHandler(object):
     get_group_role = _create_rerouter("get_group_role")
     get_group_roles = _create_rerouter("get_group_roles")
 
+    set_group_join_policy = _create_rerouter("set_group_join_policy")
+
     @defer.inlineCallbacks
     def get_group_summary(self, group_id, requester_user_id):
         """Get the group summary for a group.
@@ -226,7 +229,45 @@ class GroupsLocalHandler(object):
     def join_group(self, group_id, user_id, content):
         """Request to join a group
         """
-        raise NotImplementedError()  # TODO
+        if self.is_mine_id(group_id):
+            yield self.groups_server_handler.join_group(
+                group_id, user_id, content
+            )
+            local_attestation = None
+            remote_attestation = None
+        else:
+            local_attestation = self.attestations.create_attestation(group_id, user_id)
+            content["attestation"] = local_attestation
+
+            res = yield self.transport_client.join_group(
+                get_domain_from_id(group_id), group_id, user_id, content,
+            )
+
+            remote_attestation = res["attestation"]
+
+            yield self.attestations.verify_attestation(
+                remote_attestation,
+                group_id=group_id,
+                user_id=user_id,
+                server_name=get_domain_from_id(group_id),
+            )
+
+        # TODO: Check that the group is public and we're being added publically
+        is_publicised = content.get("publicise", False)
+
+        token = yield self.store.register_user_group_membership(
+            group_id, user_id,
+            membership="join",
+            is_admin=False,
+            local_attestation=local_attestation,
+            remote_attestation=remote_attestation,
+            is_publicised=is_publicised,
+        )
+        self.notifier.on_new_event(
+            "groups_key", token, users=[user_id],
+        )
+
+        defer.returnValue({})
 
     @defer.inlineCallbacks
     def accept_invite(self, group_id, user_id, content):
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 9977be8831..c45142d38d 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -852,6 +852,14 @@ class RoomMemberMasterHandler(RoomMemberHandler):
     def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
         """Implements RoomMemberHandler._remote_join
         """
+        # filter ourselves out of remote_room_hosts: do_invite_join ignores it
+        # and if it is the only entry we'd like to return a 404 rather than a
+        # 500.
+
+        remote_room_hosts = [
+            host for host in remote_room_hosts if host != self.hs.hostname
+        ]
+
         if len(remote_room_hosts) == 0:
             raise SynapseError(404, "No known servers")
 
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 9145405cb0..60a29081e8 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -286,7 +286,8 @@ class MatrixFederationHttpClient(object):
         headers_dict[b"Authorization"] = auth_headers
 
     @defer.inlineCallbacks
-    def put_json(self, destination, path, data={}, json_data_callback=None,
+    def put_json(self, destination, path, args={}, data={},
+                 json_data_callback=None,
                  long_retries=False, timeout=None,
                  ignore_backoff=False,
                  backoff_on_404=False):
@@ -296,6 +297,7 @@ class MatrixFederationHttpClient(object):
             destination (str): The remote server to send the HTTP request
                 to.
             path (str): The HTTP path.
+            args (dict): query params
             data (dict): A dict containing the data that will be used as
                 the request body. This will be encoded as JSON.
             json_data_callback (callable): A callable returning the dict to
@@ -342,6 +344,7 @@ class MatrixFederationHttpClient(object):
             path,
             body_callback=body_callback,
             headers_dict={"Content-Type": ["application/json"]},
+            query_bytes=encode_query_args(args),
             long_retries=long_retries,
             timeout=timeout,
             ignore_backoff=ignore_backoff,
@@ -373,6 +376,7 @@ class MatrixFederationHttpClient(object):
                 giving up. None indicates no timeout.
             ignore_backoff (bool): true to ignore the historical backoff data and
                 try the request anyway.
+            args (dict): query params
         Returns:
             Deferred: Succeeds when we get a 2xx HTTP response. The result
             will be the decoded JSON body.
diff --git a/synapse/http/server.py b/synapse/http/server.py
index f19c068ef6..64e083ebfc 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -113,6 +113,11 @@ response_db_sched_duration = metrics.register_counter(
     "response_db_sched_duration_seconds", labels=["method", "servlet", "tag"]
 )
 
+# size in bytes of the response written
+response_size = metrics.register_counter(
+    "response_size", labels=["method", "servlet", "tag"]
+)
+
 _next_request_id = 0
 
 
@@ -426,6 +431,8 @@ class RequestMetrics(object):
             context.db_sched_duration_ms / 1000., request.method, self.name, tag
         )
 
+        response_size.inc_by(request.sentLength, request.method, self.name, tag)
+
 
 class RootRedirect(resource.Resource):
     """Redirects the root '/' path to another path."""
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 70d788deea..d06cbdc35e 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -655,7 +655,12 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
             content=event_content,
         )
 
-        defer.returnValue((200, {}))
+        return_value = {}
+
+        if membership_action == "join":
+            return_value["room_id"] = room_id
+
+        defer.returnValue((200, return_value))
 
     def _has_3pid_invite_keys(self, content):
         for key in {"id_server", "medium", "address"}:
diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py
index f762dbfa9a..3bb1ec2af6 100644
--- a/synapse/rest/client/v2_alpha/groups.py
+++ b/synapse/rest/client/v2_alpha/groups.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2017 Vector Creations Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -401,6 +402,32 @@ class GroupInvitedUsersServlet(RestServlet):
         defer.returnValue((200, result))
 
 
+class GroupSettingJoinPolicyServlet(RestServlet):
+    """Set group join policy
+    """
+    PATTERNS = client_v2_patterns("/groups/(?P<group_id>[^/]*)/settings/m.join_policy$")
+
+    def __init__(self, hs):
+        super(GroupSettingJoinPolicyServlet, self).__init__()
+        self.auth = hs.get_auth()
+        self.groups_handler = hs.get_groups_local_handler()
+
+    @defer.inlineCallbacks
+    def on_PUT(self, request, group_id):
+        requester = yield self.auth.get_user_by_req(request)
+        requester_user_id = requester.user.to_string()
+
+        content = parse_json_object_from_request(request)
+
+        result = yield self.groups_handler.set_group_join_policy(
+            group_id,
+            requester_user_id,
+            content,
+        )
+
+        defer.returnValue((200, result))
+
+
 class GroupCreateServlet(RestServlet):
     """Create a group
     """
@@ -738,6 +765,7 @@ def register_servlets(hs, http_server):
     GroupInvitedUsersServlet(hs).register(http_server)
     GroupUsersServlet(hs).register(http_server)
     GroupRoomServlet(hs).register(http_server)
+    GroupSettingJoinPolicyServlet(hs).register(http_server)
     GroupCreateServlet(hs).register(http_server)
     GroupAdminRoomsServlet(hs).register(http_server)
     GroupAdminRoomsConfigServlet(hs).register(http_server)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index de00cae447..4800584b59 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -14,8 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
-
 from synapse.storage.devices import DeviceStore
 from .appservice import (
     ApplicationServiceStore, ApplicationServiceTransactionStore
@@ -244,13 +242,12 @@ class DataStore(RoomMemberStore, RoomStore,
 
         return [UserPresenceState(**row) for row in rows]
 
-    @defer.inlineCallbacks
     def count_daily_users(self):
         """
         Counts the number of users who used this homeserver in the last 24 hours.
         """
         def _count_users(txn):
-            yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24),
+            yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
 
             sql = """
                 SELECT COALESCE(count(*), 0) FROM (
@@ -264,8 +261,91 @@ class DataStore(RoomMemberStore, RoomStore,
             count, = txn.fetchone()
             return count
 
-        ret = yield self.runInteraction("count_users", _count_users)
-        defer.returnValue(ret)
+        return self.runInteraction("count_users", _count_users)
+
+    def count_r30_users(self):
+        """
+        Counts the number of 30 day retained users, defined as:-
+         * Users who have created their accounts more than 30 days
+         * Where last seen at most 30 days ago
+         * Where account creation and last_seen are > 30 days
+
+         Returns counts globaly for a given user as well as breaking
+         by platform
+        """
+        def _count_r30_users(txn):
+            thirty_days_in_secs = 86400 * 30
+            now = int(self._clock.time_msec())
+            thirty_days_ago_in_secs = now - thirty_days_in_secs
+
+            sql = """
+                SELECT platform, COALESCE(count(*), 0) FROM (
+                     SELECT
+                        users.name, platform, users.creation_ts * 1000,
+                        MAX(uip.last_seen)
+                     FROM users
+                     INNER JOIN (
+                         SELECT
+                         user_id,
+                         last_seen,
+                         CASE
+                             WHEN user_agent LIKE '%Android%' THEN 'android'
+                             WHEN user_agent LIKE '%iOS%' THEN 'ios'
+                             WHEN user_agent LIKE '%Electron%' THEN 'electron'
+                             WHEN user_agent LIKE '%Mozilla%' THEN 'web'
+                             WHEN user_agent LIKE '%Gecko%' THEN 'web'
+                             ELSE 'unknown'
+                         END
+                         AS platform
+                         FROM user_ips
+                     ) uip
+                     ON users.name = uip.user_id
+                     AND users.appservice_id is NULL
+                     AND users.creation_ts < ?
+                     AND uip.last_seen/1000 > ?
+                     AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
+                     GROUP BY users.name, platform, users.creation_ts
+                ) u GROUP BY platform
+            """
+
+            results = {}
+            txn.execute(sql, (thirty_days_ago_in_secs,
+                              thirty_days_ago_in_secs))
+
+            for row in txn:
+                if row[0] is 'unknown':
+                    pass
+                results[row[0]] = row[1]
+
+            sql = """
+                SELECT COALESCE(count(*), 0) FROM (
+                    SELECT users.name, users.creation_ts * 1000,
+                                                        MAX(uip.last_seen)
+                    FROM users
+                    INNER JOIN (
+                        SELECT
+                        user_id,
+                        last_seen
+                        FROM user_ips
+                    ) uip
+                    ON users.name = uip.user_id
+                    AND appservice_id is NULL
+                    AND users.creation_ts < ?
+                    AND uip.last_seen/1000 > ?
+                    AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
+                    GROUP BY users.name, users.creation_ts
+                ) u
+            """
+
+            txn.execute(sql, (thirty_days_ago_in_secs,
+                              thirty_days_ago_in_secs))
+
+            count, = txn.fetchone()
+            results['all'] = count
+
+            return results
+
+        return self.runInteraction("count_r30_users", _count_r30_users)
 
     def get_users(self):
         """Function to reterive a list of users in users table.
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index a03d1d6104..7b44dae0fc 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -48,6 +48,13 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
             columns=["user_id", "device_id", "last_seen"],
         )
 
+        self.register_background_index_update(
+            "user_ips_last_seen_index",
+            index_name="user_ips_last_seen",
+            table="user_ips",
+            columns=["user_id", "last_seen"],
+        )
+
         # (user_id, access_token, ip) -> (user_agent, device_id, last_seen)
         self._batch_row_update = {}
 
diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py
index d03858234b..da05ccb027 100644
--- a/synapse/storage/group_server.py
+++ b/synapse/storage/group_server.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2017 Vector Creations Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -29,6 +30,24 @@ _DEFAULT_ROLE_ID = ""
 
 
 class GroupServerStore(SQLBaseStore):
+    def set_group_join_policy(self, group_id, join_policy):
+        """Set the join policy of a group.
+
+        join_policy can be one of:
+         * "invite"
+         * "open"
+        """
+        return self._simple_update_one(
+            table="groups",
+            keyvalues={
+                "group_id": group_id,
+            },
+            updatevalues={
+                "join_policy": join_policy,
+            },
+            desc="set_group_join_policy",
+        )
+
     def get_group(self, group_id):
         return self._simple_select_one(
             table="groups",
@@ -36,10 +55,11 @@ class GroupServerStore(SQLBaseStore):
                 "group_id": group_id,
             },
             retcols=(
-                "name", "short_description", "long_description", "avatar_url", "is_public"
+                "name", "short_description", "long_description",
+                "avatar_url", "is_public", "join_policy",
             ),
             allow_none=True,
-            desc="is_user_in_group",
+            desc="get_group",
         )
 
     def get_users_in_group(self, group_id, include_private=False):
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index c845a0cec5..04411a665f 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014 - 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -25,7 +26,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 47
+SCHEMA_VERSION = 48
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 908551d6d9..740c036975 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -594,7 +594,8 @@ class RoomStore(RoomWorkerStore, SearchStore):
 
         while next_token:
             sql = """
-                SELECT stream_ordering, content FROM events
+                SELECT stream_ordering, json FROM events
+                JOIN event_json USING (event_id)
                 WHERE room_id = ?
                     AND stream_ordering < ?
                     AND contains_url = ? AND outlier = ?
@@ -606,8 +607,8 @@ class RoomStore(RoomWorkerStore, SearchStore):
             next_token = None
             for stream_ordering, content_json in txn:
                 next_token = stream_ordering
-                content = json.loads(content_json)
-
+                event_json = json.loads(content_json)
+                content = event_json["content"]
                 content_url = content.get("url")
                 thumbnail_url = content.get("info", {}).get("thumbnail_url")
 
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index d662d1cfc0..6a861943a2 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -645,8 +645,9 @@ class RoomMemberStore(RoomMemberWorkerStore):
 
         def add_membership_profile_txn(txn):
             sql = ("""
-                SELECT stream_ordering, event_id, events.room_id, content
+                SELECT stream_ordering, event_id, events.room_id, event_json.json
                 FROM events
+                INNER JOIN event_json USING (event_id)
                 INNER JOIN room_memberships USING (event_id)
                 WHERE ? <= stream_ordering AND stream_ordering < ?
                 AND type = 'm.room.member'
@@ -667,7 +668,8 @@ class RoomMemberStore(RoomMemberWorkerStore):
                 event_id = row["event_id"]
                 room_id = row["room_id"]
                 try:
-                    content = json.loads(row["content"])
+                    event_json = json.loads(row["json"])
+                    content = event_json['content']
                 except Exception:
                     continue
 
diff --git a/synapse/storage/schema/delta/48/add_user_ips_last_seen_index.sql b/synapse/storage/schema/delta/48/add_user_ips_last_seen_index.sql
new file mode 100644
index 0000000000..9248b0b24a
--- /dev/null
+++ b/synapse/storage/schema/delta/48/add_user_ips_last_seen_index.sql
@@ -0,0 +1,17 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT into background_updates (update_name, progress_json)
+    VALUES ('user_ips_last_seen_index', '{}');
diff --git a/synapse/storage/schema/delta/48/groups_joinable.sql b/synapse/storage/schema/delta/48/groups_joinable.sql
new file mode 100644
index 0000000000..ce26eaf0c9
--- /dev/null
+++ b/synapse/storage/schema/delta/48/groups_joinable.sql
@@ -0,0 +1,22 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* 
+ * This isn't a real ENUM because sqlite doesn't support it
+ * and we use a default of NULL for inserted rows and interpret
+ * NULL at the python store level as necessary so that existing
+ * rows are given the correct default policy.
+ */
+ALTER TABLE groups ADD COLUMN join_policy TEXT NOT NULL DEFAULT 'invite';
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 984643b057..426cbe6e1a 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -75,8 +75,9 @@ class SearchStore(BackgroundUpdateStore):
 
         def reindex_search_txn(txn):
             sql = (
-                "SELECT stream_ordering, event_id, room_id, type, content, "
+                "SELECT stream_ordering, event_id, room_id, type, json, "
                 " origin_server_ts FROM events"
+                " JOIN event_json USING (event_id)"
                 " WHERE ? <= stream_ordering AND stream_ordering < ?"
                 " AND (%s)"
                 " ORDER BY stream_ordering DESC"
@@ -104,7 +105,8 @@ class SearchStore(BackgroundUpdateStore):
                     stream_ordering = row["stream_ordering"]
                     origin_server_ts = row["origin_server_ts"]
                     try:
-                        content = json.loads(row["content"])
+                        event_json = json.loads(row["json"])
+                        content = event_json["content"]
                     except Exception:
                         continue
 
diff --git a/synapse/types.py b/synapse/types.py
index f1f41ccf90..7cb24cecb2 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -12,11 +12,11 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import string
 
 from synapse.api.errors import SynapseError
 
 from collections import namedtuple
-import re
 
 
 class Requester(namedtuple("Requester", [
@@ -214,8 +214,7 @@ class GroupID(DomainSpecificString):
         return group_id
 
 
-# A regex that matches any valid mxid characters
-MXID_LOCALPART_REGEX = re.compile("^[_\-./=a-z0-9]*$")
+mxid_localpart_allowed_characters = set("_-./=" + string.ascii_lowercase + string.digits)
 
 
 def contains_invalid_mxid_characters(localpart):
@@ -227,7 +226,7 @@ def contains_invalid_mxid_characters(localpart):
     Returns:
         bool: True if there are any naughty characters
     """
-    return not MXID_LOCALPART_REGEX.match(localpart)
+    return any(c not in mxid_localpart_allowed_characters for c in localpart)
 
 
 class StreamToken(
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index bf3a66eae4..68285a7594 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -39,12 +40,11 @@ _CacheSentinel = object()
 
 class CacheEntry(object):
     __slots__ = [
-        "deferred", "sequence", "callbacks", "invalidated"
+        "deferred", "callbacks", "invalidated"
     ]
 
-    def __init__(self, deferred, sequence, callbacks):
+    def __init__(self, deferred, callbacks):
         self.deferred = deferred
-        self.sequence = sequence
         self.callbacks = set(callbacks)
         self.invalidated = False
 
@@ -62,7 +62,6 @@ class Cache(object):
         "max_entries",
         "name",
         "keylen",
-        "sequence",
         "thread",
         "metrics",
         "_pending_deferred_cache",
@@ -80,7 +79,6 @@ class Cache(object):
 
         self.name = name
         self.keylen = keylen
-        self.sequence = 0
         self.thread = None
         self.metrics = register_cache(name, self.cache)
 
@@ -113,11 +111,10 @@ class Cache(object):
         callbacks = [callback] if callback else []
         val = self._pending_deferred_cache.get(key, _CacheSentinel)
         if val is not _CacheSentinel:
-            if val.sequence == self.sequence:
-                val.callbacks.update(callbacks)
-                if update_metrics:
-                    self.metrics.inc_hits()
-                return val.deferred
+            val.callbacks.update(callbacks)
+            if update_metrics:
+                self.metrics.inc_hits()
+            return val.deferred
 
         val = self.cache.get(key, _CacheSentinel, callbacks=callbacks)
         if val is not _CacheSentinel:
@@ -137,12 +134,9 @@ class Cache(object):
         self.check_thread()
         entry = CacheEntry(
             deferred=value,
-            sequence=self.sequence,
             callbacks=callbacks,
         )
 
-        entry.callbacks.update(callbacks)
-
         existing_entry = self._pending_deferred_cache.pop(key, None)
         if existing_entry:
             existing_entry.invalidate()
@@ -150,13 +144,25 @@ class Cache(object):
         self._pending_deferred_cache[key] = entry
 
         def shuffle(result):
-            if self.sequence == entry.sequence:
-                existing_entry = self._pending_deferred_cache.pop(key, None)
-                if existing_entry is entry:
-                    self.cache.set(key, result, entry.callbacks)
-                else:
-                    entry.invalidate()
+            existing_entry = self._pending_deferred_cache.pop(key, None)
+            if existing_entry is entry:
+                self.cache.set(key, result, entry.callbacks)
             else:
+                # oops, the _pending_deferred_cache has been updated since
+                # we started our query, so we are out of date.
+                #
+                # Better put back whatever we took out. (We do it this way
+                # round, rather than peeking into the _pending_deferred_cache
+                # and then removing on a match, to make the common case faster)
+                if existing_entry is not None:
+                    self._pending_deferred_cache[key] = existing_entry
+
+                # we're not going to put this entry into the cache, so need
+                # to make sure that the invalidation callbacks are called.
+                # That was probably done when _pending_deferred_cache was
+                # updated, but it's possible that `set` was called without
+                # `invalidate` being previously called, in which case it may
+                # not have been. Either way, let's double-check now.
                 entry.invalidate()
             return result
 
@@ -168,25 +174,29 @@ class Cache(object):
 
     def invalidate(self, key):
         self.check_thread()
+        self.cache.pop(key, None)
 
-        # Increment the sequence number so that any SELECT statements that
-        # raced with the INSERT don't update the cache (SYN-369)
-        self.sequence += 1
+        # if we have a pending lookup for this key, remove it from the
+        # _pending_deferred_cache, which will (a) stop it being returned
+        # for future queries and (b) stop it being persisted as a proper entry
+        # in self.cache.
         entry = self._pending_deferred_cache.pop(key, None)
+
+        # run the invalidation callbacks now, rather than waiting for the
+        # deferred to resolve.
         if entry:
             entry.invalidate()
 
-        self.cache.pop(key, None)
-
     def invalidate_many(self, key):
         self.check_thread()
         if not isinstance(key, tuple):
             raise TypeError(
                 "The cache key must be a tuple not %r" % (type(key),)
             )
-        self.sequence += 1
         self.cache.del_multi(key)
 
+        # if we have a pending lookup for this key, remove it from the
+        # _pending_deferred_cache, as above
         entry_dict = self._pending_deferred_cache.pop(key, None)
         if entry_dict is not None:
             for entry in iterate_tree_cache_entry(entry_dict):
@@ -194,8 +204,10 @@ class Cache(object):
 
     def invalidate_all(self):
         self.check_thread()
-        self.sequence += 1
         self.cache.clear()
+        for entry in self._pending_deferred_cache.itervalues():
+            entry.invalidate()
+        self._pending_deferred_cache.clear()
 
 
 class _CacheDescriptorBase(object):