From 89ae0166ded093be2343409cfe42f475dea83139 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 9 Sep 2015 13:25:22 +0100 Subject: Allow room initialSync for users that have left the room, returning a snapshot of how the room was when they left it --- synapse/storage/stream.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) (limited to 'synapse/storage/stream.py') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index d7fe423f5a..0abfa86cd2 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -379,6 +379,21 @@ class StreamStore(SQLBaseStore): ) defer.returnValue("t%d-%d" % (topo, token)) + def get_stream_token_for_event(self, event_id): + """The stream token for an event + Args: + event_id(str): The id of the event to look up a stream token for. + Raises: + StoreError if the event wasn't in the database. + Returns: + A deferred "s%d" stream token. + """ + return self._simple_select_one_onecol( + table="events", + keyvalues={"event_id": event_id}, + retcol="stream_ordering", + ).addCallback(lambda stream_ordering: "s%d" % (stream_ordering,)) + def _get_max_topological_txn(self, txn): txn.execute( "SELECT MAX(topological_ordering) FROM events" -- cgit 1.4.1 From 3c166a24c591afdc851de3c6c754c90471b1b0a9 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 9 Sep 2015 16:05:09 +0100 Subject: Remove undocumented and unimplemented 'feedback' parameter from the Client-Server API --- synapse/api/constants.py | 11 ----------- synapse/handlers/message.py | 21 +++------------------ synapse/handlers/room.py | 1 - synapse/rest/client/v1/initial_sync.py | 2 -- synapse/rest/client/v1/room.py | 2 -- synapse/storage/stream.py | 10 ++-------- 6 files changed, 5 insertions(+), 42 deletions(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 1423986c1e..3385664394 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -27,16 +27,6 @@ class Membership(object): LIST = (INVITE, JOIN, KNOCK, LEAVE, BAN) -class Feedback(object): - - """Represents the types of feedback a user can send in response to a - message.""" - - DELIVERED = u"delivered" - READ = u"read" - LIST = (DELIVERED, READ) - - class PresenceState(object): """Represents the presence state of a user.""" OFFLINE = u"offline" @@ -73,7 +63,6 @@ class EventTypes(object): PowerLevels = "m.room.power_levels" Aliases = "m.room.aliases" Redaction = "m.room.redaction" - Feedback = "m.room.message.feedback" RoomHistoryVisibility = "m.room.history_visibility" CanonicalAlias = "m.room.canonical_alias" diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 171e9d72ac..72ebac047f 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -71,7 +71,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def get_messages(self, user_id=None, room_id=None, pagin_config=None, - feedback=False, as_client_event=True): + as_client_event=True): """Get messages in a room. Args: @@ -79,7 +79,6 @@ class MessageHandler(BaseHandler): room_id (str): The room they want messages from. pagin_config (synapse.api.streams.PaginationConfig): The pagination config rules to apply, if any. - feedback (bool): True to get compressed feedback with the messages as_client_event (bool): True to get events in client-server format. Returns: dict: Pagination API results @@ -264,17 +263,6 @@ class MessageHandler(BaseHandler): ) defer.returnValue(data) - @defer.inlineCallbacks - def get_feedback(self, event_id): - # yield self.auth.check_joined_room(room_id, user_id) - - # Pull out the feedback from the db - fb = yield self.store.get_feedback(event_id) - - if fb: - defer.returnValue(fb) - defer.returnValue(None) - @defer.inlineCallbacks def get_state_events(self, user_id, room_id): """Retrieve all state events for a given room. If the user is @@ -303,8 +291,7 @@ class MessageHandler(BaseHandler): ) @defer.inlineCallbacks - def snapshot_all_rooms(self, user_id=None, pagin_config=None, - feedback=False, as_client_event=True): + def snapshot_all_rooms(self, user_id=None, pagin_config=None, as_client_event=True): """Retrieve a snapshot of all rooms the user is invited or has joined. This snapshot may include messages for all rooms where the user is @@ -314,7 +301,6 @@ class MessageHandler(BaseHandler): user_id (str): The ID of the user making the request. pagin_config (synapse.api.streams.PaginationConfig): The pagination config used to determine how many messages *PER ROOM* to return. - feedback (bool): True to get feedback along with these messages. as_client_event (bool): True to get events in client-server format. Returns: A list of dicts with "room_id" and "membership" keys for all rooms @@ -439,8 +425,7 @@ class MessageHandler(BaseHandler): defer.returnValue(ret) @defer.inlineCallbacks - def room_initial_sync(self, user_id, room_id, pagin_config=None, - feedback=False): + def room_initial_sync(self, user_id, room_id, pagin_config=None): """Capture the a snapshot of a room. If user is currently a member of the room this will be what is currently in the room. If the user left the room this will be what was in the room when they left. diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 0ff816d53e..243623190f 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -610,7 +610,6 @@ class RoomEventSource(object): to_key=config.to_key, direction=config.direction, limit=config.limit, - with_feedback=True ) defer.returnValue((events, next_key)) diff --git a/synapse/rest/client/v1/initial_sync.py b/synapse/rest/client/v1/initial_sync.py index 4ea4da653c..bac68cc29f 100644 --- a/synapse/rest/client/v1/initial_sync.py +++ b/synapse/rest/client/v1/initial_sync.py @@ -26,14 +26,12 @@ class InitialSyncRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request): user, _ = yield self.auth.get_user_by_req(request) - with_feedback = "feedback" in request.args as_client_event = "raw" not in request.args pagination_config = PaginationConfig.from_request(request) handler = self.handlers.message_handler content = yield handler.snapshot_all_rooms( user_id=user.to_string(), pagin_config=pagination_config, - feedback=with_feedback, as_client_event=as_client_event ) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index f4558b95a7..23871f161e 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -329,14 +329,12 @@ class RoomMessageListRestServlet(ClientV1RestServlet): pagination_config = PaginationConfig.from_request( request, default_limit=10, ) - with_feedback = "feedback" in request.args as_client_event = "raw" not in request.args handler = self.handlers.message_handler msgs = yield handler.get_messages( room_id=room_id, user_id=user.to_string(), pagin_config=pagination_config, - feedback=with_feedback, as_client_event=as_client_event ) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 0abfa86cd2..5763c462af 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -159,9 +159,7 @@ class StreamStore(SQLBaseStore): @log_function def get_room_events_stream(self, user_id, from_key, to_key, room_id, - limit=0, with_feedback=False): - # TODO (erikj): Handle compressed feedback - + limit=0): current_room_membership_sql = ( "SELECT m.room_id FROM room_memberships as m " " INNER JOIN current_state_events as c" @@ -227,10 +225,7 @@ class StreamStore(SQLBaseStore): @defer.inlineCallbacks def paginate_room_events(self, room_id, from_key, to_key=None, - direction='b', limit=-1, - with_feedback=False): - # TODO (erikj): Handle compressed feedback - + direction='b', limit=-1): # Tokens really represent positions between elements, but we use # the convention of pointing to the event before the gap. Hence # we have a bit of asymmetry when it comes to equalities. @@ -302,7 +297,6 @@ class StreamStore(SQLBaseStore): @cachedInlineCallbacks(num_args=4) def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None): - # TODO (erikj): Handle compressed feedback end_token = RoomStreamToken.parse_stream_token(end_token) -- cgit 1.4.1 From 09cb5c7d33c32e2cbf5a5b6f6f0e2780338491d2 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 9 Sep 2015 17:31:09 +0100 Subject: Allow users that have left a room to get the messages that happend in the room before they left --- synapse/handlers/message.py | 31 +++++++++++++++++++++++++++---- synapse/storage/stream.py | 19 ++++++++++++++++++- 2 files changed, 45 insertions(+), 5 deletions(-) (limited to 'synapse/storage/stream.py') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 72ebac047f..db89491b46 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -83,21 +83,44 @@ class MessageHandler(BaseHandler): Returns: dict: Pagination API results """ - yield self.auth.check_joined_room(room_id, user_id) + member_event = yield self.auth.check_user_was_in_room(room_id, user_id) data_source = self.hs.get_event_sources().sources["room"] - if not pagin_config.from_token: + if pagin_config.from_token: + room_token = pagin_config.from_token.room_key + else: pagin_config.from_token = ( yield self.hs.get_event_sources().get_current_token( direction='b' ) ) + room_token = pagin_config.from_token.room_key - room_token = RoomStreamToken.parse(pagin_config.from_token.room_key) + room_token = RoomStreamToken.parse(room_token) if room_token.topological is None: raise SynapseError(400, "Invalid token") + pagin_config.from_token = pagin_config.from_token.copy_and_replace( + "room_key", str(room_token) + ) + + source_config = pagin_config.get_source_config("room") + + if member_event.membership == Membership.LEAVE: + # If they have left the room then clamp the token to be before + # they left the room + leave_token = yield self.store.get_topological_token_for_event( + member_event.event_id + ) + leave_token = RoomStreamToken.parse(leave_token) + if leave_token.topological < room_token.topological: + source_config.from_key = str(leave_token) + + if source_config.direction == "f": + if source_config.to_key is None: + source_config.to_key = str(leave_token) + yield self.hs.get_handlers().federation_handler.maybe_backfill( room_id, room_token.topological ) @@ -105,7 +128,7 @@ class MessageHandler(BaseHandler): user = UserID.from_string(user_id) events, next_key = yield data_source.get_pagination_rows( - user, pagin_config.get_source_config("room"), room_id + user, source_config, room_id ) next_token = pagin_config.from_token.copy_and_replace( diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 5763c462af..3cab06fdef 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -386,7 +386,24 @@ class StreamStore(SQLBaseStore): table="events", keyvalues={"event_id": event_id}, retcol="stream_ordering", - ).addCallback(lambda stream_ordering: "s%d" % (stream_ordering,)) + ).addCallback(lambda row: "s%d" % (row,)) + + def get_topological_token_for_event(self, event_id): + """The stream token for an event + Args: + event_id(str): The id of the event to look up a stream token for. + Raises: + StoreError if the event wasn't in the database. + Returns: + A deferred "t%d-%d" topological token. + """ + return self._simple_select_one( + table="events", + keyvalues={"event_id": event_id}, + retcols=("stream_ordering", "topological_ordering"), + ).addCallback(lambda row: "t%d-%d" % ( + row["topological_ordering"], row["stream_ordering"],) + ) def _get_max_topological_txn(self, txn): txn.execute( -- cgit 1.4.1