diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py
index 36d01a10af..f83c05df44 100644
--- a/synapse/config/emailconfig.py
+++ b/synapse/config/emailconfig.py
@@ -115,7 +115,7 @@ class EmailConfig(Config):
missing.append("email." + k)
if config.get("public_baseurl") is None:
- missing.append("public_base_url")
+ missing.append("public_baseurl")
if len(missing) > 0:
raise RuntimeError(
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 2d7434fb2f..7cfad192e8 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -18,7 +18,6 @@ import logging
from collections import defaultdict
import six
-from six import raise_from
from six.moves import urllib
import attr
@@ -650,9 +649,10 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
},
)
except (NotRetryingDestination, RequestSendFailed) as e:
- raise_from(KeyLookupError("Failed to connect to remote server"), e)
+ # these both have str() representations which we can't really improve upon
+ raise KeyLookupError(str(e))
except HttpResponseException as e:
- raise_from(KeyLookupError("Remote server returned an error"), e)
+ raise KeyLookupError("Remote server returned an error: %s" % (e,))
keys = {}
added_keys = []
@@ -814,9 +814,11 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
timeout=10000,
)
except (NotRetryingDestination, RequestSendFailed) as e:
- raise_from(KeyLookupError("Failed to connect to remote server"), e)
+ # these both have str() representations which we can't really improve
+ # upon
+ raise KeyLookupError(str(e))
except HttpResponseException as e:
- raise_from(KeyLookupError("Remote server returned an error"), e)
+ raise KeyLookupError("Remote server returned an error: %s" % (e,))
if response["server_name"] != server_name:
raise KeyLookupError(
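
A note on the raise_from removals above: they lean on Python 3's implicit
exception chaining. A raise inside an except block records the in-flight
exception on __context__, so the original cause still shows up in tracebacks
without six. A minimal sketch of that behaviour:

    class KeyLookupError(Exception):
        pass

    try:
        try:
            raise ConnectionError("connection refused")
        except ConnectionError as e:
            # same shape as the keyring change: wrap, keeping str(e)
            raise KeyLookupError(str(e))
    except KeyLookupError as wrapped:
        # Python 3 chained the original exception automatically
        assert isinstance(wrapped.__context__, ConnectionError)
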
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index d216c46dfe..05fd49f3c1 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -43,6 +43,7 @@ from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.http.endpoint import parse_server_name
from synapse.logging.context import nested_logging_context
+from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace
from synapse.logging.utils import log_function
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
@@ -507,6 +508,7 @@ class FederationServer(FederationBase):
def on_query_user_devices(self, origin, user_id):
return self.on_query_request("user_devices", user_id)
+ @trace
@defer.inlineCallbacks
@log_function
def on_claim_client_keys(self, origin, content):
@@ -515,6 +517,7 @@ class FederationServer(FederationBase):
for device_id, algorithm in device_keys.items():
query.append((user_id, device_id, algorithm))
+ log_kv({"message": "Claiming one time keys.", "user, device pairs": query})
results = yield self.store.claim_e2e_one_time_keys(query)
json_result = {}
@@ -808,12 +811,13 @@ class FederationHandlerRegistry(object):
if not handler:
logger.warn("No handler registered for EDU type %s", edu_type)
- try:
- yield handler(origin, content)
- except SynapseError as e:
- logger.info("Failed to handle edu %r: %r", edu_type, e)
- except Exception:
- logger.exception("Failed to handle edu %r", edu_type)
+ with start_active_span_from_edu(content, "handle_edu"):
+ try:
+ yield handler(origin, content)
+ except SynapseError as e:
+ logger.info("Failed to handle edu %r: %r", edu_type, e)
+ except Exception:
+ logger.exception("Failed to handle edu %r", edu_type)
def on_query(self, query_type, args):
handler = self.query_handlers.get(query_type)
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 52706302f2..62ca6a3e87 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -14,11 +14,19 @@
# limitations under the License.
import logging
+from canonicaljson import json
+
from twisted.internet import defer
from synapse.api.errors import HttpResponseException
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Transaction
+from synapse.logging.opentracing import (
+ extract_text_map,
+ set_tag,
+ start_active_span_follows_from,
+ tags,
+)
from synapse.util.metrics import measure_func
logger = logging.getLogger(__name__)
@@ -44,93 +52,109 @@ class TransactionManager(object):
@defer.inlineCallbacks
def send_new_transaction(self, destination, pending_pdus, pending_edus):
- # Sort based on the order field
- pending_pdus.sort(key=lambda t: t[1])
- pdus = [x[0] for x in pending_pdus]
- edus = pending_edus
+        # Make a transaction-sending opentracing span. This span follows on from
+        # all the edus in that transaction. This needs to be done since there is
+        # no active span here: without the follows-from references, the span
+        # would have no causal links to the edus and would be forgotten.
+        # span_contexts is a generator so that it won't be evaluated if
+        # opentracing is disabled. (Yay speed!)
- success = True
+ span_contexts = (
+ extract_text_map(json.loads(edu.get_context())) for edu in pending_edus
+ )
- logger.debug("TX [%s] _attempt_new_transaction", destination)
+ with start_active_span_follows_from("send_transaction", span_contexts):
- txn_id = str(self._next_txn_id)
+ # Sort based on the order field
+ pending_pdus.sort(key=lambda t: t[1])
+ pdus = [x[0] for x in pending_pdus]
+ edus = pending_edus
- logger.debug(
- "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
- destination,
- txn_id,
- len(pdus),
- len(edus),
- )
+ success = True
- transaction = Transaction.create_new(
- origin_server_ts=int(self.clock.time_msec()),
- transaction_id=txn_id,
- origin=self._server_name,
- destination=destination,
- pdus=pdus,
- edus=edus,
- )
+ logger.debug("TX [%s] _attempt_new_transaction", destination)
- self._next_txn_id += 1
+ txn_id = str(self._next_txn_id)
- logger.info(
- "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
- destination,
- txn_id,
- transaction.transaction_id,
- len(pdus),
- len(edus),
- )
+ logger.debug(
+ "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
+ destination,
+ txn_id,
+ len(pdus),
+ len(edus),
+ )
- # Actually send the transaction
-
- # FIXME (erikj): This is a bit of a hack to make the Pdu age
- # keys work
- def json_data_cb():
- data = transaction.get_dict()
- now = int(self.clock.time_msec())
- if "pdus" in data:
- for p in data["pdus"]:
- if "age_ts" in p:
- unsigned = p.setdefault("unsigned", {})
- unsigned["age"] = now - int(p["age_ts"])
- del p["age_ts"]
- return data
-
- try:
- response = yield self._transport_layer.send_transaction(
- transaction, json_data_cb
+ transaction = Transaction.create_new(
+ origin_server_ts=int(self.clock.time_msec()),
+ transaction_id=txn_id,
+ origin=self._server_name,
+ destination=destination,
+ pdus=pdus,
+ edus=edus,
)
- code = 200
- except HttpResponseException as e:
- code = e.code
- response = e.response
- if e.code in (401, 404, 429) or 500 <= e.code:
- logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
- raise e
+ self._next_txn_id += 1
- logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
+ logger.info(
+ "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
+ destination,
+ txn_id,
+ transaction.transaction_id,
+ len(pdus),
+ len(edus),
+ )
- if code == 200:
- for e_id, r in response.get("pdus", {}).items():
- if "error" in r:
+ # Actually send the transaction
+
+ # FIXME (erikj): This is a bit of a hack to make the Pdu age
+ # keys work
+ def json_data_cb():
+ data = transaction.get_dict()
+ now = int(self.clock.time_msec())
+ if "pdus" in data:
+ for p in data["pdus"]:
+ if "age_ts" in p:
+ unsigned = p.setdefault("unsigned", {})
+ unsigned["age"] = now - int(p["age_ts"])
+ del p["age_ts"]
+ return data
+
+ try:
+ response = yield self._transport_layer.send_transaction(
+ transaction, json_data_cb
+ )
+ code = 200
+ except HttpResponseException as e:
+ code = e.code
+ response = e.response
+
+ if e.code in (401, 404, 429) or 500 <= e.code:
+ logger.info(
+ "TX [%s] {%s} got %d response", destination, txn_id, code
+ )
+ raise e
+
+ logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
+
+ if code == 200:
+ for e_id, r in response.get("pdus", {}).items():
+ if "error" in r:
+ logger.warn(
+ "TX [%s] {%s} Remote returned error for %s: %s",
+ destination,
+ txn_id,
+ e_id,
+ r,
+ )
+ else:
+ for p in pdus:
logger.warn(
- "TX [%s] {%s} Remote returned error for %s: %s",
+ "TX [%s] {%s} Failed to send event %s",
destination,
txn_id,
- e_id,
- r,
+ p.event_id,
)
- else:
- for p in pdus:
- logger.warn(
- "TX [%s] {%s} Failed to send event %s",
- destination,
- txn_id,
- p.event_id,
- )
- success = False
+ success = False
- return success
+ set_tag(tags.ERROR, not success)
+ return success
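
The comment above about span_contexts being a generator is doing real work:
a generator expression defers edu.get_context() and json.loads until
something iterates it, so when opentracing is disabled the extraction never
runs. A standalone sketch of that laziness:

    import json

    def extract(raw):
        print("extracting")  # visible side effect, to show when work happens
        return json.loads(raw)

    pending = ['{"a": 1}', '{"b": 2}']
    span_contexts = (extract(raw) for raw in pending)  # nothing printed yet

    list(span_contexts)  # prints "extracting" twice: work happens on iteration
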
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index a17148fc3c..dc53b4b170 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -38,7 +38,12 @@ from synapse.http.servlet import (
parse_string_from_args,
)
from synapse.logging.context import run_in_background
-from synapse.logging.opentracing import start_active_span_from_context, tags
+from synapse.logging.opentracing import (
+ start_active_span,
+ start_active_span_from_request,
+ tags,
+ whitelisted_homeserver,
+)
from synapse.types import ThirdPartyInstanceID, get_domain_from_id
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.versionstring import get_version_string
@@ -288,20 +293,28 @@ class BaseFederationServlet(object):
logger.warn("authenticate_request failed: %s", e)
raise
- # Start an opentracing span
- with start_active_span_from_context(
- request.requestHeaders,
- "incoming-federation-request",
- tags={
- "request_id": request.get_request_id(),
- tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
- tags.HTTP_METHOD: request.get_method(),
- tags.HTTP_URL: request.get_redacted_uri(),
- tags.PEER_HOST_IPV6: request.getClientIP(),
- "authenticated_entity": origin,
- "servlet_name": request.request_metrics.name,
- },
- ):
+ request_tags = {
+ "request_id": request.get_request_id(),
+ tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
+ tags.HTTP_METHOD: request.get_method(),
+ tags.HTTP_URL: request.get_redacted_uri(),
+ tags.PEER_HOST_IPV6: request.getClientIP(),
+ "authenticated_entity": origin,
+ "servlet_name": request.request_metrics.name,
+ }
+
+ # Only accept the span context if the origin is authenticated
+ # and whitelisted
+ if origin and whitelisted_homeserver(origin):
+ scope = start_active_span_from_request(
+ request, "incoming-federation-request", tags=request_tags
+ )
+ else:
+ scope = start_active_span(
+ "incoming-federation-request", tags=request_tags
+ )
+
+ with scope:
if origin:
with ratelimiter.ratelimit(origin) as d:
await d
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index 14aad8f09d..aa84621206 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -38,6 +38,9 @@ class Edu(JsonEncodedObject):
internal_keys = ["origin", "destination"]
+ def get_context(self):
+ return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}")
+
class Transaction(JsonEncodedObject):
""" A transaction is a list of Pdus and Edus to be sent to a remote home
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index 2f22f56ca4..d30a68b650 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -94,6 +94,16 @@ class AdminHandler(BaseHandler):
return ret
+ def set_user_server_admin(self, user, admin):
+ """
+ Set the admin bit on a user.
+
+ Args:
+            user (UserID): the (necessarily local) user to manipulate
+ admin (bool): whether or not the user should be an admin of this server
+ """
+ return self.store.set_server_admin(user, admin)
+
@defer.inlineCallbacks
def export_user_data(self, user_id, writer):
"""Write all data we have on the user to the given writer.
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index e1ebb6346c..c7d56779b8 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -15,9 +15,17 @@
import logging
+from canonicaljson import json
+
from twisted.internet import defer
from synapse.api.errors import SynapseError
+from synapse.logging.opentracing import (
+ get_active_span_text_map,
+ set_tag,
+ start_active_span,
+ whitelisted_homeserver,
+)
from synapse.types import UserID, get_domain_from_id
from synapse.util.stringutils import random_string
@@ -100,14 +108,21 @@ class DeviceMessageHandler(object):
message_id = random_string(16)
+ context = get_active_span_text_map()
+
remote_edu_contents = {}
for destination, messages in remote_messages.items():
- remote_edu_contents[destination] = {
- "messages": messages,
- "sender": sender_user_id,
- "type": message_type,
- "message_id": message_id,
- }
+ with start_active_span("to_device_for_user"):
+ set_tag("destination", destination)
+ remote_edu_contents[destination] = {
+ "messages": messages,
+ "sender": sender_user_id,
+ "type": message_type,
+ "message_id": message_id,
+ "org.matrix.opentracing_context": json.dumps(context)
+ if whitelisted_homeserver(destination)
+ else None,
+ }
stream_id = yield self.store.add_messages_to_device_inbox(
local_messages, remote_edu_contents
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 1f90b0d278..056fb97acb 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -24,6 +24,7 @@ from twisted.internet import defer
from synapse.api.errors import CodeMessageException, SynapseError
from synapse.logging.context import make_deferred_yieldable, run_in_background
+from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
from synapse.types import UserID, get_domain_from_id
from synapse.util import unwrapFirstError
from synapse.util.retryutils import NotRetryingDestination
@@ -46,6 +47,7 @@ class E2eKeysHandler(object):
"client_keys", self.on_federation_query_client_keys
)
+ @trace
@defer.inlineCallbacks
def query_devices(self, query_body, timeout):
""" Handle a device key query from a client
@@ -81,6 +83,9 @@ class E2eKeysHandler(object):
else:
remote_queries[user_id] = device_ids
+ set_tag("local_key_query", local_query)
+ set_tag("remote_key_query", remote_queries)
+
# First get local devices.
failures = {}
results = {}
@@ -121,6 +126,7 @@ class E2eKeysHandler(object):
r[user_id] = remote_queries[user_id]
# Now fetch any devices that we don't have in our cache
+ @trace
@defer.inlineCallbacks
def do_remote_query(destination):
"""This is called when we are querying the device list of a user on
@@ -185,6 +191,8 @@ class E2eKeysHandler(object):
except Exception as e:
failure = _exception_to_failure(e)
failures[destination] = failure
+ set_tag("error", True)
+ set_tag("reason", failure)
yield make_deferred_yieldable(
defer.gatherResults(
@@ -198,6 +206,7 @@ class E2eKeysHandler(object):
return {"device_keys": results, "failures": failures}
+ @trace
@defer.inlineCallbacks
def query_local_devices(self, query):
"""Get E2E device keys for local users
@@ -210,6 +219,7 @@ class E2eKeysHandler(object):
defer.Deferred: (resolves to dict[string, dict[string, dict]]):
map from user_id -> device_id -> device details
"""
+ set_tag("local_query", query)
local_query = []
result_dict = {}
@@ -217,6 +227,14 @@ class E2eKeysHandler(object):
# we use UserID.from_string to catch invalid user ids
if not self.is_mine(UserID.from_string(user_id)):
logger.warning("Request for keys for non-local user %s", user_id)
+ log_kv(
+ {
+ "message": "Requested a local key for a user which"
+ " was not local to the homeserver",
+ "user_id": user_id,
+ }
+ )
+ set_tag("error", True)
raise SynapseError(400, "Not a user here")
if not device_ids:
@@ -241,6 +259,7 @@ class E2eKeysHandler(object):
r["unsigned"]["device_display_name"] = display_name
result_dict[user_id][device_id] = r
+ log_kv(results)
return result_dict
@defer.inlineCallbacks
@@ -251,6 +270,7 @@ class E2eKeysHandler(object):
res = yield self.query_local_devices(device_keys_query)
return {"device_keys": res}
+ @trace
@defer.inlineCallbacks
def claim_one_time_keys(self, query, timeout):
local_query = []
@@ -265,6 +285,9 @@ class E2eKeysHandler(object):
domain = get_domain_from_id(user_id)
remote_queries.setdefault(domain, {})[user_id] = device_keys
+ set_tag("local_key_query", local_query)
+ set_tag("remote_key_query", remote_queries)
+
results = yield self.store.claim_e2e_one_time_keys(local_query)
json_result = {}
@@ -276,8 +299,10 @@ class E2eKeysHandler(object):
key_id: json.loads(json_bytes)
}
+ @trace
@defer.inlineCallbacks
def claim_client_keys(destination):
+ set_tag("destination", destination)
device_keys = remote_queries[destination]
try:
remote_result = yield self.federation.claim_client_keys(
@@ -290,6 +315,8 @@ class E2eKeysHandler(object):
except Exception as e:
failure = _exception_to_failure(e)
failures[destination] = failure
+ set_tag("error", True)
+ set_tag("reason", failure)
yield make_deferred_yieldable(
defer.gatherResults(
@@ -313,9 +340,11 @@ class E2eKeysHandler(object):
),
)
+ log_kv({"one_time_keys": json_result, "failures": failures})
return {"one_time_keys": json_result, "failures": failures}
@defer.inlineCallbacks
+ @tag_args
def upload_keys_for_user(self, user_id, device_id, keys):
time_now = self.clock.time_msec()
@@ -329,6 +358,13 @@ class E2eKeysHandler(object):
user_id,
time_now,
)
+ log_kv(
+ {
+ "message": "Updating device_keys for user.",
+ "user_id": user_id,
+ "device_id": device_id,
+ }
+ )
# TODO: Sign the JSON with the server key
changed = yield self.store.set_e2e_device_keys(
user_id, device_id, time_now, device_keys
@@ -336,12 +372,24 @@ class E2eKeysHandler(object):
if changed:
# Only notify about device updates *if* the keys actually changed
yield self.device_handler.notify_device_update(user_id, [device_id])
-
+ else:
+ log_kv({"message": "Not updating device_keys for user", "user_id": user_id})
one_time_keys = keys.get("one_time_keys", None)
if one_time_keys:
+ log_kv(
+ {
+ "message": "Updating one_time_keys for device.",
+ "user_id": user_id,
+ "device_id": device_id,
+ }
+ )
yield self._upload_one_time_keys_for_user(
user_id, device_id, time_now, one_time_keys
)
+ else:
+ log_kv(
+ {"message": "Did not update one_time_keys", "reason": "no keys given"}
+ )
# the device should have been registered already, but it may have been
# deleted due to a race with a DELETE request. Or we may be using an
@@ -352,6 +400,7 @@ class E2eKeysHandler(object):
result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
+ set_tag("one_time_key_counts", result)
return {"one_time_key_counts": result}
@defer.inlineCallbacks
@@ -395,6 +444,7 @@ class E2eKeysHandler(object):
(algorithm, key_id, encode_canonical_json(key).decode("ascii"))
)
+ log_kv({"message": "Inserting new one_time_keys.", "keys": new_keys})
yield self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys)
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 41b871fc59..a9d80f708c 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -26,6 +26,7 @@ from synapse.api.errors import (
StoreError,
SynapseError,
)
+from synapse.logging.opentracing import log_kv, trace
from synapse.util.async_helpers import Linearizer
logger = logging.getLogger(__name__)
@@ -49,6 +50,7 @@ class E2eRoomKeysHandler(object):
# changed.
self._upload_linearizer = Linearizer("upload_room_keys_lock")
+ @trace
@defer.inlineCallbacks
def get_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
@@ -84,8 +86,10 @@ class E2eRoomKeysHandler(object):
user_id, version, room_id, session_id
)
+ log_kv(results)
return results
+ @trace
@defer.inlineCallbacks
def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
@@ -107,6 +111,7 @@ class E2eRoomKeysHandler(object):
with (yield self._upload_linearizer.queue(user_id)):
yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
+ @trace
@defer.inlineCallbacks
def upload_room_keys(self, user_id, version, room_keys):
"""Bulk upload a list of room keys into a given backup version, asserting
@@ -186,7 +191,14 @@ class E2eRoomKeysHandler(object):
session_id(str): the session whose room_key we're setting
room_key(dict): the room_key being set
"""
-
+ log_kv(
+ {
+ "message": "Trying to upload room key",
+ "room_id": room_id,
+ "session_id": session_id,
+ "user_id": user_id,
+ }
+ )
# get the room_key for this particular row
current_room_key = None
try:
@@ -195,14 +207,23 @@ class E2eRoomKeysHandler(object):
)
except StoreError as e:
if e.code == 404:
- pass
+ log_kv(
+ {
+ "message": "Room key not found.",
+ "room_id": room_id,
+ "user_id": user_id,
+ }
+ )
else:
raise
if self._should_replace_room_key(current_room_key, room_key):
+ log_kv({"message": "Replacing room key."})
yield self.store.set_e2e_room_key(
user_id, version, room_id, session_id, room_key
)
+ else:
+ log_kv({"message": "Not replacing room_key."})
@staticmethod
def _should_replace_room_key(current_room_key, room_key):
@@ -236,6 +257,7 @@ class E2eRoomKeysHandler(object):
return False
return True
+ @trace
@defer.inlineCallbacks
def create_version(self, user_id, version_info):
"""Create a new backup version. This automatically becomes the new
@@ -294,6 +316,7 @@ class E2eRoomKeysHandler(object):
raise
return res
+ @trace
@defer.inlineCallbacks
def delete_version(self, user_id, version=None):
"""Deletes a given version of the user's e2e_room_keys backup
@@ -314,6 +337,7 @@ class E2eRoomKeysHandler(object):
else:
raise
+ @trace
@defer.inlineCallbacks
def update_version(self, user_id, version, version_info):
"""Update the info about a given version of the user's backup
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index d83aab3f74..5744f4579d 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -70,6 +70,7 @@ class PaginationHandler(object):
self.auth = hs.get_auth()
self.store = hs.get_datastore()
self.clock = hs.get_clock()
+ self._server_name = hs.hostname
self.pagination_lock = ReadWriteLock()
self._purges_in_progress_by_room = set()
@@ -153,6 +154,22 @@ class PaginationHandler(object):
"""
return self._purges_by_id.get(purge_id)
+ async def purge_room(self, room_id):
+ """Purge the given room from the database"""
+ with (await self.pagination_lock.write(room_id)):
+ # check we know about the room
+ await self.store.get_room_version(room_id)
+
+ # first check that we have no users in this room
+ joined = await defer.maybeDeferred(
+ self.store.is_host_joined, room_id, self._server_name
+ )
+
+ if joined:
+ raise SynapseError(400, "Users are still joined to this room")
+
+ await self.store.purge_room(room_id)
+
@defer.inlineCallbacks
def get_messages(
self,
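
purge_room above mixes async/await with storage methods that may return
either a plain value or a Deferred; defer.maybeDeferred normalises the call
so it can be awaited either way. A small sketch of that bridging, assuming
nothing beyond Twisted itself:

    from twisted.internet import defer

    def is_host_joined(room_id, host):
        # a synchronous stand-in for the store method
        return False

    # maybeDeferred wraps the plain return value in a Deferred, so
    # `await` (or addCallback) works whether or not the callee is async
    d = defer.maybeDeferred(is_host_joined, "!room:id", "example.com")
    d.addCallback(print)  # prints: False
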
diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py
index 71a15f434d..64f62aaeec 100644
--- a/synapse/http/federation/matrix_federation_agent.py
+++ b/synapse/http/federation/matrix_federation_agent.py
@@ -51,9 +51,9 @@ class MatrixFederationAgent(object):
SRVResolver impl to use for looking up SRV records. None to use a default
implementation.
- _well_known_cache (TTLCache|None):
- TTLCache impl for storing cached well-known lookups. None to use a default
- implementation.
+ _well_known_resolver (WellKnownResolver|None):
+ WellKnownResolver to use to perform well-known lookups. None to use a
+ default implementation.
"""
def __init__(
@@ -61,7 +61,7 @@ class MatrixFederationAgent(object):
reactor,
tls_client_options_factory,
_srv_resolver=None,
- _well_known_cache=None,
+ _well_known_resolver=None,
):
self._reactor = reactor
self._clock = Clock(reactor)
@@ -76,15 +76,17 @@ class MatrixFederationAgent(object):
self._pool.maxPersistentPerHost = 5
self._pool.cachedConnectionTimeout = 2 * 60
- self._well_known_resolver = WellKnownResolver(
- self._reactor,
- agent=Agent(
+ if _well_known_resolver is None:
+ _well_known_resolver = WellKnownResolver(
self._reactor,
- pool=self._pool,
- contextFactory=tls_client_options_factory,
- ),
- well_known_cache=_well_known_cache,
- )
+ agent=Agent(
+ self._reactor,
+ pool=self._pool,
+ contextFactory=tls_client_options_factory,
+ ),
+ )
+
+ self._well_known_resolver = _well_known_resolver
@defer.inlineCallbacks
def request(self, method, uri, headers=None, bodyProducer=None):
diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py
index bb250c6922..5e9b0befb0 100644
--- a/synapse/http/federation/well_known_resolver.py
+++ b/synapse/http/federation/well_known_resolver.py
@@ -32,12 +32,19 @@ from synapse.util.metrics import Measure
# period to cache .well-known results for by default
WELL_KNOWN_DEFAULT_CACHE_PERIOD = 24 * 3600
-# jitter to add to the .well-known default cache ttl
-WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 10 * 60
+# jitter factor to apply to the .well-known default cache ttls
+WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 0.1
# period to cache failure to fetch .well-known for
WELL_KNOWN_INVALID_CACHE_PERIOD = 1 * 3600
+# period to cache failure to fetch .well-known if there has recently been a
+# valid well-known for that domain.
+WELL_KNOWN_DOWN_CACHE_PERIOD = 2 * 60
+
+# period to remember there was a valid well-known after valid record expires
+WELL_KNOWN_REMEMBER_DOMAIN_HAD_VALID = 2 * 3600
+
# cap for .well-known cache period
WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600
@@ -49,11 +56,16 @@ WELL_KNOWN_MIN_CACHE_PERIOD = 5 * 60
# we'll start trying to refetch 1 minute before it expires.
WELL_KNOWN_GRACE_PERIOD_FACTOR = 0.2
+# Number of times we retry fetching a well-known for a domain we know recently
+# had a valid entry.
+WELL_KNOWN_RETRY_ATTEMPTS = 3
+
logger = logging.getLogger(__name__)
_well_known_cache = TTLCache("well-known")
+_had_valid_well_known_cache = TTLCache("had-valid-well-known")
@attr.s(slots=True, frozen=True)
@@ -65,14 +77,20 @@ class WellKnownResolver(object):
"""Handles well-known lookups for matrix servers.
"""
- def __init__(self, reactor, agent, well_known_cache=None):
+ def __init__(
+ self, reactor, agent, well_known_cache=None, had_well_known_cache=None
+ ):
self._reactor = reactor
self._clock = Clock(reactor)
if well_known_cache is None:
well_known_cache = _well_known_cache
+ if had_well_known_cache is None:
+ had_well_known_cache = _had_valid_well_known_cache
+
self._well_known_cache = well_known_cache
+ self._had_valid_well_known_cache = had_well_known_cache
self._well_known_agent = RedirectAgent(agent)
@defer.inlineCallbacks
@@ -100,7 +118,7 @@ class WellKnownResolver(object):
# requests for the same server in parallel?
try:
with Measure(self._clock, "get_well_known"):
- result, cache_period = yield self._do_get_well_known(server_name)
+ result, cache_period = yield self._fetch_well_known(server_name)
except _FetchWellKnownFailure as e:
if prev_result and e.temporary:
@@ -111,10 +129,18 @@ class WellKnownResolver(object):
result = None
- # add some randomness to the TTL to avoid a stampeding herd every hour
- # after startup
- cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
- cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
+ if self._had_valid_well_known_cache.get(server_name, False):
+ # We have recently seen a valid well-known record for this
+ # server, so we cache the lack of well-known for a shorter time.
+ cache_period = WELL_KNOWN_DOWN_CACHE_PERIOD
+ else:
+ cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
+
+ # add some randomness to the TTL to avoid a stampeding herd
+ cache_period *= random.uniform(
+ 1 - WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER,
+ 1 + WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER,
+ )
if cache_period > 0:
self._well_known_cache.set(server_name, result, cache_period)
@@ -122,7 +148,7 @@ class WellKnownResolver(object):
return WellKnownLookupResult(delegated_server=result)
@defer.inlineCallbacks
- def _do_get_well_known(self, server_name):
+ def _fetch_well_known(self, server_name):
"""Actually fetch and parse a .well-known, without checking the cache
Args:
@@ -134,24 +160,15 @@ class WellKnownResolver(object):
Returns:
Deferred[Tuple[bytes,int]]: The lookup result and cache period.
"""
- uri = b"https://%s/.well-known/matrix/server" % (server_name,)
- uri_str = uri.decode("ascii")
- logger.info("Fetching %s", uri_str)
+
+ had_valid_well_known = self._had_valid_well_known_cache.get(server_name, False)
# We do this in two steps to differentiate between possibly transient
     # errors (e.g. can't connect to host, 503 response) and more permanent
# errors (such as getting a 404 response).
- try:
- response = yield make_deferred_yieldable(
- self._well_known_agent.request(b"GET", uri)
- )
- body = yield make_deferred_yieldable(readBody(response))
-
- if 500 <= response.code < 600:
- raise Exception("Non-200 response %s" % (response.code,))
- except Exception as e:
- logger.info("Error fetching %s: %s", uri_str, e)
- raise _FetchWellKnownFailure(temporary=True)
+ response, body = yield self._make_well_known_request(
+ server_name, retry=had_valid_well_known
+ )
try:
if response.code != 200:
@@ -161,8 +178,11 @@ class WellKnownResolver(object):
logger.info("Response from .well-known: %s", parsed_body)
result = parsed_body["m.server"].encode("ascii")
+ except defer.CancelledError:
+ # Bail if we've been cancelled
+ raise
except Exception as e:
- logger.info("Error fetching %s: %s", uri_str, e)
+ logger.info("Error parsing well-known for %s: %s", server_name, e)
raise _FetchWellKnownFailure(temporary=False)
cache_period = _cache_period_from_headers(
@@ -172,13 +192,69 @@ class WellKnownResolver(object):
cache_period = WELL_KNOWN_DEFAULT_CACHE_PERIOD
# add some randomness to the TTL to avoid a stampeding herd every 24 hours
# after startup
- cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
+ cache_period *= random.uniform(
+ 1 - WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER,
+ 1 + WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER,
+ )
else:
cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD)
cache_period = max(cache_period, WELL_KNOWN_MIN_CACHE_PERIOD)
+ # We got a success, mark as such in the cache
+ self._had_valid_well_known_cache.set(
+ server_name,
+ bool(result),
+ cache_period + WELL_KNOWN_REMEMBER_DOMAIN_HAD_VALID,
+ )
+
return (result, cache_period)
+ @defer.inlineCallbacks
+ def _make_well_known_request(self, server_name, retry):
+ """Make the well known request.
+
+    This will retry the request if requested and it fails (either by being
+    unable to connect, or by receiving a 5xx error).
+
+ Args:
+ server_name (bytes)
+ retry (bool): Whether to retry the request if it fails.
+
+ Returns:
+        Deferred[tuple[IResponse, bytes]]: Returns the response object and
+            body. Response may be a non-200 response.
+ """
+ uri = b"https://%s/.well-known/matrix/server" % (server_name,)
+ uri_str = uri.decode("ascii")
+
+ i = 0
+ while True:
+ i += 1
+
+ logger.info("Fetching %s", uri_str)
+ try:
+ response = yield make_deferred_yieldable(
+ self._well_known_agent.request(b"GET", uri)
+ )
+ body = yield make_deferred_yieldable(readBody(response))
+
+ if 500 <= response.code < 600:
+ raise Exception("Non-200 response %s" % (response.code,))
+
+ return response, body
+ except defer.CancelledError:
+ # Bail if we've been cancelled
+ raise
+ except Exception as e:
+ if not retry or i >= WELL_KNOWN_RETRY_ATTEMPTS:
+ logger.info("Error fetching %s: %s", uri_str, e)
+ raise _FetchWellKnownFailure(temporary=True)
+
+ logger.info("Error fetching %s: %s. Retrying", uri_str, e)
+
+ # Sleep briefly in the hopes that they come back up
+ yield self._clock.sleep(0.5)
+
def _cache_period_from_headers(headers, time_now=time.time):
cache_controls = _parse_cache_control(headers)
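
The jitter change in this file is from additive to multiplicative: rather
than adding up to ten fixed minutes, the TTL is scaled by a random factor in
[0.9, 1.1], so the spread grows with the cache period itself. A sketch of
the arithmetic:

    import random

    WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 0.1

    def jittered_ttl(cache_period, jitter=WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER):
        # scale by a factor in [1 - jitter, 1 + jitter] so the TTLs of many
        # servers don't all expire at the same instant
        return cache_period * random.uniform(1 - jitter, 1 + jitter)

    # a 24h TTL now lands anywhere in [21.6h, 26.4h]
    print(jittered_ttl(24 * 3600) / 3600)
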
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index fd07bf7b8e..c186b31f59 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -300,7 +300,7 @@ class RestServlet(object):
http_server.register_paths(
method,
patterns,
- trace_servlet(servlet_classname, method_handler),
+ trace_servlet(servlet_classname)(method_handler),
servlet_classname,
)
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index 6b706e1892..dd296027a1 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -149,6 +149,9 @@ unchartered waters will require the enforcement of the whitelist.
``logging/opentracing.py`` has a ``whitelisted_homeserver`` method which takes
in a destination and compares it to the whitelist.
+Most injection methods take a 'destination' arg. The context will only be injected
+if the destination matches the whitelist, or if ``check_destination`` is False.
+
=======
Gotchas
=======
@@ -174,10 +177,48 @@ from twisted.internet import defer
from synapse.config import ConfigError
+# Helper class
+
+
+class _DummyTagNames(object):
+ """wrapper of opentracings tags. We need to have them if we
+ want to reference them without opentracing around. Clearly they
+ should never actually show up in a trace. `set_tags` overwrites
+ these with the correct ones."""
+
+ INVALID_TAG = "invalid-tag"
+ COMPONENT = INVALID_TAG
+ DATABASE_INSTANCE = INVALID_TAG
+ DATABASE_STATEMENT = INVALID_TAG
+ DATABASE_TYPE = INVALID_TAG
+ DATABASE_USER = INVALID_TAG
+ ERROR = INVALID_TAG
+ HTTP_METHOD = INVALID_TAG
+ HTTP_STATUS_CODE = INVALID_TAG
+ HTTP_URL = INVALID_TAG
+ MESSAGE_BUS_DESTINATION = INVALID_TAG
+ PEER_ADDRESS = INVALID_TAG
+ PEER_HOSTNAME = INVALID_TAG
+ PEER_HOST_IPV4 = INVALID_TAG
+ PEER_HOST_IPV6 = INVALID_TAG
+ PEER_PORT = INVALID_TAG
+ PEER_SERVICE = INVALID_TAG
+ SAMPLING_PRIORITY = INVALID_TAG
+ SERVICE = INVALID_TAG
+ SPAN_KIND = INVALID_TAG
+ SPAN_KIND_CONSUMER = INVALID_TAG
+ SPAN_KIND_PRODUCER = INVALID_TAG
+ SPAN_KIND_RPC_CLIENT = INVALID_TAG
+ SPAN_KIND_RPC_SERVER = INVALID_TAG
+
+
try:
import opentracing
+
+ tags = opentracing.tags
except ImportError:
opentracing = None
+ tags = _DummyTagNames
try:
from jaeger_client import Config as JaegerConfig
from synapse.logging.scopecontextmanager import LogContextScopeManager
@@ -252,10 +293,6 @@ def init_tracer(config):
scope_manager=LogContextScopeManager(config),
).initialize_tracer()
- # Set up tags to be opentracing's tags
- global tags
- tags = opentracing.tags
-
# Whitelisting
@@ -334,8 +371,8 @@ def start_active_span_follows_from(operation_name, contexts):
return scope
-def start_active_span_from_context(
- headers,
+def start_active_span_from_request(
+ request,
operation_name,
references=None,
tags=None,
@@ -344,9 +381,9 @@ def start_active_span_from_context(
finish_on_close=True,
):
"""
- Extracts a span context from Twisted Headers.
+ Extracts a span context from a Twisted Request.
args:
- headers (twisted.web.http_headers.Headers)
+        request (twisted.web.http.Request)
For the other args see opentracing.tracer
@@ -360,7 +397,9 @@ def start_active_span_from_context(
if opentracing is None:
return _noop_context_manager()
- header_dict = {k.decode(): v[0].decode() for k, v in headers.getAllRawHeaders()}
+ header_dict = {
+ k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders()
+ }
context = opentracing.tracer.extract(opentracing.Format.HTTP_HEADERS, header_dict)
return opentracing.tracer.start_active_span(
@@ -448,7 +487,7 @@ def set_operation_name(operation_name):
@only_if_tracing
-def inject_active_span_twisted_headers(headers, destination):
+def inject_active_span_twisted_headers(headers, destination, check_destination=True):
"""
Injects a span context into twisted headers in-place
@@ -467,7 +506,7 @@ def inject_active_span_twisted_headers(headers, destination):
https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py
"""
- if not whitelisted_homeserver(destination):
+ if check_destination and not whitelisted_homeserver(destination):
return
span = opentracing.tracer.active_span
@@ -479,7 +518,7 @@ def inject_active_span_twisted_headers(headers, destination):
@only_if_tracing
-def inject_active_span_byte_dict(headers, destination):
+def inject_active_span_byte_dict(headers, destination, check_destination=True):
"""
Injects a span context into a dict where the headers are encoded as byte
strings
@@ -511,7 +550,7 @@ def inject_active_span_byte_dict(headers, destination):
@only_if_tracing
-def inject_active_span_text_map(carrier, destination=None):
+def inject_active_span_text_map(carrier, destination, check_destination=True):
"""
Injects a span context into a dict
@@ -532,7 +571,7 @@ def inject_active_span_text_map(carrier, destination=None):
https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py
"""
- if destination and not whitelisted_homeserver(destination):
+ if check_destination and not whitelisted_homeserver(destination):
return
opentracing.tracer.inject(
@@ -540,6 +579,29 @@ def inject_active_span_text_map(carrier, destination=None):
)
+def get_active_span_text_map(destination=None):
+ """
+ Gets a span context as a dict. This can be used instead of manually
+ injecting a span into an empty carrier.
+
+ Args:
+ destination (str): the name of the remote server.
+
+ Returns:
+ dict: the active span's context if opentracing is enabled, otherwise empty.
+ """
+
+ if not opentracing or (destination and not whitelisted_homeserver(destination)):
+ return {}
+
+ carrier = {}
+ opentracing.tracer.inject(
+ opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier
+ )
+
+ return carrier
+
+
def active_span_context_as_string():
"""
Returns:
@@ -689,65 +751,43 @@ def tag_args(func):
return _tag_args_inner
-def trace_servlet(servlet_name, func):
+def trace_servlet(servlet_name, extract_context=False):
"""Decorator which traces a serlet. It starts a span with some servlet specific
- tags such as the servlet_name and request information"""
- if not opentracing:
- return func
+ tags such as the servlet_name and request information
- @wraps(func)
- @defer.inlineCallbacks
- def _trace_servlet_inner(request, *args, **kwargs):
- with start_active_span(
- "incoming-client-request",
- tags={
+ Args:
+ servlet_name (str): The name to be used for the span's operation_name
+ extract_context (bool): Whether to attempt to extract the opentracing
+ context from the request the servlet is handling.
+
+ """
+
+ def _trace_servlet_inner_1(func):
+ if not opentracing:
+ return func
+
+ @wraps(func)
+ @defer.inlineCallbacks
+ def _trace_servlet_inner(request, *args, **kwargs):
+ request_tags = {
"request_id": request.get_request_id(),
tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
tags.HTTP_METHOD: request.get_method(),
tags.HTTP_URL: request.get_redacted_uri(),
tags.PEER_HOST_IPV6: request.getClientIP(),
- "servlet_name": servlet_name,
- },
- ):
- result = yield defer.maybeDeferred(func, request, *args, **kwargs)
- return result
-
- return _trace_servlet_inner
-
-
-# Helper class
+ }
+ if extract_context:
+ scope = start_active_span_from_request(
+ request, servlet_name, tags=request_tags
+ )
+ else:
+ scope = start_active_span(servlet_name, tags=request_tags)
-class _DummyTagNames(object):
- """wrapper of opentracings tags. We need to have them if we
- want to reference them without opentracing around. Clearly they
- should never actually show up in a trace. `set_tags` overwrites
- these with the correct ones."""
-
- INVALID_TAG = "invalid-tag"
- COMPONENT = INVALID_TAG
- DATABASE_INSTANCE = INVALID_TAG
- DATABASE_STATEMENT = INVALID_TAG
- DATABASE_TYPE = INVALID_TAG
- DATABASE_USER = INVALID_TAG
- ERROR = INVALID_TAG
- HTTP_METHOD = INVALID_TAG
- HTTP_STATUS_CODE = INVALID_TAG
- HTTP_URL = INVALID_TAG
- MESSAGE_BUS_DESTINATION = INVALID_TAG
- PEER_ADDRESS = INVALID_TAG
- PEER_HOSTNAME = INVALID_TAG
- PEER_HOST_IPV4 = INVALID_TAG
- PEER_HOST_IPV6 = INVALID_TAG
- PEER_PORT = INVALID_TAG
- PEER_SERVICE = INVALID_TAG
- SAMPLING_PRIORITY = INVALID_TAG
- SERVICE = INVALID_TAG
- SPAN_KIND = INVALID_TAG
- SPAN_KIND_CONSUMER = INVALID_TAG
- SPAN_KIND_PRODUCER = INVALID_TAG
- SPAN_KIND_RPC_CLIENT = INVALID_TAG
- SPAN_KIND_RPC_SERVER = INVALID_TAG
+ with scope:
+ result = yield defer.maybeDeferred(func, request, *args, **kwargs)
+ return result
+ return _trace_servlet_inner
-tags = _DummyTagNames
+ return _trace_servlet_inner_1
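
trace_servlet is now a decorator factory rather than a plain wrapper, which
is why the call sites in this diff change from trace_servlet(name, handler)
to trace_servlet(name)(handler). A stripped-down sketch of the shape, with
the span handling elided and a hypothetical servlet name:

    from functools import wraps

    def trace_servlet(servlet_name, extract_context=False):
        def decorator(func):
            @wraps(func)
            def wrapper(request, *args, **kwargs):
                # the real code starts a span here, extracting its context
                # from the request when extract_context is True
                return func(request, *args, **kwargs)
            return wrapper
        return decorator

    @trace_servlet("upload_keys")
    def on_POST(request):
        return 200, {}
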
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 2e0594e581..c4be9273f6 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -22,6 +22,7 @@ from six.moves import urllib
from twisted.internet import defer
+import synapse.logging.opentracing as opentracing
from synapse.api.errors import (
CodeMessageException,
HttpResponseException,
@@ -165,8 +166,12 @@ class ReplicationEndpoint(object):
# have a good idea that the request has either succeeded or failed on
# the master, and so whether we should clean up or not.
while True:
+ headers = {}
+ opentracing.inject_active_span_byte_dict(
+ headers, None, check_destination=False
+ )
try:
- result = yield request_func(uri, data)
+ result = yield request_func(uri, data, headers=headers)
break
except CodeMessageException as e:
if e.code != 504 or not cls.RETRY_ON_TIMEOUT:
@@ -205,7 +210,14 @@ class ReplicationEndpoint(object):
args = "/".join("(?P<%s>[^/]+)" % (arg,) for arg in url_args)
pattern = re.compile("^/_synapse/replication/%s/%s$" % (self.NAME, args))
- http_server.register_paths(method, [pattern], handler, self.__class__.__name__)
+ http_server.register_paths(
+ method,
+ [pattern],
+ opentracing.trace_servlet(self.__class__.__name__, extract_context=True)(
+ handler
+ ),
+ self.__class__.__name__,
+ )
def _cached_handler(self, request, txn_id, **kwargs):
"""Called on new incoming requests when caching is enabled. Checks
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 5720cab425..9ab1c2c9e0 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -42,7 +42,9 @@ from synapse.rest.admin._base import (
historical_admin_path_patterns,
)
from synapse.rest.admin.media import register_servlets_for_media_repo
+from synapse.rest.admin.purge_room_servlet import PurgeRoomServlet
from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
+from synapse.rest.admin.users import UserAdminServlet
from synapse.types import UserID, create_requester
from synapse.util.versionstring import get_version_string
@@ -738,8 +740,10 @@ def register_servlets(hs, http_server):
Register all the admin servlets.
"""
register_servlets_for_client_rest_resource(hs, http_server)
+ PurgeRoomServlet(hs).register(http_server)
SendServerNoticeServlet(hs).register(http_server)
VersionServlet(hs).register(http_server)
+ UserAdminServlet(hs).register(http_server)
def register_servlets_for_client_rest_resource(hs, http_server):
diff --git a/synapse/rest/admin/purge_room_servlet.py b/synapse/rest/admin/purge_room_servlet.py
new file mode 100644
index 0000000000..2922eb543e
--- /dev/null
+++ b/synapse/rest/admin/purge_room_servlet.py
@@ -0,0 +1,57 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import re
+
+from synapse.http.servlet import (
+ RestServlet,
+ assert_params_in_dict,
+ parse_json_object_from_request,
+)
+from synapse.rest.admin import assert_requester_is_admin
+
+
+class PurgeRoomServlet(RestServlet):
+ """Servlet which will remove all trace of a room from the database
+
+ POST /_synapse/admin/v1/purge_room
+ {
+ "room_id": "!room:id"
+ }
+
+ returns:
+
+ {}
+ """
+
+ PATTERNS = (re.compile("^/_synapse/admin/v1/purge_room$"),)
+
+ def __init__(self, hs):
+ """
+ Args:
+ hs (synapse.server.HomeServer): server
+ """
+ self.hs = hs
+ self.auth = hs.get_auth()
+ self.pagination_handler = hs.get_pagination_handler()
+
+ async def on_POST(self, request):
+ await assert_requester_is_admin(self.auth, request)
+
+ body = parse_json_object_from_request(request)
+ assert_params_in_dict(body, ("room_id",))
+
+ await self.pagination_handler.purge_room(body["room_id"])
+
+ return (200, {})
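
A hedged usage sketch for the new endpoint, matching the docstring above;
the homeserver URL and access token are placeholders:

    import requests  # any HTTP client would do

    resp = requests.post(
        "https://homeserver.example.com/_synapse/admin/v1/purge_room",
        headers={"Authorization": "Bearer <admin_access_token>"},
        json={"room_id": "!room:id"},
    )
    assert resp.status_code == 200  # body is {} on success
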
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
new file mode 100644
index 0000000000..b0fddb6898
--- /dev/null
+++ b/synapse/rest/admin/users.py
@@ -0,0 +1,76 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import re
+
+from twisted.internet import defer
+
+from synapse.api.errors import SynapseError
+from synapse.http.servlet import (
+ RestServlet,
+ assert_params_in_dict,
+ parse_json_object_from_request,
+)
+from synapse.rest.admin import assert_requester_is_admin
+from synapse.types import UserID
+
+
+class UserAdminServlet(RestServlet):
+ """
+ Set whether or not a user is a server administrator.
+
+ Note that only local users can be server administrators, and that an
+ administrator may not demote themselves.
+
+ Only server administrators can use this API.
+
+ Example:
+ PUT /_synapse/admin/v1/users/@reivilibre:librepush.net/admin
+ {
+ "admin": true
+ }
+ """
+
+ PATTERNS = (re.compile("^/_synapse/admin/v1/users/(?P<user_id>@[^/]*)/admin$"),)
+
+ def __init__(self, hs):
+ self.hs = hs
+ self.auth = hs.get_auth()
+ self.handlers = hs.get_handlers()
+
+ @defer.inlineCallbacks
+ def on_PUT(self, request, user_id):
+ yield assert_requester_is_admin(self.auth, request)
+ requester = yield self.auth.get_user_by_req(request)
+ auth_user = requester.user
+
+ target_user = UserID.from_string(user_id)
+
+ body = parse_json_object_from_request(request)
+
+ assert_params_in_dict(body, ["admin"])
+
+ if not self.hs.is_mine(target_user):
+ raise SynapseError(400, "Only local users can be admins of this homeserver")
+
+ set_admin_to = bool(body["admin"])
+
+ if target_user == auth_user and not set_admin_to:
+ raise SynapseError(400, "You may not demote yourself.")
+
+ yield self.handlers.admin_handler.set_user_server_admin(
+ target_user, set_admin_to
+ )
+
+ return (200, {})
diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py
index 6008adec7c..b218a3f334 100644
--- a/synapse/rest/client/v2_alpha/keys.py
+++ b/synapse/rest/client/v2_alpha/keys.py
@@ -24,6 +24,7 @@ from synapse.http.servlet import (
parse_json_object_from_request,
parse_string,
)
+from synapse.logging.opentracing import log_kv, set_tag, trace_using_operation_name
from synapse.types import StreamToken
from ._base import client_patterns
@@ -68,6 +69,7 @@ class KeyUploadServlet(RestServlet):
self.auth = hs.get_auth()
self.e2e_keys_handler = hs.get_e2e_keys_handler()
+ @trace_using_operation_name("upload_keys")
@defer.inlineCallbacks
def on_POST(self, request, device_id):
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
@@ -78,6 +80,14 @@ class KeyUploadServlet(RestServlet):
# passing the device_id here is deprecated; however, we allow it
# for now for compatibility with older clients.
if requester.device_id is not None and device_id != requester.device_id:
+ set_tag("error", True)
+ log_kv(
+ {
+ "message": "Client uploading keys for a different device",
+ "logged_in_id": requester.device_id,
+ "key_being_uploaded": device_id,
+ }
+ )
logger.warning(
"Client uploading keys for a different device "
"(logged in as %s, uploading for %s)",
@@ -178,10 +188,11 @@ class KeyChangesServlet(RestServlet):
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
from_token_string = parse_string(request, "from")
+ set_tag("from", from_token_string)
# We want to enforce they do pass us one, but we ignore it and return
# changes after the "to" as well as before.
- parse_string(request, "to")
+ set_tag("to", parse_string(request, "to"))
from_token = StreamToken.from_string(from_token_string)
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 05ea1459e3..9510a1e2b0 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -16,7 +16,6 @@
import hmac
import logging
-from hashlib import sha1
from six import string_types
@@ -239,14 +238,12 @@ class RegisterRestServlet(RestServlet):
# we do basic sanity checks here because the auth layer will store these
# in sessions. Pull out the username/password provided to us.
- desired_password = None
if "password" in body:
if (
not isinstance(body["password"], string_types)
or len(body["password"]) > 512
):
raise SynapseError(400, "Invalid password")
- desired_password = body["password"]
desired_username = None
if "username" in body:
@@ -261,8 +258,8 @@ class RegisterRestServlet(RestServlet):
if self.auth.has_access_token(request):
appservice = yield self.auth.get_appservice_by_req(request)
- # fork off as soon as possible for ASes and shared secret auth which
- # have completely different registration flows to normal users
+ # fork off as soon as possible for ASes which have completely
+ # different registration flows to normal users
# == Application Service Registration ==
if appservice:
@@ -285,8 +282,8 @@ class RegisterRestServlet(RestServlet):
return (200, result) # we throw for non 200 responses
return
- # for either shared secret or regular registration, downcase the
- # provided username before attempting to register it. This should mean
+ # for regular registration, downcase the provided username before
+ # attempting to register it. This should mean
# that people who try to register with upper-case in their usernames
# don't get a nasty surprise. (Note that we treat username
# case-insenstively in login, so they are free to carry on imagining
@@ -294,16 +291,6 @@ class RegisterRestServlet(RestServlet):
if desired_username is not None:
desired_username = desired_username.lower()
- # == Shared Secret Registration == (e.g. create new user scripts)
- if "mac" in body:
- # FIXME: Should we really be determining if this is shared secret
- # auth based purely on the 'mac' key?
- result = yield self._do_shared_secret_registration(
- desired_username, desired_password, body
- )
- return (200, result) # we throw for non 200 responses
- return
-
# == Normal User Registration == (everyone else)
if not self.hs.config.enable_registration:
raise SynapseError(403, "Registration has been disabled")
@@ -513,42 +500,6 @@ class RegisterRestServlet(RestServlet):
return (yield self._create_registration_details(user_id, body))
@defer.inlineCallbacks
- def _do_shared_secret_registration(self, username, password, body):
- if not self.hs.config.registration_shared_secret:
- raise SynapseError(400, "Shared secret registration is not enabled")
- if not username:
- raise SynapseError(
- 400, "username must be specified", errcode=Codes.BAD_JSON
- )
-
- # use the username from the original request rather than the
- # downcased one in `username` for the mac calculation
- user = body["username"].encode("utf-8")
-
- # str() because otherwise hmac complains that 'unicode' does not
- # have the buffer interface
- got_mac = str(body["mac"])
-
- # FIXME this is different to the /v1/register endpoint, which
- # includes the password and admin flag in the hashed text. Why are
- # these different?
- want_mac = hmac.new(
- key=self.hs.config.registration_shared_secret.encode(),
- msg=user,
- digestmod=sha1,
- ).hexdigest()
-
- if not compare_digest(want_mac, got_mac):
- raise SynapseError(403, "HMAC incorrect")
-
- user_id = yield self.registration_handler.register_user(
- localpart=username, password=password
- )
-
- result = yield self._create_registration_details(user_id, body)
- return result
-
- @defer.inlineCallbacks
def _create_registration_details(self, user_id, params):
"""Complete registration of newly-registered user
diff --git a/synapse/rest/well_known.py b/synapse/rest/well_known.py
index 5e8fda4b65..20177b44e7 100644
--- a/synapse/rest/well_known.py
+++ b/synapse/rest/well_known.py
@@ -34,7 +34,7 @@ class WellKnownBuilder(object):
self._config = hs.config
def get_well_known(self):
- # if we don't have a public_base_url, we can't help much here.
+ # if we don't have a public_baseurl, we can't help much here.
if self._config.public_baseurl is None:
return None
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 8f72d92895..e11881161d 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -21,6 +21,11 @@ from canonicaljson import json
from twisted.internet import defer
from synapse.api.errors import StoreError
+from synapse.logging.opentracing import (
+ get_active_span_text_map,
+ trace,
+ whitelisted_homeserver,
+)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import Cache, SQLBaseStore, db_to_json
from synapse.storage.background_updates import BackgroundUpdateStore
@@ -73,6 +78,7 @@ class DeviceWorkerStore(SQLBaseStore):
return {d["device_id"]: d for d in devices}
+ @trace
@defer.inlineCallbacks
def get_devices_by_remote(self, destination, from_stream_id, limit):
"""Get stream of updates to send to remote servers
@@ -127,8 +133,15 @@ class DeviceWorkerStore(SQLBaseStore):
# (user_id, device_id) entries into a map, with the value being
# the max stream_id across each set of duplicate entries
#
- # maps (user_id, device_id) -> stream_id
+ # maps (user_id, device_id) -> (stream_id, opentracing_context)
# as long as their stream_id does not match that of the last row
+ #
+ # opentracing_context contains the opentracing metadata for the request
+ # that created the poke
+ #
+ # The most recent request's opentracing_context is used as the
+ # context which created the Edu.
+
query_map = {}
for update in updates:
if stream_id_cutoff is not None and update[2] >= stream_id_cutoff:
@@ -136,7 +149,14 @@ class DeviceWorkerStore(SQLBaseStore):
break
key = (update[0], update[1])
- query_map[key] = max(query_map.get(key, 0), update[2])
+
+ update_context = update[3]
+ update_stream_id = update[2]
+
+ previous_update_stream_id, _ = query_map.get(key, (0, None))
+
+ if update_stream_id > previous_update_stream_id:
+ query_map[key] = (update_stream_id, update_context)
# If we didn't find any updates with a stream_id lower than the cutoff, it
# means that there are more than limit updates all of which have the same
@@ -171,7 +191,7 @@ class DeviceWorkerStore(SQLBaseStore):
List: List of device updates
"""
sql = """
- SELECT user_id, device_id, stream_id FROM device_lists_outbound_pokes
+ SELECT user_id, device_id, stream_id, opentracing_context FROM device_lists_outbound_pokes
WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
ORDER BY stream_id
LIMIT ?
@@ -187,8 +207,9 @@ class DeviceWorkerStore(SQLBaseStore):
Args:
destination (str): The host the device updates are intended for
from_stream_id (int): The minimum stream_id to filter updates by, exclusive
- query_map (Dict[(str, str): int]): Dictionary mapping
- user_id/device_id to update stream_id
+ query_map (Dict[(str, str): (int, str|None)]): Dictionary mapping
+                user_id/device_id to update stream_id and the relevant json-encoded
+ opentracing context
Returns:
List[Dict]: List of objects representing an device update EDU
@@ -210,12 +231,13 @@ class DeviceWorkerStore(SQLBaseStore):
destination, user_id, from_stream_id
)
for device_id, device in iteritems(user_devices):
- stream_id = query_map[(user_id, device_id)]
+ stream_id, opentracing_context = query_map[(user_id, device_id)]
result = {
"user_id": user_id,
"device_id": device_id,
"prev_id": [prev_id] if prev_id else [],
"stream_id": stream_id,
+ "org.matrix.opentracing_context": opentracing_context,
}
prev_id = stream_id
@@ -814,6 +836,8 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
],
)
+ context = get_active_span_text_map()
+
self._simple_insert_many_txn(
txn,
table="device_lists_outbound_pokes",
@@ -825,6 +849,9 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
"device_id": device_id,
"sent": False,
"ts": now,
+ "opentracing_context": json.dumps(context)
+ if whitelisted_homeserver(destination)
+ else None,
}
for destination in hosts
for device_id in device_ids
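For context, get_active_span_text_map serialises the current span into a plain dict so that it can be json-encoded into the new opentracing_context column. A minimal sketch of what such a helper plausibly does, assuming the standard opentracing propagation API (the _sketch name and the empty-dict fallback are illustrative, not Synapse's exact implementation):

    import opentracing
    from opentracing.propagation import Format

    def get_active_span_text_map_sketch():
        """Serialise the active span's context into a dict of strings,
        suitable for json-encoding into opentracing_context."""
        tracer = opentracing.global_tracer()
        span = tracer.active_span
        if span is None:
            # tracing disabled, or no span in scope: nothing to propagate
            return {}
        carrier = {}
        tracer.inject(span.context, Format.TEXT_MAP, carrier)
        return carrier

Note that the hunk above only stores a context for destinations passing whitelisted_homeserver, which is why the column is nullable.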
diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py
index b1901404af..be2fe2bab6 100644
--- a/synapse/storage/e2e_room_keys.py
+++ b/synapse/storage/e2e_room_keys.py
@@ -18,6 +18,7 @@ import json
from twisted.internet import defer
from synapse.api.errors import StoreError
+from synapse.logging.opentracing import log_kv, trace
from ._base import SQLBaseStore
@@ -94,7 +95,16 @@ class EndToEndRoomKeyStore(SQLBaseStore):
},
lock=False,
)
+ log_kv(
+ {
+ "message": "Set room key",
+ "room_id": room_id,
+ "session_id": session_id,
+ "room_key": room_key,
+ }
+ )
+ @trace
@defer.inlineCallbacks
def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
@@ -153,6 +163,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
return sessions
+ @trace
@defer.inlineCallbacks
def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
@@ -236,6 +247,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
"get_e2e_room_keys_version_info", _get_e2e_room_keys_version_info_txn
)
+ @trace
def create_e2e_room_keys_version(self, user_id, info):
"""Atomically creates a new version of this user's e2e_room_keys store
with the given version info.
@@ -276,6 +288,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
"create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn
)
+ @trace
def update_e2e_room_keys_version(self, user_id, version, info):
"""Update a given backup version
@@ -292,6 +305,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
desc="update_e2e_room_keys_version",
)
+ @trace
def delete_e2e_room_keys_version(self, user_id, version=None):
"""Delete a given backup version of the user's room keys.
Doesn't delete their actual key data.
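The @trace decorator applied to these storage methods wraps the call in an opentracing span named after the function. One plausible shape for such a decorator, assuming the opentracing API (this sketch deliberately ignores Twisted Deferreds, which the real decorator has to keep the span open across):

    import functools

    import opentracing

    def trace_sketch(func):
        """Run func inside a span named after the function."""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            tracer = opentracing.global_tracer()
            with tracer.start_active_span(func.__name__):
                return func(*args, **kwargs)
        return wrapper

With @defer.inlineCallbacks in play, a naive wrapper like this would close the span as soon as the generator is created, so the production decorator needs Deferred-aware scope handling.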
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 1e07474e70..33e3a84933 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -18,12 +18,14 @@ from canonicaljson import encode_canonical_json
from twisted.internet import defer
+from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.util.caches.descriptors import cached
from ._base import SQLBaseStore, db_to_json
class EndToEndKeyWorkerStore(SQLBaseStore):
+ @trace
@defer.inlineCallbacks
def get_e2e_device_keys(
self, query_list, include_all_devices=False, include_deleted_devices=False
@@ -40,6 +42,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
Dict mapping from user-id to dict mapping from device_id to
dict containing "key_json", "device_display_name".
"""
+ set_tag("query_list", query_list)
if not query_list:
return {}
@@ -57,9 +60,13 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
return results
+ @trace
def _get_e2e_device_keys_txn(
self, txn, query_list, include_all_devices=False, include_deleted_devices=False
):
+ set_tag("include_all_devices", include_all_devices)
+ set_tag("include_deleted_devices", include_deleted_devices)
+
query_clauses = []
query_params = []
@@ -104,6 +111,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
for user_id, device_id in deleted_devices:
result.setdefault(user_id, {})[device_id] = None
+ log_kv(result)
return result
@defer.inlineCallbacks
@@ -129,8 +137,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
keyvalues={"user_id": user_id, "device_id": device_id},
desc="add_e2e_one_time_keys_check",
)
-
- return {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
+ result = {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
+ log_kv({"message": "Fetched one time keys for user", "one_time_keys": result})
+ return result
@defer.inlineCallbacks
def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys):
@@ -146,6 +155,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
"""
def _add_e2e_one_time_keys(txn):
+ set_tag("user_id", user_id)
+ set_tag("device_id", device_id)
+ set_tag("new_keys", new_keys)
# We are protected from race between lookup and insertion due to
# a unique constraint. If there is a race of two calls to
# `add_e2e_one_time_keys` then they'll conflict and we will only
@@ -202,6 +214,11 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
"""
def _set_e2e_device_keys_txn(txn):
+ set_tag("user_id", user_id)
+ set_tag("device_id", device_id)
+ set_tag("time_now", time_now)
+ set_tag("device_keys", device_keys)
+
old_key_json = self._simple_select_one_onecol_txn(
txn,
table="e2e_device_keys_json",
@@ -215,6 +232,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
new_key_json = encode_canonical_json(device_keys).decode("utf-8")
if old_key_json == new_key_json:
+ log_kv({"Message": "Device key already stored."})
return False
self._simple_upsert_txn(
@@ -223,7 +241,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
keyvalues={"user_id": user_id, "device_id": device_id},
values={"ts_added_ms": time_now, "key_json": new_key_json},
)
-
+ log_kv({"message": "Device keys stored."})
return True
return self.runInteraction("set_e2e_device_keys", _set_e2e_device_keys_txn)
@@ -231,6 +249,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
def claim_e2e_one_time_keys(self, query_list):
"""Take a list of one time keys out of the database"""
+ @trace
def _claim_e2e_one_time_keys(txn):
sql = (
"SELECT key_id, key_json FROM e2e_one_time_keys_json"
@@ -252,7 +271,13 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
" AND key_id = ?"
)
for user_id, device_id, algorithm, key_id in delete:
+ log_kv(
+ {
+ "message": "Deleting claimed one time key from database."
+ }
+ )
txn.execute(sql, (user_id, device_id, algorithm, key_id))
+ log_kv({"message": "finished executing and invalidating cache"})
self._invalidate_cache_and_stream(
txn, self.count_e2e_one_time_keys, (user_id, device_id)
)
@@ -262,6 +287,13 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
def delete_e2e_keys_by_device(self, user_id, device_id):
def delete_e2e_keys_by_device_txn(txn):
+ log_kv(
+ {
+ "message": "Deleting keys for device",
+ "device_id": device_id,
+ "user_id": user_id,
+ }
+ )
self._simple_delete_txn(
txn,
table="e2e_device_keys_json",
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index ac876287fc..5a95c36a8b 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1302,15 +1302,11 @@ class EventsStore(
"event_reference_hashes",
"event_search",
"event_to_state_groups",
- "guest_access",
- "history_visibility",
"local_invites",
- "room_names",
"state_events",
"rejections",
"redactions",
"room_memberships",
- "topics",
):
txn.executemany(
"DELETE FROM %s WHERE event_id = ?" % (table,),
@@ -1454,10 +1450,10 @@ class EventsStore(
for event, _ in events_and_contexts:
if event.type == EventTypes.Name:
- # Insert into the room_names and event_search tables.
+ # Insert into the event_search table.
self._store_room_name_txn(txn, event)
elif event.type == EventTypes.Topic:
- # Insert into the topics table and event_search table.
+ # Insert into the event_search table.
self._store_room_topic_txn(txn, event)
elif event.type == EventTypes.Message:
# Insert into the event_search table.
@@ -1465,12 +1461,6 @@ class EventsStore(
elif event.type == EventTypes.Redaction:
# Insert into the redactions table.
self._store_redaction(txn, event)
- elif event.type == EventTypes.RoomHistoryVisibility:
- # Insert into the event_search table.
- self._store_history_visibility_txn(txn, event)
- elif event.type == EventTypes.GuestAccess:
- # Insert into the event_search table.
- self._store_guest_access_txn(txn, event)
self._handle_event_relations(txn, event)
@@ -2191,6 +2181,143 @@ class EventsStore(
return to_delete, to_dedelta
+ def purge_room(self, room_id):
+ """Deletes all record of a room
+
+ Args:
+ room_id (str):
+ """
+
+ return self.runInteraction("purge_room", self._purge_room_txn, room_id)
+
+ def _purge_room_txn(self, txn, room_id):
+ # first we have to delete the state groups states
+ logger.info("[purge] removing %s from state_groups_state", room_id)
+
+ txn.execute(
+ """
+ DELETE FROM state_groups_state WHERE state_group IN (
+ SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
+ WHERE events.room_id=?
+ )
+ """,
+ (room_id,),
+ )
+
+ # ... and the state group edges
+ logger.info("[purge] removing %s from state_group_edges", room_id)
+
+ txn.execute(
+ """
+ DELETE FROM state_group_edges WHERE state_group IN (
+ SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
+ WHERE events.room_id=?
+ )
+ """,
+ (room_id,),
+ )
+
+ # ... and the state groups
+ logger.info("[purge] removing %s from state_groups", room_id)
+
+ txn.execute(
+ """
+ DELETE FROM state_groups WHERE id IN (
+ SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
+ WHERE events.room_id=?
+ )
+ """,
+ (room_id,),
+ )
+
+ # and then tables which lack an index on room_id but have one on event_id
+ for table in (
+ "event_auth",
+ "event_edges",
+ "event_push_actions_staging",
+ "event_reference_hashes",
+ "event_relations",
+ "event_to_state_groups",
+ "redactions",
+ "rejections",
+ "state_events",
+ ):
+ logger.info("[purge] removing %s from %s", room_id, table)
+
+ txn.execute(
+ """
+ DELETE FROM %s WHERE event_id IN (
+ SELECT event_id FROM events WHERE room_id=?
+ )
+ """
+ % (table,),
+ (room_id,),
+ )
+
+ # and finally, the tables with an index on room_id (or no useful index)
+ for table in (
+ "current_state_events",
+ "event_backward_extremities",
+ "event_forward_extremities",
+ "event_json",
+ "event_push_actions",
+ "event_search",
+ "events",
+ "group_rooms",
+ "public_room_list_stream",
+ "receipts_graph",
+ "receipts_linearized",
+ "room_aliases",
+ "room_depth",
+ "room_memberships",
+ "room_state",
+ "room_stats",
+ "room_stats_earliest_token",
+ "rooms",
+ "stream_ordering_to_exterm",
+ "topics",
+ "users_in_public_rooms",
+ "users_who_share_private_rooms",
+ # no useful index, but let's clear them anyway
+ "appservice_room_list",
+ "e2e_room_keys",
+ "event_push_summary",
+ "pusher_throttle",
+ "group_summary_rooms",
+ "local_invites",
+ "room_account_data",
+ "room_tags",
+ ):
+ logger.info("[purge] removing %s from %s", room_id, table)
+ txn.execute("DELETE FROM %s WHERE room_id=?" % (table,), (room_id,))
+
+ # Other tables we do NOT need to clear out:
+ #
+ # - blocked_rooms
+ # This is important, to make sure that we don't accidentally rejoin a blocked
+ # room after it was purged
+ #
+ # - user_directory
+ # This has a room_id column, but it is unused
+ #
+
+ # Other tables that we might want to consider clearing out include:
+ #
+ # - event_reports
+ # Given that these are intended for abuse management, my initial
+ # inclination is to leave them in place.
+ #
+ # - current_state_delta_stream
+ # - ex_outlier_stream
+ # - room_tags_revisions
+ # The problem with these is that they are largeish and there is no room_id
+ # index on them. In any case we should be clearing out 'stream' tables
+ # periodically anyway (#5888)
+
+ # TODO: we could probably usefully do a bunch of cache invalidation here
+
+ logger.info("[purge] done")
+
@defer.inlineCallbacks
def is_event_after(self, event_id1, event_id2):
"""Returns True if event_id1 is after event_id2 in the stream
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 55e4e84d71..9027b917c1 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -272,6 +272,14 @@ class RegistrationWorkerStore(SQLBaseStore):
@defer.inlineCallbacks
def is_server_admin(self, user):
+ """Determines if a user is an admin of this homeserver.
+
+ Args:
+ user (UserID): user ID of the user to test
+
+ Returns (bool):
+ true iff the user is a server admin, false otherwise.
+ """
res = yield self._simple_select_one_onecol(
table="users",
keyvalues={"name": user.to_string()},
@@ -282,6 +290,21 @@ class RegistrationWorkerStore(SQLBaseStore):
return res if res else False
+ def set_server_admin(self, user, admin):
+ """Sets whether a user is an admin of this homeserver.
+
+ Args:
+ user (UserID): user ID of the user to modify
+ admin (bool): true iff the user is to be a server admin,
+ false otherwise.
+ """
+ return self._simple_update_one(
+ table="users",
+ keyvalues={"name": user.to_string()},
+ updatevalues={"admin": 1 if admin else 0},
+ desc="set_server_admin",
+ )
+
def _query_for_auth(self, txn, token):
sql = (
"SELECT users.name, users.is_guest, access_tokens.id as token_id,"
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index bc606292b8..08e13f3a3b 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -386,32 +386,12 @@ class RoomStore(RoomWorkerStore, SearchStore):
def _store_room_topic_txn(self, txn, event):
if hasattr(event, "content") and "topic" in event.content:
- self._simple_insert_txn(
- txn,
- "topics",
- {
- "event_id": event.event_id,
- "room_id": event.room_id,
- "topic": event.content["topic"],
- },
- )
-
self.store_event_search_txn(
txn, event, "content.topic", event.content["topic"]
)
def _store_room_name_txn(self, txn, event):
if hasattr(event, "content") and "name" in event.content:
- self._simple_insert_txn(
- txn,
- "room_names",
- {
- "event_id": event.event_id,
- "room_id": event.room_id,
- "name": event.content["name"],
- },
- )
-
self.store_event_search_txn(
txn, event, "content.name", event.content["name"]
)
@@ -422,21 +402,6 @@ class RoomStore(RoomWorkerStore, SearchStore):
txn, event, "content.body", event.content["body"]
)
- def _store_history_visibility_txn(self, txn, event):
- self._store_content_index_txn(txn, event, "history_visibility")
-
- def _store_guest_access_txn(self, txn, event):
- self._store_content_index_txn(txn, event, "guest_access")
-
- def _store_content_index_txn(self, txn, event, key):
- if hasattr(event, "content") and key in event.content:
- sql = (
- "INSERT INTO %(key)s"
- " (event_id, room_id, %(key)s)"
- " VALUES (?, ?, ?)" % {"key": key}
- )
- txn.execute(sql, (event.event_id, event.room_id, event.content[key]))
-
def add_event_report(
self, room_id, event_id, user_id, reason, content, received_ts
):
diff --git a/synapse/storage/schema/delta/56/add_spans_to_device_lists.sql b/synapse/storage/schema/delta/56/add_spans_to_device_lists.sql
new file mode 100644
index 0000000000..41807eb1e7
--- /dev/null
+++ b/synapse/storage/schema/delta/56/add_spans_to_device_lists.sql
@@ -0,0 +1,20 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Opentracing context data for inclusion in the device_list_update EDUs, as a
+ * json-encoded dictionary. NULL if opentracing is disabled (or not enabled for this destination).
+ */
+ALTER TABLE device_lists_outbound_pokes ADD opentracing_context TEXT;
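Whatever is stored in the new column comes back out of get_devices_by_remote as either NULL or a json-encoded dict, so readers have to handle both cases. A sketch of the decoding side (decode_opentracing_context is a hypothetical helper using the stdlib json module):

    import json

    def decode_opentracing_context(raw):
        """Return the stored text-map dict, or None if tracing was
        disabled for the destination when the poke was written."""
        if raw is None:
            return None
        return json.loads(raw)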
diff --git a/synapse/storage/schema/delta/56/drop_unused_event_tables.sql b/synapse/storage/schema/delta/56/drop_unused_event_tables.sql
new file mode 100644
index 0000000000..9f09922c67
--- /dev/null
+++ b/synapse/storage/schema/delta/56/drop_unused_event_tables.sql
@@ -0,0 +1,20 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- these tables are no longer used and can be dropped.
+DROP TABLE IF EXISTS room_names;
+DROP TABLE IF EXISTS topics;
+DROP TABLE IF EXISTS history_visibility;
+DROP TABLE IF EXISTS guest_access;
diff --git a/synapse/storage/schema/delta/56/users_in_public_rooms_idx.sql b/synapse/storage/schema/delta/56/users_in_public_rooms_idx.sql
new file mode 100644
index 0000000000..149f8be8b6
--- /dev/null
+++ b/synapse/storage/schema/delta/56/users_in_public_rooms_idx.sql
@@ -0,0 +1,17 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- this was apparently forgotten when the table was created back in delta 53.
+CREATE INDEX users_in_public_rooms_r_idx ON users_in_public_rooms(room_id);