summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/data_stores/main/event_federation.py3
-rw-r--r--synapse/storage/data_stores/main/events.py10
-rw-r--r--synapse/storage/data_stores/main/events_bg_updates.py4
-rw-r--r--synapse/storage/data_stores/main/events_worker.py41
-rw-r--r--synapse/storage/data_stores/main/presence.py41
-rw-r--r--synapse/storage/data_stores/main/push_rule.py56
-rw-r--r--synapse/storage/data_stores/main/receipts.py91
-rw-r--r--synapse/storage/data_stores/main/schema/delta/30/as_users.py2
-rw-r--r--synapse/storage/data_stores/main/search.py4
-rw-r--r--synapse/storage/data_stores/main/stream.py2
-rw-r--r--synapse/storage/data_stores/main/tags.py2
-rw-r--r--synapse/storage/data_stores/state/store.py2
-rw-r--r--synapse/storage/database.py3
-rw-r--r--synapse/storage/persist_events.py2
14 files changed, 195 insertions, 68 deletions
diff --git a/synapse/storage/data_stores/main/event_federation.py b/synapse/storage/data_stores/main/event_federation.py
index 24ce8c4330..a6bb3221ff 100644
--- a/synapse/storage/data_stores/main/event_federation.py
+++ b/synapse/storage/data_stores/main/event_federation.py
@@ -14,10 +14,9 @@
 # limitations under the License.
 import itertools
 import logging
+from queue import Empty, PriorityQueue
 from typing import Dict, List, Optional, Set, Tuple
 
-from six.moves.queue import Empty, PriorityQueue
-
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index 8a13101f1d..cfd24d2f06 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -21,9 +21,6 @@ from collections import OrderedDict, namedtuple
 from functools import wraps
 from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple
 
-from six import integer_types, text_type
-from six.moves import range
-
 import attr
 from canonicaljson import json
 from prometheus_client import Counter
@@ -893,8 +890,7 @@ class PersistEventsStore:
                     "received_ts": self._clock.time_msec(),
                     "sender": event.sender,
                     "contains_url": (
-                        "url" in event.content
-                        and isinstance(event.content["url"], text_type)
+                        "url" in event.content and isinstance(event.content["url"], str)
                     ),
                 }
                 for event, _ in events_and_contexts
@@ -1345,10 +1341,10 @@ class PersistEventsStore:
         ):
             if (
                 "min_lifetime" in event.content
-                and not isinstance(event.content.get("min_lifetime"), integer_types)
+                and not isinstance(event.content.get("min_lifetime"), int)
             ) or (
                 "max_lifetime" in event.content
-                and not isinstance(event.content.get("max_lifetime"), integer_types)
+                and not isinstance(event.content.get("max_lifetime"), int)
             ):
                 # Ignore the event if one of the value isn't an integer.
                 return
diff --git a/synapse/storage/data_stores/main/events_bg_updates.py b/synapse/storage/data_stores/main/events_bg_updates.py
index f54c8b1ee0..62d28f44dc 100644
--- a/synapse/storage/data_stores/main/events_bg_updates.py
+++ b/synapse/storage/data_stores/main/events_bg_updates.py
@@ -15,8 +15,6 @@
 
 import logging
 
-from six import text_type
-
 from canonicaljson import json
 
 from twisted.internet import defer
@@ -133,7 +131,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
 
                     contains_url = "url" in content
                     if contains_url:
-                        contains_url &= isinstance(content["url"], text_type)
+                        contains_url &= isinstance(content["url"], str)
                 except (KeyError, AttributeError):
                     # If the event is missing a necessary field then
                     # skip over it.
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index 213d69100a..a48c7a96ca 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -1077,9 +1077,32 @@ class EventsWorkerStore(SQLBaseStore):
             "get_ex_outlier_stream_rows", get_ex_outlier_stream_rows_txn
         )
 
-    def get_all_new_backfill_event_rows(self, last_id, current_id, limit):
+    async def get_all_new_backfill_event_rows(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, list]], int, bool]:
+        """Get updates for backfill replication stream, including all new
+        backfilled events and events that have gone from being outliers to not.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
+        """
         if last_id == current_id:
-            return defer.succeed([])
+            return [], current_id, False
 
         def get_all_new_backfill_event_rows(txn):
             sql = (
@@ -1094,10 +1117,12 @@ class EventsWorkerStore(SQLBaseStore):
                 " LIMIT ?"
             )
             txn.execute(sql, (-last_id, -current_id, limit))
-            new_event_updates = txn.fetchall()
+            new_event_updates = [(row[0], row[1:]) for row in txn]
 
+            limited = False
             if len(new_event_updates) == limit:
                 upper_bound = new_event_updates[-1][0]
+                limited = True
             else:
                 upper_bound = current_id
 
@@ -1114,11 +1139,15 @@ class EventsWorkerStore(SQLBaseStore):
                 " ORDER BY event_stream_ordering DESC"
             )
             txn.execute(sql, (-last_id, -upper_bound))
-            new_event_updates.extend(txn.fetchall())
+            new_event_updates.extend((row[0], row[1:]) for row in txn)
 
-            return new_event_updates
+            if len(new_event_updates) >= limit:
+                upper_bound = new_event_updates[-1][0]
+                limited = True
 
-        return self.db.runInteraction(
+            return new_event_updates, upper_bound, limited
+
+        return await self.db.runInteraction(
             "get_all_new_backfill_event_rows", get_all_new_backfill_event_rows
         )
 
diff --git a/synapse/storage/data_stores/main/presence.py b/synapse/storage/data_stores/main/presence.py
index dab31e0c2d..7574612619 100644
--- a/synapse/storage/data_stores/main/presence.py
+++ b/synapse/storage/data_stores/main/presence.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from typing import List, Tuple
+
 from twisted.internet import defer
 
 from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
@@ -73,9 +75,32 @@ class PresenceStore(SQLBaseStore):
             )
             txn.execute(sql + clause, [stream_id] + list(args))
 
-    def get_all_presence_updates(self, last_id, current_id, limit):
+    async def get_all_presence_updates(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, list]], int, bool]:
+        """Get updates for presence replication stream.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
+        """
+
         if last_id == current_id:
-            return defer.succeed([])
+            return [], current_id, False
 
         def get_all_presence_updates_txn(txn):
             sql = """
@@ -89,9 +114,17 @@ class PresenceStore(SQLBaseStore):
                 LIMIT ?
             """
             txn.execute(sql, (last_id, current_id, limit))
-            return txn.fetchall()
+            updates = [(row[0], row[1:]) for row in txn]
+
+            upper_bound = current_id
+            limited = False
+            if len(updates) >= limit:
+                upper_bound = updates[-1][0]
+                limited = True
+
+            return updates, upper_bound, limited
 
-        return self.db.runInteraction(
+        return await self.db.runInteraction(
             "get_all_presence_updates", get_all_presence_updates_txn
         )
 
diff --git a/synapse/storage/data_stores/main/push_rule.py b/synapse/storage/data_stores/main/push_rule.py
index ef8f40959f..f6e78ca590 100644
--- a/synapse/storage/data_stores/main/push_rule.py
+++ b/synapse/storage/data_stores/main/push_rule.py
@@ -16,7 +16,7 @@
 
 import abc
 import logging
-from typing import Union
+from typing import List, Tuple, Union
 
 from canonicaljson import json
 
@@ -348,23 +348,53 @@ class PushRulesWorkerStore(
             results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled
         return results
 
-    def get_all_push_rule_updates(self, last_id, current_id, limit):
-        """Get all the push rules changes that have happend on the server"""
+    async def get_all_push_rule_updates(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
+        """Get updates for push_rules replication stream.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
+        """
+
         if last_id == current_id:
-            return defer.succeed([])
+            return [], current_id, False
 
         def get_all_push_rule_updates_txn(txn):
-            sql = (
-                "SELECT stream_id, event_stream_ordering, user_id, rule_id,"
-                " op, priority_class, priority, conditions, actions"
-                " FROM push_rules_stream"
-                " WHERE ? < stream_id AND stream_id <= ?"
-                " ORDER BY stream_id ASC LIMIT ?"
-            )
+            sql = """
+                SELECT stream_id, user_id
+                FROM push_rules_stream
+                WHERE ? < stream_id AND stream_id <= ?
+                ORDER BY stream_id ASC
+                LIMIT ?
+            """
             txn.execute(sql, (last_id, current_id, limit))
-            return txn.fetchall()
+            updates = [(stream_id, (user_id,)) for stream_id, user_id in txn]
+
+            limited = False
+            upper_bound = current_id
+            if len(updates) == limit:
+                limited = True
+                upper_bound = updates[-1][0]
+
+            return updates, upper_bound, limited
 
-        return self.db.runInteraction(
+        return await self.db.runInteraction(
             "get_all_push_rule_updates", get_all_push_rule_updates_txn
         )
 
diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py
index cebdcd409f..8f5505bd67 100644
--- a/synapse/storage/data_stores/main/receipts.py
+++ b/synapse/storage/data_stores/main/receipts.py
@@ -16,6 +16,7 @@
 
 import abc
 import logging
+from typing import List, Tuple
 
 from canonicaljson import json
 
@@ -24,6 +25,7 @@ from twisted.internet import defer
 from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
 from synapse.storage.database import Database
 from synapse.storage.util.id_generators import StreamIdGenerator
+from synapse.util.async_helpers import ObservableDeferred
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
@@ -266,26 +268,79 @@ class ReceiptsWorkerStore(SQLBaseStore):
         }
         return results
 
-    def get_all_updated_receipts(self, last_id, current_id, limit=None):
+    def get_users_sent_receipts_between(self, last_id: int, current_id: int):
+        """Get all users who sent receipts between `last_id` exclusive and
+        `current_id` inclusive.
+
+        Returns:
+            Deferred[List[str]]
+        """
+
         if last_id == current_id:
             return defer.succeed([])
 
-        def get_all_updated_receipts_txn(txn):
-            sql = (
-                "SELECT stream_id, room_id, receipt_type, user_id, event_id, data"
-                " FROM receipts_linearized"
-                " WHERE ? < stream_id AND stream_id <= ?"
-                " ORDER BY stream_id ASC"
-            )
-            args = [last_id, current_id]
-            if limit is not None:
-                sql += " LIMIT ?"
-                args.append(limit)
-            txn.execute(sql, args)
+        def _get_users_sent_receipts_between_txn(txn):
+            sql = """
+                SELECT DISTINCT user_id FROM receipts_linearized
+                WHERE ? < stream_id AND stream_id <= ?
+            """
+            txn.execute(sql, (last_id, current_id))
 
-            return [r[0:5] + (json.loads(r[5]),) for r in txn]
+            return [r[0] for r in txn]
 
         return self.db.runInteraction(
+            "get_users_sent_receipts_between", _get_users_sent_receipts_between_txn
+        )
+
+    async def get_all_updated_receipts(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, list]], int, bool]:
+        """Get updates for receipts replication stream.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
+        """
+
+        if last_id == current_id:
+            return [], current_id, False
+
+        def get_all_updated_receipts_txn(txn):
+            sql = """
+                SELECT stream_id, room_id, receipt_type, user_id, event_id, data
+                FROM receipts_linearized
+                WHERE ? < stream_id AND stream_id <= ?
+                ORDER BY stream_id ASC
+                LIMIT ?
+            """
+            txn.execute(sql, (last_id, current_id, limit))
+
+            updates = [(r[0], r[1:5] + (json.loads(r[5]),)) for r in txn]
+
+            limited = False
+            upper_bound = current_id
+
+            if len(updates) == limit:
+                limited = True
+                upper_bound = updates[-1][0]
+
+            return updates, upper_bound, limited
+
+        return await self.db.runInteraction(
             "get_all_updated_receipts", get_all_updated_receipts_txn
         )
 
@@ -300,10 +355,10 @@ class ReceiptsWorkerStore(SQLBaseStore):
             room_id, None, update_metrics=False
         )
 
-        # first handle the Deferred case
-        if isinstance(res, defer.Deferred):
-            if res.called:
-                res = res.result
+        # first handle the ObservableDeferred case
+        if isinstance(res, ObservableDeferred):
+            if res.has_called():
+                res = res.get_result()
             else:
                 res = None
 
diff --git a/synapse/storage/data_stores/main/schema/delta/30/as_users.py b/synapse/storage/data_stores/main/schema/delta/30/as_users.py
index 9b95411fb6..b42c02710a 100644
--- a/synapse/storage/data_stores/main/schema/delta/30/as_users.py
+++ b/synapse/storage/data_stores/main/schema/delta/30/as_users.py
@@ -13,8 +13,6 @@
 # limitations under the License.
 import logging
 
-from six.moves import range
-
 from synapse.config.appservice import load_appservices
 
 logger = logging.getLogger(__name__)
diff --git a/synapse/storage/data_stores/main/search.py b/synapse/storage/data_stores/main/search.py
index 13f49d8060..a8381dc577 100644
--- a/synapse/storage/data_stores/main/search.py
+++ b/synapse/storage/data_stores/main/search.py
@@ -17,8 +17,6 @@ import logging
 import re
 from collections import namedtuple
 
-from six import string_types
-
 from canonicaljson import json
 
 from twisted.internet import defer
@@ -180,7 +178,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
                     # skip over it.
                     continue
 
-                if not isinstance(value, string_types):
+                if not isinstance(value, str):
                     # If the event body, name or topic isn't a string
                     # then skip over it
                     continue
diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py
index e89f0bffb5..379d758b5d 100644
--- a/synapse/storage/data_stores/main/stream.py
+++ b/synapse/storage/data_stores/main/stream.py
@@ -40,8 +40,6 @@ import abc
 import logging
 from collections import namedtuple
 
-from six.moves import range
-
 from twisted.internet import defer
 
 from synapse.logging.context import make_deferred_yieldable, run_in_background
diff --git a/synapse/storage/data_stores/main/tags.py b/synapse/storage/data_stores/main/tags.py
index 4219018302..f8c776be3f 100644
--- a/synapse/storage/data_stores/main/tags.py
+++ b/synapse/storage/data_stores/main/tags.py
@@ -16,8 +16,6 @@
 
 import logging
 
-from six.moves import range
-
 from canonicaljson import json
 
 from twisted.internet import defer
diff --git a/synapse/storage/data_stores/state/store.py b/synapse/storage/data_stores/state/store.py
index b720212e55..5db9f20135 100644
--- a/synapse/storage/data_stores/state/store.py
+++ b/synapse/storage/data_stores/state/store.py
@@ -17,8 +17,6 @@ import logging
 from collections import namedtuple
 from typing import Dict, Iterable, List, Set, Tuple
 
-from six.moves import range
-
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 645a70934c..3be20c866a 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -16,6 +16,7 @@
 # limitations under the License.
 import logging
 import time
+from sys import intern
 from time import monotonic as monotonic_time
 from typing import (
     Any,
@@ -29,8 +30,6 @@ from typing import (
     TypeVar,
 )
 
-from six.moves import intern, range
-
 from prometheus_client import Histogram
 
 from twisted.enterprise import adbapi
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 92dfd709bc..ec894a91cb 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -20,8 +20,6 @@ import logging
 from collections import deque, namedtuple
 from typing import Iterable, List, Optional, Set, Tuple
 
-from six.moves import range
-
 from prometheus_client import Counter, Histogram
 
 from twisted.internet import defer