summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/_base.py84
-rw-r--r--synapse/storage/client_ips.py6
-rw-r--r--synapse/storage/devices.py9
-rw-r--r--synapse/storage/end_to_end_keys.py6
-rw-r--r--synapse/storage/event_push_actions.py4
-rw-r--r--synapse/storage/events.py78
-rw-r--r--synapse/storage/events_worker.py2
-rw-r--r--synapse/storage/filtering.py2
-rw-r--r--synapse/storage/keys.py16
-rw-r--r--synapse/storage/presence.py7
-rw-r--r--synapse/storage/receipts.py59
-rw-r--r--synapse/storage/registration.py1
-rw-r--r--synapse/storage/roommember.py10
-rw-r--r--synapse/storage/search.py9
-rw-r--r--synapse/storage/signatures.py12
-rw-r--r--synapse/storage/state.py47
-rw-r--r--synapse/storage/stream.py15
-rw-r--r--synapse/storage/transactions.py10
-rw-r--r--synapse/storage/user_directory.py8
19 files changed, 214 insertions, 171 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 2262776ab2..22d6257a9f 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -18,8 +18,8 @@ from synapse.api.errors import StoreError
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.caches.descriptors import Cache
 from synapse.storage.engines import PostgresEngine
-import synapse.metrics
 
+from prometheus_client import Histogram
 
 from twisted.internet import defer
 
@@ -27,20 +27,25 @@ import sys
 import time
 import threading
 
+from six import itervalues, iterkeys, iteritems
+from six.moves import intern, range
 
 logger = logging.getLogger(__name__)
 
+try:
+    MAX_TXN_ID = sys.maxint - 1
+except AttributeError:
+    # python 3 does not have a maximum int value
+    MAX_TXN_ID = 2**63 - 1
+
 sql_logger = logging.getLogger("synapse.storage.SQL")
 transaction_logger = logging.getLogger("synapse.storage.txn")
 perf_logger = logging.getLogger("synapse.storage.TIME")
 
+sql_scheduling_timer = Histogram("synapse_storage_schedule_time", "sec")
 
-metrics = synapse.metrics.get_metrics_for("synapse.storage")
-
-sql_scheduling_timer = metrics.register_distribution("schedule_time")
-
-sql_query_timer = metrics.register_distribution("query_time", labels=["verb"])
-sql_txn_timer = metrics.register_distribution("transaction_time", labels=["desc"])
+sql_query_timer = Histogram("synapse_storage_query_time", "sec", ["verb"])
+sql_txn_timer = Histogram("synapse_storage_transaction_time", "sec", ["desc"])
 
 
 class LoggingTransaction(object):
@@ -105,7 +110,7 @@ class LoggingTransaction(object):
                 # Don't let logging failures stop SQL from working
                 pass
 
-        start = time.time() * 1000
+        start = time.time()
 
         try:
             return func(
@@ -115,9 +120,9 @@ class LoggingTransaction(object):
             logger.debug("[SQL FAIL] {%s} %s", self.name, e)
             raise
         finally:
-            msecs = (time.time() * 1000) - start
-            sql_logger.debug("[SQL time] {%s} %f", self.name, msecs)
-            sql_query_timer.inc_by(msecs, sql.split()[0])
+            secs = time.time() - start
+            sql_logger.debug("[SQL time] {%s} %f sec", self.name, secs)
+            sql_query_timer.labels(sql.split()[0]).observe(secs)
 
 
 class PerformanceCounters(object):
@@ -127,7 +132,7 @@ class PerformanceCounters(object):
 
     def update(self, key, start_time, end_time=None):
         if end_time is None:
-            end_time = time.time() * 1000
+            end_time = time.time()
         duration = end_time - start_time
         count, cum_time = self.current_counters.get(key, (0, 0))
         count += 1
@@ -137,7 +142,7 @@ class PerformanceCounters(object):
 
     def interval(self, interval_duration, limit=3):
         counters = []
-        for name, (count, cum_time) in self.current_counters.iteritems():
+        for name, (count, cum_time) in iteritems(self.current_counters):
             prev_count, prev_time = self.previous_counters.get(name, (0, 0))
             counters.append((
                 (cum_time - prev_time) / interval_duration,
@@ -217,12 +222,12 @@ class SQLBaseStore(object):
 
     def _new_transaction(self, conn, desc, after_callbacks, exception_callbacks,
                          logging_context, func, *args, **kwargs):
-        start = time.time() * 1000
+        start = time.time()
         txn_id = self._TXN_ID
 
         # We don't really need these to be unique, so lets stop it from
         # growing really large.
-        self._TXN_ID = (self._TXN_ID + 1) % (sys.maxint - 1)
+        self._TXN_ID = (self._TXN_ID + 1) % (MAX_TXN_ID)
 
         name = "%s-%x" % (desc, txn_id, )
 
@@ -277,17 +282,17 @@ class SQLBaseStore(object):
             logger.debug("[TXN FAIL] {%s} %s", name, e)
             raise
         finally:
-            end = time.time() * 1000
+            end = time.time()
             duration = end - start
 
             if logging_context is not None:
                 logging_context.add_database_transaction(duration)
 
-            transaction_logger.debug("[TXN END] {%s} %f", name, duration)
+            transaction_logger.debug("[TXN END] {%s} %f sec", name, duration)
 
             self._current_txn_total_time += duration
             self._txn_perf_counters.update(desc, start, end)
-            sql_txn_timer.inc_by(duration, desc)
+            sql_txn_timer.labels(desc).observe(duration)
 
     @defer.inlineCallbacks
     def runInteraction(self, desc, func, *args, **kwargs):
@@ -344,13 +349,13 @@ class SQLBaseStore(object):
         """
         current_context = LoggingContext.current_context()
 
-        start_time = time.time() * 1000
+        start_time = time.time()
 
         def inner_func(conn, *args, **kwargs):
             with LoggingContext("runWithConnection") as context:
-                sched_duration_ms = time.time() * 1000 - start_time
-                sql_scheduling_timer.inc_by(sched_duration_ms)
-                current_context.add_database_scheduled(sched_duration_ms)
+                sched_duration_sec = time.time() - start_time
+                sql_scheduling_timer.observe(sched_duration_sec)
+                current_context.add_database_scheduled(sched_duration_sec)
 
                 if self.database_engine.is_connection_closed(conn):
                     logger.debug("Reconnecting closed database connection")
@@ -543,7 +548,7 @@ class SQLBaseStore(object):
             ", ".join("%s = ?" % (k,) for k in values),
             " AND ".join("%s = ?" % (k,) for k in keyvalues)
         )
-        sqlargs = values.values() + keyvalues.values()
+        sqlargs = list(values.values()) + list(keyvalues.values())
 
         txn.execute(sql, sqlargs)
         if txn.rowcount > 0:
@@ -561,7 +566,7 @@ class SQLBaseStore(object):
             ", ".join(k for k in allvalues),
             ", ".join("?" for _ in allvalues)
         )
-        txn.execute(sql, allvalues.values())
+        txn.execute(sql, list(allvalues.values()))
         # successfully inserted
         return True
 
@@ -629,8 +634,8 @@ class SQLBaseStore(object):
         }
 
         if keyvalues:
-            sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
-            txn.execute(sql, keyvalues.values())
+            sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in iterkeys(keyvalues))
+            txn.execute(sql, list(keyvalues.values()))
         else:
             txn.execute(sql)
 
@@ -694,7 +699,7 @@ class SQLBaseStore(object):
                 table,
                 " AND ".join("%s = ?" % (k, ) for k in keyvalues)
             )
-            txn.execute(sql, keyvalues.values())
+            txn.execute(sql, list(keyvalues.values()))
         else:
             sql = "SELECT %s FROM %s" % (
                 ", ".join(retcols),
@@ -725,9 +730,12 @@ class SQLBaseStore(object):
         if not iterable:
             defer.returnValue(results)
 
+        # iterables can not be sliced, so convert it to a list first
+        it_list = list(iterable)
+
         chunks = [
-            iterable[i:i + batch_size]
-            for i in xrange(0, len(iterable), batch_size)
+            it_list[i:i + batch_size]
+            for i in range(0, len(it_list), batch_size)
         ]
         for chunk in chunks:
             rows = yield self.runInteraction(
@@ -767,7 +775,7 @@ class SQLBaseStore(object):
         )
         values.extend(iterable)
 
-        for key, value in keyvalues.iteritems():
+        for key, value in iteritems(keyvalues):
             clauses.append("%s = ?" % (key,))
             values.append(value)
 
@@ -790,7 +798,7 @@ class SQLBaseStore(object):
     @staticmethod
     def _simple_update_txn(txn, table, keyvalues, updatevalues):
         if keyvalues:
-            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
+            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in iterkeys(keyvalues))
         else:
             where = ""
 
@@ -802,7 +810,7 @@ class SQLBaseStore(object):
 
         txn.execute(
             update_sql,
-            updatevalues.values() + keyvalues.values()
+            list(updatevalues.values()) + list(keyvalues.values())
         )
 
         return txn.rowcount
@@ -850,7 +858,7 @@ class SQLBaseStore(object):
             " AND ".join("%s = ?" % (k,) for k in keyvalues)
         )
 
-        txn.execute(select_sql, keyvalues.values())
+        txn.execute(select_sql, list(keyvalues.values()))
 
         row = txn.fetchone()
         if not row:
@@ -888,7 +896,7 @@ class SQLBaseStore(object):
             " AND ".join("%s = ?" % (k, ) for k in keyvalues)
         )
 
-        txn.execute(sql, keyvalues.values())
+        txn.execute(sql, list(keyvalues.values()))
         if txn.rowcount == 0:
             raise StoreError(404, "No row found")
         if txn.rowcount > 1:
@@ -906,7 +914,7 @@ class SQLBaseStore(object):
             " AND ".join("%s = ?" % (k, ) for k in keyvalues)
         )
 
-        return txn.execute(sql, keyvalues.values())
+        return txn.execute(sql, list(keyvalues.values()))
 
     def _simple_delete_many(self, table, column, iterable, keyvalues, desc):
         return self.runInteraction(
@@ -938,7 +946,7 @@ class SQLBaseStore(object):
         )
         values.extend(iterable)
 
-        for key, value in keyvalues.iteritems():
+        for key, value in iteritems(keyvalues):
             clauses.append("%s = ?" % (key,))
             values.append(value)
 
@@ -978,7 +986,7 @@ class SQLBaseStore(object):
         txn.close()
 
         if cache:
-            min_val = min(cache.itervalues())
+            min_val = min(itervalues(cache))
         else:
             min_val = max_value
 
@@ -1093,7 +1101,7 @@ class SQLBaseStore(object):
                 " AND ".join("%s = ?" % (k,) for k in keyvalues),
                 " ? ASC LIMIT ? OFFSET ?"
             )
-            txn.execute(sql, keyvalues.values() + pagevalues)
+            txn.execute(sql, list(keyvalues.values()) + list(pagevalues))
         else:
             sql = "SELECT %s FROM %s ORDER BY %s" % (
                 ", ".join(retcols),
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index ba46907737..ce338514e8 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -22,6 +22,8 @@ from . import background_updates
 
 from synapse.util.caches import CACHE_SIZE_FACTOR
 
+from six import iteritems
+
 
 logger = logging.getLogger(__name__)
 
@@ -99,7 +101,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
     def _update_client_ips_batch_txn(self, txn, to_update):
         self.database_engine.lock_table(txn, "user_ips")
 
-        for entry in to_update.iteritems():
+        for entry in iteritems(to_update):
             (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
 
             self._simple_upsert_txn(
@@ -231,5 +233,5 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
                 "user_agent": user_agent,
                 "last_seen": last_seen,
             }
-            for (access_token, ip), (user_agent, last_seen) in results.iteritems()
+            for (access_token, ip), (user_agent, last_seen) in iteritems(results)
         ))
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 712106b83a..d149d8392e 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -21,6 +21,7 @@ from synapse.api.errors import StoreError
 from ._base import SQLBaseStore, Cache
 from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
 
+from six import itervalues, iteritems
 
 logger = logging.getLogger(__name__)
 
@@ -360,7 +361,7 @@ class DeviceStore(SQLBaseStore):
             return (now_stream_id, [])
 
         if len(query_map) >= 20:
-            now_stream_id = max(stream_id for stream_id in query_map.itervalues())
+            now_stream_id = max(stream_id for stream_id in itervalues(query_map))
 
         devices = self._get_e2e_device_keys_txn(
             txn, query_map.keys(), include_all_devices=True
@@ -373,13 +374,13 @@ class DeviceStore(SQLBaseStore):
         """
 
         results = []
-        for user_id, user_devices in devices.iteritems():
+        for user_id, user_devices in iteritems(devices):
             # The prev_id for the first row is always the last row before
             # `from_stream_id`
             txn.execute(prev_sent_id_sql, (destination, user_id, from_stream_id))
             rows = txn.fetchall()
             prev_id = rows[0][0]
-            for device_id, device in user_devices.iteritems():
+            for device_id, device in iteritems(user_devices):
                 stream_id = query_map[(user_id, device_id)]
                 result = {
                     "user_id": user_id,
@@ -483,7 +484,7 @@ class DeviceStore(SQLBaseStore):
         if devices:
             user_devices = devices[user_id]
             results = []
-            for device_id, device in user_devices.iteritems():
+            for device_id, device in iteritems(user_devices):
                 result = {
                     "device_id": device_id,
                 }
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index ff8538ddf8..b146487943 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -21,6 +21,8 @@ import simplejson as json
 
 from ._base import SQLBaseStore
 
+from six import iteritems
+
 
 class EndToEndKeyStore(SQLBaseStore):
     def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys):
@@ -81,8 +83,8 @@ class EndToEndKeyStore(SQLBaseStore):
             query_list, include_all_devices,
         )
 
-        for user_id, device_keys in results.iteritems():
-            for device_id, device_info in device_keys.iteritems():
+        for user_id, device_keys in iteritems(results):
+            for device_id, device_info in iteritems(device_keys):
                 device_info["keys"] = json.loads(device_info.pop("key_json"))
 
         defer.returnValue(results)
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index f084a5f54b..d0350ee5fe 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -22,6 +22,8 @@ from synapse.util.caches.descriptors import cachedInlineCallbacks
 import logging
 import simplejson as json
 
+from six import iteritems
+
 logger = logging.getLogger(__name__)
 
 
@@ -420,7 +422,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
 
             txn.executemany(sql, (
                 _gen_entry(user_id, actions)
-                for user_id, actions in user_id_actions.iteritems()
+                for user_id, actions in iteritems(user_id_actions)
             ))
 
         return self.runInteraction(
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5ebef98c4f..cb1082e864 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -40,30 +40,30 @@ import synapse.metrics
 from synapse.events import EventBase    # noqa: F401
 from synapse.events.snapshot import EventContext   # noqa: F401
 
-logger = logging.getLogger(__name__)
+from six.moves import range
+from six import itervalues, iteritems
 
+from prometheus_client import Counter
 
-metrics = synapse.metrics.get_metrics_for(__name__)
-persist_event_counter = metrics.register_counter("persisted_events")
-event_counter = metrics.register_counter(
-    "persisted_events_sep", labels=["type", "origin_type", "origin_entity"]
-)
+logger = logging.getLogger(__name__)
+
+persist_event_counter = Counter("synapse_storage_events_persisted_events", "")
+event_counter = Counter("synapse_storage_events_persisted_events_sep", "",
+                        ["type", "origin_type", "origin_entity"])
 
 # The number of times we are recalculating the current state
-state_delta_counter = metrics.register_counter(
-    "state_delta",
-)
+state_delta_counter = Counter("synapse_storage_events_state_delta", "")
+
 # The number of times we are recalculating state when there is only a
 # single forward extremity
-state_delta_single_event_counter = metrics.register_counter(
-    "state_delta_single_event",
-)
+state_delta_single_event_counter = Counter(
+    "synapse_storage_events_state_delta_single_event", "")
+
 # The number of times we are reculating state when we could have resonably
 # calculated the delta when we calculated the state for an event we were
 # persisting.
-state_delta_reuse_delta_counter = metrics.register_counter(
-    "state_delta_reuse_delta",
-)
+state_delta_reuse_delta_counter = Counter(
+    "synapse_storage_events_state_delta_reuse_delta", "")
 
 
 def encode_json(json_object):
@@ -248,7 +248,7 @@ class EventsStore(EventsWorkerStore):
             partitioned.setdefault(event.room_id, []).append((event, ctx))
 
         deferreds = []
-        for room_id, evs_ctxs in partitioned.iteritems():
+        for room_id, evs_ctxs in iteritems(partitioned):
             d = self._event_persist_queue.add_to_queue(
                 room_id, evs_ctxs,
                 backfilled=backfilled,
@@ -333,7 +333,7 @@ class EventsStore(EventsWorkerStore):
 
             chunks = [
                 events_and_contexts[x:x + 100]
-                for x in xrange(0, len(events_and_contexts), 100)
+                for x in range(0, len(events_and_contexts), 100)
             ]
 
             for chunk in chunks:
@@ -367,7 +367,7 @@ class EventsStore(EventsWorkerStore):
                                 (event, context)
                             )
 
-                        for room_id, ev_ctx_rm in events_by_room.iteritems():
+                        for room_id, ev_ctx_rm in iteritems(events_by_room):
                             # Work out new extremities by recursively adding and removing
                             # the new events.
                             latest_event_ids = yield self.get_latest_event_ids_in_room(
@@ -445,7 +445,7 @@ class EventsStore(EventsWorkerStore):
                     state_delta_for_room=state_delta_for_room,
                     new_forward_extremeties=new_forward_extremeties,
                 )
-                persist_event_counter.inc_by(len(chunk))
+                persist_event_counter.inc(len(chunk))
                 synapse.metrics.event_persisted_position.set(
                     chunk[-1][0].internal_metadata.stream_ordering,
                 )
@@ -460,14 +460,14 @@ class EventsStore(EventsWorkerStore):
                         origin_type = "remote"
                         origin_entity = get_domain_from_id(event.sender)
 
-                    event_counter.inc(event.type, origin_type, origin_entity)
+                    event_counter.labels(event.type, origin_type, origin_entity).inc()
 
-                for room_id, new_state in current_state_for_room.iteritems():
+                for room_id, new_state in iteritems(current_state_for_room):
                     self.get_current_state_ids.prefill(
                         (room_id, ), new_state
                     )
 
-                for room_id, latest_event_ids in new_forward_extremeties.iteritems():
+                for room_id, latest_event_ids in iteritems(new_forward_extremeties):
                     self.get_latest_event_ids_in_room.prefill(
                         (room_id,), list(latest_event_ids)
                     )
@@ -644,20 +644,20 @@ class EventsStore(EventsWorkerStore):
         """
         existing_state = yield self.get_current_state_ids(room_id)
 
-        existing_events = set(existing_state.itervalues())
-        new_events = set(ev_id for ev_id in current_state.itervalues())
+        existing_events = set(itervalues(existing_state))
+        new_events = set(ev_id for ev_id in itervalues(current_state))
         changed_events = existing_events ^ new_events
 
         if not changed_events:
             return
 
         to_delete = {
-            key: ev_id for key, ev_id in existing_state.iteritems()
+            key: ev_id for key, ev_id in iteritems(existing_state)
             if ev_id in changed_events
         }
         events_to_insert = (new_events - existing_events)
         to_insert = {
-            key: ev_id for key, ev_id in current_state.iteritems()
+            key: ev_id for key, ev_id in iteritems(current_state)
             if ev_id in events_to_insert
         }
 
@@ -760,11 +760,11 @@ class EventsStore(EventsWorkerStore):
         )
 
     def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
-        for room_id, current_state_tuple in state_delta_by_room.iteritems():
+        for room_id, current_state_tuple in iteritems(state_delta_by_room):
                 to_delete, to_insert = current_state_tuple
                 txn.executemany(
                     "DELETE FROM current_state_events WHERE event_id = ?",
-                    [(ev_id,) for ev_id in to_delete.itervalues()],
+                    [(ev_id,) for ev_id in itervalues(to_delete)],
                 )
 
                 self._simple_insert_many_txn(
@@ -777,7 +777,7 @@ class EventsStore(EventsWorkerStore):
                             "type": key[0],
                             "state_key": key[1],
                         }
-                        for key, ev_id in to_insert.iteritems()
+                        for key, ev_id in iteritems(to_insert)
                     ],
                 )
 
@@ -796,7 +796,7 @@ class EventsStore(EventsWorkerStore):
                             "event_id": ev_id,
                             "prev_event_id": to_delete.get(key, None),
                         }
-                        for key, ev_id in state_deltas.iteritems()
+                        for key, ev_id in iteritems(state_deltas)
                     ]
                 )
 
@@ -839,7 +839,7 @@ class EventsStore(EventsWorkerStore):
 
     def _update_forward_extremities_txn(self, txn, new_forward_extremities,
                                         max_stream_order):
-        for room_id, new_extrem in new_forward_extremities.iteritems():
+        for room_id, new_extrem in iteritems(new_forward_extremities):
             self._simple_delete_txn(
                 txn,
                 table="event_forward_extremities",
@@ -857,7 +857,7 @@ class EventsStore(EventsWorkerStore):
                     "event_id": ev_id,
                     "room_id": room_id,
                 }
-                for room_id, new_extrem in new_forward_extremities.iteritems()
+                for room_id, new_extrem in iteritems(new_forward_extremities)
                 for ev_id in new_extrem
             ],
         )
@@ -874,7 +874,7 @@ class EventsStore(EventsWorkerStore):
                     "event_id": event_id,
                     "stream_ordering": max_stream_order,
                 }
-                for room_id, new_extrem in new_forward_extremities.iteritems()
+                for room_id, new_extrem in iteritems(new_forward_extremities)
                 for event_id in new_extrem
             ]
         )
@@ -902,7 +902,7 @@ class EventsStore(EventsWorkerStore):
                         new_events_and_contexts[event.event_id] = (event, context)
             else:
                 new_events_and_contexts[event.event_id] = (event, context)
-        return new_events_and_contexts.values()
+        return list(new_events_and_contexts.values())
 
     def _update_room_depths_txn(self, txn, events_and_contexts, backfilled):
         """Update min_depth for each room
@@ -928,7 +928,7 @@ class EventsStore(EventsWorkerStore):
                     event.depth, depth_updates.get(event.room_id, event.depth)
                 )
 
-        for room_id, depth in depth_updates.iteritems():
+        for room_id, depth in iteritems(depth_updates):
             self._update_min_depth_for_room_txn(txn, room_id, depth)
 
     def _update_outliers_txn(self, txn, events_and_contexts):
@@ -1312,7 +1312,7 @@ class EventsStore(EventsWorkerStore):
                 " WHERE e.event_id IN (%s)"
             ) % (",".join(["?"] * len(ev_map)),)
 
-            txn.execute(sql, ev_map.keys())
+            txn.execute(sql, list(ev_map))
             rows = self.cursor_to_dict(txn)
             for row in rows:
                 event = ev_map[row["event_id"]]
@@ -1575,7 +1575,7 @@ class EventsStore(EventsWorkerStore):
 
             chunks = [
                 event_ids[i:i + 100]
-                for i in xrange(0, len(event_ids), 100)
+                for i in range(0, len(event_ids), 100)
             ]
             for chunk in chunks:
                 ev_rows = self._simple_select_many_txn(
@@ -1989,7 +1989,7 @@ class EventsStore(EventsWorkerStore):
         logger.info("[purge] finding state groups which depend on redundant"
                     " state groups")
         remaining_state_groups = []
-        for i in xrange(0, len(state_rows), 100):
+        for i in range(0, len(state_rows), 100):
             chunk = [sg for sg, in state_rows[i:i + 100]]
             # look for state groups whose prev_state_group is one we are about
             # to delete
@@ -2045,7 +2045,7 @@ class EventsStore(EventsWorkerStore):
                         "state_key": key[1],
                         "event_id": state_id,
                     }
-                    for key, state_id in curr_state.iteritems()
+                    for key, state_id in iteritems(curr_state)
                 ],
             )
 
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index ba834854e1..32d9d00ffb 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -337,7 +337,7 @@ class EventsWorkerStore(SQLBaseStore):
     def _fetch_event_rows(self, txn, events):
         rows = []
         N = 200
-        for i in range(1 + len(events) / N):
+        for i in range(1 + len(events) // N):
             evs = events[i * N:(i + 1) * N]
             if not evs:
                 break
diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py
index 78b1e30945..2e2763126d 100644
--- a/synapse/storage/filtering.py
+++ b/synapse/storage/filtering.py
@@ -44,7 +44,7 @@ class FilteringStore(SQLBaseStore):
             desc="get_user_filter",
         )
 
-        defer.returnValue(json.loads(str(def_json).decode("utf-8")))
+        defer.returnValue(json.loads(bytes(def_json).decode("utf-8")))
 
     def add_user_filter(self, user_localpart, user_filter):
         def_json = encode_canonical_json(user_filter)
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 87aeaf71d6..0f13b61da8 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -17,6 +17,7 @@ from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 
 from twisted.internet import defer
+import six
 
 import OpenSSL
 from signedjson.key import decode_verify_key_bytes
@@ -26,6 +27,13 @@ import logging
 
 logger = logging.getLogger(__name__)
 
+# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
+# despite being deprecated and removed in favor of memoryview
+if six.PY2:
+    db_binary_type = buffer
+else:
+    db_binary_type = memoryview
+
 
 class KeyStore(SQLBaseStore):
     """Persistence for signature verification keys and tls X.509 certificates
@@ -72,7 +80,7 @@ class KeyStore(SQLBaseStore):
             values={
                 "from_server": from_server,
                 "ts_added_ms": time_now_ms,
-                "tls_certificate": buffer(tls_certificate_bytes),
+                "tls_certificate": db_binary_type(tls_certificate_bytes),
             },
             desc="store_server_certificate",
         )
@@ -92,7 +100,7 @@ class KeyStore(SQLBaseStore):
 
         if verify_key_bytes:
             defer.returnValue(decode_verify_key_bytes(
-                key_id, str(verify_key_bytes)
+                key_id, bytes(verify_key_bytes)
             ))
 
     @defer.inlineCallbacks
@@ -135,7 +143,7 @@ class KeyStore(SQLBaseStore):
                 values={
                     "from_server": from_server,
                     "ts_added_ms": time_now_ms,
-                    "verify_key": buffer(verify_key.encode()),
+                    "verify_key": db_binary_type(verify_key.encode()),
                 },
             )
             txn.call_after(
@@ -172,7 +180,7 @@ class KeyStore(SQLBaseStore):
                 "from_server": from_server,
                 "ts_added_ms": ts_now_ms,
                 "ts_valid_until_ms": ts_expires_ms,
-                "key_json": buffer(key_json_bytes),
+                "key_json": db_binary_type(key_json_bytes),
             },
             desc="store_server_keys_json",
         )
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 9e9d3c2591..f05d91cc58 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -16,6 +16,7 @@
 from ._base import SQLBaseStore
 from synapse.api.constants import PresenceState
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
+from synapse.util import batch_iter
 
 from collections import namedtuple
 from twisted.internet import defer
@@ -115,11 +116,7 @@ class PresenceStore(SQLBaseStore):
             " AND user_id IN (%s)"
         )
 
-        batches = (
-            presence_states[i:i + 50]
-            for i in xrange(0, len(presence_states), 50)
-        )
-        for states in batches:
+        for states in batch_iter(presence_states, 50):
             args = [stream_id]
             args.extend(s.user_id for s in states)
             txn.execute(
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 709c69a926..c93c228f6e 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -332,6 +332,35 @@ class ReceiptsStore(ReceiptsWorkerStore):
 
     def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
                                       user_id, event_id, data, stream_id):
+        res = self._simple_select_one_txn(
+            txn,
+            table="events",
+            retcols=["topological_ordering", "stream_ordering"],
+            keyvalues={"event_id": event_id},
+            allow_none=True
+        )
+
+        stream_ordering = int(res["stream_ordering"]) if res else None
+
+        # We don't want to clobber receipts for more recent events, so we
+        # have to compare orderings of existing receipts
+        if stream_ordering is not None:
+            sql = (
+                "SELECT stream_ordering, event_id FROM events"
+                " INNER JOIN receipts_linearized as r USING (event_id, room_id)"
+                " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
+            )
+            txn.execute(sql, (room_id, receipt_type, user_id))
+
+            for so, eid in txn:
+                if int(so) >= stream_ordering:
+                    logger.debug(
+                        "Ignoring new receipt for %s in favour of existing "
+                        "one for later event %s",
+                        event_id, eid,
+                    )
+                    return False
+
         txn.call_after(
             self.get_receipts_for_room.invalidate, (room_id, receipt_type)
         )
@@ -355,34 +384,6 @@ class ReceiptsStore(ReceiptsWorkerStore):
             (user_id, room_id, receipt_type)
         )
 
-        res = self._simple_select_one_txn(
-            txn,
-            table="events",
-            retcols=["topological_ordering", "stream_ordering"],
-            keyvalues={"event_id": event_id},
-            allow_none=True
-        )
-
-        topological_ordering = int(res["topological_ordering"]) if res else None
-        stream_ordering = int(res["stream_ordering"]) if res else None
-
-        # We don't want to clobber receipts for more recent events, so we
-        # have to compare orderings of existing receipts
-        sql = (
-            "SELECT topological_ordering, stream_ordering, event_id FROM events"
-            " INNER JOIN receipts_linearized as r USING (event_id, room_id)"
-            " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
-        )
-
-        txn.execute(sql, (room_id, receipt_type, user_id))
-
-        if topological_ordering:
-            for to, so, _ in txn:
-                if int(to) > topological_ordering:
-                    return False
-                elif int(to) == topological_ordering and int(so) >= stream_ordering:
-                    return False
-
         self._simple_delete_txn(
             txn,
             table="receipts_linearized",
@@ -406,7 +407,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
             }
         )
 
-        if receipt_type == "m.read" and topological_ordering:
+        if receipt_type == "m.read" and stream_ordering is not None:
             self._remove_old_push_actions_before_txn(
                 txn,
                 room_id=room_id,
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index c0bf5e9ed6..c241167fbe 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -36,6 +36,7 @@ class RegistrationWorkerStore(SQLBaseStore):
             retcols=[
                 "name", "password_hash", "is_guest",
                 "consent_version", "consent_server_notice_sent",
+                "appservice_id",
             ],
             allow_none=True,
             desc="get_user_by_id",
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 6a861943a2..7bfc3d91b5 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -30,6 +30,8 @@ from synapse.types import get_domain_from_id
 import logging
 import simplejson as json
 
+from six import itervalues, iteritems
+
 logger = logging.getLogger(__name__)
 
 
@@ -272,7 +274,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         users_in_room = {}
         member_event_ids = [
             e_id
-            for key, e_id in current_state_ids.iteritems()
+            for key, e_id in iteritems(current_state_ids)
             if key[0] == EventTypes.Member
         ]
 
@@ -289,7 +291,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
                     users_in_room = dict(prev_res)
                     member_event_ids = [
                         e_id
-                        for key, e_id in context.delta_ids.iteritems()
+                        for key, e_id in iteritems(context.delta_ids)
                         if key[0] == EventTypes.Member
                     ]
                     for etype, state_key in context.delta_ids:
@@ -741,7 +743,7 @@ class _JoinedHostsCache(object):
             if state_entry.state_group == self.state_group:
                 pass
             elif state_entry.prev_group == self.state_group:
-                for (typ, state_key), event_id in state_entry.delta_ids.iteritems():
+                for (typ, state_key), event_id in iteritems(state_entry.delta_ids):
                     if typ != EventTypes.Member:
                         continue
 
@@ -771,7 +773,7 @@ class _JoinedHostsCache(object):
                 self.state_group = state_entry.state_group
             else:
                 self.state_group = object()
-            self._len = sum(len(v) for v in self.hosts_to_joined_users.itervalues())
+            self._len = sum(len(v) for v in itervalues(self.hosts_to_joined_users))
         defer.returnValue(frozenset(self.hosts_to_joined_users))
 
     def __len__(self):
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 6ba3e59889..f0fa5d7631 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -18,13 +18,14 @@ import logging
 import re
 import simplejson as json
 
+from six import string_types
+
 from twisted.internet import defer
 
 from .background_updates import BackgroundUpdateStore
 from synapse.api.errors import SynapseError
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 
-
 logger = logging.getLogger(__name__)
 
 SearchEntry = namedtuple('SearchEntry', [
@@ -126,7 +127,7 @@ class SearchStore(BackgroundUpdateStore):
                     # skip over it.
                     continue
 
-                if not isinstance(value, basestring):
+                if not isinstance(value, string_types):
                     # If the event body, name or topic isn't a string
                     # then skip over it
                     continue
@@ -447,7 +448,7 @@ class SearchStore(BackgroundUpdateStore):
             "search_msgs", self.cursor_to_dict, sql, *args
         )
 
-        results = filter(lambda row: row["room_id"] in room_ids, results)
+        results = list(filter(lambda row: row["room_id"] in room_ids, results))
 
         events = yield self._get_events([r["event_id"] for r in results])
 
@@ -602,7 +603,7 @@ class SearchStore(BackgroundUpdateStore):
             "search_rooms", self.cursor_to_dict, sql, *args
         )
 
-        results = filter(lambda row: row["room_id"] in room_ids, results)
+        results = list(filter(lambda row: row["room_id"] in room_ids, results))
 
         events = yield self._get_events([r["event_id"] for r in results])
 
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index 9e6eaaa532..25922e5a9c 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from twisted.internet import defer
+import six
 
 from ._base import SQLBaseStore
 
@@ -21,6 +22,13 @@ from unpaddedbase64 import encode_base64
 from synapse.crypto.event_signing import compute_event_reference_hash
 from synapse.util.caches.descriptors import cached, cachedList
 
+# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
+# despite being deprecated and removed in favor of memoryview
+if six.PY2:
+    db_binary_type = buffer
+else:
+    db_binary_type = memoryview
+
 
 class SignatureWorkerStore(SQLBaseStore):
     @cached()
@@ -56,7 +64,7 @@ class SignatureWorkerStore(SQLBaseStore):
             for e_id, h in hashes.items()
         }
 
-        defer.returnValue(hashes.items())
+        defer.returnValue(list(hashes.items()))
 
     def _get_event_reference_hashes_txn(self, txn, event_id):
         """Get all the hashes for a given PDU.
@@ -91,7 +99,7 @@ class SignatureStore(SignatureWorkerStore):
             vals.append({
                 "event_id": event.event_id,
                 "algorithm": ref_alg,
-                "hash": buffer(ref_hash_bytes),
+                "hash": db_binary_type(ref_hash_bytes),
             })
 
         self._simple_insert_many_txn(
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index ffa4246031..bdee14a8eb 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -16,6 +16,9 @@
 from collections import namedtuple
 import logging
 
+from six import iteritems, itervalues
+from six.moves import range
+
 from twisted.internet import defer
 
 from synapse.storage.background_updates import BackgroundUpdateStore
@@ -134,7 +137,7 @@ class StateGroupWorkerStore(SQLBaseStore):
             event_ids,
         )
 
-        groups = set(event_to_groups.itervalues())
+        groups = set(itervalues(event_to_groups))
         group_to_state = yield self._get_state_for_groups(groups)
 
         defer.returnValue(group_to_state)
@@ -166,18 +169,18 @@ class StateGroupWorkerStore(SQLBaseStore):
 
         state_event_map = yield self.get_events(
             [
-                ev_id for group_ids in group_to_ids.itervalues()
-                for ev_id in group_ids.itervalues()
+                ev_id for group_ids in itervalues(group_to_ids)
+                for ev_id in itervalues(group_ids)
             ],
             get_prev_content=False
         )
 
         defer.returnValue({
             group: [
-                state_event_map[v] for v in event_id_map.itervalues()
+                state_event_map[v] for v in itervalues(event_id_map)
                 if v in state_event_map
             ]
-            for group, event_id_map in group_to_ids.iteritems()
+            for group, event_id_map in iteritems(group_to_ids)
         })
 
     @defer.inlineCallbacks
@@ -186,7 +189,7 @@ class StateGroupWorkerStore(SQLBaseStore):
         """
         results = {}
 
-        chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)]
+        chunks = [groups[i:i + 100] for i in range(0, len(groups), 100)]
         for chunk in chunks:
             res = yield self.runInteraction(
                 "_get_state_groups_from_groups",
@@ -347,21 +350,21 @@ class StateGroupWorkerStore(SQLBaseStore):
             event_ids,
         )
 
-        groups = set(event_to_groups.itervalues())
+        groups = set(itervalues(event_to_groups))
         group_to_state = yield self._get_state_for_groups(groups, types)
 
         state_event_map = yield self.get_events(
-            [ev_id for sd in group_to_state.itervalues() for ev_id in sd.itervalues()],
+            [ev_id for sd in itervalues(group_to_state) for ev_id in itervalues(sd)],
             get_prev_content=False
         )
 
         event_to_state = {
             event_id: {
                 k: state_event_map[v]
-                for k, v in group_to_state[group].iteritems()
+                for k, v in iteritems(group_to_state[group])
                 if v in state_event_map
             }
-            for event_id, group in event_to_groups.iteritems()
+            for event_id, group in iteritems(event_to_groups)
         }
 
         defer.returnValue({event: event_to_state[event] for event in event_ids})
@@ -384,12 +387,12 @@ class StateGroupWorkerStore(SQLBaseStore):
             event_ids,
         )
 
-        groups = set(event_to_groups.itervalues())
+        groups = set(itervalues(event_to_groups))
         group_to_state = yield self._get_state_for_groups(groups, types)
 
         event_to_state = {
             event_id: group_to_state[group]
-            for event_id, group in event_to_groups.iteritems()
+            for event_id, group in iteritems(event_to_groups)
         }
 
         defer.returnValue({event: event_to_state[event] for event in event_ids})
@@ -503,7 +506,7 @@ class StateGroupWorkerStore(SQLBaseStore):
         got_all = is_all or not missing_types
 
         return {
-            k: v for k, v in state_dict_ids.iteritems()
+            k: v for k, v in iteritems(state_dict_ids)
             if include(k[0], k[1])
         }, missing_types, got_all
 
@@ -562,12 +565,12 @@ class StateGroupWorkerStore(SQLBaseStore):
 
             # Now we want to update the cache with all the things we fetched
             # from the database.
-            for group, group_state_dict in group_to_state_dict.iteritems():
+            for group, group_state_dict in iteritems(group_to_state_dict):
                 state_dict = results[group]
 
                 state_dict.update(
                     ((intern_string(k[0]), intern_string(k[1])), to_ascii(v))
-                    for k, v in group_state_dict.iteritems()
+                    for k, v in iteritems(group_state_dict)
                 )
 
                 self._state_group_cache.update(
@@ -654,7 +657,7 @@ class StateGroupWorkerStore(SQLBaseStore):
                             "state_key": key[1],
                             "event_id": state_id,
                         }
-                        for key, state_id in delta_ids.iteritems()
+                        for key, state_id in iteritems(delta_ids)
                     ],
                 )
             else:
@@ -669,7 +672,7 @@ class StateGroupWorkerStore(SQLBaseStore):
                             "state_key": key[1],
                             "event_id": state_id,
                         }
-                        for key, state_id in current_state_ids.iteritems()
+                        for key, state_id in iteritems(current_state_ids)
                     ],
                 )
 
@@ -794,11 +797,11 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
                     "state_group": state_group_id,
                     "event_id": event_id,
                 }
-                for event_id, state_group_id in state_groups.iteritems()
+                for event_id, state_group_id in iteritems(state_groups)
             ],
         )
 
-        for event_id, state_group_id in state_groups.iteritems():
+        for event_id, state_group_id in iteritems(state_groups):
             txn.call_after(
                 self._get_state_group_for_event.prefill,
                 (event_id,), state_group_id
@@ -826,7 +829,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
 
         def reindex_txn(txn):
             new_last_state_group = last_state_group
-            for count in xrange(batch_size):
+            for count in range(batch_size):
                 txn.execute(
                     "SELECT id, room_id FROM state_groups"
                     " WHERE ? < id AND id <= ?"
@@ -884,7 +887,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
                         # of keys
 
                         delta_state = {
-                            key: value for key, value in curr_state.iteritems()
+                            key: value for key, value in iteritems(curr_state)
                             if prev_state.get(key, None) != value
                         }
 
@@ -924,7 +927,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
                                     "state_key": key[1],
                                     "event_id": state_id,
                                 }
-                                for key, state_id in delta_state.iteritems()
+                                for key, state_id in iteritems(delta_state)
                             ],
                         )
 
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index ea24710ad8..fb463c525a 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -684,8 +684,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 results to only those before
             direction(char): Either 'b' or 'f' to indicate whether we are
                 paginating forwards or backwards from `from_key`.
-            limit (int): The maximum number of events to return. Zero or less
-                means no limit.
+            limit (int): The maximum number of events to return.
             event_filter (Filter|None): If provided filters the events to
                 those that match the filter.
 
@@ -694,6 +693,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             as a list of _EventDictReturn and a token that points to the end
             of the result set.
         """
+
+        assert int(limit) >= 0
+
         # Tokens really represent positions between elements, but we use
         # the convention of pointing to the event before the gap. Hence
         # we have a bit of asymmetry when it comes to equalities.
@@ -723,22 +725,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             bounds += " AND " + filter_clause
             args.extend(filter_args)
 
-        if int(limit) > 0:
-            args.append(int(limit))
-            limit_str = " LIMIT ?"
-        else:
-            limit_str = ""
+        args.append(int(limit))
 
         sql = (
             "SELECT event_id, topological_ordering, stream_ordering"
             " FROM events"
             " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
             " ORDER BY topological_ordering %(order)s,"
-            " stream_ordering %(order)s %(limit)s"
+            " stream_ordering %(order)s LIMIT ?"
         ) % {
             "bounds": bounds,
             "order": order,
-            "limit": limit_str
         }
 
         txn.execute(sql, args)
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index f825264ea9..e485d19b84 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -17,6 +17,7 @@ from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cached
 
 from twisted.internet import defer
+import six
 
 from canonicaljson import encode_canonical_json
 
@@ -25,6 +26,13 @@ from collections import namedtuple
 import logging
 import simplejson as json
 
+# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
+# despite being deprecated and removed in favor of memoryview
+if six.PY2:
+    db_binary_type = buffer
+else:
+    db_binary_type = memoryview
+
 logger = logging.getLogger(__name__)
 
 
@@ -110,7 +118,7 @@ class TransactionStore(SQLBaseStore):
                 "transaction_id": transaction_id,
                 "origin": origin,
                 "response_code": code,
-                "response_json": buffer(encode_canonical_json(response_dict)),
+                "response_json": db_binary_type(encode_canonical_json(response_dict)),
                 "ts": self._clock.time_msec(),
             },
             or_ignore=True,
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index d6e289ffbe..275c299998 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -22,6 +22,8 @@ from synapse.api.constants import EventTypes, JoinRules
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.types import get_domain_from_id, get_localpart_from_id
 
+from six import iteritems
+
 import re
 import logging
 
@@ -100,7 +102,7 @@ class UserDirectoryStore(SQLBaseStore):
                     user_id, get_localpart_from_id(user_id), get_domain_from_id(user_id),
                     profile.display_name,
                 )
-                for user_id, profile in users_with_profile.iteritems()
+                for user_id, profile in iteritems(users_with_profile)
             )
         elif isinstance(self.database_engine, Sqlite3Engine):
             sql = """
@@ -112,7 +114,7 @@ class UserDirectoryStore(SQLBaseStore):
                     user_id,
                     "%s %s" % (user_id, p.display_name,) if p.display_name else user_id
                 )
-                for user_id, p in users_with_profile.iteritems()
+                for user_id, p in iteritems(users_with_profile)
             )
         else:
             # This should be unreachable.
@@ -130,7 +132,7 @@ class UserDirectoryStore(SQLBaseStore):
                         "display_name": profile.display_name,
                         "avatar_url": profile.avatar_url,
                     }
-                    for user_id, profile in users_with_profile.iteritems()
+                    for user_id, profile in iteritems(users_with_profile)
                 ]
             )
             for user_id in users_with_profile: