diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/_base.py | 152 | ||||
-rw-r--r-- | synapse/handlers/auth.py | 5 | ||||
-rw-r--r-- | synapse/handlers/events.py | 23 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 108 | ||||
-rw-r--r-- | synapse/handlers/message.py | 187 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 7 | ||||
-rw-r--r-- | synapse/handlers/private_user_data.py | 46 | ||||
-rw-r--r-- | synapse/handlers/receipts.py | 6 | ||||
-rw-r--r-- | synapse/handlers/register.py | 12 | ||||
-rw-r--r-- | synapse/handlers/room.py | 66 | ||||
-rw-r--r-- | synapse/handlers/search.py | 201 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 331 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 11 |
13 files changed, 844 insertions, 311 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 6a26cb1879..6519f183df 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -21,7 +21,6 @@ from synapse.api.constants import Membership, EventTypes from synapse.types import UserID, RoomAlias from synapse.util.logcontext import PreserveLoggingContext -from synapse.util import third_party_invites import logging @@ -30,6 +29,12 @@ logger = logging.getLogger(__name__) class BaseHandler(object): + """ + Common base class for the event handlers. + + :type store: synapse.storage.events.StateStore + :type state_handler: synapse.state.StateHandler + """ def __init__(self, hs): self.store = hs.get_datastore() @@ -47,37 +52,24 @@ class BaseHandler(object): self.event_builder_factory = hs.get_event_builder_factory() @defer.inlineCallbacks - def _filter_events_for_client(self, user_id, events): - event_id_to_state = yield self.store.get_state_for_events( - frozenset(e.event_id for e in events), - types=( - (EventTypes.RoomHistoryVisibility, ""), - (EventTypes.Member, user_id), - ) - ) + def _filter_events_for_client(self, user_id, events, is_guest=False, + require_all_visible_for_guests=True): + # Assumes that user has at some point joined the room if not is_guest. - def allowed(event, state): - if event.type == EventTypes.RoomHistoryVisibility: + def allowed(event, membership, visibility): + if visibility == "world_readable": return True - membership_ev = state.get((EventTypes.Member, user_id), None) - if membership_ev: - membership = membership_ev.membership - else: - membership = Membership.LEAVE + if is_guest: + return False if membership == Membership.JOIN: return True - history = state.get((EventTypes.RoomHistoryVisibility, ''), None) - if history: - visibility = history.content.get("history_visibility", "shared") - else: - visibility = "shared" + if event.type == EventTypes.RoomHistoryVisibility: + return not is_guest - if visibility == "public": - return True - elif visibility == "shared": + if visibility == "shared": return True elif visibility == "joined": return membership == Membership.JOIN @@ -86,11 +78,46 @@ class BaseHandler(object): return True - defer.returnValue([ - event - for event in events - if allowed(event, event_id_to_state[event.event_id]) - ]) + event_id_to_state = yield self.store.get_state_for_events( + frozenset(e.event_id for e in events), + types=( + (EventTypes.RoomHistoryVisibility, ""), + (EventTypes.Member, user_id), + ) + ) + + events_to_return = [] + for event in events: + state = event_id_to_state[event.event_id] + + membership_event = state.get((EventTypes.Member, user_id), None) + if membership_event: + membership = membership_event.membership + else: + membership = None + + visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None) + if visibility_event: + visibility = visibility_event.content.get("history_visibility", "shared") + else: + visibility = "shared" + + should_include = allowed(event, membership, visibility) + if should_include: + events_to_return.append(event) + + if (require_all_visible_for_guests + and is_guest + and len(events_to_return) < len(events)): + # This indicates that some events in the requested range were not + # visible to guest users. To be safe, we reject the entire request, + # so that we don't have to worry about interpreting visibility + # boundaries. + raise AuthError(403, "User %s does not have permission" % ( + user_id + )) + + defer.returnValue(events_to_return) def ratelimit(self, user_id): time_now = self.clock.time() @@ -154,6 +181,8 @@ class BaseHandler(object): if not suppress_auth: self.auth.check(event, auth_events=context.current_state) + yield self.maybe_kick_guest_users(event, context.current_state.values()) + if event.type == EventTypes.CanonicalAlias: # Check the alias is acually valid (at this time at least) room_alias_str = event.content.get("alias", None) @@ -170,16 +199,6 @@ class BaseHandler(object): ) ) - if ( - event.type == EventTypes.Member and - event.content["membership"] == Membership.JOIN and - third_party_invites.join_has_third_party_invite(event.content) - ): - yield third_party_invites.check_key_valid( - self.hs.get_simple_http_client(), - event - ) - federation_handler = self.hs.get_handlers().federation_handler if event.type == EventTypes.Member: @@ -271,3 +290,58 @@ class BaseHandler(object): federation_handler.handle_new_event( event, destinations=destinations, ) + + @defer.inlineCallbacks + def maybe_kick_guest_users(self, event, current_state): + # Technically this function invalidates current_state by changing it. + # Hopefully this isn't that important to the caller. + if event.type == EventTypes.GuestAccess: + guest_access = event.content.get("guest_access", "forbidden") + if guest_access != "can_join": + yield self.kick_guest_users(current_state) + + @defer.inlineCallbacks + def kick_guest_users(self, current_state): + for member_event in current_state: + try: + if member_event.type != EventTypes.Member: + continue + + if not self.hs.is_mine(UserID.from_string(member_event.state_key)): + continue + + if member_event.content["membership"] not in { + Membership.JOIN, + Membership.INVITE + }: + continue + + if ( + "kind" not in member_event.content + or member_event.content["kind"] != "guest" + ): + continue + + # We make the user choose to leave, rather than have the + # event-sender kick them. This is partially because we don't + # need to worry about power levels, and partially because guest + # users are a concept which doesn't hugely work over federation, + # and having homeservers have their own users leave keeps more + # of that decision-making and control local to the guest-having + # homeserver. + message_handler = self.hs.get_handlers().message_handler + yield message_handler.create_and_send_event( + { + "type": EventTypes.Member, + "state_key": member_event.state_key, + "content": { + "membership": Membership.LEAVE, + "kind": "guest" + }, + "room_id": member_event.room_id, + "sender": member_event.state_key + }, + ratelimit=False, + ) + except Exception as e: + logger.warn("Error kicking guest user: %s" % (e,)) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 055d395b20..1b11dbdffd 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -372,12 +372,15 @@ class AuthHandler(BaseHandler): yield self.store.add_refresh_token_to_user(user_id, refresh_token) defer.returnValue(refresh_token) - def generate_access_token(self, user_id): + def generate_access_token(self, user_id, extra_caveats=None): + extra_caveats = extra_caveats or [] macaroon = self._generate_base_macaroon(user_id) macaroon.add_first_party_caveat("type = access") now = self.hs.get_clock().time_msec() expiry = now + (60 * 60 * 1000) macaroon.add_first_party_caveat("time < %d" % (expiry,)) + for caveat in extra_caveats: + macaroon.add_first_party_caveat(caveat) return macaroon.serialize() def generate_refresh_token(self, user_id): diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 92afa35d57..0e4c0d4d06 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -100,7 +100,7 @@ class EventStreamHandler(BaseHandler): @log_function def get_stream(self, auth_user_id, pagin_config, timeout=0, as_client_event=True, affect_presence=True, - only_room_events=False): + only_room_events=False, room_id=None, is_guest=False): """Fetches the events stream for a given user. If `only_room_events` is `True` only room events will be returned. @@ -111,17 +111,6 @@ class EventStreamHandler(BaseHandler): if affect_presence: yield self.started_stream(auth_user) - rm_handler = self.hs.get_handlers().room_member_handler - - app_service = yield self.store.get_app_service_by_user_id( - auth_user.to_string() - ) - if app_service: - rooms = yield self.store.get_app_service_rooms(app_service) - room_ids = set(r.room_id for r in rooms) - else: - room_ids = yield rm_handler.get_joined_rooms_for_user(auth_user) - if timeout: # If they've set a timeout set a minimum limit. timeout = max(timeout, 500) @@ -130,9 +119,15 @@ class EventStreamHandler(BaseHandler): # thundering herds on restart. timeout = random.randint(int(timeout*0.9), int(timeout*1.1)) + if is_guest: + yield self.distributor.fire( + "user_joined_room", user=auth_user, room_id=room_id + ) + events, tokens = yield self.notifier.get_events_for( - auth_user, room_ids, pagin_config, timeout, - only_room_events=only_room_events + auth_user, pagin_config, timeout, + only_room_events=only_room_events, + is_guest=is_guest, guest_room_id=room_id ) time_now = self.clock.time_msec() diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ae9d227586..c1bce07e31 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -21,6 +21,7 @@ from synapse.api.errors import ( AuthError, FederationError, StoreError, CodeMessageException, SynapseError, ) from synapse.api.constants import EventTypes, Membership, RejectedReason +from synapse.events.validator import EventValidator from synapse.util import unwrapFirstError from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logutils import log_function @@ -39,7 +40,6 @@ from twisted.internet import defer import itertools import logging -from synapse.util import third_party_invites logger = logging.getLogger(__name__) @@ -58,6 +58,8 @@ class FederationHandler(BaseHandler): def __init__(self, hs): super(FederationHandler, self).__init__(hs) + self.hs = hs + self.distributor.observe( "user_joined_room", self._on_user_joined @@ -68,12 +70,9 @@ class FederationHandler(BaseHandler): self.store = hs.get_datastore() self.replication_layer = hs.get_replication_layer() self.state_handler = hs.get_state_handler() - # self.auth_handler = gs.get_auth_handler() self.server_name = hs.hostname self.keyring = hs.get_keyring() - self.lock_manager = hs.get_room_lock_manager() - self.replication_layer.set_handler(self) # When joining a room we need to queue any events for that room up @@ -586,7 +585,7 @@ class FederationHandler(BaseHandler): room_id, joinee, "join", - content + content, ) self.room_queues[room_id] = [] @@ -663,16 +662,12 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def on_make_join_request(self, room_id, user_id, query): + def on_make_join_request(self, room_id, user_id): """ We've received a /make_join/ request, so we create a partial join event for the room and return that. We do *not* persist or process it until the other server has signed it and sent it back. """ event_content = {"membership": Membership.JOIN} - if third_party_invites.has_join_keys(query): - event_content["third_party_invite"] = ( - third_party_invites.extract_join_keys(query) - ) builder = self.event_builder_factory.new({ "type": EventTypes.Member, @@ -688,9 +683,6 @@ class FederationHandler(BaseHandler): self.auth.check(event, auth_events=context.current_state) - if third_party_invites.join_has_third_party_invite(event.content): - third_party_invites.check_key_valid(self.hs.get_simple_http_client(), event) - defer.returnValue(event) @defer.inlineCallbacks @@ -830,8 +822,7 @@ class FederationHandler(BaseHandler): target_hosts, room_id, user_id, - "leave", - {} + "leave" ) signed_event = self._sign_event(event) @@ -850,13 +841,14 @@ class FederationHandler(BaseHandler): defer.returnValue(None) @defer.inlineCallbacks - def _make_and_verify_event(self, target_hosts, room_id, user_id, membership, content): + def _make_and_verify_event(self, target_hosts, room_id, user_id, membership, + content={},): origin, pdu = yield self.replication_layer.make_membership_event( target_hosts, room_id, user_id, membership, - content + content, ) logger.debug("Got response to make_%s: %s", membership, pdu) @@ -1108,8 +1100,6 @@ class FederationHandler(BaseHandler): context = yield self._prep_event( origin, event, state=state, - backfilled=backfilled, - current_state=current_state, auth_events=auth_events, ) @@ -1132,7 +1122,6 @@ class FederationHandler(BaseHandler): origin, ev_info["event"], state=ev_info.get("state"), - backfilled=backfilled, auth_events=ev_info.get("auth_events"), ) for ev_info in event_infos @@ -1219,8 +1208,7 @@ class FederationHandler(BaseHandler): defer.returnValue((event_stream_id, max_stream_id)) @defer.inlineCallbacks - def _prep_event(self, origin, event, state=None, backfilled=False, - current_state=None, auth_events=None): + def _prep_event(self, origin, event, state=None, auth_events=None): outlier = event.internal_metadata.is_outlier() context = yield self.state_handler.compute_event_context( @@ -1253,6 +1241,10 @@ class FederationHandler(BaseHandler): context.rejected = RejectedReason.AUTH_ERROR + if event.type == EventTypes.GuestAccess: + full_context = yield self.store.get_current_state(room_id=event.room_id) + yield self.maybe_kick_guest_users(event, full_context) + defer.returnValue(context) @defer.inlineCallbacks @@ -1649,3 +1641,75 @@ class FederationHandler(BaseHandler): }, "missing": [e.event_id for e in missing_locals], }) + + @defer.inlineCallbacks + @log_function + def exchange_third_party_invite(self, invite): + sender = invite["sender"] + room_id = invite["room_id"] + + event_dict = { + "type": EventTypes.Member, + "content": { + "membership": Membership.INVITE, + "third_party_invite": invite, + }, + "room_id": room_id, + "sender": sender, + "state_key": invite["mxid"], + } + + if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)): + builder = self.event_builder_factory.new(event_dict) + EventValidator().validate_new(builder) + event, context = yield self._create_new_client_event(builder=builder) + self.auth.check(event, context.current_state) + yield self._validate_keyserver(event, auth_events=context.current_state) + member_handler = self.hs.get_handlers().room_member_handler + yield member_handler.change_membership(event, context) + else: + destinations = set([x.split(":", 1)[-1] for x in (sender, room_id)]) + yield self.replication_layer.forward_third_party_invite( + destinations, + room_id, + event_dict, + ) + + @defer.inlineCallbacks + @log_function + def on_exchange_third_party_invite_request(self, origin, room_id, event_dict): + builder = self.event_builder_factory.new(event_dict) + + event, context = yield self._create_new_client_event( + builder=builder, + ) + + self.auth.check(event, auth_events=context.current_state) + yield self._validate_keyserver(event, auth_events=context.current_state) + + returned_invite = yield self.send_invite(origin, event) + # TODO: Make sure the signatures actually are correct. + event.signatures.update(returned_invite.signatures) + member_handler = self.hs.get_handlers().room_member_handler + yield member_handler.change_membership(event, context) + + @defer.inlineCallbacks + def _validate_keyserver(self, event, auth_events): + token = event.content["third_party_invite"]["signed"]["token"] + + invite_event = auth_events.get( + (EventTypes.ThirdPartyInvite, token,) + ) + + try: + response = yield self.hs.get_simple_http_client().get_json( + invite_event.content["key_validity_url"], + {"public_key": invite_event.content["public_key"]} + ) + except Exception: + raise SynapseError( + 502, + "Third party certificate could not be checked" + ) + if "valid" not in response or not response["valid"]: + raise AuthError(403, "Third party certificate was invalid") diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 024474d5fe..14051aee99 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership -from synapse.api.errors import SynapseError +from synapse.api.errors import SynapseError, AuthError, Codes from synapse.streams.config import PaginationConfig from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator @@ -71,20 +71,20 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def get_messages(self, user_id=None, room_id=None, pagin_config=None, - as_client_event=True): + as_client_event=True, is_guest=False): """Get messages in a room. Args: user_id (str): The user requesting messages. room_id (str): The room they want messages from. pagin_config (synapse.api.streams.PaginationConfig): The pagination - config rules to apply, if any. + config rules to apply, if any. as_client_event (bool): True to get events in client-server format. + is_guest (bool): Whether the requesting user is a guest (as opposed + to a fully registered user). Returns: dict: Pagination API results """ - member_event = yield self.auth.check_user_was_in_room(room_id, user_id) - data_source = self.hs.get_event_sources().sources["room"] if pagin_config.from_token: @@ -107,23 +107,27 @@ class MessageHandler(BaseHandler): source_config = pagin_config.get_source_config("room") - if member_event.membership == Membership.LEAVE: - # If they have left the room then clamp the token to be before - # they left the room - leave_token = yield self.store.get_topological_token_for_event( - member_event.event_id - ) - leave_token = RoomStreamToken.parse(leave_token) - if leave_token.topological < room_token.topological: - source_config.from_key = str(leave_token) - - if source_config.direction == "f": - if source_config.to_key is None: - source_config.to_key = str(leave_token) - else: - to_token = RoomStreamToken.parse(source_config.to_key) - if leave_token.topological < to_token.topological: + if not is_guest: + member_event = yield self.auth.check_user_was_in_room(room_id, user_id) + if member_event.membership == Membership.LEAVE: + # If they have left the room then clamp the token to be before + # they left the room. + # If they're a guest, we'll just 403 them if they're asking for + # events they can't see. + leave_token = yield self.store.get_topological_token_for_event( + member_event.event_id + ) + leave_token = RoomStreamToken.parse(leave_token) + if leave_token.topological < room_token.topological: + source_config.from_key = str(leave_token) + + if source_config.direction == "f": + if source_config.to_key is None: source_config.to_key = str(leave_token) + else: + to_token = RoomStreamToken.parse(source_config.to_key) + if leave_token.topological < to_token.topological: + source_config.to_key = str(leave_token) yield self.hs.get_handlers().federation_handler.maybe_backfill( room_id, room_token.topological @@ -146,7 +150,7 @@ class MessageHandler(BaseHandler): "end": next_token.to_string(), }) - events = yield self._filter_events_for_client(user_id, events) + events = yield self._filter_events_for_client(user_id, events, is_guest=is_guest) time_now = self.clock.time_msec() @@ -163,7 +167,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def create_and_send_event(self, event_dict, ratelimit=True, - token_id=None, txn_id=None): + token_id=None, txn_id=None, is_guest=False): """ Given a dict from a client, create and handle a new event. Creates an FrozenEvent object, filling out auth_events, prev_events, @@ -209,7 +213,7 @@ class MessageHandler(BaseHandler): if event.type == EventTypes.Member: member_handler = self.hs.get_handlers().room_member_handler - yield member_handler.change_membership(event, context) + yield member_handler.change_membership(event, context, is_guest=is_guest) else: yield self.handle_new_client_event( event=event, @@ -225,7 +229,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def get_room_data(self, user_id=None, room_id=None, - event_type=None, state_key=""): + event_type=None, state_key="", is_guest=False): """ Get data from a room. Args: @@ -235,23 +239,52 @@ class MessageHandler(BaseHandler): Raises: SynapseError if something went wrong. """ - member_event = yield self.auth.check_user_was_in_room(room_id, user_id) + membership, membership_event_id = yield self._check_in_room_or_world_readable( + room_id, user_id, is_guest + ) - if member_event.membership == Membership.JOIN: + if membership == Membership.JOIN: data = yield self.state_handler.get_current_state( room_id, event_type, state_key ) - elif member_event.membership == Membership.LEAVE: + elif membership == Membership.LEAVE: key = (event_type, state_key) room_state = yield self.store.get_state_for_events( - [member_event.event_id], [key] + [membership_event_id], [key] ) - data = room_state[member_event.event_id].get(key) + data = room_state[membership_event_id].get(key) defer.returnValue(data) @defer.inlineCallbacks - def get_state_events(self, user_id, room_id): + def _check_in_room_or_world_readable(self, room_id, user_id, is_guest): + try: + # check_user_was_in_room will return the most recent membership + # event for the user if: + # * The user is a non-guest user, and was ever in the room + # * The user is a guest user, and has joined the room + # else it will throw. + member_event = yield self.auth.check_user_was_in_room(room_id, user_id) + defer.returnValue((member_event.membership, member_event.event_id)) + return + except AuthError, auth_error: + visibility = yield self.state_handler.get_current_state( + room_id, EventTypes.RoomHistoryVisibility, "" + ) + if ( + visibility and + visibility.content["history_visibility"] == "world_readable" + ): + defer.returnValue((Membership.JOIN, None)) + return + if not is_guest: + raise auth_error + raise AuthError( + 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN + ) + + @defer.inlineCallbacks + def get_state_events(self, user_id, room_id, is_guest=False): """Retrieve all state events for a given room. If the user is joined to the room then return the current state. If the user has left the room return the state events from when they left. @@ -262,15 +295,17 @@ class MessageHandler(BaseHandler): Returns: A list of dicts representing state events. [{}, {}, {}] """ - member_event = yield self.auth.check_user_was_in_room(room_id, user_id) + membership, membership_event_id = yield self._check_in_room_or_world_readable( + room_id, user_id, is_guest + ) - if member_event.membership == Membership.JOIN: + if membership == Membership.JOIN: room_state = yield self.state_handler.get_current_state(room_id) - elif member_event.membership == Membership.LEAVE: + elif membership == Membership.LEAVE: room_state = yield self.store.get_state_for_events( - [member_event.event_id], None + [membership_event_id], None ) - room_state = room_state[member_event.event_id] + room_state = room_state[membership_event_id] now = self.clock.time_msec() defer.returnValue( @@ -322,6 +357,8 @@ class MessageHandler(BaseHandler): user, pagination_config.get_source_config("receipt"), None ) + tags_by_room = yield self.store.get_tags_for_user(user_id) + public_room_ids = yield self.store.get_public_room_ids() limit = pagin_config.limit @@ -398,6 +435,15 @@ class MessageHandler(BaseHandler): serialize_event(c, time_now, as_client_event) for c in current_state.values() ] + + private_user_data = [] + tags = tags_by_room.get(event.room_id) + if tags: + private_user_data.append({ + "type": "m.tag", + "content": {"tags": tags}, + }) + d["private_user_data"] = private_user_data except: logger.exception("Failed to get snapshot") @@ -420,7 +466,7 @@ class MessageHandler(BaseHandler): defer.returnValue(ret) @defer.inlineCallbacks - def room_initial_sync(self, user_id, room_id, pagin_config=None): + def room_initial_sync(self, user_id, room_id, pagin_config=None, is_guest=False): """Capture the a snapshot of a room. If user is currently a member of the room this will be what is currently in the room. If the user left the room this will be what was in the room when they left. @@ -437,33 +483,47 @@ class MessageHandler(BaseHandler): A JSON serialisable dict with the snapshot of the room. """ - member_event = yield self.auth.check_user_was_in_room(room_id, user_id) + membership, member_event_id = yield self._check_in_room_or_world_readable( + room_id, + user_id, + is_guest + ) - if member_event.membership == Membership.JOIN: + if membership == Membership.JOIN: result = yield self._room_initial_sync_joined( - user_id, room_id, pagin_config, member_event + user_id, room_id, pagin_config, membership, is_guest ) - elif member_event.membership == Membership.LEAVE: + elif membership == Membership.LEAVE: result = yield self._room_initial_sync_parted( - user_id, room_id, pagin_config, member_event + user_id, room_id, pagin_config, membership, member_event_id, is_guest ) + + private_user_data = [] + tags = yield self.store.get_tags_for_room(user_id, room_id) + if tags: + private_user_data.append({ + "type": "m.tag", + "content": {"tags": tags}, + }) + result["private_user_data"] = private_user_data + defer.returnValue(result) @defer.inlineCallbacks def _room_initial_sync_parted(self, user_id, room_id, pagin_config, - member_event): + membership, member_event_id, is_guest): room_state = yield self.store.get_state_for_events( - [member_event.event_id], None + [member_event_id], None ) - room_state = room_state[member_event.event_id] + room_state = room_state[member_event_id] limit = pagin_config.limit if pagin_config else None if limit is None: limit = 10 stream_token = yield self.store.get_stream_token_for_event( - member_event.event_id + member_event_id ) messages, token = yield self.store.get_recent_events_for_room( @@ -473,16 +533,16 @@ class MessageHandler(BaseHandler): ) messages = yield self._filter_events_for_client( - user_id, messages + user_id, messages, is_guest=is_guest ) - start_token = StreamToken(token[0], 0, 0, 0) - end_token = StreamToken(token[1], 0, 0, 0) + start_token = StreamToken(token[0], 0, 0, 0, 0) + end_token = StreamToken(token[1], 0, 0, 0, 0) time_now = self.clock.time_msec() defer.returnValue({ - "membership": member_event.membership, + "membership": membership, "room_id": room_id, "messages": { "chunk": [serialize_event(m, time_now) for m in messages], @@ -496,7 +556,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def _room_initial_sync_joined(self, user_id, room_id, pagin_config, - member_event): + membership, is_guest): current_state = yield self.state.get_current_state( room_id=room_id, ) @@ -528,12 +588,14 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def get_presence(): - states = yield presence_handler.get_states( - target_users=[UserID.from_string(m.user_id) for m in room_members], - auth_user=auth_user, - as_event=True, - check_auth=False, - ) + states = {} + if not is_guest: + states = yield presence_handler.get_states( + target_users=[UserID.from_string(m.user_id) for m in room_members], + auth_user=auth_user, + as_event=True, + check_auth=False, + ) defer.returnValue(states.values()) @@ -553,7 +615,7 @@ class MessageHandler(BaseHandler): ).addErrback(unwrapFirstError) messages = yield self._filter_events_for_client( - user_id, messages + user_id, messages, is_guest=is_guest, require_all_visible_for_guests=False ) start_token = now_token.copy_and_replace("room_key", token[0]) @@ -561,8 +623,7 @@ class MessageHandler(BaseHandler): time_now = self.clock.time_msec() - defer.returnValue({ - "membership": member_event.membership, + ret = { "room_id": room_id, "messages": { "chunk": [serialize_event(m, time_now) for m in messages], @@ -572,4 +633,8 @@ class MessageHandler(BaseHandler): "state": state, "presence": presence, "receipts": receipts, - }) + } + if not is_guest: + ret["membership"] = membership + + defer.returnValue(ret) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index ce60642127..aca65096fc 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -950,7 +950,8 @@ class PresenceHandler(BaseHandler): ) while len(self._remote_offline_serials) > MAX_OFFLINE_SERIALS: self._remote_offline_serials.pop() # remove the oldest - del self._user_cachemap[user] + if user in self._user_cachemap: + del self._user_cachemap[user] else: # Remove the user from remote_offline_serials now that they're # no longer offline @@ -1142,8 +1143,9 @@ class PresenceEventSource(object): @defer.inlineCallbacks @log_function - def get_new_events_for_user(self, user, from_key, limit): + def get_new_events(self, user, from_key, room_ids=None, **kwargs): from_key = int(from_key) + room_ids = room_ids or [] presence = self.hs.get_handlers().presence_handler cachemap = presence._user_cachemap @@ -1161,7 +1163,6 @@ class PresenceEventSource(object): user_ids_to_check |= set( UserID.from_string(p["observed_user_id"]) for p in presence_list ) - room_ids = yield presence.get_joined_rooms_for_user(user) for room_id in set(room_ids) & set(presence._room_serials): if presence._room_serials[room_id] > from_key: joined = yield presence.get_joined_users_for_room_id(room_id) diff --git a/synapse/handlers/private_user_data.py b/synapse/handlers/private_user_data.py new file mode 100644 index 0000000000..1abe45ed7b --- /dev/null +++ b/synapse/handlers/private_user_data.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + + +class PrivateUserDataEventSource(object): + def __init__(self, hs): + self.store = hs.get_datastore() + + def get_current_key(self, direction='f'): + return self.store.get_max_private_user_data_stream_id() + + @defer.inlineCallbacks + def get_new_events(self, user, from_key, **kwargs): + user_id = user.to_string() + last_stream_id = from_key + + current_stream_id = yield self.store.get_max_private_user_data_stream_id() + tags = yield self.store.get_updated_tags(user_id, last_stream_id) + + results = [] + for room_id, room_tags in tags.items(): + results.append({ + "type": "m.tag", + "content": {"tags": room_tags}, + "room_id": room_id, + }) + + defer.returnValue((results, current_stream_id)) + + @defer.inlineCallbacks + def get_pagination_rows(self, user, config, key): + defer.returnValue(([], config.to_id)) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index a47ae3df42..973f4d5cae 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -164,17 +164,15 @@ class ReceiptEventSource(object): self.store = hs.get_datastore() @defer.inlineCallbacks - def get_new_events_for_user(self, user, from_key, limit): + def get_new_events(self, from_key, room_ids, **kwargs): from_key = int(from_key) to_key = yield self.get_current_key() if from_key == to_key: defer.returnValue(([], to_key)) - rooms = yield self.store.get_rooms_for_user(user.to_string()) - rooms = [room.room_id for room in rooms] events = yield self.store.get_linearized_receipts_for_rooms( - rooms, + room_ids, from_key=from_key, to_key=to_key, ) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index ef4081e3fe..493a087031 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -64,7 +64,7 @@ class RegistrationHandler(BaseHandler): ) @defer.inlineCallbacks - def register(self, localpart=None, password=None): + def register(self, localpart=None, password=None, generate_token=True): """Registers a new client on the server. Args: @@ -89,7 +89,9 @@ class RegistrationHandler(BaseHandler): user = UserID(localpart, self.hs.hostname) user_id = user.to_string() - token = self.auth_handler().generate_access_token(user_id) + token = None + if generate_token: + token = self.auth_handler().generate_access_token(user_id) yield self.store.register( user_id=user_id, token=token, @@ -102,14 +104,14 @@ class RegistrationHandler(BaseHandler): attempts = 0 user_id = None token = None - while not user_id and not token: + while not user_id: try: localpart = self._generate_user_id() user = UserID(localpart, self.hs.hostname) user_id = user.to_string() yield self.check_user_id_is_valid(user_id) - - token = self.auth_handler().generate_access_token(user_id) + if generate_token: + token = self.auth_handler().generate_access_token(user_id) yield self.store.register( user_id=user_id, token=token, diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 36878a6c20..3f04752581 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -38,6 +38,8 @@ import string logger = logging.getLogger(__name__) +id_server_scheme = "https://" + class RoomCreationHandler(BaseHandler): @@ -367,7 +369,7 @@ class RoomMemberHandler(BaseHandler): remotedomains.add(member.domain) @defer.inlineCallbacks - def change_membership(self, event, context, do_auth=True): + def change_membership(self, event, context, do_auth=True, is_guest=False): """ Change the membership status of a user in a room. Args: @@ -388,6 +390,20 @@ class RoomMemberHandler(BaseHandler): # if this HS is not currently in the room, i.e. we have to do the # invite/join dance. if event.membership == Membership.JOIN: + if is_guest: + guest_access = context.current_state.get( + (EventTypes.GuestAccess, ""), + None + ) + is_guest_access_allowed = ( + guest_access + and guest_access.content + and "guest_access" in guest_access.content + and guest_access.content["guest_access"] == "can_join" + ) + if not is_guest_access_allowed: + raise AuthError(403, "Guest access not allowed") + yield self._do_join(event, context, do_auth=do_auth) else: if event.membership == Membership.LEAVE: @@ -489,7 +505,7 @@ class RoomMemberHandler(BaseHandler): room_hosts, room_id, event.user_id, - event.content # FIXME To get a non-frozen dict + event.content, ) else: logger.debug("Doing normal join") @@ -581,7 +597,6 @@ class RoomMemberHandler(BaseHandler): medium, address, id_server, - display_name, token_id, txn_id ): @@ -608,7 +623,6 @@ class RoomMemberHandler(BaseHandler): else: yield self._make_and_store_3pid_invite( id_server, - display_name, medium, address, room_id, @@ -632,7 +646,7 @@ class RoomMemberHandler(BaseHandler): """ try: data = yield self.hs.get_simple_http_client().get_json( - "https://%s/_matrix/identity/api/v1/lookup" % (id_server,), + "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server,), { "medium": medium, "address": address, @@ -655,8 +669,8 @@ class RoomMemberHandler(BaseHandler): raise AuthError(401, "No signature from server %s" % (server_hostname,)) for key_name, signature in data["signatures"][server_hostname].items(): key_data = yield self.hs.get_simple_http_client().get_json( - "https://%s/_matrix/identity/api/v1/pubkey/%s" % - (server_hostname, key_name,), + "%s%s/_matrix/identity/api/v1/pubkey/%s" % + (id_server_scheme, server_hostname, key_name,), ) if "public_key" not in key_data: raise AuthError(401, "No public key named %s from %s" % @@ -672,7 +686,6 @@ class RoomMemberHandler(BaseHandler): def _make_and_store_3pid_invite( self, id_server, - display_name, medium, address, room_id, @@ -680,7 +693,7 @@ class RoomMemberHandler(BaseHandler): token_id, txn_id ): - token, public_key, key_validity_url = ( + token, public_key, key_validity_url, display_name = ( yield self._ask_id_server_for_third_party_invite( id_server, medium, @@ -709,7 +722,9 @@ class RoomMemberHandler(BaseHandler): @defer.inlineCallbacks def _ask_id_server_for_third_party_invite( self, id_server, medium, address, room_id, sender): - is_url = "https://%s/_matrix/identity/api/v1/store-invite" % (id_server,) + is_url = "%s%s/_matrix/identity/api/v1/store-invite" % ( + id_server_scheme, id_server, + ) data = yield self.hs.get_simple_http_client().post_urlencoded_get_json( is_url, { @@ -722,10 +737,11 @@ class RoomMemberHandler(BaseHandler): # TODO: Check for success token = data["token"] public_key = data["public_key"] - key_validity_url = "https://%s/_matrix/identity/api/v1/pubkey/isvalid" % ( - id_server, + display_name = data["display_name"] + key_validity_url = "%s%s/_matrix/identity/api/v1/pubkey/isvalid" % ( + id_server_scheme, id_server, ) - defer.returnValue((token, public_key, key_validity_url)) + defer.returnValue((token, public_key, key_validity_url, display_name)) class RoomListHandler(BaseHandler): @@ -750,7 +766,7 @@ class RoomListHandler(BaseHandler): class RoomContextHandler(BaseHandler): @defer.inlineCallbacks - def get_event_context(self, user, room_id, event_id, limit): + def get_event_context(self, user, room_id, event_id, limit, is_guest): """Retrieves events, pagination tokens and state around a given event in a room. @@ -774,11 +790,17 @@ class RoomContextHandler(BaseHandler): ) results["events_before"] = yield self._filter_events_for_client( - user.to_string(), results["events_before"] + user.to_string(), + results["events_before"], + is_guest=is_guest, + require_all_visible_for_guests=False ) results["events_after"] = yield self._filter_events_for_client( - user.to_string(), results["events_after"] + user.to_string(), + results["events_after"], + is_guest=is_guest, + require_all_visible_for_guests=False ) if results["events_after"]: @@ -807,7 +829,14 @@ class RoomEventSource(object): self.store = hs.get_datastore() @defer.inlineCallbacks - def get_new_events_for_user(self, user, from_key, limit): + def get_new_events( + self, + user, + from_key, + limit, + room_ids, + is_guest, + ): # We just ignore the key for now. to_key = yield self.get_current_key() @@ -827,8 +856,9 @@ class RoomEventSource(object): user_id=user.to_string(), from_key=from_key, to_key=to_key, - room_id=None, limit=limit, + room_ids=room_ids, + is_guest=is_guest, ) defer.returnValue((events, end_key)) diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 2718e9482e..b7545c111f 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -22,6 +22,8 @@ from synapse.api.filtering import Filter from synapse.api.errors import SynapseError from synapse.events.utils import serialize_event +from unpaddedbase64 import decode_base64, encode_base64 + import logging @@ -34,27 +36,59 @@ class SearchHandler(BaseHandler): super(SearchHandler, self).__init__(hs) @defer.inlineCallbacks - def search(self, user, content): + def search(self, user, content, batch=None): """Performs a full text search for a user. Args: user (UserID) content (dict): Search parameters + batch (str): The next_batch parameter. Used for pagination. Returns: dict to be returned to the client with results of search """ + batch_group = None + batch_group_key = None + batch_token = None + if batch: + try: + b = decode_base64(batch) + batch_group, batch_group_key, batch_token = b.split("\n") + + assert batch_group is not None + assert batch_group_key is not None + assert batch_token is not None + except: + raise SynapseError(400, "Invalid batch") + try: - search_term = content["search_categories"]["room_events"]["search_term"] - keys = content["search_categories"]["room_events"].get("keys", [ + room_cat = content["search_categories"]["room_events"] + + # The actual thing to query in FTS + search_term = room_cat["search_term"] + + # Which "keys" to search over in FTS query + keys = room_cat.get("keys", [ "content.body", "content.name", "content.topic", ]) - filter_dict = content["search_categories"]["room_events"].get("filter", {}) - event_context = content["search_categories"]["room_events"].get( + + # Filter to apply to results + filter_dict = room_cat.get("filter", {}) + + # What to order results by (impacts whether pagination can be doen) + order_by = room_cat.get("order_by", "rank") + + # Include context around each event? + event_context = room_cat.get( "event_context", None ) + # Group results together? May allow clients to paginate within a + # group + group_by = room_cat.get("groupings", {}).get("group_by", {}) + group_keys = [g["key"] for g in group_by] + if event_context is not None: before_limit = int(event_context.get( "before_limit", 5 @@ -65,6 +99,15 @@ class SearchHandler(BaseHandler): except KeyError: raise SynapseError(400, "Invalid search query") + if order_by not in ("rank", "recent"): + raise SynapseError(400, "Invalid order by: %r" % (order_by,)) + + if set(group_keys) - {"room_id", "sender"}: + raise SynapseError( + 400, + "Invalid group by keys: %r" % (set(group_keys) - {"room_id", "sender"},) + ) + search_filter = Filter(filter_dict) # TODO: Search through left rooms too @@ -77,19 +120,130 @@ class SearchHandler(BaseHandler): room_ids = search_filter.filter_rooms(room_ids) - rank_map, event_map, _ = yield self.store.search_msgs( - room_ids, search_term, keys - ) + if batch_group == "room_id": + room_ids.intersection_update({batch_group_key}) - filtered_events = search_filter.filter(event_map.values()) + rank_map = {} # event_id -> rank of event + allowed_events = [] + room_groups = {} # Holds result of grouping by room, if applicable + sender_group = {} # Holds result of grouping by sender, if applicable - allowed_events = yield self._filter_events_for_client( - user.to_string(), filtered_events - ) + # Holds the next_batch for the entire result set if one of those exists + global_next_batch = None - allowed_events.sort(key=lambda e: -rank_map[e.event_id]) - allowed_events = allowed_events[:search_filter.limit()] + if order_by == "rank": + results = yield self.store.search_msgs( + room_ids, search_term, keys + ) + + results_map = {r["event"].event_id: r for r in results} + + rank_map.update({r["event"].event_id: r["rank"] for r in results}) + + filtered_events = search_filter.filter([r["event"] for r in results]) + + events = yield self._filter_events_for_client( + user.to_string(), filtered_events + ) + + events.sort(key=lambda e: -rank_map[e.event_id]) + allowed_events = events[:search_filter.limit()] + + for e in allowed_events: + rm = room_groups.setdefault(e.room_id, { + "results": [], + "order": rank_map[e.event_id], + }) + rm["results"].append(e.event_id) + + s = sender_group.setdefault(e.sender, { + "results": [], + "order": rank_map[e.event_id], + }) + s["results"].append(e.event_id) + + elif order_by == "recent": + # In this case we specifically loop through each room as the given + # limit applies to each room, rather than a global list. + # This is not necessarilly a good idea. + for room_id in room_ids: + room_events = [] + if batch_group == "room_id" and batch_group_key == room_id: + pagination_token = batch_token + else: + pagination_token = None + i = 0 + + # We keep looping and we keep filtering until we reach the limit + # or we run out of things. + # But only go around 5 times since otherwise synapse will be sad. + while len(room_events) < search_filter.limit() and i < 5: + i += 1 + results = yield self.store.search_room( + room_id, search_term, keys, search_filter.limit() * 2, + pagination_token=pagination_token, + ) + + results_map = {r["event"].event_id: r for r in results} + + rank_map.update({r["event"].event_id: r["rank"] for r in results}) + + filtered_events = search_filter.filter([ + r["event"] for r in results + ]) + + events = yield self._filter_events_for_client( + user.to_string(), filtered_events + ) + + room_events.extend(events) + room_events = room_events[:search_filter.limit()] + + if len(results) < search_filter.limit() * 2: + pagination_token = None + break + else: + pagination_token = results[-1]["pagination_token"] + + if room_events: + res = results_map[room_events[-1].event_id] + pagination_token = res["pagination_token"] + + group = room_groups.setdefault(room_id, {}) + if pagination_token: + next_batch = encode_base64("%s\n%s\n%s" % ( + "room_id", room_id, pagination_token + )) + group["next_batch"] = next_batch + + if batch_token: + global_next_batch = next_batch + + group["results"] = [e.event_id for e in room_events] + group["order"] = max( + e.origin_server_ts/1000 for e in room_events + if hasattr(e, "origin_server_ts") + ) + + allowed_events.extend(room_events) + + # Normalize the group orders + if room_groups: + if len(room_groups) > 1: + mx = max(g["order"] for g in room_groups.values()) + mn = min(g["order"] for g in room_groups.values()) + + for g in room_groups.values(): + g["order"] = (g["order"] - mn) * 1.0 / (mx - mn) + else: + room_groups.values()[0]["order"] = 1 + else: + # We should never get here due to the guard earlier. + raise NotImplementedError() + + # If client has asked for "context" for each event (i.e. some surrounding + # events and state), fetch that if event_context is not None: now_token = yield self.hs.get_event_sources().get_current_token() @@ -144,11 +298,22 @@ class SearchHandler(BaseHandler): logger.info("Found %d results", len(results)) + rooms_cat_res = { + "results": results, + "count": len(results) + } + + if room_groups and "room_id" in group_keys: + rooms_cat_res.setdefault("groups", {})["room_id"] = room_groups + + if sender_group and "sender" in group_keys: + rooms_cat_res.setdefault("groups", {})["sender"] = sender_group + + if global_next_batch: + rooms_cat_res["next_batch"] = global_next_batch + defer.returnValue({ "search_categories": { - "room_events": { - "results": results, - "count": len(results) - } + "room_events": rooms_cat_res } }) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 4c5a2353b2..6dc9d0fb92 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -47,10 +47,11 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ - "room_id", - "timeline", - "state", + "room_id", # str + "timeline", # TimelineBatch + "state", # dict[(str, str), FrozenEvent] "ephemeral", + "private_user_data", ])): __slots__ = [] @@ -58,13 +59,19 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ """Make the result appear empty if there are no updates. This is used to tell if room needs to be part of the sync result. """ - return bool(self.timeline or self.state or self.ephemeral) + return bool( + self.timeline + or self.state + or self.ephemeral + or self.private_user_data + ) class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [ - "room_id", - "timeline", - "state", + "room_id", # str + "timeline", # TimelineBatch + "state", # dict[(str, str), FrozenEvent] + "private_user_data", ])): __slots__ = [] @@ -72,12 +79,16 @@ class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [ """Make the result appear empty if there are no updates. This is used to tell if room needs to be part of the sync result. """ - return bool(self.timeline or self.state) + return bool( + self.timeline + or self.state + or self.private_user_data + ) class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ - "room_id", - "invite", + "room_id", # str + "invite", # FrozenEvent: the invite event ])): __slots__ = [] @@ -132,21 +143,8 @@ class SyncHandler(BaseHandler): def current_sync_callback(before_token, after_token): return self.current_sync_for_user(sync_config, since_token) - rm_handler = self.hs.get_handlers().room_member_handler - - app_service = yield self.store.get_app_service_by_user_id( - sync_config.user.to_string() - ) - if app_service: - rooms = yield self.store.get_app_service_rooms(app_service) - room_ids = set(r.room_id for r in rooms) - else: - room_ids = yield rm_handler.get_joined_rooms_for_user( - sync_config.user - ) - result = yield self.notifier.wait_for_events( - sync_config.user, room_ids, timeout, current_sync_callback, + sync_config.user, timeout, current_sync_callback, from_token=since_token ) defer.returnValue(result) @@ -174,7 +172,7 @@ class SyncHandler(BaseHandler): """ now_token = yield self.event_sources.get_current_token() - now_token, typing_by_room = yield self.typing_by_room( + now_token, ephemeral_by_room = yield self.ephemeral_by_room( sync_config, now_token ) @@ -197,6 +195,10 @@ class SyncHandler(BaseHandler): ) ) + tags_by_room = yield self.store.get_tags_for_user( + sync_config.user.to_string() + ) + joined = [] invited = [] archived = [] @@ -207,7 +209,8 @@ class SyncHandler(BaseHandler): sync_config=sync_config, now_token=now_token, timeline_since_token=timeline_since_token, - typing_by_room=typing_by_room + ephemeral_by_room=ephemeral_by_room, + tags_by_room=tags_by_room, ) joined.append(room_sync) elif event.membership == Membership.INVITE: @@ -226,6 +229,7 @@ class SyncHandler(BaseHandler): leave_event_id=event.event_id, leave_token=leave_token, timeline_since_token=timeline_since_token, + tags_by_room=tags_by_room, ) archived.append(room_sync) @@ -240,7 +244,7 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def full_state_sync_for_joined_room(self, room_id, sync_config, now_token, timeline_since_token, - typing_by_room): + ephemeral_by_room, tags_by_room): """Sync a room for a client which is starting without any state Returns: A Deferred JoinedSyncResult. @@ -250,21 +254,31 @@ class SyncHandler(BaseHandler): room_id, sync_config, now_token, since_token=timeline_since_token ) - current_state = yield self.state_handler.get_current_state( - room_id - ) - current_state_events = current_state.values() + current_state = yield self.get_state_at(room_id, now_token) defer.returnValue(JoinedSyncResult( room_id=room_id, timeline=batch, - state=current_state_events, - ephemeral=typing_by_room.get(room_id, []), + state=current_state, + ephemeral=ephemeral_by_room.get(room_id, []), + private_user_data=self.private_user_data_for_room( + room_id, tags_by_room + ), )) + def private_user_data_for_room(self, room_id, tags_by_room): + private_user_data = [] + tags = tags_by_room.get(room_id) + if tags is not None: + private_user_data.append({ + "type": "m.tag", + "content": {"tags": tags}, + }) + return private_user_data + @defer.inlineCallbacks - def typing_by_room(self, sync_config, now_token, since_token=None): - """Get the typing events for each room the user is in + def ephemeral_by_room(self, sync_config, now_token, since_token=None): + """Get the ephemeral events for each room the user is in Args: sync_config (SyncConfig): The flags, filters and user for the sync. now_token (StreamToken): Where the server is currently up to. @@ -278,25 +292,56 @@ class SyncHandler(BaseHandler): typing_key = since_token.typing_key if since_token else "0" + rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string()) + room_ids = [room.room_id for room in rooms] + typing_source = self.event_sources.sources["typing"] - typing, typing_key = yield typing_source.get_new_events_for_user( + typing, typing_key = yield typing_source.get_new_events( user=sync_config.user, from_key=typing_key, limit=sync_config.filter.ephemeral_limit(), + room_ids=room_ids, + is_guest=False, ) now_token = now_token.copy_and_replace("typing_key", typing_key) - typing_by_room = {event["room_id"]: [event] for event in typing} + ephemeral_by_room = {} + for event in typing: - event.pop("room_id") - logger.debug("Typing %r", typing_by_room) + # we want to exclude the room_id from the event, but modifying the + # result returned by the event source is poor form (it might cache + # the object) + room_id = event["room_id"] + event_copy = {k: v for (k, v) in event.iteritems() + if k != "room_id"} + ephemeral_by_room.setdefault(room_id, []).append(event_copy) + + receipt_key = since_token.receipt_key if since_token else "0" + + receipt_source = self.event_sources.sources["receipt"] + receipts, receipt_key = yield receipt_source.get_new_events( + user=sync_config.user, + from_key=receipt_key, + limit=sync_config.filter.ephemeral_limit(), + room_ids=room_ids, + # /sync doesn't support guest access, they can't get to this point in code + is_guest=False, + ) + now_token = now_token.copy_and_replace("receipt_key", receipt_key) - defer.returnValue((now_token, typing_by_room)) + for event in receipts: + room_id = event["room_id"] + # exclude room id, as above + event_copy = {k: v for (k, v) in event.iteritems() + if k != "room_id"} + ephemeral_by_room.setdefault(room_id, []).append(event_copy) + + defer.returnValue((now_token, ephemeral_by_room)) @defer.inlineCallbacks def full_state_sync_for_archived_room(self, room_id, sync_config, leave_event_id, leave_token, - timeline_since_token): + timeline_since_token, tags_by_room): """Sync a room for a client which is starting without any state Returns: A Deferred JoinedSyncResult. @@ -306,14 +351,15 @@ class SyncHandler(BaseHandler): room_id, sync_config, leave_token, since_token=timeline_since_token ) - leave_state = yield self.store.get_state_for_events( - [leave_event_id], None - ) + leave_state = yield self.store.get_state_for_event(leave_event_id) defer.returnValue(ArchivedSyncResult( room_id=room_id, timeline=batch, - state=leave_state[leave_event_id].values(), + state=leave_state, + private_user_data=self.private_user_data_for_room( + room_id, tags_by_room + ), )) @defer.inlineCallbacks @@ -325,15 +371,21 @@ class SyncHandler(BaseHandler): """ now_token = yield self.event_sources.get_current_token() + rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string()) + room_ids = [room.room_id for room in rooms] + presence_source = self.event_sources.sources["presence"] - presence, presence_key = yield presence_source.get_new_events_for_user( + presence, presence_key = yield presence_source.get_new_events( user=sync_config.user, from_key=since_token.presence_key, limit=sync_config.filter.presence_limit(), + room_ids=room_ids, + # /sync doesn't support guest access, they can't get to this point in code + is_guest=False, ) now_token = now_token.copy_and_replace("presence_key", presence_key) - now_token, typing_by_room = yield self.typing_by_room( + now_token, ephemeral_by_room = yield self.ephemeral_by_room( sync_config, now_token, since_token ) @@ -355,15 +407,22 @@ class SyncHandler(BaseHandler): sync_config.user.to_string(), from_key=since_token.room_key, to_key=now_token.room_key, - room_id=None, limit=timeline_limit + 1, ) + tags_by_room = yield self.store.get_updated_tags( + sync_config.user.to_string(), + since_token.private_user_data_key, + ) + joined = [] archived = [] if len(room_events) <= timeline_limit: # There is no gap in any of the rooms. Therefore we can just # partition the new events by room and return them. + logger.debug("Got %i events for incremental sync - not limited", + len(room_events)) + invite_events = [] leave_events = [] events_by_room_id = {} @@ -379,7 +438,12 @@ class SyncHandler(BaseHandler): for room_id in joined_room_ids: recents = events_by_room_id.get(room_id, []) - state = [event for event in recents if event.is_state()] + logger.debug("Events for room %s: %r", room_id, recents) + state = { + (event.type, event.state_key): event + for event in recents if event.is_state()} + limited = False + if recents: prev_batch = now_token.copy_and_replace( "room_key", recents[0].internal_metadata.before @@ -387,9 +451,13 @@ class SyncHandler(BaseHandler): else: prev_batch = now_token - state, limited = yield self.check_joined_room( - sync_config, room_id, state - ) + just_joined = yield self.check_joined_room(sync_config, state) + if just_joined: + logger.debug("User has just joined %s: needs full state", + room_id) + state = yield self.get_state_at(room_id, now_token) + # the timeline is inherently limited if we've just joined + limited = True room_sync = JoinedSyncResult( room_id=room_id, @@ -399,12 +467,20 @@ class SyncHandler(BaseHandler): limited=limited, ), state=state, - ephemeral=typing_by_room.get(room_id, []) + ephemeral=ephemeral_by_room.get(room_id, []), + private_user_data=self.private_user_data_for_room( + room_id, tags_by_room + ), ) + logger.debug("Result for room %s: %r", room_id, room_sync) + if room_sync: joined.append(room_sync) else: + logger.debug("Got %i events for incremental sync - hit limit", + len(room_events)) + invite_events = yield self.store.get_invites_for_user( sync_config.user.to_string() ) @@ -416,14 +492,14 @@ class SyncHandler(BaseHandler): for room_id in joined_room_ids: room_sync = yield self.incremental_sync_with_gap_for_room( room_id, sync_config, since_token, now_token, - typing_by_room + ephemeral_by_room, tags_by_room ) if room_sync: joined.append(room_sync) for leave_event in leave_events: room_sync = yield self.incremental_sync_for_archived_room( - sync_config, leave_event, since_token + sync_config, leave_event, since_token, tags_by_room ) archived.append(room_sync) @@ -443,6 +519,9 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def load_filtered_recents(self, room_id, sync_config, now_token, since_token=None): + """ + :returns a Deferred TimelineBatch + """ limited = True recents = [] filtering_factor = 2 @@ -487,13 +566,15 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def incremental_sync_with_gap_for_room(self, room_id, sync_config, since_token, now_token, - typing_by_room): + ephemeral_by_room, tags_by_room): """ Get the incremental delta needed to bring the client up to date for the room. Gives the client the most recent events and the changes to state. Returns: A Deferred JoinedSyncResult """ + logger.debug("Doing incremental sync for room %s between %s and %s", + room_id, since_token, now_token) # TODO(mjark): Check for redactions we might have missed. @@ -503,32 +584,30 @@ class SyncHandler(BaseHandler): logging.debug("Recents %r", batch) - # TODO(mjark): This seems racy since this isn't being passed a - # token to indicate what point in the stream this is - current_state = yield self.state_handler.get_current_state( - room_id - ) - current_state_events = current_state.values() + current_state = yield self.get_state_at(room_id, now_token) - state_at_previous_sync = yield self.get_state_at_previous_sync( - room_id, since_token=since_token + state_at_previous_sync = yield self.get_state_at( + room_id, stream_position=since_token ) - state_events_delta = yield self.compute_state_delta( + state = yield self.compute_state_delta( since_token=since_token, previous_state=state_at_previous_sync, - current_state=current_state_events, + current_state=current_state, ) - state_events_delta, _ = yield self.check_joined_room( - sync_config, room_id, state_events_delta - ) + just_joined = yield self.check_joined_room(sync_config, state) + if just_joined: + state = yield self.get_state_at(room_id, now_token) room_sync = JoinedSyncResult( room_id=room_id, timeline=batch, - state=state_events_delta, - ephemeral=typing_by_room.get(room_id, []) + state=state, + ephemeral=ephemeral_by_room.get(room_id, []), + private_user_data=self.private_user_data_for_room( + room_id, tags_by_room + ), ) logging.debug("Room sync: %r", room_sync) @@ -537,7 +616,7 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def incremental_sync_for_archived_room(self, sync_config, leave_event, - since_token): + since_token, tags_by_room): """ Get the incremental delta needed to bring the client up to date for the archived room. Returns: @@ -556,16 +635,12 @@ class SyncHandler(BaseHandler): logging.debug("Recents %r", batch) - # TODO(mjark): This seems racy since this isn't being passed a - # token to indicate what point in the stream this is - leave_state = yield self.store.get_state_for_events( - [leave_event.event_id], None + state_events_at_leave = yield self.store.get_state_for_event( + leave_event.event_id ) - state_events_at_leave = leave_state[leave_event.event_id].values() - - state_at_previous_sync = yield self.get_state_at_previous_sync( - leave_event.room_id, since_token=since_token + state_at_previous_sync = yield self.get_state_at( + leave_event.room_id, stream_position=since_token ) state_events_delta = yield self.compute_state_delta( @@ -578,6 +653,9 @@ class SyncHandler(BaseHandler): room_id=leave_event.room_id, timeline=batch, state=state_events_delta, + private_user_data=self.private_user_data_for_room( + leave_event.room_id, tags_by_room + ), ) logging.debug("Room sync: %r", room_sync) @@ -585,60 +663,77 @@ class SyncHandler(BaseHandler): defer.returnValue(room_sync) @defer.inlineCallbacks - def get_state_at_previous_sync(self, room_id, since_token): - """ Get the room state at the previous sync the client made. - Returns: - A Deferred list of Events. + def get_state_after_event(self, event): + """ + Get the room state after the given event + + :param synapse.events.EventBase event: event of interest + :return: A Deferred map from ((type, state_key)->Event) + """ + state = yield self.store.get_state_for_event(event.event_id) + if event.is_state(): + state = state.copy() + state[(event.type, event.state_key)] = event + defer.returnValue(state) + + @defer.inlineCallbacks + def get_state_at(self, room_id, stream_position): + """ Get the room state at a particular stream position + :param str room_id: room for which to get state + :param StreamToken stream_position: point at which to get state + :returns: A Deferred map from ((type, state_key)->Event) """ last_events, token = yield self.store.get_recent_events_for_room( - room_id, end_token=since_token.room_key, limit=1, + room_id, end_token=stream_position.room_key, limit=1, ) if last_events: - last_event = last_events[0] - last_context = yield self.state_handler.compute_event_context( - last_event - ) - if last_event.is_state(): - state = [last_event] + last_context.current_state.values() - else: - state = last_context.current_state.values() + last_event = last_events[-1] + state = yield self.get_state_after_event(last_event) + else: - state = () + # no events in this room - so presumably no state + state = {} defer.returnValue(state) def compute_state_delta(self, since_token, previous_state, current_state): """ Works out the differnce in state between the current state and the state the client got when it last performed a sync. - Returns: - A list of events. + + :param str since_token: the point we are comparing against + :param dict[(str,str), synapse.events.FrozenEvent] previous_state: the + state to compare to + :param dict[(str,str), synapse.events.FrozenEvent] current_state: the + new state + + :returns A new event dictionary """ # TODO(mjark) Check if the state events were received by the server # after the previous sync, since we need to include those state # updates even if they occured logically before the previous event. # TODO(mjark) Check for new redactions in the state events. - previous_dict = {event.event_id: event for event in previous_state} - state_delta = [] - for event in current_state: - if event.event_id not in previous_dict: - state_delta.append(event) + + state_delta = {} + for key, event in current_state.iteritems(): + if (key not in previous_state or + previous_state[key].event_id != event.event_id): + state_delta[key] = event return state_delta - @defer.inlineCallbacks - def check_joined_room(self, sync_config, room_id, state_delta): - joined = False - limited = False - for event in state_delta: - if ( - event.type == EventTypes.Member - and event.state_key == sync_config.user.to_string() - ): - if event.content["membership"] == Membership.JOIN: - joined = True - - if joined: - res = yield self.state_handler.get_current_state(room_id) - state_delta = res.values() - limited = True + def check_joined_room(self, sync_config, state_delta): + """ + Check if the user has just joined the given room (so should + be given the full state) + + :param sync_config: + :param dict[(str,str), synapse.events.FrozenEvent] state_delta: the + difference in state since the last sync - defer.returnValue((state_delta, limited)) + :returns A deferred Tuple (state_delta, limited) + """ + join_event = state_delta.get(( + EventTypes.Member, sync_config.user.to_string()), None) + if join_event is not None: + if join_event.content["membership"] == Membership.JOIN: + return True + return False diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index d7096aab8c..2846f3e6e8 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -246,17 +246,12 @@ class TypingNotificationEventSource(object): }, } - @defer.inlineCallbacks - def get_new_events_for_user(self, user, from_key, limit): + def get_new_events(self, from_key, room_ids, **kwargs): from_key = int(from_key) handler = self.handler() - joined_room_ids = ( - yield self.room_member_handler().get_joined_rooms_for_user(user) - ) - events = [] - for room_id in joined_room_ids: + for room_id in room_ids: if room_id not in handler._room_serials: continue if handler._room_serials[room_id] <= from_key: @@ -264,7 +259,7 @@ class TypingNotificationEventSource(object): events.append(self._make_event_for(room_id)) - defer.returnValue((events, handler._latest_room_serial)) + return events, handler._latest_room_serial def get_current_key(self): return self.handler()._latest_room_serial |