summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-10-23 13:13:53 +0100
committerErik Johnston <erik@matrix.org>2017-10-23 13:13:53 +0100
commitffba97807746f6f0d7a8820c0d980f188dcad08c (patch)
tree36301e98cdc6400afe71b89fc4b0c8beccd0deea /synapse/storage
parentMerge pull request #2540 from 4nd3r/patch-1 (diff)
parentBump version and changelog (diff)
downloadsynapse-ffba97807746f6f0d7a8820c0d980f188dcad08c.tar.xz
Merge branch 'release-v0.24.0' of github.com:matrix-org/synapse v0.24.0
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py18
-rw-r--r--synapse/storage/_base.py51
-rw-r--r--synapse/storage/events.py27
-rw-r--r--synapse/storage/group_server.py1199
-rw-r--r--synapse/storage/media_repository.py71
-rw-r--r--synapse/storage/prepare_database.py2
-rw-r--r--synapse/storage/profile.py98
-rw-r--r--synapse/storage/roommember.py40
-rw-r--r--synapse/storage/schema/delta/44/expire_url_cache.sql38
-rw-r--r--synapse/storage/schema/delta/45/group_server.sql167
-rw-r--r--synapse/storage/schema/delta/45/profile_cache.sql28
11 files changed, 1706 insertions, 33 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index b92472df33..594566eb38 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -37,7 +37,7 @@ from .media_repository import MediaRepositoryStore
 from .rejections import RejectionsStore
 from .event_push_actions import EventPushActionsStore
 from .deviceinbox import DeviceInboxStore
-
+from .group_server import GroupServerStore
 from .state import StateStore
 from .signatures import SignatureStore
 from .filtering import FilteringStore
@@ -88,6 +88,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 DeviceStore,
                 DeviceInboxStore,
                 UserDirectoryStore,
+                GroupServerStore,
                 ):
 
     def __init__(self, db_conn, hs):
@@ -135,6 +136,9 @@ class DataStore(RoomMemberStore, RoomStore,
             db_conn, "pushers", "id",
             extra_tables=[("deleted_pushers", "stream_id")],
         )
+        self._group_updates_id_gen = StreamIdGenerator(
+            db_conn, "local_group_updates", "stream_id",
+        )
 
         if isinstance(self.database_engine, PostgresEngine):
             self._cache_id_gen = StreamIdGenerator(
@@ -235,6 +239,18 @@ class DataStore(RoomMemberStore, RoomStore,
             prefilled_cache=curr_state_delta_prefill,
         )
 
+        _group_updates_prefill, min_group_updates_id = self._get_cache_dict(
+            db_conn, "local_group_updates",
+            entity_column="user_id",
+            stream_column="stream_id",
+            max_value=self._group_updates_id_gen.get_current_token(),
+            limit=1000,
+        )
+        self._group_updates_stream_cache = StreamChangeCache(
+            "_group_updates_stream_cache", min_group_updates_id,
+            prefilled_cache=_group_updates_prefill,
+        )
+
         cur = LoggingTransaction(
             db_conn.cursor(),
             name="_find_stream_orderings_for_times_txn",
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 6f54036d67..5124a833a5 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -743,6 +743,33 @@ class SQLBaseStore(object):
         txn.execute(sql, values)
         return cls.cursor_to_dict(txn)
 
+    def _simple_update(self, table, keyvalues, updatevalues, desc):
+        return self.runInteraction(
+            desc,
+            self._simple_update_txn,
+            table, keyvalues, updatevalues,
+        )
+
+    @staticmethod
+    def _simple_update_txn(txn, table, keyvalues, updatevalues):
+        if keyvalues:
+            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
+        else:
+            where = ""
+
+        update_sql = "UPDATE %s SET %s %s" % (
+            table,
+            ", ".join("%s = ?" % (k,) for k in updatevalues),
+            where,
+        )
+
+        txn.execute(
+            update_sql,
+            updatevalues.values() + keyvalues.values()
+        )
+
+        return txn.rowcount
+
     def _simple_update_one(self, table, keyvalues, updatevalues,
                            desc="_simple_update_one"):
         """Executes an UPDATE query on the named table, setting new values for
@@ -768,27 +795,13 @@ class SQLBaseStore(object):
             table, keyvalues, updatevalues,
         )
 
-    @staticmethod
-    def _simple_update_one_txn(txn, table, keyvalues, updatevalues):
-        if keyvalues:
-            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
-        else:
-            where = ""
-
-        update_sql = "UPDATE %s SET %s %s" % (
-            table,
-            ", ".join("%s = ?" % (k,) for k in updatevalues),
-            where,
-        )
-
-        txn.execute(
-            update_sql,
-            updatevalues.values() + keyvalues.values()
-        )
+    @classmethod
+    def _simple_update_one_txn(cls, txn, table, keyvalues, updatevalues):
+        rowcount = cls._simple_update_txn(txn, table, keyvalues, updatevalues)
 
-        if txn.rowcount == 0:
+        if rowcount == 0:
             raise StoreError(404, "No row found")
-        if txn.rowcount > 1:
+        if rowcount > 1:
             raise StoreError(500, "More than one row matched")
 
     @staticmethod
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 7002b3752e..637640ec2a 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -21,7 +21,7 @@ from synapse.events.utils import prune_event
 
 from synapse.util.async import ObservableDeferred
 from synapse.util.logcontext import (
-    preserve_fn, PreserveLoggingContext, preserve_context_over_deferred
+    preserve_fn, PreserveLoggingContext, make_deferred_yieldable
 )
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
@@ -88,13 +88,23 @@ class _EventPeristenceQueue(object):
     def add_to_queue(self, room_id, events_and_contexts, backfilled):
         """Add events to the queue, with the given persist_event options.
 
+        NB: due to the normal usage pattern of this method, it does *not*
+        follow the synapse logcontext rules, and leaves the logcontext in
+        place whether or not the returned deferred is ready.
+
         Args:
             room_id (str):
             events_and_contexts (list[(EventBase, EventContext)]):
             backfilled (bool):
+
+        Returns:
+            defer.Deferred: a deferred which will resolve once the events are
+                persisted. Runs its callbacks *without* a logcontext.
         """
         queue = self._event_persist_queues.setdefault(room_id, deque())
         if queue:
+            # if the last item in the queue has the same `backfilled` setting,
+            # we can just add these new events to that item.
             end_item = queue[-1]
             if end_item.backfilled == backfilled:
                 end_item.events_and_contexts.extend(events_and_contexts)
@@ -113,11 +123,11 @@ class _EventPeristenceQueue(object):
     def handle_queue(self, room_id, per_item_callback):
         """Attempts to handle the queue for a room if not already being handled.
 
-        The given callback will be invoked with for each item in the queue,1
+        The given callback will be invoked with for each item in the queue,
         of type _EventPersistQueueItem. The per_item_callback will continuously
         be called with new items, unless the queue becomnes empty. The return
         value of the function will be given to the deferreds waiting on the item,
-        exceptions will be passed to the deferres as well.
+        exceptions will be passed to the deferreds as well.
 
         This function should therefore be called whenever anything is added
         to the queue.
@@ -233,7 +243,7 @@ class EventsStore(SQLBaseStore):
 
         deferreds = []
         for room_id, evs_ctxs in partitioned.iteritems():
-            d = preserve_fn(self._event_persist_queue.add_to_queue)(
+            d = self._event_persist_queue.add_to_queue(
                 room_id, evs_ctxs,
                 backfilled=backfilled,
             )
@@ -242,7 +252,7 @@ class EventsStore(SQLBaseStore):
         for room_id in partitioned:
             self._maybe_start_persisting(room_id)
 
-        return preserve_context_over_deferred(
+        return make_deferred_yieldable(
             defer.gatherResults(deferreds, consumeErrors=True)
         )
 
@@ -267,7 +277,7 @@ class EventsStore(SQLBaseStore):
 
         self._maybe_start_persisting(event.room_id)
 
-        yield preserve_context_over_deferred(deferred)
+        yield make_deferred_yieldable(deferred)
 
         max_persisted_id = yield self._stream_id_gen.get_current_token()
         defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id))
@@ -784,6 +794,9 @@ class EventsStore(SQLBaseStore):
                     self._invalidate_cache_and_stream(
                         txn, self.is_host_joined, (room_id, host)
                     )
+                    self._invalidate_cache_and_stream(
+                        txn, self.was_host_joined, (room_id, host)
+                    )
 
                 self._invalidate_cache_and_stream(
                     txn, self.get_users_in_room, (room_id,)
@@ -1523,7 +1536,7 @@ class EventsStore(SQLBaseStore):
         if not allow_rejected:
             rows[:] = [r for r in rows if not r["rejects"]]
 
-        res = yield preserve_context_over_deferred(defer.gatherResults(
+        res = yield make_deferred_yieldable(defer.gatherResults(
             [
                 preserve_fn(self._get_event_from_row)(
                     row["internal_metadata"], row["json"], row["redacts"],
diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py
new file mode 100644
index 0000000000..9e63db5c6c
--- /dev/null
+++ b/synapse/storage/group_server.py
@@ -0,0 +1,1199 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.api.errors import SynapseError
+
+from ._base import SQLBaseStore
+
+import ujson as json
+
+
+# The category ID for the "default" category. We don't store as null in the
+# database to avoid the fun of null != null
+_DEFAULT_CATEGORY_ID = ""
+_DEFAULT_ROLE_ID = ""
+
+
+class GroupServerStore(SQLBaseStore):
+    def get_group(self, group_id):
+        return self._simple_select_one(
+            table="groups",
+            keyvalues={
+                "group_id": group_id,
+            },
+            retcols=("name", "short_description", "long_description", "avatar_url",),
+            allow_none=True,
+            desc="is_user_in_group",
+        )
+
+    def get_users_in_group(self, group_id, include_private=False):
+        # TODO: Pagination
+
+        keyvalues = {
+            "group_id": group_id,
+        }
+        if not include_private:
+            keyvalues["is_public"] = True
+
+        return self._simple_select_list(
+            table="group_users",
+            keyvalues=keyvalues,
+            retcols=("user_id", "is_public",),
+            desc="get_users_in_group",
+        )
+
+    def get_invited_users_in_group(self, group_id):
+        # TODO: Pagination
+
+        return self._simple_select_onecol(
+            table="group_invites",
+            keyvalues={
+                "group_id": group_id,
+            },
+            retcol="user_id",
+            desc="get_invited_users_in_group",
+        )
+
+    def get_rooms_in_group(self, group_id, include_private=False):
+        # TODO: Pagination
+
+        keyvalues = {
+            "group_id": group_id,
+        }
+        if not include_private:
+            keyvalues["is_public"] = True
+
+        return self._simple_select_list(
+            table="group_rooms",
+            keyvalues=keyvalues,
+            retcols=("room_id", "is_public",),
+            desc="get_rooms_in_group",
+        )
+
+    def get_rooms_for_summary_by_category(self, group_id, include_private=False):
+        """Get the rooms and categories that should be included in a summary request
+
+        Returns ([rooms], [categories])
+        """
+        def _get_rooms_for_summary_txn(txn):
+            keyvalues = {
+                "group_id": group_id,
+            }
+            if not include_private:
+                keyvalues["is_public"] = True
+
+            sql = """
+                SELECT room_id, is_public, category_id, room_order
+                FROM group_summary_rooms
+                WHERE group_id = ?
+            """
+
+            if not include_private:
+                sql += " AND is_public = ?"
+                txn.execute(sql, (group_id, True))
+            else:
+                txn.execute(sql, (group_id,))
+
+            rooms = [
+                {
+                    "room_id": row[0],
+                    "is_public": row[1],
+                    "category_id": row[2] if row[2] != _DEFAULT_CATEGORY_ID else None,
+                    "order": row[3],
+                }
+                for row in txn
+            ]
+
+            sql = """
+                SELECT category_id, is_public, profile, cat_order
+                FROM group_summary_room_categories
+                INNER JOIN group_room_categories USING (group_id, category_id)
+                WHERE group_id = ?
+            """
+
+            if not include_private:
+                sql += " AND is_public = ?"
+                txn.execute(sql, (group_id, True))
+            else:
+                txn.execute(sql, (group_id,))
+
+            categories = {
+                row[0]: {
+                    "is_public": row[1],
+                    "profile": json.loads(row[2]),
+                    "order": row[3],
+                }
+                for row in txn
+            }
+
+            return rooms, categories
+        return self.runInteraction(
+            "get_rooms_for_summary", _get_rooms_for_summary_txn
+        )
+
+    def add_room_to_summary(self, group_id, room_id, category_id, order, is_public):
+        return self.runInteraction(
+            "add_room_to_summary", self._add_room_to_summary_txn,
+            group_id, room_id, category_id, order, is_public,
+        )
+
+    def _add_room_to_summary_txn(self, txn, group_id, room_id, category_id, order,
+                                 is_public):
+        """Add (or update) room's entry in summary.
+
+        Args:
+            group_id (str)
+            room_id (str)
+            category_id (str): If not None then adds the category to the end of
+                the summary if its not already there. [Optional]
+            order (int): If not None inserts the room at that position, e.g.
+                an order of 1 will put the room first. Otherwise, the room gets
+                added to the end.
+        """
+        room_in_group = self._simple_select_one_onecol_txn(
+            txn,
+            table="group_rooms",
+            keyvalues={
+                "group_id": group_id,
+                "room_id": room_id,
+            },
+            retcol="room_id",
+            allow_none=True,
+        )
+        if not room_in_group:
+            raise SynapseError(400, "room not in group")
+
+        if category_id is None:
+            category_id = _DEFAULT_CATEGORY_ID
+        else:
+            cat_exists = self._simple_select_one_onecol_txn(
+                txn,
+                table="group_room_categories",
+                keyvalues={
+                    "group_id": group_id,
+                    "category_id": category_id,
+                },
+                retcol="group_id",
+                allow_none=True,
+            )
+            if not cat_exists:
+                raise SynapseError(400, "Category doesn't exist")
+
+            # TODO: Check category is part of summary already
+            cat_exists = self._simple_select_one_onecol_txn(
+                txn,
+                table="group_summary_room_categories",
+                keyvalues={
+                    "group_id": group_id,
+                    "category_id": category_id,
+                },
+                retcol="group_id",
+                allow_none=True,
+            )
+            if not cat_exists:
+                # If not, add it with an order larger than all others
+                txn.execute("""
+                    INSERT INTO group_summary_room_categories
+                    (group_id, category_id, cat_order)
+                    SELECT ?, ?, COALESCE(MAX(cat_order), 0) + 1
+                    FROM group_summary_room_categories
+                    WHERE group_id = ? AND category_id = ?
+                """, (group_id, category_id, group_id, category_id))
+
+        existing = self._simple_select_one_txn(
+            txn,
+            table="group_summary_rooms",
+            keyvalues={
+                "group_id": group_id,
+                "room_id": room_id,
+                "category_id": category_id,
+            },
+            retcols=("room_order", "is_public",),
+            allow_none=True,
+        )
+
+        if order is not None:
+            # Shuffle other room orders that come after the given order
+            sql = """
+                UPDATE group_summary_rooms SET room_order = room_order + 1
+                WHERE group_id = ? AND category_id = ? AND room_order >= ?
+            """
+            txn.execute(sql, (group_id, category_id, order,))
+        elif not existing:
+            sql = """
+                SELECT COALESCE(MAX(room_order), 0) + 1 FROM group_summary_rooms
+                WHERE group_id = ? AND category_id = ?
+            """
+            txn.execute(sql, (group_id, category_id,))
+            order, = txn.fetchone()
+
+        if existing:
+            to_update = {}
+            if order is not None:
+                to_update["room_order"] = order
+            if is_public is not None:
+                to_update["is_public"] = is_public
+            self._simple_update_txn(
+                txn,
+                table="group_summary_rooms",
+                keyvalues={
+                    "group_id": group_id,
+                    "category_id": category_id,
+                    "room_id": room_id,
+                },
+                values=to_update,
+            )
+        else:
+            if is_public is None:
+                is_public = True
+
+            self._simple_insert_txn(
+                txn,
+                table="group_summary_rooms",
+                values={
+                    "group_id": group_id,
+                    "category_id": category_id,
+                    "room_id": room_id,
+                    "room_order": order,
+                    "is_public": is_public,
+                },
+            )
+
+    def remove_room_from_summary(self, group_id, room_id, category_id):
+        if category_id is None:
+            category_id = _DEFAULT_CATEGORY_ID
+
+        return self._simple_delete(
+            table="group_summary_rooms",
+            keyvalues={
+                "group_id": group_id,
+                "category_id": category_id,
+                "room_id": room_id,
+            },
+            desc="remove_room_from_summary",
+        )
+
+    @defer.inlineCallbacks
+    def get_group_categories(self, group_id):
+        rows = yield self._simple_select_list(
+            table="group_room_categories",
+            keyvalues={
+                "group_id": group_id,
+            },
+            retcols=("category_id", "is_public", "profile"),
+            desc="get_group_categories",
+        )
+
+        defer.returnValue({
+            row["category_id"]: {
+                "is_public": row["is_public"],
+                "profile": json.loads(row["profile"]),
+            }
+            for row in rows
+        })
+
+    @defer.inlineCallbacks
+    def get_group_category(self, group_id, category_id):
+        category = yield self._simple_select_one(
+            table="group_room_categories",
+            keyvalues={
+                "group_id": group_id,
+                "category_id": category_id,
+            },
+            retcols=("is_public", "profile"),
+            desc="get_group_category",
+        )
+
+        category["profile"] = json.loads(category["profile"])
+
+        defer.returnValue(category)
+
+    def upsert_group_category(self, group_id, category_id, profile, is_public):
+        """Add/update room category for group
+        """
+        insertion_values = {}
+        update_values = {"category_id": category_id}  # This cannot be empty
+
+        if profile is None:
+            insertion_values["profile"] = "{}"
+        else:
+            update_values["profile"] = json.dumps(profile)
+
+        if is_public is None:
+            insertion_values["is_public"] = True
+        else:
+            update_values["is_public"] = is_public
+
+        return self._simple_upsert(
+            table="group_room_categories",
+            keyvalues={
+                "group_id": group_id,
+                "category_id": category_id,
+            },
+            values=update_values,
+            insertion_values=insertion_values,
+            desc="upsert_group_category",
+        )
+
+    def remove_group_category(self, group_id, category_id):
+        return self._simple_delete(
+            table="group_room_categories",
+            keyvalues={
+                "group_id": group_id,
+                "category_id": category_id,
+            },
+            desc="remove_group_category",
+        )
+
+    @defer.inlineCallbacks
+    def get_group_roles(self, group_id):
+        rows = yield self._simple_select_list(
+            table="group_roles",
+            keyvalues={
+                "group_id": group_id,
+            },
+            retcols=("role_id", "is_public", "profile"),
+            desc="get_group_roles",
+        )
+
+        defer.returnValue({
+            row["role_id"]: {
+                "is_public": row["is_public"],
+                "profile": json.loads(row["profile"]),
+            }
+            for row in rows
+        })
+
+    @defer.inlineCallbacks
+    def get_group_role(self, group_id, role_id):
+        role = yield self._simple_select_one(
+            table="group_roles",
+            keyvalues={
+                "group_id": group_id,
+                "role_id": role_id,
+            },
+            retcols=("is_public", "profile"),
+            desc="get_group_role",
+        )
+
+        role["profile"] = json.loads(role["profile"])
+
+        defer.returnValue(role)
+
+    def upsert_group_role(self, group_id, role_id, profile, is_public):
+        """Add/remove user role
+        """
+        insertion_values = {}
+        update_values = {"role_id": role_id}  # This cannot be empty
+
+        if profile is None:
+            insertion_values["profile"] = "{}"
+        else:
+            update_values["profile"] = json.dumps(profile)
+
+        if is_public is None:
+            insertion_values["is_public"] = True
+        else:
+            update_values["is_public"] = is_public
+
+        return self._simple_upsert(
+            table="group_roles",
+            keyvalues={
+                "group_id": group_id,
+                "role_id": role_id,
+            },
+            values=update_values,
+            insertion_values=insertion_values,
+            desc="upsert_group_role",
+        )
+
+    def remove_group_role(self, group_id, role_id):
+        return self._simple_delete(
+            table="group_roles",
+            keyvalues={
+                "group_id": group_id,
+                "role_id": role_id,
+            },
+            desc="remove_group_role",
+        )
+
+    def add_user_to_summary(self, group_id, user_id, role_id, order, is_public):
+        return self.runInteraction(
+            "add_user_to_summary", self._add_user_to_summary_txn,
+            group_id, user_id, role_id, order, is_public,
+        )
+
+    def _add_user_to_summary_txn(self, txn, group_id, user_id, role_id, order,
+                                 is_public):
+        """Add (or update) user's entry in summary.
+
+        Args:
+            group_id (str)
+            user_id (str)
+            role_id (str): If not None then adds the role to the end of
+                the summary if its not already there. [Optional]
+            order (int): If not None inserts the user at that position, e.g.
+                an order of 1 will put the user first. Otherwise, the user gets
+                added to the end.
+        """
+        user_in_group = self._simple_select_one_onecol_txn(
+            txn,
+            table="group_users",
+            keyvalues={
+                "group_id": group_id,
+                "user_id": user_id,
+            },
+            retcol="user_id",
+            allow_none=True,
+        )
+        if not user_in_group:
+            raise SynapseError(400, "user not in group")
+
+        if role_id is None:
+            role_id = _DEFAULT_ROLE_ID
+        else:
+            role_exists = self._simple_select_one_onecol_txn(
+                txn,
+                table="group_roles",
+                keyvalues={
+                    "group_id": group_id,
+                    "role_id": role_id,
+                },
+                retcol="group_id",
+                allow_none=True,
+            )
+            if not role_exists:
+                raise SynapseError(400, "Role doesn't exist")
+
+            # TODO: Check role is part of the summary already
+            role_exists = self._simple_select_one_onecol_txn(
+                txn,
+                table="group_summary_roles",
+                keyvalues={
+                    "group_id": group_id,
+                    "role_id": role_id,
+                },
+                retcol="group_id",
+                allow_none=True,
+            )
+            if not role_exists:
+                # If not, add it with an order larger than all others
+                txn.execute("""
+                    INSERT INTO group_summary_roles
+                    (group_id, role_id, role_order)
+                    SELECT ?, ?, COALESCE(MAX(role_order), 0) + 1
+                    FROM group_summary_roles
+                    WHERE group_id = ? AND role_id = ?
+                """, (group_id, role_id, group_id, role_id))
+
+        existing = self._simple_select_one_txn(
+            txn,
+            table="group_summary_users",
+            keyvalues={
+                "group_id": group_id,
+                "user_id": user_id,
+                "role_id": role_id,
+            },
+            retcols=("user_order", "is_public",),
+            allow_none=True,
+        )
+
+        if order is not None:
+            # Shuffle other users orders that come after the given order
+            sql = """
+                UPDATE group_summary_users SET user_order = user_order + 1
+                WHERE group_id = ? AND role_id = ? AND user_order >= ?
+            """
+            txn.execute(sql, (group_id, role_id, order,))
+        elif not existing:
+            sql = """
+                SELECT COALESCE(MAX(user_order), 0) + 1 FROM group_summary_users
+                WHERE group_id = ? AND role_id = ?
+            """
+            txn.execute(sql, (group_id, role_id,))
+            order, = txn.fetchone()
+
+        if existing:
+            to_update = {}
+            if order is not None:
+                to_update["user_order"] = order
+            if is_public is not None:
+                to_update["is_public"] = is_public
+            self._simple_update_txn(
+                txn,
+                table="group_summary_users",
+                keyvalues={
+                    "group_id": group_id,
+                    "role_id": role_id,
+                    "user_id": user_id,
+                },
+                values=to_update,
+            )
+        else:
+            if is_public is None:
+                is_public = True
+
+            self._simple_insert_txn(
+                txn,
+                table="group_summary_users",
+                values={
+                    "group_id": group_id,
+                    "role_id": role_id,
+                    "user_id": user_id,
+                    "user_order": order,
+                    "is_public": is_public,
+                },
+            )
+
+    def remove_user_from_summary(self, group_id, user_id, role_id):
+        if role_id is None:
+            role_id = _DEFAULT_ROLE_ID
+
+        return self._simple_delete(
+            table="group_summary_users",
+            keyvalues={
+                "group_id": group_id,
+                "role_id": role_id,
+                "user_id": user_id,
+            },
+            desc="remove_user_from_summary",
+        )
+
+    def get_users_for_summary_by_role(self, group_id, include_private=False):
+        """Get the users and roles that should be included in a summary request
+
+        Returns ([users], [roles])
+        """
+        def _get_users_for_summary_txn(txn):
+            keyvalues = {
+                "group_id": group_id,
+            }
+            if not include_private:
+                keyvalues["is_public"] = True
+
+            sql = """
+                SELECT user_id, is_public, role_id, user_order
+                FROM group_summary_users
+                WHERE group_id = ?
+            """
+
+            if not include_private:
+                sql += " AND is_public = ?"
+                txn.execute(sql, (group_id, True))
+            else:
+                txn.execute(sql, (group_id,))
+
+            users = [
+                {
+                    "user_id": row[0],
+                    "is_public": row[1],
+                    "role_id": row[2] if row[2] != _DEFAULT_ROLE_ID else None,
+                    "order": row[3],
+                }
+                for row in txn
+            ]
+
+            sql = """
+                SELECT role_id, is_public, profile, role_order
+                FROM group_summary_roles
+                INNER JOIN group_roles USING (group_id, role_id)
+                WHERE group_id = ?
+            """
+
+            if not include_private:
+                sql += " AND is_public = ?"
+                txn.execute(sql, (group_id, True))
+            else:
+                txn.execute(sql, (group_id,))
+
+            roles = {
+                row[0]: {
+                    "is_public": row[1],
+                    "profile": json.loads(row[2]),
+                    "order": row[3],
+                }
+                for row in txn
+            }
+
+            return users, roles
+        return self.runInteraction(
+            "get_users_for_summary_by_role", _get_users_for_summary_txn
+        )
+
+    def is_user_in_group(self, user_id, group_id):
+        return self._simple_select_one_onecol(
+            table="group_users",
+            keyvalues={
+                "group_id": group_id,
+                "user_id": user_id,
+            },
+            retcol="user_id",
+            allow_none=True,
+            desc="is_user_in_group",
+        ).addCallback(lambda r: bool(r))
+
+    def is_user_admin_in_group(self, group_id, user_id):
+        return self._simple_select_one_onecol(
+            table="group_users",
+            keyvalues={
+                "group_id": group_id,
+                "user_id": user_id,
+            },
+            retcol="is_admin",
+            allow_none=True,
+            desc="is_user_admin_in_group",
+        )
+
+    def add_group_invite(self, group_id, user_id):
+        """Record that the group server has invited a user
+        """
+        return self._simple_insert(
+            table="group_invites",
+            values={
+                "group_id": group_id,
+                "user_id": user_id,
+            },
+            desc="add_group_invite",
+        )
+
+    def is_user_invited_to_local_group(self, group_id, user_id):
+        """Has the group server invited a user?
+        """
+        return self._simple_select_one_onecol(
+            table="group_invites",
+            keyvalues={
+                "group_id": group_id,
+                "user_id": user_id,
+            },
+            retcol="user_id",
+            desc="is_user_invited_to_local_group",
+            allow_none=True,
+        )
+
+    def get_users_membership_info_in_group(self, group_id, user_id):
+        """Get a dict describing the membership of a user in a group.
+
+        Example if joined:
+
+            {
+                "membership": "join",
+                "is_public": True,
+                "is_privileged": False,
+            }
+
+        Returns an empty dict if the user is not join/invite/etc
+        """
+        def _get_users_membership_in_group_txn(txn):
+            row = self._simple_select_one_txn(
+                txn,
+                table="group_users",
+                keyvalues={
+                    "group_id": group_id,
+                    "user_id": user_id,
+                },
+                retcols=("is_admin", "is_public"),
+                allow_none=True,
+            )
+
+            if row:
+                return {
+                    "membership": "join",
+                    "is_public": row["is_public"],
+                    "is_privileged": row["is_admin"],
+                }
+
+            row = self._simple_select_one_onecol_txn(
+                txn,
+                table="group_invites",
+                keyvalues={
+                    "group_id": group_id,
+                    "user_id": user_id,
+                },
+                retcol="user_id",
+                allow_none=True,
+            )
+
+            if row:
+                return {
+                    "membership": "invite",
+                }
+
+            return {}
+
+        return self.runInteraction(
+            "get_users_membership_info_in_group", _get_users_membership_in_group_txn,
+        )
+
+    def add_user_to_group(self, group_id, user_id, is_admin=False, is_public=True,
+                          local_attestation=None, remote_attestation=None):
+        """Add a user to the group server.
+
+        Args:
+            group_id (str)
+            user_id (str)
+            is_admin (bool)
+            is_public (bool)
+            local_attestation (dict): The attestation the GS created to give
+                to the remote server. Optional if the user and group are on the
+                same server
+            remote_attestation (dict): The attestation given to GS by remote
+                server. Optional if the user and group are on the same server
+        """
+        def _add_user_to_group_txn(txn):
+            self._simple_insert_txn(
+                txn,
+                table="group_users",
+                values={
+                    "group_id": group_id,
+                    "user_id": user_id,
+                    "is_admin": is_admin,
+                    "is_public": is_public,
+                },
+            )
+
+            self._simple_delete_txn(
+                txn,
+                table="group_invites",
+                keyvalues={
+                    "group_id": group_id,
+                    "user_id": user_id,
+                },
+            )
+
+            if local_attestation:
+                self._simple_insert_txn(
+                    txn,
+                    table="group_attestations_renewals",
+                    values={
+                        "group_id": group_id,
+                        "user_id": user_id,
+                        "valid_until_ms": local_attestation["valid_until_ms"],
+                    },
+                )
+            if remote_attestation:
+                self._simple_insert_txn(
+                    txn,
+                    table="group_attestations_remote",
+                    values={
+                        "group_id": group_id,
+                        "user_id": user_id,
+                        "valid_until_ms": remote_attestation["valid_until_ms"],
+                        "attestation_json": json.dumps(remote_attestation),
+                    },
+                )
+
+        return self.runInteraction(
+            "add_user_to_group", _add_user_to_group_txn
+        )
+
+    def remove_user_from_group(self, group_id, user_id):
+        def _remove_user_from_group_txn(txn):
+            self._simple_delete_txn(
+                txn,
+                table="group_users",
+                keyvalues={
+                    "group_id": group_id,
+                    "user_id": user_id,
+                },
+            )
+            self._simple_delete_txn(
+                txn,
+                table="group_invites",
+                keyvalues={
+                    "group_id": group_id,
+                    "user_id": user_id,
+                },
+            )
+            self._simple_delete_txn(
+                txn,
+                table="group_attestations_renewals",
+                keyvalues={
+                    "group_id": group_id,
+                    "user_id": user_id,
+                },
+            )
+            self._simple_delete_txn(
+                txn,
+                table="group_attestations_remote",
+                keyvalues={
+                    "group_id": group_id,
+                    "user_id": user_id,
+                },
+            )
+            self._simple_delete_txn(
+                txn,
+                table="group_summary_users",
+                keyvalues={
+                    "group_id": group_id,
+                    "user_id": user_id,
+                },
+            )
+        return self.runInteraction("remove_user_from_group", _remove_user_from_group_txn)
+
+    def add_room_to_group(self, group_id, room_id, is_public):
+        return self._simple_insert(
+            table="group_rooms",
+            values={
+                "group_id": group_id,
+                "room_id": room_id,
+                "is_public": is_public,
+            },
+            desc="add_room_to_group",
+        )
+
+    def remove_room_from_group(self, group_id, room_id):
+        def _remove_room_from_group_txn(txn):
+            self._simple_delete_txn(
+                txn,
+                table="group_rooms",
+                keyvalues={
+                    "group_id": group_id,
+                    "room_id": room_id,
+                },
+            )
+
+            self._simple_delete_txn(
+                txn,
+                table="group_summary_rooms",
+                keyvalues={
+                    "group_id": group_id,
+                    "room_id": room_id,
+                },
+            )
+        return self.runInteraction(
+            "remove_room_from_group", _remove_room_from_group_txn,
+        )
+
+    def get_publicised_groups_for_user(self, user_id):
+        """Get all groups a user is publicising
+        """
+        return self._simple_select_onecol(
+            table="local_group_membership",
+            keyvalues={
+                "user_id": user_id,
+                "membership": "join",
+                "is_publicised": True,
+            },
+            retcol="group_id",
+            desc="get_publicised_groups_for_user",
+        )
+
+    def update_group_publicity(self, group_id, user_id, publicise):
+        """Update whether the user is publicising their membership of the group
+        """
+        return self._simple_update_one(
+            table="local_group_membership",
+            keyvalues={
+                "group_id": group_id,
+                "user_id": user_id,
+            },
+            updatevalues={
+                "is_publicised": publicise,
+            },
+            desc="update_group_publicity"
+        )
+
+    @defer.inlineCallbacks
+    def register_user_group_membership(self, group_id, user_id, membership,
+                                       is_admin=False, content={},
+                                       local_attestation=None,
+                                       remote_attestation=None,
+                                       is_publicised=False,
+                                       ):
+        """Registers that a local user is a member of a (local or remote) group.
+
+        Args:
+            group_id (str)
+            user_id (str)
+            membership (str)
+            is_admin (bool)
+            content (dict): Content of the membership, e.g. includes the inviter
+                if the user has been invited.
+            local_attestation (dict): If remote group then store the fact that we
+                have given out an attestation, else None.
+            remote_attestation (dict): If remote group then store the remote
+                attestation from the group, else None.
+        """
+        def _register_user_group_membership_txn(txn, next_id):
+            # TODO: Upsert?
+            self._simple_delete_txn(
+                txn,
+                table="local_group_membership",
+                keyvalues={
+                    "group_id": group_id,
+                    "user_id": user_id,
+                },
+            )
+            self._simple_insert_txn(
+                txn,
+                table="local_group_membership",
+                values={
+                    "group_id": group_id,
+                    "user_id": user_id,
+                    "is_admin": is_admin,
+                    "membership": membership,
+                    "is_publicised": is_publicised,
+                    "content": json.dumps(content),
+                },
+            )
+
+            self._simple_insert_txn(
+                txn,
+                table="local_group_updates",
+                values={
+                    "stream_id": next_id,
+                    "group_id": group_id,
+                    "user_id": user_id,
+                    "type": "membership",
+                    "content": json.dumps({"membership": membership, "content": content}),
+                }
+            )
+            self._group_updates_stream_cache.entity_has_changed(user_id, next_id)
+
+            # TODO: Insert profile to ensure it comes down stream if its a join.
+
+            if membership == "join":
+                if local_attestation:
+                    self._simple_insert_txn(
+                        txn,
+                        table="group_attestations_renewals",
+                        values={
+                            "group_id": group_id,
+                            "user_id": user_id,
+                            "valid_until_ms": local_attestation["valid_until_ms"],
+                        }
+                    )
+                if remote_attestation:
+                    self._simple_insert_txn(
+                        txn,
+                        table="group_attestations_remote",
+                        values={
+                            "group_id": group_id,
+                            "user_id": user_id,
+                            "valid_until_ms": remote_attestation["valid_until_ms"],
+                            "attestation_json": json.dumps(remote_attestation),
+                        }
+                    )
+            else:
+                self._simple_delete_txn(
+                    txn,
+                    table="group_attestations_renewals",
+                    keyvalues={
+                        "group_id": group_id,
+                        "user_id": user_id,
+                    },
+                )
+                self._simple_delete_txn(
+                    txn,
+                    table="group_attestations_remote",
+                    keyvalues={
+                        "group_id": group_id,
+                        "user_id": user_id,
+                    },
+                )
+
+            return next_id
+
+        with self._group_updates_id_gen.get_next() as next_id:
+            res = yield self.runInteraction(
+                "register_user_group_membership",
+                _register_user_group_membership_txn, next_id,
+            )
+        defer.returnValue(res)
+
+    @defer.inlineCallbacks
+    def create_group(self, group_id, user_id, name, avatar_url, short_description,
+                     long_description,):
+        yield self._simple_insert(
+            table="groups",
+            values={
+                "group_id": group_id,
+                "name": name,
+                "avatar_url": avatar_url,
+                "short_description": short_description,
+                "long_description": long_description,
+            },
+            desc="create_group",
+        )
+
+    @defer.inlineCallbacks
+    def update_group_profile(self, group_id, profile,):
+        yield self._simple_update_one(
+            table="groups",
+            keyvalues={
+                "group_id": group_id,
+            },
+            updatevalues=profile,
+            desc="update_group_profile",
+        )
+
+    def get_attestations_need_renewals(self, valid_until_ms):
+        """Get all attestations that need to be renewed until givent time
+        """
+        def _get_attestations_need_renewals_txn(txn):
+            sql = """
+                SELECT group_id, user_id FROM group_attestations_renewals
+                WHERE valid_until_ms <= ?
+            """
+            txn.execute(sql, (valid_until_ms,))
+            return self.cursor_to_dict(txn)
+        return self.runInteraction(
+            "get_attestations_need_renewals", _get_attestations_need_renewals_txn
+        )
+
+    def update_attestation_renewal(self, group_id, user_id, attestation):
+        """Update an attestation that we have renewed
+        """
+        return self._simple_update_one(
+            table="group_attestations_renewals",
+            keyvalues={
+                "group_id": group_id,
+                "user_id": user_id,
+            },
+            updatevalues={
+                "valid_until_ms": attestation["valid_until_ms"],
+            },
+            desc="update_attestation_renewal",
+        )
+
+    def update_remote_attestion(self, group_id, user_id, attestation):
+        """Update an attestation that a remote has renewed
+        """
+        return self._simple_update_one(
+            table="group_attestations_remote",
+            keyvalues={
+                "group_id": group_id,
+                "user_id": user_id,
+            },
+            updatevalues={
+                "valid_until_ms": attestation["valid_until_ms"],
+                "attestation_json": json.dumps(attestation)
+            },
+            desc="update_remote_attestion",
+        )
+
+    @defer.inlineCallbacks
+    def get_remote_attestation(self, group_id, user_id):
+        """Get the attestation that proves the remote agrees that the user is
+        in the group.
+        """
+        row = yield self._simple_select_one(
+            table="group_attestations_remote",
+            keyvalues={
+                "group_id": group_id,
+                "user_id": user_id,
+            },
+            retcols=("valid_until_ms", "attestation_json"),
+            desc="get_remote_attestation",
+            allow_none=True,
+        )
+
+        now = int(self._clock.time_msec())
+        if row and now < row["valid_until_ms"]:
+            defer.returnValue(json.loads(row["attestation_json"]))
+
+        defer.returnValue(None)
+
+    def get_joined_groups(self, user_id):
+        return self._simple_select_onecol(
+            table="local_group_membership",
+            keyvalues={
+                "user_id": user_id,
+                "membership": "join",
+            },
+            retcol="group_id",
+            desc="get_joined_groups",
+        )
+
+    def get_all_groups_for_user(self, user_id, now_token):
+        def _get_all_groups_for_user_txn(txn):
+            sql = """
+                SELECT group_id, type, membership, u.content
+                FROM local_group_updates AS u
+                INNER JOIN local_group_membership USING (group_id, user_id)
+                WHERE user_id = ? AND membership != 'leave'
+                    AND stream_id <= ?
+            """
+            txn.execute(sql, (user_id, now_token,))
+            return [
+                {
+                    "group_id": row[0],
+                    "type": row[1],
+                    "membership": row[2],
+                    "content": json.loads(row[3]),
+                }
+                for row in txn
+            ]
+        return self.runInteraction(
+            "get_all_groups_for_user", _get_all_groups_for_user_txn,
+        )
+
+    def get_groups_changes_for_user(self, user_id, from_token, to_token):
+        from_token = int(from_token)
+        has_changed = self._group_updates_stream_cache.has_entity_changed(
+            user_id, from_token,
+        )
+        if not has_changed:
+            return []
+
+        def _get_groups_changes_for_user_txn(txn):
+            sql = """
+                SELECT group_id, membership, type, u.content
+                FROM local_group_updates AS u
+                INNER JOIN local_group_membership USING (group_id, user_id)
+                WHERE user_id = ? AND ? < stream_id AND stream_id <= ?
+            """
+            txn.execute(sql, (user_id, from_token, to_token,))
+            return [{
+                "group_id": group_id,
+                "membership": membership,
+                "type": gtype,
+                "content": json.loads(content_json),
+            } for group_id, membership, gtype, content_json in txn]
+        return self.runInteraction(
+            "get_groups_changes_for_user", _get_groups_changes_for_user_txn,
+        )
+
+    def get_all_groups_changes(self, from_token, to_token, limit):
+        from_token = int(from_token)
+        has_changed = self._group_updates_stream_cache.has_any_entity_changed(
+            from_token,
+        )
+        if not has_changed:
+            return []
+
+        def _get_all_groups_changes_txn(txn):
+            sql = """
+                SELECT stream_id, group_id, user_id, type, content
+                FROM local_group_updates
+                WHERE ? < stream_id AND stream_id <= ?
+                LIMIT ?
+            """
+            txn.execute(sql, (from_token, to_token, limit,))
+            return [(
+                stream_id,
+                group_id,
+                user_id,
+                gtype,
+                json.loads(content_json),
+            ) for stream_id, group_id, user_id, gtype, content_json in txn]
+        return self.runInteraction(
+            "get_all_groups_changes", _get_all_groups_changes_txn,
+        )
+
+    def get_group_stream_token(self):
+        return self._group_updates_id_gen.get_current_token()
diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py
index 82bb61b811..7110a71279 100644
--- a/synapse/storage/media_repository.py
+++ b/synapse/storage/media_repository.py
@@ -62,7 +62,7 @@ class MediaRepositoryStore(SQLBaseStore):
         def get_url_cache_txn(txn):
             # get the most recently cached result (relative to the given ts)
             sql = (
-                "SELECT response_code, etag, expires, og, media_id, download_ts"
+                "SELECT response_code, etag, expires_ts, og, media_id, download_ts"
                 " FROM local_media_repository_url_cache"
                 " WHERE url = ? AND download_ts <= ?"
                 " ORDER BY download_ts DESC LIMIT 1"
@@ -74,7 +74,7 @@ class MediaRepositoryStore(SQLBaseStore):
                 # ...or if we've requested a timestamp older than the oldest
                 # copy in the cache, return the oldest copy (if any)
                 sql = (
-                    "SELECT response_code, etag, expires, og, media_id, download_ts"
+                    "SELECT response_code, etag, expires_ts, og, media_id, download_ts"
                     " FROM local_media_repository_url_cache"
                     " WHERE url = ? AND download_ts > ?"
                     " ORDER BY download_ts ASC LIMIT 1"
@@ -86,14 +86,14 @@ class MediaRepositoryStore(SQLBaseStore):
                 return None
 
             return dict(zip((
-                'response_code', 'etag', 'expires', 'og', 'media_id', 'download_ts'
+                'response_code', 'etag', 'expires_ts', 'og', 'media_id', 'download_ts'
             ), row))
 
         return self.runInteraction(
             "get_url_cache", get_url_cache_txn
         )
 
-    def store_url_cache(self, url, response_code, etag, expires, og, media_id,
+    def store_url_cache(self, url, response_code, etag, expires_ts, og, media_id,
                         download_ts):
         return self._simple_insert(
             "local_media_repository_url_cache",
@@ -101,7 +101,7 @@ class MediaRepositoryStore(SQLBaseStore):
                 "url": url,
                 "response_code": response_code,
                 "etag": etag,
-                "expires": expires,
+                "expires_ts": expires_ts,
                 "og": og,
                 "media_id": media_id,
                 "download_ts": download_ts,
@@ -238,3 +238,64 @@ class MediaRepositoryStore(SQLBaseStore):
                 },
             )
         return self.runInteraction("delete_remote_media", delete_remote_media_txn)
+
+    def get_expired_url_cache(self, now_ts):
+        sql = (
+            "SELECT media_id FROM local_media_repository_url_cache"
+            " WHERE expires_ts < ?"
+            " ORDER BY expires_ts ASC"
+            " LIMIT 500"
+        )
+
+        def _get_expired_url_cache_txn(txn):
+            txn.execute(sql, (now_ts,))
+            return [row[0] for row in txn]
+
+        return self.runInteraction("get_expired_url_cache", _get_expired_url_cache_txn)
+
+    def delete_url_cache(self, media_ids):
+        sql = (
+            "DELETE FROM local_media_repository_url_cache"
+            " WHERE media_id = ?"
+        )
+
+        def _delete_url_cache_txn(txn):
+            txn.executemany(sql, [(media_id,) for media_id in media_ids])
+
+        return self.runInteraction("delete_url_cache", _delete_url_cache_txn)
+
+    def get_url_cache_media_before(self, before_ts):
+        sql = (
+            "SELECT media_id FROM local_media_repository"
+            " WHERE created_ts < ? AND url_cache IS NOT NULL"
+            " ORDER BY created_ts ASC"
+            " LIMIT 500"
+        )
+
+        def _get_url_cache_media_before_txn(txn):
+            txn.execute(sql, (before_ts,))
+            return [row[0] for row in txn]
+
+        return self.runInteraction(
+            "get_url_cache_media_before", _get_url_cache_media_before_txn,
+        )
+
+    def delete_url_cache_media(self, media_ids):
+        def _delete_url_cache_media_txn(txn):
+            sql = (
+                "DELETE FROM local_media_repository"
+                " WHERE media_id = ?"
+            )
+
+            txn.executemany(sql, [(media_id,) for media_id in media_ids])
+
+            sql = (
+                "DELETE FROM local_media_repository_thumbnails"
+                " WHERE media_id = ?"
+            )
+
+            txn.executemany(sql, [(media_id,) for media_id in media_ids])
+
+        return self.runInteraction(
+            "delete_url_cache_media", _delete_url_cache_media_txn,
+        )
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 72b670b83b..ccaaabcfa0 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 43
+SCHEMA_VERSION = 45
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py
index 26a40905ae..beea3102fc 100644
--- a/synapse/storage/profile.py
+++ b/synapse/storage/profile.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from twisted.internet import defer
+
 from ._base import SQLBaseStore
 
 
@@ -55,3 +57,99 @@ class ProfileStore(SQLBaseStore):
             updatevalues={"avatar_url": new_avatar_url},
             desc="set_profile_avatar_url",
         )
+
+    def get_from_remote_profile_cache(self, user_id):
+        return self._simple_select_one(
+            table="remote_profile_cache",
+            keyvalues={"user_id": user_id},
+            retcols=("displayname", "avatar_url",),
+            allow_none=True,
+            desc="get_from_remote_profile_cache",
+        )
+
+    def add_remote_profile_cache(self, user_id, displayname, avatar_url):
+        """Ensure we are caching the remote user's profiles.
+
+        This should only be called when `is_subscribed_remote_profile_for_user`
+        would return true for the user.
+        """
+        return self._simple_upsert(
+            table="remote_profile_cache",
+            keyvalues={"user_id": user_id},
+            values={
+                "displayname": displayname,
+                "avatar_url": avatar_url,
+                "last_check": self._clock.time_msec(),
+            },
+            desc="add_remote_profile_cache",
+        )
+
+    def update_remote_profile_cache(self, user_id, displayname, avatar_url):
+        return self._simple_update(
+            table="remote_profile_cache",
+            keyvalues={"user_id": user_id},
+            values={
+                "displayname": displayname,
+                "avatar_url": avatar_url,
+                "last_check": self._clock.time_msec(),
+            },
+            desc="update_remote_profile_cache",
+        )
+
+    @defer.inlineCallbacks
+    def maybe_delete_remote_profile_cache(self, user_id):
+        """Check if we still care about the remote user's profile, and if we
+        don't then remove their profile from the cache
+        """
+        subscribed = yield self.is_subscribed_remote_profile_for_user(user_id)
+        if not subscribed:
+            yield self._simple_delete(
+                table="remote_profile_cache",
+                keyvalues={"user_id": user_id},
+                desc="delete_remote_profile_cache",
+            )
+
+    def get_remote_profile_cache_entries_that_expire(self, last_checked):
+        """Get all users who haven't been checked since `last_checked`
+        """
+        def _get_remote_profile_cache_entries_that_expire_txn(txn):
+            sql = """
+                SELECT user_id, displayname, avatar_url
+                FROM remote_profile_cache
+                WHERE last_check < ?
+            """
+
+            txn.execute(sql, (last_checked,))
+
+            return self.cursor_to_dict(txn)
+
+        return self.runInteraction(
+            "get_remote_profile_cache_entries_that_expire",
+            _get_remote_profile_cache_entries_that_expire_txn,
+        )
+
+    @defer.inlineCallbacks
+    def is_subscribed_remote_profile_for_user(self, user_id):
+        """Check whether we are interested in a remote user's profile.
+        """
+        res = yield self._simple_select_one_onecol(
+            table="group_users",
+            keyvalues={"user_id": user_id},
+            retcol="user_id",
+            allow_none=True,
+            desc="should_update_remote_profile_cache_for_user",
+        )
+
+        if res:
+            defer.returnValue(True)
+
+        res = yield self._simple_select_one_onecol(
+            table="group_invites",
+            keyvalues={"user_id": user_id},
+            retcol="user_id",
+            allow_none=True,
+            desc="should_update_remote_profile_cache_for_user",
+        )
+
+        if res:
+            defer.returnValue(True)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 457ca288d0..a0fc9a6867 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -533,6 +533,46 @@ class RoomMemberStore(SQLBaseStore):
 
         defer.returnValue(True)
 
+    @cachedInlineCallbacks()
+    def was_host_joined(self, room_id, host):
+        """Check whether the server is or ever was in the room.
+
+        Args:
+            room_id (str)
+            host (str)
+
+        Returns:
+            Deferred: Resolves to True if the host is/was in the room, otherwise
+            False.
+        """
+        if '%' in host or '_' in host:
+            raise Exception("Invalid host name")
+
+        sql = """
+            SELECT user_id FROM room_memberships
+            WHERE room_id = ?
+                AND user_id LIKE ?
+                AND membership = 'join'
+            LIMIT 1
+        """
+
+        # We do need to be careful to ensure that host doesn't have any wild cards
+        # in it, but we checked above for known ones and we'll check below that
+        # the returned user actually has the correct domain.
+        like_clause = "%:" + host
+
+        rows = yield self._execute("was_host_joined", None, sql, room_id, like_clause)
+
+        if not rows:
+            defer.returnValue(False)
+
+        user_id = rows[0][0]
+        if get_domain_from_id(user_id) != host:
+            # This can only happen if the host name has something funky in it
+            raise Exception("Invalid host name")
+
+        defer.returnValue(True)
+
     def get_joined_hosts(self, room_id, state_entry):
         state_group = state_entry.state_group
         if not state_group:
diff --git a/synapse/storage/schema/delta/44/expire_url_cache.sql b/synapse/storage/schema/delta/44/expire_url_cache.sql
new file mode 100644
index 0000000000..e2b775f038
--- /dev/null
+++ b/synapse/storage/schema/delta/44/expire_url_cache.sql
@@ -0,0 +1,38 @@
+/* Copyright 2017 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE INDEX local_media_repository_url_idx ON local_media_repository(created_ts) WHERE url_cache IS NOT NULL;
+
+-- we need to change `expires` to `expires_ts` so that we can index on it. SQLite doesn't support
+-- indices on expressions until 3.9.
+CREATE TABLE local_media_repository_url_cache_new(
+    url TEXT,
+    response_code INTEGER,
+    etag TEXT,
+    expires_ts BIGINT,
+    og TEXT,
+    media_id TEXT,
+    download_ts BIGINT
+);
+
+INSERT INTO local_media_repository_url_cache_new
+    SELECT url, response_code, etag, expires + download_ts, og, media_id, download_ts FROM local_media_repository_url_cache;
+
+DROP TABLE local_media_repository_url_cache;
+ALTER TABLE local_media_repository_url_cache_new RENAME TO local_media_repository_url_cache;
+
+CREATE INDEX local_media_repository_url_cache_expires_idx ON local_media_repository_url_cache(expires_ts);
+CREATE INDEX local_media_repository_url_cache_by_url_download_ts ON local_media_repository_url_cache(url, download_ts);
+CREATE INDEX local_media_repository_url_cache_media_idx ON local_media_repository_url_cache(media_id);
diff --git a/synapse/storage/schema/delta/45/group_server.sql b/synapse/storage/schema/delta/45/group_server.sql
new file mode 100644
index 0000000000..b2333848a0
--- /dev/null
+++ b/synapse/storage/schema/delta/45/group_server.sql
@@ -0,0 +1,167 @@
+/* Copyright 2017 Vector Creations Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE groups (
+    group_id TEXT NOT NULL,
+    name TEXT,  -- the display name of the room
+    avatar_url TEXT,
+    short_description TEXT,
+    long_description TEXT
+);
+
+CREATE UNIQUE INDEX groups_idx ON groups(group_id);
+
+
+-- list of users the group server thinks are joined
+CREATE TABLE group_users (
+    group_id TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    is_admin BOOLEAN NOT NULL,
+    is_public BOOLEAN NOT NULL  -- whether the users membership can be seen by everyone
+);
+
+
+CREATE INDEX groups_users_g_idx ON group_users(group_id, user_id);
+CREATE INDEX groups_users_u_idx ON group_users(user_id);
+
+-- list of users the group server thinks are invited
+CREATE TABLE group_invites (
+    group_id TEXT NOT NULL,
+    user_id TEXT NOT NULL
+);
+
+CREATE INDEX groups_invites_g_idx ON group_invites(group_id, user_id);
+CREATE INDEX groups_invites_u_idx ON group_invites(user_id);
+
+
+CREATE TABLE group_rooms (
+    group_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    is_public BOOLEAN NOT NULL  -- whether the room can be seen by everyone
+);
+
+CREATE UNIQUE INDEX groups_rooms_g_idx ON group_rooms(group_id, room_id);
+CREATE INDEX groups_rooms_r_idx ON group_rooms(room_id);
+
+
+-- Rooms to include in the summary
+CREATE TABLE group_summary_rooms (
+    group_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    category_id TEXT NOT NULL,
+    room_order BIGINT NOT NULL,
+    is_public BOOLEAN NOT NULL, -- whether the room should be show to everyone
+    UNIQUE (group_id, category_id, room_id, room_order),
+    CHECK (room_order > 0)
+);
+
+CREATE UNIQUE INDEX group_summary_rooms_g_idx ON group_summary_rooms(group_id, room_id, category_id);
+
+
+-- Categories to include in the summary
+CREATE TABLE group_summary_room_categories (
+    group_id TEXT NOT NULL,
+    category_id TEXT NOT NULL,
+    cat_order BIGINT NOT NULL,
+    UNIQUE (group_id, category_id, cat_order),
+    CHECK (cat_order > 0)
+);
+
+-- The categories in the group
+CREATE TABLE group_room_categories (
+    group_id TEXT NOT NULL,
+    category_id TEXT NOT NULL,
+    profile TEXT NOT NULL,
+    is_public BOOLEAN NOT NULL, -- whether the category should be show to everyone
+    UNIQUE (group_id, category_id)
+);
+
+-- The users to include in the group summary
+CREATE TABLE group_summary_users (
+    group_id TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    role_id TEXT NOT NULL,
+    user_order BIGINT NOT NULL,
+    is_public BOOLEAN NOT NULL  -- whether the user should be show to everyone
+);
+
+CREATE INDEX group_summary_users_g_idx ON group_summary_users(group_id);
+
+-- The roles to include in the group summary
+CREATE TABLE group_summary_roles (
+    group_id TEXT NOT NULL,
+    role_id TEXT NOT NULL,
+    role_order BIGINT NOT NULL,
+    UNIQUE (group_id, role_id, role_order),
+    CHECK (role_order > 0)
+);
+
+
+-- The roles in a groups
+CREATE TABLE group_roles (
+    group_id TEXT NOT NULL,
+    role_id TEXT NOT NULL,
+    profile TEXT NOT NULL,
+    is_public BOOLEAN NOT NULL,  -- whether the role should be show to everyone
+    UNIQUE (group_id, role_id)
+);
+
+
+-- List of  attestations we've given out and need to renew
+CREATE TABLE group_attestations_renewals (
+    group_id TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    valid_until_ms BIGINT NOT NULL
+);
+
+CREATE INDEX group_attestations_renewals_g_idx ON group_attestations_renewals(group_id, user_id);
+CREATE INDEX group_attestations_renewals_u_idx ON group_attestations_renewals(user_id);
+CREATE INDEX group_attestations_renewals_v_idx ON group_attestations_renewals(valid_until_ms);
+
+
+-- List of attestations we've received from remotes and are interested in.
+CREATE TABLE group_attestations_remote (
+    group_id TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    valid_until_ms BIGINT NOT NULL,
+    attestation_json TEXT NOT NULL
+);
+
+CREATE INDEX group_attestations_remote_g_idx ON group_attestations_remote(group_id, user_id);
+CREATE INDEX group_attestations_remote_u_idx ON group_attestations_remote(user_id);
+CREATE INDEX group_attestations_remote_v_idx ON group_attestations_remote(valid_until_ms);
+
+
+-- The group membership for the HS's users
+CREATE TABLE local_group_membership (
+    group_id TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    is_admin BOOLEAN NOT NULL,
+    membership TEXT NOT NULL,
+    is_publicised BOOLEAN NOT NULL,  -- if the user is publicising their membership
+    content TEXT NOT NULL
+);
+
+CREATE INDEX local_group_membership_u_idx ON local_group_membership(user_id, group_id);
+CREATE INDEX local_group_membership_g_idx ON local_group_membership(group_id);
+
+
+CREATE TABLE local_group_updates (
+    stream_id BIGINT NOT NULL,
+    group_id TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    type TEXT NOT NULL,
+    content TEXT NOT NULL
+);
diff --git a/synapse/storage/schema/delta/45/profile_cache.sql b/synapse/storage/schema/delta/45/profile_cache.sql
new file mode 100644
index 0000000000..e5ddc84df0
--- /dev/null
+++ b/synapse/storage/schema/delta/45/profile_cache.sql
@@ -0,0 +1,28 @@
+/* Copyright 2017 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- A subset of remote users whose profiles we have cached.
+-- Whether a user is in this table or not is defined by the storage function
+-- `is_subscribed_remote_profile_for_user`
+CREATE TABLE remote_profile_cache (
+    user_id TEXT NOT NULL,
+    displayname TEXT,
+    avatar_url TEXT,
+    last_check BIGINT NOT NULL
+);
+
+CREATE UNIQUE INDEX remote_profile_cache_user_id ON remote_profile_cache(user_id);
+CREATE INDEX remote_profile_cache_time ON remote_profile_cache(last_check);