summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/_base.py380
-rw-r--r--synapse/handlers/auth.py5
-rw-r--r--synapse/handlers/federation.py19
-rw-r--r--synapse/handlers/message.py215
-rw-r--r--synapse/handlers/room.py7
-rw-r--r--synapse/handlers/room_member.py4
-rw-r--r--synapse/handlers/search.py17
-rw-r--r--synapse/handlers/sync.py7
-rw-r--r--synapse/notifier.py5
-rw-r--r--synapse/push/action_generator.py4
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py7
-rw-r--r--synapse/push/mailer.py6
-rw-r--r--synapse/replication/resource.py2
-rw-r--r--synapse/replication/slave/storage/events.py2
-rw-r--r--synapse/storage/_base.py8
-rw-r--r--synapse/storage/event_push_actions.py12
-rw-r--r--synapse/storage/pusher.py13
-rw-r--r--synapse/storage/receipts.py32
-rw-r--r--synapse/storage/schema/delta/32/remove_indices.sql38
-rw-r--r--synapse/visibility.py210
20 files changed, 558 insertions, 435 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 2c811906d9..c904c6c500 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -15,13 +15,10 @@
 
 from twisted.internet import defer
 
-from synapse.api.errors import LimitExceededError, SynapseError, AuthError
-from synapse.crypto.event_signing import add_hashes_and_signatures
+from synapse.api.errors import LimitExceededError
 from synapse.api.constants import Membership, EventTypes
-from synapse.types import UserID, RoomAlias, Requester, get_domian_from_id
-from synapse.push.action_generator import ActionGenerator
+from synapse.types import UserID, Requester
 
-from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
 
 import logging
 
@@ -29,23 +26,6 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-VISIBILITY_PRIORITY = (
-    "world_readable",
-    "shared",
-    "invited",
-    "joined",
-)
-
-
-MEMBERSHIP_PRIORITY = (
-    Membership.JOIN,
-    Membership.INVITE,
-    Membership.KNOCK,
-    Membership.LEAVE,
-    Membership.BAN,
-)
-
-
 class BaseHandler(object):
     """
     Common base class for the event handlers.
@@ -65,177 +45,10 @@ class BaseHandler(object):
         self.clock = hs.get_clock()
         self.hs = hs
 
-        self.signing_key = hs.config.signing_key[0]
         self.server_name = hs.hostname
 
         self.event_builder_factory = hs.get_event_builder_factory()
 
-    @defer.inlineCallbacks
-    def filter_events_for_clients(self, user_tuples, events, event_id_to_state):
-        """ Returns dict of user_id -> list of events that user is allowed to
-        see.
-
-        Args:
-            user_tuples (str, bool): (user id, is_peeking) for each user to be
-                checked. is_peeking should be true if:
-                * the user is not currently a member of the room, and:
-                * the user has not been a member of the room since the
-                given events
-            events ([synapse.events.EventBase]): list of events to filter
-        """
-        forgotten = yield defer.gatherResults([
-            preserve_fn(self.store.who_forgot_in_room)(
-                room_id,
-            )
-            for room_id in frozenset(e.room_id for e in events)
-        ], consumeErrors=True)
-
-        # Set of membership event_ids that have been forgotten
-        event_id_forgotten = frozenset(
-            row["event_id"] for rows in forgotten for row in rows
-        )
-
-        ignore_dict_content = yield self.store.get_global_account_data_by_type_for_users(
-            "m.ignored_user_list", user_ids=[user_id for user_id, _ in user_tuples]
-        )
-
-        # FIXME: This will explode if people upload something incorrect.
-        ignore_dict = {
-            user_id: frozenset(
-                content.get("ignored_users", {}).keys() if content else []
-            )
-            for user_id, content in ignore_dict_content.items()
-        }
-
-        def allowed(event, user_id, is_peeking, ignore_list):
-            """
-            Args:
-                event (synapse.events.EventBase): event to check
-                user_id (str)
-                is_peeking (bool)
-                ignore_list (list): list of users to ignore
-            """
-            if not event.is_state() and event.sender in ignore_list:
-                return False
-
-            state = event_id_to_state[event.event_id]
-
-            # get the room_visibility at the time of the event.
-            visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None)
-            if visibility_event:
-                visibility = visibility_event.content.get("history_visibility", "shared")
-            else:
-                visibility = "shared"
-
-            if visibility not in VISIBILITY_PRIORITY:
-                visibility = "shared"
-
-            # if it was world_readable, it's easy: everyone can read it
-            if visibility == "world_readable":
-                return True
-
-            # Always allow history visibility events on boundaries. This is done
-            # by setting the effective visibility to the least restrictive
-            # of the old vs new.
-            if event.type == EventTypes.RoomHistoryVisibility:
-                prev_content = event.unsigned.get("prev_content", {})
-                prev_visibility = prev_content.get("history_visibility", None)
-
-                if prev_visibility not in VISIBILITY_PRIORITY:
-                    prev_visibility = "shared"
-
-                new_priority = VISIBILITY_PRIORITY.index(visibility)
-                old_priority = VISIBILITY_PRIORITY.index(prev_visibility)
-                if old_priority < new_priority:
-                    visibility = prev_visibility
-
-            # likewise, if the event is the user's own membership event, use
-            # the 'most joined' membership
-            membership = None
-            if event.type == EventTypes.Member and event.state_key == user_id:
-                membership = event.content.get("membership", None)
-                if membership not in MEMBERSHIP_PRIORITY:
-                    membership = "leave"
-
-                prev_content = event.unsigned.get("prev_content", {})
-                prev_membership = prev_content.get("membership", None)
-                if prev_membership not in MEMBERSHIP_PRIORITY:
-                    prev_membership = "leave"
-
-                new_priority = MEMBERSHIP_PRIORITY.index(membership)
-                old_priority = MEMBERSHIP_PRIORITY.index(prev_membership)
-                if old_priority < new_priority:
-                    membership = prev_membership
-
-            # otherwise, get the user's membership at the time of the event.
-            if membership is None:
-                membership_event = state.get((EventTypes.Member, user_id), None)
-                if membership_event:
-                    if membership_event.event_id not in event_id_forgotten:
-                        membership = membership_event.membership
-
-            # if the user was a member of the room at the time of the event,
-            # they can see it.
-            if membership == Membership.JOIN:
-                return True
-
-            if visibility == "joined":
-                # we weren't a member at the time of the event, so we can't
-                # see this event.
-                return False
-
-            elif visibility == "invited":
-                # user can also see the event if they were *invited* at the time
-                # of the event.
-                return membership == Membership.INVITE
-
-            else:
-                # visibility is shared: user can also see the event if they have
-                # become a member since the event
-                #
-                # XXX: if the user has subsequently joined and then left again,
-                # ideally we would share history up to the point they left. But
-                # we don't know when they left.
-                return not is_peeking
-
-        defer.returnValue({
-            user_id: [
-                event
-                for event in events
-                if allowed(event, user_id, is_peeking, ignore_dict.get(user_id, []))
-            ]
-            for user_id, is_peeking in user_tuples
-        })
-
-    @defer.inlineCallbacks
-    def filter_events_for_client(self, user_id, events, is_peeking=False):
-        """
-        Check which events a user is allowed to see
-
-        Args:
-            user_id(str): user id to be checked
-            events([synapse.events.EventBase]): list of events to be checked
-            is_peeking(bool): should be True if:
-              * the user is not currently a member of the room, and:
-              * the user has not been a member of the room since the given
-                events
-
-        Returns:
-            [synapse.events.EventBase]
-        """
-        types = (
-            (EventTypes.RoomHistoryVisibility, ""),
-            (EventTypes.Member, user_id),
-        )
-        event_id_to_state = yield self.store.get_state_for_events(
-            frozenset(e.event_id for e in events),
-            types=types
-        )
-        res = yield self.filter_events_for_clients(
-            [(user_id, is_peeking)], events, event_id_to_state
-        )
-        defer.returnValue(res.get(user_id, []))
-
     def ratelimit(self, requester):
         time_now = self.clock.time()
         allowed, time_allowed = self.ratelimiter.send_message(
@@ -248,56 +61,6 @@ class BaseHandler(object):
                 retry_after_ms=int(1000 * (time_allowed - time_now)),
             )
 
-    @defer.inlineCallbacks
-    def _create_new_client_event(self, builder, prev_event_ids=None):
-        if prev_event_ids:
-            prev_events = yield self.store.add_event_hashes(prev_event_ids)
-            prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
-            depth = prev_max_depth + 1
-        else:
-            latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
-                builder.room_id,
-            )
-
-            if latest_ret:
-                depth = max([d for _, _, d in latest_ret]) + 1
-            else:
-                depth = 1
-
-            prev_events = [
-                (event_id, prev_hashes)
-                for event_id, prev_hashes, _ in latest_ret
-            ]
-
-        builder.prev_events = prev_events
-        builder.depth = depth
-
-        state_handler = self.state_handler
-
-        context = yield state_handler.compute_event_context(builder)
-
-        if builder.is_state():
-            builder.prev_state = yield self.store.add_event_hashes(
-                context.prev_state_events
-            )
-
-        yield self.auth.add_auth_events(builder, context)
-
-        add_hashes_and_signatures(
-            builder, self.server_name, self.signing_key
-        )
-
-        event = builder.build()
-
-        logger.debug(
-            "Created event %s with current state: %s",
-            event.event_id, context.current_state,
-        )
-
-        defer.returnValue(
-            (event, context,)
-        )
-
     def is_host_in_room(self, current_state):
         room_members = [
             (state_key, event.membership)
@@ -319,145 +82,6 @@ class BaseHandler(object):
         return False
 
     @defer.inlineCallbacks
-    def handle_new_client_event(
-        self,
-        requester,
-        event,
-        context,
-        ratelimit=True,
-        extra_users=[]
-    ):
-        # We now need to go and hit out to wherever we need to hit out to.
-
-        if ratelimit:
-            self.ratelimit(requester)
-
-        try:
-            self.auth.check(event, auth_events=context.current_state)
-        except AuthError as err:
-            logger.warn("Denying new event %r because %s", event, err)
-            raise err
-
-        yield self.maybe_kick_guest_users(event, context.current_state.values())
-
-        if event.type == EventTypes.CanonicalAlias:
-            # Check the alias is acually valid (at this time at least)
-            room_alias_str = event.content.get("alias", None)
-            if room_alias_str:
-                room_alias = RoomAlias.from_string(room_alias_str)
-                directory_handler = self.hs.get_handlers().directory_handler
-                mapping = yield directory_handler.get_association(room_alias)
-
-                if mapping["room_id"] != event.room_id:
-                    raise SynapseError(
-                        400,
-                        "Room alias %s does not point to the room" % (
-                            room_alias_str,
-                        )
-                    )
-
-        federation_handler = self.hs.get_handlers().federation_handler
-
-        if event.type == EventTypes.Member:
-            if event.content["membership"] == Membership.INVITE:
-                def is_inviter_member_event(e):
-                    return (
-                        e.type == EventTypes.Member and
-                        e.sender == event.sender
-                    )
-
-                event.unsigned["invite_room_state"] = [
-                    {
-                        "type": e.type,
-                        "state_key": e.state_key,
-                        "content": e.content,
-                        "sender": e.sender,
-                    }
-                    for k, e in context.current_state.items()
-                    if e.type in self.hs.config.room_invite_state_types
-                    or is_inviter_member_event(e)
-                ]
-
-                invitee = UserID.from_string(event.state_key)
-                if not self.hs.is_mine(invitee):
-                    # TODO: Can we add signature from remote server in a nicer
-                    # way? If we have been invited by a remote server, we need
-                    # to get them to sign the event.
-
-                    returned_invite = yield federation_handler.send_invite(
-                        invitee.domain,
-                        event,
-                    )
-
-                    event.unsigned.pop("room_state", None)
-
-                    # TODO: Make sure the signatures actually are correct.
-                    event.signatures.update(
-                        returned_invite.signatures
-                    )
-
-        if event.type == EventTypes.Redaction:
-            if self.auth.check_redaction(event, auth_events=context.current_state):
-                original_event = yield self.store.get_event(
-                    event.redacts,
-                    check_redacted=False,
-                    get_prev_content=False,
-                    allow_rejected=False,
-                    allow_none=False
-                )
-                if event.user_id != original_event.user_id:
-                    raise AuthError(
-                        403,
-                        "You don't have permission to redact events"
-                    )
-
-        if event.type == EventTypes.Create and context.current_state:
-            raise AuthError(
-                403,
-                "Changing the room create event is forbidden",
-            )
-
-        action_generator = ActionGenerator(self.hs)
-        yield action_generator.handle_push_actions_for_event(
-            event, context, self
-        )
-
-        (event_stream_id, max_stream_id) = yield self.store.persist_event(
-            event, context=context
-        )
-
-        # this intentionally does not yield: we don't care about the result
-        # and don't need to wait for it.
-        preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
-            event_stream_id, max_stream_id
-        )
-
-        destinations = set()
-        for k, s in context.current_state.items():
-            try:
-                if k[0] == EventTypes.Member:
-                    if s.content["membership"] == Membership.JOIN:
-                        destinations.add(get_domian_from_id(s.state_key))
-            except SynapseError:
-                logger.warn(
-                    "Failed to get destination from event %s", s.event_id
-                )
-
-        with PreserveLoggingContext():
-            # Don't block waiting on waking up all the listeners.
-            self.notifier.on_new_room_event(
-                event, event_stream_id, max_stream_id,
-                extra_users=extra_users
-            )
-
-        # If invite, remove room_state from unsigned before sending.
-        event.unsigned.pop("invite_room_state", None)
-
-        federation_handler.handle_new_event(
-            event, destinations=destinations,
-        )
-
-    @defer.inlineCallbacks
     def maybe_kick_guest_users(self, event, current_state):
         # Technically this function invalidates current_state by changing it.
         # Hopefully this isn't that important to the caller.
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 3d36d3460e..68d0d78fc6 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -615,4 +615,7 @@ class AuthHandler(BaseHandler):
         Returns:
             Whether self.hash(password) == stored_hash (bool).
         """
-        return bcrypt.hashpw(password, stored_hash) == stored_hash
+        if stored_hash:
+            return bcrypt.hashpw(password, stored_hash) == stored_hash
+        else:
+            return False
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index f38c6a8713..c21d9d4d83 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -682,7 +682,8 @@ class FederationHandler(BaseHandler):
         })
 
         try:
-            event, context = yield self._create_new_client_event(
+            message_handler = self.hs.get_handlers().message_handler
+            event, context = yield message_handler._create_new_client_event(
                 builder=builder,
             )
         except AuthError as e:
@@ -913,7 +914,8 @@ class FederationHandler(BaseHandler):
             "state_key": user_id,
         })
 
-        event, context = yield self._create_new_client_event(
+        message_handler = self.hs.get_handlers().message_handler
+        event, context = yield message_handler._create_new_client_event(
             builder=builder,
         )
 
@@ -1111,7 +1113,7 @@ class FederationHandler(BaseHandler):
         if not event.internal_metadata.is_outlier():
             action_generator = ActionGenerator(self.hs)
             yield action_generator.handle_push_actions_for_event(
-                event, context, self
+                event, context
             )
 
         event_stream_id, max_stream_id = yield self.store.persist_event(
@@ -1688,7 +1690,10 @@ class FederationHandler(BaseHandler):
         if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)):
             builder = self.event_builder_factory.new(event_dict)
             EventValidator().validate_new(builder)
-            event, context = yield self._create_new_client_event(builder=builder)
+            message_handler = self.hs.get_handlers().message_handler
+            event, context = yield message_handler._create_new_client_event(
+                builder=builder
+            )
 
             event, context = yield self.add_display_name_to_third_party_invite(
                 event_dict, event, context
@@ -1716,7 +1721,8 @@ class FederationHandler(BaseHandler):
     def on_exchange_third_party_invite_request(self, origin, room_id, event_dict):
         builder = self.event_builder_factory.new(event_dict)
 
-        event, context = yield self._create_new_client_event(
+        message_handler = self.hs.get_handlers().message_handler
+        event, context = yield message_handler._create_new_client_event(
             builder=builder,
         )
 
@@ -1755,7 +1761,8 @@ class FederationHandler(BaseHandler):
         event_dict["content"]["third_party_invite"]["display_name"] = display_name
         builder = self.event_builder_factory.new(event_dict)
         EventValidator().validate_new(builder)
-        event, context = yield self._create_new_client_event(builder=builder)
+        message_handler = self.hs.get_handlers().message_handler
+        event, context = yield message_handler._create_new_client_event(builder=builder)
         defer.returnValue((event, context))
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7d9e3cf364..13154edb78 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -17,13 +17,19 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError, Codes, SynapseError
-from synapse.streams.config import PaginationConfig
+from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
+from synapse.push.action_generator import ActionGenerator
+from synapse.streams.config import PaginationConfig
+from synapse.types import (
+    UserID, RoomAlias, RoomStreamToken, StreamToken, get_domian_from_id
+)
 from synapse.util import unwrapFirstError
 from synapse.util.async import concurrently_execute
 from synapse.util.caches.snapshot_cache import SnapshotCache
-from synapse.types import UserID, RoomStreamToken, StreamToken
+from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
+from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
 
@@ -123,7 +129,8 @@ class MessageHandler(BaseHandler):
                 "end": next_token.to_string(),
             })
 
-        events = yield self.filter_events_for_client(
+        events = yield filter_events_for_client(
+            self.store,
             user_id,
             events,
             is_peeking=(member_event_id is None),
@@ -483,8 +490,8 @@ class MessageHandler(BaseHandler):
                     ]
                 ).addErrback(unwrapFirstError)
 
-                messages = yield self.filter_events_for_client(
-                    user_id, messages
+                messages = yield filter_events_for_client(
+                    self.store, user_id, messages
                 )
 
                 start_token = now_token.copy_and_replace("room_key", token[0])
@@ -619,8 +626,8 @@ class MessageHandler(BaseHandler):
             end_token=stream_token
         )
 
-        messages = yield self.filter_events_for_client(
-            user_id, messages, is_peeking=is_peeking
+        messages = yield filter_events_for_client(
+            self.store, user_id, messages, is_peeking=is_peeking
         )
 
         start_token = StreamToken.START.copy_and_replace("room_key", token[0])
@@ -700,8 +707,8 @@ class MessageHandler(BaseHandler):
             consumeErrors=True,
         ).addErrback(unwrapFirstError)
 
-        messages = yield self.filter_events_for_client(
-            user_id, messages, is_peeking=is_peeking,
+        messages = yield filter_events_for_client(
+            self.store, user_id, messages, is_peeking=is_peeking,
         )
 
         start_token = now_token.copy_and_replace("room_key", token[0])
@@ -724,3 +731,193 @@ class MessageHandler(BaseHandler):
             ret["membership"] = membership
 
         defer.returnValue(ret)
+
+    @defer.inlineCallbacks
+    def _create_new_client_event(self, builder, prev_event_ids=None):
+        if prev_event_ids:
+            prev_events = yield self.store.add_event_hashes(prev_event_ids)
+            prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
+            depth = prev_max_depth + 1
+        else:
+            latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
+                builder.room_id,
+            )
+
+            if latest_ret:
+                depth = max([d for _, _, d in latest_ret]) + 1
+            else:
+                depth = 1
+
+            prev_events = [
+                (event_id, prev_hashes)
+                for event_id, prev_hashes, _ in latest_ret
+            ]
+
+        builder.prev_events = prev_events
+        builder.depth = depth
+
+        state_handler = self.state_handler
+
+        context = yield state_handler.compute_event_context(builder)
+
+        if builder.is_state():
+            builder.prev_state = yield self.store.add_event_hashes(
+                context.prev_state_events
+            )
+
+        yield self.auth.add_auth_events(builder, context)
+
+        signing_key = self.hs.config.signing_key[0]
+        add_hashes_and_signatures(
+            builder, self.server_name, signing_key
+        )
+
+        event = builder.build()
+
+        logger.debug(
+            "Created event %s with current state: %s",
+            event.event_id, context.current_state,
+        )
+
+        defer.returnValue(
+            (event, context,)
+        )
+
+    @defer.inlineCallbacks
+    def handle_new_client_event(
+        self,
+        requester,
+        event,
+        context,
+        ratelimit=True,
+        extra_users=[]
+    ):
+        # We now need to go and hit out to wherever we need to hit out to.
+
+        if ratelimit:
+            self.ratelimit(requester)
+
+        try:
+            self.auth.check(event, auth_events=context.current_state)
+        except AuthError as err:
+            logger.warn("Denying new event %r because %s", event, err)
+            raise err
+
+        yield self.maybe_kick_guest_users(event, context.current_state.values())
+
+        if event.type == EventTypes.CanonicalAlias:
+            # Check the alias is acually valid (at this time at least)
+            room_alias_str = event.content.get("alias", None)
+            if room_alias_str:
+                room_alias = RoomAlias.from_string(room_alias_str)
+                directory_handler = self.hs.get_handlers().directory_handler
+                mapping = yield directory_handler.get_association(room_alias)
+
+                if mapping["room_id"] != event.room_id:
+                    raise SynapseError(
+                        400,
+                        "Room alias %s does not point to the room" % (
+                            room_alias_str,
+                        )
+                    )
+
+        federation_handler = self.hs.get_handlers().federation_handler
+
+        if event.type == EventTypes.Member:
+            if event.content["membership"] == Membership.INVITE:
+                def is_inviter_member_event(e):
+                    return (
+                        e.type == EventTypes.Member and
+                        e.sender == event.sender
+                    )
+
+                event.unsigned["invite_room_state"] = [
+                    {
+                        "type": e.type,
+                        "state_key": e.state_key,
+                        "content": e.content,
+                        "sender": e.sender,
+                    }
+                    for k, e in context.current_state.items()
+                    if e.type in self.hs.config.room_invite_state_types
+                    or is_inviter_member_event(e)
+                ]
+
+                invitee = UserID.from_string(event.state_key)
+                if not self.hs.is_mine(invitee):
+                    # TODO: Can we add signature from remote server in a nicer
+                    # way? If we have been invited by a remote server, we need
+                    # to get them to sign the event.
+
+                    returned_invite = yield federation_handler.send_invite(
+                        invitee.domain,
+                        event,
+                    )
+
+                    event.unsigned.pop("room_state", None)
+
+                    # TODO: Make sure the signatures actually are correct.
+                    event.signatures.update(
+                        returned_invite.signatures
+                    )
+
+        if event.type == EventTypes.Redaction:
+            if self.auth.check_redaction(event, auth_events=context.current_state):
+                original_event = yield self.store.get_event(
+                    event.redacts,
+                    check_redacted=False,
+                    get_prev_content=False,
+                    allow_rejected=False,
+                    allow_none=False
+                )
+                if event.user_id != original_event.user_id:
+                    raise AuthError(
+                        403,
+                        "You don't have permission to redact events"
+                    )
+
+        if event.type == EventTypes.Create and context.current_state:
+            raise AuthError(
+                403,
+                "Changing the room create event is forbidden",
+            )
+
+        action_generator = ActionGenerator(self.hs)
+        yield action_generator.handle_push_actions_for_event(
+            event, context
+        )
+
+        (event_stream_id, max_stream_id) = yield self.store.persist_event(
+            event, context=context
+        )
+
+        # this intentionally does not yield: we don't care about the result
+        # and don't need to wait for it.
+        preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
+            event_stream_id, max_stream_id
+        )
+
+        destinations = set()
+        for k, s in context.current_state.items():
+            try:
+                if k[0] == EventTypes.Member:
+                    if s.content["membership"] == Membership.JOIN:
+                        destinations.add(get_domian_from_id(s.state_key))
+            except SynapseError:
+                logger.warn(
+                    "Failed to get destination from event %s", s.event_id
+                )
+
+        with PreserveLoggingContext():
+            # Don't block waiting on waking up all the listeners.
+            self.notifier.on_new_room_event(
+                event, event_stream_id, max_stream_id,
+                extra_users=extra_users
+            )
+
+        # If invite, remove room_state from unsigned before sending.
+        event.unsigned.pop("invite_room_state", None)
+
+        federation_handler.handle_new_event(
+            event, destinations=destinations,
+        )
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index fdebc9c438..3d63b3c513 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -26,6 +26,7 @@ from synapse.api.errors import AuthError, StoreError, SynapseError
 from synapse.util import stringutils
 from synapse.util.async import concurrently_execute
 from synapse.util.caches.response_cache import ResponseCache
+from synapse.visibility import filter_events_for_client
 
 from collections import OrderedDict
 
@@ -449,10 +450,12 @@ class RoomContextHandler(BaseHandler):
         now_token = yield self.hs.get_event_sources().get_current_token()
 
         def filter_evts(events):
-            return self.filter_events_for_client(
+            return filter_events_for_client(
+                self.store,
                 user.to_string(),
                 events,
-                is_peeking=is_guest)
+                is_peeking=is_guest
+            )
 
         event = yield self.store.get_event(event_id, get_prev_content=True,
                                            allow_none=True)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index ed2cda837f..b44e52a515 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -113,7 +113,7 @@ class RoomMemberHandler(BaseHandler):
             prev_event_ids=prev_event_ids,
         )
 
-        yield self.handle_new_client_event(
+        yield msg_handler.handle_new_client_event(
             requester,
             event,
             context,
@@ -357,7 +357,7 @@ class RoomMemberHandler(BaseHandler):
                 # so don't really fit into the general auth process.
                 raise AuthError(403, "Guest access not allowed")
 
-        yield self.handle_new_client_event(
+        yield message_handler.handle_new_client_event(
             requester,
             event,
             context,
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index a937e87408..df75d70fac 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -21,6 +21,7 @@ from synapse.api.constants import Membership, EventTypes
 from synapse.api.filtering import Filter
 from synapse.api.errors import SynapseError
 from synapse.events.utils import serialize_event
+from synapse.visibility import filter_events_for_client
 
 from unpaddedbase64 import decode_base64, encode_base64
 
@@ -172,8 +173,8 @@ class SearchHandler(BaseHandler):
 
             filtered_events = search_filter.filter([r["event"] for r in results])
 
-            events = yield self.filter_events_for_client(
-                user.to_string(), filtered_events
+            events = yield filter_events_for_client(
+                self.store, user.to_string(), filtered_events
             )
 
             events.sort(key=lambda e: -rank_map[e.event_id])
@@ -223,8 +224,8 @@ class SearchHandler(BaseHandler):
                     r["event"] for r in results
                 ])
 
-                events = yield self.filter_events_for_client(
-                    user.to_string(), filtered_events
+                events = yield filter_events_for_client(
+                    self.store, user.to_string(), filtered_events
                 )
 
                 room_events.extend(events)
@@ -281,12 +282,12 @@ class SearchHandler(BaseHandler):
                     event.room_id, event.event_id, before_limit, after_limit
                 )
 
-                res["events_before"] = yield self.filter_events_for_client(
-                    user.to_string(), res["events_before"]
+                res["events_before"] = yield filter_events_for_client(
+                    self.store, user.to_string(), res["events_before"]
                 )
 
-                res["events_after"] = yield self.filter_events_for_client(
-                    user.to_string(), res["events_after"]
+                res["events_after"] = yield filter_events_for_client(
+                    self.store, user.to_string(), res["events_after"]
                 )
 
                 res["start"] = now_token.copy_and_replace(
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index b7dcbc6b1b..921215469f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -22,6 +22,7 @@ from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.push.clientformat import format_push_rules_for_user
+from synapse.visibility import filter_events_for_client
 
 from twisted.internet import defer
 
@@ -697,7 +698,8 @@ class SyncHandler(BaseHandler):
 
             if recents is not None:
                 recents = sync_config.filter_collection.filter_room_timeline(recents)
-                recents = yield self.filter_events_for_client(
+                recents = yield filter_events_for_client(
+                    self.store,
                     sync_config.user.to_string(),
                     recents,
                 )
@@ -718,7 +720,8 @@ class SyncHandler(BaseHandler):
                 loaded_recents = sync_config.filter_collection.filter_room_timeline(
                     events
                 )
-                loaded_recents = yield self.filter_events_for_client(
+                loaded_recents = yield filter_events_for_client(
+                    self.store,
                     sync_config.user.to_string(),
                     loaded_recents,
                 )
diff --git a/synapse/notifier.py b/synapse/notifier.py
index cb58dfffd4..33b79c0ec7 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -21,6 +21,7 @@ from synapse.util.logutils import log_function
 from synapse.util.async import ObservableDeferred
 from synapse.util.logcontext import PreserveLoggingContext
 from synapse.types import StreamToken
+from synapse.visibility import filter_events_for_client
 import synapse.metrics
 
 from collections import namedtuple
@@ -398,8 +399,8 @@ class Notifier(object):
                 )
 
                 if name == "room":
-                    room_member_handler = self.hs.get_handlers().room_member_handler
-                    new_events = yield room_member_handler.filter_events_for_client(
+                    new_events = yield filter_events_for_client(
+                        self.store,
                         user.to_string(),
                         new_events,
                         is_peeking=is_peeking,
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
index a0160994b7..9b208668b6 100644
--- a/synapse/push/action_generator.py
+++ b/synapse/push/action_generator.py
@@ -37,14 +37,14 @@ class ActionGenerator:
         # tag (ie. we just need all the users).
 
     @defer.inlineCallbacks
-    def handle_push_actions_for_event(self, event, context, handler):
+    def handle_push_actions_for_event(self, event, context):
         with Measure(self.clock, "handle_push_actions_for_event"):
             bulk_evaluator = yield evaluator_for_event(
                 event, self.hs, self.store
             )
 
             actions_by_user = yield bulk_evaluator.action_for_event_by_user(
-                event, handler, context.current_state
+                event, context.current_state
             )
 
             context.push_actions = [
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index f97df36d80..25e13b3423 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -22,6 +22,7 @@ from .baserules import list_with_base_rules
 from .push_rule_evaluator import PushRuleEvaluatorForEvent
 
 from synapse.api.constants import EventTypes
+from synapse.visibility import filter_events_for_clients
 
 
 logger = logging.getLogger(__name__)
@@ -126,7 +127,7 @@ class BulkPushRuleEvaluator:
         self.store = store
 
     @defer.inlineCallbacks
-    def action_for_event_by_user(self, event, handler, current_state):
+    def action_for_event_by_user(self, event, current_state):
         actions_by_user = {}
 
         # None of these users can be peeking since this list of users comes
@@ -136,8 +137,8 @@ class BulkPushRuleEvaluator:
             (u, False) for u in self.rules_by_user.keys()
         ]
 
-        filtered_by_user = yield handler.filter_events_for_clients(
-            user_tuples, [event], {event.event_id: current_state}
+        filtered_by_user = yield filter_events_for_clients(
+            self.store, user_tuples, [event], {event.event_id: current_state}
         )
 
         room_members = yield self.store.get_users_in_room(self.room_id)
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 7031fa6d55..5d60c1efcf 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -28,6 +28,7 @@ from synapse.util.presentable_names import (
 from synapse.types import UserID
 from synapse.api.errors import StoreError
 from synapse.api.constants import EventTypes
+from synapse.visibility import filter_events_for_client
 
 import jinja2
 import bleach
@@ -227,9 +228,8 @@ class Mailer(object):
             "messages": [],
         }
 
-        handler = self.hs.get_handlers().message_handler
-        the_events = yield handler.filter_events_for_client(
-            user_id, results["events_before"]
+        the_events = yield filter_events_for_client(
+            self.store, user_id, results["events_before"]
         )
         the_events.append(notif_event)
 
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index 69ad1de863..0e983ae7fa 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -164,8 +164,8 @@ class ReplicationResource(Resource):
                 "Replicating %d rows of %s from %s -> %s",
                 len(stream_content["rows"]),
                 stream_name,
-                stream_content["position"],
                 request_streams.get(stream_name),
+                stream_content["position"],
             )
 
         request.write(json.dumps(result, ensure_ascii=False))
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 7ba7a6f6e4..635febb174 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -146,12 +146,14 @@ class SlavedEventStore(BaseSlavedStore):
 
         stream = result.get("forward_ex_outliers")
         if stream:
+            self._stream_id_gen.advance(stream["position"])
             for row in stream["rows"]:
                 event_id = row[1]
                 self._invalidate_get_event_cache(event_id)
 
         stream = result.get("backward_ex_outliers")
         if stream:
+            self._backfill_id_gen.advance(-stream["position"])
             for row in stream["rows"]:
                 event_id = row[1]
                 self._invalidate_get_event_cache(event_id)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 1e27c2c0ce..e0d7098692 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -453,7 +453,9 @@ class SQLBaseStore(object):
             keyvalues (dict): The unique key tables and their new values
             values (dict): The nonunique columns and their new values
             insertion_values (dict): key/values to use when inserting
-        Returns: A deferred
+        Returns:
+            Deferred(bool): True if a new entry was created, False if an
+                existing one was updated.
         """
         return self.runInteraction(
             desc,
@@ -498,6 +500,10 @@ class SQLBaseStore(object):
             )
             txn.execute(sql, allvalues.values())
 
+            return True
+        else:
+            return False
+
     def _simple_select_one(self, table, keyvalues, retcols,
                            allow_none=False, desc="_simple_select_one"):
         """Executes a SELECT query on the named table, which is expected to
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 6f316f7d24..9705db5c47 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -224,6 +224,18 @@ class EventPushActionsStore(SQLBaseStore):
             (room_id, event_id)
         )
 
+    def _remove_push_actions_before_txn(self, txn, room_id, user_id,
+                                        topological_ordering):
+        txn.call_after(
+            self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+            (room_id, user_id, )
+        )
+        txn.execute(
+            "DELETE FROM event_push_actions"
+            " WHERE room_id = ? AND user_id = ? AND topological_ordering < ?",
+            (room_id, user_id, topological_ordering,)
+        )
+
 
 def _action_has_highlight(actions):
     for action in actions:
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index d9afd7ec87..9e8e2e2964 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -156,8 +156,7 @@ class PusherStore(SQLBaseStore):
                    profile_tag=""):
         with self._pushers_id_gen.get_next() as stream_id:
             def f(txn):
-                txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
-                return self._simple_upsert_txn(
+                newly_inserted = self._simple_upsert_txn(
                     txn,
                     "pushers",
                     {
@@ -178,11 +177,18 @@ class PusherStore(SQLBaseStore):
                         "id": stream_id,
                     },
                 )
-        defer.returnValue((yield self.runInteraction("add_pusher", f)))
+                if newly_inserted:
+                    # get_users_with_pushers_in_room only cares if the user has
+                    # at least *one* pusher.
+                    txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
+
+            yield self.runInteraction("add_pusher", f)
 
     @defer.inlineCallbacks
     def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
         def delete_pusher_txn(txn, stream_id):
+            txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
+
             self._simple_delete_one_txn(
                 txn,
                 "pushers",
@@ -194,6 +200,7 @@ class PusherStore(SQLBaseStore):
                 {"app_id": app_id, "pushkey": pushkey, "user_id": user_id},
                 {"stream_id": stream_id},
             )
+
         with self._pushers_id_gen.get_next() as stream_id:
             yield self.runInteraction(
                 "delete_pusher", delete_pusher_txn, stream_id
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 935fc503d9..94be820f86 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -100,7 +100,7 @@ class ReceiptsStore(SQLBaseStore):
 
         defer.returnValue([ev for res in results.values() for ev in res])
 
-    @cachedInlineCallbacks(num_args=3, max_entries=5000)
+    @cachedInlineCallbacks(num_args=3, max_entries=5000, lru=True, tree=True)
     def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
         """Get receipts for a single room for sending to clients.
 
@@ -232,7 +232,7 @@ class ReceiptsStore(SQLBaseStore):
             self.get_receipts_for_user.invalidate, (user_id, receipt_type)
         )
         # FIXME: This shouldn't invalidate the whole cache
-        txn.call_after(self.get_linearized_receipts_for_room.invalidate_all)
+        txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
 
         txn.call_after(
             self._receipts_stream_cache.entity_has_changed,
@@ -244,6 +244,15 @@ class ReceiptsStore(SQLBaseStore):
             (user_id, room_id, receipt_type)
         )
 
+        res = self._simple_select_one_txn(
+            txn,
+            table="events",
+            retcols=["topological_ordering", "stream_ordering"],
+            keyvalues={"event_id": event_id},
+        )
+        topological_ordering = int(res["topological_ordering"])
+        stream_ordering = int(res["stream_ordering"])
+
         # We don't want to clobber receipts for more recent events, so we
         # have to compare orderings of existing receipts
         sql = (
@@ -256,15 +265,6 @@ class ReceiptsStore(SQLBaseStore):
         results = txn.fetchall()
 
         if results:
-            res = self._simple_select_one_txn(
-                txn,
-                table="events",
-                retcols=["topological_ordering", "stream_ordering"],
-                keyvalues={"event_id": event_id},
-            )
-            topological_ordering = int(res["topological_ordering"])
-            stream_ordering = int(res["stream_ordering"])
-
             for to, so, _ in results:
                 if int(to) > topological_ordering:
                     return False
@@ -294,6 +294,14 @@ class ReceiptsStore(SQLBaseStore):
             }
         )
 
+        if receipt_type == "m.read":
+            self._remove_push_actions_before_txn(
+                txn,
+                room_id=room_id,
+                user_id=user_id,
+                topological_ordering=topological_ordering,
+            )
+
         return True
 
     @defer.inlineCallbacks
@@ -367,7 +375,7 @@ class ReceiptsStore(SQLBaseStore):
             self.get_receipts_for_user.invalidate, (user_id, receipt_type)
         )
         # FIXME: This shouldn't invalidate the whole cache
-        txn.call_after(self.get_linearized_receipts_for_room.invalidate_all)
+        txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
 
         self._simple_delete_txn(
             txn,
diff --git a/synapse/storage/schema/delta/32/remove_indices.sql b/synapse/storage/schema/delta/32/remove_indices.sql
new file mode 100644
index 0000000000..f859be46a6
--- /dev/null
+++ b/synapse/storage/schema/delta/32/remove_indices.sql
@@ -0,0 +1,38 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- The following indices are redundant, other indices are equivalent or
+-- supersets
+DROP INDEX IF EXISTS events_room_id; -- Prefix of events_room_stream
+DROP INDEX IF EXISTS events_order; -- Prefix of events_order_topo_stream_room
+DROP INDEX IF EXISTS events_topological_ordering; -- Prefix of events_order_topo_stream_room
+DROP INDEX IF EXISTS events_stream_ordering; -- Duplicate of PRIMARY KEY
+DROP INDEX IF EXISTS state_groups_id; -- Duplicate of PRIMARY KEY
+DROP INDEX IF EXISTS event_to_state_groups_id; -- Duplicate of PRIMARY KEY
+DROP INDEX IF EXISTS event_push_actions_room_id_event_id_user_id_profile_tag; -- Duplicate of UNIQUE CONSTRAINT
+
+DROP INDEX IF EXISTS event_destinations_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS st_extrem_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS event_content_hashes_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS event_signatures_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS event_edge_hashes_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS redactions_event_id; -- Duplicate of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS room_hosts_room_id; -- Prefix of UNIQUE CONSTRAINT
+
+-- The following indices were unused
+DROP INDEX IF EXISTS remote_media_cache_thumbnails_media_id;
+DROP INDEX IF EXISTS evauth_edges_auth_id;
+DROP INDEX IF EXISTS presence_stream_state;
diff --git a/synapse/visibility.py b/synapse/visibility.py
new file mode 100644
index 0000000000..948ad51772
--- /dev/null
+++ b/synapse/visibility.py
@@ -0,0 +1,210 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 - 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.api.constants import Membership, EventTypes
+
+from synapse.util.logcontext import preserve_fn
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+VISIBILITY_PRIORITY = (
+    "world_readable",
+    "shared",
+    "invited",
+    "joined",
+)
+
+
+MEMBERSHIP_PRIORITY = (
+    Membership.JOIN,
+    Membership.INVITE,
+    Membership.KNOCK,
+    Membership.LEAVE,
+    Membership.BAN,
+)
+
+
+@defer.inlineCallbacks
+def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
+    """ Returns dict of user_id -> list of events that user is allowed to
+    see.
+
+    Args:
+        user_tuples (str, bool): (user id, is_peeking) for each user to be
+            checked. is_peeking should be true if:
+            * the user is not currently a member of the room, and:
+            * the user has not been a member of the room since the
+            given events
+        events ([synapse.events.EventBase]): list of events to filter
+    """
+    forgotten = yield defer.gatherResults([
+        preserve_fn(store.who_forgot_in_room)(
+            room_id,
+        )
+        for room_id in frozenset(e.room_id for e in events)
+    ], consumeErrors=True)
+
+    # Set of membership event_ids that have been forgotten
+    event_id_forgotten = frozenset(
+        row["event_id"] for rows in forgotten for row in rows
+    )
+
+    ignore_dict_content = yield store.get_global_account_data_by_type_for_users(
+        "m.ignored_user_list", user_ids=[user_id for user_id, _ in user_tuples]
+    )
+
+    # FIXME: This will explode if people upload something incorrect.
+    ignore_dict = {
+        user_id: frozenset(
+            content.get("ignored_users", {}).keys() if content else []
+        )
+        for user_id, content in ignore_dict_content.items()
+    }
+
+    def allowed(event, user_id, is_peeking, ignore_list):
+        """
+        Args:
+            event (synapse.events.EventBase): event to check
+            user_id (str)
+            is_peeking (bool)
+            ignore_list (list): list of users to ignore
+        """
+        if not event.is_state() and event.sender in ignore_list:
+            return False
+
+        state = event_id_to_state[event.event_id]
+
+        # get the room_visibility at the time of the event.
+        visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None)
+        if visibility_event:
+            visibility = visibility_event.content.get("history_visibility", "shared")
+        else:
+            visibility = "shared"
+
+        if visibility not in VISIBILITY_PRIORITY:
+            visibility = "shared"
+
+        # if it was world_readable, it's easy: everyone can read it
+        if visibility == "world_readable":
+            return True
+
+        # Always allow history visibility events on boundaries. This is done
+        # by setting the effective visibility to the least restrictive
+        # of the old vs new.
+        if event.type == EventTypes.RoomHistoryVisibility:
+            prev_content = event.unsigned.get("prev_content", {})
+            prev_visibility = prev_content.get("history_visibility", None)
+
+            if prev_visibility not in VISIBILITY_PRIORITY:
+                prev_visibility = "shared"
+
+            new_priority = VISIBILITY_PRIORITY.index(visibility)
+            old_priority = VISIBILITY_PRIORITY.index(prev_visibility)
+            if old_priority < new_priority:
+                visibility = prev_visibility
+
+        # likewise, if the event is the user's own membership event, use
+        # the 'most joined' membership
+        membership = None
+        if event.type == EventTypes.Member and event.state_key == user_id:
+            membership = event.content.get("membership", None)
+            if membership not in MEMBERSHIP_PRIORITY:
+                membership = "leave"
+
+            prev_content = event.unsigned.get("prev_content", {})
+            prev_membership = prev_content.get("membership", None)
+            if prev_membership not in MEMBERSHIP_PRIORITY:
+                prev_membership = "leave"
+
+            new_priority = MEMBERSHIP_PRIORITY.index(membership)
+            old_priority = MEMBERSHIP_PRIORITY.index(prev_membership)
+            if old_priority < new_priority:
+                membership = prev_membership
+
+        # otherwise, get the user's membership at the time of the event.
+        if membership is None:
+            membership_event = state.get((EventTypes.Member, user_id), None)
+            if membership_event:
+                if membership_event.event_id not in event_id_forgotten:
+                    membership = membership_event.membership
+
+        # if the user was a member of the room at the time of the event,
+        # they can see it.
+        if membership == Membership.JOIN:
+            return True
+
+        if visibility == "joined":
+            # we weren't a member at the time of the event, so we can't
+            # see this event.
+            return False
+
+        elif visibility == "invited":
+            # user can also see the event if they were *invited* at the time
+            # of the event.
+            return membership == Membership.INVITE
+
+        else:
+            # visibility is shared: user can also see the event if they have
+            # become a member since the event
+            #
+            # XXX: if the user has subsequently joined and then left again,
+            # ideally we would share history up to the point they left. But
+            # we don't know when they left.
+            return not is_peeking
+
+    defer.returnValue({
+        user_id: [
+            event
+            for event in events
+            if allowed(event, user_id, is_peeking, ignore_dict.get(user_id, []))
+        ]
+        for user_id, is_peeking in user_tuples
+    })
+
+
+@defer.inlineCallbacks
+def filter_events_for_client(store, user_id, events, is_peeking=False):
+    """
+    Check which events a user is allowed to see
+
+    Args:
+        user_id(str): user id to be checked
+        events([synapse.events.EventBase]): list of events to be checked
+        is_peeking(bool): should be True if:
+          * the user is not currently a member of the room, and:
+          * the user has not been a member of the room since the given
+            events
+
+    Returns:
+        [synapse.events.EventBase]
+    """
+    types = (
+        (EventTypes.RoomHistoryVisibility, ""),
+        (EventTypes.Member, user_id),
+    )
+    event_id_to_state = yield store.get_state_for_events(
+        frozenset(e.event_id for e in events),
+        types=types
+    )
+    res = yield filter_events_for_clients(
+        store, [(user_id, is_peeking)], events, event_id_to_state
+    )
+    defer.returnValue(res.get(user_id, []))