summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/read_marker.py87
-rw-r--r--synapse/rest/__init__.py2
-rw-r--r--synapse/rest/client/v2_alpha/account_data.py8
-rw-r--r--synapse/rest/client/v2_alpha/read_marker.py13
-rw-r--r--synapse/server.py5
5 files changed, 67 insertions, 48 deletions
diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py
index 77164f3f49..021faff376 100644
--- a/synapse/handlers/read_marker.py
+++ b/synapse/handlers/read_marker.py
@@ -18,65 +18,74 @@ from ._base import BaseHandler
 from twisted.internet import defer
 
 from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.async import Linearizer
 from synapse.types import get_domain_from_id
+from synapse.api.errors import SynapseError
 
 import logging
-
-
 logger = logging.getLogger(__name__)
 
-
 class ReadMarkerHandler(BaseHandler):
     def __init__(self, hs):
         super(ReadMarkerHandler, self).__init__(hs)
-
         self.server_name = hs.config.server_name
         self.store = hs.get_datastore()
+        self.read_marker_linearizer = Linearizer(name="read_marker")
+        self.notifier = hs.get_notifier()
 
     @defer.inlineCallbacks
     def received_client_read_marker(self, room_id, user_id, event_id):
-        """NEEDS DOC
-        """
+        """Updates the read marker for a given user in a given room if the event ID given
+        is ahead in the stream relative to the current read marker.
 
-        room_id = read_marker["room_id"]
-        user_id = read_marker["user_id"]
-        event_id = read_marker["event_id"]
+        This uses a notifier to indicate that account data should be sent down /sync if
+        the read marker has changed.
+        """
 
         # Get ordering for existing read marker
-        account_data = yield self.store.get_account_data_for_room(user_id, room_id)
-        existing_read_marker = account_data["m.read_marker"]
+        with (yield self.read_marker_linearizer.queue(room_id + "_" + user_id)):
+            account_data = yield self.store.get_account_data_for_room(user_id, room_id)
+            existing_read_marker = account_data["m.read_marker"]
 
-        if existing_read_marker:
-            # Get ordering for new read marker
-            res = self.store._simple_select_one_txn(
-                txn,
-                table="events",
-                retcols=["topological_ordering", "stream_ordering"],
-                keyvalues={"event_id": event_id},
-                allow_none=True
-            )
-            new_to = int(res["topological_ordering"]) if res else None
-            new_so = int(res["stream_ordering"]) if res else None
+            should_update = True
 
-            res = self.store._simple_select_one_txn(
-                txn,
+            res = yield self.store._simple_select_one(
                 table="events",
                 retcols=["topological_ordering", "stream_ordering"],
-                keyvalues={"event_id": existing_read_marker.content.marker},
+                keyvalues={"event_id": event_id},
                 allow_none=True
             )
-            existing_to = int(res["topological_ordering"]) if res else None
-            existing_so = int(res["stream_ordering"]) if res else None
-
-            if new_to > existing_to:
-                return False
-            elif new_to == existing_to and new_so >= existing_so:
-                return False
 
-        # Update account data
-        content = {
-            "marker": event_id
-        }
-        yield self.store.add_account_data_to_room(
-            user_id, room_id, "m.read_marker", content
-        )
+            if not res:
+                raise SynapseError(404, 'Event does not exist')
+
+            if existing_read_marker:
+                new_to = int(res["topological_ordering"])
+                new_so = int(res["stream_ordering"])
+
+                # Get ordering for existing read marker
+                res = yield self.store._simple_select_one(
+                    table="events",
+                    retcols=["topological_ordering", "stream_ordering"],
+                    keyvalues={"event_id": existing_read_marker['marker']},
+                    allow_none=True
+                )
+                existing_to = int(res["topological_ordering"]) if res else None
+                existing_so = int(res["stream_ordering"]) if res else None
+
+                # Prevent updating if the existing marker is ahead in the stream
+                if existing_to > new_to:
+                    should_update = False
+                elif existing_to == new_to and existing_so >= new_so:
+                    should_update = False
+
+            if should_update:
+                content = {
+                    "marker": event_id
+                }
+                max_id = yield self.store.add_account_data_to_room(
+                    user_id, room_id, "m.read_marker", content
+                )
+                self.notifier.on_new_event(
+                    "account_data_key", max_id, users=[user_id], rooms=[room_id]
+                )
diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index f9f5a3e077..aa8d874f96 100644
--- a/synapse/rest/__init__.py
+++ b/synapse/rest/__init__.py
@@ -40,6 +40,7 @@ from synapse.rest.client.v2_alpha import (
     register,
     auth,
     receipts,
+    read_marker,
     keys,
     tokenrefresh,
     tags,
@@ -88,6 +89,7 @@ class ClientRestResource(JsonResource):
         register.register_servlets(hs, client_resource)
         auth.register_servlets(hs, client_resource)
         receipts.register_servlets(hs, client_resource)
+        read_marker.register_servlets(hs, client_resource)
         keys.register_servlets(hs, client_resource)
         tokenrefresh.register_servlets(hs, client_resource)
         tags.register_servlets(hs, client_resource)
diff --git a/synapse/rest/client/v2_alpha/account_data.py b/synapse/rest/client/v2_alpha/account_data.py
index b16079cece..a846ab1dc1 100644
--- a/synapse/rest/client/v2_alpha/account_data.py
+++ b/synapse/rest/client/v2_alpha/account_data.py
@@ -16,7 +16,7 @@
 from ._base import client_v2_patterns
 
 from synapse.http.servlet import RestServlet, parse_json_object_from_request
-from synapse.api.errors import AuthError
+from synapse.api.errors import AuthError, SynapseError
 
 from twisted.internet import defer
 
@@ -82,6 +82,12 @@ class RoomAccountDataServlet(RestServlet):
 
         body = parse_json_object_from_request(request)
 
+        if account_data_type == "m.read_marker":
+            raise SynapseError(405,
+                "Cannot set m.read_marker through this API. "
+                "Use /rooms/!roomId:server.name/read_marker"
+            )
+
         max_id = yield self.store.add_account_data_to_room(
             user_id, room_id, account_data_type, body
         )
diff --git a/synapse/rest/client/v2_alpha/read_marker.py b/synapse/rest/client/v2_alpha/read_marker.py
index 02408eaf10..49ada9c047 100644
--- a/synapse/rest/client/v2_alpha/read_marker.py
+++ b/synapse/rest/client/v2_alpha/read_marker.py
@@ -25,14 +25,11 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-class ReceiptRestServlet(RestServlet):
-    PATTERNS = client_v2_patterns(
-        "/rooms/(?P<room_id>[^/]*)"
-        "/read_marker$"
-    )
+class ReadMarkerRestServlet(RestServlet):
+    PATTERNS = client_v2_patterns("/rooms/(?P<room_id>[^/]*)/read_marker$")
 
     def __init__(self, hs):
-        super(ReceiptRestServlet, self).__init__()
+        super(ReadMarkerRestServlet, self).__init__()
         self.hs = hs
         self.auth = hs.get_auth()
         self.receipts_handler = hs.get_receipts_handler()
@@ -40,7 +37,7 @@ class ReceiptRestServlet(RestServlet):
         self.presence_handler = hs.get_presence_handler()
 
     @defer.inlineCallbacks
-    def on_POST(self, request, room_id, receipt_type, event_id):
+    def on_POST(self, request, room_id):
         requester = yield self.auth.get_user_by_req(request)
 
         yield self.presence_handler.bump_presence_active_time(requester.user)
@@ -68,4 +65,4 @@ class ReceiptRestServlet(RestServlet):
 
 
 def register_servlets(hs, http_server):
-    ReceiptRestServlet(hs).register(http_server)
+    ReadMarkerRestServlet(hs).register(http_server)
diff --git a/synapse/server.py b/synapse/server.py
index 6310152560..12754c89ae 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -48,6 +48,7 @@ from synapse.handlers.typing import TypingHandler
 from synapse.handlers.events import EventHandler, EventStreamHandler
 from synapse.handlers.initial_sync import InitialSyncHandler
 from synapse.handlers.receipts import ReceiptsHandler
+from synapse.handlers.read_marker import ReadMarkerHandler
 from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
 from synapse.http.matrixfederationclient import MatrixFederationHttpClient
 from synapse.notifier import Notifier
@@ -133,6 +134,7 @@ class HomeServer(object):
         'receipts_handler',
         'macaroon_generator',
         'tcp_replication',
+        'read_marker_handler',
     ]
 
     def __init__(self, hostname, **kwargs):
@@ -291,6 +293,9 @@ class HomeServer(object):
     def build_receipts_handler(self):
         return ReceiptsHandler(self)
 
+    def build_read_marker_handler(self):
+        return ReadMarkerHandler(self)
+
     def build_tcp_replication(self):
         raise NotImplementedError()