diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 539584288d..067820a5da 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -33,16 +33,9 @@ logger = logging.getLogger(__name__)
LAST_SEEN_GRANULARITY = 120 * 1000
-class ClientIpStore(background_updates.BackgroundUpdateStore):
+class ClientIpBackgroundUpdateStore(background_updates.BackgroundUpdateStore):
def __init__(self, db_conn, hs):
-
- self.client_ip_last_seen = Cache(
- name="client_ip_last_seen", keylen=4, max_entries=50000 * CACHE_SIZE_FACTOR
- )
-
- super(ClientIpStore, self).__init__(db_conn, hs)
-
- self.user_ips_max_age = hs.config.user_ips_max_age
+ super(ClientIpBackgroundUpdateStore, self).__init__(db_conn, hs)
self.register_background_index_update(
"user_ips_device_index",
@@ -92,19 +85,6 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
"devices_last_seen", self._devices_last_seen_update
)
- # (user_id, access_token, ip,) -> (user_agent, device_id, last_seen)
- self._batch_row_update = {}
-
- self._client_ip_looper = self._clock.looping_call(
- self._update_client_ips_batch, 5 * 1000
- )
- self.hs.get_reactor().addSystemEventTrigger(
- "before", "shutdown", self._update_client_ips_batch
- )
-
- if self.user_ips_max_age:
- self._clock.looping_call(self._prune_old_user_ips, 5 * 1000)
-
@defer.inlineCallbacks
def _remove_user_ip_nonunique(self, progress, batch_size):
def f(conn):
@@ -304,6 +284,110 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
return batch_size
@defer.inlineCallbacks
+ def _devices_last_seen_update(self, progress, batch_size):
+ """Background update to insert last seen info into devices table
+ """
+
+ last_user_id = progress.get("last_user_id", "")
+ last_device_id = progress.get("last_device_id", "")
+
+ def _devices_last_seen_update_txn(txn):
+ # This consists of two queries:
+ #
+ # 1. The sub-query searches for the next N devices and joins
+ # against user_ips to find the max last_seen associated with
+ # that device.
+ # 2. The outer query then joins again against user_ips on
+ # user/device/last_seen. This *should* hopefully only
+ # return one row, but if it does return more than one then
+ # we'll just end up updating the same device row multiple
+ # times, which is fine.
+
+ if self.database_engine.supports_tuple_comparison:
+ where_clause = "(user_id, device_id) > (?, ?)"
+ where_args = [last_user_id, last_device_id]
+ else:
+ # We explicitly do a `user_id >= ? AND (...)` here to ensure
+ # that an index is used, as doing `user_id > ? OR (user_id = ? AND ...)`
+ # makes it hard for query optimiser to tell that it can use the
+ # index on user_id
+ where_clause = "user_id >= ? AND (user_id > ? OR device_id > ?)"
+ where_args = [last_user_id, last_user_id, last_device_id]
+
+ sql = """
+ SELECT
+ last_seen, ip, user_agent, user_id, device_id
+ FROM (
+ SELECT
+ user_id, device_id, MAX(u.last_seen) AS last_seen
+ FROM devices
+ INNER JOIN user_ips AS u USING (user_id, device_id)
+ WHERE %(where_clause)s
+ GROUP BY user_id, device_id
+ ORDER BY user_id ASC, device_id ASC
+ LIMIT ?
+ ) c
+ INNER JOIN user_ips AS u USING (user_id, device_id, last_seen)
+ """ % {
+ "where_clause": where_clause
+ }
+ txn.execute(sql, where_args + [batch_size])
+
+ rows = txn.fetchall()
+ if not rows:
+ return 0
+
+ sql = """
+ UPDATE devices
+ SET last_seen = ?, ip = ?, user_agent = ?
+ WHERE user_id = ? AND device_id = ?
+ """
+ txn.execute_batch(sql, rows)
+
+ _, _, _, user_id, device_id = rows[-1]
+ self._background_update_progress_txn(
+ txn,
+ "devices_last_seen",
+ {"last_user_id": user_id, "last_device_id": device_id},
+ )
+
+ return len(rows)
+
+ updated = yield self.runInteraction(
+ "_devices_last_seen_update", _devices_last_seen_update_txn
+ )
+
+ if not updated:
+ yield self._end_background_update("devices_last_seen")
+
+ return updated
+
+
+class ClientIpStore(ClientIpBackgroundUpdateStore):
+ def __init__(self, db_conn, hs):
+
+ self.client_ip_last_seen = Cache(
+ name="client_ip_last_seen", keylen=4, max_entries=50000 * CACHE_SIZE_FACTOR
+ )
+
+ super(ClientIpStore, self).__init__(db_conn, hs)
+
+ self.user_ips_max_age = hs.config.user_ips_max_age
+
+ # (user_id, access_token, ip,) -> (user_agent, device_id, last_seen)
+ self._batch_row_update = {}
+
+ self._client_ip_looper = self._clock.looping_call(
+ self._update_client_ips_batch, 5 * 1000
+ )
+ self.hs.get_reactor().addSystemEventTrigger(
+ "before", "shutdown", self._update_client_ips_batch
+ )
+
+ if self.user_ips_max_age:
+ self._clock.looping_call(self._prune_old_user_ips, 5 * 1000)
+
+ @defer.inlineCallbacks
def insert_client_ip(
self, user_id, access_token, ip, user_agent, device_id, now=None
):
@@ -454,53 +538,6 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
for (access_token, ip), (user_agent, last_seen) in iteritems(results)
)
- @defer.inlineCallbacks
- def _devices_last_seen_update(self, progress, batch_size):
- """Background update to insert last seen info into devices table
- """
-
- last_user_id = progress.get("last_user_id", "")
- last_device_id = progress.get("last_device_id", "")
-
- def _devices_last_seen_update_txn(txn):
- sql = """
- SELECT u.last_seen, u.ip, u.user_agent, user_id, device_id FROM devices
- INNER JOIN user_ips AS u USING (user_id, device_id)
- WHERE user_id > ? OR (user_id = ? AND device_id > ?)
- ORDER BY user_id ASC, device_id ASC
- LIMIT ?
- """
- txn.execute(sql, (last_user_id, last_user_id, last_device_id, batch_size))
-
- rows = txn.fetchall()
- if not rows:
- return 0
-
- sql = """
- UPDATE devices
- SET last_seen = ?, ip = ?, user_agent = ?
- WHERE user_id = ? AND device_id = ?
- """
- txn.execute_batch(sql, rows)
-
- _, _, _, user_id, device_id = rows[-1]
- self._background_update_progress_txn(
- txn,
- "devices_last_seen",
- {"last_user_id": user_id, "last_device_id": device_id},
- )
-
- return len(rows)
-
- updated = yield self.runInteraction(
- "_devices_last_seen_update", _devices_last_seen_update_txn
- )
-
- if not updated:
- yield self._end_background_update("devices_last_seen")
-
- return updated
-
@wrap_as_background_process("prune_old_user_ips")
async def _prune_old_user_ips(self):
"""Removes entries in user IPs older than the configured period.
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 6b7458304e..70bc2bb2cc 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -208,11 +208,11 @@ class DeviceInboxWorkerStore(SQLBaseStore):
)
-class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
+class DeviceInboxBackgroundUpdateStore(BackgroundUpdateStore):
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
def __init__(self, db_conn, hs):
- super(DeviceInboxStore, self).__init__(db_conn, hs)
+ super(DeviceInboxBackgroundUpdateStore, self).__init__(db_conn, hs)
self.register_background_index_update(
"device_inbox_stream_index",
@@ -225,6 +225,26 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox
)
+ @defer.inlineCallbacks
+ def _background_drop_index_device_inbox(self, progress, batch_size):
+ def reindex_txn(conn):
+ txn = conn.cursor()
+ txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id")
+ txn.close()
+
+ yield self.runWithConnection(reindex_txn)
+
+ yield self._end_background_update(self.DEVICE_INBOX_STREAM_ID)
+
+ return 1
+
+
+class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore):
+ DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
+
+ def __init__(self, db_conn, hs):
+ super(DeviceInboxStore, self).__init__(db_conn, hs)
+
# Map of (user_id, device_id) to the last stream_id that has been
# deleted up to. This is so that we can no op deletions.
self._last_device_delete_cache = ExpiringCache(
@@ -435,16 +455,3 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
return self.runInteraction(
"get_all_new_device_messages", get_all_new_device_messages_txn
)
-
- @defer.inlineCallbacks
- def _background_drop_index_device_inbox(self, progress, batch_size):
- def reindex_txn(conn):
- txn = conn.cursor()
- txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id")
- txn.close()
-
- yield self.runWithConnection(reindex_txn)
-
- yield self._end_background_update(self.DEVICE_INBOX_STREAM_ID)
-
- return 1
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 79a58df591..111bfb3d64 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -512,17 +512,9 @@ class DeviceWorkerStore(SQLBaseStore):
return results
-class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
+class DeviceBackgroundUpdateStore(BackgroundUpdateStore):
def __init__(self, db_conn, hs):
- super(DeviceStore, self).__init__(db_conn, hs)
-
- # Map of (user_id, device_id) -> bool. If there is an entry that implies
- # the device exists.
- self.device_id_exists_cache = Cache(
- name="device_id_exists", keylen=2, max_entries=10000
- )
-
- self._clock.looping_call(self._prune_old_outbound_device_pokes, 60 * 60 * 1000)
+ super(DeviceBackgroundUpdateStore, self).__init__(db_conn, hs)
self.register_background_index_update(
"device_lists_stream_idx",
@@ -556,6 +548,31 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
)
@defer.inlineCallbacks
+ def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size):
+ def f(conn):
+ txn = conn.cursor()
+ txn.execute("DROP INDEX IF EXISTS device_lists_remote_cache_id")
+ txn.execute("DROP INDEX IF EXISTS device_lists_remote_extremeties_id")
+ txn.close()
+
+ yield self.runWithConnection(f)
+ yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES)
+ return 1
+
+
+class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
+ def __init__(self, db_conn, hs):
+ super(DeviceStore, self).__init__(db_conn, hs)
+
+ # Map of (user_id, device_id) -> bool. If there is an entry that implies
+ # the device exists.
+ self.device_id_exists_cache = Cache(
+ name="device_id_exists", keylen=2, max_entries=10000
+ )
+
+ self._clock.looping_call(self._prune_old_outbound_device_pokes, 60 * 60 * 1000)
+
+ @defer.inlineCallbacks
def store_device(self, user_id, device_id, initial_device_display_name):
"""Ensure the given device is known; add it to the store if not
@@ -910,15 +927,3 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
"_prune_old_outbound_device_pokes",
_prune_txn,
)
-
- @defer.inlineCallbacks
- def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size):
- def f(conn):
- txn = conn.cursor()
- txn.execute("DROP INDEX IF EXISTS device_lists_remote_cache_id")
- txn.execute("DROP INDEX IF EXISTS device_lists_remote_extremeties_id")
- txn.close()
-
- yield self.runWithConnection(f)
- yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES)
- return 1
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 289b6bc281..601617b21e 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -72,6 +72,13 @@ class PostgresEngine(object):
"""
return True
+ @property
+ def supports_tuple_comparison(self):
+ """
+ Do we support comparing tuples, i.e. `(a, b) > (c, d)`?
+ """
+ return True
+
def is_deadlock(self, error):
if isinstance(error, self.module.DatabaseError):
# https://www.postgresql.org/docs/current/static/errcodes-appendix.html
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index e9b9caa49a..ac92109366 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -38,6 +38,14 @@ class Sqlite3Engine(object):
"""
return self.module.sqlite_version_info >= (3, 24, 0)
+ @property
+ def supports_tuple_comparison(self):
+ """
+ Do we support comparing tuples, i.e. `(a, b) > (c, d)`? This requires
+ SQLite 3.15+.
+ """
+ return self.module.sqlite_version_info >= (3, 15, 0)
+
def check_database(self, txn):
pass
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index ddf7ab6479..bb6ff0595a 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -23,7 +23,7 @@ from functools import wraps
from six import iteritems, text_type
from six.moves import range
-from canonicaljson import encode_canonical_json, json
+from canonicaljson import json
from prometheus_client import Counter, Histogram
from twisted.internet import defer
@@ -1389,6 +1389,18 @@ class EventsStore(
],
)
+ for event, _ in events_and_contexts:
+ if not event.internal_metadata.is_redacted():
+ # If we're persisting an unredacted event we go and ensure
+ # that we mark any redactions that reference this event as
+ # requiring censoring.
+ self._simple_update_txn(
+ txn,
+ table="redactions",
+ keyvalues={"redacts": event.event_id},
+ updatevalues={"have_censored": False},
+ )
+
def _store_rejected_events_txn(self, txn, events_and_contexts):
"""Add rows to the 'rejections' table for received events which were
rejected
@@ -1552,9 +1564,15 @@ class EventsStore(
def _store_redaction(self, txn, event):
# invalidate the cache for the redacted event
txn.call_after(self._invalidate_get_event_cache, event.redacts)
- txn.execute(
- "INSERT INTO redactions (event_id, redacts) VALUES (?,?)",
- (event.event_id, event.redacts),
+
+ self._simple_insert_txn(
+ txn,
+ table="redactions",
+ values={
+ "event_id": event.event_id,
+ "redacts": event.redacts,
+ "received_ts": self._clock.time_msec(),
+ },
)
@defer.inlineCallbacks
@@ -1571,36 +1589,29 @@ class EventsStore(
if self.hs.config.redaction_retention_period is None:
return
- max_pos = yield self.find_first_stream_ordering_after_ts(
- self._clock.time_msec() - self.hs.config.redaction_retention_period
- )
+ before_ts = self._clock.time_msec() - self.hs.config.redaction_retention_period
# We fetch all redactions that:
# 1. point to an event we have,
- # 2. has a stream ordering from before the cut off, and
+ # 2. has a received_ts from before the cut off, and
# 3. we haven't yet censored.
#
# This is limited to 100 events to ensure that we don't try and do too
# much at once. We'll get called again so this should eventually catch
# up.
- #
- # We use the range [-max_pos, max_pos] to handle backfilled events,
- # which are given negative stream ordering.
sql = """
- SELECT redact_event.event_id, redacts FROM redactions
- INNER JOIN events AS redact_event USING (event_id)
- INNER JOIN events AS original_event ON (
- redact_event.room_id = original_event.room_id
- AND redacts = original_event.event_id
+ SELECT redactions.event_id, redacts FROM redactions
+ LEFT JOIN events AS original_event ON (
+ redacts = original_event.event_id
)
WHERE NOT have_censored
- AND ? <= redact_event.stream_ordering AND redact_event.stream_ordering <= ?
- ORDER BY redact_event.stream_ordering ASC
+ AND redactions.received_ts <= ?
+ ORDER BY redactions.received_ts ASC
LIMIT ?
"""
rows = yield self._execute(
- "_censor_redactions_fetch", None, sql, -max_pos, max_pos, 100
+ "_censor_redactions_fetch", None, sql, before_ts, 100
)
updates = []
@@ -1621,9 +1632,7 @@ class EventsStore(
and original_event.internal_metadata.is_redacted()
):
# Redaction was allowed
- pruned_json = encode_canonical_json(
- prune_event_dict(original_event.get_dict())
- )
+ pruned_json = encode_json(prune_event_dict(original_event.get_dict()))
else:
# Redaction wasn't allowed
pruned_json = None
diff --git a/synapse/storage/events_bg_updates.py b/synapse/storage/events_bg_updates.py
index 6587f31e2b..5717baf48c 100644
--- a/synapse/storage/events_bg_updates.py
+++ b/synapse/storage/events_bg_updates.py
@@ -67,6 +67,10 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
self.DELETE_SOFT_FAILED_EXTREMITIES, self._cleanup_extremities_bg_update
)
+ self.register_background_update_handler(
+ "redactions_received_ts", self._redactions_received_ts
+ )
+
@defer.inlineCallbacks
def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
@@ -397,3 +401,60 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
)
return num_handled
+
+ @defer.inlineCallbacks
+ def _redactions_received_ts(self, progress, batch_size):
+ """Handles filling out the `received_ts` column in redactions.
+ """
+ last_event_id = progress.get("last_event_id", "")
+
+ def _redactions_received_ts_txn(txn):
+ # Fetch the set of event IDs that we want to update
+ sql = """
+ SELECT event_id FROM redactions
+ WHERE event_id > ?
+ ORDER BY event_id ASC
+ LIMIT ?
+ """
+
+ txn.execute(sql, (last_event_id, batch_size))
+
+ rows = txn.fetchall()
+ if not rows:
+ return 0
+
+ upper_event_id, = rows[-1]
+
+ # Update the redactions with the received_ts.
+ #
+ # Note: Not all events have an associated received_ts, so we
+ # fallback to using origin_server_ts. If we for some reason don't
+ # have an origin_server_ts, lets just use the current timestamp.
+ #
+ # We don't want to leave it null, as then we'll never try and
+ # censor those redactions.
+ sql = """
+ UPDATE redactions
+ SET received_ts = (
+ SELECT COALESCE(received_ts, origin_server_ts, ?) FROM events
+ WHERE events.event_id = redactions.event_id
+ )
+ WHERE ? <= event_id AND event_id <= ?
+ """
+
+ txn.execute(sql, (self._clock.time_msec(), last_event_id, upper_event_id))
+
+ self._background_update_progress_txn(
+ txn, "redactions_received_ts", {"last_event_id": upper_event_id}
+ )
+
+ return len(rows)
+
+ count = yield self.runInteraction(
+ "_redactions_received_ts", _redactions_received_ts_txn
+ )
+
+ if not count:
+ yield self._end_background_update("redactions_received_ts")
+
+ return count
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index c6fa7f82fd..57ce0304e9 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -238,6 +238,20 @@ class EventsWorkerStore(SQLBaseStore):
# we have to recheck auth now.
if not allow_rejected and entry.event.type == EventTypes.Redaction:
+ if not hasattr(entry.event, "redacts"):
+ # A redacted redaction doesn't have a `redacts` key, in
+ # which case lets just withhold the event.
+ #
+ # Note: Most of the time if the redactions has been
+ # redacted we still have the un-redacted event in the DB
+ # and so we'll still see the `redacts` key. However, this
+ # isn't always true e.g. if we have censored the event.
+ logger.debug(
+ "Withholding redaction event %s as we don't have redacts key",
+ event_id,
+ )
+ continue
+
redacted_event_id = entry.event.redacts
event_map = yield self._get_events_from_cache_or_db([redacted_event_id])
original_event_entry = event_map.get(redacted_event_id)
diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py
index 6b1238ce4a..84b5f3ad5e 100644
--- a/synapse/storage/media_repository.py
+++ b/synapse/storage/media_repository.py
@@ -15,11 +15,9 @@
from synapse.storage.background_updates import BackgroundUpdateStore
-class MediaRepositoryStore(BackgroundUpdateStore):
- """Persistence for attachments and avatars"""
-
+class MediaRepositoryBackgroundUpdateStore(BackgroundUpdateStore):
def __init__(self, db_conn, hs):
- super(MediaRepositoryStore, self).__init__(db_conn, hs)
+ super(MediaRepositoryBackgroundUpdateStore, self).__init__(db_conn, hs)
self.register_background_index_update(
update_name="local_media_repository_url_idx",
@@ -29,6 +27,13 @@ class MediaRepositoryStore(BackgroundUpdateStore):
where_clause="url_cache IS NOT NULL",
)
+
+class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
+ """Persistence for attachments and avatars"""
+
+ def __init__(self, db_conn, hs):
+ super(MediaRepositoryStore, self).__init__(db_conn, hs)
+
def get_local_media(self, media_id):
"""Get the metadata for a local piece of media
Returns:
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 1f878e6575..c4e24edff2 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -183,8 +183,8 @@ class PushRulesWorkerStore(
return results
@defer.inlineCallbacks
- def move_push_rule_from_room_to_room(self, new_room_id, user_id, rule):
- """Move a single push rule from one room to another for a specific user.
+ def copy_push_rule_from_room_to_room(self, new_room_id, user_id, rule):
+ """Copy a single push rule from one room to another for a specific user.
Args:
new_room_id (str): ID of the new room.
@@ -209,14 +209,11 @@ class PushRulesWorkerStore(
actions=rule["actions"],
)
- # Delete push rule for the old room
- yield self.delete_push_rule(user_id, rule["rule_id"])
-
@defer.inlineCallbacks
- def move_push_rules_from_room_to_room_for_user(
+ def copy_push_rules_from_room_to_room_for_user(
self, old_room_id, new_room_id, user_id
):
- """Move all of the push rules from one room to another for a specific
+ """Copy all of the push rules from one room to another for a specific
user.
Args:
@@ -227,15 +224,14 @@ class PushRulesWorkerStore(
# Retrieve push rules for this user
user_push_rules = yield self.get_push_rules_for_user(user_id)
- # Get rules relating to the old room, move them to the new room, then
- # delete them from the old room
+ # Get rules relating to the old room and copy them to the new room
for rule in user_push_rules:
conditions = rule.get("conditions", [])
if any(
(c.get("key") == "room_id" and c.get("pattern") == old_room_id)
for c in conditions
):
- yield self.move_push_rule_from_room_to_room(new_room_id, user_id, rule)
+ yield self.copy_push_rule_from_room_to_room(new_room_id, user_id, rule)
@defer.inlineCallbacks
def bulk_get_push_rules_for_room(self, event, context):
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 241a7be51e..6c5b29288a 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -493,7 +493,9 @@ class RegistrationWorkerStore(SQLBaseStore):
"""
def _find_next_generated_user_id(txn):
- txn.execute("SELECT name FROM users")
+ # We bound between '@1' and '@a' to avoid pulling the entire table
+ # out.
+ txn.execute("SELECT name FROM users WHERE '@1' <= name AND name < '@a'")
regex = re.compile(r"^@(\d+):")
@@ -785,13 +787,14 @@ class RegistrationWorkerStore(SQLBaseStore):
)
-class RegistrationStore(
+class RegistrationBackgroundUpdateStore(
RegistrationWorkerStore, background_updates.BackgroundUpdateStore
):
def __init__(self, db_conn, hs):
- super(RegistrationStore, self).__init__(db_conn, hs)
+ super(RegistrationBackgroundUpdateStore, self).__init__(db_conn, hs)
self.clock = hs.get_clock()
+ self.config = hs.config
self.register_background_index_update(
"access_tokens_device_index",
@@ -807,8 +810,6 @@ class RegistrationStore(
columns=["creation_ts"],
)
- self._account_validity = hs.config.account_validity
-
# we no longer use refresh tokens, but it's possible that some people
# might have a background update queued to build this index. Just
# clear the background update.
@@ -822,17 +823,6 @@ class RegistrationStore(
"users_set_deactivated_flag", self._background_update_set_deactivated_flag
)
- # Create a background job for culling expired 3PID validity tokens
- def start_cull():
- # run as a background process to make sure that the database transactions
- # have a logcontext to report to
- return run_as_background_process(
- "cull_expired_threepid_validation_tokens",
- self.cull_expired_threepid_validation_tokens,
- )
-
- hs.get_clock().looping_call(start_cull, THIRTY_MINUTES_IN_MS)
-
@defer.inlineCallbacks
def _background_update_set_deactivated_flag(self, progress, batch_size):
"""Retrieves a list of all deactivated users and sets the 'deactivated' flag to 1
@@ -895,6 +885,54 @@ class RegistrationStore(
return nb_processed
@defer.inlineCallbacks
+ def _bg_user_threepids_grandfather(self, progress, batch_size):
+ """We now track which identity servers a user binds their 3PID to, so
+ we need to handle the case of existing bindings where we didn't track
+ this.
+
+ We do this by grandfathering in existing user threepids assuming that
+ they used one of the server configured trusted identity servers.
+ """
+ id_servers = set(self.config.trusted_third_party_id_servers)
+
+ def _bg_user_threepids_grandfather_txn(txn):
+ sql = """
+ INSERT INTO user_threepid_id_server
+ (user_id, medium, address, id_server)
+ SELECT user_id, medium, address, ?
+ FROM user_threepids
+ """
+
+ txn.executemany(sql, [(id_server,) for id_server in id_servers])
+
+ if id_servers:
+ yield self.runInteraction(
+ "_bg_user_threepids_grandfather", _bg_user_threepids_grandfather_txn
+ )
+
+ yield self._end_background_update("user_threepids_grandfather")
+
+ return 1
+
+
+class RegistrationStore(RegistrationBackgroundUpdateStore):
+ def __init__(self, db_conn, hs):
+ super(RegistrationStore, self).__init__(db_conn, hs)
+
+ self._account_validity = hs.config.account_validity
+
+ # Create a background job for culling expired 3PID validity tokens
+ def start_cull():
+ # run as a background process to make sure that the database transactions
+ # have a logcontext to report to
+ return run_as_background_process(
+ "cull_expired_threepid_validation_tokens",
+ self.cull_expired_threepid_validation_tokens,
+ )
+
+ hs.get_clock().looping_call(start_cull, THIRTY_MINUTES_IN_MS)
+
+ @defer.inlineCallbacks
def add_access_token_to_user(self, user_id, token, device_id, valid_until_ms):
"""Adds an access token for the given user.
@@ -1242,36 +1280,6 @@ class RegistrationStore(
desc="get_users_pending_deactivation",
)
- @defer.inlineCallbacks
- def _bg_user_threepids_grandfather(self, progress, batch_size):
- """We now track which identity servers a user binds their 3PID to, so
- we need to handle the case of existing bindings where we didn't track
- this.
-
- We do this by grandfathering in existing user threepids assuming that
- they used one of the server configured trusted identity servers.
- """
- id_servers = set(self.config.trusted_third_party_id_servers)
-
- def _bg_user_threepids_grandfather_txn(txn):
- sql = """
- INSERT INTO user_threepid_id_server
- (user_id, medium, address, id_server)
- SELECT user_id, medium, address, ?
- FROM user_threepids
- """
-
- txn.executemany(sql, [(id_server,) for id_server in id_servers])
-
- if id_servers:
- yield self.runInteraction(
- "_bg_user_threepids_grandfather", _bg_user_threepids_grandfather_txn
- )
-
- yield self._end_background_update("user_threepids_grandfather")
-
- return 1
-
def validate_threepid_session(self, session_id, client_secret, token, current_ts):
"""Attempt to validate a threepid session using a token
@@ -1463,17 +1471,6 @@ class RegistrationStore(
self.clock.time_msec(),
)
- def set_user_deactivated_status_txn(self, txn, user_id, deactivated):
- self._simple_update_one_txn(
- txn=txn,
- table="users",
- keyvalues={"name": user_id},
- updatevalues={"deactivated": 1 if deactivated else 0},
- )
- self._invalidate_cache_and_stream(
- txn, self.get_user_deactivated_status, (user_id,)
- )
-
@defer.inlineCallbacks
def set_user_deactivated_status(self, user_id, deactivated):
"""Set the `deactivated` property for the provided user to the provided value.
@@ -1489,3 +1486,14 @@ class RegistrationStore(
user_id,
deactivated,
)
+
+ def set_user_deactivated_status_txn(self, txn, user_id, deactivated):
+ self._simple_update_one_txn(
+ txn=txn,
+ table="users",
+ keyvalues={"name": user_id},
+ updatevalues={"deactivated": 1 if deactivated else 0},
+ )
+ self._invalidate_cache_and_stream(
+ txn, self.get_user_deactivated_status, (user_id,)
+ )
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 08e13f3a3b..43cc56fa6f 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,6 +17,7 @@
import collections
import logging
import re
+from typing import Optional, Tuple
from canonicaljson import json
@@ -24,6 +26,7 @@ from twisted.internet import defer
from synapse.api.errors import StoreError
from synapse.storage._base import SQLBaseStore
from synapse.storage.search import SearchStore
+from synapse.types import ThirdPartyInstanceID
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
logger = logging.getLogger(__name__)
@@ -63,103 +66,196 @@ class RoomWorkerStore(SQLBaseStore):
desc="get_public_room_ids",
)
- @cached(num_args=2, max_entries=100)
- def get_public_room_ids_at_stream_id(self, stream_id, network_tuple):
- """Get pulbic rooms for a particular list, or across all lists.
+ def count_public_rooms(self, network_tuple, ignore_non_federatable):
+ """Counts the number of public rooms as tracked in the room_stats_current
+ and room_stats_state table.
Args:
- stream_id (int)
- network_tuple (ThirdPartyInstanceID): The list to use (None, None)
- means the main list, None means all lsits.
+ network_tuple (ThirdPartyInstanceID|None)
+ ignore_non_federatable (bool): If true filters out non-federatable rooms
"""
- return self.runInteraction(
- "get_public_room_ids_at_stream_id",
- self.get_public_room_ids_at_stream_id_txn,
- stream_id,
- network_tuple=network_tuple,
- )
-
- def get_public_room_ids_at_stream_id_txn(self, txn, stream_id, network_tuple):
- return {
- rm
- for rm, vis in self.get_published_at_stream_id_txn(
- txn, stream_id, network_tuple=network_tuple
- ).items()
- if vis
- }
- def get_published_at_stream_id_txn(self, txn, stream_id, network_tuple):
- if network_tuple:
- # We want to get from a particular list. No aggregation required.
+ def _count_public_rooms_txn(txn):
+ query_args = []
+
+ if network_tuple:
+ if network_tuple.appservice_id:
+ published_sql = """
+ SELECT room_id from appservice_room_list
+ WHERE appservice_id = ? AND network_id = ?
+ """
+ query_args.append(network_tuple.appservice_id)
+ query_args.append(network_tuple.network_id)
+ else:
+ published_sql = """
+ SELECT room_id FROM rooms WHERE is_public
+ """
+ else:
+ published_sql = """
+ SELECT room_id FROM rooms WHERE is_public
+ UNION SELECT room_id from appservice_room_list
+ """
sql = """
- SELECT room_id, visibility FROM public_room_list_stream
- INNER JOIN (
- SELECT room_id, max(stream_id) AS stream_id
- FROM public_room_list_stream
- WHERE stream_id <= ? %s
- GROUP BY room_id
- ) grouped USING (room_id, stream_id)
+ SELECT
+ COALESCE(COUNT(*), 0)
+ FROM (
+ %(published_sql)s
+ ) published
+ INNER JOIN room_stats_state USING (room_id)
+ INNER JOIN room_stats_current USING (room_id)
+ WHERE
+ (
+ join_rules = 'public' OR history_visibility = 'world_readable'
+ )
+ AND joined_members > 0
+ """ % {
+ "published_sql": published_sql
+ }
+
+ txn.execute(sql, query_args)
+ return txn.fetchone()[0]
+
+ return self.runInteraction("count_public_rooms", _count_public_rooms_txn)
+
+ @defer.inlineCallbacks
+ def get_largest_public_rooms(
+ self,
+ network_tuple: Optional[ThirdPartyInstanceID],
+ search_filter: Optional[dict],
+ limit: Optional[int],
+ bounds: Optional[Tuple[int, str]],
+ forwards: bool,
+ ignore_non_federatable: bool = False,
+ ):
+ """Gets the largest public rooms (where largest is in terms of joined
+ members, as tracked in the statistics table).
+
+ Args:
+ network_tuple
+ search_filter
+ limit: Maxmimum number of rows to return, unlimited otherwise.
+ bounds: An uppoer or lower bound to apply to result set if given,
+ consists of a joined member count and room_id (these are
+ excluded from result set).
+ forwards: true iff going forwards, going backwards otherwise
+ ignore_non_federatable: If true filters out non-federatable rooms.
+
+ Returns:
+ Rooms in order: biggest number of joined users first.
+ We then arbitrarily use the room_id as a tie breaker.
+
+ """
+
+ where_clauses = []
+ query_args = []
+
+ if network_tuple:
+ if network_tuple.appservice_id:
+ published_sql = """
+ SELECT room_id from appservice_room_list
+ WHERE appservice_id = ? AND network_id = ?
+ """
+ query_args.append(network_tuple.appservice_id)
+ query_args.append(network_tuple.network_id)
+ else:
+ published_sql = """
+ SELECT room_id FROM rooms WHERE is_public
+ """
+ else:
+ published_sql = """
+ SELECT room_id FROM rooms WHERE is_public
+ UNION SELECT room_id from appservice_room_list
"""
- if network_tuple.appservice_id is not None:
- txn.execute(
- sql % ("AND appservice_id = ? AND network_id = ?",),
- (stream_id, network_tuple.appservice_id, network_tuple.network_id),
+ # Work out the bounds if we're given them, these bounds look slightly
+ # odd, but are designed to help query planner use indices by pulling
+ # out a common bound.
+ if bounds:
+ last_joined_members, last_room_id = bounds
+ if forwards:
+ where_clauses.append(
+ """
+ joined_members <= ? AND (
+ joined_members < ? OR room_id < ?
+ )
+ """
)
else:
- txn.execute(sql % ("AND appservice_id IS NULL",), (stream_id,))
- return dict(txn)
- else:
- # We want to get from all lists, so we need to aggregate the results
+ where_clauses.append(
+ """
+ joined_members >= ? AND (
+ joined_members > ? OR room_id > ?
+ )
+ """
+ )
- logger.info("Executing full list")
+ query_args += [last_joined_members, last_joined_members, last_room_id]
- sql = """
- SELECT room_id, visibility
- FROM public_room_list_stream
- INNER JOIN (
- SELECT
- room_id, max(stream_id) AS stream_id, appservice_id,
- network_id
- FROM public_room_list_stream
- WHERE stream_id <= ?
- GROUP BY room_id, appservice_id, network_id
- ) grouped USING (room_id, stream_id)
- """
+ if ignore_non_federatable:
+ where_clauses.append("is_federatable")
- txn.execute(sql, (stream_id,))
+ if search_filter and search_filter.get("generic_search_term", None):
+ search_term = "%" + search_filter["generic_search_term"] + "%"
- results = {}
- # A room is visible if its visible on any list.
- for room_id, visibility in txn:
- results[room_id] = bool(visibility) or results.get(room_id, False)
+ where_clauses.append(
+ """
+ (
+ name LIKE ?
+ OR topic LIKE ?
+ OR canonical_alias LIKE ?
+ )
+ """
+ )
+ query_args += [search_term, search_term, search_term]
+
+ where_clause = ""
+ if where_clauses:
+ where_clause = " AND " + " AND ".join(where_clauses)
+
+ sql = """
+ SELECT
+ room_id, name, topic, canonical_alias, joined_members,
+ avatar, history_visibility, joined_members, guest_access
+ FROM (
+ %(published_sql)s
+ ) published
+ INNER JOIN room_stats_state USING (room_id)
+ INNER JOIN room_stats_current USING (room_id)
+ WHERE
+ (
+ join_rules = 'public' OR history_visibility = 'world_readable'
+ )
+ AND joined_members > 0
+ %(where_clause)s
+ ORDER BY joined_members %(dir)s, room_id %(dir)s
+ """ % {
+ "published_sql": published_sql,
+ "where_clause": where_clause,
+ "dir": "DESC" if forwards else "ASC",
+ }
- return results
+ if limit is not None:
+ query_args.append(limit)
- def get_public_room_changes(self, prev_stream_id, new_stream_id, network_tuple):
- def get_public_room_changes_txn(txn):
- then_rooms = self.get_public_room_ids_at_stream_id_txn(
- txn, prev_stream_id, network_tuple
- )
+ sql += """
+ LIMIT ?
+ """
- now_rooms_dict = self.get_published_at_stream_id_txn(
- txn, new_stream_id, network_tuple
- )
+ def _get_largest_public_rooms_txn(txn):
+ txn.execute(sql, query_args)
- now_rooms_visible = set(rm for rm, vis in now_rooms_dict.items() if vis)
- now_rooms_not_visible = set(
- rm for rm, vis in now_rooms_dict.items() if not vis
- )
+ results = self.cursor_to_dict(txn)
- newly_visible = now_rooms_visible - then_rooms
- newly_unpublished = now_rooms_not_visible & then_rooms
+ if not forwards:
+ results.reverse()
- return newly_visible, newly_unpublished
+ return results
- return self.runInteraction(
- "get_public_room_changes", get_public_room_changes_txn
+ ret_val = yield self.runInteraction(
+ "get_largest_public_rooms", _get_largest_public_rooms_txn
)
+ defer.returnValue(ret_val)
@cached(max_entries=10000)
def is_room_blocked(self, room_id):
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 4df8ebdacd..4e606a8380 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -27,12 +27,14 @@ from synapse.api.constants import EventTypes, Membership
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import LoggingTransaction
+from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.engines import Sqlite3Engine
from synapse.storage.events_worker import EventsWorkerStore
from synapse.types import get_domain_from_id
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import intern_string
-from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
+from synapse.util.metrics import Measure
from synapse.util.stringutils import to_ascii
logger = logging.getLogger(__name__)
@@ -483,6 +485,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
)
return result
+ @defer.inlineCallbacks
def get_joined_users_from_state(self, room_id, state_entry):
state_group = state_entry.state_group
if not state_group:
@@ -492,9 +495,12 @@ class RoomMemberWorkerStore(EventsWorkerStore):
# To do this we set the state_group to a new object as object() != object()
state_group = object()
- return self._get_joined_users_from_context(
- room_id, state_group, state_entry.state, context=state_entry
- )
+ with Measure(self._clock, "get_joined_users_from_state"):
+ return (
+ yield self._get_joined_users_from_context(
+ room_id, state_group, state_entry.state, context=state_entry
+ )
+ )
@cachedInlineCallbacks(
num_args=2, cache_context=True, iterable=True, max_entries=100000
@@ -567,25 +573,10 @@ class RoomMemberWorkerStore(EventsWorkerStore):
missing_member_event_ids.append(event_id)
if missing_member_event_ids:
- rows = yield self._simple_select_many_batch(
- table="room_memberships",
- column="event_id",
- iterable=missing_member_event_ids,
- retcols=("user_id", "display_name", "avatar_url"),
- keyvalues={"membership": Membership.JOIN},
- batch_size=500,
- desc="_get_joined_users_from_context",
- )
-
- users_in_room.update(
- {
- to_ascii(row["user_id"]): ProfileInfo(
- avatar_url=to_ascii(row["avatar_url"]),
- display_name=to_ascii(row["display_name"]),
- )
- for row in rows
- }
+ event_to_memberships = yield self._get_joined_profiles_from_event_ids(
+ missing_member_event_ids
)
+ users_in_room.update((row for row in event_to_memberships.values() if row))
if event is not None and event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
@@ -597,6 +588,47 @@ class RoomMemberWorkerStore(EventsWorkerStore):
return users_in_room
+ @cached(max_entries=10000)
+ def _get_joined_profile_from_event_id(self, event_id):
+ raise NotImplementedError()
+
+ @cachedList(
+ cached_method_name="_get_joined_profile_from_event_id",
+ list_name="event_ids",
+ inlineCallbacks=True,
+ )
+ def _get_joined_profiles_from_event_ids(self, event_ids):
+ """For given set of member event_ids check if they point to a join
+ event and if so return the associated user and profile info.
+
+ Args:
+ event_ids (Iterable[str]): The member event IDs to lookup
+
+ Returns:
+ Deferred[dict[str, Tuple[str, ProfileInfo]|None]]: Map from event ID
+ to `user_id` and ProfileInfo (or None if not join event).
+ """
+
+ rows = yield self._simple_select_many_batch(
+ table="room_memberships",
+ column="event_id",
+ iterable=event_ids,
+ retcols=("user_id", "display_name", "avatar_url", "event_id"),
+ keyvalues={"membership": Membership.JOIN},
+ batch_size=500,
+ desc="_get_membership_from_event_ids",
+ )
+
+ return {
+ row["event_id"]: (
+ row["user_id"],
+ ProfileInfo(
+ avatar_url=row["avatar_url"], display_name=row["display_name"]
+ ),
+ )
+ for row in rows
+ }
+
@cachedInlineCallbacks(max_entries=10000)
def is_host_joined(self, room_id, host):
if "%" in host or "_" in host:
@@ -669,6 +701,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
return True
+ @defer.inlineCallbacks
def get_joined_hosts(self, room_id, state_entry):
state_group = state_entry.state_group
if not state_group:
@@ -678,9 +711,12 @@ class RoomMemberWorkerStore(EventsWorkerStore):
# To do this we set the state_group to a new object as object() != object()
state_group = object()
- return self._get_joined_hosts(
- room_id, state_group, state_entry.state, state_entry=state_entry
- )
+ with Measure(self._clock, "get_joined_hosts"):
+ return (
+ yield self._get_joined_hosts(
+ room_id, state_group, state_entry.state, state_entry=state_entry
+ )
+ )
@cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True)
# @defer.inlineCallbacks
@@ -785,9 +821,9 @@ class RoomMemberWorkerStore(EventsWorkerStore):
return set(room_ids)
-class RoomMemberStore(RoomMemberWorkerStore):
+class RoomMemberBackgroundUpdateStore(BackgroundUpdateStore):
def __init__(self, db_conn, hs):
- super(RoomMemberStore, self).__init__(db_conn, hs)
+ super(RoomMemberBackgroundUpdateStore, self).__init__(db_conn, hs)
self.register_background_update_handler(
_MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile
)
@@ -803,112 +839,6 @@ class RoomMemberStore(RoomMemberWorkerStore):
where_clause="forgotten = 1",
)
- def _store_room_members_txn(self, txn, events, backfilled):
- """Store a room member in the database.
- """
- self._simple_insert_many_txn(
- txn,
- table="room_memberships",
- values=[
- {
- "event_id": event.event_id,
- "user_id": event.state_key,
- "sender": event.user_id,
- "room_id": event.room_id,
- "membership": event.membership,
- "display_name": event.content.get("displayname", None),
- "avatar_url": event.content.get("avatar_url", None),
- }
- for event in events
- ],
- )
-
- for event in events:
- txn.call_after(
- self._membership_stream_cache.entity_has_changed,
- event.state_key,
- event.internal_metadata.stream_ordering,
- )
- txn.call_after(
- self.get_invited_rooms_for_user.invalidate, (event.state_key,)
- )
-
- # We update the local_invites table only if the event is "current",
- # i.e., its something that has just happened. If the event is an
- # outlier it is only current if its an "out of band membership",
- # like a remote invite or a rejection of a remote invite.
- is_new_state = not backfilled and (
- not event.internal_metadata.is_outlier()
- or event.internal_metadata.is_out_of_band_membership()
- )
- is_mine = self.hs.is_mine_id(event.state_key)
- if is_new_state and is_mine:
- if event.membership == Membership.INVITE:
- self._simple_insert_txn(
- txn,
- table="local_invites",
- values={
- "event_id": event.event_id,
- "invitee": event.state_key,
- "inviter": event.sender,
- "room_id": event.room_id,
- "stream_id": event.internal_metadata.stream_ordering,
- },
- )
- else:
- sql = (
- "UPDATE local_invites SET stream_id = ?, replaced_by = ? WHERE"
- " room_id = ? AND invitee = ? AND locally_rejected is NULL"
- " AND replaced_by is NULL"
- )
-
- txn.execute(
- sql,
- (
- event.internal_metadata.stream_ordering,
- event.event_id,
- event.room_id,
- event.state_key,
- ),
- )
-
- @defer.inlineCallbacks
- def locally_reject_invite(self, user_id, room_id):
- sql = (
- "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE"
- " room_id = ? AND invitee = ? AND locally_rejected is NULL"
- " AND replaced_by is NULL"
- )
-
- def f(txn, stream_ordering):
- txn.execute(sql, (stream_ordering, True, room_id, user_id))
-
- with self._stream_id_gen.get_next() as stream_ordering:
- yield self.runInteraction("locally_reject_invite", f, stream_ordering)
-
- def forget(self, user_id, room_id):
- """Indicate that user_id wishes to discard history for room_id."""
-
- def f(txn):
- sql = (
- "UPDATE"
- " room_memberships"
- " SET"
- " forgotten = 1"
- " WHERE"
- " user_id = ?"
- " AND"
- " room_id = ?"
- )
- txn.execute(sql, (user_id, room_id))
-
- self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))
- self._invalidate_cache_and_stream(
- txn, self.get_forgotten_rooms_for_user, (user_id,)
- )
-
- return self.runInteraction("forget_membership", f)
-
@defer.inlineCallbacks
def _background_add_membership_profile(self, progress, batch_size):
target_min_stream_id = progress.get(
@@ -1043,6 +973,117 @@ class RoomMemberStore(RoomMemberWorkerStore):
return row_count
+class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore):
+ def __init__(self, db_conn, hs):
+ super(RoomMemberStore, self).__init__(db_conn, hs)
+
+ def _store_room_members_txn(self, txn, events, backfilled):
+ """Store a room member in the database.
+ """
+ self._simple_insert_many_txn(
+ txn,
+ table="room_memberships",
+ values=[
+ {
+ "event_id": event.event_id,
+ "user_id": event.state_key,
+ "sender": event.user_id,
+ "room_id": event.room_id,
+ "membership": event.membership,
+ "display_name": event.content.get("displayname", None),
+ "avatar_url": event.content.get("avatar_url", None),
+ }
+ for event in events
+ ],
+ )
+
+ for event in events:
+ txn.call_after(
+ self._membership_stream_cache.entity_has_changed,
+ event.state_key,
+ event.internal_metadata.stream_ordering,
+ )
+ txn.call_after(
+ self.get_invited_rooms_for_user.invalidate, (event.state_key,)
+ )
+
+ # We update the local_invites table only if the event is "current",
+ # i.e., its something that has just happened. If the event is an
+ # outlier it is only current if its an "out of band membership",
+ # like a remote invite or a rejection of a remote invite.
+ is_new_state = not backfilled and (
+ not event.internal_metadata.is_outlier()
+ or event.internal_metadata.is_out_of_band_membership()
+ )
+ is_mine = self.hs.is_mine_id(event.state_key)
+ if is_new_state and is_mine:
+ if event.membership == Membership.INVITE:
+ self._simple_insert_txn(
+ txn,
+ table="local_invites",
+ values={
+ "event_id": event.event_id,
+ "invitee": event.state_key,
+ "inviter": event.sender,
+ "room_id": event.room_id,
+ "stream_id": event.internal_metadata.stream_ordering,
+ },
+ )
+ else:
+ sql = (
+ "UPDATE local_invites SET stream_id = ?, replaced_by = ? WHERE"
+ " room_id = ? AND invitee = ? AND locally_rejected is NULL"
+ " AND replaced_by is NULL"
+ )
+
+ txn.execute(
+ sql,
+ (
+ event.internal_metadata.stream_ordering,
+ event.event_id,
+ event.room_id,
+ event.state_key,
+ ),
+ )
+
+ @defer.inlineCallbacks
+ def locally_reject_invite(self, user_id, room_id):
+ sql = (
+ "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE"
+ " room_id = ? AND invitee = ? AND locally_rejected is NULL"
+ " AND replaced_by is NULL"
+ )
+
+ def f(txn, stream_ordering):
+ txn.execute(sql, (stream_ordering, True, room_id, user_id))
+
+ with self._stream_id_gen.get_next() as stream_ordering:
+ yield self.runInteraction("locally_reject_invite", f, stream_ordering)
+
+ def forget(self, user_id, room_id):
+ """Indicate that user_id wishes to discard history for room_id."""
+
+ def f(txn):
+ sql = (
+ "UPDATE"
+ " room_memberships"
+ " SET"
+ " forgotten = 1"
+ " WHERE"
+ " user_id = ?"
+ " AND"
+ " room_id = ?"
+ )
+ txn.execute(sql, (user_id, room_id))
+
+ self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))
+ self._invalidate_cache_and_stream(
+ txn, self.get_forgotten_rooms_for_user, (user_id,)
+ )
+
+ return self.runInteraction("forget_membership", f)
+
+
class _JoinedHostsCache(object):
"""Cache for joined hosts in a room that is optimised to handle updates
via state deltas.
diff --git a/synapse/storage/schema/delta/56/destinations_retry_interval_type.sql.postgres b/synapse/storage/schema/delta/56/destinations_retry_interval_type.sql.postgres
new file mode 100644
index 0000000000..b9bbb18a91
--- /dev/null
+++ b/synapse/storage/schema/delta/56/destinations_retry_interval_type.sql.postgres
@@ -0,0 +1,18 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- We want to store large retry intervals so we upgrade the column from INT
+-- to BIGINT. We don't need to do this on SQLite.
+ALTER TABLE destinations ALTER retry_interval SET DATA TYPE BIGINT;
diff --git a/synapse/storage/schema/delta/56/drop_unused_event_tables.sql b/synapse/storage/schema/delta/56/drop_unused_event_tables.sql
new file mode 100644
index 0000000000..9f09922c67
--- /dev/null
+++ b/synapse/storage/schema/delta/56/drop_unused_event_tables.sql
@@ -0,0 +1,20 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- these tables are never used.
+DROP TABLE IF EXISTS room_names;
+DROP TABLE IF EXISTS topics;
+DROP TABLE IF EXISTS history_visibility;
+DROP TABLE IF EXISTS guest_access;
diff --git a/synapse/storage/schema/delta/56/public_room_list_idx.sql b/synapse/storage/schema/delta/56/public_room_list_idx.sql
new file mode 100644
index 0000000000..7be31ffebb
--- /dev/null
+++ b/synapse/storage/schema/delta/56/public_room_list_idx.sql
@@ -0,0 +1,16 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE INDEX public_room_list_stream_network ON public_room_list_stream (appservice_id, network_id, room_id);
diff --git a/synapse/storage/schema/delta/56/redaction_censor2.sql b/synapse/storage/schema/delta/56/redaction_censor2.sql
new file mode 100644
index 0000000000..77a5eca499
--- /dev/null
+++ b/synapse/storage/schema/delta/56/redaction_censor2.sql
@@ -0,0 +1,20 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE redactions ADD COLUMN received_ts BIGINT;
+CREATE INDEX redactions_have_censored_ts ON redactions(received_ts) WHERE not have_censored;
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('redactions_received_ts', '{}');
diff --git a/synapse/storage/schema/delta/56/redaction_censor3_fix_update.sql.postgres b/synapse/storage/schema/delta/56/redaction_censor3_fix_update.sql.postgres
new file mode 100644
index 0000000000..f7bcc5e2f2
--- /dev/null
+++ b/synapse/storage/schema/delta/56/redaction_censor3_fix_update.sql.postgres
@@ -0,0 +1,26 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- There was a bug where we may have updated censored redactions as bytes,
+-- which can (somehow) cause json to be inserted hex encoded. This goes and
+-- undoes any such hex encoded JSON.
+UPDATE event_json SET json = convert_from(json::bytea, 'utf8')
+WHERE event_id IN (
+ SELECT event_json.event_id
+ FROM event_json
+ INNER JOIN redactions ON (event_json.event_id = redacts)
+ WHERE have_censored AND json NOT LIKE '{%'
+);
diff --git a/synapse/storage/schema/delta/56/unique_user_filter_index.py b/synapse/storage/schema/delta/56/unique_user_filter_index.py
new file mode 100644
index 0000000000..60031f23ca
--- /dev/null
+++ b/synapse/storage/schema/delta/56/unique_user_filter_index.py
@@ -0,0 +1,46 @@
+import logging
+
+from synapse.storage.engines import PostgresEngine
+
+logger = logging.getLogger(__name__)
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+ if isinstance(database_engine, PostgresEngine):
+ select_clause = """
+ CREATE TEMPORARY TABLE user_filters_migration AS
+ SELECT DISTINCT ON (user_id, filter_id) user_id, filter_id, filter_json
+ FROM user_filters;
+ """
+ else:
+ select_clause = """
+ CREATE TEMPORARY TABLE user_filters_migration AS
+ SELECT * FROM user_filters GROUP BY user_id, filter_id;
+ """
+ sql = (
+ """
+ BEGIN;
+ %s
+ DROP INDEX user_filters_by_user_id_filter_id;
+ DELETE FROM user_filters;
+ ALTER TABLE user_filters
+ ALTER COLUMN user_id SET NOT NULL,
+ ALTER COLUMN filter_id SET NOT NULL,
+ ALTER COLUMN filter_json SET NOT NULL;
+ INSERT INTO user_filters(user_id, filter_id, filter_json)
+ SELECT * FROM user_filters_migration;
+ DROP TABLE user_filters_migration;
+ CREATE UNIQUE INDEX user_filters_by_user_id_filter_id_unique
+ ON user_filters(user_id, filter_id);
+ END;
+ """
+ % select_clause
+ )
+ if isinstance(database_engine, PostgresEngine):
+ cur.execute(sql)
+ else:
+ cur.executescript(sql)
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+ pass
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index df87ab6a6d..6ba4190f1a 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -36,7 +36,7 @@ SearchEntry = namedtuple(
)
-class SearchStore(BackgroundUpdateStore):
+class SearchBackgroundUpdateStore(BackgroundUpdateStore):
EVENT_SEARCH_UPDATE_NAME = "event_search"
EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order"
@@ -44,7 +44,7 @@ class SearchStore(BackgroundUpdateStore):
EVENT_SEARCH_USE_GIN_POSTGRES_NAME = "event_search_postgres_gin"
def __init__(self, db_conn, hs):
- super(SearchStore, self).__init__(db_conn, hs)
+ super(SearchBackgroundUpdateStore, self).__init__(db_conn, hs)
if not hs.config.enable_search:
return
@@ -289,29 +289,6 @@ class SearchStore(BackgroundUpdateStore):
return num_rows
- def store_event_search_txn(self, txn, event, key, value):
- """Add event to the search table
-
- Args:
- txn (cursor):
- event (EventBase):
- key (str):
- value (str):
- """
- self.store_search_entries_txn(
- txn,
- (
- SearchEntry(
- key=key,
- value=value,
- event_id=event.event_id,
- room_id=event.room_id,
- stream_ordering=event.internal_metadata.stream_ordering,
- origin_server_ts=event.origin_server_ts,
- ),
- ),
- )
-
def store_search_entries_txn(self, txn, entries):
"""Add entries to the search table
@@ -358,6 +335,34 @@ class SearchStore(BackgroundUpdateStore):
# This should be unreachable.
raise Exception("Unrecognized database engine")
+
+class SearchStore(SearchBackgroundUpdateStore):
+ def __init__(self, db_conn, hs):
+ super(SearchStore, self).__init__(db_conn, hs)
+
+ def store_event_search_txn(self, txn, event, key, value):
+ """Add event to the search table
+
+ Args:
+ txn (cursor):
+ event (EventBase):
+ key (str):
+ value (str):
+ """
+ self.store_search_entries_txn(
+ txn,
+ (
+ SearchEntry(
+ key=key,
+ value=value,
+ event_id=event.event_id,
+ room_id=event.room_id,
+ stream_ordering=event.internal_metadata.stream_ordering,
+ origin_server_ts=event.origin_server_ts,
+ ),
+ ),
+ )
+
@defer.inlineCallbacks
def search_msgs(self, room_ids, search_term, keys):
"""Performs a full text search over events with given keys.
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 1980a87108..a941a5ae3f 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -353,8 +353,158 @@ class StateFilter(object):
return member_filter, non_member_filter
+class StateGroupBackgroundUpdateStore(SQLBaseStore):
+ """Defines functions related to state groups needed to run the state backgroud
+ updates.
+ """
+
+ def _count_state_group_hops_txn(self, txn, state_group):
+ """Given a state group, count how many hops there are in the tree.
+
+ This is used to ensure the delta chains don't get too long.
+ """
+ if isinstance(self.database_engine, PostgresEngine):
+ sql = """
+ WITH RECURSIVE state(state_group) AS (
+ VALUES(?::bigint)
+ UNION ALL
+ SELECT prev_state_group FROM state_group_edges e, state s
+ WHERE s.state_group = e.state_group
+ )
+ SELECT count(*) FROM state;
+ """
+
+ txn.execute(sql, (state_group,))
+ row = txn.fetchone()
+ if row and row[0]:
+ return row[0]
+ else:
+ return 0
+ else:
+ # We don't use WITH RECURSIVE on sqlite3 as there are distributions
+ # that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
+ next_group = state_group
+ count = 0
+
+ while next_group:
+ next_group = self._simple_select_one_onecol_txn(
+ txn,
+ table="state_group_edges",
+ keyvalues={"state_group": next_group},
+ retcol="prev_state_group",
+ allow_none=True,
+ )
+ if next_group:
+ count += 1
+
+ return count
+
+ def _get_state_groups_from_groups_txn(
+ self, txn, groups, state_filter=StateFilter.all()
+ ):
+ results = {group: {} for group in groups}
+
+ where_clause, where_args = state_filter.make_sql_filter_clause()
+
+ # Unless the filter clause is empty, we're going to append it after an
+ # existing where clause
+ if where_clause:
+ where_clause = " AND (%s)" % (where_clause,)
+
+ if isinstance(self.database_engine, PostgresEngine):
+ # Temporarily disable sequential scans in this transaction. This is
+ # a temporary hack until we can add the right indices in
+ txn.execute("SET LOCAL enable_seqscan=off")
+
+ # The below query walks the state_group tree so that the "state"
+ # table includes all state_groups in the tree. It then joins
+ # against `state_groups_state` to fetch the latest state.
+ # It assumes that previous state groups are always numerically
+ # lesser.
+ # The PARTITION is used to get the event_id in the greatest state
+ # group for the given type, state_key.
+ # This may return multiple rows per (type, state_key), but last_value
+ # should be the same.
+ sql = """
+ WITH RECURSIVE state(state_group) AS (
+ VALUES(?::bigint)
+ UNION ALL
+ SELECT prev_state_group FROM state_group_edges e, state s
+ WHERE s.state_group = e.state_group
+ )
+ SELECT DISTINCT type, state_key, last_value(event_id) OVER (
+ PARTITION BY type, state_key ORDER BY state_group ASC
+ ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
+ ) AS event_id FROM state_groups_state
+ WHERE state_group IN (
+ SELECT state_group FROM state
+ )
+ """
+
+ for group in groups:
+ args = [group]
+ args.extend(where_args)
+
+ txn.execute(sql + where_clause, args)
+ for row in txn:
+ typ, state_key, event_id = row
+ key = (typ, state_key)
+ results[group][key] = event_id
+ else:
+ max_entries_returned = state_filter.max_entries_returned()
+
+ # We don't use WITH RECURSIVE on sqlite3 as there are distributions
+ # that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
+ for group in groups:
+ next_group = group
+
+ while next_group:
+ # We did this before by getting the list of group ids, and
+ # then passing that list to sqlite to get latest event for
+ # each (type, state_key). However, that was terribly slow
+ # without the right indices (which we can't add until
+ # after we finish deduping state, which requires this func)
+ args = [next_group]
+ args.extend(where_args)
+
+ txn.execute(
+ "SELECT type, state_key, event_id FROM state_groups_state"
+ " WHERE state_group = ? " + where_clause,
+ args,
+ )
+ results[group].update(
+ ((typ, state_key), event_id)
+ for typ, state_key, event_id in txn
+ if (typ, state_key) not in results[group]
+ )
+
+ # If the number of entries in the (type,state_key)->event_id dict
+ # matches the number of (type,state_keys) types we were searching
+ # for, then we must have found them all, so no need to go walk
+ # further down the tree... UNLESS our types filter contained
+ # wildcards (i.e. Nones) in which case we have to do an exhaustive
+ # search
+ if (
+ max_entries_returned is not None
+ and len(results[group]) == max_entries_returned
+ ):
+ break
+
+ next_group = self._simple_select_one_onecol_txn(
+ txn,
+ table="state_group_edges",
+ keyvalues={"state_group": next_group},
+ retcol="prev_state_group",
+ allow_none=True,
+ )
+
+ return results
+
+
# this inherits from EventsWorkerStore because it calls self.get_events
-class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
+class StateGroupWorkerStore(
+ EventsWorkerStore, StateGroupBackgroundUpdateStore, SQLBaseStore
+):
"""The parts of StateGroupStore that can be called from workers.
"""
@@ -694,107 +844,6 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
return results
- def _get_state_groups_from_groups_txn(
- self, txn, groups, state_filter=StateFilter.all()
- ):
- results = {group: {} for group in groups}
-
- where_clause, where_args = state_filter.make_sql_filter_clause()
-
- # Unless the filter clause is empty, we're going to append it after an
- # existing where clause
- if where_clause:
- where_clause = " AND (%s)" % (where_clause,)
-
- if isinstance(self.database_engine, PostgresEngine):
- # Temporarily disable sequential scans in this transaction. This is
- # a temporary hack until we can add the right indices in
- txn.execute("SET LOCAL enable_seqscan=off")
-
- # The below query walks the state_group tree so that the "state"
- # table includes all state_groups in the tree. It then joins
- # against `state_groups_state` to fetch the latest state.
- # It assumes that previous state groups are always numerically
- # lesser.
- # The PARTITION is used to get the event_id in the greatest state
- # group for the given type, state_key.
- # This may return multiple rows per (type, state_key), but last_value
- # should be the same.
- sql = """
- WITH RECURSIVE state(state_group) AS (
- VALUES(?::bigint)
- UNION ALL
- SELECT prev_state_group FROM state_group_edges e, state s
- WHERE s.state_group = e.state_group
- )
- SELECT DISTINCT type, state_key, last_value(event_id) OVER (
- PARTITION BY type, state_key ORDER BY state_group ASC
- ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
- ) AS event_id FROM state_groups_state
- WHERE state_group IN (
- SELECT state_group FROM state
- )
- """
-
- for group in groups:
- args = [group]
- args.extend(where_args)
-
- txn.execute(sql + where_clause, args)
- for row in txn:
- typ, state_key, event_id = row
- key = (typ, state_key)
- results[group][key] = event_id
- else:
- max_entries_returned = state_filter.max_entries_returned()
-
- # We don't use WITH RECURSIVE on sqlite3 as there are distributions
- # that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
- for group in groups:
- next_group = group
-
- while next_group:
- # We did this before by getting the list of group ids, and
- # then passing that list to sqlite to get latest event for
- # each (type, state_key). However, that was terribly slow
- # without the right indices (which we can't add until
- # after we finish deduping state, which requires this func)
- args = [next_group]
- args.extend(where_args)
-
- txn.execute(
- "SELECT type, state_key, event_id FROM state_groups_state"
- " WHERE state_group = ? " + where_clause,
- args,
- )
- results[group].update(
- ((typ, state_key), event_id)
- for typ, state_key, event_id in txn
- if (typ, state_key) not in results[group]
- )
-
- # If the number of entries in the (type,state_key)->event_id dict
- # matches the number of (type,state_keys) types we were searching
- # for, then we must have found them all, so no need to go walk
- # further down the tree... UNLESS our types filter contained
- # wildcards (i.e. Nones) in which case we have to do an exhaustive
- # search
- if (
- max_entries_returned is not None
- and len(results[group]) == max_entries_returned
- ):
- break
-
- next_group = self._simple_select_one_onecol_txn(
- txn,
- table="state_group_edges",
- keyvalues={"state_group": next_group},
- retcol="prev_state_group",
- allow_none=True,
- )
-
- return results
-
@defer.inlineCallbacks
def get_state_for_events(self, event_ids, state_filter=StateFilter.all()):
"""Given a list of event_ids and type tuples, return a list of state
@@ -1238,66 +1287,10 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
return self.runInteraction("store_state_group", _store_state_group_txn)
- def _count_state_group_hops_txn(self, txn, state_group):
- """Given a state group, count how many hops there are in the tree.
-
- This is used to ensure the delta chains don't get too long.
- """
- if isinstance(self.database_engine, PostgresEngine):
- sql = """
- WITH RECURSIVE state(state_group) AS (
- VALUES(?::bigint)
- UNION ALL
- SELECT prev_state_group FROM state_group_edges e, state s
- WHERE s.state_group = e.state_group
- )
- SELECT count(*) FROM state;
- """
-
- txn.execute(sql, (state_group,))
- row = txn.fetchone()
- if row and row[0]:
- return row[0]
- else:
- return 0
- else:
- # We don't use WITH RECURSIVE on sqlite3 as there are distributions
- # that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
- next_group = state_group
- count = 0
-
- while next_group:
- next_group = self._simple_select_one_onecol_txn(
- txn,
- table="state_group_edges",
- keyvalues={"state_group": next_group},
- retcol="prev_state_group",
- allow_none=True,
- )
- if next_group:
- count += 1
-
- return count
-
-
-class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
- """ Keeps track of the state at a given event.
-
- This is done by the concept of `state groups`. Every event is a assigned
- a state group (identified by an arbitrary string), which references a
- collection of state events. The current state of an event is then the
- collection of state events referenced by the event's state group.
- Hence, every change in the current state causes a new state group to be
- generated. However, if no change happens (e.g., if we get a message event
- with only one parent it inherits the state group from its parent.)
-
- There are three tables:
- * `state_groups`: Stores group name, first event with in the group and
- room id.
- * `event_to_state_groups`: Maps events to state groups.
- * `state_groups_state`: Maps state group to state events.
- """
+class StateBackgroundUpdateStore(
+ StateGroupBackgroundUpdateStore, BackgroundUpdateStore
+):
STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication"
STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index"
@@ -1305,7 +1298,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index"
def __init__(self, db_conn, hs):
- super(StateStore, self).__init__(db_conn, hs)
+ super(StateBackgroundUpdateStore, self).__init__(db_conn, hs)
self.register_background_update_handler(
self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME,
self._background_deduplicate_state,
@@ -1327,34 +1320,6 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
columns=["state_group"],
)
- def _store_event_state_mappings_txn(self, txn, events_and_contexts):
- state_groups = {}
- for event, context in events_and_contexts:
- if event.internal_metadata.is_outlier():
- continue
-
- # if the event was rejected, just give it the same state as its
- # predecessor.
- if context.rejected:
- state_groups[event.event_id] = context.prev_group
- continue
-
- state_groups[event.event_id] = context.state_group
-
- self._simple_insert_many_txn(
- txn,
- table="event_to_state_groups",
- values=[
- {"state_group": state_group_id, "event_id": event_id}
- for event_id, state_group_id in iteritems(state_groups)
- ],
- )
-
- for event_id, state_group_id in iteritems(state_groups):
- txn.call_after(
- self._get_state_group_for_event.prefill, (event_id,), state_group_id
- )
-
@defer.inlineCallbacks
def _background_deduplicate_state(self, progress, batch_size):
"""This background update will slowly deduplicate state by reencoding
@@ -1527,3 +1492,54 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
yield self._end_background_update(self.STATE_GROUP_INDEX_UPDATE_NAME)
return 1
+
+
+class StateStore(StateGroupWorkerStore, StateBackgroundUpdateStore):
+ """ Keeps track of the state at a given event.
+
+ This is done by the concept of `state groups`. Every event is a assigned
+ a state group (identified by an arbitrary string), which references a
+ collection of state events. The current state of an event is then the
+ collection of state events referenced by the event's state group.
+
+ Hence, every change in the current state causes a new state group to be
+ generated. However, if no change happens (e.g., if we get a message event
+ with only one parent it inherits the state group from its parent.)
+
+ There are three tables:
+ * `state_groups`: Stores group name, first event with in the group and
+ room id.
+ * `event_to_state_groups`: Maps events to state groups.
+ * `state_groups_state`: Maps state group to state events.
+ """
+
+ def __init__(self, db_conn, hs):
+ super(StateStore, self).__init__(db_conn, hs)
+
+ def _store_event_state_mappings_txn(self, txn, events_and_contexts):
+ state_groups = {}
+ for event, context in events_and_contexts:
+ if event.internal_metadata.is_outlier():
+ continue
+
+ # if the event was rejected, just give it the same state as its
+ # predecessor.
+ if context.rejected:
+ state_groups[event.event_id] = context.prev_group
+ continue
+
+ state_groups[event.event_id] = context.state_group
+
+ self._simple_insert_many_txn(
+ txn,
+ table="event_to_state_groups",
+ values=[
+ {"state_group": state_group_id, "event_id": event_id}
+ for event_id, state_group_id in iteritems(state_groups)
+ ],
+ )
+
+ for event_id, state_group_id in iteritems(state_groups):
+ txn.call_after(
+ self._get_state_group_for_event.prefill, (event_id,), state_group_id
+ )
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 09190d684e..7c224cd3d9 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -332,6 +332,9 @@ class StatsStore(StateDeltasStore):
def _bulk_update_stats_delta_txn(txn):
for stats_type, stats_updates in updates.items():
for stats_id, fields in stats_updates.items():
+ logger.info(
+ "Updating %s stats for %s: %s", stats_type, stats_id, fields
+ )
self._update_stats_delta_txn(
txn,
ts=ts,
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index b5188d9bee..1b1e4751b9 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -32,14 +32,14 @@ logger = logging.getLogger(__name__)
TEMP_TABLE = "_temp_populate_user_directory"
-class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
+class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore):
# How many records do we calculate before sending it to
# add_users_who_share_private_rooms?
SHARE_PRIVATE_WORKING_SET = 500
def __init__(self, db_conn, hs):
- super(UserDirectoryStore, self).__init__(db_conn, hs)
+ super(UserDirectoryBackgroundUpdateStore, self).__init__(db_conn, hs)
self.server_name = hs.hostname
@@ -452,55 +452,6 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
"update_profile_in_user_dir", _update_profile_in_user_dir_txn
)
- def remove_from_user_dir(self, user_id):
- def _remove_from_user_dir_txn(txn):
- self._simple_delete_txn(
- txn, table="user_directory", keyvalues={"user_id": user_id}
- )
- self._simple_delete_txn(
- txn, table="user_directory_search", keyvalues={"user_id": user_id}
- )
- self._simple_delete_txn(
- txn, table="users_in_public_rooms", keyvalues={"user_id": user_id}
- )
- self._simple_delete_txn(
- txn,
- table="users_who_share_private_rooms",
- keyvalues={"user_id": user_id},
- )
- self._simple_delete_txn(
- txn,
- table="users_who_share_private_rooms",
- keyvalues={"other_user_id": user_id},
- )
- txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
-
- return self.runInteraction("remove_from_user_dir", _remove_from_user_dir_txn)
-
- @defer.inlineCallbacks
- def get_users_in_dir_due_to_room(self, room_id):
- """Get all user_ids that are in the room directory because they're
- in the given room_id
- """
- user_ids_share_pub = yield self._simple_select_onecol(
- table="users_in_public_rooms",
- keyvalues={"room_id": room_id},
- retcol="user_id",
- desc="get_users_in_dir_due_to_room",
- )
-
- user_ids_share_priv = yield self._simple_select_onecol(
- table="users_who_share_private_rooms",
- keyvalues={"room_id": room_id},
- retcol="other_user_id",
- desc="get_users_in_dir_due_to_room",
- )
-
- user_ids = set(user_ids_share_pub)
- user_ids.update(user_ids_share_priv)
-
- return user_ids
-
def add_users_who_share_private_room(self, room_id, user_id_tuples):
"""Insert entries into the users_who_share_private_rooms table. The first
user should be a local user.
@@ -551,6 +502,98 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
"add_users_in_public_rooms", _add_users_in_public_rooms_txn
)
+ def delete_all_from_user_dir(self):
+ """Delete the entire user directory
+ """
+
+ def _delete_all_from_user_dir_txn(txn):
+ txn.execute("DELETE FROM user_directory")
+ txn.execute("DELETE FROM user_directory_search")
+ txn.execute("DELETE FROM users_in_public_rooms")
+ txn.execute("DELETE FROM users_who_share_private_rooms")
+ txn.call_after(self.get_user_in_directory.invalidate_all)
+
+ return self.runInteraction(
+ "delete_all_from_user_dir", _delete_all_from_user_dir_txn
+ )
+
+ @cached()
+ def get_user_in_directory(self, user_id):
+ return self._simple_select_one(
+ table="user_directory",
+ keyvalues={"user_id": user_id},
+ retcols=("display_name", "avatar_url"),
+ allow_none=True,
+ desc="get_user_in_directory",
+ )
+
+ def update_user_directory_stream_pos(self, stream_id):
+ return self._simple_update_one(
+ table="user_directory_stream_pos",
+ keyvalues={},
+ updatevalues={"stream_id": stream_id},
+ desc="update_user_directory_stream_pos",
+ )
+
+
+class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
+
+ # How many records do we calculate before sending it to
+ # add_users_who_share_private_rooms?
+ SHARE_PRIVATE_WORKING_SET = 500
+
+ def __init__(self, db_conn, hs):
+ super(UserDirectoryStore, self).__init__(db_conn, hs)
+
+ def remove_from_user_dir(self, user_id):
+ def _remove_from_user_dir_txn(txn):
+ self._simple_delete_txn(
+ txn, table="user_directory", keyvalues={"user_id": user_id}
+ )
+ self._simple_delete_txn(
+ txn, table="user_directory_search", keyvalues={"user_id": user_id}
+ )
+ self._simple_delete_txn(
+ txn, table="users_in_public_rooms", keyvalues={"user_id": user_id}
+ )
+ self._simple_delete_txn(
+ txn,
+ table="users_who_share_private_rooms",
+ keyvalues={"user_id": user_id},
+ )
+ self._simple_delete_txn(
+ txn,
+ table="users_who_share_private_rooms",
+ keyvalues={"other_user_id": user_id},
+ )
+ txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
+
+ return self.runInteraction("remove_from_user_dir", _remove_from_user_dir_txn)
+
+ @defer.inlineCallbacks
+ def get_users_in_dir_due_to_room(self, room_id):
+ """Get all user_ids that are in the room directory because they're
+ in the given room_id
+ """
+ user_ids_share_pub = yield self._simple_select_onecol(
+ table="users_in_public_rooms",
+ keyvalues={"room_id": room_id},
+ retcol="user_id",
+ desc="get_users_in_dir_due_to_room",
+ )
+
+ user_ids_share_priv = yield self._simple_select_onecol(
+ table="users_who_share_private_rooms",
+ keyvalues={"room_id": room_id},
+ retcol="other_user_id",
+ desc="get_users_in_dir_due_to_room",
+ )
+
+ user_ids = set(user_ids_share_pub)
+ user_ids.update(user_ids_share_priv)
+
+ return user_ids
+
def remove_user_who_share_room(self, user_id, room_id):
"""
Deletes entries in the users_who_share_*_rooms table. The first
@@ -637,31 +680,6 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
return [room_id for room_id, in rows]
- def delete_all_from_user_dir(self):
- """Delete the entire user directory
- """
-
- def _delete_all_from_user_dir_txn(txn):
- txn.execute("DELETE FROM user_directory")
- txn.execute("DELETE FROM user_directory_search")
- txn.execute("DELETE FROM users_in_public_rooms")
- txn.execute("DELETE FROM users_who_share_private_rooms")
- txn.call_after(self.get_user_in_directory.invalidate_all)
-
- return self.runInteraction(
- "delete_all_from_user_dir", _delete_all_from_user_dir_txn
- )
-
- @cached()
- def get_user_in_directory(self, user_id):
- return self._simple_select_one(
- table="user_directory",
- keyvalues={"user_id": user_id},
- retcols=("display_name", "avatar_url"),
- allow_none=True,
- desc="get_user_in_directory",
- )
-
def get_user_directory_stream_pos(self):
return self._simple_select_one_onecol(
table="user_directory_stream_pos",
@@ -670,14 +688,6 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
desc="get_user_directory_stream_pos",
)
- def update_user_directory_stream_pos(self, stream_id):
- return self._simple_update_one(
- table="user_directory_stream_pos",
- keyvalues={},
- updatevalues={"stream_id": stream_id},
- desc="update_user_directory_stream_pos",
- )
-
@defer.inlineCallbacks
def search_user_dir(self, user_id, search_term, limit):
"""Searches for users in directory
|