summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-03-29 10:57:19 +0100
committerErik Johnston <erik@matrix.org>2017-03-29 10:57:19 +0100
commit4ad613f6be7211edf4b39d7b031bc7d2947d8297 (patch)
treed3e1835e7d3f5694098d06b093edcf2cd7d0e60f /synapse/storage
parentCorrectly look up key (diff)
parentMerge pull request #2037 from ricco386/fix_readme_centos_issues (diff)
downloadsynapse-4ad613f6be7211edf4b39d7b031bc7d2947d8297.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/e2e_one_time_upsert
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/_base.py25
-rw-r--r--synapse/storage/account_data.py4
-rw-r--r--synapse/storage/deviceinbox.py10
-rw-r--r--synapse/storage/devices.py11
-rw-r--r--synapse/storage/end_to_end_keys.py4
-rw-r--r--synapse/storage/event_federation.py13
-rw-r--r--synapse/storage/event_push_actions.py2
-rw-r--r--synapse/storage/events.py117
-rw-r--r--synapse/storage/prepare_database.py2
-rw-r--r--synapse/storage/presence.py4
-rw-r--r--synapse/storage/receipts.py5
-rw-r--r--synapse/storage/registration.py2
-rw-r--r--synapse/storage/room.py4
-rw-r--r--synapse/storage/roommember.py53
-rw-r--r--synapse/storage/signatures.py2
-rw-r--r--synapse/storage/state.py66
-rw-r--r--synapse/storage/tags.py4
17 files changed, 164 insertions, 164 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 13b106bba1..c659004e8d 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -73,6 +73,9 @@ class LoggingTransaction(object):
     def __setattr__(self, name, value):
         setattr(self.txn, name, value)
 
+    def __iter__(self):
+        return self.txn.__iter__()
+
     def execute(self, sql, *args):
         self._do_execute(self.txn.execute, sql, *args)
 
@@ -132,7 +135,7 @@ class PerformanceCounters(object):
 
     def interval(self, interval_duration, limit=3):
         counters = []
-        for name, (count, cum_time) in self.current_counters.items():
+        for name, (count, cum_time) in self.current_counters.iteritems():
             prev_count, prev_time = self.previous_counters.get(name, (0, 0))
             counters.append((
                 (cum_time - prev_time) / interval_duration,
@@ -357,7 +360,7 @@ class SQLBaseStore(object):
         """
         col_headers = list(intern(column[0]) for column in cursor.description)
         results = list(
-            dict(zip(col_headers, row)) for row in cursor.fetchall()
+            dict(zip(col_headers, row)) for row in cursor
         )
         return results
 
@@ -565,7 +568,7 @@ class SQLBaseStore(object):
     @staticmethod
     def _simple_select_onecol_txn(txn, table, keyvalues, retcol):
         if keyvalues:
-            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
+            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
         else:
             where = ""
 
@@ -579,7 +582,7 @@ class SQLBaseStore(object):
 
         txn.execute(sql, keyvalues.values())
 
-        return [r[0] for r in txn.fetchall()]
+        return [r[0] for r in txn]
 
     def _simple_select_onecol(self, table, keyvalues, retcol,
                               desc="_simple_select_onecol"):
@@ -712,7 +715,7 @@ class SQLBaseStore(object):
         )
         values.extend(iterable)
 
-        for key, value in keyvalues.items():
+        for key, value in keyvalues.iteritems():
             clauses.append("%s = ?" % (key,))
             values.append(value)
 
@@ -753,7 +756,7 @@ class SQLBaseStore(object):
     @staticmethod
     def _simple_update_one_txn(txn, table, keyvalues, updatevalues):
         if keyvalues:
-            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
+            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
         else:
             where = ""
 
@@ -870,7 +873,7 @@ class SQLBaseStore(object):
         )
         values.extend(iterable)
 
-        for key, value in keyvalues.items():
+        for key, value in keyvalues.iteritems():
             clauses.append("%s = ?" % (key,))
             values.append(value)
 
@@ -901,16 +904,16 @@ class SQLBaseStore(object):
 
         txn = db_conn.cursor()
         txn.execute(sql, (int(max_value),))
-        rows = txn.fetchall()
-        txn.close()
 
         cache = {
             row[0]: int(row[1])
-            for row in rows
+            for row in txn
         }
 
+        txn.close()
+
         if cache:
-            min_val = min(cache.values())
+            min_val = min(cache.itervalues())
         else:
             min_val = max_value
 
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index 3fa226e92d..aa84ffc2b0 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -182,7 +182,7 @@ class AccountDataStore(SQLBaseStore):
             txn.execute(sql, (user_id, stream_id))
 
             global_account_data = {
-                row[0]: json.loads(row[1]) for row in txn.fetchall()
+                row[0]: json.loads(row[1]) for row in txn
             }
 
             sql = (
@@ -193,7 +193,7 @@ class AccountDataStore(SQLBaseStore):
             txn.execute(sql, (user_id, stream_id))
 
             account_data_by_room = {}
-            for row in txn.fetchall():
+            for row in txn:
                 room_account_data = account_data_by_room.setdefault(row[0], {})
                 room_account_data[row[1]] = json.loads(row[2])
 
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 7925cb5f1b..2714519d21 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -178,7 +178,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
                 )
                 txn.execute(sql, (user_id,))
                 message_json = ujson.dumps(messages_by_device["*"])
-                for row in txn.fetchall():
+                for row in txn:
                     # Add the message for all devices for this user on this
                     # server.
                     device = row[0]
@@ -195,7 +195,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
                 # TODO: Maybe this needs to be done in batches if there are
                 # too many local devices for a given user.
                 txn.execute(sql, [user_id] + devices)
-                for row in txn.fetchall():
+                for row in txn:
                     # Only insert into the local inbox if the device exists on
                     # this server
                     device = row[0]
@@ -251,7 +251,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
                 user_id, device_id, last_stream_id, current_stream_id, limit
             ))
             messages = []
-            for row in txn.fetchall():
+            for row in txn:
                 stream_pos = row[0]
                 messages.append(ujson.loads(row[1]))
             if len(messages) < limit:
@@ -340,7 +340,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
                 " ORDER BY stream_id ASC"
             )
             txn.execute(sql, (last_pos, upper_pos))
-            rows.extend(txn.fetchall())
+            rows.extend(txn)
 
             return rows
 
@@ -384,7 +384,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
                 destination, last_stream_id, current_stream_id, limit
             ))
             messages = []
-            for row in txn.fetchall():
+            for row in txn:
                 stream_pos = row[0]
                 messages.append(ujson.loads(row[1]))
             if len(messages) < limit:
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index e545b62e39..53e36791d5 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -329,17 +329,20 @@ class DeviceStore(SQLBaseStore):
             SELECT user_id, device_id, max(stream_id) FROM device_lists_outbound_pokes
             WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
             GROUP BY user_id, device_id
+            LIMIT 20
         """
         txn.execute(
             sql, (destination, from_stream_id, now_stream_id, False)
         )
-        rows = txn.fetchall()
 
-        if not rows:
+        # maps (user_id, device_id) -> stream_id
+        query_map = {(r[0], r[1]): r[2] for r in txn}
+        if not query_map:
             return (now_stream_id, [])
 
-        # maps (user_id, device_id) -> stream_id
-        query_map = {(r[0], r[1]): r[2] for r in rows}
+        if len(query_map) >= 20:
+            now_stream_id = max(stream_id for stream_id in query_map.itervalues())
+
         devices = self._get_e2e_device_keys_txn(
             txn, query_map.keys(), include_all_devices=True
         )
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index c4020869f4..1fafeae3f3 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -193,7 +193,7 @@ class EndToEndKeyStore(SQLBaseStore):
             )
             txn.execute(sql, (user_id, device_id))
             result = {}
-            for algorithm, key_count in txn.fetchall():
+            for algorithm, key_count in txn:
                 result[algorithm] = key_count
             return result
         return self.runInteraction(
@@ -214,7 +214,7 @@ class EndToEndKeyStore(SQLBaseStore):
                 user_result = result.setdefault(user_id, {})
                 device_result = user_result.setdefault(device_id, {})
                 txn.execute(sql, (user_id, device_id, algorithm))
-                for key_id, key_json in txn.fetchall():
+                for key_id, key_json in txn:
                     device_result[algorithm + ":" + key_id] = key_json
                     delete.append((user_id, device_id, algorithm, key_id))
             sql = (
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 0d97de2fe7..43b5b49986 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -74,7 +74,7 @@ class EventFederationStore(SQLBaseStore):
                     base_sql % (",".join(["?"] * len(chunk)),),
                     chunk
                 )
-                new_front.update([r[0] for r in txn.fetchall()])
+                new_front.update([r[0] for r in txn])
 
             new_front -= results
 
@@ -110,7 +110,7 @@ class EventFederationStore(SQLBaseStore):
 
         txn.execute(sql, (room_id, False,))
 
-        return dict(txn.fetchall())
+        return dict(txn)
 
     def _get_oldest_events_in_room_txn(self, txn, room_id):
         return self._simple_select_onecol_txn(
@@ -152,7 +152,7 @@ class EventFederationStore(SQLBaseStore):
         txn.execute(sql, (room_id, ))
 
         results = []
-        for event_id, depth in txn.fetchall():
+        for event_id, depth in txn:
             hashes = self._get_event_reference_hashes_txn(txn, event_id)
             prev_hashes = {
                 k: encode_base64(v) for k, v in hashes.items()
@@ -334,8 +334,7 @@ class EventFederationStore(SQLBaseStore):
 
         def get_forward_extremeties_for_room_txn(txn):
             txn.execute(sql, (stream_ordering, room_id))
-            rows = txn.fetchall()
-            return [event_id for event_id, in rows]
+            return [event_id for event_id, in txn]
 
         return self.runInteraction(
             "get_forward_extremeties_for_room",
@@ -436,7 +435,7 @@ class EventFederationStore(SQLBaseStore):
                 (room_id, event_id, False, limit - len(event_results))
             )
 
-            for row in txn.fetchall():
+            for row in txn:
                 if row[1] not in event_results:
                     queue.put((-row[0], row[1]))
 
@@ -482,7 +481,7 @@ class EventFederationStore(SQLBaseStore):
                     (room_id, event_id, False, limit - len(event_results))
                 )
 
-                for e_id, in txn.fetchall():
+                for e_id, in txn:
                     new_front.add(e_id)
 
             new_front -= earliest_events
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 14543b4269..d6d8723b4a 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -206,7 +206,7 @@ class EventPushActionsStore(SQLBaseStore):
                 " stream_ordering >= ? AND stream_ordering <= ?"
             )
             txn.execute(sql, (min_stream_ordering, max_stream_ordering))
-            return [r[0] for r in txn.fetchall()]
+            return [r[0] for r in txn]
         ret = yield self.runInteraction("get_push_action_users_in_range", f)
         defer.returnValue(ret)
 
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 3c8393bfe8..3f6833fad2 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -217,14 +217,14 @@ class EventsStore(SQLBaseStore):
             partitioned.setdefault(event.room_id, []).append((event, ctx))
 
         deferreds = []
-        for room_id, evs_ctxs in partitioned.items():
+        for room_id, evs_ctxs in partitioned.iteritems():
             d = preserve_fn(self._event_persist_queue.add_to_queue)(
                 room_id, evs_ctxs,
                 backfilled=backfilled,
             )
             deferreds.append(d)
 
-        for room_id in partitioned.keys():
+        for room_id in partitioned:
             self._maybe_start_persisting(room_id)
 
         return preserve_context_over_deferred(
@@ -323,7 +323,7 @@ class EventsStore(SQLBaseStore):
                                 (event, context)
                             )
 
-                        for room_id, ev_ctx_rm in events_by_room.items():
+                        for room_id, ev_ctx_rm in events_by_room.iteritems():
                             # Work out new extremities by recursively adding and removing
                             # the new events.
                             latest_event_ids = yield self.get_latest_event_ids_in_room(
@@ -428,6 +428,7 @@ class EventsStore(SQLBaseStore):
         # Now we need to work out the different state sets for
         # each state extremities
         state_sets = []
+        state_groups = set()
         missing_event_ids = []
         was_updated = False
         for event_id in new_latest_event_ids:
@@ -437,9 +438,17 @@ class EventsStore(SQLBaseStore):
                 if event_id == ev.event_id:
                     if ctx.current_state_ids is None:
                         raise Exception("Unknown current state")
-                    state_sets.append(ctx.current_state_ids)
-                    if ctx.delta_ids or hasattr(ev, "state_key"):
-                        was_updated = True
+
+                    # If we've already seen the state group don't bother adding
+                    # it to the state sets again
+                    if ctx.state_group not in state_groups:
+                        state_sets.append(ctx.current_state_ids)
+                        if ctx.delta_ids or hasattr(ev, "state_key"):
+                            was_updated = True
+                        if ctx.state_group:
+                            # Add this as a seen state group (if it has a state
+                            # group)
+                            state_groups.add(ctx.state_group)
                     break
             else:
                 # If we couldn't find it, then we'll need to pull
@@ -453,45 +462,51 @@ class EventsStore(SQLBaseStore):
                 missing_event_ids,
             )
 
-            groups = set(event_to_groups.values())
-            group_to_state = yield self._get_state_for_groups(groups)
+            groups = set(event_to_groups.itervalues()) - state_groups
 
-            state_sets.extend(group_to_state.values())
+            if groups:
+                group_to_state = yield self._get_state_for_groups(groups)
+                state_sets.extend(group_to_state.itervalues())
 
         if not new_latest_event_ids:
             current_state = {}
         elif was_updated:
-            # We work out the current state by passing the state sets to the
-            # state resolution algorithm. It may ask for some events, including
-            # the events we have yet to persist, so we need a slightly more
-            # complicated event lookup function than simply looking the events
-            # up in the db.
-            events_map = {ev.event_id: ev for ev, _ in events_context}
-
-            @defer.inlineCallbacks
-            def get_events(ev_ids):
-                # We get the events by first looking at the list of events we
-                # are trying to persist, and then fetching the rest from the DB.
-                db = []
-                to_return = {}
-                for ev_id in ev_ids:
-                    ev = events_map.get(ev_id, None)
-                    if ev:
-                        to_return[ev_id] = ev
-                    else:
-                        db.append(ev_id)
-
-                if db:
-                    evs = yield self.get_events(
-                        ev_ids, get_prev_content=False, check_redacted=False,
-                    )
-                    to_return.update(evs)
-                defer.returnValue(to_return)
+            if len(state_sets) == 1:
+                # If there is only one state set, then we know what the current
+                # state is.
+                current_state = state_sets[0]
+            else:
+                # We work out the current state by passing the state sets to the
+                # state resolution algorithm. It may ask for some events, including
+                # the events we have yet to persist, so we need a slightly more
+                # complicated event lookup function than simply looking the events
+                # up in the db.
+                events_map = {ev.event_id: ev for ev, _ in events_context}
+
+                @defer.inlineCallbacks
+                def get_events(ev_ids):
+                    # We get the events by first looking at the list of events we
+                    # are trying to persist, and then fetching the rest from the DB.
+                    db = []
+                    to_return = {}
+                    for ev_id in ev_ids:
+                        ev = events_map.get(ev_id, None)
+                        if ev:
+                            to_return[ev_id] = ev
+                        else:
+                            db.append(ev_id)
 
-            current_state = yield resolve_events(
-                state_sets,
-                state_map_factory=get_events,
-            )
+                    if db:
+                        evs = yield self.get_events(
+                            ev_ids, get_prev_content=False, check_redacted=False,
+                        )
+                        to_return.update(evs)
+                    defer.returnValue(to_return)
+
+                current_state = yield resolve_events(
+                    state_sets,
+                    state_map_factory=get_events,
+                )
         else:
             return
 
@@ -718,7 +733,7 @@ class EventsStore(SQLBaseStore):
 
     def _update_forward_extremities_txn(self, txn, new_forward_extremities,
                                         max_stream_order):
-        for room_id, new_extrem in new_forward_extremities.items():
+        for room_id, new_extrem in new_forward_extremities.iteritems():
             self._simple_delete_txn(
                 txn,
                 table="event_forward_extremities",
@@ -736,7 +751,7 @@ class EventsStore(SQLBaseStore):
                     "event_id": ev_id,
                     "room_id": room_id,
                 }
-                for room_id, new_extrem in new_forward_extremities.items()
+                for room_id, new_extrem in new_forward_extremities.iteritems()
                 for ev_id in new_extrem
             ],
         )
@@ -753,7 +768,7 @@ class EventsStore(SQLBaseStore):
                     "event_id": event_id,
                     "stream_ordering": max_stream_order,
                 }
-                for room_id, new_extrem in new_forward_extremities.items()
+                for room_id, new_extrem in new_forward_extremities.iteritems()
                 for event_id in new_extrem
             ]
         )
@@ -807,7 +822,7 @@ class EventsStore(SQLBaseStore):
                     event.depth, depth_updates.get(event.room_id, event.depth)
                 )
 
-        for room_id, depth in depth_updates.items():
+        for room_id, depth in depth_updates.iteritems():
             self._update_min_depth_for_room_txn(txn, room_id, depth)
 
     def _update_outliers_txn(self, txn, events_and_contexts):
@@ -834,7 +849,7 @@ class EventsStore(SQLBaseStore):
 
         have_persisted = {
             event_id: outlier
-            for event_id, outlier in txn.fetchall()
+            for event_id, outlier in txn
         }
 
         to_remove = set()
@@ -958,14 +973,10 @@ class EventsStore(SQLBaseStore):
             return
 
         def event_dict(event):
-            return {
-                k: v
-                for k, v in event.get_dict().items()
-                if k not in [
-                    "redacted",
-                    "redacted_because",
-                ]
-            }
+            d = event.get_dict()
+            d.pop("redacted", None)
+            d.pop("redacted_because", None)
+            return d
 
         self._simple_insert_many_txn(
             txn,
@@ -1998,7 +2009,7 @@ class EventsStore(SQLBaseStore):
                         "state_key": key[1],
                         "event_id": state_id,
                     }
-                    for key, state_id in curr_state.items()
+                    for key, state_id in curr_state.iteritems()
                 ],
             )
 
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index ed84db6b4b..6e623843d5 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -356,7 +356,7 @@ def _get_or_create_schema_state(txn, database_engine):
             ),
             (current_version,)
         )
-        applied_deltas = [d for d, in txn.fetchall()]
+        applied_deltas = [d for d, in txn]
         return current_version, applied_deltas, upgraded
 
     return None
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 4d1590d2b4..9e9d3c2591 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -85,8 +85,8 @@ class PresenceStore(SQLBaseStore):
                 self.presence_stream_cache.entity_has_changed,
                 state.user_id, stream_id,
             )
-            self._invalidate_cache_and_stream(
-                txn, self._get_presence_for_user, (state.user_id,)
+            txn.call_after(
+                self._get_presence_for_user.invalidate, (state.user_id,)
             )
 
         # Actually insert new rows
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 5cf41501ea..6b0f8c2787 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -313,10 +313,9 @@ class ReceiptsStore(SQLBaseStore):
         )
 
         txn.execute(sql, (room_id, receipt_type, user_id))
-        results = txn.fetchall()
 
-        if results and topological_ordering:
-            for to, so, _ in results:
+        if topological_ordering:
+            for to, so, _ in txn:
                 if int(to) > topological_ordering:
                     return False
                 elif int(to) == topological_ordering and int(so) >= stream_ordering:
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 26be6060c3..ec2c52ab93 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -209,7 +209,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
                 " WHERE lower(name) = lower(?)"
             )
             txn.execute(sql, (user_id,))
-            return dict(txn.fetchall())
+            return dict(txn)
 
         return self.runInteraction("get_users_by_id_case_insensitive", f)
 
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 8a2fe2fdf5..e4c56cc175 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -396,7 +396,7 @@ class RoomStore(SQLBaseStore):
                     sql % ("AND appservice_id IS NULL",),
                     (stream_id,)
                 )
-            return dict(txn.fetchall())
+            return dict(txn)
         else:
             # We want to get from all lists, so we need to aggregate the results
 
@@ -422,7 +422,7 @@ class RoomStore(SQLBaseStore):
 
             results = {}
             # A room is visible if its visible on any list.
-            for room_id, visibility in txn.fetchall():
+            for room_id, visibility in txn:
                 results[room_id] = bool(visibility) or results.get(room_id, False)
 
             return results
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index e38d8927bf..367dbbbcf6 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -129,17 +129,30 @@ class RoomMemberStore(SQLBaseStore):
         with self._stream_id_gen.get_next() as stream_ordering:
             yield self.runInteraction("locally_reject_invite", f, stream_ordering)
 
+    @cachedInlineCallbacks(max_entries=100000, iterable=True, cache_context=True)
+    def get_hosts_in_room(self, room_id, cache_context):
+        """Returns the set of all hosts currently in the room
+        """
+        user_ids = yield self.get_users_in_room(
+            room_id, on_invalidate=cache_context.invalidate,
+        )
+        hosts = frozenset(get_domain_from_id(user_id) for user_id in user_ids)
+        defer.returnValue(hosts)
+
     @cached(max_entries=500000, iterable=True)
     def get_users_in_room(self, room_id):
         def f(txn):
-
-            rows = self._get_members_rows_txn(
-                txn,
-                room_id=room_id,
-                membership=Membership.JOIN,
+            sql = (
+                "SELECT m.user_id FROM room_memberships as m"
+                " INNER JOIN current_state_events as c"
+                " ON m.event_id = c.event_id "
+                " AND m.room_id = c.room_id "
+                " AND m.user_id = c.state_key"
+                " WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?"
             )
 
-            return [r["user_id"] for r in rows]
+            txn.execute(sql, (room_id, Membership.JOIN,))
+            return [r[0] for r in txn]
         return self.runInteraction("get_users_in_room", f)
 
     @cached()
@@ -246,34 +259,6 @@ class RoomMemberStore(SQLBaseStore):
 
         return results
 
-    def _get_members_rows_txn(self, txn, room_id, membership=None, user_id=None):
-        where_clause = "c.room_id = ?"
-        where_values = [room_id]
-
-        if membership:
-            where_clause += " AND m.membership = ?"
-            where_values.append(membership)
-
-        if user_id:
-            where_clause += " AND m.user_id = ?"
-            where_values.append(user_id)
-
-        sql = (
-            "SELECT m.* FROM room_memberships as m"
-            " INNER JOIN current_state_events as c"
-            " ON m.event_id = c.event_id "
-            " AND m.room_id = c.room_id "
-            " AND m.user_id = c.state_key"
-            " WHERE c.type = 'm.room.member' AND %(where)s"
-        ) % {
-            "where": where_clause,
-        }
-
-        txn.execute(sql, where_values)
-        rows = self.cursor_to_dict(txn)
-
-        return rows
-
     @cachedInlineCallbacks(max_entries=500000, iterable=True)
     def get_rooms_for_user(self, user_id):
         """Returns a set of room_ids the user is currently joined to
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index e1dca927d7..67d5d9969a 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -72,7 +72,7 @@ class SignatureStore(SQLBaseStore):
             " WHERE event_id = ?"
         )
         txn.execute(query, (event_id, ))
-        return {k: v for k, v in txn.fetchall()}
+        return {k: v for k, v in txn}
 
     def _store_event_reference_hashes_txn(self, txn, events):
         """Store a hash for a PDU
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 1b42bea07a..314216f039 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -90,7 +90,7 @@ class StateStore(SQLBaseStore):
             event_ids,
         )
 
-        groups = set(event_to_groups.values())
+        groups = set(event_to_groups.itervalues())
         group_to_state = yield self._get_state_for_groups(groups)
 
         defer.returnValue(group_to_state)
@@ -108,17 +108,18 @@ class StateStore(SQLBaseStore):
 
         state_event_map = yield self.get_events(
             [
-                ev_id for group_ids in group_to_ids.values()
-                for ev_id in group_ids.values()
+                ev_id for group_ids in group_to_ids.itervalues()
+                for ev_id in group_ids.itervalues()
             ],
             get_prev_content=False
         )
 
         defer.returnValue({
             group: [
-                state_event_map[v] for v in event_id_map.values() if v in state_event_map
+                state_event_map[v] for v in event_id_map.itervalues()
+                if v in state_event_map
             ]
-            for group, event_id_map in group_to_ids.items()
+            for group, event_id_map in group_to_ids.iteritems()
         })
 
     def _have_persisted_state_group_txn(self, txn, state_group):
@@ -190,7 +191,7 @@ class StateStore(SQLBaseStore):
                             "state_key": key[1],
                             "event_id": state_id,
                         }
-                        for key, state_id in context.delta_ids.items()
+                        for key, state_id in context.delta_ids.iteritems()
                     ],
                 )
             else:
@@ -205,7 +206,7 @@ class StateStore(SQLBaseStore):
                             "state_key": key[1],
                             "event_id": state_id,
                         }
-                        for key, state_id in context.current_state_ids.items()
+                        for key, state_id in context.current_state_ids.iteritems()
                     ],
                 )
 
@@ -217,7 +218,7 @@ class StateStore(SQLBaseStore):
                     "state_group": state_group_id,
                     "event_id": event_id,
                 }
-                for event_id, state_group_id in state_groups.items()
+                for event_id, state_group_id in state_groups.iteritems()
             ],
         )
 
@@ -341,10 +342,10 @@ class StateStore(SQLBaseStore):
                     args.extend(where_args)
 
                     txn.execute(sql % (where_clause,), args)
-                    rows = self.cursor_to_dict(txn)
-                    for row in rows:
-                        key = (row["type"], row["state_key"])
-                        results[group][key] = row["event_id"]
+                    for row in txn:
+                        typ, state_key, event_id = row
+                        key = (typ, state_key)
+                        results[group][key] = event_id
         else:
             if types is not None:
                 where_clause = "AND (%s)" % (
@@ -373,12 +374,11 @@ class StateStore(SQLBaseStore):
                         " WHERE state_group = ? %s" % (where_clause,),
                         args
                     )
-                    rows = txn.fetchall()
-                    results[group].update({
-                        (typ, state_key): event_id
-                        for typ, state_key, event_id in rows
+                    results[group].update(
+                        ((typ, state_key), event_id)
+                        for typ, state_key, event_id in txn
                         if (typ, state_key) not in results[group]
-                    })
+                    )
 
                     # If the lengths match then we must have all the types,
                     # so no need to go walk further down the tree.
@@ -415,21 +415,21 @@ class StateStore(SQLBaseStore):
             event_ids,
         )
 
-        groups = set(event_to_groups.values())
+        groups = set(event_to_groups.itervalues())
         group_to_state = yield self._get_state_for_groups(groups, types)
 
         state_event_map = yield self.get_events(
-            [ev_id for sd in group_to_state.values() for ev_id in sd.values()],
+            [ev_id for sd in group_to_state.itervalues() for ev_id in sd.itervalues()],
             get_prev_content=False
         )
 
         event_to_state = {
             event_id: {
                 k: state_event_map[v]
-                for k, v in group_to_state[group].items()
+                for k, v in group_to_state[group].iteritems()
                 if v in state_event_map
             }
-            for event_id, group in event_to_groups.items()
+            for event_id, group in event_to_groups.iteritems()
         }
 
         defer.returnValue({event: event_to_state[event] for event in event_ids})
@@ -452,12 +452,12 @@ class StateStore(SQLBaseStore):
             event_ids,
         )
 
-        groups = set(event_to_groups.values())
+        groups = set(event_to_groups.itervalues())
         group_to_state = yield self._get_state_for_groups(groups, types)
 
         event_to_state = {
             event_id: group_to_state[group]
-            for event_id, group in event_to_groups.items()
+            for event_id, group in event_to_groups.iteritems()
         }
 
         defer.returnValue({event: event_to_state[event] for event in event_ids})
@@ -569,7 +569,7 @@ class StateStore(SQLBaseStore):
         got_all = not (missing_types or types is None)
 
         return {
-            k: v for k, v in state_dict_ids.items()
+            k: v for k, v in state_dict_ids.iteritems()
             if include(k[0], k[1])
         }, missing_types, got_all
 
@@ -628,7 +628,7 @@ class StateStore(SQLBaseStore):
 
             # Now we want to update the cache with all the things we fetched
             # from the database.
-            for group, group_state_dict in group_to_state_dict.items():
+            for group, group_state_dict in group_to_state_dict.iteritems():
                 if types:
                     # We delibrately put key -> None mappings into the cache to
                     # cache absence of the key, on the assumption that if we've
@@ -643,10 +643,10 @@ class StateStore(SQLBaseStore):
                 else:
                     state_dict = results[group]
 
-                state_dict.update({
-                    (intern_string(k[0]), intern_string(k[1])): v
-                    for k, v in group_state_dict.items()
-                })
+                state_dict.update(
+                    ((intern_string(k[0]), intern_string(k[1])), v)
+                    for k, v in group_state_dict.iteritems()
+                )
 
                 self._state_group_cache.update(
                     cache_seq_num,
@@ -657,10 +657,10 @@ class StateStore(SQLBaseStore):
 
         # Remove all the entries with None values. The None values were just
         # used for bookkeeping in the cache.
-        for group, state_dict in results.items():
+        for group, state_dict in results.iteritems():
             results[group] = {
                 key: event_id
-                for key, event_id in state_dict.items()
+                for key, event_id in state_dict.iteritems()
                 if event_id
             }
 
@@ -749,7 +749,7 @@ class StateStore(SQLBaseStore):
                         # of keys
 
                         delta_state = {
-                            key: value for key, value in curr_state.items()
+                            key: value for key, value in curr_state.iteritems()
                             if prev_state.get(key, None) != value
                         }
 
@@ -789,7 +789,7 @@ class StateStore(SQLBaseStore):
                                     "state_key": key[1],
                                     "event_id": state_id,
                                 }
-                                for key, state_id in delta_state.items()
+                                for key, state_id in delta_state.iteritems()
                             ],
                         )
 
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
index 5a2c1aa59b..bff73f3f04 100644
--- a/synapse/storage/tags.py
+++ b/synapse/storage/tags.py
@@ -95,7 +95,7 @@ class TagsStore(SQLBaseStore):
             for stream_id, user_id, room_id in tag_ids:
                 txn.execute(sql, (user_id, room_id))
                 tags = []
-                for tag, content in txn.fetchall():
+                for tag, content in txn:
                     tags.append(json.dumps(tag) + ":" + content)
                 tag_json = "{" + ",".join(tags) + "}"
                 results.append((stream_id, user_id, room_id, tag_json))
@@ -132,7 +132,7 @@ class TagsStore(SQLBaseStore):
                 " WHERE user_id = ? AND stream_id > ?"
             )
             txn.execute(sql, (user_id, stream_id))
-            room_ids = [row[0] for row in txn.fetchall()]
+            room_ids = [row[0] for row in txn]
             return room_ids
 
         changed = self._account_data_stream_cache.has_entity_changed(