diff options
Diffstat (limited to 'synapse/storage/client_ips.py')
-rw-r--r-- | synapse/storage/client_ips.py | 37 |
1 files changed, 26 insertions, 11 deletions
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index ce338514e8..8fc678fa67 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -15,15 +15,15 @@ import logging -from twisted.internet import defer, reactor +from six import iteritems -from ._base import Cache -from . import background_updates +from twisted.internet import defer +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.caches import CACHE_SIZE_FACTOR -from six import iteritems - +from . import background_updates +from ._base import Cache logger = logging.getLogger(__name__) @@ -35,6 +35,7 @@ LAST_SEEN_GRANULARITY = 120 * 1000 class ClientIpStore(background_updates.BackgroundUpdateStore): def __init__(self, db_conn, hs): + self.client_ip_last_seen = Cache( name="client_ip_last_seen", keylen=4, @@ -70,8 +71,11 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): self._client_ip_looper = self._clock.looping_call( self._update_client_ips_batch, 5 * 1000 ) - reactor.addSystemEventTrigger("before", "shutdown", self._update_client_ips_batch) + self.hs.get_reactor().addSystemEventTrigger( + "before", "shutdown", self._update_client_ips_batch + ) + @defer.inlineCallbacks def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id, now=None): if not now: @@ -82,7 +86,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): last_seen = self.client_ip_last_seen.get(key) except KeyError: last_seen = None - + yield self.populate_monthly_active_users(user_id) # Rate-limited inserts if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY: return @@ -92,10 +96,21 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): self._batch_row_update[key] = (user_agent, device_id, now) def _update_client_ips_batch(self): - to_update = self._batch_row_update - self._batch_row_update = {} - return self.runInteraction( - "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update + + # If the DB pool has already terminated, don't try updating + if not self.hs.get_db_pool().running: + return + + def update(): + to_update = self._batch_row_update + self._batch_row_update = {} + return self.runInteraction( + "_update_client_ips_batch", self._update_client_ips_batch_txn, + to_update, + ) + + return run_as_background_process( + "update_client_ips", update, ) def _update_client_ips_batch_txn(self, txn, to_update): |