diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 53e1eb0508..328c049b03 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -29,7 +29,7 @@ logger = logging.getLogger(__name__)
SyncConfig = collections.namedtuple("SyncConfig", [
"user",
- "filter",
+ "filter_collection",
"is_guest",
])
@@ -142,8 +142,9 @@ class SyncHandler(BaseHandler):
if timeout == 0 or since_token is None or full_state:
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
- result = yield self.current_sync_for_user(sync_config, since_token,
- full_state=full_state)
+ result = yield self.current_sync_for_user(
+ sync_config, since_token, full_state=full_state,
+ )
defer.returnValue(result)
else:
def current_sync_callback(before_token, after_token):
@@ -151,7 +152,7 @@ class SyncHandler(BaseHandler):
result = yield self.notifier.wait_for_events(
sync_config.user.to_string(), timeout, current_sync_callback,
- from_token=since_token
+ from_token=since_token,
)
defer.returnValue(result)
@@ -205,7 +206,7 @@ class SyncHandler(BaseHandler):
)
membership_list = (Membership.INVITE, Membership.JOIN)
- if sync_config.filter.include_leave:
+ if sync_config.filter_collection.include_leave:
membership_list += (Membership.LEAVE, Membership.BAN)
room_list = yield self.store.get_rooms_for_user_where_membership_is(
@@ -266,9 +267,17 @@ class SyncHandler(BaseHandler):
deferreds, consumeErrors=True
).addErrback(unwrapFirstError)
+ account_data_for_user = sync_config.filter_collection.filter_account_data(
+ self.account_data_for_user(account_data)
+ )
+
+ presence = sync_config.filter_collection.filter_presence(
+ presence
+ )
+
defer.returnValue(SyncResult(
presence=presence,
- account_data=self.account_data_for_user(account_data),
+ account_data=account_data_for_user,
joined=joined,
invited=invited,
archived=archived,
@@ -302,14 +311,31 @@ class SyncHandler(BaseHandler):
current_state = yield self.get_state_at(room_id, now_token)
+ current_state = {
+ (e.type, e.state_key): e
+ for e in sync_config.filter_collection.filter_room_state(
+ current_state.values()
+ )
+ }
+
+ account_data = self.account_data_for_room(
+ room_id, tags_by_room, account_data_by_room
+ )
+
+ account_data = sync_config.filter_collection.filter_room_account_data(
+ account_data
+ )
+
+ ephemeral = sync_config.filter_collection.filter_room_ephemeral(
+ ephemeral_by_room.get(room_id, [])
+ )
+
defer.returnValue(JoinedSyncResult(
room_id=room_id,
timeline=batch,
state=current_state,
- ephemeral=ephemeral_by_room.get(room_id, []),
- account_data=self.account_data_for_room(
- room_id, tags_by_room, account_data_by_room
- ),
+ ephemeral=ephemeral,
+ account_data=account_data,
unread_notifications=unread_notifications,
))
@@ -365,7 +391,7 @@ class SyncHandler(BaseHandler):
typing, typing_key = yield typing_source.get_new_events(
user=sync_config.user,
from_key=typing_key,
- limit=sync_config.filter.ephemeral_limit(),
+ limit=sync_config.filter_collection.ephemeral_limit(),
room_ids=room_ids,
is_guest=sync_config.is_guest,
)
@@ -388,7 +414,7 @@ class SyncHandler(BaseHandler):
receipts, receipt_key = yield receipt_source.get_new_events(
user=sync_config.user,
from_key=receipt_key,
- limit=sync_config.filter.ephemeral_limit(),
+ limit=sync_config.filter_collection.ephemeral_limit(),
room_ids=room_ids,
is_guest=sync_config.is_guest,
)
@@ -419,13 +445,26 @@ class SyncHandler(BaseHandler):
leave_state = yield self.store.get_state_for_event(leave_event_id)
+ leave_state = {
+ (e.type, e.state_key): e
+ for e in sync_config.filter_collection.filter_room_state(
+ leave_state.values()
+ )
+ }
+
+ account_data = self.account_data_for_room(
+ room_id, tags_by_room, account_data_by_room
+ )
+
+ account_data = sync_config.filter_collection.filter_room_account_data(
+ account_data
+ )
+
defer.returnValue(ArchivedSyncResult(
room_id=room_id,
timeline=batch,
state=leave_state,
- account_data=self.account_data_for_room(
- room_id, tags_by_room, account_data_by_room
- ),
+ account_data=account_data,
))
@defer.inlineCallbacks
@@ -444,7 +483,7 @@ class SyncHandler(BaseHandler):
presence, presence_key = yield presence_source.get_new_events(
user=sync_config.user,
from_key=since_token.presence_key,
- limit=sync_config.filter.presence_limit(),
+ limit=sync_config.filter_collection.presence_limit(),
room_ids=room_ids,
is_guest=sync_config.is_guest,
)
@@ -473,7 +512,7 @@ class SyncHandler(BaseHandler):
sync_config.user
)
- timeline_limit = sync_config.filter.timeline_limit()
+ timeline_limit = sync_config.filter_collection.timeline_limit()
room_events, _ = yield self.store.get_room_events_stream(
sync_config.user.to_string(),
@@ -538,6 +577,27 @@ class SyncHandler(BaseHandler):
# the timeline is inherently limited if we've just joined
limited = True
+ recents = sync_config.filter_collection.filter_room_timeline(recents)
+
+ state = {
+ (e.type, e.state_key): e
+ for e in sync_config.filter_collection.filter_room_state(
+ state.values()
+ )
+ }
+
+ acc_data = self.account_data_for_room(
+ room_id, tags_by_room, account_data_by_room
+ )
+
+ acc_data = sync_config.filter_collection.filter_room_account_data(
+ acc_data
+ )
+
+ ephemeral = sync_config.filter_collection.filter_room_ephemeral(
+ ephemeral_by_room.get(room_id, [])
+ )
+
room_sync = JoinedSyncResult(
room_id=room_id,
timeline=TimelineBatch(
@@ -546,10 +606,8 @@ class SyncHandler(BaseHandler):
limited=limited,
),
state=state,
- ephemeral=ephemeral_by_room.get(room_id, []),
- account_data=self.account_data_for_room(
- room_id, tags_by_room, account_data_by_room
- ),
+ ephemeral=ephemeral,
+ account_data=acc_data,
unread_notifications={},
)
logger.debug("Result for room %s: %r", room_id, room_sync)
@@ -603,9 +661,17 @@ class SyncHandler(BaseHandler):
for event in invite_events
]
+ account_data_for_user = sync_config.filter_collection.filter_account_data(
+ self.account_data_for_user(account_data)
+ )
+
+ presence = sync_config.filter_collection.filter_presence(
+ presence
+ )
+
defer.returnValue(SyncResult(
presence=presence,
- account_data=self.account_data_for_user(account_data),
+ account_data=account_data_for_user,
joined=joined,
invited=invited,
archived=archived,
@@ -621,7 +687,7 @@ class SyncHandler(BaseHandler):
limited = True
recents = []
filtering_factor = 2
- timeline_limit = sync_config.filter.timeline_limit()
+ timeline_limit = sync_config.filter_collection.timeline_limit()
load_limit = max(timeline_limit * filtering_factor, 100)
max_repeat = 3 # Only try a few times per room, otherwise
room_key = now_token.room_key
@@ -634,9 +700,9 @@ class SyncHandler(BaseHandler):
from_token=since_token.room_key if since_token else None,
end_token=end_key,
)
- (room_key, _) = keys
+ room_key, _ = keys
end_key = "s" + room_key.split('-')[-1]
- loaded_recents = sync_config.filter.filter_room_timeline(events)
+ loaded_recents = sync_config.filter_collection.filter_room_timeline(events)
loaded_recents = yield self._filter_events_for_client(
sync_config.user.to_string(),
loaded_recents,
@@ -684,17 +750,23 @@ class SyncHandler(BaseHandler):
logger.debug("Recents %r", batch)
- current_state = yield self.get_state_at(room_id, now_token)
+ if batch.limited:
+ current_state = yield self.get_state_at(room_id, now_token)
- state_at_previous_sync = yield self.get_state_at(
- room_id, stream_position=since_token
- )
+ state_at_previous_sync = yield self.get_state_at(
+ room_id, stream_position=since_token
+ )
- state = yield self.compute_state_delta(
- since_token=since_token,
- previous_state=state_at_previous_sync,
- current_state=current_state,
- )
+ state = yield self.compute_state_delta(
+ since_token=since_token,
+ previous_state=state_at_previous_sync,
+ current_state=current_state,
+ )
+ else:
+ state = {
+ (event.type, event.state_key): event
+ for event in batch.events if event.is_state()
+ }
just_joined = yield self.check_joined_room(sync_config, state)
if just_joined:
@@ -711,14 +783,29 @@ class SyncHandler(BaseHandler):
1 for notif in notifs if _action_has_highlight(notif["actions"])
])
+ state = {
+ (e.type, e.state_key): e
+ for e in sync_config.filter_collection.filter_room_state(state.values())
+ }
+
+ account_data = self.account_data_for_room(
+ room_id, tags_by_room, account_data_by_room
+ )
+
+ account_data = sync_config.filter_collection.filter_room_account_data(
+ account_data
+ )
+
+ ephemeral = sync_config.filter_collection.filter_room_ephemeral(
+ ephemeral_by_room.get(room_id, [])
+ )
+
room_sync = JoinedSyncResult(
room_id=room_id,
timeline=batch,
state=state,
- ephemeral=ephemeral_by_room.get(room_id, []),
- account_data=self.account_data_for_room(
- room_id, tags_by_room, account_data_by_room
- ),
+ ephemeral=ephemeral,
+ account_data=account_data,
unread_notifications=unread_notifications,
)
@@ -765,13 +852,26 @@ class SyncHandler(BaseHandler):
current_state=state_events_at_leave,
)
+ state_events_delta = {
+ (e.type, e.state_key): e
+ for e in sync_config.filter_collection.filter_room_state(
+ state_events_delta.values()
+ )
+ }
+
+ account_data = self.account_data_for_room(
+ leave_event.room_id, tags_by_room, account_data_by_room
+ )
+
+ account_data = sync_config.filter_collection.filter_room_account_data(
+ account_data
+ )
+
room_sync = ArchivedSyncResult(
room_id=leave_event.room_id,
timeline=batch,
state=state_events_delta,
- account_data=self.account_data_for_room(
- leave_event.room_id, tags_by_room, account_data_by_room
- ),
+ account_data=account_data,
)
logger.debug("Room sync: %r", room_sync)
|