diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 489ce82fae..abe16334ec 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -1395,14 +1395,22 @@ class SQLBaseStore(object):
"""
txn.call_after(self._invalidate_state_caches, room_id, members_changed)
- # We need to be careful that the size of the `members_changed` list
- # isn't so large that it causes problems sending over replication, so we
- # send them in chunks.
- # Max line length is 16K, and max user ID length is 255, so 50 should
- # be safe.
- for chunk in batch_iter(members_changed, 50):
- keys = itertools.chain([room_id], chunk)
- self._send_invalidation_to_replication(txn, _CURRENT_STATE_CACHE_NAME, keys)
+ if members_changed:
+ # We need to be careful that the size of the `members_changed` list
+ # isn't so large that it causes problems sending over replication, so we
+ # send them in chunks.
+ # Max line length is 16K, and max user ID length is 255, so 50 should
+ # be safe.
+ for chunk in batch_iter(members_changed, 50):
+ keys = itertools.chain([room_id], chunk)
+ self._send_invalidation_to_replication(
+ txn, _CURRENT_STATE_CACHE_NAME, keys
+ )
+ else:
+ # if no members changed, we still need to invalidate the other caches.
+ self._send_invalidation_to_replication(
+ txn, _CURRENT_STATE_CACHE_NAME, [room_id]
+ )
def _invalidate_state_caches(self, room_id, members_changed):
"""Invalidates caches that are based on the current state, but does
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index 9fa5b4f3d6..6afbfc0d74 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -90,7 +90,7 @@ class AccountDataWorkerStore(SQLBaseStore):
room_data = by_room.setdefault(row["room_id"], {})
room_data[row["account_data_type"]] = json.loads(row["content"])
- return (global_account_data, by_room)
+ return global_account_data, by_room
return self.runInteraction(
"get_account_data_for_user", get_account_data_for_user_txn
@@ -205,7 +205,7 @@ class AccountDataWorkerStore(SQLBaseStore):
)
txn.execute(sql, (last_room_id, current_id, limit))
room_results = txn.fetchall()
- return (global_results, room_results)
+ return global_results, room_results
return self.runInteraction(
"get_all_updated_account_data_txn", get_updated_account_data_txn
@@ -244,13 +244,13 @@ class AccountDataWorkerStore(SQLBaseStore):
room_account_data = account_data_by_room.setdefault(row[0], {})
room_account_data[row[1]] = json.loads(row[2])
- return (global_account_data, account_data_by_room)
+ return global_account_data, account_data_by_room
changed = self._account_data_stream_cache.has_entity_changed(
user_id, int(stream_id)
)
if not changed:
- return ({}, {})
+ return {}, {}
return self.runInteraction(
"get_updated_account_data_for_user", get_updated_account_data_for_user_txn
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 05d9c05c3f..435b2acd4d 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -165,7 +165,6 @@ class ApplicationServiceTransactionWorkerStore(
)
if result:
return result.get("state")
- return
return None
def set_appservice_state(self, service, state):
@@ -358,7 +357,7 @@ class ApplicationServiceTransactionWorkerStore(
events = yield self.get_events_as_list(event_ids)
- return (upper_bound, events)
+ return upper_bound, events
class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStore):
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 79bb0ea46d..6b7458304e 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -19,6 +19,7 @@ from canonicaljson import json
from twisted.internet import defer
+from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.storage._base import SQLBaseStore
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.util.caches.expiringcache import ExpiringCache
@@ -66,12 +67,13 @@ class DeviceInboxWorkerStore(SQLBaseStore):
messages.append(json.loads(row[1]))
if len(messages) < limit:
stream_pos = current_stream_id
- return (messages, stream_pos)
+ return messages, stream_pos
return self.runInteraction(
"get_new_messages_for_device", get_new_messages_for_device_txn
)
+ @trace
@defer.inlineCallbacks
def delete_messages_for_device(self, user_id, device_id, up_to_stream_id):
"""
@@ -87,11 +89,15 @@ class DeviceInboxWorkerStore(SQLBaseStore):
last_deleted_stream_id = self._last_device_delete_cache.get(
(user_id, device_id), None
)
+
+ set_tag("last_deleted_stream_id", last_deleted_stream_id)
+
if last_deleted_stream_id:
has_changed = self._device_inbox_stream_cache.has_entity_changed(
user_id, last_deleted_stream_id
)
if not has_changed:
+ log_kv({"message": "No changes in cache since last check"})
return 0
def delete_messages_for_device_txn(txn):
@@ -107,6 +113,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
"delete_messages_for_device", delete_messages_for_device_txn
)
+ log_kv(
+ {"message": "deleted {} messages for device".format(count), "count": count}
+ )
+
# Update the cache, ensuring that we only ever increase the value
last_deleted_stream_id = self._last_device_delete_cache.get(
(user_id, device_id), 0
@@ -117,6 +127,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
return count
+ @trace
def get_new_device_msgs_for_remote(
self, destination, last_stream_id, current_stream_id, limit
):
@@ -132,16 +143,23 @@ class DeviceInboxWorkerStore(SQLBaseStore):
in the stream the messages got to.
"""
+ set_tag("destination", destination)
+ set_tag("last_stream_id", last_stream_id)
+ set_tag("current_stream_id", current_stream_id)
+ set_tag("limit", limit)
+
has_changed = self._device_federation_outbox_stream_cache.has_entity_changed(
destination, last_stream_id
)
if not has_changed or last_stream_id == current_stream_id:
+ log_kv({"message": "No new messages in stream"})
return defer.succeed(([], current_stream_id))
if limit <= 0:
# This can happen if we run out of room for EDUs in the transaction.
return defer.succeed(([], last_stream_id))
+ @trace
def get_new_messages_for_remote_destination_txn(txn):
sql = (
"SELECT stream_id, messages_json FROM device_federation_outbox"
@@ -156,14 +174,16 @@ class DeviceInboxWorkerStore(SQLBaseStore):
stream_pos = row[0]
messages.append(json.loads(row[1]))
if len(messages) < limit:
+ log_kv({"message": "Set stream position to current position"})
stream_pos = current_stream_id
- return (messages, stream_pos)
+ return messages, stream_pos
return self.runInteraction(
"get_new_device_msgs_for_remote",
get_new_messages_for_remote_destination_txn,
)
+ @trace
def delete_device_msgs_for_remote(self, destination, up_to_stream_id):
"""Used to delete messages when the remote destination acknowledges
their receipt.
@@ -214,6 +234,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
expiry_ms=30 * 60 * 1000,
)
+ @trace
@defer.inlineCallbacks
def add_messages_to_device_inbox(
self, local_messages_by_user_then_device, remote_messages_by_destination
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 8f72d92895..79a58df591 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -21,6 +21,12 @@ from canonicaljson import json
from twisted.internet import defer
from synapse.api.errors import StoreError
+from synapse.logging.opentracing import (
+ get_active_span_text_map,
+ set_tag,
+ trace,
+ whitelisted_homeserver,
+)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import Cache, SQLBaseStore, db_to_json
from synapse.storage.background_updates import BackgroundUpdateStore
@@ -73,6 +79,7 @@ class DeviceWorkerStore(SQLBaseStore):
return {d["device_id"]: d for d in devices}
+ @trace
@defer.inlineCallbacks
def get_devices_by_remote(self, destination, from_stream_id, limit):
"""Get stream of updates to send to remote servers
@@ -88,7 +95,7 @@ class DeviceWorkerStore(SQLBaseStore):
destination, int(from_stream_id)
)
if not has_changed:
- return (now_stream_id, [])
+ return now_stream_id, []
# We retrieve n+1 devices from the list of outbound pokes where n is
# our outbound device update limit. We then check if the very last
@@ -111,7 +118,7 @@ class DeviceWorkerStore(SQLBaseStore):
# Return an empty list if there are no updates
if not updates:
- return (now_stream_id, [])
+ return now_stream_id, []
# if we have exceeded the limit, we need to exclude any results with the
# same stream_id as the last row.
@@ -127,8 +134,15 @@ class DeviceWorkerStore(SQLBaseStore):
# (user_id, device_id) entries into a map, with the value being
# the max stream_id across each set of duplicate entries
#
- # maps (user_id, device_id) -> stream_id
+ # maps (user_id, device_id) -> (stream_id, opentracing_context)
# as long as their stream_id does not match that of the last row
+ #
+ # opentracing_context contains the opentracing metadata for the request
+ # that created the poke
+ #
+ # The most recent request's opentracing_context is used as the
+ # context which created the Edu.
+
query_map = {}
for update in updates:
if stream_id_cutoff is not None and update[2] >= stream_id_cutoff:
@@ -136,7 +150,14 @@ class DeviceWorkerStore(SQLBaseStore):
break
key = (update[0], update[1])
- query_map[key] = max(query_map.get(key, 0), update[2])
+
+ update_context = update[3]
+ update_stream_id = update[2]
+
+ previous_update_stream_id, _ = query_map.get(key, (0, None))
+
+ if update_stream_id > previous_update_stream_id:
+ query_map[key] = (update_stream_id, update_context)
# If we didn't find any updates with a stream_id lower than the cutoff, it
# means that there are more than limit updates all of which have the same
@@ -147,13 +168,13 @@ class DeviceWorkerStore(SQLBaseStore):
# skip that stream_id and return an empty list, and continue with the next
# stream_id next time.
if not query_map:
- return (stream_id_cutoff, [])
+ return stream_id_cutoff, []
results = yield self._get_device_update_edus_by_remote(
destination, from_stream_id, query_map
)
- return (now_stream_id, results)
+ return now_stream_id, results
def _get_devices_by_remote_txn(
self, txn, destination, from_stream_id, now_stream_id, limit
@@ -171,7 +192,7 @@ class DeviceWorkerStore(SQLBaseStore):
List: List of device updates
"""
sql = """
- SELECT user_id, device_id, stream_id FROM device_lists_outbound_pokes
+ SELECT user_id, device_id, stream_id, opentracing_context FROM device_lists_outbound_pokes
WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
ORDER BY stream_id
LIMIT ?
@@ -187,8 +208,9 @@ class DeviceWorkerStore(SQLBaseStore):
Args:
destination (str): The host the device updates are intended for
from_stream_id (int): The minimum stream_id to filter updates by, exclusive
- query_map (Dict[(str, str): int]): Dictionary mapping
- user_id/device_id to update stream_id
+ query_map (Dict[(str, str): (int, str|None)]): Dictionary mapping
+ user_id/device_id to update stream_id and the relevent json-encoded
+ opentracing context
Returns:
List[Dict]: List of objects representing an device update EDU
@@ -210,12 +232,13 @@ class DeviceWorkerStore(SQLBaseStore):
destination, user_id, from_stream_id
)
for device_id, device in iteritems(user_devices):
- stream_id = query_map[(user_id, device_id)]
+ stream_id, opentracing_context = query_map[(user_id, device_id)]
result = {
"user_id": user_id,
"device_id": device_id,
"prev_id": [prev_id] if prev_id else [],
"stream_id": stream_id,
+ "org.matrix.opentracing_context": opentracing_context,
}
prev_id = stream_id
@@ -299,6 +322,7 @@ class DeviceWorkerStore(SQLBaseStore):
def get_device_stream_token(self):
return self._device_list_id_gen.get_current_token()
+ @trace
@defer.inlineCallbacks
def get_user_devices_from_cache(self, query_list):
"""Get the devices (and keys if any) for remote users from the cache.
@@ -330,7 +354,10 @@ class DeviceWorkerStore(SQLBaseStore):
else:
results[user_id] = yield self._get_cached_devices_for_user(user_id)
- return (user_ids_not_in_cache, results)
+ set_tag("in_cache", results)
+ set_tag("not_in_cache", user_ids_not_in_cache)
+
+ return user_ids_not_in_cache, results
@cachedInlineCallbacks(num_args=2, tree=True)
def _get_cached_user_device(self, user_id, device_id):
@@ -814,6 +841,8 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
],
)
+ context = get_active_span_text_map()
+
self._simple_insert_many_txn(
txn,
table="device_lists_outbound_pokes",
@@ -825,6 +854,9 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
"device_id": device_id,
"sent": False,
"ts": now,
+ "opentracing_context": json.dumps(context)
+ if whitelisted_homeserver(destination)
+ else "{}",
}
for destination in hosts
for device_id in device_ids
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index e966a73f3d..eed7757ed5 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -47,7 +47,6 @@ class DirectoryWorkerStore(SQLBaseStore):
if not room_id:
return None
- return
servers = yield self._simple_select_onecol(
"room_alias_servers",
@@ -58,7 +57,6 @@ class DirectoryWorkerStore(SQLBaseStore):
if not servers:
return None
- return
return RoomAliasMapping(room_id, room_alias.to_string(), servers)
diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py
index 99128f2df7..be2fe2bab6 100644
--- a/synapse/storage/e2e_room_keys.py
+++ b/synapse/storage/e2e_room_keys.py
@@ -18,6 +18,7 @@ import json
from twisted.internet import defer
from synapse.api.errors import StoreError
+from synapse.logging.opentracing import log_kv, trace
from ._base import SQLBaseStore
@@ -82,11 +83,11 @@ class EndToEndRoomKeyStore(SQLBaseStore):
table="e2e_room_keys",
keyvalues={
"user_id": user_id,
+ "version": version,
"room_id": room_id,
"session_id": session_id,
},
values={
- "version": version,
"first_message_index": room_key["first_message_index"],
"forwarded_count": room_key["forwarded_count"],
"is_verified": room_key["is_verified"],
@@ -94,7 +95,16 @@ class EndToEndRoomKeyStore(SQLBaseStore):
},
lock=False,
)
+ log_kv(
+ {
+ "message": "Set room key",
+ "room_id": room_id,
+ "session_id": session_id,
+ "room_key": room_key,
+ }
+ )
+ @trace
@defer.inlineCallbacks
def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
@@ -153,6 +163,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
return sessions
+ @trace
@defer.inlineCallbacks
def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
@@ -236,6 +247,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
"get_e2e_room_keys_version_info", _get_e2e_room_keys_version_info_txn
)
+ @trace
def create_e2e_room_keys_version(self, user_id, info):
"""Atomically creates a new version of this user's e2e_room_keys store
with the given version info.
@@ -276,6 +288,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
"create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn
)
+ @trace
def update_e2e_room_keys_version(self, user_id, version, info):
"""Update a given backup version
@@ -292,6 +305,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
desc="update_e2e_room_keys_version",
)
+ @trace
def delete_e2e_room_keys_version(self, user_id, version=None):
"""Delete a given backup version of the user's room keys.
Doesn't delete their actual key data.
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 1e07474e70..33e3a84933 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -18,12 +18,14 @@ from canonicaljson import encode_canonical_json
from twisted.internet import defer
+from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.util.caches.descriptors import cached
from ._base import SQLBaseStore, db_to_json
class EndToEndKeyWorkerStore(SQLBaseStore):
+ @trace
@defer.inlineCallbacks
def get_e2e_device_keys(
self, query_list, include_all_devices=False, include_deleted_devices=False
@@ -40,6 +42,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
Dict mapping from user-id to dict mapping from device_id to
dict containing "key_json", "device_display_name".
"""
+ set_tag("query_list", query_list)
if not query_list:
return {}
@@ -57,9 +60,13 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
return results
+ @trace
def _get_e2e_device_keys_txn(
self, txn, query_list, include_all_devices=False, include_deleted_devices=False
):
+ set_tag("include_all_devices", include_all_devices)
+ set_tag("include_deleted_devices", include_deleted_devices)
+
query_clauses = []
query_params = []
@@ -104,6 +111,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
for user_id, device_id in deleted_devices:
result.setdefault(user_id, {})[device_id] = None
+ log_kv(result)
return result
@defer.inlineCallbacks
@@ -129,8 +137,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
keyvalues={"user_id": user_id, "device_id": device_id},
desc="add_e2e_one_time_keys_check",
)
-
- return {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
+ result = {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
+ log_kv({"message": "Fetched one time keys for user", "one_time_keys": result})
+ return result
@defer.inlineCallbacks
def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys):
@@ -146,6 +155,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
"""
def _add_e2e_one_time_keys(txn):
+ set_tag("user_id", user_id)
+ set_tag("device_id", device_id)
+ set_tag("new_keys", new_keys)
# We are protected from race between lookup and insertion due to
# a unique constraint. If there is a race of two calls to
# `add_e2e_one_time_keys` then they'll conflict and we will only
@@ -202,6 +214,11 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
"""
def _set_e2e_device_keys_txn(txn):
+ set_tag("user_id", user_id)
+ set_tag("device_id", device_id)
+ set_tag("time_now", time_now)
+ set_tag("device_keys", device_keys)
+
old_key_json = self._simple_select_one_onecol_txn(
txn,
table="e2e_device_keys_json",
@@ -215,6 +232,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
new_key_json = encode_canonical_json(device_keys).decode("utf-8")
if old_key_json == new_key_json:
+ log_kv({"Message": "Device key already stored."})
return False
self._simple_upsert_txn(
@@ -223,7 +241,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
keyvalues={"user_id": user_id, "device_id": device_id},
values={"ts_added_ms": time_now, "key_json": new_key_json},
)
-
+ log_kv({"message": "Device keys stored."})
return True
return self.runInteraction("set_e2e_device_keys", _set_e2e_device_keys_txn)
@@ -231,6 +249,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
def claim_e2e_one_time_keys(self, query_list):
"""Take a list of one time keys out of the database"""
+ @trace
def _claim_e2e_one_time_keys(txn):
sql = (
"SELECT key_id, key_json FROM e2e_one_time_keys_json"
@@ -252,7 +271,13 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
" AND key_id = ?"
)
for user_id, device_id, algorithm, key_id in delete:
+ log_kv(
+ {
+ "message": "Executing claim e2e_one_time_keys transaction on database."
+ }
+ )
txn.execute(sql, (user_id, device_id, algorithm, key_id))
+ log_kv({"message": "finished executing and invalidating cache"})
self._invalidate_cache_and_stream(
txn, self.count_e2e_one_time_keys, (user_id, device_id)
)
@@ -262,6 +287,13 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
def delete_e2e_keys_by_device(self, user_id, device_id):
def delete_e2e_keys_by_device_txn(txn):
+ log_kv(
+ {
+ "message": "Deleting keys for device",
+ "device_id": device_id,
+ "user_id": user_id,
+ }
+ )
self._simple_delete_txn(
txn,
table="e2e_device_keys_json",
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index ac876287fc..ddf7ab6479 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -23,7 +23,7 @@ from functools import wraps
from six import iteritems, text_type
from six.moves import range
-from canonicaljson import json
+from canonicaljson import encode_canonical_json, json
from prometheus_client import Counter, Histogram
from twisted.internet import defer
@@ -33,6 +33,7 @@ from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
+from synapse.events.utils import prune_event_dict
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.logging.utils import log_function
from synapse.metrics import BucketCollector
@@ -262,6 +263,14 @@ class EventsStore(
hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000)
+ def _censor_redactions():
+ return run_as_background_process(
+ "_censor_redactions", self._censor_redactions
+ )
+
+ if self.hs.config.redaction_retention_period is not None:
+ hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000)
+
@defer.inlineCallbacks
def _read_forward_extremities(self):
def fetch(txn):
@@ -810,7 +819,7 @@ class EventsStore(
# If they old and new groups are the same then we don't need to do
# anything.
if old_state_groups == new_state_groups:
- return (None, None)
+ return None, None
if len(new_state_groups) == 1 and len(old_state_groups) == 1:
# If we're going from one state group to another, lets check if
@@ -827,7 +836,7 @@ class EventsStore(
# the current state in memory then lets also return that,
# but it doesn't matter if we don't.
new_state = state_groups_map.get(new_state_group)
- return (new_state, delta_ids)
+ return new_state, delta_ids
# Now that we have calculated new_state_groups we need to get
# their state IDs so we can resolve to a single state set.
@@ -839,7 +848,7 @@ class EventsStore(
if len(new_state_groups) == 1:
# If there is only one state group, then we know what the current
# state is.
- return (state_groups_map[new_state_groups.pop()], None)
+ return state_groups_map[new_state_groups.pop()], None
# Ok, we need to defer to the state handler to resolve our state sets.
@@ -868,7 +877,7 @@ class EventsStore(
state_res_store=StateResolutionStore(self),
)
- return (res.state, None)
+ return res.state, None
@defer.inlineCallbacks
def _calculate_state_delta(self, room_id, current_state):
@@ -891,7 +900,7 @@ class EventsStore(
if ev_id != existing_state.get(key)
}
- return (to_delete, to_insert)
+ return to_delete, to_insert
@log_function
def _persist_events_txn(
@@ -1302,15 +1311,11 @@ class EventsStore(
"event_reference_hashes",
"event_search",
"event_to_state_groups",
- "guest_access",
- "history_visibility",
"local_invites",
- "room_names",
"state_events",
"rejections",
"redactions",
"room_memberships",
- "topics",
):
txn.executemany(
"DELETE FROM %s WHERE event_id = ?" % (table,),
@@ -1454,10 +1459,10 @@ class EventsStore(
for event, _ in events_and_contexts:
if event.type == EventTypes.Name:
- # Insert into the room_names and event_search tables.
+ # Insert into the event_search table.
self._store_room_name_txn(txn, event)
elif event.type == EventTypes.Topic:
- # Insert into the topics table and event_search table.
+ # Insert into the event_search table.
self._store_room_topic_txn(txn, event)
elif event.type == EventTypes.Message:
# Insert into the event_search table.
@@ -1465,12 +1470,6 @@ class EventsStore(
elif event.type == EventTypes.Redaction:
# Insert into the redactions table.
self._store_redaction(txn, event)
- elif event.type == EventTypes.RoomHistoryVisibility:
- # Insert into the event_search table.
- self._store_history_visibility_txn(txn, event)
- elif event.type == EventTypes.GuestAccess:
- # Insert into the event_search table.
- self._store_guest_access_txn(txn, event)
self._handle_event_relations(txn, event)
@@ -1559,6 +1558,98 @@ class EventsStore(
)
@defer.inlineCallbacks
+ def _censor_redactions(self):
+ """Censors all redactions older than the configured period that haven't
+ been censored yet.
+
+ By censor we mean update the event_json table with the redacted event.
+
+ Returns:
+ Deferred
+ """
+
+ if self.hs.config.redaction_retention_period is None:
+ return
+
+ max_pos = yield self.find_first_stream_ordering_after_ts(
+ self._clock.time_msec() - self.hs.config.redaction_retention_period
+ )
+
+ # We fetch all redactions that:
+ # 1. point to an event we have,
+ # 2. has a stream ordering from before the cut off, and
+ # 3. we haven't yet censored.
+ #
+ # This is limited to 100 events to ensure that we don't try and do too
+ # much at once. We'll get called again so this should eventually catch
+ # up.
+ #
+ # We use the range [-max_pos, max_pos] to handle backfilled events,
+ # which are given negative stream ordering.
+ sql = """
+ SELECT redact_event.event_id, redacts FROM redactions
+ INNER JOIN events AS redact_event USING (event_id)
+ INNER JOIN events AS original_event ON (
+ redact_event.room_id = original_event.room_id
+ AND redacts = original_event.event_id
+ )
+ WHERE NOT have_censored
+ AND ? <= redact_event.stream_ordering AND redact_event.stream_ordering <= ?
+ ORDER BY redact_event.stream_ordering ASC
+ LIMIT ?
+ """
+
+ rows = yield self._execute(
+ "_censor_redactions_fetch", None, sql, -max_pos, max_pos, 100
+ )
+
+ updates = []
+
+ for redaction_id, event_id in rows:
+ redaction_event = yield self.get_event(redaction_id, allow_none=True)
+ original_event = yield self.get_event(
+ event_id, allow_rejected=True, allow_none=True
+ )
+
+ # The SQL above ensures that we have both the redaction and
+ # original event, so if the `get_event` calls return None it
+ # means that the redaction wasn't allowed. Either way we know that
+ # the result won't change so we mark the fact that we've checked.
+ if (
+ redaction_event
+ and original_event
+ and original_event.internal_metadata.is_redacted()
+ ):
+ # Redaction was allowed
+ pruned_json = encode_canonical_json(
+ prune_event_dict(original_event.get_dict())
+ )
+ else:
+ # Redaction wasn't allowed
+ pruned_json = None
+
+ updates.append((redaction_id, event_id, pruned_json))
+
+ def _update_censor_txn(txn):
+ for redaction_id, event_id, pruned_json in updates:
+ if pruned_json:
+ self._simple_update_one_txn(
+ txn,
+ table="event_json",
+ keyvalues={"event_id": event_id},
+ updatevalues={"json": pruned_json},
+ )
+
+ self._simple_update_one_txn(
+ txn,
+ table="redactions",
+ keyvalues={"event_id": redaction_id},
+ updatevalues={"have_censored": True},
+ )
+
+ yield self.runInteraction("_update_censor_txn", _update_censor_txn)
+
+ @defer.inlineCallbacks
def count_daily_messages(self):
"""
Returns an estimate of the number of messages sent in the last day.
@@ -2191,6 +2282,144 @@ class EventsStore(
return to_delete, to_dedelta
+ def purge_room(self, room_id):
+ """Deletes all record of a room
+
+ Args:
+ room_id (str):
+ """
+
+ return self.runInteraction("purge_room", self._purge_room_txn, room_id)
+
+ def _purge_room_txn(self, txn, room_id):
+ # first we have to delete the state groups states
+ logger.info("[purge] removing %s from state_groups_state", room_id)
+
+ txn.execute(
+ """
+ DELETE FROM state_groups_state WHERE state_group IN (
+ SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
+ WHERE events.room_id=?
+ )
+ """,
+ (room_id,),
+ )
+
+ # ... and the state group edges
+ logger.info("[purge] removing %s from state_group_edges", room_id)
+
+ txn.execute(
+ """
+ DELETE FROM state_group_edges WHERE state_group IN (
+ SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
+ WHERE events.room_id=?
+ )
+ """,
+ (room_id,),
+ )
+
+ # ... and the state groups
+ logger.info("[purge] removing %s from state_groups", room_id)
+
+ txn.execute(
+ """
+ DELETE FROM state_groups WHERE id IN (
+ SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
+ WHERE events.room_id=?
+ )
+ """,
+ (room_id,),
+ )
+
+ # and then tables which lack an index on room_id but have one on event_id
+ for table in (
+ "event_auth",
+ "event_edges",
+ "event_push_actions_staging",
+ "event_reference_hashes",
+ "event_relations",
+ "event_to_state_groups",
+ "redactions",
+ "rejections",
+ "state_events",
+ ):
+ logger.info("[purge] removing %s from %s", room_id, table)
+
+ txn.execute(
+ """
+ DELETE FROM %s WHERE event_id IN (
+ SELECT event_id FROM events WHERE room_id=?
+ )
+ """
+ % (table,),
+ (room_id,),
+ )
+
+ # and finally, the tables with an index on room_id (or no useful index)
+ for table in (
+ "current_state_events",
+ "event_backward_extremities",
+ "event_forward_extremities",
+ "event_json",
+ "event_push_actions",
+ "event_search",
+ "events",
+ "group_rooms",
+ "public_room_list_stream",
+ "receipts_graph",
+ "receipts_linearized",
+ "room_aliases",
+ "room_depth",
+ "room_memberships",
+ "room_stats_state",
+ "room_stats_current",
+ "room_stats_historical",
+ "room_stats_earliest_token",
+ "rooms",
+ "stream_ordering_to_exterm",
+ "topics",
+ "users_in_public_rooms",
+ "users_who_share_private_rooms",
+ # no useful index, but let's clear them anyway
+ "appservice_room_list",
+ "e2e_room_keys",
+ "event_push_summary",
+ "pusher_throttle",
+ "group_summary_rooms",
+ "local_invites",
+ "room_account_data",
+ "room_tags",
+ ):
+ logger.info("[purge] removing %s from %s", room_id, table)
+ txn.execute("DELETE FROM %s WHERE room_id=?" % (table,), (room_id,))
+
+ # Other tables we do NOT need to clear out:
+ #
+ # - blocked_rooms
+ # This is important, to make sure that we don't accidentally rejoin a blocked
+ # room after it was purged
+ #
+ # - user_directory
+ # This has a room_id column, but it is unused
+ #
+
+ # Other tables that we might want to consider clearing out include:
+ #
+ # - event_reports
+ # Given that these are intended for abuse management my initial
+ # inclination is to leave them in place.
+ #
+ # - current_state_delta_stream
+ # - ex_outlier_stream
+ # - room_tags_revisions
+ # The problem with these is that they are largeish and there is no room_id
+ # index on them. In any case we should be clearing out 'stream' tables
+ # periodically anyway (#5888)
+
+ # TODO: we could probably usefully do a bunch of cache invalidation here
+
+ logger.info("[purge] done")
+
@defer.inlineCallbacks
def is_event_after(self, event_id1, event_id2):
"""Returns True if event_id1 is after event_id2 in the stream
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index d20eacda59..e96eed8a6d 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -238,6 +238,13 @@ def _upgrade_existing_database(
logger.debug("applied_delta_files: %s", applied_delta_files)
+ if isinstance(database_engine, PostgresEngine):
+ specific_engine_extension = ".postgres"
+ else:
+ specific_engine_extension = ".sqlite"
+
+ specific_engine_extensions = (".sqlite", ".postgres")
+
for v in range(start_ver, SCHEMA_VERSION + 1):
logger.info("Upgrading schema to v%d", v)
@@ -274,15 +281,22 @@ def _upgrade_existing_database(
# Sometimes .pyc files turn up anyway even though we've
# disabled their generation; e.g. from distribution package
# installers. Silently skip it
- pass
+ continue
elif ext == ".sql":
# A plain old .sql file, just read and execute it
logger.info("Applying schema %s", relative_path)
executescript(cur, absolute_path)
+ elif ext == specific_engine_extension and root_name.endswith(".sql"):
+ # A .sql file specific to our engine; just read and execute it
+ logger.info("Applying engine-specific schema %s", relative_path)
+ executescript(cur, absolute_path)
+ elif ext in specific_engine_extensions and root_name.endswith(".sql"):
+ # A .sql file for a different engine; skip it.
+ continue
else:
# Not a valid delta file.
- logger.warn(
- "Found directory entry that did not end in .py or" " .sql: %s",
+ logger.warning(
+ "Found directory entry that did not end in .py or .sql: %s",
relative_path,
)
continue
@@ -290,7 +304,7 @@ def _upgrade_existing_database(
# Mark as done.
cur.execute(
database_engine.convert_param_style(
- "INSERT INTO applied_schema_deltas (version, file)" " VALUES (?,?)"
+ "INSERT INTO applied_schema_deltas (version, file) VALUES (?,?)"
),
(v, relative_path),
)
@@ -298,7 +312,7 @@ def _upgrade_existing_database(
cur.execute("DELETE FROM schema_version")
cur.execute(
database_engine.convert_param_style(
- "INSERT INTO schema_version (version, upgraded)" " VALUES (?,?)"
+ "INSERT INTO schema_version (version, upgraded) VALUES (?,?)"
),
(v, True),
)
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 1a0f2d5768..5db6f2d84a 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -90,7 +90,7 @@ class PresenceStore(SQLBaseStore):
presence_states,
)
- return (stream_orderings[-1], self._presence_id_gen.get_current_token())
+ return stream_orderings[-1], self._presence_id_gen.get_current_token()
def _update_presence_txn(self, txn, stream_orderings, presence_states):
for stream_id, state in zip(stream_orderings, presence_states):
diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py
index 8a5d8e9b18..912c1df6be 100644
--- a/synapse/storage/profile.py
+++ b/synapse/storage/profile.py
@@ -35,7 +35,6 @@ class ProfileWorkerStore(SQLBaseStore):
if e.code == 404:
# no match
return ProfileInfo(None, None)
- return
else:
raise
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index b431d24b8a..3e0e834a62 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -133,7 +133,7 @@ class PusherWorkerStore(SQLBaseStore):
txn.execute(sql, (last_id, current_id, limit))
deleted = txn.fetchall()
- return (updated, deleted)
+ return updated, deleted
return self.runInteraction(
"get_all_updated_pushers", get_all_updated_pushers_txn
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 6aa6d98ebb..290ddb30e8 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -478,7 +478,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
max_persisted_id = self._receipts_id_gen.get_current_token()
- return (stream_id, max_persisted_id)
+ return stream_id, max_persisted_id
def insert_graph_receipt(self, room_id, receipt_type, user_id, event_ids, data):
return self.runInteraction(
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 1e3c2148f6..cc1b1b73c9 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -57,6 +57,7 @@ class RegistrationWorkerStore(SQLBaseStore):
"consent_server_notice_sent",
"appservice_id",
"creation_ts",
+ "user_type",
],
allow_none=True,
desc="get_user_by_id",
@@ -273,6 +274,14 @@ class RegistrationWorkerStore(SQLBaseStore):
@defer.inlineCallbacks
def is_server_admin(self, user):
+ """Determines if a user is an admin of this homeserver.
+
+ Args:
+ user (UserID): user ID of the user to test
+
+ Returns (bool):
+ true iff the user is a server admin, false otherwise.
+ """
res = yield self._simple_select_one_onecol(
table="users",
keyvalues={"name": user.to_string()},
@@ -283,6 +292,21 @@ class RegistrationWorkerStore(SQLBaseStore):
return res if res else False
+ def set_server_admin(self, user, admin):
+ """Sets whether a user is an admin of this homeserver.
+
+ Args:
+ user (UserID): user ID of the user to test
+ admin (bool): true iff the user is to be a server admin,
+ false otherwise.
+ """
+ return self._simple_update_one(
+ table="users",
+ keyvalues={"name": user.to_string()},
+ updatevalues={"admin": 1 if admin else 0},
+ desc="set_server_admin",
+ )
+
def _query_for_auth(self, txn, token):
sql = (
"SELECT users.name, users.is_guest, access_tokens.id as token_id,"
@@ -300,6 +324,19 @@ class RegistrationWorkerStore(SQLBaseStore):
return None
@cachedInlineCallbacks()
+ def is_real_user(self, user_id):
+ """Determines if the user is a real user, ie does not have a 'user_type'.
+
+ Args:
+ user_id (str): user id to test
+
+ Returns:
+ Deferred[bool]: True if user 'user_type' is null or empty string
+ """
+ res = yield self.runInteraction("is_real_user", self.is_real_user_txn, user_id)
+ return res
+
+ @cachedInlineCallbacks()
def is_support_user(self, user_id):
"""Determines if the user is of type UserTypes.SUPPORT
@@ -314,6 +351,16 @@ class RegistrationWorkerStore(SQLBaseStore):
)
return res
+ def is_real_user_txn(self, txn, user_id):
+ res = self._simple_select_one_onecol_txn(
+ txn=txn,
+ table="users",
+ keyvalues={"name": user_id},
+ retcol="user_type",
+ allow_none=True,
+ )
+ return res is None
+
def is_support_user_txn(self, txn, user_id):
res = self._simple_select_one_onecol_txn(
txn=txn,
@@ -419,6 +466,20 @@ class RegistrationWorkerStore(SQLBaseStore):
return ret
@defer.inlineCallbacks
+ def count_real_users(self):
+ """Counts all users without a special user_type registered on the homeserver."""
+
+ def _count_users(txn):
+ txn.execute("SELECT COUNT(*) AS users FROM users where user_type is null")
+ rows = self.cursor_to_dict(txn)
+ if rows:
+ return rows[0]["users"]
+ return 0
+
+ ret = yield self.runInteraction("count_real_users", _count_users)
+ return ret
+
+ @defer.inlineCallbacks
def find_next_generated_user_id_localpart(self):
"""
Gets the localpart of the next generated user ID.
@@ -611,6 +672,85 @@ class RegistrationWorkerStore(SQLBaseStore):
# Convert the integer into a boolean.
return res == 1
+ def get_threepid_validation_session(
+ self, medium, client_secret, address=None, sid=None, validated=True
+ ):
+ """Gets a session_id and last_send_attempt (if available) for a
+ client_secret/medium/(address|session_id) combo
+
+ Args:
+ medium (str|None): The medium of the 3PID
+ address (str|None): The address of the 3PID
+ sid (str|None): The ID of the validation session
+ client_secret (str|None): A unique string provided by the client to
+ help identify this validation attempt
+ validated (bool|None): Whether sessions should be filtered by
+ whether they have been validated already or not. None to
+ perform no filtering
+
+ Returns:
+ deferred {str, int}|None: A dict containing the
+ latest session_id and send_attempt count for this 3PID.
+ Otherwise None if there hasn't been a previous attempt
+ """
+ keyvalues = {"medium": medium, "client_secret": client_secret}
+ if address:
+ keyvalues["address"] = address
+ if sid:
+ keyvalues["session_id"] = sid
+
+ assert address or sid
+
+ def get_threepid_validation_session_txn(txn):
+ sql = """
+ SELECT address, session_id, medium, client_secret,
+ last_send_attempt, validated_at
+ FROM threepid_validation_session WHERE %s
+ """ % (
+ " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)),
+ )
+
+ if validated is not None:
+ sql += " AND validated_at IS " + ("NOT NULL" if validated else "NULL")
+
+ sql += " LIMIT 1"
+
+ txn.execute(sql, list(keyvalues.values()))
+ rows = self.cursor_to_dict(txn)
+ if not rows:
+ return None
+
+ return rows[0]
+
+ return self.runInteraction(
+ "get_threepid_validation_session", get_threepid_validation_session_txn
+ )
+
+ def delete_threepid_session(self, session_id):
+ """Removes a threepid validation session from the database. This can
+ be done after validation has been performed and whatever action was
+ waiting on it has been carried out
+
+ Args:
+ session_id (str): The ID of the session to delete
+ """
+
+ def delete_threepid_session_txn(txn):
+ self._simple_delete_txn(
+ txn,
+ table="threepid_validation_token",
+ keyvalues={"session_id": session_id},
+ )
+ self._simple_delete_txn(
+ txn,
+ table="threepid_validation_session",
+ keyvalues={"session_id": session_id},
+ )
+
+ return self.runInteraction(
+ "delete_threepid_session", delete_threepid_session_txn
+ )
+
class RegistrationStore(
RegistrationWorkerStore, background_updates.BackgroundUpdateStore
@@ -866,6 +1006,17 @@ class RegistrationStore(
(user_id_obj.localpart, create_profile_with_displayname),
)
+ if self.hs.config.stats_enabled:
+ # we create a new completed user statistics row
+
+ # we don't strictly need current_token since this user really can't
+ # have any state deltas before now (as it is a new user), but still,
+ # we include it for completeness.
+ current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
+ self._update_stats_delta_txn(
+ txn, now, "user", user_id, {}, complete_with_stream_id=current_token
+ )
+
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
txn.call_after(self.is_guest.invalidate, (user_id,))
@@ -1088,60 +1239,6 @@ class RegistrationStore(
return 1
- def get_threepid_validation_session(
- self, medium, client_secret, address=None, sid=None, validated=True
- ):
- """Gets a session_id and last_send_attempt (if available) for a
- client_secret/medium/(address|session_id) combo
-
- Args:
- medium (str|None): The medium of the 3PID
- address (str|None): The address of the 3PID
- sid (str|None): The ID of the validation session
- client_secret (str|None): A unique string provided by the client to
- help identify this validation attempt
- validated (bool|None): Whether sessions should be filtered by
- whether they have been validated already or not. None to
- perform no filtering
-
- Returns:
- deferred {str, int}|None: A dict containing the
- latest session_id and send_attempt count for this 3PID.
- Otherwise None if there hasn't been a previous attempt
- """
- keyvalues = {"medium": medium, "client_secret": client_secret}
- if address:
- keyvalues["address"] = address
- if sid:
- keyvalues["session_id"] = sid
-
- assert address or sid
-
- def get_threepid_validation_session_txn(txn):
- sql = """
- SELECT address, session_id, medium, client_secret,
- last_send_attempt, validated_at
- FROM threepid_validation_session WHERE %s
- """ % (
- " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)),
- )
-
- if validated is not None:
- sql += " AND validated_at IS " + ("NOT NULL" if validated else "NULL")
-
- sql += " LIMIT 1"
-
- txn.execute(sql, list(keyvalues.values()))
- rows = self.cursor_to_dict(txn)
- if not rows:
- return None
-
- return rows[0]
-
- return self.runInteraction(
- "get_threepid_validation_session", get_threepid_validation_session_txn
- )
-
def validate_threepid_session(self, session_id, client_secret, token, current_ts):
"""Attempt to validate a threepid session using a token
@@ -1157,6 +1254,7 @@ class RegistrationStore(
deferred str|None: A str representing a link to redirect the user
to if there is one.
"""
+
# Insert everything into a transaction in order to run atomically
def validate_threepid_session_txn(txn):
row = self._simple_select_one_txn(
@@ -1328,31 +1426,6 @@ class RegistrationStore(
self.clock.time_msec(),
)
- def delete_threepid_session(self, session_id):
- """Removes a threepid validation session from the database. This can
- be done after validation has been performed and whatever action was
- waiting on it has been carried out
-
- Args:
- session_id (str): The ID of the session to delete
- """
-
- def delete_threepid_session_txn(txn):
- self._simple_delete_txn(
- txn,
- table="threepid_validation_token",
- keyvalues={"session_id": session_id},
- )
- self._simple_delete_txn(
- txn,
- table="threepid_validation_session",
- keyvalues={"session_id": session_id},
- )
-
- return self.runInteraction(
- "delete_threepid_session", delete_threepid_session_txn
- )
-
def set_user_deactivated_status_txn(self, txn, user_id, deactivated):
self._simple_update_one_txn(
txn=txn,
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index bc606292b8..08e13f3a3b 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -386,32 +386,12 @@ class RoomStore(RoomWorkerStore, SearchStore):
def _store_room_topic_txn(self, txn, event):
if hasattr(event, "content") and "topic" in event.content:
- self._simple_insert_txn(
- txn,
- "topics",
- {
- "event_id": event.event_id,
- "room_id": event.room_id,
- "topic": event.content["topic"],
- },
- )
-
self.store_event_search_txn(
txn, event, "content.topic", event.content["topic"]
)
def _store_room_name_txn(self, txn, event):
if hasattr(event, "content") and "name" in event.content:
- self._simple_insert_txn(
- txn,
- "room_names",
- {
- "event_id": event.event_id,
- "room_id": event.room_id,
- "name": event.content["name"],
- },
- )
-
self.store_event_search_txn(
txn, event, "content.name", event.content["name"]
)
@@ -422,21 +402,6 @@ class RoomStore(RoomWorkerStore, SearchStore):
txn, event, "content.body", event.content["body"]
)
- def _store_history_visibility_txn(self, txn, event):
- self._store_content_index_txn(txn, event, "history_visibility")
-
- def _store_guest_access_txn(self, txn, event):
- self._store_content_index_txn(txn, event, "guest_access")
-
- def _store_content_index_txn(self, txn, event, key):
- if hasattr(event, "content") and key in event.content:
- sql = (
- "INSERT INTO %(key)s"
- " (event_id, room_id, %(key)s)"
- " VALUES (?, ?, ?)" % {"key": key}
- )
- txn.execute(sql, (event.event_id, event.room_id, event.content[key]))
-
def add_event_report(
self, room_id, event_id, user_id, reason, content, received_ts
):
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index eecb276465..4df8ebdacd 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -24,8 +24,10 @@ from canonicaljson import json
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
+from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import LoggingTransaction
+from synapse.storage.engines import Sqlite3Engine
from synapse.storage.events_worker import EventsWorkerStore
from synapse.types import get_domain_from_id
from synapse.util.async_helpers import Linearizer
@@ -74,6 +76,63 @@ class RoomMemberWorkerStore(EventsWorkerStore):
self._check_safe_current_state_events_membership_updated_txn(txn)
txn.close()
+ if self.hs.config.metrics_flags.known_servers:
+ self._known_servers_count = 1
+ self.hs.get_clock().looping_call(
+ run_as_background_process,
+ 60 * 1000,
+ "_count_known_servers",
+ self._count_known_servers,
+ )
+ self.hs.get_clock().call_later(
+ 1000,
+ run_as_background_process,
+ "_count_known_servers",
+ self._count_known_servers,
+ )
+ LaterGauge(
+ "synapse_federation_known_servers",
+ "",
+ [],
+ lambda: self._known_servers_count,
+ )
+
+ @defer.inlineCallbacks
+ def _count_known_servers(self):
+ """
+ Count the servers that this server knows about.
+
+ The statistic is stored on the class for the
+ `synapse_federation_known_servers` LaterGauge to collect.
+ """
+
+ def _transact(txn):
+ if isinstance(self.database_engine, Sqlite3Engine):
+ query = """
+ SELECT COUNT(DISTINCT substr(out.user_id, pos+1))
+ FROM (
+ SELECT rm.user_id as user_id, instr(rm.user_id, ':')
+ AS pos FROM room_memberships as rm
+ INNER JOIN current_state_events as c ON rm.event_id = c.event_id
+ WHERE c.type = 'm.room.member'
+ ) as out
+ """
+ else:
+ query = """
+ SELECT COUNT(DISTINCT split_part(state_key, ':', 2))
+ FROM current_state_events
+ WHERE type = 'm.room.member' AND membership = 'join';
+ """
+ txn.execute(query)
+ return list(txn)[0][0]
+
+ count = yield self.runInteraction("get_known_servers", _transact)
+
+ # We always know about ourselves, even if we have nothing in
+ # room_memberships (for example, the server is new).
+ self._known_servers_count = max([count, 1])
+ return self._known_servers_count
+
def _check_safe_current_state_events_membership_updated_txn(self, txn):
"""Checks if it is safe to assume the new current_state_events
membership column is up to date
@@ -112,29 +171,31 @@ class RoomMemberWorkerStore(EventsWorkerStore):
@cached(max_entries=100000, iterable=True)
def get_users_in_room(self, room_id):
- def f(txn):
- # If we can assume current_state_events.membership is up to date
- # then we can avoid a join, which is a Very Good Thing given how
- # frequently this function gets called.
- if self._current_state_events_membership_up_to_date:
- sql = """
- SELECT state_key FROM current_state_events
- WHERE type = 'm.room.member' AND room_id = ? AND membership = ?
- """
- else:
- sql = """
- SELECT state_key FROM room_memberships as m
- INNER JOIN current_state_events as c
- ON m.event_id = c.event_id
- AND m.room_id = c.room_id
- AND m.user_id = c.state_key
- WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
- """
+ return self.runInteraction(
+ "get_users_in_room", self.get_users_in_room_txn, room_id
+ )
- txn.execute(sql, (room_id, Membership.JOIN))
- return [to_ascii(r[0]) for r in txn]
+ def get_users_in_room_txn(self, txn, room_id):
+ # If we can assume current_state_events.membership is up to date
+ # then we can avoid a join, which is a Very Good Thing given how
+ # frequently this function gets called.
+ if self._current_state_events_membership_up_to_date:
+ sql = """
+ SELECT state_key FROM current_state_events
+ WHERE type = 'm.room.member' AND room_id = ? AND membership = ?
+ """
+ else:
+ sql = """
+ SELECT state_key FROM room_memberships as m
+ INNER JOIN current_state_events as c
+ ON m.event_id = c.event_id
+ AND m.room_id = c.room_id
+ AND m.user_id = c.state_key
+ WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
+ """
- return self.runInteraction("get_users_in_room", f)
+ txn.execute(sql, (room_id, Membership.JOIN))
+ return [to_ascii(r[0]) for r in txn]
@cached(max_entries=100000)
def get_room_summary(self, room_id):
diff --git a/synapse/storage/schema/delta/56/add_spans_to_device_lists.sql b/synapse/storage/schema/delta/56/add_spans_to_device_lists.sql
new file mode 100644
index 0000000000..41807eb1e7
--- /dev/null
+++ b/synapse/storage/schema/delta/56/add_spans_to_device_lists.sql
@@ -0,0 +1,20 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Opentracing context data for inclusion in the device_list_update EDUs, as a
+ * json-encoded dictionary. NULL if opentracing is disabled (or not enabled for this destination).
+ */
+ALTER TABLE device_lists_outbound_pokes ADD opentracing_context TEXT;
diff --git a/synapse/storage/schema/delta/56/destinations_failure_ts.sql b/synapse/storage/schema/delta/56/destinations_failure_ts.sql
new file mode 100644
index 0000000000..f00889290b
--- /dev/null
+++ b/synapse/storage/schema/delta/56/destinations_failure_ts.sql
@@ -0,0 +1,25 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Record the timestamp when a given server started failing
+ */
+ALTER TABLE destinations ADD failure_ts BIGINT;
+
+/* as a rough approximation, we assume that the server started failing at
+ * retry_interval before the last retry
+ */
+UPDATE destinations SET failure_ts = retry_last_ts - retry_interval
+ WHERE retry_last_ts > 0;
diff --git a/synapse/storage/schema/delta/56/drop_unused_event_tables.sql b/synapse/storage/schema/delta/56/drop_unused_event_tables.sql
new file mode 100644
index 0000000000..9f09922c67
--- /dev/null
+++ b/synapse/storage/schema/delta/56/drop_unused_event_tables.sql
@@ -0,0 +1,20 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- these tables are never used.
+DROP TABLE IF EXISTS room_names;
+DROP TABLE IF EXISTS topics;
+DROP TABLE IF EXISTS history_visibility;
+DROP TABLE IF EXISTS guest_access;
diff --git a/synapse/storage/schema/delta/56/fix_room_keys_index.sql b/synapse/storage/schema/delta/56/fix_room_keys_index.sql
new file mode 100644
index 0000000000..014cb3b538
--- /dev/null
+++ b/synapse/storage/schema/delta/56/fix_room_keys_index.sql
@@ -0,0 +1,18 @@
+/* Copyright 2019 Matrix.org Foundation CIC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- version is supposed to be part of the room keys index
+CREATE UNIQUE INDEX e2e_room_keys_with_version_idx ON e2e_room_keys(user_id, version, room_id, session_id);
+DROP INDEX IF EXISTS e2e_room_keys_idx;
diff --git a/synapse/storage/schema/delta/56/redaction_censor.sql b/synapse/storage/schema/delta/56/redaction_censor.sql
new file mode 100644
index 0000000000..fe51b02309
--- /dev/null
+++ b/synapse/storage/schema/delta/56/redaction_censor.sql
@@ -0,0 +1,17 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE redactions ADD COLUMN have_censored BOOL NOT NULL DEFAULT false;
+CREATE INDEX redactions_have_censored ON redactions(event_id) WHERE not have_censored;
diff --git a/synapse/storage/schema/delta/56/stats_separated.sql b/synapse/storage/schema/delta/56/stats_separated.sql
new file mode 100644
index 0000000000..163529c071
--- /dev/null
+++ b/synapse/storage/schema/delta/56/stats_separated.sql
@@ -0,0 +1,152 @@
+/* Copyright 2018 New Vector Ltd
+ * Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+----- First clean up from previous versions of room stats.
+
+-- First remove old stats stuff
+DROP TABLE IF EXISTS room_stats;
+DROP TABLE IF EXISTS room_state;
+DROP TABLE IF EXISTS room_stats_state;
+DROP TABLE IF EXISTS user_stats;
+DROP TABLE IF EXISTS room_stats_earliest_tokens;
+DROP TABLE IF EXISTS _temp_populate_stats_position;
+DROP TABLE IF EXISTS _temp_populate_stats_rooms;
+DROP TABLE IF EXISTS stats_stream_pos;
+
+-- Unschedule old background updates if they're still scheduled
+DELETE FROM background_updates WHERE update_name IN (
+ 'populate_stats_createtables',
+ 'populate_stats_process_rooms',
+ 'populate_stats_process_users',
+ 'populate_stats_cleanup'
+);
+
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+ ('populate_stats_process_rooms', '{}', '');
+
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+ ('populate_stats_process_users', '{}', 'populate_stats_process_rooms');
+
+----- Create tables for our version of room stats.
+
+-- single-row table to track position of incremental updates
+DROP TABLE IF EXISTS stats_incremental_position;
+CREATE TABLE stats_incremental_position (
+ Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
+ stream_id BIGINT NOT NULL,
+ CHECK (Lock='X')
+);
+
+-- insert a null row and make sure it is the only one.
+INSERT INTO stats_incremental_position (
+ stream_id
+) SELECT COALESCE(MAX(stream_ordering), 0) from events;
+
+-- represents PRESENT room statistics for a room
+-- only holds absolute fields
+DROP TABLE IF EXISTS room_stats_current;
+CREATE TABLE room_stats_current (
+ room_id TEXT NOT NULL PRIMARY KEY,
+
+ -- These are absolute counts
+ current_state_events INT NOT NULL,
+ joined_members INT NOT NULL,
+ invited_members INT NOT NULL,
+ left_members INT NOT NULL,
+ banned_members INT NOT NULL,
+
+ local_users_in_room INT NOT NULL,
+
+ -- The maximum delta stream position that this row takes into account.
+ completed_delta_stream_id BIGINT NOT NULL
+);
+
+
+-- represents HISTORICAL room statistics for a room
+DROP TABLE IF EXISTS room_stats_historical;
+CREATE TABLE room_stats_historical (
+ room_id TEXT NOT NULL,
+ -- These stats cover the time from (end_ts - bucket_size)...end_ts (in ms).
+ -- Note that end_ts is quantised.
+ end_ts BIGINT NOT NULL,
+ bucket_size BIGINT NOT NULL,
+
+ -- These stats are absolute counts
+ current_state_events BIGINT NOT NULL,
+ joined_members BIGINT NOT NULL,
+ invited_members BIGINT NOT NULL,
+ left_members BIGINT NOT NULL,
+ banned_members BIGINT NOT NULL,
+ local_users_in_room BIGINT NOT NULL,
+
+ -- These stats are per time slice
+ total_events BIGINT NOT NULL,
+ total_event_bytes BIGINT NOT NULL,
+
+ PRIMARY KEY (room_id, end_ts)
+);
+
+-- We use this index to speed up deletion of ancient room stats.
+CREATE INDEX room_stats_historical_end_ts ON room_stats_historical (end_ts);
+
+-- represents PRESENT statistics for a user
+-- only holds absolute fields
+DROP TABLE IF EXISTS user_stats_current;
+CREATE TABLE user_stats_current (
+ user_id TEXT NOT NULL PRIMARY KEY,
+
+ joined_rooms BIGINT NOT NULL,
+
+ -- The maximum delta stream position that this row takes into account.
+ completed_delta_stream_id BIGINT NOT NULL
+);
+
+-- represents HISTORICAL statistics for a user
+DROP TABLE IF EXISTS user_stats_historical;
+CREATE TABLE user_stats_historical (
+ user_id TEXT NOT NULL,
+ end_ts BIGINT NOT NULL,
+ bucket_size BIGINT NOT NULL,
+
+ joined_rooms BIGINT NOT NULL,
+
+ invites_sent BIGINT NOT NULL,
+ rooms_created BIGINT NOT NULL,
+ total_events BIGINT NOT NULL,
+ total_event_bytes BIGINT NOT NULL,
+
+ PRIMARY KEY (user_id, end_ts)
+);
+
+-- We use this index to speed up deletion of ancient user stats.
+CREATE INDEX user_stats_historical_end_ts ON user_stats_historical (end_ts);
+
+
+CREATE TABLE room_stats_state (
+ room_id TEXT NOT NULL,
+ name TEXT,
+ canonical_alias TEXT,
+ join_rules TEXT,
+ history_visibility TEXT,
+ encryption TEXT,
+ avatar TEXT,
+ guest_access TEXT,
+ is_federatable BOOLEAN,
+ topic TEXT
+);
+
+CREATE UNIQUE INDEX room_stats_state_room ON room_stats_state(room_id);
diff --git a/synapse/storage/schema/delta/56/users_in_public_rooms_idx.sql b/synapse/storage/schema/delta/56/users_in_public_rooms_idx.sql
new file mode 100644
index 0000000000..149f8be8b6
--- /dev/null
+++ b/synapse/storage/schema/delta/56/users_in_public_rooms_idx.sql
@@ -0,0 +1,17 @@
+/* Copyright 2019 Matrix.org Foundation CIC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- this was apparently forgotten when the table was created back in delta 53.
+CREATE INDEX users_in_public_rooms_r_idx ON users_in_public_rooms(room_id);
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index e13efed417..09190d684e 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2018, 2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,17 +15,22 @@
# limitations under the License.
import logging
+from itertools import chain
from twisted.internet import defer
+from twisted.internet.defer import DeferredLock
from synapse.api.constants import EventTypes, Membership
-from synapse.storage.prepare_database import get_statements
+from synapse.storage import PostgresEngine
from synapse.storage.state_deltas import StateDeltasStore
from synapse.util.caches.descriptors import cached
logger = logging.getLogger(__name__)
# these fields track absolutes (e.g. total number of rooms on the server)
+# You can think of these as Prometheus Gauges.
+# You can draw these stats on a line graph.
+# Example: number of users in a room
ABSOLUTE_STATS_FIELDS = {
"room": (
"current_state_events",
@@ -32,14 +38,23 @@ ABSOLUTE_STATS_FIELDS = {
"invited_members",
"left_members",
"banned_members",
- "state_events",
+ "local_users_in_room",
),
- "user": ("public_rooms", "private_rooms"),
+ "user": ("joined_rooms",),
}
-TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
+# these fields are per-timeslice and so should be reset to 0 upon a new slice
+# You can draw these stats on a histogram.
+# Example: number of events sent locally during a time slice
+PER_SLICE_FIELDS = {
+ "room": ("total_events", "total_event_bytes"),
+ "user": ("invites_sent", "rooms_created", "total_events", "total_event_bytes"),
+}
+
+TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
-TEMP_TABLE = "_temp_populate_stats"
+# these are the tables (& ID columns) which contain our actual subjects
+TYPE_TO_ORIGIN_TABLE = {"room": ("rooms", "room_id"), "user": ("users", "name")}
class StatsStore(StateDeltasStore):
@@ -51,136 +66,102 @@ class StatsStore(StateDeltasStore):
self.stats_enabled = hs.config.stats_enabled
self.stats_bucket_size = hs.config.stats_bucket_size
- self.register_background_update_handler(
- "populate_stats_createtables", self._populate_stats_createtables
- )
+ self.stats_delta_processing_lock = DeferredLock()
+
self.register_background_update_handler(
"populate_stats_process_rooms", self._populate_stats_process_rooms
)
self.register_background_update_handler(
- "populate_stats_cleanup", self._populate_stats_cleanup
+ "populate_stats_process_users", self._populate_stats_process_users
)
+ # we no longer need to perform clean-up, but we will give ourselves
+ # the potential to reintroduce it in the future – so documentation
+ # will still encourage the use of this no-op handler.
+ self.register_noop_background_update("populate_stats_cleanup")
+ self.register_noop_background_update("populate_stats_prepare")
- @defer.inlineCallbacks
- def _populate_stats_createtables(self, progress, batch_size):
-
- if not self.stats_enabled:
- yield self._end_background_update("populate_stats_createtables")
- return 1
-
- # Get all the rooms that we want to process.
- def _make_staging_area(txn):
- # Create the temporary tables
- stmts = get_statements(
- """
- -- We just recreate the table, we'll be reinserting the
- -- correct entries again later anyway.
- DROP TABLE IF EXISTS {temp}_rooms;
-
- CREATE TABLE IF NOT EXISTS {temp}_rooms(
- room_id TEXT NOT NULL,
- events BIGINT NOT NULL
- );
-
- CREATE INDEX {temp}_rooms_events
- ON {temp}_rooms(events);
- CREATE INDEX {temp}_rooms_id
- ON {temp}_rooms(room_id);
- """.format(
- temp=TEMP_TABLE
- ).splitlines()
- )
-
- for statement in stmts:
- txn.execute(statement)
-
- sql = (
- "CREATE TABLE IF NOT EXISTS "
- + TEMP_TABLE
- + "_position(position TEXT NOT NULL)"
- )
- txn.execute(sql)
-
- # Get rooms we want to process from the database, only adding
- # those that we haven't (i.e. those not in room_stats_earliest_token)
- sql = """
- INSERT INTO %s_rooms (room_id, events)
- SELECT c.room_id, count(*) FROM current_state_events AS c
- LEFT JOIN room_stats_earliest_token AS t USING (room_id)
- WHERE t.room_id IS NULL
- GROUP BY c.room_id
- """ % (
- TEMP_TABLE,
- )
- txn.execute(sql)
+ def quantise_stats_time(self, ts):
+ """
+ Quantises a timestamp to be a multiple of the bucket size.
- new_pos = yield self.get_max_stream_id_in_current_state_deltas()
- yield self.runInteraction("populate_stats_temp_build", _make_staging_area)
- yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
- self.get_earliest_token_for_room_stats.invalidate_all()
+ Args:
+ ts (int): the timestamp to quantise, in milliseconds since the Unix
+ Epoch
- yield self._end_background_update("populate_stats_createtables")
- return 1
+ Returns:
+ int: a timestamp which
+ - is divisible by the bucket size;
+ - is no later than `ts`; and
+ - is the largest such timestamp.
+ """
+ return (ts // self.stats_bucket_size) * self.stats_bucket_size
@defer.inlineCallbacks
- def _populate_stats_cleanup(self, progress, batch_size):
+ def _populate_stats_process_users(self, progress, batch_size):
"""
- Update the user directory stream position, then clean up the old tables.
+ This is a background update which regenerates statistics for users.
"""
if not self.stats_enabled:
- yield self._end_background_update("populate_stats_cleanup")
+ yield self._end_background_update("populate_stats_process_users")
return 1
- position = yield self._simple_select_one_onecol(
- TEMP_TABLE + "_position", None, "position"
+ last_user_id = progress.get("last_user_id", "")
+
+ def _get_next_batch(txn):
+ sql = """
+ SELECT DISTINCT name FROM users
+ WHERE name > ?
+ ORDER BY name ASC
+ LIMIT ?
+ """
+ txn.execute(sql, (last_user_id, batch_size))
+ return [r for r, in txn]
+
+ users_to_work_on = yield self.runInteraction(
+ "_populate_stats_process_users", _get_next_batch
)
- yield self.update_stats_stream_pos(position)
- def _delete_staging_area(txn):
- txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
- txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
+ # No more rooms -- complete the transaction.
+ if not users_to_work_on:
+ yield self._end_background_update("populate_stats_process_users")
+ return 1
- yield self.runInteraction("populate_stats_cleanup", _delete_staging_area)
+ for user_id in users_to_work_on:
+ yield self._calculate_and_set_initial_state_for_user(user_id)
+ progress["last_user_id"] = user_id
- yield self._end_background_update("populate_stats_cleanup")
- return 1
+ yield self.runInteraction(
+ "populate_stats_process_users",
+ self._background_update_progress_txn,
+ "populate_stats_process_users",
+ progress,
+ )
+
+ return len(users_to_work_on)
@defer.inlineCallbacks
def _populate_stats_process_rooms(self, progress, batch_size):
-
+ """
+ This is a background update which regenerates statistics for rooms.
+ """
if not self.stats_enabled:
yield self._end_background_update("populate_stats_process_rooms")
return 1
- # If we don't have progress filed, delete everything.
- if not progress:
- yield self.delete_all_stats()
+ last_room_id = progress.get("last_room_id", "")
def _get_next_batch(txn):
- # Only fetch 250 rooms, so we don't fetch too many at once, even
- # if those 250 rooms have less than batch_size state events.
sql = """
- SELECT room_id, events FROM %s_rooms
- ORDER BY events DESC
- LIMIT 250
- """ % (
- TEMP_TABLE,
- )
- txn.execute(sql)
- rooms_to_work_on = txn.fetchall()
-
- if not rooms_to_work_on:
- return None
-
- # Get how many are left to process, so we can give status on how
- # far we are in processing
- txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
- progress["remaining"] = txn.fetchone()[0]
-
- return rooms_to_work_on
+ SELECT DISTINCT room_id FROM current_state_events
+ WHERE room_id > ?
+ ORDER BY room_id ASC
+ LIMIT ?
+ """
+ txn.execute(sql, (last_room_id, batch_size))
+ return [r for r, in txn]
rooms_to_work_on = yield self.runInteraction(
- "populate_stats_temp_read", _get_next_batch
+ "populate_stats_rooms_get_batch", _get_next_batch
)
# No more rooms -- complete the transaction.
@@ -188,154 +169,28 @@ class StatsStore(StateDeltasStore):
yield self._end_background_update("populate_stats_process_rooms")
return 1
- logger.info(
- "Processing the next %d rooms of %d remaining",
- len(rooms_to_work_on),
- progress["remaining"],
- )
-
- # Number of state events we've processed by going through each room
- processed_event_count = 0
-
- for room_id, event_count in rooms_to_work_on:
-
- current_state_ids = yield self.get_current_state_ids(room_id)
-
- join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
- history_visibility_id = current_state_ids.get(
- (EventTypes.RoomHistoryVisibility, "")
- )
- encryption_id = current_state_ids.get((EventTypes.RoomEncryption, ""))
- name_id = current_state_ids.get((EventTypes.Name, ""))
- topic_id = current_state_ids.get((EventTypes.Topic, ""))
- avatar_id = current_state_ids.get((EventTypes.RoomAvatar, ""))
- canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, ""))
-
- event_ids = [
- join_rules_id,
- history_visibility_id,
- encryption_id,
- name_id,
- topic_id,
- avatar_id,
- canonical_alias_id,
- ]
-
- state_events = yield self.get_events(
- [ev for ev in event_ids if ev is not None]
- )
-
- def _get_or_none(event_id, arg):
- event = state_events.get(event_id)
- if event:
- return event.content.get(arg)
- return None
-
- yield self.update_room_state(
- room_id,
- {
- "join_rules": _get_or_none(join_rules_id, "join_rule"),
- "history_visibility": _get_or_none(
- history_visibility_id, "history_visibility"
- ),
- "encryption": _get_or_none(encryption_id, "algorithm"),
- "name": _get_or_none(name_id, "name"),
- "topic": _get_or_none(topic_id, "topic"),
- "avatar": _get_or_none(avatar_id, "url"),
- "canonical_alias": _get_or_none(canonical_alias_id, "alias"),
- },
- )
-
- now = self.hs.get_reactor().seconds()
-
- # quantise time to the nearest bucket
- now = (now // self.stats_bucket_size) * self.stats_bucket_size
-
- def _fetch_data(txn):
-
- # Get the current token of the room
- current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
-
- current_state_events = len(current_state_ids)
-
- membership_counts = self._get_user_counts_in_room_txn(txn, room_id)
-
- total_state_events = self._get_total_state_event_counts_txn(
- txn, room_id
- )
-
- self._update_stats_txn(
- txn,
- "room",
- room_id,
- now,
- {
- "bucket_size": self.stats_bucket_size,
- "current_state_events": current_state_events,
- "joined_members": membership_counts.get(Membership.JOIN, 0),
- "invited_members": membership_counts.get(Membership.INVITE, 0),
- "left_members": membership_counts.get(Membership.LEAVE, 0),
- "banned_members": membership_counts.get(Membership.BAN, 0),
- "state_events": total_state_events,
- },
- )
- self._simple_insert_txn(
- txn,
- "room_stats_earliest_token",
- {"room_id": room_id, "token": current_token},
- )
-
- # We've finished a room. Delete it from the table.
- self._simple_delete_one_txn(
- txn, TEMP_TABLE + "_rooms", {"room_id": room_id}
- )
+ for room_id in rooms_to_work_on:
+ yield self._calculate_and_set_initial_state_for_room(room_id)
+ progress["last_room_id"] = room_id
- yield self.runInteraction("update_room_stats", _fetch_data)
-
- # Update the remaining counter.
- progress["remaining"] -= 1
- yield self.runInteraction(
- "populate_stats",
- self._background_update_progress_txn,
- "populate_stats_process_rooms",
- progress,
- )
-
- processed_event_count += event_count
-
- if processed_event_count > batch_size:
- # Don't process any more rooms, we've hit our batch size.
- return processed_event_count
+ yield self.runInteraction(
+ "_populate_stats_process_rooms",
+ self._background_update_progress_txn,
+ "populate_stats_process_rooms",
+ progress,
+ )
- return processed_event_count
+ return len(rooms_to_work_on)
- def delete_all_stats(self):
+ def get_stats_positions(self):
"""
- Delete all statistics records.
+ Returns the stats processor positions.
"""
-
- def _delete_all_stats_txn(txn):
- txn.execute("DELETE FROM room_state")
- txn.execute("DELETE FROM room_stats")
- txn.execute("DELETE FROM room_stats_earliest_token")
- txn.execute("DELETE FROM user_stats")
-
- return self.runInteraction("delete_all_stats", _delete_all_stats_txn)
-
- def get_stats_stream_pos(self):
return self._simple_select_one_onecol(
- table="stats_stream_pos",
+ table="stats_incremental_position",
keyvalues={},
retcol="stream_id",
- desc="stats_stream_pos",
- )
-
- def update_stats_stream_pos(self, stream_id):
- return self._simple_update_one(
- table="stats_stream_pos",
- keyvalues={},
- updatevalues={"stream_id": stream_id},
- desc="update_stats_stream_pos",
+ desc="stats_incremental_position",
)
def update_room_state(self, room_id, fields):
@@ -361,42 +216,87 @@ class StatsStore(StateDeltasStore):
fields[col] = None
return self._simple_upsert(
- table="room_state",
+ table="room_stats_state",
keyvalues={"room_id": room_id},
values=fields,
desc="update_room_state",
)
- def get_deltas_for_room(self, room_id, start, size=100):
+ def get_statistics_for_subject(self, stats_type, stats_id, start, size=100):
"""
- Get statistics deltas for a given room.
+ Get statistics for a given subject.
Args:
- room_id (str)
+ stats_type (str): The type of subject
+ stats_id (str): The ID of the subject (e.g. room_id or user_id)
start (int): Pagination start. Number of entries, not timestamp.
size (int): How many entries to return.
Returns:
Deferred[list[dict]], where the dict has the keys of
- ABSOLUTE_STATS_FIELDS["room"] and "ts".
+ ABSOLUTE_STATS_FIELDS[stats_type], and "bucket_size" and "end_ts".
"""
- return self._simple_select_list_paginate(
- "room_stats",
- {"room_id": room_id},
- "ts",
+ return self.runInteraction(
+ "get_statistics_for_subject",
+ self._get_statistics_for_subject_txn,
+ stats_type,
+ stats_id,
start,
size,
- retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]),
+ )
+
+ def _get_statistics_for_subject_txn(
+ self, txn, stats_type, stats_id, start, size=100
+ ):
+ """
+ Transaction-bound version of L{get_statistics_for_subject}.
+ """
+
+ table, id_col = TYPE_TO_TABLE[stats_type]
+ selected_columns = list(
+ ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type]
+ )
+
+ slice_list = self._simple_select_list_paginate_txn(
+ txn,
+ table + "_historical",
+ {id_col: stats_id},
+ "end_ts",
+ start,
+ size,
+ retcols=selected_columns + ["bucket_size", "end_ts"],
order_direction="DESC",
)
- def get_all_room_state(self):
- return self._simple_select_list(
- "room_state", None, retcols=("name", "topic", "canonical_alias")
+ return slice_list
+
+ def get_room_stats_state(self, room_id):
+ """
+ Returns the current room_stats_state for a room.
+
+ Args:
+ room_id (str): The ID of the room to return state for.
+
+ Returns (dict):
+ Dictionary containing these keys:
+ "name", "topic", "canonical_alias", "avatar", "join_rules",
+ "history_visibility"
+ """
+ return self._simple_select_one(
+ "room_stats_state",
+ {"room_id": room_id},
+ retcols=(
+ "name",
+ "topic",
+ "canonical_alias",
+ "avatar",
+ "join_rules",
+ "history_visibility",
+ ),
)
@cached()
- def get_earliest_token_for_room_stats(self, room_id):
+ def get_earliest_token_for_stats(self, stats_type, id):
"""
Fetch the "earliest token". This is used by the room stats delta
processor to ignore deltas that have been processed between the
@@ -406,79 +306,573 @@ class StatsStore(StateDeltasStore):
Returns:
Deferred[int]
"""
+ table, id_col = TYPE_TO_TABLE[stats_type]
+
return self._simple_select_one_onecol(
- "room_stats_earliest_token",
- {"room_id": room_id},
- retcol="token",
+ "%s_current" % (table,),
+ keyvalues={id_col: id},
+ retcol="completed_delta_stream_id",
allow_none=True,
)
- def update_stats(self, stats_type, stats_id, ts, fields):
- table, id_col = TYPE_TO_ROOM[stats_type]
- return self._simple_upsert(
- table=table,
- keyvalues={id_col: stats_id, "ts": ts},
- values=fields,
- desc="update_stats",
+ def bulk_update_stats_delta(self, ts, updates, stream_id):
+ """Bulk update stats tables for a given stream_id and updates the stats
+ incremental position.
+
+ Args:
+ ts (int): Current timestamp in ms
+ updates(dict[str, dict[str, dict[str, Counter]]]): The updates to
+ commit as a mapping stats_type -> stats_id -> field -> delta.
+ stream_id (int): Current position.
+
+ Returns:
+ Deferred
+ """
+
+ def _bulk_update_stats_delta_txn(txn):
+ for stats_type, stats_updates in updates.items():
+ for stats_id, fields in stats_updates.items():
+ self._update_stats_delta_txn(
+ txn,
+ ts=ts,
+ stats_type=stats_type,
+ stats_id=stats_id,
+ fields=fields,
+ complete_with_stream_id=stream_id,
+ )
+
+ self._simple_update_one_txn(
+ txn,
+ table="stats_incremental_position",
+ keyvalues={},
+ updatevalues={"stream_id": stream_id},
+ )
+
+ return self.runInteraction(
+ "bulk_update_stats_delta", _bulk_update_stats_delta_txn
)
- def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields):
- table, id_col = TYPE_TO_ROOM[stats_type]
- return self._simple_upsert_txn(
- txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields
+ def update_stats_delta(
+ self,
+ ts,
+ stats_type,
+ stats_id,
+ fields,
+ complete_with_stream_id,
+ absolute_field_overrides=None,
+ ):
+ """
+ Updates the statistics for a subject, with a delta (difference/relative
+ change).
+
+ Args:
+ ts (int): timestamp of the change
+ stats_type (str): "room" or "user" – the kind of subject
+ stats_id (str): the subject's ID (room ID or user ID)
+ fields (dict[str, int]): Deltas of stats values.
+ complete_with_stream_id (int, optional):
+ If supplied, converts an incomplete row into a complete row,
+ with the supplied stream_id marked as the stream_id where the
+ row was completed.
+ absolute_field_overrides (dict[str, int]): Current stats values
+ (i.e. not deltas) of absolute fields.
+ Does not work with per-slice fields.
+ """
+
+ return self.runInteraction(
+ "update_stats_delta",
+ self._update_stats_delta_txn,
+ ts,
+ stats_type,
+ stats_id,
+ fields,
+ complete_with_stream_id=complete_with_stream_id,
+ absolute_field_overrides=absolute_field_overrides,
)
- def update_stats_delta(self, ts, stats_type, stats_id, field, value):
- def _update_stats_delta(txn):
- table, id_col = TYPE_TO_ROOM[stats_type]
-
- sql = (
- "SELECT * FROM %s"
- " WHERE %s=? and ts=("
- " SELECT MAX(ts) FROM %s"
- " WHERE %s=?"
- ")"
- ) % (table, id_col, table, id_col)
- txn.execute(sql, (stats_id, stats_id))
- rows = self.cursor_to_dict(txn)
- if len(rows) == 0:
- # silently skip as we don't have anything to apply a delta to yet.
- # this tries to minimise any race between the initial sync and
- # subsequent deltas arriving.
- return
-
- current_ts = ts
- latest_ts = rows[0]["ts"]
- if current_ts < latest_ts:
- # This one is in the past, but we're just encountering it now.
- # Mark it as part of the current bucket.
- current_ts = latest_ts
- elif ts != latest_ts:
- # we have to copy our absolute counters over to the new entry.
- values = {
- key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type]
- }
- values[id_col] = stats_id
- values["ts"] = ts
- values["bucket_size"] = self.stats_bucket_size
-
- self._simple_insert_txn(txn, table=table, values=values)
-
- # actually update the new value
- if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]:
- self._simple_update_txn(
- txn,
- table=table,
- keyvalues={id_col: stats_id, "ts": current_ts},
- updatevalues={field: value},
+ def _update_stats_delta_txn(
+ self,
+ txn,
+ ts,
+ stats_type,
+ stats_id,
+ fields,
+ complete_with_stream_id,
+ absolute_field_overrides=None,
+ ):
+ if absolute_field_overrides is None:
+ absolute_field_overrides = {}
+
+ table, id_col = TYPE_TO_TABLE[stats_type]
+
+ quantised_ts = self.quantise_stats_time(int(ts))
+ end_ts = quantised_ts + self.stats_bucket_size
+
+ # Lets be paranoid and check that all the given field names are known
+ abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
+ slice_field_names = PER_SLICE_FIELDS[stats_type]
+ for field in chain(fields.keys(), absolute_field_overrides.keys()):
+ if field not in abs_field_names and field not in slice_field_names:
+ # guard against potential SQL injection dodginess
+ raise ValueError(
+ "%s is not a recognised field"
+ " for stats type %s" % (field, stats_type)
)
+
+ # Per slice fields do not get added to the _current table
+
+ # This calculates the deltas (`field = field + ?` values)
+ # for absolute fields,
+ # * defaulting to 0 if not specified
+ # (required for the INSERT part of upserting to work)
+ # * omitting overrides specified in `absolute_field_overrides`
+ deltas_of_absolute_fields = {
+ key: fields.get(key, 0)
+ for key in abs_field_names
+ if key not in absolute_field_overrides
+ }
+
+ # Keep the delta stream ID field up to date
+ absolute_field_overrides = absolute_field_overrides.copy()
+ absolute_field_overrides["completed_delta_stream_id"] = complete_with_stream_id
+
+ # first upsert the `_current` table
+ self._upsert_with_additive_relatives_txn(
+ txn=txn,
+ table=table + "_current",
+ keyvalues={id_col: stats_id},
+ absolutes=absolute_field_overrides,
+ additive_relatives=deltas_of_absolute_fields,
+ )
+
+ per_slice_additive_relatives = {
+ key: fields.get(key, 0) for key in slice_field_names
+ }
+ self._upsert_copy_from_table_with_additive_relatives_txn(
+ txn=txn,
+ into_table=table + "_historical",
+ keyvalues={id_col: stats_id},
+ extra_dst_insvalues={"bucket_size": self.stats_bucket_size},
+ extra_dst_keyvalues={"end_ts": end_ts},
+ additive_relatives=per_slice_additive_relatives,
+ src_table=table + "_current",
+ copy_columns=abs_field_names,
+ )
+
+ def _upsert_with_additive_relatives_txn(
+ self, txn, table, keyvalues, absolutes, additive_relatives
+ ):
+ """Used to update values in the stats tables.
+
+ This is basically a slightly convoluted upsert that *adds* to any
+ existing rows.
+
+ Args:
+ txn
+ table (str): Table name
+ keyvalues (dict[str, any]): Row-identifying key values
+ absolutes (dict[str, any]): Absolute (set) fields
+ additive_relatives (dict[str, int]): Fields that will be added onto
+ if existing row present.
+ """
+ if self.database_engine.can_native_upsert:
+ absolute_updates = [
+ "%(field)s = EXCLUDED.%(field)s" % {"field": field}
+ for field in absolutes.keys()
+ ]
+
+ relative_updates = [
+ "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s"
+ % {"table": table, "field": field}
+ for field in additive_relatives.keys()
+ ]
+
+ insert_cols = []
+ qargs = []
+
+ for (key, val) in chain(
+ keyvalues.items(), absolutes.items(), additive_relatives.items()
+ ):
+ insert_cols.append(key)
+ qargs.append(val)
+
+ sql = """
+ INSERT INTO %(table)s (%(insert_cols_cs)s)
+ VALUES (%(insert_vals_qs)s)
+ ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
+ """ % {
+ "table": table,
+ "insert_cols_cs": ", ".join(insert_cols),
+ "insert_vals_qs": ", ".join(
+ ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
+ ),
+ "key_columns": ", ".join(keyvalues),
+ "updates": ", ".join(chain(absolute_updates, relative_updates)),
+ }
+
+ txn.execute(sql, qargs)
+ else:
+ self.database_engine.lock_table(txn, table)
+ retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
+ current_row = self._simple_select_one_txn(
+ txn, table, keyvalues, retcols, allow_none=True
+ )
+ if current_row is None:
+ merged_dict = {**keyvalues, **absolutes, **additive_relatives}
+ self._simple_insert_txn(txn, table, merged_dict)
else:
- sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % (
- table,
- field,
- field,
- id_col,
+ for (key, val) in additive_relatives.items():
+ current_row[key] += val
+ current_row.update(absolutes)
+ self._simple_update_one_txn(txn, table, keyvalues, current_row)
+
+ def _upsert_copy_from_table_with_additive_relatives_txn(
+ self,
+ txn,
+ into_table,
+ keyvalues,
+ extra_dst_keyvalues,
+ extra_dst_insvalues,
+ additive_relatives,
+ src_table,
+ copy_columns,
+ ):
+ """Updates the historic stats table with latest updates.
+
+ This involves copying "absolute" fields from the `_current` table, and
+ adding relative fields to any existing values.
+
+ Args:
+ txn: Transaction
+ into_table (str): The destination table to UPSERT the row into
+ keyvalues (dict[str, any]): Row-identifying key values
+ extra_dst_keyvalues (dict[str, any]): Additional keyvalues
+ for `into_table`.
+ extra_dst_insvalues (dict[str, any]): Additional values to insert
+ on new row creation for `into_table`.
+ additive_relatives (dict[str, any]): Fields that will be added onto
+ if existing row present. (Must be disjoint from copy_columns.)
+ src_table (str): The source table to copy from
+ copy_columns (iterable[str]): The list of columns to copy
+ """
+ if self.database_engine.can_native_upsert:
+ ins_columns = chain(
+ keyvalues,
+ copy_columns,
+ additive_relatives,
+ extra_dst_keyvalues,
+ extra_dst_insvalues,
+ )
+ sel_exprs = chain(
+ keyvalues,
+ copy_columns,
+ (
+ "?"
+ for _ in chain(
+ additive_relatives, extra_dst_keyvalues, extra_dst_insvalues
+ )
+ ),
+ )
+ keyvalues_where = ("%s = ?" % f for f in keyvalues)
+
+ sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
+ sets_ar = (
+ "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f)
+ for f in additive_relatives
+ )
+
+ sql = """
+ INSERT INTO %(into_table)s (%(ins_columns)s)
+ SELECT %(sel_exprs)s
+ FROM %(src_table)s
+ WHERE %(keyvalues_where)s
+ ON CONFLICT (%(keyvalues)s)
+ DO UPDATE SET %(sets)s
+ """ % {
+ "into_table": into_table,
+ "ins_columns": ", ".join(ins_columns),
+ "sel_exprs": ", ".join(sel_exprs),
+ "keyvalues_where": " AND ".join(keyvalues_where),
+ "src_table": src_table,
+ "keyvalues": ", ".join(
+ chain(keyvalues.keys(), extra_dst_keyvalues.keys())
+ ),
+ "sets": ", ".join(chain(sets_cc, sets_ar)),
+ }
+
+ qargs = list(
+ chain(
+ additive_relatives.values(),
+ extra_dst_keyvalues.values(),
+ extra_dst_insvalues.values(),
+ keyvalues.values(),
)
- txn.execute(sql, (value, stats_id, current_ts))
+ )
+ txn.execute(sql, qargs)
+ else:
+ self.database_engine.lock_table(txn, into_table)
+ src_row = self._simple_select_one_txn(
+ txn, src_table, keyvalues, copy_columns
+ )
+ all_dest_keyvalues = {**keyvalues, **extra_dst_keyvalues}
+ dest_current_row = self._simple_select_one_txn(
+ txn,
+ into_table,
+ keyvalues=all_dest_keyvalues,
+ retcols=list(chain(additive_relatives.keys(), copy_columns)),
+ allow_none=True,
+ )
+
+ if dest_current_row is None:
+ merged_dict = {
+ **keyvalues,
+ **extra_dst_keyvalues,
+ **extra_dst_insvalues,
+ **src_row,
+ **additive_relatives,
+ }
+ self._simple_insert_txn(txn, into_table, merged_dict)
+ else:
+ for (key, val) in additive_relatives.items():
+ src_row[key] = dest_current_row[key] + val
+ self._simple_update_txn(txn, into_table, all_dest_keyvalues, src_row)
+
+ def get_changes_room_total_events_and_bytes(self, min_pos, max_pos):
+ """Fetches the counts of events in the given range of stream IDs.
+
+ Args:
+ min_pos (int)
+ max_pos (int)
+
+ Returns:
+ Deferred[dict[str, dict[str, int]]]: Mapping of room ID to field
+ changes.
+ """
+
+ return self.runInteraction(
+ "stats_incremental_total_events_and_bytes",
+ self.get_changes_room_total_events_and_bytes_txn,
+ min_pos,
+ max_pos,
+ )
- return self.runInteraction("update_stats_delta", _update_stats_delta)
+ def get_changes_room_total_events_and_bytes_txn(self, txn, low_pos, high_pos):
+ """Gets the total_events and total_event_bytes counts for rooms and
+ senders, in a range of stream_orderings (including backfilled events).
+
+ Args:
+ txn
+ low_pos (int): Low stream ordering
+ high_pos (int): High stream ordering
+
+ Returns:
+ tuple[dict[str, dict[str, int]], dict[str, dict[str, int]]]: The
+ room and user deltas for total_events/total_event_bytes in the
+ format of `stats_id` -> fields
+ """
+
+ if low_pos >= high_pos:
+ # nothing to do here.
+ return {}, {}
+
+ if isinstance(self.database_engine, PostgresEngine):
+ new_bytes_expression = "OCTET_LENGTH(json)"
+ else:
+ new_bytes_expression = "LENGTH(CAST(json AS BLOB))"
+
+ sql = """
+ SELECT events.room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes
+ FROM events INNER JOIN event_json USING (event_id)
+ WHERE (? < stream_ordering AND stream_ordering <= ?)
+ OR (? <= stream_ordering AND stream_ordering <= ?)
+ GROUP BY events.room_id
+ """ % (
+ new_bytes_expression,
+ )
+
+ txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos))
+
+ room_deltas = {
+ room_id: {"total_events": new_events, "total_event_bytes": new_bytes}
+ for room_id, new_events, new_bytes in txn
+ }
+
+ sql = """
+ SELECT events.sender, COUNT(*) AS new_events, SUM(%s) AS new_bytes
+ FROM events INNER JOIN event_json USING (event_id)
+ WHERE (? < stream_ordering AND stream_ordering <= ?)
+ OR (? <= stream_ordering AND stream_ordering <= ?)
+ GROUP BY events.sender
+ """ % (
+ new_bytes_expression,
+ )
+
+ txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos))
+
+ user_deltas = {
+ user_id: {"total_events": new_events, "total_event_bytes": new_bytes}
+ for user_id, new_events, new_bytes in txn
+ if self.hs.is_mine_id(user_id)
+ }
+
+ return room_deltas, user_deltas
+
+ @defer.inlineCallbacks
+ def _calculate_and_set_initial_state_for_room(self, room_id):
+ """Calculate and insert an entry into room_stats_current.
+
+ Args:
+ room_id (str)
+
+ Returns:
+ Deferred[tuple[dict, dict, int]]: A tuple of room state, membership
+ counts and stream position.
+ """
+
+ def _fetch_current_state_stats(txn):
+ pos = self.get_room_max_stream_ordering()
+
+ rows = self._simple_select_many_txn(
+ txn,
+ table="current_state_events",
+ column="type",
+ iterable=[
+ EventTypes.Create,
+ EventTypes.JoinRules,
+ EventTypes.RoomHistoryVisibility,
+ EventTypes.Encryption,
+ EventTypes.Name,
+ EventTypes.Topic,
+ EventTypes.RoomAvatar,
+ EventTypes.CanonicalAlias,
+ ],
+ keyvalues={"room_id": room_id, "state_key": ""},
+ retcols=["event_id"],
+ )
+
+ event_ids = [row["event_id"] for row in rows]
+
+ txn.execute(
+ """
+ SELECT membership, count(*) FROM current_state_events
+ WHERE room_id = ? AND type = 'm.room.member'
+ GROUP BY membership
+ """,
+ (room_id,),
+ )
+ membership_counts = {membership: cnt for membership, cnt in txn}
+
+ txn.execute(
+ """
+ SELECT COALESCE(count(*), 0) FROM current_state_events
+ WHERE room_id = ?
+ """,
+ (room_id,),
+ )
+
+ current_state_events_count, = txn.fetchone()
+
+ users_in_room = self.get_users_in_room_txn(txn, room_id)
+
+ return (
+ event_ids,
+ membership_counts,
+ current_state_events_count,
+ users_in_room,
+ pos,
+ )
+
+ (
+ event_ids,
+ membership_counts,
+ current_state_events_count,
+ users_in_room,
+ pos,
+ ) = yield self.runInteraction(
+ "get_initial_state_for_room", _fetch_current_state_stats
+ )
+
+ state_event_map = yield self.get_events(event_ids, get_prev_content=False)
+
+ room_state = {
+ "join_rules": None,
+ "history_visibility": None,
+ "encryption": None,
+ "name": None,
+ "topic": None,
+ "avatar": None,
+ "canonical_alias": None,
+ "is_federatable": True,
+ }
+
+ for event in state_event_map.values():
+ if event.type == EventTypes.JoinRules:
+ room_state["join_rules"] = event.content.get("join_rule")
+ elif event.type == EventTypes.RoomHistoryVisibility:
+ room_state["history_visibility"] = event.content.get(
+ "history_visibility"
+ )
+ elif event.type == EventTypes.Encryption:
+ room_state["encryption"] = event.content.get("algorithm")
+ elif event.type == EventTypes.Name:
+ room_state["name"] = event.content.get("name")
+ elif event.type == EventTypes.Topic:
+ room_state["topic"] = event.content.get("topic")
+ elif event.type == EventTypes.RoomAvatar:
+ room_state["avatar"] = event.content.get("url")
+ elif event.type == EventTypes.CanonicalAlias:
+ room_state["canonical_alias"] = event.content.get("alias")
+ elif event.type == EventTypes.Create:
+ room_state["is_federatable"] = (
+ event.content.get("m.federate", True) is True
+ )
+
+ yield self.update_room_state(room_id, room_state)
+
+ local_users_in_room = [u for u in users_in_room if self.hs.is_mine_id(u)]
+
+ yield self.update_stats_delta(
+ ts=self.clock.time_msec(),
+ stats_type="room",
+ stats_id=room_id,
+ fields={},
+ complete_with_stream_id=pos,
+ absolute_field_overrides={
+ "current_state_events": current_state_events_count,
+ "joined_members": membership_counts.get(Membership.JOIN, 0),
+ "invited_members": membership_counts.get(Membership.INVITE, 0),
+ "left_members": membership_counts.get(Membership.LEAVE, 0),
+ "banned_members": membership_counts.get(Membership.BAN, 0),
+ "local_users_in_room": len(local_users_in_room),
+ },
+ )
+
+ @defer.inlineCallbacks
+ def _calculate_and_set_initial_state_for_user(self, user_id):
+ def _calculate_and_set_initial_state_for_user_txn(txn):
+ pos = self._get_max_stream_id_in_current_state_deltas_txn(txn)
+
+ txn.execute(
+ """
+ SELECT COUNT(distinct room_id) FROM current_state_events
+ WHERE type = 'm.room.member' AND state_key = ?
+ AND membership = 'join'
+ """,
+ (user_id,),
+ )
+ count, = txn.fetchone()
+ return count, pos
+
+ joined_rooms, pos = yield self.runInteraction(
+ "calculate_and_set_initial_state_for_user",
+ _calculate_and_set_initial_state_for_user_txn,
+ )
+
+ yield self.update_stats_delta(
+ ts=self.clock.time_msec(),
+ stats_type="user",
+ stats_id=user_id,
+ fields={},
+ complete_with_stream_id=pos,
+ absolute_field_overrides={"joined_rooms": joined_rooms},
+ )
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 856c2ee8d8..490454f19a 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -364,7 +364,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
the chunk of events returned.
"""
if from_key == to_key:
- return ([], from_key)
+ return [], from_key
from_id = RoomStreamToken.parse_stream_token(from_key).stream
to_id = RoomStreamToken.parse_stream_token(to_key).stream
@@ -374,7 +374,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
)
if not has_changed:
- return ([], from_key)
+ return [], from_key
def f(txn):
sql = (
@@ -407,7 +407,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# get.
key = from_key
- return (ret, key)
+ return ret, key
@defer.inlineCallbacks
def get_membership_changes_for_user(self, user_id, from_key, to_key):
@@ -496,7 +496,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
"""
# Allow a zero limit here, and no-op.
if limit == 0:
- return ([], end_token)
+ return [], end_token
end_token = RoomStreamToken.parse(end_token)
@@ -511,7 +511,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# We want to return the results in ascending order.
rows.reverse()
- return (rows, token)
+ return rows, token
def get_room_event_after_stream_ordering(self, room_id, stream_ordering):
"""Gets details of the first event in a room at or after a stream ordering
@@ -783,7 +783,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
events = yield self.get_events_as_list(event_ids)
- return (upper_bound, events)
+ return upper_bound, events
def get_federation_out_pos(self, typ):
return self._simple_select_one_onecol(
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index b3c3bf55bc..289c117396 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -165,7 +165,7 @@ class TransactionStore(SQLBaseStore):
txn,
table="destinations",
keyvalues={"destination": destination},
- retcols=("destination", "retry_last_ts", "retry_interval"),
+ retcols=("destination", "failure_ts", "retry_last_ts", "retry_interval"),
allow_none=True,
)
@@ -174,12 +174,15 @@ class TransactionStore(SQLBaseStore):
else:
return None
- def set_destination_retry_timings(self, destination, retry_last_ts, retry_interval):
+ def set_destination_retry_timings(
+ self, destination, failure_ts, retry_last_ts, retry_interval
+ ):
"""Sets the current retry timings for a given destination.
Both timings should be zero if retrying is no longer occuring.
Args:
destination (str)
+ failure_ts (int|None) - when the server started failing (ms since epoch)
retry_last_ts (int) - time of last retry attempt in unix epoch ms
retry_interval (int) - how long until next retry in ms
"""
@@ -189,12 +192,13 @@ class TransactionStore(SQLBaseStore):
"set_destination_retry_timings",
self._set_destination_retry_timings,
destination,
+ failure_ts,
retry_last_ts,
retry_interval,
)
def _set_destination_retry_timings(
- self, txn, destination, retry_last_ts, retry_interval
+ self, txn, destination, failure_ts, retry_last_ts, retry_interval
):
if self.database_engine.can_native_upsert:
@@ -202,9 +206,12 @@ class TransactionStore(SQLBaseStore):
# resetting it) or greater than the existing retry interval.
sql = """
- INSERT INTO destinations (destination, retry_last_ts, retry_interval)
- VALUES (?, ?, ?)
+ INSERT INTO destinations (
+ destination, failure_ts, retry_last_ts, retry_interval
+ )
+ VALUES (?, ?, ?, ?)
ON CONFLICT (destination) DO UPDATE SET
+ failure_ts = EXCLUDED.failure_ts,
retry_last_ts = EXCLUDED.retry_last_ts,
retry_interval = EXCLUDED.retry_interval
WHERE
@@ -212,7 +219,7 @@ class TransactionStore(SQLBaseStore):
OR destinations.retry_interval < EXCLUDED.retry_interval
"""
- txn.execute(sql, (destination, retry_last_ts, retry_interval))
+ txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))
return
@@ -225,7 +232,7 @@ class TransactionStore(SQLBaseStore):
txn,
table="destinations",
keyvalues={"destination": destination},
- retcols=("retry_last_ts", "retry_interval"),
+ retcols=("failure_ts", "retry_last_ts", "retry_interval"),
allow_none=True,
)
@@ -235,6 +242,7 @@ class TransactionStore(SQLBaseStore):
table="destinations",
values={
"destination": destination,
+ "failure_ts": failure_ts,
"retry_last_ts": retry_last_ts,
"retry_interval": retry_interval,
},
@@ -245,31 +253,12 @@ class TransactionStore(SQLBaseStore):
"destinations",
keyvalues={"destination": destination},
updatevalues={
+ "failure_ts": failure_ts,
"retry_last_ts": retry_last_ts,
"retry_interval": retry_interval,
},
)
- def get_destinations_needing_retry(self):
- """Get all destinations which are due a retry for sending a transaction.
-
- Returns:
- list: A list of dicts
- """
-
- return self.runInteraction(
- "get_destinations_needing_retry", self._get_destinations_needing_retry
- )
-
- def _get_destinations_needing_retry(self, txn):
- query = (
- "SELECT * FROM destinations"
- " WHERE retry_last_ts > 0 and retry_next_ts < ?"
- )
-
- txn.execute(query, (self._clock.time_msec(),))
- return self.cursor_to_dict(txn)
-
def _start_cleanup_transactions(self):
return run_as_background_process(
"cleanup_transactions", self._cleanup_transactions
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index f1c8d99419..cbb0a4810a 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -195,6 +195,6 @@ class ChainedIdGenerator(object):
with self._lock:
if self._unfinished_ids:
stream_id, chained_id = self._unfinished_ids[0]
- return (stream_id - 1, chained_id)
+ return stream_id - 1, chained_id
- return (self._current_max, self.chained_generator.get_current_token())
+ return self._current_max, self.chained_generator.get_current_token()
|