summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorDavid Baker <dave@matrix.org>2018-04-10 17:41:58 +0100
committerDavid Baker <dave@matrix.org>2018-04-10 17:41:58 +0100
commite654230a51affdeca69b365b0ec43ff95134bdf6 (patch)
treeb886945c6c954ecb95756a188ae5a9cb855b29ca /synapse/handlers
parentMerge pull request #2973 from matrix-org/matthew/dinsic_3pid_check (diff)
downloadsynapse-e654230a51affdeca69b365b0ec43ff95134bdf6.tar.xz
Written but untested profile replication
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/profile.py70
1 files changed, 69 insertions, 1 deletions
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index cb710fe796..be9d1d8c41 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -15,12 +15,14 @@
 
 import logging
 
-from twisted.internet import defer
+from twisted.internet import defer, reactor
 
 from synapse.api.errors import SynapseError, AuthError, CodeMessageException
 from synapse.types import UserID, get_domain_from_id
 from ._base import BaseHandler
 
+from signedjson.sign import sign_json
+
 logger = logging.getLogger(__name__)
 
 
@@ -28,6 +30,8 @@ class ProfileHandler(BaseHandler):
     PROFILE_UPDATE_MS = 60 * 1000
     PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000
 
+    PROFILE_REPLICATE_INTERVAL = 2 * 60 * 1000
+
     def __init__(self, hs):
         super(ProfileHandler, self).__init__(hs)
 
@@ -38,8 +42,72 @@ class ProfileHandler(BaseHandler):
 
         self.user_directory_handler = hs.get_user_directory_handler()
 
+        self.http_client = hs.get_simple_http_client()
+
         self.clock.looping_call(self._update_remote_profile_cache, self.PROFILE_UPDATE_MS)
 
+        reactor.callWhenRunning(self._assign_profile_replication_batches)
+        reactor.callWhenRunning(self._replicate_profiles)
+        self.clock.looping_call(self._replicate_profiles, self.PROFILE_REPLICATE_INTERVAL)
+
+    @defer.inlineCallbacks
+    def _assign_profile_replication_batches(self):
+        """If no profile replication has been done yet, allocate replication batch
+        numbers to each profile to start the replication process.
+        """
+        logger.info("Assigning profile batch numbers...")
+        total = 0
+        while True:
+            assigned = yield self.store.assign_profile_batch()
+            total += assigned
+            if assigned == 0:
+                break
+        logger.info("Assigned %d profile batch numbers", total)
+
+    @defer.inlineCallbacks
+    def _replicate_profiles(self):
+        """If any profile data has been updated and not pushed to the replication targets,
+        replicate it.
+        """
+        host_batches = yield self.store.get_replication_hosts()
+        latest_batch = yield self.store.get_latest_profile_replication_batch_number()
+        for repl_host in self.hs.config.replicate_user_profiles_to:
+            if repl_host not in host_batches:
+                host_batches[repl_host] = -1
+            try:
+                for i in xrange(host_batches[repl_host] + 1, latest_batch + 1):
+                    yield self._replicate_host_profile_batch(repl_host, i)
+            except:
+                logger.exception(
+                    "Exception while replicating to %s: aborting for now", repl_host,
+                )
+
+    @defer.inlineCallbacks
+    def _replicate_host_profile_batch(self, host, batchnum):
+        logger.info("Replicating profile batch %d to %s", batchnum, host)
+        batch_rows = yield self.store.get_profile_batch(batchnum)
+        batch = {
+            UserID(r["user_id"], self.hs.hostname).to_string(): {
+                "displayname": r["displayname"],
+                "avatar_url": r["avatar_url"],
+            } for r in batch_rows
+        }
+
+        url = "https://%s/_matrix/federation/v1/replicate_profiles" % (host,)
+        signed_batch = {
+            "batchnum": batchnum,
+            "signed_batch": sign_json(batch, self.hs.hostname, self.hs.config.signing_key[0]),
+            "origin_server": self.hs.hostname,
+        }
+        try:
+            yield self.http_client.post_json_get_json(url, signed_batch)
+            self.store.update_replication_batch_for_host(host, batchnum)
+            logger.info("Sucessfully replicated profile batch %d to %s", batchnum, host)
+        except:
+            # This will get retried when the looping call next comes around
+            logger.exception("Failed to replicate profile batch %d to %s", batchnum, host)
+            raise
+
     @defer.inlineCallbacks
     def get_profile(self, user_id):
         target_user = UserID.from_string(user_id)