-rw-r--r-- | synapse/handlers/_base.py | 380
-rw-r--r-- | synapse/handlers/auth.py | 5
-rw-r--r-- | synapse/handlers/federation.py | 19
-rw-r--r-- | synapse/handlers/message.py | 215
-rw-r--r-- | synapse/handlers/room.py | 7
-rw-r--r-- | synapse/handlers/room_member.py | 4
-rw-r--r-- | synapse/handlers/search.py | 17
-rw-r--r-- | synapse/handlers/sync.py | 7
-rw-r--r-- | synapse/notifier.py | 5
-rw-r--r-- | synapse/push/action_generator.py | 4
-rw-r--r-- | synapse/push/bulk_push_rule_evaluator.py | 7
-rw-r--r-- | synapse/push/mailer.py | 6
-rw-r--r-- | synapse/replication/resource.py | 2
-rw-r--r-- | synapse/replication/slave/storage/events.py | 2
-rw-r--r-- | synapse/storage/_base.py | 8
-rw-r--r-- | synapse/storage/event_push_actions.py | 12
-rw-r--r-- | synapse/storage/pusher.py | 13
-rw-r--r-- | synapse/storage/receipts.py | 32
-rw-r--r-- | synapse/storage/schema/delta/32/remove_indices.sql | 38
-rw-r--r-- | synapse/visibility.py | 210 |
20 files changed, 558 insertions, 435 deletions
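The bulk of this changeset moves the event-visibility helpers (filter_events_for_client / filter_events_for_clients) off BaseHandler into a new standalone module, synapse/visibility.py, as plain functions that take the datastore as their first argument, and moves event creation and sending (_create_new_client_event, handle_new_client_event) from BaseHandler into MessageHandler; callers in the federation, room, search, sync, notifier, push and mailer code are updated to match. A minimal sketch of the resulting call-site shape, assuming a Twisted inlineCallbacks context with a datastore available as store (the wrapper function below is illustrative only, not part of the diff):

from twisted.internet import defer

from synapse.visibility import filter_events_for_client


@defer.inlineCallbacks
def visible_events_for(store, user_id, events):
    # Visibility filtering is now a free function taking the datastore
    # explicitly, rather than a BaseHandler method, so non-handler code
    # (notifier, push, mailer) can call it too.
    filtered = yield filter_events_for_client(
        store, user_id, events, is_peeking=False,
    )
    defer.returnValue(filtered)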
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 2c811906d9..c904c6c500 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -15,13 +15,10 @@ from twisted.internet import defer -from synapse.api.errors import LimitExceededError, SynapseError, AuthError -from synapse.crypto.event_signing import add_hashes_and_signatures +from synapse.api.errors import LimitExceededError from synapse.api.constants import Membership, EventTypes -from synapse.types import UserID, RoomAlias, Requester, get_domian_from_id -from synapse.push.action_generator import ActionGenerator +from synapse.types import UserID, Requester -from synapse.util.logcontext import PreserveLoggingContext, preserve_fn import logging @@ -29,23 +26,6 @@ import logging logger = logging.getLogger(__name__) -VISIBILITY_PRIORITY = ( - "world_readable", - "shared", - "invited", - "joined", -) - - -MEMBERSHIP_PRIORITY = ( - Membership.JOIN, - Membership.INVITE, - Membership.KNOCK, - Membership.LEAVE, - Membership.BAN, -) - - class BaseHandler(object): """ Common base class for the event handlers. @@ -65,177 +45,10 @@ class BaseHandler(object): self.clock = hs.get_clock() self.hs = hs - self.signing_key = hs.config.signing_key[0] self.server_name = hs.hostname self.event_builder_factory = hs.get_event_builder_factory() - @defer.inlineCallbacks - def filter_events_for_clients(self, user_tuples, events, event_id_to_state): - """ Returns dict of user_id -> list of events that user is allowed to - see. - - Args: - user_tuples (str, bool): (user id, is_peeking) for each user to be - checked. is_peeking should be true if: - * the user is not currently a member of the room, and: - * the user has not been a member of the room since the - given events - events ([synapse.events.EventBase]): list of events to filter - """ - forgotten = yield defer.gatherResults([ - preserve_fn(self.store.who_forgot_in_room)( - room_id, - ) - for room_id in frozenset(e.room_id for e in events) - ], consumeErrors=True) - - # Set of membership event_ids that have been forgotten - event_id_forgotten = frozenset( - row["event_id"] for rows in forgotten for row in rows - ) - - ignore_dict_content = yield self.store.get_global_account_data_by_type_for_users( - "m.ignored_user_list", user_ids=[user_id for user_id, _ in user_tuples] - ) - - # FIXME: This will explode if people upload something incorrect. - ignore_dict = { - user_id: frozenset( - content.get("ignored_users", {}).keys() if content else [] - ) - for user_id, content in ignore_dict_content.items() - } - - def allowed(event, user_id, is_peeking, ignore_list): - """ - Args: - event (synapse.events.EventBase): event to check - user_id (str) - is_peeking (bool) - ignore_list (list): list of users to ignore - """ - if not event.is_state() and event.sender in ignore_list: - return False - - state = event_id_to_state[event.event_id] - - # get the room_visibility at the time of the event. - visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None) - if visibility_event: - visibility = visibility_event.content.get("history_visibility", "shared") - else: - visibility = "shared" - - if visibility not in VISIBILITY_PRIORITY: - visibility = "shared" - - # if it was world_readable, it's easy: everyone can read it - if visibility == "world_readable": - return True - - # Always allow history visibility events on boundaries. This is done - # by setting the effective visibility to the least restrictive - # of the old vs new. 
- if event.type == EventTypes.RoomHistoryVisibility: - prev_content = event.unsigned.get("prev_content", {}) - prev_visibility = prev_content.get("history_visibility", None) - - if prev_visibility not in VISIBILITY_PRIORITY: - prev_visibility = "shared" - - new_priority = VISIBILITY_PRIORITY.index(visibility) - old_priority = VISIBILITY_PRIORITY.index(prev_visibility) - if old_priority < new_priority: - visibility = prev_visibility - - # likewise, if the event is the user's own membership event, use - # the 'most joined' membership - membership = None - if event.type == EventTypes.Member and event.state_key == user_id: - membership = event.content.get("membership", None) - if membership not in MEMBERSHIP_PRIORITY: - membership = "leave" - - prev_content = event.unsigned.get("prev_content", {}) - prev_membership = prev_content.get("membership", None) - if prev_membership not in MEMBERSHIP_PRIORITY: - prev_membership = "leave" - - new_priority = MEMBERSHIP_PRIORITY.index(membership) - old_priority = MEMBERSHIP_PRIORITY.index(prev_membership) - if old_priority < new_priority: - membership = prev_membership - - # otherwise, get the user's membership at the time of the event. - if membership is None: - membership_event = state.get((EventTypes.Member, user_id), None) - if membership_event: - if membership_event.event_id not in event_id_forgotten: - membership = membership_event.membership - - # if the user was a member of the room at the time of the event, - # they can see it. - if membership == Membership.JOIN: - return True - - if visibility == "joined": - # we weren't a member at the time of the event, so we can't - # see this event. - return False - - elif visibility == "invited": - # user can also see the event if they were *invited* at the time - # of the event. - return membership == Membership.INVITE - - else: - # visibility is shared: user can also see the event if they have - # become a member since the event - # - # XXX: if the user has subsequently joined and then left again, - # ideally we would share history up to the point they left. But - # we don't know when they left. 
- return not is_peeking - - defer.returnValue({ - user_id: [ - event - for event in events - if allowed(event, user_id, is_peeking, ignore_dict.get(user_id, [])) - ] - for user_id, is_peeking in user_tuples - }) - - @defer.inlineCallbacks - def filter_events_for_client(self, user_id, events, is_peeking=False): - """ - Check which events a user is allowed to see - - Args: - user_id(str): user id to be checked - events([synapse.events.EventBase]): list of events to be checked - is_peeking(bool): should be True if: - * the user is not currently a member of the room, and: - * the user has not been a member of the room since the given - events - - Returns: - [synapse.events.EventBase] - """ - types = ( - (EventTypes.RoomHistoryVisibility, ""), - (EventTypes.Member, user_id), - ) - event_id_to_state = yield self.store.get_state_for_events( - frozenset(e.event_id for e in events), - types=types - ) - res = yield self.filter_events_for_clients( - [(user_id, is_peeking)], events, event_id_to_state - ) - defer.returnValue(res.get(user_id, [])) - def ratelimit(self, requester): time_now = self.clock.time() allowed, time_allowed = self.ratelimiter.send_message( @@ -248,56 +61,6 @@ class BaseHandler(object): retry_after_ms=int(1000 * (time_allowed - time_now)), ) - @defer.inlineCallbacks - def _create_new_client_event(self, builder, prev_event_ids=None): - if prev_event_ids: - prev_events = yield self.store.add_event_hashes(prev_event_ids) - prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids) - depth = prev_max_depth + 1 - else: - latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room( - builder.room_id, - ) - - if latest_ret: - depth = max([d for _, _, d in latest_ret]) + 1 - else: - depth = 1 - - prev_events = [ - (event_id, prev_hashes) - for event_id, prev_hashes, _ in latest_ret - ] - - builder.prev_events = prev_events - builder.depth = depth - - state_handler = self.state_handler - - context = yield state_handler.compute_event_context(builder) - - if builder.is_state(): - builder.prev_state = yield self.store.add_event_hashes( - context.prev_state_events - ) - - yield self.auth.add_auth_events(builder, context) - - add_hashes_and_signatures( - builder, self.server_name, self.signing_key - ) - - event = builder.build() - - logger.debug( - "Created event %s with current state: %s", - event.event_id, context.current_state, - ) - - defer.returnValue( - (event, context,) - ) - def is_host_in_room(self, current_state): room_members = [ (state_key, event.membership) @@ -319,145 +82,6 @@ class BaseHandler(object): return False @defer.inlineCallbacks - def handle_new_client_event( - self, - requester, - event, - context, - ratelimit=True, - extra_users=[] - ): - # We now need to go and hit out to wherever we need to hit out to. 
- - if ratelimit: - self.ratelimit(requester) - - try: - self.auth.check(event, auth_events=context.current_state) - except AuthError as err: - logger.warn("Denying new event %r because %s", event, err) - raise err - - yield self.maybe_kick_guest_users(event, context.current_state.values()) - - if event.type == EventTypes.CanonicalAlias: - # Check the alias is acually valid (at this time at least) - room_alias_str = event.content.get("alias", None) - if room_alias_str: - room_alias = RoomAlias.from_string(room_alias_str) - directory_handler = self.hs.get_handlers().directory_handler - mapping = yield directory_handler.get_association(room_alias) - - if mapping["room_id"] != event.room_id: - raise SynapseError( - 400, - "Room alias %s does not point to the room" % ( - room_alias_str, - ) - ) - - federation_handler = self.hs.get_handlers().federation_handler - - if event.type == EventTypes.Member: - if event.content["membership"] == Membership.INVITE: - def is_inviter_member_event(e): - return ( - e.type == EventTypes.Member and - e.sender == event.sender - ) - - event.unsigned["invite_room_state"] = [ - { - "type": e.type, - "state_key": e.state_key, - "content": e.content, - "sender": e.sender, - } - for k, e in context.current_state.items() - if e.type in self.hs.config.room_invite_state_types - or is_inviter_member_event(e) - ] - - invitee = UserID.from_string(event.state_key) - if not self.hs.is_mine(invitee): - # TODO: Can we add signature from remote server in a nicer - # way? If we have been invited by a remote server, we need - # to get them to sign the event. - - returned_invite = yield federation_handler.send_invite( - invitee.domain, - event, - ) - - event.unsigned.pop("room_state", None) - - # TODO: Make sure the signatures actually are correct. - event.signatures.update( - returned_invite.signatures - ) - - if event.type == EventTypes.Redaction: - if self.auth.check_redaction(event, auth_events=context.current_state): - original_event = yield self.store.get_event( - event.redacts, - check_redacted=False, - get_prev_content=False, - allow_rejected=False, - allow_none=False - ) - if event.user_id != original_event.user_id: - raise AuthError( - 403, - "You don't have permission to redact events" - ) - - if event.type == EventTypes.Create and context.current_state: - raise AuthError( - 403, - "Changing the room create event is forbidden", - ) - - action_generator = ActionGenerator(self.hs) - yield action_generator.handle_push_actions_for_event( - event, context, self - ) - - (event_stream_id, max_stream_id) = yield self.store.persist_event( - event, context=context - ) - - # this intentionally does not yield: we don't care about the result - # and don't need to wait for it. - preserve_fn(self.hs.get_pusherpool().on_new_notifications)( - event_stream_id, max_stream_id - ) - - destinations = set() - for k, s in context.current_state.items(): - try: - if k[0] == EventTypes.Member: - if s.content["membership"] == Membership.JOIN: - destinations.add(get_domian_from_id(s.state_key)) - except SynapseError: - logger.warn( - "Failed to get destination from event %s", s.event_id - ) - - with PreserveLoggingContext(): - # Don't block waiting on waking up all the listeners. - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=extra_users - ) - - # If invite, remove room_state from unsigned before sending. 
- event.unsigned.pop("invite_room_state", None) - - federation_handler.handle_new_event( - event, destinations=destinations, - ) - - @defer.inlineCallbacks def maybe_kick_guest_users(self, event, current_state): # Technically this function invalidates current_state by changing it. # Hopefully this isn't that important to the caller. diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 3d36d3460e..68d0d78fc6 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -615,4 +615,7 @@ class AuthHandler(BaseHandler): Returns: Whether self.hash(password) == stored_hash (bool). """ - return bcrypt.hashpw(password, stored_hash) == stored_hash + if stored_hash: + return bcrypt.hashpw(password, stored_hash) == stored_hash + else: + return False diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index f38c6a8713..c21d9d4d83 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -682,7 +682,8 @@ class FederationHandler(BaseHandler): }) try: - event, context = yield self._create_new_client_event( + message_handler = self.hs.get_handlers().message_handler + event, context = yield message_handler._create_new_client_event( builder=builder, ) except AuthError as e: @@ -913,7 +914,8 @@ class FederationHandler(BaseHandler): "state_key": user_id, }) - event, context = yield self._create_new_client_event( + message_handler = self.hs.get_handlers().message_handler + event, context = yield message_handler._create_new_client_event( builder=builder, ) @@ -1111,7 +1113,7 @@ class FederationHandler(BaseHandler): if not event.internal_metadata.is_outlier(): action_generator = ActionGenerator(self.hs) yield action_generator.handle_push_actions_for_event( - event, context, self + event, context ) event_stream_id, max_stream_id = yield self.store.persist_event( @@ -1688,7 +1690,10 @@ class FederationHandler(BaseHandler): if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)): builder = self.event_builder_factory.new(event_dict) EventValidator().validate_new(builder) - event, context = yield self._create_new_client_event(builder=builder) + message_handler = self.hs.get_handlers().message_handler + event, context = yield message_handler._create_new_client_event( + builder=builder + ) event, context = yield self.add_display_name_to_third_party_invite( event_dict, event, context @@ -1716,7 +1721,8 @@ class FederationHandler(BaseHandler): def on_exchange_third_party_invite_request(self, origin, room_id, event_dict): builder = self.event_builder_factory.new(event_dict) - event, context = yield self._create_new_client_event( + message_handler = self.hs.get_handlers().message_handler + event, context = yield message_handler._create_new_client_event( builder=builder, ) @@ -1755,7 +1761,8 @@ class FederationHandler(BaseHandler): event_dict["content"]["third_party_invite"]["display_name"] = display_name builder = self.event_builder_factory.new(event_dict) EventValidator().validate_new(builder) - event, context = yield self._create_new_client_event(builder=builder) + message_handler = self.hs.get_handlers().message_handler + event, context = yield message_handler._create_new_client_event(builder=builder) defer.returnValue((event, context)) @defer.inlineCallbacks diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 7d9e3cf364..13154edb78 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -17,13 +17,19 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, 
Membership from synapse.api.errors import AuthError, Codes, SynapseError -from synapse.streams.config import PaginationConfig +from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator +from synapse.push.action_generator import ActionGenerator +from synapse.streams.config import PaginationConfig +from synapse.types import ( + UserID, RoomAlias, RoomStreamToken, StreamToken, get_domian_from_id +) from synapse.util import unwrapFirstError from synapse.util.async import concurrently_execute from synapse.util.caches.snapshot_cache import SnapshotCache -from synapse.types import UserID, RoomStreamToken, StreamToken +from synapse.util.logcontext import PreserveLoggingContext, preserve_fn +from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -123,7 +129,8 @@ class MessageHandler(BaseHandler): "end": next_token.to_string(), }) - events = yield self.filter_events_for_client( + events = yield filter_events_for_client( + self.store, user_id, events, is_peeking=(member_event_id is None), @@ -483,8 +490,8 @@ class MessageHandler(BaseHandler): ] ).addErrback(unwrapFirstError) - messages = yield self.filter_events_for_client( - user_id, messages + messages = yield filter_events_for_client( + self.store, user_id, messages ) start_token = now_token.copy_and_replace("room_key", token[0]) @@ -619,8 +626,8 @@ class MessageHandler(BaseHandler): end_token=stream_token ) - messages = yield self.filter_events_for_client( - user_id, messages, is_peeking=is_peeking + messages = yield filter_events_for_client( + self.store, user_id, messages, is_peeking=is_peeking ) start_token = StreamToken.START.copy_and_replace("room_key", token[0]) @@ -700,8 +707,8 @@ class MessageHandler(BaseHandler): consumeErrors=True, ).addErrback(unwrapFirstError) - messages = yield self.filter_events_for_client( - user_id, messages, is_peeking=is_peeking, + messages = yield filter_events_for_client( + self.store, user_id, messages, is_peeking=is_peeking, ) start_token = now_token.copy_and_replace("room_key", token[0]) @@ -724,3 +731,193 @@ class MessageHandler(BaseHandler): ret["membership"] = membership defer.returnValue(ret) + + @defer.inlineCallbacks + def _create_new_client_event(self, builder, prev_event_ids=None): + if prev_event_ids: + prev_events = yield self.store.add_event_hashes(prev_event_ids) + prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids) + depth = prev_max_depth + 1 + else: + latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room( + builder.room_id, + ) + + if latest_ret: + depth = max([d for _, _, d in latest_ret]) + 1 + else: + depth = 1 + + prev_events = [ + (event_id, prev_hashes) + for event_id, prev_hashes, _ in latest_ret + ] + + builder.prev_events = prev_events + builder.depth = depth + + state_handler = self.state_handler + + context = yield state_handler.compute_event_context(builder) + + if builder.is_state(): + builder.prev_state = yield self.store.add_event_hashes( + context.prev_state_events + ) + + yield self.auth.add_auth_events(builder, context) + + signing_key = self.hs.config.signing_key[0] + add_hashes_and_signatures( + builder, self.server_name, signing_key + ) + + event = builder.build() + + logger.debug( + "Created event %s with current state: %s", + event.event_id, context.current_state, + ) + + defer.returnValue( + (event, context,) + ) + + @defer.inlineCallbacks + def handle_new_client_event( + self, + 
requester, + event, + context, + ratelimit=True, + extra_users=[] + ): + # We now need to go and hit out to wherever we need to hit out to. + + if ratelimit: + self.ratelimit(requester) + + try: + self.auth.check(event, auth_events=context.current_state) + except AuthError as err: + logger.warn("Denying new event %r because %s", event, err) + raise err + + yield self.maybe_kick_guest_users(event, context.current_state.values()) + + if event.type == EventTypes.CanonicalAlias: + # Check the alias is acually valid (at this time at least) + room_alias_str = event.content.get("alias", None) + if room_alias_str: + room_alias = RoomAlias.from_string(room_alias_str) + directory_handler = self.hs.get_handlers().directory_handler + mapping = yield directory_handler.get_association(room_alias) + + if mapping["room_id"] != event.room_id: + raise SynapseError( + 400, + "Room alias %s does not point to the room" % ( + room_alias_str, + ) + ) + + federation_handler = self.hs.get_handlers().federation_handler + + if event.type == EventTypes.Member: + if event.content["membership"] == Membership.INVITE: + def is_inviter_member_event(e): + return ( + e.type == EventTypes.Member and + e.sender == event.sender + ) + + event.unsigned["invite_room_state"] = [ + { + "type": e.type, + "state_key": e.state_key, + "content": e.content, + "sender": e.sender, + } + for k, e in context.current_state.items() + if e.type in self.hs.config.room_invite_state_types + or is_inviter_member_event(e) + ] + + invitee = UserID.from_string(event.state_key) + if not self.hs.is_mine(invitee): + # TODO: Can we add signature from remote server in a nicer + # way? If we have been invited by a remote server, we need + # to get them to sign the event. + + returned_invite = yield federation_handler.send_invite( + invitee.domain, + event, + ) + + event.unsigned.pop("room_state", None) + + # TODO: Make sure the signatures actually are correct. + event.signatures.update( + returned_invite.signatures + ) + + if event.type == EventTypes.Redaction: + if self.auth.check_redaction(event, auth_events=context.current_state): + original_event = yield self.store.get_event( + event.redacts, + check_redacted=False, + get_prev_content=False, + allow_rejected=False, + allow_none=False + ) + if event.user_id != original_event.user_id: + raise AuthError( + 403, + "You don't have permission to redact events" + ) + + if event.type == EventTypes.Create and context.current_state: + raise AuthError( + 403, + "Changing the room create event is forbidden", + ) + + action_generator = ActionGenerator(self.hs) + yield action_generator.handle_push_actions_for_event( + event, context + ) + + (event_stream_id, max_stream_id) = yield self.store.persist_event( + event, context=context + ) + + # this intentionally does not yield: we don't care about the result + # and don't need to wait for it. + preserve_fn(self.hs.get_pusherpool().on_new_notifications)( + event_stream_id, max_stream_id + ) + + destinations = set() + for k, s in context.current_state.items(): + try: + if k[0] == EventTypes.Member: + if s.content["membership"] == Membership.JOIN: + destinations.add(get_domian_from_id(s.state_key)) + except SynapseError: + logger.warn( + "Failed to get destination from event %s", s.event_id + ) + + with PreserveLoggingContext(): + # Don't block waiting on waking up all the listeners. + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=extra_users + ) + + # If invite, remove room_state from unsigned before sending. 
+ event.unsigned.pop("invite_room_state", None) + + federation_handler.handle_new_event( + event, destinations=destinations, + ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index fdebc9c438..3d63b3c513 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -26,6 +26,7 @@ from synapse.api.errors import AuthError, StoreError, SynapseError from synapse.util import stringutils from synapse.util.async import concurrently_execute from synapse.util.caches.response_cache import ResponseCache +from synapse.visibility import filter_events_for_client from collections import OrderedDict @@ -449,10 +450,12 @@ class RoomContextHandler(BaseHandler): now_token = yield self.hs.get_event_sources().get_current_token() def filter_evts(events): - return self.filter_events_for_client( + return filter_events_for_client( + self.store, user.to_string(), events, - is_peeking=is_guest) + is_peeking=is_guest + ) event = yield self.store.get_event(event_id, get_prev_content=True, allow_none=True) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index ed2cda837f..b44e52a515 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -113,7 +113,7 @@ class RoomMemberHandler(BaseHandler): prev_event_ids=prev_event_ids, ) - yield self.handle_new_client_event( + yield msg_handler.handle_new_client_event( requester, event, context, @@ -357,7 +357,7 @@ class RoomMemberHandler(BaseHandler): # so don't really fit into the general auth process. raise AuthError(403, "Guest access not allowed") - yield self.handle_new_client_event( + yield message_handler.handle_new_client_event( requester, event, context, diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index a937e87408..df75d70fac 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -21,6 +21,7 @@ from synapse.api.constants import Membership, EventTypes from synapse.api.filtering import Filter from synapse.api.errors import SynapseError from synapse.events.utils import serialize_event +from synapse.visibility import filter_events_for_client from unpaddedbase64 import decode_base64, encode_base64 @@ -172,8 +173,8 @@ class SearchHandler(BaseHandler): filtered_events = search_filter.filter([r["event"] for r in results]) - events = yield self.filter_events_for_client( - user.to_string(), filtered_events + events = yield filter_events_for_client( + self.store, user.to_string(), filtered_events ) events.sort(key=lambda e: -rank_map[e.event_id]) @@ -223,8 +224,8 @@ class SearchHandler(BaseHandler): r["event"] for r in results ]) - events = yield self.filter_events_for_client( - user.to_string(), filtered_events + events = yield filter_events_for_client( + self.store, user.to_string(), filtered_events ) room_events.extend(events) @@ -281,12 +282,12 @@ class SearchHandler(BaseHandler): event.room_id, event.event_id, before_limit, after_limit ) - res["events_before"] = yield self.filter_events_for_client( - user.to_string(), res["events_before"] + res["events_before"] = yield filter_events_for_client( + self.store, user.to_string(), res["events_before"] ) - res["events_after"] = yield self.filter_events_for_client( - user.to_string(), res["events_after"] + res["events_after"] = yield filter_events_for_client( + self.store, user.to_string(), res["events_after"] ) res["start"] = now_token.copy_and_replace( diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b7dcbc6b1b..921215469f 100644 --- a/synapse/handlers/sync.py +++ 
b/synapse/handlers/sync.py @@ -22,6 +22,7 @@ from synapse.util.logcontext import LoggingContext from synapse.util.metrics import Measure from synapse.util.caches.response_cache import ResponseCache from synapse.push.clientformat import format_push_rules_for_user +from synapse.visibility import filter_events_for_client from twisted.internet import defer @@ -697,7 +698,8 @@ class SyncHandler(BaseHandler): if recents is not None: recents = sync_config.filter_collection.filter_room_timeline(recents) - recents = yield self.filter_events_for_client( + recents = yield filter_events_for_client( + self.store, sync_config.user.to_string(), recents, ) @@ -718,7 +720,8 @@ class SyncHandler(BaseHandler): loaded_recents = sync_config.filter_collection.filter_room_timeline( events ) - loaded_recents = yield self.filter_events_for_client( + loaded_recents = yield filter_events_for_client( + self.store, sync_config.user.to_string(), loaded_recents, ) diff --git a/synapse/notifier.py b/synapse/notifier.py index cb58dfffd4..33b79c0ec7 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -21,6 +21,7 @@ from synapse.util.logutils import log_function from synapse.util.async import ObservableDeferred from synapse.util.logcontext import PreserveLoggingContext from synapse.types import StreamToken +from synapse.visibility import filter_events_for_client import synapse.metrics from collections import namedtuple @@ -398,8 +399,8 @@ class Notifier(object): ) if name == "room": - room_member_handler = self.hs.get_handlers().room_member_handler - new_events = yield room_member_handler.filter_events_for_client( + new_events = yield filter_events_for_client( + self.store, user.to_string(), new_events, is_peeking=is_peeking, diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index a0160994b7..9b208668b6 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -37,14 +37,14 @@ class ActionGenerator: # tag (ie. we just need all the users). 
@defer.inlineCallbacks - def handle_push_actions_for_event(self, event, context, handler): + def handle_push_actions_for_event(self, event, context): with Measure(self.clock, "handle_push_actions_for_event"): bulk_evaluator = yield evaluator_for_event( event, self.hs, self.store ) actions_by_user = yield bulk_evaluator.action_for_event_by_user( - event, handler, context.current_state + event, context.current_state ) context.push_actions = [ diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index f97df36d80..25e13b3423 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -22,6 +22,7 @@ from .baserules import list_with_base_rules from .push_rule_evaluator import PushRuleEvaluatorForEvent from synapse.api.constants import EventTypes +from synapse.visibility import filter_events_for_clients logger = logging.getLogger(__name__) @@ -126,7 +127,7 @@ class BulkPushRuleEvaluator: self.store = store @defer.inlineCallbacks - def action_for_event_by_user(self, event, handler, current_state): + def action_for_event_by_user(self, event, current_state): actions_by_user = {} # None of these users can be peeking since this list of users comes @@ -136,8 +137,8 @@ class BulkPushRuleEvaluator: (u, False) for u in self.rules_by_user.keys() ] - filtered_by_user = yield handler.filter_events_for_clients( - user_tuples, [event], {event.event_id: current_state} + filtered_by_user = yield filter_events_for_clients( + self.store, user_tuples, [event], {event.event_id: current_state} ) room_members = yield self.store.get_users_in_room(self.room_id) diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 7031fa6d55..5d60c1efcf 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -28,6 +28,7 @@ from synapse.util.presentable_names import ( from synapse.types import UserID from synapse.api.errors import StoreError from synapse.api.constants import EventTypes +from synapse.visibility import filter_events_for_client import jinja2 import bleach @@ -227,9 +228,8 @@ class Mailer(object): "messages": [], } - handler = self.hs.get_handlers().message_handler - the_events = yield handler.filter_events_for_client( - user_id, results["events_before"] + the_events = yield filter_events_for_client( + self.store, user_id, results["events_before"] ) the_events.append(notif_event) diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 69ad1de863..0e983ae7fa 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -164,8 +164,8 @@ class ReplicationResource(Resource): "Replicating %d rows of %s from %s -> %s", len(stream_content["rows"]), stream_name, - stream_content["position"], request_streams.get(stream_name), + stream_content["position"], ) request.write(json.dumps(result, ensure_ascii=False)) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 7ba7a6f6e4..635febb174 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -146,12 +146,14 @@ class SlavedEventStore(BaseSlavedStore): stream = result.get("forward_ex_outliers") if stream: + self._stream_id_gen.advance(stream["position"]) for row in stream["rows"]: event_id = row[1] self._invalidate_get_event_cache(event_id) stream = result.get("backward_ex_outliers") if stream: + self._backfill_id_gen.advance(-stream["position"]) for row in stream["rows"]: event_id = row[1] 
self._invalidate_get_event_cache(event_id) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 1e27c2c0ce..e0d7098692 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -453,7 +453,9 @@ class SQLBaseStore(object): keyvalues (dict): The unique key tables and their new values values (dict): The nonunique columns and their new values insertion_values (dict): key/values to use when inserting - Returns: A deferred + Returns: + Deferred(bool): True if a new entry was created, False if an + existing one was updated. """ return self.runInteraction( desc, @@ -498,6 +500,10 @@ class SQLBaseStore(object): ) txn.execute(sql, allvalues.values()) + return True + else: + return False + def _simple_select_one(self, table, keyvalues, retcols, allow_none=False, desc="_simple_select_one"): """Executes a SELECT query on the named table, which is expected to diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 6f316f7d24..9705db5c47 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -224,6 +224,18 @@ class EventPushActionsStore(SQLBaseStore): (room_id, event_id) ) + def _remove_push_actions_before_txn(self, txn, room_id, user_id, + topological_ordering): + txn.call_after( + self.get_unread_event_push_actions_by_room_for_user.invalidate_many, + (room_id, user_id, ) + ) + txn.execute( + "DELETE FROM event_push_actions" + " WHERE room_id = ? AND user_id = ? AND topological_ordering < ?", + (room_id, user_id, topological_ordering,) + ) + def _action_has_highlight(actions): for action in actions: diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index d9afd7ec87..9e8e2e2964 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -156,8 +156,7 @@ class PusherStore(SQLBaseStore): profile_tag=""): with self._pushers_id_gen.get_next() as stream_id: def f(txn): - txn.call_after(self.get_users_with_pushers_in_room.invalidate_all) - return self._simple_upsert_txn( + newly_inserted = self._simple_upsert_txn( txn, "pushers", { @@ -178,11 +177,18 @@ class PusherStore(SQLBaseStore): "id": stream_id, }, ) - defer.returnValue((yield self.runInteraction("add_pusher", f))) + if newly_inserted: + # get_users_with_pushers_in_room only cares if the user has + # at least *one* pusher. + txn.call_after(self.get_users_with_pushers_in_room.invalidate_all) + + yield self.runInteraction("add_pusher", f) @defer.inlineCallbacks def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): def delete_pusher_txn(txn, stream_id): + txn.call_after(self.get_users_with_pushers_in_room.invalidate_all) + self._simple_delete_one_txn( txn, "pushers", @@ -194,6 +200,7 @@ class PusherStore(SQLBaseStore): {"app_id": app_id, "pushkey": pushkey, "user_id": user_id}, {"stream_id": stream_id}, ) + with self._pushers_id_gen.get_next() as stream_id: yield self.runInteraction( "delete_pusher", delete_pusher_txn, stream_id diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 935fc503d9..94be820f86 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -100,7 +100,7 @@ class ReceiptsStore(SQLBaseStore): defer.returnValue([ev for res in results.values() for ev in res]) - @cachedInlineCallbacks(num_args=3, max_entries=5000) + @cachedInlineCallbacks(num_args=3, max_entries=5000, lru=True, tree=True) def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None): """Get receipts for a single room for sending to clients. 
@@ -232,7 +232,7 @@ class ReceiptsStore(SQLBaseStore): self.get_receipts_for_user.invalidate, (user_id, receipt_type) ) # FIXME: This shouldn't invalidate the whole cache - txn.call_after(self.get_linearized_receipts_for_room.invalidate_all) + txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,)) txn.call_after( self._receipts_stream_cache.entity_has_changed, @@ -244,6 +244,15 @@ class ReceiptsStore(SQLBaseStore): (user_id, room_id, receipt_type) ) + res = self._simple_select_one_txn( + txn, + table="events", + retcols=["topological_ordering", "stream_ordering"], + keyvalues={"event_id": event_id}, + ) + topological_ordering = int(res["topological_ordering"]) + stream_ordering = int(res["stream_ordering"]) + # We don't want to clobber receipts for more recent events, so we # have to compare orderings of existing receipts sql = ( @@ -256,15 +265,6 @@ class ReceiptsStore(SQLBaseStore): results = txn.fetchall() if results: - res = self._simple_select_one_txn( - txn, - table="events", - retcols=["topological_ordering", "stream_ordering"], - keyvalues={"event_id": event_id}, - ) - topological_ordering = int(res["topological_ordering"]) - stream_ordering = int(res["stream_ordering"]) - for to, so, _ in results: if int(to) > topological_ordering: return False @@ -294,6 +294,14 @@ class ReceiptsStore(SQLBaseStore): } ) + if receipt_type == "m.read": + self._remove_push_actions_before_txn( + txn, + room_id=room_id, + user_id=user_id, + topological_ordering=topological_ordering, + ) + return True @defer.inlineCallbacks @@ -367,7 +375,7 @@ class ReceiptsStore(SQLBaseStore): self.get_receipts_for_user.invalidate, (user_id, receipt_type) ) # FIXME: This shouldn't invalidate the whole cache - txn.call_after(self.get_linearized_receipts_for_room.invalidate_all) + txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,)) self._simple_delete_txn( txn, diff --git a/synapse/storage/schema/delta/32/remove_indices.sql b/synapse/storage/schema/delta/32/remove_indices.sql new file mode 100644 index 0000000000..f859be46a6 --- /dev/null +++ b/synapse/storage/schema/delta/32/remove_indices.sql @@ -0,0 +1,38 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +-- The following indices are redundant, other indices are equivalent or +-- supersets +DROP INDEX IF EXISTS events_room_id; -- Prefix of events_room_stream +DROP INDEX IF EXISTS events_order; -- Prefix of events_order_topo_stream_room +DROP INDEX IF EXISTS events_topological_ordering; -- Prefix of events_order_topo_stream_room +DROP INDEX IF EXISTS events_stream_ordering; -- Duplicate of PRIMARY KEY +DROP INDEX IF EXISTS state_groups_id; -- Duplicate of PRIMARY KEY +DROP INDEX IF EXISTS event_to_state_groups_id; -- Duplicate of PRIMARY KEY +DROP INDEX IF EXISTS event_push_actions_room_id_event_id_user_id_profile_tag; -- Duplicate of UNIQUE CONSTRAINT + +DROP INDEX IF EXISTS event_destinations_id; -- Prefix of UNIQUE CONSTRAINT +DROP INDEX IF EXISTS st_extrem_id; -- Prefix of UNIQUE CONSTRAINT +DROP INDEX IF EXISTS event_content_hashes_id; -- Prefix of UNIQUE CONSTRAINT +DROP INDEX IF EXISTS event_signatures_id; -- Prefix of UNIQUE CONSTRAINT +DROP INDEX IF EXISTS event_edge_hashes_id; -- Prefix of UNIQUE CONSTRAINT +DROP INDEX IF EXISTS redactions_event_id; -- Duplicate of UNIQUE CONSTRAINT +DROP INDEX IF EXISTS room_hosts_room_id; -- Prefix of UNIQUE CONSTRAINT + +-- The following indices were unused +DROP INDEX IF EXISTS remote_media_cache_thumbnails_media_id; +DROP INDEX IF EXISTS evauth_edges_auth_id; +DROP INDEX IF EXISTS presence_stream_state; diff --git a/synapse/visibility.py b/synapse/visibility.py new file mode 100644 index 0000000000..948ad51772 --- /dev/null +++ b/synapse/visibility.py @@ -0,0 +1,210 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 - 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.api.constants import Membership, EventTypes + +from synapse.util.logcontext import preserve_fn + +import logging + + +logger = logging.getLogger(__name__) + + +VISIBILITY_PRIORITY = ( + "world_readable", + "shared", + "invited", + "joined", +) + + +MEMBERSHIP_PRIORITY = ( + Membership.JOIN, + Membership.INVITE, + Membership.KNOCK, + Membership.LEAVE, + Membership.BAN, +) + + +@defer.inlineCallbacks +def filter_events_for_clients(store, user_tuples, events, event_id_to_state): + """ Returns dict of user_id -> list of events that user is allowed to + see. + + Args: + user_tuples (str, bool): (user id, is_peeking) for each user to be + checked. 
is_peeking should be true if: + * the user is not currently a member of the room, and: + * the user has not been a member of the room since the + given events + events ([synapse.events.EventBase]): list of events to filter + """ + forgotten = yield defer.gatherResults([ + preserve_fn(store.who_forgot_in_room)( + room_id, + ) + for room_id in frozenset(e.room_id for e in events) + ], consumeErrors=True) + + # Set of membership event_ids that have been forgotten + event_id_forgotten = frozenset( + row["event_id"] for rows in forgotten for row in rows + ) + + ignore_dict_content = yield store.get_global_account_data_by_type_for_users( + "m.ignored_user_list", user_ids=[user_id for user_id, _ in user_tuples] + ) + + # FIXME: This will explode if people upload something incorrect. + ignore_dict = { + user_id: frozenset( + content.get("ignored_users", {}).keys() if content else [] + ) + for user_id, content in ignore_dict_content.items() + } + + def allowed(event, user_id, is_peeking, ignore_list): + """ + Args: + event (synapse.events.EventBase): event to check + user_id (str) + is_peeking (bool) + ignore_list (list): list of users to ignore + """ + if not event.is_state() and event.sender in ignore_list: + return False + + state = event_id_to_state[event.event_id] + + # get the room_visibility at the time of the event. + visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None) + if visibility_event: + visibility = visibility_event.content.get("history_visibility", "shared") + else: + visibility = "shared" + + if visibility not in VISIBILITY_PRIORITY: + visibility = "shared" + + # if it was world_readable, it's easy: everyone can read it + if visibility == "world_readable": + return True + + # Always allow history visibility events on boundaries. This is done + # by setting the effective visibility to the least restrictive + # of the old vs new. + if event.type == EventTypes.RoomHistoryVisibility: + prev_content = event.unsigned.get("prev_content", {}) + prev_visibility = prev_content.get("history_visibility", None) + + if prev_visibility not in VISIBILITY_PRIORITY: + prev_visibility = "shared" + + new_priority = VISIBILITY_PRIORITY.index(visibility) + old_priority = VISIBILITY_PRIORITY.index(prev_visibility) + if old_priority < new_priority: + visibility = prev_visibility + + # likewise, if the event is the user's own membership event, use + # the 'most joined' membership + membership = None + if event.type == EventTypes.Member and event.state_key == user_id: + membership = event.content.get("membership", None) + if membership not in MEMBERSHIP_PRIORITY: + membership = "leave" + + prev_content = event.unsigned.get("prev_content", {}) + prev_membership = prev_content.get("membership", None) + if prev_membership not in MEMBERSHIP_PRIORITY: + prev_membership = "leave" + + new_priority = MEMBERSHIP_PRIORITY.index(membership) + old_priority = MEMBERSHIP_PRIORITY.index(prev_membership) + if old_priority < new_priority: + membership = prev_membership + + # otherwise, get the user's membership at the time of the event. + if membership is None: + membership_event = state.get((EventTypes.Member, user_id), None) + if membership_event: + if membership_event.event_id not in event_id_forgotten: + membership = membership_event.membership + + # if the user was a member of the room at the time of the event, + # they can see it. + if membership == Membership.JOIN: + return True + + if visibility == "joined": + # we weren't a member at the time of the event, so we can't + # see this event. 
+ return False + + elif visibility == "invited": + # user can also see the event if they were *invited* at the time + # of the event. + return membership == Membership.INVITE + + else: + # visibility is shared: user can also see the event if they have + # become a member since the event + # + # XXX: if the user has subsequently joined and then left again, + # ideally we would share history up to the point they left. But + # we don't know when they left. + return not is_peeking + + defer.returnValue({ + user_id: [ + event + for event in events + if allowed(event, user_id, is_peeking, ignore_dict.get(user_id, [])) + ] + for user_id, is_peeking in user_tuples + }) + + +@defer.inlineCallbacks +def filter_events_for_client(store, user_id, events, is_peeking=False): + """ + Check which events a user is allowed to see + + Args: + user_id(str): user id to be checked + events([synapse.events.EventBase]): list of events to be checked + is_peeking(bool): should be True if: + * the user is not currently a member of the room, and: + * the user has not been a member of the room since the given + events + + Returns: + [synapse.events.EventBase] + """ + types = ( + (EventTypes.RoomHistoryVisibility, ""), + (EventTypes.Member, user_id), + ) + event_id_to_state = yield store.get_state_for_events( + frozenset(e.event_id for e in events), + types=types + ) + res = yield filter_events_for_clients( + store, [(user_id, is_peeking)], events, event_id_to_state + ) + defer.returnValue(res.get(user_id, [])) |
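For reference, a sketch of how the bulk entry point is driven; this mirrors the call made from bulk_push_rule_evaluator.py above, where one (user_id, is_peeking) tuple is passed per local member and the event's state is supplied up front rather than looked up per user (the helper function below is illustrative only, not part of the diff):

from twisted.internet import defer

from synapse.visibility import filter_events_for_clients


@defer.inlineCallbacks
def visible_to_each_user(store, user_ids, event, current_state):
    # None of these users are peeking: the list comes from local room
    # members, so is_peeking is False for every tuple.
    user_tuples = [(user_id, False) for user_id in user_ids]
    by_user = yield filter_events_for_clients(
        store, user_tuples, [event], {event.event_id: current_state},
    )
    # by_user maps user_id -> the subset of the supplied events that user
    # may see (here either [event] or []).
    defer.returnValue(by_user)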