From 436513068de73ab47d9ba9a32046420be3d86588 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 26 Jan 2015 18:53:31 +0000 Subject: Start implementing the non-incremental sync portion of the v2 /sync API --- synapse/events/utils.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'synapse/events') diff --git a/synapse/events/utils.py b/synapse/events/utils.py index e391aca4cc..b7f1ad4b40 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -89,7 +89,7 @@ def prune_event(event): return type(event)(allowed_fields) -def serialize_event(e, time_now_ms, client_event=True): +def serialize_event(e, time_now_ms, client_event=True, strip_ids=False): # FIXME(erikj): To handle the case of presence events and the like if not isinstance(e, EventBase): return e @@ -138,4 +138,8 @@ def serialize_event(e, time_now_ms, client_event=True): d.pop("unsigned", None) d.pop("origin", None) + if strip_ids: + d.pop("room_id", None) + d.pop("event_id", None) + return d -- cgit 1.5.1 From a56008842b43089433768f569f35b2d14523ac39 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 27 Jan 2015 16:24:22 +0000 Subject: Start implementing incremental initial sync --- synapse/events/utils.py | 1 + synapse/handlers/sync.py | 233 +++++++++++++++++++++++++++++++++++++++++----- synapse/storage/stream.py | 41 ++++++-- 3 files changed, 241 insertions(+), 34 deletions(-) (limited to 'synapse/events') diff --git a/synapse/events/utils.py b/synapse/events/utils.py index b7f1ad4b40..42fb0371e5 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -137,6 +137,7 @@ def serialize_event(e, time_now_ms, client_event=True, strip_ids=False): d.pop("depth", None) d.pop("unsigned", None) d.pop("origin", None) + d.pop("prev_state", None) if strip_ids: d.pop("room_id", None) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index bbabaf3df1..f8629a588f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -37,14 +37,18 @@ SyncConfig = collections.namedtuple("SyncConfig", [ ]) -RoomSyncResult = collections.namedtuple("RoomSyncResult", [ +class RoomSyncResult(collections.namedtuple("RoomSyncResult", [ "room_id", "limited", "published", - "events", # dict of event + "events", "state", "prev_batch", -]) +])): + __slots__ = [] + + def __nonzero__(self): + return bool(self.events or self.state) class SyncResult(collections.namedtuple("SyncResult", [ @@ -56,7 +60,9 @@ class SyncResult(collections.namedtuple("SyncResult", [ __slots__ = [] def __nonzero__(self): - return self.private_user_data or self.public_user_data or self.rooms + return bool( + self.private_user_data or self.public_user_data or self.rooms + ) class SyncHandler(BaseHandler): @@ -67,7 +73,13 @@ class SyncHandler(BaseHandler): self.clock = hs.get_clock() def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0): - if timeout == 0: + """Get the sync for a client if we have new data for it now. Otherwise + wait for new data to arrive on the server. If the timeout expires, then + return an empty sync result. + Returns: + A Deferred SyncResult. + """ + if timeout == 0 or since_token is None: return self.current_sync_for_user(sync_config, since_token) else: def current_sync_callback(since_token): @@ -79,13 +91,25 @@ class SyncHandler(BaseHandler): ) def current_sync_for_user(self, sync_config, since_token=None): + """Get the sync for client needed to match what the server has now. + Returns: + A Deferred SyncResult. + """ if since_token is None: return self.initial_sync(sync_config) else: - return self.incremental_sync(sync_config) + if sync_config.gap: + return self.incremental_sync_with_gap(sync_config, since_token) + else: + #TODO(mjark): Handle gapless sync + pass @defer.inlineCallbacks def initial_sync(self, sync_config): + """Get a sync for a client which is starting without any state + Returns: + A Deferred SyncResult. + """ if sync_config.sort == "timeline,desc": # TODO(mjark): Handle going through events in reverse order?. # What does "most recent events" mean when applying the limits mean @@ -114,25 +138,86 @@ class SyncHandler(BaseHandler): rooms = [] for event in room_list: - #TODO (mjark): Apply the event filter in sync_config. - recent_events, token = yield self.store.get_recent_events_for_room( - event.room_id, - limit=sync_config.limit, - end_token=now_token.room_key, - ) - prev_batch_token = now_token.copy_and_replace("room_key", token[0]) - current_state_events = yield self.state_handler.get_current_state( - event.room_id + room_sync = yield self.initial_sync_for_room( + event.room_id, sync_config, now_token, published_room_ids ) + rooms.append(room_sync) + + defer.returnValue(SyncResult( + public_user_data=presence, + private_user_data=[], + rooms=rooms, + next_batch=now_token, + )) + + @defer.inlineCallbacks + def intial_sync_for_room(self, room_id, sync_config, now_token, + published_room_ids): + """Sync a room for a client which is starting without any state + Returns: + A Deferred RoomSyncResult. + """ + recent_events, token = yield self.store.get_recent_events_for_room( + room_id, + limit=sync_config.limit, + end_token=now_token.room_key, + ) + prev_batch_token = now_token.copy_and_replace("room_key", token[0]) + current_state_events = yield self.state_handler.get_current_state( + room_id + ) + + defer.returnValue(RoomSyncResult( + room_id=room_id, + published=room_id in published_room_ids, + events=recent_events, + prev_batch=prev_batch_token, + state=current_state_events, + limited=True, + )) + + + @defer.inlineCallbacks + def incremental_sync_with_gap(self, sync_config, since_token): + """ Get the incremental delta needed to bring the client up to + date with the server. + Returns: + A Deferred SyncResult. + """ + if sync_config.sort == "timeline,desc": + # TODO(mjark): Handle going through events in reverse order?. + # What does "most recent events" mean when applying the limits mean + # in this case? + raise NotImplementedError() + + now_token = yield self.event_sources.get_current_token() + + presence_stream = self.event_sources.sources["presence"] + pagination_config = PaginationConfig( + from_token=since_token, to_token=now_token + ) + presence, _ = yield presence_stream.get_pagination_rows( + user=sync_config.user, + pagination_config=pagination_config.get_source_config("presence"), + key=None + ) + room_list = yield self.store.get_rooms_for_user_where_membership_is( + user_id=sync_config.user.to_string(), + membership_list=[Membership.INVITE, Membership.JOIN] + ) + + # TODO (mjark): Does public mean "published"? + published_rooms = yield self.store.get_rooms(is_public=True) + published_room_ids = set(r["room_id"] for r in published_rooms) - rooms.append(RoomSyncResult( - room_id=event.room_id, - published=event.room_id in published_room_ids, - events=recent_events, - prev_batch=prev_batch_token, - state=current_state_events, - limited=True, - )) + rooms = [] + for event in room_list: + room_sync = yield self.incremental_sync_with_gap_for_room( + event.room_id, sync_config, since_token, now_token, + published_room_ids + ) + if room_sync: + rooms.append(room_sync) defer.returnValue(SyncResult( public_user_data=presence, @@ -143,5 +228,103 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks - def incremental_sync(self, sync_config): - pass + def incremental_sync_with_gap_for_room(self, room_id, sync_config, + since_token, now_token, + published_room_ids): + """ Get the incremental delta needed to bring the client up to date for + the room. Gives the client the most recent events and the changes to + state. + Returns: + A Deferred RoomSyncResult + """ + # TODO(mjark): Check if they have joined the room between + # the previous sync and this one. + # TODO(mjark): Apply the event filter in sync_config + # TODO(mjark): Check for redactions we might have missed. + # TODO(mjark): Typing notifications. + recents, token = yield self.store.get_recent_events_for_room( + room_id, + limit=sync_config.limit + 1, + from_token=since_token.room_key, + end_token=now_token.room_key, + ) + + logging.debug("Recents %r", recents) + + if len(recents) > sync_config.limit: + limited = True + recents = recents[1:] + else: + limited = False + + prev_batch_token = now_token.copy_and_replace("room_key", token[0]) + + # TODO(mjark): This seems racy since this isn't being passed a + # token to indicate what point in the stream this is + current_state_events = yield self.state_handler.get_current_state( + room_id + ) + + state_at_previous_sync = yield self.get_state_at_previous_sync( + room_id, since_token=since_token + ) + + state_events_delta = yield self.compute_state_delta( + since_token=since_token, + previous_state=state_at_previous_sync, + current_state=current_state_events, + ) + + room_sync = RoomSyncResult( + room_id=room_id, + published=room_id in published_room_ids, + events=recents, + prev_batch=prev_batch_token, + state=state_events_delta, + limited=limited, + ) + + logging.debug("Room sync: %r", room_sync) + + defer.returnValue(room_sync) + + @defer.inlineCallbacks + def get_state_at_previous_sync(self, room_id, since_token): + """ Get the room state at the previous sync the client made. + Returns: + A Deferred list of Events. + """ + last_events, token = yield self.store.get_recent_events_for_room( + room_id, end_token=since_token.room_key, limit=1, + ) + + if last_events: + last_event = last_events[0] + last_context = yield self.state_handler.compute_event_context( + last_event + ) + if last_event.is_state(): + state = [last_event] + last_context.current_state.values() + else: + state = last_context.current_state.values() + else: + state = () + defer.returnValue(state) + + + def compute_state_delta(self, since_token, previous_state, current_state): + """ Works out the differnce in state between the current state and the + state the client got when it last performed a sync. + Returns: + A list of events. + """ + # TODO(mjark) Check if the state events were received by the server + # after the previous sync, since we need to include those state + # updates even if they occured logically before the previous event. + # TODO(mjark) Check for new redactions in the state events. + previous_dict = {event.event_id:event for event in previous_state} + state_delta = [] + for event in current_state: + if event.event_id not in previous_dict: + state_delta.append(event) + return state_delta diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 8ac2adab05..06aca1a4e5 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -265,17 +265,38 @@ class StreamStore(SQLBaseStore): return self.runInteraction("paginate_room_events", f) def get_recent_events_for_room(self, room_id, limit, end_token, - with_feedback=False): + with_feedback=False, from_token=None): # TODO (erikj): Handle compressed feedback - sql = ( - "SELECT stream_ordering, topological_ordering, event_id FROM events " - "WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0 " - "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? " - ) + end_token = _StreamToken.parse_stream_token(end_token) - def f(txn): - txn.execute(sql, (room_id, end_token, limit,)) + if from_token is None: + sql = ( + "SELECT stream_ordering, topological_ordering, event_id" + " FROM events" + " WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0" + " ORDER BY topological_ordering DESC, stream_ordering DESC" + " LIMIT ?" + ) + else: + from_token = _StreamToken.parse_stream_token(from_token) + sql = ( + "SELECT stream_ordering, topological_ordering, event_id" + " FROM events" + " WHERE room_id = ? AND stream_ordering > ?" + " AND stream_ordering <= ? AND outlier = 0" + " ORDER BY topological_ordering DESC, stream_ordering DESC" + " LIMIT ?" + ) + + + def get_recent_events_for_room_txn(txn): + if from_token is None: + txn.execute(sql, (room_id, end_token.stream, limit,)) + else: + txn.execute(sql, ( + room_id, from_token.stream, end_token.stream, limit + )) rows = self.cursor_to_dict(txn) @@ -303,7 +324,9 @@ class StreamStore(SQLBaseStore): return events, token - return self.runInteraction("get_recent_events_for_room", f) + return self.runInteraction( + "get_recent_events_for_room", get_recent_events_for_room_txn + ) def get_room_events_max_id(self): return self.runInteraction( -- cgit 1.5.1 From 1b4a164c026ac027b7c84422f6c743c36896a0e7 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 29 Jan 2015 02:34:35 +0000 Subject: Add support for formatting events in the way a v2 client expects --- synapse/events/utils.py | 91 ++++++++++++++++++++++++++++--------------------- 1 file changed, 52 insertions(+), 39 deletions(-) (limited to 'synapse/events') diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 42fb0371e5..dd7f3d6f42 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -89,7 +89,46 @@ def prune_event(event): return type(event)(allowed_fields) -def serialize_event(e, time_now_ms, client_event=True, strip_ids=False): +def format_event_raw(d): + return d + + +def format_event_for_client_v1(d): + d["user_id"] = d.pop("sender", None) + + move_keys = ("age", "redacted_because", "replaces_state", "prev_content") + for key in move_keys: + if key in d["unsigned"]: + d[key] = d["unsigned"][key] + + drop_keys = ( + "auth_events", "prev_events", "hashes", "signatures", "depth", + "unsigned", "origin" + ) + for key in drop_keys: + d.pop(key, None) + return d + + +def format_event_for_client_v2(d): + drop_keys = ( + "auth_events", "prev_events", "hashes", "signatures", "depth", "origin" + ) + for key in drop_keys: + d.pop(key, None) + return d + + +def format_event_for_client_v2_without_event_id(d): + d = format_event_for_client_v2(d) + d.pop("room_id", None) + d.pop("event_id", None) + return d + + +def serialize_event(e, time_now_ms, as_client_event=True, + event_format=format_event_for_client_v1, + token_id=None): # FIXME(erikj): To handle the case of presence events and the like if not isinstance(e, EventBase): return e @@ -99,48 +138,22 @@ def serialize_event(e, time_now_ms, client_event=True, strip_ids=False): # Should this strip out None's? d = {k: v for k, v in e.get_dict().items()} - if not client_event: - # set the age and keep all other keys - if "age_ts" in d["unsigned"]: - d["unsigned"]["age"] = time_now_ms - d["unsigned"]["age_ts"] - return d - if "age_ts" in d["unsigned"]: - d["age"] = time_now_ms - d["unsigned"]["age_ts"] - del d["unsigned"]["age_ts"] - - d["user_id"] = d.pop("sender", None) + d["unsigned"]["age"] = time_now_ms - d["unsigned"]["age_ts"] + d["unsigned"]["age_ts"] if "redacted_because" in e.unsigned: - d["redacted_because"] = serialize_event( + d["unsigned"]["redacted_because"] = serialize_event( e.unsigned["redacted_because"], time_now_ms ) - del d["unsigned"]["redacted_because"] - - if "redacted_by" in e.unsigned: - d["redacted_by"] = e.unsigned["redacted_by"] - del d["unsigned"]["redacted_by"] + if token_id is not None: + if token_id == e.internal_metadata["token_id"]: + txn_id = e.internal_metadata.get("txn_id", None) + if txn_id is not None: + d["unsigned"]["transaction_id"] = txn_id - if "replaces_state" in e.unsigned: - d["replaces_state"] = e.unsigned["replaces_state"] - del d["unsigned"]["replaces_state"] - - if "prev_content" in e.unsigned: - d["prev_content"] = e.unsigned["prev_content"] - del d["unsigned"]["prev_content"] - - del d["auth_events"] - del d["prev_events"] - del d["hashes"] - del d["signatures"] - d.pop("depth", None) - d.pop("unsigned", None) - d.pop("origin", None) - d.pop("prev_state", None) - - if strip_ids: - d.pop("room_id", None) - d.pop("event_id", None) - - return d + if as_client_event: + return event_format(d) + else: + return d -- cgit 1.5.1 From b9c442c85c9f8c2aa7fcb57d6132dec9b85b4e60 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 29 Jan 2015 02:45:33 +0000 Subject: Include transaction ids in unsigned section of events in the sync results for the clients that made those requests --- synapse/events/utils.py | 11 ++++++----- synapse/rest/client/v2_alpha/sync.py | 23 ++++++++++++++++------- 2 files changed, 22 insertions(+), 12 deletions(-) (limited to 'synapse/events') diff --git a/synapse/events/utils.py b/synapse/events/utils.py index dd7f3d6f42..21316cc125 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -103,7 +103,7 @@ def format_event_for_client_v1(d): drop_keys = ( "auth_events", "prev_events", "hashes", "signatures", "depth", - "unsigned", "origin" + "unsigned", "origin", "prev_state" ) for key in drop_keys: d.pop(key, None) @@ -112,7 +112,8 @@ def format_event_for_client_v1(d): def format_event_for_client_v2(d): drop_keys = ( - "auth_events", "prev_events", "hashes", "signatures", "depth", "origin" + "auth_events", "prev_events", "hashes", "signatures", "depth", + "origin", "prev_state", ) for key in drop_keys: d.pop(key, None) @@ -140,7 +141,7 @@ def serialize_event(e, time_now_ms, as_client_event=True, if "age_ts" in d["unsigned"]: d["unsigned"]["age"] = time_now_ms - d["unsigned"]["age_ts"] - d["unsigned"]["age_ts"] + del d["unsigned"]["age_ts"] if "redacted_because" in e.unsigned: d["unsigned"]["redacted_because"] = serialize_event( @@ -148,8 +149,8 @@ def serialize_event(e, time_now_ms, as_client_event=True, ) if token_id is not None: - if token_id == e.internal_metadata["token_id"]: - txn_id = e.internal_metadata.get("txn_id", None) + if token_id == getattr(e.internal_metadata, "token_id", None): + txn_id = getattr(e.internal_metadata, "txn_id", None) if txn_id is not None: d["unsigned"]["transaction_id"] = txn_id diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index a0ab9839a6..4d950f9956 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -18,7 +18,9 @@ from twisted.internet import defer from synapse.http.servlet import RestServlet from synapse.handlers.sync import SyncConfig from synapse.types import StreamToken -from synapse.events.utils import serialize_event +from synapse.events.utils import ( + serialize_event, format_event_for_client_v2_without_event_id, +) from ._base import client_v2_pattern import logging @@ -139,7 +141,9 @@ class SyncRestServlet(RestServlet): "private_user_data": self.encode_events( sync_result.private_user_data, filter, time_now ), - "rooms": self.encode_rooms(sync_result.rooms, filter, time_now), + "rooms": self.encode_rooms( + sync_result.rooms, filter, time_now, client.token_id + ), "next_batch": sync_result.next_batch.to_string(), } @@ -153,25 +157,30 @@ class SyncRestServlet(RestServlet): # TODO(mjark): Respect formatting requirements in the filter. return serialize_event(event, time_now) - def encode_rooms(self, rooms, filter, time_now): - return [self.encode_room(room, filter, time_now) for room in rooms] + def encode_rooms(self, rooms, filter, time_now, token_id): + return [ + self.encode_room(room, filter, time_now, token_id) + for room in rooms + ] @staticmethod - def encode_room(room, filter, time_now): + def encode_room(room, filter, time_now, token_id): event_map = {} state_event_ids = [] recent_event_ids = [] for event in room.state: # TODO(mjark): Respect formatting requirements in the filter. event_map[event.event_id] = serialize_event( - event, time_now, strip_ids=True + event, time_now, token_id=token_id, + event_format=format_event_for_client_v2_without_event_id, ) state_event_ids.append(event.event_id) for event in room.events: # TODO(mjark): Respect formatting requirements in the filter. event_map[event.event_id] = serialize_event( - event, time_now, strip_ids=True + event, time_now, token_id=token_id, + event_format=format_event_for_client_v2_without_event_id, ) recent_event_ids.append(event.event_id) return { -- cgit 1.5.1