summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/api/errors.py74
-rw-r--r--synapse/handlers/device.py36
-rw-r--r--synapse/handlers/room_list.py66
-rw-r--r--synapse/http/matrixfederationclient.py15
-rw-r--r--synapse/notifier.py44
-rw-r--r--synapse/push/mailer.py2
-rw-r--r--synapse/replication/slave/storage/events.py4
-rw-r--r--synapse/rest/client/v2_alpha/devices.py47
-rw-r--r--synapse/rest/media/v1/download_resource.py12
-rw-r--r--synapse/rest/media/v1/media_repository.py44
-rw-r--r--synapse/storage/_base.py41
-rw-r--r--synapse/storage/devices.py17
-rw-r--r--synapse/storage/events.py49
-rw-r--r--synapse/storage/state.py14
-rw-r--r--synapse/storage/stream.py3
15 files changed, 395 insertions, 73 deletions
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index 921c457738..6fbd5d6876 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -15,6 +15,7 @@
 
 """Contains exceptions and error codes."""
 
+import json
 import logging
 
 logger = logging.getLogger(__name__)
@@ -50,27 +51,35 @@ class Codes(object):
 
 
 class CodeMessageException(RuntimeError):
-    """An exception with integer code and message string attributes."""
+    """An exception with integer code and message string attributes.
 
+    Attributes:
+        code (int): HTTP error code
+        msg (str): string describing the error
+    """
     def __init__(self, code, msg):
         super(CodeMessageException, self).__init__("%d: %s" % (code, msg))
         self.code = code
         self.msg = msg
-        self.response_code_message = None
 
     def error_dict(self):
         return cs_error(self.msg)
 
 
 class SynapseError(CodeMessageException):
-    """A base error which can be caught for all synapse events."""
+    """A base exception type for matrix errors which have an errcode and error
+    message (as well as an HTTP status code).
+
+    Attributes:
+        errcode (str): Matrix error code e.g 'M_FORBIDDEN'
+    """
     def __init__(self, code, msg, errcode=Codes.UNKNOWN):
         """Constructs a synapse error.
 
         Args:
             code (int): The integer error code (an HTTP response code)
             msg (str): The human-readable error message.
-            err (str): The error code e.g 'M_FORBIDDEN'
+            errcode (str): The matrix error code e.g 'M_FORBIDDEN'
         """
         super(SynapseError, self).__init__(code, msg)
         self.errcode = errcode
@@ -81,6 +90,39 @@ class SynapseError(CodeMessageException):
             self.errcode,
         )
 
+    @classmethod
+    def from_http_response_exception(cls, err):
+        """Make a SynapseError based on an HTTPResponseException
+
+        This is useful when a proxied request has failed, and we need to
+        decide how to map the failure onto a matrix error to send back to the
+        client.
+
+        An attempt is made to parse the body of the http response as a matrix
+        error. If that succeeds, the errcode and error message from the body
+        are used as the errcode and error message in the new synapse error.
+
+        Otherwise, the errcode is set to M_UNKNOWN, and the error message is
+        set to the reason code from the HTTP response.
+
+        Args:
+            err (HttpResponseException):
+
+        Returns:
+            SynapseError:
+        """
+        # try to parse the body as json, to get better errcode/msg, but
+        # default to M_UNKNOWN with the HTTP status as the error text
+        try:
+            j = json.loads(err.response)
+        except ValueError:
+            j = {}
+        errcode = j.get('errcode', Codes.UNKNOWN)
+        errmsg = j.get('error', err.msg)
+
+        res = SynapseError(err.code, errmsg, errcode)
+        return res
+
 
 class RegistrationError(SynapseError):
     """An error raised when a registration event fails."""
@@ -106,13 +148,11 @@ class UnrecognizedRequestError(SynapseError):
 
 class NotFoundError(SynapseError):
     """An error indicating we can't find the thing you asked for"""
-    def __init__(self, *args, **kwargs):
-        if "errcode" not in kwargs:
-            kwargs["errcode"] = Codes.NOT_FOUND
+    def __init__(self, msg="Not found", errcode=Codes.NOT_FOUND):
         super(NotFoundError, self).__init__(
             404,
-            "Not found",
-            **kwargs
+            msg,
+            errcode=errcode
         )
 
 
@@ -173,7 +213,6 @@ class LimitExceededError(SynapseError):
                  errcode=Codes.LIMIT_EXCEEDED):
         super(LimitExceededError, self).__init__(code, msg, errcode)
         self.retry_after_ms = retry_after_ms
-        self.response_code_message = "Too Many Requests"
 
     def error_dict(self):
         return cs_error(
@@ -243,6 +282,19 @@ class FederationError(RuntimeError):
 
 
 class HttpResponseException(CodeMessageException):
+    """
+    Represents an HTTP-level failure of an outbound request
+
+    Attributes:
+        response (str): body of response
+    """
     def __init__(self, code, msg, response):
-        self.response = response
+        """
+
+        Args:
+            code (int): HTTP status code
+            msg (str): reason phrase from HTTP response status line
+            response (str): body of response
+        """
         super(HttpResponseException, self).__init__(code, msg)
+        self.response = response
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index e859b3165f..1b007d4945 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -170,6 +170,40 @@ class DeviceHandler(BaseHandler):
         yield self.notify_device_update(user_id, [device_id])
 
     @defer.inlineCallbacks
+    def delete_devices(self, user_id, device_ids):
+        """ Delete several devices
+
+        Args:
+            user_id (str):
+            device_ids (str): The list of device IDs to delete
+
+        Returns:
+            defer.Deferred:
+        """
+
+        try:
+            yield self.store.delete_devices(user_id, device_ids)
+        except errors.StoreError, e:
+            if e.code == 404:
+                # no match
+                pass
+            else:
+                raise
+
+        # Delete access tokens and e2e keys for each device. Not optimised as it is not
+        # considered as part of a critical path.
+        for device_id in device_ids:
+            yield self.store.user_delete_access_tokens(
+                user_id, device_id=device_id,
+                delete_refresh_tokens=True,
+            )
+            yield self.store.delete_e2e_keys_by_device(
+                user_id=user_id, device_id=device_id
+            )
+
+        yield self.notify_device_update(user_id, device_ids)
+
+    @defer.inlineCallbacks
     def update_device(self, user_id, device_id, content):
         """ Update the given device
 
@@ -262,7 +296,7 @@ class DeviceHandler(BaseHandler):
                 # ordering: treat it the same as a new room
                 event_ids = []
 
-            current_state_ids = yield self.state.get_current_state_ids(room_id)
+            current_state_ids = yield self.store.get_current_state_ids(room_id)
 
             # special-case for an empty prev state: include all members
             # in the changed list
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 19eebbd43f..516cd9a6ac 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -21,6 +21,7 @@ from synapse.api.constants import (
     EventTypes, JoinRules,
 )
 from synapse.util.async import concurrently_execute
+from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.types import ThirdPartyInstanceID
 
@@ -62,6 +63,10 @@ class RoomListHandler(BaseHandler):
                 appservice and network id to use an appservice specific one.
                 Setting to None returns all public rooms across all lists.
         """
+        logger.info(
+            "Getting public room list: limit=%r, since=%r, search=%r, network=%r",
+            limit, since_token, bool(search_filter), network_tuple,
+        )
         if search_filter:
             # We explicitly don't bother caching searches or requests for
             # appservice specific lists.
@@ -91,7 +96,6 @@ class RoomListHandler(BaseHandler):
 
         rooms_to_order_value = {}
         rooms_to_num_joined = {}
-        rooms_to_latest_event_ids = {}
 
         newly_visible = []
         newly_unpublished = []
@@ -116,19 +120,26 @@ class RoomListHandler(BaseHandler):
 
         @defer.inlineCallbacks
         def get_order_for_room(room_id):
-            latest_event_ids = rooms_to_latest_event_ids.get(room_id, None)
-            if not latest_event_ids:
+            # Most of the rooms won't have changed between the since token and
+            # now (especially if the since token is "now"). So, we can ask what
+            # the current users are in a room (that will hit a cache) and then
+            # check if the room has changed since the since token. (We have to
+            # do it in that order to avoid races).
+            # If things have changed then fall back to getting the current state
+            # at the since token.
+            joined_users = yield self.store.get_users_in_room(room_id)
+            if self.store.has_room_changed_since(room_id, stream_token):
                 latest_event_ids = yield self.store.get_forward_extremeties_for_room(
                     room_id, stream_token
                 )
-                rooms_to_latest_event_ids[room_id] = latest_event_ids
 
-            if not latest_event_ids:
-                return
+                if not latest_event_ids:
+                    return
+
+                joined_users = yield self.state_handler.get_current_user_in_room(
+                    room_id, latest_event_ids,
+                )
 
-            joined_users = yield self.state_handler.get_current_user_in_room(
-                room_id, latest_event_ids,
-            )
             num_joined_users = len(joined_users)
             rooms_to_num_joined[room_id] = num_joined_users
 
@@ -165,19 +176,19 @@ class RoomListHandler(BaseHandler):
                 rooms_to_scan = rooms_to_scan[:since_token.current_limit]
                 rooms_to_scan.reverse()
 
-        # Actually generate the entries. _generate_room_entry will append to
+        # Actually generate the entries. _append_room_entry_to_chunk will append to
         # chunk but will stop if len(chunk) > limit
         chunk = []
         if limit and not search_filter:
             step = limit + 1
             for i in xrange(0, len(rooms_to_scan), step):
                 # We iterate here because the vast majority of cases we'll stop
-                # at first iteration, but occaisonally _generate_room_entry
+                # at first iteration, but occaisonally _append_room_entry_to_chunk
                 # won't append to the chunk and so we need to loop again.
                 # We don't want to scan over the entire range either as that
                 # would potentially waste a lot of work.
                 yield concurrently_execute(
-                    lambda r: self._generate_room_entry(
+                    lambda r: self._append_room_entry_to_chunk(
                         r, rooms_to_num_joined[r],
                         chunk, limit, search_filter
                     ),
@@ -187,7 +198,7 @@ class RoomListHandler(BaseHandler):
                     break
         else:
             yield concurrently_execute(
-                lambda r: self._generate_room_entry(
+                lambda r: self._append_room_entry_to_chunk(
                     r, rooms_to_num_joined[r],
                     chunk, limit, search_filter
                 ),
@@ -256,21 +267,35 @@ class RoomListHandler(BaseHandler):
         defer.returnValue(results)
 
     @defer.inlineCallbacks
-    def _generate_room_entry(self, room_id, num_joined_users, chunk, limit,
-                             search_filter):
+    def _append_room_entry_to_chunk(self, room_id, num_joined_users, chunk, limit,
+                                    search_filter):
+        """Generate the entry for a room in the public room list and append it
+        to the `chunk` if it matches the search filter
+        """
         if limit and len(chunk) > limit + 1:
             # We've already got enough, so lets just drop it.
             return
 
+        result = yield self._generate_room_entry(room_id, num_joined_users)
+
+        if result and _matches_room_entry(result, search_filter):
+            chunk.append(result)
+
+    @cachedInlineCallbacks(num_args=1, cache_context=True)
+    def _generate_room_entry(self, room_id, num_joined_users, cache_context):
+        """Returns the entry for a room
+        """
         result = {
             "room_id": room_id,
             "num_joined_members": num_joined_users,
         }
 
-        current_state_ids = yield self.state_handler.get_current_state_ids(room_id)
+        current_state_ids = yield self.store.get_current_state_ids(
+            room_id, on_invalidate=cache_context.invalidate,
+        )
 
         event_map = yield self.store.get_events([
-            event_id for key, event_id in current_state_ids.items()
+            event_id for key, event_id in current_state_ids.iteritems()
             if key[0] in (
                 EventTypes.JoinRules,
                 EventTypes.Name,
@@ -294,7 +319,9 @@ class RoomListHandler(BaseHandler):
             if join_rule and join_rule != JoinRules.PUBLIC:
                 defer.returnValue(None)
 
-        aliases = yield self.store.get_aliases_for_room(room_id)
+        aliases = yield self.store.get_aliases_for_room(
+            room_id, on_invalidate=cache_context.invalidate
+        )
         if aliases:
             result["aliases"] = aliases
 
@@ -334,8 +361,7 @@ class RoomListHandler(BaseHandler):
             if avatar_url:
                 result["avatar_url"] = avatar_url
 
-        if _matches_room_entry(result, search_filter):
-            chunk.append(result)
+        defer.returnValue(result)
 
     @defer.inlineCallbacks
     def get_remote_public_room_list(self, server_name, limit=None, since_token=None,
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 78b92cef36..82586e3dea 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -108,6 +108,12 @@ class MatrixFederationHttpClient(object):
                         query_bytes=b"", retry_on_dns_fail=True,
                         timeout=None, long_retries=False):
         """ Creates and sends a request to the given url
+
+        Returns:
+            Deferred: resolves with the http response object on success.
+
+            Fails with ``HTTPRequestException``: if we get an HTTP response
+            code >= 300.
         """
         headers_dict[b"User-Agent"] = [self.version_string]
         headers_dict[b"Host"] = [destination]
@@ -408,8 +414,11 @@ class MatrixFederationHttpClient(object):
             output_stream (file): File to write the response body to.
             args (dict): Optional dictionary used to create the query string.
         Returns:
-            A (int,dict) tuple of the file length and a dict of the response
-            headers.
+            Deferred: resolves with an (int,dict) tuple of the file length and
+            a dict of the response headers.
+
+            Fails with ``HTTPRequestException`` if we get an HTTP response code
+            >= 300
         """
 
         encoded_args = {}
@@ -419,7 +428,7 @@ class MatrixFederationHttpClient(object):
             encoded_args[k] = [v.encode("UTF-8") for v in vs]
 
         query_bytes = urllib.urlencode(encoded_args, True)
-        logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
+        logger.debug("Query bytes: %s Retry DNS: %s", query_bytes, retry_on_dns_fail)
 
         def body_callback(method, url_bytes, headers_dict):
             self.sign_request(destination, method, url_bytes, headers_dict)
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 8051a7a842..6abb33bb3f 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -73,6 +73,13 @@ class _NotifierUserStream(object):
         self.user_id = user_id
         self.rooms = set(rooms)
         self.current_token = current_token
+
+        # The last token for which we should wake up any streams that have a
+        # token that comes before it. This gets updated everytime we get poked.
+        # We start it at the current token since if we get any streams
+        # that have a token from before we have no idea whether they should be
+        # woken up or not, so lets just wake them up.
+        self.last_notified_token = current_token
         self.last_notified_ms = time_now_ms
 
         with PreserveLoggingContext():
@@ -89,6 +96,7 @@ class _NotifierUserStream(object):
         self.current_token = self.current_token.copy_and_advance(
             stream_key, stream_id
         )
+        self.last_notified_token = self.current_token
         self.last_notified_ms = time_now_ms
         noify_deferred = self.notify_deferred
 
@@ -113,8 +121,14 @@ class _NotifierUserStream(object):
     def new_listener(self, token):
         """Returns a deferred that is resolved when there is a new token
         greater than the given token.
+
+        Args:
+            token: The token from which we are streaming from, i.e. we shouldn't
+                notify for things that happened before this.
         """
-        if self.current_token.is_after(token):
+        # Immediately wake up stream if something has already since happened
+        # since their last token.
+        if self.last_notified_token.is_after(token):
             return _NotificationListener(defer.succeed(self.current_token))
         else:
             return _NotificationListener(self.notify_deferred.observe())
@@ -294,40 +308,44 @@ class Notifier(object):
             self._register_with_keys(user_stream)
 
         result = None
+        prev_token = from_token
         if timeout:
             end_time = self.clock.time_msec() + timeout
 
-            prev_token = from_token
             while not result:
                 try:
-                    current_token = user_stream.current_token
-
-                    result = yield callback(prev_token, current_token)
-                    if result:
-                        break
-
                     now = self.clock.time_msec()
                     if end_time <= now:
                         break
 
                     # Now we wait for the _NotifierUserStream to be told there
                     # is a new token.
-                    # We need to supply the token we supplied to callback so
-                    # that we don't miss any current_token updates.
-                    prev_token = current_token
                     listener = user_stream.new_listener(prev_token)
                     with PreserveLoggingContext():
                         yield self.clock.time_bound_deferred(
                             listener.deferred,
                             time_out=(end_time - now) / 1000.
                         )
+
+                    current_token = user_stream.current_token
+
+                    result = yield callback(prev_token, current_token)
+                    if result:
+                        break
+
+                    # Update the prev_token to the current_token since nothing
+                    # has happened between the old prev_token and the current_token
+                    prev_token = current_token
                 except DeferredTimedOutError:
                     break
                 except defer.CancelledError:
                     break
-        else:
+
+        if result is None:
+            # This happened if there was no timeout or if the timeout had
+            # already expired.
             current_token = user_stream.current_token
-            result = yield callback(from_token, current_token)
+            result = yield callback(prev_token, current_token)
 
         defer.returnValue(result)
 
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 62d794f22b..3a50c72e0b 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -139,7 +139,7 @@ class Mailer(object):
 
         @defer.inlineCallbacks
         def _fetch_room_state(room_id):
-            room_state = yield self.state_handler.get_current_state_ids(room_id)
+            room_state = yield self.store.get_current_state_ids(room_id)
             state_by_room[room_id] = room_state
 
         # Run at most 3 of these at once: sync does 10 at a time but email
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 622b2d8540..518c9ea2e9 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -109,6 +109,10 @@ class SlavedEventStore(BaseSlavedStore):
     get_recent_event_ids_for_room = (
         StreamStore.__dict__["get_recent_event_ids_for_room"]
     )
+    get_current_state_ids = (
+        StateStore.__dict__["get_current_state_ids"]
+    )
+    has_room_changed_since = DataStore.has_room_changed_since.__func__
 
     get_unread_push_actions_for_user_in_range_for_http = (
         DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__
diff --git a/synapse/rest/client/v2_alpha/devices.py b/synapse/rest/client/v2_alpha/devices.py
index a1feaf3d54..b57ba95d24 100644
--- a/synapse/rest/client/v2_alpha/devices.py
+++ b/synapse/rest/client/v2_alpha/devices.py
@@ -46,6 +46,52 @@ class DevicesRestServlet(servlet.RestServlet):
         defer.returnValue((200, {"devices": devices}))
 
 
+class DeleteDevicesRestServlet(servlet.RestServlet):
+    """
+    API for bulk deletion of devices. Accepts a JSON object with a devices
+    key which lists the device_ids to delete. Requires user interactive auth.
+    """
+    PATTERNS = client_v2_patterns("/delete_devices", releases=[], v2_alpha=False)
+
+    def __init__(self, hs):
+        super(DeleteDevicesRestServlet, self).__init__()
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.device_handler = hs.get_device_handler()
+        self.auth_handler = hs.get_auth_handler()
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        try:
+            body = servlet.parse_json_object_from_request(request)
+        except errors.SynapseError as e:
+            if e.errcode == errors.Codes.NOT_JSON:
+                # deal with older clients which didn't pass a J*DELETESON dict
+                # the same as those that pass an empty dict
+                body = {}
+            else:
+                raise e
+
+        if 'devices' not in body:
+            raise errors.SynapseError(
+                400, "No devices supplied", errcode=errors.Codes.MISSING_PARAM
+            )
+
+        authed, result, params, _ = yield self.auth_handler.check_auth([
+            [constants.LoginType.PASSWORD],
+        ], body, self.hs.get_ip_from_request(request))
+
+        if not authed:
+            defer.returnValue((401, result))
+
+        requester = yield self.auth.get_user_by_req(request)
+        yield self.device_handler.delete_devices(
+            requester.user.to_string(),
+            body['devices'],
+        )
+        defer.returnValue((200, {}))
+
+
 class DeviceRestServlet(servlet.RestServlet):
     PATTERNS = client_v2_patterns("/devices/(?P<device_id>[^/]*)$",
                                   releases=[], v2_alpha=False)
@@ -111,5 +157,6 @@ class DeviceRestServlet(servlet.RestServlet):
 
 
 def register_servlets(hs, http_server):
+    DeleteDevicesRestServlet(hs).register(http_server)
     DevicesRestServlet(hs).register(http_server)
     DeviceRestServlet(hs).register(http_server)
diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py
index dfb87ffd15..6788375e85 100644
--- a/synapse/rest/media/v1/download_resource.py
+++ b/synapse/rest/media/v1/download_resource.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import synapse.http.servlet
 
 from ._base import parse_media_id, respond_with_file, respond_404
 from twisted.web.resource import Resource
@@ -81,6 +82,17 @@ class DownloadResource(Resource):
 
     @defer.inlineCallbacks
     def _respond_remote_file(self, request, server_name, media_id, name):
+        # don't forward requests for remote media if allow_remote is false
+        allow_remote = synapse.http.servlet.parse_boolean(
+            request, "allow_remote", default=True)
+        if not allow_remote:
+            logger.info(
+                "Rejecting request for remote media %s/%s due to allow_remote",
+                server_name, media_id,
+            )
+            respond_404(request)
+            return
+
         media_info = yield self.media_repo.get_remote_media(server_name, media_id)
 
         media_type = media_info["media_type"]
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 481ffee200..c43b185e08 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -13,22 +13,23 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from twisted.internet import defer, threads
+import twisted.internet.error
+import twisted.web.http
+from twisted.web.resource import Resource
+
 from .upload_resource import UploadResource
 from .download_resource import DownloadResource
 from .thumbnail_resource import ThumbnailResource
 from .identicon_resource import IdenticonResource
 from .preview_url_resource import PreviewUrlResource
 from .filepath import MediaFilePaths
-
-from twisted.web.resource import Resource
-
 from .thumbnailer import Thumbnailer
 
 from synapse.http.matrixfederationclient import MatrixFederationHttpClient
 from synapse.util.stringutils import random_string
-from synapse.api.errors import SynapseError
-
-from twisted.internet import defer, threads
+from synapse.api.errors import SynapseError, HttpResponseException, \
+    NotFoundError
 
 from synapse.util.async import Linearizer
 from synapse.util.stringutils import is_ascii
@@ -157,11 +158,34 @@ class MediaRepository(object):
                 try:
                     length, headers = yield self.client.get_file(
                         server_name, request_path, output_stream=f,
-                        max_size=self.max_upload_size,
+                        max_size=self.max_upload_size, args={
+                            # tell the remote server to 404 if it doesn't
+                            # recognise the server_name, to make sure we don't
+                            # end up with a routing loop.
+                            "allow_remote": "false",
+                        }
                     )
-                except Exception as e:
-                    logger.warn("Failed to fetch remoted media %r", e)
-                    raise SynapseError(502, "Failed to fetch remoted media")
+                except twisted.internet.error.DNSLookupError as e:
+                    logger.warn("HTTP error fetching remote media %s/%s: %r",
+                                server_name, media_id, e)
+                    raise NotFoundError()
+
+                except HttpResponseException as e:
+                    logger.warn("HTTP error fetching remote media %s/%s: %s",
+                                server_name, media_id, e.response)
+                    if e.code == twisted.web.http.NOT_FOUND:
+                        raise SynapseError.from_http_response_exception(e)
+                    raise SynapseError(502, "Failed to fetch remote media")
+
+                except SynapseError:
+                    logger.exception("Failed to fetch remote media %s/%s",
+                                     server_name, media_id)
+                    raise
+
+                except Exception:
+                    logger.exception("Failed to fetch remote media %s/%s",
+                                     server_name, media_id)
+                    raise SynapseError(502, "Failed to fetch remote media")
 
             media_type = headers["Content-Type"][0]
             time_now_ms = self.clock.time_msec()
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index a7a8ec9b7b..13b106bba1 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -840,6 +840,47 @@ class SQLBaseStore(object):
 
         return txn.execute(sql, keyvalues.values())
 
+    def _simple_delete_many(self, table, column, iterable, keyvalues, desc):
+        return self.runInteraction(
+            desc, self._simple_delete_many_txn, table, column, iterable, keyvalues
+        )
+
+    @staticmethod
+    def _simple_delete_many_txn(txn, table, column, iterable, keyvalues):
+        """Executes a DELETE query on the named table.
+
+        Filters rows by if value of `column` is in `iterable`.
+
+        Args:
+            txn : Transaction object
+            table : string giving the table name
+            column : column name to test for inclusion against `iterable`
+            iterable : list
+            keyvalues : dict of column names and values to select the rows with
+        """
+        if not iterable:
+            return
+
+        sql = "DELETE FROM %s" % table
+
+        clauses = []
+        values = []
+        clauses.append(
+            "%s IN (%s)" % (column, ",".join("?" for _ in iterable))
+        )
+        values.extend(iterable)
+
+        for key, value in keyvalues.items():
+            clauses.append("%s = ?" % (key,))
+            values.append(value)
+
+        if clauses:
+            sql = "%s WHERE %s" % (
+                sql,
+                " AND ".join(clauses),
+            )
+        return txn.execute(sql, values)
+
     def _get_cache_dict(self, db_conn, table, entity_column, stream_column,
                         max_value, limit=100000):
         # Fetch a mapping of room_id -> max stream position for "recent" rooms.
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index bd56ba2515..563071b7a9 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -108,6 +108,23 @@ class DeviceStore(SQLBaseStore):
             desc="delete_device",
         )
 
+    def delete_devices(self, user_id, device_ids):
+        """Deletes several devices.
+
+        Args:
+            user_id (str): The ID of the user which owns the devices
+            device_ids (list): The IDs of the devices to delete
+        Returns:
+            defer.Deferred
+        """
+        return self._simple_delete_many(
+            table="devices",
+            column="device_id",
+            iterable=device_ids,
+            keyvalues={"user_id": user_id},
+            desc="delete_devices",
+        )
+
     def update_device(self, user_id, device_id, new_display_name=None):
         """Update a device.
 
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index db01eb6d14..72319c35ae 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -433,23 +433,43 @@ class EventsStore(SQLBaseStore):
         if not new_latest_event_ids:
             current_state = {}
         elif was_updated:
+            # We work out the current state by passing the state sets to the
+            # state resolution algorithm. It may ask for some events, including
+            # the events we have yet to persist, so we need a slightly more
+            # complicated event lookup function than simply looking the events
+            # up in the db.
+            events_map = {ev.event_id: ev for ev, _ in events_context}
+
+            @defer.inlineCallbacks
+            def get_events(ev_ids):
+                # We get the events by first looking at the list of events we
+                # are trying to persist, and then fetching the rest from the DB.
+                db = []
+                to_return = {}
+                for ev_id in ev_ids:
+                    ev = events_map.get(ev_id, None)
+                    if ev:
+                        to_return[ev_id] = ev
+                    else:
+                        db.append(ev_id)
+
+                if db:
+                    evs = yield self.get_events(
+                        ev_ids, get_prev_content=False, check_redacted=False,
+                    )
+                    to_return.update(evs)
+                defer.returnValue(to_return)
+
             current_state = yield resolve_events(
                 state_sets,
-                state_map_factory=lambda ev_ids: self.get_events(
-                    ev_ids, get_prev_content=False, check_redacted=False,
-                ),
+                state_map_factory=get_events,
             )
         else:
             return
 
-        existing_state_rows = yield self._simple_select_list(
-            table="current_state_events",
-            keyvalues={"room_id": room_id},
-            retcols=["event_id", "type", "state_key"],
-            desc="_calculate_state_delta",
-        )
+        existing_state = yield self.get_current_state_ids(room_id)
 
-        existing_events = set(row["event_id"] for row in existing_state_rows)
+        existing_events = set(existing_state.itervalues())
         new_events = set(ev_id for ev_id in current_state.itervalues())
         changed_events = existing_events ^ new_events
 
@@ -457,9 +477,8 @@ class EventsStore(SQLBaseStore):
             return
 
         to_delete = {
-            (row["type"], row["state_key"]): row["event_id"]
-            for row in existing_state_rows
-            if row["event_id"] in changed_events
+            key: ev_id for key, ev_id in existing_state.iteritems()
+            if ev_id in changed_events
         }
         events_to_insert = (new_events - existing_events)
         to_insert = {
@@ -585,6 +604,10 @@ class EventsStore(SQLBaseStore):
                     txn, self.get_users_in_room, (room_id,)
                 )
 
+                self._invalidate_cache_and_stream(
+                    txn, self.get_current_state_ids, (room_id,)
+                )
+
         for room_id, new_extrem in new_forward_extremeties.items():
             self._simple_delete_txn(
                 txn,
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 84482d8285..27f1ec89ec 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cached, cachedList
+from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
 from synapse.util.caches import intern_string
 from synapse.storage.engines import PostgresEngine
 
@@ -69,6 +69,18 @@ class StateStore(SQLBaseStore):
             where_clause="type='m.room.member'",
         )
 
+    @cachedInlineCallbacks(max_entries=100000, iterable=True)
+    def get_current_state_ids(self, room_id):
+        rows = yield self._simple_select_list(
+            table="current_state_events",
+            keyvalues={"room_id": room_id},
+            retcols=["event_id", "type", "state_key"],
+            desc="_calculate_state_delta",
+        )
+        defer.returnValue({
+            (r["type"], r["state_key"]): r["event_id"] for r in rows
+        })
+
     @defer.inlineCallbacks
     def get_state_groups_ids(self, room_id, event_ids):
         if not event_ids:
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 200d124632..dddd5fc0e7 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -829,3 +829,6 @@ class StreamStore(SQLBaseStore):
             updatevalues={"stream_id": stream_id},
             desc="update_federation_out_pos",
         )
+
+    def has_room_changed_since(self, room_id, stream_id):
+        return self._events_stream_cache.has_entity_changed(room_id, stream_id)