diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index bc6d6af133..6b7c6a436e 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -499,6 +499,10 @@ class SyncHandler(object):
@defer.inlineCallbacks
def generate_sync_result(self, sync_config, since_token=None, full_state=False):
+ # NB: The now_token gets changed by some of the generate_sync_* methods,
+ # this is due to some of the underlying streams not supporting the ability
+ # to query up to a given point.
+ # Always use the `now_token` in `SyncResultBuilder`
now_token = yield self.event_sources.get_current_token()
sync_result_builer = SyncResultBuilder(
@@ -511,9 +515,10 @@ class SyncHandler(object):
sync_result_builer
)
- newly_joined_rooms, newly_joined_users = yield self.generate_sync_entry_for_rooms(
+ res = yield self.generate_sync_entry_for_rooms(
sync_result_builer, account_data_by_room
)
+ newly_joined_rooms, newly_joined_users = res
yield self.generate_sync_entry_for_presence(
sync_result_builer, newly_joined_rooms, newly_joined_users
@@ -631,7 +636,7 @@ class SyncHandler(object):
if sync_result_builer.since_token:
res = yield self._get_rooms_changed(sync_result_builer, ignored_users)
- joined, invited, archived, newly_joined_rooms = res
+ room_entries, invited, newly_joined_rooms = res
tags_by_room = yield self.store.get_updated_tags(
user_id,
@@ -639,13 +644,12 @@ class SyncHandler(object):
)
else:
res = yield self._get_all_rooms(sync_result_builer, ignored_users)
- joined, invited, archived, newly_joined_rooms = res
+ room_entries, invited, newly_joined_rooms = res
tags_by_room = yield self.store.get_tags_for_user(user_id)
- def handle_joined(room_entry):
+ def handle_room_entries(room_entry):
return self._generate_room_entry(
- "joined",
sync_result_builer,
ignored_users,
room_entry,
@@ -655,21 +659,7 @@ class SyncHandler(object):
always_include=sync_result_builer.full_state,
)
- yield concurrently_execute(handle_joined, joined, 10)
-
- def handle_archived(room_entry):
- return self._generate_room_entry(
- "archived",
- sync_result_builer,
- ignored_users,
- room_entry,
- ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
- tags=tags_by_room.get(room_entry.room_id),
- account_data=account_data_by_room.get(room_entry.room_id, {}),
- always_include=sync_result_builer.full_state,
- )
-
- yield concurrently_execute(handle_archived, archived, 10)
+ yield concurrently_execute(handle_room_entries, room_entries, 10)
sync_result_builer.invited.extend(invited)
@@ -711,7 +701,7 @@ class SyncHandler(object):
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
newly_joined_rooms = []
- archived = []
+ room_entries = []
invited = []
for room_id, events in mem_change_events_by_room_id.items():
non_joins = [e for e in events if e.membership != Membership.JOIN]
@@ -759,8 +749,9 @@ class SyncHandler(object):
if since_token and since_token.is_after(leave_token):
continue
- archived.append(RoomSyncResultBuilder(
+ room_entries.append(RoomSyncResultBuilder(
room_id=room_id,
+ rtype="archived",
events=None,
newly_joined=room_id in newly_joined_rooms,
full_state=False,
@@ -778,7 +769,6 @@ class SyncHandler(object):
limit=timeline_limit + 1,
)
- joined = []
# We loop through all room ids, even if there are no new events, in case
# there are non room events taht we need to notify about.
for room_id in joined_room_ids:
@@ -789,8 +779,9 @@ class SyncHandler(object):
prev_batch_token = now_token.copy_and_replace("room_key", start_key)
- joined.append(RoomSyncResultBuilder(
+ room_entries.append(RoomSyncResultBuilder(
room_id=room_id,
+ rtype="joined",
events=events,
newly_joined=room_id in newly_joined_rooms,
full_state=False,
@@ -798,8 +789,9 @@ class SyncHandler(object):
upto_token=prev_batch_token,
))
else:
- joined.append(RoomSyncResultBuilder(
+ room_entries.append(RoomSyncResultBuilder(
room_id=room_id,
+ rtype="joined",
events=[],
newly_joined=room_id in newly_joined_rooms,
full_state=False,
@@ -807,7 +799,7 @@ class SyncHandler(object):
upto_token=since_token,
))
- defer.returnValue((joined, invited, archived, newly_joined_rooms))
+ defer.returnValue((room_entries, invited, newly_joined_rooms))
@defer.inlineCallbacks
def _get_all_rooms(self, sync_result_builer, ignored_users):
@@ -825,14 +817,14 @@ class SyncHandler(object):
membership_list=membership_list
)
- joined = []
+ room_entries = []
invited = []
- archived = []
for event in room_list:
if event.membership == Membership.JOIN:
- joined.append(RoomSyncResultBuilder(
+ room_entries.append(RoomSyncResultBuilder(
room_id=event.room_id,
+ rtype="joined",
events=None,
newly_joined=False,
full_state=True,
@@ -857,8 +849,9 @@ class SyncHandler(object):
leave_token = now_token.copy_and_replace(
"room_key", "s%d" % (event.stream_ordering,)
)
- archived.append(RoomSyncResultBuilder(
+ room_entries.append(RoomSyncResultBuilder(
room_id=event.room_id,
+ rtype="archived",
events=None,
newly_joined=False,
full_state=True,
@@ -866,10 +859,10 @@ class SyncHandler(object):
upto_token=leave_token,
))
- defer.returnValue((joined, invited, archived, []))
+ defer.returnValue((room_entries, invited, []))
@defer.inlineCallbacks
- def _generate_room_entry(self, room_type, sync_result_builer, ignored_users,
+ def _generate_room_entry(self, sync_result_builer, ignored_users,
room_builder, ephemeral, tags, account_data,
always_include=False):
since_token = sync_result_builer.since_token
@@ -892,7 +885,7 @@ class SyncHandler(object):
now_token=upto_token,
since_token=since_token,
recents=events,
- newly_joined_room=newly_joined, # FIXME
+ newly_joined_room=newly_joined,
)
account_data_events = []
@@ -922,7 +915,7 @@ class SyncHandler(object):
full_state=full_state
)
- if room_type == "joined":
+ if room_builder.rtype == "joined":
unread_notifications = {}
room_sync = JoinedSyncResult(
room_id=room_id,
@@ -943,7 +936,7 @@ class SyncHandler(object):
unread_notifications["highlight_count"] = notifs["highlight_count"]
sync_result_builer.joined.append(room_sync)
- elif room_type == "archived":
+ elif room_builder.rtype == "archived":
room_sync = ArchivedSyncResult(
room_id=room_id,
timeline=batch,
@@ -952,6 +945,8 @@ class SyncHandler(object):
)
if room_sync or always_include:
sync_result_builer.archived.append(room_sync)
+ else:
+ raise Exception("Unrecognized rtype: %r", room_builder.rtype)
def _action_has_highlight(actions):
@@ -1017,9 +1012,10 @@ class SyncResultBuilder(object):
class RoomSyncResultBuilder(object):
- def __init__(self, room_id, events, newly_joined, full_state, since_token,
- upto_token):
+ def __init__(self, room_id, rtype, events, newly_joined, full_state,
+ since_token, upto_token):
self.room_id = room_id
+ self.rtype = rtype
self.events = events
self.newly_joined = newly_joined
self.full_state = full_state
|