summary refs log tree commit diff
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2021-04-23 16:24:31 +0100
committerAndrew Morgan <andrew@amorgan.xyz>2021-04-23 16:24:31 +0100
commit96f8556e08cfcf8fa146150cce100dddb87cb4b9 (patch)
tree8931d2c6262aaa4baa9daf109877fbbc841e773f
parentMerge commit 'f00c4e7af' into anoa/dinsic_release_1_31_0 (diff)
parentUpdate nginx reverse-proxy docs (#9512) (diff)
downloadsynapse-96f8556e08cfcf8fa146150cce100dddb87cb4b9.tar.xz
Merge commit 'a5daae2a5' into anoa/dinsic_release_1_31_0
-rw-r--r--changelog.d/9358.misc1
-rw-r--r--changelog.d/9503.bugfix1
-rw-r--r--changelog.d/9512.feature1
-rw-r--r--changelog.d/9516.bugfix1
-rw-r--r--changelog.d/9530.bugfix1
-rw-r--r--docs/reverse_proxy.md2
-rwxr-xr-xscripts/synapse_port_db2
-rw-r--r--synapse/handlers/presence.py31
-rw-r--r--synapse/handlers/sync.py3
-rw-r--r--synapse/storage/databases/main/pusher.py53
-rw-r--r--synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql9
-rw-r--r--synapse/storage/databases/main/schema/delta/59/09rejected_events_metadata.sql (renamed from synapse/storage/databases/main/schema/delta/58/28rejected_events_metadata.sql)9
-rw-r--r--synapse/util/caches/response_cache.py34
13 files changed, 94 insertions, 54 deletions
diff --git a/changelog.d/9358.misc b/changelog.d/9358.misc
deleted file mode 100644
index cc7614afc0..0000000000
--- a/changelog.d/9358.misc
+++ /dev/null
@@ -1 +0,0 @@
-Added a fix that invalidates cache for empty timed-out sync responses.
\ No newline at end of file
diff --git a/changelog.d/9503.bugfix b/changelog.d/9503.bugfix
new file mode 100644
index 0000000000..0868691389
--- /dev/null
+++ b/changelog.d/9503.bugfix
@@ -0,0 +1 @@
+Fix missing chain cover index due to a schema delta not being applied correctly. Only affected servers that ran development versions.
diff --git a/changelog.d/9512.feature b/changelog.d/9512.feature
new file mode 100644
index 0000000000..06cfd5d199
--- /dev/null
+++ b/changelog.d/9512.feature
@@ -0,0 +1 @@
+Add support for `X-Forwarded-Proto` header when using a reverse proxy.
diff --git a/changelog.d/9516.bugfix b/changelog.d/9516.bugfix
new file mode 100644
index 0000000000..81188c5473
--- /dev/null
+++ b/changelog.d/9516.bugfix
@@ -0,0 +1 @@
+Fix a bug where users' pushers were not all deleted when they deactivated their account.
diff --git a/changelog.d/9530.bugfix b/changelog.d/9530.bugfix
new file mode 100644
index 0000000000..bb4db675d9
--- /dev/null
+++ b/changelog.d/9530.bugfix
@@ -0,0 +1 @@
+Prevent presence background jobs from running when presence is disabled.
\ No newline at end of file
diff --git a/docs/reverse_proxy.md b/docs/reverse_proxy.md
index bb7caa8bb9..81e5a68a36 100644
--- a/docs/reverse_proxy.md
+++ b/docs/reverse_proxy.md
@@ -53,6 +53,8 @@ server {
         proxy_pass http://localhost:8008;
         proxy_set_header X-Forwarded-For $remote_addr;
         proxy_set_header X-Forwarded-Proto $scheme;
+        proxy_set_header Host $host;
+
         # Nginx by default only allows file uploads up to 1M in size
         # Increase client_max_body_size to match max_upload_size defined in homeserver.yaml
         client_max_body_size 50M;
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index 83c53d9887..2e4aabdd26 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -47,6 +47,7 @@ from synapse.storage.databases.main.events_bg_updates import (
 from synapse.storage.databases.main.media_repository import (
     MediaRepositoryBackgroundUpdateStore,
 )
+from synapse.storage.databases.main.pusher import PusherWorkerStore
 from synapse.storage.databases.main.profile import ProfileStore
 from synapse.storage.databases.main.registration import (
     RegistrationBackgroundUpdateStore,
@@ -179,6 +180,7 @@ class Store(
     UserDirectoryBackgroundUpdateStore,
     EndToEndKeyBackgroundStore,
     StatsStore,
+    PusherWorkerStore,
 ):
     def execute(self, f, *args, **kwargs):
         return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index b6a9ce4f38..54631b4ee2 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -274,22 +274,25 @@ class PresenceHandler(BasePresenceHandler):
 
         self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
 
-        # Start a LoopingCall in 30s that fires every 5s.
-        # The initial delay is to allow disconnected clients a chance to
-        # reconnect before we treat them as offline.
-        def run_timeout_handler():
-            return run_as_background_process(
-                "handle_presence_timeouts", self._handle_timeouts
-            )
-
-        self.clock.call_later(30, self.clock.looping_call, run_timeout_handler, 5000)
+        if self._presence_enabled:
+            # Start a LoopingCall in 30s that fires every 5s.
+            # The initial delay is to allow disconnected clients a chance to
+            # reconnect before we treat them as offline.
+            def run_timeout_handler():
+                return run_as_background_process(
+                    "handle_presence_timeouts", self._handle_timeouts
+                )
 
-        def run_persister():
-            return run_as_background_process(
-                "persist_presence_changes", self._persist_unpersisted_changes
+            self.clock.call_later(
+                30, self.clock.looping_call, run_timeout_handler, 5000
             )
 
-        self.clock.call_later(60, self.clock.looping_call, run_persister, 60 * 1000)
+            def run_persister():
+                return run_as_background_process(
+                    "persist_presence_changes", self._persist_unpersisted_changes
+                )
+
+            self.clock.call_later(60, self.clock.looping_call, run_persister, 60 * 1000)
 
         LaterGauge(
             "synapse_handlers_presence_wheel_timer_size",
@@ -299,7 +302,7 @@ class PresenceHandler(BasePresenceHandler):
         )
 
         # Used to handle sending of presence to newly joined users/servers
-        if hs.config.use_presence:
+        if self._presence_enabled:
             self.notifier.add_replication_callback(self.notify_new_event)
 
         # Presence is best effort and quickly heals itself, so lets just always
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index b45b179fed..9059382246 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -291,9 +291,8 @@ class SyncHandler:
         user_id = sync_config.user.to_string()
         await self.auth.check_auth_blocking(requester=requester)
 
-        res = await self.response_cache.wrap_conditional(
+        res = await self.response_cache.wrap(
             sync_config.request_key,
-            lambda result: since_token != result.next_batch,
             self._wait_for_sync_for_user,
             sync_config,
             since_token,
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index 74219cb05e..6b608ebc9b 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -39,6 +39,11 @@ class PusherWorkerStore(SQLBaseStore):
             db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
         )
 
+        self.db_pool.updates.register_background_update_handler(
+            "remove_deactivated_pushers",
+            self._remove_deactivated_pushers,
+        )
+
     def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]:
         """JSON-decode the data in the rows returned from the `pushers` table
 
@@ -284,6 +289,54 @@ class PusherWorkerStore(SQLBaseStore):
             lock=False,
         )
 
+    async def _remove_deactivated_pushers(self, progress: dict, batch_size: int) -> int:
+        """A background update that deletes all pushers for deactivated users.
+
+        Note that we don't proacively tell the pusherpool that we've deleted
+        these (just because its a bit off a faff to do from here), but they will
+        get cleaned up at the next restart
+        """
+
+        last_user = progress.get("last_user", "")
+
+        def _delete_pushers(txn) -> int:
+
+            sql = """
+                SELECT name FROM users
+                WHERE deactivated = ? and name > ?
+                ORDER BY name ASC
+                LIMIT ?
+            """
+
+            txn.execute(sql, (1, last_user, batch_size))
+            users = [row[0] for row in txn]
+
+            self.db_pool.simple_delete_many_txn(
+                txn,
+                table="pushers",
+                column="user_name",
+                iterable=users,
+                keyvalues={},
+            )
+
+            if users:
+                self.db_pool.updates._background_update_progress_txn(
+                    txn, "remove_deactivated_pushers", {"last_user": users[-1]}
+                )
+
+            return len(users)
+
+        number_deleted = await self.db_pool.runInteraction(
+            "_remove_deactivated_pushers", _delete_pushers
+        )
+
+        if number_deleted < batch_size:
+            await self.db_pool.updates._end_background_update(
+                "remove_deactivated_pushers"
+            )
+
+        return number_deleted
+
 
 class PusherStore(PusherWorkerStore):
     def get_pushers_stream_token(self) -> int:
diff --git a/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql b/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql
index 20ba4abca3..0ec6764150 100644
--- a/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql
+++ b/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql
@@ -14,8 +14,7 @@
  */
 
 
--- We may not have deleted all pushers for deactivated accounts. Do so now.
---
--- Note: We don't bother updating the `deleted_pushers` table as it's just use
--- to stop pushers on workers, and that will happen when they get next restarted.
-DELETE FROM pushers WHERE user_name IN (SELECT name FROM users WHERE deactivated = 1);
+-- We may not have deleted all pushers for deactivated accounts, so we set up a
+-- background job to delete them.
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (5908, 'remove_deactivated_pushers', '{}');
diff --git a/synapse/storage/databases/main/schema/delta/58/28rejected_events_metadata.sql b/synapse/storage/databases/main/schema/delta/59/09rejected_events_metadata.sql
index 9c95646281..cc9b267c7d 100644
--- a/synapse/storage/databases/main/schema/delta/58/28rejected_events_metadata.sql
+++ b/synapse/storage/databases/main/schema/delta/59/09rejected_events_metadata.sql
@@ -13,5 +13,14 @@
  * limitations under the License.
  */
 
+-- This originally was in 58/, but landed after 59/ was created, and so some
+-- servers running develop didn't run this delta. Running it again should be
+-- safe.
+--
+-- We first delete any in progress `rejected_events_metadata` background update,
+-- to ensure that we don't conflict when trying to insert the new one. (We could
+-- alternatively do an ON CONFLICT DO NOTHING, but that syntax isn't supported
+-- by older SQLite versions. Plus, this should be a rare case).
+DELETE FROM background_updates WHERE update_name = 'rejected_events_metadata';
 INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
   (5828, 'rejected_events_metadata', '{}');
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 53f85195a7..32228f42ee 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Optional, Set, TypeVar
+from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Optional, TypeVar
 
 from twisted.internet import defer
 
@@ -40,7 +40,6 @@ class ResponseCache(Generic[T]):
     def __init__(self, hs: "HomeServer", name: str, timeout_ms: float = 0):
         # Requests that haven't finished yet.
         self.pending_result_cache = {}  # type: Dict[T, ObservableDeferred]
-        self.pending_conditionals = {}  # type: Dict[T, Set[Callable[[Any], bool]]]
 
         self.clock = hs.get_clock()
         self.timeout_sec = timeout_ms / 1000.0
@@ -102,11 +101,7 @@ class ResponseCache(Generic[T]):
         self.pending_result_cache[key] = result
 
         def remove(r):
-            should_cache = all(
-                func(r) for func in self.pending_conditionals.pop(key, [])
-            )
-
-            if self.timeout_sec and should_cache:
+            if self.timeout_sec:
                 self.clock.call_later(
                     self.timeout_sec, self.pending_result_cache.pop, key, None
                 )
@@ -117,31 +112,6 @@ class ResponseCache(Generic[T]):
         result.addBoth(remove)
         return result.observe()
 
-    def add_conditional(self, key: T, conditional: Callable[[Any], bool]):
-        self.pending_conditionals.setdefault(key, set()).add(conditional)
-
-    def wrap_conditional(
-        self,
-        key: T,
-        should_cache: Callable[[Any], bool],
-        callback: "Callable[..., Any]",
-        *args: Any,
-        **kwargs: Any
-    ) -> defer.Deferred:
-        """The same as wrap(), but adds a conditional to the final execution.
-
-        When the final execution completes, *all* conditionals need to return True for it to properly cache,
-        else it'll not be cached in a timed fashion.
-        """
-
-        # See if there's already a result on this key that hasn't yet completed. Due to the single-threaded nature of
-        # python, adding a key immediately in the same execution thread will not cause a race condition.
-        result = self.get(key)
-        if not result or isinstance(result, defer.Deferred) and not result.called:
-            self.add_conditional(key, should_cache)
-
-        return self.wrap(key, callback, *args, **kwargs)
-
     def wrap(
         self, key: T, callback: "Callable[..., Any]", *args: Any, **kwargs: Any
     ) -> defer.Deferred: