Diffstat (limited to 'synapse')
35 files changed, 234 insertions, 136 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 353387f154..5bada5e290 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
-__version__ = "0.30.0-rc1"
+__version__ = "0.30.0"
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index 824f4a42e3..29ae086786 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -20,6 +20,8 @@ from frozendict import frozendict
 
 import re
 
+from six import string_types
+
 # Split strings on "." but not "\." This uses a negative lookbehind assertion for '\'
 # (?<!stuff) matches if the current position in the string is not preceded
 # by a match for 'stuff'.
@@ -277,7 +279,7 @@ def serialize_event(e, time_now_ms, as_client_event=True,
 
     if only_event_fields:
         if (not isinstance(only_event_fields, list) or
-                not all(isinstance(f, basestring) for f in only_event_fields)):
+                not all(isinstance(f, string_types) for f in only_event_fields)):
             raise TypeError("only_event_fields must be a list of strings")
         d = only_fields(d, only_event_fields)
 
diff --git a/synapse/events/validator.py b/synapse/events/validator.py
index 2f4c8a1018..e0e5bf818c 100644
--- a/synapse/events/validator.py
+++ b/synapse/events/validator.py
@@ -17,6 +17,8 @@ from synapse.types import EventID, RoomID, UserID
 from synapse.api.errors import SynapseError
 from synapse.api.constants import EventTypes, Membership
 
+from six import string_types
+
 
 class EventValidator(object):
@@ -49,7 +51,7 @@ class EventValidator(object):
             strings.append("state_key")
 
         for s in strings:
-            if not isinstance(getattr(event, s), basestring):
+            if not isinstance(getattr(event, s), string_types):
                 raise SynapseError(400, "Not '%s' a string type" % (s,))
 
         if event.type == EventTypes.Member:
@@ -88,5 +90,5 @@ class EventValidator(object):
         for s in keys:
             if s not in d:
                 raise SynapseError(400, "'%s' not in content" % (s,))
-            if not isinstance(d[s], basestring):
+            if not isinstance(d[s], string_types):
                 raise SynapseError(400, "Not '%s' a string type" % (s,))
diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py
index 2d95b04e0c..62d20ad130 100644
--- a/synapse/groups/groups_server.py
+++ b/synapse/groups/groups_server.py
@@ -20,6 +20,8 @@ from synapse.api.errors import SynapseError
 from synapse.types import GroupID, RoomID, UserID, get_domain_from_id
 from twisted.internet import defer
 
+from six import string_types
+
 
 logger = logging.getLogger(__name__)
@@ -431,7 +433,7 @@ class GroupsServerHandler(object):
                         "long_description"):
             if keyname in content:
                 value = content[keyname]
-                if not isinstance(value, basestring):
+                if not isinstance(value, string_types):
                     raise SynapseError(400, "%r value is not a string" % (keyname,))
                 profile[keyname] = value
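The recurring change above, basestring to six.string_types, is the standard Python 2/3 compatibility shim for string checks. A minimal sketch of the pattern (the helper name and message are illustrative, not part of this patch):

    from six import string_types

    def require_string(value, name):
        # string_types is (basestring,) on Python 2 and (str,) on Python 3,
        # so a single isinstance check covers both interpreters.
        if not isinstance(value, string_types):
            raise TypeError("%s must be a string, got %r" % (name, type(value)))

    require_string("My Group", "name")  # passes on both Python 2 and 3
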
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index d58ea6c650..c5e92f6214 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -30,6 +30,7 @@ class DeactivateAccountHandler(BaseHandler):
         self._auth_handler = hs.get_auth_handler()
         self._device_handler = hs.get_device_handler()
         self._room_member_handler = hs.get_room_member_handler()
+        self.user_directory_handler = hs.get_user_directory_handler()
 
         # Flag that indicates whether the process to part users from rooms is running
         self._user_parter_running = False
@@ -65,6 +66,9 @@ class DeactivateAccountHandler(BaseHandler):
         # removal from all the rooms they're a member of)
         yield self.store.add_user_pending_deactivation(user_id)
 
+        # delete from user directory
+        yield self.user_directory_handler.handle_user_deactivated(user_id)
+
         # Now start the process that goes through that list and
         # parts users from rooms (if it isn't already running)
         self._start_user_parting()
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index f7457a7082..31bd0e60c6 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -26,6 +26,8 @@ from ._base import BaseHandler
 
 import logging
 
+from six import itervalues, iteritems
+
 logger = logging.getLogger(__name__)
@@ -318,7 +320,7 @@ class DeviceHandler(BaseHandler):
             # The user may have left the room
             # TODO: Check if they actually did or if we were just invited.
             if room_id not in room_ids:
-                for key, event_id in current_state_ids.iteritems():
+                for key, event_id in iteritems(current_state_ids):
                     etype, state_key = key
                     if etype != EventTypes.Member:
                         continue
@@ -338,7 +340,7 @@ class DeviceHandler(BaseHandler):
             # special-case for an empty prev state: include all members
             # in the changed list
             if not event_ids:
-                for key, event_id in current_state_ids.iteritems():
+                for key, event_id in iteritems(current_state_ids):
                     etype, state_key = key
                     if etype != EventTypes.Member:
                         continue
@@ -354,10 +356,10 @@ class DeviceHandler(BaseHandler):
 
             # Check if we've joined the room? If so we just blindly add all the users to
             # the "possibly changed" users.
-            for state_dict in prev_state_ids.itervalues():
+            for state_dict in itervalues(prev_state_ids):
                 member_event = state_dict.get((EventTypes.Member, user_id), None)
                 if not member_event or member_event != current_member_id:
-                    for key, event_id in current_state_ids.iteritems():
+                    for key, event_id in iteritems(current_state_ids):
                         etype, state_key = key
                         if etype != EventTypes.Member:
                             continue
@@ -367,14 +369,14 @@ class DeviceHandler(BaseHandler):
             # If there has been any change in membership, include them in the
             # possibly changed list. We'll check if they are joined below,
             # and we're not toooo worried about spuriously adding users.
-            for key, event_id in current_state_ids.iteritems():
+            for key, event_id in iteritems(current_state_ids):
                 etype, state_key = key
                 if etype != EventTypes.Member:
                     continue
 
                 # check if this member has changed since any of the extremities
                 # at the stream_ordering, and add them to the list if so.
-                for state_dict in prev_state_ids.itervalues():
+                for state_dict in itervalues(prev_state_ids):
                     prev_event_id = state_dict.get(key, None)
                     if not prev_event_id or prev_event_id != event_id:
                         if state_key != user_id:
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 25aec624af..8a2d177539 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -19,6 +19,7 @@ import logging
 
 from canonicaljson import encode_canonical_json
 from twisted.internet import defer
+from six import iteritems
 
 from synapse.api.errors import (
     SynapseError, CodeMessageException, FederationDeniedError,
@@ -92,7 +93,7 @@ class E2eKeysHandler(object):
         remote_queries_not_in_cache = {}
         if remote_queries:
             query_list = []
-            for user_id, device_ids in remote_queries.iteritems():
+            for user_id, device_ids in iteritems(remote_queries):
                 if device_ids:
                     query_list.extend((user_id, device_id) for device_id in device_ids)
                 else:
@@ -103,9 +104,9 @@ class E2eKeysHandler(object):
                     query_list
                 )
             )
-            for user_id, devices in remote_results.iteritems():
+            for user_id, devices in iteritems(remote_results):
                 user_devices = results.setdefault(user_id, {})
-                for device_id, device in devices.iteritems():
+                for device_id, device in iteritems(devices):
                     keys = device.get("keys", None)
                     device_display_name = device.get("device_display_name", None)
                     if keys:
@@ -250,9 +251,9 @@ class E2eKeysHandler(object):
             "Claimed one-time-keys: %s",
             ",".join((
                 "%s for %s:%s" % (key_id, user_id, device_id)
-                for user_id, user_keys in json_result.iteritems()
-                for device_id, device_keys in user_keys.iteritems()
-                for key_id, _ in device_keys.iteritems()
+                for user_id, user_keys in iteritems(json_result)
+                for device_id, device_keys in iteritems(user_keys)
+                for key_id, _ in iteritems(device_keys)
             )),
         )
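The handler changes above lean on six.iteritems/itervalues, which dispatch to dict.iteritems()/itervalues() on Python 2 and dict.items()/values() on Python 3, so no intermediate list is built on either interpreter. A small illustration (the state map contents are made up):

    from six import iteritems, itervalues

    state_ids = {("m.room.member", "@a:hs"): "$ev1", ("m.room.name", ""): "$ev2"}

    member_event_ids = [
        event_id
        for (etype, state_key), event_id in iteritems(state_ids)
        if etype == "m.room.member"
    ]
    all_event_ids = set(itervalues(state_ids))
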
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index ba3ede8024..87c0615820 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -24,6 +24,7 @@ from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
 import six
 from six.moves import http_client
+from six import iteritems
 from twisted.internet import defer
 from unpaddedbase64 import decode_base64
 
@@ -479,18 +480,18 @@ class FederationHandler(BaseHandler):
             # to get all state ids that we're interested in.
             event_map = yield self.store.get_events([
                 e_id
-                for key_to_eid in event_to_state_ids.values()
-                for key, e_id in key_to_eid.items()
+                for key_to_eid in event_to_state_ids.itervalues()
+                for key, e_id in key_to_eid.iteritems()
                 if key[0] != EventTypes.Member or check_match(key[1])
             ])
 
         event_to_state = {
             e_id: {
                 key: event_map[inner_e_id]
-                for key, inner_e_id in key_to_eid.items()
+                for key, inner_e_id in key_to_eid.iteritems()
                 if inner_e_id in event_map
             }
-            for e_id, key_to_eid in event_to_state_ids.items()
+            for e_id, key_to_eid in event_to_state_ids.iteritems()
         }
 
         def redact_disallowed(event, state):
@@ -505,7 +506,7 @@ class FederationHandler(BaseHandler):
                 # membership states for the requesting server to determine
                 # if the server is either in the room or has been invited
                 # into the room.
-                for ev in state.values():
+                for ev in state.itervalues():
                     if ev.type != EventTypes.Member:
                         continue
                     try:
@@ -751,9 +752,19 @@ class FederationHandler(BaseHandler):
         curr_state = yield self.state_handler.get_current_state(room_id)
 
         def get_domains_from_state(state):
+            """Get joined domains from state
+
+            Args:
+                state (dict[tuple, FrozenEvent]): State map from type/state
+                    key to event.
+
+            Returns:
+                list[tuple[str, int]]: Returns a list of servers with the
+                    lowest depth of their joins. Sorted by lowest depth first.
+            """
             joined_users = [
                 (state_key, int(event.depth))
-                for (e_type, state_key), event in state.items()
+                for (e_type, state_key), event in state.iteritems()
                 if e_type == EventTypes.Member
                 and event.membership == Membership.JOIN
             ]
@@ -770,7 +781,7 @@ class FederationHandler(BaseHandler):
                 except Exception:
                     pass
 
-            return sorted(joined_domains.items(), key=lambda d: d[1])
+            return sorted(joined_domains.iteritems(), key=lambda d: d[1])
 
         curr_domains = get_domains_from_state(curr_state)
@@ -787,7 +798,7 @@ class FederationHandler(BaseHandler):
                 yield self.backfill(
                     dom, room_id,
                     limit=100,
-                    extremities=[e for e in extremities.keys()]
+                    extremities=extremities,
                 )
                 # If this succeeded then we probably already have the
                 # appropriate stuff.
@@ -833,7 +844,7 @@ class FederationHandler(BaseHandler):
         tried_domains = set(likely_domains)
         tried_domains.add(self.server_name)
 
-        event_ids = list(extremities.keys())
+        event_ids = list(extremities.iterkeys())
 
         logger.debug("calling resolve_state_groups in _maybe_backfill")
         resolve = logcontext.preserve_fn(
@@ -843,31 +854,34 @@ class FederationHandler(BaseHandler):
             [resolve(room_id, [e]) for e in event_ids],
             consumeErrors=True,
         ))
+
+        # dict[str, dict[tuple, str]], a map from event_id to state map of
+        # event_ids.
         states = dict(zip(event_ids, [s.state for s in states]))
 
         state_map = yield self.store.get_events(
-            [e_id for ids in states.values() for e_id in ids],
+            [e_id for ids in states.itervalues() for e_id in ids.itervalues()],
             get_prev_content=False
         )
         states = {
             key: {
                 k: state_map[e_id]
-                for k, e_id in state_dict.items()
+                for k, e_id in state_dict.iteritems()
                 if e_id in state_map
-            } for key, state_dict in states.items()
+            } for key, state_dict in states.iteritems()
         }
 
         for e_id, _ in sorted_extremeties_tuple:
             likely_domains = get_domains_from_state(states[e_id])
 
             success = yield try_backfill([
-                dom for dom in likely_domains
+                dom for dom, _ in likely_domains
                 if dom not in tried_domains
             ])
             if success:
                 defer.returnValue(True)
 
-            tried_domains.update(likely_domains)
+            tried_domains.update(dom for dom, _ in likely_domains)
 
         defer.returnValue(False)
@@ -1375,7 +1389,7 @@ class FederationHandler(BaseHandler):
         )
 
         if state_groups:
-            _, state = state_groups.items().pop()
+            _, state = list(iteritems(state_groups)).pop()
 
             results = {
                 (e.type, e.state_key): e for e in state
             }
@@ -2021,7 +2035,7 @@ class FederationHandler(BaseHandler):
             this will not be included in the current_state in the context.
         """
         state_updates = {
-            k: a.event_id for k, a in auth_events.iteritems()
+            k: a.event_id for k, a in iteritems(auth_events)
             if k != event_key
         }
         context.current_state_ids = dict(context.current_state_ids)
@@ -2031,7 +2045,7 @@ class FederationHandler(BaseHandler):
             context.delta_ids.update(state_updates)
         context.prev_state_ids = dict(context.prev_state_ids)
         context.prev_state_ids.update({
-            k: a.event_id for k, a in auth_events.iteritems()
+            k: a.event_id for k, a in iteritems(auth_events)
         })
         context.state_group = yield self.store.store_state_group(
             event.event_id,
@@ -2083,7 +2097,7 @@ class FederationHandler(BaseHandler):
 
         def get_next(it, opt=None):
             try:
-                return it.next()
+                return next(it)
             except Exception:
                 return opt
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index 977993e7d4..dcae083734 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 
 from twisted.internet import defer
+from six import iteritems
 
 from synapse.api.errors import SynapseError
 from synapse.types import get_domain_from_id
@@ -449,7 +450,7 @@ class GroupsLocalHandler(object):
         results = {}
         failed_results = []
-        for destination, dest_user_ids in destinations.iteritems():
+        for destination, dest_user_ids in iteritems(destinations):
             try:
                 r = yield self.transport_client.bulk_get_publicised_groups(
                     destination, list(dest_user_ids),
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index c32b9bcae4..81cff0870e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -19,6 +19,7 @@ import sys
 
 from canonicaljson import encode_canonical_json
 import six
+from six import string_types, itervalues, iteritems
 from twisted.internet import defer, reactor
 from twisted.internet.defer import succeed
 from twisted.python.failure import Failure
@@ -402,7 +403,7 @@ class MessageHandler(BaseHandler):
                 "avatar_url": profile.avatar_url,
                 "display_name": profile.display_name,
             }
-            for user_id, profile in users_with_profile.iteritems()
+            for user_id, profile in iteritems(users_with_profile)
         })
@@ -667,7 +668,7 @@ class EventCreationHandler(object):
             spam_error = self.spam_checker.check_event_for_spam(event)
             if spam_error:
-                if not isinstance(spam_error, basestring):
+                if not isinstance(spam_error, string_types):
                     spam_error = "Spam is not permitted here"
                 raise SynapseError(
                     403, spam_error, Codes.FORBIDDEN
                 )
@@ -881,7 +882,7 @@ class EventCreationHandler(object):
             state_to_include_ids = [
                 e_id
-                for k, e_id in context.current_state_ids.iteritems()
+                for k, e_id in iteritems(context.current_state_ids)
                 if k[0] in self.hs.config.room_invite_state_types
                 or k == (EventTypes.Member, event.sender)
             ]
@@ -895,7 +896,7 @@ class EventCreationHandler(object):
                     "content": e.content,
                     "sender": e.sender,
                 }
-                for e in state_to_include.itervalues()
+                for e in itervalues(state_to_include)
             ]
 
             invitee = UserID.from_string(event.state_key)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 500a131874..13b343c470 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -25,6 +25,8 @@ The methods that define policy are:
 from twisted.internet import defer, reactor
 from contextlib import contextmanager
 
+from six import itervalues, iteritems
+
 from synapse.api.errors import SynapseError
 from synapse.api.constants import PresenceState
 from synapse.storage.presence import UserPresenceState
@@ -40,7 +42,6 @@ import synapse.metrics
 
 import logging
 
-
 logger = logging.getLogger(__name__)
 
 metrics = synapse.metrics.get_metrics_for(__name__)
@@ -530,7 +531,7 @@ class PresenceHandler(object):
             prev_state.copy_and_replace(
                 last_user_sync_ts=time_now_ms,
             )
-            for prev_state in prev_states.itervalues()
+            for prev_state in itervalues(prev_states)
         ])
         self.external_process_last_updated_ms.pop(process_id, None)
@@ -553,14 +554,14 @@ class PresenceHandler(object):
             for user_id in user_ids
         }
 
-        missing = [user_id for user_id, state in states.iteritems() if not state]
+        missing = [user_id for user_id, state in iteritems(states) if not state]
         if missing:
             # There are things not in our in memory cache. Lets pull them out of
             # the database.
             res = yield self.store.get_presence_for_users(missing)
             states.update(res)
 
-            missing = [user_id for user_id, state in states.iteritems() if not state]
+            missing = [user_id for user_id, state in iteritems(states) if not state]
             if missing:
                 new = {
                     user_id: UserPresenceState.default(user_id)
@@ -1048,7 +1049,7 @@ class PresenceEventSource(object):
             defer.returnValue((updates.values(), max_token))
         else:
             defer.returnValue(([
-                s for s in updates.itervalues()
+                s for s in itervalues(updates)
                 if s.state != PresenceState.OFFLINE
             ], max_token))
@@ -1305,11 +1306,11 @@ def get_interested_remotes(store, states, state_handler):
     # hosts in those rooms.
     room_ids_to_states, users_to_states = yield get_interested_parties(store, states)
 
-    for room_id, states in room_ids_to_states.iteritems():
+    for room_id, states in iteritems(room_ids_to_states):
         hosts = yield state_handler.get_current_hosts_in_room(room_id)
         hosts_and_states.append((hosts, states))
 
-    for user_id, states in users_to_states.iteritems():
+    for user_id, states in iteritems(users_to_states):
         host = get_domain_from_id(user_id)
         hosts_and_states.append(([host], states))
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 263e42dded..d0c99c35e3 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -28,6 +28,8 @@ import collections
 import logging
 import itertools
 
+from six import itervalues, iteritems
+
 logger = logging.getLogger(__name__)
@@ -275,7 +277,7 @@ class SyncHandler(object):
             # result returned by the event source is poor form (it might cache
             # the object)
             room_id = event["room_id"]
-            event_copy = {k: v for (k, v) in event.iteritems()
+            event_copy = {k: v for (k, v) in iteritems(event)
                           if k != "room_id"}
             ephemeral_by_room.setdefault(room_id, []).append(event_copy)
 
@@ -294,7 +296,7 @@ class SyncHandler(object):
             for event in receipts:
                 room_id = event["room_id"]
                 # exclude room id, as above
-                event_copy = {k: v for (k, v) in event.iteritems()
+                event_copy = {k: v for (k, v) in iteritems(event)
                               if k != "room_id"}
                 ephemeral_by_room.setdefault(room_id, []).append(event_copy)
 
@@ -325,7 +327,7 @@ class SyncHandler(object):
                 current_state_ids = frozenset()
                 if any(e.is_state() for e in recents):
                     current_state_ids = yield self.state.get_current_state_ids(room_id)
-                    current_state_ids = frozenset(current_state_ids.itervalues())
+                    current_state_ids = frozenset(itervalues(current_state_ids))
 
                 recents = yield filter_events_for_client(
                     self.store,
@@ -382,7 +384,7 @@ class SyncHandler(object):
                 current_state_ids = frozenset()
                 if any(e.is_state() for e in loaded_recents):
                     current_state_ids = yield self.state.get_current_state_ids(room_id)
-                    current_state_ids = frozenset(current_state_ids.itervalues())
+                    current_state_ids = frozenset(itervalues(current_state_ids))
 
                 loaded_recents = yield filter_events_for_client(
                     self.store,
@@ -984,7 +986,7 @@ class SyncHandler(object):
         if since_token:
             for joined_sync in sync_result_builder.joined:
                 it = itertools.chain(
-                    joined_sync.timeline.events, joined_sync.state.itervalues()
+                    joined_sync.timeline.events, itervalues(joined_sync.state)
                 )
                 for event in it:
                     if event.type == EventTypes.Member:
@@ -1062,7 +1064,7 @@ class SyncHandler(object):
         newly_left_rooms = []
         room_entries = []
         invited = []
-        for room_id, events in mem_change_events_by_room_id.iteritems():
+        for room_id, events in iteritems(mem_change_events_by_room_id):
             non_joins = [e for e in events if e.membership != Membership.JOIN]
             has_join = len(non_joins) != len(events)
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 714f0195c8..a39f0f7343 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -22,6 +22,7 @@ from synapse.util.metrics import Measure
 from synapse.util.async import sleep
 from synapse.types import get_localpart_from_id
 
+from six import iteritems
 
 logger = logging.getLogger(__name__)
@@ -123,6 +124,13 @@ class UserDirectoryHandler(object):
         )
 
     @defer.inlineCallbacks
+    def handle_user_deactivated(self, user_id):
+        """Called when a user ID is deactivated
+        """
+        yield self.store.remove_from_user_dir(user_id)
+        yield self.store.remove_from_user_in_public_room(user_id)
+
+    @defer.inlineCallbacks
     def _unsafe_process(self):
         # If self.pos is None then means we haven't fetched it from DB
         if self.pos is None:
@@ -403,7 +411,7 @@ class UserDirectoryHandler(object):
             if change:
                 users_with_profile = yield self.state.get_current_user_in_room(room_id)
-                for user_id, profile in users_with_profile.iteritems():
+                for user_id, profile in iteritems(users_with_profile):
                     yield self._handle_new_user(room_id, user_id, profile)
             else:
                 users = yield self.store.get_users_in_public_due_to_room(room_id)
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 4b2b85464d..4650f43029 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -42,6 +42,8 @@ import random
 import sys
 import urllib
 from six.moves.urllib import parse as urlparse
+from six import string_types
+
 
 logger = logging.getLogger(__name__)
 outbound_logger = logging.getLogger("synapse.http.outbound")
@@ -553,7 +555,7 @@ class MatrixFederationHttpClient(object):
         encoded_args = {}
         for k, vs in args.items():
-            if isinstance(vs, basestring):
+            if isinstance(vs, string_types):
                 vs = [vs]
             encoded_args[k] = [v.encode("UTF-8") for v in vs]
@@ -668,7 +670,7 @@ def check_content_type_is_json(headers):
         RuntimeError if the
 
     """
-    c_type = headers.getRawHeaders("Content-Type")
+    c_type = headers.getRawHeaders(b"Content-Type")
     if c_type is None:
         raise RuntimeError(
             "No Content-Type header"
         )
@@ -685,7 +687,7 @@ def encode_query_args(args):
     encoded_args = {}
     for k, vs in args.items():
-        if isinstance(vs, basestring):
+        if isinstance(vs, string_types):
             vs = [vs]
         encoded_args[k] = [v.encode("UTF-8") for v in vs]
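check_content_type_is_json() above starts passing the header name as bytes because Twisted's Headers class keys its raw headers on bytes, so on Python 3 a str lookup could miss. A sketch against the Twisted API of this era (the header value is invented):

    from twisted.web.http_headers import Headers

    headers = Headers({b"Content-Type": [b"application/json"]})
    # look the header up with a bytes key so Python 2 and 3 behave the same
    c_type = headers.getRawHeaders(b"Content-Type")
    assert c_type == [b"application/json"]
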
diff --git a/synapse/http/site.py b/synapse/http/site.py
index b608504225..23c1b76922 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -56,7 +56,7 @@ class SynapseRequest(Request):
 
     def __repr__(self):
         # We overwrite this so that we don't log ``access_token``
-        return '<%s at 0x%x method=%s uri=%s clientproto=%s site=%s>' % (
+        return '<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>' % (
            self.__class__.__name__,
            id(self),
            self.method,
diff --git a/synapse/metrics/process_collector.py b/synapse/metrics/process_collector.py
index 6fec3de399..50e5b48a2b 100644
--- a/synapse/metrics/process_collector.py
+++ b/synapse/metrics/process_collector.py
@@ -15,6 +15,7 @@
 
 import os
 
+from six import iteritems
 
 TICKS_PER_SEC = 100
 BYTES_PER_PAGE = 4096
@@ -55,7 +56,7 @@ def update_resource_metrics():
         # line is PID (command) more stats go here ...
         raw_stats = line.split(") ", 1)[1].split(" ")
 
-        for (name, index) in STAT_FIELDS.iteritems():
+        for (name, index) in iteritems(STAT_FIELDS):
             # subtract 3 from the index, because proc(5) is 1-based, and
             # we've lost the first two fields in PID and COMMAND above
             stats[name] = int(raw_stats[index - 3])
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 7c680659b6..2f7e77f5f5 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -30,6 +30,7 @@ from synapse.state import POWER_KEY
 
 from collections import namedtuple
 
+from six import itervalues, iteritems
 
 logger = logging.getLogger(__name__)
@@ -126,7 +127,7 @@ class BulkPushRuleEvaluator(object):
         )
         auth_events = yield self.store.get_events(auth_events_ids)
         auth_events = {
-            (e.type, e.state_key): e for e in auth_events.itervalues()
+            (e.type, e.state_key): e for e in itervalues(auth_events)
         }
 
         sender_level = get_user_power_level(event.sender, auth_events)
@@ -160,7 +161,7 @@ class BulkPushRuleEvaluator(object):
 
         condition_cache = {}
 
-        for uid, rules in rules_by_user.iteritems():
+        for uid, rules in iteritems(rules_by_user):
             if event.sender == uid:
                 continue
@@ -406,7 +407,7 @@ class RulesForRoom(object):
                 # If the event is a join event then it will be in current state evnts
                 # map but not in the DB, so we have to explicitly insert it.
                 if event.type == EventTypes.Member:
-                    for event_id in member_event_ids.itervalues():
+                    for event_id in itervalues(member_event_ids):
                         if event_id == event.event_id:
                             members[event_id] = (event.state_key, event.membership)
 
@@ -414,7 +415,7 @@ class RulesForRoom(object):
             logger.debug("Found members %r: %r", self.room_id, members.values())
 
             interested_in_user_ids = set(
-                user_id for user_id, membership in members.itervalues()
+                user_id for user_id, membership in itervalues(members)
                 if membership == Membership.JOIN
             )
@@ -426,7 +427,7 @@ class RulesForRoom(object):
             )
 
             user_ids = set(
-                uid for uid, have_pusher in if_users_with_pushers.iteritems() if have_pusher
+                uid for uid, have_pusher in iteritems(if_users_with_pushers) if have_pusher
             )
 
             logger.debug("With pushers: %r", user_ids)
@@ -447,7 +448,7 @@ class RulesForRoom(object):
             )
 
             ret_rules_by_user.update(
-                item for item in rules_by_user.iteritems() if item[0] is not None
+                item for item in iteritems(rules_by_user) if item[0] is not None
             )
 
             self.update_cache(sequence, members, ret_rules_by_user, state_group)
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index 3601f2d365..d55efde8cc 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -21,6 +21,8 @@ from synapse.types import UserID
 from synapse.util.caches import CACHE_SIZE_FACTOR, register_cache
 from synapse.util.caches.lrucache import LruCache
 
+from six import string_types
+
 logger = logging.getLogger(__name__)
@@ -238,7 +240,7 @@ def _flatten_dict(d, prefix=[], result=None):
     if result is None:
         result = {}
     for key, value in d.items():
-        if isinstance(value, basestring):
+        if isinstance(value, string_types):
             result[".".join(prefix + [key])] = value.lower()
         elif hasattr(value, "items"):
             _flatten_dict(value, prefix=(prefix + [key]), result=result)
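_flatten_dict() above turns nested event content into dotted keys for the push-rule matcher, now using six.string_types for the leaf test. A self-contained sketch of the same idea (using a tuple prefix rather than the mutable list default the real code keeps):

    from six import string_types

    def flatten_dict(d, prefix=(), result=None):
        # {"content": {"body": "Hi"}} becomes {"content.body": "hi"}
        if result is None:
            result = {}
        for key, value in d.items():
            if isinstance(value, string_types):
                result[".".join(prefix + (key,))] = value.lower()
            elif hasattr(value, "items"):
                flatten_dict(value, prefix + (key,), result)
        return result

    assert flatten_dict({"content": {"body": "Hi"}}) == {"content.body": "hi"}
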
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index d7d38464b2..7ca1588f6a 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -68,6 +68,7 @@ import synapse.metrics
 import struct
 import fcntl
 
+from six import iterkeys, iteritems
 
 metrics = synapse.metrics.get_metrics_for(__name__)
@@ -392,7 +393,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
         if stream_name == "ALL":
             # Subscribe to all streams we're publishing to.
-            for stream in self.streamer.streams_by_name.iterkeys():
+            for stream in iterkeys(self.streamer.streams_by_name):
                 self.subscribe_to_stream(stream, token)
         else:
             self.subscribe_to_stream(stream_name, token)
@@ -498,7 +499,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
         BaseReplicationStreamProtocol.connectionMade(self)
 
         # Once we've connected subscribe to the necessary streams
-        for stream_name, token in self.handler.get_streams_to_replicate().iteritems():
+        for stream_name, token in iteritems(self.handler.get_streams_to_replicate()):
             self.replicate(stream_name, token)
 
         # Tell the server if we have any users currently syncing (should only
@@ -633,7 +634,7 @@ metrics.register_callback(
     lambda: {
         (k[0], p.name, p.conn_id): count
         for p in connected_connections
-        for k, count in p.inbound_commands_counter.counts.iteritems()
+        for k, count in iteritems(p.inbound_commands_counter.counts)
     },
     labels=["command", "name", "conn_id"],
 )
@@ -643,7 +644,7 @@ metrics.register_callback(
     lambda: {
         (k[0], p.name, p.conn_id): count
         for p in connected_connections
-        for k, count in p.outbound_commands_counter.counts.iteritems()
+        for k, count in iteritems(p.outbound_commands_counter.counts)
     },
     labels=["command", "name", "conn_id"],
 )
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index a603c520ea..1969072ab9 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -26,6 +26,7 @@ from synapse.util.metrics import Measure, measure_func
 
 import logging
 import synapse.metrics
+from six import itervalues
 
 metrics = synapse.metrics.get_metrics_for(__name__)
 stream_updates_counter = metrics.register_counter(
@@ -80,7 +81,7 @@ class ReplicationStreamer(object):
         # We only support federation stream if federation sending hase been
         # disabled on the master.
         self.streams = [
-            stream(hs) for stream in STREAMS_MAP.itervalues()
+            stream(hs) for stream in itervalues(STREAMS_MAP)
             if stream != FederationStream or not hs.config.send_federation
         ]
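The replication changes above use six.iterkeys/itervalues the same way as elsewhere in the patch: iterate a dict's key or value view without copying it into a list on Python 2. For example (the stream names here are invented):

    from six import iterkeys

    streams_by_name = {"events": None, "federation": None}
    for stream_name in iterkeys(streams_by_name):
        print(stream_name)
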
diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py
index 4a73813c58..647994bd53 100644
--- a/synapse/rest/client/v1/presence.py
+++ b/synapse/rest/client/v1/presence.py
@@ -23,6 +23,8 @@ from synapse.handlers.presence import format_user_presence_state
 from synapse.http.servlet import parse_json_object_from_request
 from .base import ClientV1RestServlet, client_path_patterns
 
+from six import string_types
+
 import logging
 
 logger = logging.getLogger(__name__)
@@ -71,7 +73,7 @@ class PresenceStatusRestServlet(ClientV1RestServlet):
             if "status_msg" in content:
                 state["status_msg"] = content.pop("status_msg")
-                if not isinstance(state["status_msg"], basestring):
+                if not isinstance(state["status_msg"], string_types):
                     raise SynapseError(400, "status_msg must be a string.")
 
             if content:
@@ -129,7 +131,7 @@ class PresenceListRestServlet(ClientV1RestServlet):
 
         if "invite" in content:
             for u in content["invite"]:
-                if not isinstance(u, basestring):
+                if not isinstance(u, string_types):
                     raise SynapseError(400, "Bad invite value.")
                 if len(u) == 0:
                     continue
@@ -140,7 +142,7 @@ class PresenceListRestServlet(ClientV1RestServlet):
 
         if "drop" in content:
             for u in content["drop"]:
-                if not isinstance(u, basestring):
+                if not isinstance(u, string_types):
                     raise SynapseError(400, "Bad drop value.")
                 if len(u) == 0:
                     continue
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 9800ce7581..2ac767d2dc 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -48,6 +48,7 @@ import shutil
 import cgi
 import logging
 from six.moves.urllib import parse as urlparse
+from six import iteritems
 
 logger = logging.getLogger(__name__)
@@ -603,7 +604,7 @@ class MediaRepository(object):
                 thumbnails[(t_width, t_height, r_type)] = r_method
 
         # Now we generate the thumbnails for each dimension, store it
-        for (t_width, t_height, t_type), t_method in thumbnails.iteritems():
+        for (t_width, t_height, t_type), t_method in iteritems(thumbnails):
             # Generate the thumbnail
             if t_method == "crop":
                 t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 2839207abc..565cef2b8d 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -24,7 +24,9 @@ import shutil
 import sys
 import traceback
 import simplejson as json
-import urlparse
+
+from six.moves import urllib_parse as urlparse
+from six import string_types
 
 from twisted.web.server import NOT_DONE_YET
 from twisted.internet import defer
@@ -590,8 +592,8 @@ def _iterate_over_text(tree, *tags_to_ignore):
     # to be returned.
     elements = iter([tree])
     while True:
-        el = elements.next()
-        if isinstance(el, basestring):
+        el = next(elements)
+        if isinstance(el, string_types):
             yield el
         elif el is not None and el.tag not in tags_to_ignore:
             # el.text is the text before the first child, so we can immediately
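preview_url_resource.py above swaps the Python 2-only urlparse module for six.moves, which resolves to urlparse on Python 2 and urllib.parse on Python 3 under a single import. For instance (the URL is invented):

    from six.moves import urllib_parse as urlparse

    parts = urlparse.urlsplit("https://matrix.org/_matrix/media/r0/preview_url?url=x")
    assert parts.netloc == "matrix.org"
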
diff --git a/synapse/state.py b/synapse/state.py
index 26093c8434..b8c27c6815 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -32,6 +32,8 @@ from frozendict import frozendict
 import logging
 import hashlib
 
+from six import iteritems, itervalues
+
 logger = logging.getLogger(__name__)
@@ -132,7 +134,7 @@ class StateHandler(object):
 
         state_map = yield self.store.get_events(state.values(), get_prev_content=False)
         state = {
-            key: state_map[e_id] for key, e_id in state.iteritems() if e_id in state_map
+            key: state_map[e_id] for key, e_id in iteritems(state) if e_id in state_map
         }
 
         defer.returnValue(state)
@@ -338,7 +340,7 @@ class StateHandler(object):
         )
 
         if len(state_groups_ids) == 1:
-            name, state_list = state_groups_ids.items().pop()
+            name, state_list = list(state_groups_ids.items()).pop()
 
             prev_group, delta_ids = yield self.store.get_state_group_delta(name)
@@ -378,7 +380,7 @@ class StateHandler(object):
         new_state = resolve_events_with_state_map(state_set_ids, state_map)
 
         new_state = {
-            key: state_map[ev_id] for key, ev_id in new_state.iteritems()
+            key: state_map[ev_id] for key, ev_id in iteritems(new_state)
         }
 
         return new_state
@@ -458,15 +460,15 @@ class StateResolutionHandler(object):
             # build a map from state key to the event_ids which set that state.
             # dict[(str, str), set[str])
             state = {}
-            for st in state_groups_ids.itervalues():
-                for key, e_id in st.iteritems():
+            for st in itervalues(state_groups_ids):
+                for key, e_id in iteritems(st):
                     state.setdefault(key, set()).add(e_id)
 
             # build a map from state key to the event_ids which set that state,
             # including only those where there are state keys in conflict.
             conflicted_state = {
                 k: list(v)
-                for k, v in state.iteritems()
+                for k, v in iteritems(state)
                 if len(v) > 1
             }
@@ -474,13 +476,13 @@ class StateResolutionHandler(object):
                 logger.info("Resolving conflicted state for %r", room_id)
                 with Measure(self.clock, "state._resolve_events"):
                     new_state = yield resolve_events_with_factory(
-                        state_groups_ids.values(),
+                        list(state_groups_ids.values()),
                         event_map=event_map,
                         state_map_factory=state_map_factory,
                     )
             else:
                 new_state = {
-                    key: e_ids.pop() for key, e_ids in state.iteritems()
+                    key: e_ids.pop() for key, e_ids in iteritems(state)
                 }
 
             with Measure(self.clock, "state.create_group_ids"):
@@ -489,8 +491,8 @@ class StateResolutionHandler(object):
                 # which will be used as a cache key for future resolutions, but
                 # not get persisted.
                 state_group = None
-                new_state_event_ids = frozenset(new_state.itervalues())
-                for sg, events in state_groups_ids.iteritems():
+                new_state_event_ids = frozenset(itervalues(new_state))
+                for sg, events in iteritems(state_groups_ids):
                     if new_state_event_ids == frozenset(e_id for e_id in events):
                         state_group = sg
                         break
@@ -501,11 +503,11 @@ class StateResolutionHandler(object):
                 prev_group = None
                 delta_ids = None
-                for old_group, old_ids in state_groups_ids.iteritems():
+                for old_group, old_ids in iteritems(state_groups_ids):
                     if not set(new_state) - set(old_ids):
                         n_delta_ids = {
                             k: v
-                            for k, v in new_state.iteritems()
+                            for k, v in iteritems(new_state)
                             if old_ids.get(k) != v
                         }
                         if not delta_ids or len(n_delta_ids) < len(delta_ids):
@@ -527,7 +529,7 @@ class StateResolutionHandler(object):
 
 def _ordered_events(events):
     def key_func(e):
-        return -int(e.depth), hashlib.sha1(e.event_id).hexdigest()
+        return -int(e.depth), hashlib.sha1(e.event_id.encode()).hexdigest()
 
     return sorted(events, key=key_func)
@@ -584,7 +586,7 @@ def _seperate(state_sets):
     conflicted_state = {}
 
     for state_set in state_sets[1:]:
-        for key, value in state_set.iteritems():
+        for key, value in iteritems(state_set):
             # Check if there is an unconflicted entry for the state key.
             unconflicted_value = unconflicted_state.get(key)
             if unconflicted_value is None:
@@ -640,7 +642,7 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
 
     needed_events = set(
         event_id
-        for event_ids in conflicted_state.itervalues()
+        for event_ids in itervalues(conflicted_state)
        for event_id in event_ids
     )
     if event_map is not None:
@@ -662,7 +664,7 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
         unconflicted_state, conflicted_state, state_map
     )
 
-    new_needed_events = set(auth_events.itervalues())
+    new_needed_events = set(itervalues(auth_events))
     new_needed_events -= needed_events
     if event_map is not None:
         new_needed_events -= set(event_map.iterkeys())
@@ -679,7 +681,7 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
 
 def _create_auth_events_from_maps(unconflicted_state, conflicted_state, state_map):
     auth_events = {}
-    for event_ids in conflicted_state.itervalues():
+    for event_ids in itervalues(conflicted_state):
         for event_id in event_ids:
             if event_id in state_map:
                 keys = event_auth.auth_types_for_event(state_map[event_id])
@@ -694,7 +696,7 @@ def _resolve_with_state(unconflicted_state_ids, conflicted_state_ds, auth_event_ids, state_map):
     conflicted_state = {}
-    for key, event_ids in conflicted_state_ds.iteritems():
+    for key, event_ids in iteritems(conflicted_state_ds):
         events = [state_map[ev_id] for ev_id in event_ids if ev_id in state_map]
         if len(events) > 1:
             conflicted_state[key] = events
@@ -703,7 +705,7 @@ def _resolve_with_state(unconflicted_state_ids, conflicted_state_ds, auth_event_ids, state_map):
 
     auth_events = {
         key: state_map[ev_id]
-        for key, ev_id in auth_event_ids.iteritems()
+        for key, ev_id in iteritems(auth_event_ids)
         if ev_id in state_map
     }
@@ -716,7 +718,7 @@ def _resolve_with_state(unconflicted_state_ids, conflicted_state_ds, auth_event_ids, state_map):
         raise
 
     new_state = unconflicted_state_ids
-    for key, event in resolved_state.iteritems():
+    for key, event in iteritems(resolved_state):
         new_state[key] = event.event_id
 
     return new_state
@@ -741,7 +743,7 @@ def _resolve_state_events(conflicted_state, auth_events):
 
     auth_events.update(resolved_state)
 
-    for key, events in conflicted_state.iteritems():
+    for key, events in iteritems(conflicted_state):
         if key[0] == EventTypes.JoinRules:
             logger.debug("Resolving conflicted join rules %r", events)
             resolved_state[key] = _resolve_auth_events(
@@ -751,7 +753,7 @@ def _resolve_state_events(conflicted_state, auth_events):
 
     auth_events.update(resolved_state)
 
-    for key, events in conflicted_state.iteritems():
+    for key, events in iteritems(conflicted_state):
         if key[0] == EventTypes.Member:
             logger.debug("Resolving conflicted member lists %r", events)
             resolved_state[key] = _resolve_auth_events(
@@ -761,7 +763,7 @@ def _resolve_state_events(conflicted_state, auth_events):
 
     auth_events.update(resolved_state)
 
-    for key, events in conflicted_state.iteritems():
+    for key, events in iteritems(conflicted_state):
         if key not in resolved_state:
             logger.debug("Resolving conflicted state %r:%r", key, events)
             resolved_state[key] = _resolve_normal_events(
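_ordered_events() above must encode the event ID before hashing: Python 3's hashlib only accepts bytes, while on Python 2 encoding an ASCII str is effectively a no-op. Illustration (the event ID is invented):

    import hashlib

    event_id = "$1525000000123abc:matrix.org"
    # .encode() turns the str into bytes, which both Python versions accept
    digest = hashlib.sha1(event_id.encode()).hexdigest()
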
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 2262776ab2..6133d305bd 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -27,9 +27,17 @@
 import sys
 import time
 import threading
 
+from six import itervalues, iterkeys, iteritems
+from six.moves import intern, range
 
 logger = logging.getLogger(__name__)
 
+try:
+    MAX_TXN_ID = sys.maxint - 1
+except AttributeError:
+    # python 3 does not have a maximum int value
+    MAX_TXN_ID = 2**63 - 1
+
 sql_logger = logging.getLogger("synapse.storage.SQL")
 transaction_logger = logging.getLogger("synapse.storage.txn")
 perf_logger = logging.getLogger("synapse.storage.TIME")
@@ -137,7 +145,7 @@ class PerformanceCounters(object):
     def interval(self, interval_duration, limit=3):
         counters = []
-        for name, (count, cum_time) in self.current_counters.iteritems():
+        for name, (count, cum_time) in iteritems(self.current_counters):
             prev_count, prev_time = self.previous_counters.get(name, (0, 0))
             counters.append((
                 (cum_time - prev_time) / interval_duration,
@@ -222,7 +230,7 @@ class SQLBaseStore(object):
 
             # We don't really need these to be unique, so lets stop it from
             # growing really large.
-            self._TXN_ID = (self._TXN_ID + 1) % (sys.maxint - 1)
+            self._TXN_ID = (self._TXN_ID + 1) % (MAX_TXN_ID)
 
             name = "%s-%x" % (desc, txn_id, )
@@ -543,7 +551,7 @@ class SQLBaseStore(object):
             ", ".join("%s = ?" % (k,) for k in values),
             " AND ".join("%s = ?" % (k,) for k in keyvalues)
         )
-        sqlargs = values.values() + keyvalues.values()
+        sqlargs = list(values.values()) + list(keyvalues.values())
 
         txn.execute(sql, sqlargs)
         if txn.rowcount > 0:
@@ -561,7 +569,7 @@ class SQLBaseStore(object):
             ", ".join(k for k in allvalues),
             ", ".join("?" for _ in allvalues)
         )
-        txn.execute(sql, allvalues.values())
+        txn.execute(sql, list(allvalues.values()))
         # successfully inserted
         return True
@@ -629,8 +637,8 @@ class SQLBaseStore(object):
         }
 
         if keyvalues:
-            sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
-            txn.execute(sql, keyvalues.values())
+            sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in iterkeys(keyvalues))
+            txn.execute(sql, list(keyvalues.values()))
         else:
             txn.execute(sql)
@@ -694,7 +702,7 @@ class SQLBaseStore(object):
                 table,
                 " AND ".join("%s = ?" % (k, ) for k in keyvalues)
             )
-            txn.execute(sql, keyvalues.values())
+            txn.execute(sql, list(keyvalues.values()))
         else:
             sql = "SELECT %s FROM %s" % (
                 ", ".join(retcols),
@@ -725,9 +733,12 @@ class SQLBaseStore(object):
         if not iterable:
             defer.returnValue(results)
 
+        # iterables can not be sliced, so convert it to a list first
+        it_list = list(iterable)
+
         chunks = [
-            iterable[i:i + batch_size]
-            for i in xrange(0, len(iterable), batch_size)
+            it_list[i:i + batch_size]
+            for i in range(0, len(it_list), batch_size)
         ]
         for chunk in chunks:
             rows = yield self.runInteraction(
@@ -767,7 +778,7 @@ class SQLBaseStore(object):
             )
             values.extend(iterable)
 
-        for key, value in keyvalues.iteritems():
+        for key, value in iteritems(keyvalues):
             clauses.append("%s = ?" % (key,))
             values.append(value)
@@ -790,7 +801,7 @@ class SQLBaseStore(object):
     @staticmethod
     def _simple_update_txn(txn, table, keyvalues, updatevalues):
         if keyvalues:
-            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
+            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in iterkeys(keyvalues))
         else:
             where = ""
@@ -802,7 +813,7 @@ class SQLBaseStore(object):
 
         txn.execute(
             update_sql,
-            updatevalues.values() + keyvalues.values()
+            list(updatevalues.values()) + list(keyvalues.values())
         )
 
         return txn.rowcount
@@ -850,7 +861,7 @@ class SQLBaseStore(object):
             " AND ".join("%s = ?" % (k,) for k in keyvalues)
         )
 
-        txn.execute(select_sql, keyvalues.values())
+        txn.execute(select_sql, list(keyvalues.values()))
 
         row = txn.fetchone()
         if not row:
@@ -888,7 +899,7 @@ class SQLBaseStore(object):
             " AND ".join("%s = ?" % (k, ) for k in keyvalues)
         )
 
-        txn.execute(sql, keyvalues.values())
+        txn.execute(sql, list(keyvalues.values()))
         if txn.rowcount == 0:
             raise StoreError(404, "No row found")
         if txn.rowcount > 1:
@@ -906,7 +917,7 @@ class SQLBaseStore(object):
             " AND ".join("%s = ?" % (k, ) for k in keyvalues)
         )
 
-        return txn.execute(sql, keyvalues.values())
+        return txn.execute(sql, list(keyvalues.values()))
 
     def _simple_delete_many(self, table, column, iterable, keyvalues, desc):
         return self.runInteraction(
@@ -938,7 +949,7 @@ class SQLBaseStore(object):
         )
         values.extend(iterable)
 
-        for key, value in keyvalues.iteritems():
+        for key, value in iteritems(keyvalues):
             clauses.append("%s = ?" % (key,))
             values.append(value)
@@ -978,7 +989,7 @@ class SQLBaseStore(object):
             txn.close()
 
         if cache:
-            min_val = min(cache.itervalues())
+            min_val = min(itervalues(cache))
         else:
             min_val = max_value
@@ -1093,7 +1104,7 @@ class SQLBaseStore(object):
                 " AND ".join("%s = ?" % (k,) for k in keyvalues),
                 " ? ASC LIMIT ? OFFSET ?"
             )
-            txn.execute(sql, keyvalues.values() + pagevalues)
+            txn.execute(sql, list(keyvalues.values()) + list(pagevalues))
         else:
             sql = "SELECT %s FROM %s ORDER BY %s" % (
                 ", ".join(retcols),
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index ba46907737..ce338514e8 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -22,6 +22,8 @@ from . import background_updates
 
 from synapse.util.caches import CACHE_SIZE_FACTOR
 
+from six import iteritems
+
 logger = logging.getLogger(__name__)
@@ -99,7 +101,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
     def _update_client_ips_batch_txn(self, txn, to_update):
         self.database_engine.lock_table(txn, "user_ips")
 
-        for entry in to_update.iteritems():
+        for entry in iteritems(to_update):
             (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
 
             self._simple_upsert_txn(
@@ -231,5 +233,5 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
                 "user_agent": user_agent,
                 "last_seen": last_seen,
             }
-            for (access_token, ip), (user_agent, last_seen) in results.iteritems()
+            for (access_token, ip), (user_agent, last_seen) in iteritems(results)
         ))
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 712106b83a..d149d8392e 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -21,6 +21,7 @@ from synapse.api.errors import StoreError
 from ._base import SQLBaseStore, Cache
 from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
 
+from six import itervalues, iteritems
 
 logger = logging.getLogger(__name__)
@@ -360,7 +361,7 @@ class DeviceStore(SQLBaseStore):
             return (now_stream_id, [])
 
         if len(query_map) >= 20:
-            now_stream_id = max(stream_id for stream_id in query_map.itervalues())
+            now_stream_id = max(stream_id for stream_id in itervalues(query_map))
 
         devices = self._get_e2e_device_keys_txn(
             txn, query_map.keys(), include_all_devices=True
@@ -373,13 +374,13 @@ class DeviceStore(SQLBaseStore):
         """
 
         results = []
-        for user_id, user_devices in devices.iteritems():
+        for user_id, user_devices in iteritems(devices):
             # The prev_id for the first row is always the last row before
             # `from_stream_id`
             txn.execute(prev_sent_id_sql, (destination, user_id, from_stream_id))
             rows = txn.fetchall()
             prev_id = rows[0][0]
-            for device_id, device in user_devices.iteritems():
+            for device_id, device in iteritems(user_devices):
                 stream_id = query_map[(user_id, device_id)]
                 result = {
                     "user_id": user_id,
@@ -483,7 +484,7 @@ class DeviceStore(SQLBaseStore):
         if devices:
             user_devices = devices[user_id]
             results = []
-            for device_id, device in user_devices.iteritems():
+            for device_id, device in iteritems(user_devices):
                 result = {
                     "device_id": device_id,
                 }
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index ff8538ddf8..b146487943 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -21,6 +21,8 @@ import simplejson as json
 
 from ._base import SQLBaseStore
 
+from six import iteritems
+
 
 class EndToEndKeyStore(SQLBaseStore):
     def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys):
@@ -81,8 +83,8 @@ class EndToEndKeyStore(SQLBaseStore):
             query_list, include_all_devices,
         )
 
-        for user_id, device_keys in results.iteritems():
-            for device_id, device_info in device_keys.iteritems():
+        for user_id, device_keys in iteritems(results):
+            for device_id, device_info in iteritems(device_keys):
                 device_info["keys"] = json.loads(device_info.pop("key_json"))
 
         defer.returnValue(results)
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index f084a5f54b..d0350ee5fe 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -22,6 +22,8 @@ from synapse.util.caches.descriptors import cachedInlineCallbacks
 import logging
 import simplejson as json
 
+from six import iteritems
+
 logger = logging.getLogger(__name__)
@@ -420,7 +422,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
 
             txn.executemany(sql, (
                 _gen_entry(user_id, actions)
-                for user_id, actions in user_id_actions.iteritems()
+                for user_id, actions in iteritems(user_id_actions)
             ))
 
         return self.runInteraction(
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index ba834854e1..32d9d00ffb 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -337,7 +337,7 @@ class EventsWorkerStore(SQLBaseStore):
     def _fetch_event_rows(self, txn, events):
         rows = []
         N = 200
-        for i in range(1 + len(events) / N):
+        for i in range(1 + len(events) // N):
             evs = events[i * N:(i + 1) * N]
             if not evs:
                 break
diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py
index 78b1e30945..2e2763126d 100644
--- a/synapse/storage/filtering.py
+++ b/synapse/storage/filtering.py
@@ -44,7 +44,7 @@ class FilteringStore(SQLBaseStore):
             desc="get_user_filter",
         )
 
-        defer.returnValue(json.loads(str(def_json).decode("utf-8")))
+        defer.returnValue(json.loads(bytes(def_json).decode("utf-8")))
 
     def add_user_filter(self, user_localpart, user_filter):
         def_json = encode_canonical_json(user_filter)
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 87aeaf71d6..0540c2b0b1 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -92,7 +92,7 @@ class KeyStore(SQLBaseStore):
 
         if verify_key_bytes:
             defer.returnValue(decode_verify_key_bytes(
-                key_id, str(verify_key_bytes)
+                key_id, bytes(verify_key_bytes)
             ))
 
     @defer.inlineCallbacks
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 6a861943a2..7bfc3d91b5 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -30,6 +30,8 @@ from synapse.types import get_domain_from_id
 import logging
 import simplejson as json
 
+from six import itervalues, iteritems
+
 logger = logging.getLogger(__name__)
@@ -272,7 +274,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             users_in_room = {}
             member_event_ids = [
                 e_id
-                for key, e_id in current_state_ids.iteritems()
+                for key, e_id in iteritems(current_state_ids)
                 if key[0] == EventTypes.Member
             ]
@@ -289,7 +291,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
                     users_in_room = dict(prev_res)
                     member_event_ids = [
                         e_id
-                        for key, e_id in context.delta_ids.iteritems()
+                        for key, e_id in iteritems(context.delta_ids)
                         if key[0] == EventTypes.Member
                     ]
                     for etype, state_key in context.delta_ids:
@@ -741,7 +743,7 @@ class _JoinedHostsCache(object):
             if state_entry.state_group == self.state_group:
                 pass
             elif state_entry.prev_group == self.state_group:
-                for (typ, state_key), event_id in state_entry.delta_ids.iteritems():
+                for (typ, state_key), event_id in iteritems(state_entry.delta_ids):
                     if typ != EventTypes.Member:
                         continue
@@ -771,7 +773,7 @@ class _JoinedHostsCache(object):
                 self.state_group = state_entry.state_group
             else:
                 self.state_group = object()
-            self._len = sum(len(v) for v in self.hosts_to_joined_users.itervalues())
+            self._len = sum(len(v) for v in itervalues(self.hosts_to_joined_users))
         defer.returnValue(frozenset(self.hosts_to_joined_users))
 
     def __len__(self):
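Many of the storage changes above wrap dict.values() in list() before building SQL argument lists: on Python 3, values() returns a view, and concatenating two views with + is a TypeError. A sketch of the pattern (table and column names are invented, not Synapse's schema):

    values = {"display_name": "Alice"}
    keyvalues = {"user_id": "@alice:hs"}

    sql = "UPDATE profiles SET %s WHERE %s" % (
        ", ".join("%s = ?" % k for k in values),
        " AND ".join("%s = ?" % k for k in keyvalues),
    )
    # convert both views to lists; on Python 3, dict_values + dict_values raises
    sqlargs = list(values.values()) + list(keyvalues.values())
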
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 814a7bf71b..fc11e26623 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -20,6 +20,8 @@ from twisted.internet import defer, reactor, task
 import time
 import logging
 
+from itertools import islice
+
 logger = logging.getLogger(__name__)
@@ -79,3 +81,19 @@ class Clock(object):
             except Exception:
                 if not ignore_errs:
                     raise
+
+
+def batch_iter(iterable, size):
+    """batch an iterable up into tuples with a maximum size
+
+    Args:
+        iterable (iterable): the iterable to slice
+        size (int): the maximum batch size
+
+    Returns:
+        an iterator over the chunks
+    """
+    # make sure we can deal with iterables like lists too
+    sourceiter = iter(iterable)
+    # call islice until it returns an empty tuple
+    return iter(lambda: tuple(islice(sourceiter, size)), ())
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index 4adae96681..329ccbb866 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -16,6 +16,9 @@
 import synapse.metrics
 import os
 
+from six.moves import intern
+import six
+
 CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
 
 metrics = synapse.metrics.get_metrics_for("synapse.util.caches")
@@ -66,7 +69,9 @@ def intern_string(string):
         return None
 
     try:
-        string = string.encode("ascii")
+        if six.PY2:
+            string = string.encode("ascii")
+
         return intern(string)
     except UnicodeEncodeError:
         return string
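The new batch_iter() helper above uses the two-argument form of iter(): the lambda is called repeatedly until it returns the sentinel (here the empty tuple), so iteration stops exactly when islice runs dry. Usage, with the helper as defined in the patch:

    from itertools import islice

    def batch_iter(iterable, size):
        sourceiter = iter(iterable)
        return iter(lambda: tuple(islice(sourceiter, size)), ())

    assert list(batch_iter(range(7), 3)) == [(0, 1, 2), (3, 4, 5), (6,)]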