diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 0f066f8421..15a41c6617 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -90,14 +90,12 @@ class TimelineBatch:
events = attr.ib(type=List[EventBase])
limited = attr.ib(bool)
- def __nonzero__(self) -> bool:
+ def __bool__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
to tell if room needs to be part of the sync result.
"""
return bool(self.events)
- __bool__ = __nonzero__ # python3
-
# We can't freeze this class, because we need to update it after it's instantiated to
# update its unread count. This is because we calculate the unread count for a room only
@@ -115,7 +113,7 @@ class JoinedSyncResult:
summary = attr.ib(type=Optional[JsonDict])
unread_count = attr.ib(type=int)
- def __nonzero__(self) -> bool:
+ def __bool__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
to tell if room needs to be part of the sync result.
"""
@@ -128,8 +126,6 @@ class JoinedSyncResult:
# else in the result, we don't need to send it.
)
- __bool__ = __nonzero__ # python3
-
@attr.s(slots=True, frozen=True)
class ArchivedSyncResult:
@@ -138,26 +134,22 @@ class ArchivedSyncResult:
state = attr.ib(type=StateMap[EventBase])
account_data = attr.ib(type=List[JsonDict])
- def __nonzero__(self) -> bool:
+ def __bool__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
to tell if room needs to be part of the sync result.
"""
return bool(self.timeline or self.state or self.account_data)
- __bool__ = __nonzero__ # python3
-
@attr.s(slots=True, frozen=True)
class InvitedSyncResult:
room_id = attr.ib(type=str)
invite = attr.ib(type=EventBase)
- def __nonzero__(self) -> bool:
+ def __bool__(self) -> bool:
"""Invited rooms should always be reported to the client"""
return True
- __bool__ = __nonzero__ # python3
-
@attr.s(slots=True, frozen=True)
class GroupsSyncResult:
@@ -165,11 +157,9 @@ class GroupsSyncResult:
invite = attr.ib(type=JsonDict)
leave = attr.ib(type=JsonDict)
- def __nonzero__(self) -> bool:
+ def __bool__(self) -> bool:
return bool(self.join or self.invite or self.leave)
- __bool__ = __nonzero__ # python3
-
@attr.s(slots=True, frozen=True)
class DeviceLists:
@@ -182,13 +172,11 @@ class DeviceLists:
changed = attr.ib(type=Collection[str])
left = attr.ib(type=Collection[str])
- def __nonzero__(self) -> bool:
+ def __bool__(self) -> bool:
return bool(self.changed or self.left)
- __bool__ = __nonzero__ # python3
-
-@attr.s
+@attr.s(slots=True)
class _RoomChanges:
"""The set of room entries to include in the sync, plus the set of joined
and left room IDs since last sync.
@@ -228,7 +216,7 @@ class SyncResult:
device_one_time_keys_count = attr.ib(type=JsonDict)
groups = attr.ib(type=Optional[GroupsSyncResult])
- def __nonzero__(self) -> bool:
+ def __bool__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
to tell if the notifier needs to wait for more events when polling for
events.
@@ -244,8 +232,6 @@ class SyncResult:
or self.groups
)
- __bool__ = __nonzero__ # python3
-
class SyncHandler:
def __init__(self, hs: "HomeServer"):
@@ -381,7 +367,7 @@ class SyncHandler:
sync_config = sync_result_builder.sync_config
with Measure(self.clock, "ephemeral_by_room"):
- typing_key = since_token.typing_key if since_token else "0"
+ typing_key = since_token.typing_key if since_token else 0
room_ids = sync_result_builder.joined_room_ids
@@ -405,7 +391,7 @@ class SyncHandler:
event_copy = {k: v for (k, v) in event.items() if k != "room_id"}
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
- receipt_key = since_token.receipt_key if since_token else "0"
+ receipt_key = since_token.receipt_key if since_token else 0
receipt_source = self.event_sources.sources["receipt"]
receipts, receipt_key = await receipt_source.get_new_events(
@@ -984,7 +970,7 @@ class SyncHandler:
raise NotImplementedError()
else:
joined_room_ids = await self.get_rooms_for_user_at(
- user_id, now_token.room_stream_id
+ user_id, now_token.room_key
)
sync_result_builder = SyncResultBuilder(
sync_config,
@@ -1313,12 +1299,11 @@ class SyncHandler:
presence_source = self.event_sources.sources["presence"]
since_token = sync_result_builder.since_token
+ presence_key = None
+ include_offline = False
if since_token and not sync_result_builder.full_state:
presence_key = since_token.presence_key
include_offline = True
- else:
- presence_key = None
- include_offline = False
presence, presence_key = await presence_source.get_new_events(
user=user,
@@ -1326,6 +1311,7 @@ class SyncHandler:
is_guest=sync_config.is_guest,
include_offline=include_offline,
)
+ assert presence_key
sync_result_builder.now_token = now_token.copy_and_replace(
"presence_key", presence_key
)
@@ -1488,7 +1474,7 @@ class SyncHandler:
if rooms_changed:
return True
- stream_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream
+ stream_id = since_token.room_key.stream
for room_id in sync_result_builder.joined_room_ids:
if self.store.has_room_changed_since(room_id, stream_id):
return True
@@ -1612,16 +1598,24 @@ class SyncHandler:
if leave_events:
leave_event = leave_events[-1]
- leave_stream_token = await self.store.get_stream_token_for_event(
+ leave_position = await self.store.get_position_for_event(
leave_event.event_id
)
- leave_token = since_token.copy_and_replace(
- "room_key", leave_stream_token
- )
- if since_token and since_token.is_after(leave_token):
+ # If the leave event happened before the since token then we
+ # bail.
+ if since_token and not leave_position.persisted_after(
+ since_token.room_key
+ ):
continue
+ # We can safely convert the position of the leave event into a
+ # stream token as it'll only be used in the context of this
+ # room. (c.f. the docstring of `to_room_stream_token`).
+ leave_token = since_token.copy_and_replace(
+ "room_key", leave_position.to_room_stream_token()
+ )
+
# If this is an out of band message, like a remote invite
# rejection, we include it in the recents batch. Otherwise, we
# let _load_filtered_recents handle fetching the correct
@@ -1754,7 +1748,7 @@ class SyncHandler:
continue
leave_token = now_token.copy_and_replace(
- "room_key", "s%d" % (event.stream_ordering,)
+ "room_key", RoomStreamToken(None, event.stream_ordering)
)
room_entries.append(
RoomSyncResultBuilder(
@@ -1933,7 +1927,7 @@ class SyncHandler:
raise Exception("Unrecognized rtype: %r", room_builder.rtype)
async def get_rooms_for_user_at(
- self, user_id: str, stream_ordering: int
+ self, user_id: str, room_key: RoomStreamToken
) -> FrozenSet[str]:
"""Get set of joined rooms for a user at the given stream ordering.
@@ -1959,15 +1953,15 @@ class SyncHandler:
# If the membership's stream ordering is after the given stream
# ordering, we need to go and work out if the user was in the room
# before.
- for room_id, membership_stream_ordering in joined_rooms:
- if membership_stream_ordering <= stream_ordering:
+ for room_id, event_pos in joined_rooms:
+ if not event_pos.persisted_after(room_key):
joined_room_ids.add(room_id)
continue
logger.info("User joined room after current token: %s", room_id)
extrems = await self.store.get_forward_extremeties_for_room(
- room_id, stream_ordering
+ room_id, event_pos.stream
)
users_in_room = await self.state.get_current_users_in_room(room_id, extrems)
if user_id in users_in_room:
@@ -2041,7 +2035,7 @@ def _calculate_state(
return {event_id_to_key[e]: e for e in state_ids}
-@attr.s
+@attr.s(slots=True)
class SyncResultBuilder:
"""Used to help build up a new SyncResult for a user
@@ -2077,7 +2071,7 @@ class SyncResultBuilder:
to_device = attr.ib(type=List[JsonDict], default=attr.Factory(list))
-@attr.s
+@attr.s(slots=True)
class RoomSyncResultBuilder:
"""Stores information needed to create either a `JoinedSyncResult` or
`ArchivedSyncResult`.
|