diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 9ebfccc8bf..3b89582d79 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.streams.config import PaginationConfig
from synapse.api.constants import Membership, EventTypes
from synapse.util.async import concurrently_execute
from synapse.util.logcontext import LoggingContext
@@ -194,157 +193,7 @@ class SyncHandler(object):
Returns:
A Deferred SyncResult.
"""
- if since_token is None or full_state:
- return self.full_state_sync(sync_config, since_token)
- else:
- return self.incremental_sync_with_gap(sync_config, since_token)
-
- @defer.inlineCallbacks
- def full_state_sync(self, sync_config, timeline_since_token):
- """Get a sync for a client which is starting without any state.
-
- If a 'message_since_token' is given, only timeline events which have
- happened since that token will be returned.
-
- Returns:
- A Deferred SyncResult.
- """
- now_token = yield self.event_sources.get_current_token()
-
- now_token, ephemeral_by_room = yield self.ephemeral_by_room(
- sync_config, now_token
- )
-
- presence_stream = self.event_sources.sources["presence"]
- # TODO (mjark): This looks wrong, shouldn't we be getting the presence
- # UP to the present rather than after the present?
- pagination_config = PaginationConfig(from_token=now_token)
- presence, _ = yield presence_stream.get_pagination_rows(
- user=sync_config.user,
- pagination_config=pagination_config.get_source_config("presence"),
- key=None
- )
-
- membership_list = (
- Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN
- )
-
- room_list = yield self.store.get_rooms_for_user_where_membership_is(
- user_id=sync_config.user.to_string(),
- membership_list=membership_list
- )
-
- account_data, account_data_by_room = (
- yield self.store.get_account_data_for_user(
- sync_config.user.to_string()
- )
- )
-
- account_data['m.push_rules'] = yield self.push_rules_for_user(
- sync_config.user
- )
-
- tags_by_room = yield self.store.get_tags_for_user(
- sync_config.user.to_string()
- )
-
- ignored_users = account_data.get(
- "m.ignored_user_list", {}
- ).get("ignored_users", {}).keys()
-
- joined = []
- invited = []
- archived = []
-
- user_id = sync_config.user.to_string()
-
- @defer.inlineCallbacks
- def _generate_room_entry(event):
- if event.membership == Membership.JOIN:
- room_result = yield self.full_state_sync_for_joined_room(
- room_id=event.room_id,
- sync_config=sync_config,
- now_token=now_token,
- timeline_since_token=timeline_since_token,
- ephemeral_by_room=ephemeral_by_room,
- tags_by_room=tags_by_room,
- account_data_by_room=account_data_by_room,
- )
- joined.append(room_result)
- elif event.membership == Membership.INVITE:
- if event.sender in ignored_users:
- return
- invite = yield self.store.get_event(event.event_id)
- invited.append(InvitedSyncResult(
- room_id=event.room_id,
- invite=invite,
- ))
- elif event.membership in (Membership.LEAVE, Membership.BAN):
- # Always send down rooms we were banned or kicked from.
- if not sync_config.filter_collection.include_leave:
- if event.membership == Membership.LEAVE:
- if user_id == event.sender:
- return
-
- leave_token = now_token.copy_and_replace(
- "room_key", "s%d" % (event.stream_ordering,)
- )
- room_result = yield self.full_state_sync_for_archived_room(
- sync_config=sync_config,
- room_id=event.room_id,
- leave_event_id=event.event_id,
- leave_token=leave_token,
- timeline_since_token=timeline_since_token,
- tags_by_room=tags_by_room,
- account_data_by_room=account_data_by_room,
- )
- archived.append(room_result)
-
- yield concurrently_execute(_generate_room_entry, room_list, 10)
-
- account_data_for_user = sync_config.filter_collection.filter_account_data(
- self.account_data_for_user(account_data)
- )
-
- presence = sync_config.filter_collection.filter_presence(
- presence
- )
-
- defer.returnValue(SyncResult(
- presence=presence,
- account_data=account_data_for_user,
- joined=joined,
- invited=invited,
- archived=archived,
- next_batch=now_token,
- ))
-
- @defer.inlineCallbacks
- def full_state_sync_for_joined_room(self, room_id, sync_config,
- now_token, timeline_since_token,
- ephemeral_by_room, tags_by_room,
- account_data_by_room):
- """Sync a room for a client which is starting without any state
- Returns:
- A Deferred JoinedSyncResult.
- """
-
- batch = yield self.load_filtered_recents(
- room_id, sync_config, now_token, since_token=timeline_since_token
- )
-
- room_sync = yield self.incremental_sync_with_gap_for_room(
- room_id, sync_config,
- now_token=now_token,
- since_token=timeline_since_token,
- ephemeral_by_room=ephemeral_by_room,
- tags_by_room=tags_by_room,
- account_data_by_room=account_data_by_room,
- batch=batch,
- full_state=True,
- )
-
- defer.returnValue(room_sync)
+ return self.generate_sync_result(sync_config, since_token, full_state)
@defer.inlineCallbacks
def push_rules_for_user(self, user):
@@ -354,35 +203,6 @@ class SyncHandler(object):
rules = format_push_rules_for_user(user, rawrules, enabled_map)
defer.returnValue(rules)
- def account_data_for_user(self, account_data):
- account_data_events = []
-
- for account_data_type, content in account_data.items():
- account_data_events.append({
- "type": account_data_type,
- "content": content,
- })
-
- return account_data_events
-
- def account_data_for_room(self, room_id, tags_by_room, account_data_by_room):
- account_data_events = []
- tags = tags_by_room.get(room_id)
- if tags is not None:
- account_data_events.append({
- "type": "m.tag",
- "content": {"tags": tags},
- })
-
- account_data = account_data_by_room.get(room_id, {})
- for account_data_type, content in account_data.items():
- account_data_events.append({
- "type": account_data_type,
- "content": content,
- })
-
- return account_data_events
-
@defer.inlineCallbacks
def ephemeral_by_room(self, sync_config, now_token, since_token=None):
"""Get the ephemeral events for each room the user is in
@@ -445,258 +265,22 @@ class SyncHandler(object):
defer.returnValue((now_token, ephemeral_by_room))
- def full_state_sync_for_archived_room(self, room_id, sync_config,
- leave_event_id, leave_token,
- timeline_since_token, tags_by_room,
- account_data_by_room):
- """Sync a room for a client which is starting without any state
- Returns:
- A Deferred ArchivedSyncResult.
- """
-
- return self.incremental_sync_for_archived_room(
- sync_config, room_id, leave_event_id, timeline_since_token, tags_by_room,
- account_data_by_room, full_state=True, leave_token=leave_token,
- )
-
- @defer.inlineCallbacks
- def incremental_sync_with_gap(self, sync_config, since_token):
- """ Get the incremental delta needed to bring the client up to
- date with the server.
- Returns:
- A Deferred SyncResult.
- """
- now_token = yield self.event_sources.get_current_token()
-
- rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
- room_ids = [room.room_id for room in rooms]
-
- presence_source = self.event_sources.sources["presence"]
- presence, presence_key = yield presence_source.get_new_events(
- user=sync_config.user,
- from_key=since_token.presence_key,
- limit=sync_config.filter_collection.presence_limit(),
- room_ids=room_ids,
- is_guest=sync_config.is_guest,
- )
- now_token = now_token.copy_and_replace("presence_key", presence_key)
-
- now_token, ephemeral_by_room = yield self.ephemeral_by_room(
- sync_config, now_token, since_token
- )
-
- app_service = yield self.store.get_app_service_by_user_id(
- sync_config.user.to_string()
- )
- if app_service:
- rooms = yield self.store.get_app_service_rooms(app_service)
- joined_room_ids = set(r.room_id for r in rooms)
- else:
- rooms = yield self.store.get_rooms_for_user(
- sync_config.user.to_string()
- )
- joined_room_ids = set(r.room_id for r in rooms)
-
- user_id = sync_config.user.to_string()
-
- timeline_limit = sync_config.filter_collection.timeline_limit()
-
- tags_by_room = yield self.store.get_updated_tags(
- user_id,
- since_token.account_data_key,
- )
-
- account_data, account_data_by_room = (
- yield self.store.get_updated_account_data_for_user(
- user_id,
- since_token.account_data_key,
- )
- )
-
- push_rules_changed = yield self.store.have_push_rules_changed_for_user(
- user_id, int(since_token.push_rules_key)
- )
-
- if push_rules_changed:
- account_data["m.push_rules"] = yield self.push_rules_for_user(
- sync_config.user
- )
-
- ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
- "m.ignored_user_list", user_id=user_id,
- )
-
- if ignored_account_data:
- ignored_users = ignored_account_data.get("ignored_users", {}).keys()
- else:
- ignored_users = frozenset()
-
- # Get a list of membership change events that have happened.
- rooms_changed = yield self.store.get_membership_changes_for_user(
- user_id, since_token.room_key, now_token.room_key
- )
-
- mem_change_events_by_room_id = {}
- for event in rooms_changed:
- mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
-
- newly_joined_rooms = []
- archived = []
- invited = []
- for room_id, events in mem_change_events_by_room_id.items():
- non_joins = [e for e in events if e.membership != Membership.JOIN]
- has_join = len(non_joins) != len(events)
-
- # We want to figure out if we joined the room at some point since
- # the last sync (even if we have since left). This is to make sure
- # we do send down the room, and with full state, where necessary
- if room_id in joined_room_ids or has_join:
- old_state = yield self.get_state_at(room_id, since_token)
- old_mem_ev = old_state.get((EventTypes.Member, user_id), None)
- if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
- newly_joined_rooms.append(room_id)
-
- if room_id in joined_room_ids:
- continue
-
- if not non_joins:
- continue
-
- # Only bother if we're still currently invited
- should_invite = non_joins[-1].membership == Membership.INVITE
- if should_invite:
- if event.sender not in ignored_users:
- room_sync = InvitedSyncResult(room_id, invite=non_joins[-1])
- if room_sync:
- invited.append(room_sync)
-
- # Always include leave/ban events. Just take the last one.
- # TODO: How do we handle ban -> leave in same batch?
- leave_events = [
- e for e in non_joins
- if e.membership in (Membership.LEAVE, Membership.BAN)
- ]
-
- if leave_events:
- leave_event = leave_events[-1]
- room_sync = yield self.incremental_sync_for_archived_room(
- sync_config, room_id, leave_event.event_id, since_token,
- tags_by_room, account_data_by_room,
- full_state=room_id in newly_joined_rooms
- )
- if room_sync:
- archived.append(room_sync)
-
- # Get all events for rooms we're currently joined to.
- room_to_events = yield self.store.get_room_events_stream_for_rooms(
- room_ids=joined_room_ids,
- from_key=since_token.room_key,
- to_key=now_token.room_key,
- limit=timeline_limit + 1,
- )
-
- joined = []
- # We loop through all room ids, even if there are no new events, in case
- # there are non room events taht we need to notify about.
- for room_id in joined_room_ids:
- room_entry = room_to_events.get(room_id, None)
-
- if room_entry:
- events, start_key = room_entry
-
- prev_batch_token = now_token.copy_and_replace("room_key", start_key)
-
- newly_joined_room = room_id in newly_joined_rooms
- full_state = newly_joined_room
-
- batch = yield self.load_filtered_recents(
- room_id, sync_config, prev_batch_token,
- since_token=since_token,
- recents=events,
- newly_joined_room=newly_joined_room,
- )
- else:
- batch = TimelineBatch(
- events=[],
- prev_batch=since_token,
- limited=False,
- )
- full_state = False
-
- room_sync = yield self.incremental_sync_with_gap_for_room(
- room_id=room_id,
- sync_config=sync_config,
- since_token=since_token,
- now_token=now_token,
- ephemeral_by_room=ephemeral_by_room,
- tags_by_room=tags_by_room,
- account_data_by_room=account_data_by_room,
- batch=batch,
- full_state=full_state,
- )
- if room_sync:
- joined.append(room_sync)
-
- # For each newly joined room, we want to send down presence of
- # existing users.
- presence_handler = self.presence_handler
- extra_presence_users = set()
- for room_id in newly_joined_rooms:
- users = yield self.store.get_users_in_room(event.room_id)
- extra_presence_users.update(users)
-
- # For each new member, send down presence.
- for joined_sync in joined:
- it = itertools.chain(joined_sync.timeline.events, joined_sync.state.values())
- for event in it:
- if event.type == EventTypes.Member:
- if event.membership == Membership.JOIN:
- extra_presence_users.add(event.state_key)
-
- states = yield presence_handler.get_states(
- [u for u in extra_presence_users if u != user_id],
- as_event=True,
- )
- presence.extend(states)
-
- account_data_for_user = sync_config.filter_collection.filter_account_data(
- self.account_data_for_user(account_data)
- )
-
- presence = sync_config.filter_collection.filter_presence(
- presence
- )
-
- defer.returnValue(SyncResult(
- presence=presence,
- account_data=account_data_for_user,
- joined=joined,
- invited=invited,
- archived=archived,
- next_batch=now_token,
- ))
-
@defer.inlineCallbacks
- def load_filtered_recents(self, room_id, sync_config, now_token,
- since_token=None, recents=None, newly_joined_room=False):
+ def _load_filtered_recents(self, room_id, sync_config, now_token,
+ since_token=None, recents=None, newly_joined_room=False):
"""
Returns:
a Deferred TimelineBatch
"""
with Measure(self.clock, "load_filtered_recents"):
- filtering_factor = 2
timeline_limit = sync_config.filter_collection.timeline_limit()
- load_limit = max(timeline_limit * filtering_factor, 10)
- max_repeat = 5 # Only try a few times per room, otherwise
- room_key = now_token.room_key
- end_key = room_key
if recents is None or newly_joined_room or timeline_limit < len(recents):
limited = True
else:
limited = False
- if recents is not None:
+ if recents:
recents = sync_config.filter_collection.filter_room_timeline(recents)
recents = yield filter_events_for_client(
self.store,
@@ -706,6 +290,19 @@ class SyncHandler(object):
else:
recents = []
+ if not limited:
+ defer.returnValue(TimelineBatch(
+ events=recents,
+ prev_batch=now_token,
+ limited=False
+ ))
+
+ filtering_factor = 2
+ load_limit = max(timeline_limit * filtering_factor, 10)
+ max_repeat = 5 # Only try a few times per room, otherwise
+ room_key = now_token.room_key
+ end_key = room_key
+
since_key = None
if since_token and not newly_joined_room:
since_key = since_token.room_key
@@ -749,103 +346,6 @@ class SyncHandler(object):
))
@defer.inlineCallbacks
- def incremental_sync_with_gap_for_room(self, room_id, sync_config,
- since_token, now_token,
- ephemeral_by_room, tags_by_room,
- account_data_by_room,
- batch, full_state=False):
- state = yield self.compute_state_delta(
- room_id, batch, sync_config, since_token, now_token,
- full_state=full_state
- )
-
- account_data = self.account_data_for_room(
- room_id, tags_by_room, account_data_by_room
- )
-
- account_data = sync_config.filter_collection.filter_room_account_data(
- account_data
- )
-
- ephemeral = sync_config.filter_collection.filter_room_ephemeral(
- ephemeral_by_room.get(room_id, [])
- )
-
- unread_notifications = {}
- room_sync = JoinedSyncResult(
- room_id=room_id,
- timeline=batch,
- state=state,
- ephemeral=ephemeral,
- account_data=account_data,
- unread_notifications=unread_notifications,
- )
-
- if room_sync:
- notifs = yield self.unread_notifs_for_room_id(
- room_id, sync_config
- )
-
- if notifs is not None:
- unread_notifications["notification_count"] = notifs["notify_count"]
- unread_notifications["highlight_count"] = notifs["highlight_count"]
-
- logger.debug("Room sync: %r", room_sync)
-
- defer.returnValue(room_sync)
-
- @defer.inlineCallbacks
- def incremental_sync_for_archived_room(self, sync_config, room_id, leave_event_id,
- since_token, tags_by_room,
- account_data_by_room, full_state,
- leave_token=None):
- """ Get the incremental delta needed to bring the client up to date for
- the archived room.
- Returns:
- A Deferred ArchivedSyncResult
- """
-
- if not leave_token:
- stream_token = yield self.store.get_stream_token_for_event(
- leave_event_id
- )
-
- leave_token = since_token.copy_and_replace("room_key", stream_token)
-
- if since_token and since_token.is_after(leave_token):
- defer.returnValue(None)
-
- batch = yield self.load_filtered_recents(
- room_id, sync_config, leave_token, since_token,
- )
-
- logger.debug("Recents %r", batch)
-
- state_events_delta = yield self.compute_state_delta(
- room_id, batch, sync_config, since_token, leave_token,
- full_state=full_state
- )
-
- account_data = self.account_data_for_room(
- room_id, tags_by_room, account_data_by_room
- )
-
- account_data = sync_config.filter_collection.filter_room_account_data(
- account_data
- )
-
- room_sync = ArchivedSyncResult(
- room_id=room_id,
- timeline=batch,
- state=state_events_delta,
- account_data=account_data,
- )
-
- logger.debug("Room sync: %r", room_sync)
-
- defer.returnValue(room_sync)
-
- @defer.inlineCallbacks
def get_state_after_event(self, event):
"""
Get the room state after the given event
@@ -970,26 +470,6 @@ class SyncHandler(object):
for e in sync_config.filter_collection.filter_room_state(state.values())
})
- def check_joined_room(self, sync_config, state_delta):
- """
- Check if the user has just joined the given room (so should
- be given the full state)
-
- Args:
- sync_config(synapse.handlers.sync.SyncConfig):
- state_delta(dict[(str,str), synapse.events.FrozenEvent]): the
- difference in state since the last sync
-
- Returns:
- A deferred Tuple (state_delta, limited)
- """
- join_event = state_delta.get((
- EventTypes.Member, sync_config.user.to_string()), None)
- if join_event is not None:
- if join_event.content["membership"] == Membership.JOIN:
- return True
- return False
-
@defer.inlineCallbacks
def unread_notifs_for_room_id(self, room_id, sync_config):
with Measure(self.clock, "unread_notifs_for_room_id"):
@@ -1010,6 +490,548 @@ class SyncHandler(object):
# count is whatever it was last time.
defer.returnValue(None)
+ @defer.inlineCallbacks
+ def generate_sync_result(self, sync_config, since_token=None, full_state=False):
+ """Generates a sync result.
+
+ Args:
+ sync_config (SyncConfig)
+ since_token (StreamToken)
+ full_state (bool)
+
+ Returns:
+ Deferred(SyncResult)
+ """
+
+ # NB: The now_token gets changed by some of the generate_sync_* methods,
+ # this is due to some of the underlying streams not supporting the ability
+ # to query up to a given point.
+ # Always use the `now_token` in `SyncResultBuilder`
+ now_token = yield self.event_sources.get_current_token()
+
+ sync_result_builder = SyncResultBuilder(
+ sync_config, full_state,
+ since_token=since_token,
+ now_token=now_token,
+ )
+
+ account_data_by_room = yield self._generate_sync_entry_for_account_data(
+ sync_result_builder
+ )
+
+ res = yield self._generate_sync_entry_for_rooms(
+ sync_result_builder, account_data_by_room
+ )
+ newly_joined_rooms, newly_joined_users = res
+
+ yield self._generate_sync_entry_for_presence(
+ sync_result_builder, newly_joined_rooms, newly_joined_users
+ )
+
+ defer.returnValue(SyncResult(
+ presence=sync_result_builder.presence,
+ account_data=sync_result_builder.account_data,
+ joined=sync_result_builder.joined,
+ invited=sync_result_builder.invited,
+ archived=sync_result_builder.archived,
+ next_batch=sync_result_builder.now_token,
+ ))
+
+ @defer.inlineCallbacks
+ def _generate_sync_entry_for_account_data(self, sync_result_builder):
+ """Generates the account data portion of the sync response. Populates
+ `sync_result_builder` with the result.
+
+ Args:
+ sync_result_builder(SyncResultBuilder)
+
+ Returns:
+ Deferred(dict): A dictionary containing the per room account data.
+ """
+ sync_config = sync_result_builder.sync_config
+ user_id = sync_result_builder.sync_config.user.to_string()
+ since_token = sync_result_builder.since_token
+
+ if since_token and not sync_result_builder.full_state:
+ account_data, account_data_by_room = (
+ yield self.store.get_updated_account_data_for_user(
+ user_id,
+ since_token.account_data_key,
+ )
+ )
+
+ push_rules_changed = yield self.store.have_push_rules_changed_for_user(
+ user_id, int(since_token.push_rules_key)
+ )
+
+ if push_rules_changed:
+ account_data["m.push_rules"] = yield self.push_rules_for_user(
+ sync_config.user
+ )
+ else:
+ account_data, account_data_by_room = (
+ yield self.store.get_account_data_for_user(
+ sync_config.user.to_string()
+ )
+ )
+
+ account_data['m.push_rules'] = yield self.push_rules_for_user(
+ sync_config.user
+ )
+
+ account_data_for_user = sync_config.filter_collection.filter_account_data([
+ {"type": account_data_type, "content": content}
+ for account_data_type, content in account_data.items()
+ ])
+
+ sync_result_builder.account_data = account_data_for_user
+
+ defer.returnValue(account_data_by_room)
+
+ @defer.inlineCallbacks
+ def _generate_sync_entry_for_presence(self, sync_result_builder, newly_joined_rooms,
+ newly_joined_users):
+ """Generates the presence portion of the sync response. Populates the
+ `sync_result_builder` with the result.
+
+ Args:
+ sync_result_builder(SyncResultBuilder)
+ newly_joined_rooms(list): List of rooms that the user has joined
+ since the last sync (or empty if an initial sync)
+ newly_joined_users(list): List of users that have joined rooms
+ since the last sync (or empty if an initial sync)
+ """
+ now_token = sync_result_builder.now_token
+ sync_config = sync_result_builder.sync_config
+ user = sync_result_builder.sync_config.user
+
+ presence_source = self.event_sources.sources["presence"]
+
+ since_token = sync_result_builder.since_token
+ if since_token and not sync_result_builder.full_state:
+ presence_key = since_token.presence_key
+ include_offline = True
+ else:
+ presence_key = None
+ include_offline = False
+
+ presence, presence_key = yield presence_source.get_new_events(
+ user=user,
+ from_key=presence_key,
+ is_guest=sync_config.is_guest,
+ include_offline=include_offline,
+ )
+ sync_result_builder.now_token = now_token.copy_and_replace(
+ "presence_key", presence_key
+ )
+
+ extra_users_ids = set(newly_joined_users)
+ for room_id in newly_joined_rooms:
+ users = yield self.store.get_users_in_room(room_id)
+ extra_users_ids.update(users)
+ extra_users_ids.discard(user.to_string())
+
+ states = yield self.presence_handler.get_states(
+ extra_users_ids,
+ as_event=True,
+ )
+ presence.extend(states)
+
+ presence = sync_config.filter_collection.filter_presence(
+ presence
+ )
+
+ sync_result_builder.presence = presence
+
+ @defer.inlineCallbacks
+ def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room):
+ """Generates the rooms portion of the sync response. Populates the
+ `sync_result_builder` with the result.
+
+ Args:
+ sync_result_builder(SyncResultBuilder)
+ account_data_by_room(dict): Dictionary of per room account data
+
+ Returns:
+ Deferred(tuple): Returns a 2-tuple of
+ `(newly_joined_rooms, newly_joined_users)`
+ """
+ user_id = sync_result_builder.sync_config.user.to_string()
+
+ now_token, ephemeral_by_room = yield self.ephemeral_by_room(
+ sync_result_builder.sync_config,
+ now_token=sync_result_builder.now_token,
+ since_token=sync_result_builder.since_token,
+ )
+ sync_result_builder.now_token = now_token
+
+ ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
+ "m.ignored_user_list", user_id=user_id,
+ )
+
+ if ignored_account_data:
+ ignored_users = ignored_account_data.get("ignored_users", {}).keys()
+ else:
+ ignored_users = frozenset()
+
+ if sync_result_builder.since_token:
+ res = yield self._get_rooms_changed(sync_result_builder, ignored_users)
+ room_entries, invited, newly_joined_rooms = res
+
+ tags_by_room = yield self.store.get_updated_tags(
+ user_id,
+ sync_result_builder.since_token.account_data_key,
+ )
+ else:
+ res = yield self._get_all_rooms(sync_result_builder, ignored_users)
+ room_entries, invited, newly_joined_rooms = res
+
+ tags_by_room = yield self.store.get_tags_for_user(user_id)
+
+ def handle_room_entries(room_entry):
+ return self._generate_room_entry(
+ sync_result_builder,
+ ignored_users,
+ room_entry,
+ ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
+ tags=tags_by_room.get(room_entry.room_id),
+ account_data=account_data_by_room.get(room_entry.room_id, {}),
+ always_include=sync_result_builder.full_state,
+ )
+
+ yield concurrently_execute(handle_room_entries, room_entries, 10)
+
+ sync_result_builder.invited.extend(invited)
+
+ # Now we want to get any newly joined users
+ newly_joined_users = set()
+ if sync_result_builder.since_token:
+ for joined_sync in sync_result_builder.joined:
+ it = itertools.chain(
+ joined_sync.timeline.events, joined_sync.state.values()
+ )
+ for event in it:
+ if event.type == EventTypes.Member:
+ if event.membership == Membership.JOIN:
+ newly_joined_users.add(event.state_key)
+
+ defer.returnValue((newly_joined_rooms, newly_joined_users))
+
+ @defer.inlineCallbacks
+ def _get_rooms_changed(self, sync_result_builder, ignored_users):
+ """Gets the the changes that have happened since the last sync.
+
+ Args:
+ sync_result_builder(SyncResultBuilder)
+ ignored_users(set(str)): Set of users ignored by user.
+
+ Returns:
+ Deferred(tuple): Returns a tuple of the form:
+ `([RoomSyncResultBuilder], [InvitedSyncResult], newly_joined_rooms)`
+ """
+ user_id = sync_result_builder.sync_config.user.to_string()
+ since_token = sync_result_builder.since_token
+ now_token = sync_result_builder.now_token
+ sync_config = sync_result_builder.sync_config
+
+ assert since_token
+
+ app_service = yield self.store.get_app_service_by_user_id(user_id)
+ if app_service:
+ rooms = yield self.store.get_app_service_rooms(app_service)
+ joined_room_ids = set(r.room_id for r in rooms)
+ else:
+ rooms = yield self.store.get_rooms_for_user(user_id)
+ joined_room_ids = set(r.room_id for r in rooms)
+
+ # Get a list of membership change events that have happened.
+ rooms_changed = yield self.store.get_membership_changes_for_user(
+ user_id, since_token.room_key, now_token.room_key
+ )
+
+ mem_change_events_by_room_id = {}
+ for event in rooms_changed:
+ mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
+
+ newly_joined_rooms = []
+ room_entries = []
+ invited = []
+ for room_id, events in mem_change_events_by_room_id.items():
+ non_joins = [e for e in events if e.membership != Membership.JOIN]
+ has_join = len(non_joins) != len(events)
+
+ # We want to figure out if we joined the room at some point since
+ # the last sync (even if we have since left). This is to make sure
+ # we do send down the room, and with full state, where necessary
+ if room_id in joined_room_ids or has_join:
+ old_state = yield self.get_state_at(room_id, since_token)
+ old_mem_ev = old_state.get((EventTypes.Member, user_id), None)
+ if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
+ newly_joined_rooms.append(room_id)
+
+ if room_id in joined_room_ids:
+ continue
+
+ if not non_joins:
+ continue
+
+ # Only bother if we're still currently invited
+ should_invite = non_joins[-1].membership == Membership.INVITE
+ if should_invite:
+ if event.sender not in ignored_users:
+ room_sync = InvitedSyncResult(room_id, invite=non_joins[-1])
+ if room_sync:
+ invited.append(room_sync)
+
+ # Always include leave/ban events. Just take the last one.
+ # TODO: How do we handle ban -> leave in same batch?
+ leave_events = [
+ e for e in non_joins
+ if e.membership in (Membership.LEAVE, Membership.BAN)
+ ]
+
+ if leave_events:
+ leave_event = leave_events[-1]
+ leave_stream_token = yield self.store.get_stream_token_for_event(
+ leave_event.event_id
+ )
+ leave_token = since_token.copy_and_replace(
+ "room_key", leave_stream_token
+ )
+
+ if since_token and since_token.is_after(leave_token):
+ continue
+
+ room_entries.append(RoomSyncResultBuilder(
+ room_id=room_id,
+ rtype="archived",
+ events=None,
+ newly_joined=room_id in newly_joined_rooms,
+ full_state=False,
+ since_token=since_token,
+ upto_token=leave_token,
+ ))
+
+ timeline_limit = sync_config.filter_collection.timeline_limit()
+
+ # Get all events for rooms we're currently joined to.
+ room_to_events = yield self.store.get_room_events_stream_for_rooms(
+ room_ids=joined_room_ids,
+ from_key=since_token.room_key,
+ to_key=now_token.room_key,
+ limit=timeline_limit + 1,
+ )
+
+ # We loop through all room ids, even if there are no new events, in case
+ # there are non room events taht we need to notify about.
+ for room_id in joined_room_ids:
+ room_entry = room_to_events.get(room_id, None)
+
+ if room_entry:
+ events, start_key = room_entry
+
+ prev_batch_token = now_token.copy_and_replace("room_key", start_key)
+
+ room_entries.append(RoomSyncResultBuilder(
+ room_id=room_id,
+ rtype="joined",
+ events=events,
+ newly_joined=room_id in newly_joined_rooms,
+ full_state=False,
+ since_token=None if room_id in newly_joined_rooms else since_token,
+ upto_token=prev_batch_token,
+ ))
+ else:
+ room_entries.append(RoomSyncResultBuilder(
+ room_id=room_id,
+ rtype="joined",
+ events=[],
+ newly_joined=room_id in newly_joined_rooms,
+ full_state=False,
+ since_token=since_token,
+ upto_token=since_token,
+ ))
+
+ defer.returnValue((room_entries, invited, newly_joined_rooms))
+
+ @defer.inlineCallbacks
+ def _get_all_rooms(self, sync_result_builder, ignored_users):
+ """Returns entries for all rooms for the user.
+
+ Args:
+ sync_result_builder(SyncResultBuilder)
+ ignored_users(set(str)): Set of users ignored by user.
+
+ Returns:
+ Deferred(tuple): Returns a tuple of the form:
+ `([RoomSyncResultBuilder], [InvitedSyncResult], [])`
+ """
+
+ user_id = sync_result_builder.sync_config.user.to_string()
+ since_token = sync_result_builder.since_token
+ now_token = sync_result_builder.now_token
+ sync_config = sync_result_builder.sync_config
+
+ membership_list = (
+ Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN
+ )
+
+ room_list = yield self.store.get_rooms_for_user_where_membership_is(
+ user_id=user_id,
+ membership_list=membership_list
+ )
+
+ room_entries = []
+ invited = []
+
+ for event in room_list:
+ if event.membership == Membership.JOIN:
+ room_entries.append(RoomSyncResultBuilder(
+ room_id=event.room_id,
+ rtype="joined",
+ events=None,
+ newly_joined=False,
+ full_state=True,
+ since_token=since_token,
+ upto_token=now_token,
+ ))
+ elif event.membership == Membership.INVITE:
+ if event.sender in ignored_users:
+ continue
+ invite = yield self.store.get_event(event.event_id)
+ invited.append(InvitedSyncResult(
+ room_id=event.room_id,
+ invite=invite,
+ ))
+ elif event.membership in (Membership.LEAVE, Membership.BAN):
+ # Always send down rooms we were banned or kicked from.
+ if not sync_config.filter_collection.include_leave:
+ if event.membership == Membership.LEAVE:
+ if user_id == event.sender:
+ continue
+
+ leave_token = now_token.copy_and_replace(
+ "room_key", "s%d" % (event.stream_ordering,)
+ )
+ room_entries.append(RoomSyncResultBuilder(
+ room_id=event.room_id,
+ rtype="archived",
+ events=None,
+ newly_joined=False,
+ full_state=True,
+ since_token=since_token,
+ upto_token=leave_token,
+ ))
+
+ defer.returnValue((room_entries, invited, []))
+
+ @defer.inlineCallbacks
+ def _generate_room_entry(self, sync_result_builder, ignored_users,
+ room_builder, ephemeral, tags, account_data,
+ always_include=False):
+ """Populates the `joined` and `archived` section of `sync_result_builder`
+ based on the `room_builder`.
+
+ Args:
+ sync_result_builder(SyncResultBuilder)
+ ignored_users(set(str)): Set of users ignored by user.
+ room_builder(RoomSyncResultBuilder)
+ ephemeral(list): List of new ephemeral events for room
+ tags(list): List of *all* tags for room, or None if there has been
+ no change.
+ account_data(list): List of new account data for room
+ always_include(bool): Always include this room in the sync response,
+ even if empty.
+ """
+ newly_joined = room_builder.newly_joined
+ full_state = (
+ room_builder.full_state
+ or newly_joined
+ or sync_result_builder.full_state
+ )
+ events = room_builder.events
+
+ # We want to shortcut out as early as possible.
+ if not (always_include or account_data or ephemeral or full_state):
+ if events == [] and tags is None:
+ return
+
+ since_token = sync_result_builder.since_token
+ now_token = sync_result_builder.now_token
+ sync_config = sync_result_builder.sync_config
+
+ room_id = room_builder.room_id
+ since_token = room_builder.since_token
+ upto_token = room_builder.upto_token
+
+ batch = yield self._load_filtered_recents(
+ room_id, sync_config,
+ now_token=upto_token,
+ since_token=since_token,
+ recents=events,
+ newly_joined_room=newly_joined,
+ )
+
+ account_data_events = []
+ if tags is not None:
+ account_data_events.append({
+ "type": "m.tag",
+ "content": {"tags": tags},
+ })
+
+ for account_data_type, content in account_data.items():
+ account_data_events.append({
+ "type": account_data_type,
+ "content": content,
+ })
+
+ account_data = sync_config.filter_collection.filter_room_account_data(
+ account_data_events
+ )
+
+ ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
+
+ if not (always_include or batch or account_data or ephemeral or full_state):
+ return
+
+ state = yield self.compute_state_delta(
+ room_id, batch, sync_config, since_token, now_token,
+ full_state=full_state
+ )
+
+ if room_builder.rtype == "joined":
+ unread_notifications = {}
+ room_sync = JoinedSyncResult(
+ room_id=room_id,
+ timeline=batch,
+ state=state,
+ ephemeral=ephemeral,
+ account_data=account_data_events,
+ unread_notifications=unread_notifications,
+ )
+
+ if room_sync or always_include:
+ notifs = yield self.unread_notifs_for_room_id(
+ room_id, sync_config
+ )
+
+ if notifs is not None:
+ unread_notifications["notification_count"] = notifs["notify_count"]
+ unread_notifications["highlight_count"] = notifs["highlight_count"]
+
+ sync_result_builder.joined.append(room_sync)
+ elif room_builder.rtype == "archived":
+ room_sync = ArchivedSyncResult(
+ room_id=room_id,
+ timeline=batch,
+ state=state,
+ account_data=account_data,
+ )
+ if room_sync or always_include:
+ sync_result_builder.archived.append(room_sync)
+ else:
+ raise Exception("Unrecognized rtype: %r", room_builder.rtype)
+
def _action_has_highlight(actions):
for action in actions:
@@ -1057,3 +1079,51 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
(e.type, e.state_key): e
for e in evs
}
+
+
+class SyncResultBuilder(object):
+ "Used to help build up a new SyncResult for a user"
+ def __init__(self, sync_config, full_state, since_token, now_token):
+ """
+ Args:
+ sync_config(SyncConfig)
+ full_state(bool): The full_state flag as specified by user
+ since_token(StreamToken): The token supplied by user, or None.
+ now_token(StreamToken): The token to sync up to.
+ """
+ self.sync_config = sync_config
+ self.full_state = full_state
+ self.since_token = since_token
+ self.now_token = now_token
+
+ self.presence = []
+ self.account_data = []
+ self.joined = []
+ self.invited = []
+ self.archived = []
+
+
+class RoomSyncResultBuilder(object):
+ """Stores information needed to create either a `JoinedSyncResult` or
+ `ArchivedSyncResult`.
+ """
+ def __init__(self, room_id, rtype, events, newly_joined, full_state,
+ since_token, upto_token):
+ """
+ Args:
+ room_id(str)
+ rtype(str): One of `"joined"` or `"archived"`
+ events(list): List of events to include in the room, (more events
+ may be added when generating result).
+ newly_joined(bool): If the user has newly joined the room
+ full_state(bool): Whether the full state should be sent in result
+ since_token(StreamToken): Earliest point to return events from, or None
+ upto_token(StreamToken): Latest point to return events from.
+ """
+ self.room_id = room_id
+ self.rtype = rtype
+ self.events = events
+ self.newly_joined = newly_joined
+ self.full_state = full_state
+ self.since_token = since_token
+ self.upto_token = upto_token
|