summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/_base.py184
-rw-r--r--synapse/handlers/federation.py2
-rw-r--r--synapse/handlers/message.py18
-rw-r--r--synapse/handlers/room.py7
-rw-r--r--synapse/handlers/search.py17
-rw-r--r--synapse/handlers/sync.py7
-rw-r--r--synapse/notifier.py5
-rw-r--r--synapse/push/action_generator.py4
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py7
-rw-r--r--synapse/push/mailer.py6
-rw-r--r--synapse/replication/resource.py2
-rw-r--r--synapse/replication/slave/storage/events.py2
-rw-r--r--synapse/storage/event_push_actions.py12
-rw-r--r--synapse/storage/receipts.py32
-rw-r--r--synapse/storage/schema/delta/32/remove_indices.sql38
-rw-r--r--synapse/visibility.py210
16 files changed, 325 insertions, 228 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index ac716a8118..c904c6c500 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -19,7 +19,6 @@ from synapse.api.errors import LimitExceededError
 from synapse.api.constants import Membership, EventTypes
 from synapse.types import UserID, Requester
 
-from synapse.util.logcontext import preserve_fn
 
 import logging
 
@@ -27,23 +26,6 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-VISIBILITY_PRIORITY = (
-    "world_readable",
-    "shared",
-    "invited",
-    "joined",
-)
-
-
-MEMBERSHIP_PRIORITY = (
-    Membership.JOIN,
-    Membership.INVITE,
-    Membership.KNOCK,
-    Membership.LEAVE,
-    Membership.BAN,
-)
-
-
 class BaseHandler(object):
     """
     Common base class for the event handlers.
@@ -67,172 +49,6 @@ class BaseHandler(object):
 
         self.event_builder_factory = hs.get_event_builder_factory()
 
-    @defer.inlineCallbacks
-    def filter_events_for_clients(self, user_tuples, events, event_id_to_state):
-        """ Returns dict of user_id -> list of events that user is allowed to
-        see.
-
-        Args:
-            user_tuples (str, bool): (user id, is_peeking) for each user to be
-                checked. is_peeking should be true if:
-                * the user is not currently a member of the room, and:
-                * the user has not been a member of the room since the
-                given events
-            events ([synapse.events.EventBase]): list of events to filter
-        """
-        forgotten = yield defer.gatherResults([
-            preserve_fn(self.store.who_forgot_in_room)(
-                room_id,
-            )
-            for room_id in frozenset(e.room_id for e in events)
-        ], consumeErrors=True)
-
-        # Set of membership event_ids that have been forgotten
-        event_id_forgotten = frozenset(
-            row["event_id"] for rows in forgotten for row in rows
-        )
-
-        ignore_dict_content = yield self.store.get_global_account_data_by_type_for_users(
-            "m.ignored_user_list", user_ids=[user_id for user_id, _ in user_tuples]
-        )
-
-        # FIXME: This will explode if people upload something incorrect.
-        ignore_dict = {
-            user_id: frozenset(
-                content.get("ignored_users", {}).keys() if content else []
-            )
-            for user_id, content in ignore_dict_content.items()
-        }
-
-        def allowed(event, user_id, is_peeking, ignore_list):
-            """
-            Args:
-                event (synapse.events.EventBase): event to check
-                user_id (str)
-                is_peeking (bool)
-                ignore_list (list): list of users to ignore
-            """
-            if not event.is_state() and event.sender in ignore_list:
-                return False
-
-            state = event_id_to_state[event.event_id]
-
-            # get the room_visibility at the time of the event.
-            visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None)
-            if visibility_event:
-                visibility = visibility_event.content.get("history_visibility", "shared")
-            else:
-                visibility = "shared"
-
-            if visibility not in VISIBILITY_PRIORITY:
-                visibility = "shared"
-
-            # if it was world_readable, it's easy: everyone can read it
-            if visibility == "world_readable":
-                return True
-
-            # Always allow history visibility events on boundaries. This is done
-            # by setting the effective visibility to the least restrictive
-            # of the old vs new.
-            if event.type == EventTypes.RoomHistoryVisibility:
-                prev_content = event.unsigned.get("prev_content", {})
-                prev_visibility = prev_content.get("history_visibility", None)
-
-                if prev_visibility not in VISIBILITY_PRIORITY:
-                    prev_visibility = "shared"
-
-                new_priority = VISIBILITY_PRIORITY.index(visibility)
-                old_priority = VISIBILITY_PRIORITY.index(prev_visibility)
-                if old_priority < new_priority:
-                    visibility = prev_visibility
-
-            # likewise, if the event is the user's own membership event, use
-            # the 'most joined' membership
-            membership = None
-            if event.type == EventTypes.Member and event.state_key == user_id:
-                membership = event.content.get("membership", None)
-                if membership not in MEMBERSHIP_PRIORITY:
-                    membership = "leave"
-
-                prev_content = event.unsigned.get("prev_content", {})
-                prev_membership = prev_content.get("membership", None)
-                if prev_membership not in MEMBERSHIP_PRIORITY:
-                    prev_membership = "leave"
-
-                new_priority = MEMBERSHIP_PRIORITY.index(membership)
-                old_priority = MEMBERSHIP_PRIORITY.index(prev_membership)
-                if old_priority < new_priority:
-                    membership = prev_membership
-
-            # otherwise, get the user's membership at the time of the event.
-            if membership is None:
-                membership_event = state.get((EventTypes.Member, user_id), None)
-                if membership_event:
-                    if membership_event.event_id not in event_id_forgotten:
-                        membership = membership_event.membership
-
-            # if the user was a member of the room at the time of the event,
-            # they can see it.
-            if membership == Membership.JOIN:
-                return True
-
-            if visibility == "joined":
-                # we weren't a member at the time of the event, so we can't
-                # see this event.
-                return False
-
-            elif visibility == "invited":
-                # user can also see the event if they were *invited* at the time
-                # of the event.
-                return membership == Membership.INVITE
-
-            else:
-                # visibility is shared: user can also see the event if they have
-                # become a member since the event
-                #
-                # XXX: if the user has subsequently joined and then left again,
-                # ideally we would share history up to the point they left. But
-                # we don't know when they left.
-                return not is_peeking
-
-        defer.returnValue({
-            user_id: [
-                event
-                for event in events
-                if allowed(event, user_id, is_peeking, ignore_dict.get(user_id, []))
-            ]
-            for user_id, is_peeking in user_tuples
-        })
-
-    @defer.inlineCallbacks
-    def filter_events_for_client(self, user_id, events, is_peeking=False):
-        """
-        Check which events a user is allowed to see
-
-        Args:
-            user_id(str): user id to be checked
-            events([synapse.events.EventBase]): list of events to be checked
-            is_peeking(bool): should be True if:
-              * the user is not currently a member of the room, and:
-              * the user has not been a member of the room since the given
-                events
-
-        Returns:
-            [synapse.events.EventBase]
-        """
-        types = (
-            (EventTypes.RoomHistoryVisibility, ""),
-            (EventTypes.Member, user_id),
-        )
-        event_id_to_state = yield self.store.get_state_for_events(
-            frozenset(e.event_id for e in events),
-            types=types
-        )
-        res = yield self.filter_events_for_clients(
-            [(user_id, is_peeking)], events, event_id_to_state
-        )
-        defer.returnValue(res.get(user_id, []))
-
     def ratelimit(self, requester):
         time_now = self.clock.time()
         allowed, time_allowed = self.ratelimiter.send_message(
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 4a65b246e6..c21d9d4d83 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1113,7 +1113,7 @@ class FederationHandler(BaseHandler):
         if not event.internal_metadata.is_outlier():
             action_generator = ActionGenerator(self.hs)
             yield action_generator.handle_push_actions_for_event(
-                event, context, self
+                event, context
             )
 
         event_stream_id, max_stream_id = yield self.store.persist_event(
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index f9e2c98f3f..13154edb78 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -29,6 +29,7 @@ from synapse.util import unwrapFirstError
 from synapse.util.async import concurrently_execute
 from synapse.util.caches.snapshot_cache import SnapshotCache
 from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
+from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
 
@@ -128,7 +129,8 @@ class MessageHandler(BaseHandler):
                 "end": next_token.to_string(),
             })
 
-        events = yield self.filter_events_for_client(
+        events = yield filter_events_for_client(
+            self.store,
             user_id,
             events,
             is_peeking=(member_event_id is None),
@@ -488,8 +490,8 @@ class MessageHandler(BaseHandler):
                     ]
                 ).addErrback(unwrapFirstError)
 
-                messages = yield self.filter_events_for_client(
-                    user_id, messages
+                messages = yield filter_events_for_client(
+                    self.store, user_id, messages
                 )
 
                 start_token = now_token.copy_and_replace("room_key", token[0])
@@ -624,8 +626,8 @@ class MessageHandler(BaseHandler):
             end_token=stream_token
         )
 
-        messages = yield self.filter_events_for_client(
-            user_id, messages, is_peeking=is_peeking
+        messages = yield filter_events_for_client(
+            self.store, user_id, messages, is_peeking=is_peeking
         )
 
         start_token = StreamToken.START.copy_and_replace("room_key", token[0])
@@ -705,8 +707,8 @@ class MessageHandler(BaseHandler):
             consumeErrors=True,
         ).addErrback(unwrapFirstError)
 
-        messages = yield self.filter_events_for_client(
-            user_id, messages, is_peeking=is_peeking,
+        messages = yield filter_events_for_client(
+            self.store, user_id, messages, is_peeking=is_peeking,
         )
 
         start_token = now_token.copy_and_replace("room_key", token[0])
@@ -882,7 +884,7 @@ class MessageHandler(BaseHandler):
 
         action_generator = ActionGenerator(self.hs)
         yield action_generator.handle_push_actions_for_event(
-            event, context, self
+            event, context
         )
 
         (event_stream_id, max_stream_id) = yield self.store.persist_event(
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index fdebc9c438..3d63b3c513 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -26,6 +26,7 @@ from synapse.api.errors import AuthError, StoreError, SynapseError
 from synapse.util import stringutils
 from synapse.util.async import concurrently_execute
 from synapse.util.caches.response_cache import ResponseCache
+from synapse.visibility import filter_events_for_client
 
 from collections import OrderedDict
 
@@ -449,10 +450,12 @@ class RoomContextHandler(BaseHandler):
         now_token = yield self.hs.get_event_sources().get_current_token()
 
         def filter_evts(events):
-            return self.filter_events_for_client(
+            return filter_events_for_client(
+                self.store,
                 user.to_string(),
                 events,
-                is_peeking=is_guest)
+                is_peeking=is_guest
+            )
 
         event = yield self.store.get_event(event_id, get_prev_content=True,
                                            allow_none=True)
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index a937e87408..df75d70fac 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -21,6 +21,7 @@ from synapse.api.constants import Membership, EventTypes
 from synapse.api.filtering import Filter
 from synapse.api.errors import SynapseError
 from synapse.events.utils import serialize_event
+from synapse.visibility import filter_events_for_client
 
 from unpaddedbase64 import decode_base64, encode_base64
 
@@ -172,8 +173,8 @@ class SearchHandler(BaseHandler):
 
             filtered_events = search_filter.filter([r["event"] for r in results])
 
-            events = yield self.filter_events_for_client(
-                user.to_string(), filtered_events
+            events = yield filter_events_for_client(
+                self.store, user.to_string(), filtered_events
             )
 
             events.sort(key=lambda e: -rank_map[e.event_id])
@@ -223,8 +224,8 @@ class SearchHandler(BaseHandler):
                     r["event"] for r in results
                 ])
 
-                events = yield self.filter_events_for_client(
-                    user.to_string(), filtered_events
+                events = yield filter_events_for_client(
+                    self.store, user.to_string(), filtered_events
                 )
 
                 room_events.extend(events)
@@ -281,12 +282,12 @@ class SearchHandler(BaseHandler):
                     event.room_id, event.event_id, before_limit, after_limit
                 )
 
-                res["events_before"] = yield self.filter_events_for_client(
-                    user.to_string(), res["events_before"]
+                res["events_before"] = yield filter_events_for_client(
+                    self.store, user.to_string(), res["events_before"]
                 )
 
-                res["events_after"] = yield self.filter_events_for_client(
-                    user.to_string(), res["events_after"]
+                res["events_after"] = yield filter_events_for_client(
+                    self.store, user.to_string(), res["events_after"]
                 )
 
                 res["start"] = now_token.copy_and_replace(
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index b7dcbc6b1b..921215469f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -22,6 +22,7 @@ from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.push.clientformat import format_push_rules_for_user
+from synapse.visibility import filter_events_for_client
 
 from twisted.internet import defer
 
@@ -697,7 +698,8 @@ class SyncHandler(BaseHandler):
 
             if recents is not None:
                 recents = sync_config.filter_collection.filter_room_timeline(recents)
-                recents = yield self.filter_events_for_client(
+                recents = yield filter_events_for_client(
+                    self.store,
                     sync_config.user.to_string(),
                     recents,
                 )
@@ -718,7 +720,8 @@ class SyncHandler(BaseHandler):
                 loaded_recents = sync_config.filter_collection.filter_room_timeline(
                     events
                 )
-                loaded_recents = yield self.filter_events_for_client(
+                loaded_recents = yield filter_events_for_client(
+                    self.store,
                     sync_config.user.to_string(),
                     loaded_recents,
                 )
diff --git a/synapse/notifier.py b/synapse/notifier.py
index cb58dfffd4..33b79c0ec7 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -21,6 +21,7 @@ from synapse.util.logutils import log_function
 from synapse.util.async import ObservableDeferred
 from synapse.util.logcontext import PreserveLoggingContext
 from synapse.types import StreamToken
+from synapse.visibility import filter_events_for_client
 import synapse.metrics
 
 from collections import namedtuple
@@ -398,8 +399,8 @@ class Notifier(object):
                 )
 
                 if name == "room":
-                    room_member_handler = self.hs.get_handlers().room_member_handler
-                    new_events = yield room_member_handler.filter_events_for_client(
+                    new_events = yield filter_events_for_client(
+                        self.store,
                         user.to_string(),
                         new_events,
                         is_peeking=is_peeking,
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
index a0160994b7..9b208668b6 100644
--- a/synapse/push/action_generator.py
+++ b/synapse/push/action_generator.py
@@ -37,14 +37,14 @@ class ActionGenerator:
         # tag (ie. we just need all the users).
 
     @defer.inlineCallbacks
-    def handle_push_actions_for_event(self, event, context, handler):
+    def handle_push_actions_for_event(self, event, context):
         with Measure(self.clock, "handle_push_actions_for_event"):
             bulk_evaluator = yield evaluator_for_event(
                 event, self.hs, self.store
             )
 
             actions_by_user = yield bulk_evaluator.action_for_event_by_user(
-                event, handler, context.current_state
+                event, context.current_state
             )
 
             context.push_actions = [
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index f97df36d80..25e13b3423 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -22,6 +22,7 @@ from .baserules import list_with_base_rules
 from .push_rule_evaluator import PushRuleEvaluatorForEvent
 
 from synapse.api.constants import EventTypes
+from synapse.visibility import filter_events_for_clients
 
 
 logger = logging.getLogger(__name__)
@@ -126,7 +127,7 @@ class BulkPushRuleEvaluator:
         self.store = store
 
     @defer.inlineCallbacks
-    def action_for_event_by_user(self, event, handler, current_state):
+    def action_for_event_by_user(self, event, current_state):
         actions_by_user = {}
 
         # None of these users can be peeking since this list of users comes
@@ -136,8 +137,8 @@ class BulkPushRuleEvaluator:
             (u, False) for u in self.rules_by_user.keys()
         ]
 
-        filtered_by_user = yield handler.filter_events_for_clients(
-            user_tuples, [event], {event.event_id: current_state}
+        filtered_by_user = yield filter_events_for_clients(
+            self.store, user_tuples, [event], {event.event_id: current_state}
         )
 
         room_members = yield self.store.get_users_in_room(self.room_id)
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 7031fa6d55..5d60c1efcf 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -28,6 +28,7 @@ from synapse.util.presentable_names import (
 from synapse.types import UserID
 from synapse.api.errors import StoreError
 from synapse.api.constants import EventTypes
+from synapse.visibility import filter_events_for_client
 
 import jinja2
 import bleach
@@ -227,9 +228,8 @@ class Mailer(object):
             "messages": [],
         }
 
-        handler = self.hs.get_handlers().message_handler
-        the_events = yield handler.filter_events_for_client(
-            user_id, results["events_before"]
+        the_events = yield filter_events_for_client(
+            self.store, user_id, results["events_before"]
         )
         the_events.append(notif_event)
 
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index 69ad1de863..0e983ae7fa 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -164,8 +164,8 @@ class ReplicationResource(Resource):
                 "Replicating %d rows of %s from %s -> %s",
                 len(stream_content["rows"]),
                 stream_name,
-                stream_content["position"],
                 request_streams.get(stream_name),
+                stream_content["position"],
             )
 
         request.write(json.dumps(result, ensure_ascii=False))
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 7ba7a6f6e4..635febb174 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -146,12 +146,14 @@ class SlavedEventStore(BaseSlavedStore):
 
         stream = result.get("forward_ex_outliers")
         if stream:
+            self._stream_id_gen.advance(stream["position"])
             for row in stream["rows"]:
                 event_id = row[1]
                 self._invalidate_get_event_cache(event_id)
 
         stream = result.get("backward_ex_outliers")
         if stream:
+            self._backfill_id_gen.advance(-stream["position"])
             for row in stream["rows"]:
                 event_id = row[1]
                 self._invalidate_get_event_cache(event_id)
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 6f316f7d24..9705db5c47 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -224,6 +224,18 @@ class EventPushActionsStore(SQLBaseStore):
             (room_id, event_id)
         )
 
+    def _remove_push_actions_before_txn(self, txn, room_id, user_id,
+                                        topological_ordering):
+        txn.call_after(
+            self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+            (room_id, user_id, )
+        )
+        txn.execute(
+            "DELETE FROM event_push_actions"
+            " WHERE room_id = ? AND user_id = ? AND topological_ordering < ?",
+            (room_id, user_id, topological_ordering,)
+        )
+
 
 def _action_has_highlight(actions):
     for action in actions:
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 935fc503d9..94be820f86 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -100,7 +100,7 @@ class ReceiptsStore(SQLBaseStore):
 
         defer.returnValue([ev for res in results.values() for ev in res])
 
-    @cachedInlineCallbacks(num_args=3, max_entries=5000)
+    @cachedInlineCallbacks(num_args=3, max_entries=5000, lru=True, tree=True)
     def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
         """Get receipts for a single room for sending to clients.
 
@@ -232,7 +232,7 @@ class ReceiptsStore(SQLBaseStore):
             self.get_receipts_for_user.invalidate, (user_id, receipt_type)
         )
         # FIXME: This shouldn't invalidate the whole cache
-        txn.call_after(self.get_linearized_receipts_for_room.invalidate_all)
+        txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
 
         txn.call_after(
             self._receipts_stream_cache.entity_has_changed,
@@ -244,6 +244,15 @@ class ReceiptsStore(SQLBaseStore):
             (user_id, room_id, receipt_type)
         )
 
+        res = self._simple_select_one_txn(
+            txn,
+            table="events",
+            retcols=["topological_ordering", "stream_ordering"],
+            keyvalues={"event_id": event_id},
+        )
+        topological_ordering = int(res["topological_ordering"])
+        stream_ordering = int(res["stream_ordering"])
+
         # We don't want to clobber receipts for more recent events, so we
         # have to compare orderings of existing receipts
         sql = (
@@ -256,15 +265,6 @@ class ReceiptsStore(SQLBaseStore):
         results = txn.fetchall()
 
         if results:
-            res = self._simple_select_one_txn(
-                txn,
-                table="events",
-                retcols=["topological_ordering", "stream_ordering"],
-                keyvalues={"event_id": event_id},
-            )
-            topological_ordering = int(res["topological_ordering"])
-            stream_ordering = int(res["stream_ordering"])
-
             for to, so, _ in results:
                 if int(to) > topological_ordering:
                     return False
@@ -294,6 +294,14 @@ class ReceiptsStore(SQLBaseStore):
             }
         )
 
+        if receipt_type == "m.read":
+            self._remove_push_actions_before_txn(
+                txn,
+                room_id=room_id,
+                user_id=user_id,
+                topological_ordering=topological_ordering,
+            )
+
         return True
 
     @defer.inlineCallbacks
@@ -367,7 +375,7 @@ class ReceiptsStore(SQLBaseStore):
             self.get_receipts_for_user.invalidate, (user_id, receipt_type)
         )
         # FIXME: This shouldn't invalidate the whole cache
-        txn.call_after(self.get_linearized_receipts_for_room.invalidate_all)
+        txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
 
         self._simple_delete_txn(
             txn,
diff --git a/synapse/storage/schema/delta/32/remove_indices.sql b/synapse/storage/schema/delta/32/remove_indices.sql
new file mode 100644
index 0000000000..f859be46a6
--- /dev/null
+++ b/synapse/storage/schema/delta/32/remove_indices.sql
@@ -0,0 +1,38 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- The following indices are redundant, other indices are equivalent or
+-- supersets
+DROP INDEX IF EXISTS events_room_id; -- Prefix of events_room_stream
+DROP INDEX IF EXISTS events_order; -- Prefix of events_order_topo_stream_room
+DROP INDEX IF EXISTS events_topological_ordering; -- Prefix of events_order_topo_stream_room
+DROP INDEX IF EXISTS events_stream_ordering; -- Duplicate of PRIMARY KEY
+DROP INDEX IF EXISTS state_groups_id; -- Duplicate of PRIMARY KEY
+DROP INDEX IF EXISTS event_to_state_groups_id; -- Duplicate of PRIMARY KEY
+DROP INDEX IF EXISTS event_push_actions_room_id_event_id_user_id_profile_tag; -- Duplicate of UNIQUE CONSTRAINT
+
+DROP INDEX IF EXISTS event_destinations_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS st_extrem_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS event_content_hashes_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS event_signatures_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS event_edge_hashes_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS redactions_event_id; -- Duplicate of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS room_hosts_room_id; -- Prefix of UNIQUE CONSTRAINT
+
+-- The following indices were unused
+DROP INDEX IF EXISTS remote_media_cache_thumbnails_media_id;
+DROP INDEX IF EXISTS evauth_edges_auth_id;
+DROP INDEX IF EXISTS presence_stream_state;
diff --git a/synapse/visibility.py b/synapse/visibility.py
new file mode 100644
index 0000000000..948ad51772
--- /dev/null
+++ b/synapse/visibility.py
@@ -0,0 +1,210 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 - 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.api.constants import Membership, EventTypes
+
+from synapse.util.logcontext import preserve_fn
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+VISIBILITY_PRIORITY = (
+    "world_readable",
+    "shared",
+    "invited",
+    "joined",
+)
+
+
+MEMBERSHIP_PRIORITY = (
+    Membership.JOIN,
+    Membership.INVITE,
+    Membership.KNOCK,
+    Membership.LEAVE,
+    Membership.BAN,
+)
+
+
+@defer.inlineCallbacks
+def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
+    """ Returns dict of user_id -> list of events that user is allowed to
+    see.
+
+    Args:
+        user_tuples (str, bool): (user id, is_peeking) for each user to be
+            checked. is_peeking should be true if:
+            * the user is not currently a member of the room, and:
+            * the user has not been a member of the room since the
+            given events
+        events ([synapse.events.EventBase]): list of events to filter
+    """
+    forgotten = yield defer.gatherResults([
+        preserve_fn(store.who_forgot_in_room)(
+            room_id,
+        )
+        for room_id in frozenset(e.room_id for e in events)
+    ], consumeErrors=True)
+
+    # Set of membership event_ids that have been forgotten
+    event_id_forgotten = frozenset(
+        row["event_id"] for rows in forgotten for row in rows
+    )
+
+    ignore_dict_content = yield store.get_global_account_data_by_type_for_users(
+        "m.ignored_user_list", user_ids=[user_id for user_id, _ in user_tuples]
+    )
+
+    # FIXME: This will explode if people upload something incorrect.
+    ignore_dict = {
+        user_id: frozenset(
+            content.get("ignored_users", {}).keys() if content else []
+        )
+        for user_id, content in ignore_dict_content.items()
+    }
+
+    def allowed(event, user_id, is_peeking, ignore_list):
+        """
+        Args:
+            event (synapse.events.EventBase): event to check
+            user_id (str)
+            is_peeking (bool)
+            ignore_list (list): list of users to ignore
+        """
+        if not event.is_state() and event.sender in ignore_list:
+            return False
+
+        state = event_id_to_state[event.event_id]
+
+        # get the room_visibility at the time of the event.
+        visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None)
+        if visibility_event:
+            visibility = visibility_event.content.get("history_visibility", "shared")
+        else:
+            visibility = "shared"
+
+        if visibility not in VISIBILITY_PRIORITY:
+            visibility = "shared"
+
+        # if it was world_readable, it's easy: everyone can read it
+        if visibility == "world_readable":
+            return True
+
+        # Always allow history visibility events on boundaries. This is done
+        # by setting the effective visibility to the least restrictive
+        # of the old vs new.
+        if event.type == EventTypes.RoomHistoryVisibility:
+            prev_content = event.unsigned.get("prev_content", {})
+            prev_visibility = prev_content.get("history_visibility", None)
+
+            if prev_visibility not in VISIBILITY_PRIORITY:
+                prev_visibility = "shared"
+
+            new_priority = VISIBILITY_PRIORITY.index(visibility)
+            old_priority = VISIBILITY_PRIORITY.index(prev_visibility)
+            if old_priority < new_priority:
+                visibility = prev_visibility
+
+        # likewise, if the event is the user's own membership event, use
+        # the 'most joined' membership
+        membership = None
+        if event.type == EventTypes.Member and event.state_key == user_id:
+            membership = event.content.get("membership", None)
+            if membership not in MEMBERSHIP_PRIORITY:
+                membership = "leave"
+
+            prev_content = event.unsigned.get("prev_content", {})
+            prev_membership = prev_content.get("membership", None)
+            if prev_membership not in MEMBERSHIP_PRIORITY:
+                prev_membership = "leave"
+
+            new_priority = MEMBERSHIP_PRIORITY.index(membership)
+            old_priority = MEMBERSHIP_PRIORITY.index(prev_membership)
+            if old_priority < new_priority:
+                membership = prev_membership
+
+        # otherwise, get the user's membership at the time of the event.
+        if membership is None:
+            membership_event = state.get((EventTypes.Member, user_id), None)
+            if membership_event:
+                if membership_event.event_id not in event_id_forgotten:
+                    membership = membership_event.membership
+
+        # if the user was a member of the room at the time of the event,
+        # they can see it.
+        if membership == Membership.JOIN:
+            return True
+
+        if visibility == "joined":
+            # we weren't a member at the time of the event, so we can't
+            # see this event.
+            return False
+
+        elif visibility == "invited":
+            # user can also see the event if they were *invited* at the time
+            # of the event.
+            return membership == Membership.INVITE
+
+        else:
+            # visibility is shared: user can also see the event if they have
+            # become a member since the event
+            #
+            # XXX: if the user has subsequently joined and then left again,
+            # ideally we would share history up to the point they left. But
+            # we don't know when they left.
+            return not is_peeking
+
+    defer.returnValue({
+        user_id: [
+            event
+            for event in events
+            if allowed(event, user_id, is_peeking, ignore_dict.get(user_id, []))
+        ]
+        for user_id, is_peeking in user_tuples
+    })
+
+
+@defer.inlineCallbacks
+def filter_events_for_client(store, user_id, events, is_peeking=False):
+    """
+    Check which events a user is allowed to see
+
+    Args:
+        user_id(str): user id to be checked
+        events([synapse.events.EventBase]): list of events to be checked
+        is_peeking(bool): should be True if:
+          * the user is not currently a member of the room, and:
+          * the user has not been a member of the room since the given
+            events
+
+    Returns:
+        [synapse.events.EventBase]
+    """
+    types = (
+        (EventTypes.RoomHistoryVisibility, ""),
+        (EventTypes.Member, user_id),
+    )
+    event_id_to_state = yield store.get_state_for_events(
+        frozenset(e.event_id for e in events),
+        types=types
+    )
+    res = yield filter_events_for_clients(
+        store, [(user_id, is_peeking)], events, event_id_to_state
+    )
+    defer.returnValue(res.get(user_id, []))