diff options
author | Olivier Wilkinson (reivilibre) <oliverw@matrix.org> | 2022-03-17 14:38:35 +0000 |
---|---|---|
committer | Olivier Wilkinson (reivilibre) <oliverw@matrix.org> | 2022-03-17 14:53:28 +0000 |
commit | 49082227209abf1960f620b4ad4225b7162045a5 (patch) | |
tree | f8ed953abed16a43f895c35fe154550a56e51781 | |
parent | Fold MontlyActiveUsersStore into MonthlyActiveUsersWorkerStore (diff) | |
download | synapse-49082227209abf1960f620b4ad4225b7162045a5.tar.xz |
Add assertions that client IP update methods only run on a designated worker
-rw-r--r-- | synapse/storage/databases/main/client_ips.py | 20 |
1 files changed, 20 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index 9cc3f3eeb4..065842b29f 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ -549,6 +549,7 @@ class ClientIpStore(ClientIpWorkerStore, MonthlyActiveUsersWorkerStore): db_conn: LoggingDatabaseConnection, hs: "HomeServer", ): + self._update_on_this_worker = hs.config.worker.run_background_tasks # (user_id, access_token, ip,) -> last_seen self.client_ip_last_seen = LruCache[Tuple[str, str, str], int]( @@ -578,6 +579,10 @@ class ClientIpStore(ClientIpWorkerStore, MonthlyActiveUsersWorkerStore): device_id: Optional[str], now: Optional[int] = None, ) -> None: + assert ( + self._update_on_this_worker + ), "This worker is not designated to update client IPs" + if not now: now = int(self._clock.time_msec()) key = (user_id, access_token, ip) @@ -597,6 +602,9 @@ class ClientIpStore(ClientIpWorkerStore, MonthlyActiveUsersWorkerStore): @wrap_as_background_process("update_client_ips") async def _update_client_ips_batch(self) -> None: + assert ( + self._update_on_this_worker + ), "This worker is not designated to update client IPs" # If the DB pool has already terminated, don't try updating if not self.db_pool.is_running(): @@ -614,6 +622,10 @@ class ClientIpStore(ClientIpWorkerStore, MonthlyActiveUsersWorkerStore): txn: LoggingTransaction, to_update: Mapping[Tuple[str, str, str], Tuple[str, Optional[str], int]], ) -> None: + assert ( + self._update_on_this_worker + ), "This worker is not designated to update client IPs" + if "user_ips" in self.db_pool._unsafe_to_upsert_tables or ( not self.database_engine.can_native_upsert ): @@ -664,6 +676,10 @@ class ClientIpStore(ClientIpWorkerStore, MonthlyActiveUsersWorkerStore): A dictionary mapping a tuple of (user_id, device_id) to dicts, with keys giving the column names from the devices table. """ + assert ( + self._update_on_this_worker + ), "This worker is not designated to update client IPs" + ret = await super().get_last_client_ip_by_device(user_id, device_id) # Update what is retrieved from the database with data which is pending @@ -709,6 +725,10 @@ class ClientIpStore(ClientIpWorkerStore, MonthlyActiveUsersWorkerStore): Only the latest user agent for each access token and IP address combination is available. """ + assert ( + self._update_on_this_worker + ), "This worker is not designated to update client IPs" + results: Dict[Tuple[str, str], LastConnectionInfo] = { (connection["access_token"], connection["ip"]): connection for connection in await super().get_user_ip_and_agents(user, since_ts) |