summary refs log tree commit diff
path: root/synapse/storage/data_stores
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-06-16 17:10:28 +0100
committerGitHub <noreply@github.com>2020-06-16 17:10:28 +0100
commitf6f7511a4c0548b17bd1cdabebd0ffad9ea73bc7 (patch)
tree5c8f96ecc110b9bfa8cf2d735359a03e397bff83 /synapse/storage/data_stores
parentFix "argument of type 'ObservableDeferred' is not iterable" error (#7708) (diff)
downloadsynapse-f6f7511a4c0548b17bd1cdabebd0ffad9ea73bc7.tar.xz
Refactor getting replication updates from database. (#7636)
The aim here is to make it easier to reason about when streams are limited and when they're not, by moving the logic into the database functions themselves. This should mean we can kill of `db_query_to_update_function` function.
Diffstat (limited to 'synapse/storage/data_stores')
-rw-r--r--synapse/storage/data_stores/main/events_worker.py41
-rw-r--r--synapse/storage/data_stores/main/presence.py41
-rw-r--r--synapse/storage/data_stores/main/push_rule.py56
-rw-r--r--synapse/storage/data_stores/main/receipts.py82
4 files changed, 183 insertions, 37 deletions
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index 213d69100a..a48c7a96ca 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -1077,9 +1077,32 @@ class EventsWorkerStore(SQLBaseStore):
             "get_ex_outlier_stream_rows", get_ex_outlier_stream_rows_txn
         )
 
-    def get_all_new_backfill_event_rows(self, last_id, current_id, limit):
+    async def get_all_new_backfill_event_rows(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, list]], int, bool]:
+        """Get updates for backfill replication stream, including all new
+        backfilled events and events that have gone from being outliers to not.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
+        """
         if last_id == current_id:
-            return defer.succeed([])
+            return [], current_id, False
 
         def get_all_new_backfill_event_rows(txn):
             sql = (
@@ -1094,10 +1117,12 @@ class EventsWorkerStore(SQLBaseStore):
                 " LIMIT ?"
             )
             txn.execute(sql, (-last_id, -current_id, limit))
-            new_event_updates = txn.fetchall()
+            new_event_updates = [(row[0], row[1:]) for row in txn]
 
+            limited = False
             if len(new_event_updates) == limit:
                 upper_bound = new_event_updates[-1][0]
+                limited = True
             else:
                 upper_bound = current_id
 
@@ -1114,11 +1139,15 @@ class EventsWorkerStore(SQLBaseStore):
                 " ORDER BY event_stream_ordering DESC"
             )
             txn.execute(sql, (-last_id, -upper_bound))
-            new_event_updates.extend(txn.fetchall())
+            new_event_updates.extend((row[0], row[1:]) for row in txn)
 
-            return new_event_updates
+            if len(new_event_updates) >= limit:
+                upper_bound = new_event_updates[-1][0]
+                limited = True
 
-        return self.db.runInteraction(
+            return new_event_updates, upper_bound, limited
+
+        return await self.db.runInteraction(
             "get_all_new_backfill_event_rows", get_all_new_backfill_event_rows
         )
 
diff --git a/synapse/storage/data_stores/main/presence.py b/synapse/storage/data_stores/main/presence.py
index dab31e0c2d..7574612619 100644
--- a/synapse/storage/data_stores/main/presence.py
+++ b/synapse/storage/data_stores/main/presence.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from typing import List, Tuple
+
 from twisted.internet import defer
 
 from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
@@ -73,9 +75,32 @@ class PresenceStore(SQLBaseStore):
             )
             txn.execute(sql + clause, [stream_id] + list(args))
 
-    def get_all_presence_updates(self, last_id, current_id, limit):
+    async def get_all_presence_updates(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, list]], int, bool]:
+        """Get updates for presence replication stream.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
+        """
+
         if last_id == current_id:
-            return defer.succeed([])
+            return [], current_id, False
 
         def get_all_presence_updates_txn(txn):
             sql = """
@@ -89,9 +114,17 @@ class PresenceStore(SQLBaseStore):
                 LIMIT ?
             """
             txn.execute(sql, (last_id, current_id, limit))
-            return txn.fetchall()
+            updates = [(row[0], row[1:]) for row in txn]
+
+            upper_bound = current_id
+            limited = False
+            if len(updates) >= limit:
+                upper_bound = updates[-1][0]
+                limited = True
+
+            return updates, upper_bound, limited
 
-        return self.db.runInteraction(
+        return await self.db.runInteraction(
             "get_all_presence_updates", get_all_presence_updates_txn
         )
 
diff --git a/synapse/storage/data_stores/main/push_rule.py b/synapse/storage/data_stores/main/push_rule.py
index ef8f40959f..f6e78ca590 100644
--- a/synapse/storage/data_stores/main/push_rule.py
+++ b/synapse/storage/data_stores/main/push_rule.py
@@ -16,7 +16,7 @@
 
 import abc
 import logging
-from typing import Union
+from typing import List, Tuple, Union
 
 from canonicaljson import json
 
@@ -348,23 +348,53 @@ class PushRulesWorkerStore(
             results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled
         return results
 
-    def get_all_push_rule_updates(self, last_id, current_id, limit):
-        """Get all the push rules changes that have happend on the server"""
+    async def get_all_push_rule_updates(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
+        """Get updates for push_rules replication stream.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
+        """
+
         if last_id == current_id:
-            return defer.succeed([])
+            return [], current_id, False
 
         def get_all_push_rule_updates_txn(txn):
-            sql = (
-                "SELECT stream_id, event_stream_ordering, user_id, rule_id,"
-                " op, priority_class, priority, conditions, actions"
-                " FROM push_rules_stream"
-                " WHERE ? < stream_id AND stream_id <= ?"
-                " ORDER BY stream_id ASC LIMIT ?"
-            )
+            sql = """
+                SELECT stream_id, user_id
+                FROM push_rules_stream
+                WHERE ? < stream_id AND stream_id <= ?
+                ORDER BY stream_id ASC
+                LIMIT ?
+            """
             txn.execute(sql, (last_id, current_id, limit))
-            return txn.fetchall()
+            updates = [(stream_id, (user_id,)) for stream_id, user_id in txn]
+
+            limited = False
+            upper_bound = current_id
+            if len(updates) == limit:
+                limited = True
+                upper_bound = updates[-1][0]
+
+            return updates, upper_bound, limited
 
-        return self.db.runInteraction(
+        return await self.db.runInteraction(
             "get_all_push_rule_updates", get_all_push_rule_updates_txn
         )
 
diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py
index d4a7163049..8f5505bd67 100644
--- a/synapse/storage/data_stores/main/receipts.py
+++ b/synapse/storage/data_stores/main/receipts.py
@@ -16,6 +16,7 @@
 
 import abc
 import logging
+from typing import List, Tuple
 
 from canonicaljson import json
 
@@ -267,26 +268,79 @@ class ReceiptsWorkerStore(SQLBaseStore):
         }
         return results
 
-    def get_all_updated_receipts(self, last_id, current_id, limit=None):
+    def get_users_sent_receipts_between(self, last_id: int, current_id: int):
+        """Get all users who sent receipts between `last_id` exclusive and
+        `current_id` inclusive.
+
+        Returns:
+            Deferred[List[str]]
+        """
+
         if last_id == current_id:
             return defer.succeed([])
 
-        def get_all_updated_receipts_txn(txn):
-            sql = (
-                "SELECT stream_id, room_id, receipt_type, user_id, event_id, data"
-                " FROM receipts_linearized"
-                " WHERE ? < stream_id AND stream_id <= ?"
-                " ORDER BY stream_id ASC"
-            )
-            args = [last_id, current_id]
-            if limit is not None:
-                sql += " LIMIT ?"
-                args.append(limit)
-            txn.execute(sql, args)
+        def _get_users_sent_receipts_between_txn(txn):
+            sql = """
+                SELECT DISTINCT user_id FROM receipts_linearized
+                WHERE ? < stream_id AND stream_id <= ?
+            """
+            txn.execute(sql, (last_id, current_id))
 
-            return [r[0:5] + (json.loads(r[5]),) for r in txn]
+            return [r[0] for r in txn]
 
         return self.db.runInteraction(
+            "get_users_sent_receipts_between", _get_users_sent_receipts_between_txn
+        )
+
+    async def get_all_updated_receipts(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, list]], int, bool]:
+        """Get updates for receipts replication stream.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
+        """
+
+        if last_id == current_id:
+            return [], current_id, False
+
+        def get_all_updated_receipts_txn(txn):
+            sql = """
+                SELECT stream_id, room_id, receipt_type, user_id, event_id, data
+                FROM receipts_linearized
+                WHERE ? < stream_id AND stream_id <= ?
+                ORDER BY stream_id ASC
+                LIMIT ?
+            """
+            txn.execute(sql, (last_id, current_id, limit))
+
+            updates = [(r[0], r[1:5] + (json.loads(r[5]),)) for r in txn]
+
+            limited = False
+            upper_bound = current_id
+
+            if len(updates) == limit:
+                limited = True
+                upper_bound = updates[-1][0]
+
+            return updates, upper_bound, limited
+
+        return await self.db.runInteraction(
             "get_all_updated_receipts", get_all_updated_receipts_txn
         )