From 68b7fc3e2ba0aae7813b0bae52370860b5cd9f26 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 19 Oct 2015 17:26:18 +0100 Subject: Add rooms that the user has left under archived in v2 sync. --- synapse/handlers/sync.py | 128 ++++++++++++++++++++++++++++++++++- synapse/rest/client/v2_alpha/sync.py | 29 ++++++-- synapse/storage/roommember.py | 13 ++++ 3 files changed, 161 insertions(+), 9 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index ee6b881de1..1891cd088c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -61,18 +61,37 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ return bool(self.timeline or self.state or self.ephemeral) +class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [ + "room_id", + "timeline", + "state", +])): + __slots__ = [] + + def __nonzero__(self): + """Make the result appear empty if there are no updates. This is used + to tell if room needs to be part of the sync result. + """ + return bool(self.timeline or self.state) + + class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ "room_id", "invite", ])): __slots__ = [] + def __nonzero__(self): + """Invited rooms should always be reported to the client""" + return True + class SyncResult(collections.namedtuple("SyncResult", [ "next_batch", # Token for the next sync "presence", # List of presence events for the user. "joined", # JoinedSyncResult for each joined room. "invited", # InvitedSyncResult for each invited room. + "archived", # ArchivedSyncResult for each archived room. ])): __slots__ = [] @@ -156,11 +175,14 @@ class SyncHandler(BaseHandler): ) room_list = yield self.store.get_rooms_for_user_where_membership_is( user_id=sync_config.user.to_string(), - membership_list=[Membership.INVITE, Membership.JOIN] + membership_list=[ + Membership.INVITE, Membership.JOIN, Membership.LEAVE + ] ) joined = [] invited = [] + archived = [] for event in room_list: if event.membership == Membership.JOIN: room_sync = yield self.initial_sync_for_joined_room( @@ -173,11 +195,23 @@ class SyncHandler(BaseHandler): room_id=event.room_id, invite=invite, )) + elif event.membership == Membership.LEAVE: + leave_token = now_token.copy_and_replace( + "room_key", "s%d" % (event.stream_ordering,) + ) + room_sync = yield self.initial_sync_for_archived_room( + sync_config=sync_config, + room_id=event.room_id, + leave_event_id=event.event_id, + leave_token=leave_token, + ) + archived.append(room_sync) defer.returnValue(SyncResult( presence=presence, joined=joined, invited=invited, + archived=archived, next_batch=now_token, )) @@ -204,6 +238,28 @@ class SyncHandler(BaseHandler): ephemeral=[], )) + @defer.inlineCallbacks + def initial_sync_for_archived_room(self, room_id, sync_config, + leave_event_id, leave_token): + """Sync a room for a client which is starting without any state + Returns: + A Deferred JoinedSyncResult. + """ + + batch = yield self.load_filtered_recents( + room_id, sync_config, leave_token, + ) + + leave_state = yield self.store.get_state_for_events( + [leave_event_id], None + ) + + defer.returnValue(ArchivedSyncResult( + room_id=room_id, + timeline=batch, + state=leave_state[leave_event_id].values(), + )) + @defer.inlineCallbacks def incremental_sync_with_gap(self, sync_config, since_token): """ Get the incremental delta needed to bring the client up to @@ -257,18 +313,22 @@ class SyncHandler(BaseHandler): ) joined = [] + archived = [] if len(room_events) <= timeline_limit: # There is no gap in any of the rooms. Therefore we can just # partition the new events by room and return them. invite_events = [] + leave_events = [] events_by_room_id = {} for event in room_events: events_by_room_id.setdefault(event.room_id, []).append(event) if event.room_id not in joined_room_ids: if (event.type == EventTypes.Member - and event.membership == Membership.INVITE and event.state_key == sync_config.user.to_string()): - invite_events.append(event) + if event.membership == Membership.INVITE: + invite_events.append(event) + elif event.membership == Membership.LEAVE: + leave_events.append(event) for room_id in joined_room_ids: recents = events_by_room_id.get(room_id, []) @@ -296,11 +356,16 @@ class SyncHandler(BaseHandler): ) if room_sync: joined.append(room_sync) + else: invite_events = yield self.store.get_invites_for_user( sync_config.user.to_string() ) + leave_events = yield self.store.get_leave_events_for_user( + sync_config.user.to_string() + ) + for room_id in joined_room_ids: room_sync = yield self.incremental_sync_with_gap_for_room( room_id, sync_config, since_token, now_token, @@ -309,6 +374,12 @@ class SyncHandler(BaseHandler): if room_sync: joined.append(room_sync) + for leave_event in leave_events: + room_sync = yield self.incremental_sync_for_archived_room( + sync_config, leave_event, since_token + ) + archived.append(room_sync) + invited = [ InvitedSyncResult(room_id=event.room_id, invite=event) for event in invite_events @@ -318,6 +389,7 @@ class SyncHandler(BaseHandler): presence=presence, joined=joined, invited=invited, + archived=archived, next_batch=now_token, )) @@ -416,6 +488,56 @@ class SyncHandler(BaseHandler): defer.returnValue(room_sync) + @defer.inlineCallbacks + def incremental_sync_for_archived_room(self, sync_config, leave_event, + since_token): + """ Get the incremental delta needed to bring the client up to date for + the archived room. + Returns: + A Deferred ArchivedSyncResult + """ + + stream_token = yield self.store.get_stream_token_for_event( + leave_event.event_id + ) + + leave_token = since_token.copy_and_replace("room_key", stream_token) + + batch = yield self.load_filtered_recents( + leave_event.room_id, sync_config, leave_token, since_token, + ) + + logging.debug("Recents %r", batch) + + # TODO(mjark): This seems racy since this isn't being passed a + # token to indicate what point in the stream this is + leave_state = yield self.store.get_state_for_events( + [leave_event.event_id], None + ) + + state_events_at_leave = leave_state[leave_event.event_id].values() + + state_at_previous_sync = yield self.get_state_at_previous_sync( + leave_event.room_id, since_token=since_token + ) + + state_events_delta = yield self.compute_state_delta( + since_token=since_token, + previous_state=state_at_previous_sync, + current_state=state_events_at_leave, + ) + + room_sync = ArchivedSyncResult( + room_id=leave_event.room_id, + timeline=batch, + state=state_events_delta, + ) + + logging.debug("Room sync: %r", room_sync) + + defer.returnValue(room_sync) + + @defer.inlineCallbacks def get_state_at_previous_sync(self, room_id, since_token): """ Get the room state at the previous sync the client made. diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index fffecb24f5..73473a7e6b 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -136,6 +136,10 @@ class SyncRestServlet(RestServlet): sync_result.invited, filter, time_now, token_id ) + archived = self.encode_archived( + sync_result.archived, filter, time_now, token_id + ) + response_content = { "presence": self.encode_presence( sync_result.presence, filter, time_now @@ -143,7 +147,7 @@ class SyncRestServlet(RestServlet): "rooms": { "joined": joined, "invited": invited, - "archived": {}, + "archived": archived, }, "next_batch": sync_result.next_batch.to_string(), } @@ -182,14 +186,20 @@ class SyncRestServlet(RestServlet): return invited + def encode_archived(self, rooms, filter, time_now, token_id): + joined = {} + for room in rooms: + joined[room.room_id] = self.encode_room( + room, filter, time_now, token_id, joined=False + ) + + return joined + @staticmethod - def encode_room(room, filter, time_now, token_id): + def encode_room(room, filter, time_now, token_id, joined=True): event_map = {} state_events = filter.filter_room_state(room.state) - timeline_events = filter.filter_room_timeline(room.timeline.events) - ephemeral_events = filter.filter_room_ephemeral(room.ephemeral) state_event_ids = [] - timeline_event_ids = [] for event in state_events: # TODO(mjark): Respect formatting requirements in the filter. event_map[event.event_id] = serialize_event( @@ -198,6 +208,8 @@ class SyncRestServlet(RestServlet): ) state_event_ids.append(event.event_id) + timeline_events = filter.filter_room_timeline(room.timeline.events) + timeline_event_ids = [] for event in timeline_events: # TODO(mjark): Respect formatting requirements in the filter. event_map[event.event_id] = serialize_event( @@ -205,6 +217,7 @@ class SyncRestServlet(RestServlet): event_format=format_event_for_client_v2_without_event_id, ) timeline_event_ids.append(event.event_id) + result = { "event_map": event_map, "timeline": { @@ -213,8 +226,12 @@ class SyncRestServlet(RestServlet): "limited": room.timeline.limited, }, "state": {"events": state_event_ids}, - "ephemeral": {"events": ephemeral_events}, } + + if joined: + ephemeral_events = filter.filter_room_ephemeral(room.ephemeral) + result["ephemeral"] = {"events": ephemeral_events} + return result diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index dd98dcfda8..623400fd36 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -124,6 +124,19 @@ class RoomMemberStore(SQLBaseStore): invites.event_id for invite in invites ])) + def get_leave_events_for_user(self, user_id): + """ Get all the leave events for a user + Args: + user_id (str): The user ID. + Returns: + A deferred list of event objects. + """ + return self.get_rooms_for_user_where_membership_is( + user_id, [Membership.LEAVE] + ).addCallback(lambda leaves: self._get_events([ + leave.event_id for leave in leaves + ])) + def get_rooms_for_user_where_membership_is(self, user_id, membership_list): """ Get all the rooms for this user where the membership for this user matches one in the membership list. -- cgit 1.5.1 From 51d03e65b2d481a59dfb08e6c75aa349fca71fe6 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 19 Oct 2015 17:48:58 +0100 Subject: Fix pep8 --- synapse/handlers/sync.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 1891cd088c..5ca2606443 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -537,7 +537,6 @@ class SyncHandler(BaseHandler): defer.returnValue(room_sync) - @defer.inlineCallbacks def get_state_at_previous_sync(self, room_id, since_token): """ Get the room state at the previous sync the client made. -- cgit 1.5.1 From b02a342750f84ffebb793aa5d3c80780684dd147 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 20 Oct 2015 11:07:50 +0100 Subject: Don't 500 when the email doesn't map to a valid user ID. --- synapse/rest/client/v1/login.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse') diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index dacc416055..b2e4cb8eaa 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -101,6 +101,10 @@ class LoginRestServlet(ClientV1RestServlet): user_id = yield self.hs.get_datastore().get_user_id_by_threepid( login_submission['medium'], login_submission['address'] ) + if not user_id: + raise LoginError( + 401, "Unrecognised address", errcode=Codes.UNAUTHORIZED + ) else: user_id = login_submission['user'] -- cgit 1.5.1 From 7be06680edd6d1bde6e73a91b361fa8cb0a7034d Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 20 Oct 2015 16:36:20 +0100 Subject: Include typing events in initial v2 sync --- synapse/handlers/sync.py | 43 +++++++++++++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 14 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index ee6b881de1..e22fe553fd 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -145,6 +145,10 @@ class SyncHandler(BaseHandler): """ now_token = yield self.event_sources.get_current_token() + now_token, typing_by_room = yield self.typing_by_room( + sync_config, now_token + ) + presence_stream = self.event_sources.sources["presence"] # TODO (mjark): This looks wrong, shouldn't we be getting the presence # UP to the present rather than after the present? @@ -164,7 +168,7 @@ class SyncHandler(BaseHandler): for event in room_list: if event.membership == Membership.JOIN: room_sync = yield self.initial_sync_for_joined_room( - event.room_id, sync_config, now_token, + event.room_id, sync_config, now_token, typing_by_room ) joined.append(room_sync) elif event.membership == Membership.INVITE: @@ -182,7 +186,8 @@ class SyncHandler(BaseHandler): )) @defer.inlineCallbacks - def initial_sync_for_joined_room(self, room_id, sync_config, now_token): + def initial_sync_for_joined_room(self, room_id, sync_config, now_token, + typing_by_room): """Sync a room for a client which is starting without any state Returns: A Deferred JoinedSyncResult. @@ -201,9 +206,28 @@ class SyncHandler(BaseHandler): room_id=room_id, timeline=batch, state=current_state_events, - ephemeral=[], + ephemeral=typing_by_room.get(room_id, []), )) + @defer.inlineCallbacks + def typing_by_room(self, sync_config, now_token, since_token=None): + typing_key = since_token.typing_key if since_token else "0" + + typing_source = self.event_sources.sources["typing"] + typing, typing_key = yield typing_source.get_new_events_for_user( + user=sync_config.user, + from_key=typing_key, + limit=sync_config.filter.ephemeral_limit(), + ) + now_token = now_token.copy_and_replace("typing_key", typing_key) + + typing_by_room = {event["room_id"]: [event] for event in typing} + for event in typing: + event.pop("room_id") + logger.debug("Typing %r", typing_by_room) + + defer.returnValue((now_token, typing_by_room)) + @defer.inlineCallbacks def incremental_sync_with_gap(self, sync_config, since_token): """ Get the incremental delta needed to bring the client up to @@ -221,18 +245,9 @@ class SyncHandler(BaseHandler): ) now_token = now_token.copy_and_replace("presence_key", presence_key) - typing_source = self.event_sources.sources["typing"] - typing, typing_key = yield typing_source.get_new_events_for_user( - user=sync_config.user, - from_key=since_token.typing_key, - limit=sync_config.filter.ephemeral_limit(), + now_token, typing_by_room = yield self.typing_by_room( + sync_config, now_token, since_token ) - now_token = now_token.copy_and_replace("typing_key", typing_key) - - typing_by_room = {event["room_id"]: [event] for event in typing} - for event in typing: - event.pop("room_id") - logger.debug("Typing %r", typing_by_room) rm_handler = self.hs.get_handlers().room_member_handler app_service = yield self.store.get_app_service_by_user_id( -- cgit 1.5.1 From ede07434e069d1b143993a3b492428b69a515856 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 21 Oct 2015 09:42:07 +0100 Subject: Use 403 and message to match handlers/auth --- synapse/rest/client/v1/login.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'synapse') diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index b2e4cb8eaa..e71cf7e43e 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -102,9 +102,7 @@ class LoginRestServlet(ClientV1RestServlet): login_submission['medium'], login_submission['address'] ) if not user_id: - raise LoginError( - 401, "Unrecognised address", errcode=Codes.UNAUTHORIZED - ) + raise LoginError(403, "", errcode=Codes.FORBIDDEN) else: user_id = login_submission['user'] -- cgit 1.5.1 From 4dec901c76e29ad029f53ce199450d75ec8d2ad5 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 21 Oct 2015 10:10:55 +0100 Subject: Cap the time to retry txns to appservices to 8.5 minutes There's been numerous issues with people playing around with their application service and then not receiving events from their HS for ages due to backoff timers reaching crazy heights (albeit capped at < 1 day). Reduce the max time between pokes to be 8.5 minutes (2^9 secs) which is quick enough for people to wait it out (avg wait time being 4.25 min) but long enough to actually give the AS breathing room if it needs it. --- synapse/appservice/scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 59b0b1f4ac..44dc2c4744 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -224,8 +224,8 @@ class _Recoverer(object): self.clock.call_later((2 ** self.backoff_counter), self.retry) def _backoff(self): - # cap the backoff to be around 18h => (2^16) = 65536 secs - if self.backoff_counter < 16: + # cap the backoff to be around 8.5min => (2^9) = 512 secs + if self.backoff_counter < 9: self.backoff_counter += 1 self.recover() -- cgit 1.5.1 From e3d75f564ab1d17eb4ee47314f78c41553a486f1 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 21 Oct 2015 11:15:48 +0100 Subject: Include banned rooms in the archived section of v2 sync --- synapse/handlers/sync.py | 15 +++++++++------ synapse/storage/roommember.py | 4 ++-- 2 files changed, 11 insertions(+), 8 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 5ca2606443..6cb756e476 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -175,9 +175,12 @@ class SyncHandler(BaseHandler): ) room_list = yield self.store.get_rooms_for_user_where_membership_is( user_id=sync_config.user.to_string(), - membership_list=[ - Membership.INVITE, Membership.JOIN, Membership.LEAVE - ] + membership_list=( + Membership.INVITE, + Membership.JOIN, + Membership.LEAVE, + Membership.BAN + ) ) joined = [] @@ -195,7 +198,7 @@ class SyncHandler(BaseHandler): room_id=event.room_id, invite=invite, )) - elif event.membership == Membership.LEAVE: + elif event.membership in (Membership.LEAVE, Membership.BAN): leave_token = now_token.copy_and_replace( "room_key", "s%d" % (event.stream_ordering,) ) @@ -327,7 +330,7 @@ class SyncHandler(BaseHandler): and event.state_key == sync_config.user.to_string()): if event.membership == Membership.INVITE: invite_events.append(event) - elif event.membership == Membership.LEAVE: + elif event.membership in (Membership.LEAVE, Membership.BAN): leave_events.append(event) for room_id in joined_room_ids: @@ -362,7 +365,7 @@ class SyncHandler(BaseHandler): sync_config.user.to_string() ) - leave_events = yield self.store.get_leave_events_for_user( + leave_events = yield self.store.get_leave_and_ban_events_for_user( sync_config.user.to_string() ) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 623400fd36..ae1ad56d9a 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -124,7 +124,7 @@ class RoomMemberStore(SQLBaseStore): invites.event_id for invite in invites ])) - def get_leave_events_for_user(self, user_id): + def get_leave_and_ban_events_for_user(self, user_id): """ Get all the leave events for a user Args: user_id (str): The user ID. @@ -132,7 +132,7 @@ class RoomMemberStore(SQLBaseStore): A deferred list of event objects. """ return self.get_rooms_for_user_where_membership_is( - user_id, [Membership.LEAVE] + user_id, (Membership.LEAVE, Membership.BAN) ).addCallback(lambda leaves: self._get_events([ leave.event_id for leave in leaves ])) -- cgit 1.5.1 From d63a0ca34b0951cb3d1981225e1c1cf91b996d30 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 21 Oct 2015 15:45:37 +0100 Subject: Doc string for the SyncHandler.typing_by_room method --- synapse/handlers/sync.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) (limited to 'synapse') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index e22fe553fd..e651b49987 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -211,6 +211,18 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def typing_by_room(self, sync_config, now_token, since_token=None): + """Get the typing events for each room the user is in + Args: + sync_config (SyncConfig): The flags, filters and user for the sync. + now_token (StreamToken): Where the server is currently up to. + since_token (StreamToken): Where the server was when the client + last synced. + Returns: + A tuple of the now StreamToken, updated to reflect the which typing + events are included, and a dict mapping from room_id to a list of + typing events for that room. + """ + typing_key = since_token.typing_key if since_token else "0" typing_source = self.event_sources.sources["typing"] -- cgit 1.5.1