summary refs log tree commit diff
path: root/synapse/handlers/sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/sync.py')
-rw-r--r--synapse/handlers/sync.py320
1 files changed, 156 insertions, 164 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 3f1cda5b0b..ddeed27965 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -19,6 +19,7 @@ from synapse.streams.config import PaginationConfig
 from synapse.api.constants import Membership, EventTypes
 from synapse.util import unwrapFirstError
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.metrics import Measure
 
 from twisted.internet import defer
 
@@ -178,18 +179,6 @@ class SyncHandler(BaseHandler):
         else:
             return self.incremental_sync_with_gap(sync_config, since_token)
 
-    def last_read_event_id_for_room_and_user(self, room_id, user_id, ephemeral_by_room):
-        if room_id not in ephemeral_by_room:
-            return None
-        for e in ephemeral_by_room[room_id]:
-            if e['type'] != 'm.receipt':
-                continue
-            for receipt_event_id, val in e['content'].items():
-                if 'm.read' in val:
-                    if user_id in val['m.read']:
-                        return receipt_event_id
-        return None
-
     @defer.inlineCallbacks
     def full_state_sync(self, sync_config, timeline_since_token):
         """Get a sync for a client which is starting without any state.
@@ -318,7 +307,6 @@ class SyncHandler(BaseHandler):
             ephemeral_by_room=ephemeral_by_room,
             tags_by_room=tags_by_room,
             account_data_by_room=account_data_by_room,
-            all_ephemeral_by_room=ephemeral_by_room,
             batch=batch,
             full_state=True,
         )
@@ -368,50 +356,51 @@ class SyncHandler(BaseHandler):
             typing events for that room.
         """
 
-        typing_key = since_token.typing_key if since_token else "0"
+        with Measure(self.clock, "ephemeral_by_room"):
+            typing_key = since_token.typing_key if since_token else "0"
 
-        rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
-        room_ids = [room.room_id for room in rooms]
+            rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
+            room_ids = [room.room_id for room in rooms]
 
-        typing_source = self.event_sources.sources["typing"]
-        typing, typing_key = yield typing_source.get_new_events(
-            user=sync_config.user,
-            from_key=typing_key,
-            limit=sync_config.filter_collection.ephemeral_limit(),
-            room_ids=room_ids,
-            is_guest=sync_config.is_guest,
-        )
-        now_token = now_token.copy_and_replace("typing_key", typing_key)
-
-        ephemeral_by_room = {}
-
-        for event in typing:
-            # we want to exclude the room_id from the event, but modifying the
-            # result returned by the event source is poor form (it might cache
-            # the object)
-            room_id = event["room_id"]
-            event_copy = {k: v for (k, v) in event.iteritems()
-                          if k != "room_id"}
-            ephemeral_by_room.setdefault(room_id, []).append(event_copy)
-
-        receipt_key = since_token.receipt_key if since_token else "0"
-
-        receipt_source = self.event_sources.sources["receipt"]
-        receipts, receipt_key = yield receipt_source.get_new_events(
-            user=sync_config.user,
-            from_key=receipt_key,
-            limit=sync_config.filter_collection.ephemeral_limit(),
-            room_ids=room_ids,
-            is_guest=sync_config.is_guest,
-        )
-        now_token = now_token.copy_and_replace("receipt_key", receipt_key)
+            typing_source = self.event_sources.sources["typing"]
+            typing, typing_key = yield typing_source.get_new_events(
+                user=sync_config.user,
+                from_key=typing_key,
+                limit=sync_config.filter_collection.ephemeral_limit(),
+                room_ids=room_ids,
+                is_guest=sync_config.is_guest,
+            )
+            now_token = now_token.copy_and_replace("typing_key", typing_key)
+
+            ephemeral_by_room = {}
+
+            for event in typing:
+                # we want to exclude the room_id from the event, but modifying the
+                # result returned by the event source is poor form (it might cache
+                # the object)
+                room_id = event["room_id"]
+                event_copy = {k: v for (k, v) in event.iteritems()
+                              if k != "room_id"}
+                ephemeral_by_room.setdefault(room_id, []).append(event_copy)
+
+            receipt_key = since_token.receipt_key if since_token else "0"
+
+            receipt_source = self.event_sources.sources["receipt"]
+            receipts, receipt_key = yield receipt_source.get_new_events(
+                user=sync_config.user,
+                from_key=receipt_key,
+                limit=sync_config.filter_collection.ephemeral_limit(),
+                room_ids=room_ids,
+                is_guest=sync_config.is_guest,
+            )
+            now_token = now_token.copy_and_replace("receipt_key", receipt_key)
 
-        for event in receipts:
-            room_id = event["room_id"]
-            # exclude room id, as above
-            event_copy = {k: v for (k, v) in event.iteritems()
-                          if k != "room_id"}
-            ephemeral_by_room.setdefault(room_id, []).append(event_copy)
+            for event in receipts:
+                room_id = event["room_id"]
+                # exclude room id, as above
+                event_copy = {k: v for (k, v) in event.iteritems()
+                              if k != "room_id"}
+                ephemeral_by_room.setdefault(room_id, []).append(event_copy)
 
         defer.returnValue((now_token, ephemeral_by_room))
 
@@ -451,13 +440,6 @@ class SyncHandler(BaseHandler):
         )
         now_token = now_token.copy_and_replace("presence_key", presence_key)
 
-        # We now fetch all ephemeral events for this room in order to get
-        # this users current read receipt. This could almost certainly be
-        # optimised.
-        _, all_ephemeral_by_room = yield self.ephemeral_by_room(
-            sync_config, now_token
-        )
-
         now_token, ephemeral_by_room = yield self.ephemeral_by_room(
             sync_config, now_token, since_token
         )
@@ -589,7 +571,6 @@ class SyncHandler(BaseHandler):
                 ephemeral_by_room=ephemeral_by_room,
                 tags_by_room=tags_by_room,
                 account_data_by_room=account_data_by_room,
-                all_ephemeral_by_room=all_ephemeral_by_room,
                 batch=batch,
                 full_state=full_state,
             )
@@ -619,58 +600,64 @@ class SyncHandler(BaseHandler):
         """
         :returns a Deferred TimelineBatch
         """
-        filtering_factor = 2
-        timeline_limit = sync_config.filter_collection.timeline_limit()
-        load_limit = max(timeline_limit * filtering_factor, 10)
-        max_repeat = 5  # Only try a few times per room, otherwise
-        room_key = now_token.room_key
-        end_key = room_key
-
-        limited = recents is None or newly_joined_room or timeline_limit < len(recents)
-
-        if recents is not None:
-            recents = sync_config.filter_collection.filter_room_timeline(recents)
-            recents = yield self._filter_events_for_client(
-                sync_config.user.to_string(),
-                recents,
-                is_peeking=sync_config.is_guest,
-            )
-        else:
-            recents = []
-
-        since_key = None
-        if since_token and not newly_joined_room:
-            since_key = since_token.room_key
-
-        while limited and len(recents) < timeline_limit and max_repeat:
-            events, end_key = yield self.store.get_room_events_stream_for_room(
-                room_id,
-                limit=load_limit + 1,
-                from_key=since_key,
-                to_key=end_key,
-            )
-            loaded_recents = sync_config.filter_collection.filter_room_timeline(events)
-            loaded_recents = yield self._filter_events_for_client(
-                sync_config.user.to_string(),
-                loaded_recents,
-                is_peeking=sync_config.is_guest,
-            )
-            loaded_recents.extend(recents)
-            recents = loaded_recents
-
-            if len(events) <= load_limit:
+        with Measure(self.clock, "load_filtered_recents"):
+            filtering_factor = 2
+            timeline_limit = sync_config.filter_collection.timeline_limit()
+            load_limit = max(timeline_limit * filtering_factor, 10)
+            max_repeat = 5  # Only try a few times per room, otherwise
+            room_key = now_token.room_key
+            end_key = room_key
+
+            if recents is None or newly_joined_room or timeline_limit < len(recents):
+                limited = True
+            else:
                 limited = False
-                break
-            max_repeat -= 1
 
-        if len(recents) > timeline_limit:
-            limited = True
-            recents = recents[-timeline_limit:]
-            room_key = recents[0].internal_metadata.before
+            if recents is not None:
+                recents = sync_config.filter_collection.filter_room_timeline(recents)
+                recents = yield self._filter_events_for_client(
+                    sync_config.user.to_string(),
+                    recents,
+                    is_peeking=sync_config.is_guest,
+                )
+            else:
+                recents = []
+
+            since_key = None
+            if since_token and not newly_joined_room:
+                since_key = since_token.room_key
+
+            while limited and len(recents) < timeline_limit and max_repeat:
+                events, end_key = yield self.store.get_room_events_stream_for_room(
+                    room_id,
+                    limit=load_limit + 1,
+                    from_key=since_key,
+                    to_key=end_key,
+                )
+                loaded_recents = sync_config.filter_collection.filter_room_timeline(
+                    events
+                )
+                loaded_recents = yield self._filter_events_for_client(
+                    sync_config.user.to_string(),
+                    loaded_recents,
+                    is_peeking=sync_config.is_guest,
+                )
+                loaded_recents.extend(recents)
+                recents = loaded_recents
 
-        prev_batch_token = now_token.copy_and_replace(
-            "room_key", room_key
-        )
+                if len(events) <= load_limit:
+                    limited = False
+                    break
+                max_repeat -= 1
+
+            if len(recents) > timeline_limit:
+                limited = True
+                recents = recents[-timeline_limit:]
+                room_key = recents[0].internal_metadata.before
+
+            prev_batch_token = now_token.copy_and_replace(
+                "room_key", room_key
+            )
 
         defer.returnValue(TimelineBatch(
             events=recents,
@@ -683,7 +670,6 @@ class SyncHandler(BaseHandler):
                                            since_token, now_token,
                                            ephemeral_by_room, tags_by_room,
                                            account_data_by_room,
-                                           all_ephemeral_by_room,
                                            batch, full_state=False):
         state = yield self.compute_state_delta(
             room_id, batch, sync_config, since_token, now_token,
@@ -714,7 +700,7 @@ class SyncHandler(BaseHandler):
 
         if room_sync:
             notifs = yield self.unread_notifs_for_room_id(
-                room_id, sync_config, all_ephemeral_by_room
+                room_id, sync_config
             )
 
             if notifs is not None:
@@ -831,50 +817,53 @@ class SyncHandler(BaseHandler):
         # updates even if they occured logically before the previous event.
         # TODO(mjark) Check for new redactions in the state events.
 
-        if full_state:
-            if batch:
-                state = yield self.store.get_state_for_event(batch.events[0].event_id)
-            else:
-                state = yield self.get_state_at(
-                    room_id, stream_position=now_token
-                )
+        with Measure(self.clock, "compute_state_delta"):
+            if full_state:
+                if batch:
+                    state = yield self.store.get_state_for_event(
+                        batch.events[0].event_id
+                    )
+                else:
+                    state = yield self.get_state_at(
+                        room_id, stream_position=now_token
+                    )
 
-            timeline_state = {
-                (event.type, event.state_key): event
-                for event in batch.events if event.is_state()
-            }
+                timeline_state = {
+                    (event.type, event.state_key): event
+                    for event in batch.events if event.is_state()
+                }
 
-            state = _calculate_state(
-                timeline_contains=timeline_state,
-                timeline_start=state,
-                previous={},
-            )
-        elif batch.limited:
-            state_at_previous_sync = yield self.get_state_at(
-                room_id, stream_position=since_token
-            )
+                state = _calculate_state(
+                    timeline_contains=timeline_state,
+                    timeline_start=state,
+                    previous={},
+                )
+            elif batch.limited:
+                state_at_previous_sync = yield self.get_state_at(
+                    room_id, stream_position=since_token
+                )
 
-            state_at_timeline_start = yield self.store.get_state_for_event(
-                batch.events[0].event_id
-            )
+                state_at_timeline_start = yield self.store.get_state_for_event(
+                    batch.events[0].event_id
+                )
 
-            timeline_state = {
-                (event.type, event.state_key): event
-                for event in batch.events if event.is_state()
-            }
+                timeline_state = {
+                    (event.type, event.state_key): event
+                    for event in batch.events if event.is_state()
+                }
 
-            state = _calculate_state(
-                timeline_contains=timeline_state,
-                timeline_start=state_at_timeline_start,
-                previous=state_at_previous_sync,
-            )
-        else:
-            state = {}
+                state = _calculate_state(
+                    timeline_contains=timeline_state,
+                    timeline_start=state_at_timeline_start,
+                    previous=state_at_previous_sync,
+                )
+            else:
+                state = {}
 
-        defer.returnValue({
-            (e.type, e.state_key): e
-            for e in sync_config.filter_collection.filter_room_state(state.values())
-        })
+            defer.returnValue({
+                (e.type, e.state_key): e
+                for e in sync_config.filter_collection.filter_room_state(state.values())
+            })
 
     def check_joined_room(self, sync_config, state_delta):
         """
@@ -895,21 +884,24 @@ class SyncHandler(BaseHandler):
         return False
 
     @defer.inlineCallbacks
-    def unread_notifs_for_room_id(self, room_id, sync_config, ephemeral_by_room):
-        last_unread_event_id = self.last_read_event_id_for_room_and_user(
-            room_id, sync_config.user.to_string(), ephemeral_by_room
-        )
-
-        notifs = []
-        if last_unread_event_id:
-            notifs = yield self.store.get_unread_event_push_actions_by_room_for_user(
-                room_id, sync_config.user.to_string(), last_unread_event_id
+    def unread_notifs_for_room_id(self, room_id, sync_config):
+        with Measure(self.clock, "unread_notifs_for_room_id"):
+            last_unread_event_id = yield self.store.get_last_receipt_event_id_for_user(
+                user_id=sync_config.user.to_string(),
+                room_id=room_id,
+                receipt_type="m.read"
             )
-            defer.returnValue(notifs)
 
-        # There is no new information in this period, so your notification
-        # count is whatever it was last time.
-        defer.returnValue(None)
+            notifs = []
+            if last_unread_event_id:
+                notifs = yield self.store.get_unread_event_push_actions_by_room_for_user(
+                    room_id, sync_config.user.to_string(), last_unread_event_id
+                )
+                defer.returnValue(notifs)
+
+            # There is no new information in this period, so your notification
+            # count is whatever it was last time.
+            defer.returnValue(None)
 
 
 def _action_has_highlight(actions):