diff options
Diffstat (limited to 'synapse/handlers')
30 files changed, 441 insertions, 416 deletions
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index d358842b3e..4b9923d8c0 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -13,13 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .register import RegistrationHandler -from .room import RoomContextHandler -from .message import MessageHandler -from .federation import FederationHandler -from .directory import DirectoryHandler from .admin import AdminHandler +from .directory import DirectoryHandler +from .federation import FederationHandler from .identity import IdentityHandler +from .message import MessageHandler +from .register import RegistrationHandler +from .room import RoomContextHandler from .search import SearchHandler diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 2d1db0c245..b6a8b3aa3b 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -18,11 +18,10 @@ import logging from twisted.internet import defer import synapse.types -from synapse.api.constants import Membership, EventTypes +from synapse.api.constants import EventTypes, Membership from synapse.api.errors import LimitExceededError from synapse.types import UserID - logger = logging.getLogger(__name__) diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index f36b358b45..5d629126fc 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -13,12 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from twisted.internet import defer from ._base import BaseHandler -import logging - logger = logging.getLogger(__name__) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 1c29c43a83..ec9fe01a5a 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -13,19 +13,18 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer +import logging from six import itervalues +from prometheus_client import Counter + +from twisted.internet import defer + import synapse from synapse.api.constants import EventTypes +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.metrics import Measure -from synapse.util.logcontext import ( - make_deferred_yieldable, run_in_background, -) -from prometheus_client import Counter - -import logging logger = logging.getLogger(__name__) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 3c0051586d..402e44cdef 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -13,29 +13,33 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +import logging + +import attr +import bcrypt +import pymacaroons +from canonicaljson import json + from twisted.internet import defer, threads +from twisted.web.client import PartialDownloadError -from ._base import BaseHandler +import synapse.util.stringutils as stringutils from synapse.api.constants import LoginType from synapse.api.errors import ( - AuthError, Codes, InteractiveAuthIncompleteError, LoginError, StoreError, + AuthError, + Codes, + InteractiveAuthIncompleteError, + LoginError, + StoreError, SynapseError, ) from synapse.module_api import ModuleApi from synapse.types import UserID -from synapse.util.async import run_on_reactor from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logcontext import make_deferred_yieldable -from twisted.web.client import PartialDownloadError - -import logging -import bcrypt -import pymacaroons -import simplejson - -import synapse.util.stringutils as stringutils - +from ._base import BaseHandler logger = logging.getLogger(__name__) @@ -402,7 +406,7 @@ class AuthHandler(BaseHandler): except PartialDownloadError as pde: # Twisted is silly data = pde.response - resp_body = simplejson.loads(data) + resp_body = json.loads(data) if 'success' in resp_body: # Note that we do NOT check the hostname here: we explicitly @@ -423,15 +427,11 @@ class AuthHandler(BaseHandler): def _check_msisdn(self, authdict, _): return self._check_threepid('msisdn', authdict) - @defer.inlineCallbacks def _check_dummy_auth(self, authdict, _): - yield run_on_reactor() - defer.returnValue(True) + return defer.succeed(True) @defer.inlineCallbacks def _check_threepid(self, medium, authdict): - yield run_on_reactor() - if 'threepid_creds' not in authdict: raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM) @@ -825,6 +825,15 @@ class AuthHandler(BaseHandler): if medium == 'email': address = address.lower() + identity_handler = self.hs.get_handlers().identity_handler + yield identity_handler.unbind_threepid( + user_id, + { + 'medium': medium, + 'address': address, + }, + ) + ret = yield self.store.user_delete_threepid( user_id, medium, address, ) @@ -849,7 +858,11 @@ class AuthHandler(BaseHandler): return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper, bcrypt.gensalt(self.bcrypt_rounds)) - return make_deferred_yieldable(threads.deferToThread(_do_hash)) + return make_deferred_yieldable( + threads.deferToThreadPool( + self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash + ), + ) def validate_hash(self, password, stored_hash): """Validates that self.hash(password) == stored_hash. @@ -869,16 +882,21 @@ class AuthHandler(BaseHandler): ) if stored_hash: - return make_deferred_yieldable(threads.deferToThread(_do_validate_hash)) + return make_deferred_yieldable( + threads.deferToThreadPool( + self.hs.get_reactor(), + self.hs.get_reactor().getThreadPool(), + _do_validate_hash, + ), + ) else: return defer.succeed(False) -class MacaroonGeneartor(object): - def __init__(self, hs): - self.clock = hs.get_clock() - self.server_name = hs.config.server_name - self.macaroon_secret_key = hs.config.macaroon_secret_key +@attr.s +class MacaroonGenerator(object): + + hs = attr.ib() def generate_access_token(self, user_id, extra_caveats=None): extra_caveats = extra_caveats or [] @@ -896,7 +914,7 @@ class MacaroonGeneartor(object): def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)): macaroon = self._generate_base_macaroon(user_id) macaroon.add_first_party_caveat("type = login") - now = self.clock.time_msec() + now = self.hs.get_clock().time_msec() expiry = now + duration_in_ms macaroon.add_first_party_caveat("time < %d" % (expiry,)) return macaroon.serialize() @@ -908,9 +926,9 @@ class MacaroonGeneartor(object): def _generate_base_macaroon(self, user_id): macaroon = pymacaroons.Macaroon( - location=self.server_name, + location=self.hs.config.server_name, identifier="key", - key=self.macaroon_secret_key) + key=self.hs.config.macaroon_secret_key) macaroon.add_first_party_caveat("gen = 1") macaroon.add_first_party_caveat("user_id = %s" % (user_id,)) return macaroon diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index c5e92f6214..b3c5a9ee64 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -12,13 +12,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer, reactor +import logging -from ._base import BaseHandler +from twisted.internet import defer + +from synapse.api.errors import SynapseError from synapse.types import UserID, create_requester from synapse.util.logcontext import run_in_background -import logging +from ._base import BaseHandler logger = logging.getLogger(__name__) @@ -30,6 +32,7 @@ class DeactivateAccountHandler(BaseHandler): self._auth_handler = hs.get_auth_handler() self._device_handler = hs.get_device_handler() self._room_member_handler = hs.get_room_member_handler() + self._identity_handler = hs.get_handlers().identity_handler self.user_directory_handler = hs.get_user_directory_handler() # Flag that indicates whether the process to part users from rooms is running @@ -37,14 +40,15 @@ class DeactivateAccountHandler(BaseHandler): # Start the user parter loop so it can resume parting users from rooms where # it left off (if it has work left to do). - reactor.callWhenRunning(self._start_user_parting) + hs.get_reactor().callWhenRunning(self._start_user_parting) @defer.inlineCallbacks - def deactivate_account(self, user_id): + def deactivate_account(self, user_id, erase_data): """Deactivate a user's account Args: user_id (str): ID of user to be deactivated + erase_data (bool): whether to GDPR-erase the user's data Returns: Deferred @@ -52,14 +56,35 @@ class DeactivateAccountHandler(BaseHandler): # FIXME: Theoretically there is a race here wherein user resets # password using threepid. - # first delete any devices belonging to the user, which will also + # delete threepids first. We remove these from the IS so if this fails, + # leave the user still active so they can try again. + # Ideally we would prevent password resets and then do this in the + # background thread. + threepids = yield self.store.user_get_threepids(user_id) + for threepid in threepids: + try: + yield self._identity_handler.unbind_threepid( + user_id, + { + 'medium': threepid['medium'], + 'address': threepid['address'], + }, + ) + except Exception: + # Do we want this to be a fatal error or should we carry on? + logger.exception("Failed to remove threepid from ID server") + raise SynapseError(400, "Failed to remove threepid from ID server") + yield self.store.user_delete_threepid( + user_id, threepid['medium'], threepid['address'], + ) + + # delete any devices belonging to the user, which will also # delete corresponding access tokens. yield self._device_handler.delete_all_devices_for_user(user_id) # then delete any remaining access tokens which weren't associated with # a device. yield self._auth_handler.delete_access_tokens_for_user(user_id) - yield self.store.user_delete_threepids(user_id) yield self.store.user_set_password_hash(user_id, None) # Add the user to a table of users pending deactivation (ie. @@ -69,6 +94,11 @@ class DeactivateAccountHandler(BaseHandler): # delete from user directory yield self.user_directory_handler.handle_user_deactivated(user_id) + # Mark the user as erased, if they asked for that + if erase_data: + logger.info("Marking %s as erased", user_id) + yield self.store.mark_user_erased(user_id) + # Now start the process that goes through that list and # parts users from rooms (if it isn't already running) self._start_user_parting() diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 11c6fb3657..2d44f15da3 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -12,21 +12,23 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging + +from six import iteritems, itervalues + +from twisted.internet import defer + from synapse.api import errors from synapse.api.constants import EventTypes from synapse.api.errors import FederationDeniedError +from synapse.types import RoomStreamToken, get_domain_from_id from synapse.util import stringutils from synapse.util.async import Linearizer from synapse.util.caches.expiringcache import ExpiringCache -from synapse.util.retryutils import NotRetryingDestination from synapse.util.metrics import measure_func -from synapse.types import get_domain_from_id, RoomStreamToken -from twisted.internet import defer -from ._base import BaseHandler - -import logging +from synapse.util.retryutils import NotRetryingDestination -from six import itervalues, iteritems +from ._base import BaseHandler logger = logging.getLogger(__name__) @@ -537,7 +539,7 @@ class DeviceListEduUpdater(object): yield self.device_handler.notify_device_update(user_id, device_ids) else: # Simply update the single device, since we know that is the only - # change (becuase of the single prev_id matching the current cache) + # change (because of the single prev_id matching the current cache) for device_id, stream_id, prev_ids, content in pending_updates: yield self.store.update_remote_device_list_cache_entry( user_id, device_id, content, stream_id, diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index f147a20b73..2e2e5261de 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -18,10 +18,9 @@ import logging from twisted.internet import defer from synapse.api.errors import SynapseError -from synapse.types import get_domain_from_id, UserID +from synapse.types import UserID, get_domain_from_id from synapse.util.stringutils import random_string - logger = logging.getLogger(__name__) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index c5b6e75e03..ef866da1b6 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -14,15 +14,16 @@ # limitations under the License. +import logging +import string + from twisted.internet import defer -from ._base import BaseHandler -from synapse.api.errors import SynapseError, Codes, CodeMessageException, AuthError from synapse.api.constants import EventTypes +from synapse.api.errors import AuthError, CodeMessageException, Codes, SynapseError from synapse.types import RoomAlias, UserID, get_domain_from_id -import logging -import string +from ._base import BaseHandler logger = logging.getLogger(__name__) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 8a2d177539..5816bf8b4f 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -14,17 +14,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -import simplejson as json import logging -from canonicaljson import encode_canonical_json -from twisted.internet import defer from six import iteritems -from synapse.api.errors import ( - SynapseError, CodeMessageException, FederationDeniedError, -) -from synapse.types import get_domain_from_id, UserID +from canonicaljson import encode_canonical_json, json + +from twisted.internet import defer + +from synapse.api.errors import CodeMessageException, FederationDeniedError, SynapseError +from synapse.types import UserID, get_domain_from_id from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.retryutils import NotRetryingDestination @@ -80,7 +79,7 @@ class E2eKeysHandler(object): else: remote_queries[user_id] = device_ids - # Firt get local devices. + # First get local devices. failures = {} results = {} if local_query: @@ -357,7 +356,7 @@ def _exception_to_failure(e): # include ConnectionRefused and other errors # # Note that some Exceptions (notably twisted's ResponseFailed etc) don't - # give a string for e.message, which simplejson then fails to serialize. + # give a string for e.message, which json then fails to serialize. return { "status": 503, "message": str(e.message), } diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 8bc642675f..c3f2d7feff 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -13,20 +13,19 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging +import random + from twisted.internet import defer -from synapse.util.logutils import log_function -from synapse.types import UserID -from synapse.events.utils import serialize_event -from synapse.api.constants import Membership, EventTypes +from synapse.api.constants import EventTypes, Membership from synapse.events import EventBase +from synapse.events.utils import serialize_event +from synapse.types import UserID +from synapse.util.logutils import log_function from ._base import BaseHandler -import logging -import random - - logger = logging.getLogger(__name__) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 495ac4c648..65f6041b10 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -20,37 +20,41 @@ import itertools import logging import sys -from signedjson.key import decode_verify_key_bytes -from signedjson.sign import verify_signed_json import six -from six.moves import http_client from six import iteritems -from twisted.internet import defer +from six.moves import http_client + +from signedjson.key import decode_verify_key_bytes +from signedjson.sign import verify_signed_json from unpaddedbase64 import decode_base64 -from ._base import BaseHandler +from twisted.internet import defer +from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.api.errors import ( - AuthError, FederationError, StoreError, CodeMessageException, SynapseError, + AuthError, + CodeMessageException, FederationDeniedError, + FederationError, + StoreError, + SynapseError, ) -from synapse.api.constants import EventTypes, Membership, RejectedReason -from synapse.events.validator import EventValidator -from synapse.util import unwrapFirstError, logcontext -from synapse.util.metrics import measure_func -from synapse.util.logutils import log_function -from synapse.util.async import run_on_reactor, Linearizer -from synapse.util.frozenutils import unfreeze from synapse.crypto.event_signing import ( - compute_event_signature, add_hashes_and_signatures, + add_hashes_and_signatures, + compute_event_signature, ) +from synapse.events.validator import EventValidator +from synapse.state import resolve_events_with_factory from synapse.types import UserID, get_domain_from_id - -from synapse.events.utils import prune_event - +from synapse.util import logcontext, unwrapFirstError +from synapse.util.async import Linearizer +from synapse.util.distributor import user_joined_room +from synapse.util.frozenutils import unfreeze +from synapse.util.logutils import log_function from synapse.util.retryutils import NotRetryingDestination +from synapse.visibility import filter_events_for_server -from synapse.util.distributor import user_joined_room +from ._base import BaseHandler logger = logging.getLogger(__name__) @@ -89,7 +93,9 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def on_receive_pdu(self, origin, pdu, get_missing=True): + def on_receive_pdu( + self, origin, pdu, get_missing=True, sent_to_us_directly=False, + ): """ Process a PDU received via a federation /send/ transaction, or via backfill of missing prev_events @@ -103,8 +109,10 @@ class FederationHandler(BaseHandler): """ # We reprocess pdus when we have seen them only as outliers - existing = yield self.get_persisted_pdu( - origin, pdu.event_id, do_auth=False + existing = yield self.store.get_event( + pdu.event_id, + allow_none=True, + allow_rejected=True, ) # FIXME: Currently we fetch an event again when we already have it @@ -161,14 +169,11 @@ class FederationHandler(BaseHandler): "Ignoring PDU %s for room %s from %s as we've left the room!", pdu.event_id, pdu.room_id, origin, ) - return + defer.returnValue(None) state = None - auth_chain = [] - fetch_state = False - # Get missing pdus if necessary. if not pdu.internal_metadata.is_outlier(): # We only backfill backwards to the min depth. @@ -223,26 +228,60 @@ class FederationHandler(BaseHandler): list(prevs - seen)[:5], ) - if prevs - seen: - logger.info( - "Still missing %d events for room %r: %r...", - len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] + if sent_to_us_directly and prevs - seen: + # If they have sent it to us directly, and the server + # isn't telling us about the auth events that it's + # made a message referencing, we explode + raise FederationError( + "ERROR", + 403, + ( + "Your server isn't divulging details about prev_events " + "referenced in this event." + ), + affected=pdu.event_id, ) - fetch_state = True + elif prevs - seen: + # Calculate the state of the previous events, and + # de-conflict them to find the current state. + state_groups = [] + auth_chains = set() + try: + # Get the state of the events we know about + ours = yield self.store.get_state_groups(pdu.room_id, list(seen)) + state_groups.append(ours) + + # Ask the remote server for the states we don't + # know about + for p in prevs - seen: + state, got_auth_chain = ( + yield self.replication_layer.get_state_for_room( + origin, pdu.room_id, p + ) + ) + auth_chains.update(got_auth_chain) + state_group = {(x.type, x.state_key): x.event_id for x in state} + state_groups.append(state_group) + + # Resolve any conflicting state + def fetch(ev_ids): + return self.store.get_events( + ev_ids, get_prev_content=False, check_redacted=False + ) - if fetch_state: - # We need to get the state at this event, since we haven't - # processed all the prev events. - logger.debug( - "_handle_new_pdu getting state for %s", - pdu.room_id - ) - try: - state, auth_chain = yield self.replication_layer.get_state_for_room( - origin, pdu.room_id, pdu.event_id, - ) - except Exception: - logger.exception("Failed to get state for event: %s", pdu.event_id) + state_map = yield resolve_events_with_factory( + state_groups, {pdu.event_id: pdu}, fetch + ) + + state = (yield self.store.get_events(state_map.values())).values() + auth_chain = list(auth_chains) + except Exception: + raise FederationError( + "ERROR", + 403, + "We can't get valid state history.", + affected=pdu.event_id, + ) yield self._process_received_pdu( origin, @@ -320,11 +359,17 @@ class FederationHandler(BaseHandler): for e in missing_events: logger.info("Handling found event %s", e.event_id) - yield self.on_receive_pdu( - origin, - e, - get_missing=False - ) + try: + yield self.on_receive_pdu( + origin, + e, + get_missing=False + ) + except FederationError as e: + if e.code == 403: + logger.warn("Event %s failed history check.") + else: + raise @log_function @defer.inlineCallbacks @@ -455,83 +500,6 @@ class FederationHandler(BaseHandler): user = UserID.from_string(event.state_key) yield user_joined_room(self.distributor, user, event.room_id) - @measure_func("_filter_events_for_server") - @defer.inlineCallbacks - def _filter_events_for_server(self, server_name, room_id, events): - event_to_state_ids = yield self.store.get_state_ids_for_events( - frozenset(e.event_id for e in events), - types=( - (EventTypes.RoomHistoryVisibility, ""), - (EventTypes.Member, None), - ) - ) - - # We only want to pull out member events that correspond to the - # server's domain. - - def check_match(id): - try: - return server_name == get_domain_from_id(id) - except Exception: - return False - - # Parses mapping `event_id -> (type, state_key) -> state event_id` - # to get all state ids that we're interested in. - event_map = yield self.store.get_events([ - e_id - for key_to_eid in list(event_to_state_ids.values()) - for key, e_id in key_to_eid.items() - if key[0] != EventTypes.Member or check_match(key[1]) - ]) - - event_to_state = { - e_id: { - key: event_map[inner_e_id] - for key, inner_e_id in key_to_eid.iteritems() - if inner_e_id in event_map - } - for e_id, key_to_eid in event_to_state_ids.iteritems() - } - - def redact_disallowed(event, state): - if not state: - return event - - history = state.get((EventTypes.RoomHistoryVisibility, ''), None) - if history: - visibility = history.content.get("history_visibility", "shared") - if visibility in ["invited", "joined"]: - # We now loop through all state events looking for - # membership states for the requesting server to determine - # if the server is either in the room or has been invited - # into the room. - for ev in state.itervalues(): - if ev.type != EventTypes.Member: - continue - try: - domain = get_domain_from_id(ev.state_key) - except Exception: - continue - - if domain != server_name: - continue - - memtype = ev.membership - if memtype == Membership.JOIN: - return event - elif memtype == Membership.INVITE: - if visibility == "invited": - return event - else: - return prune_event(event) - - return event - - defer.returnValue([ - redact_disallowed(e, event_to_state[e.event_id]) - for e in events - ]) - @log_function @defer.inlineCallbacks def backfill(self, dest, room_id, limit, extremities): @@ -938,16 +906,6 @@ class FederationHandler(BaseHandler): [auth_id for auth_id, _ in event.auth_events], include_given=True ) - - for event in auth: - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - defer.returnValue([e for e in auth]) @log_function @@ -1381,8 +1339,6 @@ class FederationHandler(BaseHandler): def get_state_for_pdu(self, room_id, event_id): """Returns the state at the event. i.e. not including said event. """ - yield run_on_reactor() - state_groups = yield self.store.get_state_groups( room_id, [event_id] ) @@ -1405,18 +1361,6 @@ class FederationHandler(BaseHandler): del results[(event.type, event.state_key)] res = list(results.values()) - for event in res: - # We sign these again because there was a bug where we - # incorrectly signed things the first time round - if self.is_mine_id(event.event_id): - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - defer.returnValue(res) else: defer.returnValue([]) @@ -1425,8 +1369,6 @@ class FederationHandler(BaseHandler): def get_state_ids_for_pdu(self, room_id, event_id): """Returns the state at the event. i.e. not including said event. """ - yield run_on_reactor() - state_groups = yield self.store.get_state_groups_ids( room_id, [event_id] ) @@ -1462,17 +1404,26 @@ class FederationHandler(BaseHandler): limit ) - events = yield self._filter_events_for_server(origin, room_id, events) + events = yield filter_events_for_server(self.store, origin, events) defer.returnValue(events) @defer.inlineCallbacks @log_function - def get_persisted_pdu(self, origin, event_id, do_auth=True): - """ Get a PDU from the database with given origin and id. + def get_persisted_pdu(self, origin, event_id): + """Get an event from the database for the given server. + + Args: + origin [str]: hostname of server which is requesting the event; we + will check that the server is allowed to see it. + event_id [str]: id of the event being requested Returns: - Deferred: Results in a `Pdu`. + Deferred[EventBase|None]: None if we know nothing about the event; + otherwise the (possibly-redacted) event. + + Raises: + AuthError if the server is not currently in the room """ event = yield self.store.get_event( event_id, @@ -1481,32 +1432,17 @@ class FederationHandler(BaseHandler): ) if event: - if self.is_mine_id(event.event_id): - # FIXME: This is a temporary work around where we occasionally - # return events slightly differently than when they were - # originally signed - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - - if do_auth: - in_room = yield self.auth.check_host_in_room( - event.room_id, - origin - ) - if not in_room: - raise AuthError(403, "Host not in room.") - - events = yield self._filter_events_for_server( - origin, event.room_id, [event] - ) - - event = events[0] + in_room = yield self.auth.check_host_in_room( + event.room_id, + origin + ) + if not in_room: + raise AuthError(403, "Host not in room.") + events = yield filter_events_for_server( + self.store, origin, [event], + ) + event = events[0] defer.returnValue(event) else: defer.returnValue(None) @@ -1760,15 +1696,6 @@ class FederationHandler(BaseHandler): local_auth_chain, remote_auth_chain ) - for event in ret["auth_chain"]: - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - logger.debug("on_query_auth returning: %s", ret) defer.returnValue(ret) @@ -1794,8 +1721,8 @@ class FederationHandler(BaseHandler): min_depth=min_depth, ) - missing_events = yield self._filter_events_for_server( - origin, room_id, missing_events, + missing_events = yield filter_events_for_server( + self.store, origin, missing_events, ) defer.returnValue(missing_events) diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index dcae083734..53e5e2648b 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -14,14 +14,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer +import logging + from six import iteritems +from twisted.internet import defer + from synapse.api.errors import SynapseError from synapse.types import get_domain_from_id -import logging - logger = logging.getLogger(__name__) diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 91a0898860..8c8aedb2b8 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # Copyright 2017 Vector Creations Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,16 +19,18 @@ import logging -import simplejson as json +from canonicaljson import json from twisted.internet import defer from synapse.api.errors import ( - MatrixCodeMessageException, CodeMessageException + CodeMessageException, + Codes, + MatrixCodeMessageException, + SynapseError, ) + from ._base import BaseHandler -from synapse.util.async import run_on_reactor -from synapse.api.errors import SynapseError, Codes logger = logging.getLogger(__name__) @@ -38,6 +41,7 @@ class IdentityHandler(BaseHandler): super(IdentityHandler, self).__init__(hs) self.http_client = hs.get_simple_http_client() + self.federation_http_client = hs.get_http_client() self.trusted_id_servers = set(hs.config.trusted_third_party_id_servers) self.trust_any_id_server_just_for_testing_do_not_use = ( @@ -60,8 +64,6 @@ class IdentityHandler(BaseHandler): @defer.inlineCallbacks def threepid_from_creds(self, creds): - yield run_on_reactor() - if 'id_server' in creds: id_server = creds['id_server'] elif 'idServer' in creds: @@ -104,7 +106,6 @@ class IdentityHandler(BaseHandler): @defer.inlineCallbacks def bind_threepid(self, creds, mxid): - yield run_on_reactor() logger.debug("binding threepid %r to %s", creds, mxid) data = None @@ -139,9 +140,53 @@ class IdentityHandler(BaseHandler): defer.returnValue(data) @defer.inlineCallbacks - def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs): - yield run_on_reactor() + def unbind_threepid(self, mxid, threepid): + """ + Removes a binding from an identity server + Args: + mxid (str): Matrix user ID of binding to be removed + threepid (dict): Dict with medium & address of binding to be removed + + Returns: + Deferred[bool]: True on success, otherwise False + """ + logger.debug("unbinding threepid %r from %s", threepid, mxid) + if not self.trusted_id_servers: + logger.warn("Can't unbind threepid: no trusted ID servers set in config") + defer.returnValue(False) + + # We don't track what ID server we added 3pids on (perhaps we ought to) + # but we assume that any of the servers in the trusted list are in the + # same ID server federation, so we can pick any one of them to send the + # deletion request to. + id_server = next(iter(self.trusted_id_servers)) + + url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,) + content = { + "mxid": mxid, + "threepid": threepid, + } + headers = {} + # we abuse the federation http client to sign the request, but we have to send it + # using the normal http client since we don't want the SRV lookup and want normal + # 'browser-like' HTTPS. + self.federation_http_client.sign_request( + destination=None, + method='POST', + url_bytes='/_matrix/identity/api/v1/3pid/unbind'.encode('ascii'), + headers_dict=headers, + content=content, + destination_is=id_server, + ) + yield self.http_client.post_json_get_json( + url, + content, + headers, + ) + defer.returnValue(True) + @defer.inlineCallbacks + def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs): if not self._should_trust_id_server(id_server): raise SynapseError( 400, "Untrusted ID server '%s'" % id_server, @@ -176,8 +221,6 @@ class IdentityHandler(BaseHandler): self, id_server, country, phone_number, client_secret, send_attempt, **kwargs ): - yield run_on_reactor() - if not self._should_trust_id_server(id_server): raise SynapseError( 400, "Untrusted ID server '%s'" % id_server, diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 71af86fe21..fb11716eb8 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from twisted.internet import defer from synapse.api.constants import EventTypes, Membership @@ -21,9 +23,7 @@ from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.handlers.presence import format_user_presence_state from synapse.streams.config import PaginationConfig -from synapse.types import ( - UserID, StreamToken, -) +from synapse.types import StreamToken, UserID from synapse.util import unwrapFirstError from synapse.util.async import concurrently_execute from synapse.util.caches.snapshot_cache import SnapshotCache @@ -32,9 +32,6 @@ from synapse.visibility import filter_events_for_client from ._base import BaseHandler -import logging - - logger = logging.getLogger(__name__) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 1cb81b6cf8..a39b852ceb 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -14,35 +14,31 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import simplejson import sys -from canonicaljson import encode_canonical_json import six -from six import string_types, itervalues, iteritems -from twisted.internet import defer, reactor +from six import iteritems, itervalues, string_types + +from canonicaljson import encode_canonical_json, json + +from twisted.internet import defer from twisted.internet.defer import succeed from twisted.python.failure import Failure -from synapse.api.constants import EventTypes, Membership, MAX_DEPTH -from synapse.api.errors import ( - AuthError, Codes, SynapseError, - ConsentNotGivenError, -) +from synapse.api.constants import MAX_DEPTH, EventTypes, Membership +from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError from synapse.api.urls import ConsentURIBuilder from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator -from synapse.types import ( - UserID, RoomAlias, RoomStreamToken, -) -from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter +from synapse.replication.http.send_event import send_event_to_master +from synapse.types import RoomAlias, RoomStreamToken, UserID +from synapse.util.async import Limiter, ReadWriteLock +from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.logcontext import run_in_background from synapse.util.metrics import measure_func -from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.stringutils import random_string from synapse.visibility import filter_events_for_client -from synapse.replication.http.send_event import send_event_to_master from ._base import BaseHandler @@ -157,7 +153,7 @@ class MessageHandler(BaseHandler): # remove the purge from the list 24 hours after it completes def clear_purge(): del self._purges_by_id[purge_id] - reactor.callLater(24 * 3600, clear_purge) + self.hs.get_reactor().callLater(24 * 3600, clear_purge) def get_purge_status(self, purge_id): """Get the current status of an active purge @@ -388,7 +384,7 @@ class MessageHandler(BaseHandler): users_with_profile = yield self.state.get_current_user_in_room(room_id) # If this is an AS, double check that they are allowed to see the members. - # This can either be because the AS user is in the room or becuase there + # This can either be because the AS user is in the room or because there # is a user in the room that the AS is "interested in" if requester.app_service and user_id not in users_with_profile: for uid in users_with_profile: @@ -491,7 +487,7 @@ class EventCreationHandler(object): target, e ) - is_exempt = yield self._is_exempt_from_privacy_policy(builder) + is_exempt = yield self._is_exempt_from_privacy_policy(builder, requester) if not is_exempt: yield self.assert_accepted_privacy_policy(requester) @@ -509,12 +505,13 @@ class EventCreationHandler(object): defer.returnValue((event, context)) - def _is_exempt_from_privacy_policy(self, builder): + def _is_exempt_from_privacy_policy(self, builder, requester): """"Determine if an event to be sent is exempt from having to consent to the privacy policy Args: builder (synapse.events.builder.EventBuilder): event being created + requester (Requster): user requesting this event Returns: Deferred[bool]: true if the event can be sent without the user @@ -525,6 +522,9 @@ class EventCreationHandler(object): membership = builder.content.get("membership", None) if membership == Membership.JOIN: return self._is_server_notices_room(builder.room_id) + elif membership == Membership.LEAVE: + # the user is always allowed to leave (but not kick people) + return builder.state_key == requester.user.to_string() return succeed(False) @defer.inlineCallbacks @@ -793,7 +793,7 @@ class EventCreationHandler(object): # Ensure that we can round trip before trying to persist in db try: dump = frozendict_json_encoder.encode(event.content) - simplejson.loads(dump) + json.loads(dump) except Exception: logger.exception("Failed to encode content: %r", event.content) raise @@ -806,6 +806,7 @@ class EventCreationHandler(object): # If we're a worker we need to hit out to the master. if self.config.worker_app: yield send_event_to_master( + self.hs.get_clock(), self.http_client, host=self.config.worker_replication_host, port=self.config.worker_replication_http_port, @@ -959,9 +960,7 @@ class EventCreationHandler(object): event_stream_id, max_stream_id ) - @defer.inlineCallbacks def _notify(): - yield run_on_reactor() try: self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 7fe568132f..3732830194 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -22,27 +22,26 @@ The methods that define policy are: - should_notify """ -from twisted.internet import defer, reactor +import logging from contextlib import contextmanager -from six import itervalues, iteritems +from six import iteritems, itervalues + +from prometheus_client import Counter + +from twisted.internet import defer -from synapse.api.errors import SynapseError from synapse.api.constants import PresenceState +from synapse.api.errors import SynapseError +from synapse.metrics import LaterGauge from synapse.storage.presence import UserPresenceState - -from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.types import UserID, get_domain_from_id from synapse.util.async import Linearizer +from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.logcontext import run_in_background from synapse.util.logutils import log_function from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer -from synapse.types import UserID, get_domain_from_id -from synapse.metrics import LaterGauge - -import logging - -from prometheus_client import Counter logger = logging.getLogger(__name__) @@ -179,7 +178,7 @@ class PresenceHandler(object): # have not yet been persisted self.unpersisted_users_changes = set() - reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown) + hs.get_reactor().addSystemEventTrigger("before", "shutdown", self._on_shutdown) self.serial_to_user = {} self._next_serial = 1 diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 3465a787ab..859f6d2b2e 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -17,8 +17,9 @@ import logging from twisted.internet import defer -from synapse.api.errors import SynapseError, AuthError, CodeMessageException +from synapse.api.errors import AuthError, CodeMessageException, SynapseError from synapse.types import UserID, get_domain_from_id + from ._base import BaseHandler logger = logging.getLogger(__name__) diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index 5142ae153d..995460f82a 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -13,13 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import BaseHandler +import logging from twisted.internet import defer from synapse.util.async import Linearizer -import logging +from ._base import BaseHandler + logger = logging.getLogger(__name__) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 2e0672161c..cb905a3903 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -12,17 +12,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util import logcontext - -from ._base import BaseHandler +import logging from twisted.internet import defer -from synapse.util.logcontext import PreserveLoggingContext from synapse.types import get_domain_from_id +from synapse.util import logcontext +from synapse.util.logcontext import PreserveLoggingContext -import logging - +from ._base import BaseHandler logger = logging.getLogger(__name__) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 7e52adda3c..7caff0cbc8 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -18,14 +18,19 @@ import logging from twisted.internet import defer +from synapse import types from synapse.api.errors import ( - AuthError, Codes, SynapseError, RegistrationError, InvalidCaptchaError + AuthError, + Codes, + InvalidCaptchaError, + RegistrationError, + SynapseError, ) from synapse.http.client import CaptchaServerHttpClient -from synapse import types -from synapse.types import UserID, create_requester, RoomID, RoomAlias -from synapse.util.async import run_on_reactor, Linearizer +from synapse.types import RoomAlias, RoomID, UserID, create_requester +from synapse.util.async import Linearizer from synapse.util.threepids import check_3pid_allowed + from ._base import BaseHandler logger = logging.getLogger(__name__) @@ -139,7 +144,6 @@ class RegistrationHandler(BaseHandler): Raises: RegistrationError if there was a problem registering. """ - yield run_on_reactor() password_hash = None if password: password_hash = yield self.auth_handler().hash(password) @@ -431,8 +435,6 @@ class RegistrationHandler(BaseHandler): Raises: RegistrationError if there was a problem registering. """ - yield run_on_reactor() - if localpart is None: raise SynapseError(400, "Request must include user id") diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 2abd63ad05..f67512078b 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -15,23 +15,20 @@ # limitations under the License. """Contains functions for performing events on rooms.""" -from twisted.internet import defer +import logging +import math +import string +from collections import OrderedDict -from ._base import BaseHandler +from twisted.internet import defer -from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken -from synapse.api.constants import ( - EventTypes, JoinRules, RoomCreationPreset -) -from synapse.api.errors import AuthError, StoreError, SynapseError +from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset +from synapse.api.errors import AuthError, Codes, StoreError, SynapseError +from synapse.types import RoomAlias, RoomID, RoomStreamToken, UserID from synapse.util import stringutils from synapse.visibility import filter_events_for_client -from collections import OrderedDict - -import logging -import math -import string +from ._base import BaseHandler logger = logging.getLogger(__name__) @@ -115,7 +112,11 @@ class RoomCreationHandler(BaseHandler): ) if mapping: - raise SynapseError(400, "Room alias already taken") + raise SynapseError( + 400, + "Room alias already taken", + Codes.ROOM_IN_USE + ) else: room_alias = None diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index fc507cef36..828229f5c3 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -13,26 +13,24 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer +import logging +from collections import namedtuple from six import iteritems from six.moves import range -from ._base import BaseHandler +import msgpack +from unpaddedbase64 import decode_base64, encode_base64 + +from twisted.internet import defer -from synapse.api.constants import ( - EventTypes, JoinRules, -) +from synapse.api.constants import EventTypes, JoinRules +from synapse.types import ThirdPartyInstanceID from synapse.util.async import concurrently_execute from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.caches.response_cache import ResponseCache -from synapse.types import ThirdPartyInstanceID - -from collections import namedtuple -from unpaddedbase64 import encode_base64, decode_base64 -import logging -import msgpack +from ._base import BaseHandler logger = logging.getLogger(__name__) @@ -40,7 +38,7 @@ REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000 # This is used to indicate we should only return rooms published to the main list. -EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None) +EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None) class RoomListHandler(BaseHandler): @@ -52,7 +50,7 @@ class RoomListHandler(BaseHandler): def get_local_public_room_list(self, limit=None, since_token=None, search_filter=None, - network_tuple=EMTPY_THIRD_PARTY_ID,): + network_tuple=EMPTY_THIRD_PARTY_ID,): """Generate a local public room list. There are multiple different lists: the main one plus one per third @@ -89,7 +87,7 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def _get_public_room_list(self, limit=None, since_token=None, search_filter=None, - network_tuple=EMTPY_THIRD_PARTY_ID,): + network_tuple=EMPTY_THIRD_PARTY_ID,): if since_token and since_token != "END": since_token = RoomListNextBatch.from_token(since_token) else: diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index f930e939e8..00f2e279bc 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -21,19 +21,17 @@ from six.moves import http_client from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json -from twisted.internet import defer from unpaddedbase64 import decode_base64 +from twisted.internet import defer + import synapse.server import synapse.types -from synapse.api.constants import ( - EventTypes, Membership, -) -from synapse.api.errors import AuthError, SynapseError, Codes -from synapse.types import UserID, RoomID +from synapse.api.constants import EventTypes, Membership +from synapse.api.errors import AuthError, Codes, SynapseError +from synapse.types import RoomID, UserID from synapse.util.async import Linearizer -from synapse.util.distributor import user_left_room, user_joined_room - +from synapse.util.distributor import user_joined_room, user_left_room logger = logging.getLogger(__name__) diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py index 493aec1e48..22d8b4b0d3 100644 --- a/synapse/handlers/room_member_worker.py +++ b/synapse/handlers/room_member_worker.py @@ -20,11 +20,12 @@ from twisted.internet import defer from synapse.api.errors import SynapseError from synapse.handlers.room_member import RoomMemberHandler from synapse.replication.http.membership import ( - remote_join, remote_reject_invite, get_or_register_3pid_guest, + get_or_register_3pid_guest, notify_user_membership_change, + remote_join, + remote_reject_invite, ) - logger = logging.getLogger(__name__) diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 1eca26aa1e..69ae9731d5 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -13,21 +13,20 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer +import itertools +import logging -from ._base import BaseHandler +from unpaddedbase64 import decode_base64, encode_base64 -from synapse.api.constants import Membership, EventTypes -from synapse.api.filtering import Filter +from twisted.internet import defer + +from synapse.api.constants import EventTypes, Membership from synapse.api.errors import SynapseError +from synapse.api.filtering import Filter from synapse.events.utils import serialize_event from synapse.visibility import filter_events_for_client -from unpaddedbase64 import decode_base64, encode_base64 - -import itertools -import logging - +from ._base import BaseHandler logger = logging.getLogger(__name__) @@ -64,6 +63,13 @@ class SearchHandler(BaseHandler): except Exception: raise SynapseError(400, "Invalid batch") + logger.info( + "Search batch properties: %r, %r, %r", + batch_group, batch_group_key, batch_token, + ) + + logger.info("Search content: %s", content) + try: room_cat = content["search_categories"]["room_events"] @@ -271,6 +277,8 @@ class SearchHandler(BaseHandler): # We should never get here due to the guard earlier. raise NotImplementedError() + logger.info("Found %d events to return", len(allowed_events)) + # If client has asked for "context" for each event (i.e. some surrounding # events and state), fetch that if event_context is not None: @@ -282,6 +290,11 @@ class SearchHandler(BaseHandler): event.room_id, event.event_id, before_limit, after_limit ) + logger.info( + "Context for search returned %d and %d events", + len(res["events_before"]), len(res["events_after"]), + ) + res["events_before"] = yield filter_events_for_client( self.store, user.to_string(), res["events_before"] ) diff --git a/synapse/handlers/set_password.py b/synapse/handlers/set_password.py index e057ae54c9..7ecdede4dc 100644 --- a/synapse/handlers/set_password.py +++ b/synapse/handlers/set_password.py @@ -17,6 +17,7 @@ import logging from twisted.internet import defer from synapse.api.errors import Codes, StoreError, SynapseError + from ._base import BaseHandler logger = logging.getLogger(__name__) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index aaf2a406df..0c21ac2c77 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -14,22 +14,22 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.api.constants import Membership, EventTypes +import collections +import itertools +import logging + +from six import iteritems, itervalues + +from twisted.internet import defer + +from synapse.api.constants import EventTypes, Membership +from synapse.push.clientformat import format_push_rules_for_user +from synapse.types import RoomStreamToken from synapse.util.async import concurrently_execute +from synapse.util.caches.response_cache import ResponseCache from synapse.util.logcontext import LoggingContext from synapse.util.metrics import Measure, measure_func -from synapse.util.caches.response_cache import ResponseCache -from synapse.push.clientformat import format_push_rules_for_user from synapse.visibility import filter_events_for_client -from synapse.types import RoomStreamToken - -from twisted.internet import defer - -import collections -import logging -import itertools - -from six import itervalues, iteritems logger = logging.getLogger(__name__) @@ -146,7 +146,7 @@ class SyncResult(collections.namedtuple("SyncResult", [ "invited", # InvitedSyncResult for each invited room. "archived", # ArchivedSyncResult for each archived room. "to_device", # List of direct messages for the device. - "device_lists", # List of user_ids whose devices have chanegd + "device_lists", # List of user_ids whose devices have changed "device_one_time_keys_count", # Dict of algorithm to count for one time keys # for this device "groups", diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 5d9736e88f..2d2d3d5a0d 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -13,17 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging +from collections import namedtuple + from twisted.internet import defer -from synapse.api.errors import SynapseError, AuthError +from synapse.api.errors import AuthError, SynapseError +from synapse.types import UserID, get_domain_from_id from synapse.util.logcontext import run_in_background from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer -from synapse.types import UserID, get_domain_from_id - -import logging - -from collections import namedtuple logger = logging.getLogger(__name__) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index a39f0f7343..37dda64587 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -14,15 +14,15 @@ # limitations under the License. import logging + +from six import iteritems + from twisted.internet import defer from synapse.api.constants import EventTypes, JoinRules, Membership from synapse.storage.roommember import ProfileInfo -from synapse.util.metrics import Measure -from synapse.util.async import sleep from synapse.types import get_localpart_from_id - -from six import iteritems +from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -174,7 +174,7 @@ class UserDirectoryHandler(object): logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids)) yield self._handle_initial_room(room_id) num_processed_rooms += 1 - yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.) + yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.) logger.info("Processed all rooms.") @@ -188,7 +188,7 @@ class UserDirectoryHandler(object): logger.info("Handling user %d/%d", num_processed_users + 1, len(user_ids)) yield self._handle_local_user(user_id) num_processed_users += 1 - yield sleep(self.INITIAL_USER_SLEEP_MS / 1000.) + yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.) logger.info("Processed all users") @@ -236,7 +236,7 @@ class UserDirectoryHandler(object): count = 0 for user_id in user_ids: if count % self.INITIAL_ROOM_SLEEP_COUNT == 0: - yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.) + yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.) if not self.is_mine_id(user_id): count += 1 @@ -251,7 +251,7 @@ class UserDirectoryHandler(object): continue if count % self.INITIAL_ROOM_SLEEP_COUNT == 0: - yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.) + yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.) count += 1 user_set = (user_id, other_user_id) |