summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py165
-rw-r--r--synapse/storage/_base.py86
-rw-r--r--synapse/storage/account_data.py2
-rw-r--r--synapse/storage/background_updates.py2
-rw-r--r--synapse/storage/client_ips.py20
-rw-r--r--synapse/storage/deviceinbox.py12
-rw-r--r--synapse/storage/devices.py11
-rw-r--r--synapse/storage/end_to_end_keys.py8
-rw-r--r--synapse/storage/engines/__init__.py5
-rw-r--r--synapse/storage/event_federation.py63
-rw-r--r--synapse/storage/event_push_actions.py83
-rw-r--r--synapse/storage/events.py307
-rw-r--r--synapse/storage/events_worker.py27
-rw-r--r--synapse/storage/filtering.py2
-rw-r--r--synapse/storage/group_server.py26
-rw-r--r--synapse/storage/keys.py2
-rw-r--r--synapse/storage/prepare_database.py3
-rw-r--r--synapse/storage/receipts.py21
-rw-r--r--synapse/storage/registration.py99
-rw-r--r--synapse/storage/room.py13
-rw-r--r--synapse/storage/roommember.py18
-rw-r--r--synapse/storage/schema/delta/14/upgrade_appservice_db.py3
-rw-r--r--synapse/storage/schema/delta/25/fts.py4
-rw-r--r--synapse/storage/schema/delta/27/ts.py4
-rw-r--r--synapse/storage/schema/delta/30/as_users.py4
-rw-r--r--synapse/storage/schema/delta/31/search_update.py4
-rw-r--r--synapse/storage/schema/delta/33/event_fields.py4
-rw-r--r--synapse/storage/schema/delta/48/add_user_consent.sql18
-rw-r--r--synapse/storage/schema/delta/48/add_user_ips_last_seen_index.sql17
-rw-r--r--synapse/storage/schema/delta/48/deactivated_users.sql25
-rw-r--r--synapse/storage/schema/delta/48/group_unique_indexes.py57
-rw-r--r--synapse/storage/schema/delta/48/groups_joinable.sql22
-rw-r--r--synapse/storage/schema/delta/49/add_user_consent_server_notice_sent.sql20
-rw-r--r--synapse/storage/schema/delta/49/add_user_daily_visits.sql21
-rw-r--r--synapse/storage/schema/delta/49/add_user_ips_last_seen_only_index.sql17
-rw-r--r--synapse/storage/search.py8
-rw-r--r--synapse/storage/stream.py469
-rw-r--r--synapse/storage/tags.py6
-rw-r--r--synapse/storage/transactions.py2
-rw-r--r--synapse/storage/user_directory.py4
40 files changed, 1165 insertions, 519 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index de00cae447..979fa22438 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -14,7 +14,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
+import datetime
+from dateutil import tz
+import time
+import logging
 
 from synapse.storage.devices import DeviceStore
 from .appservice import (
@@ -57,10 +60,6 @@ from .engines import PostgresEngine
 from synapse.api.constants import PresenceState
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
-
-import logging
-
-
 logger = logging.getLogger(__name__)
 
 
@@ -215,6 +214,9 @@ class DataStore(RoomMemberStore, RoomStore,
         self._stream_order_on_start = self.get_room_max_stream_ordering()
         self._min_stream_order_on_start = self.get_room_min_stream_ordering()
 
+        # Used in _generate_user_daily_visits to keep track of progress
+        self._last_user_visit_update = self._get_start_of_day()
+
         super(DataStore, self).__init__(db_conn, hs)
 
     def take_presence_startup_info(self):
@@ -244,13 +246,12 @@ class DataStore(RoomMemberStore, RoomStore,
 
         return [UserPresenceState(**row) for row in rows]
 
-    @defer.inlineCallbacks
     def count_daily_users(self):
         """
         Counts the number of users who used this homeserver in the last 24 hours.
         """
         def _count_users(txn):
-            yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24),
+            yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
 
             sql = """
                 SELECT COALESCE(count(*), 0) FROM (
@@ -264,8 +265,154 @@ class DataStore(RoomMemberStore, RoomStore,
             count, = txn.fetchone()
             return count
 
-        ret = yield self.runInteraction("count_users", _count_users)
-        defer.returnValue(ret)
+        return self.runInteraction("count_users", _count_users)
+
+    def count_r30_users(self):
+        """
+        Counts the number of 30 day retained users, defined as:-
+         * Users who have created their accounts more than 30 days ago
+         * Where last seen at most 30 days ago
+         * Where account creation and last_seen are > 30 days apart
+
+         Returns counts globaly for a given user as well as breaking
+         by platform
+        """
+        def _count_r30_users(txn):
+            thirty_days_in_secs = 86400 * 30
+            now = int(self._clock.time())
+            thirty_days_ago_in_secs = now - thirty_days_in_secs
+
+            sql = """
+                SELECT platform, COALESCE(count(*), 0) FROM (
+                     SELECT
+                        users.name, platform, users.creation_ts * 1000,
+                        MAX(uip.last_seen)
+                     FROM users
+                     INNER JOIN (
+                         SELECT
+                         user_id,
+                         last_seen,
+                         CASE
+                             WHEN user_agent LIKE '%%Android%%' THEN 'android'
+                             WHEN user_agent LIKE '%%iOS%%' THEN 'ios'
+                             WHEN user_agent LIKE '%%Electron%%' THEN 'electron'
+                             WHEN user_agent LIKE '%%Mozilla%%' THEN 'web'
+                             WHEN user_agent LIKE '%%Gecko%%' THEN 'web'
+                             ELSE 'unknown'
+                         END
+                         AS platform
+                         FROM user_ips
+                     ) uip
+                     ON users.name = uip.user_id
+                     AND users.appservice_id is NULL
+                     AND users.creation_ts < ?
+                     AND uip.last_seen/1000 > ?
+                     AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
+                     GROUP BY users.name, platform, users.creation_ts
+                ) u GROUP BY platform
+            """
+
+            results = {}
+            txn.execute(sql, (thirty_days_ago_in_secs,
+                              thirty_days_ago_in_secs))
+
+            for row in txn:
+                if row[0] is 'unknown':
+                    pass
+                results[row[0]] = row[1]
+
+            sql = """
+                SELECT COALESCE(count(*), 0) FROM (
+                    SELECT users.name, users.creation_ts * 1000,
+                                                        MAX(uip.last_seen)
+                    FROM users
+                    INNER JOIN (
+                        SELECT
+                        user_id,
+                        last_seen
+                        FROM user_ips
+                    ) uip
+                    ON users.name = uip.user_id
+                    AND appservice_id is NULL
+                    AND users.creation_ts < ?
+                    AND uip.last_seen/1000 > ?
+                    AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
+                    GROUP BY users.name, users.creation_ts
+                ) u
+            """
+
+            txn.execute(sql, (thirty_days_ago_in_secs,
+                              thirty_days_ago_in_secs))
+
+            count, = txn.fetchone()
+            results['all'] = count
+
+            return results
+
+        return self.runInteraction("count_r30_users", _count_r30_users)
+
+    def _get_start_of_day(self):
+        """
+        Returns millisecond unixtime for start of UTC day.
+        """
+        now = datetime.datetime.utcnow()
+        today_start = datetime.datetime(now.year, now.month,
+                                        now.day, tzinfo=tz.tzutc())
+        return int(time.mktime(today_start.timetuple())) * 1000
+
+    def generate_user_daily_visits(self):
+        """
+        Generates daily visit data for use in cohort/ retention analysis
+        """
+        def _generate_user_daily_visits(txn):
+            logger.info("Calling _generate_user_daily_visits")
+            today_start = self._get_start_of_day()
+            a_day_in_milliseconds = 24 * 60 * 60 * 1000
+            now = self.clock.time_msec()
+
+            sql = """
+                INSERT INTO user_daily_visits (user_id, device_id, timestamp)
+                    SELECT u.user_id, u.device_id, ?
+                    FROM user_ips AS u
+                    LEFT JOIN (
+                      SELECT user_id, device_id, timestamp FROM user_daily_visits
+                      WHERE timestamp = ?
+                    ) udv
+                    ON u.user_id = udv.user_id AND u.device_id=udv.device_id
+                    INNER JOIN users ON users.name=u.user_id
+                    WHERE last_seen > ? AND last_seen <= ?
+                    AND udv.timestamp IS NULL AND users.is_guest=0
+                    AND users.appservice_id IS NULL
+                    GROUP BY u.user_id, u.device_id
+            """
+
+            # This means that the day has rolled over but there could still
+            # be entries from the previous day. There is an edge case
+            # where if the user logs in at 23:59 and overwrites their
+            # last_seen at 00:01 then they will not be counted in the
+            # previous day's stats - it is important that the query is run
+            # often to minimise this case.
+            if today_start > self._last_user_visit_update:
+                yesterday_start = today_start - a_day_in_milliseconds
+                txn.execute(sql, (
+                    yesterday_start, yesterday_start,
+                    self._last_user_visit_update, today_start
+                ))
+                self._last_user_visit_update = today_start
+
+            txn.execute(sql, (
+                today_start, today_start,
+                self._last_user_visit_update,
+                now
+            ))
+            # Update _last_user_visit_update to now. The reason to do this
+            # rather just clamping to the beginning of the day is to limit
+            # the size of the join - meaning that the query can be run more
+            # frequently
+            self._last_user_visit_update = now
+
+        return self.runInteraction("generate_user_daily_visits",
+                                   _generate_user_daily_visits)
 
     def get_users(self):
         """Function to reterive a list of users in users table.
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 2fbebd4907..22d6257a9f 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -18,8 +18,8 @@ from synapse.api.errors import StoreError
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.caches.descriptors import Cache
 from synapse.storage.engines import PostgresEngine
-import synapse.metrics
 
+from prometheus_client import Histogram
 
 from twisted.internet import defer
 
@@ -27,20 +27,25 @@ import sys
 import time
 import threading
 
+from six import itervalues, iterkeys, iteritems
+from six.moves import intern, range
 
 logger = logging.getLogger(__name__)
 
+try:
+    MAX_TXN_ID = sys.maxint - 1
+except AttributeError:
+    # python 3 does not have a maximum int value
+    MAX_TXN_ID = 2**63 - 1
+
 sql_logger = logging.getLogger("synapse.storage.SQL")
 transaction_logger = logging.getLogger("synapse.storage.txn")
 perf_logger = logging.getLogger("synapse.storage.TIME")
 
+sql_scheduling_timer = Histogram("synapse_storage_schedule_time", "sec")
 
-metrics = synapse.metrics.get_metrics_for("synapse.storage")
-
-sql_scheduling_timer = metrics.register_distribution("schedule_time")
-
-sql_query_timer = metrics.register_distribution("query_time", labels=["verb"])
-sql_txn_timer = metrics.register_distribution("transaction_time", labels=["desc"])
+sql_query_timer = Histogram("synapse_storage_query_time", "sec", ["verb"])
+sql_txn_timer = Histogram("synapse_storage_transaction_time", "sec", ["desc"])
 
 
 class LoggingTransaction(object):
@@ -105,7 +110,7 @@ class LoggingTransaction(object):
                 # Don't let logging failures stop SQL from working
                 pass
 
-        start = time.time() * 1000
+        start = time.time()
 
         try:
             return func(
@@ -115,9 +120,9 @@ class LoggingTransaction(object):
             logger.debug("[SQL FAIL] {%s} %s", self.name, e)
             raise
         finally:
-            msecs = (time.time() * 1000) - start
-            sql_logger.debug("[SQL time] {%s} %f", self.name, msecs)
-            sql_query_timer.inc_by(msecs, sql.split()[0])
+            secs = time.time() - start
+            sql_logger.debug("[SQL time] {%s} %f sec", self.name, secs)
+            sql_query_timer.labels(sql.split()[0]).observe(secs)
 
 
 class PerformanceCounters(object):
@@ -127,7 +132,7 @@ class PerformanceCounters(object):
 
     def update(self, key, start_time, end_time=None):
         if end_time is None:
-            end_time = time.time() * 1000
+            end_time = time.time()
         duration = end_time - start_time
         count, cum_time = self.current_counters.get(key, (0, 0))
         count += 1
@@ -137,7 +142,7 @@ class PerformanceCounters(object):
 
     def interval(self, interval_duration, limit=3):
         counters = []
-        for name, (count, cum_time) in self.current_counters.iteritems():
+        for name, (count, cum_time) in iteritems(self.current_counters):
             prev_count, prev_time = self.previous_counters.get(name, (0, 0))
             counters.append((
                 (cum_time - prev_time) / interval_duration,
@@ -217,12 +222,12 @@ class SQLBaseStore(object):
 
     def _new_transaction(self, conn, desc, after_callbacks, exception_callbacks,
                          logging_context, func, *args, **kwargs):
-        start = time.time() * 1000
+        start = time.time()
         txn_id = self._TXN_ID
 
         # We don't really need these to be unique, so lets stop it from
         # growing really large.
-        self._TXN_ID = (self._TXN_ID + 1) % (sys.maxint - 1)
+        self._TXN_ID = (self._TXN_ID + 1) % (MAX_TXN_ID)
 
         name = "%s-%x" % (desc, txn_id, )
 
@@ -277,17 +282,17 @@ class SQLBaseStore(object):
             logger.debug("[TXN FAIL] {%s} %s", name, e)
             raise
         finally:
-            end = time.time() * 1000
+            end = time.time()
             duration = end - start
 
             if logging_context is not None:
                 logging_context.add_database_transaction(duration)
 
-            transaction_logger.debug("[TXN END] {%s} %f", name, duration)
+            transaction_logger.debug("[TXN END] {%s} %f sec", name, duration)
 
             self._current_txn_total_time += duration
             self._txn_perf_counters.update(desc, start, end)
-            sql_txn_timer.inc_by(duration, desc)
+            sql_txn_timer.labels(desc).observe(duration)
 
     @defer.inlineCallbacks
     def runInteraction(self, desc, func, *args, **kwargs):
@@ -344,13 +349,13 @@ class SQLBaseStore(object):
         """
         current_context = LoggingContext.current_context()
 
-        start_time = time.time() * 1000
+        start_time = time.time()
 
         def inner_func(conn, *args, **kwargs):
             with LoggingContext("runWithConnection") as context:
-                sched_duration_ms = time.time() * 1000 - start_time
-                sql_scheduling_timer.inc_by(sched_duration_ms)
-                current_context.add_database_scheduled(sched_duration_ms)
+                sched_duration_sec = time.time() - start_time
+                sql_scheduling_timer.observe(sched_duration_sec)
+                current_context.add_database_scheduled(sched_duration_sec)
 
                 if self.database_engine.is_connection_closed(conn):
                     logger.debug("Reconnecting closed database connection")
@@ -376,7 +381,7 @@ class SQLBaseStore(object):
         Returns:
             A list of dicts where the key is the column header.
         """
-        col_headers = list(intern(column[0]) for column in cursor.description)
+        col_headers = list(intern(str(column[0])) for column in cursor.description)
         results = list(
             dict(zip(col_headers, row)) for row in cursor
         )
@@ -543,7 +548,7 @@ class SQLBaseStore(object):
             ", ".join("%s = ?" % (k,) for k in values),
             " AND ".join("%s = ?" % (k,) for k in keyvalues)
         )
-        sqlargs = values.values() + keyvalues.values()
+        sqlargs = list(values.values()) + list(keyvalues.values())
 
         txn.execute(sql, sqlargs)
         if txn.rowcount > 0:
@@ -561,7 +566,7 @@ class SQLBaseStore(object):
             ", ".join(k for k in allvalues),
             ", ".join("?" for _ in allvalues)
         )
-        txn.execute(sql, allvalues.values())
+        txn.execute(sql, list(allvalues.values()))
         # successfully inserted
         return True
 
@@ -629,8 +634,8 @@ class SQLBaseStore(object):
         }
 
         if keyvalues:
-            sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
-            txn.execute(sql, keyvalues.values())
+            sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in iterkeys(keyvalues))
+            txn.execute(sql, list(keyvalues.values()))
         else:
             txn.execute(sql)
 
@@ -694,7 +699,7 @@ class SQLBaseStore(object):
                 table,
                 " AND ".join("%s = ?" % (k, ) for k in keyvalues)
             )
-            txn.execute(sql, keyvalues.values())
+            txn.execute(sql, list(keyvalues.values()))
         else:
             sql = "SELECT %s FROM %s" % (
                 ", ".join(retcols),
@@ -725,9 +730,12 @@ class SQLBaseStore(object):
         if not iterable:
             defer.returnValue(results)
 
+        # iterables can not be sliced, so convert it to a list first
+        it_list = list(iterable)
+
         chunks = [
-            iterable[i:i + batch_size]
-            for i in xrange(0, len(iterable), batch_size)
+            it_list[i:i + batch_size]
+            for i in range(0, len(it_list), batch_size)
         ]
         for chunk in chunks:
             rows = yield self.runInteraction(
@@ -767,7 +775,7 @@ class SQLBaseStore(object):
         )
         values.extend(iterable)
 
-        for key, value in keyvalues.iteritems():
+        for key, value in iteritems(keyvalues):
             clauses.append("%s = ?" % (key,))
             values.append(value)
 
@@ -790,7 +798,7 @@ class SQLBaseStore(object):
     @staticmethod
     def _simple_update_txn(txn, table, keyvalues, updatevalues):
         if keyvalues:
-            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
+            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in iterkeys(keyvalues))
         else:
             where = ""
 
@@ -802,7 +810,7 @@ class SQLBaseStore(object):
 
         txn.execute(
             update_sql,
-            updatevalues.values() + keyvalues.values()
+            list(updatevalues.values()) + list(keyvalues.values())
         )
 
         return txn.rowcount
@@ -850,7 +858,7 @@ class SQLBaseStore(object):
             " AND ".join("%s = ?" % (k,) for k in keyvalues)
         )
 
-        txn.execute(select_sql, keyvalues.values())
+        txn.execute(select_sql, list(keyvalues.values()))
 
         row = txn.fetchone()
         if not row:
@@ -888,7 +896,7 @@ class SQLBaseStore(object):
             " AND ".join("%s = ?" % (k, ) for k in keyvalues)
         )
 
-        txn.execute(sql, keyvalues.values())
+        txn.execute(sql, list(keyvalues.values()))
         if txn.rowcount == 0:
             raise StoreError(404, "No row found")
         if txn.rowcount > 1:
@@ -906,7 +914,7 @@ class SQLBaseStore(object):
             " AND ".join("%s = ?" % (k, ) for k in keyvalues)
         )
 
-        return txn.execute(sql, keyvalues.values())
+        return txn.execute(sql, list(keyvalues.values()))
 
     def _simple_delete_many(self, table, column, iterable, keyvalues, desc):
         return self.runInteraction(
@@ -938,7 +946,7 @@ class SQLBaseStore(object):
         )
         values.extend(iterable)
 
-        for key, value in keyvalues.iteritems():
+        for key, value in iteritems(keyvalues):
             clauses.append("%s = ?" % (key,))
             values.append(value)
 
@@ -978,7 +986,7 @@ class SQLBaseStore(object):
         txn.close()
 
         if cache:
-            min_val = min(cache.itervalues())
+            min_val = min(itervalues(cache))
         else:
             min_val = max_value
 
@@ -1093,7 +1101,7 @@ class SQLBaseStore(object):
                 " AND ".join("%s = ?" % (k,) for k in keyvalues),
                 " ? ASC LIMIT ? OFFSET ?"
             )
-            txn.execute(sql, keyvalues.values() + pagevalues)
+            txn.execute(sql, list(keyvalues.values()) + list(pagevalues))
         else:
             sql = "SELECT %s FROM %s ORDER BY %s" % (
                 ", ".join(retcols),
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index e70c9423e3..f83ff0454a 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -23,7 +23,7 @@ from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
 
 import abc
-import ujson as json
+import simplejson as json
 import logging
 
 logger = logging.getLogger(__name__)
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index c88759bf2c..8af325a9f5 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -19,7 +19,7 @@ from . import engines
 
 from twisted.internet import defer
 
-import ujson as json
+import simplejson as json
 import logging
 
 logger = logging.getLogger(__name__)
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index a03d1d6104..ce338514e8 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -22,6 +22,8 @@ from . import background_updates
 
 from synapse.util.caches import CACHE_SIZE_FACTOR
 
+from six import iteritems
+
 
 logger = logging.getLogger(__name__)
 
@@ -48,6 +50,20 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
             columns=["user_id", "device_id", "last_seen"],
         )
 
+        self.register_background_index_update(
+            "user_ips_last_seen_index",
+            index_name="user_ips_last_seen",
+            table="user_ips",
+            columns=["user_id", "last_seen"],
+        )
+
+        self.register_background_index_update(
+            "user_ips_last_seen_only_index",
+            index_name="user_ips_last_seen_only",
+            table="user_ips",
+            columns=["last_seen"],
+        )
+
         # (user_id, access_token, ip) -> (user_agent, device_id, last_seen)
         self._batch_row_update = {}
 
@@ -85,7 +101,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
     def _update_client_ips_batch_txn(self, txn, to_update):
         self.database_engine.lock_table(txn, "user_ips")
 
-        for entry in to_update.iteritems():
+        for entry in iteritems(to_update):
             (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
 
             self._simple_upsert_txn(
@@ -217,5 +233,5 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
                 "user_agent": user_agent,
                 "last_seen": last_seen,
             }
-            for (access_token, ip), (user_agent, last_seen) in results.iteritems()
+            for (access_token, ip), (user_agent, last_seen) in iteritems(results)
         ))
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 548e795daf..a879e5bfc1 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 import logging
-import ujson
+import simplejson
 
 from twisted.internet import defer
 
@@ -85,7 +85,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
             )
             rows = []
             for destination, edu in remote_messages_by_destination.items():
-                edu_json = ujson.dumps(edu)
+                edu_json = simplejson.dumps(edu)
                 rows.append((destination, stream_id, now_ms, edu_json))
             txn.executemany(sql, rows)
 
@@ -177,7 +177,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
                     " WHERE user_id = ?"
                 )
                 txn.execute(sql, (user_id,))
-                message_json = ujson.dumps(messages_by_device["*"])
+                message_json = simplejson.dumps(messages_by_device["*"])
                 for row in txn:
                     # Add the message for all devices for this user on this
                     # server.
@@ -199,7 +199,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
                     # Only insert into the local inbox if the device exists on
                     # this server
                     device = row[0]
-                    message_json = ujson.dumps(messages_by_device[device])
+                    message_json = simplejson.dumps(messages_by_device[device])
                     messages_json_for_user[device] = message_json
 
             if messages_json_for_user:
@@ -253,7 +253,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
             messages = []
             for row in txn:
                 stream_pos = row[0]
-                messages.append(ujson.loads(row[1]))
+                messages.append(simplejson.loads(row[1]))
             if len(messages) < limit:
                 stream_pos = current_stream_id
             return (messages, stream_pos)
@@ -389,7 +389,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
             messages = []
             for row in txn:
                 stream_pos = row[0]
-                messages.append(ujson.loads(row[1]))
+                messages.append(simplejson.loads(row[1]))
             if len(messages) < limit:
                 stream_pos = current_stream_id
             return (messages, stream_pos)
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index bd2effdf34..d149d8392e 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-import ujson as json
+import simplejson as json
 
 from twisted.internet import defer
 
@@ -21,6 +21,7 @@ from synapse.api.errors import StoreError
 from ._base import SQLBaseStore, Cache
 from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
 
+from six import itervalues, iteritems
 
 logger = logging.getLogger(__name__)
 
@@ -360,7 +361,7 @@ class DeviceStore(SQLBaseStore):
             return (now_stream_id, [])
 
         if len(query_map) >= 20:
-            now_stream_id = max(stream_id for stream_id in query_map.itervalues())
+            now_stream_id = max(stream_id for stream_id in itervalues(query_map))
 
         devices = self._get_e2e_device_keys_txn(
             txn, query_map.keys(), include_all_devices=True
@@ -373,13 +374,13 @@ class DeviceStore(SQLBaseStore):
         """
 
         results = []
-        for user_id, user_devices in devices.iteritems():
+        for user_id, user_devices in iteritems(devices):
             # The prev_id for the first row is always the last row before
             # `from_stream_id`
             txn.execute(prev_sent_id_sql, (destination, user_id, from_stream_id))
             rows = txn.fetchall()
             prev_id = rows[0][0]
-            for device_id, device in user_devices.iteritems():
+            for device_id, device in iteritems(user_devices):
                 stream_id = query_map[(user_id, device_id)]
                 result = {
                     "user_id": user_id,
@@ -483,7 +484,7 @@ class DeviceStore(SQLBaseStore):
         if devices:
             user_devices = devices[user_id]
             results = []
-            for device_id, device in user_devices.iteritems():
+            for device_id, device in iteritems(user_devices):
                 result = {
                     "device_id": device_id,
                 }
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 2cebb203c6..b146487943 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -17,10 +17,12 @@ from twisted.internet import defer
 from synapse.util.caches.descriptors import cached
 
 from canonicaljson import encode_canonical_json
-import ujson as json
+import simplejson as json
 
 from ._base import SQLBaseStore
 
+from six import iteritems
+
 
 class EndToEndKeyStore(SQLBaseStore):
     def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys):
@@ -81,8 +83,8 @@ class EndToEndKeyStore(SQLBaseStore):
             query_list, include_all_devices,
         )
 
-        for user_id, device_keys in results.iteritems():
-            for device_id, device_info in device_keys.iteritems():
+        for user_id, device_keys in iteritems(results):
+            for device_id, device_info in iteritems(device_keys):
                 device_info["keys"] = json.loads(device_info.pop("key_json"))
 
         defer.returnValue(results)
diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py
index 338b495611..8c868ece75 100644
--- a/synapse/storage/engines/__init__.py
+++ b/synapse/storage/engines/__init__.py
@@ -18,6 +18,7 @@ from .postgres import PostgresEngine
 from .sqlite3 import Sqlite3Engine
 
 import importlib
+import platform
 
 
 SUPPORTED_MODULE = {
@@ -31,6 +32,10 @@ def create_engine(database_config):
     engine_class = SUPPORTED_MODULE.get(name, None)
 
     if engine_class:
+        # pypy requires psycopg2cffi rather than psycopg2
+        if (name == "psycopg2" and
+                platform.python_implementation() == "PyPy"):
+            name = "psycopg2cffi"
         module = importlib.import_module(name)
         return engine_class(module, database_config)
 
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 00ee82d300..8fbf7ffba7 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import random
 
 from twisted.internet import defer
 
@@ -24,7 +25,9 @@ from synapse.util.caches.descriptors import cached
 from unpaddedbase64 import encode_base64
 
 import logging
-from Queue import PriorityQueue, Empty
+from six.moves.queue import PriorityQueue, Empty
+
+from six.moves import range
 
 
 logger = logging.getLogger(__name__)
@@ -78,7 +81,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             front_list = list(front)
             chunks = [
                 front_list[x:x + 100]
-                for x in xrange(0, len(front), 100)
+                for x in range(0, len(front), 100)
             ]
             for chunk in chunks:
                 txn.execute(
@@ -133,7 +136,47 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             retcol="event_id",
         )
 
+    @defer.inlineCallbacks
+    def get_prev_events_for_room(self, room_id):
+        """
+        Gets a subset of the current forward extremities in the given room.
+
+        Limits the result to 10 extremities, so that we can avoid creating
+        events which refer to hundreds of prev_events.
+
+        Args:
+            room_id (str): room_id
+
+        Returns:
+            Deferred[list[(str, dict[str, str], int)]]
+                for each event, a tuple of (event_id, hashes, depth)
+                where *hashes* is a map from algorithm to hash.
+        """
+        res = yield self.get_latest_event_ids_and_hashes_in_room(room_id)
+        if len(res) > 10:
+            # Sort by reverse depth, so we point to the most recent.
+            res.sort(key=lambda a: -a[2])
+
+            # we use half of the limit for the actual most recent events, and
+            # the other half to randomly point to some of the older events, to
+            # make sure that we don't completely ignore the older events.
+            res = res[0:5] + random.sample(res[5:], 5)
+
+        defer.returnValue(res)
+
     def get_latest_event_ids_and_hashes_in_room(self, room_id):
+        """
+        Gets the current forward extremities in the given room
+
+        Args:
+            room_id (str): room_id
+
+        Returns:
+            Deferred[list[(str, dict[str, str], int)]]
+                for each event, a tuple of (event_id, hashes, depth)
+                where *hashes* is a map from algorithm to hash.
+        """
+
         return self.runInteraction(
             "get_latest_event_ids_and_hashes_in_room",
             self._get_latest_event_ids_and_hashes_in_room,
@@ -182,22 +225,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
             room_id,
         )
 
-    @defer.inlineCallbacks
-    def get_max_depth_of_events(self, event_ids):
-        sql = (
-            "SELECT MAX(depth) FROM events WHERE event_id IN (%s)"
-        ) % (",".join(["?"] * len(event_ids)),)
-
-        rows = yield self._execute(
-            "get_max_depth_of_events", None,
-            sql, *event_ids
-        )
-
-        if rows:
-            defer.returnValue(rows[0][0])
-        else:
-            defer.returnValue(1)
-
     def _get_min_depth_interaction(self, txn, room_id):
         min_depth = self._simple_select_one_onecol_txn(
             txn,
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 01f8339825..d0350ee5fe 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -18,11 +18,11 @@ from synapse.storage._base import SQLBaseStore, LoggingTransaction
 from twisted.internet import defer
 from synapse.util.async import sleep
 from synapse.util.caches.descriptors import cachedInlineCallbacks
-from synapse.types import RoomStreamToken
-from .stream import lower_bound
 
 import logging
-import ujson as json
+import simplejson as json
+
+from six import iteritems
 
 logger = logging.getLogger(__name__)
 
@@ -99,7 +99,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
     def _get_unread_counts_by_receipt_txn(self, txn, room_id, user_id,
                                           last_read_event_id):
         sql = (
-            "SELECT stream_ordering, topological_ordering"
+            "SELECT stream_ordering"
             " FROM events"
             " WHERE room_id = ? AND event_id = ?"
         )
@@ -111,17 +111,12 @@ class EventPushActionsWorkerStore(SQLBaseStore):
             return {"notify_count": 0, "highlight_count": 0}
 
         stream_ordering = results[0][0]
-        topological_ordering = results[0][1]
 
         return self._get_unread_counts_by_pos_txn(
-            txn, room_id, user_id, topological_ordering, stream_ordering
+            txn, room_id, user_id, stream_ordering
         )
 
-    def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, topological_ordering,
-                                      stream_ordering):
-        token = RoomStreamToken(
-            topological_ordering, stream_ordering
-        )
+    def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering):
 
         # First get number of notifications.
         # We don't need to put a notif=1 clause as all rows always have
@@ -132,10 +127,10 @@ class EventPushActionsWorkerStore(SQLBaseStore):
             " WHERE"
             " user_id = ?"
             " AND room_id = ?"
-            " AND %s"
-        ) % (lower_bound(token, self.database_engine, inclusive=False),)
+            " AND stream_ordering > ?"
+        )
 
-        txn.execute(sql, (user_id, room_id))
+        txn.execute(sql, (user_id, room_id, stream_ordering))
         row = txn.fetchone()
         notify_count = row[0] if row else 0
 
@@ -155,10 +150,10 @@ class EventPushActionsWorkerStore(SQLBaseStore):
             " highlight = 1"
             " AND user_id = ?"
             " AND room_id = ?"
-            " AND %s"
-        ) % (lower_bound(token, self.database_engine, inclusive=False),)
+            " AND stream_ordering > ?"
+        )
 
-        txn.execute(sql, (user_id, room_id))
+        txn.execute(sql, (user_id, room_id, stream_ordering))
         row = txn.fetchone()
         highlight_count = row[0] if row else 0
 
@@ -209,7 +204,6 @@ class EventPushActionsWorkerStore(SQLBaseStore):
                 "   ep.highlight "
                 " FROM ("
                 "   SELECT room_id,"
-                "       MAX(topological_ordering) as topological_ordering,"
                 "       MAX(stream_ordering) as stream_ordering"
                 "   FROM events"
                 "   INNER JOIN receipts_linearized USING (room_id, event_id)"
@@ -219,13 +213,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
                 " event_push_actions AS ep"
                 " WHERE"
                 "   ep.room_id = rl.room_id"
-                "   AND ("
-                "       ep.topological_ordering > rl.topological_ordering"
-                "       OR ("
-                "           ep.topological_ordering = rl.topological_ordering"
-                "           AND ep.stream_ordering > rl.stream_ordering"
-                "       )"
-                "   )"
+                "   AND ep.stream_ordering > rl.stream_ordering"
                 "   AND ep.user_id = ?"
                 "   AND ep.stream_ordering > ?"
                 "   AND ep.stream_ordering <= ?"
@@ -318,7 +306,6 @@ class EventPushActionsWorkerStore(SQLBaseStore):
                 "  ep.highlight, e.received_ts"
                 " FROM ("
                 "   SELECT room_id,"
-                "       MAX(topological_ordering) as topological_ordering,"
                 "       MAX(stream_ordering) as stream_ordering"
                 "   FROM events"
                 "   INNER JOIN receipts_linearized USING (room_id, event_id)"
@@ -329,13 +316,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
                 " INNER JOIN events AS e USING (room_id, event_id)"
                 " WHERE"
                 "   ep.room_id = rl.room_id"
-                "   AND ("
-                "       ep.topological_ordering > rl.topological_ordering"
-                "       OR ("
-                "           ep.topological_ordering = rl.topological_ordering"
-                "           AND ep.stream_ordering > rl.stream_ordering"
-                "       )"
-                "   )"
+                "   AND ep.stream_ordering > rl.stream_ordering"
                 "   AND ep.user_id = ?"
                 "   AND ep.stream_ordering > ?"
                 "   AND ep.stream_ordering <= ?"
@@ -441,13 +422,14 @@ class EventPushActionsWorkerStore(SQLBaseStore):
 
             txn.executemany(sql, (
                 _gen_entry(user_id, actions)
-                for user_id, actions in user_id_actions.iteritems()
+                for user_id, actions in iteritems(user_id_actions)
             ))
 
         return self.runInteraction(
             "add_push_actions_to_staging", _add_push_actions_to_staging_txn
         )
 
+    @defer.inlineCallbacks
     def remove_push_actions_from_staging(self, event_id):
         """Called if we failed to persist the event to ensure that stale push
         actions don't build up in the DB
@@ -456,13 +438,22 @@ class EventPushActionsWorkerStore(SQLBaseStore):
             event_id (str)
         """
 
-        return self._simple_delete(
-            table="event_push_actions_staging",
-            keyvalues={
-                "event_id": event_id,
-            },
-            desc="remove_push_actions_from_staging",
-        )
+        try:
+            res = yield self._simple_delete(
+                table="event_push_actions_staging",
+                keyvalues={
+                    "event_id": event_id,
+                },
+                desc="remove_push_actions_from_staging",
+            )
+            defer.returnValue(res)
+        except Exception:
+            # this method is called from an exception handler, so propagating
+            # another exception here really isn't helpful - there's nothing
+            # the caller can do about it. Just log the exception and move on.
+            logger.exception(
+                "Error removing push actions after event persistence failure",
+            )
 
     @defer.inlineCallbacks
     def _find_stream_orderings_for_times(self):
@@ -752,10 +743,10 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
         )
 
     def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
-                                            topological_ordering, stream_ordering):
+                                            stream_ordering):
         """
         Purges old push actions for a user and room before a given
-        topological_ordering.
+        stream_ordering.
 
         We however keep a months worth of highlighted notifications, so that
         users can still get a list of recent highlights.
@@ -764,7 +755,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
             txn: The transcation
             room_id: Room ID to delete from
             user_id: user ID to delete for
-            topological_ordering: The lowest topological ordering which will
+            stream_ordering: The lowest stream ordering which will
                                   not be deleted.
         """
         txn.call_after(
@@ -783,9 +774,9 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
         txn.execute(
             "DELETE FROM event_push_actions "
             " WHERE user_id = ? AND room_id = ? AND "
-            " topological_ordering <= ?"
+            " stream_ordering <= ?"
             " AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)",
-            (user_id, room_id, topological_ordering, self.stream_ordering_month_ago)
+            (user_id, room_id, stream_ordering, self.stream_ordering_month_ago)
         )
 
         txn.execute("""
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 9fc65229fd..b96104ccae 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -14,52 +14,57 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.storage.events_worker import EventsWorkerStore
+from collections import OrderedDict, deque, namedtuple
+from functools import wraps
+import itertools
+import logging
 
+import simplejson as json
 from twisted.internet import defer
 
-from synapse.events import USE_FROZEN_DICTS
-
+from synapse.storage.events_worker import EventsWorkerStore
 from synapse.util.async import ObservableDeferred
+from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.logcontext import (
-    PreserveLoggingContext, make_deferred_yieldable
+    PreserveLoggingContext, make_deferred_yieldable,
 )
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
 from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
-from synapse.types import get_domain_from_id
-
-from canonicaljson import encode_canonical_json
-from collections import deque, namedtuple, OrderedDict
-from functools import wraps
-
+from synapse.types import get_domain_from_id, RoomStreamToken
 import synapse.metrics
 
-import logging
-import simplejson as json
-
 # these are only included to make the type annotations work
 from synapse.events import EventBase    # noqa: F401
 from synapse.events.snapshot import EventContext   # noqa: F401
 
+from prometheus_client import Counter
+
 logger = logging.getLogger(__name__)
 
+persist_event_counter = Counter("synapse_storage_events_persisted_events", "")
+event_counter = Counter("synapse_storage_events_persisted_events_sep", "",
+                        ["type", "origin_type", "origin_entity"])
 
-metrics = synapse.metrics.get_metrics_for(__name__)
-persist_event_counter = metrics.register_counter("persisted_events")
-event_counter = metrics.register_counter(
-    "persisted_events_sep", labels=["type", "origin_type", "origin_entity"]
-)
+# The number of times we are recalculating the current state
+state_delta_counter = Counter("synapse_storage_events_state_delta", "")
+
+# The number of times we are recalculating state when there is only a
+# single forward extremity
+state_delta_single_event_counter = Counter(
+    "synapse_storage_events_state_delta_single_event", "")
+
+# The number of times we are reculating state when we could have resonably
+# calculated the delta when we calculated the state for an event we were
+# persisting.
+state_delta_reuse_delta_counter = Counter(
+    "synapse_storage_events_state_delta_reuse_delta", "")
 
 
 def encode_json(json_object):
-    if USE_FROZEN_DICTS:
-        # ujson doesn't like frozen_dicts
-        return encode_canonical_json(json_object)
-    else:
-        return json.dumps(json_object, ensure_ascii=False)
+    return frozendict_json_encoder.encode(json_object)
 
 
 class _EventPeristenceQueue(object):
@@ -369,7 +374,8 @@ class EventsStore(EventsWorkerStore):
                                 room_id, ev_ctx_rm, latest_event_ids
                             )
 
-                            if new_latest_event_ids == set(latest_event_ids):
+                            latest_event_ids = set(latest_event_ids)
+                            if new_latest_event_ids == latest_event_ids:
                                 # No change in extremities, so no change in state
                                 continue
 
@@ -390,12 +396,34 @@ class EventsStore(EventsWorkerStore):
                                 if all_single_prev_not_state:
                                     continue
 
+                            state_delta_counter.inc()
+                            if len(new_latest_event_ids) == 1:
+                                state_delta_single_event_counter.inc()
+
+                                # This is a fairly handwavey check to see if we could
+                                # have guessed what the delta would have been when
+                                # processing one of these events.
+                                # What we're interested in is if the latest extremities
+                                # were the same when we created the event as they are
+                                # now. When this server creates a new event (as opposed
+                                # to receiving it over federation) it will use the
+                                # forward extremities as the prev_events, so we can
+                                # guess this by looking at the prev_events and checking
+                                # if they match the current forward extremities.
+                                for ev, _ in ev_ctx_rm:
+                                    prev_event_ids = set(e for e, _ in ev.prev_events)
+                                    if latest_event_ids == prev_event_ids:
+                                        state_delta_reuse_delta_counter.inc()
+                                        break
+
                             logger.info(
                                 "Calculating state delta for room %s", room_id,
                             )
                             current_state = yield self._get_new_state_after_events(
                                 room_id,
-                                ev_ctx_rm, new_latest_event_ids,
+                                ev_ctx_rm,
+                                latest_event_ids,
+                                new_latest_event_ids,
                             )
                             if current_state is not None:
                                 current_state_for_room[room_id] = current_state
@@ -414,7 +442,10 @@ class EventsStore(EventsWorkerStore):
                     state_delta_for_room=state_delta_for_room,
                     new_forward_extremeties=new_forward_extremeties,
                 )
-                persist_event_counter.inc_by(len(chunk))
+                persist_event_counter.inc(len(chunk))
+                synapse.metrics.event_persisted_position.set(
+                    chunk[-1][0].internal_metadata.stream_ordering,
+                )
                 for event, context in chunk:
                     if context.app_service:
                         origin_type = "local"
@@ -426,7 +457,7 @@ class EventsStore(EventsWorkerStore):
                         origin_type = "remote"
                         origin_entity = get_domain_from_id(event.sender)
 
-                    event_counter.inc(event.type, origin_type, origin_entity)
+                    event_counter.labels(event.type, origin_type, origin_entity).inc()
 
                 for room_id, new_state in current_state_for_room.iteritems():
                     self.get_current_state_ids.prefill(
@@ -480,7 +511,8 @@ class EventsStore(EventsWorkerStore):
         defer.returnValue(new_latest_event_ids)
 
     @defer.inlineCallbacks
-    def _get_new_state_after_events(self, room_id, events_context, new_latest_event_ids):
+    def _get_new_state_after_events(self, room_id, events_context, old_latest_event_ids,
+                                    new_latest_event_ids):
         """Calculate the current state dict after adding some new events to
         a room
 
@@ -491,6 +523,9 @@ class EventsStore(EventsWorkerStore):
             events_context (list[(EventBase, EventContext)]):
                 events and contexts which are being added to the room
 
+            old_latest_event_ids (iterable[str]):
+                the old forward extremities for the room.
+
             new_latest_event_ids (iterable[str]):
                 the new forward extremities for the room.
 
@@ -501,64 +536,89 @@ class EventsStore(EventsWorkerStore):
         """
 
         if not new_latest_event_ids:
-            defer.returnValue({})
+            return
 
         # map from state_group to ((type, key) -> event_id) state map
-        state_groups = {}
-        missing_event_ids = []
-        was_updated = False
+        state_groups_map = {}
+        for ev, ctx in events_context:
+            if ctx.state_group is None:
+                # I don't think this can happen, but let's double-check
+                raise Exception(
+                    "Context for new extremity event %s has no state "
+                    "group" % (ev.event_id, ),
+                )
+
+            if ctx.state_group in state_groups_map:
+                continue
+
+            state_groups_map[ctx.state_group] = ctx.current_state_ids
+
+        # We need to map the event_ids to their state groups. First, let's
+        # check if the event is one we're persisting, in which case we can
+        # pull the state group from its context.
+        # Otherwise we need to pull the state group from the database.
+
+        # Set of events we need to fetch groups for. (We know none of the old
+        # extremities are going to be in events_context).
+        missing_event_ids = set(old_latest_event_ids)
+
+        event_id_to_state_group = {}
         for event_id in new_latest_event_ids:
-            # First search in the list of new events we're adding,
-            # and then use the current state from that
+            # First search in the list of new events we're adding.
             for ev, ctx in events_context:
                 if event_id == ev.event_id:
-                    if ctx.current_state_ids is None:
-                        raise Exception("Unknown current state")
-
-                    if ctx.state_group is None:
-                        # I don't think this can happen, but let's double-check
-                        raise Exception(
-                            "Context for new extremity event %s has no state "
-                            "group" % (event_id, ),
-                        )
-
-                    # If we've already seen the state group don't bother adding
-                    # it to the state sets again
-                    if ctx.state_group not in state_groups:
-                        state_groups[ctx.state_group] = ctx.current_state_ids
-                        if ctx.delta_ids or hasattr(ev, "state_key"):
-                            was_updated = True
+                    event_id_to_state_group[event_id] = ctx.state_group
                     break
             else:
                 # If we couldn't find it, then we'll need to pull
                 # the state from the database
-                was_updated = True
-                missing_event_ids.append(event_id)
-
-        if not was_updated:
-            return
+                missing_event_ids.add(event_id)
 
         if missing_event_ids:
-            # Now pull out the state for any missing events from DB
+            # Now pull out the state groups for any missing events from DB
             event_to_groups = yield self._get_state_group_for_events(
                 missing_event_ids,
             )
+            event_id_to_state_group.update(event_to_groups)
 
-            groups = set(event_to_groups.itervalues()) - set(state_groups.iterkeys())
+        # State groups of old_latest_event_ids
+        old_state_groups = set(
+            event_id_to_state_group[evid] for evid in old_latest_event_ids
+        )
 
-            if groups:
-                group_to_state = yield self._get_state_for_groups(groups)
-                state_groups.update(group_to_state)
+        # State groups of new_latest_event_ids
+        new_state_groups = set(
+            event_id_to_state_group[evid] for evid in new_latest_event_ids
+        )
 
-        if len(state_groups) == 1:
+        # If they old and new groups are the same then we don't need to do
+        # anything.
+        if old_state_groups == new_state_groups:
+            return
+
+        # Now that we have calculated new_state_groups we need to get
+        # their state IDs so we can resolve to a single state set.
+        missing_state = new_state_groups - set(state_groups_map)
+        if missing_state:
+            group_to_state = yield self._get_state_for_groups(missing_state)
+            state_groups_map.update(group_to_state)
+
+        if len(new_state_groups) == 1:
             # If there is only one state group, then we know what the current
             # state is.
-            defer.returnValue(state_groups.values()[0])
+            defer.returnValue(state_groups_map[new_state_groups.pop()])
+
+        # Ok, we need to defer to the state handler to resolve our state sets.
 
         def get_events(ev_ids):
             return self.get_events(
                 ev_ids, get_prev_content=False, check_redacted=False,
             )
+
+        state_groups = {
+            sg: state_groups_map[sg] for sg in new_state_groups
+        }
+
         events_map = {ev.event_id: ev for ev, _ in events_context}
         logger.debug("calling resolve_state_groups from preserve_events")
         res = yield self._state_resolution_handler.resolve_state_groups(
@@ -1288,13 +1348,49 @@ class EventsStore(EventsWorkerStore):
 
         defer.returnValue(set(r["event_id"] for r in rows))
 
-    def have_events(self, event_ids):
+    @defer.inlineCallbacks
+    def have_seen_events(self, event_ids):
         """Given a list of event ids, check if we have already processed them.
 
+        Args:
+            event_ids (iterable[str]):
+
+        Returns:
+            Deferred[set[str]]: The events we have already seen.
+        """
+        results = set()
+
+        def have_seen_events_txn(txn, chunk):
+            sql = (
+                "SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
+                % (",".join("?" * len(chunk)), )
+            )
+            txn.execute(sql, chunk)
+            for (event_id, ) in txn:
+                results.add(event_id)
+
+        # break the input up into chunks of 100
+        input_iterator = iter(event_ids)
+        for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
+                          []):
+            yield self.runInteraction(
+                "have_seen_events",
+                have_seen_events_txn,
+                chunk,
+            )
+        defer.returnValue(results)
+
+    def get_seen_events_with_rejections(self, event_ids):
+        """Given a list of event ids, check if we rejected them.
+
+        Args:
+            event_ids (list[str])
+
         Returns:
-            dict: Has an entry for each event id we already have seen. Maps to
-            the rejected reason string if we rejected the event, else maps to
-            None.
+            Deferred[dict[str, str|None):
+                Has an entry for each event id we already have seen. Maps to
+                the rejected reason string if we rejected the event, else maps
+                to None.
         """
         if not event_ids:
             return defer.succeed({})
@@ -1316,9 +1412,7 @@ class EventsStore(EventsWorkerStore):
 
             return res
 
-        return self.runInteraction(
-            "have_events", f,
-        )
+        return self.runInteraction("get_rejection_reasons", f)
 
     @defer.inlineCallbacks
     def count_daily_messages(self):
@@ -1706,15 +1800,14 @@ class EventsStore(EventsWorkerStore):
         return self.runInteraction("get_all_new_events", get_all_new_events_txn)
 
     def purge_history(
-        self, room_id, topological_ordering, delete_local_events,
+        self, room_id, token, delete_local_events,
     ):
         """Deletes room history before a certain point
 
         Args:
             room_id (str):
 
-            topological_ordering (int):
-                minimum topo ordering to preserve
+            token (str): A topological token to delete events before
 
             delete_local_events (bool):
                 if True, we will delete local events as well as remote ones
@@ -1724,13 +1817,15 @@ class EventsStore(EventsWorkerStore):
 
         return self.runInteraction(
             "purge_history",
-            self._purge_history_txn, room_id, topological_ordering,
+            self._purge_history_txn, room_id, token,
             delete_local_events,
         )
 
     def _purge_history_txn(
-        self, txn, room_id, topological_ordering, delete_local_events,
+        self, txn, room_id, token_str, delete_local_events,
     ):
+        token = RoomStreamToken.parse(token_str)
+
         # Tables that should be pruned:
         #     event_auth
         #     event_backward_extremities
@@ -1775,6 +1870,13 @@ class EventsStore(EventsWorkerStore):
             " ON events_to_purge(should_delete)",
         )
 
+        # We do joins against events_to_purge for e.g. calculating state
+        # groups to purge, etc., so lets make an index.
+        txn.execute(
+            "CREATE INDEX events_to_purge_id"
+            " ON events_to_purge(event_id)",
+        )
+
         # First ensure that we're not about to delete all the forward extremeties
         txn.execute(
             "SELECT e.event_id, e.depth FROM events as e "
@@ -1787,7 +1889,7 @@ class EventsStore(EventsWorkerStore):
         rows = txn.fetchall()
         max_depth = max(row[0] for row in rows)
 
-        if max_depth <= topological_ordering:
+        if max_depth <= token.topological:
             # We need to ensure we don't delete all the events from the datanase
             # otherwise we wouldn't be able to send any events (due to not
             # having any backwards extremeties)
@@ -1803,7 +1905,7 @@ class EventsStore(EventsWorkerStore):
             should_delete_expr += " AND event_id NOT LIKE ?"
             should_delete_params += ("%:" + self.hs.hostname, )
 
-        should_delete_params += (room_id, topological_ordering)
+        should_delete_params += (room_id, token.topological)
 
         txn.execute(
             "INSERT INTO events_to_purge"
@@ -1826,13 +1928,13 @@ class EventsStore(EventsWorkerStore):
         logger.info("[purge] Finding new backward extremities")
 
         # We calculate the new entries for the backward extremeties by finding
-        # all events that point to events that are to be purged
+        # events to be purged that are pointed to by events we're not going to
+        # purge.
         txn.execute(
             "SELECT DISTINCT e.event_id FROM events_to_purge AS e"
             " INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id"
-            " INNER JOIN events AS e2 ON e2.event_id = ed.event_id"
-            " WHERE e2.topological_ordering >= ?",
-            (topological_ordering, )
+            " LEFT JOIN events_to_purge AS ep2 ON ed.event_id = ep2.event_id"
+            " WHERE ep2.event_id IS NULL",
         )
         new_backwards_extrems = txn.fetchall()
 
@@ -1856,16 +1958,22 @@ class EventsStore(EventsWorkerStore):
 
         # Get all state groups that are only referenced by events that are
         # to be deleted.
-        txn.execute(
-            "SELECT state_group FROM event_to_state_groups"
-            " INNER JOIN events USING (event_id)"
-            " WHERE state_group IN ("
-            "   SELECT DISTINCT state_group FROM events_to_purge"
-            "   INNER JOIN event_to_state_groups USING (event_id)"
-            " )"
-            " GROUP BY state_group HAVING MAX(topological_ordering) < ?",
-            (topological_ordering, )
-        )
+        # This works by first getting state groups that we may want to delete,
+        # joining against event_to_state_groups to get events that use that
+        # state group, then left joining against events_to_purge again. Any
+        # state group where the left join produce *no nulls* are referenced
+        # only by events that are going to be purged.
+        txn.execute("""
+            SELECT state_group FROM
+            (
+                SELECT DISTINCT state_group FROM events_to_purge
+                INNER JOIN event_to_state_groups USING (event_id)
+            ) AS sp
+            INNER JOIN event_to_state_groups USING (state_group)
+            LEFT JOIN events_to_purge AS ep USING (event_id)
+            GROUP BY state_group
+            HAVING SUM(CASE WHEN ep.event_id IS NULL THEN 1 ELSE 0 END) = 0
+        """)
 
         state_rows = txn.fetchall()
         logger.info("[purge] found %i redundant state groups", len(state_rows))
@@ -2012,10 +2120,25 @@ class EventsStore(EventsWorkerStore):
         #
         # So, let's stick it at the end so that we don't block event
         # persistence.
-        logger.info("[purge] updating room_depth")
+        #
+        # We do this by calculating the minimum depth of the backwards
+        # extremities. However, the events in event_backward_extremities
+        # are ones we don't have yet so we need to look at the events that
+        # point to it via event_edges table.
+        txn.execute("""
+            SELECT COALESCE(MIN(depth), 0)
+            FROM event_backward_extremities AS eb
+            INNER JOIN event_edges AS eg ON eg.prev_event_id = eb.event_id
+            INNER JOIN events AS e ON e.event_id = eg.event_id
+            WHERE eb.room_id = ?
+        """, (room_id,))
+        min_depth, = txn.fetchone()
+
+        logger.info("[purge] updating room_depth to %d", min_depth)
+
         txn.execute(
             "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
-            (topological_ordering, room_id,)
+            (min_depth, room_id,)
         )
 
         # finally, drop the temp table. this will commit the txn in sqlite,
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 2e23dd78ba..32d9d00ffb 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -20,7 +20,7 @@ from synapse.events import FrozenEvent
 from synapse.events.utils import prune_event
 
 from synapse.util.logcontext import (
-    preserve_fn, PreserveLoggingContext, make_deferred_yieldable
+    PreserveLoggingContext, make_deferred_yieldable, run_in_background,
 )
 from synapse.util.metrics import Measure
 from synapse.api.errors import SynapseError
@@ -51,6 +51,26 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
 
 
 class EventsWorkerStore(SQLBaseStore):
+    def get_received_ts(self, event_id):
+        """Get received_ts (when it was persisted) for the event.
+
+        Raises an exception for unknown events.
+
+        Args:
+            event_id (str)
+
+        Returns:
+            Deferred[int|None]: Timestamp in milliseconds, or None for events
+            that were persisted before received_ts was implemented.
+        """
+        return self._simple_select_one_onecol(
+            table="events",
+            keyvalues={
+                "event_id": event_id,
+            },
+            retcol="received_ts",
+            desc="get_received_ts",
+        )
 
     @defer.inlineCallbacks
     def get_event(self, event_id, check_redacted=True,
@@ -299,7 +319,8 @@ class EventsWorkerStore(SQLBaseStore):
 
         res = yield make_deferred_yieldable(defer.gatherResults(
             [
-                preserve_fn(self._get_event_from_row)(
+                run_in_background(
+                    self._get_event_from_row,
                     row["internal_metadata"], row["json"], row["redacts"],
                     rejected_reason=row["rejects"],
                 )
@@ -316,7 +337,7 @@ class EventsWorkerStore(SQLBaseStore):
     def _fetch_event_rows(self, txn, events):
         rows = []
         N = 200
-        for i in range(1 + len(events) / N):
+        for i in range(1 + len(events) // N):
             evs = events[i * N:(i + 1) * N]
             if not evs:
                 break
diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py
index 78b1e30945..2e2763126d 100644
--- a/synapse/storage/filtering.py
+++ b/synapse/storage/filtering.py
@@ -44,7 +44,7 @@ class FilteringStore(SQLBaseStore):
             desc="get_user_filter",
         )
 
-        defer.returnValue(json.loads(str(def_json).decode("utf-8")))
+        defer.returnValue(json.loads(bytes(def_json).decode("utf-8")))
 
     def add_user_filter(self, user_localpart, user_filter):
         def_json = encode_canonical_json(user_filter)
diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py
index 8fde1aab8e..da05ccb027 100644
--- a/synapse/storage/group_server.py
+++ b/synapse/storage/group_server.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2017 Vector Creations Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -19,7 +20,7 @@ from synapse.api.errors import SynapseError
 
 from ._base import SQLBaseStore
 
-import ujson as json
+import simplejson as json
 
 
 # The category ID for the "default" category. We don't store as null in the
@@ -29,6 +30,24 @@ _DEFAULT_ROLE_ID = ""
 
 
 class GroupServerStore(SQLBaseStore):
+    def set_group_join_policy(self, group_id, join_policy):
+        """Set the join policy of a group.
+
+        join_policy can be one of:
+         * "invite"
+         * "open"
+        """
+        return self._simple_update_one(
+            table="groups",
+            keyvalues={
+                "group_id": group_id,
+            },
+            updatevalues={
+                "join_policy": join_policy,
+            },
+            desc="set_group_join_policy",
+        )
+
     def get_group(self, group_id):
         return self._simple_select_one(
             table="groups",
@@ -36,10 +55,11 @@ class GroupServerStore(SQLBaseStore):
                 "group_id": group_id,
             },
             retcols=(
-                "name", "short_description", "long_description", "avatar_url", "is_public"
+                "name", "short_description", "long_description",
+                "avatar_url", "is_public", "join_policy",
             ),
             allow_none=True,
-            desc="is_user_in_group",
+            desc="get_group",
         )
 
     def get_users_in_group(self, group_id, include_private=False):
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 87aeaf71d6..0540c2b0b1 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -92,7 +92,7 @@ class KeyStore(SQLBaseStore):
 
         if verify_key_bytes:
             defer.returnValue(decode_verify_key_bytes(
-                key_id, str(verify_key_bytes)
+                key_id, bytes(verify_key_bytes)
             ))
 
     @defer.inlineCallbacks
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index c845a0cec5..c08e9cd65a 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014 - 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -25,7 +26,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 47
+SCHEMA_VERSION = 49
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index eac8694e0f..709c69a926 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -23,7 +23,7 @@ from twisted.internet import defer
 
 import abc
 import logging
-import ujson as json
+import simplejson as json
 
 
 logger = logging.getLogger(__name__)
@@ -297,18 +297,22 @@ class ReceiptsWorkerStore(SQLBaseStore):
         if receipt_type != "m.read":
             return
 
-        # Returns an ObservableDeferred
+        # Returns either an ObservableDeferred or the raw result
         res = self.get_users_with_read_receipts_in_room.cache.get(
             room_id, None, update_metrics=False,
         )
 
-        if res:
-            if isinstance(res, defer.Deferred) and res.called:
+        # first handle the Deferred case
+        if isinstance(res, defer.Deferred):
+            if res.called:
                 res = res.result
-            if user_id in res:
-                # We'd only be adding to the set, so no point invalidating if the
-                # user is already there
-                return
+            else:
+                res = None
+
+        if res and user_id in res:
+            # We'd only be adding to the set, so no point invalidating if the
+            # user is already there
+            return
 
         self.get_users_with_read_receipts_in_room.invalidate((room_id,))
 
@@ -407,7 +411,6 @@ class ReceiptsStore(ReceiptsWorkerStore):
                 txn,
                 room_id=room_id,
                 user_id=user_id,
-                topological_ordering=topological_ordering,
                 stream_ordering=stream_ordering,
             )
 
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index d809b2ba46..a530e29f43 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -22,6 +22,8 @@ from synapse.storage import background_updates
 from synapse.storage._base import SQLBaseStore
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
+from six.moves import range
+
 
 class RegistrationWorkerStore(SQLBaseStore):
     @cached()
@@ -31,7 +33,10 @@ class RegistrationWorkerStore(SQLBaseStore):
             keyvalues={
                 "name": user_id,
             },
-            retcols=["name", "password_hash", "is_guest"],
+            retcols=[
+                "name", "password_hash", "is_guest",
+                "consent_version", "consent_server_notice_sent",
+            ],
             allow_none=True,
             desc="get_user_by_id",
         )
@@ -284,6 +289,53 @@ class RegistrationStore(RegistrationWorkerStore,
             "user_set_password_hash", user_set_password_hash_txn
         )
 
+    def user_set_consent_version(self, user_id, consent_version):
+        """Updates the user table to record privacy policy consent
+
+        Args:
+            user_id (str): full mxid of the user to update
+            consent_version (str): version of the policy the user has consented
+                to
+
+        Raises:
+            StoreError(404) if user not found
+        """
+        def f(txn):
+            self._simple_update_one_txn(
+                txn,
+                table='users',
+                keyvalues={'name': user_id, },
+                updatevalues={'consent_version': consent_version, },
+            )
+            self._invalidate_cache_and_stream(
+                txn, self.get_user_by_id, (user_id,)
+            )
+        return self.runInteraction("user_set_consent_version", f)
+
+    def user_set_consent_server_notice_sent(self, user_id, consent_version):
+        """Updates the user table to record that we have sent the user a server
+        notice about privacy policy consent
+
+        Args:
+            user_id (str): full mxid of the user to update
+            consent_version (str): version of the policy we have notified the
+                user about
+
+        Raises:
+            StoreError(404) if user not found
+        """
+        def f(txn):
+            self._simple_update_one_txn(
+                txn,
+                table='users',
+                keyvalues={'name': user_id, },
+                updatevalues={'consent_server_notice_sent': consent_version, },
+            )
+            self._invalidate_cache_and_stream(
+                txn, self.get_user_by_id, (user_id,)
+            )
+        return self.runInteraction("user_set_consent_server_notice_sent", f)
+
     def user_delete_access_tokens(self, user_id, except_token_id=None,
                                   device_id=None):
         """
@@ -460,18 +512,16 @@ class RegistrationStore(RegistrationWorkerStore,
         """
         def _find_next_generated_user_id(txn):
             txn.execute("SELECT name FROM users")
-            rows = self.cursor_to_dict(txn)
 
             regex = re.compile("^@(\d+):")
 
             found = set()
 
-            for r in rows:
-                user_id = r["name"]
+            for user_id, in txn:
                 match = regex.search(user_id)
                 if match:
                     found.add(int(match.group(1)))
-            for i in xrange(len(found) + 1):
+            for i in range(len(found) + 1):
                 if i not in found:
                     return i
 
@@ -526,3 +576,42 @@ class RegistrationStore(RegistrationWorkerStore,
         except self.database_engine.module.IntegrityError:
             ret = yield self.get_3pid_guest_access_token(medium, address)
             defer.returnValue(ret)
+
+    def add_user_pending_deactivation(self, user_id):
+        """
+        Adds a user to the table of users who need to be parted from all the rooms they're
+        in
+        """
+        return self._simple_insert(
+            "users_pending_deactivation",
+            values={
+                "user_id": user_id,
+            },
+            desc="add_user_pending_deactivation",
+        )
+
+    def del_user_pending_deactivation(self, user_id):
+        """
+        Removes the given user to the table of users who need to be parted from all the
+        rooms they're in, effectively marking that user as fully deactivated.
+        """
+        return self._simple_delete_one(
+            "users_pending_deactivation",
+            keyvalues={
+                "user_id": user_id,
+            },
+            desc="del_user_pending_deactivation",
+        )
+
+    def get_user_pending_deactivation(self):
+        """
+        Gets one user from the table of users waiting to be parted from all the rooms
+        they're in.
+        """
+        return self._simple_select_one_onecol(
+            "users_pending_deactivation",
+            keyvalues={},
+            retcol="user_id",
+            allow_none=True,
+            desc="get_users_pending_deactivation",
+        )
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 34ed84ea22..ea6a189185 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -22,7 +22,7 @@ from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
 import collections
 import logging
-import ujson as json
+import simplejson as json
 import re
 
 logger = logging.getLogger(__name__)
@@ -530,7 +530,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
 
             # Convert the IDs to MXC URIs
             for media_id in local_mxcs:
-                local_media_mxcs.append("mxc://%s/%s" % (self.hostname, media_id))
+                local_media_mxcs.append("mxc://%s/%s" % (self.hs.hostname, media_id))
             for hostname, media_id in remote_mxcs:
                 remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id))
 
@@ -594,7 +594,8 @@ class RoomStore(RoomWorkerStore, SearchStore):
 
         while next_token:
             sql = """
-                SELECT stream_ordering, content FROM events
+                SELECT stream_ordering, json FROM events
+                JOIN event_json USING (room_id, event_id)
                 WHERE room_id = ?
                     AND stream_ordering < ?
                     AND contains_url = ? AND outlier = ?
@@ -606,8 +607,8 @@ class RoomStore(RoomWorkerStore, SearchStore):
             next_token = None
             for stream_ordering, content_json in txn:
                 next_token = stream_ordering
-                content = json.loads(content_json)
-
+                event_json = json.loads(content_json)
+                content = event_json["content"]
                 content_url = content.get("url")
                 thumbnail_url = content.get("info", {}).get("thumbnail_url")
 
@@ -618,7 +619,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
                     if matches:
                         hostname = matches.group(1)
                         media_id = matches.group(2)
-                        if hostname == self.hostname:
+                        if hostname == self.hs.hostname:
                             local_media_mxcs.append(media_id)
                         else:
                             remote_media_mxcs.append((hostname, media_id))
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 52e19e16b0..7bfc3d91b5 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -28,7 +28,9 @@ from synapse.api.constants import Membership, EventTypes
 from synapse.types import get_domain_from_id
 
 import logging
-import ujson as json
+import simplejson as json
+
+from six import itervalues, iteritems
 
 logger = logging.getLogger(__name__)
 
@@ -272,7 +274,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         users_in_room = {}
         member_event_ids = [
             e_id
-            for key, e_id in current_state_ids.iteritems()
+            for key, e_id in iteritems(current_state_ids)
             if key[0] == EventTypes.Member
         ]
 
@@ -289,7 +291,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
                     users_in_room = dict(prev_res)
                     member_event_ids = [
                         e_id
-                        for key, e_id in context.delta_ids.iteritems()
+                        for key, e_id in iteritems(context.delta_ids)
                         if key[0] == EventTypes.Member
                     ]
                     for etype, state_key in context.delta_ids:
@@ -645,8 +647,9 @@ class RoomMemberStore(RoomMemberWorkerStore):
 
         def add_membership_profile_txn(txn):
             sql = ("""
-                SELECT stream_ordering, event_id, events.room_id, content
+                SELECT stream_ordering, event_id, events.room_id, event_json.json
                 FROM events
+                INNER JOIN event_json USING (event_id)
                 INNER JOIN room_memberships USING (event_id)
                 WHERE ? <= stream_ordering AND stream_ordering < ?
                 AND type = 'm.room.member'
@@ -667,7 +670,8 @@ class RoomMemberStore(RoomMemberWorkerStore):
                 event_id = row["event_id"]
                 room_id = row["room_id"]
                 try:
-                    content = json.loads(row["content"])
+                    event_json = json.loads(row["json"])
+                    content = event_json['content']
                 except Exception:
                     continue
 
@@ -739,7 +743,7 @@ class _JoinedHostsCache(object):
             if state_entry.state_group == self.state_group:
                 pass
             elif state_entry.prev_group == self.state_group:
-                for (typ, state_key), event_id in state_entry.delta_ids.iteritems():
+                for (typ, state_key), event_id in iteritems(state_entry.delta_ids):
                     if typ != EventTypes.Member:
                         continue
 
@@ -769,7 +773,7 @@ class _JoinedHostsCache(object):
                 self.state_group = state_entry.state_group
             else:
                 self.state_group = object()
-            self._len = sum(len(v) for v in self.hosts_to_joined_users.itervalues())
+            self._len = sum(len(v) for v in itervalues(self.hosts_to_joined_users))
         defer.returnValue(frozenset(self.hosts_to_joined_users))
 
     def __len__(self):
diff --git a/synapse/storage/schema/delta/14/upgrade_appservice_db.py b/synapse/storage/schema/delta/14/upgrade_appservice_db.py
index 8755bb2e49..4d725b92fe 100644
--- a/synapse/storage/schema/delta/14/upgrade_appservice_db.py
+++ b/synapse/storage/schema/delta/14/upgrade_appservice_db.py
@@ -12,9 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import json
 import logging
 
+import simplejson as json
+
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py
index 4269ac69ad..e7351c3ae6 100644
--- a/synapse/storage/schema/delta/25/fts.py
+++ b/synapse/storage/schema/delta/25/fts.py
@@ -17,7 +17,7 @@ import logging
 from synapse.storage.prepare_database import get_statements
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 
-import ujson
+import simplejson
 
 logger = logging.getLogger(__name__)
 
@@ -66,7 +66,7 @@ def run_create(cur, database_engine, *args, **kwargs):
             "max_stream_id_exclusive": max_stream_id + 1,
             "rows_inserted": 0,
         }
-        progress_json = ujson.dumps(progress)
+        progress_json = simplejson.dumps(progress)
 
         sql = (
             "INSERT into background_updates (update_name, progress_json)"
diff --git a/synapse/storage/schema/delta/27/ts.py b/synapse/storage/schema/delta/27/ts.py
index 71b12a2731..6df57b5206 100644
--- a/synapse/storage/schema/delta/27/ts.py
+++ b/synapse/storage/schema/delta/27/ts.py
@@ -16,7 +16,7 @@ import logging
 
 from synapse.storage.prepare_database import get_statements
 
-import ujson
+import simplejson
 
 logger = logging.getLogger(__name__)
 
@@ -45,7 +45,7 @@ def run_create(cur, database_engine, *args, **kwargs):
             "max_stream_id_exclusive": max_stream_id + 1,
             "rows_inserted": 0,
         }
-        progress_json = ujson.dumps(progress)
+        progress_json = simplejson.dumps(progress)
 
         sql = (
             "INSERT into background_updates (update_name, progress_json)"
diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py
index c53e53c94f..85bd1a2006 100644
--- a/synapse/storage/schema/delta/30/as_users.py
+++ b/synapse/storage/schema/delta/30/as_users.py
@@ -14,6 +14,8 @@
 import logging
 from synapse.config.appservice import load_appservices
 
+from six.moves import range
+
 
 logger = logging.getLogger(__name__)
 
@@ -58,7 +60,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
 
     for as_id, user_ids in owned.items():
         n = 100
-        user_chunks = (user_ids[i:i + 100] for i in xrange(0, len(user_ids), n))
+        user_chunks = (user_ids[i:i + 100] for i in range(0, len(user_ids), n))
         for chunk in user_chunks:
             cur.execute(
                 database_engine.convert_param_style(
diff --git a/synapse/storage/schema/delta/31/search_update.py b/synapse/storage/schema/delta/31/search_update.py
index 470ae0c005..fe6b7d196d 100644
--- a/synapse/storage/schema/delta/31/search_update.py
+++ b/synapse/storage/schema/delta/31/search_update.py
@@ -16,7 +16,7 @@ from synapse.storage.engines import PostgresEngine
 from synapse.storage.prepare_database import get_statements
 
 import logging
-import ujson
+import simplejson
 
 logger = logging.getLogger(__name__)
 
@@ -49,7 +49,7 @@ def run_create(cur, database_engine, *args, **kwargs):
             "rows_inserted": 0,
             "have_added_indexes": False,
         }
-        progress_json = ujson.dumps(progress)
+        progress_json = simplejson.dumps(progress)
 
         sql = (
             "INSERT into background_updates (update_name, progress_json)"
diff --git a/synapse/storage/schema/delta/33/event_fields.py b/synapse/storage/schema/delta/33/event_fields.py
index 83066cccc9..1e002f9db2 100644
--- a/synapse/storage/schema/delta/33/event_fields.py
+++ b/synapse/storage/schema/delta/33/event_fields.py
@@ -15,7 +15,7 @@
 from synapse.storage.prepare_database import get_statements
 
 import logging
-import ujson
+import simplejson
 
 logger = logging.getLogger(__name__)
 
@@ -44,7 +44,7 @@ def run_create(cur, database_engine, *args, **kwargs):
             "max_stream_id_exclusive": max_stream_id + 1,
             "rows_inserted": 0,
         }
-        progress_json = ujson.dumps(progress)
+        progress_json = simplejson.dumps(progress)
 
         sql = (
             "INSERT into background_updates (update_name, progress_json)"
diff --git a/synapse/storage/schema/delta/48/add_user_consent.sql b/synapse/storage/schema/delta/48/add_user_consent.sql
new file mode 100644
index 0000000000..5237491506
--- /dev/null
+++ b/synapse/storage/schema/delta/48/add_user_consent.sql
@@ -0,0 +1,18 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* record the version of the privacy policy the user has consented to
+ */
+ALTER TABLE users ADD COLUMN consent_version TEXT;
diff --git a/synapse/storage/schema/delta/48/add_user_ips_last_seen_index.sql b/synapse/storage/schema/delta/48/add_user_ips_last_seen_index.sql
new file mode 100644
index 0000000000..9248b0b24a
--- /dev/null
+++ b/synapse/storage/schema/delta/48/add_user_ips_last_seen_index.sql
@@ -0,0 +1,17 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT into background_updates (update_name, progress_json)
+    VALUES ('user_ips_last_seen_index', '{}');
diff --git a/synapse/storage/schema/delta/48/deactivated_users.sql b/synapse/storage/schema/delta/48/deactivated_users.sql
new file mode 100644
index 0000000000..e9013a6969
--- /dev/null
+++ b/synapse/storage/schema/delta/48/deactivated_users.sql
@@ -0,0 +1,25 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Store any accounts that have been requested to be deactivated.
+ * We part the account from all the rooms its in when its
+ * deactivated. This can take some time and synapse may be restarted
+ * before it completes, so store the user IDs here until the process
+ * is complete.
+ */
+CREATE TABLE users_pending_deactivation (
+    user_id TEXT NOT NULL
+);
diff --git a/synapse/storage/schema/delta/48/group_unique_indexes.py b/synapse/storage/schema/delta/48/group_unique_indexes.py
new file mode 100644
index 0000000000..2233af87d7
--- /dev/null
+++ b/synapse/storage/schema/delta/48/group_unique_indexes.py
@@ -0,0 +1,57 @@
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.prepare_database import get_statements
+
+FIX_INDEXES = """
+-- rebuild indexes as uniques
+DROP INDEX groups_invites_g_idx;
+CREATE UNIQUE INDEX group_invites_g_idx ON group_invites(group_id, user_id);
+DROP INDEX groups_users_g_idx;
+CREATE UNIQUE INDEX group_users_g_idx ON group_users(group_id, user_id);
+
+-- rename other indexes to actually match their table names..
+DROP INDEX groups_users_u_idx;
+CREATE INDEX group_users_u_idx ON group_users(user_id);
+DROP INDEX groups_invites_u_idx;
+CREATE INDEX group_invites_u_idx ON group_invites(user_id);
+DROP INDEX groups_rooms_g_idx;
+CREATE UNIQUE INDEX group_rooms_g_idx ON group_rooms(group_id, room_id);
+DROP INDEX groups_rooms_r_idx;
+CREATE INDEX group_rooms_r_idx ON group_rooms(room_id);
+"""
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+    rowid = "ctid" if isinstance(database_engine, PostgresEngine) else "rowid"
+
+    # remove duplicates from group_users & group_invites tables
+    cur.execute("""
+        DELETE FROM group_users WHERE %s NOT IN (
+           SELECT min(%s) FROM group_users GROUP BY group_id, user_id
+        );
+    """ % (rowid, rowid))
+    cur.execute("""
+        DELETE FROM group_invites WHERE %s NOT IN (
+           SELECT min(%s) FROM group_invites GROUP BY group_id, user_id
+        );
+    """ % (rowid, rowid))
+
+    for statement in get_statements(FIX_INDEXES.splitlines()):
+        cur.execute(statement)
+
+
+def run_upgrade(*args, **kwargs):
+    pass
diff --git a/synapse/storage/schema/delta/48/groups_joinable.sql b/synapse/storage/schema/delta/48/groups_joinable.sql
new file mode 100644
index 0000000000..ce26eaf0c9
--- /dev/null
+++ b/synapse/storage/schema/delta/48/groups_joinable.sql
@@ -0,0 +1,22 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* 
+ * This isn't a real ENUM because sqlite doesn't support it
+ * and we use a default of NULL for inserted rows and interpret
+ * NULL at the python store level as necessary so that existing
+ * rows are given the correct default policy.
+ */
+ALTER TABLE groups ADD COLUMN join_policy TEXT NOT NULL DEFAULT 'invite';
diff --git a/synapse/storage/schema/delta/49/add_user_consent_server_notice_sent.sql b/synapse/storage/schema/delta/49/add_user_consent_server_notice_sent.sql
new file mode 100644
index 0000000000..14dcf18d73
--- /dev/null
+++ b/synapse/storage/schema/delta/49/add_user_consent_server_notice_sent.sql
@@ -0,0 +1,20 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* record whether we have sent a server notice about consenting to the
+ * privacy policy. Specifically records the version of the policy we sent
+ * a message about.
+ */
+ALTER TABLE users ADD COLUMN consent_server_notice_sent TEXT;
diff --git a/synapse/storage/schema/delta/49/add_user_daily_visits.sql b/synapse/storage/schema/delta/49/add_user_daily_visits.sql
new file mode 100644
index 0000000000..3dd478196f
--- /dev/null
+++ b/synapse/storage/schema/delta/49/add_user_daily_visits.sql
@@ -0,0 +1,21 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE user_daily_visits ( user_id TEXT NOT NULL,
+                                 device_id TEXT,
+                                 timestamp BIGINT NOT NULL );
+CREATE INDEX user_daily_visits_uts_idx ON user_daily_visits(user_id, timestamp);
+CREATE INDEX user_daily_visits_ts_idx ON user_daily_visits(timestamp);
diff --git a/synapse/storage/schema/delta/49/add_user_ips_last_seen_only_index.sql b/synapse/storage/schema/delta/49/add_user_ips_last_seen_only_index.sql
new file mode 100644
index 0000000000..3a4ed59b5b
--- /dev/null
+++ b/synapse/storage/schema/delta/49/add_user_ips_last_seen_only_index.sql
@@ -0,0 +1,17 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT into background_updates (update_name, progress_json)
+    VALUES ('user_ips_last_seen_only_index', '{}');
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 2755acff40..6ba3e59889 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -16,7 +16,7 @@
 from collections import namedtuple
 import logging
 import re
-import ujson as json
+import simplejson as json
 
 from twisted.internet import defer
 
@@ -75,8 +75,9 @@ class SearchStore(BackgroundUpdateStore):
 
         def reindex_search_txn(txn):
             sql = (
-                "SELECT stream_ordering, event_id, room_id, type, content, "
+                "SELECT stream_ordering, event_id, room_id, type, json, "
                 " origin_server_ts FROM events"
+                " JOIN event_json USING (room_id, event_id)"
                 " WHERE ? <= stream_ordering AND stream_ordering < ?"
                 " AND (%s)"
                 " ORDER BY stream_ordering DESC"
@@ -104,7 +105,8 @@ class SearchStore(BackgroundUpdateStore):
                     stream_ordering = row["stream_ordering"]
                     origin_server_ts = row["origin_server_ts"]
                     try:
-                        content = json.loads(row["content"])
+                        event_json = json.loads(row["json"])
+                        content = event_json["content"]
                     except Exception:
                         continue
 
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 2956c3b3e0..fb463c525a 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -38,15 +38,17 @@ from twisted.internet import defer
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.events import EventsWorkerStore
 
-from synapse.util.caches.descriptors import cached
 from synapse.types import RoomStreamToken
 from synapse.util.caches.stream_change_cache import StreamChangeCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
-from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+from synapse.storage.engines import PostgresEngine
 
 import abc
 import logging
 
+from six.moves import range
+from collections import namedtuple
+
 
 logger = logging.getLogger(__name__)
 
@@ -58,6 +60,12 @@ _STREAM_TOKEN = "stream"
 _TOPOLOGICAL_TOKEN = "topological"
 
 
+# Used as return values for pagination APIs
+_EventDictReturn = namedtuple("_EventDictReturn", (
+    "event_id", "topological_ordering", "stream_ordering",
+))
+
+
 def lower_bound(token, engine, inclusive=False):
     inclusive = "=" if inclusive else ""
     if token.topological is None:
@@ -196,13 +204,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         results = {}
         room_ids = list(room_ids)
-        for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)):
+        for rm_ids in (room_ids[i:i + 20] for i in range(0, len(room_ids), 20)):
             res = yield make_deferred_yieldable(defer.gatherResults([
-                preserve_fn(self.get_room_events_stream_for_room)(
+                run_in_background(
+                    self.get_room_events_stream_for_room,
                     room_id, from_key, to_key, limit, order=order,
                 )
                 for room_id in rm_ids
-            ]))
+            ], consumeErrors=True))
             results.update(dict(zip(rm_ids, res)))
 
         defer.returnValue(results)
@@ -224,54 +233,55 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
     @defer.inlineCallbacks
     def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0,
                                         order='DESC'):
-        # Note: If from_key is None then we return in topological order. This
-        # is because in that case we're using this as a "get the last few messages
-        # in a room" function, rather than "get new messages since last sync"
-        if from_key is not None:
-            from_id = RoomStreamToken.parse_stream_token(from_key).stream
-        else:
-            from_id = None
-        to_id = RoomStreamToken.parse_stream_token(to_key).stream
 
+        """Get new room events in stream ordering since `from_key`.
+
+        Args:
+            room_id (str)
+            from_key (str): Token from which no events are returned before
+            to_key (str): Token from which no events are returned after. (This
+                is typically the current stream token)
+            limit (int): Maximum number of events to return
+            order (str): Either "DESC" or "ASC". Determines which events are
+                returned when the result is limited. If "DESC" then the most
+                recent `limit` events are returned, otherwise returns the
+                oldest `limit` events.
+
+        Returns:
+            Deferred[tuple[list[FrozenEvent], str]]: Returns the list of
+            events (in ascending order) and the token from the start of
+            the chunk of events returned.
+        """
         if from_key == to_key:
             defer.returnValue(([], from_key))
 
-        if from_id:
-            has_changed = yield self._events_stream_cache.has_entity_changed(
-                room_id, from_id
-            )
-
-            if not has_changed:
-                defer.returnValue(([], from_key))
+        from_id = RoomStreamToken.parse_stream_token(from_key).stream
+        to_id = RoomStreamToken.parse_stream_token(to_key).stream
 
-        def f(txn):
-            if from_id is not None:
-                sql = (
-                    "SELECT event_id, stream_ordering FROM events WHERE"
-                    " room_id = ?"
-                    " AND not outlier"
-                    " AND stream_ordering > ? AND stream_ordering <= ?"
-                    " ORDER BY stream_ordering %s LIMIT ?"
-                ) % (order,)
-                txn.execute(sql, (room_id, from_id, to_id, limit))
-            else:
-                sql = (
-                    "SELECT event_id, stream_ordering FROM events WHERE"
-                    " room_id = ?"
-                    " AND not outlier"
-                    " AND stream_ordering <= ?"
-                    " ORDER BY topological_ordering %s, stream_ordering %s LIMIT ?"
-                ) % (order, order,)
-                txn.execute(sql, (room_id, to_id, limit))
+        has_changed = yield self._events_stream_cache.has_entity_changed(
+            room_id, from_id
+        )
 
-            rows = self.cursor_to_dict(txn)
+        if not has_changed:
+            defer.returnValue(([], from_key))
 
+        def f(txn):
+            sql = (
+                "SELECT event_id, stream_ordering FROM events WHERE"
+                " room_id = ?"
+                " AND not outlier"
+                " AND stream_ordering > ? AND stream_ordering <= ?"
+                " ORDER BY stream_ordering %s LIMIT ?"
+            ) % (order,)
+            txn.execute(sql, (room_id, from_id, to_id, limit))
+
+            rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
             return rows
 
         rows = yield self.runInteraction("get_room_events_stream_for_room", f)
 
         ret = yield self._get_events(
-            [r["event_id"] for r in rows],
+            [r.event_id for r in rows],
             get_prev_content=True
         )
 
@@ -281,7 +291,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             ret.reverse()
 
         if rows:
-            key = "s%d" % min(r["stream_ordering"] for r in rows)
+            key = "s%d" % min(r.stream_ordering for r in rows)
         else:
             # Assume we didn't get anything because there was nothing to
             # get.
@@ -291,10 +301,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
     @defer.inlineCallbacks
     def get_membership_changes_for_user(self, user_id, from_key, to_key):
-        if from_key is not None:
-            from_id = RoomStreamToken.parse_stream_token(from_key).stream
-        else:
-            from_id = None
+        from_id = RoomStreamToken.parse_stream_token(from_key).stream
         to_id = RoomStreamToken.parse_stream_token(to_key).stream
 
         if from_key == to_key:
@@ -308,34 +315,24 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 defer.returnValue([])
 
         def f(txn):
-            if from_id is not None:
-                sql = (
-                    "SELECT m.event_id, stream_ordering FROM events AS e,"
-                    " room_memberships AS m"
-                    " WHERE e.event_id = m.event_id"
-                    " AND m.user_id = ?"
-                    " AND e.stream_ordering > ? AND e.stream_ordering <= ?"
-                    " ORDER BY e.stream_ordering ASC"
-                )
-                txn.execute(sql, (user_id, from_id, to_id,))
-            else:
-                sql = (
-                    "SELECT m.event_id, stream_ordering FROM events AS e,"
-                    " room_memberships AS m"
-                    " WHERE e.event_id = m.event_id"
-                    " AND m.user_id = ?"
-                    " AND stream_ordering <= ?"
-                    " ORDER BY stream_ordering ASC"
-                )
-                txn.execute(sql, (user_id, to_id,))
-            rows = self.cursor_to_dict(txn)
+            sql = (
+                "SELECT m.event_id, stream_ordering FROM events AS e,"
+                " room_memberships AS m"
+                " WHERE e.event_id = m.event_id"
+                " AND m.user_id = ?"
+                " AND e.stream_ordering > ? AND e.stream_ordering <= ?"
+                " ORDER BY e.stream_ordering ASC"
+            )
+            txn.execute(sql, (user_id, from_id, to_id,))
+
+            rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
 
             return rows
 
         rows = yield self.runInteraction("get_membership_changes_for_user", f)
 
         ret = yield self._get_events(
-            [r["event_id"] for r in rows],
+            [r.event_id for r in rows],
             get_prev_content=True
         )
 
@@ -344,14 +341,28 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
-    def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
+    def get_recent_events_for_room(self, room_id, limit, end_token):
+        """Get the most recent events in the room in topological ordering.
+
+        Args:
+            room_id (str)
+            limit (int)
+            end_token (str): The stream token representing now.
+
+        Returns:
+            Deferred[tuple[list[FrozenEvent],  str]]: Returns a list of
+            events and a token pointing to the start of the returned
+            events.
+            The events returned are in ascending order.
+        """
+
         rows, token = yield self.get_recent_event_ids_for_room(
-            room_id, limit, end_token, from_token
+            room_id, limit, end_token,
         )
 
         logger.debug("stream before")
         events = yield self._get_events(
-            [r["event_id"] for r in rows],
+            [r.event_id for r in rows],
             get_prev_content=True
         )
         logger.debug("stream after")
@@ -360,60 +371,36 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         defer.returnValue((events, token))
 
-    @cached(num_args=4)
-    def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=None):
-        end_token = RoomStreamToken.parse_stream_token(end_token)
-
-        if from_token is None:
-            sql = (
-                "SELECT stream_ordering, topological_ordering, event_id"
-                " FROM events"
-                " WHERE room_id = ? AND stream_ordering <= ? AND outlier = ?"
-                " ORDER BY topological_ordering DESC, stream_ordering DESC"
-                " LIMIT ?"
-            )
-        else:
-            from_token = RoomStreamToken.parse_stream_token(from_token)
-            sql = (
-                "SELECT stream_ordering, topological_ordering, event_id"
-                " FROM events"
-                " WHERE room_id = ? AND stream_ordering > ?"
-                " AND stream_ordering <= ? AND outlier = ?"
-                " ORDER BY topological_ordering DESC, stream_ordering DESC"
-                " LIMIT ?"
-            )
-
-        def get_recent_events_for_room_txn(txn):
-            if from_token is None:
-                txn.execute(sql, (room_id, end_token.stream, False, limit,))
-            else:
-                txn.execute(sql, (
-                    room_id, from_token.stream, end_token.stream, False, limit
-                ))
+    @defer.inlineCallbacks
+    def get_recent_event_ids_for_room(self, room_id, limit, end_token):
+        """Get the most recent events in the room in topological ordering.
 
-            rows = self.cursor_to_dict(txn)
+        Args:
+            room_id (str)
+            limit (int)
+            end_token (str): The stream token representing now.
 
-            rows.reverse()  # As we selected with reverse ordering
+        Returns:
+            Deferred[tuple[list[_EventDictReturn],  str]]: Returns a list of
+            _EventDictReturn and a token pointing to the start of the returned
+            events.
+            The events returned are in ascending order.
+        """
+        # Allow a zero limit here, and no-op.
+        if limit == 0:
+            defer.returnValue(([], end_token))
 
-            if rows:
-                # Tokens are positions between events.
-                # This token points *after* the last event in the chunk.
-                # We need it to point to the event before it in the chunk
-                # since we are going backwards so we subtract one from the
-                # stream part.
-                topo = rows[0]["topological_ordering"]
-                toke = rows[0]["stream_ordering"] - 1
-                start_token = str(RoomStreamToken(topo, toke))
+        end_token = RoomStreamToken.parse(end_token)
 
-                token = (start_token, str(end_token))
-            else:
-                token = (str(end_token), str(end_token))
+        rows, token = yield self.runInteraction(
+            "get_recent_event_ids_for_room", self._paginate_room_events_txn,
+            room_id, from_token=end_token, limit=limit,
+        )
 
-            return rows, token
+        # We want to return the results in ascending order.
+        rows.reverse()
 
-        return self.runInteraction(
-            "get_recent_events_for_room", get_recent_events_for_room_txn
-        )
+        defer.returnValue((rows, token))
 
     def get_room_event_after_stream_ordering(self, room_id, stream_ordering):
         """Gets details of the first event in a room at or after a stream ordering
@@ -517,10 +504,20 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
     @staticmethod
     def _set_before_and_after(events, rows, topo_order=True):
+        """Inserts ordering information to events' internal metadata from
+        the DB rows.
+
+        Args:
+            events (list[FrozenEvent])
+            rows (list[_EventDictReturn])
+            topo_order (bool): Whether the events were ordered topologically
+                or by stream ordering. If true then all rows should have a non
+                null topological_ordering.
+        """
         for event, row in zip(events, rows):
-            stream = row["stream_ordering"]
-            if topo_order:
-                topo = event.depth
+            stream = row.stream_ordering
+            if topo_order and row.topological_ordering:
+                topo = row.topological_ordering
             else:
                 topo = None
             internal = event.internal_metadata
@@ -592,87 +589,27 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             retcols=["stream_ordering", "topological_ordering"],
         )
 
-        token = RoomStreamToken(
-            results["topological_ordering"],
+        # Paginating backwards includes the event at the token, but paginating
+        # forward doesn't.
+        before_token = RoomStreamToken(
+            results["topological_ordering"] - 1,
             results["stream_ordering"],
         )
 
-        if isinstance(self.database_engine, Sqlite3Engine):
-            # SQLite3 doesn't optimise ``(x < a) OR (x = a AND y < b)``
-            # So we give pass it to SQLite3 as the UNION ALL of the two queries.
-
-            query_before = (
-                "SELECT topological_ordering, stream_ordering, event_id FROM events"
-                " WHERE room_id = ? AND topological_ordering < ?"
-                " UNION ALL"
-                " SELECT topological_ordering, stream_ordering, event_id FROM events"
-                " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering < ?"
-                " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
-            )
-            before_args = (
-                room_id, token.topological,
-                room_id, token.topological, token.stream,
-                before_limit,
-            )
-
-            query_after = (
-                "SELECT topological_ordering, stream_ordering, event_id FROM events"
-                " WHERE room_id = ? AND topological_ordering > ?"
-                " UNION ALL"
-                " SELECT topological_ordering, stream_ordering, event_id FROM events"
-                " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering > ?"
-                " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?"
-            )
-            after_args = (
-                room_id, token.topological,
-                room_id, token.topological, token.stream,
-                after_limit,
-            )
-        else:
-            query_before = (
-                "SELECT topological_ordering, stream_ordering, event_id FROM events"
-                " WHERE room_id = ? AND %s"
-                " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
-            ) % (upper_bound(token, self.database_engine, inclusive=False),)
-
-            before_args = (room_id, before_limit)
-
-            query_after = (
-                "SELECT topological_ordering, stream_ordering, event_id FROM events"
-                " WHERE room_id = ? AND %s"
-                " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?"
-            ) % (lower_bound(token, self.database_engine, inclusive=False),)
-
-            after_args = (room_id, after_limit)
-
-        txn.execute(query_before, before_args)
-
-        rows = self.cursor_to_dict(txn)
-        events_before = [r["event_id"] for r in rows]
-
-        if rows:
-            start_token = str(RoomStreamToken(
-                rows[0]["topological_ordering"],
-                rows[0]["stream_ordering"] - 1,
-            ))
-        else:
-            start_token = str(RoomStreamToken(
-                token.topological,
-                token.stream - 1,
-            ))
-
-        txn.execute(query_after, after_args)
+        after_token = RoomStreamToken(
+            results["topological_ordering"],
+            results["stream_ordering"],
+        )
 
-        rows = self.cursor_to_dict(txn)
-        events_after = [r["event_id"] for r in rows]
+        rows, start_token = self._paginate_room_events_txn(
+            txn, room_id, before_token, direction='b', limit=before_limit,
+        )
+        events_before = [r.event_id for r in rows]
 
-        if rows:
-            end_token = str(RoomStreamToken(
-                rows[-1]["topological_ordering"],
-                rows[-1]["stream_ordering"],
-            ))
-        else:
-            end_token = str(token)
+        rows, end_token = self._paginate_room_events_txn(
+            txn, room_id, after_token, direction='f', limit=after_limit,
+        )
+        events_after = [r.event_id for r in rows]
 
         return {
             "before": {
@@ -735,17 +672,30 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
     def has_room_changed_since(self, room_id, stream_id):
         return self._events_stream_cache.has_entity_changed(room_id, stream_id)
 
+    def _paginate_room_events_txn(self, txn, room_id, from_token, to_token=None,
+                                  direction='b', limit=-1, event_filter=None):
+        """Returns list of events before or after a given token.
 
-class StreamStore(StreamWorkerStore):
-    def get_room_max_stream_ordering(self):
-        return self._stream_id_gen.get_current_token()
+        Args:
+            txn
+            room_id (str)
+            from_token (RoomStreamToken): The token used to stream from
+            to_token (RoomStreamToken|None): A token which if given limits the
+                results to only those before
+            direction(char): Either 'b' or 'f' to indicate whether we are
+                paginating forwards or backwards from `from_key`.
+            limit (int): The maximum number of events to return.
+            event_filter (Filter|None): If provided filters the events to
+                those that match the filter.
 
-    def get_room_min_stream_ordering(self):
-        return self._backfill_id_gen.get_current_token()
+        Returns:
+            Deferred[tuple[list[_EventDictReturn], str]]: Returns the results
+            as a list of _EventDictReturn and a token that points to the end
+            of the result set.
+        """
+
+        assert int(limit) >= 0
 
-    @defer.inlineCallbacks
-    def paginate_room_events(self, room_id, from_key, to_key=None,
-                             direction='b', limit=-1, event_filter=None):
         # Tokens really represent positions between elements, but we use
         # the convention of pointing to the event before the gap. Hence
         # we have a bit of asymmetry when it comes to equalities.
@@ -753,20 +703,20 @@ class StreamStore(StreamWorkerStore):
         if direction == 'b':
             order = "DESC"
             bounds = upper_bound(
-                RoomStreamToken.parse(from_key), self.database_engine
+                from_token, self.database_engine
             )
-            if to_key:
+            if to_token:
                 bounds = "%s AND %s" % (bounds, lower_bound(
-                    RoomStreamToken.parse(to_key), self.database_engine
+                    to_token, self.database_engine
                 ))
         else:
             order = "ASC"
             bounds = lower_bound(
-                RoomStreamToken.parse(from_key), self.database_engine
+                from_token, self.database_engine
             )
-            if to_key:
+            if to_token:
                 bounds = "%s AND %s" % (bounds, upper_bound(
-                    RoomStreamToken.parse(to_key), self.database_engine
+                    to_token, self.database_engine
                 ))
 
         filter_clause, filter_args = filter_to_clause(event_filter)
@@ -775,52 +725,85 @@ class StreamStore(StreamWorkerStore):
             bounds += " AND " + filter_clause
             args.extend(filter_args)
 
-        if int(limit) > 0:
-            args.append(int(limit))
-            limit_str = " LIMIT ?"
-        else:
-            limit_str = ""
+        args.append(int(limit))
 
         sql = (
-            "SELECT * FROM events"
+            "SELECT event_id, topological_ordering, stream_ordering"
+            " FROM events"
             " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
             " ORDER BY topological_ordering %(order)s,"
-            " stream_ordering %(order)s %(limit)s"
+            " stream_ordering %(order)s LIMIT ?"
         ) % {
             "bounds": bounds,
             "order": order,
-            "limit": limit_str
         }
 
-        def f(txn):
-            txn.execute(sql, args)
-
-            rows = self.cursor_to_dict(txn)
-
-            if rows:
-                topo = rows[-1]["topological_ordering"]
-                toke = rows[-1]["stream_ordering"]
-                if direction == 'b':
-                    # Tokens are positions between events.
-                    # This token points *after* the last event in the chunk.
-                    # We need it to point to the event before it in the chunk
-                    # when we are going backwards so we subtract one from the
-                    # stream part.
-                    toke -= 1
-                next_token = str(RoomStreamToken(topo, toke))
-            else:
-                # TODO (erikj): We should work out what to do here instead.
-                next_token = to_key if to_key else from_key
+        txn.execute(sql, args)
+
+        rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn]
+
+        if rows:
+            topo = rows[-1].topological_ordering
+            toke = rows[-1].stream_ordering
+            if direction == 'b':
+                # Tokens are positions between events.
+                # This token points *after* the last event in the chunk.
+                # We need it to point to the event before it in the chunk
+                # when we are going backwards so we subtract one from the
+                # stream part.
+                toke -= 1
+            next_token = RoomStreamToken(topo, toke)
+        else:
+            # TODO (erikj): We should work out what to do here instead.
+            next_token = to_token if to_token else from_token
+
+        return rows, str(next_token),
+
+    @defer.inlineCallbacks
+    def paginate_room_events(self, room_id, from_key, to_key=None,
+                             direction='b', limit=-1, event_filter=None):
+        """Returns list of events before or after a given token.
 
-            return rows, next_token,
+        Args:
+            room_id (str)
+            from_key (str): The token used to stream from
+            to_key (str|None): A token which if given limits the results to
+                only those before
+            direction(char): Either 'b' or 'f' to indicate whether we are
+                paginating forwards or backwards from `from_key`.
+            limit (int): The maximum number of events to return. Zero or less
+                means no limit.
+            event_filter (Filter|None): If provided filters the events to
+                those that match the filter.
 
-        rows, token = yield self.runInteraction("paginate_room_events", f)
+        Returns:
+            tuple[list[dict], str]: Returns the results as a list of dicts and
+            a token that points to the end of the result set. The dicts have
+            the keys "event_id", "topological_ordering" and "stream_orderign".
+        """
+
+        from_key = RoomStreamToken.parse(from_key)
+        if to_key:
+            to_key = RoomStreamToken.parse(to_key)
+
+        rows, token = yield self.runInteraction(
+            "paginate_room_events", self._paginate_room_events_txn,
+            room_id, from_key, to_key, direction, limit, event_filter,
+        )
 
         events = yield self._get_events(
-            [r["event_id"] for r in rows],
+            [r.event_id for r in rows],
             get_prev_content=True
         )
 
         self._set_before_and_after(events, rows)
 
         defer.returnValue((events, token))
+
+
+class StreamStore(StreamWorkerStore):
+    def get_room_max_stream_ordering(self):
+        return self._stream_id_gen.get_current_token()
+
+    def get_room_min_stream_ordering(self):
+        return self._backfill_id_gen.get_current_token()
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
index fc46bf7bb3..6671d3cfca 100644
--- a/synapse/storage/tags.py
+++ b/synapse/storage/tags.py
@@ -19,9 +19,11 @@ from synapse.storage.account_data import AccountDataWorkerStore
 from synapse.util.caches.descriptors import cached
 from twisted.internet import defer
 
-import ujson as json
+import simplejson as json
 import logging
 
+from six.moves import range
+
 logger = logging.getLogger(__name__)
 
 
@@ -98,7 +100,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
 
         batch_size = 50
         results = []
-        for i in xrange(0, len(tag_ids), batch_size):
+        for i in range(0, len(tag_ids), batch_size):
             tags = yield self.runInteraction(
                 "get_all_updated_tag_content",
                 get_tag_content,
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 8f61f7ffae..f825264ea9 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -23,7 +23,7 @@ from canonicaljson import encode_canonical_json
 from collections import namedtuple
 
 import logging
-import ujson as json
+import simplejson as json
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index dfdcbb3181..d6e289ffbe 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -667,7 +667,7 @@ class UserDirectoryStore(SQLBaseStore):
             # The array of numbers are the weights for the various part of the
             # search: (domain, _, display name, localpart)
             sql = """
-                SELECT d.user_id, display_name, avatar_url
+                SELECT d.user_id AS user_id, display_name, avatar_url
                 FROM user_directory_search
                 INNER JOIN user_directory AS d USING (user_id)
                 %s
@@ -702,7 +702,7 @@ class UserDirectoryStore(SQLBaseStore):
             search_query = _parse_query_sqlite(search_term)
 
             sql = """
-                SELECT d.user_id, display_name, avatar_url
+                SELECT d.user_id AS user_id, display_name, avatar_url
                 FROM user_directory_search
                 INNER JOIN user_directory AS d USING (user_id)
                 %s