diff options
author | David Baker <dave@matrix.org> | 2018-04-10 17:41:58 +0100 |
---|---|---|
committer | David Baker <dave@matrix.org> | 2018-04-10 17:41:58 +0100 |
commit | e654230a51affdeca69b365b0ec43ff95134bdf6 (patch) | |
tree | b886945c6c954ecb95756a188ae5a9cb855b29ca /synapse/handlers | |
parent | Merge pull request #2973 from matrix-org/matthew/dinsic_3pid_check (diff) | |
download | synapse-e654230a51affdeca69b365b0ec43ff95134bdf6.tar.xz |
Written but untested profile replication
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/profile.py | 70 |
1 files changed, 69 insertions, 1 deletions
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index cb710fe796..be9d1d8c41 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -15,12 +15,14 @@ import logging -from twisted.internet import defer +from twisted.internet import defer, reactor from synapse.api.errors import SynapseError, AuthError, CodeMessageException from synapse.types import UserID, get_domain_from_id from ._base import BaseHandler +from signedjson.sign import sign_json + logger = logging.getLogger(__name__) @@ -28,6 +30,8 @@ class ProfileHandler(BaseHandler): PROFILE_UPDATE_MS = 60 * 1000 PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000 + PROFILE_REPLICATE_INTERVAL = 2 * 60 * 1000 + def __init__(self, hs): super(ProfileHandler, self).__init__(hs) @@ -38,8 +42,72 @@ class ProfileHandler(BaseHandler): self.user_directory_handler = hs.get_user_directory_handler() + self.http_client = hs.get_simple_http_client() + self.clock.looping_call(self._update_remote_profile_cache, self.PROFILE_UPDATE_MS) + reactor.callWhenRunning(self._assign_profile_replication_batches) + reactor.callWhenRunning(self._replicate_profiles) + self.clock.looping_call(self._replicate_profiles, self.PROFILE_REPLICATE_INTERVAL) + + @defer.inlineCallbacks + def _assign_profile_replication_batches(self): + """If no profile replication has been done yet, allocate replication batch + numbers to each profile to start the replication process. + """ + logger.info("Assigning profile batch numbers...") + total = 0 + while True: + assigned = yield self.store.assign_profile_batch() + total += assigned + if assigned == 0: + break + logger.info("Assigned %d profile batch numbers", total) + + @defer.inlineCallbacks + def _replicate_profiles(self): + """If any profile data has been updated and not pushed to the replication targets, + replicate it. + """ + host_batches = yield self.store.get_replication_hosts() + latest_batch = yield self.store.get_latest_profile_replication_batch_number() + for repl_host in self.hs.config.replicate_user_profiles_to: + if repl_host not in host_batches: + host_batches[repl_host] = -1 + try: + for i in xrange(host_batches[repl_host] + 1, latest_batch + 1): + yield self._replicate_host_profile_batch(repl_host, i) + except: + logger.exception( + "Exception while replicating to %s: aborting for now", repl_host, + ) + + @defer.inlineCallbacks + def _replicate_host_profile_batch(self, host, batchnum): + logger.info("Replicating profile batch %d to %s", batchnum, host) + batch_rows = yield self.store.get_profile_batch(batchnum) + batch = { + UserID(r["user_id"], self.hs.hostname).to_string(): { + "displayname": r["displayname"], + "avatar_url": r["avatar_url"], + } for r in batch_rows + } + + url = "https://%s/_matrix/federation/v1/replicate_profiles" % (host,) + signed_batch = { + "batchnum": batchnum, + "signed_batch": sign_json(batch, self.hs.hostname, self.hs.config.signing_key[0]), + "origin_server": self.hs.hostname, + } + try: + yield self.http_client.post_json_get_json(url, signed_batch) + self.store.update_replication_batch_for_host(host, batchnum) + logger.info("Sucessfully replicated profile batch %d to %s", batchnum, host) + except: + # This will get retried when the looping call next comes around + logger.exception("Failed to replicate profile batch %d to %s", batchnum, host) + raise + @defer.inlineCallbacks def get_profile(self, user_id): target_user = UserID.from_string(user_id) |