diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 62fda0c664..c5188a1f8e 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -64,20 +64,14 @@ LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
-SyncConfig = collections.namedtuple("SyncConfig", [
- "user",
- "filter_collection",
- "is_guest",
- "request_key",
- "device_id",
-])
-
-
-class TimelineBatch(collections.namedtuple("TimelineBatch", [
- "prev_batch",
- "events",
- "limited",
-])):
+SyncConfig = collections.namedtuple(
+ "SyncConfig", ["user", "filter_collection", "is_guest", "request_key", "device_id"]
+)
+
+
+class TimelineBatch(
+ collections.namedtuple("TimelineBatch", ["prev_batch", "events", "limited"])
+):
__slots__ = []
def __nonzero__(self):
@@ -85,18 +79,24 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [
to tell if room needs to be part of the sync result.
"""
return bool(self.events)
+
__bool__ = __nonzero__ # python3
-class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
- "room_id", # str
- "timeline", # TimelineBatch
- "state", # dict[(str, str), FrozenEvent]
- "ephemeral",
- "account_data",
- "unread_notifications",
- "summary",
-])):
+class JoinedSyncResult(
+ collections.namedtuple(
+ "JoinedSyncResult",
+ [
+ "room_id", # str
+ "timeline", # TimelineBatch
+ "state", # dict[(str, str), FrozenEvent]
+ "ephemeral",
+ "account_data",
+ "unread_notifications",
+ "summary",
+ ],
+ )
+):
__slots__ = []
def __nonzero__(self):
@@ -111,77 +111,93 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
# nb the notification count does not, er, count: if there's nothing
# else in the result, we don't need to send it.
)
+
__bool__ = __nonzero__ # python3
-class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
- "room_id", # str
- "timeline", # TimelineBatch
- "state", # dict[(str, str), FrozenEvent]
- "account_data",
-])):
+class ArchivedSyncResult(
+ collections.namedtuple(
+ "ArchivedSyncResult",
+ [
+ "room_id", # str
+ "timeline", # TimelineBatch
+ "state", # dict[(str, str), FrozenEvent]
+ "account_data",
+ ],
+ )
+):
__slots__ = []
def __nonzero__(self):
"""Make the result appear empty if there are no updates. This is used
to tell if room needs to be part of the sync result.
"""
- return bool(
- self.timeline
- or self.state
- or self.account_data
- )
+ return bool(self.timeline or self.state or self.account_data)
+
__bool__ = __nonzero__ # python3
-class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
- "room_id", # str
- "invite", # FrozenEvent: the invite event
-])):
+class InvitedSyncResult(
+ collections.namedtuple(
+ "InvitedSyncResult",
+ ["room_id", "invite"], # str # FrozenEvent: the invite event
+ )
+):
__slots__ = []
def __nonzero__(self):
"""Invited rooms should always be reported to the client"""
return True
+
__bool__ = __nonzero__ # python3
-class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
- "join",
- "invite",
- "leave",
-])):
+class GroupsSyncResult(
+ collections.namedtuple("GroupsSyncResult", ["join", "invite", "leave"])
+):
__slots__ = []
def __nonzero__(self):
return bool(self.join or self.invite or self.leave)
+
__bool__ = __nonzero__ # python3
-class DeviceLists(collections.namedtuple("DeviceLists", [
- "changed", # list of user_ids whose devices may have changed
- "left", # list of user_ids whose devices we no longer track
-])):
+class DeviceLists(
+ collections.namedtuple(
+ "DeviceLists",
+ [
+ "changed", # list of user_ids whose devices may have changed
+ "left", # list of user_ids whose devices we no longer track
+ ],
+ )
+):
__slots__ = []
def __nonzero__(self):
return bool(self.changed or self.left)
+
__bool__ = __nonzero__ # python3
-class SyncResult(collections.namedtuple("SyncResult", [
- "next_batch", # Token for the next sync
- "presence", # List of presence events for the user.
- "account_data", # List of account_data events for the user.
- "joined", # JoinedSyncResult for each joined room.
- "invited", # InvitedSyncResult for each invited room.
- "archived", # ArchivedSyncResult for each archived room.
- "to_device", # List of direct messages for the device.
- "device_lists", # List of user_ids whose devices have changed
- "device_one_time_keys_count", # Dict of algorithm to count for one time keys
- # for this device
- "groups",
-])):
+class SyncResult(
+ collections.namedtuple(
+ "SyncResult",
+ [
+ "next_batch", # Token for the next sync
+ "presence", # List of presence events for the user.
+ "account_data", # List of account_data events for the user.
+ "joined", # JoinedSyncResult for each joined room.
+ "invited", # InvitedSyncResult for each invited room.
+ "archived", # ArchivedSyncResult for each archived room.
+ "to_device", # List of direct messages for the device.
+ "device_lists", # List of user_ids whose devices have changed
+ "device_one_time_keys_count", # Dict of algorithm to count for one time keys
+ # for this device
+ "groups",
+ ],
+ )
+):
__slots__ = []
def __nonzero__(self):
@@ -190,20 +206,20 @@ class SyncResult(collections.namedtuple("SyncResult", [
events.
"""
return bool(
- self.presence or
- self.joined or
- self.invited or
- self.archived or
- self.account_data or
- self.to_device or
- self.device_lists or
- self.groups
+ self.presence
+ or self.joined
+ or self.invited
+ or self.archived
+ or self.account_data
+ or self.to_device
+ or self.device_lists
+ or self.groups
)
+
__bool__ = __nonzero__ # python3
class SyncHandler(object):
-
def __init__(self, hs):
self.hs_config = hs.config
self.store = hs.get_datastore()
@@ -217,13 +233,16 @@ class SyncHandler(object):
# ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
self.lazy_loaded_members_cache = ExpiringCache(
- "lazy_loaded_members_cache", self.clock,
- max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
+ "lazy_loaded_members_cache",
+ self.clock,
+ max_len=0,
+ expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
)
@defer.inlineCallbacks
- def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
- full_state=False):
+ def wait_for_sync_for_user(
+ self, sync_config, since_token=None, timeout=0, full_state=False
+ ):
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.
@@ -239,13 +258,15 @@ class SyncHandler(object):
res = yield self.response_cache.wrap(
sync_config.request_key,
self._wait_for_sync_for_user,
- sync_config, since_token, timeout, full_state,
+ sync_config,
+ since_token,
+ timeout,
+ full_state,
)
defer.returnValue(res)
@defer.inlineCallbacks
- def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
- full_state):
+ def _wait_for_sync_for_user(self, sync_config, since_token, timeout, full_state):
if since_token is None:
sync_type = "initial_sync"
elif full_state:
@@ -261,14 +282,17 @@ class SyncHandler(object):
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
result = yield self.current_sync_for_user(
- sync_config, since_token, full_state=full_state,
+ sync_config, since_token, full_state=full_state
)
else:
+
def current_sync_callback(before_token, after_token):
return self.current_sync_for_user(sync_config, since_token)
result = yield self.notifier.wait_for_events(
- sync_config.user.to_string(), timeout, current_sync_callback,
+ sync_config.user.to_string(),
+ timeout,
+ current_sync_callback,
from_token=since_token,
)
@@ -281,8 +305,7 @@ class SyncHandler(object):
defer.returnValue(result)
- def current_sync_for_user(self, sync_config, since_token=None,
- full_state=False):
+ def current_sync_for_user(self, sync_config, since_token=None, full_state=False):
"""Get the sync for client needed to match what the server has now.
Returns:
A Deferred SyncResult.
@@ -334,8 +357,7 @@ class SyncHandler(object):
# result returned by the event source is poor form (it might cache
# the object)
room_id = event["room_id"]
- event_copy = {k: v for (k, v) in iteritems(event)
- if k != "room_id"}
+ event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"}
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
receipt_key = since_token.receipt_key if since_token else "0"
@@ -353,22 +375,30 @@ class SyncHandler(object):
for event in receipts:
room_id = event["room_id"]
# exclude room id, as above
- event_copy = {k: v for (k, v) in iteritems(event)
- if k != "room_id"}
+ event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"}
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
defer.returnValue((now_token, ephemeral_by_room))
@defer.inlineCallbacks
- def _load_filtered_recents(self, room_id, sync_config, now_token,
- since_token=None, recents=None, newly_joined_room=False):
+ def _load_filtered_recents(
+ self,
+ room_id,
+ sync_config,
+ now_token,
+ since_token=None,
+ recents=None,
+ newly_joined_room=False,
+ ):
"""
Returns:
a Deferred TimelineBatch
"""
with Measure(self.clock, "load_filtered_recents"):
timeline_limit = sync_config.filter_collection.timeline_limit()
- block_all_timeline = sync_config.filter_collection.blocks_all_room_timeline()
+ block_all_timeline = (
+ sync_config.filter_collection.blocks_all_room_timeline()
+ )
if recents is None or newly_joined_room or timeline_limit < len(recents):
limited = True
@@ -396,11 +426,9 @@ class SyncHandler(object):
recents = []
if not limited or block_all_timeline:
- defer.returnValue(TimelineBatch(
- events=recents,
- prev_batch=now_token,
- limited=False
- ))
+ defer.returnValue(
+ TimelineBatch(events=recents, prev_batch=now_token, limited=False)
+ )
filtering_factor = 2
load_limit = max(timeline_limit * filtering_factor, 10)
@@ -427,9 +455,7 @@ class SyncHandler(object):
)
else:
events, end_key = yield self.store.get_recent_events_for_room(
- room_id,
- limit=load_limit + 1,
- end_token=end_key,
+ room_id, limit=load_limit + 1, end_token=end_key
)
loaded_recents = sync_config.filter_collection.filter_room_timeline(
events
@@ -462,15 +488,15 @@ class SyncHandler(object):
recents = recents[-timeline_limit:]
room_key = recents[0].internal_metadata.before
- prev_batch_token = now_token.copy_and_replace(
- "room_key", room_key
- )
+ prev_batch_token = now_token.copy_and_replace("room_key", room_key)
- defer.returnValue(TimelineBatch(
- events=recents,
- prev_batch=prev_batch_token,
- limited=limited or newly_joined_room
- ))
+ defer.returnValue(
+ TimelineBatch(
+ events=recents,
+ prev_batch=prev_batch_token,
+ limited=limited or newly_joined_room,
+ )
+ )
@defer.inlineCallbacks
def get_state_after_event(self, event, state_filter=StateFilter.all()):
@@ -486,7 +512,7 @@ class SyncHandler(object):
A Deferred map from ((type, state_key)->Event)
"""
state_ids = yield self.store.get_state_ids_for_event(
- event.event_id, state_filter=state_filter,
+ event.event_id, state_filter=state_filter
)
if event.is_state():
state_ids = state_ids.copy()
@@ -511,13 +537,13 @@ class SyncHandler(object):
# does not reliably give you the state at the given stream position.
# (https://github.com/matrix-org/synapse/issues/3305)
last_events, _ = yield self.store.get_recent_events_for_room(
- room_id, end_token=stream_position.room_key, limit=1,
+ room_id, end_token=stream_position.room_key, limit=1
)
if last_events:
last_event = last_events[-1]
state = yield self.get_state_after_event(
- last_event, state_filter=state_filter,
+ last_event, state_filter=state_filter
)
else:
@@ -549,7 +575,7 @@ class SyncHandler(object):
# FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305
last_events, _ = yield self.store.get_recent_event_ids_for_room(
- room_id, end_token=now_token.room_key, limit=1,
+ room_id, end_token=now_token.room_key, limit=1
)
if not last_events:
@@ -559,28 +585,25 @@ class SyncHandler(object):
last_event = last_events[-1]
state_ids = yield self.store.get_state_ids_for_event(
last_event.event_id,
- state_filter=StateFilter.from_types([
- (EventTypes.Name, ''),
- (EventTypes.CanonicalAlias, ''),
- ]),
+ state_filter=StateFilter.from_types(
+ [(EventTypes.Name, ""), (EventTypes.CanonicalAlias, "")]
+ ),
)
# this is heavily cached, thus: fast.
details = yield self.store.get_room_summary(room_id)
- name_id = state_ids.get((EventTypes.Name, ''))
- canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ''))
+ name_id = state_ids.get((EventTypes.Name, ""))
+ canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ""))
summary = {}
empty_ms = MemberSummary([], 0)
# TODO: only send these when they change.
- summary["m.joined_member_count"] = (
- details.get(Membership.JOIN, empty_ms).count
- )
- summary["m.invited_member_count"] = (
- details.get(Membership.INVITE, empty_ms).count
- )
+ summary["m.joined_member_count"] = details.get(Membership.JOIN, empty_ms).count
+ summary["m.invited_member_count"] = details.get(
+ Membership.INVITE, empty_ms
+ ).count
# if the room has a name or canonical_alias set, we can skip
# calculating heroes. Empty strings are falsey, so we check
@@ -592,7 +615,7 @@ class SyncHandler(object):
if canonical_alias_id:
canonical_alias = yield self.store.get_event(
- canonical_alias_id, allow_none=True,
+ canonical_alias_id, allow_none=True
)
if canonical_alias and canonical_alias.content.get("alias"):
defer.returnValue(summary)
@@ -600,26 +623,14 @@ class SyncHandler(object):
me = sync_config.user.to_string()
joined_user_ids = [
- r[0]
- for r in details.get(Membership.JOIN, empty_ms).members
- if r[0] != me
+ r[0] for r in details.get(Membership.JOIN, empty_ms).members if r[0] != me
]
invited_user_ids = [
- r[0]
- for r in details.get(Membership.INVITE, empty_ms).members
- if r[0] != me
+ r[0] for r in details.get(Membership.INVITE, empty_ms).members if r[0] != me
]
- gone_user_ids = (
- [
- r[0]
- for r in details.get(Membership.LEAVE, empty_ms).members
- if r[0] != me
- ] + [
- r[0]
- for r in details.get(Membership.BAN, empty_ms).members
- if r[0] != me
- ]
- )
+ gone_user_ids = [
+ r[0] for r in details.get(Membership.LEAVE, empty_ms).members if r[0] != me
+ ] + [r[0] for r in details.get(Membership.BAN, empty_ms).members if r[0] != me]
# FIXME: only build up a member_ids list for our heroes
member_ids = {}
@@ -627,20 +638,18 @@ class SyncHandler(object):
Membership.JOIN,
Membership.INVITE,
Membership.LEAVE,
- Membership.BAN
+ Membership.BAN,
):
for user_id, event_id in details.get(membership, empty_ms).members:
member_ids[user_id] = event_id
# FIXME: order by stream ordering rather than as returned by SQL
- if (joined_user_ids or invited_user_ids):
- summary['m.heroes'] = sorted(
+ if joined_user_ids or invited_user_ids:
+ summary["m.heroes"] = sorted(
[user_id for user_id in (joined_user_ids + invited_user_ids)]
)[0:5]
else:
- summary['m.heroes'] = sorted(
- [user_id for user_id in gone_user_ids]
- )[0:5]
+ summary["m.heroes"] = sorted([user_id for user_id in gone_user_ids])[0:5]
if not sync_config.filter_collection.lazy_load_members():
defer.returnValue(summary)
@@ -652,8 +661,7 @@ class SyncHandler(object):
# track which members the client should already know about via LL:
# Ones which are already in state...
existing_members = set(
- user_id for (typ, user_id) in state.keys()
- if typ == EventTypes.Member
+ user_id for (typ, user_id) in state.keys() if typ == EventTypes.Member
)
# ...or ones which are in the timeline...
@@ -664,10 +672,10 @@ class SyncHandler(object):
# ...and then ensure any missing ones get included in state.
missing_hero_event_ids = [
member_ids[hero_id]
- for hero_id in summary['m.heroes']
+ for hero_id in summary["m.heroes"]
if (
- cache.get(hero_id) != member_ids[hero_id] and
- hero_id not in existing_members
+ cache.get(hero_id) != member_ids[hero_id]
+ and hero_id not in existing_members
)
]
@@ -691,8 +699,9 @@ class SyncHandler(object):
return cache
@defer.inlineCallbacks
- def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token,
- full_state):
+ def compute_state_delta(
+ self, room_id, batch, sync_config, since_token, now_token, full_state
+ ):
""" Works out the difference in state between the start of the timeline
and the previous sync.
@@ -745,23 +754,23 @@ class SyncHandler(object):
timeline_state = {
(event.type, event.state_key): event.event_id
- for event in batch.events if event.is_state()
+ for event in batch.events
+ if event.is_state()
}
if full_state:
if batch:
current_state_ids = yield self.store.get_state_ids_for_event(
- batch.events[-1].event_id, state_filter=state_filter,
+ batch.events[-1].event_id, state_filter=state_filter
)
state_ids = yield self.store.get_state_ids_for_event(
- batch.events[0].event_id, state_filter=state_filter,
+ batch.events[0].event_id, state_filter=state_filter
)
else:
current_state_ids = yield self.get_state_at(
- room_id, stream_position=now_token,
- state_filter=state_filter,
+ room_id, stream_position=now_token, state_filter=state_filter
)
state_ids = current_state_ids
@@ -775,7 +784,7 @@ class SyncHandler(object):
)
elif batch.limited:
state_at_timeline_start = yield self.store.get_state_ids_for_event(
- batch.events[0].event_id, state_filter=state_filter,
+ batch.events[0].event_id, state_filter=state_filter
)
# for now, we disable LL for gappy syncs - see
@@ -793,12 +802,11 @@ class SyncHandler(object):
state_filter = StateFilter.all()
state_at_previous_sync = yield self.get_state_at(
- room_id, stream_position=since_token,
- state_filter=state_filter,
+ room_id, stream_position=since_token, state_filter=state_filter
)
current_state_ids = yield self.store.get_state_ids_for_event(
- batch.events[-1].event_id, state_filter=state_filter,
+ batch.events[-1].event_id, state_filter=state_filter
)
state_ids = _calculate_state(
@@ -854,8 +862,7 @@ class SyncHandler(object):
# add any member IDs we are about to send into our LruCache
for t, event_id in itertools.chain(
- state_ids.items(),
- timeline_state.items(),
+ state_ids.items(), timeline_state.items()
):
if t[0] == EventTypes.Member:
cache.set(t[1], event_id)
@@ -864,10 +871,14 @@ class SyncHandler(object):
if state_ids:
state = yield self.store.get_events(list(state_ids.values()))
- defer.returnValue({
- (e.type, e.state_key): e
- for e in sync_config.filter_collection.filter_room_state(list(state.values()))
- })
+ defer.returnValue(
+ {
+ (e.type, e.state_key): e
+ for e in sync_config.filter_collection.filter_room_state(
+ list(state.values())
+ )
+ }
+ )
@defer.inlineCallbacks
def unread_notifs_for_room_id(self, room_id, sync_config):
@@ -875,7 +886,7 @@ class SyncHandler(object):
last_unread_event_id = yield self.store.get_last_receipt_event_id_for_user(
user_id=sync_config.user.to_string(),
room_id=room_id,
- receipt_type="m.read"
+ receipt_type="m.read",
)
notifs = []
@@ -909,7 +920,9 @@ class SyncHandler(object):
logger.info(
"Calculating sync response for %r between %s and %s",
- sync_config.user, since_token, now_token,
+ sync_config.user,
+ since_token,
+ now_token,
)
user_id = sync_config.user.to_string()
@@ -920,11 +933,12 @@ class SyncHandler(object):
raise NotImplementedError()
else:
joined_room_ids = yield self.get_rooms_for_user_at(
- user_id, now_token.room_stream_id,
+ user_id, now_token.room_stream_id
)
sync_result_builder = SyncResultBuilder(
- sync_config, full_state,
+ sync_config,
+ full_state,
since_token=since_token,
now_token=now_token,
joined_room_ids=joined_room_ids,
@@ -941,8 +955,7 @@ class SyncHandler(object):
_, _, newly_left_rooms, newly_left_users = res
block_all_presence_data = (
- since_token is None and
- sync_config.filter_collection.blocks_all_presence()
+ since_token is None and sync_config.filter_collection.blocks_all_presence()
)
if self.hs_config.use_presence and not block_all_presence_data:
yield self._generate_sync_entry_for_presence(
@@ -973,22 +986,23 @@ class SyncHandler(object):
room_id = joined_room.room_id
if room_id in newly_joined_rooms:
issue4422_logger.debug(
- "Sync result for newly joined room %s: %r",
- room_id, joined_room,
+ "Sync result for newly joined room %s: %r", room_id, joined_room
)
- defer.returnValue(SyncResult(
- presence=sync_result_builder.presence,
- account_data=sync_result_builder.account_data,
- joined=sync_result_builder.joined,
- invited=sync_result_builder.invited,
- archived=sync_result_builder.archived,
- to_device=sync_result_builder.to_device,
- device_lists=device_lists,
- groups=sync_result_builder.groups,
- device_one_time_keys_count=one_time_key_counts,
- next_batch=sync_result_builder.now_token,
- ))
+ defer.returnValue(
+ SyncResult(
+ presence=sync_result_builder.presence,
+ account_data=sync_result_builder.account_data,
+ joined=sync_result_builder.joined,
+ invited=sync_result_builder.invited,
+ archived=sync_result_builder.archived,
+ to_device=sync_result_builder.to_device,
+ device_lists=device_lists,
+ groups=sync_result_builder.groups,
+ device_one_time_keys_count=one_time_key_counts,
+ next_batch=sync_result_builder.now_token,
+ )
+ )
@measure_func("_generate_sync_entry_for_groups")
@defer.inlineCallbacks
@@ -999,11 +1013,11 @@ class SyncHandler(object):
if since_token and since_token.groups_key:
results = yield self.store.get_groups_changes_for_user(
- user_id, since_token.groups_key, now_token.groups_key,
+ user_id, since_token.groups_key, now_token.groups_key
)
else:
results = yield self.store.get_all_groups_for_user(
- user_id, now_token.groups_key,
+ user_id, now_token.groups_key
)
invited = {}
@@ -1031,17 +1045,19 @@ class SyncHandler(object):
left[group_id] = content["content"]
sync_result_builder.groups = GroupsSyncResult(
- join=joined,
- invite=invited,
- leave=left,
+ join=joined, invite=invited, leave=left
)
@measure_func("_generate_sync_entry_for_device_list")
@defer.inlineCallbacks
- def _generate_sync_entry_for_device_list(self, sync_result_builder,
- newly_joined_rooms,
- newly_joined_or_invited_users,
- newly_left_rooms, newly_left_users):
+ def _generate_sync_entry_for_device_list(
+ self,
+ sync_result_builder,
+ newly_joined_rooms,
+ newly_joined_or_invited_users,
+ newly_left_rooms,
+ newly_left_users,
+ ):
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
@@ -1065,24 +1081,20 @@ class SyncHandler(object):
changed.update(newly_joined_or_invited_users)
if not changed and not newly_left_users:
- defer.returnValue(DeviceLists(
- changed=[],
- left=newly_left_users,
- ))
+ defer.returnValue(DeviceLists(changed=[], left=newly_left_users))
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
user_id
)
- defer.returnValue(DeviceLists(
- changed=users_who_share_room & changed,
- left=set(newly_left_users) - users_who_share_room,
- ))
+ defer.returnValue(
+ DeviceLists(
+ changed=users_who_share_room & changed,
+ left=set(newly_left_users) - users_who_share_room,
+ )
+ )
else:
- defer.returnValue(DeviceLists(
- changed=[],
- left=[],
- ))
+ defer.returnValue(DeviceLists(changed=[], left=[]))
@defer.inlineCallbacks
def _generate_sync_entry_for_to_device(self, sync_result_builder):
@@ -1109,8 +1121,9 @@ class SyncHandler(object):
deleted = yield self.store.delete_messages_for_device(
user_id, device_id, since_stream_id
)
- logger.debug("Deleted %d to-device messages up to %d",
- deleted, since_stream_id)
+ logger.debug(
+ "Deleted %d to-device messages up to %d", deleted, since_stream_id
+ )
messages, stream_id = yield self.store.get_new_messages_for_device(
user_id, device_id, since_stream_id, now_token.to_device_key
@@ -1118,7 +1131,10 @@ class SyncHandler(object):
logger.debug(
"Returning %d to-device messages between %d and %d (current token: %d)",
- len(messages), since_stream_id, stream_id, now_token.to_device_key
+ len(messages),
+ since_stream_id,
+ stream_id,
+ now_token.to_device_key,
)
sync_result_builder.now_token = now_token.copy_and_replace(
"to_device_key", stream_id
@@ -1145,8 +1161,7 @@ class SyncHandler(object):
if since_token and not sync_result_builder.full_state:
account_data, account_data_by_room = (
yield self.store.get_updated_account_data_for_user(
- user_id,
- since_token.account_data_key,
+ user_id, since_token.account_data_key
)
)
@@ -1160,27 +1175,28 @@ class SyncHandler(object):
)
else:
account_data, account_data_by_room = (
- yield self.store.get_account_data_for_user(
- sync_config.user.to_string()
- )
+ yield self.store.get_account_data_for_user(sync_config.user.to_string())
)
- account_data['m.push_rules'] = yield self.push_rules_for_user(
+ account_data["m.push_rules"] = yield self.push_rules_for_user(
sync_config.user
)
- account_data_for_user = sync_config.filter_collection.filter_account_data([
- {"type": account_data_type, "content": content}
- for account_data_type, content in account_data.items()
- ])
+ account_data_for_user = sync_config.filter_collection.filter_account_data(
+ [
+ {"type": account_data_type, "content": content}
+ for account_data_type, content in account_data.items()
+ ]
+ )
sync_result_builder.account_data = account_data_for_user
defer.returnValue(account_data_by_room)
@defer.inlineCallbacks
- def _generate_sync_entry_for_presence(self, sync_result_builder, newly_joined_rooms,
- newly_joined_or_invited_users):
+ def _generate_sync_entry_for_presence(
+ self, sync_result_builder, newly_joined_rooms, newly_joined_or_invited_users
+ ):
"""Generates the presence portion of the sync response. Populates the
`sync_result_builder` with the result.
@@ -1223,17 +1239,13 @@ class SyncHandler(object):
extra_users_ids.discard(user.to_string())
if extra_users_ids:
- states = yield self.presence_handler.get_states(
- extra_users_ids,
- )
+ states = yield self.presence_handler.get_states(extra_users_ids)
presence.extend(states)
# Deduplicate the presence entries so that there's at most one per user
presence = list({p.user_id: p for p in presence}.values())
- presence = sync_config.filter_collection.filter_presence(
- presence
- )
+ presence = sync_config.filter_collection.filter_presence(presence)
sync_result_builder.presence = presence
@@ -1253,8 +1265,8 @@ class SyncHandler(object):
"""
user_id = sync_result_builder.sync_config.user.to_string()
block_all_room_ephemeral = (
- sync_result_builder.since_token is None and
- sync_result_builder.sync_config.filter_collection.blocks_all_room_ephemeral()
+ sync_result_builder.since_token is None
+ and sync_result_builder.sync_config.filter_collection.blocks_all_room_ephemeral()
)
if block_all_room_ephemeral:
@@ -1275,15 +1287,14 @@ class SyncHandler(object):
have_changed = yield self._have_rooms_changed(sync_result_builder)
if not have_changed:
tags_by_room = yield self.store.get_updated_tags(
- user_id,
- since_token.account_data_key,
+ user_id, since_token.account_data_key
)
if not tags_by_room:
logger.debug("no-oping sync")
defer.returnValue(([], [], [], []))
ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
- "m.ignored_user_list", user_id=user_id,
+ "m.ignored_user_list", user_id=user_id
)
if ignored_account_data:
@@ -1296,7 +1307,7 @@ class SyncHandler(object):
room_entries, invited, newly_joined_rooms, newly_left_rooms = res
tags_by_room = yield self.store.get_updated_tags(
- user_id, since_token.account_data_key,
+ user_id, since_token.account_data_key
)
else:
res = yield self._get_all_rooms(sync_result_builder, ignored_users)
@@ -1331,8 +1342,8 @@ class SyncHandler(object):
for event in it:
if event.type == EventTypes.Member:
if (
- event.membership == Membership.JOIN or
- event.membership == Membership.INVITE
+ event.membership == Membership.JOIN
+ or event.membership == Membership.INVITE
):
newly_joined_or_invited_users.add(event.state_key)
else:
@@ -1343,12 +1354,14 @@ class SyncHandler(object):
newly_left_users -= newly_joined_or_invited_users
- defer.returnValue((
- newly_joined_rooms,
- newly_joined_or_invited_users,
- newly_left_rooms,
- newly_left_users,
- ))
+ defer.returnValue(
+ (
+ newly_joined_rooms,
+ newly_joined_or_invited_users,
+ newly_left_rooms,
+ newly_left_users,
+ )
+ )
@defer.inlineCallbacks
def _have_rooms_changed(self, sync_result_builder):
@@ -1454,7 +1467,9 @@ class SyncHandler(object):
prev_membership = old_mem_ev.membership
issue4422_logger.debug(
"Previous membership for room %s with join: %s (event %s)",
- room_id, prev_membership, old_mem_ev_id,
+ room_id,
+ prev_membership,
+ old_mem_ev_id,
)
if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
@@ -1476,8 +1491,7 @@ class SyncHandler(object):
if not old_state_ids:
old_state_ids = yield self.get_state_at(room_id, since_token)
old_mem_ev_id = old_state_ids.get(
- (EventTypes.Member, user_id),
- None,
+ (EventTypes.Member, user_id), None
)
old_mem_ev = None
if old_mem_ev_id:
@@ -1498,7 +1512,8 @@ class SyncHandler(object):
# Always include leave/ban events. Just take the last one.
# TODO: How do we handle ban -> leave in same batch?
leave_events = [
- e for e in non_joins
+ e
+ for e in non_joins
if e.membership in (Membership.LEAVE, Membership.BAN)
]
@@ -1526,15 +1541,17 @@ class SyncHandler(object):
else:
batch_events = None
- room_entries.append(RoomSyncResultBuilder(
- room_id=room_id,
- rtype="archived",
- events=batch_events,
- newly_joined=room_id in newly_joined_rooms,
- full_state=False,
- since_token=since_token,
- upto_token=leave_token,
- ))
+ room_entries.append(
+ RoomSyncResultBuilder(
+ room_id=room_id,
+ rtype="archived",
+ events=batch_events,
+ newly_joined=room_id in newly_joined_rooms,
+ full_state=False,
+ since_token=since_token,
+ upto_token=leave_token,
+ )
+ )
timeline_limit = sync_config.filter_collection.timeline_limit()
@@ -1581,7 +1598,8 @@ class SyncHandler(object):
# debugging for https://github.com/matrix-org/synapse/issues/4422
issue4422_logger.debug(
"RoomSyncResultBuilder events for newly joined room %s: %r",
- room_id, entry.events,
+ room_id,
+ entry.events,
)
room_entries.append(entry)
@@ -1606,12 +1624,14 @@ class SyncHandler(object):
sync_config = sync_result_builder.sync_config
membership_list = (
- Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN
+ Membership.INVITE,
+ Membership.JOIN,
+ Membership.LEAVE,
+ Membership.BAN,
)
room_list = yield self.store.get_rooms_for_user_where_membership_is(
- user_id=user_id,
- membership_list=membership_list
+ user_id=user_id, membership_list=membership_list
)
room_entries = []
@@ -1619,23 +1639,22 @@ class SyncHandler(object):
for event in room_list:
if event.membership == Membership.JOIN:
- room_entries.append(RoomSyncResultBuilder(
- room_id=event.room_id,
- rtype="joined",
- events=None,
- newly_joined=False,
- full_state=True,
- since_token=since_token,
- upto_token=now_token,
- ))
+ room_entries.append(
+ RoomSyncResultBuilder(
+ room_id=event.room_id,
+ rtype="joined",
+ events=None,
+ newly_joined=False,
+ full_state=True,
+ since_token=since_token,
+ upto_token=now_token,
+ )
+ )
elif event.membership == Membership.INVITE:
if event.sender in ignored_users:
continue
invite = yield self.store.get_event(event.event_id)
- invited.append(InvitedSyncResult(
- room_id=event.room_id,
- invite=invite,
- ))
+ invited.append(InvitedSyncResult(room_id=event.room_id, invite=invite))
elif event.membership in (Membership.LEAVE, Membership.BAN):
# Always send down rooms we were banned or kicked from.
if not sync_config.filter_collection.include_leave:
@@ -1646,22 +1665,31 @@ class SyncHandler(object):
leave_token = now_token.copy_and_replace(
"room_key", "s%d" % (event.stream_ordering,)
)
- room_entries.append(RoomSyncResultBuilder(
- room_id=event.room_id,
- rtype="archived",
- events=None,
- newly_joined=False,
- full_state=True,
- since_token=since_token,
- upto_token=leave_token,
- ))
+ room_entries.append(
+ RoomSyncResultBuilder(
+ room_id=event.room_id,
+ rtype="archived",
+ events=None,
+ newly_joined=False,
+ full_state=True,
+ since_token=since_token,
+ upto_token=leave_token,
+ )
+ )
defer.returnValue((room_entries, invited, []))
@defer.inlineCallbacks
- def _generate_room_entry(self, sync_result_builder, ignored_users,
- room_builder, ephemeral, tags, account_data,
- always_include=False):
+ def _generate_room_entry(
+ self,
+ sync_result_builder,
+ ignored_users,
+ room_builder,
+ ephemeral,
+ tags,
+ account_data,
+ always_include=False,
+ ):
"""Populates the `joined` and `archived` section of `sync_result_builder`
based on the `room_builder`.
@@ -1678,9 +1706,7 @@ class SyncHandler(object):
"""
newly_joined = room_builder.newly_joined
full_state = (
- room_builder.full_state
- or newly_joined
- or sync_result_builder.full_state
+ room_builder.full_state or newly_joined or sync_result_builder.full_state
)
events = room_builder.events
@@ -1697,7 +1723,8 @@ class SyncHandler(object):
upto_token = room_builder.upto_token
batch = yield self._load_filtered_recents(
- room_id, sync_config,
+ room_id,
+ sync_config,
now_token=upto_token,
since_token=since_token,
recents=events,
@@ -1708,7 +1735,8 @@ class SyncHandler(object):
# debug for https://github.com/matrix-org/synapse/issues/4422
issue4422_logger.debug(
"Timeline events after filtering in newly-joined room %s: %r",
- room_id, batch,
+ room_id,
+ batch,
)
# When we join the room (or the client requests full_state), we should
@@ -1726,16 +1754,10 @@ class SyncHandler(object):
account_data_events = []
if tags is not None:
- account_data_events.append({
- "type": "m.tag",
- "content": {"tags": tags},
- })
+ account_data_events.append({"type": "m.tag", "content": {"tags": tags}})
for account_data_type, content in account_data.items():
- account_data_events.append({
- "type": account_data_type,
- "content": content,
- })
+ account_data_events.append({"type": account_data_type, "content": content})
account_data_events = sync_config.filter_collection.filter_room_account_data(
account_data_events
@@ -1743,16 +1765,13 @@ class SyncHandler(object):
ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
- if not (always_include
- or batch
- or account_data_events
- or ephemeral
- or full_state):
+ if not (
+ always_include or batch or account_data_events or ephemeral or full_state
+ ):
return
state = yield self.compute_state_delta(
- room_id, batch, sync_config, since_token, now_token,
- full_state=full_state
+ room_id, batch, sync_config, since_token, now_token, full_state=full_state
)
summary = {}
@@ -1760,22 +1779,19 @@ class SyncHandler(object):
# we include a summary in room responses when we're lazy loading
# members (as the client otherwise doesn't have enough info to form
# the name itself).
- if (
- sync_config.filter_collection.lazy_load_members() and
- (
- # we recalulate the summary:
- # if there are membership changes in the timeline, or
- # if membership has changed during a gappy sync, or
- # if this is an initial sync.
- any(ev.type == EventTypes.Member for ev in batch.events) or
- (
- # XXX: this may include false positives in the form of LL
- # members which have snuck into state
- batch.limited and
- any(t == EventTypes.Member for (t, k) in state)
- ) or
- since_token is None
+ if sync_config.filter_collection.lazy_load_members() and (
+ # we recalulate the summary:
+ # if there are membership changes in the timeline, or
+ # if membership has changed during a gappy sync, or
+ # if this is an initial sync.
+ any(ev.type == EventTypes.Member for ev in batch.events)
+ or (
+ # XXX: this may include false positives in the form of LL
+ # members which have snuck into state
+ batch.limited
+ and any(t == EventTypes.Member for (t, k) in state)
)
+ or since_token is None
):
summary = yield self.compute_summary(
room_id, sync_config, batch, state, now_token
@@ -1794,9 +1810,7 @@ class SyncHandler(object):
)
if room_sync or always_include:
- notifs = yield self.unread_notifs_for_room_id(
- room_id, sync_config
- )
+ notifs = yield self.unread_notifs_for_room_id(room_id, sync_config)
if notifs is not None:
unread_notifications["notification_count"] = notifs["notify_count"]
@@ -1807,11 +1821,8 @@ class SyncHandler(object):
if batch.limited and since_token:
user_id = sync_result_builder.sync_config.user.to_string()
logger.info(
- "Incremental gappy sync of %s for user %s with %d state events" % (
- room_id,
- user_id,
- len(state),
- )
+ "Incremental gappy sync of %s for user %s with %d state events"
+ % (room_id, user_id, len(state))
)
elif room_builder.rtype == "archived":
room_sync = ArchivedSyncResult(
@@ -1841,9 +1852,7 @@ class SyncHandler(object):
Deferred[frozenset[str]]: Set of room_ids the user is in at given
stream_ordering.
"""
- joined_rooms = yield self.store.get_rooms_for_user_with_stream_ordering(
- user_id,
- )
+ joined_rooms = yield self.store.get_rooms_for_user_with_stream_ordering(user_id)
joined_room_ids = set()
@@ -1862,11 +1871,9 @@ class SyncHandler(object):
logger.info("User joined room after current token: %s", room_id)
extrems = yield self.store.get_forward_extremeties_for_room(
- room_id, stream_ordering,
- )
- users_in_room = yield self.state.get_current_users_in_room(
- room_id, extrems,
+ room_id, stream_ordering
)
+ users_in_room = yield self.state.get_current_users_in_room(room_id, extrems)
if user_id in users_in_room:
joined_room_ids.add(room_id)
@@ -1886,7 +1893,7 @@ def _action_has_highlight(actions):
def _calculate_state(
- timeline_contains, timeline_start, previous, current, lazy_load_members,
+ timeline_contains, timeline_start, previous, current, lazy_load_members
):
"""Works out what state to include in a sync response.
@@ -1930,15 +1937,12 @@ def _calculate_state(
if lazy_load_members:
p_ids.difference_update(
- e for t, e in iteritems(timeline_start)
- if t[0] == EventTypes.Member
+ e for t, e in iteritems(timeline_start) if t[0] == EventTypes.Member
)
state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids
- return {
- event_id_to_key[e]: e for e in state_ids
- }
+ return {event_id_to_key[e]: e for e in state_ids}
class SyncResultBuilder(object):
@@ -1961,8 +1965,10 @@ class SyncResultBuilder(object):
groups (GroupsSyncResult|None)
to_device (list)
"""
- def __init__(self, sync_config, full_state, since_token, now_token,
- joined_room_ids):
+
+ def __init__(
+ self, sync_config, full_state, since_token, now_token, joined_room_ids
+ ):
"""
Args:
sync_config (SyncConfig)
@@ -1991,8 +1997,10 @@ class RoomSyncResultBuilder(object):
"""Stores information needed to create either a `JoinedSyncResult` or
`ArchivedSyncResult`.
"""
- def __init__(self, room_id, rtype, events, newly_joined, full_state,
- since_token, upto_token):
+
+ def __init__(
+ self, room_id, rtype, events, newly_joined, full_state, since_token, upto_token
+ ):
"""
Args:
room_id(str)
|