summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorMatthew Hodgson <matthew@matrix.org>2018-03-11 20:01:41 +0000
committerMatthew Hodgson <matthew@matrix.org>2018-03-11 20:01:41 +0000
commit9b334b3f97057ac145622d2e4d0ad036ef27b468 (patch)
treeb8e7bbc795d75c0d82d268e8ba28b8fdd4b3a8c6 /synapse/handlers
parentMerge pull request #2944 from matrix-org/erikj/fix_sync_race (diff)
downloadsynapse-9b334b3f97057ac145622d2e4d0ad036ef27b468.tar.xz
WIP experiment in lazyloading room members
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/sync.py43
1 files changed, 31 insertions, 12 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 0f713ce038..809e9fece9 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -399,7 +399,7 @@ class SyncHandler(object):
         ))
 
     @defer.inlineCallbacks
-    def get_state_after_event(self, event):
+    def get_state_after_event(self, event, types=None):
         """
         Get the room state after the given event
 
@@ -409,14 +409,14 @@ class SyncHandler(object):
         Returns:
             A Deferred map from ((type, state_key)->Event)
         """
-        state_ids = yield self.store.get_state_ids_for_event(event.event_id)
+        state_ids = yield self.store.get_state_ids_for_event(event.event_id, types)
         if event.is_state():
             state_ids = state_ids.copy()
             state_ids[(event.type, event.state_key)] = event.event_id
         defer.returnValue(state_ids)
 
     @defer.inlineCallbacks
-    def get_state_at(self, room_id, stream_position):
+    def get_state_at(self, room_id, stream_position, types=None):
         """ Get the room state at a particular stream position
 
         Args:
@@ -432,7 +432,7 @@ class SyncHandler(object):
 
         if last_events:
             last_event = last_events[-1]
-            state = yield self.get_state_after_event(last_event)
+            state = yield self.get_state_after_event(last_event, types)
 
         else:
             # no events in this room - so presumably no state
@@ -441,7 +441,7 @@ class SyncHandler(object):
 
     @defer.inlineCallbacks
     def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token,
-                            full_state):
+                            full_state, filter_members):
         """ Works out the differnce in state between the start of the timeline
         and the previous sync.
 
@@ -454,6 +454,8 @@ class SyncHandler(object):
                 be None.
             now_token(str): Token of the end of the current batch.
             full_state(bool): Whether to force returning the full state.
+            filter_members(bool): Whether to only return state for members
+                referenced in this timeline segment
 
         Returns:
              A deferred new event dictionary
@@ -464,18 +466,35 @@ class SyncHandler(object):
         # TODO(mjark) Check for new redactions in the state events.
 
         with Measure(self.clock, "compute_state_delta"):
+
+            types = None
+            if filter_members:
+                # We only request state for the members needed to display the
+                # timeline:
+                types = (
+                    (EventTypes.Member, state_key)
+                    for state_key in set(
+                        event.sender  # FIXME: we also care about targets etc.
+                        for event in batch.events
+                    )
+                )
+                types.append((None, None))  # don't just filter to room members
+
+                # TODO: we should opportunistically deduplicate these members too
+                # within the same sync series (based on an in-memory cache)
+
             if full_state:
                 if batch:
                     current_state_ids = yield self.store.get_state_ids_for_event(
-                        batch.events[-1].event_id
+                        batch.events[-1].event_id, types=types
                     )
 
                     state_ids = yield self.store.get_state_ids_for_event(
-                        batch.events[0].event_id
+                        batch.events[0].event_id, types=types
                     )
                 else:
                     current_state_ids = yield self.get_state_at(
-                        room_id, stream_position=now_token
+                        room_id, stream_position=now_token, types=types
                     )
 
                     state_ids = current_state_ids
@@ -493,15 +512,15 @@ class SyncHandler(object):
                 )
             elif batch.limited:
                 state_at_previous_sync = yield self.get_state_at(
-                    room_id, stream_position=since_token
+                    room_id, stream_position=since_token, types=types
                 )
 
                 current_state_ids = yield self.store.get_state_ids_for_event(
-                    batch.events[-1].event_id
+                    batch.events[-1].event_id, types=types
                 )
 
                 state_at_timeline_start = yield self.store.get_state_ids_for_event(
-                    batch.events[0].event_id
+                    batch.events[0].event_id, types=types
                 )
 
                 timeline_state = {
@@ -1325,7 +1344,7 @@ class SyncHandler(object):
 
         state = yield self.compute_state_delta(
             room_id, batch, sync_config, since_token, now_token,
-            full_state=full_state
+            full_state=full_state, filter_members=True
         )
 
         if room_builder.rtype == "joined":