summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/config/stats.py9
-rw-r--r--synapse/handlers/message.py18
-rw-r--r--synapse/handlers/receipts.py15
-rw-r--r--synapse/handlers/stats.py27
-rw-r--r--synapse/handlers/typing.py14
-rw-r--r--synapse/rest/admin/rooms.py9
-rw-r--r--synapse/rest/client/v1/room.py112
-rw-r--r--synapse/storage/databases/main/event_federation.py4
-rw-r--r--synapse/storage/databases/main/events.py4
-rw-r--r--synapse/storage/databases/main/metrics.py4
-rw-r--r--synapse/storage/databases/main/purge_events.py2
-rw-r--r--synapse/storage/databases/main/room.py104
-rw-r--r--synapse/storage/databases/main/stats.py299
-rw-r--r--synapse/storage/schema/__init__.py6
-rw-r--r--synapse/storage/schema/main/delta/61/01change_appservices_txns.sql.postgres23
-rw-r--r--synapse/storage/schema/main/delta/61/02drop_redundant_room_depth_index.sql18
-rw-r--r--synapse/storage/schema/main/delta/61/03recreate_min_depth.py70
17 files changed, 368 insertions, 370 deletions
diff --git a/synapse/config/stats.py b/synapse/config/stats.py
index 78f61fe9da..6f253e00c0 100644
--- a/synapse/config/stats.py
+++ b/synapse/config/stats.py
@@ -38,13 +38,9 @@ class StatsConfig(Config):
 
     def read_config(self, config, **kwargs):
         self.stats_enabled = True
-        self.stats_bucket_size = 86400 * 1000
         stats_config = config.get("stats", None)
         if stats_config:
             self.stats_enabled = stats_config.get("enabled", self.stats_enabled)
-            self.stats_bucket_size = self.parse_duration(
-                stats_config.get("bucket_size", "1d")
-            )
         if not self.stats_enabled:
             logger.warning(ROOM_STATS_DISABLED_WARN)
 
@@ -59,9 +55,4 @@ class StatsConfig(Config):
           # correctly.
           #
           #enabled: false
-
-          # The size of each timeslice in the room_stats_historical and
-          # user_stats_historical tables, as a time period. Defaults to "1d".
-          #
-          #bucket_size: 1h
         """
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 66e40a915d..e06655f3d4 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -518,6 +518,9 @@ class EventCreationHandler:
             outlier: Indicates whether the event is an `outlier`, i.e. if
                 it's from an arbitrary point and floating in the DAG as
                 opposed to being inline with the current DAG.
+            historical: Indicates whether the message is being inserted
+                back in time around some existing events. This is used to skip
+                a few checks and mark the event as backfilled.
             depth: Override the depth used to order the event in the DAG.
                 Should normally be set to None, which will cause the depth to be calculated
                 based on the prev_events.
@@ -772,6 +775,7 @@ class EventCreationHandler:
         txn_id: Optional[str] = None,
         ignore_shadow_ban: bool = False,
         outlier: bool = False,
+        historical: bool = False,
         depth: Optional[int] = None,
     ) -> Tuple[EventBase, int]:
         """
@@ -799,6 +803,9 @@ class EventCreationHandler:
             outlier: Indicates whether the event is an `outlier`, i.e. if
                 it's from an arbitrary point and floating in the DAG as
                 opposed to being inline with the current DAG.
+            historical: Indicates whether the message is being inserted
+                back in time around some existing events. This is used to skip
+                a few checks and mark the event as backfilled.
             depth: Override the depth used to order the event in the DAG.
                 Should normally be set to None, which will cause the depth to be calculated
                 based on the prev_events.
@@ -847,6 +854,7 @@ class EventCreationHandler:
                 prev_event_ids=prev_event_ids,
                 auth_event_ids=auth_event_ids,
                 outlier=outlier,
+                historical=historical,
                 depth=depth,
             )
 
@@ -1594,11 +1602,13 @@ class EventCreationHandler:
         for k, v in original_event.internal_metadata.get_dict().items():
             setattr(builder.internal_metadata, k, v)
 
-        # the event type hasn't changed, so there's no point in re-calculating the
-        # auth events.
+        # modules can send new state events, so we re-calculate the auth events just in
+        # case.
+        prev_event_ids = await self.store.get_prev_events_for_room(builder.room_id)
+
         event = await builder.build(
-            prev_event_ids=original_event.prev_event_ids(),
-            auth_event_ids=original_event.auth_event_ids(),
+            prev_event_ids=prev_event_ids,
+            auth_event_ids=None,
         )
 
         # we rebuild the event context, to be on the safe side. If nothing else,
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index f782d9db32..0059ad0f56 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -30,6 +30,8 @@ class ReceiptsHandler(BaseHandler):
 
         self.server_name = hs.config.server_name
         self.store = hs.get_datastore()
+        self.event_auth_handler = hs.get_event_auth_handler()
+
         self.hs = hs
 
         # We only need to poke the federation sender explicitly if its on the
@@ -59,6 +61,19 @@ class ReceiptsHandler(BaseHandler):
         """Called when we receive an EDU of type m.receipt from a remote HS."""
         receipts = []
         for room_id, room_values in content.items():
+            # If we're not in the room just ditch the event entirely. This is
+            # probably an old server that has come back and thinks we're still in
+            # the room (or we've been rejoined to the room by a state reset).
+            is_in_room = await self.event_auth_handler.check_host_in_room(
+                room_id, self.server_name
+            )
+            if not is_in_room:
+                logger.info(
+                    "Ignoring receipt from %s as we're not in the room",
+                    origin,
+                )
+                continue
+
             for receipt_type, users in room_values.items():
                 for user_id, user_values in users.items():
                     if get_domain_from_id(user_id) != origin:
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 4e45d1da57..814d08efcb 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -45,7 +45,6 @@ class StatsHandler:
         self.clock = hs.get_clock()
         self.notifier = hs.get_notifier()
         self.is_mine_id = hs.is_mine_id
-        self.stats_bucket_size = hs.config.stats_bucket_size
 
         self.stats_enabled = hs.config.stats_enabled
 
@@ -106,20 +105,6 @@ class StatsHandler:
                 room_deltas = {}
                 user_deltas = {}
 
-            # Then count deltas for total_events and total_event_bytes.
-            (
-                room_count,
-                user_count,
-            ) = await self.store.get_changes_room_total_events_and_bytes(
-                self.pos, max_pos
-            )
-
-            for room_id, fields in room_count.items():
-                room_deltas.setdefault(room_id, Counter()).update(fields)
-
-            for user_id, fields in user_count.items():
-                user_deltas.setdefault(user_id, Counter()).update(fields)
-
             logger.debug("room_deltas: %s", room_deltas)
             logger.debug("user_deltas: %s", user_deltas)
 
@@ -181,12 +166,10 @@ class StatsHandler:
 
             event_content = {}  # type: JsonDict
 
-            sender = None
             if event_id is not None:
                 event = await self.store.get_event(event_id, allow_none=True)
                 if event:
                     event_content = event.content or {}
-                    sender = event.sender
 
             # All the values in this dict are deltas (RELATIVE changes)
             room_stats_delta = room_to_stats_deltas.setdefault(room_id, Counter())
@@ -244,12 +227,6 @@ class StatsHandler:
                     room_stats_delta["joined_members"] += 1
                 elif membership == Membership.INVITE:
                     room_stats_delta["invited_members"] += 1
-
-                    if sender and self.is_mine_id(sender):
-                        user_to_stats_deltas.setdefault(sender, Counter())[
-                            "invites_sent"
-                        ] += 1
-
                 elif membership == Membership.LEAVE:
                     room_stats_delta["left_members"] += 1
                 elif membership == Membership.BAN:
@@ -279,10 +256,6 @@ class StatsHandler:
                 room_state["is_federatable"] = (
                     event_content.get("m.federate", True) is True
                 )
-                if sender and self.is_mine_id(sender):
-                    user_to_stats_deltas.setdefault(sender, Counter())[
-                        "rooms_created"
-                    ] += 1
             elif typ == EventTypes.JoinRules:
                 room_state["join_rules"] = event_content.get("join_rule")
             elif typ == EventTypes.RoomHistoryVisibility:
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index e22393adc4..c0a8364755 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -208,6 +208,7 @@ class TypingWriterHandler(FollowerTypingHandler):
 
         self.auth = hs.get_auth()
         self.notifier = hs.get_notifier()
+        self.event_auth_handler = hs.get_event_auth_handler()
 
         self.hs = hs
 
@@ -326,6 +327,19 @@ class TypingWriterHandler(FollowerTypingHandler):
         room_id = content["room_id"]
         user_id = content["user_id"]
 
+        # If we're not in the room just ditch the event entirely. This is
+        # probably an old server that has come back and thinks we're still in
+        # the room (or we've been rejoined to the room by a state reset).
+        is_in_room = await self.event_auth_handler.check_host_in_room(
+            room_id, self.server_name
+        )
+        if not is_in_room:
+            logger.info(
+                "Ignoring typing update from %s as we're not in the room",
+                origin,
+            )
+            return
+
         member = RoomMember(user_id=user_id, room_id=room_id)
 
         # Check that the string is a valid user id
diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index f0cddd2d2c..3c51a742bf 100644
--- a/synapse/rest/admin/rooms.py
+++ b/synapse/rest/admin/rooms.py
@@ -462,6 +462,7 @@ class MakeRoomAdminRestServlet(ResolveRoomIdMixin, RestServlet):
         super().__init__(hs)
         self.hs = hs
         self.auth = hs.get_auth()
+        self.store = hs.get_datastore()
         self.event_creation_handler = hs.get_event_creation_handler()
         self.state_handler = hs.get_state_handler()
         self.is_mine_id = hs.is_mine_id
@@ -500,7 +501,13 @@ class MakeRoomAdminRestServlet(ResolveRoomIdMixin, RestServlet):
             admin_user_id = None
 
             for admin_user in reversed(admin_users):
-                if room_state.get((EventTypes.Member, admin_user)):
+                (
+                    current_membership_type,
+                    _,
+                ) = await self.store.get_local_current_membership_for_user_in_room(
+                    admin_user, room_id
+                )
+                if current_membership_type == "join":
                     admin_user_id = admin_user
                     break
 
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 92ebe838fd..9c58e3689e 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -349,6 +349,35 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet):
 
         return depth
 
+    def _create_insertion_event_dict(
+        self, sender: str, room_id: str, origin_server_ts: int
+    ):
+        """Creates an event dict for an "insertion" event with the proper fields
+        and a random chunk ID.
+
+        Args:
+            sender: The event author MXID
+            room_id: The room ID that the event belongs to
+            origin_server_ts: Timestamp when the event was sent
+
+        Returns:
+            Tuple of event ID and stream ordering position
+        """
+
+        next_chunk_id = random_string(8)
+        insertion_event = {
+            "type": EventTypes.MSC2716_INSERTION,
+            "sender": sender,
+            "room_id": room_id,
+            "content": {
+                EventContentFields.MSC2716_NEXT_CHUNK_ID: next_chunk_id,
+                EventContentFields.MSC2716_HISTORICAL: True,
+            },
+            "origin_server_ts": origin_server_ts,
+        }
+
+        return insertion_event
+
     async def on_POST(self, request, room_id):
         requester = await self.auth.get_user_by_req(request, allow_guest=False)
 
@@ -449,37 +478,68 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet):
 
         events_to_create = body["events"]
 
-        # If provided, connect the chunk to the last insertion point
-        # The chunk ID passed in comes from the chunk_id in the
-        # "insertion" event from the previous chunk.
+        prev_event_ids = prev_events_from_query
+        inherited_depth = await self.inherit_depth_from_prev_ids(prev_events_from_query)
+
+        # Figure out which chunk to connect to. If they passed in
+        # chunk_id_from_query let's use it. The chunk ID passed in comes
+        # from the chunk_id in the "insertion" event from the previous chunk.
+        last_event_in_chunk = events_to_create[-1]
+        chunk_id_to_connect_to = chunk_id_from_query
+        base_insertion_event = None
         if chunk_id_from_query:
-            last_event_in_chunk = events_to_create[-1]
-            last_event_in_chunk["content"][
-                EventContentFields.MSC2716_CHUNK_ID
-            ] = chunk_id_from_query
+            # TODO: Verify the chunk_id_from_query corresponds to an insertion event
+            pass
+        # Otherwise, create an insertion event to act as a starting point.
+        #
+        # We don't always have an insertion event to start hanging more history
+        # off of (ideally there would be one in the main DAG, but that's not the
+        # case if we're wanting to add history to e.g. existing rooms without
+        # an insertion event), in which case we just create a new insertion event
+        # that can then get pointed to by a "marker" event later.
+        else:
+            base_insertion_event_dict = self._create_insertion_event_dict(
+                sender=requester.user.to_string(),
+                room_id=room_id,
+                origin_server_ts=last_event_in_chunk["origin_server_ts"],
+            )
+            base_insertion_event_dict["prev_events"] = prev_event_ids.copy()
 
-        # Add an "insertion" event to the start of each chunk (next to the oldest
+            (
+                base_insertion_event,
+                _,
+            ) = await self.event_creation_handler.create_and_send_nonmember_event(
+                requester,
+                base_insertion_event_dict,
+                prev_event_ids=base_insertion_event_dict.get("prev_events"),
+                auth_event_ids=auth_event_ids,
+                historical=True,
+                depth=inherited_depth,
+            )
+
+            chunk_id_to_connect_to = base_insertion_event["content"][
+                EventContentFields.MSC2716_NEXT_CHUNK_ID
+            ]
+
+        # Connect this current chunk to the insertion event from the previous chunk
+        last_event_in_chunk["content"][
+            EventContentFields.MSC2716_CHUNK_ID
+        ] = chunk_id_to_connect_to
+
+        # Add an "insertion" event to the start of each chunk (next to the oldest-in-time
         # event in the chunk) so the next chunk can be connected to this one.
-        next_chunk_id = random_string(64)
-        insertion_event = {
-            "type": EventTypes.MSC2716_INSERTION,
-            "sender": requester.user.to_string(),
-            "content": {
-                EventContentFields.MSC2716_NEXT_CHUNK_ID: next_chunk_id,
-                EventContentFields.MSC2716_HISTORICAL: True,
-            },
+        insertion_event = self._create_insertion_event_dict(
+            sender=requester.user.to_string(),
+            room_id=room_id,
             # Since the insertion event is put at the start of the chunk,
-            # where the oldest event is, copy the origin_server_ts from
+            # where the oldest-in-time event is, copy the origin_server_ts from
             # the first event we're inserting
-            "origin_server_ts": events_to_create[0]["origin_server_ts"],
-        }
+            origin_server_ts=events_to_create[0]["origin_server_ts"],
+        )
         # Prepend the insertion event to the start of the chunk
         events_to_create = [insertion_event] + events_to_create
 
-        inherited_depth = await self.inherit_depth_from_prev_ids(prev_events_from_query)
-
         event_ids = []
-        prev_event_ids = prev_events_from_query
         events_to_persist = []
         for ev in events_to_create:
             assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"])
@@ -533,10 +593,16 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet):
                 context=context,
             )
 
+        # Add the base_insertion_event to the bottom of the list we return
+        if base_insertion_event is not None:
+            event_ids.append(base_insertion_event.event_id)
+
         return 200, {
             "state_events": auth_event_ids,
             "events": event_ids,
-            "next_chunk_id": next_chunk_id,
+            "next_chunk_id": insertion_event["content"][
+                EventContentFields.MSC2716_NEXT_CHUNK_ID
+            ],
         }
 
     def on_GET(self, request, room_id):
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index c4474df975..4e06938849 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1230,7 +1230,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
                 "SELECT coalesce(min(received_ts), 0) FROM federation_inbound_events_staging"
             )
 
-            (age,) = txn.fetchone()
+            (received_ts,) = txn.fetchone()
+
+            age = self._clock.time_msec() - received_ts
 
             return count, age
 
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 897fa06639..08c580b0dc 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1580,11 +1580,11 @@ class PersistEventsStore:
         # invalidate the cache for the redacted event
         txn.call_after(self.store._invalidate_get_event_cache, event.redacts)
 
-        self.db_pool.simple_insert_txn(
+        self.db_pool.simple_upsert_txn(
             txn,
             table="redactions",
+            keyvalues={"event_id": event.event_id},
             values={
-                "event_id": event.event_id,
                 "redacts": event.redacts,
                 "received_ts": self._clock.time_msec(),
             },
diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py
index c3f551d377..e3a544d9b2 100644
--- a/synapse/storage/databases/main/metrics.py
+++ b/synapse/storage/databases/main/metrics.py
@@ -320,7 +320,7 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
         """
         Returns millisecond unixtime for start of UTC day.
         """
-        now = time.gmtime()
+        now = time.gmtime(self._clock.time())
         today_start = calendar.timegm((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0))
         return today_start * 1000
 
@@ -352,7 +352,7 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
                     ) udv
                     ON u.user_id = udv.user_id AND u.device_id=udv.device_id
                     INNER JOIN users ON users.name=u.user_id
-                    WHERE last_seen > ? AND last_seen <= ?
+                    WHERE ? <= last_seen AND last_seen < ?
                     AND udv.timestamp IS NULL AND users.is_guest=0
                     AND users.appservice_id IS NULL
                     GROUP BY u.user_id, u.device_id
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 7fb7780d0f..eb4841830d 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -215,6 +215,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
             "event_relations",
             "event_search",
             "rejections",
+            "redactions",
         ):
             logger.info("[purge] removing events from %s", table)
 
@@ -392,7 +393,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
             "room_memberships",
             "room_stats_state",
             "room_stats_current",
-            "room_stats_historical",
             "room_stats_earliest_token",
             "rooms",
             "stream_ordering_to_exterm",
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 9f0d64a325..6ddafe5434 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -25,6 +25,7 @@ from synapse.api.room_versions import RoomVersion, RoomVersions
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import DatabasePool, LoggingTransaction
 from synapse.storage.databases.main.search import SearchStore
+from synapse.storage.types import Cursor
 from synapse.types import JsonDict, ThirdPartyInstanceID
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached
@@ -1022,10 +1023,22 @@ class RoomWorkerStore(SQLBaseStore):
         )
 
 
-class RoomBackgroundUpdateStore(SQLBaseStore):
+class _BackgroundUpdates:
     REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
     ADD_ROOMS_ROOM_VERSION_COLUMN = "add_rooms_room_version_column"
+    POPULATE_ROOM_DEPTH_MIN_DEPTH2 = "populate_room_depth_min_depth2"
+    REPLACE_ROOM_DEPTH_MIN_DEPTH = "replace_room_depth_min_depth"
+
+
+_REPLACE_ROOM_DEPTH_SQL_COMMANDS = (
+    "DROP TRIGGER populate_min_depth2_trigger ON room_depth",
+    "DROP FUNCTION populate_min_depth2()",
+    "ALTER TABLE room_depth DROP COLUMN min_depth",
+    "ALTER TABLE room_depth RENAME COLUMN min_depth2 TO min_depth",
+)
+
 
+class RoomBackgroundUpdateStore(SQLBaseStore):
     def __init__(self, database: DatabasePool, db_conn, hs):
         super().__init__(database, db_conn, hs)
 
@@ -1037,15 +1050,25 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
         )
 
         self.db_pool.updates.register_background_update_handler(
-            self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE,
+            _BackgroundUpdates.REMOVE_TOMESTONED_ROOMS_BG_UPDATE,
             self._remove_tombstoned_rooms_from_directory,
         )
 
         self.db_pool.updates.register_background_update_handler(
-            self.ADD_ROOMS_ROOM_VERSION_COLUMN,
+            _BackgroundUpdates.ADD_ROOMS_ROOM_VERSION_COLUMN,
             self._background_add_rooms_room_version_column,
         )
 
+        # BG updates to change the type of room_depth.min_depth
+        self.db_pool.updates.register_background_update_handler(
+            _BackgroundUpdates.POPULATE_ROOM_DEPTH_MIN_DEPTH2,
+            self._background_populate_room_depth_min_depth2,
+        )
+        self.db_pool.updates.register_background_update_handler(
+            _BackgroundUpdates.REPLACE_ROOM_DEPTH_MIN_DEPTH,
+            self._background_replace_room_depth_min_depth,
+        )
+
     async def _background_insert_retention(self, progress, batch_size):
         """Retrieves a list of all rooms within a range and inserts an entry for each of
         them into the room_retention table.
@@ -1164,7 +1187,9 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
                 new_last_room_id = room_id
 
             self.db_pool.updates._background_update_progress_txn(
-                txn, self.ADD_ROOMS_ROOM_VERSION_COLUMN, {"room_id": new_last_room_id}
+                txn,
+                _BackgroundUpdates.ADD_ROOMS_ROOM_VERSION_COLUMN,
+                {"room_id": new_last_room_id},
             )
 
             return False
@@ -1176,7 +1201,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
 
         if end:
             await self.db_pool.updates._end_background_update(
-                self.ADD_ROOMS_ROOM_VERSION_COLUMN
+                _BackgroundUpdates.ADD_ROOMS_ROOM_VERSION_COLUMN
             )
 
         return batch_size
@@ -1215,7 +1240,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
 
         if not rooms:
             await self.db_pool.updates._end_background_update(
-                self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE
+                _BackgroundUpdates.REMOVE_TOMESTONED_ROOMS_BG_UPDATE
             )
             return 0
 
@@ -1224,7 +1249,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
             await self.set_room_is_public(room_id, False)
 
         await self.db_pool.updates._background_update_progress(
-            self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, {"room_id": rooms[-1]}
+            _BackgroundUpdates.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, {"room_id": rooms[-1]}
         )
 
         return len(rooms)
@@ -1268,6 +1293,71 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
 
         return max_ordering is None
 
+    async def _background_populate_room_depth_min_depth2(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """Populate room_depth.min_depth2
+
+        This is to deal with the fact that min_depth was initially created as a
+        32-bit integer field.
+        """
+
+        def process(txn: Cursor) -> int:
+            last_room = progress.get("last_room", "")
+            txn.execute(
+                """
+                UPDATE room_depth SET min_depth2=min_depth
+                WHERE room_id IN (
+                   SELECT room_id FROM room_depth WHERE room_id > ?
+                   ORDER BY room_id LIMIT ?
+                )
+                RETURNING room_id;
+                """,
+                (last_room, batch_size),
+            )
+            row_count = txn.rowcount
+            if row_count == 0:
+                return 0
+            last_room = max(row[0] for row in txn)
+            logger.info("populated room_depth up to %s", last_room)
+
+            self.db_pool.updates._background_update_progress_txn(
+                txn,
+                _BackgroundUpdates.POPULATE_ROOM_DEPTH_MIN_DEPTH2,
+                {"last_room": last_room},
+            )
+            return row_count
+
+        result = await self.db_pool.runInteraction(
+            "_background_populate_min_depth2", process
+        )
+
+        if result != 0:
+            return result
+
+        await self.db_pool.updates._end_background_update(
+            _BackgroundUpdates.POPULATE_ROOM_DEPTH_MIN_DEPTH2
+        )
+        return 0
+
+    async def _background_replace_room_depth_min_depth(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """Drop the old 'min_depth' column and rename 'min_depth2' into its place."""
+
+        def process(txn: Cursor) -> None:
+            for sql in _REPLACE_ROOM_DEPTH_SQL_COMMANDS:
+                logger.info("completing room_depth migration: %s", sql)
+                txn.execute(sql)
+
+        await self.db_pool.runInteraction("_background_replace_room_depth", process)
+
+        await self.db_pool.updates._end_background_update(
+            _BackgroundUpdates.REPLACE_ROOM_DEPTH_MIN_DEPTH,
+        )
+
+        return 0
+
 
 class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
     def __init__(self, database: DatabasePool, db_conn, hs):
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index 82a1833509..59d67c255b 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -26,7 +26,6 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import StoreError
 from synapse.storage.database import DatabasePool
 from synapse.storage.databases.main.state_deltas import StateDeltasStore
-from synapse.storage.engines import PostgresEngine
 from synapse.types import JsonDict
 from synapse.util.caches.descriptors import cached
 
@@ -49,14 +48,6 @@ ABSOLUTE_STATS_FIELDS = {
     "user": ("joined_rooms",),
 }
 
-# these fields are per-timeslice and so should be reset to 0 upon a new slice
-# You can draw these stats on a histogram.
-# Example: number of events sent locally during a time slice
-PER_SLICE_FIELDS = {
-    "room": ("total_events", "total_event_bytes"),
-    "user": ("invites_sent", "rooms_created", "total_events", "total_event_bytes"),
-}
-
 TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
 
 # these are the tables (& ID columns) which contain our actual subjects
@@ -106,7 +97,6 @@ class StatsStore(StateDeltasStore):
         self.server_name = hs.hostname
         self.clock = self.hs.get_clock()
         self.stats_enabled = hs.config.stats_enabled
-        self.stats_bucket_size = hs.config.stats_bucket_size
 
         self.stats_delta_processing_lock = DeferredLock()
 
@@ -122,22 +112,6 @@ class StatsStore(StateDeltasStore):
         self.db_pool.updates.register_noop_background_update("populate_stats_cleanup")
         self.db_pool.updates.register_noop_background_update("populate_stats_prepare")
 
-    def quantise_stats_time(self, ts):
-        """
-        Quantises a timestamp to be a multiple of the bucket size.
-
-        Args:
-            ts (int): the timestamp to quantise, in milliseconds since the Unix
-                Epoch
-
-        Returns:
-            int: a timestamp which
-              - is divisible by the bucket size;
-              - is no later than `ts`; and
-              - is the largest such timestamp.
-        """
-        return (ts // self.stats_bucket_size) * self.stats_bucket_size
-
     async def _populate_stats_process_users(self, progress, batch_size):
         """
         This is a background update which regenerates statistics for users.
@@ -288,56 +262,6 @@ class StatsStore(StateDeltasStore):
             desc="update_room_state",
         )
 
-    async def get_statistics_for_subject(
-        self, stats_type: str, stats_id: str, start: str, size: int = 100
-    ) -> List[dict]:
-        """
-        Get statistics for a given subject.
-
-        Args:
-            stats_type: The type of subject
-            stats_id: The ID of the subject (e.g. room_id or user_id)
-            start: Pagination start. Number of entries, not timestamp.
-            size: How many entries to return.
-
-        Returns:
-            A list of dicts, where the dict has the keys of
-            ABSOLUTE_STATS_FIELDS[stats_type],  and "bucket_size" and "end_ts".
-        """
-        return await self.db_pool.runInteraction(
-            "get_statistics_for_subject",
-            self._get_statistics_for_subject_txn,
-            stats_type,
-            stats_id,
-            start,
-            size,
-        )
-
-    def _get_statistics_for_subject_txn(
-        self, txn, stats_type, stats_id, start, size=100
-    ):
-        """
-        Transaction-bound version of L{get_statistics_for_subject}.
-        """
-
-        table, id_col = TYPE_TO_TABLE[stats_type]
-        selected_columns = list(
-            ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type]
-        )
-
-        slice_list = self.db_pool.simple_select_list_paginate_txn(
-            txn,
-            table + "_historical",
-            "end_ts",
-            start,
-            size,
-            retcols=selected_columns + ["bucket_size", "end_ts"],
-            keyvalues={id_col: stats_id},
-            order_direction="DESC",
-        )
-
-        return slice_list
-
     @cached()
     async def get_earliest_token_for_stats(
         self, stats_type: str, id: str
@@ -451,14 +375,10 @@ class StatsStore(StateDeltasStore):
 
         table, id_col = TYPE_TO_TABLE[stats_type]
 
-        quantised_ts = self.quantise_stats_time(int(ts))
-        end_ts = quantised_ts + self.stats_bucket_size
-
         # Lets be paranoid and check that all the given field names are known
         abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
-        slice_field_names = PER_SLICE_FIELDS[stats_type]
         for field in chain(fields.keys(), absolute_field_overrides.keys()):
-            if field not in abs_field_names and field not in slice_field_names:
+            if field not in abs_field_names:
                 # guard against potential SQL injection dodginess
                 raise ValueError(
                     "%s is not a recognised field"
@@ -491,20 +411,6 @@ class StatsStore(StateDeltasStore):
             additive_relatives=deltas_of_absolute_fields,
         )
 
-        per_slice_additive_relatives = {
-            key: fields.get(key, 0) for key in slice_field_names
-        }
-        self._upsert_copy_from_table_with_additive_relatives_txn(
-            txn=txn,
-            into_table=table + "_historical",
-            keyvalues={id_col: stats_id},
-            extra_dst_insvalues={"bucket_size": self.stats_bucket_size},
-            extra_dst_keyvalues={"end_ts": end_ts},
-            additive_relatives=per_slice_additive_relatives,
-            src_table=table + "_current",
-            copy_columns=abs_field_names,
-        )
-
     def _upsert_with_additive_relatives_txn(
         self, txn, table, keyvalues, absolutes, additive_relatives
     ):
@@ -528,7 +434,7 @@ class StatsStore(StateDeltasStore):
             ]
 
             relative_updates = [
-                "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s"
+                "%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)"
                 % {"table": table, "field": field}
                 for field in additive_relatives.keys()
             ]
@@ -568,205 +474,13 @@ class StatsStore(StateDeltasStore):
                 self.db_pool.simple_insert_txn(txn, table, merged_dict)
             else:
                 for (key, val) in additive_relatives.items():
-                    current_row[key] += val
+                    if current_row[key] is None:
+                        current_row[key] = val
+                    else:
+                        current_row[key] += val
                 current_row.update(absolutes)
                 self.db_pool.simple_update_one_txn(txn, table, keyvalues, current_row)
 
-    def _upsert_copy_from_table_with_additive_relatives_txn(
-        self,
-        txn,
-        into_table,
-        keyvalues,
-        extra_dst_keyvalues,
-        extra_dst_insvalues,
-        additive_relatives,
-        src_table,
-        copy_columns,
-    ):
-        """Updates the historic stats table with latest updates.
-
-        This involves copying "absolute" fields from the `_current` table, and
-        adding relative fields to any existing values.
-
-        Args:
-             txn: Transaction
-             into_table (str): The destination table to UPSERT the row into
-             keyvalues (dict[str, any]): Row-identifying key values
-             extra_dst_keyvalues (dict[str, any]): Additional keyvalues
-                for `into_table`.
-             extra_dst_insvalues (dict[str, any]): Additional values to insert
-                on new row creation for `into_table`.
-             additive_relatives (dict[str, any]): Fields that will be added onto
-                if existing row present. (Must be disjoint from copy_columns.)
-             src_table (str): The source table to copy from
-             copy_columns (iterable[str]): The list of columns to copy
-        """
-        if self.database_engine.can_native_upsert:
-            ins_columns = chain(
-                keyvalues,
-                copy_columns,
-                additive_relatives,
-                extra_dst_keyvalues,
-                extra_dst_insvalues,
-            )
-            sel_exprs = chain(
-                keyvalues,
-                copy_columns,
-                (
-                    "?"
-                    for _ in chain(
-                        additive_relatives, extra_dst_keyvalues, extra_dst_insvalues
-                    )
-                ),
-            )
-            keyvalues_where = ("%s = ?" % f for f in keyvalues)
-
-            sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
-            sets_ar = (
-                "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f)
-                for f in additive_relatives
-            )
-
-            sql = """
-                INSERT INTO %(into_table)s (%(ins_columns)s)
-                SELECT %(sel_exprs)s
-                FROM %(src_table)s
-                WHERE %(keyvalues_where)s
-                ON CONFLICT (%(keyvalues)s)
-                DO UPDATE SET %(sets)s
-            """ % {
-                "into_table": into_table,
-                "ins_columns": ", ".join(ins_columns),
-                "sel_exprs": ", ".join(sel_exprs),
-                "keyvalues_where": " AND ".join(keyvalues_where),
-                "src_table": src_table,
-                "keyvalues": ", ".join(
-                    chain(keyvalues.keys(), extra_dst_keyvalues.keys())
-                ),
-                "sets": ", ".join(chain(sets_cc, sets_ar)),
-            }
-
-            qargs = list(
-                chain(
-                    additive_relatives.values(),
-                    extra_dst_keyvalues.values(),
-                    extra_dst_insvalues.values(),
-                    keyvalues.values(),
-                )
-            )
-            txn.execute(sql, qargs)
-        else:
-            self.database_engine.lock_table(txn, into_table)
-            src_row = self.db_pool.simple_select_one_txn(
-                txn, src_table, keyvalues, copy_columns
-            )
-            all_dest_keyvalues = {**keyvalues, **extra_dst_keyvalues}
-            dest_current_row = self.db_pool.simple_select_one_txn(
-                txn,
-                into_table,
-                keyvalues=all_dest_keyvalues,
-                retcols=list(chain(additive_relatives.keys(), copy_columns)),
-                allow_none=True,
-            )
-
-            if dest_current_row is None:
-                merged_dict = {
-                    **keyvalues,
-                    **extra_dst_keyvalues,
-                    **extra_dst_insvalues,
-                    **src_row,
-                    **additive_relatives,
-                }
-                self.db_pool.simple_insert_txn(txn, into_table, merged_dict)
-            else:
-                for (key, val) in additive_relatives.items():
-                    src_row[key] = dest_current_row[key] + val
-                self.db_pool.simple_update_txn(
-                    txn, into_table, all_dest_keyvalues, src_row
-                )
-
-    async def get_changes_room_total_events_and_bytes(
-        self, min_pos: int, max_pos: int
-    ) -> Tuple[Dict[str, Dict[str, int]], Dict[str, Dict[str, int]]]:
-        """Fetches the counts of events in the given range of stream IDs.
-
-        Args:
-            min_pos
-            max_pos
-
-        Returns:
-            Mapping of room ID to field changes.
-        """
-
-        return await self.db_pool.runInteraction(
-            "stats_incremental_total_events_and_bytes",
-            self.get_changes_room_total_events_and_bytes_txn,
-            min_pos,
-            max_pos,
-        )
-
-    def get_changes_room_total_events_and_bytes_txn(
-        self, txn, low_pos: int, high_pos: int
-    ) -> Tuple[Dict[str, Dict[str, int]], Dict[str, Dict[str, int]]]:
-        """Gets the total_events and total_event_bytes counts for rooms and
-        senders, in a range of stream_orderings (including backfilled events).
-
-        Args:
-            txn
-            low_pos: Low stream ordering
-            high_pos: High stream ordering
-
-        Returns:
-            The room and user deltas for total_events/total_event_bytes in the
-            format of `stats_id` -> fields
-        """
-
-        if low_pos >= high_pos:
-            # nothing to do here.
-            return {}, {}
-
-        if isinstance(self.database_engine, PostgresEngine):
-            new_bytes_expression = "OCTET_LENGTH(json)"
-        else:
-            new_bytes_expression = "LENGTH(CAST(json AS BLOB))"
-
-        sql = """
-            SELECT events.room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes
-            FROM events INNER JOIN event_json USING (event_id)
-            WHERE (? < stream_ordering AND stream_ordering <= ?)
-                OR (? <= stream_ordering AND stream_ordering <= ?)
-            GROUP BY events.room_id
-        """ % (
-            new_bytes_expression,
-        )
-
-        txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos))
-
-        room_deltas = {
-            room_id: {"total_events": new_events, "total_event_bytes": new_bytes}
-            for room_id, new_events, new_bytes in txn
-        }
-
-        sql = """
-            SELECT events.sender, COUNT(*) AS new_events, SUM(%s) AS new_bytes
-            FROM events INNER JOIN event_json USING (event_id)
-            WHERE (? < stream_ordering AND stream_ordering <= ?)
-                OR (? <= stream_ordering AND stream_ordering <= ?)
-            GROUP BY events.sender
-        """ % (
-            new_bytes_expression,
-        )
-
-        txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos))
-
-        user_deltas = {
-            user_id: {"total_events": new_events, "total_event_bytes": new_bytes}
-            for user_id, new_events, new_bytes in txn
-            if self.hs.is_mine_id(user_id)
-        }
-
-        return room_deltas, user_deltas
-
     async def _calculate_and_set_initial_state_for_room(
         self, room_id: str
     ) -> Tuple[dict, dict, int]:
@@ -893,6 +607,7 @@ class StatsStore(StateDeltasStore):
                 "invited_members": membership_counts.get(Membership.INVITE, 0),
                 "left_members": membership_counts.get(Membership.LEAVE, 0),
                 "banned_members": membership_counts.get(Membership.BAN, 0),
+                "knocked_members": membership_counts.get(Membership.KNOCK, 0),
                 "local_users_in_room": len(local_users_in_room),
             },
         )
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 0a53b73ccc..36340a652a 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-SCHEMA_VERSION = 60
+SCHEMA_VERSION = 61
 """Represents the expectations made by the codebase about the database schema
 
 This should be incremented whenever the codebase changes its requirements on the
@@ -21,6 +21,10 @@ older versions of Synapse).
 
 See `README.md <synapse/storage/schema/README.md>`_  for more information on how this
 works.
+
+Changes in SCHEMA_VERSION = 61:
+    - The `user_stats_historical` and `room_stats_historical` tables are not written and
+      are not read (previously, they were written but not read).
 """
 
 
diff --git a/synapse/storage/schema/main/delta/61/01change_appservices_txns.sql.postgres b/synapse/storage/schema/main/delta/61/01change_appservices_txns.sql.postgres
new file mode 100644
index 0000000000..c8aec78e60
--- /dev/null
+++ b/synapse/storage/schema/main/delta/61/01change_appservices_txns.sql.postgres
@@ -0,0 +1,23 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- we use bigint elsewhere in the database for appservice txn ids (notably,
+-- application_services_state.last_txn), and generally we use bigints everywhere else
+-- we have monotonic counters, so let's bring this one in line.
+--
+-- assuming there aren't thousands of rows for decommisioned/non-functional ASes, this
+-- table should be pretty small, so safe to do a synchronous ALTER TABLE.
+
+ALTER TABLE application_services_txns ALTER COLUMN txn_id SET DATA TYPE BIGINT;
diff --git a/synapse/storage/schema/main/delta/61/02drop_redundant_room_depth_index.sql b/synapse/storage/schema/main/delta/61/02drop_redundant_room_depth_index.sql
new file mode 100644
index 0000000000..35ca7a40c0
--- /dev/null
+++ b/synapse/storage/schema/main/delta/61/02drop_redundant_room_depth_index.sql
@@ -0,0 +1,18 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- this index is redundant; there is another UNIQUE index on this table.
+DROP INDEX IF EXISTS room_depth_room;
+
diff --git a/synapse/storage/schema/main/delta/61/03recreate_min_depth.py b/synapse/storage/schema/main/delta/61/03recreate_min_depth.py
new file mode 100644
index 0000000000..f8d7db9f2e
--- /dev/null
+++ b/synapse/storage/schema/main/delta/61/03recreate_min_depth.py
@@ -0,0 +1,70 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This migration handles the process of changing the type of `room_depth.min_depth` to
+a BIGINT.
+"""
+from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
+from synapse.storage.types import Cursor
+
+
+def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
+    if not isinstance(database_engine, PostgresEngine):
+        # this only applies to postgres - sqlite does not distinguish between big and
+        # little ints.
+        return
+
+    # First add a new column to contain the bigger min_depth
+    cur.execute("ALTER TABLE room_depth ADD COLUMN min_depth2 BIGINT")
+
+    # Create a trigger which will keep it populated.
+    cur.execute(
+        """
+        CREATE OR REPLACE FUNCTION populate_min_depth2() RETURNS trigger AS $BODY$
+            BEGIN
+                new.min_depth2 := new.min_depth;
+                RETURN NEW;
+            END;
+        $BODY$ LANGUAGE plpgsql
+        """
+    )
+
+    cur.execute(
+        """
+        CREATE TRIGGER populate_min_depth2_trigger BEFORE INSERT OR UPDATE ON room_depth
+        FOR EACH ROW
+        EXECUTE PROCEDURE populate_min_depth2()
+        """
+    )
+
+    # Start a bg process to populate it for old rooms
+    cur.execute(
+        """
+       INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+            (6103, 'populate_room_depth_min_depth2', '{}')
+       """
+    )
+
+    # and another to switch them over once it completes.
+    cur.execute(
+        """
+        INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
+            (6103, 'replace_room_depth_min_depth', '{}', 'populate_room_depth2')
+        """
+    )
+
+
+def run_upgrade(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
+    pass