summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/data_stores/main/cache.py44
-rw-r--r--synapse/storage/data_stores/main/deviceinbox.py54
-rw-r--r--synapse/storage/data_stores/main/devices.py70
-rw-r--r--synapse/storage/data_stores/main/end_to_end_keys.py65
-rw-r--r--synapse/storage/data_stores/main/events.py119
-rw-r--r--synapse/storage/data_stores/main/events_worker.py11
-rw-r--r--synapse/storage/data_stores/main/group_server.py52
-rw-r--r--synapse/storage/data_stores/main/purge_events.py1
-rw-r--r--synapse/storage/data_stores/main/pusher.py108
-rw-r--r--synapse/storage/data_stores/main/room.py41
-rw-r--r--synapse/storage/data_stores/main/schema/delta/25/fts.py6
-rw-r--r--synapse/storage/data_stores/main/schema/delta/27/ts.py6
-rw-r--r--synapse/storage/data_stores/main/schema/delta/31/search_update.py6
-rw-r--r--synapse/storage/data_stores/main/schema/delta/33/event_fields.py6
-rw-r--r--synapse/storage/data_stores/main/tags.py45
-rw-r--r--synapse/storage/data_stores/main/ui_auth.py2
-rw-r--r--synapse/storage/engines/postgres.py2
-rw-r--r--synapse/storage/persist_events.py6
-rw-r--r--synapse/storage/types.py2
19 files changed, 365 insertions, 281 deletions
diff --git a/synapse/storage/data_stores/main/cache.py b/synapse/storage/data_stores/main/cache.py
index eac5a4e55b..f39f556c20 100644
--- a/synapse/storage/data_stores/main/cache.py
+++ b/synapse/storage/data_stores/main/cache.py
@@ -16,10 +16,12 @@
 
 import itertools
 import logging
-from typing import Any, Iterable, Optional, Tuple
+from typing import Any, Iterable, List, Optional, Tuple
 
 from synapse.api.constants import EventTypes
+from synapse.replication.tcp.streams import BackfillStream, CachesStream
 from synapse.replication.tcp.streams.events import (
+    EventsStream,
     EventsStreamCurrentStateRow,
     EventsStreamEventRow,
 )
@@ -44,13 +46,30 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
 
     async def get_all_updated_caches(
         self, instance_name: str, last_id: int, current_id: int, limit: int
-    ):
-        """Fetches cache invalidation rows between the two given IDs written
-        by the given instance. Returns at most `limit` rows.
+    ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
+        """Get updates for caches replication stream.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
         """
 
         if last_id == current_id:
-            return []
+            return [], current_id, False
 
         def get_all_updated_caches_txn(txn):
             # We purposefully don't bound by the current token, as we want to
@@ -64,17 +83,24 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
                 LIMIT ?
             """
             txn.execute(sql, (last_id, instance_name, limit))
-            return txn.fetchall()
+            updates = [(row[0], row[1:]) for row in txn]
+            limited = False
+            upto_token = current_id
+            if len(updates) >= limit:
+                upto_token = updates[-1][0]
+                limited = True
+
+            return updates, upto_token, limited
 
         return await self.db.runInteraction(
             "get_all_updated_caches", get_all_updated_caches_txn
         )
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
-        if stream_name == "events":
+        if stream_name == EventsStream.NAME:
             for row in rows:
                 self._process_event_stream_row(token, row)
-        elif stream_name == "backfill":
+        elif stream_name == BackfillStream.NAME:
             for row in rows:
                 self._invalidate_caches_for_event(
                     -token,
@@ -86,7 +112,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
                     row.relates_to,
                     backfilled=True,
                 )
-        elif stream_name == "caches":
+        elif stream_name == CachesStream.NAME:
             if self._cache_id_gen:
                 self._cache_id_gen.advance(instance_name, token)
 
diff --git a/synapse/storage/data_stores/main/deviceinbox.py b/synapse/storage/data_stores/main/deviceinbox.py
index 9a1178fb39..d313b9705f 100644
--- a/synapse/storage/data_stores/main/deviceinbox.py
+++ b/synapse/storage/data_stores/main/deviceinbox.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import logging
+from typing import List, Tuple
 
 from canonicaljson import json
 
@@ -207,31 +208,46 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             "delete_device_msgs_for_remote", delete_messages_for_remote_destination_txn
         )
 
-    def get_all_new_device_messages(self, last_pos, current_pos, limit):
-        """
+    async def get_all_new_device_messages(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
+        """Get updates for to device replication stream.
+
         Args:
-            last_pos(int):
-            current_pos(int):
-            limit(int):
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
         Returns:
-            A deferred list of rows from the device inbox
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
         """
-        if last_pos == current_pos:
-            return defer.succeed([])
+
+        if last_id == current_id:
+            return [], current_id, False
 
         def get_all_new_device_messages_txn(txn):
             # We limit like this as we might have multiple rows per stream_id, and
             # we want to make sure we always get all entries for any stream_id
             # we return.
-            upper_pos = min(current_pos, last_pos + limit)
+            upper_pos = min(current_id, last_id + limit)
             sql = (
                 "SELECT max(stream_id), user_id"
                 " FROM device_inbox"
                 " WHERE ? < stream_id AND stream_id <= ?"
                 " GROUP BY user_id"
             )
-            txn.execute(sql, (last_pos, upper_pos))
-            rows = txn.fetchall()
+            txn.execute(sql, (last_id, upper_pos))
+            updates = [(row[0], row[1:]) for row in txn]
 
             sql = (
                 "SELECT max(stream_id), destination"
@@ -239,15 +255,21 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 " WHERE ? < stream_id AND stream_id <= ?"
                 " GROUP BY destination"
             )
-            txn.execute(sql, (last_pos, upper_pos))
-            rows.extend(txn)
+            txn.execute(sql, (last_id, upper_pos))
+            updates.extend((row[0], row[1:]) for row in txn)
 
             # Order by ascending stream ordering
-            rows.sort()
+            updates.sort()
 
-            return rows
+            limited = False
+            upto_token = current_id
+            if len(updates) >= limit:
+                upto_token = updates[-1][0]
+                limited = True
 
-        return self.db.runInteraction(
+            return updates, upto_token, limited
+
+        return await self.db.runInteraction(
             "get_all_new_device_messages", get_all_new_device_messages_txn
         )
 
diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py
index 0ff0542453..343cf9a2d5 100644
--- a/synapse/storage/data_stores/main/devices.py
+++ b/synapse/storage/data_stores/main/devices.py
@@ -582,32 +582,58 @@ class DeviceWorkerStore(SQLBaseStore):
             return set()
 
     async def get_all_device_list_changes_for_remotes(
-        self, from_key: int, to_key: int, limit: int,
-    ) -> List[Tuple[int, str]]:
-        """Return a list of `(stream_id, entity)` which is the combined list of
-        changes to devices and which destinations need to be poked. Entity is
-        either a user ID (starting with '@') or a remote destination.
-        """
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
+        """Get updates for device lists replication stream.
 
-        # This query Does The Right Thing where it'll correctly apply the
-        # bounds to the inner queries.
-        sql = """
-            SELECT stream_id, entity FROM (
-                SELECT stream_id, user_id AS entity FROM device_lists_stream
-                UNION ALL
-                SELECT stream_id, destination AS entity FROM device_lists_outbound_pokes
-            ) AS e
-            WHERE ? < stream_id AND stream_id <= ?
-            LIMIT ?
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
         """
 
-        return await self.db.execute(
+        if last_id == current_id:
+            return [], current_id, False
+
+        def _get_all_device_list_changes_for_remotes(txn):
+            # This query Does The Right Thing where it'll correctly apply the
+            # bounds to the inner queries.
+            sql = """
+                SELECT stream_id, entity FROM (
+                    SELECT stream_id, user_id AS entity FROM device_lists_stream
+                    UNION ALL
+                    SELECT stream_id, destination AS entity FROM device_lists_outbound_pokes
+                ) AS e
+                WHERE ? < stream_id AND stream_id <= ?
+                LIMIT ?
+            """
+
+            txn.execute(sql, (last_id, current_id, limit))
+            updates = [(row[0], row[1:]) for row in txn]
+            limited = False
+            upto_token = current_id
+            if len(updates) >= limit:
+                upto_token = updates[-1][0]
+                limited = True
+
+            return updates, upto_token, limited
+
+        return await self.db.runInteraction(
             "get_all_device_list_changes_for_remotes",
-            None,
-            sql,
-            from_key,
-            to_key,
-            limit,
+            _get_all_device_list_changes_for_remotes,
         )
 
     @cached(max_entries=10000)
diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py
index 1a0842d4b0..6c3cff82e1 100644
--- a/synapse/storage/data_stores/main/end_to_end_keys.py
+++ b/synapse/storage/data_stores/main/end_to_end_keys.py
@@ -14,7 +14,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from typing import Dict, List
+from typing import Dict, List, Tuple
 
 from canonicaljson import encode_canonical_json, json
 
@@ -479,34 +479,61 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
 
         return result
 
-    def get_all_user_signature_changes_for_remotes(self, from_key, to_key, limit):
-        """Return a list of changes from the user signature stream to notify remotes.
+    async def get_all_user_signature_changes_for_remotes(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
+        """Get updates for groups replication stream.
+
         Note that the user signature stream represents when a user signs their
         device with their user-signing key, which is not published to other
         users or servers, so no `destination` is needed in the returned
         list. However, this is needed to poke workers.
 
         Args:
-            from_key (int): the stream ID to start at (exclusive)
-            to_key (int): the stream ID to end at (inclusive)
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
 
         Returns:
-            Deferred[list[(int,str)]] a list of `(stream_id, user_id)`
-        """
-        sql = """
-            SELECT stream_id, from_user_id AS user_id
-            FROM user_signature_stream
-            WHERE ? < stream_id AND stream_id <= ?
-            ORDER BY stream_id ASC
-            LIMIT ?
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
         """
-        return self.db.execute(
+
+        if last_id == current_id:
+            return [], current_id, False
+
+        def _get_all_user_signature_changes_for_remotes_txn(txn):
+            sql = """
+                SELECT stream_id, from_user_id AS user_id
+                FROM user_signature_stream
+                WHERE ? < stream_id AND stream_id <= ?
+                ORDER BY stream_id ASC
+                LIMIT ?
+            """
+            txn.execute(sql, (last_id, current_id, limit))
+
+            updates = [(row[0], (row[1:])) for row in txn]
+
+            limited = False
+            upto_token = current_id
+            if len(updates) >= limit:
+                upto_token = updates[-1][0]
+                limited = True
+
+            return updates, upto_token, limited
+
+        return await self.db.runInteraction(
             "get_all_user_signature_changes_for_remotes",
-            None,
-            sql,
-            from_key,
-            to_key,
-            limit,
+            _get_all_user_signature_changes_for_remotes_txn,
         )
 
 
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index cfd24d2f06..230fb5cd7f 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -14,7 +14,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import itertools
 import logging
 from collections import OrderedDict, namedtuple
@@ -28,12 +27,7 @@ from prometheus_client import Counter
 from twisted.internet import defer
 
 import synapse.metrics
-from synapse.api.constants import (
-    EventContentFields,
-    EventTypes,
-    Membership,
-    RelationTypes,
-)
+from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
 from synapse.api.room_versions import RoomVersions
 from synapse.crypto.event_signing import compute_event_reference_hash
 from synapse.events import EventBase  # noqa: F401
@@ -48,8 +42,8 @@ from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.iterutils import batch_iter
 
 if TYPE_CHECKING:
-    from synapse.storage.data_stores.main import DataStore
     from synapse.server import HomeServer
+    from synapse.storage.data_stores.main import DataStore
 
 
 logger = logging.getLogger(__name__)
@@ -820,7 +814,6 @@ class PersistEventsStore:
             "event_reference_hashes",
             "event_search",
             "event_to_state_groups",
-            "local_invites",
             "state_events",
             "rejections",
             "redactions",
@@ -1197,65 +1190,27 @@ class PersistEventsStore:
                 (event.state_key,),
             )
 
-            # We update the local_invites table only if the event is "current",
-            # i.e., its something that has just happened. If the event is an
-            # outlier it is only current if its an "out of band membership",
-            # like a remote invite or a rejection of a remote invite.
-            is_new_state = not backfilled and (
-                not event.internal_metadata.is_outlier()
-                or event.internal_metadata.is_out_of_band_membership()
-            )
-            is_mine = self.is_mine_id(event.state_key)
-            if is_new_state and is_mine:
-                if event.membership == Membership.INVITE:
-                    self.db.simple_insert_txn(
-                        txn,
-                        table="local_invites",
-                        values={
-                            "event_id": event.event_id,
-                            "invitee": event.state_key,
-                            "inviter": event.sender,
-                            "room_id": event.room_id,
-                            "stream_id": event.internal_metadata.stream_ordering,
-                        },
-                    )
-                else:
-                    sql = (
-                        "UPDATE local_invites SET stream_id = ?, replaced_by = ? WHERE"
-                        " room_id = ? AND invitee = ? AND locally_rejected is NULL"
-                        " AND replaced_by is NULL"
-                    )
-
-                    txn.execute(
-                        sql,
-                        (
-                            event.internal_metadata.stream_ordering,
-                            event.event_id,
-                            event.room_id,
-                            event.state_key,
-                        ),
-                    )
-
-                # We also update the `local_current_membership` table with
-                # latest invite info. This will usually get updated by the
-                # `current_state_events` handling, unless its an outlier.
-                if event.internal_metadata.is_outlier():
-                    # This should only happen for out of band memberships, so
-                    # we add a paranoia check.
-                    assert event.internal_metadata.is_out_of_band_membership()
-
-                    self.db.simple_upsert_txn(
-                        txn,
-                        table="local_current_membership",
-                        keyvalues={
-                            "room_id": event.room_id,
-                            "user_id": event.state_key,
-                        },
-                        values={
-                            "event_id": event.event_id,
-                            "membership": event.membership,
-                        },
-                    )
+            # We update the local_current_membership table only if the event is
+            # "current", i.e., its something that has just happened.
+            #
+            # This will usually get updated by the `current_state_events` handling,
+            # unless its an outlier, and an outlier is only "current" if it's an "out of
+            # band membership", like a remote invite or a rejection of a remote invite.
+            if (
+                self.is_mine_id(event.state_key)
+                and not backfilled
+                and event.internal_metadata.is_outlier()
+                and event.internal_metadata.is_out_of_band_membership()
+            ):
+                self.db.simple_upsert_txn(
+                    txn,
+                    table="local_current_membership",
+                    keyvalues={"room_id": event.room_id, "user_id": event.state_key},
+                    values={
+                        "event_id": event.event_id,
+                        "membership": event.membership,
+                    },
+                )
 
     def _handle_event_relations(self, txn, event):
         """Handles inserting relation data during peristence of events
@@ -1586,31 +1541,3 @@ class PersistEventsStore:
                 if not ev.internal_metadata.is_outlier()
             ],
         )
-
-    async def locally_reject_invite(self, user_id: str, room_id: str) -> int:
-        """Mark the invite has having been rejected even though we failed to
-        create a leave event for it.
-        """
-
-        sql = (
-            "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE"
-            " room_id = ? AND invitee = ? AND locally_rejected is NULL"
-            " AND replaced_by is NULL"
-        )
-
-        def f(txn, stream_ordering):
-            txn.execute(sql, (stream_ordering, True, room_id, user_id))
-
-            # We also clear this entry from `local_current_membership`.
-            # Ideally we'd point to a leave event, but we don't have one, so
-            # nevermind.
-            self.db.simple_delete_txn(
-                txn,
-                table="local_current_membership",
-                keyvalues={"room_id": room_id, "user_id": user_id},
-            )
-
-        with self._stream_id_gen.get_next() as stream_ordering:
-            await self.db.runInteraction("locally_reject_invite", f, stream_ordering)
-
-        return stream_ordering
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index a48c7a96ca..01cad7d4fa 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -38,6 +38,8 @@ from synapse.events.utils import prune_event
 from synapse.logging.context import PreserveLoggingContext, current_context
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
+from synapse.replication.tcp.streams import BackfillStream
+from synapse.replication.tcp.streams.events import EventsStream
 from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
 from synapse.storage.database import Database
 from synapse.storage.util.id_generators import StreamIdGenerator
@@ -80,10 +82,7 @@ class EventsWorkerStore(SQLBaseStore):
             # We are the process in charge of generating stream ids for events,
             # so instantiate ID generators based on the database
             self._stream_id_gen = StreamIdGenerator(
-                db_conn,
-                "events",
-                "stream_ordering",
-                extra_tables=[("local_invites", "stream_id")],
+                db_conn, "events", "stream_ordering",
             )
             self._backfill_id_gen = StreamIdGenerator(
                 db_conn,
@@ -113,9 +112,9 @@ class EventsWorkerStore(SQLBaseStore):
         self._event_fetch_ongoing = 0
 
     def process_replication_rows(self, stream_name, instance_name, token, rows):
-        if stream_name == "events":
+        if stream_name == EventsStream.NAME:
             self._stream_id_gen.advance(token)
-        elif stream_name == "backfill":
+        elif stream_name == BackfillStream.NAME:
             self._backfill_id_gen.advance(-token)
 
         super().process_replication_rows(stream_name, instance_name, token, rows)
diff --git a/synapse/storage/data_stores/main/group_server.py b/synapse/storage/data_stores/main/group_server.py
index fb1361f1c1..4fb9f9850c 100644
--- a/synapse/storage/data_stores/main/group_server.py
+++ b/synapse/storage/data_stores/main/group_server.py
@@ -14,6 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from typing import List, Tuple
+
 from canonicaljson import json
 
 from twisted.internet import defer
@@ -526,13 +528,35 @@ class GroupServerWorkerStore(SQLBaseStore):
             "get_groups_changes_for_user", _get_groups_changes_for_user_txn
         )
 
-    def get_all_groups_changes(self, from_token, to_token, limit):
-        from_token = int(from_token)
-        has_changed = self._group_updates_stream_cache.has_any_entity_changed(
-            from_token
-        )
+    async def get_all_groups_changes(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
+        """Get updates for groups replication stream.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
+        """
+
+        last_id = int(last_id)
+        has_changed = self._group_updates_stream_cache.has_any_entity_changed(last_id)
+
         if not has_changed:
-            return defer.succeed([])
+            return [], current_id, False
 
         def _get_all_groups_changes_txn(txn):
             sql = """
@@ -541,13 +565,21 @@ class GroupServerWorkerStore(SQLBaseStore):
                 WHERE ? < stream_id AND stream_id <= ?
                 LIMIT ?
             """
-            txn.execute(sql, (from_token, to_token, limit))
-            return [
-                (stream_id, group_id, user_id, gtype, json.loads(content_json))
+            txn.execute(sql, (last_id, current_id, limit))
+            updates = [
+                (stream_id, (group_id, user_id, gtype, json.loads(content_json)))
                 for stream_id, group_id, user_id, gtype, content_json in txn
             ]
 
-        return self.db.runInteraction(
+            limited = False
+            upto_token = current_id
+            if len(updates) >= limit:
+                upto_token = updates[-1][0]
+                limited = True
+
+            return updates, upto_token, limited
+
+        return await self.db.runInteraction(
             "get_all_groups_changes", _get_all_groups_changes_txn
         )
 
diff --git a/synapse/storage/data_stores/main/purge_events.py b/synapse/storage/data_stores/main/purge_events.py
index a93e1ef198..6546569139 100644
--- a/synapse/storage/data_stores/main/purge_events.py
+++ b/synapse/storage/data_stores/main/purge_events.py
@@ -361,7 +361,6 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
             "event_push_summary",
             "pusher_throttle",
             "group_summary_rooms",
-            "local_invites",
             "room_account_data",
             "room_tags",
             "local_current_membership",
diff --git a/synapse/storage/data_stores/main/pusher.py b/synapse/storage/data_stores/main/pusher.py
index 547b9d69cb..5461016240 100644
--- a/synapse/storage/data_stores/main/pusher.py
+++ b/synapse/storage/data_stores/main/pusher.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 
 import logging
-from typing import Iterable, Iterator
+from typing import Iterable, Iterator, List, Tuple
 
 from canonicaljson import encode_canonical_json, json
 
@@ -98,77 +98,69 @@ class PusherWorkerStore(SQLBaseStore):
         rows = yield self.db.runInteraction("get_all_pushers", get_pushers)
         return rows
 
-    def get_all_updated_pushers(self, last_id, current_id, limit):
-        if last_id == current_id:
-            return defer.succeed(([], []))
-
-        def get_all_updated_pushers_txn(txn):
-            sql = (
-                "SELECT id, user_name, access_token, profile_tag, kind,"
-                " app_id, app_display_name, device_display_name, pushkey, ts,"
-                " lang, data"
-                " FROM pushers"
-                " WHERE ? < id AND id <= ?"
-                " ORDER BY id ASC LIMIT ?"
-            )
-            txn.execute(sql, (last_id, current_id, limit))
-            updated = txn.fetchall()
-
-            sql = (
-                "SELECT stream_id, user_id, app_id, pushkey"
-                " FROM deleted_pushers"
-                " WHERE ? < stream_id AND stream_id <= ?"
-                " ORDER BY stream_id ASC LIMIT ?"
-            )
-            txn.execute(sql, (last_id, current_id, limit))
-            deleted = txn.fetchall()
+    async def get_all_updated_pushers_rows(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
+        """Get updates for pushers replication stream.
 
-            return updated, deleted
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
 
-        return self.db.runInteraction(
-            "get_all_updated_pushers", get_all_updated_pushers_txn
-        )
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
 
-    def get_all_updated_pushers_rows(self, last_id, current_id, limit):
-        """Get all the pushers that have changed between the given tokens.
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
 
-        Returns:
-            Deferred(list(tuple)): each tuple consists of:
-                stream_id (str)
-                user_id (str)
-                app_id (str)
-                pushkey (str)
-                was_deleted (bool): whether the pusher was added/updated (False)
-                    or deleted (True)
+            The updates are a list of 2-tuples of stream ID and the row data
         """
 
         if last_id == current_id:
-            return defer.succeed([])
+            return [], current_id, False
 
         def get_all_updated_pushers_rows_txn(txn):
-            sql = (
-                "SELECT id, user_name, app_id, pushkey"
-                " FROM pushers"
-                " WHERE ? < id AND id <= ?"
-                " ORDER BY id ASC LIMIT ?"
-            )
+            sql = """
+                SELECT id, user_name, app_id, pushkey
+                FROM pushers
+                WHERE ? < id AND id <= ?
+                ORDER BY id ASC LIMIT ?
+            """
             txn.execute(sql, (last_id, current_id, limit))
-            results = [list(row) + [False] for row in txn]
-
-            sql = (
-                "SELECT stream_id, user_id, app_id, pushkey"
-                " FROM deleted_pushers"
-                " WHERE ? < stream_id AND stream_id <= ?"
-                " ORDER BY stream_id ASC LIMIT ?"
-            )
+            updates = [
+                (stream_id, (user_name, app_id, pushkey, False))
+                for stream_id, user_name, app_id, pushkey in txn
+            ]
+
+            sql = """
+                SELECT stream_id, user_id, app_id, pushkey
+                FROM deleted_pushers
+                WHERE ? < stream_id AND stream_id <= ?
+                ORDER BY stream_id ASC LIMIT ?
+            """
             txn.execute(sql, (last_id, current_id, limit))
+            updates.extend(
+                (stream_id, (user_name, app_id, pushkey, True))
+                for stream_id, user_name, app_id, pushkey in txn
+            )
+
+            updates.sort()  # Sort so that they're ordered by stream id
 
-            results.extend(list(row) + [True] for row in txn)
-            results.sort()  # Sort so that they're ordered by stream id
+            limited = False
+            upper_bound = current_id
+            if len(updates) >= limit:
+                limited = True
+                upper_bound = updates[-1][0]
 
-            return results
+            return updates, upper_bound, limited
 
-        return self.db.runInteraction(
+        return await self.db.runInteraction(
             "get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn
         )
 
diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py
index 13e366536a..c473cf158f 100644
--- a/synapse/storage/data_stores/main/room.py
+++ b/synapse/storage/data_stores/main/room.py
@@ -803,7 +803,32 @@ class RoomWorkerStore(SQLBaseStore):
 
         return total_media_quarantined
 
-    def get_all_new_public_rooms(self, prev_id, current_id, limit):
+    async def get_all_new_public_rooms(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
+        """Get updates for public rooms replication stream.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
+        """
+        if last_id == current_id:
+            return [], current_id, False
+
         def get_all_new_public_rooms(txn):
             sql = """
                 SELECT stream_id, room_id, visibility, appservice_id, network_id
@@ -813,13 +838,17 @@ class RoomWorkerStore(SQLBaseStore):
                 LIMIT ?
             """
 
-            txn.execute(sql, (prev_id, current_id, limit))
-            return txn.fetchall()
+            txn.execute(sql, (last_id, current_id, limit))
+            updates = [(row[0], row[1:]) for row in txn]
+            limited = False
+            upto_token = current_id
+            if len(updates) >= limit:
+                upto_token = updates[-1][0]
+                limited = True
 
-        if prev_id == current_id:
-            return defer.succeed([])
+            return updates, upto_token, limited
 
-        return self.db.runInteraction(
+        return await self.db.runInteraction(
             "get_all_new_public_rooms", get_all_new_public_rooms
         )
 
diff --git a/synapse/storage/data_stores/main/schema/delta/25/fts.py b/synapse/storage/data_stores/main/schema/delta/25/fts.py
index 4b2ffd35fd..ee675e71ff 100644
--- a/synapse/storage/data_stores/main/schema/delta/25/fts.py
+++ b/synapse/storage/data_stores/main/schema/delta/25/fts.py
@@ -11,11 +11,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import json
 import logging
 
-import simplejson
-
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.storage.prepare_database import get_statements
 
@@ -66,7 +64,7 @@ def run_create(cur, database_engine, *args, **kwargs):
             "max_stream_id_exclusive": max_stream_id + 1,
             "rows_inserted": 0,
         }
-        progress_json = simplejson.dumps(progress)
+        progress_json = json.dumps(progress)
 
         sql = (
             "INSERT into background_updates (update_name, progress_json)"
diff --git a/synapse/storage/data_stores/main/schema/delta/27/ts.py b/synapse/storage/data_stores/main/schema/delta/27/ts.py
index 414f9f5aa0..b7972cfa8e 100644
--- a/synapse/storage/data_stores/main/schema/delta/27/ts.py
+++ b/synapse/storage/data_stores/main/schema/delta/27/ts.py
@@ -11,11 +11,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import json
 import logging
 
-import simplejson
-
 from synapse.storage.prepare_database import get_statements
 
 logger = logging.getLogger(__name__)
@@ -45,7 +43,7 @@ def run_create(cur, database_engine, *args, **kwargs):
             "max_stream_id_exclusive": max_stream_id + 1,
             "rows_inserted": 0,
         }
-        progress_json = simplejson.dumps(progress)
+        progress_json = json.dumps(progress)
 
         sql = (
             "INSERT into background_updates (update_name, progress_json)"
diff --git a/synapse/storage/data_stores/main/schema/delta/31/search_update.py b/synapse/storage/data_stores/main/schema/delta/31/search_update.py
index 7d8ca5f93f..63b757ade6 100644
--- a/synapse/storage/data_stores/main/schema/delta/31/search_update.py
+++ b/synapse/storage/data_stores/main/schema/delta/31/search_update.py
@@ -11,11 +11,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import json
 import logging
 
-import simplejson
-
 from synapse.storage.engines import PostgresEngine
 from synapse.storage.prepare_database import get_statements
 
@@ -50,7 +48,7 @@ def run_create(cur, database_engine, *args, **kwargs):
             "rows_inserted": 0,
             "have_added_indexes": False,
         }
-        progress_json = simplejson.dumps(progress)
+        progress_json = json.dumps(progress)
 
         sql = (
             "INSERT into background_updates (update_name, progress_json)"
diff --git a/synapse/storage/data_stores/main/schema/delta/33/event_fields.py b/synapse/storage/data_stores/main/schema/delta/33/event_fields.py
index bff1256a7b..a3e81eeac7 100644
--- a/synapse/storage/data_stores/main/schema/delta/33/event_fields.py
+++ b/synapse/storage/data_stores/main/schema/delta/33/event_fields.py
@@ -11,11 +11,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import json
 import logging
 
-import simplejson
-
 from synapse.storage.prepare_database import get_statements
 
 logger = logging.getLogger(__name__)
@@ -45,7 +43,7 @@ def run_create(cur, database_engine, *args, **kwargs):
             "max_stream_id_exclusive": max_stream_id + 1,
             "rows_inserted": 0,
         }
-        progress_json = simplejson.dumps(progress)
+        progress_json = json.dumps(progress)
 
         sql = (
             "INSERT into background_updates (update_name, progress_json)"
diff --git a/synapse/storage/data_stores/main/tags.py b/synapse/storage/data_stores/main/tags.py
index f8c776be3f..290317fd94 100644
--- a/synapse/storage/data_stores/main/tags.py
+++ b/synapse/storage/data_stores/main/tags.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 
 import logging
+from typing import List, Tuple
 
 from canonicaljson import json
 
@@ -53,18 +54,32 @@ class TagsWorkerStore(AccountDataWorkerStore):
 
         return deferred
 
-    @defer.inlineCallbacks
-    def get_all_updated_tags(self, last_id, current_id, limit):
-        """Get all the client tags that have changed on the server
+    async def get_all_updated_tags(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
+        """Get updates for tags replication stream.
+
         Args:
-            last_id(int): The position to fetch from.
-            current_id(int): The position to fetch up to.
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
         Returns:
-            A deferred list of tuples of stream_id int, user_id string,
-            room_id string, tag string and content string.
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
         """
+
         if last_id == current_id:
-            return []
+            return [], current_id, False
 
         def get_all_updated_tags_txn(txn):
             sql = (
@@ -76,7 +91,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
             txn.execute(sql, (last_id, current_id, limit))
             return txn.fetchall()
 
-        tag_ids = yield self.db.runInteraction(
+        tag_ids = await self.db.runInteraction(
             "get_all_updated_tags", get_all_updated_tags_txn
         )
 
@@ -89,21 +104,27 @@ class TagsWorkerStore(AccountDataWorkerStore):
                 for tag, content in txn:
                     tags.append(json.dumps(tag) + ":" + content)
                 tag_json = "{" + ",".join(tags) + "}"
-                results.append((stream_id, user_id, room_id, tag_json))
+                results.append((stream_id, (user_id, room_id, tag_json)))
 
             return results
 
         batch_size = 50
         results = []
         for i in range(0, len(tag_ids), batch_size):
-            tags = yield self.db.runInteraction(
+            tags = await self.db.runInteraction(
                 "get_all_updated_tag_content",
                 get_tag_content,
                 tag_ids[i : i + batch_size],
             )
             results.extend(tags)
 
-        return results
+        limited = False
+        upto_token = current_id
+        if len(results) >= limit:
+            upto_token = results[-1][0]
+            limited = True
+
+        return results, upto_token, limited
 
     @defer.inlineCallbacks
     def get_updated_tags(self, user_id, stream_id):
diff --git a/synapse/storage/data_stores/main/ui_auth.py b/synapse/storage/data_stores/main/ui_auth.py
index ec2f38c373..4c044b1a15 100644
--- a/synapse/storage/data_stores/main/ui_auth.py
+++ b/synapse/storage/data_stores/main/ui_auth.py
@@ -17,10 +17,10 @@ from typing import Any, Dict, Optional, Union
 
 import attr
 
-import synapse.util.stringutils as stringutils
 from synapse.api.errors import StoreError
 from synapse.storage._base import SQLBaseStore
 from synapse.types import JsonDict
+from synapse.util import stringutils as stringutils
 
 
 @attr.s
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 6c7d08a6f2..a31588080d 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -92,7 +92,7 @@ class PostgresEngine(BaseDatabaseEngine):
             errors.append("    - 'COLLATE' is set to %r. Should be 'C'" % (collation,))
 
         if ctype != "C":
-            errors.append("    - 'CTYPE' is set to %r. Should be 'C'" % (collation,))
+            errors.append("    - 'CTYPE' is set to %r. Should be 'C'" % (ctype,))
 
         if errors:
             raise IncorrectDatabaseSetup(
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index ec894a91cb..fa46041676 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -783,9 +783,3 @@ class EventsPersistenceStorage(object):
 
         for user_id in left_users:
             await self.main_store.mark_remote_user_device_list_as_unsubscribed(user_id)
-
-    async def locally_reject_invite(self, user_id: str, room_id: str) -> int:
-        """Mark the invite has having been rejected even though we failed to
-        create a leave event for it.
-        """
-        return await self.persist_events_store.locally_reject_invite(user_id, room_id)
diff --git a/synapse/storage/types.py b/synapse/storage/types.py
index daff81c5ee..2d2b560e74 100644
--- a/synapse/storage/types.py
+++ b/synapse/storage/types.py
@@ -12,12 +12,10 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 from typing import Any, Iterable, Iterator, List, Tuple
 
 from typing_extensions import Protocol
 
-
 """
 Some very basic protocol definitions for the DB-API2 classes specified in PEP-249
 """