summary refs log tree commit diff
path: root/synapse/storage/profile.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/profile.py')
-rw-r--r--synapse/storage/profile.py100
1 files changed, 88 insertions, 12 deletions
diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py
index 912c1df6be..0a36c9cb34 100644
--- a/synapse/storage/profile.py
+++ b/synapse/storage/profile.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -18,8 +19,11 @@ from twisted.internet import defer
 from synapse.api.errors import StoreError
 from synapse.storage.roommember import ProfileInfo
 
+from . import background_updates
 from ._base import SQLBaseStore
 
+BATCH_SIZE = 100
+
 
 class ProfileWorkerStore(SQLBaseStore):
     @defer.inlineCallbacks
@@ -58,6 +62,54 @@ class ProfileWorkerStore(SQLBaseStore):
             desc="get_profile_avatar_url",
         )
 
+    def get_latest_profile_replication_batch_number(self):
+        def f(txn):
+            txn.execute("SELECT MAX(batch) as maxbatch FROM profiles")
+            rows = self.cursor_to_dict(txn)
+            return rows[0]["maxbatch"]
+
+        return self.runInteraction("get_latest_profile_replication_batch_number", f)
+
+    def get_profile_batch(self, batchnum):
+        return self._simple_select_list(
+            table="profiles",
+            keyvalues={"batch": batchnum},
+            retcols=("user_id", "displayname", "avatar_url", "active"),
+            desc="get_profile_batch",
+        )
+
+    def assign_profile_batch(self):
+        def f(txn):
+            sql = (
+                "UPDATE profiles SET batch = "
+                "(SELECT COALESCE(MAX(batch), -1) + 1 FROM profiles) "
+                "WHERE user_id in ("
+                "    SELECT user_id FROM profiles WHERE batch is NULL limit ?"
+                ")"
+            )
+            txn.execute(sql, (BATCH_SIZE,))
+            return txn.rowcount
+
+        return self.runInteraction("assign_profile_batch", f)
+
+    def get_replication_hosts(self):
+        def f(txn):
+            txn.execute(
+                "SELECT host, last_synced_batch FROM profile_replication_status"
+            )
+            rows = self.cursor_to_dict(txn)
+            return {r["host"]: r["last_synced_batch"] for r in rows}
+
+        return self.runInteraction("get_replication_hosts", f)
+
+    def update_replication_batch_for_host(self, host, last_synced_batch):
+        return self._simple_upsert(
+            table="profile_replication_status",
+            keyvalues={"host": host},
+            values={"last_synced_batch": last_synced_batch},
+            desc="update_replication_batch_for_host",
+        )
+
     def get_from_remote_profile_cache(self, user_id):
         return self._simple_select_one(
             table="remote_profile_cache",
@@ -67,29 +119,53 @@ class ProfileWorkerStore(SQLBaseStore):
             desc="get_from_remote_profile_cache",
         )
 
-    def create_profile(self, user_localpart):
-        return self._simple_insert(
-            table="profiles", values={"user_id": user_localpart}, desc="create_profile"
-        )
-
-    def set_profile_displayname(self, user_localpart, new_displayname):
-        return self._simple_update_one(
+    def set_profile_displayname(self, user_localpart, new_displayname, batchnum):
+        return self._simple_upsert(
             table="profiles",
             keyvalues={"user_id": user_localpart},
-            updatevalues={"displayname": new_displayname},
+            values={"displayname": new_displayname, "batch": batchnum},
             desc="set_profile_displayname",
+            lock=False,  # we can do this because user_id has a unique index
         )
 
-    def set_profile_avatar_url(self, user_localpart, new_avatar_url):
-        return self._simple_update_one(
+    def set_profile_avatar_url(self, user_localpart, new_avatar_url, batchnum):
+        return self._simple_upsert(
             table="profiles",
             keyvalues={"user_id": user_localpart},
-            updatevalues={"avatar_url": new_avatar_url},
+            values={"avatar_url": new_avatar_url, "batch": batchnum},
             desc="set_profile_avatar_url",
+            lock=False,  # we can do this because user_id has a unique index
+        )
+
+    def set_profile_active(self, user_localpart, active, hide, batchnum):
+        values = {"active": int(active), "batch": batchnum}
+        if not active and not hide:
+            # we are deactivating for real (not in hide mode)
+            # so clear the profile.
+            values["avatar_url"] = None
+            values["displayname"] = None
+        return self._simple_upsert(
+            table="profiles",
+            keyvalues={"user_id": user_localpart},
+            values=values,
+            desc="set_profile_active",
+            lock=False,  # we can do this because user_id has a unique index
         )
 
 
-class ProfileStore(ProfileWorkerStore):
+class ProfileStore(ProfileWorkerStore, background_updates.BackgroundUpdateStore):
+    def __init__(self, db_conn, hs):
+
+        super(ProfileStore, self).__init__(db_conn, hs)
+
+        self.register_background_index_update(
+            "profile_replication_status_host_index",
+            index_name="profile_replication_status_idx",
+            table="profile_replication_status",
+            columns=["host"],
+            unique=True,
+        )
+
     def add_remote_profile_cache(self, user_id, displayname, avatar_url):
         """Ensure we are caching the remote user's profiles.