diff --git a/synapse/api/urls.py b/synapse/api/urls.py
index 3c6bddff7a..e16c386a14 100644
--- a/synapse/api/urls.py
+++ b/synapse/api/urls.py
@@ -26,6 +26,7 @@ CLIENT_API_PREFIX = "/_matrix/client"
FEDERATION_PREFIX = "/_matrix/federation"
FEDERATION_V1_PREFIX = FEDERATION_PREFIX + "/v1"
FEDERATION_V2_PREFIX = FEDERATION_PREFIX + "/v2"
+FEDERATION_UNSTABLE_PREFIX = FEDERATION_PREFIX + "/unstable"
STATIC_PREFIX = "/_matrix/static"
WEB_CLIENT_PREFIX = "/_matrix/client"
CONTENT_REPO_PREFIX = "/_matrix/content"
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 08199a5e8d..8cc990399f 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -344,15 +344,21 @@ class _LimitedHostnameResolver(object):
def resolveHostName(self, resolutionReceiver, hostName, portNumber=0,
addressTypes=None, transportSemantics='TCP'):
- # Note this is happening deep within the reactor, so we don't need to
- # worry about log contexts.
-
# We need this function to return `resolutionReceiver` so we do all the
# actual logic involving deferreds in a separate function.
- self._resolve(
- resolutionReceiver, hostName, portNumber,
- addressTypes, transportSemantics,
- )
+
+ # even though this is happening within the depths of twisted, we need to drop
+ # our logcontext before starting _resolve, otherwise: (a) _resolve will drop
+ # the logcontext if it returns an incomplete deferred; (b) _resolve will
+ # call the resolutionReceiver *with* a logcontext, which it won't be expecting.
+ with PreserveLoggingContext():
+ self._resolve(
+ resolutionReceiver,
+ hostName,
+ portNumber,
+ addressTypes,
+ transportSemantics,
+ )
return resolutionReceiver
diff --git a/synapse/config/server.py b/synapse/config/server.py
index e9120d4d75..e763e19e15 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -414,6 +414,7 @@ class ServerConfig(Config):
#
# For example, for room version 1, default_room_version should be set
# to "1".
+ #
#default_room_version: "%(default_room_version)s"
# The GC threshold parameters to pass to `gc.set_threshold`, if defined
diff --git a/synapse/config/user_directory.py b/synapse/config/user_directory.py
index 142754a7dc..023997ccde 100644
--- a/synapse/config/user_directory.py
+++ b/synapse/config/user_directory.py
@@ -43,9 +43,9 @@ class UserDirectoryConfig(Config):
#
# 'search_all_users' defines whether to search all users visible to your HS
# when searching the user directory, rather than limiting to users visible
- # in public rooms. Defaults to false. If you set it True, you'll have to run
- # UPDATE user_directory_stream_pos SET stream_id = NULL;
- # on your database to tell it to rebuild the user_directory search indexes.
+ # in public rooms. Defaults to false. If you set it True, you'll have to
+ # rebuild the user_directory search indexes, see
+ # https://github.com/matrix-org/synapse/blob/master/docs/user_directory.md
#
#user_directory:
# enabled: true
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index eaf41b983c..c63f106cf3 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -17,6 +17,7 @@
import logging
from collections import namedtuple
+import six
from six import raise_from
from six.moves import urllib
@@ -180,9 +181,7 @@ class Keyring(object):
# We want to wait for any previous lookups to complete before
# proceeding.
- yield self.wait_for_previous_lookups(
- [rq.server_name for rq in verify_requests], server_to_deferred
- )
+ yield self.wait_for_previous_lookups(server_to_deferred)
# Actually start fetching keys.
self._get_server_verify_keys(verify_requests)
@@ -215,12 +214,11 @@ class Keyring(object):
logger.exception("Error starting key lookups")
@defer.inlineCallbacks
- def wait_for_previous_lookups(self, server_names, server_to_deferred):
+ def wait_for_previous_lookups(self, server_to_deferred):
"""Waits for any previous key lookups for the given servers to finish.
Args:
- server_names (list): list of server_names we want to lookup
- server_to_deferred (dict): server_name to deferred which gets
+ server_to_deferred (dict[str, Deferred]): server_name to deferred which gets
resolved once we've finished looking up keys for that server.
The Deferreds should be regular twisted ones which call their
callbacks with no logcontext.
@@ -233,7 +231,7 @@ class Keyring(object):
while True:
wait_on = [
(server_name, self.key_downloads[server_name])
- for server_name in server_names
+ for server_name in server_to_deferred.keys()
if server_name in self.key_downloads
]
if not wait_on:
@@ -349,6 +347,7 @@ class KeyFetcher(object):
Args:
server_name_and_key_ids (iterable[Tuple[str, iterable[str]]]):
list of (server_name, iterable[key_id]) tuples to fetch keys for
+ Note that the iterables may be iterated more than once.
Returns:
Deferred[dict[str, dict[str, synapse.storage.keys.FetchKeyResult|None]]]:
@@ -394,8 +393,7 @@ class BaseV2KeyFetcher(object):
POST /_matrix/key/v2/query.
Checks that each signature in the response that claims to come from the origin
- server is valid. (Does not check that there actually is such a signature, for
- some reason.)
+ server is valid, and that there is at least one such signature.
Stores the json in server_keys_json so that it can be used for future responses
to /_matrix/key/v2/query.
@@ -430,16 +428,25 @@ class BaseV2KeyFetcher(object):
verify_key=verify_key, valid_until_ts=ts_valid_until_ms
)
- # TODO: improve this signature checking
server_name = response_json["server_name"]
+ verified = False
for key_id in response_json["signatures"].get(server_name, {}):
- if key_id not in verify_keys:
+ # each of the keys used for the signature must be present in the response
+ # json.
+ key = verify_keys.get(key_id)
+ if not key:
raise KeyLookupError(
- "Key response must include verification keys for all signatures"
+ "Key response is signed by key id %s:%s but that key is not "
+ "present in the response" % (server_name, key_id)
)
- verify_signed_json(
- response_json, server_name, verify_keys[key_id].verify_key
+ verify_signed_json(response_json, server_name, key.verify_key)
+ verified = True
+
+ if not verified:
+ raise KeyLookupError(
+ "Key response for %s is not signed by the origin server"
+ % (server_name,)
)
for key_id, key_data in response_json["old_verify_keys"].items():
@@ -549,7 +556,16 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
Returns:
Deferred[dict[str, dict[str, synapse.storage.keys.FetchKeyResult]]]: map
from server_name -> key_id -> FetchKeyResult
+
+ Raises:
+ KeyLookupError if there was an error processing the entire response from
+ the server
"""
+ logger.info(
+ "Requesting keys %s from notary server %s",
+ server_names_and_key_ids,
+ perspective_name,
+ )
# TODO(mark): Set the minimum_valid_until_ts to that needed by
# the events being validated or the current time if validating
# an incoming request.
@@ -578,40 +594,31 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
time_now_ms = self.clock.time_msec()
for response in query_response["server_keys"]:
- if (
- u"signatures" not in response
- or perspective_name not in response[u"signatures"]
- ):
+ # do this first, so that we can give useful errors thereafter
+ server_name = response.get("server_name")
+ if not isinstance(server_name, six.string_types):
raise KeyLookupError(
- "Key response not signed by perspective server"
- " %r" % (perspective_name,)
+ "Malformed response from key notary server %s: invalid server_name"
+ % (perspective_name,)
)
- verified = False
- for key_id in response[u"signatures"][perspective_name]:
- if key_id in perspective_keys:
- verify_signed_json(
- response, perspective_name, perspective_keys[key_id]
- )
- verified = True
-
- if not verified:
- logging.info(
- "Response from perspective server %r not signed with a"
- " known key, signed with: %r, known keys: %r",
+ try:
+ processed_response = yield self._process_perspectives_response(
perspective_name,
- list(response[u"signatures"][perspective_name]),
- list(perspective_keys),
+ perspective_keys,
+ response,
+ time_added_ms=time_now_ms,
)
- raise KeyLookupError(
- "Response not signed with a known key for perspective"
- " server %r" % (perspective_name,)
+ except KeyLookupError as e:
+ logger.warning(
+ "Error processing response from key notary server %s for origin "
+ "server %s: %s",
+ perspective_name,
+ server_name,
+ e,
)
-
- processed_response = yield self.process_v2_response(
- perspective_name, response, time_added_ms=time_now_ms
- )
- server_name = response["server_name"]
+ # we continue to process the rest of the response
+ continue
added_keys.extend(
(server_name, key_id, key) for key_id, key in processed_response.items()
@@ -624,6 +631,53 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
defer.returnValue(keys)
+ def _process_perspectives_response(
+ self, perspective_name, perspective_keys, response, time_added_ms
+ ):
+ """Parse a 'Server Keys' structure from the result of a /key/query request
+
+ Checks that the entry is correctly signed by the perspectives server, and then
+ passes over to process_v2_response
+
+ Args:
+ perspective_name (str): the name of the notary server that produced this
+ result
+
+ perspective_keys (dict[str, VerifyKey]): map of key_id->key for the
+ notary server
+
+ response (dict): the json-decoded Server Keys response object
+
+ time_added_ms (int): the timestamp to record in server_keys_json
+
+ Returns:
+ Deferred[dict[str, FetchKeyResult]]: map from key_id to result object
+ """
+ if (
+ u"signatures" not in response
+ or perspective_name not in response[u"signatures"]
+ ):
+ raise KeyLookupError("Response not signed by the notary server")
+
+ verified = False
+ for key_id in response[u"signatures"][perspective_name]:
+ if key_id in perspective_keys:
+ verify_signed_json(response, perspective_name, perspective_keys[key_id])
+ verified = True
+
+ if not verified:
+ raise KeyLookupError(
+ "Response not signed with a known key: signed with: %r, known keys: %r"
+ % (
+ list(response[u"signatures"][perspective_name].keys()),
+ list(perspective_keys.keys()),
+ )
+ )
+
+ return self.process_v2_response(
+ perspective_name, response, time_added_ms=time_added_ms
+ )
+
class ServerKeyFetcher(BaseV2KeyFetcher):
"""KeyFetcher impl which fetches keys from the origin servers"""
@@ -677,12 +731,6 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
except HttpResponseException as e:
raise_from(KeyLookupError("Remote server returned an error"), e)
- if (
- u"signatures" not in response
- or server_name not in response[u"signatures"]
- ):
- raise KeyLookupError("Key response not signed by remote server")
-
if response["server_name"] != server_name:
raise KeyLookupError(
"Expected a response for server %r not %r"
diff --git a/synapse/events/builder.py b/synapse/events/builder.py
index 1fe995f212..546b6f4982 100644
--- a/synapse/events/builder.py
+++ b/synapse/events/builder.py
@@ -76,6 +76,7 @@ class EventBuilder(object):
# someone tries to get them when they don't exist.
_state_key = attr.ib(default=None)
_redacts = attr.ib(default=None)
+ _origin_server_ts = attr.ib(default=None)
internal_metadata = attr.ib(default=attr.Factory(lambda: _EventInternalMetadata({})))
@@ -142,6 +143,9 @@ class EventBuilder(object):
if self._redacts is not None:
event_dict["redacts"] = self._redacts
+ if self._origin_server_ts is not None:
+ event_dict["origin_server_ts"] = self._origin_server_ts
+
defer.returnValue(
create_local_event_from_event_dict(
clock=self._clock,
@@ -209,6 +213,7 @@ class EventBuilderFactory(object):
content=key_values.get("content", {}),
unsigned=key_values.get("unsigned", {}),
redacts=key_values.get("redacts", None),
+ origin_server_ts=key_values.get("origin_server_ts", None),
)
@@ -245,7 +250,7 @@ def create_local_event_from_event_dict(clock, hostname, signing_key,
event_dict["event_id"] = _create_event_id(clock, hostname)
event_dict["origin"] = hostname
- event_dict["origin_server_ts"] = time_now
+ event_dict.setdefault("origin_server_ts", time_now)
event_dict.setdefault("unsigned", {})
age = event_dict["unsigned"].pop("age", 0)
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 385eda2dca..d0efc4e0d3 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -23,7 +23,11 @@ from twisted.internet import defer
import synapse
from synapse.api.errors import Codes, FederationDeniedError, SynapseError
from synapse.api.room_versions import RoomVersions
-from synapse.api.urls import FEDERATION_V1_PREFIX, FEDERATION_V2_PREFIX
+from synapse.api.urls import (
+ FEDERATION_UNSTABLE_PREFIX,
+ FEDERATION_V1_PREFIX,
+ FEDERATION_V2_PREFIX,
+)
from synapse.http.endpoint import parse_and_validate_server_name
from synapse.http.server import JsonResource
from synapse.http.servlet import (
@@ -1304,6 +1308,30 @@ class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet):
defer.returnValue((200, new_content))
+class RoomComplexityServlet(BaseFederationServlet):
+ """
+ Indicates to other servers how complex (and therefore likely
+ resource-intensive) a public room this server knows about is.
+ """
+ PATH = "/rooms/(?P<room_id>[^/]*)/complexity"
+ PREFIX = FEDERATION_UNSTABLE_PREFIX
+
+ @defer.inlineCallbacks
+ def on_GET(self, origin, content, query, room_id):
+
+ store = self.handler.hs.get_datastore()
+
+ is_public = yield store.is_room_world_readable_or_publicly_joinable(
+ room_id
+ )
+
+ if not is_public:
+ raise SynapseError(404, "Room not found", errcode=Codes.INVALID_PARAM)
+
+ complexity = yield store.get_room_complexity(room_id)
+ defer.returnValue((200, complexity))
+
+
FEDERATION_SERVLET_CLASSES = (
FederationSendServlet,
FederationEventServlet,
@@ -1327,6 +1355,7 @@ FEDERATION_SERVLET_CLASSES = (
FederationThirdPartyInviteExchangeServlet,
On3pidBindServlet,
FederationVersionServlet,
+ RoomComplexityServlet,
)
OPENID_SERVLET_CLASSES = (
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 59d53f1050..6209858bbb 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -182,17 +182,27 @@ class PresenceHandler(object):
# Start a LoopingCall in 30s that fires every 5s.
# The initial delay is to allow disconnected clients a chance to
# reconnect before we treat them as offline.
+ def run_timeout_handler():
+ return run_as_background_process(
+ "handle_presence_timeouts", self._handle_timeouts
+ )
+
self.clock.call_later(
30,
self.clock.looping_call,
- self._handle_timeouts,
+ run_timeout_handler,
5000,
)
+ def run_persister():
+ return run_as_background_process(
+ "persist_presence_changes", self._persist_unpersisted_changes
+ )
+
self.clock.call_later(
60,
self.clock.looping_call,
- self._persist_unpersisted_changes,
+ run_persister,
60 * 1000,
)
@@ -229,6 +239,7 @@ class PresenceHandler(object):
)
if self.unpersisted_users_changes:
+
yield self.store.update_presence([
self.user_to_current_state[user_id]
for user_id in self.unpersisted_users_changes
@@ -240,30 +251,18 @@ class PresenceHandler(object):
"""We periodically persist the unpersisted changes, as otherwise they
may stack up and slow down shutdown times.
"""
- logger.info(
- "Performing _persist_unpersisted_changes. Persisting %d unpersisted changes",
- len(self.unpersisted_users_changes)
- )
-
unpersisted = self.unpersisted_users_changes
self.unpersisted_users_changes = set()
if unpersisted:
+ logger.info(
+ "Persisting %d upersisted presence updates", len(unpersisted)
+ )
yield self.store.update_presence([
self.user_to_current_state[user_id]
for user_id in unpersisted
])
- logger.info("Finished _persist_unpersisted_changes")
-
- @defer.inlineCallbacks
- def _update_states_and_catch_exception(self, new_states):
- try:
- res = yield self._update_states(new_states)
- defer.returnValue(res)
- except Exception:
- logger.exception("Error updating presence")
-
@defer.inlineCallbacks
def _update_states(self, new_states):
"""Updates presence of users. Sets the appropriate timeouts. Pokes
@@ -338,45 +337,41 @@ class PresenceHandler(object):
logger.info("Handling presence timeouts")
now = self.clock.time_msec()
- try:
- with Measure(self.clock, "presence_handle_timeouts"):
- # Fetch the list of users that *may* have timed out. Things may have
- # changed since the timeout was set, so we won't necessarily have to
- # take any action.
- users_to_check = set(self.wheel_timer.fetch(now))
-
- # Check whether the lists of syncing processes from an external
- # process have expired.
- expired_process_ids = [
- process_id for process_id, last_update
- in self.external_process_last_updated_ms.items()
- if now - last_update > EXTERNAL_PROCESS_EXPIRY
- ]
- for process_id in expired_process_ids:
- users_to_check.update(
- self.external_process_last_updated_ms.pop(process_id, ())
- )
- self.external_process_last_update.pop(process_id)
+ # Fetch the list of users that *may* have timed out. Things may have
+ # changed since the timeout was set, so we won't necessarily have to
+ # take any action.
+ users_to_check = set(self.wheel_timer.fetch(now))
+
+ # Check whether the lists of syncing processes from an external
+ # process have expired.
+ expired_process_ids = [
+ process_id for process_id, last_update
+ in self.external_process_last_updated_ms.items()
+ if now - last_update > EXTERNAL_PROCESS_EXPIRY
+ ]
+ for process_id in expired_process_ids:
+ users_to_check.update(
+ self.external_process_last_updated_ms.pop(process_id, ())
+ )
+ self.external_process_last_update.pop(process_id)
- states = [
- self.user_to_current_state.get(
- user_id, UserPresenceState.default(user_id)
- )
- for user_id in users_to_check
- ]
+ states = [
+ self.user_to_current_state.get(
+ user_id, UserPresenceState.default(user_id)
+ )
+ for user_id in users_to_check
+ ]
- timers_fired_counter.inc(len(states))
+ timers_fired_counter.inc(len(states))
- changes = handle_timeouts(
- states,
- is_mine_fn=self.is_mine_id,
- syncing_user_ids=self.get_currently_syncing_users(),
- now=now,
- )
+ changes = handle_timeouts(
+ states,
+ is_mine_fn=self.is_mine_id,
+ syncing_user_ids=self.get_currently_syncing_users(),
+ now=now,
+ )
- run_in_background(self._update_states_and_catch_exception, changes)
- except Exception:
- logger.exception("Exception in _handle_timeouts loop")
+ return self._update_states(changes)
@defer.inlineCallbacks
def bump_presence_active_time(self, user):
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 7eefc7b1fc..8197619a78 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -711,10 +711,6 @@ class MatrixFederationHttpClient(object):
RequestSendFailed: If there were problems connecting to the
remote, due to e.g. DNS failures, connection timeouts etc.
"""
- logger.debug("get_json args: %s", args)
-
- logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
-
request = MatrixFederationRequest(
method="GET",
destination=destination,
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index 528125e737..197c652850 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -55,7 +55,7 @@ def parse_integer_from_args(args, name, default=None, required=False):
return int(args[name][0])
except Exception:
message = "Query parameter %r must be an integer" % (name,)
- raise SynapseError(400, message)
+ raise SynapseError(400, message, errcode=Codes.INVALID_PARAM)
else:
if required:
message = "Missing integer query parameter %r" % (name,)
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 744d85594f..d6c4dcdb18 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -822,10 +822,16 @@ class AdminRestResource(JsonResource):
def __init__(self, hs):
JsonResource.__init__(self, hs, canonical_json=False)
+ register_servlets(hs, self)
- register_servlets_for_client_rest_resource(hs, self)
- SendServerNoticeServlet(hs).register(self)
- VersionServlet(hs).register(self)
+
+def register_servlets(hs, http_server):
+ """
+ Register all the admin servlets.
+ """
+ register_servlets_for_client_rest_resource(hs, http_server)
+ SendServerNoticeServlet(hs).register(http_server)
+ VersionServlet(hs).register(http_server)
def register_servlets_for_client_rest_resource(hs, http_server):
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 5180e9eaf1..029039c162 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -386,7 +386,7 @@ class CasRedirectServlet(RestServlet):
b"redirectUrl": args[b"redirectUrl"][0]
}).encode('ascii')
hs_redirect_url = (self.cas_service_url +
- b"/_matrix/client/api/v1/login/cas/ticket")
+ b"/_matrix/client/r0/login/cas/ticket")
service_param = urllib.parse.urlencode({
b"service": b"%s?%s" % (hs_redirect_url, client_redirect_url_param)
}).encode('ascii')
@@ -395,7 +395,7 @@ class CasRedirectServlet(RestServlet):
class CasTicketServlet(ClientV1RestServlet):
- PATTERNS = client_path_patterns("/login/cas/ticket", releases=())
+ PATTERNS = client_path_patterns("/login/cas/ticket")
def __init__(self, hs):
super(CasTicketServlet, self).__init__(hs)
diff --git a/synapse/rest/client/v1/logout.py b/synapse/rest/client/v1/logout.py
index 430c692336..ba20e75033 100644
--- a/synapse/rest/client/v1/logout.py
+++ b/synapse/rest/client/v1/logout.py
@@ -17,8 +17,6 @@ import logging
from twisted.internet import defer
-from synapse.api.errors import AuthError
-
from .base import ClientV1RestServlet, client_path_patterns
logger = logging.getLogger(__name__)
@@ -38,23 +36,16 @@ class LogoutRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def on_POST(self, request):
- try:
- requester = yield self.auth.get_user_by_req(request)
- except AuthError:
- # this implies the access token has already been deleted.
- defer.returnValue((401, {
- "errcode": "M_UNKNOWN_TOKEN",
- "error": "Access Token unknown or expired"
- }))
+ requester = yield self.auth.get_user_by_req(request)
+
+ if requester.device_id is None:
+ # the acccess token wasn't associated with a device.
+ # Just delete the access token
+ access_token = self._auth.get_access_token_from_request(request)
+ yield self._auth_handler.delete_access_token(access_token)
else:
- if requester.device_id is None:
- # the acccess token wasn't associated with a device.
- # Just delete the access token
- access_token = self._auth.get_access_token_from_request(request)
- yield self._auth_handler.delete_access_token(access_token)
- else:
- yield self._device_handler.delete_device(
- requester.user.to_string(), requester.device_id)
+ yield self._device_handler.delete_device(
+ requester.user.to_string(), requester.device_id)
defer.returnValue((200, {}))
diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py
index 5305e9175f..35a750923b 100644
--- a/synapse/rest/media/v1/thumbnail_resource.py
+++ b/synapse/rest/media/v1/thumbnail_resource.py
@@ -56,8 +56,8 @@ class ThumbnailResource(Resource):
def _async_render_GET(self, request):
set_cors_headers(request)
server_name, media_id, _ = parse_media_id(request)
- width = parse_integer(request, "width")
- height = parse_integer(request, "height")
+ width = parse_integer(request, "width", required=True)
+ height = parse_integer(request, "height", required=True)
method = parse_string(request, "method", "scale")
m_type = parse_string(request, "type", "image/png")
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 66675d08ae..71316f7d09 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -36,6 +36,7 @@ from .engines import PostgresEngine
from .event_federation import EventFederationStore
from .event_push_actions import EventPushActionsStore
from .events import EventsStore
+from .events_bg_updates import EventsBackgroundUpdatesStore
from .filtering import FilteringStore
from .group_server import GroupServerStore
from .keys import KeyStore
@@ -66,6 +67,7 @@ logger = logging.getLogger(__name__)
class DataStore(
+ EventsBackgroundUpdatesStore,
RoomMemberStore,
RoomStore,
RegistrationStore,
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index fa6839ceca..3fe827cd43 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -1261,7 +1261,8 @@ class SQLBaseStore(object):
" AND ".join("%s = ?" % (k,) for k in keyvalues),
)
- return txn.execute(sql, list(keyvalues.values()))
+ txn.execute(sql, list(keyvalues.values()))
+ return txn.rowcount
def _simple_delete_many(self, table, column, iterable, keyvalues, desc):
return self.runInteraction(
@@ -1280,9 +1281,12 @@ class SQLBaseStore(object):
column : column name to test for inclusion against `iterable`
iterable : list
keyvalues : dict of column names and values to select the rows with
+
+ Returns:
+ int: Number rows deleted
"""
if not iterable:
- return
+ return 0
sql = "DELETE FROM %s" % table
@@ -1297,7 +1301,9 @@ class SQLBaseStore(object):
if clauses:
sql = "%s WHERE %s" % (sql, " AND ".join(clauses))
- return txn.execute(sql, values)
+ txn.execute(sql, values)
+
+ return txn.rowcount
def _get_cache_dict(
self, db_conn, table, entity_column, stream_column, max_value, limit=100000
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2ffc27ff41..f9162be9b9 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
+# Copyright 2018-2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -219,41 +220,11 @@ class EventsStore(
EventsWorkerStore,
BackgroundUpdateStore,
):
- EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
- EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
def __init__(self, db_conn, hs):
super(EventsStore, self).__init__(db_conn, hs)
- self.register_background_update_handler(
- self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
- )
- self.register_background_update_handler(
- self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
- self._background_reindex_fields_sender,
- )
-
- self.register_background_index_update(
- "event_contains_url_index",
- index_name="event_contains_url_index",
- table="events",
- columns=["room_id", "topological_ordering", "stream_ordering"],
- where_clause="contains_url = true AND outlier = false",
- )
-
- # an event_id index on event_search is useful for the purge_history
- # api. Plus it means we get to enforce some integrity with a UNIQUE
- # clause
- self.register_background_index_update(
- "event_search_event_id_idx",
- index_name="event_search_event_id_idx",
- table="event_search",
- columns=["event_id"],
- unique=True,
- psql_only=True,
- )
self._event_persist_queue = _EventPeristenceQueue()
-
self._state_resolution_handler = hs.get_state_resolution_handler()
@defer.inlineCallbacks
@@ -554,10 +525,18 @@ class EventsStore(
e_id for event in new_events for e_id in event.prev_event_ids()
)
- # Finally, remove any events which are prev_events of any existing events.
+ # Remove any events which are prev_events of any existing events.
existing_prevs = yield self._get_events_which_are_prevs(result)
result.difference_update(existing_prevs)
+ # Finally handle the case where the new events have soft-failed prev
+ # events. If they do we need to remove them and their prev events,
+ # otherwise we end up with dangling extremities.
+ existing_prevs = yield self._get_prevs_before_rejected(
+ e_id for event in new_events for e_id in event.prev_event_ids()
+ )
+ result.difference_update(existing_prevs)
+
defer.returnValue(result)
@defer.inlineCallbacks
@@ -573,7 +552,7 @@ class EventsStore(
"""
results = []
- def _get_events(txn, batch):
+ def _get_events_which_are_prevs_txn(txn, batch):
sql = """
SELECT prev_event_id, internal_metadata
FROM event_edges
@@ -596,11 +575,79 @@ class EventsStore(
)
for chunk in batch_iter(event_ids, 100):
- yield self.runInteraction("_get_events_which_are_prevs", _get_events, chunk)
+ yield self.runInteraction(
+ "_get_events_which_are_prevs",
+ _get_events_which_are_prevs_txn,
+ chunk,
+ )
defer.returnValue(results)
@defer.inlineCallbacks
+ def _get_prevs_before_rejected(self, event_ids):
+ """Get soft-failed ancestors to remove from the extremities.
+
+ Given a set of events, find all those that have been soft-failed or
+ rejected. Returns those soft failed/rejected events and their prev
+ events (whether soft-failed/rejected or not), and recurses up the
+ prev-event graph until it finds no more soft-failed/rejected events.
+
+ This is used to find extremities that are ancestors of new events, but
+ are separated by soft failed events.
+
+ Args:
+ event_ids (Iterable[str]): Events to find prev events for. Note
+ that these must have already been persisted.
+
+ Returns:
+ Deferred[set[str]]
+ """
+
+ # The set of event_ids to return. This includes all soft-failed events
+ # and their prev events.
+ existing_prevs = set()
+
+ def _get_prevs_before_rejected_txn(txn, batch):
+ to_recursively_check = batch
+
+ while to_recursively_check:
+ sql = """
+ SELECT
+ event_id, prev_event_id, internal_metadata,
+ rejections.event_id IS NOT NULL
+ FROM event_edges
+ INNER JOIN events USING (event_id)
+ LEFT JOIN rejections USING (event_id)
+ LEFT JOIN event_json USING (event_id)
+ WHERE
+ event_id IN (%s)
+ AND NOT events.outlier
+ """ % (
+ ",".join("?" for _ in to_recursively_check),
+ )
+
+ txn.execute(sql, to_recursively_check)
+ to_recursively_check = []
+
+ for event_id, prev_event_id, metadata, rejected in txn:
+ if prev_event_id in existing_prevs:
+ continue
+
+ soft_failed = json.loads(metadata).get("soft_failed")
+ if soft_failed or rejected:
+ to_recursively_check.append(prev_event_id)
+ existing_prevs.add(prev_event_id)
+
+ for chunk in batch_iter(event_ids, 100):
+ yield self.runInteraction(
+ "_get_prevs_before_rejected",
+ _get_prevs_before_rejected_txn,
+ chunk,
+ )
+
+ defer.returnValue(existing_prevs)
+
+ @defer.inlineCallbacks
def _get_new_state_after_events(
self, room_id, events_context, old_latest_event_ids, new_latest_event_ids
):
@@ -1503,153 +1550,6 @@ class EventsStore(
ret = yield self.runInteraction("count_daily_active_rooms", _count)
defer.returnValue(ret)
- @defer.inlineCallbacks
- def _background_reindex_fields_sender(self, progress, batch_size):
- target_min_stream_id = progress["target_min_stream_id_inclusive"]
- max_stream_id = progress["max_stream_id_exclusive"]
- rows_inserted = progress.get("rows_inserted", 0)
-
- INSERT_CLUMP_SIZE = 1000
-
- def reindex_txn(txn):
- sql = (
- "SELECT stream_ordering, event_id, json FROM events"
- " INNER JOIN event_json USING (event_id)"
- " WHERE ? <= stream_ordering AND stream_ordering < ?"
- " ORDER BY stream_ordering DESC"
- " LIMIT ?"
- )
-
- txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
-
- rows = txn.fetchall()
- if not rows:
- return 0
-
- min_stream_id = rows[-1][0]
-
- update_rows = []
- for row in rows:
- try:
- event_id = row[1]
- event_json = json.loads(row[2])
- sender = event_json["sender"]
- content = event_json["content"]
-
- contains_url = "url" in content
- if contains_url:
- contains_url &= isinstance(content["url"], text_type)
- except (KeyError, AttributeError):
- # If the event is missing a necessary field then
- # skip over it.
- continue
-
- update_rows.append((sender, contains_url, event_id))
-
- sql = "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"
-
- for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
- clump = update_rows[index : index + INSERT_CLUMP_SIZE]
- txn.executemany(sql, clump)
-
- progress = {
- "target_min_stream_id_inclusive": target_min_stream_id,
- "max_stream_id_exclusive": min_stream_id,
- "rows_inserted": rows_inserted + len(rows),
- }
-
- self._background_update_progress_txn(
- txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
- )
-
- return len(rows)
-
- result = yield self.runInteraction(
- self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
- )
-
- if not result:
- yield self._end_background_update(self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME)
-
- defer.returnValue(result)
-
- @defer.inlineCallbacks
- def _background_reindex_origin_server_ts(self, progress, batch_size):
- target_min_stream_id = progress["target_min_stream_id_inclusive"]
- max_stream_id = progress["max_stream_id_exclusive"]
- rows_inserted = progress.get("rows_inserted", 0)
-
- INSERT_CLUMP_SIZE = 1000
-
- def reindex_search_txn(txn):
- sql = (
- "SELECT stream_ordering, event_id FROM events"
- " WHERE ? <= stream_ordering AND stream_ordering < ?"
- " ORDER BY stream_ordering DESC"
- " LIMIT ?"
- )
-
- txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
-
- rows = txn.fetchall()
- if not rows:
- return 0
-
- min_stream_id = rows[-1][0]
- event_ids = [row[1] for row in rows]
-
- rows_to_update = []
-
- chunks = [event_ids[i : i + 100] for i in range(0, len(event_ids), 100)]
- for chunk in chunks:
- ev_rows = self._simple_select_many_txn(
- txn,
- table="event_json",
- column="event_id",
- iterable=chunk,
- retcols=["event_id", "json"],
- keyvalues={},
- )
-
- for row in ev_rows:
- event_id = row["event_id"]
- event_json = json.loads(row["json"])
- try:
- origin_server_ts = event_json["origin_server_ts"]
- except (KeyError, AttributeError):
- # If the event is missing a necessary field then
- # skip over it.
- continue
-
- rows_to_update.append((origin_server_ts, event_id))
-
- sql = "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
-
- for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
- clump = rows_to_update[index : index + INSERT_CLUMP_SIZE]
- txn.executemany(sql, clump)
-
- progress = {
- "target_min_stream_id_inclusive": target_min_stream_id,
- "max_stream_id_exclusive": min_stream_id,
- "rows_inserted": rows_inserted + len(rows_to_update),
- }
-
- self._background_update_progress_txn(
- txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
- )
-
- return len(rows_to_update)
-
- result = yield self.runInteraction(
- self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
- )
-
- if not result:
- yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME)
-
- defer.returnValue(result)
-
def get_current_backfill_token(self):
"""The current minimum token that backfilled events have reached"""
return -self._backfill_id_gen.get_current_token()
diff --git a/synapse/storage/events_bg_updates.py b/synapse/storage/events_bg_updates.py
new file mode 100644
index 0000000000..75c1935bf3
--- /dev/null
+++ b/synapse/storage/events_bg_updates.py
@@ -0,0 +1,401 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from six import text_type
+
+from canonicaljson import json
+
+from twisted.internet import defer
+
+from synapse.storage.background_updates import BackgroundUpdateStore
+
+logger = logging.getLogger(__name__)
+
+
+class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
+
+ EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
+ EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
+ DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
+
+ def __init__(self, db_conn, hs):
+ super(EventsBackgroundUpdatesStore, self).__init__(db_conn, hs)
+
+ self.register_background_update_handler(
+ self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
+ )
+ self.register_background_update_handler(
+ self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
+ self._background_reindex_fields_sender,
+ )
+
+ self.register_background_index_update(
+ "event_contains_url_index",
+ index_name="event_contains_url_index",
+ table="events",
+ columns=["room_id", "topological_ordering", "stream_ordering"],
+ where_clause="contains_url = true AND outlier = false",
+ )
+
+ # an event_id index on event_search is useful for the purge_history
+ # api. Plus it means we get to enforce some integrity with a UNIQUE
+ # clause
+ self.register_background_index_update(
+ "event_search_event_id_idx",
+ index_name="event_search_event_id_idx",
+ table="event_search",
+ columns=["event_id"],
+ unique=True,
+ psql_only=True,
+ )
+
+ self.register_background_update_handler(
+ self.DELETE_SOFT_FAILED_EXTREMITIES,
+ self._cleanup_extremities_bg_update,
+ )
+
+ @defer.inlineCallbacks
+ def _background_reindex_fields_sender(self, progress, batch_size):
+ target_min_stream_id = progress["target_min_stream_id_inclusive"]
+ max_stream_id = progress["max_stream_id_exclusive"]
+ rows_inserted = progress.get("rows_inserted", 0)
+
+ INSERT_CLUMP_SIZE = 1000
+
+ def reindex_txn(txn):
+ sql = (
+ "SELECT stream_ordering, event_id, json FROM events"
+ " INNER JOIN event_json USING (event_id)"
+ " WHERE ? <= stream_ordering AND stream_ordering < ?"
+ " ORDER BY stream_ordering DESC"
+ " LIMIT ?"
+ )
+
+ txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
+
+ rows = txn.fetchall()
+ if not rows:
+ return 0
+
+ min_stream_id = rows[-1][0]
+
+ update_rows = []
+ for row in rows:
+ try:
+ event_id = row[1]
+ event_json = json.loads(row[2])
+ sender = event_json["sender"]
+ content = event_json["content"]
+
+ contains_url = "url" in content
+ if contains_url:
+ contains_url &= isinstance(content["url"], text_type)
+ except (KeyError, AttributeError):
+ # If the event is missing a necessary field then
+ # skip over it.
+ continue
+
+ update_rows.append((sender, contains_url, event_id))
+
+ sql = "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"
+
+ for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
+ clump = update_rows[index : index + INSERT_CLUMP_SIZE]
+ txn.executemany(sql, clump)
+
+ progress = {
+ "target_min_stream_id_inclusive": target_min_stream_id,
+ "max_stream_id_exclusive": min_stream_id,
+ "rows_inserted": rows_inserted + len(rows),
+ }
+
+ self._background_update_progress_txn(
+ txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
+ )
+
+ return len(rows)
+
+ result = yield self.runInteraction(
+ self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
+ )
+
+ if not result:
+ yield self._end_background_update(self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME)
+
+ defer.returnValue(result)
+
+ @defer.inlineCallbacks
+ def _background_reindex_origin_server_ts(self, progress, batch_size):
+ target_min_stream_id = progress["target_min_stream_id_inclusive"]
+ max_stream_id = progress["max_stream_id_exclusive"]
+ rows_inserted = progress.get("rows_inserted", 0)
+
+ INSERT_CLUMP_SIZE = 1000
+
+ def reindex_search_txn(txn):
+ sql = (
+ "SELECT stream_ordering, event_id FROM events"
+ " WHERE ? <= stream_ordering AND stream_ordering < ?"
+ " ORDER BY stream_ordering DESC"
+ " LIMIT ?"
+ )
+
+ txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
+
+ rows = txn.fetchall()
+ if not rows:
+ return 0
+
+ min_stream_id = rows[-1][0]
+ event_ids = [row[1] for row in rows]
+
+ rows_to_update = []
+
+ chunks = [event_ids[i : i + 100] for i in range(0, len(event_ids), 100)]
+ for chunk in chunks:
+ ev_rows = self._simple_select_many_txn(
+ txn,
+ table="event_json",
+ column="event_id",
+ iterable=chunk,
+ retcols=["event_id", "json"],
+ keyvalues={},
+ )
+
+ for row in ev_rows:
+ event_id = row["event_id"]
+ event_json = json.loads(row["json"])
+ try:
+ origin_server_ts = event_json["origin_server_ts"]
+ except (KeyError, AttributeError):
+ # If the event is missing a necessary field then
+ # skip over it.
+ continue
+
+ rows_to_update.append((origin_server_ts, event_id))
+
+ sql = "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
+
+ for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
+ clump = rows_to_update[index : index + INSERT_CLUMP_SIZE]
+ txn.executemany(sql, clump)
+
+ progress = {
+ "target_min_stream_id_inclusive": target_min_stream_id,
+ "max_stream_id_exclusive": min_stream_id,
+ "rows_inserted": rows_inserted + len(rows_to_update),
+ }
+
+ self._background_update_progress_txn(
+ txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
+ )
+
+ return len(rows_to_update)
+
+ result = yield self.runInteraction(
+ self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
+ )
+
+ if not result:
+ yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME)
+
+ defer.returnValue(result)
+
+ @defer.inlineCallbacks
+ def _cleanup_extremities_bg_update(self, progress, batch_size):
+ """Background update to clean out extremities that should have been
+ deleted previously.
+
+ Mainly used to deal with the aftermath of #5269.
+ """
+
+ # This works by first copying all existing forward extremities into the
+ # `_extremities_to_check` table at start up, and then checking each
+ # event in that table whether we have any descendants that are not
+ # soft-failed/rejected. If that is the case then we delete that event
+ # from the forward extremities table.
+ #
+ # For efficiency, we do this in batches by recursively pulling out all
+ # descendants of a batch until we find the non soft-failed/rejected
+ # events, i.e. the set of descendants whose chain of prev events back
+ # to the batch of extremities are all soft-failed or rejected.
+ # Typically, we won't find any such events as extremities will rarely
+ # have any descendants, but if they do then we should delete those
+ # extremities.
+
+ def _cleanup_extremities_bg_update_txn(txn):
+ # The set of extremity event IDs that we're checking this round
+ original_set = set()
+
+ # A dict[str, set[str]] of event ID to their prev events.
+ graph = {}
+
+ # The set of descendants of the original set that are not rejected
+ # nor soft-failed. Ancestors of these events should be removed
+ # from the forward extremities table.
+ non_rejected_leaves = set()
+
+ # Set of event IDs that have been soft failed, and for which we
+ # should check if they have descendants which haven't been soft
+ # failed.
+ soft_failed_events_to_lookup = set()
+
+ # First, we get `batch_size` events from the table, pulling out
+ # their successor events, if any, and the successor events'
+ # rejection status.
+ txn.execute(
+ """SELECT prev_event_id, event_id, internal_metadata,
+ rejections.event_id IS NOT NULL, events.outlier
+ FROM (
+ SELECT event_id AS prev_event_id
+ FROM _extremities_to_check
+ LIMIT ?
+ ) AS f
+ LEFT JOIN event_edges USING (prev_event_id)
+ LEFT JOIN events USING (event_id)
+ LEFT JOIN event_json USING (event_id)
+ LEFT JOIN rejections USING (event_id)
+ """, (batch_size,)
+ )
+
+ for prev_event_id, event_id, metadata, rejected, outlier in txn:
+ original_set.add(prev_event_id)
+
+ if not event_id or outlier:
+ # Common case where the forward extremity doesn't have any
+ # descendants.
+ continue
+
+ graph.setdefault(event_id, set()).add(prev_event_id)
+
+ soft_failed = False
+ if metadata:
+ soft_failed = json.loads(metadata).get("soft_failed")
+
+ if soft_failed or rejected:
+ soft_failed_events_to_lookup.add(event_id)
+ else:
+ non_rejected_leaves.add(event_id)
+
+ # Now we recursively check all the soft-failed descendants we
+ # found above in the same way, until we have nothing left to
+ # check.
+ while soft_failed_events_to_lookup:
+ # We only want to do 100 at a time, so we split given list
+ # into two.
+ batch = list(soft_failed_events_to_lookup)
+ to_check, to_defer = batch[:100], batch[100:]
+ soft_failed_events_to_lookup = set(to_defer)
+
+ sql = """SELECT prev_event_id, event_id, internal_metadata,
+ rejections.event_id IS NOT NULL
+ FROM event_edges
+ INNER JOIN events USING (event_id)
+ INNER JOIN event_json USING (event_id)
+ LEFT JOIN rejections USING (event_id)
+ WHERE
+ prev_event_id IN (%s)
+ AND NOT events.outlier
+ """ % (
+ ",".join("?" for _ in to_check),
+ )
+ txn.execute(sql, to_check)
+
+ for prev_event_id, event_id, metadata, rejected in txn:
+ if event_id in graph:
+ # Already handled this event previously, but we still
+ # want to record the edge.
+ graph[event_id].add(prev_event_id)
+ continue
+
+ graph[event_id] = {prev_event_id}
+
+ soft_failed = json.loads(metadata).get("soft_failed")
+ if soft_failed or rejected:
+ soft_failed_events_to_lookup.add(event_id)
+ else:
+ non_rejected_leaves.add(event_id)
+
+ # We have a set of non-soft-failed descendants, so we recurse up
+ # the graph to find all ancestors and add them to the set of event
+ # IDs that we can delete from forward extremities table.
+ to_delete = set()
+ while non_rejected_leaves:
+ event_id = non_rejected_leaves.pop()
+ prev_event_ids = graph.get(event_id, set())
+ non_rejected_leaves.update(prev_event_ids)
+ to_delete.update(prev_event_ids)
+
+ to_delete.intersection_update(original_set)
+
+ deleted = self._simple_delete_many_txn(
+ txn=txn,
+ table="event_forward_extremities",
+ column="event_id",
+ iterable=to_delete,
+ keyvalues={},
+ )
+
+ logger.info(
+ "Deleted %d forward extremities of %d checked, to clean up #5269",
+ deleted,
+ len(original_set),
+ )
+
+ if deleted:
+ # We now need to invalidate the caches of these rooms
+ rows = self._simple_select_many_txn(
+ txn,
+ table="events",
+ column="event_id",
+ iterable=to_delete,
+ keyvalues={},
+ retcols=("room_id",)
+ )
+ room_ids = set(row["room_id"] for row in rows)
+ for room_id in room_ids:
+ txn.call_after(
+ self.get_latest_event_ids_in_room.invalidate,
+ (room_id,)
+ )
+
+ self._simple_delete_many_txn(
+ txn=txn,
+ table="_extremities_to_check",
+ column="event_id",
+ iterable=original_set,
+ keyvalues={},
+ )
+
+ return len(original_set)
+
+ num_handled = yield self.runInteraction(
+ "_cleanup_extremities_bg_update", _cleanup_extremities_bg_update_txn,
+ )
+
+ if not num_handled:
+ yield self._end_background_update(self.DELETE_SOFT_FAILED_EXTREMITIES)
+
+ def _drop_table_txn(txn):
+ txn.execute("DROP TABLE _extremities_to_check")
+
+ yield self.runInteraction(
+ "_cleanup_extremities_bg_update_drop_table",
+ _drop_table_txn,
+ )
+
+ defer.returnValue(num_handled)
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 21b353cad3..b56c83e460 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from __future__ import division
+
import itertools
import logging
from collections import namedtuple
@@ -614,7 +616,7 @@ class EventsWorkerStore(SQLBaseStore):
def _get_total_state_event_counts_txn(self, txn, room_id):
"""
- See get_state_event_counts.
+ See get_total_state_event_counts.
"""
sql = "SELECT COUNT(*) FROM state_events WHERE room_id=?"
txn.execute(sql, (room_id,))
@@ -635,3 +637,49 @@ class EventsWorkerStore(SQLBaseStore):
"get_total_state_event_counts",
self._get_total_state_event_counts_txn, room_id
)
+
+ def _get_current_state_event_counts_txn(self, txn, room_id):
+ """
+ See get_current_state_event_counts.
+ """
+ sql = "SELECT COUNT(*) FROM current_state_events WHERE room_id=?"
+ txn.execute(sql, (room_id,))
+ row = txn.fetchone()
+ return row[0] if row else 0
+
+ def get_current_state_event_counts(self, room_id):
+ """
+ Gets the current number of state events in a room.
+
+ Args:
+ room_id (str)
+
+ Returns:
+ Deferred[int]
+ """
+ return self.runInteraction(
+ "get_current_state_event_counts",
+ self._get_current_state_event_counts_txn, room_id
+ )
+
+ @defer.inlineCallbacks
+ def get_room_complexity(self, room_id):
+ """
+ Get a rough approximation of the complexity of the room. This is used by
+ remote servers to decide whether they wish to join the room or not.
+ Higher complexity value indicates that being in the room will consume
+ more resources.
+
+ Args:
+ room_id (str)
+
+ Returns:
+ Deferred[dict[str:int]] of complexity version to complexity.
+ """
+ state_events = yield self.get_current_state_event_counts(room_id)
+
+ # Call this one "v1", so we can introduce new ones as we want to develop
+ # it.
+ complexity_v1 = round(state_events / 500, 2)
+
+ defer.returnValue({"v1": complexity_v1})
diff --git a/synapse/storage/schema/delta/54/account_validity.sql b/synapse/storage/schema/delta/54/account_validity_with_renewal.sql
index 2357626000..0adb2ad55e 100644
--- a/synapse/storage/schema/delta/54/account_validity.sql
+++ b/synapse/storage/schema/delta/54/account_validity_with_renewal.sql
@@ -13,6 +13,9 @@
* limitations under the License.
*/
+-- We previously changed the schema for this table without renaming the file, which means
+-- that some databases might still be using the old schema. This ensures Synapse uses the
+-- right schema for the table.
DROP TABLE IF EXISTS account_validity;
-- Track what users are in public rooms.
diff --git a/synapse/storage/schema/delta/54/delete_forward_extremities.sql b/synapse/storage/schema/delta/54/delete_forward_extremities.sql
new file mode 100644
index 0000000000..aa40f13da7
--- /dev/null
+++ b/synapse/storage/schema/delta/54/delete_forward_extremities.sql
@@ -0,0 +1,22 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Start a background job to cleanup extremities that were incorrectly added
+-- by bug #5269.
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('delete_soft_failed_extremities', '{}');
+
+DROP TABLE IF EXISTS _extremities_to_check; -- To make this delta schema file idempotent.
+CREATE TABLE _extremities_to_check AS SELECT event_id FROM event_forward_extremities;
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 71b80a891d..eb0ced5b5e 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -169,7 +169,7 @@ class StatsStore(StateDeltasStore):
logger.info(
"Processing the next %d rooms of %d remaining",
- (len(rooms_to_work_on), progress["remaining"]),
+ len(rooms_to_work_on), progress["remaining"],
)
# Number of state events we've processed by going through each room
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 311b49e18a..fe412355d8 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -226,6 +226,8 @@ class LoggingContext(object):
self.request = request
def __str__(self):
+ if self.request:
+ return str(self.request)
return "%s@%x" % (self.name, id(self))
@classmethod
@@ -274,12 +276,10 @@ class LoggingContext(object):
current = self.set_current_context(self.previous_context)
if current is not self:
if current is self.sentinel:
- logger.warn("Expected logging context %s has been lost", self)
+ logger.warning("Expected logging context %s was lost", self)
else:
- logger.warn(
- "Current logging context %s is not expected context %s",
- current,
- self
+ logger.warning(
+ "Expected logging context %s but found %s", self, current
)
self.previous_context = None
self.alive = False
@@ -433,10 +433,14 @@ class PreserveLoggingContext(object):
context = LoggingContext.set_current_context(self.current_context)
if context != self.new_context:
- logger.warn(
- "Unexpected logging context: %s is not %s",
- context, self.new_context,
- )
+ if context is LoggingContext.sentinel:
+ logger.warning("Expected logging context %s was lost", self.new_context)
+ else:
+ logger.warning(
+ "Expected logging context %s but found %s",
+ self.new_context,
+ context,
+ )
if self.current_context is not LoggingContext.sentinel:
if not self.current_context.alive:
|