From ed3d0170d9317644004b1203ecedaba58866ab9d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Jun 2017 13:37:04 +0100 Subject: Batch upsert user ips --- synapse/api/auth.py | 3 +-- synapse/storage/client_ips.py | 57 ++++++++++++++++++++++++++++--------------- 2 files changed, 39 insertions(+), 21 deletions(-) (limited to 'synapse') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 0c297cb022..10f4972369 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -23,7 +23,6 @@ from synapse import event_auth from synapse.api.constants import EventTypes, Membership, JoinRules from synapse.api.errors import AuthError, Codes from synapse.types import UserID -from synapse.util import logcontext from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -200,7 +199,7 @@ class Auth(object): default=[""] )[0] if user and access_token and ip_addr: - logcontext.preserve_fn(self.store.insert_client_ip)( + self.store.insert_client_ip( user=user, access_token=access_token, ip=ip_addr, diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 014ab635b7..68955e740b 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -15,7 +15,7 @@ import logging -from twisted.internet import defer +from twisted.internet import defer, reactor from ._base import Cache from . import background_updates @@ -50,7 +50,13 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): columns=["user_id", "device_id", "last_seen"], ) - @defer.inlineCallbacks + self._batch_row_update = {} + + self._client_ip_looper = self._clock.looping_call( + self._update_client_ips_batch, 5 * 1000 + ) + reactor.addSystemEventTrigger("before", "shutdown", self._update_client_ips_batch) + def insert_client_ip(self, user, access_token, ip, user_agent, device_id): now = int(self._clock.time_msec()) key = (user.to_string(), access_token, ip) @@ -62,28 +68,41 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): # Rate-limited inserts if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY: - defer.returnValue(None) + return self.client_ip_last_seen.prefill(key, now) - # It's safe not to lock here: a) no unique constraint, - # b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely - yield self._simple_upsert( - "user_ips", - keyvalues={ - "user_id": user.to_string(), - "access_token": access_token, - "ip": ip, - "user_agent": user_agent, - "device_id": device_id, - }, - values={ - "last_seen": now, - }, - desc="insert_client_ip", - lock=False, + self._batch_row_update[key] = (user_agent, device_id, now) + + def _update_client_ips_batch(self): + to_update = self._batch_row_update + self._batch_row_update = {} + return self.runInteraction( + "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update ) + def _update_client_ips_batch_txn(self, txn, to_update): + self.database_engine.lock_table(txn, "user_ips") + + for entry in to_update.iteritems(): + (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry + + self._simple_upsert_txn( + txn, + table="user_ips", + keyvalues={ + "user_id": user_id, + "access_token": access_token, + "ip": ip, + "user_agent": user_agent, + "device_id": device_id, + }, + values={ + "last_seen": last_seen, + }, + lock=False, + ) + @defer.inlineCallbacks def get_last_client_ip_by_device(self, devices): """For each device_id listed, give the user_ip it was last seen on -- cgit 1.4.1