summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/data_stores/main/cache.py1
-rw-r--r--synapse/storage/data_stores/main/events.py48
-rw-r--r--synapse/storage/data_stores/main/events_worker.py86
-rw-r--r--synapse/storage/data_stores/main/schema/delta/58/12unread_messages.sql18
4 files changed, 151 insertions, 2 deletions
diff --git a/synapse/storage/data_stores/main/cache.py b/synapse/storage/data_stores/main/cache.py
index f39f556c20..edc3624fed 100644
--- a/synapse/storage/data_stores/main/cache.py
+++ b/synapse/storage/data_stores/main/cache.py
@@ -172,6 +172,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
 
         self.get_latest_event_ids_in_room.invalidate((room_id,))
 
+        self.get_unread_message_count_for_user.invalidate_many((room_id,))
         self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,))
 
         if not backfilled:
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index 6f2e0d15cc..0c9c02afa1 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -53,6 +53,47 @@ event_counter = Counter(
     ["type", "origin_type", "origin_entity"],
 )
 
+STATE_EVENT_TYPES_TO_MARK_UNREAD = {
+    EventTypes.Topic,
+    EventTypes.Name,
+    EventTypes.RoomAvatar,
+    EventTypes.Tombstone,
+}
+
+
+def should_count_as_unread(event: EventBase, context: EventContext) -> bool:
+    # Exclude rejected and soft-failed events.
+    if context.rejected or event.internal_metadata.is_soft_failed():
+        return False
+
+    # Exclude notices.
+    if (
+        not event.is_state()
+        and event.type == EventTypes.Message
+        and event.content.get("msgtype") == "m.notice"
+    ):
+        return False
+
+    # Exclude edits.
+    relates_to = event.content.get("m.relates_to", {})
+    if relates_to.get("rel_type") == RelationTypes.REPLACE:
+        return False
+
+    # Mark events that have a non-empty string body as unread.
+    body = event.content.get("body")
+    if isinstance(body, str) and body:
+        return True
+
+    # Mark some state events as unread.
+    if event.is_state() and event.type in STATE_EVENT_TYPES_TO_MARK_UNREAD:
+        return True
+
+    # Mark encrypted events as unread.
+    if not event.is_state() and event.type == EventTypes.Encrypted:
+        return True
+
+    return False
+
 
 def encode_json(json_object):
     """
@@ -196,6 +237,10 @@ class PersistEventsStore:
 
                 event_counter.labels(event.type, origin_type, origin_entity).inc()
 
+                self.store.get_unread_message_count_for_user.invalidate_many(
+                    (event.room_id,),
+                )
+
             for room_id, new_state in current_state_for_room.items():
                 self.store.get_current_state_ids.prefill((room_id,), new_state)
 
@@ -817,8 +862,9 @@ class PersistEventsStore:
                     "contains_url": (
                         "url" in event.content and isinstance(event.content["url"], str)
                     ),
+                    "count_as_unread": should_count_as_unread(event, context),
                 }
-                for event, _ in events_and_contexts
+                for event, context in events_and_contexts
             ],
         )
 
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index e812c67078..b03b259636 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -41,9 +41,15 @@ from synapse.replication.tcp.streams import BackfillStream
 from synapse.replication.tcp.streams.events import EventsStream
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import Database
+from synapse.storage.types import Cursor
 from synapse.storage.util.id_generators import StreamIdGenerator
 from synapse.types import get_domain_from_id
-from synapse.util.caches.descriptors import Cache, cached, cachedInlineCallbacks
+from synapse.util.caches.descriptors import (
+    Cache,
+    _CacheContext,
+    cached,
+    cachedInlineCallbacks,
+)
 from synapse.util.iterutils import batch_iter
 from synapse.util.metrics import Measure
 
@@ -1358,6 +1364,84 @@ class EventsWorkerStore(SQLBaseStore):
             desc="get_next_event_to_expire", func=get_next_event_to_expire_txn
         )
 
+    @cached(tree=True, cache_context=True)
+    async def get_unread_message_count_for_user(
+        self, room_id: str, user_id: str, cache_context: _CacheContext,
+    ) -> int:
+        """Retrieve the count of unread messages for the given room and user.
+
+        Args:
+            room_id: The ID of the room to count unread messages in.
+            user_id: The ID of the user to count unread messages for.
+
+        Returns:
+            The number of unread messages for the given user in the given room.
+        """
+        with Measure(self._clock, "get_unread_message_count_for_user"):
+            last_read_event_id = await self.get_last_receipt_event_id_for_user(
+                user_id=user_id,
+                room_id=room_id,
+                receipt_type="m.read",
+                on_invalidate=cache_context.invalidate,
+            )
+
+            return await self.db.runInteraction(
+                "get_unread_message_count_for_user",
+                self._get_unread_message_count_for_user_txn,
+                user_id,
+                room_id,
+                last_read_event_id,
+            )
+
+    def _get_unread_message_count_for_user_txn(
+        self,
+        txn: Cursor,
+        user_id: str,
+        room_id: str,
+        last_read_event_id: Optional[str],
+    ) -> int:
+        if last_read_event_id:
+            # Get the stream ordering for the last read event.
+            stream_ordering = self.db.simple_select_one_onecol_txn(
+                txn=txn,
+                table="events",
+                keyvalues={"room_id": room_id, "event_id": last_read_event_id},
+                retcol="stream_ordering",
+            )
+        else:
+            # If there's no read receipt for that room, it probably means the user hasn't
+            # opened it yet, in which case use the stream ID of their join event.
+            # We can't just set it to 0 otherwise messages from other local users from
+            # before this user joined will be counted as well.
+            txn.execute(
+                """
+                SELECT stream_ordering FROM local_current_membership
+                LEFT JOIN events USING (event_id, room_id)
+                WHERE membership = 'join'
+                    AND user_id = ?
+                    AND room_id = ?
+                """,
+                (user_id, room_id),
+            )
+            row = txn.fetchone()
+
+            if row is None:
+                return 0
+
+            stream_ordering = row[0]
+
+        # Count the messages that qualify as unread after the stream ordering we've just
+        # retrieved.
+        sql = """
+            SELECT COUNT(*) FROM events
+            WHERE sender != ? AND room_id = ? AND stream_ordering > ? AND count_as_unread
+        """
+
+        txn.execute(sql, (user_id, room_id, stream_ordering))
+        row = txn.fetchone()
+
+        return row[0] if row else 0
+
 
 AllNewEventsResult = namedtuple(
     "AllNewEventsResult",
diff --git a/synapse/storage/data_stores/main/schema/delta/58/12unread_messages.sql b/synapse/storage/data_stores/main/schema/delta/58/12unread_messages.sql
new file mode 100644
index 0000000000..531b532c73
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/58/12unread_messages.sql
@@ -0,0 +1,18 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Store a boolean value in the events table for whether the event should be counted in
+-- the unread_count property of sync responses.
+ALTER TABLE events ADD COLUMN count_as_unread BOOLEAN;