diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index fc468ea185..77ae10da3d 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -15,13 +15,15 @@
import logging
-from twisted.internet import defer, reactor
+from six import iteritems
-from ._base import Cache
-from . import background_updates
+from twisted.internet import defer
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches import CACHE_SIZE_FACTOR
+from . import background_updates
+from ._base import Cache
logger = logging.getLogger(__name__)
@@ -32,14 +34,14 @@ LAST_SEEN_GRANULARITY = 120 * 1000
class ClientIpStore(background_updates.BackgroundUpdateStore):
- def __init__(self, hs):
+ def __init__(self, db_conn, hs):
self.client_ip_last_seen = Cache(
name="client_ip_last_seen",
keylen=4,
max_entries=50000 * CACHE_SIZE_FACTOR,
)
- super(ClientIpStore, self).__init__(hs)
+ super(ClientIpStore, self).__init__(db_conn, hs)
self.register_background_index_update(
"user_ips_device_index",
@@ -48,17 +50,35 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
columns=["user_id", "device_id", "last_seen"],
)
+ self.register_background_index_update(
+ "user_ips_last_seen_index",
+ index_name="user_ips_last_seen",
+ table="user_ips",
+ columns=["user_id", "last_seen"],
+ )
+
+ self.register_background_index_update(
+ "user_ips_last_seen_only_index",
+ index_name="user_ips_last_seen_only",
+ table="user_ips",
+ columns=["last_seen"],
+ )
+
# (user_id, access_token, ip) -> (user_agent, device_id, last_seen)
self._batch_row_update = {}
self._client_ip_looper = self._clock.looping_call(
self._update_client_ips_batch, 5 * 1000
)
- reactor.addSystemEventTrigger("before", "shutdown", self._update_client_ips_batch)
+ self.hs.get_reactor().addSystemEventTrigger(
+ "before", "shutdown", self._update_client_ips_batch
+ )
- def insert_client_ip(self, user, access_token, ip, user_agent, device_id):
- now = int(self._clock.time_msec())
- key = (user.to_string(), access_token, ip)
+ def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id,
+ now=None):
+ if not now:
+ now = int(self._clock.time_msec())
+ key = (user_id, access_token, ip)
try:
last_seen = self.client_ip_last_seen.get(key)
@@ -74,16 +94,22 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
self._batch_row_update[key] = (user_agent, device_id, now)
def _update_client_ips_batch(self):
- to_update = self._batch_row_update
- self._batch_row_update = {}
- return self.runInteraction(
- "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
+ def update():
+ to_update = self._batch_row_update
+ self._batch_row_update = {}
+ return self.runInteraction(
+ "_update_client_ips_batch", self._update_client_ips_batch_txn,
+ to_update,
+ )
+
+ run_as_background_process(
+ "update_client_ips", update,
)
def _update_client_ips_batch_txn(self, txn, to_update):
self.database_engine.lock_table(txn, "user_ips")
- for entry in to_update.iteritems():
+ for entry in iteritems(to_update):
(user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
self._simple_upsert_txn(
@@ -215,5 +241,5 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
"user_agent": user_agent,
"last_seen": last_seen,
}
- for (access_token, ip), (user_agent, last_seen) in results.iteritems()
+ for (access_token, ip), (user_agent, last_seen) in iteritems(results)
))
|