diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index da3b99f93d..13de5f1f62 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -56,7 +56,7 @@ class SQLBaseStore(metaclass=ABCMeta):
members_changed (iterable[str]): The user_ids of members that have
changed
"""
- for host in set(get_domain_from_id(u) for u in members_changed):
+ for host in {get_domain_from_id(u) for u in members_changed}:
self._attempt_to_invalidate_cache("is_host_joined", (room_id, host))
self._attempt_to_invalidate_cache("was_host_joined", (room_id, host))
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index bd547f35cf..eb1a7e5002 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -189,7 +189,7 @@ class BackgroundUpdater(object):
keyvalues=None,
retcols=("update_name", "depends_on"),
)
- in_flight = set(update["update_name"] for update in updates)
+ in_flight = {update["update_name"] for update in updates}
for update in updates:
if update["depends_on"] not in in_flight:
self._background_update_queue.append(update["update_name"])
diff --git a/synapse/storage/data_stores/main/__init__.py b/synapse/storage/data_stores/main/__init__.py
index 2700cca822..acca079f23 100644
--- a/synapse/storage/data_stores/main/__init__.py
+++ b/synapse/storage/data_stores/main/__init__.py
@@ -20,6 +20,7 @@ import logging
import time
from synapse.api.constants import PresenceState
+from synapse.config.homeserver import HomeServerConfig
from synapse.storage.database import Database
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import (
@@ -117,16 +118,6 @@ class DataStore(
self._clock = hs.get_clock()
self.database_engine = database.engine
- all_users_native = are_all_users_on_domain(
- db_conn.cursor(), database.engine, hs.hostname
- )
- if not all_users_native:
- raise Exception(
- "Found users in database not native to %s!\n"
- "You cannot changed a synapse server_name after it's been configured"
- % (hs.hostname,)
- )
-
self._stream_id_gen = StreamIdGenerator(
db_conn,
"events",
@@ -567,13 +558,26 @@ class DataStore(
)
-def are_all_users_on_domain(txn, database_engine, domain):
+def check_database_before_upgrade(cur, database_engine, config: HomeServerConfig):
+ """Called before upgrading an existing database to check that it is broadly sane
+ compared with the configuration.
+ """
+ domain = config.server_name
+
sql = database_engine.convert_param_style(
"SELECT COUNT(*) FROM users WHERE name NOT LIKE ?"
)
pat = "%:" + domain
- txn.execute(sql, (pat,))
- num_not_matching = txn.fetchall()[0][0]
+ cur.execute(sql, (pat,))
+ num_not_matching = cur.fetchall()[0][0]
if num_not_matching == 0:
- return True
- return False
+ return
+
+ raise Exception(
+ "Found users in database not native to %s!\n"
+ "You cannot changed a synapse server_name after it's been configured"
+ % (domain,)
+ )
+
+
+__all__ = ["DataStore", "check_database_before_upgrade"]
diff --git a/synapse/storage/data_stores/main/appservice.py b/synapse/storage/data_stores/main/appservice.py
index b2f39649fd..9c52aa5340 100644
--- a/synapse/storage/data_stores/main/appservice.py
+++ b/synapse/storage/data_stores/main/appservice.py
@@ -35,7 +35,7 @@ def _make_exclusive_regex(services_cache):
exclusive_user_regexes = [
regex.pattern
for service in services_cache
- for regex in service.get_exlusive_user_regexes()
+ for regex in service.get_exclusive_user_regexes()
]
if exclusive_user_regexes:
exclusive_user_regex = "|".join("(" + r + ")" for r in exclusive_user_regexes)
@@ -135,7 +135,7 @@ class ApplicationServiceTransactionWorkerStore(
may be empty.
"""
results = yield self.db.simple_select_list(
- "application_services_state", dict(state=state), ["as_id"]
+ "application_services_state", {"state": state}, ["as_id"]
)
# NB: This assumes this class is linked with ApplicationServiceStore
as_list = self.get_app_services()
@@ -158,7 +158,7 @@ class ApplicationServiceTransactionWorkerStore(
"""
result = yield self.db.simple_select_one(
"application_services_state",
- dict(as_id=service.id),
+ {"as_id": service.id},
["state"],
allow_none=True,
desc="get_appservice_state",
@@ -177,7 +177,7 @@ class ApplicationServiceTransactionWorkerStore(
A Deferred which resolves when the state was set successfully.
"""
return self.db.simple_upsert(
- "application_services_state", dict(as_id=service.id), dict(state=state)
+ "application_services_state", {"as_id": service.id}, {"state": state}
)
def create_appservice_txn(self, service, events):
@@ -253,13 +253,15 @@ class ApplicationServiceTransactionWorkerStore(
self.db.simple_upsert_txn(
txn,
"application_services_state",
- dict(as_id=service.id),
- dict(last_txn=txn_id),
+ {"as_id": service.id},
+ {"last_txn": txn_id},
)
# Delete txn
self.db.simple_delete_txn(
- txn, "application_services_txns", dict(txn_id=txn_id, as_id=service.id)
+ txn,
+ "application_services_txns",
+ {"txn_id": txn_id, "as_id": service.id},
)
return self.db.runInteraction(
diff --git a/synapse/storage/data_stores/main/client_ips.py b/synapse/storage/data_stores/main/client_ips.py
index 13f4c9c72e..e1ccb27142 100644
--- a/synapse/storage/data_stores/main/client_ips.py
+++ b/synapse/storage/data_stores/main/client_ips.py
@@ -530,7 +530,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
((row["access_token"], row["ip"]), (row["user_agent"], row["last_seen"]))
for row in rows
)
- return list(
+ return [
{
"access_token": access_token,
"ip": ip,
@@ -538,7 +538,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
"last_seen": last_seen,
}
for (access_token, ip), (user_agent, last_seen) in iteritems(results)
- )
+ ]
@wrap_as_background_process("prune_old_user_ips")
async def _prune_old_user_ips(self):
diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py
index b7617efb80..d55733a4cd 100644
--- a/synapse/storage/data_stores/main/devices.py
+++ b/synapse/storage/data_stores/main/devices.py
@@ -137,7 +137,7 @@ class DeviceWorkerStore(SQLBaseStore):
# get the cross-signing keys of the users in the list, so that we can
# determine which of the device changes were cross-signing keys
- users = set(r[0] for r in updates)
+ users = {r[0] for r in updates}
master_key_by_user = {}
self_signing_key_by_user = {}
for user in users:
@@ -446,7 +446,7 @@ class DeviceWorkerStore(SQLBaseStore):
a set of user_ids and results_map is a mapping of
user_id -> device_id -> device_info
"""
- user_ids = set(user_id for user_id, _ in query_list)
+ user_ids = {user_id for user_id, _ in query_list}
user_map = yield self.get_device_list_last_stream_id_for_remotes(list(user_ids))
# We go and check if any of the users need to have their device lists
@@ -454,10 +454,9 @@ class DeviceWorkerStore(SQLBaseStore):
users_needing_resync = yield self.get_user_ids_requiring_device_list_resync(
user_ids
)
- user_ids_in_cache = (
- set(user_id for user_id, stream_id in user_map.items() if stream_id)
- - users_needing_resync
- )
+ user_ids_in_cache = {
+ user_id for user_id, stream_id in user_map.items() if stream_id
+ } - users_needing_resync
user_ids_not_in_cache = user_ids - user_ids_in_cache
results = {}
@@ -604,7 +603,7 @@ class DeviceWorkerStore(SQLBaseStore):
rows = yield self.db.execute(
"get_users_whose_signatures_changed", None, sql, user_id, from_key
)
- return set(user for row in rows for user in json.loads(row[0]))
+ return {user for row in rows for user in json.loads(row[0])}
else:
return set()
diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py
index e551606f9d..001a53f9b4 100644
--- a/synapse/storage/data_stores/main/end_to_end_keys.py
+++ b/synapse/storage/data_stores/main/end_to_end_keys.py
@@ -680,11 +680,6 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
'user_signing' for a user-signing key
key (dict): the key data
"""
- # the cross-signing keys need to occupy the same namespace as devices,
- # since signatures are identified by device ID. So add an entry to the
- # device table to make sure that we don't have a collision with device
- # IDs
-
# the 'key' dict will look something like:
# {
# "user_id": "@alice:example.com",
@@ -701,16 +696,24 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
# The "keys" property must only have one entry, which will be the public
# key, so we just grab the first value in there
pubkey = next(iter(key["keys"].values()))
- self.db.simple_insert_txn(
- txn,
- "devices",
- values={
- "user_id": user_id,
- "device_id": pubkey,
- "display_name": key_type + " signing key",
- "hidden": True,
- },
- )
+
+ # The cross-signing keys need to occupy the same namespace as devices,
+ # since signatures are identified by device ID. So add an entry to the
+ # device table to make sure that we don't have a collision with device
+ # IDs.
+ # We only need to do this for local users, since remote servers should be
+ # responsible for checking this for their own users.
+ if self.hs.is_mine_id(user_id):
+ self.db.simple_insert_txn(
+ txn,
+ "devices",
+ values={
+ "user_id": user_id,
+ "device_id": pubkey,
+ "display_name": key_type + " signing key",
+ "hidden": True,
+ },
+ )
# and finally, store the key itself
with self._cross_signing_id_gen.get_next() as stream_id:
diff --git a/synapse/storage/data_stores/main/event_federation.py b/synapse/storage/data_stores/main/event_federation.py
index 60c67457b4..49a7b8b433 100644
--- a/synapse/storage/data_stores/main/event_federation.py
+++ b/synapse/storage/data_stores/main/event_federation.py
@@ -14,8 +14,8 @@
# limitations under the License.
import itertools
import logging
+from typing import List, Optional, Set
-from six.moves import range
from six.moves.queue import Empty, PriorityQueue
from twisted.internet import defer
@@ -27,6 +27,7 @@ from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
from synapse.storage.data_stores.main.signatures import SignatureWorkerStore
from synapse.storage.database import Database
from synapse.util.caches.descriptors import cached
+from synapse.util.iterutils import batch_iter
logger = logging.getLogger(__name__)
@@ -46,21 +47,37 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
event_ids, include_given=include_given
).addCallback(self.get_events_as_list)
- def get_auth_chain_ids(self, event_ids, include_given=False):
+ def get_auth_chain_ids(
+ self,
+ event_ids: List[str],
+ include_given: bool = False,
+ ignore_events: Optional[Set[str]] = None,
+ ):
"""Get auth events for given event_ids. The events *must* be state events.
Args:
- event_ids (list): state events
- include_given (bool): include the given events in result
+ event_ids: state events
+ include_given: include the given events in result
+ ignore_events: Set of events to exclude from the returned auth
+ chain. This is useful if the caller will just discard the
+ given events anyway, and saves us from figuring out their auth
+ chains if not required.
Returns:
list of event_ids
"""
return self.db.runInteraction(
- "get_auth_chain_ids", self._get_auth_chain_ids_txn, event_ids, include_given
+ "get_auth_chain_ids",
+ self._get_auth_chain_ids_txn,
+ event_ids,
+ include_given,
+ ignore_events,
)
- def _get_auth_chain_ids_txn(self, txn, event_ids, include_given):
+ def _get_auth_chain_ids_txn(self, txn, event_ids, include_given, ignore_events):
+ if ignore_events is None:
+ ignore_events = set()
+
if include_given:
results = set(event_ids)
else:
@@ -71,15 +88,14 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
front = set(event_ids)
while front:
new_front = set()
- front_list = list(front)
- chunks = [front_list[x : x + 100] for x in range(0, len(front), 100)]
- for chunk in chunks:
+ for chunk in batch_iter(front, 100):
clause, args = make_in_list_sql_clause(
txn.database_engine, "event_id", chunk
)
- txn.execute(base_sql + clause, list(args))
- new_front.update([r[0] for r in txn])
+ txn.execute(base_sql + clause, args)
+ new_front.update(r[0] for r in txn)
+ new_front -= ignore_events
new_front -= results
front = new_front
@@ -410,7 +426,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
query, (room_id, event_id, False, limit - len(event_results))
)
- new_results = set(t[0] for t in txn) - seen_events
+ new_results = {t[0] for t in txn} - seen_events
new_front |= new_results
seen_events |= new_results
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index c9d0d68c3a..8ae23df00a 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -145,7 +145,7 @@ class EventsStore(
return txn.fetchall()
res = yield self.db.runInteraction("read_forward_extremities", fetch)
- self._current_forward_extremities_amount = c_counter(list(x[0] for x in res))
+ self._current_forward_extremities_amount = c_counter([x[0] for x in res])
@_retry_on_integrity_error
@defer.inlineCallbacks
@@ -598,11 +598,11 @@ class EventsStore(
# We find out which membership events we may have deleted
# and which we have added, then we invlidate the caches for all
# those users.
- members_changed = set(
+ members_changed = {
state_key
for ev_type, state_key in itertools.chain(to_delete, to_insert)
if ev_type == EventTypes.Member
- )
+ }
for member in members_changed:
txn.call_after(
@@ -1615,7 +1615,7 @@ class EventsStore(
"""
)
- referenced_state_groups = set(sg for sg, in txn)
+ referenced_state_groups = {sg for sg, in txn}
logger.info(
"[purge] found %i referenced state groups", len(referenced_state_groups)
)
diff --git a/synapse/storage/data_stores/main/events_bg_updates.py b/synapse/storage/data_stores/main/events_bg_updates.py
index 5177b71016..f54c8b1ee0 100644
--- a/synapse/storage/data_stores/main/events_bg_updates.py
+++ b/synapse/storage/data_stores/main/events_bg_updates.py
@@ -402,7 +402,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
keyvalues={},
retcols=("room_id",),
)
- room_ids = set(row["room_id"] for row in rows)
+ room_ids = {row["room_id"] for row in rows}
for room_id in room_ids:
txn.call_after(
self.get_latest_event_ids_in_room.invalidate, (room_id,)
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index 7251e819f5..47a3a26072 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -494,9 +494,9 @@ class EventsWorkerStore(SQLBaseStore):
"""
with Measure(self._clock, "_fetch_event_list"):
try:
- events_to_fetch = set(
+ events_to_fetch = {
event_id for events, _ in event_list for event_id in events
- )
+ }
row_dict = self.db.new_transaction(
conn, "do_fetch", [], [], self._fetch_event_rows, events_to_fetch
@@ -804,7 +804,7 @@ class EventsWorkerStore(SQLBaseStore):
desc="have_events_in_timeline",
)
- return set(r["event_id"] for r in rows)
+ return {r["event_id"] for r in rows}
@defer.inlineCallbacks
def have_seen_events(self, event_ids):
diff --git a/synapse/storage/data_stores/main/profile.py b/synapse/storage/data_stores/main/profile.py
index 2b52cf9c1a..3dc4451447 100644
--- a/synapse/storage/data_stores/main/profile.py
+++ b/synapse/storage/data_stores/main/profile.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,9 +17,13 @@
from twisted.internet import defer
from synapse.api.errors import StoreError
+
+from synapse.storage import background_updates
from synapse.storage._base import SQLBaseStore
from synapse.storage.data_stores.main.roommember import ProfileInfo
+BATCH_SIZE = 100
+
class ProfileWorkerStore(SQLBaseStore):
@defer.inlineCallbacks
@@ -57,6 +62,54 @@ class ProfileWorkerStore(SQLBaseStore):
desc="get_profile_avatar_url",
)
+ def get_latest_profile_replication_batch_number(self):
+ def f(txn):
+ txn.execute("SELECT MAX(batch) as maxbatch FROM profiles")
+ rows = self.db.cursor_to_dict(txn)
+ return rows[0]["maxbatch"]
+
+ return self.db.runInteraction("get_latest_profile_replication_batch_number", f)
+
+ def get_profile_batch(self, batchnum):
+ return self.db.simple_select_list(
+ table="profiles",
+ keyvalues={"batch": batchnum},
+ retcols=("user_id", "displayname", "avatar_url", "active"),
+ desc="get_profile_batch",
+ )
+
+ def assign_profile_batch(self):
+ def f(txn):
+ sql = (
+ "UPDATE profiles SET batch = "
+ "(SELECT COALESCE(MAX(batch), -1) + 1 FROM profiles) "
+ "WHERE user_id in ("
+ " SELECT user_id FROM profiles WHERE batch is NULL limit ?"
+ ")"
+ )
+ txn.execute(sql, (BATCH_SIZE,))
+ return txn.rowcount
+
+ return self.db.runInteraction("assign_profile_batch", f)
+
+ def get_replication_hosts(self):
+ def f(txn):
+ txn.execute(
+ "SELECT host, last_synced_batch FROM profile_replication_status"
+ )
+ rows = self.db.cursor_to_dict(txn)
+ return {r["host"]: r["last_synced_batch"] for r in rows}
+
+ return self.db.runInteraction("get_replication_hosts", f)
+
+ def update_replication_batch_for_host(self, host, last_synced_batch):
+ return self.db.simple_upsert(
+ table="profile_replication_status",
+ keyvalues={"host": host},
+ values={"last_synced_batch": last_synced_batch},
+ desc="update_replication_batch_for_host",
+ )
+
def get_from_remote_profile_cache(self, user_id):
return self.db.simple_select_one(
table="remote_profile_cache",
@@ -71,24 +124,53 @@ class ProfileWorkerStore(SQLBaseStore):
table="profiles", values={"user_id": user_localpart}, desc="create_profile"
)
- def set_profile_displayname(self, user_localpart, new_displayname):
- return self.db.simple_update_one(
+ def set_profile_displayname(self, user_localpart, new_displayname, batchnum):
+ return self.db.simple_upsert(
table="profiles",
keyvalues={"user_id": user_localpart},
- updatevalues={"displayname": new_displayname},
+ values={"displayname": new_displayname, "batch": batchnum},
desc="set_profile_displayname",
+ lock=False, # we can do this because user_id has a unique index
)
- def set_profile_avatar_url(self, user_localpart, new_avatar_url):
- return self.db.simple_update_one(
+ def set_profile_avatar_url(self, user_localpart, new_avatar_url, batchnum):
+ return self.db.simple_upsert(
table="profiles",
keyvalues={"user_id": user_localpart},
- updatevalues={"avatar_url": new_avatar_url},
+ values={"avatar_url": new_avatar_url, "batch": batchnum},
desc="set_profile_avatar_url",
+ lock=False, # we can do this because user_id has a unique index
+ )
+
+ def set_profile_active(self, user_localpart, active, hide, batchnum):
+ values = {"active": int(active), "batch": batchnum}
+ if not active and not hide:
+ # we are deactivating for real (not in hide mode)
+ # so clear the profile.
+ values["avatar_url"] = None
+ values["displayname"] = None
+ return self.db.simple_upsert(
+ table="profiles",
+ keyvalues={"user_id": user_localpart},
+ values=values,
+ desc="set_profile_active",
+ lock=False, # we can do this because user_id has a unique index
)
class ProfileStore(ProfileWorkerStore):
+ def __init__(self, database, db_conn, hs):
+
+ super(ProfileStore, self).__init__(database, db_conn, hs)
+
+ self.db.updates.register_background_index_update(
+ "profile_replication_status_host_index",
+ index_name="profile_replication_status_idx",
+ table="profile_replication_status",
+ columns=["host"],
+ unique=True,
+ )
+
def add_remote_profile_cache(self, user_id, displayname, avatar_url):
"""Ensure we are caching the remote user's profiles.
@@ -107,7 +189,7 @@ class ProfileStore(ProfileWorkerStore):
)
def update_remote_profile_cache(self, user_id, displayname, avatar_url):
- return self.db.simple_update(
+ return self.db.simple_upsert(
table="remote_profile_cache",
keyvalues={"user_id": user_id},
values={
diff --git a/synapse/storage/data_stores/main/push_rule.py b/synapse/storage/data_stores/main/push_rule.py
index e2673ae073..62ac88d9f2 100644
--- a/synapse/storage/data_stores/main/push_rule.py
+++ b/synapse/storage/data_stores/main/push_rule.py
@@ -276,21 +276,21 @@ class PushRulesWorkerStore(
# We ignore app service users for now. This is so that we don't fill
# up the `get_if_users_have_pushers` cache with AS entries that we
# know don't have pushers, nor even read receipts.
- local_users_in_room = set(
+ local_users_in_room = {
u
for u in users_in_room
if self.hs.is_mine_id(u)
and not self.get_if_app_services_interested_in_user(u)
- )
+ }
# users in the room who have pushers need to get push rules run because
# that's how their pushers work
if_users_with_pushers = yield self.get_if_users_have_pushers(
local_users_in_room, on_invalidate=cache_context.invalidate
)
- user_ids = set(
+ user_ids = {
uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher
- )
+ }
users_with_receipts = yield self.get_users_with_read_receipts_in_room(
room_id, on_invalidate=cache_context.invalidate
diff --git a/synapse/storage/data_stores/main/pusher.py b/synapse/storage/data_stores/main/pusher.py
index 6b03233262..547b9d69cb 100644
--- a/synapse/storage/data_stores/main/pusher.py
+++ b/synapse/storage/data_stores/main/pusher.py
@@ -197,6 +197,84 @@ class PusherWorkerStore(SQLBaseStore):
return result
+ @defer.inlineCallbacks
+ def update_pusher_last_stream_ordering(
+ self, app_id, pushkey, user_id, last_stream_ordering
+ ):
+ yield self.db.simple_update_one(
+ "pushers",
+ {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
+ {"last_stream_ordering": last_stream_ordering},
+ desc="update_pusher_last_stream_ordering",
+ )
+
+ @defer.inlineCallbacks
+ def update_pusher_last_stream_ordering_and_success(
+ self, app_id, pushkey, user_id, last_stream_ordering, last_success
+ ):
+ """Update the last stream ordering position we've processed up to for
+ the given pusher.
+
+ Args:
+ app_id (str)
+ pushkey (str)
+ last_stream_ordering (int)
+ last_success (int)
+
+ Returns:
+ Deferred[bool]: True if the pusher still exists; False if it has been deleted.
+ """
+ updated = yield self.db.simple_update(
+ table="pushers",
+ keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
+ updatevalues={
+ "last_stream_ordering": last_stream_ordering,
+ "last_success": last_success,
+ },
+ desc="update_pusher_last_stream_ordering_and_success",
+ )
+
+ return bool(updated)
+
+ @defer.inlineCallbacks
+ def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since):
+ yield self.db.simple_update(
+ table="pushers",
+ keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
+ updatevalues={"failing_since": failing_since},
+ desc="update_pusher_failing_since",
+ )
+
+ @defer.inlineCallbacks
+ def get_throttle_params_by_room(self, pusher_id):
+ res = yield self.db.simple_select_list(
+ "pusher_throttle",
+ {"pusher": pusher_id},
+ ["room_id", "last_sent_ts", "throttle_ms"],
+ desc="get_throttle_params_by_room",
+ )
+
+ params_by_room = {}
+ for row in res:
+ params_by_room[row["room_id"]] = {
+ "last_sent_ts": row["last_sent_ts"],
+ "throttle_ms": row["throttle_ms"],
+ }
+
+ return params_by_room
+
+ @defer.inlineCallbacks
+ def set_throttle_params(self, pusher_id, room_id, params):
+ # no need to lock because `pusher_throttle` has a primary key on
+ # (pusher, room_id) so simple_upsert will retry
+ yield self.db.simple_upsert(
+ "pusher_throttle",
+ {"pusher": pusher_id, "room_id": room_id},
+ params,
+ desc="set_throttle_params",
+ lock=False,
+ )
+
class PusherStore(PusherWorkerStore):
def get_pushers_stream_token(self):
@@ -282,81 +360,3 @@ class PusherStore(PusherWorkerStore):
with self._pushers_id_gen.get_next() as stream_id:
yield self.db.runInteraction("delete_pusher", delete_pusher_txn, stream_id)
-
- @defer.inlineCallbacks
- def update_pusher_last_stream_ordering(
- self, app_id, pushkey, user_id, last_stream_ordering
- ):
- yield self.db.simple_update_one(
- "pushers",
- {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
- {"last_stream_ordering": last_stream_ordering},
- desc="update_pusher_last_stream_ordering",
- )
-
- @defer.inlineCallbacks
- def update_pusher_last_stream_ordering_and_success(
- self, app_id, pushkey, user_id, last_stream_ordering, last_success
- ):
- """Update the last stream ordering position we've processed up to for
- the given pusher.
-
- Args:
- app_id (str)
- pushkey (str)
- last_stream_ordering (int)
- last_success (int)
-
- Returns:
- Deferred[bool]: True if the pusher still exists; False if it has been deleted.
- """
- updated = yield self.db.simple_update(
- table="pushers",
- keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
- updatevalues={
- "last_stream_ordering": last_stream_ordering,
- "last_success": last_success,
- },
- desc="update_pusher_last_stream_ordering_and_success",
- )
-
- return bool(updated)
-
- @defer.inlineCallbacks
- def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since):
- yield self.db.simple_update(
- table="pushers",
- keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
- updatevalues={"failing_since": failing_since},
- desc="update_pusher_failing_since",
- )
-
- @defer.inlineCallbacks
- def get_throttle_params_by_room(self, pusher_id):
- res = yield self.db.simple_select_list(
- "pusher_throttle",
- {"pusher": pusher_id},
- ["room_id", "last_sent_ts", "throttle_ms"],
- desc="get_throttle_params_by_room",
- )
-
- params_by_room = {}
- for row in res:
- params_by_room[row["room_id"]] = {
- "last_sent_ts": row["last_sent_ts"],
- "throttle_ms": row["throttle_ms"],
- }
-
- return params_by_room
-
- @defer.inlineCallbacks
- def set_throttle_params(self, pusher_id, room_id, params):
- # no need to lock because `pusher_throttle` has a primary key on
- # (pusher, room_id) so simple_upsert will retry
- yield self.db.simple_upsert(
- "pusher_throttle",
- {"pusher": pusher_id, "room_id": room_id},
- params,
- desc="set_throttle_params",
- lock=False,
- )
diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py
index 96e54d145e..0d932a0672 100644
--- a/synapse/storage/data_stores/main/receipts.py
+++ b/synapse/storage/data_stores/main/receipts.py
@@ -58,7 +58,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
@cachedInlineCallbacks()
def get_users_with_read_receipts_in_room(self, room_id):
receipts = yield self.get_receipts_for_room(room_id, "m.read")
- return set(r["user_id"] for r in receipts)
+ return {r["user_id"] for r in receipts}
@cached(num_args=2)
def get_receipts_for_room(self, room_id, receipt_type):
@@ -283,7 +283,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
args.append(limit)
txn.execute(sql, args)
- return list(r[0:5] + (json.loads(r[5]),) for r in txn)
+ return [r[0:5] + (json.loads(r[5]),) for r in txn]
return self.db.runInteraction(
"get_all_updated_receipts", get_all_updated_receipts_txn
diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py
index 49306642ed..40f891cc64 100644
--- a/synapse/storage/data_stores/main/registration.py
+++ b/synapse/storage/data_stores/main/registration.py
@@ -158,6 +158,28 @@ class RegistrationWorkerStore(SQLBaseStore):
)
@defer.inlineCallbacks
+ def get_expired_users(self):
+ """Get IDs of all expired users
+
+ Returns:
+ Deferred[list[str]]: List of expired user IDs
+ """
+
+ def get_expired_users_txn(txn, now_ms):
+ sql = """
+ SELECT user_id from account_validity
+ WHERE expiration_ts_ms <= ?
+ """
+ txn.execute(sql, (now_ms,))
+ rows = txn.fetchall()
+ return [row[0] for row in rows]
+
+ res = yield self.db.runInteraction(
+ "get_expired_users", get_expired_users_txn, self.clock.time_msec()
+ )
+ defer.returnValue(res)
+
+ @defer.inlineCallbacks
def set_renewal_token_for_user(self, user_id, renewal_token):
"""Defines a renewal token for a given user.
diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py
index 9a17e336ba..009ad8c038 100644
--- a/synapse/storage/data_stores/main/room.py
+++ b/synapse/storage/data_stores/main/room.py
@@ -295,6 +295,24 @@ class RoomWorkerStore(SQLBaseStore):
desc="is_room_blocked",
)
+ @defer.inlineCallbacks
+ def is_room_published(self, room_id):
+ """Check whether a room has been published in the local public room
+ directory.
+
+ Args:
+ room_id (str)
+ Returns:
+ bool: Whether the room is currently published in the room directory
+ """
+ # Get room information
+ room_info = yield self.get_room(room_id)
+ if not room_info:
+ defer.returnValue(False)
+
+ # Check the is_public value
+ defer.returnValue(room_info.get("is_public", False))
+
async def get_rooms_paginate(
self,
start: int,
@@ -449,6 +467,11 @@ class RoomWorkerStore(SQLBaseStore):
Returns:
dict[int, int]: "min_lifetime" and "max_lifetime" for this room.
"""
+ # If the room retention feature is disabled, return a policy with no minimum nor
+ # maximum, in order not to filter out events we should filter out when sending to
+ # the client.
+ if not self.config.retention_enabled:
+ defer.returnValue({"min_lifetime": None, "max_lifetime": None})
def get_retention_policy_for_room_txn(txn):
txn.execute(
@@ -954,6 +977,23 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
self.config = hs.config
+ async def upsert_room_on_join(self, room_id: str, room_version: RoomVersion):
+ """Ensure that the room is stored in the table
+
+ Called when we join a room over federation, and overwrites any room version
+ currently in the table.
+ """
+ await self.db.simple_upsert(
+ desc="upsert_room_on_join",
+ table="rooms",
+ keyvalues={"room_id": room_id},
+ values={"room_version": room_version.identifier},
+ insertion_values={"is_public": False, "creator": ""},
+ # rooms has a unique constraint on room_id, so no need to lock when doing an
+ # emulated upsert.
+ lock=False,
+ )
+
@defer.inlineCallbacks
def store_room(
self,
diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py
index d5ced05701..d5bd0cb5cf 100644
--- a/synapse/storage/data_stores/main/roommember.py
+++ b/synapse/storage/data_stores/main/roommember.py
@@ -465,7 +465,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
txn.execute(sql % (clause,), args)
- return set(row[0] for row in txn)
+ return {row[0] for row in txn}
return await self.db.runInteraction(
"get_users_server_still_shares_room_with",
@@ -826,7 +826,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
GROUP BY room_id, user_id;
"""
txn.execute(sql, (user_id,))
- return set(row[0] for row in txn if row[1] == 0)
+ return {row[0] for row in txn if row[1] == 0}
return self.db.runInteraction(
"get_forgotten_rooms_for_user", _get_forgotten_rooms_for_user_txn
diff --git a/synapse/storage/data_stores/main/schema/full_schemas/54/full.sql.postgres b/synapse/storage/data_stores/main/schema/full_schemas/54/full.sql.postgres
index 889a9a0ce4..20c5af2eb7 100644
--- a/synapse/storage/data_stores/main/schema/full_schemas/54/full.sql.postgres
+++ b/synapse/storage/data_stores/main/schema/full_schemas/54/full.sql.postgres
@@ -658,10 +658,19 @@ CREATE TABLE presence_stream (
+CREATE TABLE profile_replication_status (
+ host text NOT NULL,
+ last_synced_batch bigint NOT NULL
+);
+
+
+
CREATE TABLE profiles (
user_id text NOT NULL,
displayname text,
- avatar_url text
+ avatar_url text,
+ batch bigint,
+ active smallint DEFAULT 1 NOT NULL
);
@@ -1788,6 +1797,10 @@ CREATE INDEX presence_stream_user_id ON presence_stream USING btree (user_id);
+CREATE INDEX profiles_batch_idx ON profiles USING btree (batch);
+
+
+
CREATE INDEX public_room_index ON rooms USING btree (is_public);
diff --git a/synapse/storage/data_stores/main/schema/full_schemas/54/full.sql.sqlite b/synapse/storage/data_stores/main/schema/full_schemas/54/full.sql.sqlite
index a0411ede7e..e28ec3fa45 100644
--- a/synapse/storage/data_stores/main/schema/full_schemas/54/full.sql.sqlite
+++ b/synapse/storage/data_stores/main/schema/full_schemas/54/full.sql.sqlite
@@ -6,7 +6,7 @@ CREATE TABLE presence_allow_inbound( observed_user_id TEXT NOT NULL, observer_us
CREATE TABLE users( name TEXT, password_hash TEXT, creation_ts BIGINT, admin SMALLINT DEFAULT 0 NOT NULL, upgrade_ts BIGINT, is_guest SMALLINT DEFAULT 0 NOT NULL, appservice_id TEXT, consent_version TEXT, consent_server_notice_sent TEXT, user_type TEXT DEFAULT NULL, UNIQUE(name) );
CREATE TABLE access_tokens( id BIGINT PRIMARY KEY, user_id TEXT NOT NULL, device_id TEXT, token TEXT NOT NULL, last_used BIGINT, UNIQUE(token) );
CREATE TABLE user_ips ( user_id TEXT NOT NULL, access_token TEXT NOT NULL, device_id TEXT, ip TEXT NOT NULL, user_agent TEXT NOT NULL, last_seen BIGINT NOT NULL );
-CREATE TABLE profiles( user_id TEXT NOT NULL, displayname TEXT, avatar_url TEXT, UNIQUE(user_id) );
+CREATE TABLE profiles( user_id TEXT NOT NULL, displayname TEXT, avatar_url TEXT, batch BIGINT DEFAULT NULL, active SMALLINT DEFAULT 1 NOT NULL, UNIQUE(user_id) );
CREATE TABLE received_transactions( transaction_id TEXT, origin TEXT, ts BIGINT, response_code INTEGER, response_json bytea, has_been_referenced smallint default 0, UNIQUE (transaction_id, origin) );
CREATE TABLE destinations( destination TEXT PRIMARY KEY, retry_last_ts BIGINT, retry_interval INTEGER );
CREATE TABLE events( stream_ordering INTEGER PRIMARY KEY, topological_ordering BIGINT NOT NULL, event_id TEXT NOT NULL, type TEXT NOT NULL, room_id TEXT NOT NULL, content TEXT, unrecognized_keys TEXT, processed BOOL NOT NULL, outlier BOOL NOT NULL, depth BIGINT DEFAULT 0 NOT NULL, origin_server_ts BIGINT, received_ts BIGINT, sender TEXT, contains_url BOOLEAN, UNIQUE (event_id) );
@@ -202,6 +202,8 @@ CREATE INDEX group_users_u_idx ON group_users(user_id);
CREATE INDEX group_invites_u_idx ON group_invites(user_id);
CREATE UNIQUE INDEX group_rooms_g_idx ON group_rooms(group_id, room_id);
CREATE INDEX group_rooms_r_idx ON group_rooms(room_id);
+CREATE INDEX profiles_batch_idx ON profiles(batch);
+CREATE TABLE profile_replication_status ( host TEXT NOT NULL, last_synced_batch BIGINT NOT NULL );
CREATE TABLE user_daily_visits ( user_id TEXT NOT NULL, device_id TEXT, timestamp BIGINT NOT NULL );
CREATE INDEX user_daily_visits_uts_idx ON user_daily_visits(user_id, timestamp);
CREATE INDEX user_daily_visits_ts_idx ON user_daily_visits(timestamp);
diff --git a/synapse/storage/data_stores/main/schema/full_schemas/README.md b/synapse/storage/data_stores/main/schema/full_schemas/README.md
index bbd3f18604..c00f287190 100644
--- a/synapse/storage/data_stores/main/schema/full_schemas/README.md
+++ b/synapse/storage/data_stores/main/schema/full_schemas/README.md
@@ -1,13 +1,21 @@
-# Building full schema dumps
+# Synapse Database Schemas
-These schemas need to be made from a database that has had all background updates run.
+These schemas are used as a basis to create brand new Synapse databases, on both
+SQLite3 and Postgres.
-To do so, use `scripts-dev/make_full_schema.sh`. This will produce
-`full.sql.postgres ` and `full.sql.sqlite` files.
+## Building full schema dumps
+
+If you want to recreate these schemas, they need to be made from a database that
+has had all background updates run.
+
+To do so, use `scripts-dev/make_full_schema.sh`. This will produce new
+`full.sql.postgres ` and `full.sql.sqlite` files.
Ensure postgres is installed and your user has the ability to run bash commands
-such as `createdb`.
+such as `createdb`, then call
+
+ ./scripts-dev/make_full_schema.sh -p postgres_username -o output_dir/
-```
-./scripts-dev/make_full_schema.sh -p postgres_username -o output_dir/
-```
+There are currently two folders with full-schema snapshots. `16` is a snapshot
+from 2015, for historical reference. The other contains the most recent full
+schema snapshot.
diff --git a/synapse/storage/data_stores/main/state.py b/synapse/storage/data_stores/main/state.py
index 3d34103e67..3a3b9a8e72 100644
--- a/synapse/storage/data_stores/main/state.py
+++ b/synapse/storage/data_stores/main/state.py
@@ -321,7 +321,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
desc="get_referenced_state_groups",
)
- return set(row["state_group"] for row in rows)
+ return {row["state_group"] for row in rows}
class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
@@ -367,7 +367,7 @@ class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
"""
txn.execute(sql, (last_room_id, batch_size))
- room_ids = list(row[0] for row in txn)
+ room_ids = [row[0] for row in txn]
if not room_ids:
return True, set()
@@ -384,7 +384,7 @@ class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
txn.execute(sql, (last_room_id, room_ids[-1], "%:" + self.server_name))
- joined_room_ids = set(row[0] for row in txn)
+ joined_room_ids = {row[0] for row in txn}
left_rooms = set(room_ids) - joined_room_ids
@@ -404,7 +404,7 @@ class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
retcols=("state_key",),
)
- potentially_left_users = set(row["state_key"] for row in rows)
+ potentially_left_users = {row["state_key"] for row in rows}
# Now lets actually delete the rooms from the DB.
self.db.simple_delete_many_txn(
diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py
index 056b25b13a..ada5cce6c2 100644
--- a/synapse/storage/data_stores/main/stream.py
+++ b/synapse/storage/data_stores/main/stream.py
@@ -346,11 +346,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
from_key (str): The room_key portion of a StreamToken
"""
from_key = RoomStreamToken.parse_stream_token(from_key).stream
- return set(
+ return {
room_id
for room_id in room_ids
if self._events_stream_cache.has_entity_changed(room_id, from_key)
- )
+ }
@defer.inlineCallbacks
def get_room_events_stream_for_room(
@@ -679,11 +679,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
)
events_before = yield self.get_events_as_list(
- [e for e in results["before"]["event_ids"]], get_prev_content=True
+ list(results["before"]["event_ids"]), get_prev_content=True
)
events_after = yield self.get_events_as_list(
- [e for e in results["after"]["event_ids"]], get_prev_content=True
+ list(results["after"]["event_ids"]), get_prev_content=True
)
return {
diff --git a/synapse/storage/data_stores/main/user_erasure_store.py b/synapse/storage/data_stores/main/user_erasure_store.py
index af8025bc17..ec6b8a4ffd 100644
--- a/synapse/storage/data_stores/main/user_erasure_store.py
+++ b/synapse/storage/data_stores/main/user_erasure_store.py
@@ -63,9 +63,9 @@ class UserErasureWorkerStore(SQLBaseStore):
retcols=("user_id",),
desc="are_users_erased",
)
- erased_users = set(row["user_id"] for row in rows)
+ erased_users = {row["user_id"] for row in rows}
- res = dict((u, u in erased_users) for u in user_ids)
+ res = {u: u in erased_users for u in user_ids}
return res
diff --git a/synapse/storage/data_stores/state/store.py b/synapse/storage/data_stores/state/store.py
index c4ee9b7ccb..57a5267663 100644
--- a/synapse/storage/data_stores/state/store.py
+++ b/synapse/storage/data_stores/state/store.py
@@ -520,11 +520,11 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
retcols=("state_group",),
)
- remaining_state_groups = set(
+ remaining_state_groups = {
row["state_group"]
for row in rows
if row["state_group"] not in state_groups_to_delete
- )
+ }
logger.info(
"[purge] de-delta-ing %i remaining state groups",
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 3eeb2f7c04..1953614401 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -554,8 +554,8 @@ class Database(object):
Returns:
A list of dicts where the key is the column header.
"""
- col_headers = list(intern(str(column[0])) for column in cursor.description)
- results = list(dict(zip(col_headers, row)) for row in cursor)
+ col_headers = [intern(str(column[0])) for column in cursor.description]
+ results = [dict(zip(col_headers, row)) for row in cursor]
return results
def execute(self, desc, decoder, query, *args):
@@ -1504,7 +1504,7 @@ class Database(object):
def make_in_list_sql_clause(
database_engine, column: str, iterable: Iterable
-) -> Tuple[str, Iterable]:
+) -> Tuple[str, list]:
"""Returns an SQL clause that checks the given column is in the iterable.
On SQLite this expands to `column IN (?, ?, ...)`, whereas on Postgres
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index a077345960..53b3f372b0 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -53,7 +53,7 @@ class PostgresEngine(object):
if rows and rows[0][0] != "UTF8":
raise IncorrectDatabaseSetup(
"Database has incorrect encoding: '%s' instead of 'UTF8'\n"
- "See docs/postgres.rst for more information." % (rows[0][0],)
+ "See docs/postgres.md for more information." % (rows[0][0],)
)
txn.execute(
@@ -62,12 +62,16 @@ class PostgresEngine(object):
collation, ctype = txn.fetchone()
if collation != "C":
logger.warning(
- "Database has incorrect collation of %r. Should be 'C'", collation
+ "Database has incorrect collation of %r. Should be 'C'\n"
+ "See docs/postgres.md for more information.",
+ collation,
)
if ctype != "C":
logger.warning(
- "Database has incorrect ctype of %r. Should be 'C'", ctype
+ "Database has incorrect ctype of %r. Should be 'C'\n"
+ "See docs/postgres.md for more information.",
+ ctype,
)
def check_new_database(self, txn):
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index b950550f23..0f9ac1cf09 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -602,14 +602,14 @@ class EventsPersistenceStorage(object):
event_id_to_state_group.update(event_to_groups)
# State groups of old_latest_event_ids
- old_state_groups = set(
+ old_state_groups = {
event_id_to_state_group[evid] for evid in old_latest_event_ids
- )
+ }
# State groups of new_latest_event_ids
- new_state_groups = set(
+ new_state_groups = {
event_id_to_state_group[evid] for evid in new_latest_event_ids
- )
+ }
# If they old and new groups are the same then we don't need to do
# anything.
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index c285ef52a0..6cb7d4b922 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -278,13 +278,17 @@ def _upgrade_existing_database(
the current_version wasn't generated by applying those delta files.
database_engine (DatabaseEngine)
config (synapse.config.homeserver.HomeServerConfig|None):
- application config, or None if we are connecting to an existing
- database which we expect to be configured already
+ None if we are initialising a blank database, otherwise the application
+ config
data_stores (list[str]): The names of the data stores to instantiate
on the given database.
is_empty (bool): Is this a blank database? I.e. do we need to run the
upgrade portions of the delta scripts.
"""
+ if is_empty:
+ assert not applied_delta_files
+ else:
+ assert config
if current_version > SCHEMA_VERSION:
raise ValueError(
@@ -292,6 +296,13 @@ def _upgrade_existing_database(
+ "new for the server to understand"
)
+ # some of the deltas assume that config.server_name is set correctly, so now
+ # is a good time to run the sanity check.
+ if not is_empty and "main" in data_stores:
+ from synapse.storage.data_stores.main import check_database_before_upgrade
+
+ check_database_before_upgrade(cur, database_engine, config)
+
start_ver = current_version
if not upgraded:
start_ver += 1
@@ -345,9 +356,9 @@ def _upgrade_existing_database(
"Could not open delta dir for version %d: %s" % (v, directory)
)
- duplicates = set(
+ duplicates = {
file_name for file_name, count in file_name_counter.items() if count > 1
- )
+ }
if duplicates:
# We don't support using the same file name in the same delta version.
raise PrepareDatabaseException(
@@ -454,7 +465,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams)
),
(modname,),
)
- applied_deltas = set(d for d, in cur)
+ applied_deltas = {d for d, in cur}
for (name, stream) in names_and_streams:
if name in applied_deltas:
continue
diff --git a/synapse/storage/schema/delta/48/profiles_batch.sql b/synapse/storage/schema/delta/48/profiles_batch.sql
new file mode 100644
index 0000000000..e744c02fe8
--- /dev/null
+++ b/synapse/storage/schema/delta/48/profiles_batch.sql
@@ -0,0 +1,36 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Add a batch number to track changes to profiles and the
+ * order they're made in so we can replicate user profiles
+ * to other hosts as they change
+ */
+ALTER TABLE profiles ADD COLUMN batch BIGINT DEFAULT NULL;
+
+/*
+ * Index on the batch number so we can get profiles
+ * by their batch
+ */
+CREATE INDEX profiles_batch_idx ON profiles(batch);
+
+/*
+ * A table to track what batch of user profiles has been
+ * synced to what profile replication target.
+ */
+CREATE TABLE profile_replication_status (
+ host TEXT NOT NULL,
+ last_synced_batch BIGINT NOT NULL
+);
diff --git a/synapse/storage/schema/delta/50/profiles_deactivated_users.sql b/synapse/storage/schema/delta/50/profiles_deactivated_users.sql
new file mode 100644
index 0000000000..c8893ecbe8
--- /dev/null
+++ b/synapse/storage/schema/delta/50/profiles_deactivated_users.sql
@@ -0,0 +1,23 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * A flag saying whether the user owning the profile has been deactivated
+ * This really belongs on the users table, not here, but the users table
+ * stores users by their full user_id and profiles stores them by localpart,
+ * so we can't easily join between the two tables. Plus, the batch number
+ * realy ought to represent data in this table that has changed.
+ */
+ALTER TABLE profiles ADD COLUMN active SMALLINT DEFAULT 1 NOT NULL;
diff --git a/synapse/storage/schema/delta/55/profile_replication_status_index.sql b/synapse/storage/schema/delta/55/profile_replication_status_index.sql
new file mode 100644
index 0000000000..18a0f7e10c
--- /dev/null
+++ b/synapse/storage/schema/delta/55/profile_replication_status_index.sql
@@ -0,0 +1,17 @@
+/* Copyright 2019 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('profile_replication_status_host_index', '{}');
diff --git a/synapse/storage/data_stores/main/schema/delta/56/room_retention.sql b/synapse/storage/schema/delta/55/room_retention.sql
index ee6cdf7a14..ee6cdf7a14 100644
--- a/synapse/storage/data_stores/main/schema/delta/56/room_retention.sql
+++ b/synapse/storage/schema/delta/55/room_retention.sql
|