author | Andrew Morgan <andrew@amorgan.xyz> | 2021-04-23 16:24:31 +0100
---|---|---
committer | Andrew Morgan <andrew@amorgan.xyz> | 2021-04-23 16:24:31 +0100
commit | 96f8556e08cfcf8fa146150cce100dddb87cb4b9 (patch) |
tree | 8931d2c6262aaa4baa9daf109877fbbc841e773f |
parent | Merge commit 'f00c4e7af' into anoa/dinsic_release_1_31_0 (diff) |
parent | Update nginx reverse-proxy docs (#9512) (diff) |
download | synapse-96f8556e08cfcf8fa146150cce100dddb87cb4b9.tar.xz |
Merge commit 'a5daae2a5' into anoa/dinsic_release_1_31_0
-rw-r--r-- | changelog.d/9358.misc | 1
-rw-r--r-- | changelog.d/9503.bugfix | 1
-rw-r--r-- | changelog.d/9512.feature | 1
-rw-r--r-- | changelog.d/9516.bugfix | 1
-rw-r--r-- | changelog.d/9530.bugfix | 1
-rw-r--r-- | docs/reverse_proxy.md | 2
-rwxr-xr-x | scripts/synapse_port_db | 2
-rw-r--r-- | synapse/handlers/presence.py | 31
-rw-r--r-- | synapse/handlers/sync.py | 3
-rw-r--r-- | synapse/storage/databases/main/pusher.py | 53
-rw-r--r-- | synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql | 9
-rw-r--r-- | synapse/storage/databases/main/schema/delta/59/09rejected_events_metadata.sql (renamed from synapse/storage/databases/main/schema/delta/58/28rejected_events_metadata.sql) | 9
-rw-r--r-- | synapse/util/caches/response_cache.py | 34 |
13 files changed, 94 insertions, 54 deletions
diff --git a/changelog.d/9358.misc b/changelog.d/9358.misc
deleted file mode 100644
index cc7614afc0..0000000000
--- a/changelog.d/9358.misc
+++ /dev/null
@@ -1 +0,0 @@
-Added a fix that invalidates cache for empty timed-out sync responses.
\ No newline at end of file
diff --git a/changelog.d/9503.bugfix b/changelog.d/9503.bugfix
new file mode 100644
index 0000000000..0868691389
--- /dev/null
+++ b/changelog.d/9503.bugfix
@@ -0,0 +1 @@
+Fix missing chain cover index due to a schema delta not being applied correctly. Only affected servers that ran development versions.
diff --git a/changelog.d/9512.feature b/changelog.d/9512.feature
new file mode 100644
index 0000000000..06cfd5d199
--- /dev/null
+++ b/changelog.d/9512.feature
@@ -0,0 +1 @@
+Add support for `X-Forwarded-Proto` header when using a reverse proxy.
diff --git a/changelog.d/9516.bugfix b/changelog.d/9516.bugfix
new file mode 100644
index 0000000000..81188c5473
--- /dev/null
+++ b/changelog.d/9516.bugfix
@@ -0,0 +1 @@
+Fix a bug where users' pushers were not all deleted when they deactivated their account.
diff --git a/changelog.d/9530.bugfix b/changelog.d/9530.bugfix
new file mode 100644
index 0000000000..bb4db675d9
--- /dev/null
+++ b/changelog.d/9530.bugfix
@@ -0,0 +1 @@
+Prevent presence background jobs from running when presence is disabled.
\ No newline at end of file
diff --git a/docs/reverse_proxy.md b/docs/reverse_proxy.md
index bb7caa8bb9..81e5a68a36 100644
--- a/docs/reverse_proxy.md
+++ b/docs/reverse_proxy.md
@@ -53,6 +53,8 @@ server {
     proxy_pass http://localhost:8008;
     proxy_set_header X-Forwarded-For $remote_addr;
     proxy_set_header X-Forwarded-Proto $scheme;
+    proxy_set_header Host $host;
+
     # Nginx by default only allows file uploads up to 1M in size
     # Increase client_max_body_size to match max_upload_size defined in homeserver.yaml
     client_max_body_size 50M;
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index 83c53d9887..2e4aabdd26 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -47,6 +47,7 @@ from synapse.storage.databases.main.events_bg_updates import (
 from synapse.storage.databases.main.media_repository import (
     MediaRepositoryBackgroundUpdateStore,
 )
+from synapse.storage.databases.main.pusher import PusherWorkerStore
 from synapse.storage.databases.main.profile import ProfileStore
 from synapse.storage.databases.main.registration import (
     RegistrationBackgroundUpdateStore,
@@ -179,6 +180,7 @@ class Store(
     UserDirectoryBackgroundUpdateStore,
     EndToEndKeyBackgroundStore,
     StatsStore,
+    PusherWorkerStore,
 ):
     def execute(self, f, *args, **kwargs):
         return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index b6a9ce4f38..54631b4ee2 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -274,22 +274,25 @@ class PresenceHandler(BasePresenceHandler):
 
         self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
 
-        # Start a LoopingCall in 30s that fires every 5s.
-        # The initial delay is to allow disconnected clients a chance to
-        # reconnect before we treat them as offline.
-        def run_timeout_handler():
-            return run_as_background_process(
-                "handle_presence_timeouts", self._handle_timeouts
-            )
-
-        self.clock.call_later(30, self.clock.looping_call, run_timeout_handler, 5000)
+        if self._presence_enabled:
+            # Start a LoopingCall in 30s that fires every 5s.
+            # The initial delay is to allow disconnected clients a chance to
+            # reconnect before we treat them as offline.
+            def run_timeout_handler():
+                return run_as_background_process(
+                    "handle_presence_timeouts", self._handle_timeouts
+                )
 
-        def run_persister():
-            return run_as_background_process(
-                "persist_presence_changes", self._persist_unpersisted_changes
+            self.clock.call_later(
+                30, self.clock.looping_call, run_timeout_handler, 5000
             )
 
-        self.clock.call_later(60, self.clock.looping_call, run_persister, 60 * 1000)
+            def run_persister():
+                return run_as_background_process(
+                    "persist_presence_changes", self._persist_unpersisted_changes
+                )
+
+            self.clock.call_later(60, self.clock.looping_call, run_persister, 60 * 1000)
 
         LaterGauge(
             "synapse_handlers_presence_wheel_timer_size",
@@ -299,7 +302,7 @@ class PresenceHandler(BasePresenceHandler):
         )
 
         # Used to handle sending of presence to newly joined users/servers
-        if hs.config.use_presence:
+        if self._presence_enabled:
             self.notifier.add_replication_callback(self.notify_new_event)
 
         # Presence is best effort and quickly heals itself, so lets just always
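The presence change above wraps two delayed looping jobs in an `if self._presence_enabled:` guard: `call_later` postpones the first run, and the job then repeats via `looping_call`. A minimal sketch of that scheduling pattern using plain Twisted primitives (the function and flag names are illustrative stand-ins, not Synapse's `Clock` API):

```python
from twisted.internet import reactor, task

presence_enabled = True  # stand-in for the homeserver's presence config flag


def handle_presence_timeouts():
    # Stand-in for the real timeout sweep (PresenceHandler._handle_timeouts).
    print("sweeping for timed-out presence sessions")


if presence_enabled:
    loop = task.LoopingCall(handle_presence_timeouts)
    # Delay the first run by 30s so briefly disconnected clients can
    # reconnect before being marked offline, then repeat every 5s.
    # (LoopingCall.start takes seconds; Synapse's looping_call takes ms.)
    reactor.callLater(30, loop.start, 5.0)

reactor.run()
```

Skipping the scheduling entirely when the flag is off, rather than checking the flag inside each job, is what keeps the background processes from ever starting on presence-disabled servers.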
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index b45b179fed..9059382246 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -291,9 +291,8 @@ class SyncHandler:
         user_id = sync_config.user.to_string()
         await self.auth.check_auth_blocking(requester=requester)
 
-        res = await self.response_cache.wrap_conditional(
+        res = await self.response_cache.wrap(
             sync_config.request_key,
-            lambda result: since_token != result.next_batch,
             self._wait_for_sync_for_user,
             sync_config,
             since_token,
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index 74219cb05e..6b608ebc9b 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -39,6 +39,11 @@ class PusherWorkerStore(SQLBaseStore):
             db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
         )
 
+        self.db_pool.updates.register_background_update_handler(
+            "remove_deactivated_pushers",
+            self._remove_deactivated_pushers,
+        )
+
     def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]:
         """JSON-decode the data in the rows returned from the `pushers` table
 
@@ -284,6 +289,54 @@ class PusherWorkerStore(SQLBaseStore):
             lock=False,
         )
 
+    async def _remove_deactivated_pushers(self, progress: dict, batch_size: int) -> int:
+        """A background update that deletes all pushers for deactivated users.
+
+        Note that we don't proactively tell the pusherpool that we've deleted
+        these (just because it's a bit of a faff to do from here), but they will
+        get cleaned up at the next restart.
+        """
+
+        last_user = progress.get("last_user", "")
+
+        def _delete_pushers(txn) -> int:
+
+            sql = """
+                SELECT name FROM users
+                WHERE deactivated = ? and name > ?
+                ORDER BY name ASC
+                LIMIT ?
+            """
+
+            txn.execute(sql, (1, last_user, batch_size))
+            users = [row[0] for row in txn]
+
+            self.db_pool.simple_delete_many_txn(
+                txn,
+                table="pushers",
+                column="user_name",
+                iterable=users,
+                keyvalues={},
+            )
+
+            if users:
+                self.db_pool.updates._background_update_progress_txn(
+                    txn, "remove_deactivated_pushers", {"last_user": users[-1]}
+                )
+
+            return len(users)
+
+        number_deleted = await self.db_pool.runInteraction(
+            "_remove_deactivated_pushers", _delete_pushers
+        )
+
+        if number_deleted < batch_size:
+            await self.db_pool.updates._end_background_update(
+                "remove_deactivated_pushers"
+            )
+
+        return number_deleted
+
 
 class PusherStore(PusherWorkerStore):
     def get_pushers_stream_token(self) -> int:
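The new `_remove_deactivated_pushers` handler follows Synapse's batched background-update contract: each invocation handles at most `batch_size` rows, records a resume point in the progress dict, and signals completion by returning fewer rows than requested. A self-contained sketch of that contract with an in-memory stand-in for the database (all data and helper names here are illustrative):

```python
from typing import Dict, List

# In-memory stand-ins for the `users` and `pushers` tables (hypothetical data).
DEACTIVATED_USERS: List[str] = sorted(f"@user{i}:example.com" for i in range(250))
PUSHERS: Dict[str, list] = {u: ["a pusher"] for u in DEACTIVATED_USERS}


def remove_deactivated_pushers(progress: Dict, batch_size: int) -> int:
    """One batch: drop pushers for the next `batch_size` deactivated users."""
    last_user = progress.get("last_user", "")
    # Equivalent of: SELECT name FROM users
    #                WHERE deactivated = 1 AND name > ? ORDER BY name LIMIT ?
    users = [u for u in DEACTIVATED_USERS if u > last_user][:batch_size]
    for u in users:
        PUSHERS.pop(u, None)
    if users:
        progress["last_user"] = users[-1]  # persist the resume point
    return len(users)


# The scheduler re-invokes the handler until a batch comes back short,
# which is how a handler signals that the update has finished.
progress: Dict = {}
while remove_deactivated_pushers(progress, batch_size=100) == 100:
    pass
assert not PUSHERS  # every deactivated user's pushers are gone
```

Keying the resume point on the last user name processed, rather than an offset, keeps each batch cheap (an indexed range scan) and makes the update safe to interrupt and resume.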
+ """ + + txn.execute(sql, (1, last_user, batch_size)) + users = [row[0] for row in txn] + + self.db_pool.simple_delete_many_txn( + txn, + table="pushers", + column="user_name", + iterable=users, + keyvalues={}, + ) + + if users: + self.db_pool.updates._background_update_progress_txn( + txn, "remove_deactivated_pushers", {"last_user": users[-1]} + ) + + return len(users) + + number_deleted = await self.db_pool.runInteraction( + "_remove_deactivated_pushers", _delete_pushers + ) + + if number_deleted < batch_size: + await self.db_pool.updates._end_background_update( + "remove_deactivated_pushers" + ) + + return number_deleted + class PusherStore(PusherWorkerStore): def get_pushers_stream_token(self) -> int: diff --git a/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql b/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql index 20ba4abca3..0ec6764150 100644 --- a/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql +++ b/synapse/storage/databases/main/schema/delta/59/08delete_pushers_for_deactivated_accounts.sql @@ -14,8 +14,7 @@ */ --- We may not have deleted all pushers for deactivated accounts. Do so now. --- --- Note: We don't bother updating the `deleted_pushers` table as it's just use --- to stop pushers on workers, and that will happen when they get next restarted. -DELETE FROM pushers WHERE user_name IN (SELECT name FROM users WHERE deactivated = 1); +-- We may not have deleted all pushers for deactivated accounts, so we set up a +-- background job to delete them. +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (5908, 'remove_deactivated_pushers', '{}'); diff --git a/synapse/storage/databases/main/schema/delta/58/28rejected_events_metadata.sql b/synapse/storage/databases/main/schema/delta/59/09rejected_events_metadata.sql index 9c95646281..cc9b267c7d 100644 --- a/synapse/storage/databases/main/schema/delta/58/28rejected_events_metadata.sql +++ b/synapse/storage/databases/main/schema/delta/59/09rejected_events_metadata.sql @@ -13,5 +13,14 @@ * limitations under the License. */ +-- This originally was in 58/, but landed after 59/ was created, and so some +-- servers running develop didn't run this delta. Running it again should be +-- safe. +-- +-- We first delete any in progress `rejected_events_metadata` background update, +-- to ensure that we don't conflict when trying to insert the new one. (We could +-- alternatively do an ON CONFLICT DO NOTHING, but that syntax isn't supported +-- by older SQLite versions. Plus, this should be a rare case). +DELETE FROM background_updates WHERE update_name = 'rejected_events_metadata'; INSERT INTO background_updates (ordering, update_name, progress_json) VALUES (5828, 'rejected_events_metadata', '{}'); diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 53f85195a7..32228f42ee 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 53f85195a7..32228f42ee 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Optional, Set, TypeVar
+from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Optional, TypeVar
 
 from twisted.internet import defer
 
@@ -40,7 +40,6 @@ class ResponseCache(Generic[T]):
     def __init__(self, hs: "HomeServer", name: str, timeout_ms: float = 0):
         # Requests that haven't finished yet.
         self.pending_result_cache = {}  # type: Dict[T, ObservableDeferred]
-        self.pending_conditionals = {}  # type: Dict[T, Set[Callable[[Any], bool]]]
         self.clock = hs.get_clock()
         self.timeout_sec = timeout_ms / 1000.0
 
@@ -102,11 +101,7 @@ class ResponseCache(Generic[T]):
         self.pending_result_cache[key] = result
 
         def remove(r):
-            should_cache = all(
-                func(r) for func in self.pending_conditionals.pop(key, [])
-            )
-
-            if self.timeout_sec and should_cache:
+            if self.timeout_sec:
                 self.clock.call_later(
                     self.timeout_sec, self.pending_result_cache.pop, key, None
                 )
@@ -117,31 +112,6 @@ class ResponseCache(Generic[T]):
         result.addBoth(remove)
         return result.observe()
 
-    def add_conditional(self, key: T, conditional: Callable[[Any], bool]):
-        self.pending_conditionals.setdefault(key, set()).add(conditional)
-
-    def wrap_conditional(
-        self,
-        key: T,
-        should_cache: Callable[[Any], bool],
-        callback: "Callable[..., Any]",
-        *args: Any,
-        **kwargs: Any
-    ) -> defer.Deferred:
-        """The same as wrap(), but adds a conditional to the final execution.
-
-        When the final execution completes, *all* conditionals need to return True for it to properly cache,
-        else it'll not be cached in a timed fashion.
-        """
-
-        # See if there's already a result on this key that hasn't yet completed. Due to the single-threaded nature of
-        # python, adding a key immediately in the same execution thread will not cause a race condition.
-        result = self.get(key)
-        if not result or isinstance(result, defer.Deferred) and not result.called:
-            self.add_conditional(key, should_cache)
-
-        return self.wrap(key, callback, *args, **kwargs)
-
     def wrap(
         self, key: T, callback: "Callable[..., Any]", *args: Any, **kwargs: Any
     ) -> defer.Deferred:
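With `wrap_conditional` removed, `ResponseCache.wrap` is the single entry point: concurrent callers sharing a key get the same in-flight result, and completed results linger for `timeout_ms` before eviction. A condensed `asyncio` sketch of those semantics (the real class is built on Twisted `ObservableDeferred`s; `MiniResponseCache` and `slow_sync` are illustrative only):

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


class MiniResponseCache:
    def __init__(self, timeout_sec: float) -> None:
        self.timeout_sec = timeout_sec
        # Maps request key -> in-flight (or recently finished) task.
        self.pending: Dict[Any, asyncio.Task] = {}

    async def wrap(
        self, key: Any, callback: Callable[..., Awaitable[Any]], *args: Any
    ) -> Any:
        task = self.pending.get(key)
        if task is None:
            task = asyncio.ensure_future(callback(*args))
            self.pending[key] = task
            # Once finished, keep the cached result for timeout_sec, then evict.
            task.add_done_callback(
                lambda _: asyncio.get_running_loop().call_later(
                    self.timeout_sec, self.pending.pop, key, None
                )
            )
        return await task


async def main() -> None:
    cache = MiniResponseCache(timeout_sec=0.1)
    calls = 0

    async def slow_sync(user: str) -> str:
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.05)
        return f"sync result for {user}"

    # Two concurrent requests with the same key share one execution.
    a, b = await asyncio.gather(
        cache.wrap("req-key", slow_sync, "@alice:example.com"),
        cache.wrap("req-key", slow_sync, "@alice:example.com"),
    )
    assert a == b and calls == 1


asyncio.run(main())
```

This mirrors why the sync handler can drop its conditional: deduplication of concurrent `/sync` requests comes from the shared pending entry itself, not from the post-hoc caching condition.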