diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index 5c21287998..34326718ad 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -114,6 +114,8 @@ class RegistrationConfig(Config):
# If enabled, user IDs, display names and avatar URLs will be replicated
# to this server whenever they change.
+ # This is an experimental API currently implemented by sydent to support
+ # cross-homeserver user directories.
# replicate_user_profiles_to: example.com
# Users who register on this homeserver will automatically be joined
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 8332771c15..7202d3c81d 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -53,6 +53,9 @@ class ProfileHandler(BaseHandler):
if len(self.hs.config.replicate_user_profiles_to) > 0:
reactor.callWhenRunning(self._assign_profile_replication_batches)
reactor.callWhenRunning(self._replicate_profiles)
+ # Add a looping call to replicate_profiles: this handles retries
+ # if the replication is unsuccessful when the user updated their
+ # profile.
self.clock.looping_call(
self._replicate_profiles, self.PROFILE_REPLICATE_INTERVAL
)
@@ -109,7 +112,7 @@ class ProfileHandler(BaseHandler):
signed_body = sign_json(body, self.hs.hostname, self.hs.config.signing_key[0])
try:
yield self.http_client.post_json_get_json(url, signed_body)
- self.store.update_replication_batch_for_host(host, batchnum)
+ yield self.store.update_replication_batch_for_host(host, batchnum)
logger.info("Sucessfully replicated profile batch %d to %s", batchnum, host)
except Exception:
# This will get retried when the looping call next comes around
diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py
index 048f48dcc1..12e2d44406 100644
--- a/synapse/storage/profile.py
+++ b/synapse/storage/profile.py
@@ -65,16 +65,14 @@ class ProfileWorkerStore(SQLBaseStore):
desc="get_profile_avatar_url",
)
- @defer.inlineCallbacks
def get_latest_profile_replication_batch_number(self):
def f(txn):
txn.execute("SELECT MAX(batch) as maxbatch FROM profiles")
rows = self.cursor_to_dict(txn)
return rows[0]['maxbatch']
- max_batch = yield self.runInteraction(
+ return self.runInteraction(
"get_latest_profile_replication_batch_number", f,
)
- defer.returnValue(max_batch)
def get_profile_batch(self, batchnum):
return self._simple_select_list(
@@ -86,7 +84,6 @@ class ProfileWorkerStore(SQLBaseStore):
desc="get_profile_batch",
)
- @defer.inlineCallbacks
def assign_profile_batch(self):
def f(txn):
sql = (
@@ -98,17 +95,14 @@ class ProfileWorkerStore(SQLBaseStore):
)
txn.execute(sql, (BATCH_SIZE,))
return txn.rowcount
- assigned = yield self.runInteraction("assign_profile_batch", f)
- defer.returnValue(assigned)
+ return self.runInteraction("assign_profile_batch", f)
- @defer.inlineCallbacks
def get_replication_hosts(self):
def f(txn):
txn.execute("SELECT host, last_synced_batch FROM profile_replication_status")
rows = self.cursor_to_dict(txn)
return {r['host']: r['last_synced_batch'] for r in rows}
- result = yield self.runInteraction("get_replication_hosts", f)
- defer.returnValue(result)
+ return self.runInteraction("get_replication_hosts", f)
def update_replication_batch_for_host(self, host, last_synced_batch):
return self._simple_upsert(
diff --git a/synapse/storage/schema/delta/48/profiles_batch.sql b/synapse/storage/schema/delta/48/profiles_batch.sql
index 7639ff22d5..e744c02fe8 100644
--- a/synapse/storage/schema/delta/48/profiles_batch.sql
+++ b/synapse/storage/schema/delta/48/profiles_batch.sql
@@ -13,10 +13,23 @@
* limitations under the License.
*/
+/*
+ * Add a batch number to track changes to profiles and the
+ * order they're made in so we can replicate user profiles
+ * to other hosts as they change
+ */
ALTER TABLE profiles ADD COLUMN batch BIGINT DEFAULT NULL;
+/*
+ * Index on the batch number so we can get profiles
+ * by their batch
+ */
CREATE INDEX profiles_batch_idx ON profiles(batch);
+/*
+ * A table to track what batch of user profiles has been
+ * synced to what profile replication target.
+ */
CREATE TABLE profile_replication_status (
host TEXT NOT NULL,
last_synced_batch BIGINT NOT NULL
diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py
index 205190f8d6..8646c4e434 100644
--- a/tests/handlers/test_profile.py
+++ b/tests/handlers/test_profile.py
@@ -75,7 +75,7 @@ class ProfileTestCase(unittest.TestCase):
@defer.inlineCallbacks
def test_get_my_name(self):
yield self.store.set_profile_displayname(
- self.frank.localpart, "Frank", 1
+ self.frank.localpart, "Frank", 1,
)
displayname = yield self.handler.get_displayname(self.frank)
@@ -135,7 +135,7 @@ class ProfileTestCase(unittest.TestCase):
@defer.inlineCallbacks
def test_get_my_avatar(self):
yield self.store.set_profile_avatar_url(
- self.frank.localpart, "http://my.server/me.png", 1
+ self.frank.localpart, "http://my.server/me.png", 1,
)
avatar_url = yield self.handler.get_avatar_url(self.frank)
diff --git a/tests/storage/test_profile.py b/tests/storage/test_profile.py
index 6b0cc17010..1bfabc15ad 100644
--- a/tests/storage/test_profile.py
+++ b/tests/storage/test_profile.py
@@ -40,7 +40,7 @@ class ProfileStoreTestCase(unittest.TestCase):
)
yield self.store.set_profile_displayname(
- self.u_frank.localpart, "Frank", 1
+ self.u_frank.localpart, "Frank", 1,
)
self.assertEquals(
@@ -55,7 +55,7 @@ class ProfileStoreTestCase(unittest.TestCase):
)
yield self.store.set_profile_avatar_url(
- self.u_frank.localpart, "http://my.site/here", 1
+ self.u_frank.localpart, "http://my.site/here", 1,
)
self.assertEquals(
|