diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/__init__.py | 2 | ||||
-rw-r--r-- | synapse/handlers/_base.py | 2 | ||||
-rw-r--r-- | synapse/handlers/appservice.py | 7 | ||||
-rw-r--r-- | synapse/handlers/auth.py | 28 | ||||
-rw-r--r-- | synapse/handlers/device.py | 13 | ||||
-rw-r--r-- | synapse/handlers/devicemessage.py | 16 | ||||
-rw-r--r-- | synapse/handlers/directory.py | 11 | ||||
-rw-r--r-- | synapse/handlers/e2e_keys.py | 25 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 92 | ||||
-rw-r--r-- | synapse/handlers/groups_local.py | 7 | ||||
-rw-r--r-- | synapse/handlers/message.py | 514 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 11 | ||||
-rw-r--r-- | synapse/handlers/profile.py | 11 | ||||
-rw-r--r-- | synapse/handlers/read_marker.py | 6 | ||||
-rw-r--r-- | synapse/handlers/receipts.py | 2 | ||||
-rw-r--r-- | synapse/handlers/register.py | 36 | ||||
-rw-r--r-- | synapse/handlers/room.py | 24 | ||||
-rw-r--r-- | synapse/handlers/room_list.py | 5 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 334 | ||||
-rw-r--r-- | synapse/handlers/room_member_worker.py | 102 | ||||
-rw-r--r-- | synapse/handlers/set_password.py | 2 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 101 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 2 |
23 files changed, 939 insertions, 414 deletions
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 53213cdccf..8f8fd82eb0 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -17,7 +17,6 @@ from .register import RegistrationHandler from .room import ( RoomCreationHandler, RoomContextHandler, ) -from .room_member import RoomMemberHandler from .message import MessageHandler from .federation import FederationHandler from .directory import DirectoryHandler @@ -49,7 +48,6 @@ class Handlers(object): self.registration_handler = RegistrationHandler(hs) self.message_handler = MessageHandler(hs) self.room_creation_handler = RoomCreationHandler(hs) - self.room_member_handler = RoomMemberHandler(hs) self.federation_handler = FederationHandler(hs) self.directory_handler = DirectoryHandler(hs) self.admin_handler = AdminHandler(hs) diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index faa5609c0c..e089e66fde 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -158,7 +158,7 @@ class BaseHandler(object): # homeserver. requester = synapse.types.create_requester( target_user, is_guest=True) - handler = self.hs.get_handlers().room_member_handler + handler = self.hs.get_room_member_handler() yield handler.update_membership( requester, target_user, diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index feca3e4c10..3dd3fa2a27 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -15,6 +15,7 @@ from twisted.internet import defer +import synapse from synapse.api.constants import EventTypes from synapse.util.metrics import Measure from synapse.util.logcontext import make_deferred_yieldable, preserve_fn @@ -23,6 +24,10 @@ import logging logger = logging.getLogger(__name__) +metrics = synapse.metrics.get_metrics_for(__name__) + +events_processed_counter = metrics.register_counter("events_processed") + def log_failure(failure): logger.error( @@ -103,6 +108,8 @@ class ApplicationServicesHandler(object): service, event ) + events_processed_counter.inc_by(len(events)) + yield self.store.set_appservice_last_pos(upper_bound) finally: self.is_processing = False diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 573c9db8a1..a5365c4fe4 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -13,7 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer +from twisted.internet import defer, threads from ._base import BaseHandler from synapse.api.constants import LoginType @@ -25,6 +25,7 @@ from synapse.module_api import ModuleApi from synapse.types import UserID from synapse.util.async import run_on_reactor from synapse.util.caches.expiringcache import ExpiringCache +from synapse.util.logcontext import make_deferred_yieldable from twisted.web.client import PartialDownloadError @@ -714,7 +715,7 @@ class AuthHandler(BaseHandler): if not lookupres: defer.returnValue(None) (user_id, password_hash) = lookupres - result = self.validate_hash(password, password_hash) + result = yield self.validate_hash(password, password_hash) if not result: logger.warn("Failed password login for user %s", user_id) defer.returnValue(None) @@ -842,10 +843,13 @@ class AuthHandler(BaseHandler): password (str): Password to hash. Returns: - Hashed password (str). + Deferred(str): Hashed password. """ - return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper, - bcrypt.gensalt(self.bcrypt_rounds)) + def _do_hash(): + return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper, + bcrypt.gensalt(self.bcrypt_rounds)) + + return make_deferred_yieldable(threads.deferToThread(_do_hash)) def validate_hash(self, password, stored_hash): """Validates that self.hash(password) == stored_hash. @@ -855,13 +859,19 @@ class AuthHandler(BaseHandler): stored_hash (str): Expected hash value. Returns: - Whether self.hash(password) == stored_hash (bool). + Deferred(bool): Whether self.hash(password) == stored_hash. """ + + def _do_validate_hash(): + return bcrypt.checkpw( + password.encode('utf8') + self.hs.config.password_pepper, + stored_hash.encode('utf8') + ) + if stored_hash: - return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper, - stored_hash.encode('utf8')) == stored_hash + return make_deferred_yieldable(threads.deferToThread(_do_validate_hash)) else: - return False + return defer.succeed(False) class MacaroonGeneartor(object): diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 2152efc692..40f3d24678 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -14,6 +14,7 @@ # limitations under the License. from synapse.api import errors from synapse.api.constants import EventTypes +from synapse.api.errors import FederationDeniedError from synapse.util import stringutils from synapse.util.async import Linearizer from synapse.util.caches.expiringcache import ExpiringCache @@ -36,14 +37,15 @@ class DeviceHandler(BaseHandler): self.state = hs.get_state_handler() self._auth_handler = hs.get_auth_handler() self.federation_sender = hs.get_federation_sender() - self.federation = hs.get_replication_layer() self._edu_updater = DeviceListEduUpdater(hs, self) - self.federation.register_edu_handler( + federation_registry = hs.get_federation_registry() + + federation_registry.register_edu_handler( "m.device_list_update", self._edu_updater.incoming_device_list_update, ) - self.federation.register_query_handler( + federation_registry.register_query_handler( "user_devices", self.on_federation_query_user_devices, ) @@ -429,7 +431,7 @@ class DeviceListEduUpdater(object): def __init__(self, hs, device_handler): self.store = hs.get_datastore() - self.federation = hs.get_replication_layer() + self.federation = hs.get_federation_client() self.clock = hs.get_clock() self.device_handler = device_handler @@ -513,6 +515,9 @@ class DeviceListEduUpdater(object): # This makes it more likely that the device lists will # eventually become consistent. return + except FederationDeniedError as e: + logger.info(e) + return except Exception: # TODO: Remember that we are now out of sync and try again # later diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index f7fad15c62..f147a20b73 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -17,7 +17,8 @@ import logging from twisted.internet import defer -from synapse.types import get_domain_from_id +from synapse.api.errors import SynapseError +from synapse.types import get_domain_from_id, UserID from synapse.util.stringutils import random_string @@ -33,10 +34,10 @@ class DeviceMessageHandler(object): """ self.store = hs.get_datastore() self.notifier = hs.get_notifier() - self.is_mine_id = hs.is_mine_id + self.is_mine = hs.is_mine self.federation = hs.get_federation_sender() - hs.get_replication_layer().register_edu_handler( + hs.get_federation_registry().register_edu_handler( "m.direct_to_device", self.on_direct_to_device_edu ) @@ -52,6 +53,12 @@ class DeviceMessageHandler(object): message_type = content["type"] message_id = content["message_id"] for user_id, by_device in content["messages"].items(): + # we use UserID.from_string to catch invalid user ids + if not self.is_mine(UserID.from_string(user_id)): + logger.warning("Request for keys for non-local user %s", + user_id) + raise SynapseError(400, "Not a user here") + messages_by_device = { device_id: { "content": message_content, @@ -77,7 +84,8 @@ class DeviceMessageHandler(object): local_messages = {} remote_messages = {} for user_id, by_device in messages.items(): - if self.is_mine_id(user_id): + # we use UserID.from_string to catch invalid user ids + if self.is_mine(UserID.from_string(user_id)): messages_by_device = { device_id: { "content": message_content, diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index a0464ae5c0..c5b6e75e03 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -34,9 +34,10 @@ class DirectoryHandler(BaseHandler): self.state = hs.get_state_handler() self.appservice_handler = hs.get_application_service_handler() + self.event_creation_handler = hs.get_event_creation_handler() - self.federation = hs.get_replication_layer() - self.federation.register_query_handler( + self.federation = hs.get_federation_client() + hs.get_federation_registry().register_query_handler( "directory", self.on_directory_query ) @@ -249,8 +250,7 @@ class DirectoryHandler(BaseHandler): def send_room_alias_update_event(self, requester, user_id, room_id): aliases = yield self.store.get_aliases_for_room(room_id) - msg_handler = self.hs.get_handlers().message_handler - yield msg_handler.create_and_send_nonmember_event( + yield self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.Aliases, @@ -272,8 +272,7 @@ class DirectoryHandler(BaseHandler): if not alias_event or alias_event.content.get("alias", "") != alias_str: return - msg_handler = self.hs.get_handlers().message_handler - yield msg_handler.create_and_send_nonmember_event( + yield self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.CanonicalAlias, diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index ce2c87e400..80b359b2e7 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -19,8 +19,10 @@ import logging from canonicaljson import encode_canonical_json from twisted.internet import defer -from synapse.api.errors import SynapseError, CodeMessageException -from synapse.types import get_domain_from_id +from synapse.api.errors import ( + SynapseError, CodeMessageException, FederationDeniedError, +) +from synapse.types import get_domain_from_id, UserID from synapse.util.logcontext import preserve_fn, make_deferred_yieldable from synapse.util.retryutils import NotRetryingDestination @@ -30,15 +32,15 @@ logger = logging.getLogger(__name__) class E2eKeysHandler(object): def __init__(self, hs): self.store = hs.get_datastore() - self.federation = hs.get_replication_layer() + self.federation = hs.get_federation_client() self.device_handler = hs.get_device_handler() - self.is_mine_id = hs.is_mine_id + self.is_mine = hs.is_mine self.clock = hs.get_clock() # doesn't really work as part of the generic query API, because the # query request requires an object POST, but we abuse the # "query handler" interface. - self.federation.register_query_handler( + hs.get_federation_registry().register_query_handler( "client_keys", self.on_federation_query_client_keys ) @@ -70,7 +72,8 @@ class E2eKeysHandler(object): remote_queries = {} for user_id, device_ids in device_keys_query.items(): - if self.is_mine_id(user_id): + # we use UserID.from_string to catch invalid user ids + if self.is_mine(UserID.from_string(user_id)): local_query[user_id] = device_ids else: remote_queries[user_id] = device_ids @@ -139,6 +142,10 @@ class E2eKeysHandler(object): failures[destination] = { "status": 503, "message": "Not ready for retry", } + except FederationDeniedError as e: + failures[destination] = { + "status": 403, "message": "Federation Denied", + } except Exception as e: # include ConnectionRefused and other errors failures[destination] = { @@ -170,7 +177,8 @@ class E2eKeysHandler(object): result_dict = {} for user_id, device_ids in query.items(): - if not self.is_mine_id(user_id): + # we use UserID.from_string to catch invalid user ids + if not self.is_mine(UserID.from_string(user_id)): logger.warning("Request for keys for non-local user %s", user_id) raise SynapseError(400, "Not a user here") @@ -213,7 +221,8 @@ class E2eKeysHandler(object): remote_queries = {} for user_id, device_keys in query.get("one_time_keys", {}).items(): - if self.is_mine_id(user_id): + # we use UserID.from_string to catch invalid user ids + if self.is_mine(UserID.from_string(user_id)): for device_id, algorithm in device_keys.items(): local_query.append((user_id, device_id, algorithm)) else: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ac70730885..080aca3d71 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -22,6 +23,7 @@ from ._base import BaseHandler from synapse.api.errors import ( AuthError, FederationError, StoreError, CodeMessageException, SynapseError, + FederationDeniedError, ) from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.events.validator import EventValidator @@ -66,7 +68,7 @@ class FederationHandler(BaseHandler): self.hs = hs self.store = hs.get_datastore() - self.replication_layer = hs.get_replication_layer() + self.replication_layer = hs.get_federation_client() self.state_handler = hs.get_state_handler() self.server_name = hs.hostname self.keyring = hs.get_keyring() @@ -74,8 +76,7 @@ class FederationHandler(BaseHandler): self.is_mine_id = hs.is_mine_id self.pusher_pool = hs.get_pusherpool() self.spam_checker = hs.get_spam_checker() - - self.replication_layer.set_handler(self) + self.event_creation_handler = hs.get_event_creation_handler() # When joining a room we need to queue any events for that room up self.room_queues = {} @@ -782,6 +783,9 @@ class FederationHandler(BaseHandler): except NotRetryingDestination as e: logger.info(e.message) continue + except FederationDeniedError as e: + logger.info(e) + continue except Exception as e: logger.exception( "Failed to backfill from %s because %s", @@ -804,13 +808,12 @@ class FederationHandler(BaseHandler): event_ids = list(extremities.keys()) logger.debug("calling resolve_state_groups in _maybe_backfill") + resolve = logcontext.preserve_fn( + self.state_handler.resolve_state_groups_for_events + ) states = yield logcontext.make_deferred_yieldable(defer.gatherResults( - [ - logcontext.preserve_fn(self.state_handler.resolve_state_groups)( - room_id, [e] - ) - for e in event_ids - ], consumeErrors=True, + [resolve(room_id, [e]) for e in event_ids], + consumeErrors=True, )) states = dict(zip(event_ids, [s.state for s in states])) @@ -1004,8 +1007,7 @@ class FederationHandler(BaseHandler): }) try: - message_handler = self.hs.get_handlers().message_handler - event, context = yield message_handler._create_new_client_event( + event, context = yield self.event_creation_handler.create_new_client_event( builder=builder, ) except AuthError as e: @@ -1245,8 +1247,7 @@ class FederationHandler(BaseHandler): "state_key": user_id, }) - message_handler = self.hs.get_handlers().message_handler - event, context = yield message_handler._create_new_client_event( + event, context = yield self.event_creation_handler.create_new_client_event( builder=builder, ) @@ -1444,16 +1445,24 @@ class FederationHandler(BaseHandler): auth_events=auth_events, ) - if not event.internal_metadata.is_outlier() and not backfilled: - yield self.action_generator.handle_push_actions_for_event( - event, context - ) + try: + if not event.internal_metadata.is_outlier() and not backfilled: + yield self.action_generator.handle_push_actions_for_event( + event, context + ) - event_stream_id, max_stream_id = yield self.store.persist_event( - event, - context=context, - backfilled=backfilled, - ) + event_stream_id, max_stream_id = yield self.store.persist_event( + event, + context=context, + backfilled=backfilled, + ) + except: # noqa: E722, as we reraise the exception this is fine. + # Ensure that we actually remove the entries in the push actions + # staging area + logcontext.preserve_fn( + self.store.remove_push_actions_from_staging + )(event.event_id) + raise if not backfilled: # this intentionally does not yield: we don't care about the result @@ -1828,8 +1837,8 @@ class FederationHandler(BaseHandler): current_state = set(e.event_id for e in auth_events.values()) different_auth = event_auth_events - current_state - self._update_context_for_auth_events( - context, auth_events, event_key, + yield self._update_context_for_auth_events( + event, context, auth_events, event_key, ) if different_auth and not event.internal_metadata.is_outlier(): @@ -1910,8 +1919,8 @@ class FederationHandler(BaseHandler): # 4. Look at rejects and their proofs. # TODO. - self._update_context_for_auth_events( - context, auth_events, event_key, + yield self._update_context_for_auth_events( + event, context, auth_events, event_key, ) try: @@ -1920,11 +1929,15 @@ class FederationHandler(BaseHandler): logger.warn("Failed auth resolution for %r because %s", event, e) raise e - def _update_context_for_auth_events(self, context, auth_events, + @defer.inlineCallbacks + def _update_context_for_auth_events(self, event, context, auth_events, event_key): - """Update the state_ids in an event context after auth event resolution + """Update the state_ids in an event context after auth event resolution, + storing the changes as a new state group. Args: + event (Event): The event we're handling the context for + context (synapse.events.snapshot.EventContext): event context to be updated @@ -1947,7 +1960,13 @@ class FederationHandler(BaseHandler): context.prev_state_ids.update({ k: a.event_id for k, a in auth_events.iteritems() }) - context.state_group = self.store.get_next_state_group() + context.state_group = yield self.store.store_state_group( + event.event_id, + event.room_id, + prev_group=context.prev_group, + delta_ids=context.delta_ids, + current_state_ids=context.current_state_ids, + ) @defer.inlineCallbacks def construct_auth_difference(self, local_auth, remote_auth): @@ -2117,8 +2136,7 @@ class FederationHandler(BaseHandler): if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)): builder = self.event_builder_factory.new(event_dict) EventValidator().validate_new(builder) - message_handler = self.hs.get_handlers().message_handler - event, context = yield message_handler._create_new_client_event( + event, context = yield self.event_creation_handler.create_new_client_event( builder=builder ) @@ -2133,7 +2151,7 @@ class FederationHandler(BaseHandler): raise e yield self._check_signature(event, context) - member_handler = self.hs.get_handlers().room_member_handler + member_handler = self.hs.get_room_member_handler() yield member_handler.send_membership_event(None, event, context) else: destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id)) @@ -2156,8 +2174,7 @@ class FederationHandler(BaseHandler): """ builder = self.event_builder_factory.new(event_dict) - message_handler = self.hs.get_handlers().message_handler - event, context = yield message_handler._create_new_client_event( + event, context = yield self.event_creation_handler.create_new_client_event( builder=builder, ) @@ -2178,7 +2195,7 @@ class FederationHandler(BaseHandler): # TODO: Make sure the signatures actually are correct. event.signatures.update(returned_invite.signatures) - member_handler = self.hs.get_handlers().room_member_handler + member_handler = self.hs.get_room_member_handler() yield member_handler.send_membership_event(None, event, context) @defer.inlineCallbacks @@ -2207,8 +2224,9 @@ class FederationHandler(BaseHandler): builder = self.event_builder_factory.new(event_dict) EventValidator().validate_new(builder) - message_handler = self.hs.get_handlers().message_handler - event, context = yield message_handler._create_new_client_event(builder=builder) + event, context = yield self.event_creation_handler.create_new_client_event( + builder=builder, + ) defer.returnValue((event, context)) @defer.inlineCallbacks diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 7e5d3f148d..e4d0cc8b02 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -383,11 +383,12 @@ class GroupsLocalHandler(object): defer.returnValue({"groups": result}) else: - result = yield self.transport_client.get_publicised_groups_for_user( - get_domain_from_id(user_id), user_id + bulk_result = yield self.transport_client.bulk_get_publicised_groups( + get_domain_from_id(user_id), [user_id], ) + result = bulk_result.get("users", {}).get(user_id) # TODO: Verify attestations - defer.returnValue(result) + defer.returnValue({"groups": result}) @defer.inlineCallbacks def bulk_get_publicised_groups(self, user_ids, proxy=True): diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index d7413833ed..5a8ddc253e 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014 - 2016 OpenMarket Ltd -# Copyright 2017 New Vector Ltd +# Copyright 2017 - 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,7 +13,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer +from twisted.internet import defer, reactor +from twisted.python.failure import Failure from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, Codes, SynapseError @@ -24,10 +25,12 @@ from synapse.types import ( UserID, RoomAlias, RoomStreamToken, ) from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, run_in_background from synapse.util.metrics import measure_func from synapse.util.frozenutils import unfreeze +from synapse.util.stringutils import random_string from synapse.visibility import filter_events_for_client +from synapse.replication.http.send_event import send_event_to_master from ._base import BaseHandler @@ -40,6 +43,36 @@ import simplejson logger = logging.getLogger(__name__) +class PurgeStatus(object): + """Object tracking the status of a purge request + + This class contains information on the progress of a purge request, for + return by get_purge_status. + + Attributes: + status (int): Tracks whether this request has completed. One of + STATUS_{ACTIVE,COMPLETE,FAILED} + """ + + STATUS_ACTIVE = 0 + STATUS_COMPLETE = 1 + STATUS_FAILED = 2 + + STATUS_TEXT = { + STATUS_ACTIVE: "active", + STATUS_COMPLETE: "complete", + STATUS_FAILED: "failed", + } + + def __init__(self): + self.status = PurgeStatus.STATUS_ACTIVE + + def asdict(self): + return { + "status": PurgeStatus.STATUS_TEXT[self.status] + } + + class MessageHandler(BaseHandler): def __init__(self, hs): @@ -47,32 +80,89 @@ class MessageHandler(BaseHandler): self.hs = hs self.state = hs.get_state_handler() self.clock = hs.get_clock() - self.validator = EventValidator() - self.profile_handler = hs.get_profile_handler() self.pagination_lock = ReadWriteLock() + self._purges_in_progress_by_room = set() + # map from purge id to PurgeStatus + self._purges_by_id = {} - self.pusher_pool = hs.get_pusherpool() + def start_purge_history(self, room_id, topological_ordering, + delete_local_events=False): + """Start off a history purge on a room. - # We arbitrarily limit concurrent event creation for a room to 5. - # This is to stop us from diverging history *too* much. - self.limiter = Limiter(max_count=5) + Args: + room_id (str): The room to purge from - self.action_generator = hs.get_action_generator() + topological_ordering (int): minimum topo ordering to preserve + delete_local_events (bool): True to delete local events as well as + remote ones - self.spam_checker = hs.get_spam_checker() + Returns: + str: unique ID for this purge transaction. + """ + if room_id in self._purges_in_progress_by_room: + raise SynapseError( + 400, + "History purge already in progress for %s" % (room_id, ), + ) + + purge_id = random_string(16) + + # we log the purge_id here so that it can be tied back to the + # request id in the log lines. + logger.info("[purge] starting purge_id %s", purge_id) + + self._purges_by_id[purge_id] = PurgeStatus() + run_in_background( + self._purge_history, + purge_id, room_id, topological_ordering, delete_local_events, + ) + return purge_id @defer.inlineCallbacks - def purge_history(self, room_id, event_id): - event = yield self.store.get_event(event_id) + def _purge_history(self, purge_id, room_id, topological_ordering, + delete_local_events): + """Carry out a history purge on a room. + + Args: + purge_id (str): The id for this purge + room_id (str): The room to purge from + topological_ordering (int): minimum topo ordering to preserve + delete_local_events (bool): True to delete local events as well as + remote ones + + Returns: + Deferred + """ + self._purges_in_progress_by_room.add(room_id) + try: + with (yield self.pagination_lock.write(room_id)): + yield self.store.purge_history( + room_id, topological_ordering, delete_local_events, + ) + logger.info("[purge] complete") + self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE + except Exception: + logger.error("[purge] failed: %s", Failure().getTraceback().rstrip()) + self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED + finally: + self._purges_in_progress_by_room.discard(room_id) - if event.room_id != room_id: - raise SynapseError(400, "Event is for wrong room.") + # remove the purge from the list 24 hours after it completes + def clear_purge(): + del self._purges_by_id[purge_id] + reactor.callLater(24 * 3600, clear_purge) - depth = event.depth + def get_purge_status(self, purge_id): + """Get the current status of an active purge - with (yield self.pagination_lock.write(room_id)): - yield self.store.delete_old_state(room_id, depth) + Args: + purge_id (str): purge_id returned by start_purge_history + + Returns: + PurgeStatus|None + """ + return self._purges_by_id.get(purge_id) @defer.inlineCallbacks def get_messages(self, requester, room_id=None, pagin_config=None, @@ -183,6 +273,165 @@ class MessageHandler(BaseHandler): defer.returnValue(chunk) @defer.inlineCallbacks + def get_room_data(self, user_id=None, room_id=None, + event_type=None, state_key="", is_guest=False): + """ Get data from a room. + + Args: + event : The room path event + Returns: + The path data content. + Raises: + SynapseError if something went wrong. + """ + membership, membership_event_id = yield self._check_in_room_or_world_readable( + room_id, user_id + ) + + if membership == Membership.JOIN: + data = yield self.state_handler.get_current_state( + room_id, event_type, state_key + ) + elif membership == Membership.LEAVE: + key = (event_type, state_key) + room_state = yield self.store.get_state_for_events( + [membership_event_id], [key] + ) + data = room_state[membership_event_id].get(key) + + defer.returnValue(data) + + @defer.inlineCallbacks + def _check_in_room_or_world_readable(self, room_id, user_id): + try: + # check_user_was_in_room will return the most recent membership + # event for the user if: + # * The user is a non-guest user, and was ever in the room + # * The user is a guest user, and has joined the room + # else it will throw. + member_event = yield self.auth.check_user_was_in_room(room_id, user_id) + defer.returnValue((member_event.membership, member_event.event_id)) + return + except AuthError: + visibility = yield self.state_handler.get_current_state( + room_id, EventTypes.RoomHistoryVisibility, "" + ) + if ( + visibility and + visibility.content["history_visibility"] == "world_readable" + ): + defer.returnValue((Membership.JOIN, None)) + return + raise AuthError( + 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN + ) + + @defer.inlineCallbacks + def get_state_events(self, user_id, room_id, is_guest=False): + """Retrieve all state events for a given room. If the user is + joined to the room then return the current state. If the user has + left the room return the state events from when they left. + + Args: + user_id(str): The user requesting state events. + room_id(str): The room ID to get all state events from. + Returns: + A list of dicts representing state events. [{}, {}, {}] + """ + membership, membership_event_id = yield self._check_in_room_or_world_readable( + room_id, user_id + ) + + if membership == Membership.JOIN: + room_state = yield self.state_handler.get_current_state(room_id) + elif membership == Membership.LEAVE: + room_state = yield self.store.get_state_for_events( + [membership_event_id], None + ) + room_state = room_state[membership_event_id] + + now = self.clock.time_msec() + defer.returnValue( + [serialize_event(c, now) for c in room_state.values()] + ) + + @defer.inlineCallbacks + def get_joined_members(self, requester, room_id): + """Get all the joined members in the room and their profile information. + + If the user has left the room return the state events from when they left. + + Args: + requester(Requester): The user requesting state events. + room_id(str): The room ID to get all state events from. + Returns: + A dict of user_id to profile info + """ + user_id = requester.user.to_string() + if not requester.app_service: + # We check AS auth after fetching the room membership, as it + # requires us to pull out all joined members anyway. + membership, _ = yield self._check_in_room_or_world_readable( + room_id, user_id + ) + if membership != Membership.JOIN: + raise NotImplementedError( + "Getting joined members after leaving is not implemented" + ) + + users_with_profile = yield self.state.get_current_user_in_room(room_id) + + # If this is an AS, double check that they are allowed to see the members. + # This can either be because the AS user is in the room or becuase there + # is a user in the room that the AS is "interested in" + if requester.app_service and user_id not in users_with_profile: + for uid in users_with_profile: + if requester.app_service.is_interested_in_user(uid): + break + else: + # Loop fell through, AS has no interested users in room + raise AuthError(403, "Appservice not in room") + + defer.returnValue({ + user_id: { + "avatar_url": profile.avatar_url, + "display_name": profile.display_name, + } + for user_id, profile in users_with_profile.iteritems() + }) + + +class EventCreationHandler(object): + def __init__(self, hs): + self.hs = hs + self.auth = hs.get_auth() + self.store = hs.get_datastore() + self.state = hs.get_state_handler() + self.clock = hs.get_clock() + self.validator = EventValidator() + self.profile_handler = hs.get_profile_handler() + self.event_builder_factory = hs.get_event_builder_factory() + self.server_name = hs.hostname + self.ratelimiter = hs.get_ratelimiter() + self.notifier = hs.get_notifier() + self.config = hs.config + + self.http_client = hs.get_simple_http_client() + + # This is only used to get at ratelimit function, and maybe_kick_guest_users + self.base_handler = BaseHandler(hs) + + self.pusher_pool = hs.get_pusherpool() + + # We arbitrarily limit concurrent event creation for a room to 5. + # This is to stop us from diverging history *too* much. + self.limiter = Limiter(max_count=5) + + self.action_generator = hs.get_action_generator() + + self.spam_checker = hs.get_spam_checker() + + @defer.inlineCallbacks def create_event(self, requester, event_dict, token_id=None, txn_id=None, prev_event_ids=None): """ @@ -234,7 +483,7 @@ class MessageHandler(BaseHandler): if txn_id is not None: builder.internal_metadata.txn_id = txn_id - event, context = yield self._create_new_client_event( + event, context = yield self.create_new_client_event( builder=builder, requester=requester, prev_event_ids=prev_event_ids, @@ -259,11 +508,6 @@ class MessageHandler(BaseHandler): "Tried to send member event through non-member codepath" ) - # We check here if we are currently being rate limited, so that we - # don't do unnecessary work. We check again just before we actually - # send the event. - yield self.ratelimit(requester, update=False) - user = UserID.from_string(event.sender) assert self.hs.is_mine(user), "User must be our own: %s" % (user,) @@ -280,12 +524,6 @@ class MessageHandler(BaseHandler): ratelimit=ratelimit, ) - if event.type == EventTypes.Message: - presence = self.hs.get_presence_handler() - # We don't want to block sending messages on any presence code. This - # matters as sometimes presence code can take a while. - preserve_fn(presence.bump_presence_active_time)(user) - @defer.inlineCallbacks def deduplicate_state_event(self, event, context): """ @@ -342,137 +580,9 @@ class MessageHandler(BaseHandler): ) defer.returnValue(event) + @measure_func("create_new_client_event") @defer.inlineCallbacks - def get_room_data(self, user_id=None, room_id=None, - event_type=None, state_key="", is_guest=False): - """ Get data from a room. - - Args: - event : The room path event - Returns: - The path data content. - Raises: - SynapseError if something went wrong. - """ - membership, membership_event_id = yield self._check_in_room_or_world_readable( - room_id, user_id - ) - - if membership == Membership.JOIN: - data = yield self.state_handler.get_current_state( - room_id, event_type, state_key - ) - elif membership == Membership.LEAVE: - key = (event_type, state_key) - room_state = yield self.store.get_state_for_events( - [membership_event_id], [key] - ) - data = room_state[membership_event_id].get(key) - - defer.returnValue(data) - - @defer.inlineCallbacks - def _check_in_room_or_world_readable(self, room_id, user_id): - try: - # check_user_was_in_room will return the most recent membership - # event for the user if: - # * The user is a non-guest user, and was ever in the room - # * The user is a guest user, and has joined the room - # else it will throw. - member_event = yield self.auth.check_user_was_in_room(room_id, user_id) - defer.returnValue((member_event.membership, member_event.event_id)) - return - except AuthError: - visibility = yield self.state_handler.get_current_state( - room_id, EventTypes.RoomHistoryVisibility, "" - ) - if ( - visibility and - visibility.content["history_visibility"] == "world_readable" - ): - defer.returnValue((Membership.JOIN, None)) - return - raise AuthError( - 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN - ) - - @defer.inlineCallbacks - def get_state_events(self, user_id, room_id, is_guest=False): - """Retrieve all state events for a given room. If the user is - joined to the room then return the current state. If the user has - left the room return the state events from when they left. - - Args: - user_id(str): The user requesting state events. - room_id(str): The room ID to get all state events from. - Returns: - A list of dicts representing state events. [{}, {}, {}] - """ - membership, membership_event_id = yield self._check_in_room_or_world_readable( - room_id, user_id - ) - - if membership == Membership.JOIN: - room_state = yield self.state_handler.get_current_state(room_id) - elif membership == Membership.LEAVE: - room_state = yield self.store.get_state_for_events( - [membership_event_id], None - ) - room_state = room_state[membership_event_id] - - now = self.clock.time_msec() - defer.returnValue( - [serialize_event(c, now) for c in room_state.values()] - ) - - @defer.inlineCallbacks - def get_joined_members(self, requester, room_id): - """Get all the joined members in the room and their profile information. - - If the user has left the room return the state events from when they left. - - Args: - requester(Requester): The user requesting state events. - room_id(str): The room ID to get all state events from. - Returns: - A dict of user_id to profile info - """ - user_id = requester.user.to_string() - if not requester.app_service: - # We check AS auth after fetching the room membership, as it - # requires us to pull out all joined members anyway. - membership, _ = yield self._check_in_room_or_world_readable( - room_id, user_id - ) - if membership != Membership.JOIN: - raise NotImplementedError( - "Getting joined members after leaving is not implemented" - ) - - users_with_profile = yield self.state.get_current_user_in_room(room_id) - - # If this is an AS, double check that they are allowed to see the members. - # This can either be because the AS user is in the room or becuase there - # is a user in the room that the AS is "interested in" - if requester.app_service and user_id not in users_with_profile: - for uid in users_with_profile: - if requester.app_service.is_interested_in_user(uid): - break - else: - # Loop fell through, AS has no interested users in room - raise AuthError(403, "Appservice not in room") - - defer.returnValue({ - user_id: { - "avatar_url": profile.avatar_url, - "display_name": profile.display_name, - } - for user_id, profile in users_with_profile.iteritems() - }) - - @measure_func("_create_new_client_event") - @defer.inlineCallbacks - def _create_new_client_event(self, builder, requester=None, prev_event_ids=None): + def create_new_client_event(self, builder, requester=None, prev_event_ids=None): if prev_event_ids: prev_events = yield self.store.add_event_hashes(prev_event_ids) prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids) @@ -509,9 +619,7 @@ class MessageHandler(BaseHandler): builder.prev_events = prev_events builder.depth = depth - state_handler = self.state_handler - - context = yield state_handler.compute_event_context(builder) + context = yield self.state.compute_event_context(builder) if requester: context.app_service = requester.app_service @@ -546,12 +654,21 @@ class MessageHandler(BaseHandler): event, context, ratelimit=True, - extra_users=[] + extra_users=[], ): - # We now need to go and hit out to wherever we need to hit out to. + """Processes a new event. This includes checking auth, persisting it, + notifying users, sending to remote servers, etc. - if ratelimit: - yield self.ratelimit(requester) + If called from a worker will hit out to the master process for final + processing. + + Args: + requester (Requester) + event (FrozenEvent) + context (EventContext) + ratelimit (bool) + extra_users (list(UserID)): Any extra users to notify about event + """ try: yield self.auth.check_from_context(event, context) @@ -567,7 +684,58 @@ class MessageHandler(BaseHandler): logger.exception("Failed to encode content: %r", event.content) raise - yield self.maybe_kick_guest_users(event, context) + yield self.action_generator.handle_push_actions_for_event( + event, context + ) + + try: + # If we're a worker we need to hit out to the master. + if self.config.worker_app: + yield send_event_to_master( + self.http_client, + host=self.config.worker_replication_host, + port=self.config.worker_replication_http_port, + requester=requester, + event=event, + context=context, + ratelimit=ratelimit, + extra_users=extra_users, + ) + return + + yield self.persist_and_notify_client_event( + requester, + event, + context, + ratelimit=ratelimit, + extra_users=extra_users, + ) + except: # noqa: E722, as we reraise the exception this is fine. + # Ensure that we actually remove the entries in the push actions + # staging area, if we calculated them. + preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id) + raise + + @defer.inlineCallbacks + def persist_and_notify_client_event( + self, + requester, + event, + context, + ratelimit=True, + extra_users=[], + ): + """Called when we have fully built the event, have already + calculated the push actions for the event, and checked auth. + + This should only be run on master. + """ + assert not self.config.worker_app + + if ratelimit: + yield self.base_handler.ratelimit(requester) + + yield self.base_handler.maybe_kick_guest_users(event, context) if event.type == EventTypes.CanonicalAlias: # Check the alias is acually valid (at this time at least) @@ -660,10 +828,6 @@ class MessageHandler(BaseHandler): "Changing the room create event is forbidden", ) - yield self.action_generator.handle_push_actions_for_event( - event, context - ) - (event_stream_id, max_stream_id) = yield self.store.persist_event( event, context=context ) @@ -683,3 +847,9 @@ class MessageHandler(BaseHandler): ) preserve_fn(_notify)() + + if event.type == EventTypes.Message: + presence = self.hs.get_presence_handler() + # We don't want to block sending messages on any presence code. This + # matters as sometimes presence code can take a while. + preserve_fn(presence.bump_presence_active_time)(requester.user) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index cb158ba962..a5e501897c 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -93,29 +93,30 @@ class PresenceHandler(object): self.store = hs.get_datastore() self.wheel_timer = WheelTimer() self.notifier = hs.get_notifier() - self.replication = hs.get_replication_layer() self.federation = hs.get_federation_sender() self.state = hs.get_state_handler() - self.replication.register_edu_handler( + federation_registry = hs.get_federation_registry() + + federation_registry.register_edu_handler( "m.presence", self.incoming_presence ) - self.replication.register_edu_handler( + federation_registry.register_edu_handler( "m.presence_invite", lambda origin, content: self.invite_presence( observed_user=UserID.from_string(content["observed_user"]), observer_user=UserID.from_string(content["observer_user"]), ) ) - self.replication.register_edu_handler( + federation_registry.register_edu_handler( "m.presence_accept", lambda origin, content: self.accept_presence( observed_user=UserID.from_string(content["observed_user"]), observer_user=UserID.from_string(content["observer_user"]), ) ) - self.replication.register_edu_handler( + federation_registry.register_edu_handler( "m.presence_deny", lambda origin, content: self.deny_presence( observed_user=UserID.from_string(content["observed_user"]), diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 9800e24453..3465a787ab 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -31,14 +31,17 @@ class ProfileHandler(BaseHandler): def __init__(self, hs): super(ProfileHandler, self).__init__(hs) - self.federation = hs.get_replication_layer() - self.federation.register_query_handler( + self.federation = hs.get_federation_client() + hs.get_federation_registry().register_query_handler( "profile", self.on_profile_query ) self.user_directory_handler = hs.get_user_directory_handler() - self.clock.looping_call(self._update_remote_profile_cache, self.PROFILE_UPDATE_MS) + if hs.config.worker_app is None: + self.clock.looping_call( + self._update_remote_profile_cache, self.PROFILE_UPDATE_MS, + ) @defer.inlineCallbacks def get_profile(self, user_id): @@ -233,7 +236,7 @@ class ProfileHandler(BaseHandler): ) for room_id in room_ids: - handler = self.hs.get_handlers().room_member_handler + handler = self.hs.get_room_member_handler() try: # Assume the target_user isn't a guest, # because we don't let guests set profile or avatar data. diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index b5b0303d54..5142ae153d 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -41,9 +41,9 @@ class ReadMarkerHandler(BaseHandler): """ with (yield self.read_marker_linearizer.queue((room_id, user_id))): - account_data = yield self.store.get_account_data_for_room(user_id, room_id) - - existing_read_marker = account_data.get("m.fully_read", None) + existing_read_marker = yield self.store.get_account_data_for_room_and_type( + user_id, room_id, "m.fully_read", + ) should_update = True diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 0525765272..3f215c2b4e 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -35,7 +35,7 @@ class ReceiptsHandler(BaseHandler): self.store = hs.get_datastore() self.hs = hs self.federation = hs.get_federation_sender() - hs.get_replication_layer().register_edu_handler( + hs.get_federation_registry().register_edu_handler( "m.receipt", self._received_remote_receipt ) self.clock = self.hs.get_clock() diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 4bc6ef51fe..ed5939880a 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -25,6 +25,7 @@ from synapse.http.client import CaptchaServerHttpClient from synapse import types from synapse.types import UserID from synapse.util.async import run_on_reactor +from synapse.util.threepids import check_3pid_allowed from ._base import BaseHandler logger = logging.getLogger(__name__) @@ -131,7 +132,7 @@ class RegistrationHandler(BaseHandler): yield run_on_reactor() password_hash = None if password: - password_hash = self.auth_handler().hash(password) + password_hash = yield self.auth_handler().hash(password) if localpart: yield self.check_username(localpart, guest_access_token=guest_access_token) @@ -293,7 +294,7 @@ class RegistrationHandler(BaseHandler): """ for c in threepidCreds: - logger.info("validating theeepidcred sid %s on id server %s", + logger.info("validating threepidcred sid %s on id server %s", c['sid'], c['idServer']) try: identity_handler = self.hs.get_handlers().identity_handler @@ -307,6 +308,11 @@ class RegistrationHandler(BaseHandler): logger.info("got threepid with medium '%s' and address '%s'", threepid['medium'], threepid['address']) + if not check_3pid_allowed(self.hs, threepid['medium'], threepid['address']): + raise RegistrationError( + 403, "Third party identifier is not allowed" + ) + @defer.inlineCallbacks def bind_emails(self, user_id, threepidCreds): """Links emails with a user ID and informs an identity server. @@ -440,16 +446,34 @@ class RegistrationHandler(BaseHandler): return self.hs.get_auth_handler() @defer.inlineCallbacks - def guest_access_token_for(self, medium, address, inviter_user_id): + def get_or_register_3pid_guest(self, medium, address, inviter_user_id): + """Get a guest access token for a 3PID, creating a guest account if + one doesn't already exist. + + Args: + medium (str) + address (str) + inviter_user_id (str): The user ID who is trying to invite the + 3PID + + Returns: + Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the + 3PID guest account. + """ access_token = yield self.store.get_3pid_guest_access_token(medium, address) if access_token: - defer.returnValue(access_token) + user_info = yield self.auth.get_user_by_access_token( + access_token + ) - _, access_token = yield self.register( + defer.returnValue((user_info["user"].to_string(), access_token)) + + user_id, access_token = yield self.register( generate_token=True, make_guest=True ) access_token = yield self.store.save_or_get_3pid_guest_access_token( medium, address, access_token, inviter_user_id ) - defer.returnValue(access_token) + + defer.returnValue((user_id, access_token)) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index d1cc87a016..8df8fcbbad 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014 - 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -64,6 +65,7 @@ class RoomCreationHandler(BaseHandler): super(RoomCreationHandler, self).__init__(hs) self.spam_checker = hs.get_spam_checker() + self.event_creation_handler = hs.get_event_creation_handler() @defer.inlineCallbacks def create_room(self, requester, config, ratelimit=True): @@ -163,13 +165,11 @@ class RoomCreationHandler(BaseHandler): creation_content = config.get("creation_content", {}) - msg_handler = self.hs.get_handlers().message_handler - room_member_handler = self.hs.get_handlers().room_member_handler + room_member_handler = self.hs.get_room_member_handler() yield self._send_events_for_new_room( requester, room_id, - msg_handler, room_member_handler, preset_config=preset_config, invite_list=invite_list, @@ -181,7 +181,7 @@ class RoomCreationHandler(BaseHandler): if "name" in config: name = config["name"] - yield msg_handler.create_and_send_nonmember_event( + yield self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.Name, @@ -194,7 +194,7 @@ class RoomCreationHandler(BaseHandler): if "topic" in config: topic = config["topic"] - yield msg_handler.create_and_send_nonmember_event( + yield self.event_creation_handler.create_and_send_nonmember_event( requester, { "type": EventTypes.Topic, @@ -224,7 +224,7 @@ class RoomCreationHandler(BaseHandler): id_server = invite_3pid["id_server"] address = invite_3pid["address"] medium = invite_3pid["medium"] - yield self.hs.get_handlers().room_member_handler.do_3pid_invite( + yield self.hs.get_room_member_handler().do_3pid_invite( room_id, requester.user, medium, @@ -249,7 +249,6 @@ class RoomCreationHandler(BaseHandler): self, creator, # A Requester object. room_id, - msg_handler, room_member_handler, preset_config, invite_list, @@ -272,7 +271,7 @@ class RoomCreationHandler(BaseHandler): @defer.inlineCallbacks def send(etype, content, **kwargs): event = create(etype, content, **kwargs) - yield msg_handler.create_and_send_nonmember_event( + yield self.event_creation_handler.create_and_send_nonmember_event( creator, event, ratelimit=False @@ -476,12 +475,9 @@ class RoomEventSource(object): user.to_string() ) if app_service: - events, end_key = yield self.store.get_appservice_room_stream( - service=app_service, - from_key=from_key, - to_key=to_key, - limit=limit, - ) + # We no longer support AS users using /sync directly. + # See https://github.com/matrix-org/matrix-doc/issues/1144 + raise NotImplementedError() else: room_events = yield self.store.get_membership_changes_for_user( user.to_string(), from_key, to_key diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index bb40075387..5d81f59b44 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -203,7 +203,8 @@ class RoomListHandler(BaseHandler): if limit: step = limit + 1 else: - step = len(rooms_to_scan) + # step cannot be zero + step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1 chunk = [] for i in xrange(0, len(rooms_to_scan), step): @@ -408,7 +409,7 @@ class RoomListHandler(BaseHandler): def _get_remote_list_cached(self, server_name, limit=None, since_token=None, search_filter=None, include_all_networks=False, third_party_instance_id=None,): - repl_layer = self.hs.get_replication_layer() + repl_layer = self.hs.get_federation_client() if search_filter: # We can't cache when asking for search return repl_layer.get_public_rooms( diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 7e6467cd1d..9977be8831 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. - +import abc import logging from signedjson.key import decode_verify_key_bytes @@ -29,32 +30,121 @@ from synapse.api.errors import AuthError, SynapseError, Codes from synapse.types import UserID, RoomID from synapse.util.async import Linearizer from synapse.util.distributor import user_left_room, user_joined_room -from ._base import BaseHandler + logger = logging.getLogger(__name__) id_server_scheme = "https://" -class RoomMemberHandler(BaseHandler): +class RoomMemberHandler(object): # TODO(paul): This handler currently contains a messy conflation of # low-level API that works on UserID objects and so on, and REST-level # API that takes ID strings and returns pagination chunks. These concerns # ought to be separated out a lot better. - def __init__(self, hs): - super(RoomMemberHandler, self).__init__(hs) + __metaclass__ = abc.ABCMeta + def __init__(self, hs): + self.hs = hs + self.store = hs.get_datastore() + self.auth = hs.get_auth() + self.state_handler = hs.get_state_handler() + self.config = hs.config + self.simple_http_client = hs.get_simple_http_client() + + self.federation_handler = hs.get_handlers().federation_handler + self.directory_handler = hs.get_handlers().directory_handler + self.registration_handler = hs.get_handlers().registration_handler self.profile_handler = hs.get_profile_handler() + self.event_creation_hander = hs.get_event_creation_handler() self.member_linearizer = Linearizer(name="member") self.clock = hs.get_clock() self.spam_checker = hs.get_spam_checker() - self.distributor = hs.get_distributor() - self.distributor.declare("user_joined_room") - self.distributor.declare("user_left_room") + @abc.abstractmethod + def _remote_join(self, requester, remote_room_hosts, room_id, user, content): + """Try and join a room that this server is not in + + Args: + requester (Requester) + remote_room_hosts (list[str]): List of servers that can be used + to join via. + room_id (str): Room that we are trying to join + user (UserID): User who is trying to join + content (dict): A dict that should be used as the content of the + join event. + + Returns: + Deferred + """ + raise NotImplementedError() + + @abc.abstractmethod + def _remote_reject_invite(self, remote_room_hosts, room_id, target): + """Attempt to reject an invite for a room this server is not in. If we + fail to do so we locally mark the invite as rejected. + + Args: + requester (Requester) + remote_room_hosts (list[str]): List of servers to use to try and + reject invite + room_id (str) + target (UserID): The user rejecting the invite + + Returns: + Deferred[dict]: A dictionary to be returned to the client, may + include event_id etc, or nothing if we locally rejected + """ + raise NotImplementedError() + + @abc.abstractmethod + def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id): + """Get a guest access token for a 3PID, creating a guest account if + one doesn't already exist. + + Args: + requester (Requester) + medium (str) + address (str) + inviter_user_id (str): The user ID who is trying to invite the + 3PID + + Returns: + Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the + 3PID guest account. + """ + raise NotImplementedError() + + @abc.abstractmethod + def _user_joined_room(self, target, room_id): + """Notifies distributor on master process that the user has joined the + room. + + Args: + target (UserID) + room_id (str) + + Returns: + Deferred|None + """ + raise NotImplementedError() + + @abc.abstractmethod + def _user_left_room(self, target, room_id): + """Notifies distributor on master process that the user has left the + room. + + Args: + target (UserID) + room_id (str) + + Returns: + Deferred|None + """ + raise NotImplementedError() @defer.inlineCallbacks def _local_membership_update( @@ -66,13 +156,12 @@ class RoomMemberHandler(BaseHandler): ): if content is None: content = {} - msg_handler = self.hs.get_handlers().message_handler content["membership"] = membership if requester.is_guest: content["kind"] = "guest" - event, context = yield msg_handler.create_event( + event, context = yield self.event_creation_hander.create_event( requester, { "type": EventTypes.Member, @@ -90,12 +179,14 @@ class RoomMemberHandler(BaseHandler): ) # Check if this event matches the previous membership event for the user. - duplicate = yield msg_handler.deduplicate_state_event(event, context) + duplicate = yield self.event_creation_hander.deduplicate_state_event( + event, context, + ) if duplicate is not None: # Discard the new event since this membership change is a no-op. defer.returnValue(duplicate) - yield msg_handler.handle_new_client_event( + yield self.event_creation_hander.handle_new_client_event( requester, event, context, @@ -117,33 +208,16 @@ class RoomMemberHandler(BaseHandler): prev_member_event = yield self.store.get_event(prev_member_event_id) newly_joined = prev_member_event.membership != Membership.JOIN if newly_joined: - yield user_joined_room(self.distributor, target, room_id) + yield self._user_joined_room(target, room_id) elif event.membership == Membership.LEAVE: if prev_member_event_id: prev_member_event = yield self.store.get_event(prev_member_event_id) if prev_member_event.membership == Membership.JOIN: - user_left_room(self.distributor, target, room_id) + yield self._user_left_room(target, room_id) defer.returnValue(event) @defer.inlineCallbacks - def remote_join(self, remote_room_hosts, room_id, user, content): - if len(remote_room_hosts) == 0: - raise SynapseError(404, "No known servers") - - # We don't do an auth check if we are doing an invite - # join dance for now, since we're kinda implicitly checking - # that we are allowed to join when we decide whether or not we - # need to do the invite/join dance. - yield self.hs.get_handlers().federation_handler.do_invite_join( - remote_room_hosts, - room_id, - user.to_string(), - content, - ) - yield user_joined_room(self.distributor, user, room_id) - - @defer.inlineCallbacks def update_membership( self, requester, @@ -201,8 +275,7 @@ class RoomMemberHandler(BaseHandler): # if this is a join with a 3pid signature, we may need to turn a 3pid # invite into a normal invite before we can handle the join. if third_party_signed is not None: - replication = self.hs.get_replication_layer() - yield replication.exchange_third_party_invite( + yield self.federation_handler.exchange_third_party_invite( third_party_signed["sender"], target.to_string(), room_id, @@ -223,7 +296,7 @@ class RoomMemberHandler(BaseHandler): requester.user, ) if not is_requester_admin: - if self.hs.config.block_non_admin_invites: + if self.config.block_non_admin_invites: logger.info( "Blocking invite: user is not admin and non-admin " "invites disabled" @@ -282,7 +355,7 @@ class RoomMemberHandler(BaseHandler): raise AuthError(403, "Guest access not allowed") if not is_host_in_room: - inviter = yield self.get_inviter(target.to_string(), room_id) + inviter = yield self._get_inviter(target.to_string(), room_id) if inviter and not self.hs.is_mine(inviter): remote_room_hosts.append(inviter.domain) @@ -296,15 +369,15 @@ class RoomMemberHandler(BaseHandler): if requester.is_guest: content["kind"] = "guest" - ret = yield self.remote_join( - remote_room_hosts, room_id, target, content + ret = yield self._remote_join( + requester, remote_room_hosts, room_id, target, content ) defer.returnValue(ret) elif effective_membership_state == Membership.LEAVE: if not is_host_in_room: # perhaps we've been invited - inviter = yield self.get_inviter(target.to_string(), room_id) + inviter = yield self._get_inviter(target.to_string(), room_id) if not inviter: raise SynapseError(404, "Not a known room") @@ -318,28 +391,10 @@ class RoomMemberHandler(BaseHandler): else: # send the rejection to the inviter's HS. remote_room_hosts = remote_room_hosts + [inviter.domain] - fed_handler = self.hs.get_handlers().federation_handler - try: - ret = yield fed_handler.do_remotely_reject_invite( - remote_room_hosts, - room_id, - target.to_string(), - ) - defer.returnValue(ret) - except Exception as e: - # if we were unable to reject the exception, just mark - # it as rejected on our end and plough ahead. - # - # The 'except' clause is very broad, but we need to - # capture everything from DNS failures upwards - # - logger.warn("Failed to reject invite: %s", e) - - yield self.store.locally_reject_invite( - target.to_string(), room_id - ) - - defer.returnValue({}) + res = yield self._remote_reject_invite( + requester, remote_room_hosts, room_id, target, + ) + defer.returnValue(res) res = yield self._local_membership_update( requester=requester, @@ -394,8 +449,9 @@ class RoomMemberHandler(BaseHandler): else: requester = synapse.types.create_requester(target_user) - message_handler = self.hs.get_handlers().message_handler - prev_event = yield message_handler.deduplicate_state_event(event, context) + prev_event = yield self.event_creation_hander.deduplicate_state_event( + event, context, + ) if prev_event is not None: return @@ -412,7 +468,7 @@ class RoomMemberHandler(BaseHandler): if is_blocked: raise SynapseError(403, "This room has been blocked on this server") - yield message_handler.handle_new_client_event( + yield self.event_creation_hander.handle_new_client_event( requester, event, context, @@ -434,12 +490,12 @@ class RoomMemberHandler(BaseHandler): prev_member_event = yield self.store.get_event(prev_member_event_id) newly_joined = prev_member_event.membership != Membership.JOIN if newly_joined: - yield user_joined_room(self.distributor, target_user, room_id) + yield self._user_joined_room(target_user, room_id) elif event.membership == Membership.LEAVE: if prev_member_event_id: prev_member_event = yield self.store.get_event(prev_member_event_id) if prev_member_event.membership == Membership.JOIN: - user_left_room(self.distributor, target_user, room_id) + yield self._user_left_room(target_user, room_id) @defer.inlineCallbacks def _can_guest_join(self, current_state_ids): @@ -473,7 +529,7 @@ class RoomMemberHandler(BaseHandler): Raises: SynapseError if room alias could not be found. """ - directory_handler = self.hs.get_handlers().directory_handler + directory_handler = self.directory_handler mapping = yield directory_handler.get_association(room_alias) if not mapping: @@ -485,7 +541,7 @@ class RoomMemberHandler(BaseHandler): defer.returnValue((RoomID.from_string(room_id), servers)) @defer.inlineCallbacks - def get_inviter(self, user_id, room_id): + def _get_inviter(self, user_id, room_id): invite = yield self.store.get_invite_for_user_in_room( user_id=user_id, room_id=room_id, @@ -504,7 +560,7 @@ class RoomMemberHandler(BaseHandler): requester, txn_id ): - if self.hs.config.block_non_admin_invites: + if self.config.block_non_admin_invites: is_requester_admin = yield self.auth.is_server_admin( requester.user, ) @@ -551,7 +607,7 @@ class RoomMemberHandler(BaseHandler): str: the matrix ID of the 3pid, or None if it is not recognized. """ try: - data = yield self.hs.get_simple_http_client().get_json( + data = yield self.simple_http_client.get_json( "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server,), { "medium": medium, @@ -562,7 +618,7 @@ class RoomMemberHandler(BaseHandler): if "mxid" in data: if "signatures" not in data: raise AuthError(401, "No signatures on 3pid binding") - self.verify_any_signature(data, id_server) + yield self._verify_any_signature(data, id_server) defer.returnValue(data["mxid"]) except IOError as e: @@ -570,11 +626,11 @@ class RoomMemberHandler(BaseHandler): defer.returnValue(None) @defer.inlineCallbacks - def verify_any_signature(self, data, server_hostname): + def _verify_any_signature(self, data, server_hostname): if server_hostname not in data["signatures"]: raise AuthError(401, "No signature from server %s" % (server_hostname,)) for key_name, signature in data["signatures"][server_hostname].items(): - key_data = yield self.hs.get_simple_http_client().get_json( + key_data = yield self.simple_http_client.get_json( "%s%s/_matrix/identity/api/v1/pubkey/%s" % (id_server_scheme, server_hostname, key_name,), ) @@ -599,7 +655,7 @@ class RoomMemberHandler(BaseHandler): user, txn_id ): - room_state = yield self.hs.get_state_handler().get_current_state(room_id) + room_state = yield self.state_handler.get_current_state(room_id) inviter_display_name = "" inviter_avatar_url = "" @@ -630,6 +686,7 @@ class RoomMemberHandler(BaseHandler): token, public_keys, fallback_public_key, display_name = ( yield self._ask_id_server_for_third_party_invite( + requester=requester, id_server=id_server, medium=medium, address=address, @@ -644,8 +701,7 @@ class RoomMemberHandler(BaseHandler): ) ) - msg_handler = self.hs.get_handlers().message_handler - yield msg_handler.create_and_send_nonmember_event( + yield self.event_creation_hander.create_and_send_nonmember_event( requester, { "type": EventTypes.ThirdPartyInvite, @@ -667,6 +723,7 @@ class RoomMemberHandler(BaseHandler): @defer.inlineCallbacks def _ask_id_server_for_third_party_invite( self, + requester, id_server, medium, address, @@ -683,6 +740,7 @@ class RoomMemberHandler(BaseHandler): Asks an identity server for a third party invite. Args: + requester (Requester) id_server (str): hostname + optional port for the identity server. medium (str): The literal string "email". address (str): The third party address being invited. @@ -724,24 +782,20 @@ class RoomMemberHandler(BaseHandler): "sender_avatar_url": inviter_avatar_url, } - if self.hs.config.invite_3pid_guest: - registration_handler = self.hs.get_handlers().registration_handler - guest_access_token = yield registration_handler.guest_access_token_for( + if self.config.invite_3pid_guest: + guest_access_token, guest_user_id = yield self.get_or_register_3pid_guest( + requester=requester, medium=medium, address=address, inviter_user_id=inviter_user_id, ) - guest_user_info = yield self.hs.get_auth().get_user_by_access_token( - guest_access_token - ) - invite_config.update({ "guest_access_token": guest_access_token, - "guest_user_id": guest_user_info["user"].to_string(), + "guest_user_id": guest_user_id, }) - data = yield self.hs.get_simple_http_client().post_urlencoded_get_json( + data = yield self.simple_http_client.post_urlencoded_get_json( is_url, invite_config ) @@ -764,27 +818,6 @@ class RoomMemberHandler(BaseHandler): defer.returnValue((token, public_keys, fallback_public_key, display_name)) @defer.inlineCallbacks - def forget(self, user, room_id): - user_id = user.to_string() - - member = yield self.state_handler.get_current_state( - room_id=room_id, - event_type=EventTypes.Member, - state_key=user_id - ) - membership = member.membership if member else None - - if membership is not None and membership not in [ - Membership.LEAVE, Membership.BAN - ]: - raise SynapseError(400, "User %s in room %s" % ( - user_id, room_id - )) - - if membership: - yield self.store.forget(user_id, room_id) - - @defer.inlineCallbacks def _is_host_in_room(self, current_state_ids): # Have we just created the room, and is this about to be the very # first member event? @@ -805,3 +838,94 @@ class RoomMemberHandler(BaseHandler): defer.returnValue(True) defer.returnValue(False) + + +class RoomMemberMasterHandler(RoomMemberHandler): + def __init__(self, hs): + super(RoomMemberMasterHandler, self).__init__(hs) + + self.distributor = hs.get_distributor() + self.distributor.declare("user_joined_room") + self.distributor.declare("user_left_room") + + @defer.inlineCallbacks + def _remote_join(self, requester, remote_room_hosts, room_id, user, content): + """Implements RoomMemberHandler._remote_join + """ + if len(remote_room_hosts) == 0: + raise SynapseError(404, "No known servers") + + # We don't do an auth check if we are doing an invite + # join dance for now, since we're kinda implicitly checking + # that we are allowed to join when we decide whether or not we + # need to do the invite/join dance. + yield self.federation_handler.do_invite_join( + remote_room_hosts, + room_id, + user.to_string(), + content, + ) + yield self._user_joined_room(user, room_id) + + @defer.inlineCallbacks + def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target): + """Implements RoomMemberHandler._remote_reject_invite + """ + fed_handler = self.federation_handler + try: + ret = yield fed_handler.do_remotely_reject_invite( + remote_room_hosts, + room_id, + target.to_string(), + ) + defer.returnValue(ret) + except Exception as e: + # if we were unable to reject the exception, just mark + # it as rejected on our end and plough ahead. + # + # The 'except' clause is very broad, but we need to + # capture everything from DNS failures upwards + # + logger.warn("Failed to reject invite: %s", e) + + yield self.store.locally_reject_invite( + target.to_string(), room_id + ) + defer.returnValue({}) + + def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id): + """Implements RoomMemberHandler.get_or_register_3pid_guest + """ + rg = self.registration_handler + return rg.get_or_register_3pid_guest(medium, address, inviter_user_id) + + def _user_joined_room(self, target, room_id): + """Implements RoomMemberHandler._user_joined_room + """ + return user_joined_room(self.distributor, target, room_id) + + def _user_left_room(self, target, room_id): + """Implements RoomMemberHandler._user_left_room + """ + return user_left_room(self.distributor, target, room_id) + + @defer.inlineCallbacks + def forget(self, user, room_id): + user_id = user.to_string() + + member = yield self.state_handler.get_current_state( + room_id=room_id, + event_type=EventTypes.Member, + state_key=user_id + ) + membership = member.membership if member else None + + if membership is not None and membership not in [ + Membership.LEAVE, Membership.BAN + ]: + raise SynapseError(400, "User %s in room %s" % ( + user_id, room_id + )) + + if membership: + yield self.store.forget(user_id, room_id) diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py new file mode 100644 index 0000000000..493aec1e48 --- /dev/null +++ b/synapse/handlers/room_member_worker.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.api.errors import SynapseError +from synapse.handlers.room_member import RoomMemberHandler +from synapse.replication.http.membership import ( + remote_join, remote_reject_invite, get_or_register_3pid_guest, + notify_user_membership_change, +) + + +logger = logging.getLogger(__name__) + + +class RoomMemberWorkerHandler(RoomMemberHandler): + @defer.inlineCallbacks + def _remote_join(self, requester, remote_room_hosts, room_id, user, content): + """Implements RoomMemberHandler._remote_join + """ + if len(remote_room_hosts) == 0: + raise SynapseError(404, "No known servers") + + ret = yield remote_join( + self.simple_http_client, + host=self.config.worker_replication_host, + port=self.config.worker_replication_http_port, + requester=requester, + remote_room_hosts=remote_room_hosts, + room_id=room_id, + user_id=user.to_string(), + content=content, + ) + + yield self._user_joined_room(user, room_id) + + defer.returnValue(ret) + + def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target): + """Implements RoomMemberHandler._remote_reject_invite + """ + return remote_reject_invite( + self.simple_http_client, + host=self.config.worker_replication_host, + port=self.config.worker_replication_http_port, + requester=requester, + remote_room_hosts=remote_room_hosts, + room_id=room_id, + user_id=target.to_string(), + ) + + def _user_joined_room(self, target, room_id): + """Implements RoomMemberHandler._user_joined_room + """ + return notify_user_membership_change( + self.simple_http_client, + host=self.config.worker_replication_host, + port=self.config.worker_replication_http_port, + user_id=target.to_string(), + room_id=room_id, + change="joined", + ) + + def _user_left_room(self, target, room_id): + """Implements RoomMemberHandler._user_left_room + """ + return notify_user_membership_change( + self.simple_http_client, + host=self.config.worker_replication_host, + port=self.config.worker_replication_http_port, + user_id=target.to_string(), + room_id=room_id, + change="left", + ) + + def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id): + """Implements RoomMemberHandler.get_or_register_3pid_guest + """ + return get_or_register_3pid_guest( + self.simple_http_client, + host=self.config.worker_replication_host, + port=self.config.worker_replication_http_port, + requester=requester, + medium=medium, + address=address, + inviter_user_id=inviter_user_id, + ) diff --git a/synapse/handlers/set_password.py b/synapse/handlers/set_password.py index 44414e1dc1..e057ae54c9 100644 --- a/synapse/handlers/set_password.py +++ b/synapse/handlers/set_password.py @@ -31,7 +31,7 @@ class SetPasswordHandler(BaseHandler): @defer.inlineCallbacks def set_password(self, user_id, newpassword, requester=None): - password_hash = self._auth_handler.hash(newpassword) + password_hash = yield self._auth_handler.hash(newpassword) except_device_id = requester.device_id if requester else None except_access_token_id = requester.access_token_id if requester else None diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b12988f3c9..0f713ce038 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -235,10 +235,10 @@ class SyncHandler(object): defer.returnValue(rules) @defer.inlineCallbacks - def ephemeral_by_room(self, sync_config, now_token, since_token=None): + def ephemeral_by_room(self, sync_result_builder, now_token, since_token=None): """Get the ephemeral events for each room the user is in Args: - sync_config (SyncConfig): The flags, filters and user for the sync. + sync_result_builder(SyncResultBuilder) now_token (StreamToken): Where the server is currently up to. since_token (StreamToken): Where the server was when the client last synced. @@ -248,10 +248,12 @@ class SyncHandler(object): typing events for that room. """ + sync_config = sync_result_builder.sync_config + with Measure(self.clock, "ephemeral_by_room"): typing_key = since_token.typing_key if since_token else "0" - room_ids = yield self.store.get_rooms_for_user(sync_config.user.to_string()) + room_ids = sync_result_builder.joined_room_ids typing_source = self.event_sources.sources["typing"] typing, typing_key = yield typing_source.get_new_events( @@ -565,10 +567,22 @@ class SyncHandler(object): # Always use the `now_token` in `SyncResultBuilder` now_token = yield self.event_sources.get_current_token() + user_id = sync_config.user.to_string() + app_service = self.store.get_app_service_by_user_id(user_id) + if app_service: + # We no longer support AS users using /sync directly. + # See https://github.com/matrix-org/matrix-doc/issues/1144 + raise NotImplementedError() + else: + joined_room_ids = yield self.get_rooms_for_user_at( + user_id, now_token.room_stream_id, + ) + sync_result_builder = SyncResultBuilder( sync_config, full_state, since_token=since_token, now_token=now_token, + joined_room_ids=joined_room_ids, ) account_data_by_room = yield self._generate_sync_entry_for_account_data( @@ -603,7 +617,6 @@ class SyncHandler(object): device_id = sync_config.device_id one_time_key_counts = {} if device_id: - user_id = sync_config.user.to_string() one_time_key_counts = yield self.store.count_e2e_one_time_keys( user_id, device_id ) @@ -891,7 +904,7 @@ class SyncHandler(object): ephemeral_by_room = {} else: now_token, ephemeral_by_room = yield self.ephemeral_by_room( - sync_result_builder.sync_config, + sync_result_builder, now_token=sync_result_builder.now_token, since_token=sync_result_builder.since_token, ) @@ -996,15 +1009,8 @@ class SyncHandler(object): if rooms_changed: defer.returnValue(True) - app_service = self.store.get_app_service_by_user_id(user_id) - if app_service: - rooms = yield self.store.get_app_service_rooms(app_service) - joined_room_ids = set(r.room_id for r in rooms) - else: - joined_room_ids = yield self.store.get_rooms_for_user(user_id) - stream_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream - for room_id in joined_room_ids: + for room_id in sync_result_builder.joined_room_ids: if self.store.has_room_changed_since(room_id, stream_id): defer.returnValue(True) defer.returnValue(False) @@ -1028,13 +1034,6 @@ class SyncHandler(object): assert since_token - app_service = self.store.get_app_service_by_user_id(user_id) - if app_service: - rooms = yield self.store.get_app_service_rooms(app_service) - joined_room_ids = set(r.room_id for r in rooms) - else: - joined_room_ids = yield self.store.get_rooms_for_user(user_id) - # Get a list of membership change events that have happened. rooms_changed = yield self.store.get_membership_changes_for_user( user_id, since_token.room_key, now_token.room_key @@ -1057,7 +1056,7 @@ class SyncHandler(object): # we do send down the room, and with full state, where necessary old_state_ids = None - if room_id in joined_room_ids and non_joins: + if room_id in sync_result_builder.joined_room_ids and non_joins: # Always include if the user (re)joined the room, especially # important so that device list changes are calculated correctly. # If there are non join member events, but we are still in the room, @@ -1067,7 +1066,7 @@ class SyncHandler(object): # User is in the room so we don't need to do the invite/leave checks continue - if room_id in joined_room_ids or has_join: + if room_id in sync_result_builder.joined_room_ids or has_join: old_state_ids = yield self.get_state_at(room_id, since_token) old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None) old_mem_ev = None @@ -1079,7 +1078,7 @@ class SyncHandler(object): newly_joined_rooms.append(room_id) # If user is in the room then we don't need to do the invite/leave checks - if room_id in joined_room_ids: + if room_id in sync_result_builder.joined_room_ids: continue if not non_joins: @@ -1146,7 +1145,7 @@ class SyncHandler(object): # Get all events for rooms we're currently joined to. room_to_events = yield self.store.get_room_events_stream_for_rooms( - room_ids=joined_room_ids, + room_ids=sync_result_builder.joined_room_ids, from_key=since_token.room_key, to_key=now_token.room_key, limit=timeline_limit + 1, @@ -1154,7 +1153,7 @@ class SyncHandler(object): # We loop through all room ids, even if there are no new events, in case # there are non room events taht we need to notify about. - for room_id in joined_room_ids: + for room_id in sync_result_builder.joined_room_ids: room_entry = room_to_events.get(room_id, None) if room_entry: @@ -1362,6 +1361,54 @@ class SyncHandler(object): else: raise Exception("Unrecognized rtype: %r", room_builder.rtype) + @defer.inlineCallbacks + def get_rooms_for_user_at(self, user_id, stream_ordering): + """Get set of joined rooms for a user at the given stream ordering. + + The stream ordering *must* be recent, otherwise this may throw an + exception if older than a month. (This function is called with the + current token, which should be perfectly fine). + + Args: + user_id (str) + stream_ordering (int) + + ReturnValue: + Deferred[frozenset[str]]: Set of room_ids the user is in at given + stream_ordering. + """ + joined_rooms = yield self.store.get_rooms_for_user_with_stream_ordering( + user_id, + ) + + joined_room_ids = set() + + # We need to check that the stream ordering of the join for each room + # is before the stream_ordering asked for. This might not be the case + # if the user joins a room between us getting the current token and + # calling `get_rooms_for_user_with_stream_ordering`. + # If the membership's stream ordering is after the given stream + # ordering, we need to go and work out if the user was in the room + # before. + for room_id, membership_stream_ordering in joined_rooms: + if membership_stream_ordering <= stream_ordering: + joined_room_ids.add(room_id) + continue + + logger.info("User joined room after current token: %s", room_id) + + extrems = yield self.store.get_forward_extremeties_for_room( + room_id, stream_ordering, + ) + users_in_room = yield self.state.get_current_user_in_room( + room_id, extrems, + ) + if user_id in users_in_room: + joined_room_ids.add(room_id) + + joined_room_ids = frozenset(joined_room_ids) + defer.returnValue(joined_room_ids) + def _action_has_highlight(actions): for action in actions: @@ -1411,7 +1458,8 @@ def _calculate_state(timeline_contains, timeline_start, previous, current): class SyncResultBuilder(object): "Used to help build up a new SyncResult for a user" - def __init__(self, sync_config, full_state, since_token, now_token): + def __init__(self, sync_config, full_state, since_token, now_token, + joined_room_ids): """ Args: sync_config(SyncConfig) @@ -1423,6 +1471,7 @@ class SyncResultBuilder(object): self.full_state = full_state self.since_token = since_token self.now_token = now_token + self.joined_room_ids = joined_room_ids self.presence = [] self.account_data = [] diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 82dedbbc99..77c0cf146f 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -56,7 +56,7 @@ class TypingHandler(object): self.federation = hs.get_federation_sender() - hs.get_replication_layer().register_edu_handler("m.typing", self._recv_edu) + hs.get_federation_registry().register_edu_handler("m.typing", self._recv_edu) hs.get_distributor().observe("user_left_room", self.user_left_room) |