summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2021-03-03 12:12:15 +0000
committerErik Johnston <erik@matrix.org>2021-03-03 12:12:15 +0000
commit8965291b8711c5d92e87b7d63a505482acc47dae (patch)
treeed949b11c913cd064971a9b0931ff1e46fc9d1cb
parentMerge remote-tracking branch 'origin/develop' into matrix-org-hotfixes (diff)
parentMake deleting stale pushers a background update (#9536) (diff)
downloadsynapse-8965291b8711c5d92e87b7d63a505482acc47dae.tar.xz
Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
-rw-r--r--changelog.d/9512.feature1
-rw-r--r--changelog.d/9536.bugfix1
-rw-r--r--docs/reverse_proxy.md2
-rw-r--r--synapse/storage/databases/main/pusher.py52
-rw-r--r--synapse/storage/databases/main/schema/delta/59/08delete_stale_pushers.sql3
5 files changed, 58 insertions, 1 deletions
diff --git a/changelog.d/9512.feature b/changelog.d/9512.feature
new file mode 100644
index 0000000000..06cfd5d199
--- /dev/null
+++ b/changelog.d/9512.feature
@@ -0,0 +1 @@
+Add support for `X-Forwarded-Proto` header when using a reverse proxy.
diff --git a/changelog.d/9536.bugfix b/changelog.d/9536.bugfix
new file mode 100644
index 0000000000..2ab4f315c1
--- /dev/null
+++ b/changelog.d/9536.bugfix
@@ -0,0 +1 @@
+Fix deleting pushers when using sharded pushers.
diff --git a/docs/reverse_proxy.md b/docs/reverse_proxy.md
index bb7caa8bb9..81e5a68a36 100644
--- a/docs/reverse_proxy.md
+++ b/docs/reverse_proxy.md
@@ -53,6 +53,8 @@ server {
         proxy_pass http://localhost:8008;
         proxy_set_header X-Forwarded-For $remote_addr;
         proxy_set_header X-Forwarded-Proto $scheme;
+        proxy_set_header Host $host;
+
         # Nginx by default only allows file uploads up to 1M in size
         # Increase client_max_body_size to match max_upload_size defined in homeserver.yaml
         client_max_body_size 50M;
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index 6b608ebc9b..85f1ebac98 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -44,6 +44,11 @@ class PusherWorkerStore(SQLBaseStore):
             self._remove_deactivated_pushers,
         )
 
+        self.db_pool.updates.register_background_update_handler(
+            "remove_stale_pushers",
+            self._remove_stale_pushers,
+        )
+
     def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]:
         """JSON-decode the data in the rows returned from the `pushers` table
 
@@ -337,6 +342,53 @@ class PusherWorkerStore(SQLBaseStore):
 
         return number_deleted
 
+    async def _remove_stale_pushers(self, progress: dict, batch_size: int) -> int:
+        """A background update that deletes all pushers for logged out devices.
+
+        Note that we don't proacively tell the pusherpool that we've deleted
+        these (just because its a bit off a faff to do from here), but they will
+        get cleaned up at the next restart
+        """
+
+        last_pusher = progress.get("last_pusher", 0)
+
+        def _delete_pushers(txn) -> int:
+
+            sql = """
+                SELECT p.id, access_token FROM pushers AS p
+                LEFT JOIN access_tokens AS a ON (p.access_token = a.id)
+                WHERE p.id > ?
+                ORDER BY p.id ASC
+                LIMIT ?
+            """
+
+            txn.execute(sql, (last_pusher, batch_size))
+            pushers = [(row[0], row[1]) for row in txn]
+
+            self.db_pool.simple_delete_many_txn(
+                txn,
+                table="pushers",
+                column="id",
+                iterable=(pusher_id for pusher_id, token in pushers if token is None),
+                keyvalues={},
+            )
+
+            if pushers:
+                self.db_pool.updates._background_update_progress_txn(
+                    txn, "remove_stale_pushers", {"last_pusher": pushers[-1][0]}
+                )
+
+            return len(pushers)
+
+        number_deleted = await self.db_pool.runInteraction(
+            "_remove_stale_pushers", _delete_pushers
+        )
+
+        if number_deleted < batch_size:
+            await self.db_pool.updates._end_background_update("remove_stale_pushers")
+
+        return number_deleted
+
 
 class PusherStore(PusherWorkerStore):
     def get_pushers_stream_token(self) -> int:
diff --git a/synapse/storage/databases/main/schema/delta/59/08delete_stale_pushers.sql b/synapse/storage/databases/main/schema/delta/59/08delete_stale_pushers.sql
index 2442eea6bc..85196db288 100644
--- a/synapse/storage/databases/main/schema/delta/59/08delete_stale_pushers.sql
+++ b/synapse/storage/databases/main/schema/delta/59/08delete_stale_pushers.sql
@@ -16,4 +16,5 @@
 
 -- Delete all pushers associated with deleted devices. This is to clear up after
 -- a bug where they weren't correctly deleted when using workers.
-DELETE FROM pushers WHERE access_token NOT IN (SELECT id FROM access_tokens);
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (5908, 'remove_stale_pushers', '{}');