diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index c8dfd02e7b..b5962f4f5a 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -35,6 +35,7 @@ SyncConfig = collections.namedtuple("SyncConfig", [
"filter_collection",
"is_guest",
"request_key",
+ "device_id",
])
@@ -113,6 +114,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
"joined", # JoinedSyncResult for each joined room.
"invited", # InvitedSyncResult for each invited room.
"archived", # ArchivedSyncResult for each archived room.
+ "to_device", # List of direct messages for the device.
])):
__slots__ = []
@@ -126,7 +128,8 @@ class SyncResult(collections.namedtuple("SyncResult", [
self.joined or
self.invited or
self.archived or
- self.account_data
+ self.account_data or
+ self.to_device
)
@@ -139,6 +142,7 @@ class SyncHandler(object):
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
self.response_cache = ResponseCache(hs)
+ self.state = hs.get_state_handler()
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
full_state=False):
@@ -355,11 +359,11 @@ class SyncHandler(object):
Returns:
A Deferred map from ((type, state_key)->Event)
"""
- state = yield self.store.get_state_for_event(event.event_id)
+ state_ids = yield self.store.get_state_ids_for_event(event.event_id)
if event.is_state():
- state = state.copy()
- state[(event.type, event.state_key)] = event
- defer.returnValue(state)
+ state_ids = state_ids.copy()
+ state_ids[(event.type, event.state_key)] = event.event_id
+ defer.returnValue(state_ids)
@defer.inlineCallbacks
def get_state_at(self, room_id, stream_position):
@@ -412,57 +416,61 @@ class SyncHandler(object):
with Measure(self.clock, "compute_state_delta"):
if full_state:
if batch:
- current_state = yield self.store.get_state_for_event(
+ current_state_ids = yield self.store.get_state_ids_for_event(
batch.events[-1].event_id
)
- state = yield self.store.get_state_for_event(
+ state_ids = yield self.store.get_state_ids_for_event(
batch.events[0].event_id
)
else:
- current_state = yield self.get_state_at(
+ current_state_ids = yield self.get_state_at(
room_id, stream_position=now_token
)
- state = current_state
+ state_ids = current_state_ids
timeline_state = {
- (event.type, event.state_key): event
+ (event.type, event.state_key): event.event_id
for event in batch.events if event.is_state()
}
- state = _calculate_state(
+ state_ids = _calculate_state(
timeline_contains=timeline_state,
- timeline_start=state,
+ timeline_start=state_ids,
previous={},
- current=current_state,
+ current=current_state_ids,
)
elif batch.limited:
state_at_previous_sync = yield self.get_state_at(
room_id, stream_position=since_token
)
- current_state = yield self.store.get_state_for_event(
+ current_state_ids = yield self.store.get_state_ids_for_event(
batch.events[-1].event_id
)
- state_at_timeline_start = yield self.store.get_state_for_event(
+ state_at_timeline_start = yield self.store.get_state_ids_for_event(
batch.events[0].event_id
)
timeline_state = {
- (event.type, event.state_key): event
+ (event.type, event.state_key): event.event_id
for event in batch.events if event.is_state()
}
- state = _calculate_state(
+ state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_at_timeline_start,
previous=state_at_previous_sync,
- current=current_state,
+ current=current_state_ids,
)
else:
- state = {}
+ state_ids = {}
+
+ state = {}
+ if state_ids:
+ state = yield self.store.get_events(state_ids.values())
defer.returnValue({
(e.type, e.state_key): e
@@ -527,16 +535,58 @@ class SyncHandler(object):
sync_result_builder, newly_joined_rooms, newly_joined_users
)
+ yield self._generate_sync_entry_for_to_device(sync_result_builder)
+
defer.returnValue(SyncResult(
presence=sync_result_builder.presence,
account_data=sync_result_builder.account_data,
joined=sync_result_builder.joined,
invited=sync_result_builder.invited,
archived=sync_result_builder.archived,
+ to_device=sync_result_builder.to_device,
next_batch=sync_result_builder.now_token,
))
@defer.inlineCallbacks
+ def _generate_sync_entry_for_to_device(self, sync_result_builder):
+ """Generates the portion of the sync response. Populates
+ `sync_result_builder` with the result.
+
+ Args:
+ sync_result_builder(SyncResultBuilder)
+
+ Returns:
+ Deferred(dict): A dictionary containing the per room account data.
+ """
+ user_id = sync_result_builder.sync_config.user.to_string()
+ device_id = sync_result_builder.sync_config.device_id
+ now_token = sync_result_builder.now_token
+ since_stream_id = 0
+ if sync_result_builder.since_token is not None:
+ since_stream_id = int(sync_result_builder.since_token.to_device_key)
+
+ if since_stream_id != int(now_token.to_device_key):
+ # We only delete messages when a new message comes in, but that's
+ # fine so long as we delete them at some point.
+
+ logger.debug("Deleting messages up to %d", since_stream_id)
+ yield self.store.delete_messages_for_device(
+ user_id, device_id, since_stream_id
+ )
+
+ logger.debug("Getting messages up to %d", now_token.to_device_key)
+ messages, stream_id = yield self.store.get_new_messages_for_device(
+ user_id, device_id, since_stream_id, now_token.to_device_key
+ )
+ logger.debug("Got messages up to %d: %r", stream_id, messages)
+ sync_result_builder.now_token = now_token.copy_and_replace(
+ "to_device_key", stream_id
+ )
+ sync_result_builder.to_device = messages
+ else:
+ sync_result_builder.to_device = []
+
+ @defer.inlineCallbacks
def _generate_sync_entry_for_account_data(self, sync_result_builder):
"""Generates the account data portion of the sync response. Populates
`sync_result_builder` with the result.
@@ -626,7 +676,7 @@ class SyncHandler(object):
extra_users_ids = set(newly_joined_users)
for room_id in newly_joined_rooms:
- users = yield self.store.get_users_in_room(room_id)
+ users = yield self.state.get_current_user_in_room(room_id)
extra_users_ids.update(users)
extra_users_ids.discard(user.to_string())
@@ -766,8 +816,13 @@ class SyncHandler(object):
# the last sync (even if we have since left). This is to make sure
# we do send down the room, and with full state, where necessary
if room_id in joined_room_ids or has_join:
- old_state = yield self.get_state_at(room_id, since_token)
- old_mem_ev = old_state.get((EventTypes.Member, user_id), None)
+ old_state_ids = yield self.get_state_at(room_id, since_token)
+ old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
+ old_mem_ev = None
+ if old_mem_ev_id:
+ old_mem_ev = yield self.store.get_event(
+ old_mem_ev_id, allow_none=True
+ )
if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
newly_joined_rooms.append(room_id)
@@ -1059,27 +1114,25 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
Returns:
dict
"""
- event_id_to_state = {
- e.event_id: e
- for e in itertools.chain(
- timeline_contains.values(),
- previous.values(),
- timeline_start.values(),
- current.values(),
+ event_id_to_key = {
+ e: key
+ for key, e in itertools.chain(
+ timeline_contains.items(),
+ previous.items(),
+ timeline_start.items(),
+ current.items(),
)
}
- c_ids = set(e.event_id for e in current.values())
- tc_ids = set(e.event_id for e in timeline_contains.values())
- p_ids = set(e.event_id for e in previous.values())
- ts_ids = set(e.event_id for e in timeline_start.values())
+ c_ids = set(e for e in current.values())
+ tc_ids = set(e for e in timeline_contains.values())
+ p_ids = set(e for e in previous.values())
+ ts_ids = set(e for e in timeline_start.values())
state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids
- evs = (event_id_to_state[e] for e in state_ids)
return {
- (e.type, e.state_key): e
- for e in evs
+ event_id_to_key[e]: e for e in state_ids
}
@@ -1103,6 +1156,7 @@ class SyncResultBuilder(object):
self.joined = []
self.invited = []
self.archived = []
+ self.device = []
class RoomSyncResultBuilder(object):
|