diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 72858cca1f..ac0a3655a5 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -270,7 +270,11 @@ class Auth(object):
rights (str): The operation being performed; the access token must
allow this.
Returns:
- dict : dict that includes the user and the ID of their access token.
+ Deferred[dict]: dict that includes:
+ `user` (UserID)
+ `is_guest` (bool)
+ `token_id` (int|None): access token id. May be None if guest
+ `device_id` (str|None): device corresponding to access token
Raises:
AuthError if no user by that token exists or the token is invalid.
"""
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 9e26146338..6b8875afb4 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -43,7 +43,6 @@ from synapse.rest import ClientRestResource
from synapse.rest.key.v1.server_key_resource import LocalKey
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.rest.media.v0.content_repository import ContentRepoResource
-from synapse.rest.media.v1.media_repository import MediaRepositoryResource
from synapse.server import HomeServer
from synapse.storage import are_all_users_on_domain
from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
@@ -195,14 +194,19 @@ class SynapseHomeServer(HomeServer):
})
if name in ["media", "federation", "client"]:
- media_repo = MediaRepositoryResource(self)
- resources.update({
- MEDIA_PREFIX: media_repo,
- LEGACY_MEDIA_PREFIX: media_repo,
- CONTENT_REPO_PREFIX: ContentRepoResource(
- self, self.config.uploads_path
- ),
- })
+ if self.get_config().enable_media_repo:
+ media_repo = self.get_media_repository_resource()
+ resources.update({
+ MEDIA_PREFIX: media_repo,
+ LEGACY_MEDIA_PREFIX: media_repo,
+ CONTENT_REPO_PREFIX: ContentRepoResource(
+ self, self.config.uploads_path
+ ),
+ })
+ elif name == "media":
+ raise ConfigError(
+ "'media' resource conflicts with enable_media_repo=False",
+ )
if name in ["keys", "federation"]:
resources.update({
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 36c18bdbcb..c4e5f0965d 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -35,7 +35,6 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.media.v0.content_repository import ContentRepoResource
-from synapse.rest.media.v1.media_repository import MediaRepositoryResource
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.storage.media_repository import MediaRepositoryStore
@@ -89,7 +88,7 @@ class MediaRepositoryServer(HomeServer):
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self)
elif name == "media":
- media_repo = MediaRepositoryResource(self)
+ media_repo = self.get_media_repository_resource()
resources.update({
MEDIA_PREFIX: media_repo,
LEGACY_MEDIA_PREFIX: media_repo,
@@ -151,6 +150,13 @@ def start(config_options):
assert config.worker_app == "synapse.app.media_repository"
+ if config.enable_media_repo:
+ _base.quit_with_error(
+ "enable_media_repo must be disabled in the main synapse process\n"
+ "before the media repo can be run in a separate worker.\n"
+ "Please add ``enable_media_repo: false`` to the main config\n"
+ )
+
setup_logging(config, use_worker_options=True)
events.USE_FROZEN_DICTS = config.use_frozen_dicts
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 576ac6fb7e..323fddee21 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -340,11 +340,10 @@ class SyncReplicationHandler(ReplicationClientHandler):
self.store = hs.get_datastore()
self.typing_handler = hs.get_typing_handler()
+ # NB this is a SynchrotronPresence, not a normal PresenceHandler
self.presence_handler = hs.get_presence_handler()
self.notifier = hs.get_notifier()
- self.presence_handler.sync_callback = self.send_user_sync
-
def on_rdata(self, stream_name, token, rows):
super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 4d9193536d..edb90a1348 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -41,6 +41,12 @@ class ServerConfig(Config):
# false only if we are updating the user directory in a worker
self.update_user_directory = config.get("update_user_directory", True)
+ # whether to enable the media repository endpoints. This should be set
+ # to false if the media repository is running as a separate endpoint;
+ # doing so ensures that we will not run cache cleanup jobs on the
+ # master, potentially causing inconsistency.
+ self.enable_media_repo = config.get("enable_media_repo", True)
+
self.filter_timeline_limit = config.get("filter_timeline_limit", -1)
# Whether we should block invites sent to users on this server
diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py
index 0d0e7b5286..aaa3efaca3 100644
--- a/synapse/crypto/event_signing.py
+++ b/synapse/crypto/event_signing.py
@@ -32,15 +32,22 @@ def check_event_content_hash(event, hash_algorithm=hashlib.sha256):
"""Check whether the hash for this PDU matches the contents"""
name, expected_hash = compute_content_hash(event, hash_algorithm)
logger.debug("Expecting hash: %s", encode_base64(expected_hash))
- if name not in event.hashes:
+
+ # some malformed events lack a 'hashes'. Protect against it being missing
+ # or a weird type by basically treating it the same as an unhashed event.
+ hashes = event.get("hashes")
+ if not isinstance(hashes, dict):
+ raise SynapseError(400, "Malformed 'hashes'", Codes.UNAUTHORIZED)
+
+ if name not in hashes:
raise SynapseError(
400,
"Algorithm %s not in hashes %s" % (
- name, list(event.hashes),
+ name, list(hashes),
),
Codes.UNAUTHORIZED,
)
- message_hash_base64 = event.hashes[name]
+ message_hash_base64 = hashes[name]
try:
message_hash_bytes = decode_base64(message_hash_base64)
except Exception:
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 7a3c9cbb70..3e7809b04f 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -20,7 +20,7 @@ from .persistence import TransactionActions
from .units import Transaction, Edu
from synapse.api.errors import HttpResponseException
-from synapse.util import logcontext
+from synapse.util import logcontext, PreserveLoggingContext
from synapse.util.async import run_on_reactor
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.util.metrics import measure_func
@@ -146,7 +146,6 @@ class TransactionQueue(object):
else:
return not destination.startswith("localhost")
- @defer.inlineCallbacks
def notify_new_events(self, current_id):
"""This gets called when we have some new events we might want to
send out to other servers.
@@ -156,6 +155,13 @@ class TransactionQueue(object):
if self._is_processing:
return
+ # fire off a processing loop in the background. It's likely it will
+ # outlast the current request, so run it in the sentinel logcontext.
+ with PreserveLoggingContext():
+ self._process_event_queue_loop()
+
+ @defer.inlineCallbacks
+ def _process_event_queue_loop(self):
try:
self._is_processing = True
while True:
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 080eb14271..2f30f183ce 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -650,41 +650,6 @@ class AuthHandler(BaseHandler):
raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
@defer.inlineCallbacks
- def set_password(self, user_id, newpassword, requester=None):
- password_hash = self.hash(newpassword)
-
- except_access_token_id = requester.access_token_id if requester else None
-
- try:
- yield self.store.user_set_password_hash(user_id, password_hash)
- except StoreError as e:
- if e.code == 404:
- raise SynapseError(404, "Unknown user", Codes.NOT_FOUND)
- raise e
- yield self.delete_access_tokens_for_user(
- user_id, except_token_id=except_access_token_id,
- )
- yield self.hs.get_pusherpool().remove_pushers_by_user(
- user_id, except_access_token_id
- )
-
- @defer.inlineCallbacks
- def deactivate_account(self, user_id):
- """Deactivate a user's account
-
- Args:
- user_id (str): ID of user to be deactivated
-
- Returns:
- Deferred
- """
- # FIXME: Theoretically there is a race here wherein user resets
- # password using threepid.
- yield self.delete_access_tokens_for_user(user_id)
- yield self.store.user_delete_threepids(user_id)
- yield self.store.user_set_password_hash(user_id, None)
-
- @defer.inlineCallbacks
def delete_access_token(self, access_token):
"""Invalidate a single access token
@@ -706,6 +671,12 @@ class AuthHandler(BaseHandler):
access_token=access_token,
)
+ # delete pushers associated with this access token
+ if user_info["token_id"] is not None:
+ yield self.hs.get_pusherpool().remove_pushers_by_access_token(
+ str(user_info["user"]), (user_info["token_id"], )
+ )
+
@defer.inlineCallbacks
def delete_access_tokens_for_user(self, user_id, except_token_id=None,
device_id=None):
@@ -728,13 +699,18 @@ class AuthHandler(BaseHandler):
# see if any of our auth providers want to know about this
for provider in self.password_providers:
if hasattr(provider, "on_logged_out"):
- for token, device_id in tokens_and_devices:
+ for token, token_id, device_id in tokens_and_devices:
yield provider.on_logged_out(
user_id=user_id,
device_id=device_id,
access_token=token,
)
+ # delete pushers associated with the access tokens
+ yield self.hs.get_pusherpool().remove_pushers_by_access_token(
+ user_id, (token_id for _, token_id, _ in tokens_and_devices),
+ )
+
@defer.inlineCallbacks
def add_threepid(self, user_id, medium, address, validated_at):
# 'Canonicalise' email addresses down to lower case.
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
new file mode 100644
index 0000000000..b1d3814909
--- /dev/null
+++ b/synapse/handlers/deactivate_account.py
@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from twisted.internet import defer
+
+from ._base import BaseHandler
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class DeactivateAccountHandler(BaseHandler):
+ """Handler which deals with deactivating user accounts."""
+ def __init__(self, hs):
+ super(DeactivateAccountHandler, self).__init__(hs)
+ self._auth_handler = hs.get_auth_handler()
+ self._device_handler = hs.get_device_handler()
+
+ @defer.inlineCallbacks
+ def deactivate_account(self, user_id):
+ """Deactivate a user's account
+
+ Args:
+ user_id (str): ID of user to be deactivated
+
+ Returns:
+ Deferred
+ """
+ # FIXME: Theoretically there is a race here wherein user resets
+ # password using threepid.
+
+ # first delete any devices belonging to the user, which will also
+ # delete corresponding access tokens.
+ yield self._device_handler.delete_all_devices_for_user(user_id)
+ # then delete any remaining access tokens which weren't associated with
+ # a device.
+ yield self._auth_handler.delete_access_tokens_for_user(user_id)
+
+ yield self.store.user_delete_threepids(user_id)
+ yield self.store.user_set_password_hash(user_id, None)
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 579d8477ba..2152efc692 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -171,12 +171,30 @@ class DeviceHandler(BaseHandler):
yield self.notify_device_update(user_id, [device_id])
@defer.inlineCallbacks
+ def delete_all_devices_for_user(self, user_id, except_device_id=None):
+ """Delete all of the user's devices
+
+ Args:
+ user_id (str):
+ except_device_id (str|None): optional device id which should not
+ be deleted
+
+ Returns:
+ defer.Deferred:
+ """
+ device_map = yield self.store.get_devices_by_user(user_id)
+ device_ids = device_map.keys()
+ if except_device_id is not None:
+ device_ids = [d for d in device_ids if d != except_device_id]
+ yield self.delete_devices(user_id, device_ids)
+
+ @defer.inlineCallbacks
def delete_devices(self, user_id, device_ids):
""" Delete several devices
Args:
user_id (str):
- device_ids (str): The list of device IDs to delete
+ device_ids (List[str]): The list of device IDs to delete
Returns:
defer.Deferred:
diff --git a/synapse/handlers/set_password.py b/synapse/handlers/set_password.py
new file mode 100644
index 0000000000..44414e1dc1
--- /dev/null
+++ b/synapse/handlers/set_password.py
@@ -0,0 +1,56 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.errors import Codes, StoreError, SynapseError
+from ._base import BaseHandler
+
+logger = logging.getLogger(__name__)
+
+
+class SetPasswordHandler(BaseHandler):
+ """Handler which deals with changing user account passwords"""
+ def __init__(self, hs):
+ super(SetPasswordHandler, self).__init__(hs)
+ self._auth_handler = hs.get_auth_handler()
+ self._device_handler = hs.get_device_handler()
+
+ @defer.inlineCallbacks
+ def set_password(self, user_id, newpassword, requester=None):
+ password_hash = self._auth_handler.hash(newpassword)
+
+ except_device_id = requester.device_id if requester else None
+ except_access_token_id = requester.access_token_id if requester else None
+
+ try:
+ yield self.store.user_set_password_hash(user_id, password_hash)
+ except StoreError as e:
+ if e.code == 404:
+ raise SynapseError(404, "Unknown user", Codes.NOT_FOUND)
+ raise e
+
+ # we want to log out all of the user's other sessions. First delete
+ # all his other devices.
+ yield self._device_handler.delete_all_devices_for_user(
+ user_id, except_device_id=except_device_id,
+ )
+
+ # and now delete any access tokens which weren't associated with
+ # devices (or were associated with this device).
+ yield self._auth_handler.delete_access_tokens_for_user(
+ user_id, except_token_id=except_access_token_id,
+ )
diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py
index a97532162f..e2b99ef3bd 100644
--- a/synapse/http/endpoint.py
+++ b/synapse/http/endpoint.py
@@ -362,8 +362,10 @@ def _get_hosts_for_srv_record(dns_client, host):
return res
# no logcontexts here, so we can safely fire these off and gatherResults
- d1 = dns_client.lookupAddress(host).addCallbacks(cb, eb)
- d2 = dns_client.lookupIPV6Address(host).addCallbacks(cb, eb)
+ d1 = dns_client.lookupAddress(host).addCallbacks(
+ cb, eb, errbackArgs=("A", ))
+ d2 = dns_client.lookupIPV6Address(host).addCallbacks(
+ cb, eb, errbackArgs=("AAAA", ))
results = yield defer.DeferredList(
[d1, d2], consumeErrors=True)
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 3ca1c9947c..25466cd292 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -28,6 +28,7 @@ from canonicaljson import (
)
from twisted.internet import defer
+from twisted.python import failure
from twisted.web import server, resource
from twisted.web.server import NOT_DONE_YET
from twisted.web.util import redirectTo
@@ -131,12 +132,17 @@ def wrap_request_handler(request_handler, include_metrics=False):
version_string=self.version_string,
)
except Exception:
- logger.exception(
- "Failed handle request %s.%s on %r: %r",
+ # failure.Failure() fishes the original Failure out
+ # of our stack, and thus gives us a sensible stack
+ # trace.
+ f = failure.Failure()
+ logger.error(
+ "Failed handle request %s.%s on %r: %r: %s",
request_handler.__module__,
request_handler.__name__,
self,
- request
+ request,
+ f.getTraceback().rstrip(),
)
respond_with_json(
request,
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index dc680ddf43..097c844d31 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+from twisted.internet import defer
from synapse.types import UserID
@@ -81,6 +82,7 @@ class ModuleApi(object):
reg = self.hs.get_handlers().registration_handler
return reg.register(localpart=localpart)
+ @defer.inlineCallbacks
def invalidate_access_token(self, access_token):
"""Invalidate an access token for a user
@@ -94,8 +96,16 @@ class ModuleApi(object):
Raises:
synapse.api.errors.AuthError: the access token is invalid
"""
-
- return self._auth_handler.delete_access_token(access_token)
+ # see if the access token corresponds to a device
+ user_info = yield self._auth.get_user_by_access_token(access_token)
+ device_id = user_info.get("device_id")
+ user_id = user_info["user"].to_string()
+ if device_id:
+ # delete the device, which will also delete its access tokens
+ yield self.hs.get_device_handler().delete_device(user_id, device_id)
+ else:
+ # no associated device. Just delete the access token.
+ yield self._auth_handler.delete_access_token(access_token)
def run_db_interaction(self, desc, func, *args, **kwargs):
"""Run a function with a database connection
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 626da778cd..ef042681bc 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -255,9 +255,7 @@ class Notifier(object):
)
if self.federation_sender:
- preserve_fn(self.federation_sender.notify_new_events)(
- room_stream_id
- )
+ self.federation_sender.notify_new_events(room_stream_id)
if event.type == EventTypes.Member and event.membership == Membership.JOIN:
self._user_joined_room(event.state_key, event.room_id)
@@ -297,8 +295,7 @@ class Notifier(object):
def on_new_replication_data(self):
"""Used to inform replication listeners that something has happend
without waking up any of the normal user event streams"""
- with PreserveLoggingContext():
- self.notify_replication()
+ self.notify_replication()
@defer.inlineCallbacks
def wait_for_events(self, user_id, timeout, callback, room_ids=None,
@@ -516,8 +513,14 @@ class Notifier(object):
self.replication_deferred = ObservableDeferred(defer.Deferred())
deferred.callback(None)
- for cb in self.replication_callbacks:
- preserve_fn(cb)()
+ # the callbacks may well outlast the current request, so we run
+ # them in the sentinel logcontext.
+ #
+ # (ideally it would be up to the callbacks to know if they were
+ # starting off background processes and drop the logcontext
+ # accordingly, but that requires more changes)
+ for cb in self.replication_callbacks:
+ cb()
@defer.inlineCallbacks
def wait_for_replication(self, callback, timeout):
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 34cb108dcb..134e89b371 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -103,19 +103,25 @@ class PusherPool:
yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
@defer.inlineCallbacks
- def remove_pushers_by_user(self, user_id, except_access_token_id=None):
- all = yield self.store.get_all_pushers()
- logger.info(
- "Removing all pushers for user %s except access tokens id %r",
- user_id, except_access_token_id
- )
- for p in all:
- if p['user_name'] == user_id and p['access_token'] != except_access_token_id:
+ def remove_pushers_by_access_token(self, user_id, access_tokens):
+ """Remove the pushers for a given user corresponding to a set of
+ access_tokens.
+
+ Args:
+ user_id (str): user to remove pushers for
+ access_tokens (Iterable[int]): access token *ids* to remove pushers
+ for
+ """
+ tokens = set(access_tokens)
+ for p in (yield self.store.get_pushers_by_user_id(user_id)):
+ if p['access_token'] in tokens:
logger.info(
"Removing pusher for app id %s, pushkey %s, user %s",
p['app_id'], p['pushkey'], p['user_name']
)
- yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
+ yield self.remove_pusher(
+ p['app_id'], p['pushkey'], p['user_name'],
+ )
@defer.inlineCallbacks
def on_new_notifications(self, min_stream_id, max_stream_id):
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 1d03e79b85..786c3fe864 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -216,11 +216,12 @@ class ReplicationStreamer(object):
self.federation_sender.federation_ack(token)
@measure_func("repl.on_user_sync")
+ @defer.inlineCallbacks
def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms):
"""A client has started/stopped syncing on a worker.
"""
user_sync_counter.inc()
- self.presence_handler.update_external_syncs_row(
+ yield self.presence_handler.update_external_syncs_row(
conn_id, user_id, is_syncing, last_sync_ms,
)
@@ -244,11 +245,12 @@ class ReplicationStreamer(object):
getattr(self.store, cache_func).invalidate(tuple(keys))
@measure_func("repl.on_user_ip")
+ @defer.inlineCallbacks
def on_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen):
"""The client saw a user request
"""
user_ip_cache_counter.inc()
- self.store.insert_client_ip(
+ yield self.store.insert_client_ip(
user_id, access_token, ip, user_agent, device_id, last_seen,
)
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 1197158fdc..5022808ea9 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -137,8 +137,8 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/admin/deactivate/(?P<target_user_id>[^/]*)")
def __init__(self, hs):
- self._auth_handler = hs.get_auth_handler()
super(DeactivateAccountRestServlet, self).__init__(hs)
+ self._deactivate_account_handler = hs.get_deactivate_account_handler()
@defer.inlineCallbacks
def on_POST(self, request, target_user_id):
@@ -149,7 +149,7 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
if not is_admin:
raise AuthError(403, "You are not a server admin")
- yield self._auth_handler.deactivate_account(target_user_id)
+ yield self._deactivate_account_handler.deactivate_account(target_user_id)
defer.returnValue((200, {}))
@@ -309,7 +309,7 @@ class ResetPasswordRestServlet(ClientV1RestServlet):
super(ResetPasswordRestServlet, self).__init__(hs)
self.hs = hs
self.auth = hs.get_auth()
- self.auth_handler = hs.get_auth_handler()
+ self._set_password_handler = hs.get_set_password_handler()
@defer.inlineCallbacks
def on_POST(self, request, target_user_id):
@@ -330,7 +330,7 @@ class ResetPasswordRestServlet(ClientV1RestServlet):
logger.info("new_password: %r", new_password)
- yield self.auth_handler.set_password(
+ yield self._set_password_handler.set_password(
target_user_id, new_password, requester
)
defer.returnValue((200, {}))
diff --git a/synapse/rest/client/v1/logout.py b/synapse/rest/client/v1/logout.py
index 6add754782..ca49955935 100644
--- a/synapse/rest/client/v1/logout.py
+++ b/synapse/rest/client/v1/logout.py
@@ -16,6 +16,7 @@
from twisted.internet import defer
from synapse.api.auth import get_access_token_from_request
+from synapse.api.errors import AuthError
from .base import ClientV1RestServlet, client_path_patterns
@@ -30,15 +31,30 @@ class LogoutRestServlet(ClientV1RestServlet):
def __init__(self, hs):
super(LogoutRestServlet, self).__init__(hs)
+ self._auth = hs.get_auth()
self._auth_handler = hs.get_auth_handler()
+ self._device_handler = hs.get_device_handler()
def on_OPTIONS(self, request):
return (200, {})
@defer.inlineCallbacks
def on_POST(self, request):
- access_token = get_access_token_from_request(request)
- yield self._auth_handler.delete_access_token(access_token)
+ try:
+ requester = yield self.auth.get_user_by_req(request)
+ except AuthError:
+ # this implies the access token has already been deleted.
+ pass
+ else:
+ if requester.device_id is None:
+ # the acccess token wasn't associated with a device.
+ # Just delete the access token
+ access_token = get_access_token_from_request(request)
+ yield self._auth_handler.delete_access_token(access_token)
+ else:
+ yield self._device_handler.delete_device(
+ requester.user.to_string(), requester.device_id)
+
defer.returnValue((200, {}))
@@ -49,6 +65,7 @@ class LogoutAllRestServlet(ClientV1RestServlet):
super(LogoutAllRestServlet, self).__init__(hs)
self.auth = hs.get_auth()
self._auth_handler = hs.get_auth_handler()
+ self._device_handler = hs.get_device_handler()
def on_OPTIONS(self, request):
return (200, {})
@@ -57,6 +74,12 @@ class LogoutAllRestServlet(ClientV1RestServlet):
def on_POST(self, request):
requester = yield self.auth.get_user_by_req(request)
user_id = requester.user.to_string()
+
+ # first delete all of the user's devices
+ yield self._device_handler.delete_all_devices_for_user(user_id)
+
+ # .. and then delete any access tokens which weren't associated with
+ # devices.
yield self._auth_handler.delete_access_tokens_for_user(user_id)
defer.returnValue((200, {}))
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index 726e0a2826..c26ce63bcf 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -98,6 +98,7 @@ class PasswordRestServlet(RestServlet):
self.auth = hs.get_auth()
self.auth_handler = hs.get_auth_handler()
self.datastore = self.hs.get_datastore()
+ self._set_password_handler = hs.get_set_password_handler()
@defer.inlineCallbacks
def on_POST(self, request):
@@ -147,7 +148,7 @@ class PasswordRestServlet(RestServlet):
raise SynapseError(400, "", Codes.MISSING_PARAM)
new_password = params['new_password']
- yield self.auth_handler.set_password(
+ yield self._set_password_handler.set_password(
user_id, new_password, requester
)
@@ -161,10 +162,11 @@ class DeactivateAccountRestServlet(RestServlet):
PATTERNS = client_v2_patterns("/account/deactivate$")
def __init__(self, hs):
+ super(DeactivateAccountRestServlet, self).__init__()
self.hs = hs
self.auth = hs.get_auth()
self.auth_handler = hs.get_auth_handler()
- super(DeactivateAccountRestServlet, self).__init__()
+ self._deactivate_account_handler = hs.get_deactivate_account_handler()
@defer.inlineCallbacks
def on_POST(self, request):
@@ -179,7 +181,7 @@ class DeactivateAccountRestServlet(RestServlet):
# allow ASes to dectivate their own users
if requester and requester.app_service:
- yield self.auth_handler.deactivate_account(
+ yield self._deactivate_account_handler.deactivate_account(
requester.user.to_string()
)
defer.returnValue((200, {}))
@@ -206,7 +208,7 @@ class DeactivateAccountRestServlet(RestServlet):
logger.error("Auth succeeded but no known type!", result.keys())
raise SynapseError(500, "", Codes.UNKNOWN)
- yield self.auth_handler.deactivate_account(user_id)
+ yield self._deactivate_account_handler.deactivate_account(user_id)
defer.returnValue((200, {}))
diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py
index 089ec71c81..f762dbfa9a 100644
--- a/synapse/rest/client/v2_alpha/groups.py
+++ b/synapse/rest/client/v2_alpha/groups.py
@@ -38,7 +38,7 @@ class GroupServlet(RestServlet):
@defer.inlineCallbacks
def on_GET(self, request, group_id):
- requester = yield self.auth.get_user_by_req(request)
+ requester = yield self.auth.get_user_by_req(request, allow_guest=True)
requester_user_id = requester.user.to_string()
group_description = yield self.groups_handler.get_group_profile(
@@ -74,7 +74,7 @@ class GroupSummaryServlet(RestServlet):
@defer.inlineCallbacks
def on_GET(self, request, group_id):
- requester = yield self.auth.get_user_by_req(request)
+ requester = yield self.auth.get_user_by_req(request, allow_guest=True)
requester_user_id = requester.user.to_string()
get_group_summary = yield self.groups_handler.get_group_summary(
@@ -148,7 +148,7 @@ class GroupCategoryServlet(RestServlet):
@defer.inlineCallbacks
def on_GET(self, request, group_id, category_id):
- requester = yield self.auth.get_user_by_req(request)
+ requester = yield self.auth.get_user_by_req(request, allow_guest=True)
requester_user_id = requester.user.to_string()
category = yield self.groups_handler.get_group_category(
@@ -200,7 +200,7 @@ class GroupCategoriesServlet(RestServlet):
@defer.inlineCallbacks
def on_GET(self, request, group_id):
- requester = yield self.auth.get_user_by_req(request)
+ requester = yield self.auth.get_user_by_req(request, allow_guest=True)
requester_user_id = requester.user.to_string()
category = yield self.groups_handler.get_group_categories(
@@ -225,7 +225,7 @@ class GroupRoleServlet(RestServlet):
@defer.inlineCallbacks
def on_GET(self, request, group_id, role_id):
- requester = yield self.auth.get_user_by_req(request)
+ requester = yield self.auth.get_user_by_req(request, allow_guest=True)
requester_user_id = requester.user.to_string()
category = yield self.groups_handler.get_group_role(
@@ -277,7 +277,7 @@ class GroupRolesServlet(RestServlet):
@defer.inlineCallbacks
def on_GET(self, request, group_id):
- requester = yield self.auth.get_user_by_req(request)
+ requester = yield self.auth.get_user_by_req(request, allow_guest=True)
requester_user_id = requester.user.to_string()
category = yield self.groups_handler.get_group_roles(
@@ -348,7 +348,7 @@ class GroupRoomServlet(RestServlet):
@defer.inlineCallbacks
def on_GET(self, request, group_id):
- requester = yield self.auth.get_user_by_req(request)
+ requester = yield self.auth.get_user_by_req(request, allow_guest=True)
requester_user_id = requester.user.to_string()
result = yield self.groups_handler.get_rooms_in_group(group_id, requester_user_id)
@@ -369,7 +369,7 @@ class GroupUsersServlet(RestServlet):
@defer.inlineCallbacks
def on_GET(self, request, group_id):
- requester = yield self.auth.get_user_by_req(request)
+ requester = yield self.auth.get_user_by_req(request, allow_guest=True)
requester_user_id = requester.user.to_string()
result = yield self.groups_handler.get_users_in_group(group_id, requester_user_id)
@@ -672,7 +672,7 @@ class PublicisedGroupsForUserServlet(RestServlet):
@defer.inlineCallbacks
def on_GET(self, request, user_id):
- yield self.auth.get_user_by_req(request)
+ yield self.auth.get_user_by_req(request, allow_guest=True)
result = yield self.groups_handler.get_publicised_groups_for_user(
user_id
@@ -697,7 +697,7 @@ class PublicisedGroupsForUsersServlet(RestServlet):
@defer.inlineCallbacks
def on_POST(self, request):
- yield self.auth.get_user_by_req(request)
+ yield self.auth.get_user_by_req(request, allow_guest=True)
content = parse_json_object_from_request(request)
user_ids = content["user_ids"]
@@ -724,7 +724,7 @@ class GroupsForUserServlet(RestServlet):
@defer.inlineCallbacks
def on_GET(self, request):
- requester = yield self.auth.get_user_by_req(request)
+ requester = yield self.auth.get_user_by_req(request, allow_guest=True)
requester_user_id = requester.user.to_string()
result = yield self.groups_handler.get_joined_groups(requester_user_id)
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 723f7043f4..40d2e664eb 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -25,7 +25,8 @@ from synapse.util.stringutils import random_string
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.http.client import SpiderHttpClient
from synapse.http.server import (
- request_handler, respond_with_json_bytes
+ request_handler, respond_with_json_bytes,
+ respond_with_json,
)
from synapse.util.async import ObservableDeferred
from synapse.util.stringutils import is_ascii
@@ -78,6 +79,9 @@ class PreviewUrlResource(Resource):
self._expire_url_cache_data, 10 * 1000
)
+ def render_OPTIONS(self, request):
+ return respond_with_json(request, 200, {}, send_cors=True)
+
def render_GET(self, request):
self._async_render_GET(request)
return NOT_DONE_YET
@@ -348,11 +352,16 @@ class PreviewUrlResource(Resource):
def _expire_url_cache_data(self):
"""Clean up expired url cache content, media and thumbnails.
"""
-
# TODO: Delete from backup media store
now = self.clock.time_msec()
+ logger.info("Running url preview cache expiry")
+
+ if not (yield self.store.has_completed_background_updates()):
+ logger.info("Still running DB updates; skipping expiry")
+ return
+
# First we delete expired url cache entries
media_ids = yield self.store.get_expired_url_cache(now)
@@ -426,8 +435,7 @@ class PreviewUrlResource(Resource):
yield self.store.delete_url_cache_media(removed_media)
- if removed_media:
- logger.info("Deleted %d media from url cache", len(removed_media))
+ logger.info("Deleted %d media from url cache", len(removed_media))
def decode_and_calc_og(body, media_uri, request_encoding=None):
diff --git a/synapse/server.py b/synapse/server.py
index 6de33019d2..99693071b6 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -39,11 +39,13 @@ from synapse.federation.transaction_queue import TransactionQueue
from synapse.handlers import Handlers
from synapse.handlers.appservice import ApplicationServicesHandler
from synapse.handlers.auth import AuthHandler, MacaroonGeneartor
+from synapse.handlers.deactivate_account import DeactivateAccountHandler
from synapse.handlers.devicemessage import DeviceMessageHandler
from synapse.handlers.device import DeviceHandler
from synapse.handlers.e2e_keys import E2eKeysHandler
from synapse.handlers.presence import PresenceHandler
from synapse.handlers.room_list import RoomListHandler
+from synapse.handlers.set_password import SetPasswordHandler
from synapse.handlers.sync import SyncHandler
from synapse.handlers.typing import TypingHandler
from synapse.handlers.events import EventHandler, EventStreamHandler
@@ -60,7 +62,10 @@ from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.notifier import Notifier
from synapse.push.action_generator import ActionGenerator
from synapse.push.pusherpool import PusherPool
-from synapse.rest.media.v1.media_repository import MediaRepository
+from synapse.rest.media.v1.media_repository import (
+ MediaRepository,
+ MediaRepositoryResource,
+)
from synapse.state import StateHandler
from synapse.storage import DataStore
from synapse.streams.events import EventSources
@@ -90,17 +95,12 @@ class HomeServer(object):
"""
DEPENDENCIES = [
- 'config',
- 'clock',
'http_client',
'db_pool',
- 'persistence_service',
'replication_layer',
- 'datastore',
'handlers',
'v1auth',
'auth',
- 'rest_servlet_factory',
'state_handler',
'presence_handler',
'sync_handler',
@@ -117,19 +117,10 @@ class HomeServer(object):
'application_service_handler',
'device_message_handler',
'profile_handler',
+ 'deactivate_account_handler',
+ 'set_password_handler',
'notifier',
- 'distributor',
- 'client_resource',
- 'resource_for_federation',
- 'resource_for_static_content',
- 'resource_for_web_client',
- 'resource_for_content_repo',
- 'resource_for_server_key',
- 'resource_for_server_key_v2',
- 'resource_for_media_repository',
- 'resource_for_metrics',
'event_sources',
- 'ratelimiter',
'keyring',
'pusherpool',
'event_builder_factory',
@@ -137,6 +128,7 @@ class HomeServer(object):
'http_client_context_factory',
'simple_http_client',
'media_repository',
+ 'media_repository_resource',
'federation_transport_client',
'federation_sender',
'receipts_handler',
@@ -183,6 +175,21 @@ class HomeServer(object):
def is_mine_id(self, string):
return string.split(":", 1)[1] == self.hostname
+ def get_clock(self):
+ return self.clock
+
+ def get_datastore(self):
+ return self.datastore
+
+ def get_config(self):
+ return self.config
+
+ def get_distributor(self):
+ return self.distributor
+
+ def get_ratelimiter(self):
+ return self.ratelimiter
+
def build_replication_layer(self):
return initialize_http_replication(self)
@@ -265,6 +272,12 @@ class HomeServer(object):
def build_profile_handler(self):
return ProfileHandler(self)
+ def build_deactivate_account_handler(self):
+ return DeactivateAccountHandler(self)
+
+ def build_set_password_handler(self):
+ return SetPasswordHandler(self)
+
def build_event_sources(self):
return EventSources(self)
@@ -294,6 +307,11 @@ class HomeServer(object):
**self.db_config.get("args", {})
)
+ def build_media_repository_resource(self):
+ # build the media repo resource. This indirects through the HomeServer
+ # to ensure that we only have a single instance of
+ return MediaRepositoryResource(self)
+
def build_media_repository(self):
return MediaRepository(self)
diff --git a/synapse/server.pyi b/synapse/server.pyi
index e8c0386b7f..41416ef252 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -3,10 +3,14 @@ import synapse.federation.transaction_queue
import synapse.federation.transport.client
import synapse.handlers
import synapse.handlers.auth
+import synapse.handlers.deactivate_account
import synapse.handlers.device
import synapse.handlers.e2e_keys
-import synapse.storage
+import synapse.handlers.set_password
+import synapse.rest.media.v1.media_repository
import synapse.state
+import synapse.storage
+
class HomeServer(object):
def get_auth(self) -> synapse.api.auth.Auth:
@@ -30,8 +34,20 @@ class HomeServer(object):
def get_state_handler(self) -> synapse.state.StateHandler:
pass
+ def get_deactivate_account_handler(self) -> synapse.handlers.deactivate_account.DeactivateAccountHandler:
+ pass
+
+ def get_set_password_handler(self) -> synapse.handlers.set_password.SetPasswordHandler:
+ pass
+
def get_federation_sender(self) -> synapse.federation.transaction_queue.TransactionQueue:
pass
def get_federation_transport_client(self) -> synapse.federation.transport.client.TransportLayerClient:
pass
+
+ def get_media_repository_resource(self) -> synapse.rest.media.v1.media_repository.MediaRepositoryResource:
+ pass
+
+ def get_media_repository(self) -> synapse.rest.media.v1.media_repository.MediaRepository:
+ pass
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 0fdf49a2fd..b971f0cb18 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -495,6 +495,7 @@ class SQLBaseStore(object):
Deferred(bool): True if a new entry was created, False if an
existing one was updated.
"""
+ attempts = 0
while True:
try:
result = yield self.runInteraction(
@@ -504,6 +505,12 @@ class SQLBaseStore(object):
)
defer.returnValue(result)
except self.database_engine.module.IntegrityError as e:
+ attempts += 1
+ if attempts >= 5:
+ # don't retry forever, because things other than races
+ # can cause IntegrityErrors
+ raise
+
# presumably we raced with another transaction: let's retry.
logger.warn(
"IntegrityError when upserting into %s; retrying: %s",
@@ -600,20 +607,18 @@ class SQLBaseStore(object):
@staticmethod
def _simple_select_onecol_txn(txn, table, keyvalues, retcol):
- if keyvalues:
- where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
- else:
- where = ""
-
sql = (
- "SELECT %(retcol)s FROM %(table)s %(where)s"
+ "SELECT %(retcol)s FROM %(table)s"
) % {
"retcol": retcol,
"table": table,
- "where": where,
}
- txn.execute(sql, keyvalues.values())
+ if keyvalues:
+ sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
+ txn.execute(sql, keyvalues.values())
+ else:
+ txn.execute(sql)
return [r[0] for r in txn]
@@ -624,7 +629,7 @@ class SQLBaseStore(object):
Args:
table (str): table name
- keyvalues (dict): column names and values to select the rows with
+ keyvalues (dict|None): column names and values to select the rows with
retcol (str): column whos value we wish to retrieve.
Returns:
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index c8a1eb016b..56a0bde549 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -222,9 +222,12 @@ class AccountDataStore(SQLBaseStore):
"""
content_json = json.dumps(content)
- def add_account_data_txn(txn, next_id):
- self._simple_upsert_txn(
- txn,
+ with self._account_data_id_gen.get_next() as next_id:
+ # no need to lock here as room_account_data has a unique constraint
+ # on (user_id, room_id, account_data_type) so _simple_upsert will
+ # retry if there is a conflict.
+ yield self._simple_upsert(
+ desc="add_room_account_data",
table="room_account_data",
keyvalues={
"user_id": user_id,
@@ -234,19 +237,20 @@ class AccountDataStore(SQLBaseStore):
values={
"stream_id": next_id,
"content": content_json,
- }
- )
- txn.call_after(
- self._account_data_stream_cache.entity_has_changed,
- user_id, next_id,
+ },
+ lock=False,
)
- txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
- self._update_max_stream_id(txn, next_id)
- with self._account_data_id_gen.get_next() as next_id:
- yield self.runInteraction(
- "add_room_account_data", add_account_data_txn, next_id
- )
+ # it's theoretically possible for the above to succeed and the
+ # below to fail - in which case we might reuse a stream id on
+ # restart, and the above update might not get propagated. That
+ # doesn't sound any worse than the whole update getting lost,
+ # which is what would happen if we combined the two into one
+ # transaction.
+ yield self._update_max_stream_id(next_id)
+
+ self._account_data_stream_cache.entity_has_changed(user_id, next_id)
+ self.get_account_data_for_user.invalidate((user_id,))
result = self._account_data_id_gen.get_current_token()
defer.returnValue(result)
@@ -263,9 +267,12 @@ class AccountDataStore(SQLBaseStore):
"""
content_json = json.dumps(content)
- def add_account_data_txn(txn, next_id):
- self._simple_upsert_txn(
- txn,
+ with self._account_data_id_gen.get_next() as next_id:
+ # no need to lock here as account_data has a unique constraint on
+ # (user_id, account_data_type) so _simple_upsert will retry if
+ # there is a conflict.
+ yield self._simple_upsert(
+ desc="add_user_account_data",
table="account_data",
keyvalues={
"user_id": user_id,
@@ -274,40 +281,46 @@ class AccountDataStore(SQLBaseStore):
values={
"stream_id": next_id,
"content": content_json,
- }
+ },
+ lock=False,
)
- txn.call_after(
- self._account_data_stream_cache.entity_has_changed,
+
+ # it's theoretically possible for the above to succeed and the
+ # below to fail - in which case we might reuse a stream id on
+ # restart, and the above update might not get propagated. That
+ # doesn't sound any worse than the whole update getting lost,
+ # which is what would happen if we combined the two into one
+ # transaction.
+ yield self._update_max_stream_id(next_id)
+
+ self._account_data_stream_cache.entity_has_changed(
user_id, next_id,
)
- txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
- txn.call_after(
- self.get_global_account_data_by_type_for_user.invalidate,
+ self.get_account_data_for_user.invalidate((user_id,))
+ self.get_global_account_data_by_type_for_user.invalidate(
(account_data_type, user_id,)
)
- self._update_max_stream_id(txn, next_id)
-
- with self._account_data_id_gen.get_next() as next_id:
- yield self.runInteraction(
- "add_user_account_data", add_account_data_txn, next_id
- )
result = self._account_data_id_gen.get_current_token()
defer.returnValue(result)
- def _update_max_stream_id(self, txn, next_id):
+ def _update_max_stream_id(self, next_id):
"""Update the max stream_id
Args:
- txn: The database cursor
next_id(int): The the revision to advance to.
"""
- update_max_id_sql = (
- "UPDATE account_data_max_stream_id"
- " SET stream_id = ?"
- " WHERE stream_id < ?"
+ def _update(txn):
+ update_max_id_sql = (
+ "UPDATE account_data_max_stream_id"
+ " SET stream_id = ?"
+ " WHERE stream_id < ?"
+ )
+ txn.execute(update_max_id_sql, (next_id, next_id))
+ return self.runInteraction(
+ "update_account_data_max_stream_id",
+ _update,
)
- txn.execute(update_max_id_sql, (next_id, next_id))
@cachedInlineCallbacks(num_args=2, cache_context=True, max_entries=5000)
def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context):
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 6f235ac051..11a1b942f1 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -85,6 +85,7 @@ class BackgroundUpdateStore(SQLBaseStore):
self._background_update_performance = {}
self._background_update_queue = []
self._background_update_handlers = {}
+ self._all_done = False
@defer.inlineCallbacks
def start_doing_background_updates(self):
@@ -106,9 +107,41 @@ class BackgroundUpdateStore(SQLBaseStore):
"No more background updates to do."
" Unscheduling background update task."
)
+ self._all_done = True
defer.returnValue(None)
@defer.inlineCallbacks
+ def has_completed_background_updates(self):
+ """Check if all the background updates have completed
+
+ Returns:
+ Deferred[bool]: True if all background updates have completed
+ """
+ # if we've previously determined that there is nothing left to do, that
+ # is easy
+ if self._all_done:
+ defer.returnValue(True)
+
+ # obviously, if we have things in our queue, we're not done.
+ if self._background_update_queue:
+ defer.returnValue(False)
+
+ # otherwise, check if there are updates to be run. This is important,
+ # as we may be running on a worker which doesn't perform the bg updates
+ # itself, but still wants to wait for them to happen.
+ updates = yield self._simple_select_onecol(
+ "background_updates",
+ keyvalues=None,
+ retcol="1",
+ desc="check_background_updates",
+ )
+ if not updates:
+ self._all_done = True
+ defer.returnValue(True)
+
+ defer.returnValue(False)
+
+ @defer.inlineCallbacks
def do_next_background_update(self, desired_duration_ms):
"""Does some amount of work on the next queued background update
@@ -269,7 +302,7 @@ class BackgroundUpdateStore(SQLBaseStore):
# Sqlite doesn't support concurrent creation of indexes.
#
# We don't use partial indices on SQLite as it wasn't introduced
- # until 3.8, and wheezy has 3.7
+ # until 3.8, and wheezy and CentOS 7 have 3.7
#
# We assume that sqlite doesn't give us invalid indices; however
# we may still end up with the index existing but the
diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py
index 52e5cdad70..a66ff7c1e0 100644
--- a/synapse/storage/media_repository.py
+++ b/synapse/storage/media_repository.py
@@ -12,13 +12,23 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+from synapse.storage.background_updates import BackgroundUpdateStore
-from ._base import SQLBaseStore
-
-class MediaRepositoryStore(SQLBaseStore):
+class MediaRepositoryStore(BackgroundUpdateStore):
"""Persistence for attachments and avatars"""
+ def __init__(self, db_conn, hs):
+ super(MediaRepositoryStore, self).__init__(db_conn, hs)
+
+ self.register_background_index_update(
+ update_name='local_media_repository_url_idx',
+ index_name='local_media_repository_url_idx',
+ table='local_media_repository',
+ columns=['created_ts'],
+ where_clause='url_cache IS NOT NULL',
+ )
+
def get_default_thumbnails(self, top_level_type, sub_type):
return []
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 8b9544c209..3aa810981f 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -254,8 +254,8 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
If None, tokens associated with any device (or no device) will
be deleted
Returns:
- defer.Deferred[list[str, str|None]]: a list of the deleted tokens
- and device IDs
+ defer.Deferred[list[str, int, str|None, int]]: a list of
+ (token, token id, device id) for each of the deleted tokens
"""
def f(txn):
keyvalues = {
@@ -272,12 +272,12 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
values.append(except_token_id)
txn.execute(
- "SELECT token, device_id FROM access_tokens WHERE %s" % where_clause,
+ "SELECT token, id, device_id FROM access_tokens WHERE %s" % where_clause,
values
)
- tokens_and_devices = [(r[0], r[1]) for r in txn]
+ tokens_and_devices = [(r[0], r[1], r[2]) for r in txn]
- for token, _ in tokens_and_devices:
+ for token, _, _ in tokens_and_devices:
self._invalidate_cache_and_stream(
txn, self.get_user_by_access_token, (token,)
)
diff --git a/synapse/storage/schema/delta/44/expire_url_cache.sql b/synapse/storage/schema/delta/44/expire_url_cache.sql
index e2b775f038..b12f9b2ebf 100644
--- a/synapse/storage/schema/delta/44/expire_url_cache.sql
+++ b/synapse/storage/schema/delta/44/expire_url_cache.sql
@@ -13,7 +13,10 @@
* limitations under the License.
*/
-CREATE INDEX local_media_repository_url_idx ON local_media_repository(created_ts) WHERE url_cache IS NOT NULL;
+-- this didn't work on SQLite 3.7 (because of lack of partial indexes), so was
+-- removed and replaced with 46/local_media_repository_url_idx.sql.
+--
+-- CREATE INDEX local_media_repository_url_idx ON local_media_repository(created_ts) WHERE url_cache IS NOT NULL;
-- we need to change `expires` to `expires_ts` so that we can index on it. SQLite doesn't support
-- indices on expressions until 3.9.
diff --git a/synapse/storage/schema/delta/46/local_media_repository_url_idx.sql b/synapse/storage/schema/delta/46/local_media_repository_url_idx.sql
new file mode 100644
index 0000000000..bbfc7f5d1a
--- /dev/null
+++ b/synapse/storage/schema/delta/46/local_media_repository_url_idx.sql
@@ -0,0 +1,24 @@
+/* Copyright 2017 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- register a background update which will recreate the
+-- local_media_repository_url_idx index.
+--
+-- We do this as a bg update not because it is a particularly onerous
+-- operation, but because we'd like it to be a partial index if possible, and
+-- the background_index_update code will understand whether we are on
+-- postgres or sqlite and behave accordingly.
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('local_media_repository_url_idx', '{}');
|