diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 789859e69e..1d46fb0e43 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -257,7 +257,8 @@ class GuestAccess:
class ReceiptTypes:
READ: Final = "m.read"
- READ_PRIVATE: Final = "org.matrix.msc2285.read.private"
+ READ_PRIVATE: Final = "m.read.private"
+ UNSTABLE_READ_PRIVATE: Final = "org.matrix.msc2285.read.private"
FULLY_READ: Final = "m.fully_read"
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index c2ecd977cd..7d17c958bb 100644
--- a/synapse/config/experimental.py
+++ b/synapse/config/experimental.py
@@ -32,7 +32,7 @@ class ExperimentalConfig(Config):
# MSC2716 (importing historical messages)
self.msc2716_enabled: bool = experimental.get("msc2716_enabled", False)
- # MSC2285 (private read receipts)
+ # MSC2285 (unstable private read receipts)
self.msc2285_enabled: bool = experimental.get("msc2285_enabled", False)
# MSC3244 (room version capabilities)
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 85b472f250..6484e47e5f 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -143,8 +143,8 @@ class InitialSyncHandler:
joined_rooms,
to_key=int(now_token.receipt_key),
)
- if self.hs.config.experimental.msc2285_enabled:
- receipt = ReceiptEventSource.filter_out_private_receipts(receipt, user_id)
+
+ receipt = ReceiptEventSource.filter_out_private_receipts(receipt, user_id)
tags_by_room = await self.store.get_tags_for_user(user_id)
@@ -456,11 +456,8 @@ class InitialSyncHandler:
)
if not receipts:
return []
- if self.hs.config.experimental.msc2285_enabled:
- receipts = ReceiptEventSource.filter_out_private_receipts(
- receipts, user_id
- )
- return receipts
+
+ return ReceiptEventSource.filter_out_private_receipts(receipts, user_id)
presence, receipts, (messages, token) = await make_deferred_yieldable(
gather_results(
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 43d2882b0a..d4a866b346 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -163,7 +163,10 @@ class ReceiptsHandler:
if not is_new:
return
- if self.federation_sender and receipt_type != ReceiptTypes.READ_PRIVATE:
+ if self.federation_sender and receipt_type not in (
+ ReceiptTypes.READ_PRIVATE,
+ ReceiptTypes.UNSTABLE_READ_PRIVATE,
+ ):
await self.federation_sender.send_read_receipt(receipt)
@@ -203,24 +206,38 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
for event_id, orig_event_content in room.get("content", {}).items():
event_content = orig_event_content
# If there are private read receipts, additional logic is necessary.
- if ReceiptTypes.READ_PRIVATE in event_content:
+ if (
+ ReceiptTypes.READ_PRIVATE in event_content
+ or ReceiptTypes.UNSTABLE_READ_PRIVATE in event_content
+ ):
# Make a copy without private read receipts to avoid leaking
# other user's private read receipts..
event_content = {
receipt_type: receipt_value
for receipt_type, receipt_value in event_content.items()
- if receipt_type != ReceiptTypes.READ_PRIVATE
+ if receipt_type
+ not in (
+ ReceiptTypes.READ_PRIVATE,
+ ReceiptTypes.UNSTABLE_READ_PRIVATE,
+ )
}
# Copy the current user's private read receipt from the
# original content, if it exists.
- user_private_read_receipt = orig_event_content[
- ReceiptTypes.READ_PRIVATE
- ].get(user_id, None)
+ user_private_read_receipt = orig_event_content.get(
+ ReceiptTypes.READ_PRIVATE, {}
+ ).get(user_id, None)
if user_private_read_receipt:
event_content[ReceiptTypes.READ_PRIVATE] = {
user_id: user_private_read_receipt
}
+ user_unstable_private_read_receipt = orig_event_content.get(
+ ReceiptTypes.UNSTABLE_READ_PRIVATE, {}
+ ).get(user_id, None)
+ if user_unstable_private_read_receipt:
+ event_content[ReceiptTypes.UNSTABLE_READ_PRIVATE] = {
+ user_id: user_unstable_private_read_receipt
+ }
# Include the event if there is at least one non-private read
# receipt or the current user has a private read receipt.
@@ -256,10 +273,9 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
room_ids, from_key=from_key, to_key=to_key
)
- if self.config.experimental.msc2285_enabled:
- events = ReceiptEventSource.filter_out_private_receipts(
- events, user.to_string()
- )
+ events = ReceiptEventSource.filter_out_private_receipts(
+ events, user.to_string()
+ )
return events, to_key
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index e4f2201c92..1ed7230e32 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -416,7 +416,10 @@ class FederationSenderHandler:
if not self._is_mine_id(receipt.user_id):
continue
# Private read receipts never get sent over federation.
- if receipt.receipt_type == ReceiptTypes.READ_PRIVATE:
+ if receipt.receipt_type in (
+ ReceiptTypes.READ_PRIVATE,
+ ReceiptTypes.UNSTABLE_READ_PRIVATE,
+ ):
continue
receipt_info = ReadReceipt(
receipt.room_id,
diff --git a/synapse/rest/client/notifications.py b/synapse/rest/client/notifications.py
index 24bc7c9095..a73322a6a4 100644
--- a/synapse/rest/client/notifications.py
+++ b/synapse/rest/client/notifications.py
@@ -58,7 +58,12 @@ class NotificationsServlet(RestServlet):
)
receipts_by_room = await self.store.get_receipts_for_user_with_orderings(
- user_id, [ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE]
+ user_id,
+ [
+ ReceiptTypes.READ,
+ ReceiptTypes.READ_PRIVATE,
+ ReceiptTypes.UNSTABLE_READ_PRIVATE,
+ ],
)
notif_event_ids = [pa.event_id for pa in push_actions]
diff --git a/synapse/rest/client/read_marker.py b/synapse/rest/client/read_marker.py
index 8896f2df50..aaad8b233f 100644
--- a/synapse/rest/client/read_marker.py
+++ b/synapse/rest/client/read_marker.py
@@ -40,9 +40,13 @@ class ReadMarkerRestServlet(RestServlet):
self.read_marker_handler = hs.get_read_marker_handler()
self.presence_handler = hs.get_presence_handler()
- self._known_receipt_types = {ReceiptTypes.READ, ReceiptTypes.FULLY_READ}
+ self._known_receipt_types = {
+ ReceiptTypes.READ,
+ ReceiptTypes.FULLY_READ,
+ ReceiptTypes.READ_PRIVATE,
+ }
if hs.config.experimental.msc2285_enabled:
- self._known_receipt_types.add(ReceiptTypes.READ_PRIVATE)
+ self._known_receipt_types.add(ReceiptTypes.UNSTABLE_READ_PRIVATE)
async def on_POST(
self, request: SynapseRequest, room_id: str
diff --git a/synapse/rest/client/receipts.py b/synapse/rest/client/receipts.py
index 409bfd43c1..c6108fc5eb 100644
--- a/synapse/rest/client/receipts.py
+++ b/synapse/rest/client/receipts.py
@@ -44,11 +44,13 @@ class ReceiptRestServlet(RestServlet):
self.read_marker_handler = hs.get_read_marker_handler()
self.presence_handler = hs.get_presence_handler()
- self._known_receipt_types = {ReceiptTypes.READ}
+ self._known_receipt_types = {
+ ReceiptTypes.READ,
+ ReceiptTypes.READ_PRIVATE,
+ ReceiptTypes.FULLY_READ,
+ }
if hs.config.experimental.msc2285_enabled:
- self._known_receipt_types.update(
- (ReceiptTypes.READ_PRIVATE, ReceiptTypes.FULLY_READ)
- )
+ self._known_receipt_types.add(ReceiptTypes.UNSTABLE_READ_PRIVATE)
async def on_POST(
self, request: SynapseRequest, room_id: str, receipt_type: str, event_id: str
diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py
index 0366986755..c9a830cbac 100644
--- a/synapse/rest/client/versions.py
+++ b/synapse/rest/client/versions.py
@@ -94,6 +94,7 @@ class VersionsRestServlet(RestServlet):
# Supports the busy presence state described in MSC3026.
"org.matrix.msc3026.busy_presence": self.config.experimental.msc3026_enabled,
# Supports receiving private read receipts as per MSC2285
+ "org.matrix.msc2285.stable": True, # TODO: Remove when MSC2285 becomes a part of the spec
"org.matrix.msc2285": self.config.experimental.msc2285_enabled,
# Supports filtering of /publicRooms by room type as per MSC3827
"org.matrix.msc3827.stable": True,
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 5db70f9a60..161aad0f89 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -80,7 +80,7 @@ import attr
from synapse.api.constants import ReceiptTypes
from synapse.metrics.background_process_metrics import wrap_as_background_process
-from synapse.storage._base import SQLBaseStore, db_to_json
+from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
@@ -259,7 +259,11 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
txn,
user_id,
room_id,
- receipt_types=(ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
+ receipt_types=(
+ ReceiptTypes.READ,
+ ReceiptTypes.READ_PRIVATE,
+ ReceiptTypes.UNSTABLE_READ_PRIVATE,
+ ),
)
stream_ordering = None
@@ -448,6 +452,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
The list will be ordered by ascending stream_ordering.
The list will have between 0~limit entries.
"""
+
# find rooms that have a read receipt in them and return the next
# push actions
def get_after_receipt(
@@ -455,7 +460,18 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
) -> List[Tuple[str, str, int, str, bool]]:
# find rooms that have a read receipt in them and return the next
# push actions
- sql = """
+
+ receipt_types_clause, args = make_in_list_sql_clause(
+ self.database_engine,
+ "receipt_type",
+ (
+ ReceiptTypes.READ,
+ ReceiptTypes.READ_PRIVATE,
+ ReceiptTypes.UNSTABLE_READ_PRIVATE,
+ ),
+ )
+
+ sql = f"""
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
ep.highlight
FROM (
@@ -463,10 +479,10 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
MAX(stream_ordering) as stream_ordering
FROM events
INNER JOIN receipts_linearized USING (room_id, event_id)
- WHERE receipt_type = 'm.read' AND user_id = ?
+ WHERE {receipt_types_clause} AND user_id = ?
GROUP BY room_id
) AS rl,
- event_push_actions AS ep
+ event_push_actions AS ep
WHERE
ep.room_id = rl.room_id
AND ep.stream_ordering > rl.stream_ordering
@@ -476,7 +492,9 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
AND ep.notif = 1
ORDER BY ep.stream_ordering ASC LIMIT ?
"""
- args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
+ args.extend(
+ (user_id, user_id, min_stream_ordering, max_stream_ordering, limit)
+ )
txn.execute(sql, args)
return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall())
@@ -490,7 +508,17 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
def get_no_receipt(
txn: LoggingTransaction,
) -> List[Tuple[str, str, int, str, bool]]:
- sql = """
+ receipt_types_clause, args = make_in_list_sql_clause(
+ self.database_engine,
+ "receipt_type",
+ (
+ ReceiptTypes.READ,
+ ReceiptTypes.READ_PRIVATE,
+ ReceiptTypes.UNSTABLE_READ_PRIVATE,
+ ),
+ )
+
+ sql = f"""
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
ep.highlight
FROM event_push_actions AS ep
@@ -498,7 +526,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
WHERE
ep.room_id NOT IN (
SELECT room_id FROM receipts_linearized
- WHERE receipt_type = 'm.read' AND user_id = ?
+ WHERE {receipt_types_clause} AND user_id = ?
GROUP BY room_id
)
AND ep.user_id = ?
@@ -507,7 +535,9 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
AND ep.notif = 1
ORDER BY ep.stream_ordering ASC LIMIT ?
"""
- args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
+ args.extend(
+ (user_id, user_id, min_stream_ordering, max_stream_ordering, limit)
+ )
txn.execute(sql, args)
return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall())
@@ -557,12 +587,23 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
The list will be ordered by descending received_ts.
The list will have between 0~limit entries.
"""
+
# find rooms that have a read receipt in them and return the most recent
# push actions
def get_after_receipt(
txn: LoggingTransaction,
) -> List[Tuple[str, str, int, str, bool, int]]:
- sql = """
+ receipt_types_clause, args = make_in_list_sql_clause(
+ self.database_engine,
+ "receipt_type",
+ (
+ ReceiptTypes.READ,
+ ReceiptTypes.READ_PRIVATE,
+ ReceiptTypes.UNSTABLE_READ_PRIVATE,
+ ),
+ )
+
+ sql = f"""
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
ep.highlight, e.received_ts
FROM (
@@ -570,7 +611,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
MAX(stream_ordering) as stream_ordering
FROM events
INNER JOIN receipts_linearized USING (room_id, event_id)
- WHERE receipt_type = 'm.read' AND user_id = ?
+ WHERE {receipt_types_clause} AND user_id = ?
GROUP BY room_id
) AS rl,
event_push_actions AS ep
@@ -584,7 +625,9 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
AND ep.notif = 1
ORDER BY ep.stream_ordering DESC LIMIT ?
"""
- args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
+ args.extend(
+ (user_id, user_id, min_stream_ordering, max_stream_ordering, limit)
+ )
txn.execute(sql, args)
return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall())
@@ -598,7 +641,17 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
def get_no_receipt(
txn: LoggingTransaction,
) -> List[Tuple[str, str, int, str, bool, int]]:
- sql = """
+ receipt_types_clause, args = make_in_list_sql_clause(
+ self.database_engine,
+ "receipt_type",
+ (
+ ReceiptTypes.READ,
+ ReceiptTypes.READ_PRIVATE,
+ ReceiptTypes.UNSTABLE_READ_PRIVATE,
+ ),
+ )
+
+ sql = f"""
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
ep.highlight, e.received_ts
FROM event_push_actions AS ep
@@ -606,7 +659,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
WHERE
ep.room_id NOT IN (
SELECT room_id FROM receipts_linearized
- WHERE receipt_type = 'm.read' AND user_id = ?
+ WHERE {receipt_types_clause} AND user_id = ?
GROUP BY room_id
)
AND ep.user_id = ?
@@ -615,7 +668,9 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
AND ep.notif = 1
ORDER BY ep.stream_ordering DESC LIMIT ?
"""
- args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
+ args.extend(
+ (user_id, user_id, min_stream_ordering, max_stream_ordering, limit)
+ )
txn.execute(sql, args)
return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall())
|