diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index feea407ea2..1d0f0058a2 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright 2015 OpenMarket Ltd
+# Copyright 2015 - 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,22 +15,25 @@
from ._base import BaseHandler
+from synapse.streams.config import PaginationConfig
from synapse.api.constants import Membership, EventTypes
-from synapse.api.errors import GuestAccessError
from synapse.util import unwrapFirstError
+from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.metrics import Measure
from twisted.internet import defer
import collections
import logging
+import itertools
logger = logging.getLogger(__name__)
SyncConfig = collections.namedtuple("SyncConfig", [
"user",
+ "filter_collection",
"is_guest",
- "filter",
])
@@ -54,6 +57,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
"state", # dict[(str, str), FrozenEvent]
"ephemeral",
"account_data",
+ "unread_notifications",
])):
__slots__ = []
@@ -66,10 +70,12 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
or self.state
or self.ephemeral
or self.account_data
+ # nb the notification count does not, er, count: if there's nothing
+ # else in the result, we don't need to send it.
)
-class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [
+class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
"room_id", # str
"timeline", # TimelineBatch
"state", # dict[(str, str), FrozenEvent]
@@ -115,11 +121,9 @@ class SyncResult(collections.namedtuple("SyncResult", [
events.
"""
return bool(
- self.presence or self.joined or self.invited
+ self.presence or self.joined or self.invited or self.archived
)
-GuestRoom = collections.namedtuple("GuestRoom", ("room_id", "membership"))
-
class SyncHandler(BaseHandler):
@@ -138,45 +142,32 @@ class SyncHandler(BaseHandler):
A Deferred SyncResult.
"""
- if sync_config.is_guest:
- bad_rooms = []
- for room_id in sync_config.filter.list_rooms():
- world_readable = yield self._is_world_readable(room_id)
- if not world_readable:
- bad_rooms.append(room_id)
-
- if bad_rooms:
- raise GuestAccessError(
- bad_rooms, 403, "Guest access not allowed"
- )
+ context = LoggingContext.current_context()
+ if context:
+ if since_token is None:
+ context.tag = "initial_sync"
+ elif full_state:
+ context.tag = "full_state_sync"
+ else:
+ context.tag = "incremental_sync"
if timeout == 0 or since_token is None or full_state:
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
- result = yield self.current_sync_for_user(sync_config, since_token,
- full_state=full_state)
+ result = yield self.current_sync_for_user(
+ sync_config, since_token, full_state=full_state,
+ )
defer.returnValue(result)
else:
def current_sync_callback(before_token, after_token):
return self.current_sync_for_user(sync_config, since_token)
result = yield self.notifier.wait_for_events(
- sync_config.user, timeout, current_sync_callback,
- from_token=since_token
+ sync_config.user.to_string(), timeout, current_sync_callback,
+ from_token=since_token,
)
defer.returnValue(result)
- @defer.inlineCallbacks
- def _is_world_readable(self, room_id):
- state = yield self.hs.get_state_handler().get_current_state(
- room_id,
- EventTypes.RoomHistoryVisibility
- )
- if state and "history_visibility" in state.content:
- defer.returnValue(state.content["history_visibility"] == "world_readable")
- else:
- defer.returnValue(False)
-
def current_sync_for_user(self, sync_config, since_token=None,
full_state=False):
"""Get the sync for client needed to match what the server has now.
@@ -200,100 +191,100 @@ class SyncHandler(BaseHandler):
"""
now_token = yield self.event_sources.get_current_token()
- if sync_config.is_guest:
- room_list = [
- GuestRoom(room_id, Membership.JOIN)
- for room_id in sync_config.filter.list_rooms()
- ]
-
- account_data = {}
- account_data_by_room = {}
- tags_by_room = {}
+ now_token, ephemeral_by_room = yield self.ephemeral_by_room(
+ sync_config, now_token
+ )
- else:
- membership_list = (Membership.INVITE, Membership.JOIN)
- if sync_config.filter.include_leave:
- membership_list += (Membership.LEAVE, Membership.BAN)
+ presence_stream = self.event_sources.sources["presence"]
+ # TODO (mjark): This looks wrong, shouldn't we be getting the presence
+ # UP to the present rather than after the present?
+ pagination_config = PaginationConfig(from_token=now_token)
+ presence, _ = yield presence_stream.get_pagination_rows(
+ user=sync_config.user,
+ pagination_config=pagination_config.get_source_config("presence"),
+ key=None
+ )
- room_list = yield self.store.get_rooms_for_user_where_membership_is(
- user_id=sync_config.user.to_string(),
- membership_list=membership_list
- )
+ membership_list = (Membership.INVITE, Membership.JOIN)
+ if sync_config.filter_collection.include_leave:
+ membership_list += (Membership.LEAVE, Membership.BAN)
- account_data, account_data_by_room = (
- yield self.store.get_account_data_for_user(
- sync_config.user.to_string()
- )
- )
+ room_list = yield self.store.get_rooms_for_user_where_membership_is(
+ user_id=sync_config.user.to_string(),
+ membership_list=membership_list
+ )
- tags_by_room = yield self.store.get_tags_for_user(
+ account_data, account_data_by_room = (
+ yield self.store.get_account_data_for_user(
sync_config.user.to_string()
)
-
- presence_stream = self.event_sources.sources["presence"]
-
- joined_room_ids = [
- room.room_id for room in room_list
- if room.membership == Membership.JOIN
- ]
-
- presence, _ = yield presence_stream.get_new_events(
- from_key=0,
- user=sync_config.user,
- room_ids=joined_room_ids,
- is_guest=sync_config.is_guest,
)
- now_token, ephemeral_by_room = yield self.ephemeral_by_room(
- sync_config, now_token, joined_room_ids
+ tags_by_room = yield self.store.get_tags_for_user(
+ sync_config.user.to_string()
)
joined = []
invited = []
archived = []
deferreds = []
- for event in room_list:
- if event.membership == Membership.JOIN:
- room_sync_deferred = self.full_state_sync_for_joined_room(
- room_id=event.room_id,
- sync_config=sync_config,
- now_token=now_token,
- timeline_since_token=timeline_since_token,
- ephemeral_by_room=ephemeral_by_room,
- tags_by_room=tags_by_room,
- account_data_by_room=account_data_by_room,
- )
- room_sync_deferred.addCallback(joined.append)
- deferreds.append(room_sync_deferred)
- elif event.membership == Membership.INVITE:
- invite = yield self.store.get_event(event.event_id)
- invited.append(InvitedSyncResult(
- room_id=event.room_id,
- invite=invite,
- ))
- elif event.membership in (Membership.LEAVE, Membership.BAN):
- leave_token = now_token.copy_and_replace(
- "room_key", "s%d" % (event.stream_ordering,)
- )
- room_sync_deferred = self.full_state_sync_for_archived_room(
- sync_config=sync_config,
- room_id=event.room_id,
- leave_event_id=event.event_id,
- leave_token=leave_token,
- timeline_since_token=timeline_since_token,
- tags_by_room=tags_by_room,
- account_data_by_room=account_data_by_room,
- )
- room_sync_deferred.addCallback(archived.append)
- deferreds.append(room_sync_deferred)
- yield defer.gatherResults(
- deferreds, consumeErrors=True
- ).addErrback(unwrapFirstError)
+ room_list_chunks = [room_list[i:i + 10] for i in xrange(0, len(room_list), 10)]
+ for room_list_chunk in room_list_chunks:
+ for event in room_list_chunk:
+ if event.membership == Membership.JOIN:
+ room_sync_deferred = preserve_fn(
+ self.full_state_sync_for_joined_room
+ )(
+ room_id=event.room_id,
+ sync_config=sync_config,
+ now_token=now_token,
+ timeline_since_token=timeline_since_token,
+ ephemeral_by_room=ephemeral_by_room,
+ tags_by_room=tags_by_room,
+ account_data_by_room=account_data_by_room,
+ )
+ room_sync_deferred.addCallback(joined.append)
+ deferreds.append(room_sync_deferred)
+ elif event.membership == Membership.INVITE:
+ invite = yield self.store.get_event(event.event_id)
+ invited.append(InvitedSyncResult(
+ room_id=event.room_id,
+ invite=invite,
+ ))
+ elif event.membership in (Membership.LEAVE, Membership.BAN):
+ leave_token = now_token.copy_and_replace(
+ "room_key", "s%d" % (event.stream_ordering,)
+ )
+ room_sync_deferred = preserve_fn(
+ self.full_state_sync_for_archived_room
+ )(
+ sync_config=sync_config,
+ room_id=event.room_id,
+ leave_event_id=event.event_id,
+ leave_token=leave_token,
+ timeline_since_token=timeline_since_token,
+ tags_by_room=tags_by_room,
+ account_data_by_room=account_data_by_room,
+ )
+ room_sync_deferred.addCallback(archived.append)
+ deferreds.append(room_sync_deferred)
+
+ yield defer.gatherResults(
+ deferreds, consumeErrors=True
+ ).addErrback(unwrapFirstError)
+
+ account_data_for_user = sync_config.filter_collection.filter_account_data(
+ self.account_data_for_user(account_data)
+ )
+
+ presence = sync_config.filter_collection.filter_presence(
+ presence
+ )
defer.returnValue(SyncResult(
presence=presence,
- account_data=self.account_data_for_user(account_data),
+ account_data=account_data_for_user,
joined=joined,
invited=invited,
archived=archived,
@@ -314,17 +305,18 @@ class SyncHandler(BaseHandler):
room_id, sync_config, now_token, since_token=timeline_since_token
)
- current_state = yield self.get_state_at(room_id, now_token)
+ room_sync = yield self.incremental_sync_with_gap_for_room(
+ room_id, sync_config,
+ now_token=now_token,
+ since_token=timeline_since_token,
+ ephemeral_by_room=ephemeral_by_room,
+ tags_by_room=tags_by_room,
+ account_data_by_room=account_data_by_room,
+ batch=batch,
+ full_state=True,
+ )
- defer.returnValue(JoinedSyncResult(
- room_id=room_id,
- timeline=batch,
- state=current_state,
- ephemeral=ephemeral_by_room.get(room_id, []),
- account_data=self.account_data_for_room(
- room_id, tags_by_room, account_data_by_room
- ),
- ))
+ defer.returnValue(room_sync)
def account_data_for_user(self, account_data):
account_data_events = []
@@ -356,13 +348,11 @@ class SyncHandler(BaseHandler):
return account_data_events
@defer.inlineCallbacks
- def ephemeral_by_room(self, sync_config, now_token, room_ids,
- since_token=None):
+ def ephemeral_by_room(self, sync_config, now_token, since_token=None):
"""Get the ephemeral events for each room the user is in
Args:
sync_config (SyncConfig): The flags, filters and user for the sync.
now_token (StreamToken): Where the server is currently up to.
- room_ids (list): List of room id strings to get data for.
since_token (StreamToken): Where the server was when the client
last synced.
Returns:
@@ -371,76 +361,68 @@ class SyncHandler(BaseHandler):
typing events for that room.
"""
- typing_key = since_token.typing_key if since_token else "0"
-
- typing_source = self.event_sources.sources["typing"]
- typing, typing_key = yield typing_source.get_new_events(
- user=sync_config.user,
- from_key=typing_key,
- limit=sync_config.filter.ephemeral_limit(),
- room_ids=room_ids,
- is_guest=False,
- )
- now_token = now_token.copy_and_replace("typing_key", typing_key)
-
- ephemeral_by_room = {}
+ with Measure(self.clock, "ephemeral_by_room"):
+ typing_key = since_token.typing_key if since_token else "0"
- for event in typing:
- # we want to exclude the room_id from the event, but modifying the
- # result returned by the event source is poor form (it might cache
- # the object)
- room_id = event["room_id"]
- event_copy = {k: v for (k, v) in event.iteritems()
- if k != "room_id"}
- ephemeral_by_room.setdefault(room_id, []).append(event_copy)
-
- receipt_key = since_token.receipt_key if since_token else "0"
+ rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
+ room_ids = [room.room_id for room in rooms]
- receipt_source = self.event_sources.sources["receipt"]
- receipts, receipt_key = yield receipt_source.get_new_events(
- user=sync_config.user,
- from_key=receipt_key,
- limit=sync_config.filter.ephemeral_limit(),
- room_ids=room_ids,
- # /sync doesn't support guest access, they can't get to this point in code
- is_guest=False,
- )
- now_token = now_token.copy_and_replace("receipt_key", receipt_key)
+ typing_source = self.event_sources.sources["typing"]
+ typing, typing_key = yield typing_source.get_new_events(
+ user=sync_config.user,
+ from_key=typing_key,
+ limit=sync_config.filter_collection.ephemeral_limit(),
+ room_ids=room_ids,
+ is_guest=sync_config.is_guest,
+ )
+ now_token = now_token.copy_and_replace("typing_key", typing_key)
+
+ ephemeral_by_room = {}
+
+ for event in typing:
+ # we want to exclude the room_id from the event, but modifying the
+ # result returned by the event source is poor form (it might cache
+ # the object)
+ room_id = event["room_id"]
+ event_copy = {k: v for (k, v) in event.iteritems()
+ if k != "room_id"}
+ ephemeral_by_room.setdefault(room_id, []).append(event_copy)
+
+ receipt_key = since_token.receipt_key if since_token else "0"
+
+ receipt_source = self.event_sources.sources["receipt"]
+ receipts, receipt_key = yield receipt_source.get_new_events(
+ user=sync_config.user,
+ from_key=receipt_key,
+ limit=sync_config.filter_collection.ephemeral_limit(),
+ room_ids=room_ids,
+ is_guest=sync_config.is_guest,
+ )
+ now_token = now_token.copy_and_replace("receipt_key", receipt_key)
- for event in receipts:
- room_id = event["room_id"]
- # exclude room id, as above
- event_copy = {k: v for (k, v) in event.iteritems()
- if k != "room_id"}
- ephemeral_by_room.setdefault(room_id, []).append(event_copy)
+ for event in receipts:
+ room_id = event["room_id"]
+ # exclude room id, as above
+ event_copy = {k: v for (k, v) in event.iteritems()
+ if k != "room_id"}
+ ephemeral_by_room.setdefault(room_id, []).append(event_copy)
defer.returnValue((now_token, ephemeral_by_room))
- @defer.inlineCallbacks
def full_state_sync_for_archived_room(self, room_id, sync_config,
leave_event_id, leave_token,
timeline_since_token, tags_by_room,
account_data_by_room):
"""Sync a room for a client which is starting without any state
Returns:
- A Deferred JoinedSyncResult.
+ A Deferred ArchivedSyncResult.
"""
- batch = yield self.load_filtered_recents(
- room_id, sync_config, leave_token, since_token=timeline_since_token
+ return self.incremental_sync_for_archived_room(
+ sync_config, room_id, leave_event_id, timeline_since_token, tags_by_room,
+ account_data_by_room, full_state=True, leave_token=leave_token,
)
- leave_state = yield self.store.get_state_for_event(leave_event_id)
-
- defer.returnValue(ArchivedSyncResult(
- room_id=room_id,
- timeline=batch,
- state=leave_state,
- account_data=self.account_data_for_room(
- room_id, tags_by_room, account_data_by_room
- ),
- ))
-
@defer.inlineCallbacks
def incremental_sync_with_gap(self, sync_config, since_token):
""" Get the incremental delta needed to bring the client up to
@@ -450,49 +432,23 @@ class SyncHandler(BaseHandler):
"""
now_token = yield self.event_sources.get_current_token()
- if sync_config.is_guest:
- room_ids = sync_config.filter.list_rooms()
-
- tags_by_room = {}
- account_data = {}
- account_data_by_room = {}
-
- else:
- rooms = yield self.store.get_rooms_for_user(
- sync_config.user.to_string()
- )
- room_ids = [room.room_id for room in rooms]
-
- now_token, ephemeral_by_room = yield self.ephemeral_by_room(
- sync_config, now_token, since_token
- )
-
- tags_by_room = yield self.store.get_updated_tags(
- sync_config.user.to_string(),
- since_token.account_data_key,
- )
-
- account_data, account_data_by_room = (
- yield self.store.get_updated_account_data_for_user(
- sync_config.user.to_string(),
- since_token.account_data_key,
- )
- )
-
- now_token, ephemeral_by_room = yield self.ephemeral_by_room(
- sync_config, now_token, room_ids, since_token
- )
+ rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
+ room_ids = [room.room_id for room in rooms]
presence_source = self.event_sources.sources["presence"]
presence, presence_key = yield presence_source.get_new_events(
user=sync_config.user,
from_key=since_token.presence_key,
- limit=sync_config.filter.presence_limit(),
+ limit=sync_config.filter_collection.presence_limit(),
room_ids=room_ids,
is_guest=sync_config.is_guest,
)
now_token = now_token.copy_and_replace("presence_key", presence_key)
+ now_token, ephemeral_by_room = yield self.ephemeral_by_room(
+ sync_config, now_token, since_token
+ )
+
rm_handler = self.hs.get_handlers().room_member_handler
app_service = yield self.store.get_app_service_by_user_id(
sync_config.user.to_string()
@@ -505,114 +461,138 @@ class SyncHandler(BaseHandler):
sync_config.user
)
- timeline_limit = sync_config.filter.timeline_limit()
+ user_id = sync_config.user.to_string()
- room_events, _ = yield self.store.get_room_events_stream(
- sync_config.user.to_string(),
- from_key=since_token.room_key,
- to_key=now_token.room_key,
- limit=timeline_limit + 1,
- room_ids=room_ids if sync_config.is_guest else (),
- is_guest=sync_config.is_guest,
+ timeline_limit = sync_config.filter_collection.timeline_limit()
+
+ tags_by_room = yield self.store.get_updated_tags(
+ user_id,
+ since_token.account_data_key,
)
- joined = []
+ account_data, account_data_by_room = (
+ yield self.store.get_updated_account_data_for_user(
+ user_id,
+ since_token.account_data_key,
+ )
+ )
+
+ # Get a list of membership change events that have happened.
+ rooms_changed = yield self.store.get_membership_changes_for_user(
+ user_id, since_token.room_key, now_token.room_key
+ )
+
+ mem_change_events_by_room_id = {}
+ for event in rooms_changed:
+ mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
+
+ newly_joined_rooms = []
archived = []
- if len(room_events) <= timeline_limit:
- # There is no gap in any of the rooms. Therefore we can just
- # partition the new events by room and return them.
- logger.debug("Got %i events for incremental sync - not limited",
- len(room_events))
-
- invite_events = []
- leave_events = []
- events_by_room_id = {}
- for event in room_events:
- events_by_room_id.setdefault(event.room_id, []).append(event)
- if event.room_id not in joined_room_ids:
- if (event.type == EventTypes.Member
- and event.state_key == sync_config.user.to_string()):
- if event.membership == Membership.INVITE:
- invite_events.append(event)
- elif event.membership in (Membership.LEAVE, Membership.BAN):
- leave_events.append(event)
-
- for room_id in joined_room_ids:
- recents = events_by_room_id.get(room_id, [])
- logger.debug("Events for room %s: %r", room_id, recents)
- state = {
- (event.type, event.state_key): event
- for event in recents if event.is_state()}
- limited = False
+ invited = []
+ for room_id, events in mem_change_events_by_room_id.items():
+ non_joins = [e for e in events if e.membership != Membership.JOIN]
+ has_join = len(non_joins) != len(events)
+
+ # We want to figure out if we joined the room at some point since
+ # the last sync (even if we have since left). This is to make sure
+ # we do send down the room, and with full state, where necessary
+ if room_id in joined_room_ids or has_join:
+ old_state = yield self.get_state_at(room_id, since_token)
+ old_mem_ev = old_state.get((EventTypes.Member, user_id), None)
+ if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
+ newly_joined_rooms.append(room_id)
+
+ if room_id in joined_room_ids:
+ continue
+
+ if not non_joins:
+ continue
+
+ # Only bother if we're still currently invited
+ should_invite = non_joins[-1].membership == Membership.INVITE
+ if should_invite:
+ room_sync = InvitedSyncResult(room_id, invite=non_joins[-1])
+ if room_sync:
+ invited.append(room_sync)
- if recents:
- prev_batch = now_token.copy_and_replace(
- "room_key", recents[0].internal_metadata.before
- )
- else:
- prev_batch = now_token
-
- just_joined = yield self.check_joined_room(sync_config, state)
- if just_joined:
- logger.debug("User has just joined %s: needs full state",
- room_id)
- state = yield self.get_state_at(room_id, now_token)
- # the timeline is inherently limited if we've just joined
- limited = True
-
- room_sync = JoinedSyncResult(
- room_id=room_id,
- timeline=TimelineBatch(
- events=recents,
- prev_batch=prev_batch,
- limited=limited,
- ),
- state=state,
- ephemeral=ephemeral_by_room.get(room_id, []),
- account_data=self.account_data_for_room(
- room_id, tags_by_room, account_data_by_room
- ),
- )
- logger.debug("Result for room %s: %r", room_id, room_sync)
+ # Always include leave/ban events. Just take the last one.
+ # TODO: How do we handle ban -> leave in same batch?
+ leave_events = [
+ e for e in non_joins
+ if e.membership in (Membership.LEAVE, Membership.BAN)
+ ]
+ if leave_events:
+ leave_event = leave_events[-1]
+ room_sync = yield self.incremental_sync_for_archived_room(
+ sync_config, room_id, leave_event.event_id, since_token,
+ tags_by_room, account_data_by_room,
+ full_state=room_id in newly_joined_rooms
+ )
if room_sync:
- joined.append(room_sync)
+ archived.append(room_sync)
- else:
- logger.debug("Got %i events for incremental sync - hit limit",
- len(room_events))
+ # Get all events for rooms we're currently joined to.
+ room_to_events = yield self.store.get_room_events_stream_for_rooms(
+ room_ids=joined_room_ids,
+ from_key=since_token.room_key,
+ to_key=now_token.room_key,
+ limit=timeline_limit + 1,
+ )
- invite_events = yield self.store.get_invites_for_user(
- sync_config.user.to_string()
- )
+ joined = []
+ # We loop through all room ids, even if there are no new events, in case
+ # there are non room events taht we need to notify about.
+ for room_id in joined_room_ids:
+ room_entry = room_to_events.get(room_id, None)
- leave_events = yield self.store.get_leave_and_ban_events_for_user(
- sync_config.user.to_string()
- )
+ if room_entry:
+ events, start_key = room_entry
- for room_id in joined_room_ids:
- room_sync = yield self.incremental_sync_with_gap_for_room(
- room_id, sync_config, since_token, now_token,
- ephemeral_by_room, tags_by_room, account_data_by_room
- )
- if room_sync:
- joined.append(room_sync)
+ prev_batch_token = now_token.copy_and_replace("room_key", start_key)
+
+ newly_joined_room = room_id in newly_joined_rooms
+ full_state = newly_joined_room
- for leave_event in leave_events:
- room_sync = yield self.incremental_sync_for_archived_room(
- sync_config, leave_event, since_token, tags_by_room,
- account_data_by_room
+ batch = yield self.load_filtered_recents(
+ room_id, sync_config, prev_batch_token,
+ since_token=since_token,
+ recents=events,
+ newly_joined_room=newly_joined_room,
+ )
+ else:
+ batch = TimelineBatch(
+ events=[],
+ prev_batch=since_token,
+ limited=False,
+ )
+ full_state = False
+
+ room_sync = yield self.incremental_sync_with_gap_for_room(
+ room_id=room_id,
+ sync_config=sync_config,
+ since_token=since_token,
+ now_token=now_token,
+ ephemeral_by_room=ephemeral_by_room,
+ tags_by_room=tags_by_room,
+ account_data_by_room=account_data_by_room,
+ batch=batch,
+ full_state=full_state,
)
- archived.append(room_sync)
+ if room_sync:
+ joined.append(room_sync)
+
+ account_data_for_user = sync_config.filter_collection.filter_account_data(
+ self.account_data_for_user(account_data)
+ )
- invited = [
- InvitedSyncResult(room_id=event.room_id, invite=event)
- for event in invite_events
- ]
+ presence = sync_config.filter_collection.filter_presence(
+ presence
+ )
defer.returnValue(SyncResult(
presence=presence,
- account_data=self.account_data_for_user(account_data),
+ account_data=account_data_for_user,
joined=joined,
invited=invited,
archived=archived,
@@ -621,152 +601,169 @@ class SyncHandler(BaseHandler):
@defer.inlineCallbacks
def load_filtered_recents(self, room_id, sync_config, now_token,
- since_token=None):
+ since_token=None, recents=None, newly_joined_room=False):
"""
:returns a Deferred TimelineBatch
"""
- limited = True
- recents = []
- filtering_factor = 2
- timeline_limit = sync_config.filter.timeline_limit()
- load_limit = max(timeline_limit * filtering_factor, 100)
- max_repeat = 3 # Only try a few times per room, otherwise
- room_key = now_token.room_key
- end_key = room_key
-
- while limited and len(recents) < timeline_limit and max_repeat:
- events, keys = yield self.store.get_recent_events_for_room(
- room_id,
- limit=load_limit + 1,
- from_token=since_token.room_key if since_token else None,
- end_token=end_key,
- )
- (room_key, _) = keys
- end_key = "s" + room_key.split('-')[-1]
- loaded_recents = sync_config.filter.filter_room_timeline(events)
- loaded_recents = yield self._filter_events_for_client(
- sync_config.user.to_string(),
- loaded_recents,
- is_guest=sync_config.is_guest,
- require_all_visible_for_guests=False
- )
- loaded_recents.extend(recents)
- recents = loaded_recents
- if len(events) <= load_limit:
+ with Measure(self.clock, "load_filtered_recents"):
+ filtering_factor = 2
+ timeline_limit = sync_config.filter_collection.timeline_limit()
+ load_limit = max(timeline_limit * filtering_factor, 10)
+ max_repeat = 5 # Only try a few times per room, otherwise
+ room_key = now_token.room_key
+ end_key = room_key
+
+ if recents is None or newly_joined_room or timeline_limit < len(recents):
+ limited = True
+ else:
limited = False
- max_repeat -= 1
- if len(recents) > timeline_limit:
- limited = True
- recents = recents[-timeline_limit:]
- room_key = recents[0].internal_metadata.before
+ if recents is not None:
+ recents = sync_config.filter_collection.filter_room_timeline(recents)
+ recents = yield self._filter_events_for_client(
+ sync_config.user.to_string(),
+ recents,
+ is_peeking=sync_config.is_guest,
+ )
+ else:
+ recents = []
+
+ since_key = None
+ if since_token and not newly_joined_room:
+ since_key = since_token.room_key
+
+ while limited and len(recents) < timeline_limit and max_repeat:
+ events, end_key = yield self.store.get_room_events_stream_for_room(
+ room_id,
+ limit=load_limit + 1,
+ from_key=since_key,
+ to_key=end_key,
+ )
+ loaded_recents = sync_config.filter_collection.filter_room_timeline(
+ events
+ )
+ loaded_recents = yield self._filter_events_for_client(
+ sync_config.user.to_string(),
+ loaded_recents,
+ is_peeking=sync_config.is_guest,
+ )
+ loaded_recents.extend(recents)
+ recents = loaded_recents
+
+ if len(events) <= load_limit:
+ limited = False
+ break
+ max_repeat -= 1
- prev_batch_token = now_token.copy_and_replace(
- "room_key", room_key
- )
+ if len(recents) > timeline_limit:
+ limited = True
+ recents = recents[-timeline_limit:]
+ room_key = recents[0].internal_metadata.before
+
+ prev_batch_token = now_token.copy_and_replace(
+ "room_key", room_key
+ )
defer.returnValue(TimelineBatch(
- events=recents, prev_batch=prev_batch_token, limited=limited
+ events=recents,
+ prev_batch=prev_batch_token,
+ limited=limited or newly_joined_room
))
@defer.inlineCallbacks
def incremental_sync_with_gap_for_room(self, room_id, sync_config,
since_token, now_token,
ephemeral_by_room, tags_by_room,
- account_data_by_room):
- """ Get the incremental delta needed to bring the client up to date for
- the room. Gives the client the most recent events and the changes to
- state.
- Returns:
- A Deferred JoinedSyncResult
- """
- logger.debug("Doing incremental sync for room %s between %s and %s",
- room_id, since_token, now_token)
-
- # TODO(mjark): Check for redactions we might have missed.
-
- batch = yield self.load_filtered_recents(
- room_id, sync_config, now_token, since_token,
+ account_data_by_room,
+ batch, full_state=False):
+ state = yield self.compute_state_delta(
+ room_id, batch, sync_config, since_token, now_token,
+ full_state=full_state
)
- logging.debug("Recents %r", batch)
-
- current_state = yield self.get_state_at(room_id, now_token)
-
- state_at_previous_sync = yield self.get_state_at(
- room_id, stream_position=since_token
+ account_data = self.account_data_for_room(
+ room_id, tags_by_room, account_data_by_room
)
- state = yield self.compute_state_delta(
- since_token=since_token,
- previous_state=state_at_previous_sync,
- current_state=current_state,
+ account_data = sync_config.filter_collection.filter_room_account_data(
+ account_data
)
- just_joined = yield self.check_joined_room(sync_config, state)
- if just_joined:
- state = yield self.get_state_at(room_id, now_token)
+ ephemeral = sync_config.filter_collection.filter_room_ephemeral(
+ ephemeral_by_room.get(room_id, [])
+ )
+ unread_notifications = {}
room_sync = JoinedSyncResult(
room_id=room_id,
timeline=batch,
state=state,
- ephemeral=ephemeral_by_room.get(room_id, []),
- account_data=self.account_data_for_room(
- room_id, tags_by_room, account_data_by_room
- ),
+ ephemeral=ephemeral,
+ account_data=account_data,
+ unread_notifications=unread_notifications,
)
- logging.debug("Room sync: %r", room_sync)
+ if room_sync:
+ notifs = yield self.unread_notifs_for_room_id(
+ room_id, sync_config
+ )
+
+ if notifs is not None:
+ unread_notifications["notification_count"] = notifs["notify_count"]
+ unread_notifications["highlight_count"] = notifs["highlight_count"]
+
+ logger.debug("Room sync: %r", room_sync)
defer.returnValue(room_sync)
@defer.inlineCallbacks
- def incremental_sync_for_archived_room(self, sync_config, leave_event,
+ def incremental_sync_for_archived_room(self, sync_config, room_id, leave_event_id,
since_token, tags_by_room,
- account_data_by_room):
+ account_data_by_room, full_state,
+ leave_token=None):
""" Get the incremental delta needed to bring the client up to date for
the archived room.
Returns:
A Deferred ArchivedSyncResult
"""
- stream_token = yield self.store.get_stream_token_for_event(
- leave_event.event_id
- )
+ if not leave_token:
+ stream_token = yield self.store.get_stream_token_for_event(
+ leave_event_id
+ )
- leave_token = since_token.copy_and_replace("room_key", stream_token)
+ leave_token = since_token.copy_and_replace("room_key", stream_token)
+
+ if since_token and since_token.is_after(leave_token):
+ defer.returnValue(None)
batch = yield self.load_filtered_recents(
- leave_event.room_id, sync_config, leave_token, since_token,
+ room_id, sync_config, leave_token, since_token,
)
- logging.debug("Recents %r", batch)
+ logger.debug("Recents %r", batch)
- state_events_at_leave = yield self.store.get_state_for_event(
- leave_event.event_id
+ state_events_delta = yield self.compute_state_delta(
+ room_id, batch, sync_config, since_token, leave_token,
+ full_state=full_state
)
- state_at_previous_sync = yield self.get_state_at(
- leave_event.room_id, stream_position=since_token
+ account_data = self.account_data_for_room(
+ room_id, tags_by_room, account_data_by_room
)
- state_events_delta = yield self.compute_state_delta(
- since_token=since_token,
- previous_state=state_at_previous_sync,
- current_state=state_events_at_leave,
+ account_data = sync_config.filter_collection.filter_room_account_data(
+ account_data
)
room_sync = ArchivedSyncResult(
- room_id=leave_event.room_id,
+ room_id=room_id,
timeline=batch,
state=state_events_delta,
- account_data=self.account_data_for_room(
- leave_event.room_id, tags_by_room, account_data_by_room
- ),
+ account_data=account_data,
)
- logging.debug("Room sync: %r", room_sync)
+ logger.debug("Room sync: %r", room_sync)
defer.returnValue(room_sync)
@@ -804,15 +801,19 @@ class SyncHandler(BaseHandler):
state = {}
defer.returnValue(state)
- def compute_state_delta(self, since_token, previous_state, current_state):
- """ Works out the differnce in state between the current state and the
- state the client got when it last performed a sync.
-
- :param str since_token: the point we are comparing against
- :param dict[(str,str), synapse.events.FrozenEvent] previous_state: the
- state to compare to
- :param dict[(str,str), synapse.events.FrozenEvent] current_state: the
- new state
+ @defer.inlineCallbacks
+ def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token,
+ full_state):
+ """ Works out the differnce in state between the start of the timeline
+ and the previous sync.
+
+ :param str room_id
+ :param TimelineBatch batch: The timeline batch for the room that will
+ be sent to the user.
+ :param sync_config
+ :param str since_token: Token of the end of the previous batch. May be None.
+ :param str now_token: Token of the end of the current batch.
+ :param bool full_state: Whether to force returning the full state.
:returns A new event dictionary
"""
@@ -821,12 +822,53 @@ class SyncHandler(BaseHandler):
# updates even if they occured logically before the previous event.
# TODO(mjark) Check for new redactions in the state events.
- state_delta = {}
- for key, event in current_state.iteritems():
- if (key not in previous_state or
- previous_state[key].event_id != event.event_id):
- state_delta[key] = event
- return state_delta
+ with Measure(self.clock, "compute_state_delta"):
+ if full_state:
+ if batch:
+ state = yield self.store.get_state_for_event(
+ batch.events[0].event_id
+ )
+ else:
+ state = yield self.get_state_at(
+ room_id, stream_position=now_token
+ )
+
+ timeline_state = {
+ (event.type, event.state_key): event
+ for event in batch.events if event.is_state()
+ }
+
+ state = _calculate_state(
+ timeline_contains=timeline_state,
+ timeline_start=state,
+ previous={},
+ )
+ elif batch.limited:
+ state_at_previous_sync = yield self.get_state_at(
+ room_id, stream_position=since_token
+ )
+
+ state_at_timeline_start = yield self.store.get_state_for_event(
+ batch.events[0].event_id
+ )
+
+ timeline_state = {
+ (event.type, event.state_key): event
+ for event in batch.events if event.is_state()
+ }
+
+ state = _calculate_state(
+ timeline_contains=timeline_state,
+ timeline_start=state_at_timeline_start,
+ previous=state_at_previous_sync,
+ )
+ else:
+ state = {}
+
+ defer.returnValue({
+ (e.type, e.state_key): e
+ for e in sync_config.filter_collection.filter_room_state(state.values())
+ })
def check_joined_room(self, sync_config, state_delta):
"""
@@ -845,3 +887,68 @@ class SyncHandler(BaseHandler):
if join_event.content["membership"] == Membership.JOIN:
return True
return False
+
+ @defer.inlineCallbacks
+ def unread_notifs_for_room_id(self, room_id, sync_config):
+ with Measure(self.clock, "unread_notifs_for_room_id"):
+ last_unread_event_id = yield self.store.get_last_receipt_event_id_for_user(
+ user_id=sync_config.user.to_string(),
+ room_id=room_id,
+ receipt_type="m.read"
+ )
+
+ notifs = []
+ if last_unread_event_id:
+ notifs = yield self.store.get_unread_event_push_actions_by_room_for_user(
+ room_id, sync_config.user.to_string(), last_unread_event_id
+ )
+ defer.returnValue(notifs)
+
+ # There is no new information in this period, so your notification
+ # count is whatever it was last time.
+ defer.returnValue(None)
+
+
+def _action_has_highlight(actions):
+ for action in actions:
+ try:
+ if action.get("set_tweak", None) == "highlight":
+ return action.get("value", True)
+ except AttributeError:
+ pass
+
+ return False
+
+
+def _calculate_state(timeline_contains, timeline_start, previous):
+ """Works out what state to include in a sync response.
+
+ Args:
+ timeline_contains (dict): state in the timeline
+ timeline_start (dict): state at the start of the timeline
+ previous (dict): state at the end of the previous sync (or empty dict
+ if this is an initial sync)
+
+ Returns:
+ dict
+ """
+ event_id_to_state = {
+ e.event_id: e
+ for e in itertools.chain(
+ timeline_contains.values(),
+ previous.values(),
+ timeline_start.values(),
+ )
+ }
+
+ tc_ids = set(e.event_id for e in timeline_contains.values())
+ p_ids = set(e.event_id for e in previous.values())
+ ts_ids = set(e.event_id for e in timeline_start.values())
+
+ state_ids = (ts_ids - p_ids) - tc_ids
+
+ evs = (event_id_to_state[e] for e in state_ids)
+ return {
+ (e.type, e.state_key): e
+ for e in evs
+ }
|