diff options
author | Matthew Hodgson <matthew@matrix.org> | 2018-06-04 00:09:17 +0300 |
---|---|---|
committer | Matthew Hodgson <matthew@matrix.org> | 2018-06-04 00:09:17 +0300 |
commit | 28f09fcdd51796dd5036cb74e62b7a74d753f4be (patch) | |
tree | 40f098938d04cf93c8f7bb7aa82af879479b3334 /synapse/storage | |
parent | more comments (diff) | |
parent | Merge pull request #3317 from thegcat/feature/3312-add_ipv6_to_blacklist_exam... (diff) | |
download | synapse-28f09fcdd51796dd5036cb74e62b7a74d753f4be.tar.xz |
Merge branch 'develop' into matthew/filter_members
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/events.py | 47 | ||||
-rw-r--r-- | synapse/storage/keys.py | 14 | ||||
-rw-r--r-- | synapse/storage/prepare_database.py | 2 | ||||
-rw-r--r-- | synapse/storage/presence.py | 7 | ||||
-rw-r--r-- | synapse/storage/receipts.py | 59 | ||||
-rw-r--r-- | synapse/storage/registration.py | 37 | ||||
-rw-r--r-- | synapse/storage/schema/delta/50/add_creation_ts_users_index.sql | 19 | ||||
-rw-r--r-- | synapse/storage/search.py | 9 | ||||
-rw-r--r-- | synapse/storage/signatures.py | 12 | ||||
-rw-r--r-- | synapse/storage/state.py | 47 | ||||
-rw-r--r-- | synapse/storage/transactions.py | 10 | ||||
-rw-r--r-- | synapse/storage/user_directory.py | 8 |
12 files changed, 179 insertions, 92 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index b96104ccae..cb1082e864 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -40,6 +40,9 @@ import synapse.metrics from synapse.events import EventBase # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401 +from six.moves import range +from six import itervalues, iteritems + from prometheus_client import Counter logger = logging.getLogger(__name__) @@ -245,7 +248,7 @@ class EventsStore(EventsWorkerStore): partitioned.setdefault(event.room_id, []).append((event, ctx)) deferreds = [] - for room_id, evs_ctxs in partitioned.iteritems(): + for room_id, evs_ctxs in iteritems(partitioned): d = self._event_persist_queue.add_to_queue( room_id, evs_ctxs, backfilled=backfilled, @@ -330,7 +333,7 @@ class EventsStore(EventsWorkerStore): chunks = [ events_and_contexts[x:x + 100] - for x in xrange(0, len(events_and_contexts), 100) + for x in range(0, len(events_and_contexts), 100) ] for chunk in chunks: @@ -364,7 +367,7 @@ class EventsStore(EventsWorkerStore): (event, context) ) - for room_id, ev_ctx_rm in events_by_room.iteritems(): + for room_id, ev_ctx_rm in iteritems(events_by_room): # Work out new extremities by recursively adding and removing # the new events. latest_event_ids = yield self.get_latest_event_ids_in_room( @@ -459,12 +462,12 @@ class EventsStore(EventsWorkerStore): event_counter.labels(event.type, origin_type, origin_entity).inc() - for room_id, new_state in current_state_for_room.iteritems(): + for room_id, new_state in iteritems(current_state_for_room): self.get_current_state_ids.prefill( (room_id, ), new_state ) - for room_id, latest_event_ids in new_forward_extremeties.iteritems(): + for room_id, latest_event_ids in iteritems(new_forward_extremeties): self.get_latest_event_ids_in_room.prefill( (room_id,), list(latest_event_ids) ) @@ -641,20 +644,20 @@ class EventsStore(EventsWorkerStore): """ existing_state = yield self.get_current_state_ids(room_id) - existing_events = set(existing_state.itervalues()) - new_events = set(ev_id for ev_id in current_state.itervalues()) + existing_events = set(itervalues(existing_state)) + new_events = set(ev_id for ev_id in itervalues(current_state)) changed_events = existing_events ^ new_events if not changed_events: return to_delete = { - key: ev_id for key, ev_id in existing_state.iteritems() + key: ev_id for key, ev_id in iteritems(existing_state) if ev_id in changed_events } events_to_insert = (new_events - existing_events) to_insert = { - key: ev_id for key, ev_id in current_state.iteritems() + key: ev_id for key, ev_id in iteritems(current_state) if ev_id in events_to_insert } @@ -757,11 +760,11 @@ class EventsStore(EventsWorkerStore): ) def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order): - for room_id, current_state_tuple in state_delta_by_room.iteritems(): + for room_id, current_state_tuple in iteritems(state_delta_by_room): to_delete, to_insert = current_state_tuple txn.executemany( "DELETE FROM current_state_events WHERE event_id = ?", - [(ev_id,) for ev_id in to_delete.itervalues()], + [(ev_id,) for ev_id in itervalues(to_delete)], ) self._simple_insert_many_txn( @@ -774,7 +777,7 @@ class EventsStore(EventsWorkerStore): "type": key[0], "state_key": key[1], } - for key, ev_id in to_insert.iteritems() + for key, ev_id in iteritems(to_insert) ], ) @@ -793,7 +796,7 @@ class EventsStore(EventsWorkerStore): "event_id": ev_id, "prev_event_id": to_delete.get(key, None), } - for key, ev_id in state_deltas.iteritems() + for key, ev_id in iteritems(state_deltas) ] ) @@ -836,7 +839,7 @@ class EventsStore(EventsWorkerStore): def _update_forward_extremities_txn(self, txn, new_forward_extremities, max_stream_order): - for room_id, new_extrem in new_forward_extremities.iteritems(): + for room_id, new_extrem in iteritems(new_forward_extremities): self._simple_delete_txn( txn, table="event_forward_extremities", @@ -854,7 +857,7 @@ class EventsStore(EventsWorkerStore): "event_id": ev_id, "room_id": room_id, } - for room_id, new_extrem in new_forward_extremities.iteritems() + for room_id, new_extrem in iteritems(new_forward_extremities) for ev_id in new_extrem ], ) @@ -871,7 +874,7 @@ class EventsStore(EventsWorkerStore): "event_id": event_id, "stream_ordering": max_stream_order, } - for room_id, new_extrem in new_forward_extremities.iteritems() + for room_id, new_extrem in iteritems(new_forward_extremities) for event_id in new_extrem ] ) @@ -899,7 +902,7 @@ class EventsStore(EventsWorkerStore): new_events_and_contexts[event.event_id] = (event, context) else: new_events_and_contexts[event.event_id] = (event, context) - return new_events_and_contexts.values() + return list(new_events_and_contexts.values()) def _update_room_depths_txn(self, txn, events_and_contexts, backfilled): """Update min_depth for each room @@ -925,7 +928,7 @@ class EventsStore(EventsWorkerStore): event.depth, depth_updates.get(event.room_id, event.depth) ) - for room_id, depth in depth_updates.iteritems(): + for room_id, depth in iteritems(depth_updates): self._update_min_depth_for_room_txn(txn, room_id, depth) def _update_outliers_txn(self, txn, events_and_contexts): @@ -1309,7 +1312,7 @@ class EventsStore(EventsWorkerStore): " WHERE e.event_id IN (%s)" ) % (",".join(["?"] * len(ev_map)),) - txn.execute(sql, ev_map.keys()) + txn.execute(sql, list(ev_map)) rows = self.cursor_to_dict(txn) for row in rows: event = ev_map[row["event_id"]] @@ -1572,7 +1575,7 @@ class EventsStore(EventsWorkerStore): chunks = [ event_ids[i:i + 100] - for i in xrange(0, len(event_ids), 100) + for i in range(0, len(event_ids), 100) ] for chunk in chunks: ev_rows = self._simple_select_many_txn( @@ -1986,7 +1989,7 @@ class EventsStore(EventsWorkerStore): logger.info("[purge] finding state groups which depend on redundant" " state groups") remaining_state_groups = [] - for i in xrange(0, len(state_rows), 100): + for i in range(0, len(state_rows), 100): chunk = [sg for sg, in state_rows[i:i + 100]] # look for state groups whose prev_state_group is one we are about # to delete @@ -2042,7 +2045,7 @@ class EventsStore(EventsWorkerStore): "state_key": key[1], "event_id": state_id, } - for key, state_id in curr_state.iteritems() + for key, state_id in iteritems(curr_state) ], ) diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index 0540c2b0b1..0f13b61da8 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -17,6 +17,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks from twisted.internet import defer +import six import OpenSSL from signedjson.key import decode_verify_key_bytes @@ -26,6 +27,13 @@ import logging logger = logging.getLogger(__name__) +# py2 sqlite has buffer hardcoded as only binary type, so we must use it, +# despite being deprecated and removed in favor of memoryview +if six.PY2: + db_binary_type = buffer +else: + db_binary_type = memoryview + class KeyStore(SQLBaseStore): """Persistence for signature verification keys and tls X.509 certificates @@ -72,7 +80,7 @@ class KeyStore(SQLBaseStore): values={ "from_server": from_server, "ts_added_ms": time_now_ms, - "tls_certificate": buffer(tls_certificate_bytes), + "tls_certificate": db_binary_type(tls_certificate_bytes), }, desc="store_server_certificate", ) @@ -135,7 +143,7 @@ class KeyStore(SQLBaseStore): values={ "from_server": from_server, "ts_added_ms": time_now_ms, - "verify_key": buffer(verify_key.encode()), + "verify_key": db_binary_type(verify_key.encode()), }, ) txn.call_after( @@ -172,7 +180,7 @@ class KeyStore(SQLBaseStore): "from_server": from_server, "ts_added_ms": ts_now_ms, "ts_valid_until_ms": ts_expires_ms, - "key_json": buffer(key_json_bytes), + "key_json": db_binary_type(key_json_bytes), }, desc="store_server_keys_json", ) diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index c08e9cd65a..cf2aae0468 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -26,7 +26,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 49 +SCHEMA_VERSION = 50 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 9e9d3c2591..f05d91cc58 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -16,6 +16,7 @@ from ._base import SQLBaseStore from synapse.api.constants import PresenceState from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList +from synapse.util import batch_iter from collections import namedtuple from twisted.internet import defer @@ -115,11 +116,7 @@ class PresenceStore(SQLBaseStore): " AND user_id IN (%s)" ) - batches = ( - presence_states[i:i + 50] - for i in xrange(0, len(presence_states), 50) - ) - for states in batches: + for states in batch_iter(presence_states, 50): args = [stream_id] args.extend(s.user_id for s in states) txn.execute( diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 709c69a926..c93c228f6e 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -332,6 +332,35 @@ class ReceiptsStore(ReceiptsWorkerStore): def insert_linearized_receipt_txn(self, txn, room_id, receipt_type, user_id, event_id, data, stream_id): + res = self._simple_select_one_txn( + txn, + table="events", + retcols=["topological_ordering", "stream_ordering"], + keyvalues={"event_id": event_id}, + allow_none=True + ) + + stream_ordering = int(res["stream_ordering"]) if res else None + + # We don't want to clobber receipts for more recent events, so we + # have to compare orderings of existing receipts + if stream_ordering is not None: + sql = ( + "SELECT stream_ordering, event_id FROM events" + " INNER JOIN receipts_linearized as r USING (event_id, room_id)" + " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?" + ) + txn.execute(sql, (room_id, receipt_type, user_id)) + + for so, eid in txn: + if int(so) >= stream_ordering: + logger.debug( + "Ignoring new receipt for %s in favour of existing " + "one for later event %s", + event_id, eid, + ) + return False + txn.call_after( self.get_receipts_for_room.invalidate, (room_id, receipt_type) ) @@ -355,34 +384,6 @@ class ReceiptsStore(ReceiptsWorkerStore): (user_id, room_id, receipt_type) ) - res = self._simple_select_one_txn( - txn, - table="events", - retcols=["topological_ordering", "stream_ordering"], - keyvalues={"event_id": event_id}, - allow_none=True - ) - - topological_ordering = int(res["topological_ordering"]) if res else None - stream_ordering = int(res["stream_ordering"]) if res else None - - # We don't want to clobber receipts for more recent events, so we - # have to compare orderings of existing receipts - sql = ( - "SELECT topological_ordering, stream_ordering, event_id FROM events" - " INNER JOIN receipts_linearized as r USING (event_id, room_id)" - " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?" - ) - - txn.execute(sql, (room_id, receipt_type, user_id)) - - if topological_ordering: - for to, so, _ in txn: - if int(to) > topological_ordering: - return False - elif int(to) == topological_ordering and int(so) >= stream_ordering: - return False - self._simple_delete_txn( txn, table="receipts_linearized", @@ -406,7 +407,7 @@ class ReceiptsStore(ReceiptsWorkerStore): } ) - if receipt_type == "m.read" and topological_ordering: + if receipt_type == "m.read" and stream_ordering is not None: self._remove_old_push_actions_before_txn( txn, room_id=room_id, diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index a530e29f43..c241167fbe 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -36,6 +36,7 @@ class RegistrationWorkerStore(SQLBaseStore): retcols=[ "name", "password_hash", "is_guest", "consent_version", "consent_server_notice_sent", + "appservice_id", ], allow_none=True, desc="get_user_by_id", @@ -101,6 +102,13 @@ class RegistrationStore(RegistrationWorkerStore, columns=["user_id", "device_id"], ) + self.register_background_index_update( + "users_creation_ts", + index_name="users_creation_ts", + table="users", + columns=["creation_ts"], + ) + # we no longer use refresh tokens, but it's possible that some people # might have a background update queued to build this index. Just # clear the background update. @@ -485,6 +493,35 @@ class RegistrationStore(RegistrationWorkerStore, ret = yield self.runInteraction("count_users", _count_users) defer.returnValue(ret) + def count_daily_user_type(self): + """ + Counts 1) native non guest users + 2) native guests users + 3) bridged users + who registered on the homeserver in the past 24 hours + """ + def _count_daily_user_type(txn): + yesterday = int(self._clock.time()) - (60 * 60 * 24) + + sql = """ + SELECT user_type, COALESCE(count(*), 0) AS count FROM ( + SELECT + CASE + WHEN is_guest=0 AND appservice_id IS NULL THEN 'native' + WHEN is_guest=1 AND appservice_id IS NULL THEN 'guest' + WHEN is_guest=0 AND appservice_id IS NOT NULL THEN 'bridged' + END AS user_type + FROM users + WHERE creation_ts > ? + ) AS t GROUP BY user_type + """ + results = {'native': 0, 'guest': 0, 'bridged': 0} + txn.execute(sql, (yesterday,)) + for row in txn: + results[row[0]] = row[1] + return results + return self.runInteraction("count_daily_user_type", _count_daily_user_type) + @defer.inlineCallbacks def count_nonbridged_users(self): def _count_users(txn): diff --git a/synapse/storage/schema/delta/50/add_creation_ts_users_index.sql b/synapse/storage/schema/delta/50/add_creation_ts_users_index.sql new file mode 100644 index 0000000000..c93ae47532 --- /dev/null +++ b/synapse/storage/schema/delta/50/add_creation_ts_users_index.sql @@ -0,0 +1,19 @@ +/* Copyright 2018 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +INSERT into background_updates (update_name, progress_json) + VALUES ('users_creation_ts', '{}'); diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 6ba3e59889..f0fa5d7631 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -18,13 +18,14 @@ import logging import re import simplejson as json +from six import string_types + from twisted.internet import defer from .background_updates import BackgroundUpdateStore from synapse.api.errors import SynapseError from synapse.storage.engines import PostgresEngine, Sqlite3Engine - logger = logging.getLogger(__name__) SearchEntry = namedtuple('SearchEntry', [ @@ -126,7 +127,7 @@ class SearchStore(BackgroundUpdateStore): # skip over it. continue - if not isinstance(value, basestring): + if not isinstance(value, string_types): # If the event body, name or topic isn't a string # then skip over it continue @@ -447,7 +448,7 @@ class SearchStore(BackgroundUpdateStore): "search_msgs", self.cursor_to_dict, sql, *args ) - results = filter(lambda row: row["room_id"] in room_ids, results) + results = list(filter(lambda row: row["room_id"] in room_ids, results)) events = yield self._get_events([r["event_id"] for r in results]) @@ -602,7 +603,7 @@ class SearchStore(BackgroundUpdateStore): "search_rooms", self.cursor_to_dict, sql, *args ) - results = filter(lambda row: row["room_id"] in room_ids, results) + results = list(filter(lambda row: row["room_id"] in room_ids, results)) events = yield self._get_events([r["event_id"] for r in results]) diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index 9e6eaaa532..25922e5a9c 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -14,6 +14,7 @@ # limitations under the License. from twisted.internet import defer +import six from ._base import SQLBaseStore @@ -21,6 +22,13 @@ from unpaddedbase64 import encode_base64 from synapse.crypto.event_signing import compute_event_reference_hash from synapse.util.caches.descriptors import cached, cachedList +# py2 sqlite has buffer hardcoded as only binary type, so we must use it, +# despite being deprecated and removed in favor of memoryview +if six.PY2: + db_binary_type = buffer +else: + db_binary_type = memoryview + class SignatureWorkerStore(SQLBaseStore): @cached() @@ -56,7 +64,7 @@ class SignatureWorkerStore(SQLBaseStore): for e_id, h in hashes.items() } - defer.returnValue(hashes.items()) + defer.returnValue(list(hashes.items())) def _get_event_reference_hashes_txn(self, txn, event_id): """Get all the hashes for a given PDU. @@ -91,7 +99,7 @@ class SignatureStore(SignatureWorkerStore): vals.append({ "event_id": event.event_id, "algorithm": ref_alg, - "hash": buffer(ref_hash_bytes), + "hash": db_binary_type(ref_hash_bytes), }) self._simple_insert_many_txn( diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 63b6834202..3b87d981b5 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -16,6 +16,9 @@ from collections import namedtuple import logging +from six import iteritems, itervalues +from six.moves import range + from twisted.internet import defer from synapse.storage.background_updates import BackgroundUpdateStore @@ -134,7 +137,7 @@ class StateGroupWorkerStore(SQLBaseStore): event_ids, ) - groups = set(event_to_groups.itervalues()) + groups = set(itervalues(event_to_groups)) group_to_state = yield self._get_state_for_groups(groups) defer.returnValue(group_to_state) @@ -166,18 +169,18 @@ class StateGroupWorkerStore(SQLBaseStore): state_event_map = yield self.get_events( [ - ev_id for group_ids in group_to_ids.itervalues() - for ev_id in group_ids.itervalues() + ev_id for group_ids in itervalues(group_to_ids) + for ev_id in itervalues(group_ids) ], get_prev_content=False ) defer.returnValue({ group: [ - state_event_map[v] for v in event_id_map.itervalues() + state_event_map[v] for v in itervalues(event_id_map) if v in state_event_map ] - for group, event_id_map in group_to_ids.iteritems() + for group, event_id_map in iteritems(group_to_ids) }) @defer.inlineCallbacks @@ -198,7 +201,7 @@ class StateGroupWorkerStore(SQLBaseStore): """ results = {} - chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)] + chunks = [groups[i:i + 100] for i in range(0, len(groups), 100)] for chunk in chunks: res = yield self.runInteraction( "_get_state_groups_from_groups", @@ -390,21 +393,21 @@ class StateGroupWorkerStore(SQLBaseStore): event_ids, ) - groups = set(event_to_groups.itervalues()) + groups = set(itervalues(event_to_groups)) group_to_state = yield self._get_state_for_groups(groups, types) state_event_map = yield self.get_events( - [ev_id for sd in group_to_state.itervalues() for ev_id in sd.itervalues()], + [ev_id for sd in itervalues(group_to_state) for ev_id in itervalues(sd)], get_prev_content=False ) event_to_state = { event_id: { k: state_event_map[v] - for k, v in group_to_state[group].iteritems() + for k, v in iteritems(group_to_state[group]) if v in state_event_map } - for event_id, group in event_to_groups.iteritems() + for event_id, group in iteritems(event_to_groups) } defer.returnValue({event: event_to_state[event] for event in event_ids}) @@ -429,12 +432,12 @@ class StateGroupWorkerStore(SQLBaseStore): event_ids, ) - groups = set(event_to_groups.itervalues()) + groups = set(itervalues(event_to_groups)) group_to_state = yield self._get_state_for_groups(groups, types) event_to_state = { event_id: group_to_state[group] - for event_id, group in event_to_groups.iteritems() + for event_id, group in iteritems(event_to_groups) } defer.returnValue({event: event_to_state[event] for event in event_ids}) @@ -562,7 +565,7 @@ class StateGroupWorkerStore(SQLBaseStore): got_all = is_all or not missing_types return { - k: v for k, v in state_dict_ids.iteritems() + k: v for k, v in iteritems(state_dict_ids) if include(k[0], k[1]) }, missing_types, got_all @@ -630,12 +633,12 @@ class StateGroupWorkerStore(SQLBaseStore): # Now we want to update the cache with all the things we fetched # from the database. - for group, group_state_dict in group_to_state_dict.iteritems(): + for group, group_state_dict in iteritems(group_to_state_dict): state_dict = results[group] state_dict.update( ((intern_string(k[0]), intern_string(k[1])), to_ascii(v)) - for k, v in group_state_dict.iteritems() + for k, v in iteritems(group_state_dict) ) self._state_group_cache.update( @@ -722,7 +725,7 @@ class StateGroupWorkerStore(SQLBaseStore): "state_key": key[1], "event_id": state_id, } - for key, state_id in delta_ids.iteritems() + for key, state_id in iteritems(delta_ids) ], ) else: @@ -737,7 +740,7 @@ class StateGroupWorkerStore(SQLBaseStore): "state_key": key[1], "event_id": state_id, } - for key, state_id in current_state_ids.iteritems() + for key, state_id in iteritems(current_state_ids) ], ) @@ -862,11 +865,11 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore): "state_group": state_group_id, "event_id": event_id, } - for event_id, state_group_id in state_groups.iteritems() + for event_id, state_group_id in iteritems(state_groups) ], ) - for event_id, state_group_id in state_groups.iteritems(): + for event_id, state_group_id in iteritems(state_groups): txn.call_after( self._get_state_group_for_event.prefill, (event_id,), state_group_id @@ -894,7 +897,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore): def reindex_txn(txn): new_last_state_group = last_state_group - for count in xrange(batch_size): + for count in range(batch_size): txn.execute( "SELECT id, room_id FROM state_groups" " WHERE ? < id AND id <= ?" @@ -952,7 +955,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore): # of keys delta_state = { - key: value for key, value in curr_state.iteritems() + key: value for key, value in iteritems(curr_state) if prev_state.get(key, None) != value } @@ -992,7 +995,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore): "state_key": key[1], "event_id": state_id, } - for key, state_id in delta_state.iteritems() + for key, state_id in iteritems(delta_state) ], ) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index f825264ea9..e485d19b84 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -17,6 +17,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached from twisted.internet import defer +import six from canonicaljson import encode_canonical_json @@ -25,6 +26,13 @@ from collections import namedtuple import logging import simplejson as json +# py2 sqlite has buffer hardcoded as only binary type, so we must use it, +# despite being deprecated and removed in favor of memoryview +if six.PY2: + db_binary_type = buffer +else: + db_binary_type = memoryview + logger = logging.getLogger(__name__) @@ -110,7 +118,7 @@ class TransactionStore(SQLBaseStore): "transaction_id": transaction_id, "origin": origin, "response_code": code, - "response_json": buffer(encode_canonical_json(response_dict)), + "response_json": db_binary_type(encode_canonical_json(response_dict)), "ts": self._clock.time_msec(), }, or_ignore=True, diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index d6e289ffbe..275c299998 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -22,6 +22,8 @@ from synapse.api.constants import EventTypes, JoinRules from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import get_domain_from_id, get_localpart_from_id +from six import iteritems + import re import logging @@ -100,7 +102,7 @@ class UserDirectoryStore(SQLBaseStore): user_id, get_localpart_from_id(user_id), get_domain_from_id(user_id), profile.display_name, ) - for user_id, profile in users_with_profile.iteritems() + for user_id, profile in iteritems(users_with_profile) ) elif isinstance(self.database_engine, Sqlite3Engine): sql = """ @@ -112,7 +114,7 @@ class UserDirectoryStore(SQLBaseStore): user_id, "%s %s" % (user_id, p.display_name,) if p.display_name else user_id ) - for user_id, p in users_with_profile.iteritems() + for user_id, p in iteritems(users_with_profile) ) else: # This should be unreachable. @@ -130,7 +132,7 @@ class UserDirectoryStore(SQLBaseStore): "display_name": profile.display_name, "avatar_url": profile.avatar_url, } - for user_id, profile in users_with_profile.iteritems() + for user_id, profile in iteritems(users_with_profile) ] ) for user_id in users_with_profile: |