From 926ba76e23ea9d55638baff541cdfaeb9e01ac47 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Mar 2018 23:43:31 +0000 Subject: Replace ujson with simplejson --- synapse/handlers/e2e_keys.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers/e2e_keys.py') diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 668a90e495..ce2c87e400 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import ujson as json +import simplejson as json import logging from canonicaljson import encode_canonical_json -- cgit 1.5.1 From c2a5cf2fe32d2cd582711669b7c0ce74682e1c05 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 27 Mar 2018 17:07:31 +0100 Subject: factor out exception handling for keys/claim and keys/query this stuff is badly c&p'ed --- synapse/handlers/e2e_keys.py | 53 +++++++++++++++++++++----------------------- 1 file changed, 25 insertions(+), 28 deletions(-) (limited to 'synapse/handlers/e2e_keys.py') diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 80b359b2e7..41521e6990 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -134,23 +135,8 @@ class E2eKeysHandler(object): if user_id in destination_query: results[user_id] = keys - except CodeMessageException as e: - failures[destination] = { - "status": e.code, "message": e.message - } - except NotRetryingDestination as e: - failures[destination] = { - "status": 503, "message": "Not ready for retry", - } - except FederationDeniedError as e: - failures[destination] = { - "status": 403, "message": "Federation Denied", - } except Exception as e: - # include ConnectionRefused and other errors - failures[destination] = { - "status": 503, "message": e.message - } + failures[destination] = _exception_to_failure(e) yield make_deferred_yieldable(defer.gatherResults([ preserve_fn(do_remote_query)(destination) @@ -252,19 +238,8 @@ class E2eKeysHandler(object): for user_id, keys in remote_result["one_time_keys"].items(): if user_id in device_keys: json_result[user_id] = keys - except CodeMessageException as e: - failures[destination] = { - "status": e.code, "message": e.message - } - except NotRetryingDestination as e: - failures[destination] = { - "status": 503, "message": "Not ready for retry", - } except Exception as e: - # include ConnectionRefused and other errors - failures[destination] = { - "status": 503, "message": e.message - } + failures[destination] = _exception_to_failure(e) yield make_deferred_yieldable(defer.gatherResults([ preserve_fn(claim_client_keys)(destination) @@ -362,6 +337,28 @@ class E2eKeysHandler(object): ) +def _exception_to_failure(e): + if isinstance(e, CodeMessageException): + return { + "status": e.code, "message": e.message, + } + + if isinstance(e, NotRetryingDestination): + return { + "status": 503, "message": "Not ready for retry", + } + + if isinstance(e, FederationDeniedError): + return { + "status": 403, "message": "Federation Denied", + } + + # include ConnectionRefused and other errors + return { + "status": 503, "message": e.message, + } + + def _one_time_keys_match(old_key_json, new_key): old_key = json.loads(old_key_json) -- cgit 1.5.1 From a134c572a6697fa6443525493e3fc13f74452d34 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 27 Mar 2018 17:15:06 +0100 Subject: Stringify exceptions for keys/{query,claim} Make sure we stringify any exceptions we return from keys/query and keys/claim, to avoid a 'not JSON serializable' error later Fixes #3010 --- synapse/handlers/e2e_keys.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/handlers/e2e_keys.py') diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 41521e6990..325c0c4a9f 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -354,8 +354,11 @@ def _exception_to_failure(e): } # include ConnectionRefused and other errors + # + # Note that some Exceptions (notably twisted's ResponseFailed etc) don't + # give a string for e.message, which simplejson then fails to serialize. return { - "status": 503, "message": e.message, + "status": 503, "message": str(e.message), } -- cgit 1.5.1 From 605defb9e4c274d61d9da86546e6fd6c78f3f6cf Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 27 Apr 2018 11:16:28 +0100 Subject: Add missing consumeErrors In general we want defer.gatherResults to consumeErrors, rather than having exceptions hanging around and getting logged as CRITICAL unhandled errors. --- synapse/handlers/e2e_keys.py | 4 ++-- synapse/push/pusherpool.py | 8 ++++++-- synapse/storage/stream.py | 2 +- 3 files changed, 9 insertions(+), 5 deletions(-) (limited to 'synapse/handlers/e2e_keys.py') diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 325c0c4a9f..7eb03ad32e 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -141,7 +141,7 @@ class E2eKeysHandler(object): yield make_deferred_yieldable(defer.gatherResults([ preserve_fn(do_remote_query)(destination) for destination in remote_queries_not_in_cache - ])) + ], consumeErrors=True)) defer.returnValue({ "device_keys": results, "failures": failures, @@ -244,7 +244,7 @@ class E2eKeysHandler(object): yield make_deferred_yieldable(defer.gatherResults([ preserve_fn(claim_client_keys)(destination) for destination in remote_queries - ])) + ], consumeErrors=True)) logger.info( "Claimed one-time-keys: %s", diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 134e89b371..2f467d1f9c 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -142,7 +142,9 @@ class PusherPool: ) ) - yield make_deferred_yieldable(defer.gatherResults(deferreds)) + yield make_deferred_yieldable( + defer.gatherResults(deferreds, consumeErrors=True), + ) except Exception: logger.exception("Exception in pusher on_new_notifications") @@ -167,7 +169,9 @@ class PusherPool: preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id) ) - yield make_deferred_yieldable(defer.gatherResults(deferreds)) + yield make_deferred_yieldable( + defer.gatherResults(deferreds, consumeErrors=True), + ) except Exception: logger.exception("Exception in pusher on_new_receipts") diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 2956c3b3e0..3b8b539993 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -202,7 +202,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): room_id, from_key, to_key, limit, order=order, ) for room_id in rm_ids - ])) + ], consumeErrors=True)) results.update(dict(zip(rm_ids, res))) defer.returnValue(results) -- cgit 1.5.1 From 2a13af23bc0561ab48e0a90528231c40ee209724 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 27 Apr 2018 11:29:27 +0100 Subject: Use run_in_background in preference to preserve_fn While I was going through uses of preserve_fn for other PRs, I converted places which only use the wrapped function once to use run_in_background, to avoid creating the function object. --- synapse/app/federation_sender.py | 4 ++-- synapse/app/pusher.py | 4 ++-- synapse/app/synchrotron.py | 5 ++--- synapse/appservice/scheduler.py | 12 ++++++------ synapse/crypto/keyring.py | 28 ++++++++++++++++----------- synapse/federation/federation_client.py | 5 +++-- synapse/groups/attestations.py | 4 ++-- synapse/handlers/appservice.py | 5 ++++- synapse/handlers/e2e_keys.py | 6 +++--- synapse/handlers/federation.py | 16 +++++++++------ synapse/handlers/initial_sync.py | 12 +++++++----- synapse/handlers/message.py | 5 +++-- synapse/handlers/typing.py | 7 ++++--- synapse/push/pusherpool.py | 20 +++++++++++-------- synapse/rest/media/v1/preview_url_resource.py | 5 +++-- synapse/storage/events_worker.py | 5 +++-- synapse/storage/stream.py | 5 +++-- synapse/util/async.py | 4 ++-- synapse/util/file_consumer.py | 6 ++++-- synapse/util/logcontext.py | 2 +- synapse/util/ratelimitutils.py | 4 ++-- synapse/util/retryutils.py | 4 ++-- 22 files changed, 97 insertions(+), 71 deletions(-) (limited to 'synapse/handlers/e2e_keys.py') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 0cc3331519..c6daa0d43f 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -38,7 +38,7 @@ from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string from twisted.internet import defer, reactor @@ -229,7 +229,7 @@ class FederationSenderHandler(object): # presence, typing, etc. if stream_name == "federation": send_queue.process_rows_for_federation(self.federation_sender, rows) - preserve_fn(self.update_token)(token) + run_in_background(self.update_token, token) # We also need to poke the federation sender when new events happen elif stream_name == "events": diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index d5c3a85195..8bd5c0c2b7 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -33,7 +33,7 @@ from synapse.server import HomeServer from synapse.storage import DataStore from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string from twisted.internet import defer, reactor @@ -140,7 +140,7 @@ class PusherReplicationHandler(ReplicationClientHandler): def on_rdata(self, stream_name, token, rows): super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows) - preserve_fn(self.poke_pushers)(stream_name, token, rows) + run_in_background(self.poke_pushers, stream_name, token, rows) @defer.inlineCallbacks def poke_pushers(self, stream_name, token, rows): diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 2fddcd935a..0c4ccc58bc 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -51,7 +51,7 @@ from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.stringutils import random_string from synapse.util.versionstring import get_version_string @@ -327,8 +327,7 @@ class SyncReplicationHandler(ReplicationClientHandler): def on_rdata(self, stream_name, token, rows): super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows) - - preserve_fn(self.process_and_notify)(stream_name, token, rows) + run_in_background(self.process_and_notify, stream_name, token, rows) def get_streams_to_replicate(self): args = super(SyncReplicationHandler, self).get_streams_to_replicate() diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 6da315473d..ba1631b5c8 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -51,7 +51,7 @@ components. from twisted.internet import defer from synapse.appservice import ApplicationServiceState -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background from synapse.util.metrics import Measure import logging @@ -106,7 +106,7 @@ class _ServiceQueuer(object): def enqueue(self, service, event): # if this service isn't being sent something self.queued_events.setdefault(service.id, []).append(event) - preserve_fn(self._send_request)(service) + run_in_background(self._send_request, service) @defer.inlineCallbacks def _send_request(self, service): @@ -152,10 +152,10 @@ class _TransactionController(object): if sent: yield txn.complete(self.store) else: - preserve_fn(self._start_recoverer)(service) - except Exception as e: - logger.exception(e) - preserve_fn(self._start_recoverer)(service) + run_in_background(self._start_recoverer, service) + except Exception: + logger.exception("Error creating appservice transaction") + run_in_background(self._start_recoverer, service) @defer.inlineCallbacks def on_recovered(self, recoverer): diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index fce83d445f..38944a7326 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -19,7 +19,8 @@ from synapse.api.errors import SynapseError, Codes from synapse.util import unwrapFirstError, logcontext from synapse.util.logcontext import ( PreserveLoggingContext, - preserve_fn + preserve_fn, + run_in_background, ) from synapse.util.metrics import Measure @@ -127,7 +128,7 @@ class Keyring(object): verify_requests.append(verify_request) - preserve_fn(self._start_key_lookups)(verify_requests) + run_in_background(self._start_key_lookups, verify_requests) # Pass those keys to handle_key_deferred so that the json object # signatures can be verified @@ -313,7 +314,7 @@ class Keyring(object): if not verify_request.deferred.called: verify_request.deferred.errback(err) - preserve_fn(do_iterations)().addErrback(on_err) + run_in_background(do_iterations).addErrback(on_err) @defer.inlineCallbacks def get_keys_from_store(self, server_name_and_key_ids): @@ -329,8 +330,9 @@ class Keyring(object): """ res = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.store.get_server_verify_keys)( - server_name, key_ids + run_in_background( + self.store.get_server_verify_keys, + server_name, key_ids, ).addCallback(lambda ks, server: (server, ks), server_name) for server_name, key_ids in server_name_and_key_ids ], @@ -358,7 +360,7 @@ class Keyring(object): results = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(get_key)(p_name, p_keys) + run_in_background(get_key, p_name, p_keys) for p_name, p_keys in self.perspective_servers.items() ], consumeErrors=True, @@ -398,7 +400,7 @@ class Keyring(object): results = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(get_key)(server_name, key_ids) + run_in_background(get_key, server_name, key_ids) for server_name, key_ids in server_name_and_key_ids ], consumeErrors=True, @@ -481,7 +483,8 @@ class Keyring(object): yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.store_keys)( + run_in_background( + self.store_keys, server_name=server_name, from_server=perspective_name, verify_keys=response_keys, @@ -539,7 +542,8 @@ class Keyring(object): yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.store_keys)( + run_in_background( + self.store_keys, server_name=key_server_name, from_server=server_name, verify_keys=verify_keys, @@ -615,7 +619,8 @@ class Keyring(object): yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.store.store_server_keys_json)( + run_in_background( + self.store.store_server_keys_json, server_name=server_name, key_id=key_id, from_server=server_name, @@ -716,7 +721,8 @@ class Keyring(object): # TODO(markjh): Store whether the keys have expired. return logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.store.store_server_verify_key)( + run_in_background( + self.store.store_server_verify_key, server_name, server_name, key.time_added, key ) for key_id, key in verify_keys.items() diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 8e2c0c4cd2..8adc60863e 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -33,7 +33,7 @@ from synapse.federation.federation_base import ( import synapse.metrics from synapse.util import logcontext, unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.logutils import log_function from synapse.util.retryutils import NotRetryingDestination @@ -417,7 +417,8 @@ class FederationClient(FederationBase): batch = set(missing_events[i:i + batch_size]) deferreds = [ - preserve_fn(self.get_pdu)( + run_in_background( + self.get_pdu, destinations=random_server_list(), event_id=e_id, ) diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index 1fb709e6c3..5f53f17954 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -42,7 +42,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError from synapse.types import get_domain_from_id -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background from signedjson.sign import sign_json @@ -192,4 +192,4 @@ class GroupAttestionRenewer(object): group_id = row["group_id"] user_id = row["user_id"] - preserve_fn(_renew_attestation)(group_id, user_id) + run_in_background(_renew_attestation, group_id, user_id) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 0245197c02..6cc2388306 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -198,7 +198,10 @@ class ApplicationServicesHandler(object): services = yield self._get_services_for_3pn(protocol) results = yield make_deferred_yieldable(defer.DeferredList([ - preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields) + run_in_background( + self.appservice_api.query_3pe, + service, kind, protocol, fields, + ) for service in services ], consumeErrors=True)) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 325c0c4a9f..fc958404a1 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -24,7 +24,7 @@ from synapse.api.errors import ( SynapseError, CodeMessageException, FederationDeniedError, ) from synapse.types import get_domain_from_id, UserID -from synapse.util.logcontext import preserve_fn, make_deferred_yieldable +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.retryutils import NotRetryingDestination logger = logging.getLogger(__name__) @@ -139,7 +139,7 @@ class E2eKeysHandler(object): failures[destination] = _exception_to_failure(e) yield make_deferred_yieldable(defer.gatherResults([ - preserve_fn(do_remote_query)(destination) + run_in_background(do_remote_query, destination) for destination in remote_queries_not_in_cache ])) @@ -242,7 +242,7 @@ class E2eKeysHandler(object): failures[destination] = _exception_to_failure(e) yield make_deferred_yieldable(defer.gatherResults([ - preserve_fn(claim_client_keys)(destination) + run_in_background(claim_client_keys, destination) for destination in remote_queries ])) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ae7e0d6da2..c66ca0f381 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -637,7 +637,8 @@ class FederationHandler(BaseHandler): results = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - logcontext.preserve_fn(self.replication_layer.get_pdu)( + logcontext.run_in_background( + self.replication_layer.get_pdu, [dest], event_id, outlier=True, @@ -1023,7 +1024,7 @@ class FederationHandler(BaseHandler): # lots of requests for missing prev_events which we do actually # have. Hence we fire off the deferred, but don't wait for it. - logcontext.preserve_fn(self._handle_queued_pdus)(room_queue) + logcontext.run_in_background(self._handle_queued_pdus, room_queue) defer.returnValue(True) @@ -1523,8 +1524,9 @@ class FederationHandler(BaseHandler): if not backfilled: # this intentionally does not yield: we don't care about the result # and don't need to wait for it. - logcontext.preserve_fn(self.pusher_pool.on_new_notifications)( - event_stream_id, max_stream_id + logcontext.run_in_background( + self.pusher_pool.on_new_notifications, + event_stream_id, max_stream_id, ) defer.returnValue((context, event_stream_id, max_stream_id)) @@ -1538,7 +1540,8 @@ class FederationHandler(BaseHandler): """ contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - logcontext.preserve_fn(self._prep_event)( + logcontext.run_in_background( + self._prep_event, origin, ev_info["event"], state=ev_info.get("state"), @@ -1867,7 +1870,8 @@ class FederationHandler(BaseHandler): different_events = yield logcontext.make_deferred_yieldable( defer.gatherResults([ - logcontext.preserve_fn(self.store.get_event)( + logcontext.run_in_background( + self.store.get_event, d, allow_none=True, allow_rejected=False, diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index c5267b4b84..cd33a86599 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -27,7 +27,7 @@ from synapse.types import ( from synapse.util import unwrapFirstError from synapse.util.async import concurrently_execute from synapse.util.caches.snapshot_cache import SnapshotCache -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -166,7 +166,8 @@ class InitialSyncHandler(BaseHandler): (messages, token), current_state = yield make_deferred_yieldable( defer.gatherResults( [ - preserve_fn(self.store.get_recent_events_for_room)( + run_in_background( + self.store.get_recent_events_for_room, event.room_id, limit=limit, end_token=room_end_token, @@ -391,9 +392,10 @@ class InitialSyncHandler(BaseHandler): presence, receipts, (messages, token) = yield defer.gatherResults( [ - preserve_fn(get_presence)(), - preserve_fn(get_receipts)(), - preserve_fn(self.store.get_recent_events_for_room)( + run_in_background(get_presence), + run_in_background(get_receipts), + run_in_background( + self.store.get_recent_events_for_room, room_id, limit=limit, end_token=now_token.room_key, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 21628a8540..244b98dd8d 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -850,7 +850,8 @@ class EventCreationHandler(object): # this intentionally does not yield: we don't care about the result # and don't need to wait for it. - preserve_fn(self.pusher_pool.on_new_notifications)( + run_in_background( + self.pusher_pool.on_new_notifications, event_stream_id, max_stream_id ) @@ -862,7 +863,7 @@ class EventCreationHandler(object): extra_users=extra_users ) - preserve_fn(_notify)() + run_in_background(_notify) if event.type == EventTypes.Message: presence = self.hs.get_presence_handler() diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 77c0cf146f..19cde70adf 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer from synapse.types import UserID, get_domain_from_id @@ -97,7 +97,8 @@ class TypingHandler(object): if self.hs.is_mine_id(member.user_id): last_fed_poke = self._member_last_federation_poke.get(member, None) if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now: - preserve_fn(self._push_remote)( + run_in_background( + self._push_remote, member=member, typing=True ) @@ -196,7 +197,7 @@ class TypingHandler(object): def _push_update(self, member, typing): if self.hs.is_mine_id(member.user_id): # Only send updates for changes to our own users. - preserve_fn(self._push_remote)(member, typing) + run_in_background(self._push_remote, member, typing) self._push_update_local( member=member, diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 134e89b371..7bb5733090 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -14,13 +14,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from twisted.internet import defer -from .pusher import PusherFactory -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.push.pusher import PusherFactory from synapse.util.async import run_on_reactor - -import logging +from synapse.util.logcontext import make_deferred_yieldable, run_in_background logger = logging.getLogger(__name__) @@ -137,8 +137,9 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): deferreds.append( - preserve_fn(p.on_new_notifications)( - min_stream_id, max_stream_id + run_in_background( + p.on_new_notifications, + min_stream_id, max_stream_id, ) ) @@ -164,7 +165,10 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): deferreds.append( - preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id) + run_in_background( + p.on_new_receipts, + min_stream_id, max_stream_id, + ) ) yield make_deferred_yieldable(defer.gatherResults(deferreds)) @@ -207,7 +211,7 @@ class PusherPool: if appid_pushkey in byuser: byuser[appid_pushkey].on_stop() byuser[appid_pushkey] = p - preserve_fn(p.on_started)() + run_in_background(p.on_started) logger.info("Started pushers") diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 0fc21540c6..9290d7946f 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -35,7 +35,7 @@ from ._base import FileInfo from synapse.api.errors import ( SynapseError, Codes, ) -from synapse.util.logcontext import preserve_fn, make_deferred_yieldable +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.stringutils import random_string from synapse.util.caches.expiringcache import ExpiringCache from synapse.http.client import SpiderHttpClient @@ -144,7 +144,8 @@ class PreviewUrlResource(Resource): observable = self._cache.get(url) if not observable: - download = preserve_fn(self._do_preview)( + download = run_in_background( + self._do_preview, url, requester.user, ts, ) observable = ObservableDeferred( diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index a937b9bceb..ba834854e1 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -20,7 +20,7 @@ from synapse.events import FrozenEvent from synapse.events.utils import prune_event from synapse.util.logcontext import ( - preserve_fn, PreserveLoggingContext, make_deferred_yieldable + PreserveLoggingContext, make_deferred_yieldable, run_in_background, ) from synapse.util.metrics import Measure from synapse.api.errors import SynapseError @@ -319,7 +319,8 @@ class EventsWorkerStore(SQLBaseStore): res = yield make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self._get_event_from_row)( + run_in_background( + self._get_event_from_row, row["internal_metadata"], row["json"], row["redacts"], rejected_reason=row["rejects"], ) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 2956c3b3e0..5b245a936c 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -41,7 +41,7 @@ from synapse.storage.events import EventsWorkerStore from synapse.util.caches.descriptors import cached from synapse.types import RoomStreamToken from synapse.util.caches.stream_change_cache import StreamChangeCache -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.storage.engines import PostgresEngine, Sqlite3Engine import abc @@ -198,7 +198,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): room_ids = list(room_ids) for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)): res = yield make_deferred_yieldable(defer.gatherResults([ - preserve_fn(self.get_room_events_stream_for_room)( + run_in_background( + self.get_room_events_stream_for_room, room_id, from_key, to_key, limit, order=order, ) for room_id in rm_ids diff --git a/synapse/util/async.py b/synapse/util/async.py index 0729bb2863..bd07067328 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -17,7 +17,7 @@ from twisted.internet import defer, reactor from .logcontext import ( - PreserveLoggingContext, make_deferred_yieldable, preserve_fn + PreserveLoggingContext, make_deferred_yieldable, run_in_background ) from synapse.util import logcontext, unwrapFirstError @@ -161,7 +161,7 @@ def concurrently_execute(func, args, limit): pass return logcontext.make_deferred_yieldable(defer.gatherResults([ - preserve_fn(_concurrently_execute_inner)() + run_in_background(_concurrently_execute_inner) for _ in xrange(limit) ], consumeErrors=True)).addErrback(unwrapFirstError) diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py index 3c8a165331..3380970e4e 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py @@ -15,7 +15,7 @@ from twisted.internet import threads, reactor -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from six.moves import queue @@ -70,7 +70,9 @@ class BackgroundFileConsumer(object): self._producer = producer self.streaming = streaming - self._finished_deferred = preserve_fn(threads.deferToThread)(self._writer) + self._finished_deferred = run_in_background( + threads.deferToThread, self._writer + ) if not streaming: self._producer.resumeProducing() diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index d59adc236e..c2edf87e58 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -341,7 +341,7 @@ def make_deferred_yieldable(deferred): returning a deferred. Then, when the deferred completes, restores the current logcontext before running callbacks/errbacks. - (This is more-or-less the opposite operation to preserve_fn.) + (This is more-or-less the opposite operation to run_in_background.) """ if isinstance(deferred, defer.Deferred) and not deferred.called: prev_context = LoggingContext.set_current_context(LoggingContext.sentinel) diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 1101881a2d..18424f6c36 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.api.errors import LimitExceededError from synapse.util.async import sleep -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background import collections import contextlib @@ -150,7 +150,7 @@ class _PerHostRatelimiter(object): "Ratelimit [%s]: sleeping req", id(request_id), ) - ret_defer = preserve_fn(sleep)(self.sleep_msec / 1000.0) + ret_defer = run_in_background(sleep, self.sleep_msec / 1000.0) self.sleeping_requests.add(request_id) diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 47b0bb5eb3..4e93f69d3a 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -203,8 +203,8 @@ class RetryDestinationLimiter(object): ) except Exception: logger.exception( - "Failed to store set_destination_retry_timings", + "Failed to store destination_retry_timings", ) # we deliberately do this in the background. - synapse.util.logcontext.preserve_fn(store_retry_timings)() + synapse.util.logcontext.run_in_background(store_retry_timings) -- cgit 1.5.1 From 933bf2dd357842b0e7c3fc7a1111d89ab52f5329 Mon Sep 17 00:00:00 2001 From: Adrian Tschira Date: Sat, 28 Apr 2018 13:19:12 +0200 Subject: replace some iteritems with six Signed-off-by: Adrian Tschira --- synapse/handlers/device.py | 14 ++++++++------ synapse/handlers/e2e_keys.py | 13 +++++++------ synapse/handlers/groups_local.py | 3 ++- synapse/handlers/presence.py | 15 ++++++++------- synapse/handlers/sync.py | 14 ++++++++------ synapse/handlers/user_directory.py | 3 ++- synapse/metrics/process_collector.py | 3 ++- synapse/push/bulk_push_rule_evaluator.py | 13 +++++++------ synapse/replication/tcp/protocol.py | 9 +++++---- synapse/replication/tcp/resource.py | 3 ++- synapse/rest/media/v1/media_repository.py | 3 ++- synapse/storage/client_ips.py | 6 ++++-- synapse/storage/devices.py | 9 +++++---- synapse/storage/end_to_end_keys.py | 6 ++++-- synapse/storage/event_push_actions.py | 4 +++- 15 files changed, 69 insertions(+), 49 deletions(-) (limited to 'synapse/handlers/e2e_keys.py') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index f7457a7082..31bd0e60c6 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -26,6 +26,8 @@ from ._base import BaseHandler import logging +from six import itervalues, iteritems + logger = logging.getLogger(__name__) @@ -318,7 +320,7 @@ class DeviceHandler(BaseHandler): # The user may have left the room # TODO: Check if they actually did or if we were just invited. if room_id not in room_ids: - for key, event_id in current_state_ids.iteritems(): + for key, event_id in iteritems(current_state_ids): etype, state_key = key if etype != EventTypes.Member: continue @@ -338,7 +340,7 @@ class DeviceHandler(BaseHandler): # special-case for an empty prev state: include all members # in the changed list if not event_ids: - for key, event_id in current_state_ids.iteritems(): + for key, event_id in iteritems(current_state_ids): etype, state_key = key if etype != EventTypes.Member: continue @@ -354,10 +356,10 @@ class DeviceHandler(BaseHandler): # Check if we've joined the room? If so we just blindly add all the users to # the "possibly changed" users. - for state_dict in prev_state_ids.itervalues(): + for state_dict in itervalues(prev_state_ids): member_event = state_dict.get((EventTypes.Member, user_id), None) if not member_event or member_event != current_member_id: - for key, event_id in current_state_ids.iteritems(): + for key, event_id in iteritems(current_state_ids): etype, state_key = key if etype != EventTypes.Member: continue @@ -367,14 +369,14 @@ class DeviceHandler(BaseHandler): # If there has been any change in membership, include them in the # possibly changed list. We'll check if they are joined below, # and we're not toooo worried about spuriously adding users. - for key, event_id in current_state_ids.iteritems(): + for key, event_id in iteritems(current_state_ids): etype, state_key = key if etype != EventTypes.Member: continue # check if this member has changed since any of the extremities # at the stream_ordering, and add them to the list if so. - for state_dict in prev_state_ids.itervalues(): + for state_dict in itervalues(prev_state_ids): prev_event_id = state_dict.get(key, None) if not prev_event_id or prev_event_id != event_id: if state_key != user_id: diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 25aec624af..8a2d177539 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -19,6 +19,7 @@ import logging from canonicaljson import encode_canonical_json from twisted.internet import defer +from six import iteritems from synapse.api.errors import ( SynapseError, CodeMessageException, FederationDeniedError, @@ -92,7 +93,7 @@ class E2eKeysHandler(object): remote_queries_not_in_cache = {} if remote_queries: query_list = [] - for user_id, device_ids in remote_queries.iteritems(): + for user_id, device_ids in iteritems(remote_queries): if device_ids: query_list.extend((user_id, device_id) for device_id in device_ids) else: @@ -103,9 +104,9 @@ class E2eKeysHandler(object): query_list ) ) - for user_id, devices in remote_results.iteritems(): + for user_id, devices in iteritems(remote_results): user_devices = results.setdefault(user_id, {}) - for device_id, device in devices.iteritems(): + for device_id, device in iteritems(devices): keys = device.get("keys", None) device_display_name = device.get("device_display_name", None) if keys: @@ -250,9 +251,9 @@ class E2eKeysHandler(object): "Claimed one-time-keys: %s", ",".join(( "%s for %s:%s" % (key_id, user_id, device_id) - for user_id, user_keys in json_result.iteritems() - for device_id, device_keys in user_keys.iteritems() - for key_id, _ in device_keys.iteritems() + for user_id, user_keys in iteritems(json_result) + for device_id, device_keys in iteritems(user_keys) + for key_id, _ in iteritems(device_keys) )), ) diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 977993e7d4..dcae083734 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -15,6 +15,7 @@ # limitations under the License. from twisted.internet import defer +from six import iteritems from synapse.api.errors import SynapseError from synapse.types import get_domain_from_id @@ -449,7 +450,7 @@ class GroupsLocalHandler(object): results = {} failed_results = [] - for destination, dest_user_ids in destinations.iteritems(): + for destination, dest_user_ids in iteritems(destinations): try: r = yield self.transport_client.bulk_get_publicised_groups( destination, list(dest_user_ids), diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 91218e40e6..b51f925220 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -25,6 +25,8 @@ The methods that define policy are: from twisted.internet import defer, reactor from contextlib import contextmanager +from six import itervalues, iteritems + from synapse.api.errors import SynapseError from synapse.api.constants import PresenceState from synapse.storage.presence import UserPresenceState @@ -40,7 +42,6 @@ import synapse.metrics import logging - logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) @@ -526,7 +527,7 @@ class PresenceHandler(object): prev_state.copy_and_replace( last_user_sync_ts=time_now_ms, ) - for prev_state in prev_states.itervalues() + for prev_state in itervalues(prev_states) ]) self.external_process_last_updated_ms.pop(process_id, None) @@ -549,14 +550,14 @@ class PresenceHandler(object): for user_id in user_ids } - missing = [user_id for user_id, state in states.iteritems() if not state] + missing = [user_id for user_id, state in iteritems(states) if not state] if missing: # There are things not in our in memory cache. Lets pull them out of # the database. res = yield self.store.get_presence_for_users(missing) states.update(res) - missing = [user_id for user_id, state in states.iteritems() if not state] + missing = [user_id for user_id, state in iteritems(states) if not state] if missing: new = { user_id: UserPresenceState.default(user_id) @@ -1044,7 +1045,7 @@ class PresenceEventSource(object): defer.returnValue((updates.values(), max_token)) else: defer.returnValue(([ - s for s in updates.itervalues() + s for s in itervalues(updates) if s.state != PresenceState.OFFLINE ], max_token)) @@ -1301,11 +1302,11 @@ def get_interested_remotes(store, states, state_handler): # hosts in those rooms. room_ids_to_states, users_to_states = yield get_interested_parties(store, states) - for room_id, states in room_ids_to_states.iteritems(): + for room_id, states in iteritems(room_ids_to_states): hosts = yield state_handler.get_current_hosts_in_room(room_id) hosts_and_states.append((hosts, states)) - for user_id, states in users_to_states.iteritems(): + for user_id, states in iteritems(users_to_states): host = get_domain_from_id(user_id) hosts_and_states.append(([host], states)) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 263e42dded..d0c99c35e3 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -28,6 +28,8 @@ import collections import logging import itertools +from six import itervalues, iteritems + logger = logging.getLogger(__name__) @@ -275,7 +277,7 @@ class SyncHandler(object): # result returned by the event source is poor form (it might cache # the object) room_id = event["room_id"] - event_copy = {k: v for (k, v) in event.iteritems() + event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"} ephemeral_by_room.setdefault(room_id, []).append(event_copy) @@ -294,7 +296,7 @@ class SyncHandler(object): for event in receipts: room_id = event["room_id"] # exclude room id, as above - event_copy = {k: v for (k, v) in event.iteritems() + event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"} ephemeral_by_room.setdefault(room_id, []).append(event_copy) @@ -325,7 +327,7 @@ class SyncHandler(object): current_state_ids = frozenset() if any(e.is_state() for e in recents): current_state_ids = yield self.state.get_current_state_ids(room_id) - current_state_ids = frozenset(current_state_ids.itervalues()) + current_state_ids = frozenset(itervalues(current_state_ids)) recents = yield filter_events_for_client( self.store, @@ -382,7 +384,7 @@ class SyncHandler(object): current_state_ids = frozenset() if any(e.is_state() for e in loaded_recents): current_state_ids = yield self.state.get_current_state_ids(room_id) - current_state_ids = frozenset(current_state_ids.itervalues()) + current_state_ids = frozenset(itervalues(current_state_ids)) loaded_recents = yield filter_events_for_client( self.store, @@ -984,7 +986,7 @@ class SyncHandler(object): if since_token: for joined_sync in sync_result_builder.joined: it = itertools.chain( - joined_sync.timeline.events, joined_sync.state.itervalues() + joined_sync.timeline.events, itervalues(joined_sync.state) ) for event in it: if event.type == EventTypes.Member: @@ -1062,7 +1064,7 @@ class SyncHandler(object): newly_left_rooms = [] room_entries = [] invited = [] - for room_id, events in mem_change_events_by_room_id.iteritems(): + for room_id, events in iteritems(mem_change_events_by_room_id): non_joins = [e for e in events if e.membership != Membership.JOIN] has_join = len(non_joins) != len(events) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 714f0195c8..cd0b7290f0 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -22,6 +22,7 @@ from synapse.util.metrics import Measure from synapse.util.async import sleep from synapse.types import get_localpart_from_id +from six import iteritems logger = logging.getLogger(__name__) @@ -403,7 +404,7 @@ class UserDirectoryHandler(object): if change: users_with_profile = yield self.state.get_current_user_in_room(room_id) - for user_id, profile in users_with_profile.iteritems(): + for user_id, profile in iteritems(users_with_profile): yield self._handle_new_user(room_id, user_id, profile) else: users = yield self.store.get_users_in_public_due_to_room(room_id) diff --git a/synapse/metrics/process_collector.py b/synapse/metrics/process_collector.py index 6fec3de399..50e5b48a2b 100644 --- a/synapse/metrics/process_collector.py +++ b/synapse/metrics/process_collector.py @@ -15,6 +15,7 @@ import os +from six import iteritems TICKS_PER_SEC = 100 BYTES_PER_PAGE = 4096 @@ -55,7 +56,7 @@ def update_resource_metrics(): # line is PID (command) more stats go here ... raw_stats = line.split(") ", 1)[1].split(" ") - for (name, index) in STAT_FIELDS.iteritems(): + for (name, index) in iteritems(STAT_FIELDS): # subtract 3 from the index, because proc(5) is 1-based, and # we've lost the first two fields in PID and COMMAND above stats[name] = int(raw_stats[index - 3]) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 7c680659b6..2f7e77f5f5 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -30,6 +30,7 @@ from synapse.state import POWER_KEY from collections import namedtuple +from six import itervalues, iteritems logger = logging.getLogger(__name__) @@ -126,7 +127,7 @@ class BulkPushRuleEvaluator(object): ) auth_events = yield self.store.get_events(auth_events_ids) auth_events = { - (e.type, e.state_key): e for e in auth_events.itervalues() + (e.type, e.state_key): e for e in itervalues(auth_events) } sender_level = get_user_power_level(event.sender, auth_events) @@ -160,7 +161,7 @@ class BulkPushRuleEvaluator(object): condition_cache = {} - for uid, rules in rules_by_user.iteritems(): + for uid, rules in iteritems(rules_by_user): if event.sender == uid: continue @@ -406,7 +407,7 @@ class RulesForRoom(object): # If the event is a join event then it will be in current state evnts # map but not in the DB, so we have to explicitly insert it. if event.type == EventTypes.Member: - for event_id in member_event_ids.itervalues(): + for event_id in itervalues(member_event_ids): if event_id == event.event_id: members[event_id] = (event.state_key, event.membership) @@ -414,7 +415,7 @@ class RulesForRoom(object): logger.debug("Found members %r: %r", self.room_id, members.values()) interested_in_user_ids = set( - user_id for user_id, membership in members.itervalues() + user_id for user_id, membership in itervalues(members) if membership == Membership.JOIN ) @@ -426,7 +427,7 @@ class RulesForRoom(object): ) user_ids = set( - uid for uid, have_pusher in if_users_with_pushers.iteritems() if have_pusher + uid for uid, have_pusher in iteritems(if_users_with_pushers) if have_pusher ) logger.debug("With pushers: %r", user_ids) @@ -447,7 +448,7 @@ class RulesForRoom(object): ) ret_rules_by_user.update( - item for item in rules_by_user.iteritems() if item[0] is not None + item for item in iteritems(rules_by_user) if item[0] is not None ) self.update_cache(sequence, members, ret_rules_by_user, state_group) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index d7d38464b2..7ca1588f6a 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -68,6 +68,7 @@ import synapse.metrics import struct import fcntl +from six import iterkeys, iteritems metrics = synapse.metrics.get_metrics_for(__name__) @@ -392,7 +393,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): if stream_name == "ALL": # Subscribe to all streams we're publishing to. - for stream in self.streamer.streams_by_name.iterkeys(): + for stream in iterkeys(self.streamer.streams_by_name): self.subscribe_to_stream(stream, token) else: self.subscribe_to_stream(stream_name, token) @@ -498,7 +499,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): BaseReplicationStreamProtocol.connectionMade(self) # Once we've connected subscribe to the necessary streams - for stream_name, token in self.handler.get_streams_to_replicate().iteritems(): + for stream_name, token in iteritems(self.handler.get_streams_to_replicate()): self.replicate(stream_name, token) # Tell the server if we have any users currently syncing (should only @@ -633,7 +634,7 @@ metrics.register_callback( lambda: { (k[0], p.name, p.conn_id): count for p in connected_connections - for k, count in p.inbound_commands_counter.counts.iteritems() + for k, count in iteritems(p.inbound_commands_counter.counts) }, labels=["command", "name", "conn_id"], ) @@ -643,7 +644,7 @@ metrics.register_callback( lambda: { (k[0], p.name, p.conn_id): count for p in connected_connections - for k, count in p.outbound_commands_counter.counts.iteritems() + for k, count in iteritems(p.outbound_commands_counter.counts) }, labels=["command", "name", "conn_id"], ) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index a41af4fd6c..d1c291e17d 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -26,6 +26,7 @@ from synapse.util.metrics import Measure, measure_func import logging import synapse.metrics +from six import itervalues metrics = synapse.metrics.get_metrics_for(__name__) stream_updates_counter = metrics.register_counter( @@ -79,7 +80,7 @@ class ReplicationStreamer(object): # We only support federation stream if federation sending hase been # disabled on the master. self.streams = [ - stream(hs) for stream in STREAMS_MAP.itervalues() + stream(hs) for stream in itervalues(STREAMS_MAP) if stream != FederationStream or not hs.config.send_federation ] diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 9800ce7581..2ac767d2dc 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -48,6 +48,7 @@ import shutil import cgi import logging from six.moves.urllib import parse as urlparse +from six import iteritems logger = logging.getLogger(__name__) @@ -603,7 +604,7 @@ class MediaRepository(object): thumbnails[(t_width, t_height, r_type)] = r_method # Now we generate the thumbnails for each dimension, store it - for (t_width, t_height, t_type), t_method in thumbnails.iteritems(): + for (t_width, t_height, t_type), t_method in iteritems(thumbnails): # Generate the thumbnail if t_method == "crop": t_byte_source = yield make_deferred_yieldable(threads.deferToThread( diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index ba46907737..ce338514e8 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -22,6 +22,8 @@ from . import background_updates from synapse.util.caches import CACHE_SIZE_FACTOR +from six import iteritems + logger = logging.getLogger(__name__) @@ -99,7 +101,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): def _update_client_ips_batch_txn(self, txn, to_update): self.database_engine.lock_table(txn, "user_ips") - for entry in to_update.iteritems(): + for entry in iteritems(to_update): (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry self._simple_upsert_txn( @@ -231,5 +233,5 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): "user_agent": user_agent, "last_seen": last_seen, } - for (access_token, ip), (user_agent, last_seen) in results.iteritems() + for (access_token, ip), (user_agent, last_seen) in iteritems(results) )) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 712106b83a..d149d8392e 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -21,6 +21,7 @@ from synapse.api.errors import StoreError from ._base import SQLBaseStore, Cache from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks +from six import itervalues, iteritems logger = logging.getLogger(__name__) @@ -360,7 +361,7 @@ class DeviceStore(SQLBaseStore): return (now_stream_id, []) if len(query_map) >= 20: - now_stream_id = max(stream_id for stream_id in query_map.itervalues()) + now_stream_id = max(stream_id for stream_id in itervalues(query_map)) devices = self._get_e2e_device_keys_txn( txn, query_map.keys(), include_all_devices=True @@ -373,13 +374,13 @@ class DeviceStore(SQLBaseStore): """ results = [] - for user_id, user_devices in devices.iteritems(): + for user_id, user_devices in iteritems(devices): # The prev_id for the first row is always the last row before # `from_stream_id` txn.execute(prev_sent_id_sql, (destination, user_id, from_stream_id)) rows = txn.fetchall() prev_id = rows[0][0] - for device_id, device in user_devices.iteritems(): + for device_id, device in iteritems(user_devices): stream_id = query_map[(user_id, device_id)] result = { "user_id": user_id, @@ -483,7 +484,7 @@ class DeviceStore(SQLBaseStore): if devices: user_devices = devices[user_id] results = [] - for device_id, device in user_devices.iteritems(): + for device_id, device in iteritems(user_devices): result = { "device_id": device_id, } diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index ff8538ddf8..b146487943 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -21,6 +21,8 @@ import simplejson as json from ._base import SQLBaseStore +from six import iteritems + class EndToEndKeyStore(SQLBaseStore): def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys): @@ -81,8 +83,8 @@ class EndToEndKeyStore(SQLBaseStore): query_list, include_all_devices, ) - for user_id, device_keys in results.iteritems(): - for device_id, device_info in device_keys.iteritems(): + for user_id, device_keys in iteritems(results): + for device_id, device_info in iteritems(device_keys): device_info["keys"] = json.loads(device_info.pop("key_json")) defer.returnValue(results) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index f084a5f54b..d0350ee5fe 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -22,6 +22,8 @@ from synapse.util.caches.descriptors import cachedInlineCallbacks import logging import simplejson as json +from six import iteritems + logger = logging.getLogger(__name__) @@ -420,7 +422,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): txn.executemany(sql, ( _gen_entry(user_id, actions) - for user_id, actions in user_id_actions.iteritems() + for user_id, actions in iteritems(user_id_actions) )) return self.runInteraction( -- cgit 1.5.1