diff options
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r-- | synapse/handlers/message.py | 112 |
1 files changed, 92 insertions, 20 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index e324662f18..f12465fa2c 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -113,11 +113,21 @@ class MessageHandler(BaseHandler): "room_key", next_key ) + if not events: + defer.returnValue({ + "chunk": [], + "start": pagin_config.from_token.to_string(), + "end": next_token.to_string(), + }) + + events = yield self._filter_events_for_client(user_id, room_id, events) + time_now = self.clock.time_msec() chunk = { "chunk": [ - serialize_event(e, time_now, as_client_event) for e in events + serialize_event(e, time_now, as_client_event) + for e in events ], "start": pagin_config.from_token.to_string(), "end": next_token.to_string(), @@ -126,6 +136,52 @@ class MessageHandler(BaseHandler): defer.returnValue(chunk) @defer.inlineCallbacks + def _filter_events_for_client(self, user_id, room_id, events): + event_id_to_state = yield self.store.get_state_for_events( + room_id, frozenset(e.event_id for e in events), + types=( + (EventTypes.RoomHistoryVisibility, ""), + (EventTypes.Member, user_id), + ) + ) + + def allowed(event, state): + if event.type == EventTypes.RoomHistoryVisibility: + return True + + membership_ev = state.get((EventTypes.Member, user_id), None) + if membership_ev: + membership = membership_ev.membership + else: + membership = Membership.LEAVE + + if membership == Membership.JOIN: + return True + + history = state.get((EventTypes.RoomHistoryVisibility, ''), None) + if history: + visibility = history.content.get("history_visibility", "shared") + else: + visibility = "shared" + + if visibility == "public": + return True + elif visibility == "shared": + return True + elif visibility == "joined": + return membership == Membership.JOIN + elif visibility == "invited": + return membership == Membership.INVITE + + return True + + defer.returnValue([ + event + for event in events + if allowed(event, event_id_to_state[event.event_id]) + ]) + + @defer.inlineCallbacks def create_and_send_event(self, event_dict, ratelimit=True, client=None, txn_id=None): """ Given a dict from a client, create and handle a new event. @@ -278,6 +334,11 @@ class MessageHandler(BaseHandler): user, pagination_config.get_source_config("presence"), None ) + receipt_stream = self.hs.get_event_sources().sources["receipt"] + receipt, _ = yield receipt_stream.get_pagination_rows( + user, pagination_config.get_source_config("receipt"), None + ) + public_room_ids = yield self.store.get_public_room_ids() limit = pagin_config.limit @@ -316,6 +377,10 @@ class MessageHandler(BaseHandler): ] ).addErrback(unwrapFirstError) + messages = yield self._filter_events_for_client( + user_id, event.room_id, messages + ) + start_token = now_token.copy_and_replace("room_key", token[0]) end_token = now_token.copy_and_replace("room_key", token[1]) time_now = self.clock.time_msec() @@ -336,15 +401,20 @@ class MessageHandler(BaseHandler): except: logger.exception("Failed to get snapshot") - yield defer.gatherResults( - [handle_room(e) for e in room_list], - consumeErrors=True - ).addErrback(unwrapFirstError) + # Only do N rooms at once + n = 5 + d_list = [handle_room(e) for e in room_list] + for i in range(0, len(d_list), n): + yield defer.gatherResults( + d_list[i:i + n], + consumeErrors=True + ).addErrback(unwrapFirstError) ret = { "rooms": rooms_ret, "presence": presence, - "end": now_token.to_string() + "receipts": receipt, + "end": now_token.to_string(), } defer.returnValue(ret) @@ -390,24 +460,21 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def get_presence(): - presence_defs = yield defer.DeferredList( - [ - presence_handler.get_state( - target_user=UserID.from_string(m.user_id), - auth_user=auth_user, - as_event=True, - check_auth=False, - ) - for m in room_members - ], - consumeErrors=True, + states = yield presence_handler.get_states( + target_users=[UserID.from_string(m.user_id) for m in room_members], + auth_user=auth_user, + as_event=True, + check_auth=False, ) - defer.returnValue([p for success, p in presence_defs if success]) + defer.returnValue(states.values()) - presence, (messages, token) = yield defer.gatherResults( + receipts_handler = self.hs.get_handlers().receipts_handler + + presence, receipts, (messages, token) = yield defer.gatherResults( [ get_presence(), + receipts_handler.get_receipts_for_room(room_id, now_token.receipt_key), self.store.get_recent_events_for_room( room_id, limit=limit, @@ -417,6 +484,10 @@ class MessageHandler(BaseHandler): consumeErrors=True, ).addErrback(unwrapFirstError) + messages = yield self._filter_events_for_client( + user_id, room_id, messages + ) + start_token = now_token.copy_and_replace("room_key", token[0]) end_token = now_token.copy_and_replace("room_key", token[1]) @@ -431,5 +502,6 @@ class MessageHandler(BaseHandler): "end": end_token.to_string(), }, "state": state, - "presence": presence + "presence": presence, + "receipts": receipts, }) |