summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2015-07-07 15:25:30 +0100
committerErik Johnston <erik@matrix.org>2015-07-07 15:25:30 +0100
commitca041d55267740214a2cfab95c44ee6f70cc6d0d (patch)
treeeffa50d112ee4482eff5bee06e7c97f53042508f
parentFix various typos (diff)
downloadsynapse-ca041d55267740214a2cfab95c44ee6f70cc6d0d.tar.xz
Wire together receipts and the notifer/federation
-rw-r--r--synapse/handlers/receipts.py81
-rw-r--r--synapse/rest/client/v2_alpha/receipts.py3
-rw-r--r--synapse/storage/receipts.py69
-rw-r--r--synapse/streams/events.py6
4 files changed, 126 insertions, 33 deletions
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index fc2f38c1c0..94f0810057 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -37,7 +37,8 @@ class ReceiptsHandler(BaseHandler):
             "m.receipt", self._received_remote_receipt
         )
 
-        self._latest_serial = 0
+        # self._earliest_cached_serial = 0
+        # self._rooms_to_latest_serial = {}
 
     @defer.inlineCallbacks
     def received_client_receipt(self, room_id, receipt_type, user_id,
@@ -53,8 +54,10 @@ class ReceiptsHandler(BaseHandler):
             "event_ids": [event_id],
         }
 
-        yield self._handle_new_receipts([receipt])
-        self._push_remotes([receipt])
+        is_new = yield self._handle_new_receipts([receipt])
+
+        if is_new:
+            self._push_remotes([receipt])
 
     @defer.inlineCallbacks
     def _received_remote_receipt(self, origin, content):
@@ -81,33 +84,24 @@ class ReceiptsHandler(BaseHandler):
             user_id = receipt["user_id"]
             event_ids = receipt["event_ids"]
 
-            stream_id, max_persisted_id = yield self.store.insert_receipt(
+            res = yield self.store.insert_receipt(
                 room_id, receipt_type, user_id, event_ids,
             )
 
-            # TODO: Use max_persisted_id
+            if not res:
+                # res will be None if this read receipt is 'old'
+                defer.returnValue(False)
 
-            self._latest_serial = max(self._latest_serial, stream_id)
+            stream_id, max_persisted_id = res
 
             with PreserveLoggingContext():
                 self.notifier.on_new_event(
-                    "receipt_key", self._latest_serial, rooms=[room_id]
+                    "receipt_key", max_persisted_id, rooms=[room_id]
                 )
 
-            localusers = set()
-            remotedomains = set()
-
-            rm_handler = self.hs.get_handlers().room_member_handler
-            yield rm_handler.fetch_room_distributions_into(
-                room_id, localusers=localusers, remotedomains=remotedomains
-            )
-
-            receipt["remotedomains"] = remotedomains
-
-            self.notifier.on_new_event(
-                "receipt_key", self._latest_room_serial, rooms=[room_id]
-            )
+            defer.returnValue(True)
 
+    @defer.inlineCallbacks
     def _push_remotes(self, receipts):
         # TODO: Some of this stuff should be coallesced.
         for receipt in receipts:
@@ -115,7 +109,15 @@ class ReceiptsHandler(BaseHandler):
             receipt_type = receipt["receipt_type"]
             user_id = receipt["user_id"]
             event_ids = receipt["event_ids"]
-            remotedomains = receipt["remotedomains"]
+
+            remotedomains = set()
+
+            rm_handler = self.hs.get_handlers().room_member_handler
+            yield rm_handler.fetch_room_distributions_into(
+                room_id, localusers=None, remotedomains=remotedomains
+            )
+
+            logger.debug("Sending receipt to: %r", remotedomains)
 
             for domain in remotedomains:
                 self.federation.send_edu(
@@ -130,3 +132,40 @@ class ReceiptsHandler(BaseHandler):
                         },
                     },
                 )
+
+
+class ReceiptEventSource(object):
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+
+    @defer.inlineCallbacks
+    def get_new_events_for_user(self, user, from_key, limit):
+        from_key = int(from_key)
+        to_key = yield self.get_current_key()
+
+        rooms = yield self.store.get_rooms_for_user(user.to_string())
+        rooms = [room.room_id for room in rooms]
+        content = {}
+        for room_id in rooms:
+            result = yield self.store.get_linearized_receipts_for_room(
+                room_id, from_key, to_key
+            )
+            if result:
+                content[room_id] = result
+
+        if not content:
+            defer.returnValue(([], to_key))
+
+        event = {
+            "type": "m.receipt",
+            "content": content,
+        }
+
+        defer.returnValue(([event], to_key))
+
+    def get_current_key(self, direction='f'):
+        return self.store.get_max_receipt_stream_id()
+
+    @defer.inlineCallbacks
+    def get_pagination_rows(self, user, config, key):
+        defer.returnValue(([{}], 0))
diff --git a/synapse/rest/client/v2_alpha/receipts.py b/synapse/rest/client/v2_alpha/receipts.py
index 829427b7b6..40406e2ede 100644
--- a/synapse/rest/client/v2_alpha/receipts.py
+++ b/synapse/rest/client/v2_alpha/receipts.py
@@ -28,7 +28,7 @@ class ReceiptRestServlet(RestServlet):
     PATTERN = client_v2_pattern(
         "/rooms/(?P<room_id>[^/]*)"
         "/receipt/(?P<receipt_type>[^/]*)"
-        "/(?P<event_id>[^/])*"
+        "/(?P<event_id>[^/]*)$"
     )
 
     def __init__(self, hs):
@@ -41,7 +41,6 @@ class ReceiptRestServlet(RestServlet):
     def on_POST(self, request, room_id, receipt_type, event_id):
         user, client = yield self.auth.get_user_by_req(request)
 
-        # TODO: STUFF
         yield self.receipts_handler.received_client_receipt(
             room_id,
             receipt_type,
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 15c11fd410..5a02c80252 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -17,17 +17,33 @@ from ._base import SQLBaseStore, cached
 
 from twisted.internet import defer
 
+import logging
+
+
+logger = logging.getLogger(__name__)
+
 
 class ReceiptsStore(SQLBaseStore):
 
-    @cached
     @defer.inlineCallbacks
-    def get_linearized_receipts_for_room(self, room_id):
-        rows = yield self._simple_select_list(
-            table="receipts_linearized",
-            keyvalues={"room_id": room_id},
-            retcols=["receipt_type", "user_id", "event_id"],
-            desc="get_linearized_receipts_for_room",
+    def get_linearized_receipts_for_room(self, room_id, from_key, to_key):
+        def f(txn):
+            sql = (
+                "SELECT * FROM receipts_linearized WHERE"
+                " room_id = ? AND stream_id > ? AND stream_id <= ?"
+            )
+
+            txn.execute(
+                sql,
+                (room_id, from_key, to_key)
+            )
+
+            rows = self.cursor_to_dict(txn)
+
+            return rows
+
+        rows = yield self.runInteraction(
+            "get_linearized_receipts_for_room", f
         )
 
         result = {}
@@ -40,6 +56,9 @@ class ReceiptsStore(SQLBaseStore):
 
         defer.returnValue(result)
 
+    def get_max_receipt_stream_id(self):
+        return self._receipts_id_gen.get_max_token(self)
+
     @cached
     @defer.inlineCallbacks
     def get_graph_receipts_for_room(self, room_id):
@@ -62,11 +81,38 @@ class ReceiptsStore(SQLBaseStore):
 
     def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
                                       user_id, event_id, stream_id):
+
+        # We don't want to clobber receipts for more recent events, so we
+        # have to compare orderings of existing receipts
+        sql = (
+            "SELECT topological_ordering, stream_ordering, event_id FROM events"
+            " INNER JOIN receipts_linearized as r USING (event_id, room_id)"
+            " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
+        )
+
+        txn.execute(sql, (room_id, receipt_type, user_id))
+        results = txn.fetchall()
+
+        if results:
+            res = self._simple_select_one_txn(
+                txn,
+                table="events",
+                retcols=["topological_ordering", "stream_ordering"],
+                keyvalues={"event_id": event_id},
+            )
+            topological_ordering = int(res["topological_ordering"])
+            stream_ordering = int(res["stream_ordering"])
+
+            for to, so, _ in results:
+                if int(to) > topological_ordering:
+                    return False
+                elif int(to) == topological_ordering and int(so) >= stream_ordering:
+                    return False
+
         self._simple_delete_txn(
             txn,
             table="receipts_linearized",
             keyvalues={
-                "stream_id": stream_id,
                 "room_id": room_id,
                 "receipt_type": receipt_type,
                 "user_id": user_id,
@@ -85,6 +131,8 @@ class ReceiptsStore(SQLBaseStore):
             }
         )
 
+        return True
+
     @defer.inlineCallbacks
     def insert_receipt(self, room_id, receipt_type, user_id, event_ids):
         if not event_ids:
@@ -115,13 +163,16 @@ class ReceiptsStore(SQLBaseStore):
 
         stream_id_manager = yield self._receipts_id_gen.get_next(self)
         with stream_id_manager as stream_id:
-            yield self.runInteraction(
+            have_persisted = yield self.runInteraction(
                 "insert_linearized_receipt",
                 self.insert_linearized_receipt_txn,
                 room_id, receipt_type, user_id, linearized_event_id,
                 stream_id=stream_id,
             )
 
+            if not have_persisted:
+                defer.returnValue(None)
+
         yield self.insert_graph_receipt(
             room_id, receipt_type, user_id, event_ids
         )
diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index 0a1a3a3d03..aaa3609aa5 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -20,6 +20,7 @@ from synapse.types import StreamToken
 from synapse.handlers.presence import PresenceEventSource
 from synapse.handlers.room import RoomEventSource
 from synapse.handlers.typing import TypingNotificationEventSource
+from synapse.handlers.receipts import ReceiptEventSource
 
 
 class NullSource(object):
@@ -43,6 +44,7 @@ class EventSources(object):
         "room": RoomEventSource,
         "presence": PresenceEventSource,
         "typing": TypingNotificationEventSource,
+        "receipt": ReceiptEventSource,
     }
 
     def __init__(self, hs):
@@ -63,7 +65,9 @@ class EventSources(object):
             typing_key=(
                 yield self.sources["typing"].get_current_key()
             ),
-            receipt_key="0",
+            receipt_key=(
+                yield self.sources["receipt"].get_current_key()
+            ),
         )
         defer.returnValue(token)