diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index fc2f38c1c0..94f0810057 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -37,7 +37,8 @@ class ReceiptsHandler(BaseHandler):
"m.receipt", self._received_remote_receipt
)
- self._latest_serial = 0
+ # self._earliest_cached_serial = 0
+ # self._rooms_to_latest_serial = {}
@defer.inlineCallbacks
def received_client_receipt(self, room_id, receipt_type, user_id,
@@ -53,8 +54,10 @@ class ReceiptsHandler(BaseHandler):
"event_ids": [event_id],
}
- yield self._handle_new_receipts([receipt])
- self._push_remotes([receipt])
+ is_new = yield self._handle_new_receipts([receipt])
+
+ if is_new:
+ self._push_remotes([receipt])
@defer.inlineCallbacks
def _received_remote_receipt(self, origin, content):
@@ -81,33 +84,24 @@ class ReceiptsHandler(BaseHandler):
user_id = receipt["user_id"]
event_ids = receipt["event_ids"]
- stream_id, max_persisted_id = yield self.store.insert_receipt(
+ res = yield self.store.insert_receipt(
room_id, receipt_type, user_id, event_ids,
)
- # TODO: Use max_persisted_id
+ if not res:
+ # res will be None if this read receipt is 'old'
+ defer.returnValue(False)
- self._latest_serial = max(self._latest_serial, stream_id)
+ stream_id, max_persisted_id = res
with PreserveLoggingContext():
self.notifier.on_new_event(
- "receipt_key", self._latest_serial, rooms=[room_id]
+ "receipt_key", max_persisted_id, rooms=[room_id]
)
- localusers = set()
- remotedomains = set()
-
- rm_handler = self.hs.get_handlers().room_member_handler
- yield rm_handler.fetch_room_distributions_into(
- room_id, localusers=localusers, remotedomains=remotedomains
- )
-
- receipt["remotedomains"] = remotedomains
-
- self.notifier.on_new_event(
- "receipt_key", self._latest_room_serial, rooms=[room_id]
- )
+ defer.returnValue(True)
+ @defer.inlineCallbacks
def _push_remotes(self, receipts):
# TODO: Some of this stuff should be coallesced.
for receipt in receipts:
@@ -115,7 +109,15 @@ class ReceiptsHandler(BaseHandler):
receipt_type = receipt["receipt_type"]
user_id = receipt["user_id"]
event_ids = receipt["event_ids"]
- remotedomains = receipt["remotedomains"]
+
+ remotedomains = set()
+
+ rm_handler = self.hs.get_handlers().room_member_handler
+ yield rm_handler.fetch_room_distributions_into(
+ room_id, localusers=None, remotedomains=remotedomains
+ )
+
+ logger.debug("Sending receipt to: %r", remotedomains)
for domain in remotedomains:
self.federation.send_edu(
@@ -130,3 +132,40 @@ class ReceiptsHandler(BaseHandler):
},
},
)
+
+
+class ReceiptEventSource(object):
+ def __init__(self, hs):
+ self.store = hs.get_datastore()
+
+ @defer.inlineCallbacks
+ def get_new_events_for_user(self, user, from_key, limit):
+ from_key = int(from_key)
+ to_key = yield self.get_current_key()
+
+ rooms = yield self.store.get_rooms_for_user(user.to_string())
+ rooms = [room.room_id for room in rooms]
+ content = {}
+ for room_id in rooms:
+ result = yield self.store.get_linearized_receipts_for_room(
+ room_id, from_key, to_key
+ )
+ if result:
+ content[room_id] = result
+
+ if not content:
+ defer.returnValue(([], to_key))
+
+ event = {
+ "type": "m.receipt",
+ "content": content,
+ }
+
+ defer.returnValue(([event], to_key))
+
+ def get_current_key(self, direction='f'):
+ return self.store.get_max_receipt_stream_id()
+
+ @defer.inlineCallbacks
+ def get_pagination_rows(self, user, config, key):
+ defer.returnValue(([{}], 0))
diff --git a/synapse/rest/client/v2_alpha/receipts.py b/synapse/rest/client/v2_alpha/receipts.py
index 829427b7b6..40406e2ede 100644
--- a/synapse/rest/client/v2_alpha/receipts.py
+++ b/synapse/rest/client/v2_alpha/receipts.py
@@ -28,7 +28,7 @@ class ReceiptRestServlet(RestServlet):
PATTERN = client_v2_pattern(
"/rooms/(?P<room_id>[^/]*)"
"/receipt/(?P<receipt_type>[^/]*)"
- "/(?P<event_id>[^/])*"
+ "/(?P<event_id>[^/]*)$"
)
def __init__(self, hs):
@@ -41,7 +41,6 @@ class ReceiptRestServlet(RestServlet):
def on_POST(self, request, room_id, receipt_type, event_id):
user, client = yield self.auth.get_user_by_req(request)
- # TODO: STUFF
yield self.receipts_handler.received_client_receipt(
room_id,
receipt_type,
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 15c11fd410..5a02c80252 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -17,17 +17,33 @@ from ._base import SQLBaseStore, cached
from twisted.internet import defer
+import logging
+
+
+logger = logging.getLogger(__name__)
+
class ReceiptsStore(SQLBaseStore):
- @cached
@defer.inlineCallbacks
- def get_linearized_receipts_for_room(self, room_id):
- rows = yield self._simple_select_list(
- table="receipts_linearized",
- keyvalues={"room_id": room_id},
- retcols=["receipt_type", "user_id", "event_id"],
- desc="get_linearized_receipts_for_room",
+ def get_linearized_receipts_for_room(self, room_id, from_key, to_key):
+ def f(txn):
+ sql = (
+ "SELECT * FROM receipts_linearized WHERE"
+ " room_id = ? AND stream_id > ? AND stream_id <= ?"
+ )
+
+ txn.execute(
+ sql,
+ (room_id, from_key, to_key)
+ )
+
+ rows = self.cursor_to_dict(txn)
+
+ return rows
+
+ rows = yield self.runInteraction(
+ "get_linearized_receipts_for_room", f
)
result = {}
@@ -40,6 +56,9 @@ class ReceiptsStore(SQLBaseStore):
defer.returnValue(result)
+ def get_max_receipt_stream_id(self):
+ return self._receipts_id_gen.get_max_token(self)
+
@cached
@defer.inlineCallbacks
def get_graph_receipts_for_room(self, room_id):
@@ -62,11 +81,38 @@ class ReceiptsStore(SQLBaseStore):
def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
user_id, event_id, stream_id):
+
+ # We don't want to clobber receipts for more recent events, so we
+ # have to compare orderings of existing receipts
+ sql = (
+ "SELECT topological_ordering, stream_ordering, event_id FROM events"
+ " INNER JOIN receipts_linearized as r USING (event_id, room_id)"
+ " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
+ )
+
+ txn.execute(sql, (room_id, receipt_type, user_id))
+ results = txn.fetchall()
+
+ if results:
+ res = self._simple_select_one_txn(
+ txn,
+ table="events",
+ retcols=["topological_ordering", "stream_ordering"],
+ keyvalues={"event_id": event_id},
+ )
+ topological_ordering = int(res["topological_ordering"])
+ stream_ordering = int(res["stream_ordering"])
+
+ for to, so, _ in results:
+ if int(to) > topological_ordering:
+ return False
+ elif int(to) == topological_ordering and int(so) >= stream_ordering:
+ return False
+
self._simple_delete_txn(
txn,
table="receipts_linearized",
keyvalues={
- "stream_id": stream_id,
"room_id": room_id,
"receipt_type": receipt_type,
"user_id": user_id,
@@ -85,6 +131,8 @@ class ReceiptsStore(SQLBaseStore):
}
)
+ return True
+
@defer.inlineCallbacks
def insert_receipt(self, room_id, receipt_type, user_id, event_ids):
if not event_ids:
@@ -115,13 +163,16 @@ class ReceiptsStore(SQLBaseStore):
stream_id_manager = yield self._receipts_id_gen.get_next(self)
with stream_id_manager as stream_id:
- yield self.runInteraction(
+ have_persisted = yield self.runInteraction(
"insert_linearized_receipt",
self.insert_linearized_receipt_txn,
room_id, receipt_type, user_id, linearized_event_id,
stream_id=stream_id,
)
+ if not have_persisted:
+ defer.returnValue(None)
+
yield self.insert_graph_receipt(
room_id, receipt_type, user_id, event_ids
)
diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index 0a1a3a3d03..aaa3609aa5 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -20,6 +20,7 @@ from synapse.types import StreamToken
from synapse.handlers.presence import PresenceEventSource
from synapse.handlers.room import RoomEventSource
from synapse.handlers.typing import TypingNotificationEventSource
+from synapse.handlers.receipts import ReceiptEventSource
class NullSource(object):
@@ -43,6 +44,7 @@ class EventSources(object):
"room": RoomEventSource,
"presence": PresenceEventSource,
"typing": TypingNotificationEventSource,
+ "receipt": ReceiptEventSource,
}
def __init__(self, hs):
@@ -63,7 +65,9 @@ class EventSources(object):
typing_key=(
yield self.sources["typing"].get_current_key()
),
- receipt_key="0",
+ receipt_key=(
+ yield self.sources["receipt"].get_current_key()
+ ),
)
defer.returnValue(token)
|