diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index ee6b881de1..6cb756e476 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -61,18 +61,37 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
return bool(self.timeline or self.state or self.ephemeral)
+class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [
+ "room_id",
+ "timeline",
+ "state",
+])):
+ __slots__ = []
+
+ def __nonzero__(self):
+ """Make the result appear empty if there are no updates. This is used
+ to tell if room needs to be part of the sync result.
+ """
+ return bool(self.timeline or self.state)
+
+
class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
"room_id",
"invite",
])):
__slots__ = []
+ def __nonzero__(self):
+ """Invited rooms should always be reported to the client"""
+ return True
+
class SyncResult(collections.namedtuple("SyncResult", [
"next_batch", # Token for the next sync
"presence", # List of presence events for the user.
"joined", # JoinedSyncResult for each joined room.
"invited", # InvitedSyncResult for each invited room.
+ "archived", # ArchivedSyncResult for each archived room.
])):
__slots__ = []
@@ -156,11 +175,17 @@ class SyncHandler(BaseHandler):
)
room_list = yield self.store.get_rooms_for_user_where_membership_is(
user_id=sync_config.user.to_string(),
- membership_list=[Membership.INVITE, Membership.JOIN]
+ membership_list=(
+ Membership.INVITE,
+ Membership.JOIN,
+ Membership.LEAVE,
+ Membership.BAN
+ )
)
joined = []
invited = []
+ archived = []
for event in room_list:
if event.membership == Membership.JOIN:
room_sync = yield self.initial_sync_for_joined_room(
@@ -173,11 +198,23 @@ class SyncHandler(BaseHandler):
room_id=event.room_id,
invite=invite,
))
+ elif event.membership in (Membership.LEAVE, Membership.BAN):
+ leave_token = now_token.copy_and_replace(
+ "room_key", "s%d" % (event.stream_ordering,)
+ )
+ room_sync = yield self.initial_sync_for_archived_room(
+ sync_config=sync_config,
+ room_id=event.room_id,
+ leave_event_id=event.event_id,
+ leave_token=leave_token,
+ )
+ archived.append(room_sync)
defer.returnValue(SyncResult(
presence=presence,
joined=joined,
invited=invited,
+ archived=archived,
next_batch=now_token,
))
@@ -205,6 +242,28 @@ class SyncHandler(BaseHandler):
))
@defer.inlineCallbacks
+ def initial_sync_for_archived_room(self, room_id, sync_config,
+ leave_event_id, leave_token):
+ """Sync a room for a client which is starting without any state
+ Returns:
+ A Deferred JoinedSyncResult.
+ """
+
+ batch = yield self.load_filtered_recents(
+ room_id, sync_config, leave_token,
+ )
+
+ leave_state = yield self.store.get_state_for_events(
+ [leave_event_id], None
+ )
+
+ defer.returnValue(ArchivedSyncResult(
+ room_id=room_id,
+ timeline=batch,
+ state=leave_state[leave_event_id].values(),
+ ))
+
+ @defer.inlineCallbacks
def incremental_sync_with_gap(self, sync_config, since_token):
""" Get the incremental delta needed to bring the client up to
date with the server.
@@ -257,18 +316,22 @@ class SyncHandler(BaseHandler):
)
joined = []
+ archived = []
if len(room_events) <= timeline_limit:
# There is no gap in any of the rooms. Therefore we can just
# partition the new events by room and return them.
invite_events = []
+ leave_events = []
events_by_room_id = {}
for event in room_events:
events_by_room_id.setdefault(event.room_id, []).append(event)
if event.room_id not in joined_room_ids:
if (event.type == EventTypes.Member
- and event.membership == Membership.INVITE
and event.state_key == sync_config.user.to_string()):
- invite_events.append(event)
+ if event.membership == Membership.INVITE:
+ invite_events.append(event)
+ elif event.membership in (Membership.LEAVE, Membership.BAN):
+ leave_events.append(event)
for room_id in joined_room_ids:
recents = events_by_room_id.get(room_id, [])
@@ -296,11 +359,16 @@ class SyncHandler(BaseHandler):
)
if room_sync:
joined.append(room_sync)
+
else:
invite_events = yield self.store.get_invites_for_user(
sync_config.user.to_string()
)
+ leave_events = yield self.store.get_leave_and_ban_events_for_user(
+ sync_config.user.to_string()
+ )
+
for room_id in joined_room_ids:
room_sync = yield self.incremental_sync_with_gap_for_room(
room_id, sync_config, since_token, now_token,
@@ -309,6 +377,12 @@ class SyncHandler(BaseHandler):
if room_sync:
joined.append(room_sync)
+ for leave_event in leave_events:
+ room_sync = yield self.incremental_sync_for_archived_room(
+ sync_config, leave_event, since_token
+ )
+ archived.append(room_sync)
+
invited = [
InvitedSyncResult(room_id=event.room_id, invite=event)
for event in invite_events
@@ -318,6 +392,7 @@ class SyncHandler(BaseHandler):
presence=presence,
joined=joined,
invited=invited,
+ archived=archived,
next_batch=now_token,
))
@@ -417,6 +492,55 @@ class SyncHandler(BaseHandler):
defer.returnValue(room_sync)
@defer.inlineCallbacks
+ def incremental_sync_for_archived_room(self, sync_config, leave_event,
+ since_token):
+ """ Get the incremental delta needed to bring the client up to date for
+ the archived room.
+ Returns:
+ A Deferred ArchivedSyncResult
+ """
+
+ stream_token = yield self.store.get_stream_token_for_event(
+ leave_event.event_id
+ )
+
+ leave_token = since_token.copy_and_replace("room_key", stream_token)
+
+ batch = yield self.load_filtered_recents(
+ leave_event.room_id, sync_config, leave_token, since_token,
+ )
+
+ logging.debug("Recents %r", batch)
+
+ # TODO(mjark): This seems racy since this isn't being passed a
+ # token to indicate what point in the stream this is
+ leave_state = yield self.store.get_state_for_events(
+ [leave_event.event_id], None
+ )
+
+ state_events_at_leave = leave_state[leave_event.event_id].values()
+
+ state_at_previous_sync = yield self.get_state_at_previous_sync(
+ leave_event.room_id, since_token=since_token
+ )
+
+ state_events_delta = yield self.compute_state_delta(
+ since_token=since_token,
+ previous_state=state_at_previous_sync,
+ current_state=state_events_at_leave,
+ )
+
+ room_sync = ArchivedSyncResult(
+ room_id=leave_event.room_id,
+ timeline=batch,
+ state=state_events_delta,
+ )
+
+ logging.debug("Room sync: %r", room_sync)
+
+ defer.returnValue(room_sync)
+
+ @defer.inlineCallbacks
def get_state_at_previous_sync(self, room_id, since_token):
""" Get the room state at the previous sync the client made.
Returns:
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index fffecb24f5..73473a7e6b 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -136,6 +136,10 @@ class SyncRestServlet(RestServlet):
sync_result.invited, filter, time_now, token_id
)
+ archived = self.encode_archived(
+ sync_result.archived, filter, time_now, token_id
+ )
+
response_content = {
"presence": self.encode_presence(
sync_result.presence, filter, time_now
@@ -143,7 +147,7 @@ class SyncRestServlet(RestServlet):
"rooms": {
"joined": joined,
"invited": invited,
- "archived": {},
+ "archived": archived,
},
"next_batch": sync_result.next_batch.to_string(),
}
@@ -182,14 +186,20 @@ class SyncRestServlet(RestServlet):
return invited
+ def encode_archived(self, rooms, filter, time_now, token_id):
+ joined = {}
+ for room in rooms:
+ joined[room.room_id] = self.encode_room(
+ room, filter, time_now, token_id, joined=False
+ )
+
+ return joined
+
@staticmethod
- def encode_room(room, filter, time_now, token_id):
+ def encode_room(room, filter, time_now, token_id, joined=True):
event_map = {}
state_events = filter.filter_room_state(room.state)
- timeline_events = filter.filter_room_timeline(room.timeline.events)
- ephemeral_events = filter.filter_room_ephemeral(room.ephemeral)
state_event_ids = []
- timeline_event_ids = []
for event in state_events:
# TODO(mjark): Respect formatting requirements in the filter.
event_map[event.event_id] = serialize_event(
@@ -198,6 +208,8 @@ class SyncRestServlet(RestServlet):
)
state_event_ids.append(event.event_id)
+ timeline_events = filter.filter_room_timeline(room.timeline.events)
+ timeline_event_ids = []
for event in timeline_events:
# TODO(mjark): Respect formatting requirements in the filter.
event_map[event.event_id] = serialize_event(
@@ -205,6 +217,7 @@ class SyncRestServlet(RestServlet):
event_format=format_event_for_client_v2_without_event_id,
)
timeline_event_ids.append(event.event_id)
+
result = {
"event_map": event_map,
"timeline": {
@@ -213,8 +226,12 @@ class SyncRestServlet(RestServlet):
"limited": room.timeline.limited,
},
"state": {"events": state_event_ids},
- "ephemeral": {"events": ephemeral_events},
}
+
+ if joined:
+ ephemeral_events = filter.filter_room_ephemeral(room.ephemeral)
+ result["ephemeral"] = {"events": ephemeral_events}
+
return result
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index dd98dcfda8..ae1ad56d9a 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -124,6 +124,19 @@ class RoomMemberStore(SQLBaseStore):
invites.event_id for invite in invites
]))
+ def get_leave_and_ban_events_for_user(self, user_id):
+ """ Get all the leave events for a user
+ Args:
+ user_id (str): The user ID.
+ Returns:
+ A deferred list of event objects.
+ """
+ return self.get_rooms_for_user_where_membership_is(
+ user_id, (Membership.LEAVE, Membership.BAN)
+ ).addCallback(lambda leaves: self._get_events([
+ leave.event_id for leave in leaves
+ ]))
+
def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
""" Get all the rooms for this user where the membership for this user
matches one in the membership list.
|