summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/events.py12
-rw-r--r--synapse/storage/relations.py233
2 files changed, 233 insertions, 12 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 6802bf42ce..b025ebc926 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1657,10 +1657,11 @@ class EventsStore(
         def get_all_new_forward_event_rows(txn):
             sql = (
                 "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
-                " state_key, redacts"
+                " state_key, redacts, relates_to_id"
                 " FROM events AS e"
                 " LEFT JOIN redactions USING (event_id)"
                 " LEFT JOIN state_events USING (event_id)"
+                " LEFT JOIN event_relations USING (event_id)"
                 " WHERE ? < stream_ordering AND stream_ordering <= ?"
                 " ORDER BY stream_ordering ASC"
                 " LIMIT ?"
@@ -1675,11 +1676,12 @@ class EventsStore(
 
             sql = (
                 "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
-                " state_key, redacts"
+                " state_key, redacts, relates_to_id"
                 " FROM events AS e"
                 " INNER JOIN ex_outlier_stream USING (event_id)"
                 " LEFT JOIN redactions USING (event_id)"
                 " LEFT JOIN state_events USING (event_id)"
+                " LEFT JOIN event_relations USING (event_id)"
                 " WHERE ? < event_stream_ordering"
                 " AND event_stream_ordering <= ?"
                 " ORDER BY event_stream_ordering DESC"
@@ -1700,10 +1702,11 @@ class EventsStore(
         def get_all_new_backfill_event_rows(txn):
             sql = (
                 "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
-                " state_key, redacts"
+                " state_key, redacts, relates_to_id"
                 " FROM events AS e"
                 " LEFT JOIN redactions USING (event_id)"
                 " LEFT JOIN state_events USING (event_id)"
+                " LEFT JOIN event_relations USING (event_id)"
                 " WHERE ? > stream_ordering AND stream_ordering >= ?"
                 " ORDER BY stream_ordering ASC"
                 " LIMIT ?"
@@ -1718,11 +1721,12 @@ class EventsStore(
 
             sql = (
                 "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
-                " state_key, redacts"
+                " state_key, redacts, relates_to_id"
                 " FROM events AS e"
                 " INNER JOIN ex_outlier_stream USING (event_id)"
                 " LEFT JOIN redactions USING (event_id)"
                 " LEFT JOIN state_events USING (event_id)"
+                " LEFT JOIN event_relations USING (event_id)"
                 " WHERE ? > event_stream_ordering"
                 " AND event_stream_ordering >= ?"
                 " ORDER BY event_stream_ordering DESC"
diff --git a/synapse/storage/relations.py b/synapse/storage/relations.py
index 31ef6679af..de67e305a1 100644
--- a/synapse/storage/relations.py
+++ b/synapse/storage/relations.py
@@ -18,7 +18,10 @@ import logging
 import attr
 
 from synapse.api.constants import RelationTypes
+from synapse.api.errors import SynapseError
 from synapse.storage._base import SQLBaseStore
+from synapse.storage.stream import generate_pagination_where_clause
+from synapse.util.caches.descriptors import cached
 
 logger = logging.getLogger(__name__)
 
@@ -29,19 +32,101 @@ class PaginationChunk(object):
 
     Attributes:
         chunk (list): The rows returned by pagination
+        next_batch (Any|None): Token to fetch next set of results with, if
+            None then there are no more results.
+        prev_batch (Any|None): Token to fetch previous set of results with, if
+            None then there are no previous results.
     """
 
     chunk = attr.ib()
+    next_batch = attr.ib(default=None)
+    prev_batch = attr.ib(default=None)
 
     def to_dict(self):
         d = {"chunk": self.chunk}
 
+        if self.next_batch:
+            d["next_batch"] = self.next_batch.to_string()
+
+        if self.prev_batch:
+            d["prev_batch"] = self.prev_batch.to_string()
+
         return d
 
 
-class RelationsStore(SQLBaseStore):
+@attr.s(frozen=True, slots=True)
+class RelationPaginationToken(object):
+    """Pagination token for relation pagination API.
+
+    As the results are order by topological ordering, we can use the
+    `topological_ordering` and `stream_ordering` fields of the events at the
+    boundaries of the chunk as pagination tokens.
+
+    Attributes:
+        topological (int): The topological ordering of the boundary event
+        stream (int): The stream ordering of the boundary event.
+    """
+
+    topological = attr.ib()
+    stream = attr.ib()
+
+    @staticmethod
+    def from_string(string):
+        try:
+            t, s = string.split("-")
+            return RelationPaginationToken(int(t), int(s))
+        except ValueError:
+            raise SynapseError(400, "Invalid token")
+
+    def to_string(self):
+        return "%d-%d" % (self.topological, self.stream)
+
+    def as_tuple(self):
+        return attr.astuple(self)
+
+
+@attr.s(frozen=True, slots=True)
+class AggregationPaginationToken(object):
+    """Pagination token for relation aggregation pagination API.
+
+    As the results are order by count and then MAX(stream_ordering) of the
+    aggregation groups, we can just use them as our pagination token.
+
+    Attributes:
+        count (int): The count of relations in the boundar group.
+        stream (int): The MAX stream ordering in the boundary group.
+    """
+
+    count = attr.ib()
+    stream = attr.ib()
+
+    @staticmethod
+    def from_string(string):
+        try:
+            c, s = string.split("-")
+            return AggregationPaginationToken(int(c), int(s))
+        except ValueError:
+            raise SynapseError(400, "Invalid token")
+
+    def to_string(self):
+        return "%d-%d" % (self.count, self.stream)
+
+    def as_tuple(self):
+        return attr.astuple(self)
+
+
+class RelationsWorkerStore(SQLBaseStore):
+    @cached(tree=True)
     def get_relations_for_event(
-        self, event_id, relation_type=None, event_type=None, limit=5, direction="b"
+        self,
+        event_id,
+        relation_type=None,
+        event_type=None,
+        aggregation_key=None,
+        limit=5,
+        direction="b",
+        from_token=None,
+        to_token=None,
     ):
         """Get a list of relations for an event, ordered by topological ordering.
 
@@ -51,17 +136,21 @@ class RelationsStore(SQLBaseStore):
                 type, if given.
             event_type (str|None): Only fetch events with this event type, if
                 given.
+            aggregation_key (str|None): Only fetch events with this aggregation
+                key, if given.
             limit (int): Only fetch the most recent `limit` events.
             direction (str): Whether to fetch the most recent first (`"b"`) or
                 the oldest first (`"f"`).
+            from_token (RelationPaginationToken|None): Fetch rows from the given
+                token, or from the start if None.
+            to_token (RelationPaginationToken|None): Fetch rows up to the given
+                token, or up to the end if None.
 
         Returns:
             Deferred[PaginationChunk]: List of event IDs that match relations
             requested. The rows are of the form `{"event_id": "..."}`.
         """
 
-        # TODO: Pagination tokens
-
         where_clause = ["relates_to_id = ?"]
         where_args = [event_id]
 
@@ -73,12 +162,29 @@ class RelationsStore(SQLBaseStore):
             where_clause.append("type = ?")
             where_args.append(event_type)
 
-        order = "ASC"
+        if aggregation_key:
+            where_clause.append("aggregation_key = ?")
+            where_args.append(aggregation_key)
+
+        pagination_clause = generate_pagination_where_clause(
+            direction=direction,
+            column_names=("topological_ordering", "stream_ordering"),
+            from_token=attr.astuple(from_token) if from_token else None,
+            to_token=attr.astuple(to_token) if to_token else None,
+            engine=self.database_engine,
+        )
+
+        if pagination_clause:
+            where_clause.append(pagination_clause)
+
         if direction == "b":
             order = "DESC"
+        else:
+            order = "ASC"
 
         sql = """
-            SELECT event_id FROM event_relations
+            SELECT event_id, topological_ordering, stream_ordering
+            FROM event_relations
             INNER JOIN events USING (event_id)
             WHERE %s
             ORDER BY topological_ordering %s, stream_ordering %s
@@ -92,16 +198,122 @@ class RelationsStore(SQLBaseStore):
         def _get_recent_references_for_event_txn(txn):
             txn.execute(sql, where_args + [limit + 1])
 
-            events = [{"event_id": row[0]} for row in txn]
+            last_topo_id = None
+            last_stream_id = None
+            events = []
+            for row in txn:
+                events.append({"event_id": row[0]})
+                last_topo_id = row[1]
+                last_stream_id = row[2]
+
+            next_batch = None
+            if len(events) > limit and last_topo_id and last_stream_id:
+                next_batch = RelationPaginationToken(last_topo_id, last_stream_id)
 
             return PaginationChunk(
-                chunk=list(events[:limit]),
+                chunk=list(events[:limit]), next_batch=next_batch, prev_batch=from_token
             )
 
         return self.runInteraction(
             "get_recent_references_for_event", _get_recent_references_for_event_txn
         )
 
+    @cached(tree=True)
+    def get_aggregation_groups_for_event(
+        self,
+        event_id,
+        event_type=None,
+        limit=5,
+        direction="b",
+        from_token=None,
+        to_token=None,
+    ):
+        """Get a list of annotations on the event, grouped by event type and
+        aggregation key, sorted by count.
+
+        This is used e.g. to get the what and how many reactions have happend
+        on an event.
+
+        Args:
+            event_id (str): Fetch events that relate to this event ID.
+            event_type (str|None): Only fetch events with this event type, if
+                given.
+            limit (int): Only fetch the `limit` groups.
+            direction (str): Whether to fetch the highest count first (`"b"`) or
+                the lowest count first (`"f"`).
+            from_token (AggregationPaginationToken|None): Fetch rows from the
+                given token, or from the start if None.
+            to_token (AggregationPaginationToken|None): Fetch rows up to the
+                given token, or up to the end if None.
+
+
+        Returns:
+            Deferred[PaginationChunk]: List of groups of annotations that
+            match. Each row is a dict with `type`, `key` and `count` fields.
+        """
+
+        where_clause = ["relates_to_id = ?", "relation_type = ?"]
+        where_args = [event_id, RelationTypes.ANNOTATION]
+
+        if event_type:
+            where_clause.append("type = ?")
+            where_args.append(event_type)
+
+        having_clause = generate_pagination_where_clause(
+            direction=direction,
+            column_names=("COUNT(*)", "MAX(stream_ordering)"),
+            from_token=attr.astuple(from_token) if from_token else None,
+            to_token=attr.astuple(to_token) if to_token else None,
+            engine=self.database_engine,
+        )
+
+        if direction == "b":
+            order = "DESC"
+        else:
+            order = "ASC"
+
+        if having_clause:
+            having_clause = "HAVING " + having_clause
+        else:
+            having_clause = ""
+
+        sql = """
+            SELECT type, aggregation_key, COUNT(*), MAX(stream_ordering)
+            FROM event_relations
+            INNER JOIN events USING (event_id)
+            WHERE {where_clause}
+            GROUP BY relation_type, type, aggregation_key
+            {having_clause}
+            ORDER BY COUNT(*) {order}, MAX(stream_ordering) {order}
+            LIMIT ?
+        """.format(
+            where_clause=" AND ".join(where_clause),
+            order=order,
+            having_clause=having_clause,
+        )
+
+        def _get_aggregation_groups_for_event_txn(txn):
+            txn.execute(sql, where_args + [limit + 1])
+
+            next_batch = None
+            events = []
+            for row in txn:
+                events.append({"type": row[0], "key": row[1], "count": row[2]})
+                next_batch = AggregationPaginationToken(row[2], row[3])
+
+            if len(events) <= limit:
+                next_batch = None
+
+            return PaginationChunk(
+                chunk=list(events[:limit]), next_batch=next_batch, prev_batch=from_token
+            )
+
+        return self.runInteraction(
+            "get_aggregation_groups_for_event", _get_aggregation_groups_for_event_txn
+        )
+
+
+class RelationsStore(RelationsWorkerStore):
     def _handle_event_relations(self, txn, event):
         """Handles inserting relation data during peristence of events
 
@@ -140,3 +352,8 @@ class RelationsStore(SQLBaseStore):
                 "aggregation_key": aggregation_key,
             },
         )
+
+        txn.call_after(self.get_relations_for_event.invalidate_many, (parent_id,))
+        txn.call_after(
+            self.get_aggregation_groups_for_event.invalidate_many, (parent_id,)
+        )