diff options
author | Matthew Hodgson <matthew@matrix.org> | 2018-05-29 00:25:22 +0100 |
---|---|---|
committer | Matthew Hodgson <matthew@matrix.org> | 2018-05-29 00:25:22 +0100 |
commit | 7a6df013cc8a128278d2ce7e5eb569e0b424f9b0 (patch) | |
tree | 5de624a65953eb96ab67274462d850a88c0cce3c /synapse/handlers | |
parent | make lazy_load_members configurable in filters (diff) | |
parent | Merge pull request #3256 from matrix-org/3218-official-prom (diff) | |
download | synapse-7a6df013cc8a128278d2ce7e5eb569e0b424f9b0.tar.xz |
merge develop
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/__init__.py | 5 | ||||
-rw-r--r-- | synapse/handlers/appservice.py | 62 | ||||
-rw-r--r-- | synapse/handlers/deactivate_account.py | 85 | ||||
-rw-r--r-- | synapse/handlers/device.py | 20 | ||||
-rw-r--r-- | synapse/handlers/e2e_keys.py | 81 | ||||
-rw-r--r-- | synapse/handlers/events.py | 5 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 199 | ||||
-rw-r--r-- | synapse/handlers/groups_local.py | 46 | ||||
-rw-r--r-- | synapse/handlers/identity.py | 8 | ||||
-rw-r--r-- | synapse/handlers/initial_sync.py | 24 | ||||
-rw-r--r-- | synapse/handlers/message.py | 354 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 154 | ||||
-rw-r--r-- | synapse/handlers/receipts.py | 61 | ||||
-rw-r--r-- | synapse/handlers/register.py | 64 | ||||
-rw-r--r-- | synapse/handlers/room.py | 28 | ||||
-rw-r--r-- | synapse/handlers/room_list.py | 47 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 74 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 65 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 50 | ||||
-rw-r--r-- | synapse/handlers/user_directory.py | 10 |
20 files changed, 970 insertions, 472 deletions
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 8f8fd82eb0..d358842b3e 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -14,9 +14,7 @@ # limitations under the License. from .register import RegistrationHandler -from .room import ( - RoomCreationHandler, RoomContextHandler, -) +from .room import RoomContextHandler from .message import MessageHandler from .federation import FederationHandler from .directory import DirectoryHandler @@ -47,7 +45,6 @@ class Handlers(object): def __init__(self, hs): self.registration_handler = RegistrationHandler(hs) self.message_handler = MessageHandler(hs) - self.room_creation_handler = RoomCreationHandler(hs) self.federation_handler = FederationHandler(hs) self.directory_handler = DirectoryHandler(hs) self.admin_handler = AdminHandler(hs) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 3dd3fa2a27..d9f35a5dba 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -18,15 +18,16 @@ from twisted.internet import defer import synapse from synapse.api.constants import EventTypes from synapse.util.metrics import Measure -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import ( + make_deferred_yieldable, run_in_background, +) +from prometheus_client import Counter import logging logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for(__name__) - -events_processed_counter = metrics.register_counter("events_processed") +events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "") def log_failure(failure): @@ -84,11 +85,16 @@ class ApplicationServicesHandler(object): if not events: break + events_by_room = {} for event in events: + events_by_room.setdefault(event.room_id, []).append(event) + + @defer.inlineCallbacks + def handle_event(event): # Gather interested services services = yield self._get_services_for_event(event) if len(services) == 0: - continue # no services need notifying + return # no services need notifying # Do we know this user exists? If not, poke the user # query API for all services which match that user regex. @@ -104,13 +110,32 @@ class ApplicationServicesHandler(object): # Fork off pushes to these services for service in services: - preserve_fn(self.scheduler.submit_event_for_as)( - service, event - ) + self.scheduler.submit_event_for_as(service, event) - events_processed_counter.inc_by(len(events)) + @defer.inlineCallbacks + def handle_room_events(events): + for event in events: + yield handle_event(event) + + yield make_deferred_yieldable(defer.gatherResults([ + run_in_background(handle_room_events, evs) + for evs in events_by_room.itervalues() + ], consumeErrors=True)) yield self.store.set_appservice_last_pos(upper_bound) + + now = self.clock.time_msec() + ts = yield self.store.get_received_ts(events[-1].event_id) + + synapse.metrics.event_processing_positions.labels( + "appservice_sender").set(upper_bound) + + events_processed_counter.inc(len(events)) + + synapse.metrics.event_processing_lag.labels( + "appservice_sender").set(now - ts) + synapse.metrics.event_processing_last_ts.labels( + "appservice_sender").set(ts) finally: self.is_processing = False @@ -167,7 +192,10 @@ class ApplicationServicesHandler(object): services = yield self._get_services_for_3pn(protocol) results = yield make_deferred_yieldable(defer.DeferredList([ - preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields) + run_in_background( + self.appservice_api.query_3pe, + service, kind, protocol, fields, + ) for service in services ], consumeErrors=True)) @@ -228,11 +256,15 @@ class ApplicationServicesHandler(object): event based on the service regex. """ services = self.store.get_app_services() - interested_list = [ - s for s in services if ( - yield s.is_interested(event, self.store) - ) - ] + + # we can't use a list comprehension here. Since python 3, list + # comprehensions use a generator internally. This means you can't yield + # inside of a list comprehension anymore. + interested_list = [] + for s in services: + if (yield s.is_interested(event, self.store)): + interested_list.append(s) + defer.returnValue(interested_list) def _get_services_for_user(self, user_id): diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index b1d3814909..c5e92f6214 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2017 New Vector Ltd +# Copyright 2017, 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,9 +12,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer +from twisted.internet import defer, reactor from ._base import BaseHandler +from synapse.types import UserID, create_requester +from synapse.util.logcontext import run_in_background import logging @@ -27,6 +29,15 @@ class DeactivateAccountHandler(BaseHandler): super(DeactivateAccountHandler, self).__init__(hs) self._auth_handler = hs.get_auth_handler() self._device_handler = hs.get_device_handler() + self._room_member_handler = hs.get_room_member_handler() + self.user_directory_handler = hs.get_user_directory_handler() + + # Flag that indicates whether the process to part users from rooms is running + self._user_parter_running = False + + # Start the user parter loop so it can resume parting users from rooms where + # it left off (if it has work left to do). + reactor.callWhenRunning(self._start_user_parting) @defer.inlineCallbacks def deactivate_account(self, user_id): @@ -50,3 +61,73 @@ class DeactivateAccountHandler(BaseHandler): yield self.store.user_delete_threepids(user_id) yield self.store.user_set_password_hash(user_id, None) + + # Add the user to a table of users pending deactivation (ie. + # removal from all the rooms they're a member of) + yield self.store.add_user_pending_deactivation(user_id) + + # delete from user directory + yield self.user_directory_handler.handle_user_deactivated(user_id) + + # Now start the process that goes through that list and + # parts users from rooms (if it isn't already running) + self._start_user_parting() + + def _start_user_parting(self): + """ + Start the process that goes through the table of users + pending deactivation, if it isn't already running. + + Returns: + None + """ + if not self._user_parter_running: + run_in_background(self._user_parter_loop) + + @defer.inlineCallbacks + def _user_parter_loop(self): + """Loop that parts deactivated users from rooms + + Returns: + None + """ + self._user_parter_running = True + logger.info("Starting user parter") + try: + while True: + user_id = yield self.store.get_user_pending_deactivation() + if user_id is None: + break + logger.info("User parter parting %r", user_id) + yield self._part_user(user_id) + yield self.store.del_user_pending_deactivation(user_id) + logger.info("User parter finished parting %r", user_id) + logger.info("User parter finished: stopping") + finally: + self._user_parter_running = False + + @defer.inlineCallbacks + def _part_user(self, user_id): + """Causes the given user_id to leave all the rooms they're joined to + + Returns: + None + """ + user = UserID.from_string(user_id) + + rooms_for_user = yield self.store.get_rooms_for_user(user_id) + for room_id in rooms_for_user: + logger.info("User parter parting %r from %r", user_id, room_id) + try: + yield self._room_member_handler.update_membership( + create_requester(user), + user, + room_id, + "leave", + ratelimit=False, + ) + except Exception: + logger.exception( + "Failed to part user %r from room %r: ignoring and continuing", + user_id, room_id, + ) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 40f3d24678..31bd0e60c6 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -26,6 +26,8 @@ from ._base import BaseHandler import logging +from six import itervalues, iteritems + logger = logging.getLogger(__name__) @@ -155,7 +157,7 @@ class DeviceHandler(BaseHandler): try: yield self.store.delete_device(user_id, device_id) - except errors.StoreError, e: + except errors.StoreError as e: if e.code == 404: # no match pass @@ -204,7 +206,7 @@ class DeviceHandler(BaseHandler): try: yield self.store.delete_devices(user_id, device_ids) - except errors.StoreError, e: + except errors.StoreError as e: if e.code == 404: # no match pass @@ -243,7 +245,7 @@ class DeviceHandler(BaseHandler): new_display_name=content.get("display_name") ) yield self.notify_device_update(user_id, [device_id]) - except errors.StoreError, e: + except errors.StoreError as e: if e.code == 404: raise errors.NotFoundError() else: @@ -318,7 +320,7 @@ class DeviceHandler(BaseHandler): # The user may have left the room # TODO: Check if they actually did or if we were just invited. if room_id not in room_ids: - for key, event_id in current_state_ids.iteritems(): + for key, event_id in iteritems(current_state_ids): etype, state_key = key if etype != EventTypes.Member: continue @@ -338,7 +340,7 @@ class DeviceHandler(BaseHandler): # special-case for an empty prev state: include all members # in the changed list if not event_ids: - for key, event_id in current_state_ids.iteritems(): + for key, event_id in iteritems(current_state_ids): etype, state_key = key if etype != EventTypes.Member: continue @@ -354,10 +356,10 @@ class DeviceHandler(BaseHandler): # Check if we've joined the room? If so we just blindly add all the users to # the "possibly changed" users. - for state_dict in prev_state_ids.itervalues(): + for state_dict in itervalues(prev_state_ids): member_event = state_dict.get((EventTypes.Member, user_id), None) if not member_event or member_event != current_member_id: - for key, event_id in current_state_ids.iteritems(): + for key, event_id in iteritems(current_state_ids): etype, state_key = key if etype != EventTypes.Member: continue @@ -367,14 +369,14 @@ class DeviceHandler(BaseHandler): # If there has been any change in membership, include them in the # possibly changed list. We'll check if they are joined below, # and we're not toooo worried about spuriously adding users. - for key, event_id in current_state_ids.iteritems(): + for key, event_id in iteritems(current_state_ids): etype, state_key = key if etype != EventTypes.Member: continue # check if this member has changed since any of the extremities # at the stream_ordering, and add them to the list if so. - for state_dict in prev_state_ids.itervalues(): + for state_dict in itervalues(prev_state_ids): prev_event_id = state_dict.get(key, None) if not prev_event_id or prev_event_id != event_id: if state_key != user_id: diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 31b1ece13e..8a2d177539 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,17 +14,18 @@ # See the License for the specific language governing permissions and # limitations under the License. -import ujson as json +import simplejson as json import logging from canonicaljson import encode_canonical_json from twisted.internet import defer +from six import iteritems from synapse.api.errors import ( SynapseError, CodeMessageException, FederationDeniedError, ) from synapse.types import get_domain_from_id, UserID -from synapse.util.logcontext import preserve_fn, make_deferred_yieldable +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.retryutils import NotRetryingDestination logger = logging.getLogger(__name__) @@ -91,7 +93,7 @@ class E2eKeysHandler(object): remote_queries_not_in_cache = {} if remote_queries: query_list = [] - for user_id, device_ids in remote_queries.iteritems(): + for user_id, device_ids in iteritems(remote_queries): if device_ids: query_list.extend((user_id, device_id) for device_id in device_ids) else: @@ -102,9 +104,9 @@ class E2eKeysHandler(object): query_list ) ) - for user_id, devices in remote_results.iteritems(): + for user_id, devices in iteritems(remote_results): user_devices = results.setdefault(user_id, {}) - for device_id, device in devices.iteritems(): + for device_id, device in iteritems(devices): keys = device.get("keys", None) device_display_name = device.get("device_display_name", None) if keys: @@ -134,28 +136,13 @@ class E2eKeysHandler(object): if user_id in destination_query: results[user_id] = keys - except CodeMessageException as e: - failures[destination] = { - "status": e.code, "message": e.message - } - except NotRetryingDestination as e: - failures[destination] = { - "status": 503, "message": "Not ready for retry", - } - except FederationDeniedError as e: - failures[destination] = { - "status": 403, "message": "Federation Denied", - } except Exception as e: - # include ConnectionRefused and other errors - failures[destination] = { - "status": 503, "message": e.message - } + failures[destination] = _exception_to_failure(e) yield make_deferred_yieldable(defer.gatherResults([ - preserve_fn(do_remote_query)(destination) + run_in_background(do_remote_query, destination) for destination in remote_queries_not_in_cache - ])) + ], consumeErrors=True)) defer.returnValue({ "device_keys": results, "failures": failures, @@ -252,32 +239,21 @@ class E2eKeysHandler(object): for user_id, keys in remote_result["one_time_keys"].items(): if user_id in device_keys: json_result[user_id] = keys - except CodeMessageException as e: - failures[destination] = { - "status": e.code, "message": e.message - } - except NotRetryingDestination as e: - failures[destination] = { - "status": 503, "message": "Not ready for retry", - } except Exception as e: - # include ConnectionRefused and other errors - failures[destination] = { - "status": 503, "message": e.message - } + failures[destination] = _exception_to_failure(e) yield make_deferred_yieldable(defer.gatherResults([ - preserve_fn(claim_client_keys)(destination) + run_in_background(claim_client_keys, destination) for destination in remote_queries - ])) + ], consumeErrors=True)) logger.info( "Claimed one-time-keys: %s", ",".join(( "%s for %s:%s" % (key_id, user_id, device_id) - for user_id, user_keys in json_result.iteritems() - for device_id, device_keys in user_keys.iteritems() - for key_id, _ in device_keys.iteritems() + for user_id, user_keys in iteritems(json_result) + for device_id, device_keys in iteritems(user_keys) + for key_id, _ in iteritems(device_keys) )), ) @@ -362,6 +338,31 @@ class E2eKeysHandler(object): ) +def _exception_to_failure(e): + if isinstance(e, CodeMessageException): + return { + "status": e.code, "message": e.message, + } + + if isinstance(e, NotRetryingDestination): + return { + "status": 503, "message": "Not ready for retry", + } + + if isinstance(e, FederationDeniedError): + return { + "status": 403, "message": "Federation Denied", + } + + # include ConnectionRefused and other errors + # + # Note that some Exceptions (notably twisted's ResponseFailed etc) don't + # give a string for e.message, which simplejson then fails to serialize. + return { + "status": 503, "message": str(e.message), + } + + def _one_time_keys_match(old_key_json, new_key): old_key = json.loads(old_key_json) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index d3685fb12a..8bc642675f 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -48,6 +48,7 @@ class EventStreamHandler(BaseHandler): self.notifier = hs.get_notifier() self.state = hs.get_state_handler() + self._server_notices_sender = hs.get_server_notices_sender() @defer.inlineCallbacks @log_function @@ -58,6 +59,10 @@ class EventStreamHandler(BaseHandler): If `only_keys` is not None, events from keys will be sent down. """ + + # send any outstanding server notices to the user. + yield self._server_notices_sender.on_user_syncing(auth_user_id) + auth_user = UserID.from_string(auth_user_id) presence_handler = self.hs.get_presence_handler() diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 080aca3d71..87c0615820 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -15,8 +15,17 @@ # limitations under the License. """Contains handlers for federation events.""" + +import itertools +import logging +import sys + from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json +import six +from six.moves import http_client +from six import iteritems +from twisted.internet import defer from unpaddedbase64 import decode_base64 from ._base import BaseHandler @@ -43,10 +52,6 @@ from synapse.util.retryutils import NotRetryingDestination from synapse.util.distributor import user_joined_room -from twisted.internet import defer - -import itertools -import logging logger = logging.getLogger(__name__) @@ -77,6 +82,7 @@ class FederationHandler(BaseHandler): self.pusher_pool = hs.get_pusherpool() self.spam_checker = hs.get_spam_checker() self.event_creation_handler = hs.get_event_creation_handler() + self._server_notices_mxid = hs.config.server_notices_mxid # When joining a room we need to queue any events for that room up self.room_queues = {} @@ -115,6 +121,19 @@ class FederationHandler(BaseHandler): logger.debug("Already seen pdu %s", pdu.event_id) return + # do some initial sanity-checking of the event. In particular, make + # sure it doesn't have hundreds of prev_events or auth_events, which + # could cause a huge state resolution or cascade of event fetches. + try: + self._sanity_check_event(pdu) + except SynapseError as err: + raise FederationError( + "ERROR", + err.code, + err.msg, + affected=pdu.event_id, + ) + # If we are currently in the process of joining this room, then we # queue up events for later processing. if pdu.room_id in self.room_queues: @@ -149,10 +168,6 @@ class FederationHandler(BaseHandler): auth_chain = [] - have_seen = yield self.store.have_events( - [ev for ev, _ in pdu.prev_events] - ) - fetch_state = False # Get missing pdus if necessary. @@ -168,7 +183,7 @@ class FederationHandler(BaseHandler): ) prevs = {e_id for e_id, _ in pdu.prev_events} - seen = set(have_seen.keys()) + seen = yield self.store.have_seen_events(prevs) if min_depth and pdu.depth < min_depth: # This is so that we don't notify the user about this @@ -196,8 +211,7 @@ class FederationHandler(BaseHandler): # Update the set of things we've seen after trying to # fetch the missing stuff - have_seen = yield self.store.have_events(prevs) - seen = set(have_seen.iterkeys()) + seen = yield self.store.have_seen_events(prevs) if not prevs - seen: logger.info( @@ -248,8 +262,7 @@ class FederationHandler(BaseHandler): min_depth (int): Minimum depth of events to return. """ # We recalculate seen, since it may have changed. - have_seen = yield self.store.have_events(prevs) - seen = set(have_seen.keys()) + seen = yield self.store.have_seen_events(prevs) if not prevs - seen: return @@ -361,9 +374,7 @@ class FederationHandler(BaseHandler): if auth_chain: event_ids |= {e.event_id for e in auth_chain} - seen_ids = set( - (yield self.store.have_events(event_ids)).keys() - ) + seen_ids = yield self.store.have_seen_events(event_ids) if state and auth_chain is not None: # If we have any state or auth_chain given to us by the replication @@ -469,18 +480,18 @@ class FederationHandler(BaseHandler): # to get all state ids that we're interested in. event_map = yield self.store.get_events([ e_id - for key_to_eid in event_to_state_ids.values() - for key, e_id in key_to_eid.items() + for key_to_eid in event_to_state_ids.itervalues() + for key, e_id in key_to_eid.iteritems() if key[0] != EventTypes.Member or check_match(key[1]) ]) event_to_state = { e_id: { key: event_map[inner_e_id] - for key, inner_e_id in key_to_eid.items() + for key, inner_e_id in key_to_eid.iteritems() if inner_e_id in event_map } - for e_id, key_to_eid in event_to_state_ids.items() + for e_id, key_to_eid in event_to_state_ids.iteritems() } def redact_disallowed(event, state): @@ -495,7 +506,7 @@ class FederationHandler(BaseHandler): # membership states for the requesting server to determine # if the server is either in the room or has been invited # into the room. - for ev in state.values(): + for ev in state.itervalues(): if ev.type != EventTypes.Member: continue try: @@ -527,9 +538,16 @@ class FederationHandler(BaseHandler): def backfill(self, dest, room_id, limit, extremities): """ Trigger a backfill request to `dest` for the given `room_id` - This will attempt to get more events from the remote. This may return - be successfull and still return no events if the other side has no new - events to offer. + This will attempt to get more events from the remote. If the other side + has no new events to offer, this will return an empty list. + + As the events are received, we check their signatures, and also do some + sanity-checking on them. If any of the backfilled events are invalid, + this method throws a SynapseError. + + TODO: make this more useful to distinguish failures of the remote + server from invalid events (there is probably no point in trying to + re-fetch invalid events from every other HS in the room.) """ if dest == self.server_name: raise SynapseError(400, "Can't backfill from self.") @@ -541,6 +559,16 @@ class FederationHandler(BaseHandler): extremities=extremities, ) + # ideally we'd sanity check the events here for excess prev_events etc, + # but it's hard to reject events at this point without completely + # breaking backfill in the same way that it is currently broken by + # events whose signature we cannot verify (#3121). + # + # So for now we accept the events anyway. #3124 tracks this. + # + # for ev in events: + # self._sanity_check_event(ev) + # Don't bother processing events we already have. seen_events = yield self.store.have_events_in_timeline( set(e.event_id for e in events) @@ -613,7 +641,8 @@ class FederationHandler(BaseHandler): results = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - logcontext.preserve_fn(self.replication_layer.get_pdu)( + logcontext.run_in_background( + self.replication_layer.get_pdu, [dest], event_id, outlier=True, @@ -633,7 +662,7 @@ class FederationHandler(BaseHandler): failed_to_fetch = missing_auth - set(auth_events) - seen_events = yield self.store.have_events( + seen_events = yield self.store.have_seen_events( set(auth_events.keys()) | set(state_events.keys()) ) @@ -723,9 +752,19 @@ class FederationHandler(BaseHandler): curr_state = yield self.state_handler.get_current_state(room_id) def get_domains_from_state(state): + """Get joined domains from state + + Args: + state (dict[tuple, FrozenEvent]): State map from type/state + key to event. + + Returns: + list[tuple[str, int]]: Returns a list of servers with the + lowest depth of their joins. Sorted by lowest depth first. + """ joined_users = [ (state_key, int(event.depth)) - for (e_type, state_key), event in state.items() + for (e_type, state_key), event in state.iteritems() if e_type == EventTypes.Member and event.membership == Membership.JOIN ] @@ -742,7 +781,7 @@ class FederationHandler(BaseHandler): except Exception: pass - return sorted(joined_domains.items(), key=lambda d: d[1]) + return sorted(joined_domains.iteritems(), key=lambda d: d[1]) curr_domains = get_domains_from_state(curr_state) @@ -759,7 +798,7 @@ class FederationHandler(BaseHandler): yield self.backfill( dom, room_id, limit=100, - extremities=[e for e in extremities.keys()] + extremities=extremities, ) # If this succeeded then we probably already have the # appropriate stuff. @@ -805,7 +844,7 @@ class FederationHandler(BaseHandler): tried_domains = set(likely_domains) tried_domains.add(self.server_name) - event_ids = list(extremities.keys()) + event_ids = list(extremities.iterkeys()) logger.debug("calling resolve_state_groups in _maybe_backfill") resolve = logcontext.preserve_fn( @@ -815,34 +854,69 @@ class FederationHandler(BaseHandler): [resolve(room_id, [e]) for e in event_ids], consumeErrors=True, )) + + # dict[str, dict[tuple, str]], a map from event_id to state map of + # event_ids. states = dict(zip(event_ids, [s.state for s in states])) state_map = yield self.store.get_events( - [e_id for ids in states.values() for e_id in ids], + [e_id for ids in states.itervalues() for e_id in ids.itervalues()], get_prev_content=False ) states = { key: { k: state_map[e_id] - for k, e_id in state_dict.items() + for k, e_id in state_dict.iteritems() if e_id in state_map - } for key, state_dict in states.items() + } for key, state_dict in states.iteritems() } for e_id, _ in sorted_extremeties_tuple: likely_domains = get_domains_from_state(states[e_id]) success = yield try_backfill([ - dom for dom in likely_domains + dom for dom, _ in likely_domains if dom not in tried_domains ]) if success: defer.returnValue(True) - tried_domains.update(likely_domains) + tried_domains.update(dom for dom, _ in likely_domains) defer.returnValue(False) + def _sanity_check_event(self, ev): + """ + Do some early sanity checks of a received event + + In particular, checks it doesn't have an excessive number of + prev_events or auth_events, which could cause a huge state resolution + or cascade of event fetches. + + Args: + ev (synapse.events.EventBase): event to be checked + + Returns: None + + Raises: + SynapseError if the event does not pass muster + """ + if len(ev.prev_events) > 20: + logger.warn("Rejecting event %s which has %i prev_events", + ev.event_id, len(ev.prev_events)) + raise SynapseError( + http_client.BAD_REQUEST, + "Too many prev_events", + ) + + if len(ev.auth_events) > 10: + logger.warn("Rejecting event %s which has %i auth_events", + ev.event_id, len(ev.auth_events)) + raise SynapseError( + http_client.BAD_REQUEST, + "Too many auth_events", + ) + @defer.inlineCallbacks def send_invite(self, target_host, event): """ Sends the invite to the remote server for signing. @@ -967,7 +1041,7 @@ class FederationHandler(BaseHandler): # lots of requests for missing prev_events which we do actually # have. Hence we fire off the deferred, but don't wait for it. - logcontext.preserve_fn(self._handle_queued_pdus)(room_queue) + logcontext.run_in_background(self._handle_queued_pdus, room_queue) defer.returnValue(True) @@ -1121,6 +1195,13 @@ class FederationHandler(BaseHandler): if not self.is_mine_id(event.state_key): raise SynapseError(400, "The invite event must be for this server") + # block any attempts to invite the server notices mxid + if event.state_key == self._server_notices_mxid: + raise SynapseError( + http_client.FORBIDDEN, + "Cannot invite this user", + ) + event.internal_metadata.outlier = True event.internal_metadata.invite_from_remote = True @@ -1308,7 +1389,7 @@ class FederationHandler(BaseHandler): ) if state_groups: - _, state = state_groups.items().pop() + _, state = list(iteritems(state_groups)).pop() results = { (e.type, e.state_key): e for e in state } @@ -1457,18 +1538,21 @@ class FederationHandler(BaseHandler): backfilled=backfilled, ) except: # noqa: E722, as we reraise the exception this is fine. - # Ensure that we actually remove the entries in the push actions - # staging area - logcontext.preserve_fn( - self.store.remove_push_actions_from_staging - )(event.event_id) - raise + tp, value, tb = sys.exc_info() + + logcontext.run_in_background( + self.store.remove_push_actions_from_staging, + event.event_id, + ) + + six.reraise(tp, value, tb) if not backfilled: # this intentionally does not yield: we don't care about the result # and don't need to wait for it. - logcontext.preserve_fn(self.pusher_pool.on_new_notifications)( - event_stream_id, max_stream_id + logcontext.run_in_background( + self.pusher_pool.on_new_notifications, + event_stream_id, max_stream_id, ) defer.returnValue((context, event_stream_id, max_stream_id)) @@ -1482,7 +1566,8 @@ class FederationHandler(BaseHandler): """ contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - logcontext.preserve_fn(self._prep_event)( + logcontext.run_in_background( + self._prep_event, origin, ev_info["event"], state=ev_info.get("state"), @@ -1736,7 +1821,8 @@ class FederationHandler(BaseHandler): event_key = None if event_auth_events - current_state: - have_events = yield self.store.have_events( + # TODO: can we use store.have_seen_events here instead? + have_events = yield self.store.get_seen_events_with_rejections( event_auth_events - current_state ) else: @@ -1759,12 +1845,12 @@ class FederationHandler(BaseHandler): origin, event.room_id, event.event_id ) - seen_remotes = yield self.store.have_events( + seen_remotes = yield self.store.have_seen_events( [e.event_id for e in remote_auth_chain] ) for e in remote_auth_chain: - if e.event_id in seen_remotes.keys(): + if e.event_id in seen_remotes: continue if e.event_id == event.event_id: @@ -1791,7 +1877,7 @@ class FederationHandler(BaseHandler): except AuthError: pass - have_events = yield self.store.have_events( + have_events = yield self.store.get_seen_events_with_rejections( [e_id for e_id, _ in event.auth_events] ) seen_events = set(have_events.keys()) @@ -1810,7 +1896,8 @@ class FederationHandler(BaseHandler): different_events = yield logcontext.make_deferred_yieldable( defer.gatherResults([ - logcontext.preserve_fn(self.store.get_event)( + logcontext.run_in_background( + self.store.get_event, d, allow_none=True, allow_rejected=False, @@ -1876,13 +1963,13 @@ class FederationHandler(BaseHandler): local_auth_chain, ) - seen_remotes = yield self.store.have_events( + seen_remotes = yield self.store.have_seen_events( [e.event_id for e in result["auth_chain"]] ) # 3. Process any remote auth chain events we haven't seen. for ev in result["auth_chain"]: - if ev.event_id in seen_remotes.keys(): + if ev.event_id in seen_remotes: continue if ev.event_id == event.event_id: @@ -1948,7 +2035,7 @@ class FederationHandler(BaseHandler): this will not be included in the current_state in the context. """ state_updates = { - k: a.event_id for k, a in auth_events.iteritems() + k: a.event_id for k, a in iteritems(auth_events) if k != event_key } context.current_state_ids = dict(context.current_state_ids) @@ -1958,7 +2045,7 @@ class FederationHandler(BaseHandler): context.delta_ids.update(state_updates) context.prev_state_ids = dict(context.prev_state_ids) context.prev_state_ids.update({ - k: a.event_id for k, a in auth_events.iteritems() + k: a.event_id for k, a in iteritems(auth_events) }) context.state_group = yield self.store.store_state_group( event.event_id, @@ -2010,7 +2097,7 @@ class FederationHandler(BaseHandler): def get_next(it, opt=None): try: - return it.next() + return next(it) except Exception: return opt diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index e4d0cc8b02..dcae083734 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2017 Vector Creations Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,6 +15,7 @@ # limitations under the License. from twisted.internet import defer +from six import iteritems from synapse.api.errors import SynapseError from synapse.types import get_domain_from_id @@ -90,6 +92,8 @@ class GroupsLocalHandler(object): get_group_role = _create_rerouter("get_group_role") get_group_roles = _create_rerouter("get_group_roles") + set_group_join_policy = _create_rerouter("set_group_join_policy") + @defer.inlineCallbacks def get_group_summary(self, group_id, requester_user_id): """Get the group summary for a group. @@ -226,7 +230,45 @@ class GroupsLocalHandler(object): def join_group(self, group_id, user_id, content): """Request to join a group """ - raise NotImplementedError() # TODO + if self.is_mine_id(group_id): + yield self.groups_server_handler.join_group( + group_id, user_id, content + ) + local_attestation = None + remote_attestation = None + else: + local_attestation = self.attestations.create_attestation(group_id, user_id) + content["attestation"] = local_attestation + + res = yield self.transport_client.join_group( + get_domain_from_id(group_id), group_id, user_id, content, + ) + + remote_attestation = res["attestation"] + + yield self.attestations.verify_attestation( + remote_attestation, + group_id=group_id, + user_id=user_id, + server_name=get_domain_from_id(group_id), + ) + + # TODO: Check that the group is public and we're being added publically + is_publicised = content.get("publicise", False) + + token = yield self.store.register_user_group_membership( + group_id, user_id, + membership="join", + is_admin=False, + local_attestation=local_attestation, + remote_attestation=remote_attestation, + is_publicised=is_publicised, + ) + self.notifier.on_new_event( + "groups_key", token, users=[user_id], + ) + + defer.returnValue({}) @defer.inlineCallbacks def accept_invite(self, group_id, user_id, content): @@ -408,7 +450,7 @@ class GroupsLocalHandler(object): results = {} failed_results = [] - for destination, dest_user_ids in destinations.iteritems(): + for destination, dest_user_ids in iteritems(destinations): try: r = yield self.transport_client.bulk_get_publicised_groups( destination, list(dest_user_ids), diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 9efcdff1d6..91a0898860 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -15,6 +15,11 @@ # limitations under the License. """Utilities for interacting with Identity Servers""" + +import logging + +import simplejson as json + from twisted.internet import defer from synapse.api.errors import ( @@ -24,9 +29,6 @@ from ._base import BaseHandler from synapse.util.async import run_on_reactor from synapse.api.errors import SynapseError, Codes -import json -import logging - logger = logging.getLogger(__name__) diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index c5267b4b84..71af86fe21 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -27,7 +27,7 @@ from synapse.types import ( from synapse.util import unwrapFirstError from synapse.util.async import concurrently_execute from synapse.util.caches.snapshot_cache import SnapshotCache -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -166,7 +166,8 @@ class InitialSyncHandler(BaseHandler): (messages, token), current_state = yield make_deferred_yieldable( defer.gatherResults( [ - preserve_fn(self.store.get_recent_events_for_room)( + run_in_background( + self.store.get_recent_events_for_room, event.room_id, limit=limit, end_token=room_end_token, @@ -180,8 +181,8 @@ class InitialSyncHandler(BaseHandler): self.store, user_id, messages ) - start_token = now_token.copy_and_replace("room_key", token[0]) - end_token = now_token.copy_and_replace("room_key", token[1]) + start_token = now_token.copy_and_replace("room_key", token) + end_token = now_token.copy_and_replace("room_key", room_end_token) time_now = self.clock.time_msec() d["messages"] = { @@ -324,8 +325,8 @@ class InitialSyncHandler(BaseHandler): self.store, user_id, messages, is_peeking=is_peeking ) - start_token = StreamToken.START.copy_and_replace("room_key", token[0]) - end_token = StreamToken.START.copy_and_replace("room_key", token[1]) + start_token = StreamToken.START.copy_and_replace("room_key", token) + end_token = StreamToken.START.copy_and_replace("room_key", stream_token) time_now = self.clock.time_msec() @@ -391,9 +392,10 @@ class InitialSyncHandler(BaseHandler): presence, receipts, (messages, token) = yield defer.gatherResults( [ - preserve_fn(get_presence)(), - preserve_fn(get_receipts)(), - preserve_fn(self.store.get_recent_events_for_room)( + run_in_background(get_presence), + run_in_background(get_receipts), + run_in_background( + self.store.get_recent_events_for_room, room_id, limit=limit, end_token=now_token.room_key, @@ -406,8 +408,8 @@ class InitialSyncHandler(BaseHandler): self.store, user_id, messages, is_peeking=is_peeking, ) - start_token = now_token.copy_and_replace("room_key", token[0]) - end_token = now_token.copy_and_replace("room_key", token[1]) + start_token = now_token.copy_and_replace("room_key", token) + end_token = now_token time_now = self.clock.time_msec() diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 4f97c8db79..81cff0870e 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -13,11 +13,23 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import simplejson +import sys + +from canonicaljson import encode_canonical_json +import six +from six import string_types, itervalues, iteritems from twisted.internet import defer, reactor +from twisted.internet.defer import succeed from twisted.python.failure import Failure -from synapse.api.constants import EventTypes, Membership -from synapse.api.errors import AuthError, Codes, SynapseError +from synapse.api.constants import EventTypes, Membership, MAX_DEPTH +from synapse.api.errors import ( + AuthError, Codes, SynapseError, + ConsentNotGivenError, +) +from synapse.api.urls import ConsentURIBuilder from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator @@ -25,21 +37,15 @@ from synapse.types import ( UserID, RoomAlias, RoomStreamToken, ) from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter -from synapse.util.logcontext import preserve_fn, run_in_background +from synapse.util.logcontext import run_in_background from synapse.util.metrics import measure_func -from synapse.util.frozenutils import unfreeze +from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.stringutils import random_string from synapse.visibility import filter_events_for_client from synapse.replication.http.send_event import send_event_to_master from ._base import BaseHandler -from canonicaljson import encode_canonical_json - -import logging -import random -import ujson - logger = logging.getLogger(__name__) @@ -86,14 +92,14 @@ class MessageHandler(BaseHandler): # map from purge id to PurgeStatus self._purges_by_id = {} - def start_purge_history(self, room_id, topological_ordering, + def start_purge_history(self, room_id, token, delete_local_events=False): """Start off a history purge on a room. Args: room_id (str): The room to purge from - topological_ordering (int): minimum topo ordering to preserve + token (str): topological token to delete events before delete_local_events (bool): True to delete local events as well as remote ones @@ -115,19 +121,19 @@ class MessageHandler(BaseHandler): self._purges_by_id[purge_id] = PurgeStatus() run_in_background( self._purge_history, - purge_id, room_id, topological_ordering, delete_local_events, + purge_id, room_id, token, delete_local_events, ) return purge_id @defer.inlineCallbacks - def _purge_history(self, purge_id, room_id, topological_ordering, + def _purge_history(self, purge_id, room_id, token, delete_local_events): """Carry out a history purge on a room. Args: purge_id (str): The id for this purge room_id (str): The room to purge from - topological_ordering (int): minimum topo ordering to preserve + token (str): topological token to delete events before delete_local_events (bool): True to delete local events as well as remote ones @@ -138,7 +144,7 @@ class MessageHandler(BaseHandler): try: with (yield self.pagination_lock.write(room_id)): yield self.store.purge_history( - room_id, topological_ordering, delete_local_events, + room_id, token, delete_local_events, ) logger.info("[purge] complete") self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE @@ -397,7 +403,7 @@ class MessageHandler(BaseHandler): "avatar_url": profile.avatar_url, "display_name": profile.display_name, } - for user_id, profile in users_with_profile.iteritems() + for user_id, profile in iteritems(users_with_profile) }) @@ -431,9 +437,12 @@ class EventCreationHandler(object): self.spam_checker = hs.get_spam_checker() + if self.config.block_events_without_consent_error is not None: + self._consent_uri_builder = ConsentURIBuilder(self.config) + @defer.inlineCallbacks def create_event(self, requester, event_dict, token_id=None, txn_id=None, - prev_event_ids=None): + prev_events_and_hashes=None): """ Given a dict from a client, create a new event. @@ -447,50 +456,136 @@ class EventCreationHandler(object): event_dict (dict): An entire event token_id (str) txn_id (str) - prev_event_ids (list): The prev event ids to use when creating the event + + prev_events_and_hashes (list[(str, dict[str, str], int)]|None): + the forward extremities to use as the prev_events for the + new event. For each event, a tuple of (event_id, hashes, depth) + where *hashes* is a map from algorithm to hash. + + If None, they will be requested from the database. Returns: Tuple of created event (FrozenEvent), Context """ builder = self.event_builder_factory.new(event_dict) - with (yield self.limiter.queue(builder.room_id)): - self.validator.validate_new(builder) - - if builder.type == EventTypes.Member: - membership = builder.content.get("membership", None) - target = UserID.from_string(builder.state_key) - - if membership in {Membership.JOIN, Membership.INVITE}: - # If event doesn't include a display name, add one. - profile = self.profile_handler - content = builder.content - - try: - if "displayname" not in content: - content["displayname"] = yield profile.get_displayname(target) - if "avatar_url" not in content: - content["avatar_url"] = yield profile.get_avatar_url(target) - except Exception as e: - logger.info( - "Failed to get profile information for %r: %s", - target, e - ) + self.validator.validate_new(builder) + + if builder.type == EventTypes.Member: + membership = builder.content.get("membership", None) + target = UserID.from_string(builder.state_key) + + if membership in {Membership.JOIN, Membership.INVITE}: + # If event doesn't include a display name, add one. + profile = self.profile_handler + content = builder.content + + try: + if "displayname" not in content: + content["displayname"] = yield profile.get_displayname(target) + if "avatar_url" not in content: + content["avatar_url"] = yield profile.get_avatar_url(target) + except Exception as e: + logger.info( + "Failed to get profile information for %r: %s", + target, e + ) - if token_id is not None: - builder.internal_metadata.token_id = token_id + is_exempt = yield self._is_exempt_from_privacy_policy(builder) + if not is_exempt: + yield self.assert_accepted_privacy_policy(requester) - if txn_id is not None: - builder.internal_metadata.txn_id = txn_id + if token_id is not None: + builder.internal_metadata.token_id = token_id - event, context = yield self.create_new_client_event( - builder=builder, - requester=requester, - prev_event_ids=prev_event_ids, - ) + if txn_id is not None: + builder.internal_metadata.txn_id = txn_id + + event, context = yield self.create_new_client_event( + builder=builder, + requester=requester, + prev_events_and_hashes=prev_events_and_hashes, + ) defer.returnValue((event, context)) + def _is_exempt_from_privacy_policy(self, builder): + """"Determine if an event to be sent is exempt from having to consent + to the privacy policy + + Args: + builder (synapse.events.builder.EventBuilder): event being created + + Returns: + Deferred[bool]: true if the event can be sent without the user + consenting + """ + # the only thing the user can do is join the server notices room. + if builder.type == EventTypes.Member: + membership = builder.content.get("membership", None) + if membership == Membership.JOIN: + return self._is_server_notices_room(builder.room_id) + return succeed(False) + + @defer.inlineCallbacks + def _is_server_notices_room(self, room_id): + if self.config.server_notices_mxid is None: + defer.returnValue(False) + user_ids = yield self.store.get_users_in_room(room_id) + defer.returnValue(self.config.server_notices_mxid in user_ids) + + @defer.inlineCallbacks + def assert_accepted_privacy_policy(self, requester): + """Check if a user has accepted the privacy policy + + Called when the given user is about to do something that requires + privacy consent. We see if the user is exempt and otherwise check that + they have given consent. If they have not, a ConsentNotGiven error is + raised. + + Args: + requester (synapse.types.Requester): + The user making the request + + Returns: + Deferred[None]: returns normally if the user has consented or is + exempt + + Raises: + ConsentNotGivenError: if the user has not given consent yet + """ + if self.config.block_events_without_consent_error is None: + return + + # exempt AS users from needing consent + if requester.app_service is not None: + return + + user_id = requester.user.to_string() + + # exempt the system notices user + if ( + self.config.server_notices_mxid is not None and + user_id == self.config.server_notices_mxid + ): + return + + u = yield self.store.get_user_by_id(user_id) + assert u is not None + if u["consent_version"] == self.config.user_consent_version: + return + + consent_uri = self._consent_uri_builder.build_user_consent_uri( + requester.user.localpart, + ) + msg = self.config.block_events_without_consent_error % { + 'consent_uri': consent_uri, + } + raise ConsentNotGivenError( + msg=msg, + consent_uri=consent_uri, + ) + @defer.inlineCallbacks def send_nonmember_event(self, requester, event, context, ratelimit=True): """ @@ -557,64 +652,80 @@ class EventCreationHandler(object): See self.create_event and self.send_nonmember_event. """ - event, context = yield self.create_event( - requester, - event_dict, - token_id=requester.access_token_id, - txn_id=txn_id - ) - spam_error = self.spam_checker.check_event_for_spam(event) - if spam_error: - if not isinstance(spam_error, basestring): - spam_error = "Spam is not permitted here" - raise SynapseError( - 403, spam_error, Codes.FORBIDDEN + # We limit the number of concurrent event sends in a room so that we + # don't fork the DAG too much. If we don't limit then we can end up in + # a situation where event persistence can't keep up, causing + # extremities to pile up, which in turn leads to state resolution + # taking longer. + with (yield self.limiter.queue(event_dict["room_id"])): + event, context = yield self.create_event( + requester, + event_dict, + token_id=requester.access_token_id, + txn_id=txn_id ) - yield self.send_nonmember_event( - requester, - event, - context, - ratelimit=ratelimit, - ) + spam_error = self.spam_checker.check_event_for_spam(event) + if spam_error: + if not isinstance(spam_error, string_types): + spam_error = "Spam is not permitted here" + raise SynapseError( + 403, spam_error, Codes.FORBIDDEN + ) + + yield self.send_nonmember_event( + requester, + event, + context, + ratelimit=ratelimit, + ) defer.returnValue(event) @measure_func("create_new_client_event") @defer.inlineCallbacks - def create_new_client_event(self, builder, requester=None, prev_event_ids=None): - if prev_event_ids: - prev_events = yield self.store.add_event_hashes(prev_event_ids) - prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids) - depth = prev_max_depth + 1 - else: - latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room( - builder.room_id, - ) + def create_new_client_event(self, builder, requester=None, + prev_events_and_hashes=None): + """Create a new event for a local client - # We want to limit the max number of prev events we point to in our - # new event - if len(latest_ret) > 10: - # Sort by reverse depth, so we point to the most recent. - latest_ret.sort(key=lambda a: -a[2]) - new_latest_ret = latest_ret[:5] - - # We also randomly point to some of the older events, to make - # sure that we don't completely ignore the older events. - if latest_ret[5:]: - sample_size = min(5, len(latest_ret[5:])) - new_latest_ret.extend(random.sample(latest_ret[5:], sample_size)) - latest_ret = new_latest_ret - - if latest_ret: - depth = max([d for _, _, d in latest_ret]) + 1 - else: - depth = 1 + Args: + builder (EventBuilder): + + requester (synapse.types.Requester|None): + + prev_events_and_hashes (list[(str, dict[str, str], int)]|None): + the forward extremities to use as the prev_events for the + new event. For each event, a tuple of (event_id, hashes, depth) + where *hashes* is a map from algorithm to hash. - prev_events = [ - (event_id, prev_hashes) - for event_id, prev_hashes, _ in latest_ret - ] + If None, they will be requested from the database. + + Returns: + Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)] + """ + + if prev_events_and_hashes is not None: + assert len(prev_events_and_hashes) <= 10, \ + "Attempting to create an event with %i prev_events" % ( + len(prev_events_and_hashes), + ) + else: + prev_events_and_hashes = \ + yield self.store.get_prev_events_for_room(builder.room_id) + + if prev_events_and_hashes: + depth = max([d for _, _, d in prev_events_and_hashes]) + 1 + # we cap depth of generated events, to ensure that they are not + # rejected by other servers (and so that they can be persisted in + # the db) + depth = min(depth, MAX_DEPTH) + else: + depth = 1 + + prev_events = [ + (event_id, prev_hashes) + for event_id, prev_hashes, _ in prev_events_and_hashes + ] builder.prev_events = prev_events builder.depth = depth @@ -678,8 +789,8 @@ class EventCreationHandler(object): # Ensure that we can round trip before trying to persist in db try: - dump = ujson.dumps(unfreeze(event.content)) - ujson.loads(dump) + dump = frozendict_json_encoder.encode(event.content) + simplejson.loads(dump) except Exception: logger.exception("Failed to encode content: %r", event.content) raise @@ -713,8 +824,14 @@ class EventCreationHandler(object): except: # noqa: E722, as we reraise the exception this is fine. # Ensure that we actually remove the entries in the push actions # staging area, if we calculated them. - preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id) - raise + tp, value, tb = sys.exc_info() + + run_in_background( + self.store.remove_push_actions_from_staging, + event.event_id, + ) + + six.reraise(tp, value, tb) @defer.inlineCallbacks def persist_and_notify_client_event( @@ -765,7 +882,7 @@ class EventCreationHandler(object): state_to_include_ids = [ e_id - for k, e_id in context.current_state_ids.iteritems() + for k, e_id in iteritems(context.current_state_ids) if k[0] in self.hs.config.room_invite_state_types or k == (EventTypes.Member, event.sender) ] @@ -779,7 +896,7 @@ class EventCreationHandler(object): "content": e.content, "sender": e.sender, } - for e in state_to_include.itervalues() + for e in itervalues(state_to_include) ] invitee = UserID.from_string(event.state_key) @@ -834,22 +951,33 @@ class EventCreationHandler(object): # this intentionally does not yield: we don't care about the result # and don't need to wait for it. - preserve_fn(self.pusher_pool.on_new_notifications)( + run_in_background( + self.pusher_pool.on_new_notifications, event_stream_id, max_stream_id ) @defer.inlineCallbacks def _notify(): yield run_on_reactor() - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=extra_users - ) + try: + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=extra_users + ) + except Exception: + logger.exception("Error notifying about new room event") - preserve_fn(_notify)() + run_in_background(_notify) if event.type == EventTypes.Message: - presence = self.hs.get_presence_handler() # We don't want to block sending messages on any presence code. This # matters as sometimes presence code can take a while. - preserve_fn(presence.bump_presence_active_time)(requester.user) + run_in_background(self._bump_active_time, requester.user) + + @defer.inlineCallbacks + def _bump_active_time(self, user): + try: + presence = self.hs.get_presence_handler() + yield presence.bump_presence_active_time(user) + except Exception: + logger.exception("Error bumping presence active time") diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index a5e501897c..26fc0d3ec7 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -25,38 +25,42 @@ The methods that define policy are: from twisted.internet import defer, reactor from contextlib import contextmanager +from six import itervalues, iteritems + from synapse.api.errors import SynapseError from synapse.api.constants import PresenceState from synapse.storage.presence import UserPresenceState from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.async import Linearizer -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background from synapse.util.logutils import log_function from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer from synapse.types import UserID, get_domain_from_id -import synapse.metrics +from synapse.metrics import LaterGauge import logging +from prometheus_client import Counter logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for(__name__) -notified_presence_counter = metrics.register_counter("notified_presence") -federation_presence_out_counter = metrics.register_counter("federation_presence_out") -presence_updates_counter = metrics.register_counter("presence_updates") -timers_fired_counter = metrics.register_counter("timers_fired") -federation_presence_counter = metrics.register_counter("federation_presence") -bump_active_time_counter = metrics.register_counter("bump_active_time") +notified_presence_counter = Counter("synapse_handler_presence_notified_presence", "") +federation_presence_out_counter = Counter( + "synapse_handler_presence_federation_presence_out", "") +presence_updates_counter = Counter("synapse_handler_presence_presence_updates", "") +timers_fired_counter = Counter("synapse_handler_presence_timers_fired", "") +federation_presence_counter = Counter("synapse_handler_presence_federation_presence", "") +bump_active_time_counter = Counter("synapse_handler_presence_bump_active_time", "") -get_updates_counter = metrics.register_counter("get_updates", labels=["type"]) +get_updates_counter = Counter("synapse_handler_presence_get_updates", "", ["type"]) -notify_reason_counter = metrics.register_counter("notify_reason", labels=["reason"]) -state_transition_counter = metrics.register_counter( - "state_transition", labels=["from", "to"] +notify_reason_counter = Counter( + "synapse_handler_presence_notify_reason", "", ["reason"]) +state_transition_counter = Counter( + "synapse_handler_presence_state_transition", "", ["from", "to"] ) @@ -87,6 +91,11 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER class PresenceHandler(object): def __init__(self, hs): + """ + + Args: + hs (synapse.server.HomeServer): + """ self.is_mine = hs.is_mine self.is_mine_id = hs.is_mine_id self.clock = hs.get_clock() @@ -94,7 +103,6 @@ class PresenceHandler(object): self.wheel_timer = WheelTimer() self.notifier = hs.get_notifier() self.federation = hs.get_federation_sender() - self.state = hs.get_state_handler() federation_registry = hs.get_federation_registry() @@ -137,8 +145,9 @@ class PresenceHandler(object): for state in active_presence } - metrics.register_callback( - "user_to_current_state_size", lambda: len(self.user_to_current_state) + LaterGauge( + "synapse_handlers_presence_user_to_current_state_size", "", [], + lambda: len(self.user_to_current_state) ) now = self.clock.time_msec() @@ -208,7 +217,8 @@ class PresenceHandler(object): 60 * 1000, ) - metrics.register_callback("wheel_timer_size", lambda: len(self.wheel_timer)) + LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [], + lambda: len(self.wheel_timer)) @defer.inlineCallbacks def _on_shutdown(self): @@ -255,6 +265,14 @@ class PresenceHandler(object): logger.info("Finished _persist_unpersisted_changes") @defer.inlineCallbacks + def _update_states_and_catch_exception(self, new_states): + try: + res = yield self._update_states(new_states) + defer.returnValue(res) + except Exception: + logger.exception("Error updating presence") + + @defer.inlineCallbacks def _update_states(self, new_states): """Updates presence of users. Sets the appropriate timeouts. Pokes the notifier and federation if and only if the changed presence state @@ -303,10 +321,10 @@ class PresenceHandler(object): # TODO: We should probably ensure there are no races hereafter - presence_updates_counter.inc_by(len(new_states)) + presence_updates_counter.inc(len(new_states)) if to_notify: - notified_presence_counter.inc_by(len(to_notify)) + notified_presence_counter.inc(len(to_notify)) yield self._persist_and_notify(to_notify.values()) self.unpersisted_users_changes |= set(s.user_id for s in new_states) @@ -317,7 +335,7 @@ class PresenceHandler(object): if user_id not in to_notify } if to_federation_ping: - federation_presence_out_counter.inc_by(len(to_federation_ping)) + federation_presence_out_counter.inc(len(to_federation_ping)) self._push_to_remotes(to_federation_ping.values()) @@ -355,7 +373,7 @@ class PresenceHandler(object): for user_id in users_to_check ] - timers_fired_counter.inc_by(len(states)) + timers_fired_counter.inc(len(states)) changes = handle_timeouts( states, @@ -364,7 +382,7 @@ class PresenceHandler(object): now=now, ) - preserve_fn(self._update_states)(changes) + run_in_background(self._update_states_and_catch_exception, changes) except Exception: logger.exception("Exception in _handle_timeouts loop") @@ -422,20 +440,23 @@ class PresenceHandler(object): @defer.inlineCallbacks def _end(): - if affect_presence: + try: self.user_to_num_current_syncs[user_id] -= 1 prev_state = yield self.current_state_for_user(user_id) yield self._update_states([prev_state.copy_and_replace( last_user_sync_ts=self.clock.time_msec(), )]) + except Exception: + logger.exception("Error updating presence after sync") @contextmanager def _user_syncing(): try: yield finally: - preserve_fn(_end)() + if affect_presence: + run_in_background(_end) defer.returnValue(_user_syncing()) @@ -453,61 +474,6 @@ class PresenceHandler(object): return syncing_user_ids @defer.inlineCallbacks - def update_external_syncs(self, process_id, syncing_user_ids): - """Update the syncing users for an external process - - Args: - process_id(str): An identifier for the process the users are - syncing against. This allows synapse to process updates - as user start and stop syncing against a given process. - syncing_user_ids(set(str)): The set of user_ids that are - currently syncing on that server. - """ - - # Grab the previous list of user_ids that were syncing on that process - prev_syncing_user_ids = ( - self.external_process_to_current_syncs.get(process_id, set()) - ) - # Grab the current presence state for both the users that are syncing - # now and the users that were syncing before this update. - prev_states = yield self.current_state_for_users( - syncing_user_ids | prev_syncing_user_ids - ) - updates = [] - time_now_ms = self.clock.time_msec() - - # For each new user that is syncing check if we need to mark them as - # being online. - for new_user_id in syncing_user_ids - prev_syncing_user_ids: - prev_state = prev_states[new_user_id] - if prev_state.state == PresenceState.OFFLINE: - updates.append(prev_state.copy_and_replace( - state=PresenceState.ONLINE, - last_active_ts=time_now_ms, - last_user_sync_ts=time_now_ms, - )) - else: - updates.append(prev_state.copy_and_replace( - last_user_sync_ts=time_now_ms, - )) - - # For each user that is still syncing or stopped syncing update the - # last sync time so that we will correctly apply the grace period when - # they stop syncing. - for old_user_id in prev_syncing_user_ids: - prev_state = prev_states[old_user_id] - updates.append(prev_state.copy_and_replace( - last_user_sync_ts=time_now_ms, - )) - - yield self._update_states(updates) - - # Update the last updated time for the process. We expire the entries - # if we don't receive an update in the given timeframe. - self.external_process_last_updated_ms[process_id] = self.clock.time_msec() - self.external_process_to_current_syncs[process_id] = syncing_user_ids - - @defer.inlineCallbacks def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec): """Update the syncing users for an external process as a delta. @@ -570,7 +536,7 @@ class PresenceHandler(object): prev_state.copy_and_replace( last_user_sync_ts=time_now_ms, ) - for prev_state in prev_states.itervalues() + for prev_state in itervalues(prev_states) ]) self.external_process_last_updated_ms.pop(process_id, None) @@ -593,14 +559,14 @@ class PresenceHandler(object): for user_id in user_ids } - missing = [user_id for user_id, state in states.iteritems() if not state] + missing = [user_id for user_id, state in iteritems(states) if not state] if missing: # There are things not in our in memory cache. Lets pull them out of # the database. res = yield self.store.get_presence_for_users(missing) states.update(res) - missing = [user_id for user_id, state in states.iteritems() if not state] + missing = [user_id for user_id, state in iteritems(states) if not state] if missing: new = { user_id: UserPresenceState.default(user_id) @@ -696,7 +662,7 @@ class PresenceHandler(object): updates.append(prev_state.copy_and_replace(**new_fields)) if updates: - federation_presence_counter.inc_by(len(updates)) + federation_presence_counter.inc(len(updates)) yield self._update_states(updates) @defer.inlineCallbacks @@ -971,28 +937,28 @@ def should_notify(old_state, new_state): return False if old_state.status_msg != new_state.status_msg: - notify_reason_counter.inc("status_msg_change") + notify_reason_counter.labels("status_msg_change").inc() return True if old_state.state != new_state.state: - notify_reason_counter.inc("state_change") - state_transition_counter.inc(old_state.state, new_state.state) + notify_reason_counter.labels("state_change").inc() + state_transition_counter.labels(old_state.state, new_state.state).inc() return True if old_state.state == PresenceState.ONLINE: if new_state.currently_active != old_state.currently_active: - notify_reason_counter.inc("current_active_change") + notify_reason_counter.labels("current_active_change").inc() return True if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY: # Only notify about last active bumps if we're not currently acive if not new_state.currently_active: - notify_reason_counter.inc("last_active_change_online") + notify_reason_counter.labels("last_active_change_online").inc() return True elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY: # Always notify for a transition where last active gets bumped. - notify_reason_counter.inc("last_active_change_not_online") + notify_reason_counter.labels("last_active_change_not_online").inc() return True return False @@ -1066,14 +1032,14 @@ class PresenceEventSource(object): if changed is not None and len(changed) < 500: # For small deltas, its quicker to get all changes and then # work out if we share a room or they're in our presence list - get_updates_counter.inc("stream") + get_updates_counter.labels("stream").inc() for other_user_id in changed: if other_user_id in users_interested_in: user_ids_changed.add(other_user_id) else: # Too many possible updates. Find all users we can see and check # if any of them have changed. - get_updates_counter.inc("full") + get_updates_counter.labels("full").inc() if from_key: user_ids_changed = stream_change_cache.get_entities_changed( @@ -1088,7 +1054,7 @@ class PresenceEventSource(object): defer.returnValue((updates.values(), max_token)) else: defer.returnValue(([ - s for s in updates.itervalues() + s for s in itervalues(updates) if s.state != PresenceState.OFFLINE ], max_token)) @@ -1345,11 +1311,11 @@ def get_interested_remotes(store, states, state_handler): # hosts in those rooms. room_ids_to_states, users_to_states = yield get_interested_parties(store, states) - for room_id, states in room_ids_to_states.iteritems(): + for room_id, states in iteritems(room_ids_to_states): hosts = yield state_handler.get_current_hosts_in_room(room_id) hosts_and_states.append((hosts, states)) - for user_id, states in users_to_states.iteritems(): + for user_id, states in iteritems(users_to_states): host = get_domain_from_id(user_id) hosts_and_states.append(([host], states)) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 3f215c2b4e..2e0672161c 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -135,37 +135,40 @@ class ReceiptsHandler(BaseHandler): """Given a list of receipts, works out which remote servers should be poked and pokes them. """ - # TODO: Some of this stuff should be coallesced. - for receipt in receipts: - room_id = receipt["room_id"] - receipt_type = receipt["receipt_type"] - user_id = receipt["user_id"] - event_ids = receipt["event_ids"] - data = receipt["data"] - - users = yield self.state.get_current_user_in_room(room_id) - remotedomains = set(get_domain_from_id(u) for u in users) - remotedomains = remotedomains.copy() - remotedomains.discard(self.server_name) - - logger.debug("Sending receipt to: %r", remotedomains) - - for domain in remotedomains: - self.federation.send_edu( - destination=domain, - edu_type="m.receipt", - content={ - room_id: { - receipt_type: { - user_id: { - "event_ids": event_ids, - "data": data, + try: + # TODO: Some of this stuff should be coallesced. + for receipt in receipts: + room_id = receipt["room_id"] + receipt_type = receipt["receipt_type"] + user_id = receipt["user_id"] + event_ids = receipt["event_ids"] + data = receipt["data"] + + users = yield self.state.get_current_user_in_room(room_id) + remotedomains = set(get_domain_from_id(u) for u in users) + remotedomains = remotedomains.copy() + remotedomains.discard(self.server_name) + + logger.debug("Sending receipt to: %r", remotedomains) + + for domain in remotedomains: + self.federation.send_edu( + destination=domain, + edu_type="m.receipt", + content={ + room_id: { + receipt_type: { + user_id: { + "event_ids": event_ids, + "data": data, + } } - } + }, }, - }, - key=(room_id, receipt_type, user_id), - ) + key=(room_id, receipt_type, user_id), + ) + except Exception: + logger.exception("Error pushing receipts to remote servers") @defer.inlineCallbacks def get_receipts_for_room(self, room_id, to_key): diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index ed5939880a..7e52adda3c 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -23,8 +23,8 @@ from synapse.api.errors import ( ) from synapse.http.client import CaptchaServerHttpClient from synapse import types -from synapse.types import UserID -from synapse.util.async import run_on_reactor +from synapse.types import UserID, create_requester, RoomID, RoomAlias +from synapse.util.async import run_on_reactor, Linearizer from synapse.util.threepids import check_3pid_allowed from ._base import BaseHandler @@ -34,6 +34,11 @@ logger = logging.getLogger(__name__) class RegistrationHandler(BaseHandler): def __init__(self, hs): + """ + + Args: + hs (synapse.server.HomeServer): + """ super(RegistrationHandler, self).__init__(hs) self.auth = hs.get_auth() @@ -46,6 +51,11 @@ class RegistrationHandler(BaseHandler): self.macaroon_gen = hs.get_macaroon_generator() + self._generate_user_id_linearizer = Linearizer( + name="_generate_user_id_linearizer", + ) + self._server_notices_mxid = hs.config.server_notices_mxid + @defer.inlineCallbacks def check_username(self, localpart, guest_access_token=None, assigned_user_id=None): @@ -201,10 +211,17 @@ class RegistrationHandler(BaseHandler): token = None attempts += 1 + # auto-join the user to any rooms we're supposed to dump them into + fake_requester = create_requester(user_id) + for r in self.hs.config.auto_join_rooms: + try: + yield self._join_user_to_room(fake_requester, r) + except Exception as e: + logger.error("Failed to join new user to %r: %r", r, e) + # We used to generate default identicons here, but nowadays # we want clients to generate their own as part of their branding # rather than there being consistent matrix-wide ones, so we don't. - defer.returnValue((user_id, token)) @defer.inlineCallbacks @@ -327,6 +344,14 @@ class RegistrationHandler(BaseHandler): yield identity_handler.bind_threepid(c, user_id) def check_user_id_not_appservice_exclusive(self, user_id, allowed_appservice=None): + # don't allow people to register the server notices mxid + if self._server_notices_mxid is not None: + if user_id == self._server_notices_mxid: + raise SynapseError( + 400, "This user ID is reserved.", + errcode=Codes.EXCLUSIVE + ) + # valid user IDs must not clash with any user ID namespaces claimed by # application services. services = self.store.get_app_services() @@ -345,9 +370,11 @@ class RegistrationHandler(BaseHandler): @defer.inlineCallbacks def _generate_user_id(self, reseed=False): if reseed or self._next_generated_user_id is None: - self._next_generated_user_id = ( - yield self.store.find_next_generated_user_id_localpart() - ) + with (yield self._generate_user_id_linearizer.queue(())): + if reseed or self._next_generated_user_id is None: + self._next_generated_user_id = ( + yield self.store.find_next_generated_user_id_localpart() + ) id = self._next_generated_user_id self._next_generated_user_id += 1 @@ -477,3 +504,28 @@ class RegistrationHandler(BaseHandler): ) defer.returnValue((user_id, access_token)) + + @defer.inlineCallbacks + def _join_user_to_room(self, requester, room_identifier): + room_id = None + room_member_handler = self.hs.get_room_member_handler() + if RoomID.is_valid(room_identifier): + room_id = room_identifier + elif RoomAlias.is_valid(room_identifier): + room_alias = RoomAlias.from_string(room_identifier) + room_id, remote_room_hosts = ( + yield room_member_handler.lookup_room_alias(room_alias) + ) + room_id = room_id.to_string() + else: + raise SynapseError(400, "%s was not legal room ID or room alias" % ( + room_identifier, + )) + + yield room_member_handler.update_membership( + requester=requester, + target=requester.user, + room_id=room_id, + remote_room_hosts=remote_room_hosts, + action="join", + ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 8df8fcbbad..b5850db42f 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -68,14 +68,27 @@ class RoomCreationHandler(BaseHandler): self.event_creation_handler = hs.get_event_creation_handler() @defer.inlineCallbacks - def create_room(self, requester, config, ratelimit=True): + def create_room(self, requester, config, ratelimit=True, + creator_join_profile=None): """ Creates a new room. Args: - requester (Requester): The user who requested the room creation. + requester (synapse.types.Requester): + The user who requested the room creation. config (dict) : A dict of configuration options. + ratelimit (bool): set to False to disable the rate limiter + + creator_join_profile (dict|None): + Set to override the displayname and avatar for the creating + user in this room. If unset, displayname and avatar will be + derived from the user's profile. If set, should contain the + values to go in the body of the 'join' event (typically + `avatar_url` and/or `displayname`. + Returns: - The new room ID. + Deferred[dict]: + a dict containing the keys `room_id` and, if an alias was + requested, `room_alias`. Raises: SynapseError if the room ID couldn't be stored, or something went horribly wrong. @@ -113,6 +126,10 @@ class RoomCreationHandler(BaseHandler): except Exception: raise SynapseError(400, "Invalid user_id: %s" % (i,)) + yield self.event_creation_handler.assert_accepted_privacy_policy( + requester, + ) + invite_3pid_list = config.get("invite_3pid", []) visibility = config.get("visibility", None) @@ -176,7 +193,8 @@ class RoomCreationHandler(BaseHandler): initial_state=initial_state, creation_content=creation_content, room_alias=room_alias, - power_level_content_override=config.get("power_level_content_override", {}) + power_level_content_override=config.get("power_level_content_override", {}), + creator_join_profile=creator_join_profile, ) if "name" in config: @@ -256,6 +274,7 @@ class RoomCreationHandler(BaseHandler): creation_content, room_alias, power_level_content_override, + creator_join_profile, ): def create(etype, content, **kwargs): e = { @@ -299,6 +318,7 @@ class RoomCreationHandler(BaseHandler): room_id, "join", ratelimit=False, + content=creator_join_profile, ) # We treat the power levels override specially as this needs to be one diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 5d81f59b44..5757bb7f8a 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -15,12 +15,13 @@ from twisted.internet import defer +from six.moves import range + from ._base import BaseHandler from synapse.api.constants import ( EventTypes, JoinRules, ) -from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.util.async import concurrently_execute from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.caches.response_cache import ResponseCache @@ -44,8 +45,9 @@ EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None) class RoomListHandler(BaseHandler): def __init__(self, hs): super(RoomListHandler, self).__init__(hs) - self.response_cache = ResponseCache(hs) - self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000) + self.response_cache = ResponseCache(hs, "room_list") + self.remote_response_cache = ResponseCache(hs, "remote_room_list", + timeout_ms=30 * 1000) def get_local_public_room_list(self, limit=None, since_token=None, search_filter=None, @@ -77,18 +79,11 @@ class RoomListHandler(BaseHandler): ) key = (limit, since_token, network_tuple) - result = self.response_cache.get(key) - if not result: - logger.info("No cached result, calculating one.") - result = self.response_cache.set( - key, - preserve_fn(self._get_public_room_list)( - limit, since_token, network_tuple=network_tuple - ) - ) - else: - logger.info("Using cached deferred result.") - return make_deferred_yieldable(result) + return self.response_cache.wrap( + key, + self._get_public_room_list, + limit, since_token, network_tuple=network_tuple, + ) @defer.inlineCallbacks def _get_public_room_list(self, limit=None, since_token=None, @@ -207,7 +202,7 @@ class RoomListHandler(BaseHandler): step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1 chunk = [] - for i in xrange(0, len(rooms_to_scan), step): + for i in range(0, len(rooms_to_scan), step): batch = rooms_to_scan[i:i + step] logger.info("Processing %i rooms for result", len(batch)) yield concurrently_execute( @@ -422,18 +417,14 @@ class RoomListHandler(BaseHandler): server_name, limit, since_token, include_all_networks, third_party_instance_id, ) - result = self.remote_response_cache.get(key) - if not result: - result = self.remote_response_cache.set( - key, - repl_layer.get_public_rooms( - server_name, limit=limit, since_token=since_token, - search_filter=search_filter, - include_all_networks=include_all_networks, - third_party_instance_id=third_party_instance_id, - ) - ) - return result + return self.remote_response_cache.wrap( + key, + repl_layer.get_public_rooms, + server_name, limit=limit, since_token=since_token, + search_filter=search_filter, + include_all_networks=include_all_networks, + third_party_instance_id=third_party_instance_id, + ) class RoomListNextBatch(namedtuple("RoomListNextBatch", ( diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 9977be8831..f930e939e8 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -17,11 +17,14 @@ import abc import logging +from six.moves import http_client + from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json from twisted.internet import defer from unpaddedbase64 import decode_base64 +import synapse.server import synapse.types from synapse.api.constants import ( EventTypes, Membership, @@ -46,6 +49,11 @@ class RoomMemberHandler(object): __metaclass__ = abc.ABCMeta def __init__(self, hs): + """ + + Args: + hs (synapse.server.HomeServer): + """ self.hs = hs self.store = hs.get_datastore() self.auth = hs.get_auth() @@ -63,6 +71,7 @@ class RoomMemberHandler(object): self.clock = hs.get_clock() self.spam_checker = hs.get_spam_checker() + self._server_notices_mxid = self.config.server_notices_mxid @abc.abstractmethod def _remote_join(self, requester, remote_room_hosts, room_id, user, content): @@ -149,7 +158,7 @@ class RoomMemberHandler(object): @defer.inlineCallbacks def _local_membership_update( self, requester, target, room_id, membership, - prev_event_ids, + prev_events_and_hashes, txn_id=None, ratelimit=True, content=None, @@ -175,7 +184,7 @@ class RoomMemberHandler(object): }, token_id=requester.access_token_id, txn_id=txn_id, - prev_event_ids=prev_event_ids, + prev_events_and_hashes=prev_events_and_hashes, ) # Check if this event matches the previous membership event for the user. @@ -290,11 +299,26 @@ class RoomMemberHandler(object): if is_blocked: raise SynapseError(403, "This room has been blocked on this server") - if effective_membership_state == "invite": + if effective_membership_state == Membership.INVITE: + # block any attempts to invite the server notices mxid + if target.to_string() == self._server_notices_mxid: + raise SynapseError( + http_client.FORBIDDEN, + "Cannot invite this user", + ) + block_invite = False - is_requester_admin = yield self.auth.is_server_admin( - requester.user, - ) + + if (self._server_notices_mxid is not None and + requester.user.to_string() == self._server_notices_mxid): + # allow the server notices mxid to send invites + is_requester_admin = True + + else: + is_requester_admin = yield self.auth.is_server_admin( + requester.user, + ) + if not is_requester_admin: if self.config.block_non_admin_invites: logger.info( @@ -314,7 +338,12 @@ class RoomMemberHandler(object): 403, "Invites have been disabled on this server", ) - latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) + prev_events_and_hashes = yield self.store.get_prev_events_for_room( + room_id, + ) + latest_event_ids = ( + event_id for (event_id, _, _) in prev_events_and_hashes + ) current_state_ids = yield self.state_handler.get_current_state_ids( room_id, latest_event_ids=latest_event_ids, ) @@ -344,6 +373,20 @@ class RoomMemberHandler(object): if same_sender and same_membership and same_content: defer.returnValue(old_state) + # we don't allow people to reject invites to the server notice + # room, but they can leave it once they are joined. + if ( + old_membership == Membership.INVITE and + effective_membership_state == Membership.LEAVE + ): + is_blocked = yield self._is_server_notice_room(room_id) + if is_blocked: + raise SynapseError( + http_client.FORBIDDEN, + "You cannot reject this invite", + errcode=Codes.CANNOT_LEAVE_SERVER_NOTICE_ROOM, + ) + is_host_in_room = yield self._is_host_in_room(current_state_ids) if effective_membership_state == Membership.JOIN: @@ -403,7 +446,7 @@ class RoomMemberHandler(object): membership=effective_membership_state, txn_id=txn_id, ratelimit=ratelimit, - prev_event_ids=latest_event_ids, + prev_events_and_hashes=prev_events_and_hashes, content=content, ) defer.returnValue(res) @@ -839,6 +882,13 @@ class RoomMemberHandler(object): defer.returnValue(False) + @defer.inlineCallbacks + def _is_server_notice_room(self, room_id): + if self._server_notices_mxid is None: + defer.returnValue(False) + user_ids = yield self.store.get_users_in_room(room_id) + defer.returnValue(self._server_notices_mxid in user_ids) + class RoomMemberMasterHandler(RoomMemberHandler): def __init__(self, hs): @@ -852,6 +902,14 @@ class RoomMemberMasterHandler(RoomMemberHandler): def _remote_join(self, requester, remote_room_hosts, room_id, user, content): """Implements RoomMemberHandler._remote_join """ + # filter ourselves out of remote_room_hosts: do_invite_join ignores it + # and if it is the only entry we'd like to return a 404 rather than a + # 500. + + remote_room_hosts = [ + host for host in remote_room_hosts if host != self.hs.hostname + ] + if len(remote_room_hosts) == 0: raise SynapseError(404, "No known servers") diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index f521d22e91..05bf6d46dd 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -16,7 +16,7 @@ from synapse.api.constants import Membership, EventTypes from synapse.util.async import concurrently_execute -from synapse.util.logcontext import LoggingContext, make_deferred_yieldable, preserve_fn +from synapse.util.logcontext import LoggingContext from synapse.util.metrics import Measure, measure_func from synapse.util.caches.response_cache import ResponseCache from synapse.push.clientformat import format_push_rules_for_user @@ -29,6 +29,8 @@ import collections import logging import itertools +from six import itervalues, iteritems + logger = logging.getLogger(__name__) @@ -53,6 +55,7 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [ to tell if room needs to be part of the sync result. """ return bool(self.events) + __bool__ = __nonzero__ # python3 class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ @@ -77,6 +80,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ # nb the notification count does not, er, count: if there's nothing # else in the result, we don't need to send it. ) + __bool__ = __nonzero__ # python3 class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [ @@ -96,6 +100,7 @@ class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [ or self.state or self.account_data ) + __bool__ = __nonzero__ # python3 class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ @@ -107,6 +112,7 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ def __nonzero__(self): """Invited rooms should always be reported to the client""" return True + __bool__ = __nonzero__ # python3 class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [ @@ -118,6 +124,7 @@ class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [ def __nonzero__(self): return bool(self.join or self.invite or self.leave) + __bool__ = __nonzero__ # python3 class DeviceLists(collections.namedtuple("DeviceLists", [ @@ -128,6 +135,7 @@ class DeviceLists(collections.namedtuple("DeviceLists", [ def __nonzero__(self): return bool(self.changed or self.left) + __bool__ = __nonzero__ # python3 class SyncResult(collections.namedtuple("SyncResult", [ @@ -160,6 +168,7 @@ class SyncResult(collections.namedtuple("SyncResult", [ self.device_lists or self.groups ) + __bool__ = __nonzero__ # python3 class SyncHandler(object): @@ -170,7 +179,7 @@ class SyncHandler(object): self.presence_handler = hs.get_presence_handler() self.event_sources = hs.get_event_sources() self.clock = hs.get_clock() - self.response_cache = ResponseCache(hs) + self.response_cache = ResponseCache(hs, "sync") self.state = hs.get_state_handler() def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, @@ -181,15 +190,11 @@ class SyncHandler(object): Returns: A Deferred SyncResult. """ - result = self.response_cache.get(sync_config.request_key) - if not result: - result = self.response_cache.set( - sync_config.request_key, - preserve_fn(self._wait_for_sync_for_user)( - sync_config, since_token, timeout, full_state - ) - ) - return make_deferred_yieldable(result) + return self.response_cache.wrap( + sync_config.request_key, + self._wait_for_sync_for_user, + sync_config, since_token, timeout, full_state, + ) @defer.inlineCallbacks def _wait_for_sync_for_user(self, sync_config, since_token, timeout, @@ -273,7 +278,7 @@ class SyncHandler(object): # result returned by the event source is poor form (it might cache # the object) room_id = event["room_id"] - event_copy = {k: v for (k, v) in event.iteritems() + event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"} ephemeral_by_room.setdefault(room_id, []).append(event_copy) @@ -292,7 +297,7 @@ class SyncHandler(object): for event in receipts: room_id = event["room_id"] # exclude room id, as above - event_copy = {k: v for (k, v) in event.iteritems() + event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"} ephemeral_by_room.setdefault(room_id, []).append(event_copy) @@ -323,7 +328,7 @@ class SyncHandler(object): current_state_ids = frozenset() if any(e.is_state() for e in recents): current_state_ids = yield self.state.get_current_state_ids(room_id) - current_state_ids = frozenset(current_state_ids.itervalues()) + current_state_ids = frozenset(itervalues(current_state_ids)) recents = yield filter_events_for_client( self.store, @@ -352,12 +357,24 @@ class SyncHandler(object): since_key = since_token.room_key while limited and len(recents) < timeline_limit and max_repeat: - events, end_key = yield self.store.get_room_events_stream_for_room( - room_id, - limit=load_limit + 1, - from_key=since_key, - to_key=end_key, - ) + # If we have a since_key then we are trying to get any events + # that have happened since `since_key` up to `end_key`, so we + # can just use `get_room_events_stream_for_room`. + # Otherwise, we want to return the last N events in the room + # in toplogical ordering. + if since_key: + events, end_key = yield self.store.get_room_events_stream_for_room( + room_id, + limit=load_limit + 1, + from_key=since_key, + to_key=end_key, + ) + else: + events, end_key = yield self.store.get_recent_events_for_room( + room_id, + limit=load_limit + 1, + end_token=end_key, + ) loaded_recents = sync_config.filter_collection.filter_room_timeline( events ) @@ -368,7 +385,7 @@ class SyncHandler(object): current_state_ids = frozenset() if any(e.is_state() for e in loaded_recents): current_state_ids = yield self.state.get_current_state_ids(room_id) - current_state_ids = frozenset(current_state_ids.itervalues()) + current_state_ids = frozenset(itervalues(current_state_ids)) loaded_recents = yield filter_events_for_client( self.store, @@ -427,7 +444,7 @@ class SyncHandler(object): Returns: A Deferred map from ((type, state_key)->Event) """ - last_events, token = yield self.store.get_recent_events_for_room( + last_events, _ = yield self.store.get_recent_events_for_room( room_id, end_token=stream_position.room_key, limit=1, ) @@ -1026,7 +1043,7 @@ class SyncHandler(object): if since_token: for joined_sync in sync_result_builder.joined: it = itertools.chain( - joined_sync.timeline.events, joined_sync.state.itervalues() + joined_sync.timeline.events, itervalues(joined_sync.state) ) for event in it: if event.type == EventTypes.Member: @@ -1104,7 +1121,7 @@ class SyncHandler(object): newly_left_rooms = [] room_entries = [] invited = [] - for room_id, events in mem_change_events_by_room_id.iteritems(): + for room_id, events in iteritems(mem_change_events_by_room_id): non_joins = [e for e in events if e.membership != Membership.JOIN] has_join = len(non_joins) != len(events) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 77c0cf146f..5d9736e88f 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import run_in_background from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer from synapse.types import UserID, get_domain_from_id @@ -97,7 +97,8 @@ class TypingHandler(object): if self.hs.is_mine_id(member.user_id): last_fed_poke = self._member_last_federation_poke.get(member, None) if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now: - preserve_fn(self._push_remote)( + run_in_background( + self._push_remote, member=member, typing=True ) @@ -196,7 +197,7 @@ class TypingHandler(object): def _push_update(self, member, typing): if self.hs.is_mine_id(member.user_id): # Only send updates for changes to our own users. - preserve_fn(self._push_remote)(member, typing) + run_in_background(self._push_remote, member, typing) self._push_update_local( member=member, @@ -205,28 +206,31 @@ class TypingHandler(object): @defer.inlineCallbacks def _push_remote(self, member, typing): - users = yield self.state.get_current_user_in_room(member.room_id) - self._member_last_federation_poke[member] = self.clock.time_msec() + try: + users = yield self.state.get_current_user_in_room(member.room_id) + self._member_last_federation_poke[member] = self.clock.time_msec() - now = self.clock.time_msec() - self.wheel_timer.insert( - now=now, - obj=member, - then=now + FEDERATION_PING_INTERVAL, - ) + now = self.clock.time_msec() + self.wheel_timer.insert( + now=now, + obj=member, + then=now + FEDERATION_PING_INTERVAL, + ) - for domain in set(get_domain_from_id(u) for u in users): - if domain != self.server_name: - self.federation.send_edu( - destination=domain, - edu_type="m.typing", - content={ - "room_id": member.room_id, - "user_id": member.user_id, - "typing": typing, - }, - key=member, - ) + for domain in set(get_domain_from_id(u) for u in users): + if domain != self.server_name: + self.federation.send_edu( + destination=domain, + edu_type="m.typing", + content={ + "room_id": member.room_id, + "user_id": member.user_id, + "typing": typing, + }, + key=member, + ) + except Exception: + logger.exception("Error pushing typing notif to remotes") @defer.inlineCallbacks def _recv_edu(self, origin, content): diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 714f0195c8..a39f0f7343 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -22,6 +22,7 @@ from synapse.util.metrics import Measure from synapse.util.async import sleep from synapse.types import get_localpart_from_id +from six import iteritems logger = logging.getLogger(__name__) @@ -123,6 +124,13 @@ class UserDirectoryHandler(object): ) @defer.inlineCallbacks + def handle_user_deactivated(self, user_id): + """Called when a user ID is deactivated + """ + yield self.store.remove_from_user_dir(user_id) + yield self.store.remove_from_user_in_public_room(user_id) + + @defer.inlineCallbacks def _unsafe_process(self): # If self.pos is None then means we haven't fetched it from DB if self.pos is None: @@ -403,7 +411,7 @@ class UserDirectoryHandler(object): if change: users_with_profile = yield self.state.get_current_user_in_room(room_id) - for user_id, profile in users_with_profile.iteritems(): + for user_id, profile in iteritems(users_with_profile): yield self._handle_new_user(room_id, user_id, profile) else: users = yield self.store.get_users_in_public_due_to_room(room_id) |