summary refs log tree commit diff
diff options
context:
space:
mode:
authorOlivier Wilkinson (reivilibre) <oliverw@matrix.org>2022-03-17 14:38:35 +0000
committerOlivier Wilkinson (reivilibre) <oliverw@matrix.org>2022-03-17 14:53:28 +0000
commit49082227209abf1960f620b4ad4225b7162045a5 (patch)
treef8ed953abed16a43f895c35fe154550a56e51781
parentFold MontlyActiveUsersStore into MonthlyActiveUsersWorkerStore (diff)
downloadsynapse-49082227209abf1960f620b4ad4225b7162045a5.tar.xz
Add assertions that client IP update methods only run on a designated worker
-rw-r--r--synapse/storage/databases/main/client_ips.py20
1 files changed, 20 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py
index 9cc3f3eeb4..065842b29f 100644
--- a/synapse/storage/databases/main/client_ips.py
+++ b/synapse/storage/databases/main/client_ips.py
@@ -549,6 +549,7 @@ class ClientIpStore(ClientIpWorkerStore, MonthlyActiveUsersWorkerStore):
         db_conn: LoggingDatabaseConnection,
         hs: "HomeServer",
     ):
+        self._update_on_this_worker = hs.config.worker.run_background_tasks
 
         # (user_id, access_token, ip,) -> last_seen
         self.client_ip_last_seen = LruCache[Tuple[str, str, str], int](
@@ -578,6 +579,10 @@ class ClientIpStore(ClientIpWorkerStore, MonthlyActiveUsersWorkerStore):
         device_id: Optional[str],
         now: Optional[int] = None,
     ) -> None:
+        assert (
+            self._update_on_this_worker
+        ), "This worker is not designated to update client IPs"
+
         if not now:
             now = int(self._clock.time_msec())
         key = (user_id, access_token, ip)
@@ -597,6 +602,9 @@ class ClientIpStore(ClientIpWorkerStore, MonthlyActiveUsersWorkerStore):
 
     @wrap_as_background_process("update_client_ips")
     async def _update_client_ips_batch(self) -> None:
+        assert (
+            self._update_on_this_worker
+        ), "This worker is not designated to update client IPs"
 
         # If the DB pool has already terminated, don't try updating
         if not self.db_pool.is_running():
@@ -614,6 +622,10 @@ class ClientIpStore(ClientIpWorkerStore, MonthlyActiveUsersWorkerStore):
         txn: LoggingTransaction,
         to_update: Mapping[Tuple[str, str, str], Tuple[str, Optional[str], int]],
     ) -> None:
+        assert (
+            self._update_on_this_worker
+        ), "This worker is not designated to update client IPs"
+
         if "user_ips" in self.db_pool._unsafe_to_upsert_tables or (
             not self.database_engine.can_native_upsert
         ):
@@ -664,6 +676,10 @@ class ClientIpStore(ClientIpWorkerStore, MonthlyActiveUsersWorkerStore):
             A dictionary mapping a tuple of (user_id, device_id) to dicts, with
             keys giving the column names from the devices table.
         """
+        assert (
+            self._update_on_this_worker
+        ), "This worker is not designated to update client IPs"
+
         ret = await super().get_last_client_ip_by_device(user_id, device_id)
 
         # Update what is retrieved from the database with data which is pending
@@ -709,6 +725,10 @@ class ClientIpStore(ClientIpWorkerStore, MonthlyActiveUsersWorkerStore):
             Only the latest user agent for each access token and IP address combination
             is available.
         """
+        assert (
+            self._update_on_this_worker
+        ), "This worker is not designated to update client IPs"
+
         results: Dict[Tuple[str, str], LastConnectionInfo] = {
             (connection["access_token"], connection["ip"]): connection
             for connection in await super().get_user_ip_and_agents(user, since_ts)