From e263c26690685b713ab8a5f4fdb7b9e92a7b8d99 Mon Sep 17 00:00:00 2001 From: lukebarnard Date: Tue, 11 Apr 2017 11:55:30 +0100 Subject: Initial commit of RM server-side impl (See https://docs.google.com/document/d/1UWqdS-e1sdwkLDUY0wA4gZyIkRp-ekjsLZ8k6g_Zvso/edit#heading=h.lndohpg8at5u) --- synapse/handlers/read_marker.py | 82 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 synapse/handlers/read_marker.py (limited to 'synapse/handlers') diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py new file mode 100644 index 0000000000..77164f3f49 --- /dev/null +++ b/synapse/handlers/read_marker.py @@ -0,0 +1,82 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseHandler + +from twisted.internet import defer + +from synapse.util.logcontext import PreserveLoggingContext +from synapse.types import get_domain_from_id + +import logging + + +logger = logging.getLogger(__name__) + + +class ReadMarkerHandler(BaseHandler): + def __init__(self, hs): + super(ReadMarkerHandler, self).__init__(hs) + + self.server_name = hs.config.server_name + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def received_client_read_marker(self, room_id, user_id, event_id): + """NEEDS DOC + """ + + room_id = read_marker["room_id"] + user_id = read_marker["user_id"] + event_id = read_marker["event_id"] + + # Get ordering for existing read marker + account_data = yield self.store.get_account_data_for_room(user_id, room_id) + existing_read_marker = account_data["m.read_marker"] + + if existing_read_marker: + # Get ordering for new read marker + res = self.store._simple_select_one_txn( + txn, + table="events", + retcols=["topological_ordering", "stream_ordering"], + keyvalues={"event_id": event_id}, + allow_none=True + ) + new_to = int(res["topological_ordering"]) if res else None + new_so = int(res["stream_ordering"]) if res else None + + res = self.store._simple_select_one_txn( + txn, + table="events", + retcols=["topological_ordering", "stream_ordering"], + keyvalues={"event_id": existing_read_marker.content.marker}, + allow_none=True + ) + existing_to = int(res["topological_ordering"]) if res else None + existing_so = int(res["stream_ordering"]) if res else None + + if new_to > existing_to: + return False + elif new_to == existing_to and new_so >= existing_so: + return False + + # Update account data + content = { + "marker": event_id + } + yield self.store.add_account_data_to_room( + user_id, room_id, "m.read_marker", content + ) -- cgit 1.4.1 From d892079844eec9dd7855ee66f3ad3225df4bdbc0 Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Tue, 11 Apr 2017 15:01:39 +0100 Subject: Finish implementing RM endpoint - This change causes a 405 to be sent if "m.read_marker" is set via /account_data - This also fixes-up the RM endpoint so that it actually Works. --- synapse/handlers/read_marker.py | 87 +++++++++++++++------------- synapse/rest/__init__.py | 2 + synapse/rest/client/v2_alpha/account_data.py | 8 ++- synapse/rest/client/v2_alpha/read_marker.py | 13 ++--- synapse/server.py | 5 ++ 5 files changed, 67 insertions(+), 48 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index 77164f3f49..021faff376 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -18,65 +18,74 @@ from ._base import BaseHandler from twisted.internet import defer from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.async import Linearizer from synapse.types import get_domain_from_id +from synapse.api.errors import SynapseError import logging - - logger = logging.getLogger(__name__) - class ReadMarkerHandler(BaseHandler): def __init__(self, hs): super(ReadMarkerHandler, self).__init__(hs) - self.server_name = hs.config.server_name self.store = hs.get_datastore() + self.read_marker_linearizer = Linearizer(name="read_marker") + self.notifier = hs.get_notifier() @defer.inlineCallbacks def received_client_read_marker(self, room_id, user_id, event_id): - """NEEDS DOC - """ + """Updates the read marker for a given user in a given room if the event ID given + is ahead in the stream relative to the current read marker. - room_id = read_marker["room_id"] - user_id = read_marker["user_id"] - event_id = read_marker["event_id"] + This uses a notifier to indicate that account data should be sent down /sync if + the read marker has changed. + """ # Get ordering for existing read marker - account_data = yield self.store.get_account_data_for_room(user_id, room_id) - existing_read_marker = account_data["m.read_marker"] + with (yield self.read_marker_linearizer.queue(room_id + "_" + user_id)): + account_data = yield self.store.get_account_data_for_room(user_id, room_id) + existing_read_marker = account_data["m.read_marker"] - if existing_read_marker: - # Get ordering for new read marker - res = self.store._simple_select_one_txn( - txn, - table="events", - retcols=["topological_ordering", "stream_ordering"], - keyvalues={"event_id": event_id}, - allow_none=True - ) - new_to = int(res["topological_ordering"]) if res else None - new_so = int(res["stream_ordering"]) if res else None + should_update = True - res = self.store._simple_select_one_txn( - txn, + res = yield self.store._simple_select_one( table="events", retcols=["topological_ordering", "stream_ordering"], - keyvalues={"event_id": existing_read_marker.content.marker}, + keyvalues={"event_id": event_id}, allow_none=True ) - existing_to = int(res["topological_ordering"]) if res else None - existing_so = int(res["stream_ordering"]) if res else None - - if new_to > existing_to: - return False - elif new_to == existing_to and new_so >= existing_so: - return False - # Update account data - content = { - "marker": event_id - } - yield self.store.add_account_data_to_room( - user_id, room_id, "m.read_marker", content - ) + if not res: + raise SynapseError(404, 'Event does not exist') + + if existing_read_marker: + new_to = int(res["topological_ordering"]) + new_so = int(res["stream_ordering"]) + + # Get ordering for existing read marker + res = yield self.store._simple_select_one( + table="events", + retcols=["topological_ordering", "stream_ordering"], + keyvalues={"event_id": existing_read_marker['marker']}, + allow_none=True + ) + existing_to = int(res["topological_ordering"]) if res else None + existing_so = int(res["stream_ordering"]) if res else None + + # Prevent updating if the existing marker is ahead in the stream + if existing_to > new_to: + should_update = False + elif existing_to == new_to and existing_so >= new_so: + should_update = False + + if should_update: + content = { + "marker": event_id + } + max_id = yield self.store.add_account_data_to_room( + user_id, room_id, "m.read_marker", content + ) + self.notifier.on_new_event( + "account_data_key", max_id, users=[user_id], rooms=[room_id] + ) diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py index f9f5a3e077..aa8d874f96 100644 --- a/synapse/rest/__init__.py +++ b/synapse/rest/__init__.py @@ -40,6 +40,7 @@ from synapse.rest.client.v2_alpha import ( register, auth, receipts, + read_marker, keys, tokenrefresh, tags, @@ -88,6 +89,7 @@ class ClientRestResource(JsonResource): register.register_servlets(hs, client_resource) auth.register_servlets(hs, client_resource) receipts.register_servlets(hs, client_resource) + read_marker.register_servlets(hs, client_resource) keys.register_servlets(hs, client_resource) tokenrefresh.register_servlets(hs, client_resource) tags.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/v2_alpha/account_data.py b/synapse/rest/client/v2_alpha/account_data.py index b16079cece..a846ab1dc1 100644 --- a/synapse/rest/client/v2_alpha/account_data.py +++ b/synapse/rest/client/v2_alpha/account_data.py @@ -16,7 +16,7 @@ from ._base import client_v2_patterns from synapse.http.servlet import RestServlet, parse_json_object_from_request -from synapse.api.errors import AuthError +from synapse.api.errors import AuthError, SynapseError from twisted.internet import defer @@ -82,6 +82,12 @@ class RoomAccountDataServlet(RestServlet): body = parse_json_object_from_request(request) + if account_data_type == "m.read_marker": + raise SynapseError(405, + "Cannot set m.read_marker through this API. " + "Use /rooms/!roomId:server.name/read_marker" + ) + max_id = yield self.store.add_account_data_to_room( user_id, room_id, account_data_type, body ) diff --git a/synapse/rest/client/v2_alpha/read_marker.py b/synapse/rest/client/v2_alpha/read_marker.py index 02408eaf10..49ada9c047 100644 --- a/synapse/rest/client/v2_alpha/read_marker.py +++ b/synapse/rest/client/v2_alpha/read_marker.py @@ -25,14 +25,11 @@ import logging logger = logging.getLogger(__name__) -class ReceiptRestServlet(RestServlet): - PATTERNS = client_v2_patterns( - "/rooms/(?P[^/]*)" - "/read_marker$" - ) +class ReadMarkerRestServlet(RestServlet): + PATTERNS = client_v2_patterns("/rooms/(?P[^/]*)/read_marker$") def __init__(self, hs): - super(ReceiptRestServlet, self).__init__() + super(ReadMarkerRestServlet, self).__init__() self.hs = hs self.auth = hs.get_auth() self.receipts_handler = hs.get_receipts_handler() @@ -40,7 +37,7 @@ class ReceiptRestServlet(RestServlet): self.presence_handler = hs.get_presence_handler() @defer.inlineCallbacks - def on_POST(self, request, room_id, receipt_type, event_id): + def on_POST(self, request, room_id): requester = yield self.auth.get_user_by_req(request) yield self.presence_handler.bump_presence_active_time(requester.user) @@ -68,4 +65,4 @@ class ReceiptRestServlet(RestServlet): def register_servlets(hs, http_server): - ReceiptRestServlet(hs).register(http_server) + ReadMarkerRestServlet(hs).register(http_server) diff --git a/synapse/server.py b/synapse/server.py index 6310152560..12754c89ae 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -48,6 +48,7 @@ from synapse.handlers.typing import TypingHandler from synapse.handlers.events import EventHandler, EventStreamHandler from synapse.handlers.initial_sync import InitialSyncHandler from synapse.handlers.receipts import ReceiptsHandler +from synapse.handlers.read_marker import ReadMarkerHandler from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.notifier import Notifier @@ -133,6 +134,7 @@ class HomeServer(object): 'receipts_handler', 'macaroon_generator', 'tcp_replication', + 'read_marker_handler', ] def __init__(self, hostname, **kwargs): @@ -291,6 +293,9 @@ class HomeServer(object): def build_receipts_handler(self): return ReceiptsHandler(self) + def build_read_marker_handler(self): + return ReadMarkerHandler(self) + def build_tcp_replication(self): raise NotImplementedError() -- cgit 1.4.1 From 012742302748a231593222beffc5ceff61966ffc Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Tue, 11 Apr 2017 17:07:07 +0100 Subject: flake8 --- synapse/handlers/read_marker.py | 3 +-- synapse/rest/client/v2_alpha/account_data.py | 3 ++- synapse/rest/client/v2_alpha/read_marker.py | 3 +-- 3 files changed, 4 insertions(+), 5 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index 021faff376..a97f6b9a79 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -17,14 +17,13 @@ from ._base import BaseHandler from twisted.internet import defer -from synapse.util.logcontext import PreserveLoggingContext from synapse.util.async import Linearizer -from synapse.types import get_domain_from_id from synapse.api.errors import SynapseError import logging logger = logging.getLogger(__name__) + class ReadMarkerHandler(BaseHandler): def __init__(self, hs): super(ReadMarkerHandler, self).__init__(hs) diff --git a/synapse/rest/client/v2_alpha/account_data.py b/synapse/rest/client/v2_alpha/account_data.py index a846ab1dc1..56cd347e78 100644 --- a/synapse/rest/client/v2_alpha/account_data.py +++ b/synapse/rest/client/v2_alpha/account_data.py @@ -83,7 +83,8 @@ class RoomAccountDataServlet(RestServlet): body = parse_json_object_from_request(request) if account_data_type == "m.read_marker": - raise SynapseError(405, + raise SynapseError( + 405, "Cannot set m.read_marker through this API. " "Use /rooms/!roomId:server.name/read_marker" ) diff --git a/synapse/rest/client/v2_alpha/read_marker.py b/synapse/rest/client/v2_alpha/read_marker.py index 49ada9c047..a568a9252a 100644 --- a/synapse/rest/client/v2_alpha/read_marker.py +++ b/synapse/rest/client/v2_alpha/read_marker.py @@ -15,7 +15,6 @@ from twisted.internet import defer -from synapse.api.errors import SynapseError from synapse.http.servlet import RestServlet, parse_json_object_from_request from ._base import client_v2_patterns @@ -45,7 +44,7 @@ class ReadMarkerRestServlet(RestServlet): body = parse_json_object_from_request(request) if "m.read" in body: - read_event_id = body["m.read"]; + read_event_id = body["m.read"] yield self.receipts_handler.received_client_receipt( room_id, "m.read", -- cgit 1.4.1 From 131485ef665b4dc714a9624711c18bb67f264aca Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Tue, 11 Apr 2017 17:33:51 +0100 Subject: Copyright --- synapse/handlers/read_marker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index a97f6b9a79..19decc2c63 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2017 Vector Creations Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. -- cgit 1.4.1 From 73880268ef7184d17ec369074d1d0d72de56f33c Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Tue, 11 Apr 2017 17:34:09 +0100 Subject: Refactor event ordering check to events store --- synapse/handlers/read_marker.py | 32 ++++---------------------------- synapse/storage/events.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 28 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index 19decc2c63..c3882313e6 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -42,41 +42,17 @@ class ReadMarkerHandler(BaseHandler): """ # Get ordering for existing read marker - with (yield self.read_marker_linearizer.queue(room_id + "_" + user_id)): + with (yield self.read_marker_linearizer.queue((room_id, user_id))): account_data = yield self.store.get_account_data_for_room(user_id, room_id) existing_read_marker = account_data["m.read_marker"] should_update = True - res = yield self.store._simple_select_one( - table="events", - retcols=["topological_ordering", "stream_ordering"], - keyvalues={"event_id": event_id}, - allow_none=True - ) - - if not res: - raise SynapseError(404, 'Event does not exist') - if existing_read_marker: - new_to = int(res["topological_ordering"]) - new_so = int(res["stream_ordering"]) - - # Get ordering for existing read marker - res = yield self.store._simple_select_one( - table="events", - retcols=["topological_ordering", "stream_ordering"], - keyvalues={"event_id": existing_read_marker['marker']}, - allow_none=True + should_update = yield self.store.is_event_after( + existing_read_marker['marker'], + event_id ) - existing_to = int(res["topological_ordering"]) if res else None - existing_so = int(res["stream_ordering"]) if res else None - - # Prevent updating if the existing marker is ahead in the stream - if existing_to > new_to: - should_update = False - elif existing_to == new_to and existing_so >= new_so: - should_update = False if should_update: content = { diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 64fe937bdc..3c6df5c2d2 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2159,6 +2159,34 @@ class EventsStore(SQLBaseStore): ] ) + @defer.inlineCallbacks + def is_event_after(self, event_id1, event_id2): + is_after = True + + to_1, so_1 = yield self._get_event_ordering(event_id1) + to_2, so_2 = yield self._get_event_ordering(event_id2) + + # Prevent updating if the existing marker is ahead in the stream + if to_1 > to_2: + is_after = False + elif to_1 == to_2 and so_1 >= so_2: + is_after = False + + defer.returnValue(is_after) + + @defer.inlineCallbacks + def _get_event_ordering(self, event_id): + res = yield self._simple_select_one( + table="events", + retcols=["topological_ordering", "stream_ordering"], + keyvalues={"event_id": event_id}, + allow_none=True + ) + + if not res: + raise SynapseError(404, "Could not find event %s" % (event_id,)) + + defer.returnValue((int(res["topological_ordering"]), int(res["stream_ordering"]))) AllNewEventsResult = namedtuple("AllNewEventsResult", [ "new_forward_events", "new_backfill_events", -- cgit 1.4.1 From 867822fa1e588da67508bb6a094c133c48e5d129 Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Tue, 11 Apr 2017 17:36:04 +0100 Subject: flake8 --- synapse/handlers/read_marker.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index c3882313e6..ee8cfc1c7a 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -18,7 +18,6 @@ from ._base import BaseHandler from twisted.internet import defer from synapse.util.async import Linearizer -from synapse.api.errors import SynapseError import logging logger = logging.getLogger(__name__) -- cgit 1.4.1 From 77fb2b72aeccfa94a47b9088c885ed103edbfc60 Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Wed, 12 Apr 2017 09:47:29 +0100 Subject: Handle no previous RM --- synapse/handlers/read_marker.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index ee8cfc1c7a..43489b88bf 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -43,7 +43,10 @@ class ReadMarkerHandler(BaseHandler): # Get ordering for existing read marker with (yield self.read_marker_linearizer.queue((room_id, user_id))): account_data = yield self.store.get_account_data_for_room(user_id, room_id) - existing_read_marker = account_data["m.read_marker"] + + existing_read_marker = None + if "m.read_marker" in account_data: + existing_read_marker = account_data["m.read_marker"] should_update = True -- cgit 1.4.1 From 122cd52ce451a2c182d23bb1a1a3a33ccaf307fc Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Wed, 12 Apr 2017 10:48:32 +0100 Subject: Remove comment, simplify null-guard --- synapse/handlers/read_marker.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index 43489b88bf..7f10dd8b45 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -40,13 +40,10 @@ class ReadMarkerHandler(BaseHandler): the read marker has changed. """ - # Get ordering for existing read marker with (yield self.read_marker_linearizer.queue((room_id, user_id))): account_data = yield self.store.get_account_data_for_room(user_id, room_id) - existing_read_marker = None - if "m.read_marker" in account_data: - existing_read_marker = account_data["m.read_marker"] + existing_read_marker = account_data.get("m.read_marker", None) should_update = True -- cgit 1.4.1 From 69a18514e94a8fae928e80bae17c701186686d12 Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Wed, 12 Apr 2017 10:50:37 +0100 Subject: Only notify user, not entire room --- synapse/handlers/read_marker.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index 7f10dd8b45..800240b8a9 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -60,6 +60,4 @@ class ReadMarkerHandler(BaseHandler): max_id = yield self.store.add_account_data_to_room( user_id, room_id, "m.read_marker", content ) - self.notifier.on_new_event( - "account_data_key", max_id, users=[user_id], rooms=[room_id] - ) + self.notifier.on_new_event("account_data_key", max_id, users=[user_id]) -- cgit 1.4.1 From b9557064bf6003a666b8fb6813dd3618fe9e48b4 Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Wed, 12 Apr 2017 14:36:20 +0100 Subject: Simplify is_event_after logic --- synapse/handlers/read_marker.py | 5 +++-- synapse/storage/events.py | 13 +++---------- 2 files changed, 6 insertions(+), 12 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index 800240b8a9..3f46a16b90 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -48,9 +48,10 @@ class ReadMarkerHandler(BaseHandler): should_update = True if existing_read_marker: + # Only update if the new marker is ahead in the stream should_update = yield self.store.is_event_after( - existing_read_marker['marker'], - event_id + event_id, + existing_read_marker['marker'] ) if should_update: diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 702bd64b2e..221cb563d8 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2161,18 +2161,11 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks def is_event_after(self, event_id1, event_id2): - is_after = True - + """Returns True if event_id1 is after event_id2 in the stream + """ to_1, so_1 = yield self._get_event_ordering(event_id1) to_2, so_2 = yield self._get_event_ordering(event_id2) - - # Prevent updating if the existing marker is ahead in the stream - if to_1 > to_2: - is_after = False - elif to_1 == to_2 and so_1 >= so_2: - is_after = False - - defer.returnValue(is_after) + defer.returnValue(to_1 > to_2 and so_1 > so_2) @defer.inlineCallbacks def _get_event_ordering(self, event_id): -- cgit 1.4.1