summary refs log tree commit diff
path: root/synapse/handlers/sync.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-01-25 10:10:44 +0000
committerErik Johnston <erik@matrix.org>2016-01-25 10:10:44 +0000
commit4021f95261ebdcca0ec2c3c91e8dd442a85c5ed4 (patch)
treebffe640c7934a01d6d69b2e6a00fb49361f6eefe /synapse/handlers/sync.py
parentMerge branch 'develop' of github.com:matrix-org/synapse into erikj/sync (diff)
downloadsynapse-4021f95261ebdcca0ec2c3c91e8dd442a85c5ed4.tar.xz
Move logic from rest/ to handlers/
Diffstat (limited to 'synapse/handlers/sync.py')
-rw-r--r--synapse/handlers/sync.py189
1 files changed, 148 insertions, 41 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 53e1eb0508..9b5b4d2c9f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -17,6 +17,7 @@ from ._base import BaseHandler
 
 from synapse.streams.config import PaginationConfig
 from synapse.api.constants import Membership, EventTypes
+from synapse.api.filtering import DEFAULT_FILTER_COLLECTION
 from synapse.util import unwrapFirstError
 
 from twisted.internet import defer
@@ -29,7 +30,7 @@ logger = logging.getLogger(__name__)
 
 SyncConfig = collections.namedtuple("SyncConfig", [
     "user",
-    "filter",
+    "filter_collection",
     "is_guest",
 ])
 
@@ -130,6 +131,11 @@ class SyncHandler(BaseHandler):
         self.clock = hs.get_clock()
 
     @defer.inlineCallbacks
+    def get_sync_for_user(self, sync_config, since_token=None, timeout=0,
+                          filter_collection=DEFAULT_FILTER_COLLECTION):
+        pass
+
+    @defer.inlineCallbacks
     def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
                                full_state=False):
         """Get the sync for a client if we have new data for it now. Otherwise
@@ -142,8 +148,9 @@ class SyncHandler(BaseHandler):
         if timeout == 0 or since_token is None or full_state:
             # we are going to return immediately, so don't bother calling
             # notifier.wait_for_events.
-            result = yield self.current_sync_for_user(sync_config, since_token,
-                                                      full_state=full_state)
+            result = yield self.current_sync_for_user(
+                sync_config, since_token, full_state=full_state,
+            )
             defer.returnValue(result)
         else:
             def current_sync_callback(before_token, after_token):
@@ -151,7 +158,7 @@ class SyncHandler(BaseHandler):
 
             result = yield self.notifier.wait_for_events(
                 sync_config.user.to_string(), timeout, current_sync_callback,
-                from_token=since_token
+                from_token=since_token,
             )
             defer.returnValue(result)
 
@@ -205,7 +212,7 @@ class SyncHandler(BaseHandler):
         )
 
         membership_list = (Membership.INVITE, Membership.JOIN)
-        if sync_config.filter.include_leave:
+        if sync_config.filter_collection.include_leave:
             membership_list += (Membership.LEAVE, Membership.BAN)
 
         room_list = yield self.store.get_rooms_for_user_where_membership_is(
@@ -266,9 +273,17 @@ class SyncHandler(BaseHandler):
             deferreds, consumeErrors=True
         ).addErrback(unwrapFirstError)
 
+        account_data_for_user = sync_config.filter_collection.filter_account_data(
+            self.account_data_for_user(account_data)
+        )
+
+        presence = sync_config.filter_collection.filter_presence(
+            presence
+        )
+
         defer.returnValue(SyncResult(
             presence=presence,
-            account_data=self.account_data_for_user(account_data),
+            account_data=account_data_for_user,
             joined=joined,
             invited=invited,
             archived=archived,
@@ -302,14 +317,31 @@ class SyncHandler(BaseHandler):
 
         current_state = yield self.get_state_at(room_id, now_token)
 
+        current_state = {
+            (e.type, e.state_key): e
+            for e in sync_config.filter_collection.filter_room_state(
+                current_state.values()
+            )
+        }
+
+        account_data = self.account_data_for_room(
+            room_id, tags_by_room, account_data_by_room
+        )
+
+        account_data = sync_config.filter_collection.filter_room_account_data(
+            account_data
+        )
+
+        ephemeral = sync_config.filter_collection.filter_room_ephemeral(
+            ephemeral_by_room.get(room_id, [])
+        )
+
         defer.returnValue(JoinedSyncResult(
             room_id=room_id,
             timeline=batch,
             state=current_state,
-            ephemeral=ephemeral_by_room.get(room_id, []),
-            account_data=self.account_data_for_room(
-                room_id, tags_by_room, account_data_by_room
-            ),
+            ephemeral=ephemeral,
+            account_data=account_data,
             unread_notifications=unread_notifications,
         ))
 
@@ -365,7 +397,7 @@ class SyncHandler(BaseHandler):
         typing, typing_key = yield typing_source.get_new_events(
             user=sync_config.user,
             from_key=typing_key,
-            limit=sync_config.filter.ephemeral_limit(),
+            limit=sync_config.filter_collection.ephemeral_limit(),
             room_ids=room_ids,
             is_guest=sync_config.is_guest,
         )
@@ -388,7 +420,7 @@ class SyncHandler(BaseHandler):
         receipts, receipt_key = yield receipt_source.get_new_events(
             user=sync_config.user,
             from_key=receipt_key,
-            limit=sync_config.filter.ephemeral_limit(),
+            limit=sync_config.filter_collection.ephemeral_limit(),
             room_ids=room_ids,
             is_guest=sync_config.is_guest,
         )
@@ -419,13 +451,26 @@ class SyncHandler(BaseHandler):
 
         leave_state = yield self.store.get_state_for_event(leave_event_id)
 
+        leave_state = {
+            (e.type, e.state_key): e
+            for e in sync_config.filter_collection.filter_room_state(
+                leave_state.values()
+            )
+        }
+
+        account_data = self.account_data_for_room(
+            room_id, tags_by_room, account_data_by_room
+        )
+
+        account_data = sync_config.filter_collection.filter_room_account_data(
+            account_data
+        )
+
         defer.returnValue(ArchivedSyncResult(
             room_id=room_id,
             timeline=batch,
             state=leave_state,
-            account_data=self.account_data_for_room(
-                room_id, tags_by_room, account_data_by_room
-            ),
+            account_data=account_data,
         ))
 
     @defer.inlineCallbacks
@@ -444,7 +489,7 @@ class SyncHandler(BaseHandler):
         presence, presence_key = yield presence_source.get_new_events(
             user=sync_config.user,
             from_key=since_token.presence_key,
-            limit=sync_config.filter.presence_limit(),
+            limit=sync_config.filter_collection.presence_limit(),
             room_ids=room_ids,
             is_guest=sync_config.is_guest,
         )
@@ -473,7 +518,7 @@ class SyncHandler(BaseHandler):
                 sync_config.user
             )
 
-        timeline_limit = sync_config.filter.timeline_limit()
+        timeline_limit = sync_config.filter_collection.timeline_limit()
 
         room_events, _ = yield self.store.get_room_events_stream(
             sync_config.user.to_string(),
@@ -538,6 +583,27 @@ class SyncHandler(BaseHandler):
                     # the timeline is inherently limited if we've just joined
                     limited = True
 
+                recents = sync_config.filter_collection.filter_room_timeline(recents)
+
+                state = {
+                    (e.type, e.state_key): e
+                    for e in sync_config.filter_collection.filter_room_state(
+                        state.values()
+                    )
+                }
+
+                acc_data = self.account_data_for_room(
+                    room_id, tags_by_room, account_data_by_room
+                )
+
+                acc_data = sync_config.filter_collection.filter_room_account_data(
+                    acc_data
+                )
+
+                ephemeral = sync_config.filter_collection.filter_room_ephemeral(
+                    ephemeral_by_room.get(room_id, [])
+                )
+
                 room_sync = JoinedSyncResult(
                     room_id=room_id,
                     timeline=TimelineBatch(
@@ -546,10 +612,8 @@ class SyncHandler(BaseHandler):
                         limited=limited,
                     ),
                     state=state,
-                    ephemeral=ephemeral_by_room.get(room_id, []),
-                    account_data=self.account_data_for_room(
-                        room_id, tags_by_room, account_data_by_room
-                    ),
+                    ephemeral=ephemeral,
+                    account_data=acc_data,
                     unread_notifications={},
                 )
                 logger.debug("Result for room %s: %r", room_id, room_sync)
@@ -603,9 +667,17 @@ class SyncHandler(BaseHandler):
             for event in invite_events
         ]
 
+        account_data_for_user = sync_config.filter_collection.filter_account_data(
+            self.account_data_for_user(account_data)
+        )
+
+        presence = sync_config.filter_collection.filter_presence(
+            presence
+        )
+
         defer.returnValue(SyncResult(
             presence=presence,
-            account_data=self.account_data_for_user(account_data),
+            account_data=account_data_for_user,
             joined=joined,
             invited=invited,
             archived=archived,
@@ -621,7 +693,7 @@ class SyncHandler(BaseHandler):
         limited = True
         recents = []
         filtering_factor = 2
-        timeline_limit = sync_config.filter.timeline_limit()
+        timeline_limit = sync_config.filter_collection.timeline_limit()
         load_limit = max(timeline_limit * filtering_factor, 100)
         max_repeat = 3  # Only try a few times per room, otherwise
         room_key = now_token.room_key
@@ -634,9 +706,9 @@ class SyncHandler(BaseHandler):
                 from_token=since_token.room_key if since_token else None,
                 end_token=end_key,
             )
-            (room_key, _) = keys
+            room_key, _ = keys
             end_key = "s" + room_key.split('-')[-1]
-            loaded_recents = sync_config.filter.filter_room_timeline(events)
+            loaded_recents = sync_config.filter_collection.filter_room_timeline(events)
             loaded_recents = yield self._filter_events_for_client(
                 sync_config.user.to_string(),
                 loaded_recents,
@@ -684,21 +756,28 @@ class SyncHandler(BaseHandler):
 
         logger.debug("Recents %r", batch)
 
-        current_state = yield self.get_state_at(room_id, now_token)
+        if batch.limited:
+            current_state = yield self.get_state_at(room_id, now_token)
 
-        state_at_previous_sync = yield self.get_state_at(
-            room_id, stream_position=since_token
-        )
+            state_at_previous_sync = yield self.get_state_at(
+                room_id, stream_position=since_token
+            )
 
-        state = yield self.compute_state_delta(
-            since_token=since_token,
-            previous_state=state_at_previous_sync,
-            current_state=current_state,
-        )
+            state = yield self.compute_state_delta(
+                since_token=since_token,
+                previous_state=state_at_previous_sync,
+                current_state=current_state,
+            )
+        else:
+            state = {
+                (event.type, event.state_key): event
+                for event in batch.events if event.is_state()
+            }
 
         just_joined = yield self.check_joined_room(sync_config, state)
         if just_joined:
             state = yield self.get_state_at(room_id, now_token)
+            # batch.limited = True
 
         notifs = yield self.unread_notifs_for_room_id(
             room_id, sync_config, all_ephemeral_by_room
@@ -711,14 +790,29 @@ class SyncHandler(BaseHandler):
                 1 for notif in notifs if _action_has_highlight(notif["actions"])
             ])
 
+        state = {
+            (e.type, e.state_key): e
+            for e in sync_config.filter_collection.filter_room_state(state.values())
+        }
+
+        account_data = self.account_data_for_room(
+            room_id, tags_by_room, account_data_by_room
+        )
+
+        account_data = sync_config.filter_collection.filter_room_account_data(
+            account_data
+        )
+
+        ephemeral = sync_config.filter_collection.filter_room_ephemeral(
+            ephemeral_by_room.get(room_id, [])
+        )
+
         room_sync = JoinedSyncResult(
             room_id=room_id,
             timeline=batch,
             state=state,
-            ephemeral=ephemeral_by_room.get(room_id, []),
-            account_data=self.account_data_for_room(
-                room_id, tags_by_room, account_data_by_room
-            ),
+            ephemeral=ephemeral,
+            account_data=account_data,
             unread_notifications=unread_notifications,
         )
 
@@ -765,13 +859,26 @@ class SyncHandler(BaseHandler):
             current_state=state_events_at_leave,
         )
 
+        state_events_delta = {
+            (e.type, e.state_key): e
+            for e in sync_config.filter_collection.filter_room_state(
+                state_events_delta.values()
+            )
+        }
+
+        account_data = self.account_data_for_room(
+            leave_event.room_id, tags_by_room, account_data_by_room
+        )
+
+        account_data = sync_config.filter_collection.filter_room_account_data(
+            account_data
+        )
+
         room_sync = ArchivedSyncResult(
             room_id=leave_event.room_id,
             timeline=batch,
             state=state_events_delta,
-            account_data=self.account_data_for_room(
-                leave_event.room_id, tags_by_room, account_data_by_room
-            ),
+            account_data=account_data,
         )
 
         logger.debug("Room sync: %r", room_sync)