diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index e324662f18..7c1d6b5489 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -278,6 +278,11 @@ class MessageHandler(BaseHandler):
user, pagination_config.get_source_config("presence"), None
)
+ receipt_stream = self.hs.get_event_sources().sources["receipt"]
+ receipt, _ = yield receipt_stream.get_pagination_rows(
+ user, pagination_config.get_source_config("receipt"), None
+ )
+
public_room_ids = yield self.store.get_public_room_ids()
limit = pagin_config.limit
@@ -344,7 +349,8 @@ class MessageHandler(BaseHandler):
ret = {
"rooms": rooms_ret,
"presence": presence,
- "end": now_token.to_string()
+ "receipts": receipt,
+ "end": now_token.to_string(),
}
defer.returnValue(ret)
@@ -405,9 +411,12 @@ class MessageHandler(BaseHandler):
defer.returnValue([p for success, p in presence_defs if success])
- presence, (messages, token) = yield defer.gatherResults(
+ receipts_handler = self.hs.get_handlers().receipts_handler
+
+ presence, receipts, (messages, token) = yield defer.gatherResults(
[
get_presence(),
+ receipts_handler.get_receipts_for_room(room_id, now_token.receipt_key),
self.store.get_recent_events_for_room(
room_id,
limit=limit,
@@ -431,5 +440,6 @@ class MessageHandler(BaseHandler):
"end": end_token.to_string(),
},
"state": state,
- "presence": presence
+ "presence": presence,
+ "receipts": receipts,
})
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 94f0810057..f6cde30e63 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -133,6 +133,24 @@ class ReceiptsHandler(BaseHandler):
},
)
+ @defer.inlineCallbacks
+ def get_receipts_for_room(self, room_id, to_key):
+ result = yield self.store.get_linearized_receipts_for_room(
+ room_id, None, to_key
+ )
+
+ if not result:
+ defer.returnValue([])
+
+ event = {
+ "type": "m.receipt",
+ "content": {
+ room_id: result,
+ },
+ }
+
+ defer.returnValue([event])
+
class ReceiptEventSource(object):
def __init__(self, hs):
@@ -168,4 +186,29 @@ class ReceiptEventSource(object):
@defer.inlineCallbacks
def get_pagination_rows(self, user, config, key):
- defer.returnValue(([{}], 0))
+ to_key = int(config.from_key)
+
+ if config.to_key:
+ from_key = int(config.to_key)
+ else:
+ from_key = None
+
+ rooms = yield self.store.get_rooms_for_user(user.to_string())
+ rooms = [room.room_id for room in rooms]
+ content = {}
+ for room_id in rooms:
+ result = yield self.store.get_linearized_receipts_for_room(
+ room_id, from_key, to_key
+ )
+ if result:
+ content[room_id] = result
+
+ if not content:
+ defer.returnValue(([], to_key))
+
+ event = {
+ "type": "m.receipt",
+ "content": content,
+ }
+
+ defer.returnValue(([event], to_key))
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 5a02c80252..07f8edaace 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -28,15 +28,26 @@ class ReceiptsStore(SQLBaseStore):
@defer.inlineCallbacks
def get_linearized_receipts_for_room(self, room_id, from_key, to_key):
def f(txn):
- sql = (
- "SELECT * FROM receipts_linearized WHERE"
- " room_id = ? AND stream_id > ? AND stream_id <= ?"
- )
-
- txn.execute(
- sql,
- (room_id, from_key, to_key)
- )
+ if from_key:
+ sql = (
+ "SELECT * FROM receipts_linearized WHERE"
+ " room_id = ? AND stream_id > ? AND stream_id <= ?"
+ )
+
+ txn.execute(
+ sql,
+ (room_id, from_key, to_key)
+ )
+ else:
+ sql = (
+ "SELECT * FROM receipts_linearized WHERE"
+ " room_id = ? AND stream_id <= ?"
+ )
+
+ txn.execute(
+ sql,
+ (room_id, to_key)
+ )
rows = self.cursor_to_dict(txn)
|