diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 867fdbefb0..9d6d4f0978 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -113,11 +113,21 @@ class MessageHandler(BaseHandler):
"room_key", next_key
)
+ if not events:
+ defer.returnValue({
+ "chunk": [],
+ "start": pagin_config.from_token.to_string(),
+ "end": next_token.to_string(),
+ })
+
+ events = yield self._filter_events_for_client(user_id, room_id, events)
+
time_now = self.clock.time_msec()
chunk = {
"chunk": [
- serialize_event(e, time_now, as_client_event) for e in events
+ serialize_event(e, time_now, as_client_event)
+ for e in events
],
"start": pagin_config.from_token.to_string(),
"end": next_token.to_string(),
@@ -126,6 +136,52 @@ class MessageHandler(BaseHandler):
defer.returnValue(chunk)
@defer.inlineCallbacks
+ def _filter_events_for_client(self, user_id, room_id, events):
+ states = yield self.store.get_state_for_events(
+ room_id, [e.event_id for e in events],
+ )
+
+ events_and_states = zip(events, states)
+
+ def allowed(event_and_state):
+ event, state = event_and_state
+
+ if event.type == EventTypes.RoomHistoryVisibility:
+ return True
+
+ membership_ev = state.get((EventTypes.Member, user_id), None)
+ if membership_ev:
+ membership = membership_ev.membership
+ else:
+ membership = Membership.LEAVE
+
+ if membership == Membership.JOIN:
+ return True
+
+ history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
+ if history:
+ visibility = history.content.get("history_visibility", "shared")
+ else:
+ visibility = "shared"
+
+ if visibility == "public":
+ return True
+ elif visibility == "shared":
+ return True
+ elif visibility == "joined":
+ return membership == Membership.JOIN
+ elif visibility == "invited":
+ return membership == Membership.INVITE
+
+ return True
+
+ events_and_states = filter(allowed, events_and_states)
+ defer.returnValue([
+ ev
+ for ev, _ in events_and_states
+ ])
+
+ @defer.inlineCallbacks
def create_and_send_event(self, event_dict, ratelimit=True,
client=None, txn_id=None):
""" Given a dict from a client, create and handle a new event.
@@ -278,6 +334,11 @@ class MessageHandler(BaseHandler):
user, pagination_config.get_source_config("presence"), None
)
+ receipt_stream = self.hs.get_event_sources().sources["receipt"]
+ receipt, _ = yield receipt_stream.get_pagination_rows(
+ user, pagination_config.get_source_config("receipt"), None
+ )
+
public_room_ids = yield self.store.get_public_room_ids()
limit = pagin_config.limit
@@ -316,6 +377,10 @@ class MessageHandler(BaseHandler):
]
).addErrback(unwrapFirstError)
+ messages = yield self._filter_events_for_client(
+ user_id, event.room_id, messages
+ )
+
start_token = now_token.copy_and_replace("room_key", token[0])
end_token = now_token.copy_and_replace("room_key", token[1])
time_now = self.clock.time_msec()
@@ -344,7 +409,8 @@ class MessageHandler(BaseHandler):
ret = {
"rooms": rooms_ret,
"presence": presence,
- "end": now_token.to_string()
+ "receipts": receipt,
+ "end": now_token.to_string(),
}
defer.returnValue(ret)
@@ -380,15 +446,6 @@ class MessageHandler(BaseHandler):
if limit is None:
limit = 10
- messages, token = yield self.store.get_recent_events_for_room(
- room_id,
- limit=limit,
- end_token=now_token.room_key,
- )
-
- start_token = now_token.copy_and_replace("room_key", token[0])
- end_token = now_token.copy_and_replace("room_key", token[1])
-
room_members = [
m for m in current_state.values()
if m.type == EventTypes.Member
@@ -396,19 +453,45 @@ class MessageHandler(BaseHandler):
]
presence_handler = self.hs.get_handlers().presence_handler
- presence = []
- for m in room_members:
- try:
- member_presence = yield presence_handler.get_state(
- target_user=UserID.from_string(m.user_id),
- auth_user=auth_user,
- as_event=True,
- )
- presence.append(member_presence)
- except SynapseError:
- logger.exception(
- "Failed to get member presence of %r", m.user_id
+
+ @defer.inlineCallbacks
+ def get_presence():
+ presence_defs = yield defer.DeferredList(
+ [
+ presence_handler.get_state(
+ target_user=UserID.from_string(m.user_id),
+ auth_user=auth_user,
+ as_event=True,
+ check_auth=False,
+ )
+ for m in room_members
+ ],
+ consumeErrors=True,
+ )
+
+ defer.returnValue([p for success, p in presence_defs if success])
+
+ receipts_handler = self.hs.get_handlers().receipts_handler
+
+ presence, receipts, (messages, token) = yield defer.gatherResults(
+ [
+ get_presence(),
+ receipts_handler.get_receipts_for_room(room_id, now_token.receipt_key),
+ self.store.get_recent_events_for_room(
+ room_id,
+ limit=limit,
+ end_token=now_token.room_key,
)
+ ],
+ consumeErrors=True,
+ ).addErrback(unwrapFirstError)
+
+ messages = yield self._filter_events_for_client(
+ user_id, room_id, messages
+ )
+
+ start_token = now_token.copy_and_replace("room_key", token[0])
+ end_token = now_token.copy_and_replace("room_key", token[1])
time_now = self.clock.time_msec()
@@ -421,5 +504,6 @@ class MessageHandler(BaseHandler):
"end": end_token.to_string(),
},
"state": state,
- "presence": presence
+ "presence": presence,
+ "receipts": receipts,
})
|