diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 353a416054..6dc9d0fb92 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -28,23 +28,30 @@ logger = logging.getLogger(__name__)
SyncConfig = collections.namedtuple("SyncConfig", [
"user",
- "client_info",
- "limit",
- "gap",
- "sort",
- "backfill",
"filter",
])
-class RoomSyncResult(collections.namedtuple("RoomSyncResult", [
- "room_id",
- "limited",
- "published",
- "events",
- "state",
+class TimelineBatch(collections.namedtuple("TimelineBatch", [
"prev_batch",
+ "events",
+ "limited",
+])):
+ __slots__ = []
+
+ def __nonzero__(self):
+ """Make the result appear empty if there are no updates. This is used
+ to tell if room needs to be part of the sync result.
+ """
+ return bool(self.events)
+
+
+class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
+ "room_id", # str
+ "timeline", # TimelineBatch
+ "state", # dict[(str, str), FrozenEvent]
"ephemeral",
+ "private_user_data",
])):
__slots__ = []
@@ -52,14 +59,50 @@ class RoomSyncResult(collections.namedtuple("RoomSyncResult", [
"""Make the result appear empty if there are no updates. This is used
to tell if room needs to be part of the sync result.
"""
- return bool(self.events or self.state or self.ephemeral)
+ return bool(
+ self.timeline
+ or self.state
+ or self.ephemeral
+ or self.private_user_data
+ )
+
+
+class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [
+ "room_id", # str
+ "timeline", # TimelineBatch
+ "state", # dict[(str, str), FrozenEvent]
+ "private_user_data",
+])):
+ __slots__ = []
+
+ def __nonzero__(self):
+ """Make the result appear empty if there are no updates. This is used
+ to tell if room needs to be part of the sync result.
+ """
+ return bool(
+ self.timeline
+ or self.state
+ or self.private_user_data
+ )
+
+
+class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
+ "room_id", # str
+ "invite", # FrozenEvent: the invite event
+])):
+ __slots__ = []
+
+ def __nonzero__(self):
+ """Invited rooms should always be reported to the client"""
+ return True
class SyncResult(collections.namedtuple("SyncResult", [
"next_batch", # Token for the next sync
- "private_user_data", # List of private events for the user.
- "public_user_data", # List of public events for all users.
- "rooms", # RoomSyncResult for each room.
+ "presence", # List of presence events for the user.
+ "joined", # JoinedSyncResult for each joined room.
+ "invited", # InvitedSyncResult for each invited room.
+ "archived", # ArchivedSyncResult for each archived room.
])):
__slots__ = []
@@ -69,7 +112,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
events.
"""
return bool(
- self.private_user_data or self.public_user_data or self.rooms
+ self.presence or self.joined or self.invited
)
@@ -81,67 +124,58 @@ class SyncHandler(BaseHandler):
self.clock = hs.get_clock()
@defer.inlineCallbacks
- def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0):
+ def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
+ full_state=False):
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.
Returns:
A Deferred SyncResult.
"""
- if timeout == 0 or since_token is None:
- result = yield self.current_sync_for_user(sync_config, since_token)
+
+ if timeout == 0 or since_token is None or full_state:
+ # we are going to return immediately, so don't bother calling
+ # notifier.wait_for_events.
+ result = yield self.current_sync_for_user(sync_config, since_token,
+ full_state=full_state)
defer.returnValue(result)
else:
def current_sync_callback(before_token, after_token):
return self.current_sync_for_user(sync_config, since_token)
- rm_handler = self.hs.get_handlers().room_member_handler
-
- app_service = yield self.store.get_app_service_by_user_id(
- sync_config.user.to_string()
- )
- if app_service:
- rooms = yield self.store.get_app_service_rooms(app_service)
- room_ids = set(r.room_id for r in rooms)
- else:
- room_ids = yield rm_handler.get_joined_rooms_for_user(
- sync_config.user
- )
-
result = yield self.notifier.wait_for_events(
- sync_config.user, room_ids,
- sync_config.filter, timeout, current_sync_callback
+ sync_config.user, timeout, current_sync_callback,
+ from_token=since_token
)
defer.returnValue(result)
- def current_sync_for_user(self, sync_config, since_token=None):
+ def current_sync_for_user(self, sync_config, since_token=None,
+ full_state=False):
"""Get the sync for client needed to match what the server has now.
Returns:
A Deferred SyncResult.
"""
- if since_token is None:
- return self.initial_sync(sync_config)
+ if since_token is None or full_state:
+ return self.full_state_sync(sync_config, since_token)
else:
- if sync_config.gap:
- return self.incremental_sync_with_gap(sync_config, since_token)
- else:
- # TODO(mjark): Handle gapless sync
- raise NotImplementedError()
+ return self.incremental_sync_with_gap(sync_config, since_token)
@defer.inlineCallbacks
- def initial_sync(self, sync_config):
- """Get a sync for a client which is starting without any state
+ def full_state_sync(self, sync_config, timeline_since_token):
+ """Get a sync for a client which is starting without any state.
+
+ If a 'message_since_token' is given, only timeline events which have
+ happened since that token will be returned.
+
Returns:
A Deferred SyncResult.
"""
- if sync_config.sort == "timeline,desc":
- # TODO(mjark): Handle going through events in reverse order?.
- # What does "most recent events" mean when applying the limits mean
- # in this case?
- raise NotImplementedError()
-
now_token = yield self.event_sources.get_current_token()
+ now_token, ephemeral_by_room = yield self.ephemeral_by_room(
+ sync_config, now_token
+ )
+
presence_stream = self.event_sources.sources["presence"]
# TODO (mjark): This looks wrong, shouldn't we be getting the presence
# UP to the present rather than after the present?
@@ -153,52 +187,179 @@ class SyncHandler(BaseHandler):
)
room_list = yield self.store.get_rooms_for_user_where_membership_is(
user_id=sync_config.user.to_string(),
- membership_list=[Membership.INVITE, Membership.JOIN]
+ membership_list=(
+ Membership.INVITE,
+ Membership.JOIN,
+ Membership.LEAVE,
+ Membership.BAN
+ )
)
- # TODO (mjark): Does public mean "published"?
- published_rooms = yield self.store.get_rooms(is_public=True)
- published_room_ids = set(r["room_id"] for r in published_rooms)
+ tags_by_room = yield self.store.get_tags_for_user(
+ sync_config.user.to_string()
+ )
- rooms = []
+ joined = []
+ invited = []
+ archived = []
for event in room_list:
- room_sync = yield self.initial_sync_for_room(
- event.room_id, sync_config, now_token, published_room_ids
- )
- rooms.append(room_sync)
+ if event.membership == Membership.JOIN:
+ room_sync = yield self.full_state_sync_for_joined_room(
+ room_id=event.room_id,
+ sync_config=sync_config,
+ now_token=now_token,
+ timeline_since_token=timeline_since_token,
+ ephemeral_by_room=ephemeral_by_room,
+ tags_by_room=tags_by_room,
+ )
+ joined.append(room_sync)
+ elif event.membership == Membership.INVITE:
+ invite = yield self.store.get_event(event.event_id)
+ invited.append(InvitedSyncResult(
+ room_id=event.room_id,
+ invite=invite,
+ ))
+ elif event.membership in (Membership.LEAVE, Membership.BAN):
+ leave_token = now_token.copy_and_replace(
+ "room_key", "s%d" % (event.stream_ordering,)
+ )
+ room_sync = yield self.full_state_sync_for_archived_room(
+ sync_config=sync_config,
+ room_id=event.room_id,
+ leave_event_id=event.event_id,
+ leave_token=leave_token,
+ timeline_since_token=timeline_since_token,
+ tags_by_room=tags_by_room,
+ )
+ archived.append(room_sync)
defer.returnValue(SyncResult(
- public_user_data=presence,
- private_user_data=[],
- rooms=rooms,
+ presence=presence,
+ joined=joined,
+ invited=invited,
+ archived=archived,
next_batch=now_token,
))
@defer.inlineCallbacks
- def initial_sync_for_room(self, room_id, sync_config, now_token,
- published_room_ids):
+ def full_state_sync_for_joined_room(self, room_id, sync_config,
+ now_token, timeline_since_token,
+ ephemeral_by_room, tags_by_room):
"""Sync a room for a client which is starting without any state
Returns:
- A Deferred RoomSyncResult.
+ A Deferred JoinedSyncResult.
"""
- recents, prev_batch_token, limited = yield self.load_filtered_recents(
- room_id, sync_config, now_token,
+ batch = yield self.load_filtered_recents(
+ room_id, sync_config, now_token, since_token=timeline_since_token
)
- current_state = yield self.state_handler.get_current_state(
- room_id
+ current_state = yield self.get_state_at(room_id, now_token)
+
+ defer.returnValue(JoinedSyncResult(
+ room_id=room_id,
+ timeline=batch,
+ state=current_state,
+ ephemeral=ephemeral_by_room.get(room_id, []),
+ private_user_data=self.private_user_data_for_room(
+ room_id, tags_by_room
+ ),
+ ))
+
+ def private_user_data_for_room(self, room_id, tags_by_room):
+ private_user_data = []
+ tags = tags_by_room.get(room_id)
+ if tags is not None:
+ private_user_data.append({
+ "type": "m.tag",
+ "content": {"tags": tags},
+ })
+ return private_user_data
+
+ @defer.inlineCallbacks
+ def ephemeral_by_room(self, sync_config, now_token, since_token=None):
+ """Get the ephemeral events for each room the user is in
+ Args:
+ sync_config (SyncConfig): The flags, filters and user for the sync.
+ now_token (StreamToken): Where the server is currently up to.
+ since_token (StreamToken): Where the server was when the client
+ last synced.
+ Returns:
+ A tuple of the now StreamToken, updated to reflect the which typing
+ events are included, and a dict mapping from room_id to a list of
+ typing events for that room.
+ """
+
+ typing_key = since_token.typing_key if since_token else "0"
+
+ rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
+ room_ids = [room.room_id for room in rooms]
+
+ typing_source = self.event_sources.sources["typing"]
+ typing, typing_key = yield typing_source.get_new_events(
+ user=sync_config.user,
+ from_key=typing_key,
+ limit=sync_config.filter.ephemeral_limit(),
+ room_ids=room_ids,
+ is_guest=False,
)
- current_state_events = current_state.values()
+ now_token = now_token.copy_and_replace("typing_key", typing_key)
+
+ ephemeral_by_room = {}
- defer.returnValue(RoomSyncResult(
+ for event in typing:
+ # we want to exclude the room_id from the event, but modifying the
+ # result returned by the event source is poor form (it might cache
+ # the object)
+ room_id = event["room_id"]
+ event_copy = {k: v for (k, v) in event.iteritems()
+ if k != "room_id"}
+ ephemeral_by_room.setdefault(room_id, []).append(event_copy)
+
+ receipt_key = since_token.receipt_key if since_token else "0"
+
+ receipt_source = self.event_sources.sources["receipt"]
+ receipts, receipt_key = yield receipt_source.get_new_events(
+ user=sync_config.user,
+ from_key=receipt_key,
+ limit=sync_config.filter.ephemeral_limit(),
+ room_ids=room_ids,
+ # /sync doesn't support guest access, they can't get to this point in code
+ is_guest=False,
+ )
+ now_token = now_token.copy_and_replace("receipt_key", receipt_key)
+
+ for event in receipts:
+ room_id = event["room_id"]
+ # exclude room id, as above
+ event_copy = {k: v for (k, v) in event.iteritems()
+ if k != "room_id"}
+ ephemeral_by_room.setdefault(room_id, []).append(event_copy)
+
+ defer.returnValue((now_token, ephemeral_by_room))
+
+ @defer.inlineCallbacks
+ def full_state_sync_for_archived_room(self, room_id, sync_config,
+ leave_event_id, leave_token,
+ timeline_since_token, tags_by_room):
+ """Sync a room for a client which is starting without any state
+ Returns:
+ A Deferred JoinedSyncResult.
+ """
+
+ batch = yield self.load_filtered_recents(
+ room_id, sync_config, leave_token, since_token=timeline_since_token
+ )
+
+ leave_state = yield self.store.get_state_for_event(leave_event_id)
+
+ defer.returnValue(ArchivedSyncResult(
room_id=room_id,
- published=room_id in published_room_ids,
- events=recents,
- prev_batch=prev_batch_token,
- state=current_state_events,
- limited=limited,
- ephemeral=[],
+ timeline=batch,
+ state=leave_state,
+ private_user_data=self.private_user_data_for_room(
+ room_id, tags_by_room
+ ),
))
@defer.inlineCallbacks
@@ -208,34 +369,25 @@ class SyncHandler(BaseHandler):
Returns:
A Deferred SyncResult.
"""
- if sync_config.sort == "timeline,desc":
- # TODO(mjark): Handle going through events in reverse order?.
- # What does "most recent events" mean when applying the limits mean
- # in this case?
- raise NotImplementedError()
-
now_token = yield self.event_sources.get_current_token()
+ rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
+ room_ids = [room.room_id for room in rooms]
+
presence_source = self.event_sources.sources["presence"]
- presence, presence_key = yield presence_source.get_new_events_for_user(
+ presence, presence_key = yield presence_source.get_new_events(
user=sync_config.user,
from_key=since_token.presence_key,
- limit=sync_config.limit,
+ limit=sync_config.filter.presence_limit(),
+ room_ids=room_ids,
+ # /sync doesn't support guest access, they can't get to this point in code
+ is_guest=False,
)
now_token = now_token.copy_and_replace("presence_key", presence_key)
- typing_source = self.event_sources.sources["typing"]
- typing, typing_key = yield typing_source.get_new_events_for_user(
- user=sync_config.user,
- from_key=since_token.typing_key,
- limit=sync_config.limit,
+ now_token, ephemeral_by_room = yield self.ephemeral_by_room(
+ sync_config, now_token, since_token
)
- now_token = now_token.copy_and_replace("typing_key", typing_key)
-
- typing_by_room = {event["room_id"]: [event] for event in typing}
- for event in typing:
- event.pop("room_id")
- logger.debug("Typing %r", typing_by_room)
rm_handler = self.hs.get_handlers().room_member_handler
app_service = yield self.store.get_app_service_by_user_id(
@@ -243,35 +395,55 @@ class SyncHandler(BaseHandler):
)
if app_service:
rooms = yield self.store.get_app_service_rooms(app_service)
- room_ids = set(r.room_id for r in rooms)
+ joined_room_ids = set(r.room_id for r in rooms)
else:
- room_ids = yield rm_handler.get_joined_rooms_for_user(
+ joined_room_ids = yield rm_handler.get_joined_rooms_for_user(
sync_config.user
)
- # TODO (mjark): Does public mean "published"?
- published_rooms = yield self.store.get_rooms(is_public=True)
- published_room_ids = set(r["room_id"] for r in published_rooms)
+ timeline_limit = sync_config.filter.timeline_limit()
room_events, _ = yield self.store.get_room_events_stream(
sync_config.user.to_string(),
from_key=since_token.room_key,
to_key=now_token.room_key,
- room_id=None,
- limit=sync_config.limit + 1,
+ limit=timeline_limit + 1,
)
- rooms = []
- if len(room_events) <= sync_config.limit:
+ tags_by_room = yield self.store.get_updated_tags(
+ sync_config.user.to_string(),
+ since_token.private_user_data_key,
+ )
+
+ joined = []
+ archived = []
+ if len(room_events) <= timeline_limit:
# There is no gap in any of the rooms. Therefore we can just
# partition the new events by room and return them.
+ logger.debug("Got %i events for incremental sync - not limited",
+ len(room_events))
+
+ invite_events = []
+ leave_events = []
events_by_room_id = {}
for event in room_events:
events_by_room_id.setdefault(event.room_id, []).append(event)
-
- for room_id in room_ids:
+ if event.room_id not in joined_room_ids:
+ if (event.type == EventTypes.Member
+ and event.state_key == sync_config.user.to_string()):
+ if event.membership == Membership.INVITE:
+ invite_events.append(event)
+ elif event.membership in (Membership.LEAVE, Membership.BAN):
+ leave_events.append(event)
+
+ for room_id in joined_room_ids:
recents = events_by_room_id.get(room_id, [])
- state = [event for event in recents if event.is_state()]
+ logger.debug("Events for room %s: %r", room_id, recents)
+ state = {
+ (event.type, event.state_key): event
+ for event in recents if event.is_state()}
+ limited = False
+
if recents:
prev_batch = now_token.copy_and_replace(
"room_key", recents[0].internal_metadata.before
@@ -279,95 +451,87 @@ class SyncHandler(BaseHandler):
else:
prev_batch = now_token
- state = yield self.check_joined_room(
- sync_config, room_id, state
- )
+ just_joined = yield self.check_joined_room(sync_config, state)
+ if just_joined:
+ logger.debug("User has just joined %s: needs full state",
+ room_id)
+ state = yield self.get_state_at(room_id, now_token)
+ # the timeline is inherently limited if we've just joined
+ limited = True
- room_sync = RoomSyncResult(
+ room_sync = JoinedSyncResult(
room_id=room_id,
- published=room_id in published_room_ids,
- events=recents,
- prev_batch=prev_batch,
+ timeline=TimelineBatch(
+ events=recents,
+ prev_batch=prev_batch,
+ limited=limited,
+ ),
state=state,
- limited=False,
- ephemeral=typing_by_room.get(room_id, [])
+ ephemeral=ephemeral_by_room.get(room_id, []),
+ private_user_data=self.private_user_data_for_room(
+ room_id, tags_by_room
+ ),
)
+ logger.debug("Result for room %s: %r", room_id, room_sync)
+
if room_sync:
- rooms.append(room_sync)
+ joined.append(room_sync)
+
else:
- for room_id in room_ids:
+ logger.debug("Got %i events for incremental sync - hit limit",
+ len(room_events))
+
+ invite_events = yield self.store.get_invites_for_user(
+ sync_config.user.to_string()
+ )
+
+ leave_events = yield self.store.get_leave_and_ban_events_for_user(
+ sync_config.user.to_string()
+ )
+
+ for room_id in joined_room_ids:
room_sync = yield self.incremental_sync_with_gap_for_room(
room_id, sync_config, since_token, now_token,
- published_room_ids, typing_by_room
+ ephemeral_by_room, tags_by_room
)
if room_sync:
- rooms.append(room_sync)
+ joined.append(room_sync)
- defer.returnValue(SyncResult(
- public_user_data=presence,
- private_user_data=[],
- rooms=rooms,
- next_batch=now_token,
- ))
-
- @defer.inlineCallbacks
- def _filter_events_for_client(self, user_id, room_id, events):
- event_id_to_state = yield self.store.get_state_for_events(
- room_id, frozenset(e.event_id for e in events),
- types=(
- (EventTypes.RoomHistoryVisibility, ""),
- (EventTypes.Member, user_id),
+ for leave_event in leave_events:
+ room_sync = yield self.incremental_sync_for_archived_room(
+ sync_config, leave_event, since_token, tags_by_room
)
- )
-
- def allowed(event, state):
- if event.type == EventTypes.RoomHistoryVisibility:
- return True
-
- membership_ev = state.get((EventTypes.Member, user_id), None)
- if membership_ev:
- membership = membership_ev.membership
- else:
- membership = Membership.LEAVE
-
- if membership == Membership.JOIN:
- return True
+ archived.append(room_sync)
- history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
- if history:
- visibility = history.content.get("history_visibility", "shared")
- else:
- visibility = "shared"
+ invited = [
+ InvitedSyncResult(room_id=event.room_id, invite=event)
+ for event in invite_events
+ ]
- if visibility == "public":
- return True
- elif visibility == "shared":
- return True
- elif visibility == "joined":
- return membership == Membership.JOIN
- elif visibility == "invited":
- return membership == Membership.INVITE
-
- return True
-
- defer.returnValue([
- event
- for event in events
- if allowed(event, event_id_to_state[event.event_id])
- ])
+ defer.returnValue(SyncResult(
+ presence=presence,
+ joined=joined,
+ invited=invited,
+ archived=archived,
+ next_batch=now_token,
+ ))
@defer.inlineCallbacks
def load_filtered_recents(self, room_id, sync_config, now_token,
since_token=None):
+ """
+ :returns a Deferred TimelineBatch
+ """
limited = True
recents = []
filtering_factor = 2
- load_limit = max(sync_config.limit * filtering_factor, 100)
+ timeline_limit = sync_config.filter.timeline_limit()
+ load_limit = max(timeline_limit * filtering_factor, 100)
max_repeat = 3 # Only try a few times per room, otherwise
room_key = now_token.room_key
end_key = room_key
- while limited and len(recents) < sync_config.limit and max_repeat:
+ while limited and len(recents) < timeline_limit and max_repeat:
events, keys = yield self.store.get_recent_events_for_room(
room_id,
limit=load_limit + 1,
@@ -376,9 +540,9 @@ class SyncHandler(BaseHandler):
)
(room_key, _) = keys
end_key = "s" + room_key.split('-')[-1]
- loaded_recents = sync_config.filter.filter_room_events(events)
+ loaded_recents = sync_config.filter.filter_room_timeline(events)
loaded_recents = yield self._filter_events_for_client(
- sync_config.user.to_string(), room_id, loaded_recents,
+ sync_config.user.to_string(), loaded_recents,
)
loaded_recents.extend(recents)
recents = loaded_recents
@@ -386,64 +550,64 @@ class SyncHandler(BaseHandler):
limited = False
max_repeat -= 1
- if len(recents) > sync_config.limit:
- recents = recents[-sync_config.limit:]
+ if len(recents) > timeline_limit:
+ limited = True
+ recents = recents[-timeline_limit:]
room_key = recents[0].internal_metadata.before
prev_batch_token = now_token.copy_and_replace(
"room_key", room_key
)
- defer.returnValue((recents, prev_batch_token, limited))
+ defer.returnValue(TimelineBatch(
+ events=recents, prev_batch=prev_batch_token, limited=limited
+ ))
@defer.inlineCallbacks
def incremental_sync_with_gap_for_room(self, room_id, sync_config,
since_token, now_token,
- published_room_ids, typing_by_room):
+ ephemeral_by_room, tags_by_room):
""" Get the incremental delta needed to bring the client up to date for
the room. Gives the client the most recent events and the changes to
state.
Returns:
- A Deferred RoomSyncResult
+ A Deferred JoinedSyncResult
"""
+ logger.debug("Doing incremental sync for room %s between %s and %s",
+ room_id, since_token, now_token)
# TODO(mjark): Check for redactions we might have missed.
- recents, prev_batch_token, limited = yield self.load_filtered_recents(
+ batch = yield self.load_filtered_recents(
room_id, sync_config, now_token, since_token,
)
- logging.debug("Recents %r", recents)
+ logging.debug("Recents %r", batch)
- # TODO(mjark): This seems racy since this isn't being passed a
- # token to indicate what point in the stream this is
- current_state = yield self.state_handler.get_current_state(
- room_id
- )
- current_state_events = current_state.values()
+ current_state = yield self.get_state_at(room_id, now_token)
- state_at_previous_sync = yield self.get_state_at_previous_sync(
- room_id, since_token=since_token
+ state_at_previous_sync = yield self.get_state_at(
+ room_id, stream_position=since_token
)
- state_events_delta = yield self.compute_state_delta(
+ state = yield self.compute_state_delta(
since_token=since_token,
previous_state=state_at_previous_sync,
- current_state=current_state_events,
+ current_state=current_state,
)
- state_events_delta = yield self.check_joined_room(
- sync_config, room_id, state_events_delta
- )
+ just_joined = yield self.check_joined_room(sync_config, state)
+ if just_joined:
+ state = yield self.get_state_at(room_id, now_token)
- room_sync = RoomSyncResult(
+ room_sync = JoinedSyncResult(
room_id=room_id,
- published=room_id in published_room_ids,
- events=recents,
- prev_batch=prev_batch_token,
- state=state_events_delta,
- limited=limited,
- ephemeral=typing_by_room.get(room_id, [])
+ timeline=batch,
+ state=state,
+ ephemeral=ephemeral_by_room.get(room_id, []),
+ private_user_data=self.private_user_data_for_room(
+ room_id, tags_by_room
+ ),
)
logging.debug("Room sync: %r", room_sync)
@@ -451,58 +615,125 @@ class SyncHandler(BaseHandler):
defer.returnValue(room_sync)
@defer.inlineCallbacks
- def get_state_at_previous_sync(self, room_id, since_token):
- """ Get the room state at the previous sync the client made.
+ def incremental_sync_for_archived_room(self, sync_config, leave_event,
+ since_token, tags_by_room):
+ """ Get the incremental delta needed to bring the client up to date for
+ the archived room.
Returns:
- A Deferred list of Events.
+ A Deferred ArchivedSyncResult
+ """
+
+ stream_token = yield self.store.get_stream_token_for_event(
+ leave_event.event_id
+ )
+
+ leave_token = since_token.copy_and_replace("room_key", stream_token)
+
+ batch = yield self.load_filtered_recents(
+ leave_event.room_id, sync_config, leave_token, since_token,
+ )
+
+ logging.debug("Recents %r", batch)
+
+ state_events_at_leave = yield self.store.get_state_for_event(
+ leave_event.event_id
+ )
+
+ state_at_previous_sync = yield self.get_state_at(
+ leave_event.room_id, stream_position=since_token
+ )
+
+ state_events_delta = yield self.compute_state_delta(
+ since_token=since_token,
+ previous_state=state_at_previous_sync,
+ current_state=state_events_at_leave,
+ )
+
+ room_sync = ArchivedSyncResult(
+ room_id=leave_event.room_id,
+ timeline=batch,
+ state=state_events_delta,
+ private_user_data=self.private_user_data_for_room(
+ leave_event.room_id, tags_by_room
+ ),
+ )
+
+ logging.debug("Room sync: %r", room_sync)
+
+ defer.returnValue(room_sync)
+
+ @defer.inlineCallbacks
+ def get_state_after_event(self, event):
+ """
+ Get the room state after the given event
+
+ :param synapse.events.EventBase event: event of interest
+ :return: A Deferred map from ((type, state_key)->Event)
+ """
+ state = yield self.store.get_state_for_event(event.event_id)
+ if event.is_state():
+ state = state.copy()
+ state[(event.type, event.state_key)] = event
+ defer.returnValue(state)
+
+ @defer.inlineCallbacks
+ def get_state_at(self, room_id, stream_position):
+ """ Get the room state at a particular stream position
+ :param str room_id: room for which to get state
+ :param StreamToken stream_position: point at which to get state
+ :returns: A Deferred map from ((type, state_key)->Event)
"""
last_events, token = yield self.store.get_recent_events_for_room(
- room_id, end_token=since_token.room_key, limit=1,
+ room_id, end_token=stream_position.room_key, limit=1,
)
if last_events:
- last_event = last_events[0]
- last_context = yield self.state_handler.compute_event_context(
- last_event
- )
- if last_event.is_state():
- state = [last_event] + last_context.current_state.values()
- else:
- state = last_context.current_state.values()
+ last_event = last_events[-1]
+ state = yield self.get_state_after_event(last_event)
+
else:
- state = ()
+ # no events in this room - so presumably no state
+ state = {}
defer.returnValue(state)
def compute_state_delta(self, since_token, previous_state, current_state):
""" Works out the differnce in state between the current state and the
state the client got when it last performed a sync.
- Returns:
- A list of events.
+
+ :param str since_token: the point we are comparing against
+ :param dict[(str,str), synapse.events.FrozenEvent] previous_state: the
+ state to compare to
+ :param dict[(str,str), synapse.events.FrozenEvent] current_state: the
+ new state
+
+ :returns A new event dictionary
"""
# TODO(mjark) Check if the state events were received by the server
# after the previous sync, since we need to include those state
# updates even if they occured logically before the previous event.
# TODO(mjark) Check for new redactions in the state events.
- previous_dict = {event.event_id: event for event in previous_state}
- state_delta = []
- for event in current_state:
- if event.event_id not in previous_dict:
- state_delta.append(event)
+
+ state_delta = {}
+ for key, event in current_state.iteritems():
+ if (key not in previous_state or
+ previous_state[key].event_id != event.event_id):
+ state_delta[key] = event
return state_delta
- @defer.inlineCallbacks
- def check_joined_room(self, sync_config, room_id, state_delta):
- joined = False
- for event in state_delta:
- if (
- event.type == EventTypes.Member
- and event.state_key == sync_config.user.to_string()
- ):
- if event.content["membership"] == Membership.JOIN:
- joined = True
-
- if joined:
- res = yield self.state_handler.get_current_state(room_id)
- state_delta = res.values()
-
- defer.returnValue(state_delta)
+ def check_joined_room(self, sync_config, state_delta):
+ """
+ Check if the user has just joined the given room (so should
+ be given the full state)
+
+ :param sync_config:
+ :param dict[(str,str), synapse.events.FrozenEvent] state_delta: the
+ difference in state since the last sync
+
+ :returns A deferred Tuple (state_delta, limited)
+ """
+ join_event = state_delta.get((
+ EventTypes.Member, sync_config.user.to_string()), None)
+ if join_event is not None:
+ if join_event.content["membership"] == Membership.JOIN:
+ return True
+ return False
|