diff options
author | Erik Johnston <erik@matrix.org> | 2017-06-27 13:37:04 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2017-06-27 13:37:04 +0100 |
commit | ed3d0170d9317644004b1203ecedaba58866ab9d (patch) | |
tree | f1011cd5b43ce0bd8b50f98f3ba53c69c7cd9740 /synapse/storage/client_ips.py | |
parent | Merge pull request #2304 from matrix-org/erikj/users_share_fix (diff) | |
download | synapse-ed3d0170d9317644004b1203ecedaba58866ab9d.tar.xz |
Batch upsert user ips
Diffstat (limited to 'synapse/storage/client_ips.py')
-rw-r--r-- | synapse/storage/client_ips.py | 57 |
1 files changed, 38 insertions, 19 deletions
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 014ab635b7..68955e740b 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -15,7 +15,7 @@ import logging -from twisted.internet import defer +from twisted.internet import defer, reactor from ._base import Cache from . import background_updates @@ -50,7 +50,13 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): columns=["user_id", "device_id", "last_seen"], ) - @defer.inlineCallbacks + self._batch_row_update = {} + + self._client_ip_looper = self._clock.looping_call( + self._update_client_ips_batch, 5 * 1000 + ) + reactor.addSystemEventTrigger("before", "shutdown", self._update_client_ips_batch) + def insert_client_ip(self, user, access_token, ip, user_agent, device_id): now = int(self._clock.time_msec()) key = (user.to_string(), access_token, ip) @@ -62,28 +68,41 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): # Rate-limited inserts if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY: - defer.returnValue(None) + return self.client_ip_last_seen.prefill(key, now) - # It's safe not to lock here: a) no unique constraint, - # b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely - yield self._simple_upsert( - "user_ips", - keyvalues={ - "user_id": user.to_string(), - "access_token": access_token, - "ip": ip, - "user_agent": user_agent, - "device_id": device_id, - }, - values={ - "last_seen": now, - }, - desc="insert_client_ip", - lock=False, + self._batch_row_update[key] = (user_agent, device_id, now) + + def _update_client_ips_batch(self): + to_update = self._batch_row_update + self._batch_row_update = {} + return self.runInteraction( + "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update ) + def _update_client_ips_batch_txn(self, txn, to_update): + self.database_engine.lock_table(txn, "user_ips") + + for entry in to_update.iteritems(): + (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry + + self._simple_upsert_txn( + txn, + table="user_ips", + keyvalues={ + "user_id": user_id, + "access_token": access_token, + "ip": ip, + "user_agent": user_agent, + "device_id": device_id, + }, + values={ + "last_seen": last_seen, + }, + lock=False, + ) + @defer.inlineCallbacks def get_last_client_ip_by_device(self, devices): """For each device_id listed, give the user_ip it was last seen on |