summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2017-03-24 11:40:08 +0000
committerGitHub <noreply@github.com>2017-03-24 11:40:08 +0000
commit57cfa513f5ff939d88ebb3a213d977517395a631 (patch)
tree6e658aa7007fd8f890a631c5ea8c654b721d375d
parentMerge pull request #2055 from matrix-org/dbkr/fix_add_msisdn_requestToken (diff)
parentReplace some calls to cursor_to_dict (diff)
downloadsynapse-57cfa513f5ff939d88ebb3a213d977517395a631.tar.xz
Merge pull request #2054 from matrix-org/erikj/user_iter_cursor
Reduce some CPU work on DB threads
Diffstat (limited to '')
-rw-r--r--synapse/replication/slave/storage/events.py1
-rw-r--r--synapse/storage/_base.py25
-rw-r--r--synapse/storage/account_data.py4
-rw-r--r--synapse/storage/deviceinbox.py10
-rw-r--r--synapse/storage/devices.py7
-rw-r--r--synapse/storage/end_to_end_keys.py4
-rw-r--r--synapse/storage/event_federation.py13
-rw-r--r--synapse/storage/event_push_actions.py2
-rw-r--r--synapse/storage/events.py34
-rw-r--r--synapse/storage/prepare_database.py2
-rw-r--r--synapse/storage/receipts.py5
-rw-r--r--synapse/storage/registration.py2
-rw-r--r--synapse/storage/room.py4
-rw-r--r--synapse/storage/roommember.py43
-rw-r--r--synapse/storage/signatures.py2
-rw-r--r--synapse/storage/state.py66
-rw-r--r--synapse/storage/tags.py4
-rw-r--r--tests/storage/test_base.py4
18 files changed, 101 insertions, 131 deletions
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index a1e1e54e5b..d4db1e452e 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -167,7 +167,6 @@ class SlavedEventStore(BaseSlavedStore):
     _get_rooms_for_user_where_membership_is_txn = (
         DataStore._get_rooms_for_user_where_membership_is_txn.__func__
     )
-    _get_members_rows_txn = DataStore._get_members_rows_txn.__func__
     _get_state_for_groups = DataStore._get_state_for_groups.__func__
     _get_all_state_from_cache = DataStore._get_all_state_from_cache.__func__
     _get_events_around_txn = DataStore._get_events_around_txn.__func__
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 13b106bba1..c659004e8d 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -73,6 +73,9 @@ class LoggingTransaction(object):
     def __setattr__(self, name, value):
         setattr(self.txn, name, value)
 
+    def __iter__(self):
+        return self.txn.__iter__()
+
     def execute(self, sql, *args):
         self._do_execute(self.txn.execute, sql, *args)
 
@@ -132,7 +135,7 @@ class PerformanceCounters(object):
 
     def interval(self, interval_duration, limit=3):
         counters = []
-        for name, (count, cum_time) in self.current_counters.items():
+        for name, (count, cum_time) in self.current_counters.iteritems():
             prev_count, prev_time = self.previous_counters.get(name, (0, 0))
             counters.append((
                 (cum_time - prev_time) / interval_duration,
@@ -357,7 +360,7 @@ class SQLBaseStore(object):
         """
         col_headers = list(intern(column[0]) for column in cursor.description)
         results = list(
-            dict(zip(col_headers, row)) for row in cursor.fetchall()
+            dict(zip(col_headers, row)) for row in cursor
         )
         return results
 
@@ -565,7 +568,7 @@ class SQLBaseStore(object):
     @staticmethod
     def _simple_select_onecol_txn(txn, table, keyvalues, retcol):
         if keyvalues:
-            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
+            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
         else:
             where = ""
 
@@ -579,7 +582,7 @@ class SQLBaseStore(object):
 
         txn.execute(sql, keyvalues.values())
 
-        return [r[0] for r in txn.fetchall()]
+        return [r[0] for r in txn]
 
     def _simple_select_onecol(self, table, keyvalues, retcol,
                               desc="_simple_select_onecol"):
@@ -712,7 +715,7 @@ class SQLBaseStore(object):
         )
         values.extend(iterable)
 
-        for key, value in keyvalues.items():
+        for key, value in keyvalues.iteritems():
             clauses.append("%s = ?" % (key,))
             values.append(value)
 
@@ -753,7 +756,7 @@ class SQLBaseStore(object):
     @staticmethod
     def _simple_update_one_txn(txn, table, keyvalues, updatevalues):
         if keyvalues:
-            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
+            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
         else:
             where = ""
 
@@ -870,7 +873,7 @@ class SQLBaseStore(object):
         )
         values.extend(iterable)
 
-        for key, value in keyvalues.items():
+        for key, value in keyvalues.iteritems():
             clauses.append("%s = ?" % (key,))
             values.append(value)
 
@@ -901,16 +904,16 @@ class SQLBaseStore(object):
 
         txn = db_conn.cursor()
         txn.execute(sql, (int(max_value),))
-        rows = txn.fetchall()
-        txn.close()
 
         cache = {
             row[0]: int(row[1])
-            for row in rows
+            for row in txn
         }
 
+        txn.close()
+
         if cache:
-            min_val = min(cache.values())
+            min_val = min(cache.itervalues())
         else:
             min_val = max_value
 
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index 3fa226e92d..aa84ffc2b0 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -182,7 +182,7 @@ class AccountDataStore(SQLBaseStore):
             txn.execute(sql, (user_id, stream_id))
 
             global_account_data = {
-                row[0]: json.loads(row[1]) for row in txn.fetchall()
+                row[0]: json.loads(row[1]) for row in txn
             }
 
             sql = (
@@ -193,7 +193,7 @@ class AccountDataStore(SQLBaseStore):
             txn.execute(sql, (user_id, stream_id))
 
             account_data_by_room = {}
-            for row in txn.fetchall():
+            for row in txn:
                 room_account_data = account_data_by_room.setdefault(row[0], {})
                 room_account_data[row[1]] = json.loads(row[2])
 
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 7925cb5f1b..2714519d21 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -178,7 +178,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
                 )
                 txn.execute(sql, (user_id,))
                 message_json = ujson.dumps(messages_by_device["*"])
-                for row in txn.fetchall():
+                for row in txn:
                     # Add the message for all devices for this user on this
                     # server.
                     device = row[0]
@@ -195,7 +195,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
                 # TODO: Maybe this needs to be done in batches if there are
                 # too many local devices for a given user.
                 txn.execute(sql, [user_id] + devices)
-                for row in txn.fetchall():
+                for row in txn:
                     # Only insert into the local inbox if the device exists on
                     # this server
                     device = row[0]
@@ -251,7 +251,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
                 user_id, device_id, last_stream_id, current_stream_id, limit
             ))
             messages = []
-            for row in txn.fetchall():
+            for row in txn:
                 stream_pos = row[0]
                 messages.append(ujson.loads(row[1]))
             if len(messages) < limit:
@@ -340,7 +340,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
                 " ORDER BY stream_id ASC"
             )
             txn.execute(sql, (last_pos, upper_pos))
-            rows.extend(txn.fetchall())
+            rows.extend(txn)
 
             return rows
 
@@ -384,7 +384,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
                 destination, last_stream_id, current_stream_id, limit
             ))
             messages = []
-            for row in txn.fetchall():
+            for row in txn:
                 stream_pos = row[0]
                 messages.append(ujson.loads(row[1]))
             if len(messages) < limit:
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index e545b62e39..6beeff8b00 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -333,13 +333,12 @@ class DeviceStore(SQLBaseStore):
         txn.execute(
             sql, (destination, from_stream_id, now_stream_id, False)
         )
-        rows = txn.fetchall()
 
-        if not rows:
+        # maps (user_id, device_id) -> stream_id
+        query_map = {(r[0], r[1]): r[2] for r in txn}
+        if not query_map:
             return (now_stream_id, [])
 
-        # maps (user_id, device_id) -> stream_id
-        query_map = {(r[0], r[1]): r[2] for r in rows}
         devices = self._get_e2e_device_keys_txn(
             txn, query_map.keys(), include_all_devices=True
         )
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index b9f1365f92..58bde65b6c 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -153,7 +153,7 @@ class EndToEndKeyStore(SQLBaseStore):
             )
             txn.execute(sql, (user_id, device_id))
             result = {}
-            for algorithm, key_count in txn.fetchall():
+            for algorithm, key_count in txn:
                 result[algorithm] = key_count
             return result
         return self.runInteraction(
@@ -174,7 +174,7 @@ class EndToEndKeyStore(SQLBaseStore):
                 user_result = result.setdefault(user_id, {})
                 device_result = user_result.setdefault(device_id, {})
                 txn.execute(sql, (user_id, device_id, algorithm))
-                for key_id, key_json in txn.fetchall():
+                for key_id, key_json in txn:
                     device_result[algorithm + ":" + key_id] = key_json
                     delete.append((user_id, device_id, algorithm, key_id))
             sql = (
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 0d97de2fe7..43b5b49986 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -74,7 +74,7 @@ class EventFederationStore(SQLBaseStore):
                     base_sql % (",".join(["?"] * len(chunk)),),
                     chunk
                 )
-                new_front.update([r[0] for r in txn.fetchall()])
+                new_front.update([r[0] for r in txn])
 
             new_front -= results
 
@@ -110,7 +110,7 @@ class EventFederationStore(SQLBaseStore):
 
         txn.execute(sql, (room_id, False,))
 
-        return dict(txn.fetchall())
+        return dict(txn)
 
     def _get_oldest_events_in_room_txn(self, txn, room_id):
         return self._simple_select_onecol_txn(
@@ -152,7 +152,7 @@ class EventFederationStore(SQLBaseStore):
         txn.execute(sql, (room_id, ))
 
         results = []
-        for event_id, depth in txn.fetchall():
+        for event_id, depth in txn:
             hashes = self._get_event_reference_hashes_txn(txn, event_id)
             prev_hashes = {
                 k: encode_base64(v) for k, v in hashes.items()
@@ -334,8 +334,7 @@ class EventFederationStore(SQLBaseStore):
 
         def get_forward_extremeties_for_room_txn(txn):
             txn.execute(sql, (stream_ordering, room_id))
-            rows = txn.fetchall()
-            return [event_id for event_id, in rows]
+            return [event_id for event_id, in txn]
 
         return self.runInteraction(
             "get_forward_extremeties_for_room",
@@ -436,7 +435,7 @@ class EventFederationStore(SQLBaseStore):
                 (room_id, event_id, False, limit - len(event_results))
             )
 
-            for row in txn.fetchall():
+            for row in txn:
                 if row[1] not in event_results:
                     queue.put((-row[0], row[1]))
 
@@ -482,7 +481,7 @@ class EventFederationStore(SQLBaseStore):
                     (room_id, event_id, False, limit - len(event_results))
                 )
 
-                for e_id, in txn.fetchall():
+                for e_id, in txn:
                     new_front.add(e_id)
 
             new_front -= earliest_events
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 14543b4269..d6d8723b4a 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -206,7 +206,7 @@ class EventPushActionsStore(SQLBaseStore):
                 " stream_ordering >= ? AND stream_ordering <= ?"
             )
             txn.execute(sql, (min_stream_ordering, max_stream_ordering))
-            return [r[0] for r in txn.fetchall()]
+            return [r[0] for r in txn]
         ret = yield self.runInteraction("get_push_action_users_in_range", f)
         defer.returnValue(ret)
 
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 3c8393bfe8..5f4fcd9832 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -217,14 +217,14 @@ class EventsStore(SQLBaseStore):
             partitioned.setdefault(event.room_id, []).append((event, ctx))
 
         deferreds = []
-        for room_id, evs_ctxs in partitioned.items():
+        for room_id, evs_ctxs in partitioned.iteritems():
             d = preserve_fn(self._event_persist_queue.add_to_queue)(
                 room_id, evs_ctxs,
                 backfilled=backfilled,
             )
             deferreds.append(d)
 
-        for room_id in partitioned.keys():
+        for room_id in partitioned:
             self._maybe_start_persisting(room_id)
 
         return preserve_context_over_deferred(
@@ -323,7 +323,7 @@ class EventsStore(SQLBaseStore):
                                 (event, context)
                             )
 
-                        for room_id, ev_ctx_rm in events_by_room.items():
+                        for room_id, ev_ctx_rm in events_by_room.iteritems():
                             # Work out new extremities by recursively adding and removing
                             # the new events.
                             latest_event_ids = yield self.get_latest_event_ids_in_room(
@@ -453,10 +453,10 @@ class EventsStore(SQLBaseStore):
                 missing_event_ids,
             )
 
-            groups = set(event_to_groups.values())
+            groups = set(event_to_groups.itervalues())
             group_to_state = yield self._get_state_for_groups(groups)
 
-            state_sets.extend(group_to_state.values())
+            state_sets.extend(group_to_state.itervalues())
 
         if not new_latest_event_ids:
             current_state = {}
@@ -718,7 +718,7 @@ class EventsStore(SQLBaseStore):
 
     def _update_forward_extremities_txn(self, txn, new_forward_extremities,
                                         max_stream_order):
-        for room_id, new_extrem in new_forward_extremities.items():
+        for room_id, new_extrem in new_forward_extremities.iteritems():
             self._simple_delete_txn(
                 txn,
                 table="event_forward_extremities",
@@ -736,7 +736,7 @@ class EventsStore(SQLBaseStore):
                     "event_id": ev_id,
                     "room_id": room_id,
                 }
-                for room_id, new_extrem in new_forward_extremities.items()
+                for room_id, new_extrem in new_forward_extremities.iteritems()
                 for ev_id in new_extrem
             ],
         )
@@ -753,7 +753,7 @@ class EventsStore(SQLBaseStore):
                     "event_id": event_id,
                     "stream_ordering": max_stream_order,
                 }
-                for room_id, new_extrem in new_forward_extremities.items()
+                for room_id, new_extrem in new_forward_extremities.iteritems()
                 for event_id in new_extrem
             ]
         )
@@ -807,7 +807,7 @@ class EventsStore(SQLBaseStore):
                     event.depth, depth_updates.get(event.room_id, event.depth)
                 )
 
-        for room_id, depth in depth_updates.items():
+        for room_id, depth in depth_updates.iteritems():
             self._update_min_depth_for_room_txn(txn, room_id, depth)
 
     def _update_outliers_txn(self, txn, events_and_contexts):
@@ -834,7 +834,7 @@ class EventsStore(SQLBaseStore):
 
         have_persisted = {
             event_id: outlier
-            for event_id, outlier in txn.fetchall()
+            for event_id, outlier in txn
         }
 
         to_remove = set()
@@ -958,14 +958,10 @@ class EventsStore(SQLBaseStore):
             return
 
         def event_dict(event):
-            return {
-                k: v
-                for k, v in event.get_dict().items()
-                if k not in [
-                    "redacted",
-                    "redacted_because",
-                ]
-            }
+            d = event.get_dict()
+            d.pop("redacted", None)
+            d.pop("redacted_because", None)
+            return d
 
         self._simple_insert_many_txn(
             txn,
@@ -1998,7 +1994,7 @@ class EventsStore(SQLBaseStore):
                         "state_key": key[1],
                         "event_id": state_id,
                     }
-                    for key, state_id in curr_state.items()
+                    for key, state_id in curr_state.iteritems()
                 ],
             )
 
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index ed84db6b4b..6e623843d5 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -356,7 +356,7 @@ def _get_or_create_schema_state(txn, database_engine):
             ),
             (current_version,)
         )
-        applied_deltas = [d for d, in txn.fetchall()]
+        applied_deltas = [d for d, in txn]
         return current_version, applied_deltas, upgraded
 
     return None
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 5cf41501ea..6b0f8c2787 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -313,10 +313,9 @@ class ReceiptsStore(SQLBaseStore):
         )
 
         txn.execute(sql, (room_id, receipt_type, user_id))
-        results = txn.fetchall()
 
-        if results and topological_ordering:
-            for to, so, _ in results:
+        if topological_ordering:
+            for to, so, _ in txn:
                 if int(to) > topological_ordering:
                     return False
                 elif int(to) == topological_ordering and int(so) >= stream_ordering:
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 26be6060c3..ec2c52ab93 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -209,7 +209,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
                 " WHERE lower(name) = lower(?)"
             )
             txn.execute(sql, (user_id,))
-            return dict(txn.fetchall())
+            return dict(txn)
 
         return self.runInteraction("get_users_by_id_case_insensitive", f)
 
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 8a2fe2fdf5..e4c56cc175 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -396,7 +396,7 @@ class RoomStore(SQLBaseStore):
                     sql % ("AND appservice_id IS NULL",),
                     (stream_id,)
                 )
-            return dict(txn.fetchall())
+            return dict(txn)
         else:
             # We want to get from all lists, so we need to aggregate the results
 
@@ -422,7 +422,7 @@ class RoomStore(SQLBaseStore):
 
             results = {}
             # A room is visible if its visible on any list.
-            for room_id, visibility in txn.fetchall():
+            for room_id, visibility in txn:
                 results[room_id] = bool(visibility) or results.get(room_id, False)
 
             return results
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index e38d8927bf..23127d3a95 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -132,14 +132,17 @@ class RoomMemberStore(SQLBaseStore):
     @cached(max_entries=500000, iterable=True)
     def get_users_in_room(self, room_id):
         def f(txn):
-
-            rows = self._get_members_rows_txn(
-                txn,
-                room_id=room_id,
-                membership=Membership.JOIN,
+            sql = (
+                "SELECT m.user_id FROM room_memberships as m"
+                " INNER JOIN current_state_events as c"
+                " ON m.event_id = c.event_id "
+                " AND m.room_id = c.room_id "
+                " AND m.user_id = c.state_key"
+                " WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?"
             )
 
-            return [r["user_id"] for r in rows]
+            txn.execute(sql, (room_id, Membership.JOIN,))
+            return [r[0] for r in txn]
         return self.runInteraction("get_users_in_room", f)
 
     @cached()
@@ -246,34 +249,6 @@ class RoomMemberStore(SQLBaseStore):
 
         return results
 
-    def _get_members_rows_txn(self, txn, room_id, membership=None, user_id=None):
-        where_clause = "c.room_id = ?"
-        where_values = [room_id]
-
-        if membership:
-            where_clause += " AND m.membership = ?"
-            where_values.append(membership)
-
-        if user_id:
-            where_clause += " AND m.user_id = ?"
-            where_values.append(user_id)
-
-        sql = (
-            "SELECT m.* FROM room_memberships as m"
-            " INNER JOIN current_state_events as c"
-            " ON m.event_id = c.event_id "
-            " AND m.room_id = c.room_id "
-            " AND m.user_id = c.state_key"
-            " WHERE c.type = 'm.room.member' AND %(where)s"
-        ) % {
-            "where": where_clause,
-        }
-
-        txn.execute(sql, where_values)
-        rows = self.cursor_to_dict(txn)
-
-        return rows
-
     @cachedInlineCallbacks(max_entries=500000, iterable=True)
     def get_rooms_for_user(self, user_id):
         """Returns a set of room_ids the user is currently joined to
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index e1dca927d7..67d5d9969a 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -72,7 +72,7 @@ class SignatureStore(SQLBaseStore):
             " WHERE event_id = ?"
         )
         txn.execute(query, (event_id, ))
-        return {k: v for k, v in txn.fetchall()}
+        return {k: v for k, v in txn}
 
     def _store_event_reference_hashes_txn(self, txn, events):
         """Store a hash for a PDU
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 1b42bea07a..314216f039 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -90,7 +90,7 @@ class StateStore(SQLBaseStore):
             event_ids,
         )
 
-        groups = set(event_to_groups.values())
+        groups = set(event_to_groups.itervalues())
         group_to_state = yield self._get_state_for_groups(groups)
 
         defer.returnValue(group_to_state)
@@ -108,17 +108,18 @@ class StateStore(SQLBaseStore):
 
         state_event_map = yield self.get_events(
             [
-                ev_id for group_ids in group_to_ids.values()
-                for ev_id in group_ids.values()
+                ev_id for group_ids in group_to_ids.itervalues()
+                for ev_id in group_ids.itervalues()
             ],
             get_prev_content=False
         )
 
         defer.returnValue({
             group: [
-                state_event_map[v] for v in event_id_map.values() if v in state_event_map
+                state_event_map[v] for v in event_id_map.itervalues()
+                if v in state_event_map
             ]
-            for group, event_id_map in group_to_ids.items()
+            for group, event_id_map in group_to_ids.iteritems()
         })
 
     def _have_persisted_state_group_txn(self, txn, state_group):
@@ -190,7 +191,7 @@ class StateStore(SQLBaseStore):
                             "state_key": key[1],
                             "event_id": state_id,
                         }
-                        for key, state_id in context.delta_ids.items()
+                        for key, state_id in context.delta_ids.iteritems()
                     ],
                 )
             else:
@@ -205,7 +206,7 @@ class StateStore(SQLBaseStore):
                             "state_key": key[1],
                             "event_id": state_id,
                         }
-                        for key, state_id in context.current_state_ids.items()
+                        for key, state_id in context.current_state_ids.iteritems()
                     ],
                 )
 
@@ -217,7 +218,7 @@ class StateStore(SQLBaseStore):
                     "state_group": state_group_id,
                     "event_id": event_id,
                 }
-                for event_id, state_group_id in state_groups.items()
+                for event_id, state_group_id in state_groups.iteritems()
             ],
         )
 
@@ -341,10 +342,10 @@ class StateStore(SQLBaseStore):
                     args.extend(where_args)
 
                     txn.execute(sql % (where_clause,), args)
-                    rows = self.cursor_to_dict(txn)
-                    for row in rows:
-                        key = (row["type"], row["state_key"])
-                        results[group][key] = row["event_id"]
+                    for row in txn:
+                        typ, state_key, event_id = row
+                        key = (typ, state_key)
+                        results[group][key] = event_id
         else:
             if types is not None:
                 where_clause = "AND (%s)" % (
@@ -373,12 +374,11 @@ class StateStore(SQLBaseStore):
                         " WHERE state_group = ? %s" % (where_clause,),
                         args
                     )
-                    rows = txn.fetchall()
-                    results[group].update({
-                        (typ, state_key): event_id
-                        for typ, state_key, event_id in rows
+                    results[group].update(
+                        ((typ, state_key), event_id)
+                        for typ, state_key, event_id in txn
                         if (typ, state_key) not in results[group]
-                    })
+                    )
 
                     # If the lengths match then we must have all the types,
                     # so no need to go walk further down the tree.
@@ -415,21 +415,21 @@ class StateStore(SQLBaseStore):
             event_ids,
         )
 
-        groups = set(event_to_groups.values())
+        groups = set(event_to_groups.itervalues())
         group_to_state = yield self._get_state_for_groups(groups, types)
 
         state_event_map = yield self.get_events(
-            [ev_id for sd in group_to_state.values() for ev_id in sd.values()],
+            [ev_id for sd in group_to_state.itervalues() for ev_id in sd.itervalues()],
             get_prev_content=False
         )
 
         event_to_state = {
             event_id: {
                 k: state_event_map[v]
-                for k, v in group_to_state[group].items()
+                for k, v in group_to_state[group].iteritems()
                 if v in state_event_map
             }
-            for event_id, group in event_to_groups.items()
+            for event_id, group in event_to_groups.iteritems()
         }
 
         defer.returnValue({event: event_to_state[event] for event in event_ids})
@@ -452,12 +452,12 @@ class StateStore(SQLBaseStore):
             event_ids,
         )
 
-        groups = set(event_to_groups.values())
+        groups = set(event_to_groups.itervalues())
         group_to_state = yield self._get_state_for_groups(groups, types)
 
         event_to_state = {
             event_id: group_to_state[group]
-            for event_id, group in event_to_groups.items()
+            for event_id, group in event_to_groups.iteritems()
         }
 
         defer.returnValue({event: event_to_state[event] for event in event_ids})
@@ -569,7 +569,7 @@ class StateStore(SQLBaseStore):
         got_all = not (missing_types or types is None)
 
         return {
-            k: v for k, v in state_dict_ids.items()
+            k: v for k, v in state_dict_ids.iteritems()
             if include(k[0], k[1])
         }, missing_types, got_all
 
@@ -628,7 +628,7 @@ class StateStore(SQLBaseStore):
 
             # Now we want to update the cache with all the things we fetched
             # from the database.
-            for group, group_state_dict in group_to_state_dict.items():
+            for group, group_state_dict in group_to_state_dict.iteritems():
                 if types:
                     # We delibrately put key -> None mappings into the cache to
                     # cache absence of the key, on the assumption that if we've
@@ -643,10 +643,10 @@ class StateStore(SQLBaseStore):
                 else:
                     state_dict = results[group]
 
-                state_dict.update({
-                    (intern_string(k[0]), intern_string(k[1])): v
-                    for k, v in group_state_dict.items()
-                })
+                state_dict.update(
+                    ((intern_string(k[0]), intern_string(k[1])), v)
+                    for k, v in group_state_dict.iteritems()
+                )
 
                 self._state_group_cache.update(
                     cache_seq_num,
@@ -657,10 +657,10 @@ class StateStore(SQLBaseStore):
 
         # Remove all the entries with None values. The None values were just
         # used for bookkeeping in the cache.
-        for group, state_dict in results.items():
+        for group, state_dict in results.iteritems():
             results[group] = {
                 key: event_id
-                for key, event_id in state_dict.items()
+                for key, event_id in state_dict.iteritems()
                 if event_id
             }
 
@@ -749,7 +749,7 @@ class StateStore(SQLBaseStore):
                         # of keys
 
                         delta_state = {
-                            key: value for key, value in curr_state.items()
+                            key: value for key, value in curr_state.iteritems()
                             if prev_state.get(key, None) != value
                         }
 
@@ -789,7 +789,7 @@ class StateStore(SQLBaseStore):
                                     "state_key": key[1],
                                     "event_id": state_id,
                                 }
-                                for key, state_id in delta_state.items()
+                                for key, state_id in delta_state.iteritems()
                             ],
                         )
 
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
index 5a2c1aa59b..bff73f3f04 100644
--- a/synapse/storage/tags.py
+++ b/synapse/storage/tags.py
@@ -95,7 +95,7 @@ class TagsStore(SQLBaseStore):
             for stream_id, user_id, room_id in tag_ids:
                 txn.execute(sql, (user_id, room_id))
                 tags = []
-                for tag, content in txn.fetchall():
+                for tag, content in txn:
                     tags.append(json.dumps(tag) + ":" + content)
                 tag_json = "{" + ",".join(tags) + "}"
                 results.append((stream_id, user_id, room_id, tag_json))
@@ -132,7 +132,7 @@ class TagsStore(SQLBaseStore):
                 " WHERE user_id = ? AND stream_id > ?"
             )
             txn.execute(sql, (user_id, stream_id))
-            room_ids = [row[0] for row in txn.fetchall()]
+            room_ids = [row[0] for row in txn]
             return room_ids
 
         changed = self._account_data_stream_cache.has_entity_changed(
diff --git a/tests/storage/test_base.py b/tests/storage/test_base.py
index afbefb2e2d..91e971190c 100644
--- a/tests/storage/test_base.py
+++ b/tests/storage/test_base.py
@@ -89,7 +89,7 @@ class SQLBaseStoreTestCase(unittest.TestCase):
     @defer.inlineCallbacks
     def test_select_one_1col(self):
         self.mock_txn.rowcount = 1
-        self.mock_txn.fetchall.return_value = [("Value",)]
+        self.mock_txn.__iter__ = Mock(return_value=iter([("Value",)]))
 
         value = yield self.datastore._simple_select_one_onecol(
             table="tablename",
@@ -136,7 +136,7 @@ class SQLBaseStoreTestCase(unittest.TestCase):
     @defer.inlineCallbacks
     def test_select_list(self):
         self.mock_txn.rowcount = 3
-        self.mock_txn.fetchall.return_value = ((1,), (2,), (3,))
+        self.mock_txn.__iter__ = Mock(return_value=iter([(1,), (2,), (3,)]))
         self.mock_txn.description = (
             ("colA", None, None, None, None, None, None),
         )