summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/api/filtering.py2
-rw-r--r--synapse/handlers/sync.py53
-rw-r--r--synapse/rest/client/v2_alpha/sync.py2
-rw-r--r--synapse/storage/stream.py21
4 files changed, 54 insertions, 24 deletions
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index b7e5d3222f..fa4de2614d 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -12,8 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from twisted.internet import defer
-
 from synapse.api.errors import SynapseError
 from synapse.types import UserID, RoomID
 
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 5768702192..0df1851b0e 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -279,6 +279,40 @@ class SyncHandler(BaseHandler):
         ))
 
     @defer.inlineCallbacks
+    def load_filtered_recents(self, room_id, sync_config, since_token,
+                              now_token):
+        limited = True
+        recents = []
+        filtering_factor = 2
+        load_limit = max(sync_config.limit * filtering_factor, 100)
+        max_repeat = 3  # Only try a few times per room, otherwise
+        room_key = now_token.room_key
+
+        while limited and len(recents) < sync_config.limit and max_repeat:
+            events, room_key = yield self.store.get_recent_events_for_room(
+                room_id,
+                limit=load_limit + 1,
+                from_token=since_token.room_key,
+                end_token=room_key,
+            )
+            loaded_recents = sync_config.filter.filter_room_events(events)
+            loaded_recents.extend(recents)
+            recents = loaded_recents
+            if len(events) <= load_limit:
+                limited = False
+            max_repeat -= 1
+
+        if len(recents) > sync_config.limit:
+            recents = recents[-sync_config.limit:]
+            room_key = recents[0].internal_metadata.before
+
+        prev_batch_token = now_token.copy_and_replace(
+            "room_key", room_key
+        )
+
+        defer.returnValue((recents, prev_batch_token, limited))
+
+    @defer.inlineCallbacks
     def incremental_sync_with_gap_for_room(self, room_id, sync_config,
                                            since_token, now_token,
                                            published_room_ids, typing_by_room):
@@ -288,28 +322,17 @@ class SyncHandler(BaseHandler):
         Returns:
             A Deferred RoomSyncResult
         """
+
         # TODO(mjark): Check if they have joined the room between
         # the previous sync and this one.
-        # TODO(mjark): Apply the event filter in sync_config taking care to get
-        # enough events to reach the limit
         # TODO(mjark): Check for redactions we might have missed.
-        recents, token = yield self.store.get_recent_events_for_room(
-            room_id,
-            limit=sync_config.limit + 1,
-            from_token=since_token.room_key,
-            end_token=now_token.room_key,
+
+        recents, prev_batch_token, limited = self.load_filtered_recents(
+            room_id, sync_config, since_token,
         )
 
         logging.debug("Recents %r", recents)
 
-        if len(recents) > sync_config.limit:
-            limited = True
-            recents = recents[1:]
-        else:
-            limited = False
-
-        prev_batch_token = now_token.copy_and_replace("room_key", token[0])
-
         # TODO(mjark): This seems racy since this isn't being passed a
         # token to indicate what point in the stream this is
         current_state_events = yield self.state_handler.get_current_state(
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index c1277d2675..46ea50d118 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -116,7 +116,7 @@ class SyncRestServlet(RestServlet):
                 user.localpart, filter_id
             )
         except:
-           filter = Filter({})
+            filter = Filter({})
         # filter = filter.apply_overrides(http_request)
         #if filter.matches(event):
         #   # stuff
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 2ea5e1a021..73504c8b52 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -181,15 +181,11 @@ class StreamStore(SQLBaseStore):
                 get_prev_content=True
             )
 
-            for event, row in zip(ret, rows):
-                stream = row["stream_ordering"]
-                topo = event.depth
-                internal = event.internal_metadata
-                internal.before = str(_StreamToken(topo, stream - 1))
-                internal.after = str(_StreamToken(topo, stream))
+            self._set_before_and_after(ret, rows)
 
             if rows:
                 key = "s%d" % max([r["stream_ordering"] for r in rows])
+
             else:
                 # Assume we didn't get anything because there was nothing to
                 # get.
@@ -267,6 +263,8 @@ class StreamStore(SQLBaseStore):
                 get_prev_content=True
             )
 
+            self._set_before_and_after(events, rows)
+
             return events, next_token,
 
         return self.runInteraction("paginate_room_events", f)
@@ -328,6 +326,8 @@ class StreamStore(SQLBaseStore):
                 get_prev_content=True
             )
 
+            self._set_before_and_after(events, rows)
+
             return events, token
 
         return self.runInteraction(
@@ -354,3 +354,12 @@ class StreamStore(SQLBaseStore):
 
         key = res[0]["m"]
         return "s%d" % (key,)
+
+    @staticmethod
+    def _set_before_and_after(events, rows):
+        for event, row in zip(events, rows):
+            stream = row["stream_ordering"]
+            topo = event.depth
+            internal = event.internal_metadata
+            internal.before = str(_StreamToken(topo, stream - 1))
+            internal.after = str(_StreamToken(topo, stream))