summary refs log tree commit diff
path: root/synapse/handlers/message.py
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2015-08-12 17:07:22 +0100
committerMark Haines <mark.haines@matrix.org>2015-08-12 17:21:14 +0100
commit998a72d4d9ec6e73000888dcdf51437ec427fbee (patch)
treef8fa9d5deb820b49eb3a216e194ee83e14fb9eda /synapse/handlers/message.py
parentBump the version of twisted needed for setup_requires to 15.2.1 (diff)
parentMerge pull request #220 from matrix-org/markjh/generate_keys (diff)
downloadsynapse-998a72d4d9ec6e73000888dcdf51437ec427fbee.tar.xz
Merge branch 'develop' into markjh/twisted-15
Conflicts:
	synapse/http/matrixfederationclient.py
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r--synapse/handlers/message.py132
1 files changed, 108 insertions, 24 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 867fdbefb0..9d6d4f0978 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -113,11 +113,21 @@ class MessageHandler(BaseHandler):
             "room_key", next_key
         )
 
+        if not events:
+            defer.returnValue({
+                "chunk": [],
+                "start": pagin_config.from_token.to_string(),
+                "end": next_token.to_string(),
+            })
+
+        events = yield self._filter_events_for_client(user_id, room_id, events)
+
         time_now = self.clock.time_msec()
 
         chunk = {
             "chunk": [
-                serialize_event(e, time_now, as_client_event) for e in events
+                serialize_event(e, time_now, as_client_event)
+                for e in events
             ],
             "start": pagin_config.from_token.to_string(),
             "end": next_token.to_string(),
@@ -126,6 +136,52 @@ class MessageHandler(BaseHandler):
         defer.returnValue(chunk)
 
     @defer.inlineCallbacks
+    def _filter_events_for_client(self, user_id, room_id, events):
+        states = yield self.store.get_state_for_events(
+            room_id, [e.event_id for e in events],
+        )
+
+        events_and_states = zip(events, states)
+
+        def allowed(event_and_state):
+            event, state = event_and_state
+
+            if event.type == EventTypes.RoomHistoryVisibility:
+                return True
+
+            membership_ev = state.get((EventTypes.Member, user_id), None)
+            if membership_ev:
+                membership = membership_ev.membership
+            else:
+                membership = Membership.LEAVE
+
+            if membership == Membership.JOIN:
+                return True
+
+            history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
+            if history:
+                visibility = history.content.get("history_visibility", "shared")
+            else:
+                visibility = "shared"
+
+            if visibility == "public":
+                return True
+            elif visibility == "shared":
+                return True
+            elif visibility == "joined":
+                return membership == Membership.JOIN
+            elif visibility == "invited":
+                return membership == Membership.INVITE
+
+            return True
+
+        events_and_states = filter(allowed, events_and_states)
+        defer.returnValue([
+            ev
+            for ev, _ in events_and_states
+        ])
+
+    @defer.inlineCallbacks
     def create_and_send_event(self, event_dict, ratelimit=True,
                               client=None, txn_id=None):
         """ Given a dict from a client, create and handle a new event.
@@ -278,6 +334,11 @@ class MessageHandler(BaseHandler):
             user, pagination_config.get_source_config("presence"), None
         )
 
+        receipt_stream = self.hs.get_event_sources().sources["receipt"]
+        receipt, _ = yield receipt_stream.get_pagination_rows(
+            user, pagination_config.get_source_config("receipt"), None
+        )
+
         public_room_ids = yield self.store.get_public_room_ids()
 
         limit = pagin_config.limit
@@ -316,6 +377,10 @@ class MessageHandler(BaseHandler):
                     ]
                 ).addErrback(unwrapFirstError)
 
+                messages = yield self._filter_events_for_client(
+                    user_id, event.room_id, messages
+                )
+
                 start_token = now_token.copy_and_replace("room_key", token[0])
                 end_token = now_token.copy_and_replace("room_key", token[1])
                 time_now = self.clock.time_msec()
@@ -344,7 +409,8 @@ class MessageHandler(BaseHandler):
         ret = {
             "rooms": rooms_ret,
             "presence": presence,
-            "end": now_token.to_string()
+            "receipts": receipt,
+            "end": now_token.to_string(),
         }
 
         defer.returnValue(ret)
@@ -380,15 +446,6 @@ class MessageHandler(BaseHandler):
         if limit is None:
             limit = 10
 
-        messages, token = yield self.store.get_recent_events_for_room(
-            room_id,
-            limit=limit,
-            end_token=now_token.room_key,
-        )
-
-        start_token = now_token.copy_and_replace("room_key", token[0])
-        end_token = now_token.copy_and_replace("room_key", token[1])
-
         room_members = [
             m for m in current_state.values()
             if m.type == EventTypes.Member
@@ -396,19 +453,45 @@ class MessageHandler(BaseHandler):
         ]
 
         presence_handler = self.hs.get_handlers().presence_handler
-        presence = []
-        for m in room_members:
-            try:
-                member_presence = yield presence_handler.get_state(
-                    target_user=UserID.from_string(m.user_id),
-                    auth_user=auth_user,
-                    as_event=True,
-                )
-                presence.append(member_presence)
-            except SynapseError:
-                logger.exception(
-                    "Failed to get member presence of %r", m.user_id
+
+        @defer.inlineCallbacks
+        def get_presence():
+            presence_defs = yield defer.DeferredList(
+                [
+                    presence_handler.get_state(
+                        target_user=UserID.from_string(m.user_id),
+                        auth_user=auth_user,
+                        as_event=True,
+                        check_auth=False,
+                    )
+                    for m in room_members
+                ],
+                consumeErrors=True,
+            )
+
+            defer.returnValue([p for success, p in presence_defs if success])
+
+        receipts_handler = self.hs.get_handlers().receipts_handler
+
+        presence, receipts, (messages, token) = yield defer.gatherResults(
+            [
+                get_presence(),
+                receipts_handler.get_receipts_for_room(room_id, now_token.receipt_key),
+                self.store.get_recent_events_for_room(
+                    room_id,
+                    limit=limit,
+                    end_token=now_token.room_key,
                 )
+            ],
+            consumeErrors=True,
+        ).addErrback(unwrapFirstError)
+
+        messages = yield self._filter_events_for_client(
+            user_id, room_id, messages
+        )
+
+        start_token = now_token.copy_and_replace("room_key", token[0])
+        end_token = now_token.copy_and_replace("room_key", token[1])
 
         time_now = self.clock.time_msec()
 
@@ -421,5 +504,6 @@ class MessageHandler(BaseHandler):
                 "end": end_token.to_string(),
             },
             "state": state,
-            "presence": presence
+            "presence": presence,
+            "receipts": receipts,
         })