diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/__init__.py | 23 | ||||
-rw-r--r-- | synapse/handlers/_base.py | 369 | ||||
-rw-r--r-- | synapse/handlers/appservice.py | 15 | ||||
-rw-r--r-- | synapse/handlers/auth.py | 142 | ||||
-rw-r--r-- | synapse/handlers/directory.py | 3 | ||||
-rw-r--r-- | synapse/handlers/events.py | 2 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 233 | ||||
-rw-r--r-- | synapse/handlers/message.py | 277 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 214 | ||||
-rw-r--r-- | synapse/handlers/profile.py | 43 | ||||
-rw-r--r-- | synapse/handlers/receipts.py | 33 | ||||
-rw-r--r-- | synapse/handlers/register.py | 63 | ||||
-rw-r--r-- | synapse/handlers/room.py | 709 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 677 | ||||
-rw-r--r-- | synapse/handlers/search.py | 17 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 1216 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 143 |
17 files changed, 2234 insertions, 1945 deletions
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 66d2c01123..d28e07f0d9 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -13,23 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.appservice.scheduler import AppServiceScheduler -from synapse.appservice.api import ApplicationServiceApi from .register import RegistrationHandler from .room import ( - RoomCreationHandler, RoomMemberHandler, RoomListHandler, RoomContextHandler, + RoomCreationHandler, RoomContextHandler, ) +from .room_member import RoomMemberHandler from .message import MessageHandler from .events import EventStreamHandler, EventHandler from .federation import FederationHandler from .profile import ProfileHandler -from .presence import PresenceHandler from .directory import DirectoryHandler -from .typing import TypingNotificationHandler from .admin import AdminHandler -from .appservice import ApplicationServicesHandler -from .sync import SyncHandler -from .auth import AuthHandler from .identity import IdentityHandler from .receipts import ReceiptsHandler from .search import SearchHandler @@ -52,22 +46,9 @@ class Handlers(object): self.event_handler = EventHandler(hs) self.federation_handler = FederationHandler(hs) self.profile_handler = ProfileHandler(hs) - self.presence_handler = PresenceHandler(hs) - self.room_list_handler = RoomListHandler(hs) self.directory_handler = DirectoryHandler(hs) - self.typing_notification_handler = TypingNotificationHandler(hs) self.admin_handler = AdminHandler(hs) self.receipts_handler = ReceiptsHandler(hs) - asapi = ApplicationServiceApi(hs) - self.appservice_handler = ApplicationServicesHandler( - hs, asapi, AppServiceScheduler( - clock=hs.get_clock(), - store=hs.get_datastore(), - as_api=asapi - ) - ) - self.sync_handler = SyncHandler(hs) - self.auth_handler = AuthHandler(hs) self.identity_handler = IdentityHandler(hs) self.search_handler = SearchHandler(hs) self.room_context_handler = RoomContextHandler(hs) diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 90eabb6eb7..c904c6c500 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -15,13 +15,10 @@ from twisted.internet import defer -from synapse.api.errors import LimitExceededError, SynapseError, AuthError -from synapse.crypto.event_signing import add_hashes_and_signatures +from synapse.api.errors import LimitExceededError from synapse.api.constants import Membership, EventTypes -from synapse.types import UserID, RoomAlias, Requester -from synapse.push.action_generator import ActionGenerator +from synapse.types import UserID, Requester -from synapse.util.logcontext import PreserveLoggingContext import logging @@ -29,20 +26,13 @@ import logging logger = logging.getLogger(__name__) -VISIBILITY_PRIORITY = ( - "world_readable", - "shared", - "invited", - "joined", -) - - class BaseHandler(object): """ Common base class for the event handlers. - :type store: synapse.storage.events.StateStore - :type state_handler: synapse.state.StateHandler + Attributes: + store (synapse.storage.events.StateStore): + state_handler (synapse.state.StateHandler): """ def __init__(self, hs): @@ -55,137 +45,10 @@ class BaseHandler(object): self.clock = hs.get_clock() self.hs = hs - self.signing_key = hs.config.signing_key[0] self.server_name = hs.hostname self.event_builder_factory = hs.get_event_builder_factory() - @defer.inlineCallbacks - def filter_events_for_clients(self, user_tuples, events, event_id_to_state): - """ Returns dict of user_id -> list of events that user is allowed to - see. - - :param (str, bool) user_tuples: (user id, is_peeking) for each - user to be checked. is_peeking should be true if: - * the user is not currently a member of the room, and: - * the user has not been a member of the room since the given - events - """ - forgotten = yield defer.gatherResults([ - self.store.who_forgot_in_room( - room_id, - ) - for room_id in frozenset(e.room_id for e in events) - ], consumeErrors=True) - - # Set of membership event_ids that have been forgotten - event_id_forgotten = frozenset( - row["event_id"] for rows in forgotten for row in rows - ) - - def allowed(event, user_id, is_peeking): - state = event_id_to_state[event.event_id] - - # get the room_visibility at the time of the event. - visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None) - if visibility_event: - visibility = visibility_event.content.get("history_visibility", "shared") - else: - visibility = "shared" - - if visibility not in VISIBILITY_PRIORITY: - visibility = "shared" - - # if it was world_readable, it's easy: everyone can read it - if visibility == "world_readable": - return True - - # Always allow history visibility events on boundaries. This is done - # by setting the effective visibility to the least restrictive - # of the old vs new. - if event.type == EventTypes.RoomHistoryVisibility: - prev_content = event.unsigned.get("prev_content", {}) - prev_visibility = prev_content.get("history_visibility", None) - - if prev_visibility not in VISIBILITY_PRIORITY: - prev_visibility = "shared" - - new_priority = VISIBILITY_PRIORITY.index(visibility) - old_priority = VISIBILITY_PRIORITY.index(prev_visibility) - if old_priority < new_priority: - visibility = prev_visibility - - # get the user's membership at the time of the event. (or rather, - # just *after* the event. Which means that people can see their - # own join events, but not (currently) their own leave events.) - membership_event = state.get((EventTypes.Member, user_id), None) - if membership_event: - if membership_event.event_id in event_id_forgotten: - membership = None - else: - membership = membership_event.membership - else: - membership = None - - # if the user was a member of the room at the time of the event, - # they can see it. - if membership == Membership.JOIN: - return True - - if visibility == "joined": - # we weren't a member at the time of the event, so we can't - # see this event. - return False - - elif visibility == "invited": - # user can also see the event if they were *invited* at the time - # of the event. - return membership == Membership.INVITE - - else: - # visibility is shared: user can also see the event if they have - # become a member since the event - # - # XXX: if the user has subsequently joined and then left again, - # ideally we would share history up to the point they left. But - # we don't know when they left. - return not is_peeking - - defer.returnValue({ - user_id: [ - event - for event in events - if allowed(event, user_id, is_peeking) - ] - for user_id, is_peeking in user_tuples - }) - - @defer.inlineCallbacks - def _filter_events_for_client(self, user_id, events, is_peeking=False): - """ - Check which events a user is allowed to see - - :param str user_id: user id to be checked - :param [synapse.events.EventBase] events: list of events to be checked - :param bool is_peeking should be True if: - * the user is not currently a member of the room, and: - * the user has not been a member of the room since the given - events - :rtype [synapse.events.EventBase] - """ - types = ( - (EventTypes.RoomHistoryVisibility, ""), - (EventTypes.Member, user_id), - ) - event_id_to_state = yield self.store.get_state_for_events( - frozenset(e.event_id for e in events), - types=types - ) - res = yield self.filter_events_for_clients( - [(user_id, is_peeking)], events, event_id_to_state - ) - defer.returnValue(res.get(user_id, [])) - def ratelimit(self, requester): time_now = self.clock.time() allowed, time_allowed = self.ratelimiter.send_message( @@ -198,95 +61,6 @@ class BaseHandler(object): retry_after_ms=int(1000 * (time_allowed - time_now)), ) - @defer.inlineCallbacks - def _create_new_client_event(self, builder): - latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room( - builder.room_id, - ) - - if latest_ret: - depth = max([d for _, _, d in latest_ret]) + 1 - else: - depth = 1 - - prev_events = [ - (event_id, prev_hashes) - for event_id, prev_hashes, _ in latest_ret - ] - - builder.prev_events = prev_events - builder.depth = depth - - state_handler = self.state_handler - - context = yield state_handler.compute_event_context(builder) - - # If we've received an invite over federation, there are no latest - # events in the room, because we don't know enough about the graph - # fragment we received to treat it like a graph, so the above returned - # no relevant events. It may have returned some events (if we have - # joined and left the room), but not useful ones, like the invite. - if ( - not self.is_host_in_room(context.current_state) and - builder.type == EventTypes.Member - ): - prev_member_event = yield self.store.get_room_member( - builder.sender, builder.room_id - ) - - # The prev_member_event may already be in context.current_state, - # despite us not being present in the room; in particular, if - # inviting user, and all other local users, have already left. - # - # In that case, we have all the information we need, and we don't - # want to drop "context" - not least because we may need to handle - # the invite locally, which will require us to have the whole - # context (not just prev_member_event) to auth it. - # - context_event_ids = ( - e.event_id for e in context.current_state.values() - ) - - if ( - prev_member_event and - prev_member_event.event_id not in context_event_ids - ): - # The prev_member_event is missing from context, so it must - # have arrived over federation and is an outlier. We forcibly - # set our context to the invite we received over federation - builder.prev_events = ( - prev_member_event.event_id, - prev_member_event.prev_events - ) - - context = yield state_handler.compute_event_context( - builder, - old_state=(prev_member_event,), - outlier=True - ) - - if builder.is_state(): - builder.prev_state = yield self.store.add_event_hashes( - context.prev_state_events - ) - - yield self.auth.add_auth_events(builder, context) - - add_hashes_and_signatures( - builder, self.server_name, self.signing_key - ) - - event = builder.build() - - logger.debug( - "Created event %s with current state: %s", - event.event_id, context.current_state, - ) - - defer.returnValue( - (event, context,) - ) - def is_host_in_room(self, current_state): room_members = [ (state_key, event.membership) @@ -301,144 +75,13 @@ class BaseHandler(object): return True for (state_key, membership) in room_members: if ( - UserID.from_string(state_key).domain == self.hs.hostname + self.hs.is_mine_id(state_key) and membership == Membership.JOIN ): return True return False @defer.inlineCallbacks - def handle_new_client_event( - self, - requester, - event, - context, - ratelimit=True, - extra_users=[] - ): - # We now need to go and hit out to wherever we need to hit out to. - - if ratelimit: - self.ratelimit(requester) - - self.auth.check(event, auth_events=context.current_state) - - yield self.maybe_kick_guest_users(event, context.current_state.values()) - - if event.type == EventTypes.CanonicalAlias: - # Check the alias is acually valid (at this time at least) - room_alias_str = event.content.get("alias", None) - if room_alias_str: - room_alias = RoomAlias.from_string(room_alias_str) - directory_handler = self.hs.get_handlers().directory_handler - mapping = yield directory_handler.get_association(room_alias) - - if mapping["room_id"] != event.room_id: - raise SynapseError( - 400, - "Room alias %s does not point to the room" % ( - room_alias_str, - ) - ) - - federation_handler = self.hs.get_handlers().federation_handler - - if event.type == EventTypes.Member: - if event.content["membership"] == Membership.INVITE: - def is_inviter_member_event(e): - return ( - e.type == EventTypes.Member and - e.sender == event.sender - ) - - event.unsigned["invite_room_state"] = [ - { - "type": e.type, - "state_key": e.state_key, - "content": e.content, - "sender": e.sender, - } - for k, e in context.current_state.items() - if e.type in self.hs.config.room_invite_state_types - or is_inviter_member_event(e) - ] - - invitee = UserID.from_string(event.state_key) - if not self.hs.is_mine(invitee): - # TODO: Can we add signature from remote server in a nicer - # way? If we have been invited by a remote server, we need - # to get them to sign the event. - - returned_invite = yield federation_handler.send_invite( - invitee.domain, - event, - ) - - event.unsigned.pop("room_state", None) - - # TODO: Make sure the signatures actually are correct. - event.signatures.update( - returned_invite.signatures - ) - - if event.type == EventTypes.Redaction: - if self.auth.check_redaction(event, auth_events=context.current_state): - original_event = yield self.store.get_event( - event.redacts, - check_redacted=False, - get_prev_content=False, - allow_rejected=False, - allow_none=False - ) - if event.user_id != original_event.user_id: - raise AuthError( - 403, - "You don't have permission to redact events" - ) - - if event.type == EventTypes.Create and context.current_state: - raise AuthError( - 403, - "Changing the room create event is forbidden", - ) - - action_generator = ActionGenerator(self.hs) - yield action_generator.handle_push_actions_for_event( - event, context, self - ) - - (event_stream_id, max_stream_id) = yield self.store.persist_event( - event, context=context - ) - - destinations = set() - for k, s in context.current_state.items(): - try: - if k[0] == EventTypes.Member: - if s.content["membership"] == Membership.JOIN: - destinations.add( - UserID.from_string(s.state_key).domain - ) - except SynapseError: - logger.warn( - "Failed to get destination from event %s", s.event_id - ) - - with PreserveLoggingContext(): - # Don't block waiting on waking up all the listeners. - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=extra_users - ) - - # If invite, remove room_state from unsigned before sending. - event.unsigned.pop("invite_room_state", None) - - federation_handler.handle_new_event( - event, destinations=destinations, - ) - - @defer.inlineCallbacks def maybe_kick_guest_users(self, event, current_state): # Technically this function invalidates current_state by changing it. # Hopefully this isn't that important to the caller. diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 75fc74c797..051ccdb380 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -17,7 +17,6 @@ from twisted.internet import defer from synapse.api.constants import EventTypes from synapse.appservice import ApplicationService -from synapse.types import UserID import logging @@ -35,16 +34,13 @@ def log_failure(failure): ) -# NB: Purposefully not inheriting BaseHandler since that contains way too much -# setup code which this handler does not need or use. This makes testing a lot -# easier. class ApplicationServicesHandler(object): - def __init__(self, hs, appservice_api, appservice_scheduler): + def __init__(self, hs): self.store = hs.get_datastore() - self.hs = hs - self.appservice_api = appservice_api - self.scheduler = appservice_scheduler + self.is_mine_id = hs.is_mine_id + self.appservice_api = hs.get_application_service_api() + self.scheduler = hs.get_application_service_scheduler() self.started_scheduler = False @defer.inlineCallbacks @@ -169,8 +165,7 @@ class ApplicationServicesHandler(object): @defer.inlineCallbacks def _is_unknown_user(self, user_id): - user = UserID.from_string(user_id) - if not self.hs.is_mine(user): + if not self.is_mine_id(user_id): # we don't know if they are unknown or not since it isn't one of our # users. We can't poke ASes. defer.returnValue(False) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 82d458b424..200793b5ed 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -18,7 +18,7 @@ from twisted.internet import defer from ._base import BaseHandler from synapse.api.constants import LoginType from synapse.types import UserID -from synapse.api.errors import AuthError, LoginError, Codes +from synapse.api.errors import AuthError, LoginError, Codes, StoreError, SynapseError from synapse.util.async import run_on_reactor from twisted.web.client import PartialDownloadError @@ -49,6 +49,21 @@ class AuthHandler(BaseHandler): self.sessions = {} self.INVALID_TOKEN_HTTP_STATUS = 401 + self.ldap_enabled = hs.config.ldap_enabled + self.ldap_server = hs.config.ldap_server + self.ldap_port = hs.config.ldap_port + self.ldap_tls = hs.config.ldap_tls + self.ldap_search_base = hs.config.ldap_search_base + self.ldap_search_property = hs.config.ldap_search_property + self.ldap_email_property = hs.config.ldap_email_property + self.ldap_full_name_property = hs.config.ldap_full_name_property + + if self.ldap_enabled is True: + import ldap + logger.info("Import ldap version: %s", ldap.__version__) + + self.hs = hs # FIXME better possibility to access registrationHandler later? + @defer.inlineCallbacks def check_auth(self, flows, clientdict, clientip): """ @@ -163,9 +178,13 @@ class AuthHandler(BaseHandler): def get_session_id(self, clientdict): """ Gets the session ID for a client given the client dictionary - :param clientdict: The dictionary sent by the client in the request - :return: The string session ID the client sent. If the client did not - send a session ID, returns None. + + Args: + clientdict: The dictionary sent by the client in the request + + Returns: + str|None: The string session ID the client sent. If the client did + not send a session ID, returns None. """ sid = None if clientdict and 'auth' in clientdict: @@ -179,9 +198,11 @@ class AuthHandler(BaseHandler): Store a key-value pair into the sessions data associated with this request. This data is stored server-side and cannot be modified by the client. - :param session_id: (string) The ID of this session as returned from check_auth - :param key: (string) The key to store the data under - :param value: (any) The data to store + + Args: + session_id (string): The ID of this session as returned from check_auth + key (string): The key to store the data under + value (any): The data to store """ sess = self._get_session_info(session_id) sess.setdefault('serverdict', {})[key] = value @@ -190,9 +211,11 @@ class AuthHandler(BaseHandler): def get_session_data(self, session_id, key, default=None): """ Retrieve data stored with set_session_data - :param session_id: (string) The ID of this session as returned from check_auth - :param key: (string) The key to store the data under - :param default: (any) Value to return if the key has not been set + + Args: + session_id (string): The ID of this session as returned from check_auth + key (string): The key to store the data under + default (any): Value to return if the key has not been set """ sess = self._get_session_info(session_id) return sess.setdefault('serverdict', {}).get(key, default) @@ -207,8 +230,10 @@ class AuthHandler(BaseHandler): if not user_id.startswith('@'): user_id = UserID.create(user_id, self.hs.hostname).to_string() - user_id, password_hash = yield self._find_user_id_and_pwd_hash(user_id) - self._check_password(user_id, password, password_hash) + if not (yield self._check_password(user_id, password)): + logger.warn("Failed password login for user %s", user_id) + raise LoginError(403, "", errcode=Codes.FORBIDDEN) + defer.returnValue(user_id) @defer.inlineCallbacks @@ -332,8 +357,10 @@ class AuthHandler(BaseHandler): StoreError if there was a problem storing the token. LoginError if there was an authentication problem. """ - user_id, password_hash = yield self._find_user_id_and_pwd_hash(user_id) - self._check_password(user_id, password, password_hash) + + if not (yield self._check_password(user_id, password)): + logger.warn("Failed password login for user %s", user_id) + raise LoginError(403, "", errcode=Codes.FORBIDDEN) logger.info("Logging in user %s", user_id) access_token = yield self.issue_access_token(user_id) @@ -399,11 +426,67 @@ class AuthHandler(BaseHandler): else: defer.returnValue(user_infos.popitem()) - def _check_password(self, user_id, password, stored_hash): - """Checks that user_id has passed password, raises LoginError if not.""" - if not self.validate_hash(password, stored_hash): - logger.warn("Failed password login for user %s", user_id) - raise LoginError(403, "", errcode=Codes.FORBIDDEN) + @defer.inlineCallbacks + def _check_password(self, user_id, password): + """ + Returns: + True if the user_id successfully authenticated + """ + valid_ldap = yield self._check_ldap_password(user_id, password) + if valid_ldap: + defer.returnValue(True) + + valid_local_password = yield self._check_local_password(user_id, password) + if valid_local_password: + defer.returnValue(True) + + defer.returnValue(False) + + @defer.inlineCallbacks + def _check_local_password(self, user_id, password): + try: + user_id, password_hash = yield self._find_user_id_and_pwd_hash(user_id) + defer.returnValue(self.validate_hash(password, password_hash)) + except LoginError: + defer.returnValue(False) + + @defer.inlineCallbacks + def _check_ldap_password(self, user_id, password): + if not self.ldap_enabled: + logger.debug("LDAP not configured") + defer.returnValue(False) + + import ldap + + logger.info("Authenticating %s with LDAP" % user_id) + try: + ldap_url = "%s:%s" % (self.ldap_server, self.ldap_port) + logger.debug("Connecting LDAP server at %s" % ldap_url) + l = ldap.initialize(ldap_url) + if self.ldap_tls: + logger.debug("Initiating TLS") + self._connection.start_tls_s() + + local_name = UserID.from_string(user_id).localpart + + dn = "%s=%s, %s" % ( + self.ldap_search_property, + local_name, + self.ldap_search_base) + logger.debug("DN for LDAP authentication: %s" % dn) + + l.simple_bind_s(dn.encode('utf-8'), password.encode('utf-8')) + + if not (yield self.does_user_exist(user_id)): + handler = self.hs.get_handlers().registration_handler + user_id, access_token = ( + yield handler.register(localpart=local_name) + ) + + defer.returnValue(True) + except ldap.LDAPError, e: + logger.warn("LDAP error: %s", e) + defer.returnValue(False) @defer.inlineCallbacks def issue_access_token(self, user_id): @@ -438,14 +521,19 @@ class AuthHandler(BaseHandler): )) return m.serialize() - def generate_short_term_login_token(self, user_id): + def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)): macaroon = self._generate_base_macaroon(user_id) macaroon.add_first_party_caveat("type = login") now = self.hs.get_clock().time_msec() - expiry = now + (2 * 60 * 1000) + expiry = now + duration_in_ms macaroon.add_first_party_caveat("time < %d" % (expiry,)) return macaroon.serialize() + def generate_delete_pusher_token(self, user_id): + macaroon = self._generate_base_macaroon(user_id) + macaroon.add_first_party_caveat("type = delete_pusher") + return macaroon.serialize() + def validate_short_term_login_token_and_get_user_id(self, login_token): try: macaroon = pymacaroons.Macaroon.deserialize(login_token) @@ -480,7 +568,12 @@ class AuthHandler(BaseHandler): except_access_token_ids = [requester.access_token_id] if requester else [] - yield self.store.user_set_password_hash(user_id, password_hash) + try: + yield self.store.user_set_password_hash(user_id, password_hash) + except StoreError as e: + if e.code == 404: + raise SynapseError(404, "Unknown user", Codes.NOT_FOUND) + raise e yield self.store.user_delete_access_tokens( user_id, except_access_token_ids ) @@ -532,4 +625,7 @@ class AuthHandler(BaseHandler): Returns: Whether self.hash(password) == stored_hash (bool). """ - return bcrypt.hashpw(password, stored_hash) == stored_hash + if stored_hash: + return bcrypt.hashpw(password, stored_hash) == stored_hash + else: + return False diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 8eeb225811..4bea7f2b19 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -33,6 +33,7 @@ class DirectoryHandler(BaseHandler): super(DirectoryHandler, self).__init__(hs) self.state = hs.get_state_handler() + self.appservice_handler = hs.get_application_service_handler() self.federation = hs.get_replication_layer() self.federation.register_query_handler( @@ -281,7 +282,7 @@ class DirectoryHandler(BaseHandler): ) if not result: # Query AS to see if it exists - as_handler = self.hs.get_handlers().appservice_handler + as_handler = self.appservice_handler result = yield as_handler.query_room_alias_exists(room_alias) defer.returnValue(result) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index f25a252523..3a3a1257d3 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -58,7 +58,7 @@ class EventStreamHandler(BaseHandler): If `only_keys` is not None, events from keys will be sent down. """ auth_user = UserID.from_string(auth_user_id) - presence_handler = self.hs.get_handlers().presence_handler + presence_handler = self.hs.get_presence_handler() context = yield presence_handler.user_syncing( auth_user_id, affect_presence=affect_presence, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 267fedf114..ff83c608e7 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -26,20 +26,21 @@ from synapse.api.errors import ( from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.events.validator import EventValidator from synapse.util import unwrapFirstError -from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.logcontext import PreserveLoggingContext, preserve_fn from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor from synapse.util.frozenutils import unfreeze from synapse.crypto.event_signing import ( compute_event_signature, add_hashes_and_signatures, ) -from synapse.types import UserID +from synapse.types import UserID, get_domain_from_id from synapse.events.utils import prune_event from synapse.util.retryutils import NotRetryingDestination from synapse.push.action_generator import ActionGenerator +from synapse.util.distributor import user_joined_room from twisted.internet import defer @@ -49,10 +50,6 @@ import logging logger = logging.getLogger(__name__) -def user_joined_room(distributor, user, room_id): - return distributor.fire("user_joined_room", user, room_id) - - class FederationHandler(BaseHandler): """Handles events that originated from federation. Responsible for: @@ -69,10 +66,6 @@ class FederationHandler(BaseHandler): self.hs = hs - self.distributor.observe("user_joined_room", self.user_joined_room) - - self.waiting_for_join_list = {} - self.store = hs.get_datastore() self.replication_layer = hs.get_replication_layer() self.state_handler = hs.get_state_handler() @@ -102,8 +95,7 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def on_receive_pdu(self, origin, pdu, state=None, - auth_chain=None): + def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None): """ Called by the ReplicationLayer when we have a new pdu. We need to do auth checks and put it through the StateHandler. """ @@ -174,11 +166,7 @@ class FederationHandler(BaseHandler): }) seen_ids.add(e.event_id) - yield self._handle_new_events( - origin, - event_infos, - outliers=True - ) + yield self._handle_new_events(origin, event_infos) try: context, event_stream_id, max_stream_id = yield self._handle_new_event( @@ -288,7 +276,14 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def backfill(self, dest, room_id, limit, extremities=[]): """ Trigger a backfill request to `dest` for the given `room_id` + + This will attempt to get more events from the remote. This may return + be successfull and still return no events if the other side has no new + events to offer. """ + if dest == self.server_name: + raise SynapseError(400, "Can't backfill from self.") + if not extremities: extremities = yield self.store.get_oldest_events_in_room(room_id) @@ -299,6 +294,16 @@ class FederationHandler(BaseHandler): extremities=extremities, ) + # Don't bother processing events we already have. + seen_events = yield self.store.have_events_in_timeline( + set(e.event_id for e in events) + ) + + events = [e for e in events if e.event_id not in seen_events] + + if not events: + defer.returnValue([]) + event_map = {e.event_id: e for e in events} event_ids = set(e.event_id for e in events) @@ -358,6 +363,7 @@ class FederationHandler(BaseHandler): for a in auth_events.values(): if a.event_id in seen_events: continue + a.internal_metadata.outlier = True ev_infos.append({ "event": a, "auth_events": { @@ -378,20 +384,23 @@ class FederationHandler(BaseHandler): } }) + yield self._handle_new_events( + dest, ev_infos, + backfilled=True, + ) + events.sort(key=lambda e: e.depth) for event in events: if event in events_to_state: continue - ev_infos.append({ - "event": event, - }) - - yield self._handle_new_events( - dest, ev_infos, - backfilled=True, - ) + # We store these one at a time since each event depends on the + # previous to work out the state. + # TODO: We can probably do something more clever here. + yield self._handle_new_event( + dest, event + ) defer.returnValue(events) @@ -440,7 +449,7 @@ class FederationHandler(BaseHandler): joined_domains = {} for u, d in joined_users: try: - dom = UserID.from_string(u).domain + dom = get_domain_from_id(u) old_d = joined_domains.get(dom) if old_d: joined_domains[dom] = min(d, old_d) @@ -455,7 +464,7 @@ class FederationHandler(BaseHandler): likely_domains = [ domain for domain, depth in curr_domains - if domain is not self.server_name + if domain != self.server_name ] @defer.inlineCallbacks @@ -463,11 +472,15 @@ class FederationHandler(BaseHandler): # TODO: Should we try multiple of these at a time? for dom in domains: try: - events = yield self.backfill( + yield self.backfill( dom, room_id, limit=100, extremities=[e for e in extremities.keys()] ) + # If this succeeded then we probably already have the + # appropriate stuff. + # TODO: We can probably do something more intelligent here. + defer.returnValue(True) except SynapseError as e: logger.info( "Failed to backfill from %s because %s", @@ -493,8 +506,6 @@ class FederationHandler(BaseHandler): ) continue - if events: - defer.returnValue(True) defer.returnValue(False) success = yield try_backfill(likely_domains) @@ -666,9 +677,14 @@ class FederationHandler(BaseHandler): "state_key": user_id, }) - event, context = yield self._create_new_client_event( - builder=builder, - ) + try: + message_handler = self.hs.get_handlers().message_handler + event, context = yield message_handler._create_new_client_event( + builder=builder, + ) + except AuthError as e: + logger.warn("Failed to create join %r because %s", event, e) + raise e self.auth.check(event, auth_events=context.current_state) @@ -724,9 +740,7 @@ class FederationHandler(BaseHandler): try: if k[0] == EventTypes.Member: if s.content["membership"] == Membership.JOIN: - destinations.add( - UserID.from_string(s.state_key).domain - ) + destinations.add(get_domain_from_id(s.state_key)) except: logger.warn( "Failed to get destination from event %s", s.event_id @@ -761,6 +775,7 @@ class FederationHandler(BaseHandler): event = pdu event.internal_metadata.outlier = True + event.internal_metadata.invite_from_remote = True event.signatures.update( compute_event_signature( @@ -788,13 +803,19 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def do_remotely_reject_invite(self, target_hosts, room_id, user_id): - origin, event = yield self._make_and_verify_event( - target_hosts, - room_id, - user_id, - "leave" - ) - signed_event = self._sign_event(event) + try: + origin, event = yield self._make_and_verify_event( + target_hosts, + room_id, + user_id, + "leave" + ) + signed_event = self._sign_event(event) + except SynapseError: + raise + except CodeMessageException as e: + logger.warn("Failed to reject invite: %s", e) + raise SynapseError(500, "Failed to reject invite") # Try the host we successfully got a response to /make_join/ # request first. @@ -804,10 +825,16 @@ class FederationHandler(BaseHandler): except ValueError: pass - yield self.replication_layer.send_leave( - target_hosts, - signed_event - ) + try: + yield self.replication_layer.send_leave( + target_hosts, + signed_event + ) + except SynapseError: + raise + except CodeMessageException as e: + logger.warn("Failed to reject invite: %s", e) + raise SynapseError(500, "Failed to reject invite") context = yield self.state_handler.compute_event_context(event) @@ -883,11 +910,16 @@ class FederationHandler(BaseHandler): "state_key": user_id, }) - event, context = yield self._create_new_client_event( + message_handler = self.hs.get_handlers().message_handler + event, context = yield message_handler._create_new_client_event( builder=builder, ) - self.auth.check(event, auth_events=context.current_state) + try: + self.auth.check(event, auth_events=context.current_state) + except AuthError as e: + logger.warn("Failed to create new leave %r because %s", event, e) + raise e defer.returnValue(event) @@ -934,9 +966,7 @@ class FederationHandler(BaseHandler): try: if k[0] == EventTypes.Member: if s.content["membership"] == Membership.LEAVE: - destinations.add( - UserID.from_string(s.state_key).domain - ) + destinations.add(get_domain_from_id(s.state_key)) except: logger.warn( "Failed to get destination from event %s", s.event_id @@ -1057,21 +1087,10 @@ class FederationHandler(BaseHandler): def get_min_depth_for_context(self, context): return self.store.get_min_depth(context) - @log_function - def user_joined_room(self, user, room_id): - waiters = self.waiting_for_join_list.get( - (user.to_string(), room_id), - [] - ) - while waiters: - waiters.pop().callback(None) - @defer.inlineCallbacks @log_function - def _handle_new_event(self, origin, event, state=None, auth_events=None): - - outlier = event.internal_metadata.is_outlier() - + def _handle_new_event(self, origin, event, state=None, auth_events=None, + backfilled=False): context = yield self._prep_event( origin, event, state=state, @@ -1081,20 +1100,30 @@ class FederationHandler(BaseHandler): if not event.internal_metadata.is_outlier(): action_generator = ActionGenerator(self.hs) yield action_generator.handle_push_actions_for_event( - event, context, self + event, context ) event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, - is_new_state=not outlier, + backfilled=backfilled, + ) + + # this intentionally does not yield: we don't care about the result + # and don't need to wait for it. + preserve_fn(self.hs.get_pusherpool().on_new_notifications)( + event_stream_id, max_stream_id ) defer.returnValue((context, event_stream_id, max_stream_id)) @defer.inlineCallbacks - def _handle_new_events(self, origin, event_infos, backfilled=False, - outliers=False): + def _handle_new_events(self, origin, event_infos, backfilled=False): + """Creates the appropriate contexts and persists events. The events + should not depend on one another, e.g. this should be used to persist + a bunch of outliers, but not a chunk of individual events that depend + on each other for state calculations. + """ contexts = yield defer.gatherResults( [ self._prep_event( @@ -1113,7 +1142,6 @@ class FederationHandler(BaseHandler): for ev_info, context in itertools.izip(event_infos, contexts) ], backfilled=backfilled, - is_new_state=(not outliers and not backfilled), ) @defer.inlineCallbacks @@ -1128,11 +1156,9 @@ class FederationHandler(BaseHandler): """ events_to_context = {} for e in itertools.chain(auth_events, state): - ctx = yield self.state_handler.compute_event_context( - e, outlier=True, - ) - events_to_context[e.event_id] = ctx e.internal_metadata.outlier = True + ctx = yield self.state_handler.compute_event_context(e) + events_to_context[e.event_id] = ctx event_map = { e.event_id: e @@ -1176,16 +1202,14 @@ class FederationHandler(BaseHandler): (e, events_to_context[e.event_id]) for e in itertools.chain(auth_events, state) ], - is_new_state=False, ) new_event_context = yield self.state_handler.compute_event_context( - event, old_state=state, outlier=False, + event, old_state=state ) event_stream_id, max_stream_id = yield self.store.persist_event( event, new_event_context, - is_new_state=True, current_state=state, ) @@ -1193,10 +1217,9 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _prep_event(self, origin, event, state=None, auth_events=None): - outlier = event.internal_metadata.is_outlier() context = yield self.state_handler.compute_event_context( - event, old_state=state, outlier=outlier, + event, old_state=state, ) if not auth_events: @@ -1482,8 +1505,9 @@ class FederationHandler(BaseHandler): try: self.auth.check(event, auth_events=auth_events) - except AuthError: - raise + except AuthError as e: + logger.warn("Failed auth resolution for %r because %s", event, e) + raise e @defer.inlineCallbacks def construct_auth_difference(self, local_auth, remote_auth): @@ -1653,13 +1677,21 @@ class FederationHandler(BaseHandler): if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)): builder = self.event_builder_factory.new(event_dict) EventValidator().validate_new(builder) - event, context = yield self._create_new_client_event(builder=builder) + message_handler = self.hs.get_handlers().message_handler + event, context = yield message_handler._create_new_client_event( + builder=builder + ) event, context = yield self.add_display_name_to_third_party_invite( event_dict, event, context ) - self.auth.check(event, context.current_state) + try: + self.auth.check(event, context.current_state) + except AuthError as e: + logger.warn("Denying new third party invite %r because %s", event, e) + raise e + yield self._check_signature(event, auth_events=context.current_state) member_handler = self.hs.get_handlers().room_member_handler yield member_handler.send_membership_event(None, event, context) @@ -1676,7 +1708,8 @@ class FederationHandler(BaseHandler): def on_exchange_third_party_invite_request(self, origin, room_id, event_dict): builder = self.event_builder_factory.new(event_dict) - event, context = yield self._create_new_client_event( + message_handler = self.hs.get_handlers().message_handler + event, context = yield message_handler._create_new_client_event( builder=builder, ) @@ -1684,7 +1717,11 @@ class FederationHandler(BaseHandler): event_dict, event, context ) - self.auth.check(event, auth_events=context.current_state) + try: + self.auth.check(event, auth_events=context.current_state) + except AuthError as e: + logger.warn("Denying third party invite %r because %s", event, e) + raise e yield self._check_signature(event, auth_events=context.current_state) returned_invite = yield self.send_invite(origin, event) @@ -1711,20 +1748,23 @@ class FederationHandler(BaseHandler): event_dict["content"]["third_party_invite"]["display_name"] = display_name builder = self.event_builder_factory.new(event_dict) EventValidator().validate_new(builder) - event, context = yield self._create_new_client_event(builder=builder) + message_handler = self.hs.get_handlers().message_handler + event, context = yield message_handler._create_new_client_event(builder=builder) defer.returnValue((event, context)) @defer.inlineCallbacks def _check_signature(self, event, auth_events): """ Checks that the signature in the event is consistent with its invite. - :param event (Event): The m.room.member event to check - :param auth_events (dict<(event type, state_key), event>) - :raises - AuthError if signature didn't match any keys, or key has been + Args: + event (Event): The m.room.member event to check + auth_events (dict<(event type, state_key), event>): + + Raises: + AuthError: if signature didn't match any keys, or key has been revoked, - SynapseError if a transient error meant a key couldn't be checked + SynapseError: if a transient error meant a key couldn't be checked for revocation. """ signed = event.content["third_party_invite"]["signed"] @@ -1766,12 +1806,13 @@ class FederationHandler(BaseHandler): """ Checks whether public_key has been revoked. - :param public_key (str): base-64 encoded public key. - :param url (str): Key revocation URL. + Args: + public_key (str): base-64 encoded public key. + url (str): Key revocation URL. - :raises - AuthError if they key has been revoked. - SynapseError if a transient error meant a key couldn't be checked + Raises: + AuthError: if they key has been revoked. + SynapseError: if a transient error meant a key couldn't be checked for revocation. """ try: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 5c50c611ba..15caf1950a 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -17,12 +17,19 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, Codes, SynapseError -from synapse.streams.config import PaginationConfig +from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator +from synapse.push.action_generator import ActionGenerator +from synapse.streams.config import PaginationConfig +from synapse.types import ( + UserID, RoomAlias, RoomStreamToken, StreamToken, get_domain_from_id +) from synapse.util import unwrapFirstError +from synapse.util.async import concurrently_execute, run_on_reactor from synapse.util.caches.snapshot_cache import SnapshotCache -from synapse.types import UserID, RoomStreamToken, StreamToken +from synapse.util.logcontext import preserve_fn +from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -33,10 +40,6 @@ import logging logger = logging.getLogger(__name__) -def collect_presencelike_data(distributor, user, content): - return distributor.fire("collect_presencelike_data", user, content) - - class MessageHandler(BaseHandler): def __init__(self, hs): @@ -48,35 +51,6 @@ class MessageHandler(BaseHandler): self.snapshot_cache = SnapshotCache() @defer.inlineCallbacks - def get_message(self, msg_id=None, room_id=None, sender_id=None, - user_id=None): - """ Retrieve a message. - - Args: - msg_id (str): The message ID to obtain. - room_id (str): The room where the message resides. - sender_id (str): The user ID of the user who sent the message. - user_id (str): The user ID of the user making this request. - Returns: - The message, or None if no message exists. - Raises: - SynapseError if something went wrong. - """ - yield self.auth.check_joined_room(room_id, user_id) - - # Pull out the message from the db -# msg = yield self.store.get_message( -# room_id=room_id, -# msg_id=msg_id, -# user_id=sender_id -# ) - - # TODO (erikj): Once we work out the correct c-s api we need to think - # on how to do this. - - defer.returnValue(None) - - @defer.inlineCallbacks def get_messages(self, requester, room_id=None, pagin_config=None, as_client_event=True): """Get messages in a room. @@ -155,7 +129,8 @@ class MessageHandler(BaseHandler): "end": next_token.to_string(), }) - events = yield self._filter_events_for_client( + events = yield filter_events_for_client( + self.store, user_id, events, is_peeking=(member_event_id is None), @@ -175,7 +150,7 @@ class MessageHandler(BaseHandler): defer.returnValue(chunk) @defer.inlineCallbacks - def create_event(self, event_dict, token_id=None, txn_id=None): + def create_event(self, event_dict, token_id=None, txn_id=None, prev_event_ids=None): """ Given a dict from a client, create a new event. @@ -186,6 +161,9 @@ class MessageHandler(BaseHandler): Args: event_dict (dict): An entire event + token_id (str) + txn_id (str) + prev_event_ids (list): The prev event ids to use when creating the event Returns: Tuple of created event (FrozenEvent), Context @@ -198,12 +176,8 @@ class MessageHandler(BaseHandler): membership = builder.content.get("membership", None) target = UserID.from_string(builder.state_key) - if membership == Membership.JOIN: + if membership in {Membership.JOIN, Membership.INVITE}: # If event doesn't include a display name, add one. - yield collect_presencelike_data( - self.distributor, target, builder.content - ) - elif membership == Membership.INVITE: profile = self.hs.get_handlers().profile_handler content = builder.content @@ -224,6 +198,7 @@ class MessageHandler(BaseHandler): event, context = yield self._create_new_client_event( builder=builder, + prev_event_ids=prev_event_ids, ) defer.returnValue((event, context)) @@ -261,7 +236,7 @@ class MessageHandler(BaseHandler): ) if event.type == EventTypes.Message: - presence = self.hs.get_handlers().presence_handler + presence = self.hs.get_presence_handler() yield presence.bump_presence_active_time(user) def deduplicate_state_event(self, event, context): @@ -515,8 +490,8 @@ class MessageHandler(BaseHandler): ] ).addErrback(unwrapFirstError) - messages = yield self._filter_events_for_client( - user_id, messages + messages = yield filter_events_for_client( + self.store, user_id, messages ) start_token = now_token.copy_and_replace("room_key", token[0]) @@ -556,14 +531,7 @@ class MessageHandler(BaseHandler): except: logger.exception("Failed to get snapshot") - # Only do N rooms at once - n = 5 - d_list = [handle_room(e) for e in room_list] - for i in range(0, len(d_list), n): - yield defer.gatherResults( - d_list[i:i + n], - consumeErrors=True - ).addErrback(unwrapFirstError) + yield concurrently_execute(handle_room, room_list, 10) account_data_events = [] for account_data_type, content in account_data.items(): @@ -658,8 +626,8 @@ class MessageHandler(BaseHandler): end_token=stream_token ) - messages = yield self._filter_events_for_client( - user_id, messages, is_peeking=is_peeking + messages = yield filter_events_for_client( + self.store, user_id, messages, is_peeking=is_peeking ) start_token = StreamToken.START.copy_and_replace("room_key", token[0]) @@ -706,7 +674,7 @@ class MessageHandler(BaseHandler): and m.content["membership"] == Membership.JOIN ] - presence_handler = self.hs.get_handlers().presence_handler + presence_handler = self.hs.get_presence_handler() @defer.inlineCallbacks def get_presence(): @@ -739,8 +707,8 @@ class MessageHandler(BaseHandler): consumeErrors=True, ).addErrback(unwrapFirstError) - messages = yield self._filter_events_for_client( - user_id, messages, is_peeking=is_peeking, + messages = yield filter_events_for_client( + self.store, user_id, messages, is_peeking=is_peeking, ) start_token = now_token.copy_and_replace("room_key", token[0]) @@ -763,3 +731,196 @@ class MessageHandler(BaseHandler): ret["membership"] = membership defer.returnValue(ret) + + @defer.inlineCallbacks + def _create_new_client_event(self, builder, prev_event_ids=None): + if prev_event_ids: + prev_events = yield self.store.add_event_hashes(prev_event_ids) + prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids) + depth = prev_max_depth + 1 + else: + latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room( + builder.room_id, + ) + + if latest_ret: + depth = max([d for _, _, d in latest_ret]) + 1 + else: + depth = 1 + + prev_events = [ + (event_id, prev_hashes) + for event_id, prev_hashes, _ in latest_ret + ] + + builder.prev_events = prev_events + builder.depth = depth + + state_handler = self.state_handler + + context = yield state_handler.compute_event_context(builder) + + if builder.is_state(): + builder.prev_state = yield self.store.add_event_hashes( + context.prev_state_events + ) + + yield self.auth.add_auth_events(builder, context) + + signing_key = self.hs.config.signing_key[0] + add_hashes_and_signatures( + builder, self.server_name, signing_key + ) + + event = builder.build() + + logger.debug( + "Created event %s with current state: %s", + event.event_id, context.current_state, + ) + + defer.returnValue( + (event, context,) + ) + + @defer.inlineCallbacks + def handle_new_client_event( + self, + requester, + event, + context, + ratelimit=True, + extra_users=[] + ): + # We now need to go and hit out to wherever we need to hit out to. + + if ratelimit: + self.ratelimit(requester) + + try: + self.auth.check(event, auth_events=context.current_state) + except AuthError as err: + logger.warn("Denying new event %r because %s", event, err) + raise err + + yield self.maybe_kick_guest_users(event, context.current_state.values()) + + if event.type == EventTypes.CanonicalAlias: + # Check the alias is acually valid (at this time at least) + room_alias_str = event.content.get("alias", None) + if room_alias_str: + room_alias = RoomAlias.from_string(room_alias_str) + directory_handler = self.hs.get_handlers().directory_handler + mapping = yield directory_handler.get_association(room_alias) + + if mapping["room_id"] != event.room_id: + raise SynapseError( + 400, + "Room alias %s does not point to the room" % ( + room_alias_str, + ) + ) + + federation_handler = self.hs.get_handlers().federation_handler + + if event.type == EventTypes.Member: + if event.content["membership"] == Membership.INVITE: + def is_inviter_member_event(e): + return ( + e.type == EventTypes.Member and + e.sender == event.sender + ) + + event.unsigned["invite_room_state"] = [ + { + "type": e.type, + "state_key": e.state_key, + "content": e.content, + "sender": e.sender, + } + for k, e in context.current_state.items() + if e.type in self.hs.config.room_invite_state_types + or is_inviter_member_event(e) + ] + + invitee = UserID.from_string(event.state_key) + if not self.hs.is_mine(invitee): + # TODO: Can we add signature from remote server in a nicer + # way? If we have been invited by a remote server, we need + # to get them to sign the event. + + returned_invite = yield federation_handler.send_invite( + invitee.domain, + event, + ) + + event.unsigned.pop("room_state", None) + + # TODO: Make sure the signatures actually are correct. + event.signatures.update( + returned_invite.signatures + ) + + if event.type == EventTypes.Redaction: + if self.auth.check_redaction(event, auth_events=context.current_state): + original_event = yield self.store.get_event( + event.redacts, + check_redacted=False, + get_prev_content=False, + allow_rejected=False, + allow_none=False + ) + if event.user_id != original_event.user_id: + raise AuthError( + 403, + "You don't have permission to redact events" + ) + + if event.type == EventTypes.Create and context.current_state: + raise AuthError( + 403, + "Changing the room create event is forbidden", + ) + + action_generator = ActionGenerator(self.hs) + yield action_generator.handle_push_actions_for_event( + event, context + ) + + (event_stream_id, max_stream_id) = yield self.store.persist_event( + event, context=context + ) + + # this intentionally does not yield: we don't care about the result + # and don't need to wait for it. + preserve_fn(self.hs.get_pusherpool().on_new_notifications)( + event_stream_id, max_stream_id + ) + + destinations = set() + for k, s in context.current_state.items(): + try: + if k[0] == EventTypes.Member: + if s.content["membership"] == Membership.JOIN: + destinations.add(get_domain_from_id(s.state_key)) + except SynapseError: + logger.warn( + "Failed to get destination from event %s", s.event_id + ) + + @defer.inlineCallbacks + def _notify(): + yield run_on_reactor() + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=extra_users + ) + + preserve_fn(_notify)() + + # If invite, remove room_state from unsigned before sending. + event.unsigned.pop("invite_room_state", None) + + federation_handler.handle_new_event( + event, destinations=destinations, + ) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index d0c8f1328b..6b70fa3817 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -33,11 +33,9 @@ from synapse.util.logcontext import preserve_fn from synapse.util.logutils import log_function from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer -from synapse.types import UserID +from synapse.types import UserID, get_domain_from_id import synapse.metrics -from ._base import BaseHandler - import logging @@ -52,6 +50,8 @@ timers_fired_counter = metrics.register_counter("timers_fired") federation_presence_counter = metrics.register_counter("federation_presence") bump_active_time_counter = metrics.register_counter("bump_active_time") +get_updates_counter = metrics.register_counter("get_updates", labels=["type"]) + # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them # "currently_active" @@ -70,14 +70,18 @@ FEDERATION_TIMEOUT = 30 * 60 * 1000 # How often to resend presence to remote servers FEDERATION_PING_INTERVAL = 25 * 60 * 1000 +# How long we will wait before assuming that the syncs from an external process +# are dead. +EXTERNAL_PROCESS_EXPIRY = 5 * 60 * 1000 + assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER -class PresenceHandler(BaseHandler): +class PresenceHandler(object): def __init__(self, hs): - super(PresenceHandler, self).__init__(hs) - self.hs = hs + self.is_mine = hs.is_mine + self.is_mine_id = hs.is_mine_id self.clock = hs.get_clock() self.store = hs.get_datastore() self.wheel_timer = WheelTimer() @@ -138,7 +142,7 @@ class PresenceHandler(BaseHandler): obj=state.user_id, then=state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT, ) - if self.hs.is_mine_id(state.user_id): + if self.is_mine_id(state.user_id): self.wheel_timer.insert( now=now, obj=state.user_id, @@ -160,15 +164,26 @@ class PresenceHandler(BaseHandler): self.serial_to_user = {} self._next_serial = 1 - # Keeps track of the number of *ongoing* syncs. While this is non zero - # a user will never go offline. + # Keeps track of the number of *ongoing* syncs on this process. While + # this is non zero a user will never go offline. self.user_to_num_current_syncs = {} + # Keeps track of the number of *ongoing* syncs on other processes. + # While any sync is ongoing on another process the user will never + # go offline. + # Each process has a unique identifier and an update frequency. If + # no update is received from that process within the update period then + # we assume that all the sync requests on that process have stopped. + # Stored as a dict from process_id to set of user_id, and a dict of + # process_id to millisecond timestamp last updated. + self.external_process_to_current_syncs = {} + self.external_process_last_updated_ms = {} + # Start a LoopingCall in 30s that fires every 5s. # The initial delay is to allow disconnected clients a chance to # reconnect before we treat them as offline. self.clock.call_later( - 0 * 1000, + 30, self.clock.looping_call, self._handle_timeouts, 5000, @@ -228,7 +243,7 @@ class PresenceHandler(BaseHandler): new_state, should_notify, should_ping = handle_update( prev_state, new_state, - is_mine=self.hs.is_mine_id(user_id), + is_mine=self.is_mine_id(user_id), wheel_timer=self.wheel_timer, now=now ) @@ -268,31 +283,48 @@ class PresenceHandler(BaseHandler): """Checks the presence of users that have timed out and updates as appropriate. """ + logger.info("Handling presence timeouts") now = self.clock.time_msec() - with Measure(self.clock, "presence_handle_timeouts"): - # Fetch the list of users that *may* have timed out. Things may have - # changed since the timeout was set, so we won't necessarily have to - # take any action. - users_to_check = self.wheel_timer.fetch(now) + try: + with Measure(self.clock, "presence_handle_timeouts"): + # Fetch the list of users that *may* have timed out. Things may have + # changed since the timeout was set, so we won't necessarily have to + # take any action. + users_to_check = set(self.wheel_timer.fetch(now)) + + # Check whether the lists of syncing processes from an external + # process have expired. + expired_process_ids = [ + process_id for process_id, last_update + in self.external_process_last_updated_ms.items() + if now - last_update > EXTERNAL_PROCESS_EXPIRY + ] + for process_id in expired_process_ids: + users_to_check.update( + self.external_process_last_updated_ms.pop(process_id, ()) + ) + self.external_process_last_update.pop(process_id) - states = [ - self.user_to_current_state.get( - user_id, UserPresenceState.default(user_id) - ) - for user_id in set(users_to_check) - ] + states = [ + self.user_to_current_state.get( + user_id, UserPresenceState.default(user_id) + ) + for user_id in users_to_check + ] - timers_fired_counter.inc_by(len(states)) + timers_fired_counter.inc_by(len(states)) - changes = handle_timeouts( - states, - is_mine_fn=self.hs.is_mine_id, - user_to_num_current_syncs=self.user_to_num_current_syncs, - now=now, - ) + changes = handle_timeouts( + states, + is_mine_fn=self.is_mine_id, + syncing_user_ids=self.get_currently_syncing_users(), + now=now, + ) - preserve_fn(self._update_states)(changes) + preserve_fn(self._update_states)(changes) + except: + logger.exception("Exception in _handle_timeouts loop") @defer.inlineCallbacks def bump_presence_active_time(self, user): @@ -365,6 +397,74 @@ class PresenceHandler(BaseHandler): defer.returnValue(_user_syncing()) + def get_currently_syncing_users(self): + """Get the set of user ids that are currently syncing on this HS. + Returns: + set(str): A set of user_id strings. + """ + syncing_user_ids = { + user_id for user_id, count in self.user_to_num_current_syncs.items() + if count + } + for user_ids in self.external_process_to_current_syncs.values(): + syncing_user_ids.update(user_ids) + return syncing_user_ids + + @defer.inlineCallbacks + def update_external_syncs(self, process_id, syncing_user_ids): + """Update the syncing users for an external process + + Args: + process_id(str): An identifier for the process the users are + syncing against. This allows synapse to process updates + as user start and stop syncing against a given process. + syncing_user_ids(set(str)): The set of user_ids that are + currently syncing on that server. + """ + + # Grab the previous list of user_ids that were syncing on that process + prev_syncing_user_ids = ( + self.external_process_to_current_syncs.get(process_id, set()) + ) + # Grab the current presence state for both the users that are syncing + # now and the users that were syncing before this update. + prev_states = yield self.current_state_for_users( + syncing_user_ids | prev_syncing_user_ids + ) + updates = [] + time_now_ms = self.clock.time_msec() + + # For each new user that is syncing check if we need to mark them as + # being online. + for new_user_id in syncing_user_ids - prev_syncing_user_ids: + prev_state = prev_states[new_user_id] + if prev_state.state == PresenceState.OFFLINE: + updates.append(prev_state.copy_and_replace( + state=PresenceState.ONLINE, + last_active_ts=time_now_ms, + last_user_sync_ts=time_now_ms, + )) + else: + updates.append(prev_state.copy_and_replace( + last_user_sync_ts=time_now_ms, + )) + + # For each user that is still syncing or stopped syncing update the + # last sync time so that we will correctly apply the grace period when + # they stop syncing. + for old_user_id in prev_syncing_user_ids: + prev_state = prev_states[old_user_id] + updates.append(prev_state.copy_and_replace( + last_user_sync_ts=time_now_ms, + )) + + yield self._update_states(updates) + + # Update the last updated time for the process. We expire the entries + # if we don't receive an update in the given timeframe. + self.external_process_last_updated_ms[process_id] = self.clock.time_msec() + self.external_process_to_current_syncs[process_id] = syncing_user_ids + @defer.inlineCallbacks def current_state_for_user(self, user_id): """Get the current presence state for a user. @@ -427,7 +527,7 @@ class PresenceHandler(BaseHandler): hosts_to_states = {} for room_id, states in room_ids_to_states.items(): - local_states = filter(lambda s: self.hs.is_mine_id(s.user_id), states) + local_states = filter(lambda s: self.is_mine_id(s.user_id), states) if not local_states: continue @@ -436,11 +536,11 @@ class PresenceHandler(BaseHandler): hosts_to_states.setdefault(host, []).extend(local_states) for user_id, states in users_to_states.items(): - local_states = filter(lambda s: self.hs.is_mine_id(s.user_id), states) + local_states = filter(lambda s: self.is_mine_id(s.user_id), states) if not local_states: continue - host = UserID.from_string(user_id).domain + host = get_domain_from_id(user_id) hosts_to_states.setdefault(host, []).extend(local_states) # TODO: de-dup hosts_to_states, as a single host might have multiple @@ -611,14 +711,14 @@ class PresenceHandler(BaseHandler): # don't need to send to local clients here, as that is done as part # of the event stream/sync. # TODO: Only send to servers not already in the room. - if self.hs.is_mine(user): + if self.is_mine(user): state = yield self.current_state_for_user(user.to_string()) hosts = yield self.store.get_joined_hosts_for_room(room_id) self._push_to_remotes({host: (state,) for host in hosts}) else: user_ids = yield self.store.get_users_in_room(room_id) - user_ids = filter(self.hs.is_mine_id, user_ids) + user_ids = filter(self.is_mine_id, user_ids) states = yield self.current_state_for_users(user_ids) @@ -628,7 +728,7 @@ class PresenceHandler(BaseHandler): def get_presence_list(self, observer_user, accepted=None): """Returns the presence for all users in their presence list. """ - if not self.hs.is_mine(observer_user): + if not self.is_mine(observer_user): raise SynapseError(400, "User is not hosted on this Home Server") presence_list = yield self.store.get_presence_list( @@ -659,7 +759,7 @@ class PresenceHandler(BaseHandler): observer_user.localpart, observed_user.to_string() ) - if self.hs.is_mine(observed_user): + if self.is_mine(observed_user): yield self.invite_presence(observed_user, observer_user) else: yield self.federation.send_edu( @@ -675,11 +775,11 @@ class PresenceHandler(BaseHandler): def invite_presence(self, observed_user, observer_user): """Handles new presence invites. """ - if not self.hs.is_mine(observed_user): + if not self.is_mine(observed_user): raise SynapseError(400, "User is not hosted on this Home Server") # TODO: Don't auto accept - if self.hs.is_mine(observer_user): + if self.is_mine(observer_user): yield self.accept_presence(observed_user, observer_user) else: self.federation.send_edu( @@ -742,7 +842,7 @@ class PresenceHandler(BaseHandler): Returns: A Deferred. """ - if not self.hs.is_mine(observer_user): + if not self.is_mine(observer_user): raise SynapseError(400, "User is not hosted on this Home Server") yield self.store.del_presence_list( @@ -834,7 +934,11 @@ def _format_user_presence_state(state, now): class PresenceEventSource(object): def __init__(self, hs): - self.hs = hs + # We can't call get_presence_handler here because there's a cycle: + # + # Presence -> Notifier -> PresenceEventSource -> Presence + # + self.get_presence_handler = hs.get_presence_handler self.clock = hs.get_clock() self.store = hs.get_datastore() @@ -860,7 +964,7 @@ class PresenceEventSource(object): from_key = int(from_key) room_ids = room_ids or [] - presence = self.hs.get_handlers().presence_handler + presence = self.get_presence_handler() stream_change_cache = self.store.presence_stream_cache if not room_ids: @@ -877,13 +981,13 @@ class PresenceEventSource(object): user_ids_changed = set() changed = None - if from_key and max_token - from_key < 100: - # For small deltas, its quicker to get all changes and then - # work out if we share a room or they're in our presence list + if from_key: changed = stream_change_cache.get_all_entities_changed(from_key) - # get_all_entities_changed can return None - if changed is not None: + if changed is not None and len(changed) < 500: + # For small deltas, its quicker to get all changes and then + # work out if we share a room or they're in our presence list + get_updates_counter.inc("stream") for other_user_id in changed: if other_user_id in friends: user_ids_changed.add(other_user_id) @@ -895,6 +999,8 @@ class PresenceEventSource(object): else: # Too many possible updates. Find all users we can see and check # if any of them have changed. + get_updates_counter.inc("full") + user_ids_to_check = set() for room_id in room_ids: users = yield self.store.get_users_in_room(room_id) @@ -933,15 +1039,14 @@ class PresenceEventSource(object): return self.get_new_events(user, from_key=None, include_offline=False) -def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now): +def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now): """Checks the presence of users that have timed out and updates as appropriate. Args: user_states(list): List of UserPresenceState's to check. is_mine_fn (fn): Function that returns if a user_id is ours - user_to_num_current_syncs (dict): Mapping of user_id to number of currently - active syncs. + syncing_user_ids (set): Set of user_ids with active syncs. now (int): Current time in ms. Returns: @@ -952,21 +1057,20 @@ def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now): for state in user_states: is_mine = is_mine_fn(state.user_id) - new_state = handle_timeout(state, is_mine, user_to_num_current_syncs, now) + new_state = handle_timeout(state, is_mine, syncing_user_ids, now) if new_state: changes[state.user_id] = new_state return changes.values() -def handle_timeout(state, is_mine, user_to_num_current_syncs, now): +def handle_timeout(state, is_mine, syncing_user_ids, now): """Checks the presence of the user to see if any of the timers have elapsed Args: state (UserPresenceState) is_mine (bool): Whether the user is ours - user_to_num_current_syncs (dict): Mapping of user_id to number of currently - active syncs. + syncing_user_ids (set): Set of user_ids with active syncs. now (int): Current time in ms. Returns: @@ -1000,7 +1104,7 @@ def handle_timeout(state, is_mine, user_to_num_current_syncs, now): # If there are have been no sync for a while (and none ongoing), # set presence to offline - if not user_to_num_current_syncs.get(user_id, 0): + if user_id not in syncing_user_ids: if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT: state = state.copy_and_replace( state=PresenceState.OFFLINE, diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index b45eafbb49..e37409170d 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -17,7 +17,6 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError, CodeMessageException from synapse.types import UserID, Requester -from synapse.util import unwrapFirstError from ._base import BaseHandler @@ -27,14 +26,6 @@ import logging logger = logging.getLogger(__name__) -def changed_presencelike_data(distributor, user, state): - return distributor.fire("changed_presencelike_data", user, state) - - -def collect_presencelike_data(distributor, user, content): - return distributor.fire("collect_presencelike_data", user, content) - - class ProfileHandler(BaseHandler): def __init__(self, hs): @@ -46,17 +37,9 @@ class ProfileHandler(BaseHandler): ) distributor = hs.get_distributor() - self.distributor = distributor - - distributor.declare("collect_presencelike_data") - distributor.declare("changed_presencelike_data") distributor.observe("registered_user", self.registered_user) - distributor.observe( - "collect_presencelike_data", self.collect_presencelike_data - ) - def registered_user(self, user): return self.store.create_profile(user.localpart) @@ -105,10 +88,6 @@ class ProfileHandler(BaseHandler): target_user.localpart, new_displayname ) - yield changed_presencelike_data(self.distributor, target_user, { - "displayname": new_displayname, - }) - yield self._update_join_states(requester) @defer.inlineCallbacks @@ -152,31 +131,9 @@ class ProfileHandler(BaseHandler): target_user.localpart, new_avatar_url ) - yield changed_presencelike_data(self.distributor, target_user, { - "avatar_url": new_avatar_url, - }) - yield self._update_join_states(requester) @defer.inlineCallbacks - def collect_presencelike_data(self, user, state): - if not self.hs.is_mine(user): - defer.returnValue(None) - - (displayname, avatar_url) = yield defer.gatherResults( - [ - self.store.get_profile_displayname(user.localpart), - self.store.get_profile_avatar_url(user.localpart), - ], - consumeErrors=True - ).addErrback(unwrapFirstError) - - state["displayname"] = displayname - state["avatar_url"] = avatar_url - - defer.returnValue(None) - - @defer.inlineCallbacks def on_profile_query(self, args): user = UserID.from_string(args["user_id"]) if not self.hs.is_mine(user): diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 935c339707..e62722d78d 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -29,6 +29,8 @@ class ReceiptsHandler(BaseHandler): def __init__(self, hs): super(ReceiptsHandler, self).__init__(hs) + self.server_name = hs.config.server_name + self.store = hs.get_datastore() self.hs = hs self.federation = hs.get_replication_layer() self.federation.register_edu_handler( @@ -80,6 +82,9 @@ class ReceiptsHandler(BaseHandler): def _handle_new_receipts(self, receipts): """Takes a list of receipts, stores them and informs the notifier. """ + min_batch_id = None + max_batch_id = None + for receipt in receipts: room_id = receipt["room_id"] receipt_type = receipt["receipt_type"] @@ -97,10 +102,21 @@ class ReceiptsHandler(BaseHandler): stream_id, max_persisted_id = res - with PreserveLoggingContext(): - self.notifier.on_new_event( - "receipt_key", max_persisted_id, rooms=[room_id] - ) + if min_batch_id is None or stream_id < min_batch_id: + min_batch_id = stream_id + if max_batch_id is None or max_persisted_id > max_batch_id: + max_batch_id = max_persisted_id + + affected_room_ids = list(set([r["room_id"] for r in receipts])) + + with PreserveLoggingContext(): + self.notifier.on_new_event( + "receipt_key", max_batch_id, rooms=affected_room_ids + ) + # Note that the min here shouldn't be relied upon to be accurate. + self.hs.get_pusherpool().on_new_receipts( + min_batch_id, max_batch_id, affected_room_ids + ) defer.returnValue(True) @@ -117,12 +133,9 @@ class ReceiptsHandler(BaseHandler): event_ids = receipt["event_ids"] data = receipt["data"] - remotedomains = set() - - rm_handler = self.hs.get_handlers().room_member_handler - yield rm_handler.fetch_room_distributions_into( - room_id, localusers=None, remotedomains=remotedomains - ) + remotedomains = yield self.store.get_joined_hosts_for_room(room_id) + remotedomains = remotedomains.copy() + remotedomains.discard(self.server_name) logger.debug("Sending receipt to: %r", remotedomains) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index f287ee247b..bbc07b045e 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -16,13 +16,14 @@ """Contains functions for registering clients.""" from twisted.internet import defer -from synapse.types import UserID +from synapse.types import UserID, Requester from synapse.api.errors import ( AuthError, Codes, SynapseError, RegistrationError, InvalidCaptchaError ) from ._base import BaseHandler from synapse.util.async import run_on_reactor from synapse.http.client import CaptchaServerHttpClient +from synapse.util.distributor import registered_user import logging import urllib @@ -30,10 +31,6 @@ import urllib logger = logging.getLogger(__name__) -def registered_user(distributor, user): - return distributor.fire("registered_user", user) - - class RegistrationHandler(BaseHandler): def __init__(self, hs): @@ -361,8 +358,62 @@ class RegistrationHandler(BaseHandler): ) defer.returnValue(data) + @defer.inlineCallbacks + def get_or_create_user(self, localpart, displayname, duration_seconds): + """Creates a new user if the user does not exist, + else revokes all previous access tokens and generates a new one. + + Args: + localpart : The local part of the user ID to register. If None, + one will be randomly generated. + Returns: + A tuple of (user_id, access_token). + Raises: + RegistrationError if there was a problem registering. + """ + yield run_on_reactor() + + if localpart is None: + raise SynapseError(400, "Request must include user id") + + need_register = True + + try: + yield self.check_username(localpart) + except SynapseError as e: + if e.errcode == Codes.USER_IN_USE: + need_register = False + else: + raise + + user = UserID(localpart, self.hs.hostname) + user_id = user.to_string() + auth_handler = self.hs.get_handlers().auth_handler + token = auth_handler.generate_short_term_login_token(user_id, duration_seconds) + + if need_register: + yield self.store.register( + user_id=user_id, + token=token, + password_hash=None + ) + + yield registered_user(self.distributor, user) + else: + yield self.store.user_delete_access_tokens(user_id=user_id) + yield self.store.add_access_token_to_user(user_id=user_id, token=token) + + if displayname is not None: + logger.info("setting user display name: %s -> %s", user_id, displayname) + profile_handler = self.hs.get_handlers().profile_handler + yield profile_handler.set_displayname( + user, Requester(user, token, False), displayname + ) + + defer.returnValue((user_id, token)) + def auth_handler(self): - return self.hs.get_handlers().auth_handler + return self.hs.get_auth_handler() @defer.inlineCallbacks def guest_access_token_for(self, medium, address, inviter_user_id): diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index d5c56ce0d6..9fd34588dd 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -18,19 +18,17 @@ from twisted.internet import defer from ._base import BaseHandler -from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken, Requester +from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken from synapse.api.constants import ( - EventTypes, Membership, JoinRules, RoomCreationPreset, + EventTypes, JoinRules, RoomCreationPreset, ) -from synapse.api.errors import AuthError, StoreError, SynapseError, Codes -from synapse.util import stringutils, unwrapFirstError -from synapse.util.logcontext import preserve_context_over_fn - -from signedjson.sign import verify_signed_json -from signedjson.key import decode_verify_key_bytes +from synapse.api.errors import AuthError, StoreError, SynapseError +from synapse.util import stringutils +from synapse.util.async import concurrently_execute +from synapse.util.caches.response_cache import ResponseCache +from synapse.visibility import filter_events_for_client from collections import OrderedDict -from unpaddedbase64 import decode_base64 import logging import math @@ -38,21 +36,9 @@ import string logger = logging.getLogger(__name__) -id_server_scheme = "https://" - - -def user_left_room(distributor, user, room_id): - return preserve_context_over_fn( - distributor.fire, - "user_left_room", user=user, room_id=room_id - ) +REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000 - -def user_joined_room(distributor, user, room_id): - return preserve_context_over_fn( - distributor.fire, - "user_joined_room", user=user, room_id=room_id - ) +id_server_scheme = "https://" class RoomCreationHandler(BaseHandler): @@ -356,598 +342,31 @@ class RoomCreationHandler(BaseHandler): ) -class RoomMemberHandler(BaseHandler): - # TODO(paul): This handler currently contains a messy conflation of - # low-level API that works on UserID objects and so on, and REST-level - # API that takes ID strings and returns pagination chunks. These concerns - # ought to be separated out a lot better. - +class RoomListHandler(BaseHandler): def __init__(self, hs): - super(RoomMemberHandler, self).__init__(hs) - - self.clock = hs.get_clock() - - self.distributor = hs.get_distributor() - self.distributor.declare("user_joined_room") - self.distributor.declare("user_left_room") - - @defer.inlineCallbacks - def get_room_members(self, room_id): - users = yield self.store.get_users_in_room(room_id) - - defer.returnValue([UserID.from_string(u) for u in users]) - - @defer.inlineCallbacks - def fetch_room_distributions_into(self, room_id, localusers=None, - remotedomains=None, ignore_user=None): - """Fetch the distribution of a room, adding elements to either - 'localusers' or 'remotedomains', which should be a set() if supplied. - If ignore_user is set, ignore that user. - - This function returns nothing; its result is performed by the - side-effect on the two passed sets. This allows easy accumulation of - member lists of multiple rooms at once if required. - """ - members = yield self.get_room_members(room_id) - for member in members: - if ignore_user is not None and member == ignore_user: - continue - - if self.hs.is_mine(member): - if localusers is not None: - localusers.add(member) - else: - if remotedomains is not None: - remotedomains.add(member.domain) - - @defer.inlineCallbacks - def update_membership( - self, - requester, - target, - room_id, - action, - txn_id=None, - remote_room_hosts=None, - third_party_signed=None, - ratelimit=True, - ): - effective_membership_state = action - if action in ["kick", "unban"]: - effective_membership_state = "leave" - elif action == "forget": - effective_membership_state = "leave" - - if third_party_signed is not None: - replication = self.hs.get_replication_layer() - yield replication.exchange_third_party_invite( - third_party_signed["sender"], - target.to_string(), - room_id, - third_party_signed, - ) - - msg_handler = self.hs.get_handlers().message_handler - - content = {"membership": effective_membership_state} - if requester.is_guest: - content["kind"] = "guest" - - event, context = yield msg_handler.create_event( - { - "type": EventTypes.Member, - "content": content, - "room_id": room_id, - "sender": requester.user.to_string(), - "state_key": target.to_string(), - - # For backwards compatibility: - "membership": effective_membership_state, - }, - token_id=requester.access_token_id, - txn_id=txn_id, - ) - - old_state = context.current_state.get((EventTypes.Member, event.state_key)) - old_membership = old_state.content.get("membership") if old_state else None - if action == "unban" and old_membership != "ban": - raise SynapseError( - 403, - "Cannot unban user who was not banned (membership=%s)" % old_membership, - errcode=Codes.BAD_STATE - ) - if old_membership == "ban" and action != "unban": - raise SynapseError( - 403, - "Cannot %s user who was is banned" % (action,), - errcode=Codes.BAD_STATE - ) - - member_handler = self.hs.get_handlers().room_member_handler - yield member_handler.send_membership_event( - requester, - event, - context, - ratelimit=ratelimit, - remote_room_hosts=remote_room_hosts, - ) - - if action == "forget": - yield self.forget(requester.user, room_id) - - @defer.inlineCallbacks - def send_membership_event( - self, - requester, - event, - context, - remote_room_hosts=None, - ratelimit=True, - ): - """ - Change the membership status of a user in a room. - - Args: - requester (Requester): The local user who requested the membership - event. If None, certain checks, like whether this homeserver can - act as the sender, will be skipped. - event (SynapseEvent): The membership event. - context: The context of the event. - is_guest (bool): Whether the sender is a guest. - room_hosts ([str]): Homeservers which are likely to already be in - the room, and could be danced with in order to join this - homeserver for the first time. - ratelimit (bool): Whether to rate limit this request. - Raises: - SynapseError if there was a problem changing the membership. - """ - remote_room_hosts = remote_room_hosts or [] - - target_user = UserID.from_string(event.state_key) - room_id = event.room_id - - if requester is not None: - sender = UserID.from_string(event.sender) - assert sender == requester.user, ( - "Sender (%s) must be same as requester (%s)" % - (sender, requester.user) - ) - assert self.hs.is_mine(sender), "Sender must be our own: %s" % (sender,) - else: - requester = Requester(target_user, None, False) - - message_handler = self.hs.get_handlers().message_handler - prev_event = message_handler.deduplicate_state_event(event, context) - if prev_event is not None: - return - - action = "send" - - if event.membership == Membership.JOIN: - if requester.is_guest and not self._can_guest_join(context.current_state): - # This should be an auth check, but guests are a local concept, - # so don't really fit into the general auth process. - raise AuthError(403, "Guest access not allowed") - do_remote_join_dance, remote_room_hosts = self._should_do_dance( - context, - (self.get_inviter(event.state_key, context.current_state)), - remote_room_hosts, - ) - if do_remote_join_dance: - action = "remote_join" - elif event.membership == Membership.LEAVE: - is_host_in_room = self.is_host_in_room(context.current_state) - - if not is_host_in_room: - # perhaps we've been invited - inviter = self.get_inviter(target_user.to_string(), context.current_state) - if not inviter: - raise SynapseError(404, "Not a known room") - - if self.hs.is_mine(inviter): - # the inviter was on our server, but has now left. Carry on - # with the normal rejection codepath. - # - # This is a bit of a hack, because the room might still be - # active on other servers. - pass - else: - # send the rejection to the inviter's HS. - remote_room_hosts = remote_room_hosts + [inviter.domain] - action = "remote_reject" - - federation_handler = self.hs.get_handlers().federation_handler - - if action == "remote_join": - if len(remote_room_hosts) == 0: - raise SynapseError(404, "No known servers") - - # We don't do an auth check if we are doing an invite - # join dance for now, since we're kinda implicitly checking - # that we are allowed to join when we decide whether or not we - # need to do the invite/join dance. - yield federation_handler.do_invite_join( - remote_room_hosts, - event.room_id, - event.user_id, - event.content, - ) - elif action == "remote_reject": - yield federation_handler.do_remotely_reject_invite( - remote_room_hosts, - room_id, - event.user_id - ) - else: - yield self.handle_new_client_event( - requester, - event, - context, - extra_users=[target_user], - ratelimit=ratelimit, - ) - - prev_member_event = context.current_state.get( - (EventTypes.Member, target_user.to_string()), - None - ) - - if event.membership == Membership.JOIN: - if not prev_member_event or prev_member_event.membership != Membership.JOIN: - # Only fire user_joined_room if the user has acutally joined the - # room. Don't bother if the user is just changing their profile - # info. - yield user_joined_room(self.distributor, target_user, room_id) - elif event.membership == Membership.LEAVE: - if prev_member_event and prev_member_event.membership == Membership.JOIN: - user_left_room(self.distributor, target_user, room_id) - - def _can_guest_join(self, current_state): - """ - Returns whether a guest can join a room based on its current state. - """ - guest_access = current_state.get((EventTypes.GuestAccess, ""), None) - return ( - guest_access - and guest_access.content - and "guest_access" in guest_access.content - and guest_access.content["guest_access"] == "can_join" - ) - - def _should_do_dance(self, context, inviter, room_hosts=None): - # TODO: Shouldn't this be remote_room_host? - room_hosts = room_hosts or [] - - is_host_in_room = self.is_host_in_room(context.current_state) - if is_host_in_room: - return False, room_hosts - - if inviter and not self.hs.is_mine(inviter): - room_hosts.append(inviter.domain) - - return True, room_hosts - - @defer.inlineCallbacks - def lookup_room_alias(self, room_alias): - """ - Get the room ID associated with a room alias. - - Args: - room_alias (RoomAlias): The alias to look up. - Returns: - A tuple of: - The room ID as a RoomID object. - Hosts likely to be participating in the room ([str]). - Raises: - SynapseError if room alias could not be found. - """ - directory_handler = self.hs.get_handlers().directory_handler - mapping = yield directory_handler.get_association(room_alias) - - if not mapping: - raise SynapseError(404, "No such room alias") - - room_id = mapping["room_id"] - servers = mapping["servers"] - - defer.returnValue((RoomID.from_string(room_id), servers)) - - def get_inviter(self, user_id, current_state): - prev_state = current_state.get((EventTypes.Member, user_id)) - if prev_state and prev_state.membership == Membership.INVITE: - return UserID.from_string(prev_state.user_id) - return None - - @defer.inlineCallbacks - def get_joined_rooms_for_user(self, user): - """Returns a list of roomids that the user has any of the given - membership states in.""" - - rooms = yield self.store.get_rooms_for_user( - user.to_string(), - ) - - # For some reason the list of events contains duplicates - # TODO(paul): work out why because I really don't think it should - room_ids = set(r.room_id for r in rooms) - - defer.returnValue(room_ids) - - @defer.inlineCallbacks - def do_3pid_invite( - self, - room_id, - inviter, - medium, - address, - id_server, - requester, - txn_id - ): - invitee = yield self._lookup_3pid( - id_server, medium, address - ) - - if invitee: - handler = self.hs.get_handlers().room_member_handler - yield handler.update_membership( - requester, - UserID.from_string(invitee), - room_id, - "invite", - txn_id=txn_id, - ) - else: - yield self._make_and_store_3pid_invite( - requester, - id_server, - medium, - address, - room_id, - inviter, - txn_id=txn_id - ) - - @defer.inlineCallbacks - def _lookup_3pid(self, id_server, medium, address): - """Looks up a 3pid in the passed identity server. - - Args: - id_server (str): The server name (including port, if required) - of the identity server to use. - medium (str): The type of the third party identifier (e.g. "email"). - address (str): The third party identifier (e.g. "foo@example.com"). - - Returns: - (str) the matrix ID of the 3pid, or None if it is not recognized. - """ - try: - data = yield self.hs.get_simple_http_client().get_json( - "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server,), - { - "medium": medium, - "address": address, - } - ) - - if "mxid" in data: - if "signatures" not in data: - raise AuthError(401, "No signatures on 3pid binding") - self.verify_any_signature(data, id_server) - defer.returnValue(data["mxid"]) - - except IOError as e: - logger.warn("Error from identity server lookup: %s" % (e,)) - defer.returnValue(None) - - @defer.inlineCallbacks - def verify_any_signature(self, data, server_hostname): - if server_hostname not in data["signatures"]: - raise AuthError(401, "No signature from server %s" % (server_hostname,)) - for key_name, signature in data["signatures"][server_hostname].items(): - key_data = yield self.hs.get_simple_http_client().get_json( - "%s%s/_matrix/identity/api/v1/pubkey/%s" % - (id_server_scheme, server_hostname, key_name,), - ) - if "public_key" not in key_data: - raise AuthError(401, "No public key named %s from %s" % - (key_name, server_hostname,)) - verify_signed_json( - data, - server_hostname, - decode_verify_key_bytes(key_name, decode_base64(key_data["public_key"])) - ) - return - - @defer.inlineCallbacks - def _make_and_store_3pid_invite( - self, - requester, - id_server, - medium, - address, - room_id, - user, - txn_id - ): - room_state = yield self.hs.get_state_handler().get_current_state(room_id) - - inviter_display_name = "" - inviter_avatar_url = "" - member_event = room_state.get((EventTypes.Member, user.to_string())) - if member_event: - inviter_display_name = member_event.content.get("displayname", "") - inviter_avatar_url = member_event.content.get("avatar_url", "") - - canonical_room_alias = "" - canonical_alias_event = room_state.get((EventTypes.CanonicalAlias, "")) - if canonical_alias_event: - canonical_room_alias = canonical_alias_event.content.get("alias", "") - - room_name = "" - room_name_event = room_state.get((EventTypes.Name, "")) - if room_name_event: - room_name = room_name_event.content.get("name", "") - - room_join_rules = "" - join_rules_event = room_state.get((EventTypes.JoinRules, "")) - if join_rules_event: - room_join_rules = join_rules_event.content.get("join_rule", "") - - room_avatar_url = "" - room_avatar_event = room_state.get((EventTypes.RoomAvatar, "")) - if room_avatar_event: - room_avatar_url = room_avatar_event.content.get("url", "") - - token, public_keys, fallback_public_key, display_name = ( - yield self._ask_id_server_for_third_party_invite( - id_server=id_server, - medium=medium, - address=address, - room_id=room_id, - inviter_user_id=user.to_string(), - room_alias=canonical_room_alias, - room_avatar_url=room_avatar_url, - room_join_rules=room_join_rules, - room_name=room_name, - inviter_display_name=inviter_display_name, - inviter_avatar_url=inviter_avatar_url - ) - ) - - msg_handler = self.hs.get_handlers().message_handler - yield msg_handler.create_and_send_nonmember_event( - requester, - { - "type": EventTypes.ThirdPartyInvite, - "content": { - "display_name": display_name, - "public_keys": public_keys, - - # For backwards compatibility: - "key_validity_url": fallback_public_key["key_validity_url"], - "public_key": fallback_public_key["public_key"], - }, - "room_id": room_id, - "sender": user.to_string(), - "state_key": token, - }, - txn_id=txn_id, - ) - - @defer.inlineCallbacks - def _ask_id_server_for_third_party_invite( - self, - id_server, - medium, - address, - room_id, - inviter_user_id, - room_alias, - room_avatar_url, - room_join_rules, - room_name, - inviter_display_name, - inviter_avatar_url - ): - """ - Asks an identity server for a third party invite. - - :param id_server (str): hostname + optional port for the identity server. - :param medium (str): The literal string "email". - :param address (str): The third party address being invited. - :param room_id (str): The ID of the room to which the user is invited. - :param inviter_user_id (str): The user ID of the inviter. - :param room_alias (str): An alias for the room, for cosmetic - notifications. - :param room_avatar_url (str): The URL of the room's avatar, for cosmetic - notifications. - :param room_join_rules (str): The join rules of the email - (e.g. "public"). - :param room_name (str): The m.room.name of the room. - :param inviter_display_name (str): The current display name of the - inviter. - :param inviter_avatar_url (str): The URL of the inviter's avatar. - - :return: A deferred tuple containing: - token (str): The token which must be signed to prove authenticity. - public_keys ([{"public_key": str, "key_validity_url": str}]): - public_key is a base64-encoded ed25519 public key. - fallback_public_key: One element from public_keys. - display_name (str): A user-friendly name to represent the invited - user. - """ - - is_url = "%s%s/_matrix/identity/api/v1/store-invite" % ( - id_server_scheme, id_server, + super(RoomListHandler, self).__init__(hs) + self.response_cache = ResponseCache() + self.remote_list_request_cache = ResponseCache() + self.remote_list_cache = {} + self.fetch_looping_call = hs.get_clock().looping_call( + self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL ) + self.fetch_all_remote_lists() - invite_config = { - "medium": medium, - "address": address, - "room_id": room_id, - "room_alias": room_alias, - "room_avatar_url": room_avatar_url, - "room_join_rules": room_join_rules, - "room_name": room_name, - "sender": inviter_user_id, - "sender_display_name": inviter_display_name, - "sender_avatar_url": inviter_avatar_url, - } - - if self.hs.config.invite_3pid_guest: - registration_handler = self.hs.get_handlers().registration_handler - guest_access_token = yield registration_handler.guest_access_token_for( - medium=medium, - address=address, - inviter_user_id=inviter_user_id, - ) - - guest_user_info = yield self.hs.get_auth().get_user_by_access_token( - guest_access_token - ) - - invite_config.update({ - "guest_access_token": guest_access_token, - "guest_user_id": guest_user_info["user"].to_string(), - }) - - data = yield self.hs.get_simple_http_client().post_urlencoded_get_json( - is_url, - invite_config - ) - # TODO: Check for success - token = data["token"] - public_keys = data.get("public_keys", []) - if "public_key" in data: - fallback_public_key = { - "public_key": data["public_key"], - "key_validity_url": "%s%s/_matrix/identity/api/v1/pubkey/isvalid" % ( - id_server_scheme, id_server, - ), - } - else: - fallback_public_key = public_keys[0] - - if not public_keys: - public_keys.append(fallback_public_key) - display_name = data["display_name"] - defer.returnValue((token, public_keys, fallback_public_key, display_name)) - - def forget(self, user, room_id): - return self.store.forget(user.to_string(), room_id) - - -class RoomListHandler(BaseHandler): + def get_local_public_room_list(self): + result = self.response_cache.get(()) + if not result: + result = self.response_cache.set((), self._get_public_room_list()) + return result @defer.inlineCallbacks - def get_public_room_list(self): + def _get_public_room_list(self): room_ids = yield self.store.get_public_room_ids() + results = [] + @defer.inlineCallbacks def handle_room(room_id): - aliases = yield self.store.get_aliases_for_room(room_id) - # We pull each bit of state out indvidually to avoid pulling the # full state into memory. Due to how the caching works this should # be fairly quick, even if not originally in the cache. @@ -962,6 +381,14 @@ class RoomListHandler(BaseHandler): defer.returnValue(None) result = {"room_id": room_id} + + joined_users = yield self.store.get_users_in_room(room_id) + if len(joined_users) == 0: + return + + result["num_joined_members"] = len(joined_users) + + aliases = yield self.store.get_aliases_for_room(room_id) if aliases: result["aliases"] = aliases @@ -1001,21 +428,61 @@ class RoomListHandler(BaseHandler): if avatar_url: result["avatar_url"] = avatar_url - joined_users = yield self.store.get_users_in_room(room_id) - result["num_joined_members"] = len(joined_users) - - defer.returnValue(result) + results.append(result) - result = [] - for chunk in (room_ids[i:i + 10] for i in xrange(0, len(room_ids), 10)): - chunk_result = yield defer.gatherResults([ - handle_room(room_id) - for room_id in chunk - ], consumeErrors=True).addErrback(unwrapFirstError) - result.extend(v for v in chunk_result if v) + yield concurrently_execute(handle_room, room_ids, 10) # FIXME (erikj): START is no longer a valid value - defer.returnValue({"start": "START", "end": "END", "chunk": result}) + defer.returnValue({"start": "START", "end": "END", "chunk": results}) + + @defer.inlineCallbacks + def fetch_all_remote_lists(self): + deferred = self.hs.get_replication_layer().get_public_rooms( + self.hs.config.secondary_directory_servers + ) + self.remote_list_request_cache.set((), deferred) + self.remote_list_cache = yield deferred + + @defer.inlineCallbacks + def get_aggregated_public_room_list(self): + """ + Get the public room list from this server and the servers + specified in the secondary_directory_servers config option. + XXX: Pagination... + """ + # We return the results from out cache which is updated by a looping call, + # unless we're missing a cache entry, in which case wait for the result + # of the fetch if there's one in progress. If not, omit that server. + wait = False + for s in self.hs.config.secondary_directory_servers: + if s not in self.remote_list_cache: + logger.warn("No cached room list from %s: waiting for fetch", s) + wait = True + break + + if wait and self.remote_list_request_cache.get(()): + yield self.remote_list_request_cache.get(()) + + public_rooms = yield self.get_local_public_room_list() + + # keep track of which room IDs we've seen so we can de-dup + room_ids = set() + + # tag all the ones in our list with our server name. + # Also add the them to the de-deping set + for room in public_rooms['chunk']: + room["server_name"] = self.hs.hostname + room_ids.add(room["room_id"]) + + # Now add the results from federation + for server_name, server_result in self.remote_list_cache.items(): + for room in server_result["chunk"]: + if room["room_id"] not in room_ids: + room["server_name"] = server_name + public_rooms["chunk"].append(room) + room_ids.add(room["room_id"]) + + defer.returnValue(public_rooms) class RoomContextHandler(BaseHandler): @@ -1040,10 +507,12 @@ class RoomContextHandler(BaseHandler): now_token = yield self.hs.get_event_sources().get_current_token() def filter_evts(events): - return self._filter_events_for_client( + return filter_events_for_client( + self.store, user.to_string(), events, - is_peeking=is_guest) + is_peeking=is_guest + ) event = yield self.store.get_event(event_id, get_prev_content=True, allow_none=True) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py new file mode 100644 index 0000000000..7e616f44fd --- /dev/null +++ b/synapse/handlers/room_member.py @@ -0,0 +1,677 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from twisted.internet import defer + +from ._base import BaseHandler + +from synapse.types import UserID, RoomID, Requester +from synapse.api.constants import ( + EventTypes, Membership, +) +from synapse.api.errors import AuthError, SynapseError, Codes +from synapse.util.async import Linearizer +from synapse.util.distributor import user_left_room, user_joined_room + +from signedjson.sign import verify_signed_json +from signedjson.key import decode_verify_key_bytes + +from unpaddedbase64 import decode_base64 + +import logging + +logger = logging.getLogger(__name__) + +id_server_scheme = "https://" + + +class RoomMemberHandler(BaseHandler): + # TODO(paul): This handler currently contains a messy conflation of + # low-level API that works on UserID objects and so on, and REST-level + # API that takes ID strings and returns pagination chunks. These concerns + # ought to be separated out a lot better. + + def __init__(self, hs): + super(RoomMemberHandler, self).__init__(hs) + + self.member_linearizer = Linearizer() + + self.clock = hs.get_clock() + + self.distributor = hs.get_distributor() + self.distributor.declare("user_joined_room") + self.distributor.declare("user_left_room") + + @defer.inlineCallbacks + def _local_membership_update( + self, requester, target, room_id, membership, + prev_event_ids, + txn_id=None, + ratelimit=True, + ): + msg_handler = self.hs.get_handlers().message_handler + + content = {"membership": membership} + if requester.is_guest: + content["kind"] = "guest" + + event, context = yield msg_handler.create_event( + { + "type": EventTypes.Member, + "content": content, + "room_id": room_id, + "sender": requester.user.to_string(), + "state_key": target.to_string(), + + # For backwards compatibility: + "membership": membership, + }, + token_id=requester.access_token_id, + txn_id=txn_id, + prev_event_ids=prev_event_ids, + ) + + yield msg_handler.handle_new_client_event( + requester, + event, + context, + extra_users=[target], + ratelimit=ratelimit, + ) + + prev_member_event = context.current_state.get( + (EventTypes.Member, target.to_string()), + None + ) + + if event.membership == Membership.JOIN: + if not prev_member_event or prev_member_event.membership != Membership.JOIN: + # Only fire user_joined_room if the user has acutally joined the + # room. Don't bother if the user is just changing their profile + # info. + yield user_joined_room(self.distributor, target, room_id) + elif event.membership == Membership.LEAVE: + if prev_member_event and prev_member_event.membership == Membership.JOIN: + user_left_room(self.distributor, target, room_id) + + @defer.inlineCallbacks + def remote_join(self, remote_room_hosts, room_id, user, content): + if len(remote_room_hosts) == 0: + raise SynapseError(404, "No known servers") + + # We don't do an auth check if we are doing an invite + # join dance for now, since we're kinda implicitly checking + # that we are allowed to join when we decide whether or not we + # need to do the invite/join dance. + yield self.hs.get_handlers().federation_handler.do_invite_join( + remote_room_hosts, + room_id, + user.to_string(), + content, + ) + yield user_joined_room(self.distributor, user, room_id) + + def reject_remote_invite(self, user_id, room_id, remote_room_hosts): + return self.hs.get_handlers().federation_handler.do_remotely_reject_invite( + remote_room_hosts, + room_id, + user_id + ) + + @defer.inlineCallbacks + def update_membership( + self, + requester, + target, + room_id, + action, + txn_id=None, + remote_room_hosts=None, + third_party_signed=None, + ratelimit=True, + ): + key = (target, room_id,) + + with (yield self.member_linearizer.queue(key)): + result = yield self._update_membership( + requester, + target, + room_id, + action, + txn_id=txn_id, + remote_room_hosts=remote_room_hosts, + third_party_signed=third_party_signed, + ratelimit=ratelimit, + ) + + defer.returnValue(result) + + @defer.inlineCallbacks + def _update_membership( + self, + requester, + target, + room_id, + action, + txn_id=None, + remote_room_hosts=None, + third_party_signed=None, + ratelimit=True, + ): + effective_membership_state = action + if action in ["kick", "unban"]: + effective_membership_state = "leave" + + if third_party_signed is not None: + replication = self.hs.get_replication_layer() + yield replication.exchange_third_party_invite( + third_party_signed["sender"], + target.to_string(), + room_id, + third_party_signed, + ) + + if not remote_room_hosts: + remote_room_hosts = [] + + latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) + current_state = yield self.state_handler.get_current_state( + room_id, latest_event_ids=latest_event_ids, + ) + + old_state = current_state.get((EventTypes.Member, target.to_string())) + old_membership = old_state.content.get("membership") if old_state else None + if action == "unban" and old_membership != "ban": + raise SynapseError( + 403, + "Cannot unban user who was not banned (membership=%s)" % old_membership, + errcode=Codes.BAD_STATE + ) + if old_membership == "ban" and action != "unban": + raise SynapseError( + 403, + "Cannot %s user who was banned" % (action,), + errcode=Codes.BAD_STATE + ) + + is_host_in_room = self.is_host_in_room(current_state) + + if effective_membership_state == Membership.JOIN: + if requester.is_guest and not self._can_guest_join(current_state): + # This should be an auth check, but guests are a local concept, + # so don't really fit into the general auth process. + raise AuthError(403, "Guest access not allowed") + + if not is_host_in_room: + inviter = yield self.get_inviter(target.to_string(), room_id) + if inviter and not self.hs.is_mine(inviter): + remote_room_hosts.append(inviter.domain) + + content = {"membership": Membership.JOIN} + + profile = self.hs.get_handlers().profile_handler + content["displayname"] = yield profile.get_displayname(target) + content["avatar_url"] = yield profile.get_avatar_url(target) + + if requester.is_guest: + content["kind"] = "guest" + + ret = yield self.remote_join( + remote_room_hosts, room_id, target, content + ) + defer.returnValue(ret) + + elif effective_membership_state == Membership.LEAVE: + if not is_host_in_room: + # perhaps we've been invited + inviter = yield self.get_inviter(target.to_string(), room_id) + if not inviter: + raise SynapseError(404, "Not a known room") + + if self.hs.is_mine(inviter): + # the inviter was on our server, but has now left. Carry on + # with the normal rejection codepath. + # + # This is a bit of a hack, because the room might still be + # active on other servers. + pass + else: + # send the rejection to the inviter's HS. + remote_room_hosts = remote_room_hosts + [inviter.domain] + + try: + ret = yield self.reject_remote_invite( + target.to_string(), room_id, remote_room_hosts + ) + defer.returnValue(ret) + except SynapseError as e: + logger.warn("Failed to reject invite: %s", e) + + yield self.store.locally_reject_invite( + target.to_string(), room_id + ) + + defer.returnValue({}) + + yield self._local_membership_update( + requester=requester, + target=target, + room_id=room_id, + membership=effective_membership_state, + txn_id=txn_id, + ratelimit=ratelimit, + prev_event_ids=latest_event_ids, + ) + + @defer.inlineCallbacks + def send_membership_event( + self, + requester, + event, + context, + remote_room_hosts=None, + ratelimit=True, + ): + """ + Change the membership status of a user in a room. + + Args: + requester (Requester): The local user who requested the membership + event. If None, certain checks, like whether this homeserver can + act as the sender, will be skipped. + event (SynapseEvent): The membership event. + context: The context of the event. + is_guest (bool): Whether the sender is a guest. + room_hosts ([str]): Homeservers which are likely to already be in + the room, and could be danced with in order to join this + homeserver for the first time. + ratelimit (bool): Whether to rate limit this request. + Raises: + SynapseError if there was a problem changing the membership. + """ + remote_room_hosts = remote_room_hosts or [] + + target_user = UserID.from_string(event.state_key) + room_id = event.room_id + + if requester is not None: + sender = UserID.from_string(event.sender) + assert sender == requester.user, ( + "Sender (%s) must be same as requester (%s)" % + (sender, requester.user) + ) + assert self.hs.is_mine(sender), "Sender must be our own: %s" % (sender,) + else: + requester = Requester(target_user, None, False) + + message_handler = self.hs.get_handlers().message_handler + prev_event = message_handler.deduplicate_state_event(event, context) + if prev_event is not None: + return + + if event.membership == Membership.JOIN: + if requester.is_guest and not self._can_guest_join(context.current_state): + # This should be an auth check, but guests are a local concept, + # so don't really fit into the general auth process. + raise AuthError(403, "Guest access not allowed") + + yield message_handler.handle_new_client_event( + requester, + event, + context, + extra_users=[target_user], + ratelimit=ratelimit, + ) + + prev_member_event = context.current_state.get( + (EventTypes.Member, target_user.to_string()), + None + ) + + if event.membership == Membership.JOIN: + if not prev_member_event or prev_member_event.membership != Membership.JOIN: + # Only fire user_joined_room if the user has acutally joined the + # room. Don't bother if the user is just changing their profile + # info. + yield user_joined_room(self.distributor, target_user, room_id) + elif event.membership == Membership.LEAVE: + if prev_member_event and prev_member_event.membership == Membership.JOIN: + user_left_room(self.distributor, target_user, room_id) + + def _can_guest_join(self, current_state): + """ + Returns whether a guest can join a room based on its current state. + """ + guest_access = current_state.get((EventTypes.GuestAccess, ""), None) + return ( + guest_access + and guest_access.content + and "guest_access" in guest_access.content + and guest_access.content["guest_access"] == "can_join" + ) + + @defer.inlineCallbacks + def lookup_room_alias(self, room_alias): + """ + Get the room ID associated with a room alias. + + Args: + room_alias (RoomAlias): The alias to look up. + Returns: + A tuple of: + The room ID as a RoomID object. + Hosts likely to be participating in the room ([str]). + Raises: + SynapseError if room alias could not be found. + """ + directory_handler = self.hs.get_handlers().directory_handler + mapping = yield directory_handler.get_association(room_alias) + + if not mapping: + raise SynapseError(404, "No such room alias") + + room_id = mapping["room_id"] + servers = mapping["servers"] + + defer.returnValue((RoomID.from_string(room_id), servers)) + + @defer.inlineCallbacks + def get_inviter(self, user_id, room_id): + invite = yield self.store.get_invite_for_user_in_room( + user_id=user_id, + room_id=room_id, + ) + if invite: + defer.returnValue(UserID.from_string(invite.sender)) + + @defer.inlineCallbacks + def do_3pid_invite( + self, + room_id, + inviter, + medium, + address, + id_server, + requester, + txn_id + ): + invitee = yield self._lookup_3pid( + id_server, medium, address + ) + + if invitee: + yield self.update_membership( + requester, + UserID.from_string(invitee), + room_id, + "invite", + txn_id=txn_id, + ) + else: + yield self._make_and_store_3pid_invite( + requester, + id_server, + medium, + address, + room_id, + inviter, + txn_id=txn_id + ) + + @defer.inlineCallbacks + def _lookup_3pid(self, id_server, medium, address): + """Looks up a 3pid in the passed identity server. + + Args: + id_server (str): The server name (including port, if required) + of the identity server to use. + medium (str): The type of the third party identifier (e.g. "email"). + address (str): The third party identifier (e.g. "foo@example.com"). + + Returns: + str: the matrix ID of the 3pid, or None if it is not recognized. + """ + try: + data = yield self.hs.get_simple_http_client().get_json( + "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server,), + { + "medium": medium, + "address": address, + } + ) + + if "mxid" in data: + if "signatures" not in data: + raise AuthError(401, "No signatures on 3pid binding") + self.verify_any_signature(data, id_server) + defer.returnValue(data["mxid"]) + + except IOError as e: + logger.warn("Error from identity server lookup: %s" % (e,)) + defer.returnValue(None) + + @defer.inlineCallbacks + def verify_any_signature(self, data, server_hostname): + if server_hostname not in data["signatures"]: + raise AuthError(401, "No signature from server %s" % (server_hostname,)) + for key_name, signature in data["signatures"][server_hostname].items(): + key_data = yield self.hs.get_simple_http_client().get_json( + "%s%s/_matrix/identity/api/v1/pubkey/%s" % + (id_server_scheme, server_hostname, key_name,), + ) + if "public_key" not in key_data: + raise AuthError(401, "No public key named %s from %s" % + (key_name, server_hostname,)) + verify_signed_json( + data, + server_hostname, + decode_verify_key_bytes(key_name, decode_base64(key_data["public_key"])) + ) + return + + @defer.inlineCallbacks + def _make_and_store_3pid_invite( + self, + requester, + id_server, + medium, + address, + room_id, + user, + txn_id + ): + room_state = yield self.hs.get_state_handler().get_current_state(room_id) + + inviter_display_name = "" + inviter_avatar_url = "" + member_event = room_state.get((EventTypes.Member, user.to_string())) + if member_event: + inviter_display_name = member_event.content.get("displayname", "") + inviter_avatar_url = member_event.content.get("avatar_url", "") + + canonical_room_alias = "" + canonical_alias_event = room_state.get((EventTypes.CanonicalAlias, "")) + if canonical_alias_event: + canonical_room_alias = canonical_alias_event.content.get("alias", "") + + room_name = "" + room_name_event = room_state.get((EventTypes.Name, "")) + if room_name_event: + room_name = room_name_event.content.get("name", "") + + room_join_rules = "" + join_rules_event = room_state.get((EventTypes.JoinRules, "")) + if join_rules_event: + room_join_rules = join_rules_event.content.get("join_rule", "") + + room_avatar_url = "" + room_avatar_event = room_state.get((EventTypes.RoomAvatar, "")) + if room_avatar_event: + room_avatar_url = room_avatar_event.content.get("url", "") + + token, public_keys, fallback_public_key, display_name = ( + yield self._ask_id_server_for_third_party_invite( + id_server=id_server, + medium=medium, + address=address, + room_id=room_id, + inviter_user_id=user.to_string(), + room_alias=canonical_room_alias, + room_avatar_url=room_avatar_url, + room_join_rules=room_join_rules, + room_name=room_name, + inviter_display_name=inviter_display_name, + inviter_avatar_url=inviter_avatar_url + ) + ) + + msg_handler = self.hs.get_handlers().message_handler + yield msg_handler.create_and_send_nonmember_event( + requester, + { + "type": EventTypes.ThirdPartyInvite, + "content": { + "display_name": display_name, + "public_keys": public_keys, + + # For backwards compatibility: + "key_validity_url": fallback_public_key["key_validity_url"], + "public_key": fallback_public_key["public_key"], + }, + "room_id": room_id, + "sender": user.to_string(), + "state_key": token, + }, + txn_id=txn_id, + ) + + @defer.inlineCallbacks + def _ask_id_server_for_third_party_invite( + self, + id_server, + medium, + address, + room_id, + inviter_user_id, + room_alias, + room_avatar_url, + room_join_rules, + room_name, + inviter_display_name, + inviter_avatar_url + ): + """ + Asks an identity server for a third party invite. + + Args: + id_server (str): hostname + optional port for the identity server. + medium (str): The literal string "email". + address (str): The third party address being invited. + room_id (str): The ID of the room to which the user is invited. + inviter_user_id (str): The user ID of the inviter. + room_alias (str): An alias for the room, for cosmetic notifications. + room_avatar_url (str): The URL of the room's avatar, for cosmetic + notifications. + room_join_rules (str): The join rules of the email (e.g. "public"). + room_name (str): The m.room.name of the room. + inviter_display_name (str): The current display name of the + inviter. + inviter_avatar_url (str): The URL of the inviter's avatar. + + Returns: + A deferred tuple containing: + token (str): The token which must be signed to prove authenticity. + public_keys ([{"public_key": str, "key_validity_url": str}]): + public_key is a base64-encoded ed25519 public key. + fallback_public_key: One element from public_keys. + display_name (str): A user-friendly name to represent the invited + user. + """ + + is_url = "%s%s/_matrix/identity/api/v1/store-invite" % ( + id_server_scheme, id_server, + ) + + invite_config = { + "medium": medium, + "address": address, + "room_id": room_id, + "room_alias": room_alias, + "room_avatar_url": room_avatar_url, + "room_join_rules": room_join_rules, + "room_name": room_name, + "sender": inviter_user_id, + "sender_display_name": inviter_display_name, + "sender_avatar_url": inviter_avatar_url, + } + + if self.hs.config.invite_3pid_guest: + registration_handler = self.hs.get_handlers().registration_handler + guest_access_token = yield registration_handler.guest_access_token_for( + medium=medium, + address=address, + inviter_user_id=inviter_user_id, + ) + + guest_user_info = yield self.hs.get_auth().get_user_by_access_token( + guest_access_token + ) + + invite_config.update({ + "guest_access_token": guest_access_token, + "guest_user_id": guest_user_info["user"].to_string(), + }) + + data = yield self.hs.get_simple_http_client().post_urlencoded_get_json( + is_url, + invite_config + ) + # TODO: Check for success + token = data["token"] + public_keys = data.get("public_keys", []) + if "public_key" in data: + fallback_public_key = { + "public_key": data["public_key"], + "key_validity_url": "%s%s/_matrix/identity/api/v1/pubkey/isvalid" % ( + id_server_scheme, id_server, + ), + } + else: + fallback_public_key = public_keys[0] + + if not public_keys: + public_keys.append(fallback_public_key) + display_name = data["display_name"] + defer.returnValue((token, public_keys, fallback_public_key, display_name)) + + @defer.inlineCallbacks + def forget(self, user, room_id): + user_id = user.to_string() + + member = yield self.state_handler.get_current_state( + room_id=room_id, + event_type=EventTypes.Member, + state_key=user_id + ) + membership = member.membership if member else None + + if membership is not None and membership != Membership.LEAVE: + raise SynapseError(400, "User %s in room %s" % ( + user_id, room_id + )) + + if membership: + yield self.store.forget(user_id, room_id) diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 9937d8dd7f..df75d70fac 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -21,6 +21,7 @@ from synapse.api.constants import Membership, EventTypes from synapse.api.filtering import Filter from synapse.api.errors import SynapseError from synapse.events.utils import serialize_event +from synapse.visibility import filter_events_for_client from unpaddedbase64 import decode_base64, encode_base64 @@ -172,8 +173,8 @@ class SearchHandler(BaseHandler): filtered_events = search_filter.filter([r["event"] for r in results]) - events = yield self._filter_events_for_client( - user.to_string(), filtered_events + events = yield filter_events_for_client( + self.store, user.to_string(), filtered_events ) events.sort(key=lambda e: -rank_map[e.event_id]) @@ -223,8 +224,8 @@ class SearchHandler(BaseHandler): r["event"] for r in results ]) - events = yield self._filter_events_for_client( - user.to_string(), filtered_events + events = yield filter_events_for_client( + self.store, user.to_string(), filtered_events ) room_events.extend(events) @@ -281,12 +282,12 @@ class SearchHandler(BaseHandler): event.room_id, event.event_id, before_limit, after_limit ) - res["events_before"] = yield self._filter_events_for_client( - user.to_string(), res["events_before"] + res["events_before"] = yield filter_events_for_client( + self.store, user.to_string(), res["events_before"] ) - res["events_after"] = yield self._filter_events_for_client( - user.to_string(), res["events_after"] + res["events_after"] = yield filter_events_for_client( + self.store, user.to_string(), res["events_after"] ) res["start"] = now_token.copy_and_replace( diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 1f6fde8e8a..be26a491ff 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -13,14 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import BaseHandler - -from synapse.streams.config import PaginationConfig from synapse.api.constants import Membership, EventTypes -from synapse.util import unwrapFirstError -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.async import concurrently_execute +from synapse.util.logcontext import LoggingContext from synapse.util.metrics import Measure +from synapse.util.caches.response_cache import ResponseCache from synapse.push.clientformat import format_push_rules_for_user +from synapse.visibility import filter_events_for_client from twisted.internet import defer @@ -35,6 +34,7 @@ SyncConfig = collections.namedtuple("SyncConfig", [ "user", "filter_collection", "is_guest", + "request_key", ]) @@ -130,14 +130,16 @@ class SyncResult(collections.namedtuple("SyncResult", [ ) -class SyncHandler(BaseHandler): +class SyncHandler(object): def __init__(self, hs): - super(SyncHandler, self).__init__(hs) + self.store = hs.get_datastore() + self.notifier = hs.get_notifier() + self.presence_handler = hs.get_presence_handler() self.event_sources = hs.get_event_sources() self.clock = hs.get_clock() + self.response_cache = ResponseCache() - @defer.inlineCallbacks def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, full_state=False): """Get the sync for a client if we have new data for it now. Otherwise @@ -146,7 +148,19 @@ class SyncHandler(BaseHandler): Returns: A Deferred SyncResult. """ + result = self.response_cache.get(sync_config.request_key) + if not result: + result = self.response_cache.set( + sync_config.request_key, + self._wait_for_sync_for_user( + sync_config, since_token, timeout, full_state + ) + ) + return result + @defer.inlineCallbacks + def _wait_for_sync_for_user(self, sync_config, since_token, timeout, + full_state): context = LoggingContext.current_context() if context: if since_token is None: @@ -179,197 +193,15 @@ class SyncHandler(BaseHandler): Returns: A Deferred SyncResult. """ - if since_token is None or full_state: - return self.full_state_sync(sync_config, since_token) - else: - return self.incremental_sync_with_gap(sync_config, since_token) - - @defer.inlineCallbacks - def full_state_sync(self, sync_config, timeline_since_token): - """Get a sync for a client which is starting without any state. - - If a 'message_since_token' is given, only timeline events which have - happened since that token will be returned. - - Returns: - A Deferred SyncResult. - """ - now_token = yield self.event_sources.get_current_token() - - now_token, ephemeral_by_room = yield self.ephemeral_by_room( - sync_config, now_token - ) - - presence_stream = self.event_sources.sources["presence"] - # TODO (mjark): This looks wrong, shouldn't we be getting the presence - # UP to the present rather than after the present? - pagination_config = PaginationConfig(from_token=now_token) - presence, _ = yield presence_stream.get_pagination_rows( - user=sync_config.user, - pagination_config=pagination_config.get_source_config("presence"), - key=None - ) - - membership_list = ( - Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN - ) - - room_list = yield self.store.get_rooms_for_user_where_membership_is( - user_id=sync_config.user.to_string(), - membership_list=membership_list - ) - - account_data, account_data_by_room = ( - yield self.store.get_account_data_for_user( - sync_config.user.to_string() - ) - ) - - account_data['m.push_rules'] = yield self.push_rules_for_user( - sync_config.user - ) - - tags_by_room = yield self.store.get_tags_for_user( - sync_config.user.to_string() - ) - - joined = [] - invited = [] - archived = [] - deferreds = [] - - room_list_chunks = [room_list[i:i + 10] for i in xrange(0, len(room_list), 10)] - for room_list_chunk in room_list_chunks: - for event in room_list_chunk: - if event.membership == Membership.JOIN: - room_sync_deferred = preserve_fn( - self.full_state_sync_for_joined_room - )( - room_id=event.room_id, - sync_config=sync_config, - now_token=now_token, - timeline_since_token=timeline_since_token, - ephemeral_by_room=ephemeral_by_room, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - ) - room_sync_deferred.addCallback(joined.append) - deferreds.append(room_sync_deferred) - elif event.membership == Membership.INVITE: - invite = yield self.store.get_event(event.event_id) - invited.append(InvitedSyncResult( - room_id=event.room_id, - invite=invite, - )) - elif event.membership in (Membership.LEAVE, Membership.BAN): - # Always send down rooms we were banned or kicked from. - if not sync_config.filter_collection.include_leave: - if event.membership == Membership.LEAVE: - if sync_config.user.to_string() == event.sender: - continue - - leave_token = now_token.copy_and_replace( - "room_key", "s%d" % (event.stream_ordering,) - ) - room_sync_deferred = preserve_fn( - self.full_state_sync_for_archived_room - )( - sync_config=sync_config, - room_id=event.room_id, - leave_event_id=event.event_id, - leave_token=leave_token, - timeline_since_token=timeline_since_token, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - ) - room_sync_deferred.addCallback(archived.append) - deferreds.append(room_sync_deferred) - - yield defer.gatherResults( - deferreds, consumeErrors=True - ).addErrback(unwrapFirstError) - - account_data_for_user = sync_config.filter_collection.filter_account_data( - self.account_data_for_user(account_data) - ) - - presence = sync_config.filter_collection.filter_presence( - presence - ) - - defer.returnValue(SyncResult( - presence=presence, - account_data=account_data_for_user, - joined=joined, - invited=invited, - archived=archived, - next_batch=now_token, - )) - - @defer.inlineCallbacks - def full_state_sync_for_joined_room(self, room_id, sync_config, - now_token, timeline_since_token, - ephemeral_by_room, tags_by_room, - account_data_by_room): - """Sync a room for a client which is starting without any state - Returns: - A Deferred JoinedSyncResult. - """ - - batch = yield self.load_filtered_recents( - room_id, sync_config, now_token, since_token=timeline_since_token - ) - - room_sync = yield self.incremental_sync_with_gap_for_room( - room_id, sync_config, - now_token=now_token, - since_token=timeline_since_token, - ephemeral_by_room=ephemeral_by_room, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - batch=batch, - full_state=True, - ) - - defer.returnValue(room_sync) + return self.generate_sync_result(sync_config, since_token, full_state) @defer.inlineCallbacks def push_rules_for_user(self, user): user_id = user.to_string() - rawrules = yield self.store.get_push_rules_for_user(user_id) - enabled_map = yield self.store.get_push_rules_enabled_for_user(user_id) - rules = format_push_rules_for_user(user, rawrules, enabled_map) + rules = yield self.store.get_push_rules_for_user(user_id) + rules = format_push_rules_for_user(user, rules) defer.returnValue(rules) - def account_data_for_user(self, account_data): - account_data_events = [] - - for account_data_type, content in account_data.items(): - account_data_events.append({ - "type": account_data_type, - "content": content, - }) - - return account_data_events - - def account_data_for_room(self, room_id, tags_by_room, account_data_by_room): - account_data_events = [] - tags = tags_by_room.get(room_id) - if tags is not None: - account_data_events.append({ - "type": "m.tag", - "content": {"tags": tags}, - }) - - account_data = account_data_by_room.get(room_id, {}) - for account_data_type, content in account_data.items(): - account_data_events.append({ - "type": account_data_type, - "content": content, - }) - - return account_data_events - @defer.inlineCallbacks def ephemeral_by_room(self, sync_config, now_token, since_token=None): """Get the ephemeral events for each room the user is in @@ -432,255 +264,44 @@ class SyncHandler(BaseHandler): defer.returnValue((now_token, ephemeral_by_room)) - def full_state_sync_for_archived_room(self, room_id, sync_config, - leave_event_id, leave_token, - timeline_since_token, tags_by_room, - account_data_by_room): - """Sync a room for a client which is starting without any state - Returns: - A Deferred ArchivedSyncResult. - """ - - return self.incremental_sync_for_archived_room( - sync_config, room_id, leave_event_id, timeline_since_token, tags_by_room, - account_data_by_room, full_state=True, leave_token=leave_token, - ) - @defer.inlineCallbacks - def incremental_sync_with_gap(self, sync_config, since_token): - """ Get the incremental delta needed to bring the client up to - date with the server. - Returns: - A Deferred SyncResult. + def _load_filtered_recents(self, room_id, sync_config, now_token, + since_token=None, recents=None, newly_joined_room=False): """ - now_token = yield self.event_sources.get_current_token() - - rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string()) - room_ids = [room.room_id for room in rooms] - - presence_source = self.event_sources.sources["presence"] - presence, presence_key = yield presence_source.get_new_events( - user=sync_config.user, - from_key=since_token.presence_key, - limit=sync_config.filter_collection.presence_limit(), - room_ids=room_ids, - is_guest=sync_config.is_guest, - ) - now_token = now_token.copy_and_replace("presence_key", presence_key) - - now_token, ephemeral_by_room = yield self.ephemeral_by_room( - sync_config, now_token, since_token - ) - - rm_handler = self.hs.get_handlers().room_member_handler - app_service = yield self.store.get_app_service_by_user_id( - sync_config.user.to_string() - ) - if app_service: - rooms = yield self.store.get_app_service_rooms(app_service) - joined_room_ids = set(r.room_id for r in rooms) - else: - joined_room_ids = yield rm_handler.get_joined_rooms_for_user( - sync_config.user - ) - - user_id = sync_config.user.to_string() - - timeline_limit = sync_config.filter_collection.timeline_limit() - - tags_by_room = yield self.store.get_updated_tags( - user_id, - since_token.account_data_key, - ) - - account_data, account_data_by_room = ( - yield self.store.get_updated_account_data_for_user( - user_id, - since_token.account_data_key, - ) - ) - - push_rules_changed = yield self.store.have_push_rules_changed_for_user( - user_id, int(since_token.push_rules_key) - ) - - if push_rules_changed: - account_data["m.push_rules"] = yield self.push_rules_for_user( - sync_config.user - ) - - # Get a list of membership change events that have happened. - rooms_changed = yield self.store.get_membership_changes_for_user( - user_id, since_token.room_key, now_token.room_key - ) - - mem_change_events_by_room_id = {} - for event in rooms_changed: - mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) - - newly_joined_rooms = [] - archived = [] - invited = [] - for room_id, events in mem_change_events_by_room_id.items(): - non_joins = [e for e in events if e.membership != Membership.JOIN] - has_join = len(non_joins) != len(events) - - # We want to figure out if we joined the room at some point since - # the last sync (even if we have since left). This is to make sure - # we do send down the room, and with full state, where necessary - if room_id in joined_room_ids or has_join: - old_state = yield self.get_state_at(room_id, since_token) - old_mem_ev = old_state.get((EventTypes.Member, user_id), None) - if not old_mem_ev or old_mem_ev.membership != Membership.JOIN: - newly_joined_rooms.append(room_id) - - if room_id in joined_room_ids: - continue - - if not non_joins: - continue - - # Only bother if we're still currently invited - should_invite = non_joins[-1].membership == Membership.INVITE - if should_invite: - room_sync = InvitedSyncResult(room_id, invite=non_joins[-1]) - if room_sync: - invited.append(room_sync) - - # Always include leave/ban events. Just take the last one. - # TODO: How do we handle ban -> leave in same batch? - leave_events = [ - e for e in non_joins - if e.membership in (Membership.LEAVE, Membership.BAN) - ] - - if leave_events: - leave_event = leave_events[-1] - room_sync = yield self.incremental_sync_for_archived_room( - sync_config, room_id, leave_event.event_id, since_token, - tags_by_room, account_data_by_room, - full_state=room_id in newly_joined_rooms - ) - if room_sync: - archived.append(room_sync) - - # Get all events for rooms we're currently joined to. - room_to_events = yield self.store.get_room_events_stream_for_rooms( - room_ids=joined_room_ids, - from_key=since_token.room_key, - to_key=now_token.room_key, - limit=timeline_limit + 1, - ) - - joined = [] - # We loop through all room ids, even if there are no new events, in case - # there are non room events taht we need to notify about. - for room_id in joined_room_ids: - room_entry = room_to_events.get(room_id, None) - - if room_entry: - events, start_key = room_entry - - prev_batch_token = now_token.copy_and_replace("room_key", start_key) - - newly_joined_room = room_id in newly_joined_rooms - full_state = newly_joined_room - - batch = yield self.load_filtered_recents( - room_id, sync_config, prev_batch_token, - since_token=since_token, - recents=events, - newly_joined_room=newly_joined_room, - ) - else: - batch = TimelineBatch( - events=[], - prev_batch=since_token, - limited=False, - ) - full_state = False - - room_sync = yield self.incremental_sync_with_gap_for_room( - room_id=room_id, - sync_config=sync_config, - since_token=since_token, - now_token=now_token, - ephemeral_by_room=ephemeral_by_room, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - batch=batch, - full_state=full_state, - ) - if room_sync: - joined.append(room_sync) - - # For each newly joined room, we want to send down presence of - # existing users. - presence_handler = self.hs.get_handlers().presence_handler - extra_presence_users = set() - for room_id in newly_joined_rooms: - users = yield self.store.get_users_in_room(event.room_id) - extra_presence_users.update(users) - - # For each new member, send down presence. - for joined_sync in joined: - it = itertools.chain(joined_sync.timeline.events, joined_sync.state.values()) - for event in it: - if event.type == EventTypes.Member: - if event.membership == Membership.JOIN: - extra_presence_users.add(event.state_key) - - states = yield presence_handler.get_states( - [u for u in extra_presence_users if u != user_id], - as_event=True, - ) - presence.extend(states) - - account_data_for_user = sync_config.filter_collection.filter_account_data( - self.account_data_for_user(account_data) - ) - - presence = sync_config.filter_collection.filter_presence( - presence - ) - - defer.returnValue(SyncResult( - presence=presence, - account_data=account_data_for_user, - joined=joined, - invited=invited, - archived=archived, - next_batch=now_token, - )) - - @defer.inlineCallbacks - def load_filtered_recents(self, room_id, sync_config, now_token, - since_token=None, recents=None, newly_joined_room=False): - """ - :returns a Deferred TimelineBatch + Returns: + a Deferred TimelineBatch """ with Measure(self.clock, "load_filtered_recents"): - filtering_factor = 2 timeline_limit = sync_config.filter_collection.timeline_limit() - load_limit = max(timeline_limit * filtering_factor, 10) - max_repeat = 5 # Only try a few times per room, otherwise - room_key = now_token.room_key - end_key = room_key if recents is None or newly_joined_room or timeline_limit < len(recents): limited = True else: limited = False - if recents is not None: + if recents: recents = sync_config.filter_collection.filter_room_timeline(recents) - recents = yield self._filter_events_for_client( + recents = yield filter_events_for_client( + self.store, sync_config.user.to_string(), recents, ) else: recents = [] + if not limited: + defer.returnValue(TimelineBatch( + events=recents, + prev_batch=now_token, + limited=False + )) + + filtering_factor = 2 + load_limit = max(timeline_limit * filtering_factor, 10) + max_repeat = 5 # Only try a few times per room, otherwise + room_key = now_token.room_key + end_key = room_key + since_key = None if since_token and not newly_joined_room: since_key = since_token.room_key @@ -695,7 +316,8 @@ class SyncHandler(BaseHandler): loaded_recents = sync_config.filter_collection.filter_room_timeline( events ) - loaded_recents = yield self._filter_events_for_client( + loaded_recents = yield filter_events_for_client( + self.store, sync_config.user.to_string(), loaded_recents, ) @@ -723,109 +345,15 @@ class SyncHandler(BaseHandler): )) @defer.inlineCallbacks - def incremental_sync_with_gap_for_room(self, room_id, sync_config, - since_token, now_token, - ephemeral_by_room, tags_by_room, - account_data_by_room, - batch, full_state=False): - state = yield self.compute_state_delta( - room_id, batch, sync_config, since_token, now_token, - full_state=full_state - ) - - account_data = self.account_data_for_room( - room_id, tags_by_room, account_data_by_room - ) - - account_data = sync_config.filter_collection.filter_room_account_data( - account_data - ) - - ephemeral = sync_config.filter_collection.filter_room_ephemeral( - ephemeral_by_room.get(room_id, []) - ) - - unread_notifications = {} - room_sync = JoinedSyncResult( - room_id=room_id, - timeline=batch, - state=state, - ephemeral=ephemeral, - account_data=account_data, - unread_notifications=unread_notifications, - ) - - if room_sync: - notifs = yield self.unread_notifs_for_room_id( - room_id, sync_config - ) - - if notifs is not None: - unread_notifications["notification_count"] = notifs["notify_count"] - unread_notifications["highlight_count"] = notifs["highlight_count"] - - logger.debug("Room sync: %r", room_sync) - - defer.returnValue(room_sync) - - @defer.inlineCallbacks - def incremental_sync_for_archived_room(self, sync_config, room_id, leave_event_id, - since_token, tags_by_room, - account_data_by_room, full_state, - leave_token=None): - """ Get the incremental delta needed to bring the client up to date for - the archived room. - Returns: - A Deferred ArchivedSyncResult - """ - - if not leave_token: - stream_token = yield self.store.get_stream_token_for_event( - leave_event_id - ) - - leave_token = since_token.copy_and_replace("room_key", stream_token) - - if since_token and since_token.is_after(leave_token): - defer.returnValue(None) - - batch = yield self.load_filtered_recents( - room_id, sync_config, leave_token, since_token, - ) - - logger.debug("Recents %r", batch) - - state_events_delta = yield self.compute_state_delta( - room_id, batch, sync_config, since_token, leave_token, - full_state=full_state - ) - - account_data = self.account_data_for_room( - room_id, tags_by_room, account_data_by_room - ) - - account_data = sync_config.filter_collection.filter_room_account_data( - account_data - ) - - room_sync = ArchivedSyncResult( - room_id=room_id, - timeline=batch, - state=state_events_delta, - account_data=account_data, - ) - - logger.debug("Room sync: %r", room_sync) - - defer.returnValue(room_sync) - - @defer.inlineCallbacks def get_state_after_event(self, event): """ Get the room state after the given event - :param synapse.events.EventBase event: event of interest - :return: A Deferred map from ((type, state_key)->Event) + Args: + event(synapse.events.EventBase): event of interest + + Returns: + A Deferred map from ((type, state_key)->Event) """ state = yield self.store.get_state_for_event(event.event_id) if event.is_state(): @@ -836,9 +364,13 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def get_state_at(self, room_id, stream_position): """ Get the room state at a particular stream position - :param str room_id: room for which to get state - :param StreamToken stream_position: point at which to get state - :returns: A Deferred map from ((type, state_key)->Event) + + Args: + room_id(str): room for which to get state + stream_position(StreamToken): point at which to get state + + Returns: + A Deferred map from ((type, state_key)->Event) """ last_events, token = yield self.store.get_recent_events_for_room( room_id, end_token=stream_position.room_key, limit=1, @@ -859,15 +391,18 @@ class SyncHandler(BaseHandler): """ Works out the differnce in state between the start of the timeline and the previous sync. - :param str room_id - :param TimelineBatch batch: The timeline batch for the room that will - be sent to the user. - :param sync_config - :param str since_token: Token of the end of the previous batch. May be None. - :param str now_token: Token of the end of the current batch. - :param bool full_state: Whether to force returning the full state. + Args: + room_id(str): + batch(synapse.handlers.sync.TimelineBatch): The timeline batch for + the room that will be sent to the user. + sync_config(synapse.handlers.sync.SyncConfig): + since_token(str|None): Token of the end of the previous batch. May + be None. + now_token(str): Token of the end of the current batch. + full_state(bool): Whether to force returning the full state. - :returns A new event dictionary + Returns: + A deferred new event dictionary """ # TODO(mjark) Check if the state events were received by the server # after the previous sync, since we need to include those state @@ -934,24 +469,6 @@ class SyncHandler(BaseHandler): for e in sync_config.filter_collection.filter_room_state(state.values()) }) - def check_joined_room(self, sync_config, state_delta): - """ - Check if the user has just joined the given room (so should - be given the full state) - - :param sync_config: - :param dict[(str,str), synapse.events.FrozenEvent] state_delta: the - difference in state since the last sync - - :returns A deferred Tuple (state_delta, limited) - """ - join_event = state_delta.get(( - EventTypes.Member, sync_config.user.to_string()), None) - if join_event is not None: - if join_event.content["membership"] == Membership.JOIN: - return True - return False - @defer.inlineCallbacks def unread_notifs_for_room_id(self, room_id, sync_config): with Measure(self.clock, "unread_notifs_for_room_id"): @@ -972,6 +489,551 @@ class SyncHandler(BaseHandler): # count is whatever it was last time. defer.returnValue(None) + @defer.inlineCallbacks + def generate_sync_result(self, sync_config, since_token=None, full_state=False): + """Generates a sync result. + + Args: + sync_config (SyncConfig) + since_token (StreamToken) + full_state (bool) + + Returns: + Deferred(SyncResult) + """ + + # NB: The now_token gets changed by some of the generate_sync_* methods, + # this is due to some of the underlying streams not supporting the ability + # to query up to a given point. + # Always use the `now_token` in `SyncResultBuilder` + now_token = yield self.event_sources.get_current_token() + + sync_result_builder = SyncResultBuilder( + sync_config, full_state, + since_token=since_token, + now_token=now_token, + ) + + account_data_by_room = yield self._generate_sync_entry_for_account_data( + sync_result_builder + ) + + res = yield self._generate_sync_entry_for_rooms( + sync_result_builder, account_data_by_room + ) + newly_joined_rooms, newly_joined_users = res + + yield self._generate_sync_entry_for_presence( + sync_result_builder, newly_joined_rooms, newly_joined_users + ) + + defer.returnValue(SyncResult( + presence=sync_result_builder.presence, + account_data=sync_result_builder.account_data, + joined=sync_result_builder.joined, + invited=sync_result_builder.invited, + archived=sync_result_builder.archived, + next_batch=sync_result_builder.now_token, + )) + + @defer.inlineCallbacks + def _generate_sync_entry_for_account_data(self, sync_result_builder): + """Generates the account data portion of the sync response. Populates + `sync_result_builder` with the result. + + Args: + sync_result_builder(SyncResultBuilder) + + Returns: + Deferred(dict): A dictionary containing the per room account data. + """ + sync_config = sync_result_builder.sync_config + user_id = sync_result_builder.sync_config.user.to_string() + since_token = sync_result_builder.since_token + + if since_token and not sync_result_builder.full_state: + account_data, account_data_by_room = ( + yield self.store.get_updated_account_data_for_user( + user_id, + since_token.account_data_key, + ) + ) + + push_rules_changed = yield self.store.have_push_rules_changed_for_user( + user_id, int(since_token.push_rules_key) + ) + + if push_rules_changed: + account_data["m.push_rules"] = yield self.push_rules_for_user( + sync_config.user + ) + else: + account_data, account_data_by_room = ( + yield self.store.get_account_data_for_user( + sync_config.user.to_string() + ) + ) + + account_data['m.push_rules'] = yield self.push_rules_for_user( + sync_config.user + ) + + account_data_for_user = sync_config.filter_collection.filter_account_data([ + {"type": account_data_type, "content": content} + for account_data_type, content in account_data.items() + ]) + + sync_result_builder.account_data = account_data_for_user + + defer.returnValue(account_data_by_room) + + @defer.inlineCallbacks + def _generate_sync_entry_for_presence(self, sync_result_builder, newly_joined_rooms, + newly_joined_users): + """Generates the presence portion of the sync response. Populates the + `sync_result_builder` with the result. + + Args: + sync_result_builder(SyncResultBuilder) + newly_joined_rooms(list): List of rooms that the user has joined + since the last sync (or empty if an initial sync) + newly_joined_users(list): List of users that have joined rooms + since the last sync (or empty if an initial sync) + """ + now_token = sync_result_builder.now_token + sync_config = sync_result_builder.sync_config + user = sync_result_builder.sync_config.user + + presence_source = self.event_sources.sources["presence"] + + since_token = sync_result_builder.since_token + if since_token and not sync_result_builder.full_state: + presence_key = since_token.presence_key + include_offline = True + else: + presence_key = None + include_offline = False + + presence, presence_key = yield presence_source.get_new_events( + user=user, + from_key=presence_key, + is_guest=sync_config.is_guest, + include_offline=include_offline, + ) + sync_result_builder.now_token = now_token.copy_and_replace( + "presence_key", presence_key + ) + + extra_users_ids = set(newly_joined_users) + for room_id in newly_joined_rooms: + users = yield self.store.get_users_in_room(room_id) + extra_users_ids.update(users) + extra_users_ids.discard(user.to_string()) + + states = yield self.presence_handler.get_states( + extra_users_ids, + as_event=True, + ) + presence.extend(states) + + # Deduplicate the presence entries so that there's at most one per user + presence = {p["content"]["user_id"]: p for p in presence}.values() + + presence = sync_config.filter_collection.filter_presence( + presence + ) + + sync_result_builder.presence = presence + + @defer.inlineCallbacks + def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room): + """Generates the rooms portion of the sync response. Populates the + `sync_result_builder` with the result. + + Args: + sync_result_builder(SyncResultBuilder) + account_data_by_room(dict): Dictionary of per room account data + + Returns: + Deferred(tuple): Returns a 2-tuple of + `(newly_joined_rooms, newly_joined_users)` + """ + user_id = sync_result_builder.sync_config.user.to_string() + + now_token, ephemeral_by_room = yield self.ephemeral_by_room( + sync_result_builder.sync_config, + now_token=sync_result_builder.now_token, + since_token=sync_result_builder.since_token, + ) + sync_result_builder.now_token = now_token + + ignored_account_data = yield self.store.get_global_account_data_by_type_for_user( + "m.ignored_user_list", user_id=user_id, + ) + + if ignored_account_data: + ignored_users = ignored_account_data.get("ignored_users", {}).keys() + else: + ignored_users = frozenset() + + if sync_result_builder.since_token: + res = yield self._get_rooms_changed(sync_result_builder, ignored_users) + room_entries, invited, newly_joined_rooms = res + + tags_by_room = yield self.store.get_updated_tags( + user_id, + sync_result_builder.since_token.account_data_key, + ) + else: + res = yield self._get_all_rooms(sync_result_builder, ignored_users) + room_entries, invited, newly_joined_rooms = res + + tags_by_room = yield self.store.get_tags_for_user(user_id) + + def handle_room_entries(room_entry): + return self._generate_room_entry( + sync_result_builder, + ignored_users, + room_entry, + ephemeral=ephemeral_by_room.get(room_entry.room_id, []), + tags=tags_by_room.get(room_entry.room_id), + account_data=account_data_by_room.get(room_entry.room_id, {}), + always_include=sync_result_builder.full_state, + ) + + yield concurrently_execute(handle_room_entries, room_entries, 10) + + sync_result_builder.invited.extend(invited) + + # Now we want to get any newly joined users + newly_joined_users = set() + if sync_result_builder.since_token: + for joined_sync in sync_result_builder.joined: + it = itertools.chain( + joined_sync.timeline.events, joined_sync.state.values() + ) + for event in it: + if event.type == EventTypes.Member: + if event.membership == Membership.JOIN: + newly_joined_users.add(event.state_key) + + defer.returnValue((newly_joined_rooms, newly_joined_users)) + + @defer.inlineCallbacks + def _get_rooms_changed(self, sync_result_builder, ignored_users): + """Gets the the changes that have happened since the last sync. + + Args: + sync_result_builder(SyncResultBuilder) + ignored_users(set(str)): Set of users ignored by user. + + Returns: + Deferred(tuple): Returns a tuple of the form: + `([RoomSyncResultBuilder], [InvitedSyncResult], newly_joined_rooms)` + """ + user_id = sync_result_builder.sync_config.user.to_string() + since_token = sync_result_builder.since_token + now_token = sync_result_builder.now_token + sync_config = sync_result_builder.sync_config + + assert since_token + + app_service = yield self.store.get_app_service_by_user_id(user_id) + if app_service: + rooms = yield self.store.get_app_service_rooms(app_service) + joined_room_ids = set(r.room_id for r in rooms) + else: + rooms = yield self.store.get_rooms_for_user(user_id) + joined_room_ids = set(r.room_id for r in rooms) + + # Get a list of membership change events that have happened. + rooms_changed = yield self.store.get_membership_changes_for_user( + user_id, since_token.room_key, now_token.room_key + ) + + mem_change_events_by_room_id = {} + for event in rooms_changed: + mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) + + newly_joined_rooms = [] + room_entries = [] + invited = [] + for room_id, events in mem_change_events_by_room_id.items(): + non_joins = [e for e in events if e.membership != Membership.JOIN] + has_join = len(non_joins) != len(events) + + # We want to figure out if we joined the room at some point since + # the last sync (even if we have since left). This is to make sure + # we do send down the room, and with full state, where necessary + if room_id in joined_room_ids or has_join: + old_state = yield self.get_state_at(room_id, since_token) + old_mem_ev = old_state.get((EventTypes.Member, user_id), None) + if not old_mem_ev or old_mem_ev.membership != Membership.JOIN: + newly_joined_rooms.append(room_id) + + if room_id in joined_room_ids: + continue + + if not non_joins: + continue + + # Only bother if we're still currently invited + should_invite = non_joins[-1].membership == Membership.INVITE + if should_invite: + if event.sender not in ignored_users: + room_sync = InvitedSyncResult(room_id, invite=non_joins[-1]) + if room_sync: + invited.append(room_sync) + + # Always include leave/ban events. Just take the last one. + # TODO: How do we handle ban -> leave in same batch? + leave_events = [ + e for e in non_joins + if e.membership in (Membership.LEAVE, Membership.BAN) + ] + + if leave_events: + leave_event = leave_events[-1] + leave_stream_token = yield self.store.get_stream_token_for_event( + leave_event.event_id + ) + leave_token = since_token.copy_and_replace( + "room_key", leave_stream_token + ) + + if since_token and since_token.is_after(leave_token): + continue + + room_entries.append(RoomSyncResultBuilder( + room_id=room_id, + rtype="archived", + events=None, + newly_joined=room_id in newly_joined_rooms, + full_state=False, + since_token=since_token, + upto_token=leave_token, + )) + + timeline_limit = sync_config.filter_collection.timeline_limit() + + # Get all events for rooms we're currently joined to. + room_to_events = yield self.store.get_room_events_stream_for_rooms( + room_ids=joined_room_ids, + from_key=since_token.room_key, + to_key=now_token.room_key, + limit=timeline_limit + 1, + ) + + # We loop through all room ids, even if there are no new events, in case + # there are non room events taht we need to notify about. + for room_id in joined_room_ids: + room_entry = room_to_events.get(room_id, None) + + if room_entry: + events, start_key = room_entry + + prev_batch_token = now_token.copy_and_replace("room_key", start_key) + + room_entries.append(RoomSyncResultBuilder( + room_id=room_id, + rtype="joined", + events=events, + newly_joined=room_id in newly_joined_rooms, + full_state=False, + since_token=None if room_id in newly_joined_rooms else since_token, + upto_token=prev_batch_token, + )) + else: + room_entries.append(RoomSyncResultBuilder( + room_id=room_id, + rtype="joined", + events=[], + newly_joined=room_id in newly_joined_rooms, + full_state=False, + since_token=since_token, + upto_token=since_token, + )) + + defer.returnValue((room_entries, invited, newly_joined_rooms)) + + @defer.inlineCallbacks + def _get_all_rooms(self, sync_result_builder, ignored_users): + """Returns entries for all rooms for the user. + + Args: + sync_result_builder(SyncResultBuilder) + ignored_users(set(str)): Set of users ignored by user. + + Returns: + Deferred(tuple): Returns a tuple of the form: + `([RoomSyncResultBuilder], [InvitedSyncResult], [])` + """ + + user_id = sync_result_builder.sync_config.user.to_string() + since_token = sync_result_builder.since_token + now_token = sync_result_builder.now_token + sync_config = sync_result_builder.sync_config + + membership_list = ( + Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN + ) + + room_list = yield self.store.get_rooms_for_user_where_membership_is( + user_id=user_id, + membership_list=membership_list + ) + + room_entries = [] + invited = [] + + for event in room_list: + if event.membership == Membership.JOIN: + room_entries.append(RoomSyncResultBuilder( + room_id=event.room_id, + rtype="joined", + events=None, + newly_joined=False, + full_state=True, + since_token=since_token, + upto_token=now_token, + )) + elif event.membership == Membership.INVITE: + if event.sender in ignored_users: + continue + invite = yield self.store.get_event(event.event_id) + invited.append(InvitedSyncResult( + room_id=event.room_id, + invite=invite, + )) + elif event.membership in (Membership.LEAVE, Membership.BAN): + # Always send down rooms we were banned or kicked from. + if not sync_config.filter_collection.include_leave: + if event.membership == Membership.LEAVE: + if user_id == event.sender: + continue + + leave_token = now_token.copy_and_replace( + "room_key", "s%d" % (event.stream_ordering,) + ) + room_entries.append(RoomSyncResultBuilder( + room_id=event.room_id, + rtype="archived", + events=None, + newly_joined=False, + full_state=True, + since_token=since_token, + upto_token=leave_token, + )) + + defer.returnValue((room_entries, invited, [])) + + @defer.inlineCallbacks + def _generate_room_entry(self, sync_result_builder, ignored_users, + room_builder, ephemeral, tags, account_data, + always_include=False): + """Populates the `joined` and `archived` section of `sync_result_builder` + based on the `room_builder`. + + Args: + sync_result_builder(SyncResultBuilder) + ignored_users(set(str)): Set of users ignored by user. + room_builder(RoomSyncResultBuilder) + ephemeral(list): List of new ephemeral events for room + tags(list): List of *all* tags for room, or None if there has been + no change. + account_data(list): List of new account data for room + always_include(bool): Always include this room in the sync response, + even if empty. + """ + newly_joined = room_builder.newly_joined + full_state = ( + room_builder.full_state + or newly_joined + or sync_result_builder.full_state + ) + events = room_builder.events + + # We want to shortcut out as early as possible. + if not (always_include or account_data or ephemeral or full_state): + if events == [] and tags is None: + return + + since_token = sync_result_builder.since_token + now_token = sync_result_builder.now_token + sync_config = sync_result_builder.sync_config + + room_id = room_builder.room_id + since_token = room_builder.since_token + upto_token = room_builder.upto_token + + batch = yield self._load_filtered_recents( + room_id, sync_config, + now_token=upto_token, + since_token=since_token, + recents=events, + newly_joined_room=newly_joined, + ) + + account_data_events = [] + if tags is not None: + account_data_events.append({ + "type": "m.tag", + "content": {"tags": tags}, + }) + + for account_data_type, content in account_data.items(): + account_data_events.append({ + "type": account_data_type, + "content": content, + }) + + account_data = sync_config.filter_collection.filter_room_account_data( + account_data_events + ) + + ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral) + + if not (always_include or batch or account_data or ephemeral or full_state): + return + + state = yield self.compute_state_delta( + room_id, batch, sync_config, since_token, now_token, + full_state=full_state + ) + + if room_builder.rtype == "joined": + unread_notifications = {} + room_sync = JoinedSyncResult( + room_id=room_id, + timeline=batch, + state=state, + ephemeral=ephemeral, + account_data=account_data_events, + unread_notifications=unread_notifications, + ) + + if room_sync or always_include: + notifs = yield self.unread_notifs_for_room_id( + room_id, sync_config + ) + + if notifs is not None: + unread_notifications["notification_count"] = notifs["notify_count"] + unread_notifications["highlight_count"] = notifs["highlight_count"] + + sync_result_builder.joined.append(room_sync) + elif room_builder.rtype == "archived": + room_sync = ArchivedSyncResult( + room_id=room_id, + timeline=batch, + state=state, + account_data=account_data, + ) + if room_sync or always_include: + sync_result_builder.archived.append(room_sync) + else: + raise Exception("Unrecognized rtype: %r", room_builder.rtype) + def _action_has_highlight(actions): for action in actions: @@ -1019,3 +1081,51 @@ def _calculate_state(timeline_contains, timeline_start, previous, current): (e.type, e.state_key): e for e in evs } + + +class SyncResultBuilder(object): + "Used to help build up a new SyncResult for a user" + def __init__(self, sync_config, full_state, since_token, now_token): + """ + Args: + sync_config(SyncConfig) + full_state(bool): The full_state flag as specified by user + since_token(StreamToken): The token supplied by user, or None. + now_token(StreamToken): The token to sync up to. + """ + self.sync_config = sync_config + self.full_state = full_state + self.since_token = since_token + self.now_token = now_token + + self.presence = [] + self.account_data = [] + self.joined = [] + self.invited = [] + self.archived = [] + + +class RoomSyncResultBuilder(object): + """Stores information needed to create either a `JoinedSyncResult` or + `ArchivedSyncResult`. + """ + def __init__(self, room_id, rtype, events, newly_joined, full_state, + since_token, upto_token): + """ + Args: + room_id(str) + rtype(str): One of `"joined"` or `"archived"` + events(list): List of events to include in the room, (more events + may be added when generating result). + newly_joined(bool): If the user has newly joined the room + full_state(bool): Whether the full state should be sent in result + since_token(StreamToken): Earliest point to return events from, or None + upto_token(StreamToken): Latest point to return events from. + """ + self.room_id = room_id + self.rtype = rtype + self.events = events + self.newly_joined = newly_joined + self.full_state = full_state + self.since_token = since_token + self.upto_token = upto_token diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 8ce27f49ec..861b8f7989 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -15,8 +15,6 @@ from twisted.internet import defer -from ._base import BaseHandler - from synapse.api.errors import SynapseError, AuthError from synapse.util.logcontext import PreserveLoggingContext from synapse.util.metrics import Measure @@ -32,14 +30,16 @@ logger = logging.getLogger(__name__) # A tiny object useful for storing a user's membership in a room, as a mapping # key -RoomMember = namedtuple("RoomMember", ("room_id", "user")) +RoomMember = namedtuple("RoomMember", ("room_id", "user_id")) -class TypingNotificationHandler(BaseHandler): +class TypingHandler(object): def __init__(self, hs): - super(TypingNotificationHandler, self).__init__(hs) - - self.homeserver = hs + self.store = hs.get_datastore() + self.server_name = hs.config.server_name + self.auth = hs.get_auth() + self.is_mine_id = hs.is_mine_id + self.notifier = hs.get_notifier() self.clock = hs.get_clock() @@ -67,20 +67,23 @@ class TypingNotificationHandler(BaseHandler): @defer.inlineCallbacks def started_typing(self, target_user, auth_user, room_id, timeout): - if not self.hs.is_mine(target_user): + target_user_id = target_user.to_string() + auth_user_id = auth_user.to_string() + + if not self.is_mine_id(target_user_id): raise SynapseError(400, "User is not hosted on this Home Server") - if target_user != auth_user: + if target_user_id != auth_user_id: raise AuthError(400, "Cannot set another user's typing state") - yield self.auth.check_joined_room(room_id, target_user.to_string()) + yield self.auth.check_joined_room(room_id, target_user_id) logger.debug( - "%s has started typing in %s", target_user.to_string(), room_id + "%s has started typing in %s", target_user_id, room_id ) until = self.clock.time_msec() + timeout - member = RoomMember(room_id=room_id, user=target_user) + member = RoomMember(room_id=room_id, user_id=target_user_id) was_present = member in self._member_typing_until @@ -104,25 +107,28 @@ class TypingNotificationHandler(BaseHandler): yield self._push_update( room_id=room_id, - user=target_user, + user_id=target_user_id, typing=True, ) @defer.inlineCallbacks def stopped_typing(self, target_user, auth_user, room_id): - if not self.hs.is_mine(target_user): + target_user_id = target_user.to_string() + auth_user_id = auth_user.to_string() + + if not self.is_mine_id(target_user_id): raise SynapseError(400, "User is not hosted on this Home Server") - if target_user != auth_user: + if target_user_id != auth_user_id: raise AuthError(400, "Cannot set another user's typing state") - yield self.auth.check_joined_room(room_id, target_user.to_string()) + yield self.auth.check_joined_room(room_id, target_user_id) logger.debug( - "%s has stopped typing in %s", target_user.to_string(), room_id + "%s has stopped typing in %s", target_user_id, room_id ) - member = RoomMember(room_id=room_id, user=target_user) + member = RoomMember(room_id=room_id, user_id=target_user_id) if member in self._member_typing_timer: self.clock.cancel_call_later(self._member_typing_timer[member]) @@ -132,8 +138,9 @@ class TypingNotificationHandler(BaseHandler): @defer.inlineCallbacks def user_left_room(self, user, room_id): - if self.hs.is_mine(user): - member = RoomMember(room_id=room_id, user=user) + user_id = user.to_string() + if self.is_mine_id(user_id): + member = RoomMember(room_id=room_id, user_id=user_id) yield self._stopped_typing(member) @defer.inlineCallbacks @@ -144,7 +151,7 @@ class TypingNotificationHandler(BaseHandler): yield self._push_update( room_id=member.room_id, - user=member.user, + user_id=member.user_id, typing=False, ) @@ -156,61 +163,53 @@ class TypingNotificationHandler(BaseHandler): del self._member_typing_timer[member] @defer.inlineCallbacks - def _push_update(self, room_id, user, typing): - localusers = set() - remotedomains = set() - - rm_handler = self.homeserver.get_handlers().room_member_handler - yield rm_handler.fetch_room_distributions_into( - room_id, localusers=localusers, remotedomains=remotedomains - ) - - if localusers: - self._push_update_local( - room_id=room_id, - user=user, - typing=typing - ) + def _push_update(self, room_id, user_id, typing): + domains = yield self.store.get_joined_hosts_for_room(room_id) deferreds = [] - for domain in remotedomains: - deferreds.append(self.federation.send_edu( - destination=domain, - edu_type="m.typing", - content={ - "room_id": room_id, - "user_id": user.to_string(), - "typing": typing, - }, - )) + for domain in domains: + if domain == self.server_name: + self._push_update_local( + room_id=room_id, + user_id=user_id, + typing=typing + ) + else: + deferreds.append(self.federation.send_edu( + destination=domain, + edu_type="m.typing", + content={ + "room_id": room_id, + "user_id": user_id, + "typing": typing, + }, + )) yield defer.DeferredList(deferreds, consumeErrors=True) @defer.inlineCallbacks def _recv_edu(self, origin, content): room_id = content["room_id"] - user = UserID.from_string(content["user_id"]) + user_id = content["user_id"] - localusers = set() + # Check that the string is a valid user id + UserID.from_string(user_id) - rm_handler = self.homeserver.get_handlers().room_member_handler - yield rm_handler.fetch_room_distributions_into( - room_id, localusers=localusers - ) + domains = yield self.store.get_joined_hosts_for_room(room_id) - if localusers: + if self.server_name in domains: self._push_update_local( room_id=room_id, - user=user, + user_id=user_id, typing=content["typing"] ) - def _push_update_local(self, room_id, user, typing): + def _push_update_local(self, room_id, user_id, typing): room_set = self._room_typing.setdefault(room_id, set()) if typing: - room_set.add(user) + room_set.add(user_id) else: - room_set.discard(user) + room_set.discard(user_id) self._latest_room_serial += 1 self._room_serials[room_id] = self._latest_room_serial @@ -226,9 +225,7 @@ class TypingNotificationHandler(BaseHandler): for room_id, serial in self._room_serials.items(): if last_id < serial and serial <= current_id: typing = self._room_typing[room_id] - typing_bytes = json.dumps([ - u.to_string() for u in typing - ], ensure_ascii=False) + typing_bytes = json.dumps(list(typing), ensure_ascii=False) rows.append((serial, room_id, typing_bytes)) rows.sort() return rows @@ -238,34 +235,26 @@ class TypingNotificationEventSource(object): def __init__(self, hs): self.hs = hs self.clock = hs.get_clock() - self._handler = None - self._room_member_handler = None - - def handler(self): - # Avoid cyclic dependency in handler setup - if not self._handler: - self._handler = self.hs.get_handlers().typing_notification_handler - return self._handler - - def room_member_handler(self): - if not self._room_member_handler: - self._room_member_handler = self.hs.get_handlers().room_member_handler - return self._room_member_handler + # We can't call get_typing_handler here because there's a cycle: + # + # Typing -> Notifier -> TypingNotificationEventSource -> Typing + # + self.get_typing_handler = hs.get_typing_handler def _make_event_for(self, room_id): - typing = self.handler()._room_typing[room_id] + typing = self.get_typing_handler()._room_typing[room_id] return { "type": "m.typing", "room_id": room_id, "content": { - "user_ids": [u.to_string() for u in typing], + "user_ids": list(typing), }, } def get_new_events(self, from_key, room_ids, **kwargs): with Measure(self.clock, "typing.get_new_events"): from_key = int(from_key) - handler = self.handler() + handler = self.get_typing_handler() events = [] for room_id in room_ids: @@ -279,7 +268,7 @@ class TypingNotificationEventSource(object): return events, handler._latest_room_serial def get_current_key(self): - return self.handler()._latest_room_serial + return self.get_typing_handler()._latest_room_serial def get_pagination_rows(self, user, pagination_config, key): return ([], pagination_config.from_key) |