summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorMatthew Hodgson <matthew@matrix.org>2018-06-04 00:09:17 +0300
committerMatthew Hodgson <matthew@matrix.org>2018-06-04 00:09:17 +0300
commit28f09fcdd51796dd5036cb74e62b7a74d753f4be (patch)
tree40f098938d04cf93c8f7bb7aa82af879479b3334 /synapse/storage
parentmore comments (diff)
parentMerge pull request #3317 from thegcat/feature/3312-add_ipv6_to_blacklist_exam... (diff)
downloadsynapse-28f09fcdd51796dd5036cb74e62b7a74d753f4be.tar.xz
Merge branch 'develop' into matthew/filter_members
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/events.py47
-rw-r--r--synapse/storage/keys.py14
-rw-r--r--synapse/storage/prepare_database.py2
-rw-r--r--synapse/storage/presence.py7
-rw-r--r--synapse/storage/receipts.py59
-rw-r--r--synapse/storage/registration.py37
-rw-r--r--synapse/storage/schema/delta/50/add_creation_ts_users_index.sql19
-rw-r--r--synapse/storage/search.py9
-rw-r--r--synapse/storage/signatures.py12
-rw-r--r--synapse/storage/state.py47
-rw-r--r--synapse/storage/transactions.py10
-rw-r--r--synapse/storage/user_directory.py8
12 files changed, 179 insertions, 92 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index b96104ccae..cb1082e864 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -40,6 +40,9 @@ import synapse.metrics
 from synapse.events import EventBase    # noqa: F401
 from synapse.events.snapshot import EventContext   # noqa: F401
 
+from six.moves import range
+from six import itervalues, iteritems
+
 from prometheus_client import Counter
 
 logger = logging.getLogger(__name__)
@@ -245,7 +248,7 @@ class EventsStore(EventsWorkerStore):
             partitioned.setdefault(event.room_id, []).append((event, ctx))
 
         deferreds = []
-        for room_id, evs_ctxs in partitioned.iteritems():
+        for room_id, evs_ctxs in iteritems(partitioned):
             d = self._event_persist_queue.add_to_queue(
                 room_id, evs_ctxs,
                 backfilled=backfilled,
@@ -330,7 +333,7 @@ class EventsStore(EventsWorkerStore):
 
             chunks = [
                 events_and_contexts[x:x + 100]
-                for x in xrange(0, len(events_and_contexts), 100)
+                for x in range(0, len(events_and_contexts), 100)
             ]
 
             for chunk in chunks:
@@ -364,7 +367,7 @@ class EventsStore(EventsWorkerStore):
                                 (event, context)
                             )
 
-                        for room_id, ev_ctx_rm in events_by_room.iteritems():
+                        for room_id, ev_ctx_rm in iteritems(events_by_room):
                             # Work out new extremities by recursively adding and removing
                             # the new events.
                             latest_event_ids = yield self.get_latest_event_ids_in_room(
@@ -459,12 +462,12 @@ class EventsStore(EventsWorkerStore):
 
                     event_counter.labels(event.type, origin_type, origin_entity).inc()
 
-                for room_id, new_state in current_state_for_room.iteritems():
+                for room_id, new_state in iteritems(current_state_for_room):
                     self.get_current_state_ids.prefill(
                         (room_id, ), new_state
                     )
 
-                for room_id, latest_event_ids in new_forward_extremeties.iteritems():
+                for room_id, latest_event_ids in iteritems(new_forward_extremeties):
                     self.get_latest_event_ids_in_room.prefill(
                         (room_id,), list(latest_event_ids)
                     )
@@ -641,20 +644,20 @@ class EventsStore(EventsWorkerStore):
         """
         existing_state = yield self.get_current_state_ids(room_id)
 
-        existing_events = set(existing_state.itervalues())
-        new_events = set(ev_id for ev_id in current_state.itervalues())
+        existing_events = set(itervalues(existing_state))
+        new_events = set(ev_id for ev_id in itervalues(current_state))
         changed_events = existing_events ^ new_events
 
         if not changed_events:
             return
 
         to_delete = {
-            key: ev_id for key, ev_id in existing_state.iteritems()
+            key: ev_id for key, ev_id in iteritems(existing_state)
             if ev_id in changed_events
         }
         events_to_insert = (new_events - existing_events)
         to_insert = {
-            key: ev_id for key, ev_id in current_state.iteritems()
+            key: ev_id for key, ev_id in iteritems(current_state)
             if ev_id in events_to_insert
         }
 
@@ -757,11 +760,11 @@ class EventsStore(EventsWorkerStore):
         )
 
     def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
-        for room_id, current_state_tuple in state_delta_by_room.iteritems():
+        for room_id, current_state_tuple in iteritems(state_delta_by_room):
                 to_delete, to_insert = current_state_tuple
                 txn.executemany(
                     "DELETE FROM current_state_events WHERE event_id = ?",
-                    [(ev_id,) for ev_id in to_delete.itervalues()],
+                    [(ev_id,) for ev_id in itervalues(to_delete)],
                 )
 
                 self._simple_insert_many_txn(
@@ -774,7 +777,7 @@ class EventsStore(EventsWorkerStore):
                             "type": key[0],
                             "state_key": key[1],
                         }
-                        for key, ev_id in to_insert.iteritems()
+                        for key, ev_id in iteritems(to_insert)
                     ],
                 )
 
@@ -793,7 +796,7 @@ class EventsStore(EventsWorkerStore):
                             "event_id": ev_id,
                             "prev_event_id": to_delete.get(key, None),
                         }
-                        for key, ev_id in state_deltas.iteritems()
+                        for key, ev_id in iteritems(state_deltas)
                     ]
                 )
 
@@ -836,7 +839,7 @@ class EventsStore(EventsWorkerStore):
 
     def _update_forward_extremities_txn(self, txn, new_forward_extremities,
                                         max_stream_order):
-        for room_id, new_extrem in new_forward_extremities.iteritems():
+        for room_id, new_extrem in iteritems(new_forward_extremities):
             self._simple_delete_txn(
                 txn,
                 table="event_forward_extremities",
@@ -854,7 +857,7 @@ class EventsStore(EventsWorkerStore):
                     "event_id": ev_id,
                     "room_id": room_id,
                 }
-                for room_id, new_extrem in new_forward_extremities.iteritems()
+                for room_id, new_extrem in iteritems(new_forward_extremities)
                 for ev_id in new_extrem
             ],
         )
@@ -871,7 +874,7 @@ class EventsStore(EventsWorkerStore):
                     "event_id": event_id,
                     "stream_ordering": max_stream_order,
                 }
-                for room_id, new_extrem in new_forward_extremities.iteritems()
+                for room_id, new_extrem in iteritems(new_forward_extremities)
                 for event_id in new_extrem
             ]
         )
@@ -899,7 +902,7 @@ class EventsStore(EventsWorkerStore):
                         new_events_and_contexts[event.event_id] = (event, context)
             else:
                 new_events_and_contexts[event.event_id] = (event, context)
-        return new_events_and_contexts.values()
+        return list(new_events_and_contexts.values())
 
     def _update_room_depths_txn(self, txn, events_and_contexts, backfilled):
         """Update min_depth for each room
@@ -925,7 +928,7 @@ class EventsStore(EventsWorkerStore):
                     event.depth, depth_updates.get(event.room_id, event.depth)
                 )
 
-        for room_id, depth in depth_updates.iteritems():
+        for room_id, depth in iteritems(depth_updates):
             self._update_min_depth_for_room_txn(txn, room_id, depth)
 
     def _update_outliers_txn(self, txn, events_and_contexts):
@@ -1309,7 +1312,7 @@ class EventsStore(EventsWorkerStore):
                 " WHERE e.event_id IN (%s)"
             ) % (",".join(["?"] * len(ev_map)),)
 
-            txn.execute(sql, ev_map.keys())
+            txn.execute(sql, list(ev_map))
             rows = self.cursor_to_dict(txn)
             for row in rows:
                 event = ev_map[row["event_id"]]
@@ -1572,7 +1575,7 @@ class EventsStore(EventsWorkerStore):
 
             chunks = [
                 event_ids[i:i + 100]
-                for i in xrange(0, len(event_ids), 100)
+                for i in range(0, len(event_ids), 100)
             ]
             for chunk in chunks:
                 ev_rows = self._simple_select_many_txn(
@@ -1986,7 +1989,7 @@ class EventsStore(EventsWorkerStore):
         logger.info("[purge] finding state groups which depend on redundant"
                     " state groups")
         remaining_state_groups = []
-        for i in xrange(0, len(state_rows), 100):
+        for i in range(0, len(state_rows), 100):
             chunk = [sg for sg, in state_rows[i:i + 100]]
             # look for state groups whose prev_state_group is one we are about
             # to delete
@@ -2042,7 +2045,7 @@ class EventsStore(EventsWorkerStore):
                         "state_key": key[1],
                         "event_id": state_id,
                     }
-                    for key, state_id in curr_state.iteritems()
+                    for key, state_id in iteritems(curr_state)
                 ],
             )
 
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 0540c2b0b1..0f13b61da8 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -17,6 +17,7 @@ from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 
 from twisted.internet import defer
+import six
 
 import OpenSSL
 from signedjson.key import decode_verify_key_bytes
@@ -26,6 +27,13 @@ import logging
 
 logger = logging.getLogger(__name__)
 
+# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
+# despite being deprecated and removed in favor of memoryview
+if six.PY2:
+    db_binary_type = buffer
+else:
+    db_binary_type = memoryview
+
 
 class KeyStore(SQLBaseStore):
     """Persistence for signature verification keys and tls X.509 certificates
@@ -72,7 +80,7 @@ class KeyStore(SQLBaseStore):
             values={
                 "from_server": from_server,
                 "ts_added_ms": time_now_ms,
-                "tls_certificate": buffer(tls_certificate_bytes),
+                "tls_certificate": db_binary_type(tls_certificate_bytes),
             },
             desc="store_server_certificate",
         )
@@ -135,7 +143,7 @@ class KeyStore(SQLBaseStore):
                 values={
                     "from_server": from_server,
                     "ts_added_ms": time_now_ms,
-                    "verify_key": buffer(verify_key.encode()),
+                    "verify_key": db_binary_type(verify_key.encode()),
                 },
             )
             txn.call_after(
@@ -172,7 +180,7 @@ class KeyStore(SQLBaseStore):
                 "from_server": from_server,
                 "ts_added_ms": ts_now_ms,
                 "ts_valid_until_ms": ts_expires_ms,
-                "key_json": buffer(key_json_bytes),
+                "key_json": db_binary_type(key_json_bytes),
             },
             desc="store_server_keys_json",
         )
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index c08e9cd65a..cf2aae0468 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -26,7 +26,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 49
+SCHEMA_VERSION = 50
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 9e9d3c2591..f05d91cc58 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -16,6 +16,7 @@
 from ._base import SQLBaseStore
 from synapse.api.constants import PresenceState
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
+from synapse.util import batch_iter
 
 from collections import namedtuple
 from twisted.internet import defer
@@ -115,11 +116,7 @@ class PresenceStore(SQLBaseStore):
             " AND user_id IN (%s)"
         )
 
-        batches = (
-            presence_states[i:i + 50]
-            for i in xrange(0, len(presence_states), 50)
-        )
-        for states in batches:
+        for states in batch_iter(presence_states, 50):
             args = [stream_id]
             args.extend(s.user_id for s in states)
             txn.execute(
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 709c69a926..c93c228f6e 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -332,6 +332,35 @@ class ReceiptsStore(ReceiptsWorkerStore):
 
     def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
                                       user_id, event_id, data, stream_id):
+        res = self._simple_select_one_txn(
+            txn,
+            table="events",
+            retcols=["topological_ordering", "stream_ordering"],
+            keyvalues={"event_id": event_id},
+            allow_none=True
+        )
+
+        stream_ordering = int(res["stream_ordering"]) if res else None
+
+        # We don't want to clobber receipts for more recent events, so we
+        # have to compare orderings of existing receipts
+        if stream_ordering is not None:
+            sql = (
+                "SELECT stream_ordering, event_id FROM events"
+                " INNER JOIN receipts_linearized as r USING (event_id, room_id)"
+                " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
+            )
+            txn.execute(sql, (room_id, receipt_type, user_id))
+
+            for so, eid in txn:
+                if int(so) >= stream_ordering:
+                    logger.debug(
+                        "Ignoring new receipt for %s in favour of existing "
+                        "one for later event %s",
+                        event_id, eid,
+                    )
+                    return False
+
         txn.call_after(
             self.get_receipts_for_room.invalidate, (room_id, receipt_type)
         )
@@ -355,34 +384,6 @@ class ReceiptsStore(ReceiptsWorkerStore):
             (user_id, room_id, receipt_type)
         )
 
-        res = self._simple_select_one_txn(
-            txn,
-            table="events",
-            retcols=["topological_ordering", "stream_ordering"],
-            keyvalues={"event_id": event_id},
-            allow_none=True
-        )
-
-        topological_ordering = int(res["topological_ordering"]) if res else None
-        stream_ordering = int(res["stream_ordering"]) if res else None
-
-        # We don't want to clobber receipts for more recent events, so we
-        # have to compare orderings of existing receipts
-        sql = (
-            "SELECT topological_ordering, stream_ordering, event_id FROM events"
-            " INNER JOIN receipts_linearized as r USING (event_id, room_id)"
-            " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
-        )
-
-        txn.execute(sql, (room_id, receipt_type, user_id))
-
-        if topological_ordering:
-            for to, so, _ in txn:
-                if int(to) > topological_ordering:
-                    return False
-                elif int(to) == topological_ordering and int(so) >= stream_ordering:
-                    return False
-
         self._simple_delete_txn(
             txn,
             table="receipts_linearized",
@@ -406,7 +407,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
             }
         )
 
-        if receipt_type == "m.read" and topological_ordering:
+        if receipt_type == "m.read" and stream_ordering is not None:
             self._remove_old_push_actions_before_txn(
                 txn,
                 room_id=room_id,
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index a530e29f43..c241167fbe 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -36,6 +36,7 @@ class RegistrationWorkerStore(SQLBaseStore):
             retcols=[
                 "name", "password_hash", "is_guest",
                 "consent_version", "consent_server_notice_sent",
+                "appservice_id",
             ],
             allow_none=True,
             desc="get_user_by_id",
@@ -101,6 +102,13 @@ class RegistrationStore(RegistrationWorkerStore,
             columns=["user_id", "device_id"],
         )
 
+        self.register_background_index_update(
+            "users_creation_ts",
+            index_name="users_creation_ts",
+            table="users",
+            columns=["creation_ts"],
+        )
+
         # we no longer use refresh tokens, but it's possible that some people
         # might have a background update queued to build this index. Just
         # clear the background update.
@@ -485,6 +493,35 @@ class RegistrationStore(RegistrationWorkerStore,
         ret = yield self.runInteraction("count_users", _count_users)
         defer.returnValue(ret)
 
+    def count_daily_user_type(self):
+        """
+        Counts 1) native non guest users
+               2) native guests users
+               3) bridged users
+        who registered on the homeserver in the past 24 hours
+        """
+        def _count_daily_user_type(txn):
+            yesterday = int(self._clock.time()) - (60 * 60 * 24)
+
+            sql = """
+                SELECT user_type, COALESCE(count(*), 0) AS count FROM (
+                    SELECT
+                    CASE
+                        WHEN is_guest=0 AND appservice_id IS NULL THEN 'native'
+                        WHEN is_guest=1 AND appservice_id IS NULL THEN 'guest'
+                        WHEN is_guest=0 AND appservice_id IS NOT NULL THEN 'bridged'
+                    END AS user_type
+                    FROM users
+                    WHERE creation_ts > ?
+                ) AS t GROUP BY user_type
+            """
+            results = {'native': 0, 'guest': 0, 'bridged': 0}
+            txn.execute(sql, (yesterday,))
+            for row in txn:
+                results[row[0]] = row[1]
+            return results
+        return self.runInteraction("count_daily_user_type", _count_daily_user_type)
+
     @defer.inlineCallbacks
     def count_nonbridged_users(self):
         def _count_users(txn):
diff --git a/synapse/storage/schema/delta/50/add_creation_ts_users_index.sql b/synapse/storage/schema/delta/50/add_creation_ts_users_index.sql
new file mode 100644
index 0000000000..c93ae47532
--- /dev/null
+++ b/synapse/storage/schema/delta/50/add_creation_ts_users_index.sql
@@ -0,0 +1,19 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+INSERT into background_updates (update_name, progress_json)
+    VALUES ('users_creation_ts', '{}');
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 6ba3e59889..f0fa5d7631 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -18,13 +18,14 @@ import logging
 import re
 import simplejson as json
 
+from six import string_types
+
 from twisted.internet import defer
 
 from .background_updates import BackgroundUpdateStore
 from synapse.api.errors import SynapseError
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 
-
 logger = logging.getLogger(__name__)
 
 SearchEntry = namedtuple('SearchEntry', [
@@ -126,7 +127,7 @@ class SearchStore(BackgroundUpdateStore):
                     # skip over it.
                     continue
 
-                if not isinstance(value, basestring):
+                if not isinstance(value, string_types):
                     # If the event body, name or topic isn't a string
                     # then skip over it
                     continue
@@ -447,7 +448,7 @@ class SearchStore(BackgroundUpdateStore):
             "search_msgs", self.cursor_to_dict, sql, *args
         )
 
-        results = filter(lambda row: row["room_id"] in room_ids, results)
+        results = list(filter(lambda row: row["room_id"] in room_ids, results))
 
         events = yield self._get_events([r["event_id"] for r in results])
 
@@ -602,7 +603,7 @@ class SearchStore(BackgroundUpdateStore):
             "search_rooms", self.cursor_to_dict, sql, *args
         )
 
-        results = filter(lambda row: row["room_id"] in room_ids, results)
+        results = list(filter(lambda row: row["room_id"] in room_ids, results))
 
         events = yield self._get_events([r["event_id"] for r in results])
 
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index 9e6eaaa532..25922e5a9c 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from twisted.internet import defer
+import six
 
 from ._base import SQLBaseStore
 
@@ -21,6 +22,13 @@ from unpaddedbase64 import encode_base64
 from synapse.crypto.event_signing import compute_event_reference_hash
 from synapse.util.caches.descriptors import cached, cachedList
 
+# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
+# despite being deprecated and removed in favor of memoryview
+if six.PY2:
+    db_binary_type = buffer
+else:
+    db_binary_type = memoryview
+
 
 class SignatureWorkerStore(SQLBaseStore):
     @cached()
@@ -56,7 +64,7 @@ class SignatureWorkerStore(SQLBaseStore):
             for e_id, h in hashes.items()
         }
 
-        defer.returnValue(hashes.items())
+        defer.returnValue(list(hashes.items()))
 
     def _get_event_reference_hashes_txn(self, txn, event_id):
         """Get all the hashes for a given PDU.
@@ -91,7 +99,7 @@ class SignatureStore(SignatureWorkerStore):
             vals.append({
                 "event_id": event.event_id,
                 "algorithm": ref_alg,
-                "hash": buffer(ref_hash_bytes),
+                "hash": db_binary_type(ref_hash_bytes),
             })
 
         self._simple_insert_many_txn(
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 63b6834202..3b87d981b5 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -16,6 +16,9 @@
 from collections import namedtuple
 import logging
 
+from six import iteritems, itervalues
+from six.moves import range
+
 from twisted.internet import defer
 
 from synapse.storage.background_updates import BackgroundUpdateStore
@@ -134,7 +137,7 @@ class StateGroupWorkerStore(SQLBaseStore):
             event_ids,
         )
 
-        groups = set(event_to_groups.itervalues())
+        groups = set(itervalues(event_to_groups))
         group_to_state = yield self._get_state_for_groups(groups)
 
         defer.returnValue(group_to_state)
@@ -166,18 +169,18 @@ class StateGroupWorkerStore(SQLBaseStore):
 
         state_event_map = yield self.get_events(
             [
-                ev_id for group_ids in group_to_ids.itervalues()
-                for ev_id in group_ids.itervalues()
+                ev_id for group_ids in itervalues(group_to_ids)
+                for ev_id in itervalues(group_ids)
             ],
             get_prev_content=False
         )
 
         defer.returnValue({
             group: [
-                state_event_map[v] for v in event_id_map.itervalues()
+                state_event_map[v] for v in itervalues(event_id_map)
                 if v in state_event_map
             ]
-            for group, event_id_map in group_to_ids.iteritems()
+            for group, event_id_map in iteritems(group_to_ids)
         })
 
     @defer.inlineCallbacks
@@ -198,7 +201,7 @@ class StateGroupWorkerStore(SQLBaseStore):
         """
         results = {}
 
-        chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)]
+        chunks = [groups[i:i + 100] for i in range(0, len(groups), 100)]
         for chunk in chunks:
             res = yield self.runInteraction(
                 "_get_state_groups_from_groups",
@@ -390,21 +393,21 @@ class StateGroupWorkerStore(SQLBaseStore):
             event_ids,
         )
 
-        groups = set(event_to_groups.itervalues())
+        groups = set(itervalues(event_to_groups))
         group_to_state = yield self._get_state_for_groups(groups, types)
 
         state_event_map = yield self.get_events(
-            [ev_id for sd in group_to_state.itervalues() for ev_id in sd.itervalues()],
+            [ev_id for sd in itervalues(group_to_state) for ev_id in itervalues(sd)],
             get_prev_content=False
         )
 
         event_to_state = {
             event_id: {
                 k: state_event_map[v]
-                for k, v in group_to_state[group].iteritems()
+                for k, v in iteritems(group_to_state[group])
                 if v in state_event_map
             }
-            for event_id, group in event_to_groups.iteritems()
+            for event_id, group in iteritems(event_to_groups)
         }
 
         defer.returnValue({event: event_to_state[event] for event in event_ids})
@@ -429,12 +432,12 @@ class StateGroupWorkerStore(SQLBaseStore):
             event_ids,
         )
 
-        groups = set(event_to_groups.itervalues())
+        groups = set(itervalues(event_to_groups))
         group_to_state = yield self._get_state_for_groups(groups, types)
 
         event_to_state = {
             event_id: group_to_state[group]
-            for event_id, group in event_to_groups.iteritems()
+            for event_id, group in iteritems(event_to_groups)
         }
 
         defer.returnValue({event: event_to_state[event] for event in event_ids})
@@ -562,7 +565,7 @@ class StateGroupWorkerStore(SQLBaseStore):
         got_all = is_all or not missing_types
 
         return {
-            k: v for k, v in state_dict_ids.iteritems()
+            k: v for k, v in iteritems(state_dict_ids)
             if include(k[0], k[1])
         }, missing_types, got_all
 
@@ -630,12 +633,12 @@ class StateGroupWorkerStore(SQLBaseStore):
 
             # Now we want to update the cache with all the things we fetched
             # from the database.
-            for group, group_state_dict in group_to_state_dict.iteritems():
+            for group, group_state_dict in iteritems(group_to_state_dict):
                 state_dict = results[group]
 
                 state_dict.update(
                     ((intern_string(k[0]), intern_string(k[1])), to_ascii(v))
-                    for k, v in group_state_dict.iteritems()
+                    for k, v in iteritems(group_state_dict)
                 )
 
                 self._state_group_cache.update(
@@ -722,7 +725,7 @@ class StateGroupWorkerStore(SQLBaseStore):
                             "state_key": key[1],
                             "event_id": state_id,
                         }
-                        for key, state_id in delta_ids.iteritems()
+                        for key, state_id in iteritems(delta_ids)
                     ],
                 )
             else:
@@ -737,7 +740,7 @@ class StateGroupWorkerStore(SQLBaseStore):
                             "state_key": key[1],
                             "event_id": state_id,
                         }
-                        for key, state_id in current_state_ids.iteritems()
+                        for key, state_id in iteritems(current_state_ids)
                     ],
                 )
 
@@ -862,11 +865,11 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
                     "state_group": state_group_id,
                     "event_id": event_id,
                 }
-                for event_id, state_group_id in state_groups.iteritems()
+                for event_id, state_group_id in iteritems(state_groups)
             ],
         )
 
-        for event_id, state_group_id in state_groups.iteritems():
+        for event_id, state_group_id in iteritems(state_groups):
             txn.call_after(
                 self._get_state_group_for_event.prefill,
                 (event_id,), state_group_id
@@ -894,7 +897,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
 
         def reindex_txn(txn):
             new_last_state_group = last_state_group
-            for count in xrange(batch_size):
+            for count in range(batch_size):
                 txn.execute(
                     "SELECT id, room_id FROM state_groups"
                     " WHERE ? < id AND id <= ?"
@@ -952,7 +955,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
                         # of keys
 
                         delta_state = {
-                            key: value for key, value in curr_state.iteritems()
+                            key: value for key, value in iteritems(curr_state)
                             if prev_state.get(key, None) != value
                         }
 
@@ -992,7 +995,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
                                     "state_key": key[1],
                                     "event_id": state_id,
                                 }
-                                for key, state_id in delta_state.iteritems()
+                                for key, state_id in iteritems(delta_state)
                             ],
                         )
 
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index f825264ea9..e485d19b84 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -17,6 +17,7 @@ from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cached
 
 from twisted.internet import defer
+import six
 
 from canonicaljson import encode_canonical_json
 
@@ -25,6 +26,13 @@ from collections import namedtuple
 import logging
 import simplejson as json
 
+# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
+# despite being deprecated and removed in favor of memoryview
+if six.PY2:
+    db_binary_type = buffer
+else:
+    db_binary_type = memoryview
+
 logger = logging.getLogger(__name__)
 
 
@@ -110,7 +118,7 @@ class TransactionStore(SQLBaseStore):
                 "transaction_id": transaction_id,
                 "origin": origin,
                 "response_code": code,
-                "response_json": buffer(encode_canonical_json(response_dict)),
+                "response_json": db_binary_type(encode_canonical_json(response_dict)),
                 "ts": self._clock.time_msec(),
             },
             or_ignore=True,
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index d6e289ffbe..275c299998 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -22,6 +22,8 @@ from synapse.api.constants import EventTypes, JoinRules
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.types import get_domain_from_id, get_localpart_from_id
 
+from six import iteritems
+
 import re
 import logging
 
@@ -100,7 +102,7 @@ class UserDirectoryStore(SQLBaseStore):
                     user_id, get_localpart_from_id(user_id), get_domain_from_id(user_id),
                     profile.display_name,
                 )
-                for user_id, profile in users_with_profile.iteritems()
+                for user_id, profile in iteritems(users_with_profile)
             )
         elif isinstance(self.database_engine, Sqlite3Engine):
             sql = """
@@ -112,7 +114,7 @@ class UserDirectoryStore(SQLBaseStore):
                     user_id,
                     "%s %s" % (user_id, p.display_name,) if p.display_name else user_id
                 )
-                for user_id, p in users_with_profile.iteritems()
+                for user_id, p in iteritems(users_with_profile)
             )
         else:
             # This should be unreachable.
@@ -130,7 +132,7 @@ class UserDirectoryStore(SQLBaseStore):
                         "display_name": profile.display_name,
                         "avatar_url": profile.avatar_url,
                     }
-                    for user_id, profile in users_with_profile.iteritems()
+                    for user_id, profile in iteritems(users_with_profile)
                 ]
             )
             for user_id in users_with_profile: