diff options
-rw-r--r-- | synapse/handlers/_base.py | 184 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 2 | ||||
-rw-r--r-- | synapse/handlers/message.py | 18 | ||||
-rw-r--r-- | synapse/handlers/room.py | 7 | ||||
-rw-r--r-- | synapse/handlers/search.py | 17 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 7 | ||||
-rw-r--r-- | synapse/notifier.py | 5 | ||||
-rw-r--r-- | synapse/push/action_generator.py | 4 | ||||
-rw-r--r-- | synapse/push/bulk_push_rule_evaluator.py | 7 | ||||
-rw-r--r-- | synapse/push/mailer.py | 6 | ||||
-rw-r--r-- | synapse/replication/resource.py | 2 | ||||
-rw-r--r-- | synapse/replication/slave/storage/events.py | 2 | ||||
-rw-r--r-- | synapse/storage/event_push_actions.py | 12 | ||||
-rw-r--r-- | synapse/storage/receipts.py | 32 | ||||
-rw-r--r-- | synapse/storage/schema/delta/32/remove_indices.sql | 38 | ||||
-rw-r--r-- | synapse/visibility.py | 210 |
16 files changed, 325 insertions, 228 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index ac716a8118..c904c6c500 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -19,7 +19,6 @@ from synapse.api.errors import LimitExceededError from synapse.api.constants import Membership, EventTypes from synapse.types import UserID, Requester -from synapse.util.logcontext import preserve_fn import logging @@ -27,23 +26,6 @@ import logging logger = logging.getLogger(__name__) -VISIBILITY_PRIORITY = ( - "world_readable", - "shared", - "invited", - "joined", -) - - -MEMBERSHIP_PRIORITY = ( - Membership.JOIN, - Membership.INVITE, - Membership.KNOCK, - Membership.LEAVE, - Membership.BAN, -) - - class BaseHandler(object): """ Common base class for the event handlers. @@ -67,172 +49,6 @@ class BaseHandler(object): self.event_builder_factory = hs.get_event_builder_factory() - @defer.inlineCallbacks - def filter_events_for_clients(self, user_tuples, events, event_id_to_state): - """ Returns dict of user_id -> list of events that user is allowed to - see. - - Args: - user_tuples (str, bool): (user id, is_peeking) for each user to be - checked. is_peeking should be true if: - * the user is not currently a member of the room, and: - * the user has not been a member of the room since the - given events - events ([synapse.events.EventBase]): list of events to filter - """ - forgotten = yield defer.gatherResults([ - preserve_fn(self.store.who_forgot_in_room)( - room_id, - ) - for room_id in frozenset(e.room_id for e in events) - ], consumeErrors=True) - - # Set of membership event_ids that have been forgotten - event_id_forgotten = frozenset( - row["event_id"] for rows in forgotten for row in rows - ) - - ignore_dict_content = yield self.store.get_global_account_data_by_type_for_users( - "m.ignored_user_list", user_ids=[user_id for user_id, _ in user_tuples] - ) - - # FIXME: This will explode if people upload something incorrect. - ignore_dict = { - user_id: frozenset( - content.get("ignored_users", {}).keys() if content else [] - ) - for user_id, content in ignore_dict_content.items() - } - - def allowed(event, user_id, is_peeking, ignore_list): - """ - Args: - event (synapse.events.EventBase): event to check - user_id (str) - is_peeking (bool) - ignore_list (list): list of users to ignore - """ - if not event.is_state() and event.sender in ignore_list: - return False - - state = event_id_to_state[event.event_id] - - # get the room_visibility at the time of the event. - visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None) - if visibility_event: - visibility = visibility_event.content.get("history_visibility", "shared") - else: - visibility = "shared" - - if visibility not in VISIBILITY_PRIORITY: - visibility = "shared" - - # if it was world_readable, it's easy: everyone can read it - if visibility == "world_readable": - return True - - # Always allow history visibility events on boundaries. This is done - # by setting the effective visibility to the least restrictive - # of the old vs new. - if event.type == EventTypes.RoomHistoryVisibility: - prev_content = event.unsigned.get("prev_content", {}) - prev_visibility = prev_content.get("history_visibility", None) - - if prev_visibility not in VISIBILITY_PRIORITY: - prev_visibility = "shared" - - new_priority = VISIBILITY_PRIORITY.index(visibility) - old_priority = VISIBILITY_PRIORITY.index(prev_visibility) - if old_priority < new_priority: - visibility = prev_visibility - - # likewise, if the event is the user's own membership event, use - # the 'most joined' membership - membership = None - if event.type == EventTypes.Member and event.state_key == user_id: - membership = event.content.get("membership", None) - if membership not in MEMBERSHIP_PRIORITY: - membership = "leave" - - prev_content = event.unsigned.get("prev_content", {}) - prev_membership = prev_content.get("membership", None) - if prev_membership not in MEMBERSHIP_PRIORITY: - prev_membership = "leave" - - new_priority = MEMBERSHIP_PRIORITY.index(membership) - old_priority = MEMBERSHIP_PRIORITY.index(prev_membership) - if old_priority < new_priority: - membership = prev_membership - - # otherwise, get the user's membership at the time of the event. - if membership is None: - membership_event = state.get((EventTypes.Member, user_id), None) - if membership_event: - if membership_event.event_id not in event_id_forgotten: - membership = membership_event.membership - - # if the user was a member of the room at the time of the event, - # they can see it. - if membership == Membership.JOIN: - return True - - if visibility == "joined": - # we weren't a member at the time of the event, so we can't - # see this event. - return False - - elif visibility == "invited": - # user can also see the event if they were *invited* at the time - # of the event. - return membership == Membership.INVITE - - else: - # visibility is shared: user can also see the event if they have - # become a member since the event - # - # XXX: if the user has subsequently joined and then left again, - # ideally we would share history up to the point they left. But - # we don't know when they left. - return not is_peeking - - defer.returnValue({ - user_id: [ - event - for event in events - if allowed(event, user_id, is_peeking, ignore_dict.get(user_id, [])) - ] - for user_id, is_peeking in user_tuples - }) - - @defer.inlineCallbacks - def filter_events_for_client(self, user_id, events, is_peeking=False): - """ - Check which events a user is allowed to see - - Args: - user_id(str): user id to be checked - events([synapse.events.EventBase]): list of events to be checked - is_peeking(bool): should be True if: - * the user is not currently a member of the room, and: - * the user has not been a member of the room since the given - events - - Returns: - [synapse.events.EventBase] - """ - types = ( - (EventTypes.RoomHistoryVisibility, ""), - (EventTypes.Member, user_id), - ) - event_id_to_state = yield self.store.get_state_for_events( - frozenset(e.event_id for e in events), - types=types - ) - res = yield self.filter_events_for_clients( - [(user_id, is_peeking)], events, event_id_to_state - ) - defer.returnValue(res.get(user_id, [])) - def ratelimit(self, requester): time_now = self.clock.time() allowed, time_allowed = self.ratelimiter.send_message( diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 4a65b246e6..c21d9d4d83 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1113,7 +1113,7 @@ class FederationHandler(BaseHandler): if not event.internal_metadata.is_outlier(): action_generator = ActionGenerator(self.hs) yield action_generator.handle_push_actions_for_event( - event, context, self + event, context ) event_stream_id, max_stream_id = yield self.store.persist_event( diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index f9e2c98f3f..13154edb78 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -29,6 +29,7 @@ from synapse.util import unwrapFirstError from synapse.util.async import concurrently_execute from synapse.util.caches.snapshot_cache import SnapshotCache from synapse.util.logcontext import PreserveLoggingContext, preserve_fn +from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -128,7 +129,8 @@ class MessageHandler(BaseHandler): "end": next_token.to_string(), }) - events = yield self.filter_events_for_client( + events = yield filter_events_for_client( + self.store, user_id, events, is_peeking=(member_event_id is None), @@ -488,8 +490,8 @@ class MessageHandler(BaseHandler): ] ).addErrback(unwrapFirstError) - messages = yield self.filter_events_for_client( - user_id, messages + messages = yield filter_events_for_client( + self.store, user_id, messages ) start_token = now_token.copy_and_replace("room_key", token[0]) @@ -624,8 +626,8 @@ class MessageHandler(BaseHandler): end_token=stream_token ) - messages = yield self.filter_events_for_client( - user_id, messages, is_peeking=is_peeking + messages = yield filter_events_for_client( + self.store, user_id, messages, is_peeking=is_peeking ) start_token = StreamToken.START.copy_and_replace("room_key", token[0]) @@ -705,8 +707,8 @@ class MessageHandler(BaseHandler): consumeErrors=True, ).addErrback(unwrapFirstError) - messages = yield self.filter_events_for_client( - user_id, messages, is_peeking=is_peeking, + messages = yield filter_events_for_client( + self.store, user_id, messages, is_peeking=is_peeking, ) start_token = now_token.copy_and_replace("room_key", token[0]) @@ -882,7 +884,7 @@ class MessageHandler(BaseHandler): action_generator = ActionGenerator(self.hs) yield action_generator.handle_push_actions_for_event( - event, context, self + event, context ) (event_stream_id, max_stream_id) = yield self.store.persist_event( diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index fdebc9c438..3d63b3c513 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -26,6 +26,7 @@ from synapse.api.errors import AuthError, StoreError, SynapseError from synapse.util import stringutils from synapse.util.async import concurrently_execute from synapse.util.caches.response_cache import ResponseCache +from synapse.visibility import filter_events_for_client from collections import OrderedDict @@ -449,10 +450,12 @@ class RoomContextHandler(BaseHandler): now_token = yield self.hs.get_event_sources().get_current_token() def filter_evts(events): - return self.filter_events_for_client( + return filter_events_for_client( + self.store, user.to_string(), events, - is_peeking=is_guest) + is_peeking=is_guest + ) event = yield self.store.get_event(event_id, get_prev_content=True, allow_none=True) diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index a937e87408..df75d70fac 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -21,6 +21,7 @@ from synapse.api.constants import Membership, EventTypes from synapse.api.filtering import Filter from synapse.api.errors import SynapseError from synapse.events.utils import serialize_event +from synapse.visibility import filter_events_for_client from unpaddedbase64 import decode_base64, encode_base64 @@ -172,8 +173,8 @@ class SearchHandler(BaseHandler): filtered_events = search_filter.filter([r["event"] for r in results]) - events = yield self.filter_events_for_client( - user.to_string(), filtered_events + events = yield filter_events_for_client( + self.store, user.to_string(), filtered_events ) events.sort(key=lambda e: -rank_map[e.event_id]) @@ -223,8 +224,8 @@ class SearchHandler(BaseHandler): r["event"] for r in results ]) - events = yield self.filter_events_for_client( - user.to_string(), filtered_events + events = yield filter_events_for_client( + self.store, user.to_string(), filtered_events ) room_events.extend(events) @@ -281,12 +282,12 @@ class SearchHandler(BaseHandler): event.room_id, event.event_id, before_limit, after_limit ) - res["events_before"] = yield self.filter_events_for_client( - user.to_string(), res["events_before"] + res["events_before"] = yield filter_events_for_client( + self.store, user.to_string(), res["events_before"] ) - res["events_after"] = yield self.filter_events_for_client( - user.to_string(), res["events_after"] + res["events_after"] = yield filter_events_for_client( + self.store, user.to_string(), res["events_after"] ) res["start"] = now_token.copy_and_replace( diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b7dcbc6b1b..921215469f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -22,6 +22,7 @@ from synapse.util.logcontext import LoggingContext from synapse.util.metrics import Measure from synapse.util.caches.response_cache import ResponseCache from synapse.push.clientformat import format_push_rules_for_user +from synapse.visibility import filter_events_for_client from twisted.internet import defer @@ -697,7 +698,8 @@ class SyncHandler(BaseHandler): if recents is not None: recents = sync_config.filter_collection.filter_room_timeline(recents) - recents = yield self.filter_events_for_client( + recents = yield filter_events_for_client( + self.store, sync_config.user.to_string(), recents, ) @@ -718,7 +720,8 @@ class SyncHandler(BaseHandler): loaded_recents = sync_config.filter_collection.filter_room_timeline( events ) - loaded_recents = yield self.filter_events_for_client( + loaded_recents = yield filter_events_for_client( + self.store, sync_config.user.to_string(), loaded_recents, ) diff --git a/synapse/notifier.py b/synapse/notifier.py index cb58dfffd4..33b79c0ec7 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -21,6 +21,7 @@ from synapse.util.logutils import log_function from synapse.util.async import ObservableDeferred from synapse.util.logcontext import PreserveLoggingContext from synapse.types import StreamToken +from synapse.visibility import filter_events_for_client import synapse.metrics from collections import namedtuple @@ -398,8 +399,8 @@ class Notifier(object): ) if name == "room": - room_member_handler = self.hs.get_handlers().room_member_handler - new_events = yield room_member_handler.filter_events_for_client( + new_events = yield filter_events_for_client( + self.store, user.to_string(), new_events, is_peeking=is_peeking, diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index a0160994b7..9b208668b6 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -37,14 +37,14 @@ class ActionGenerator: # tag (ie. we just need all the users). @defer.inlineCallbacks - def handle_push_actions_for_event(self, event, context, handler): + def handle_push_actions_for_event(self, event, context): with Measure(self.clock, "handle_push_actions_for_event"): bulk_evaluator = yield evaluator_for_event( event, self.hs, self.store ) actions_by_user = yield bulk_evaluator.action_for_event_by_user( - event, handler, context.current_state + event, context.current_state ) context.push_actions = [ diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index f97df36d80..25e13b3423 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -22,6 +22,7 @@ from .baserules import list_with_base_rules from .push_rule_evaluator import PushRuleEvaluatorForEvent from synapse.api.constants import EventTypes +from synapse.visibility import filter_events_for_clients logger = logging.getLogger(__name__) @@ -126,7 +127,7 @@ class BulkPushRuleEvaluator: self.store = store @defer.inlineCallbacks - def action_for_event_by_user(self, event, handler, current_state): + def action_for_event_by_user(self, event, current_state): actions_by_user = {} # None of these users can be peeking since this list of users comes @@ -136,8 +137,8 @@ class BulkPushRuleEvaluator: (u, False) for u in self.rules_by_user.keys() ] - filtered_by_user = yield handler.filter_events_for_clients( - user_tuples, [event], {event.event_id: current_state} + filtered_by_user = yield filter_events_for_clients( + self.store, user_tuples, [event], {event.event_id: current_state} ) room_members = yield self.store.get_users_in_room(self.room_id) diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 7031fa6d55..5d60c1efcf 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -28,6 +28,7 @@ from synapse.util.presentable_names import ( from synapse.types import UserID from synapse.api.errors import StoreError from synapse.api.constants import EventTypes +from synapse.visibility import filter_events_for_client import jinja2 import bleach @@ -227,9 +228,8 @@ class Mailer(object): "messages": [], } - handler = self.hs.get_handlers().message_handler - the_events = yield handler.filter_events_for_client( - user_id, results["events_before"] + the_events = yield filter_events_for_client( + self.store, user_id, results["events_before"] ) the_events.append(notif_event) diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 69ad1de863..0e983ae7fa 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -164,8 +164,8 @@ class ReplicationResource(Resource): "Replicating %d rows of %s from %s -> %s", len(stream_content["rows"]), stream_name, - stream_content["position"], request_streams.get(stream_name), + stream_content["position"], ) request.write(json.dumps(result, ensure_ascii=False)) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 7ba7a6f6e4..635febb174 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -146,12 +146,14 @@ class SlavedEventStore(BaseSlavedStore): stream = result.get("forward_ex_outliers") if stream: + self._stream_id_gen.advance(stream["position"]) for row in stream["rows"]: event_id = row[1] self._invalidate_get_event_cache(event_id) stream = result.get("backward_ex_outliers") if stream: + self._backfill_id_gen.advance(-stream["position"]) for row in stream["rows"]: event_id = row[1] self._invalidate_get_event_cache(event_id) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 6f316f7d24..9705db5c47 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -224,6 +224,18 @@ class EventPushActionsStore(SQLBaseStore): (room_id, event_id) ) + def _remove_push_actions_before_txn(self, txn, room_id, user_id, + topological_ordering): + txn.call_after( + self.get_unread_event_push_actions_by_room_for_user.invalidate_many, + (room_id, user_id, ) + ) + txn.execute( + "DELETE FROM event_push_actions" + " WHERE room_id = ? AND user_id = ? AND topological_ordering < ?", + (room_id, user_id, topological_ordering,) + ) + def _action_has_highlight(actions): for action in actions: diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 935fc503d9..94be820f86 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -100,7 +100,7 @@ class ReceiptsStore(SQLBaseStore): defer.returnValue([ev for res in results.values() for ev in res]) - @cachedInlineCallbacks(num_args=3, max_entries=5000) + @cachedInlineCallbacks(num_args=3, max_entries=5000, lru=True, tree=True) def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None): """Get receipts for a single room for sending to clients. @@ -232,7 +232,7 @@ class ReceiptsStore(SQLBaseStore): self.get_receipts_for_user.invalidate, (user_id, receipt_type) ) # FIXME: This shouldn't invalidate the whole cache - txn.call_after(self.get_linearized_receipts_for_room.invalidate_all) + txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,)) txn.call_after( self._receipts_stream_cache.entity_has_changed, @@ -244,6 +244,15 @@ class ReceiptsStore(SQLBaseStore): (user_id, room_id, receipt_type) ) + res = self._simple_select_one_txn( + txn, + table="events", + retcols=["topological_ordering", "stream_ordering"], + keyvalues={"event_id": event_id}, + ) + topological_ordering = int(res["topological_ordering"]) + stream_ordering = int(res["stream_ordering"]) + # We don't want to clobber receipts for more recent events, so we # have to compare orderings of existing receipts sql = ( @@ -256,15 +265,6 @@ class ReceiptsStore(SQLBaseStore): results = txn.fetchall() if results: - res = self._simple_select_one_txn( - txn, - table="events", - retcols=["topological_ordering", "stream_ordering"], - keyvalues={"event_id": event_id}, - ) - topological_ordering = int(res["topological_ordering"]) - stream_ordering = int(res["stream_ordering"]) - for to, so, _ in results: if int(to) > topological_ordering: return False @@ -294,6 +294,14 @@ class ReceiptsStore(SQLBaseStore): } ) + if receipt_type == "m.read": + self._remove_push_actions_before_txn( + txn, + room_id=room_id, + user_id=user_id, + topological_ordering=topological_ordering, + ) + return True @defer.inlineCallbacks @@ -367,7 +375,7 @@ class ReceiptsStore(SQLBaseStore): self.get_receipts_for_user.invalidate, (user_id, receipt_type) ) # FIXME: This shouldn't invalidate the whole cache - txn.call_after(self.get_linearized_receipts_for_room.invalidate_all) + txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,)) self._simple_delete_txn( txn, diff --git a/synapse/storage/schema/delta/32/remove_indices.sql b/synapse/storage/schema/delta/32/remove_indices.sql new file mode 100644 index 0000000000..f859be46a6 --- /dev/null +++ b/synapse/storage/schema/delta/32/remove_indices.sql @@ -0,0 +1,38 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +-- The following indices are redundant, other indices are equivalent or +-- supersets +DROP INDEX IF EXISTS events_room_id; -- Prefix of events_room_stream +DROP INDEX IF EXISTS events_order; -- Prefix of events_order_topo_stream_room +DROP INDEX IF EXISTS events_topological_ordering; -- Prefix of events_order_topo_stream_room +DROP INDEX IF EXISTS events_stream_ordering; -- Duplicate of PRIMARY KEY +DROP INDEX IF EXISTS state_groups_id; -- Duplicate of PRIMARY KEY +DROP INDEX IF EXISTS event_to_state_groups_id; -- Duplicate of PRIMARY KEY +DROP INDEX IF EXISTS event_push_actions_room_id_event_id_user_id_profile_tag; -- Duplicate of UNIQUE CONSTRAINT + +DROP INDEX IF EXISTS event_destinations_id; -- Prefix of UNIQUE CONSTRAINT +DROP INDEX IF EXISTS st_extrem_id; -- Prefix of UNIQUE CONSTRAINT +DROP INDEX IF EXISTS event_content_hashes_id; -- Prefix of UNIQUE CONSTRAINT +DROP INDEX IF EXISTS event_signatures_id; -- Prefix of UNIQUE CONSTRAINT +DROP INDEX IF EXISTS event_edge_hashes_id; -- Prefix of UNIQUE CONSTRAINT +DROP INDEX IF EXISTS redactions_event_id; -- Duplicate of UNIQUE CONSTRAINT +DROP INDEX IF EXISTS room_hosts_room_id; -- Prefix of UNIQUE CONSTRAINT + +-- The following indices were unused +DROP INDEX IF EXISTS remote_media_cache_thumbnails_media_id; +DROP INDEX IF EXISTS evauth_edges_auth_id; +DROP INDEX IF EXISTS presence_stream_state; diff --git a/synapse/visibility.py b/synapse/visibility.py new file mode 100644 index 0000000000..948ad51772 --- /dev/null +++ b/synapse/visibility.py @@ -0,0 +1,210 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 - 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.api.constants import Membership, EventTypes + +from synapse.util.logcontext import preserve_fn + +import logging + + +logger = logging.getLogger(__name__) + + +VISIBILITY_PRIORITY = ( + "world_readable", + "shared", + "invited", + "joined", +) + + +MEMBERSHIP_PRIORITY = ( + Membership.JOIN, + Membership.INVITE, + Membership.KNOCK, + Membership.LEAVE, + Membership.BAN, +) + + +@defer.inlineCallbacks +def filter_events_for_clients(store, user_tuples, events, event_id_to_state): + """ Returns dict of user_id -> list of events that user is allowed to + see. + + Args: + user_tuples (str, bool): (user id, is_peeking) for each user to be + checked. is_peeking should be true if: + * the user is not currently a member of the room, and: + * the user has not been a member of the room since the + given events + events ([synapse.events.EventBase]): list of events to filter + """ + forgotten = yield defer.gatherResults([ + preserve_fn(store.who_forgot_in_room)( + room_id, + ) + for room_id in frozenset(e.room_id for e in events) + ], consumeErrors=True) + + # Set of membership event_ids that have been forgotten + event_id_forgotten = frozenset( + row["event_id"] for rows in forgotten for row in rows + ) + + ignore_dict_content = yield store.get_global_account_data_by_type_for_users( + "m.ignored_user_list", user_ids=[user_id for user_id, _ in user_tuples] + ) + + # FIXME: This will explode if people upload something incorrect. + ignore_dict = { + user_id: frozenset( + content.get("ignored_users", {}).keys() if content else [] + ) + for user_id, content in ignore_dict_content.items() + } + + def allowed(event, user_id, is_peeking, ignore_list): + """ + Args: + event (synapse.events.EventBase): event to check + user_id (str) + is_peeking (bool) + ignore_list (list): list of users to ignore + """ + if not event.is_state() and event.sender in ignore_list: + return False + + state = event_id_to_state[event.event_id] + + # get the room_visibility at the time of the event. + visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None) + if visibility_event: + visibility = visibility_event.content.get("history_visibility", "shared") + else: + visibility = "shared" + + if visibility not in VISIBILITY_PRIORITY: + visibility = "shared" + + # if it was world_readable, it's easy: everyone can read it + if visibility == "world_readable": + return True + + # Always allow history visibility events on boundaries. This is done + # by setting the effective visibility to the least restrictive + # of the old vs new. + if event.type == EventTypes.RoomHistoryVisibility: + prev_content = event.unsigned.get("prev_content", {}) + prev_visibility = prev_content.get("history_visibility", None) + + if prev_visibility not in VISIBILITY_PRIORITY: + prev_visibility = "shared" + + new_priority = VISIBILITY_PRIORITY.index(visibility) + old_priority = VISIBILITY_PRIORITY.index(prev_visibility) + if old_priority < new_priority: + visibility = prev_visibility + + # likewise, if the event is the user's own membership event, use + # the 'most joined' membership + membership = None + if event.type == EventTypes.Member and event.state_key == user_id: + membership = event.content.get("membership", None) + if membership not in MEMBERSHIP_PRIORITY: + membership = "leave" + + prev_content = event.unsigned.get("prev_content", {}) + prev_membership = prev_content.get("membership", None) + if prev_membership not in MEMBERSHIP_PRIORITY: + prev_membership = "leave" + + new_priority = MEMBERSHIP_PRIORITY.index(membership) + old_priority = MEMBERSHIP_PRIORITY.index(prev_membership) + if old_priority < new_priority: + membership = prev_membership + + # otherwise, get the user's membership at the time of the event. + if membership is None: + membership_event = state.get((EventTypes.Member, user_id), None) + if membership_event: + if membership_event.event_id not in event_id_forgotten: + membership = membership_event.membership + + # if the user was a member of the room at the time of the event, + # they can see it. + if membership == Membership.JOIN: + return True + + if visibility == "joined": + # we weren't a member at the time of the event, so we can't + # see this event. + return False + + elif visibility == "invited": + # user can also see the event if they were *invited* at the time + # of the event. + return membership == Membership.INVITE + + else: + # visibility is shared: user can also see the event if they have + # become a member since the event + # + # XXX: if the user has subsequently joined and then left again, + # ideally we would share history up to the point they left. But + # we don't know when they left. + return not is_peeking + + defer.returnValue({ + user_id: [ + event + for event in events + if allowed(event, user_id, is_peeking, ignore_dict.get(user_id, [])) + ] + for user_id, is_peeking in user_tuples + }) + + +@defer.inlineCallbacks +def filter_events_for_client(store, user_id, events, is_peeking=False): + """ + Check which events a user is allowed to see + + Args: + user_id(str): user id to be checked + events([synapse.events.EventBase]): list of events to be checked + is_peeking(bool): should be True if: + * the user is not currently a member of the room, and: + * the user has not been a member of the room since the given + events + + Returns: + [synapse.events.EventBase] + """ + types = ( + (EventTypes.RoomHistoryVisibility, ""), + (EventTypes.Member, user_id), + ) + event_id_to_state = yield store.get_state_for_events( + frozenset(e.event_id for e in events), + types=types + ) + res = yield filter_events_for_clients( + store, [(user_id, is_peeking)], events, event_id_to_state + ) + defer.returnValue(res.get(user_id, [])) |