summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/__init__.py2
-rw-r--r--synapse/appservice/__init__.py3
-rw-r--r--synapse/appservice/api.py8
-rw-r--r--synapse/federation/federation_client.py7
-rw-r--r--synapse/federation/transport/client.py9
-rw-r--r--synapse/federation/transport/server.py19
-rw-r--r--synapse/handlers/directory.py19
-rw-r--r--synapse/handlers/e2e_keys.py10
-rw-r--r--synapse/handlers/federation.py7
-rw-r--r--synapse/handlers/message.py60
-rw-r--r--synapse/handlers/room_list.py67
-rw-r--r--synapse/handlers/sync.py12
-rw-r--r--synapse/http/servlet.py8
-rw-r--r--synapse/replication/resource.py2
-rw-r--r--synapse/replication/slave/storage/room.py3
-rw-r--r--synapse/rest/client/v1/directory.py34
-rw-r--r--synapse/rest/client/v1/room.py19
-rw-r--r--synapse/rest/client/v2_alpha/keys.py43
-rw-r--r--synapse/rest/media/v1/preview_url_resource.py5
-rw-r--r--synapse/storage/deviceinbox.py3
-rw-r--r--synapse/storage/room.py187
-rw-r--r--synapse/storage/roommember.py6
-rw-r--r--synapse/storage/schema/delta/39/appservice_room_list.sql29
-rw-r--r--synapse/types.py34
-rw-r--r--synapse/util/async.py58
25 files changed, 532 insertions, 122 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index f32c28be02..f006e10dc5 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.18.5-rc2"
+__version__ = "0.18.5"
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 91471f7e89..b0106a3597 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -89,6 +89,9 @@ class ApplicationService(object):
         self.namespaces = self._check_namespaces(namespaces)
         self.id = id
 
+        if "|" in self.id:
+            raise Exception("application service ID cannot contain '|' character")
+
         # .protocols is a publicly visible field
         if protocols:
             self.protocols = set(protocols)
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index b0eb0c6d9d..6893610e71 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -19,6 +19,7 @@ from synapse.api.errors import CodeMessageException
 from synapse.http.client import SimpleHttpClient
 from synapse.events.utils import serialize_event
 from synapse.util.caches.response_cache import ResponseCache
+from synapse.types import ThirdPartyInstanceID
 
 import logging
 import urllib
@@ -177,6 +178,13 @@ class ApplicationServiceApi(SimpleHttpClient):
                                    " valid result", uri)
                     defer.returnValue(None)
 
+                for instance in info.get("instances", []):
+                    network_id = instance.get("network_id", None)
+                    if network_id is not None:
+                        instance["instance_id"] = ThirdPartyInstanceID(
+                            service.id, network_id,
+                        ).to_string()
+
                 defer.returnValue(info)
             except Exception as ex:
                 logger.warning("query_3pe_protocol to %s threw exception %s",
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index b255709165..6e23c207ee 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -655,12 +655,15 @@ class FederationClient(FederationBase):
         raise RuntimeError("Failed to send to any server.")
 
     def get_public_rooms(self, destination, limit=None, since_token=None,
-                         search_filter=None):
+                         search_filter=None, include_all_networks=False,
+                         third_party_instance_id=None):
         if destination == self.server_name:
             return
 
         return self.transport_layer.get_public_rooms(
-            destination, limit, since_token, search_filter
+            destination, limit, since_token, search_filter,
+            include_all_networks=include_all_networks,
+            third_party_instance_id=third_party_instance_id,
         )
 
     @defer.inlineCallbacks
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index db45c7826c..491cdc29e1 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -249,10 +249,15 @@ class TransportLayerClient(object):
     @defer.inlineCallbacks
     @log_function
     def get_public_rooms(self, remote_server, limit, since_token,
-                         search_filter=None):
+                         search_filter=None, include_all_networks=False,
+                         third_party_instance_id=None):
         path = PREFIX + "/publicRooms"
 
-        args = {}
+        args = {
+            "include_all_networks": "true" if include_all_networks else "false",
+        }
+        if third_party_instance_id:
+            args["third_party_instance_id"] = third_party_instance_id,
         if limit:
             args["limit"] = [str(limit)]
         if since_token:
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index fec337be64..159dbd1747 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -20,9 +20,11 @@ from synapse.api.errors import Codes, SynapseError
 from synapse.http.server import JsonResource
 from synapse.http.servlet import (
     parse_json_object_from_request, parse_integer_from_args, parse_string_from_args,
+    parse_boolean_from_args,
 )
 from synapse.util.ratelimitutils import FederationRateLimiter
 from synapse.util.versionstring import get_version_string
+from synapse.types import ThirdPartyInstanceID
 
 import functools
 import logging
@@ -558,8 +560,23 @@ class PublicRoomList(BaseFederationServlet):
     def on_GET(self, origin, content, query):
         limit = parse_integer_from_args(query, "limit", 0)
         since_token = parse_string_from_args(query, "since", None)
+        include_all_networks = parse_boolean_from_args(
+            query, "include_all_networks", False
+        )
+        third_party_instance_id = parse_string_from_args(
+            query, "third_party_instance_id", None
+        )
+
+        if include_all_networks:
+            network_tuple = None
+        elif third_party_instance_id:
+            network_tuple = ThirdPartyInstanceID.from_string(third_party_instance_id)
+        else:
+            network_tuple = ThirdPartyInstanceID(None, None)
+
         data = yield self.room_list_handler.get_local_public_room_list(
-            limit, since_token
+            limit, since_token,
+            network_tuple=network_tuple
         )
         defer.returnValue((200, data))
 
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index c00274afc3..1b5317edf5 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -339,3 +339,22 @@ class DirectoryHandler(BaseHandler):
         yield self.auth.check_can_change_room_list(room_id, requester.user)
 
         yield self.store.set_room_is_public(room_id, visibility == "public")
+
+    @defer.inlineCallbacks
+    def edit_published_appservice_room_list(self, appservice_id, network_id,
+                                            room_id, visibility):
+        """Add or remove a room from the appservice/network specific public
+        room list.
+
+        Args:
+            appservice_id (str): ID of the appservice that owns the list
+            network_id (str): The ID of the network the list is associated with
+            room_id (str)
+            visibility (str): either "public" or "private"
+        """
+        if visibility not in ["public", "private"]:
+            raise SynapseError(400, "Invalid visibility setting")
+
+        yield self.store.set_room_is_public_appservice(
+            room_id, appservice_id, network_id, visibility == "public"
+        )
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index fd11935b40..b63a660c06 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -111,6 +111,11 @@ class E2eKeysHandler(object):
                 failures[destination] = {
                     "status": 503, "message": "Not ready for retry",
                 }
+            except Exception as e:
+                # include ConnectionRefused and other errors
+                failures[destination] = {
+                    "status": 503, "message": e.message
+                }
 
         yield preserve_context_over_deferred(defer.gatherResults([
             preserve_fn(do_remote_query)(destination)
@@ -222,6 +227,11 @@ class E2eKeysHandler(object):
                 failures[destination] = {
                     "status": 503, "message": "Not ready for retry",
                 }
+            except Exception as e:
+                # include ConnectionRefused and other errors
+                failures[destination] = {
+                    "status": 503, "message": e.message
+                }
 
         yield preserve_context_over_deferred(defer.gatherResults([
             preserve_fn(claim_client_keys)(destination)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 771ab3bc43..1d07e4d02b 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -252,9 +252,12 @@ class FederationHandler(BaseHandler):
             except:
                 return False
 
+        # Parses mapping `event_id -> (type, state_key) -> state event_id`
+        # to get all state ids that we're interested in.
         event_map = yield self.store.get_events([
-            e_id for key_to_eid in event_to_state_ids.values()
-            for key, e_id in key_to_eid
+            e_id
+            for key_to_eid in event_to_state_ids.values()
+            for key, e_id in key_to_eid.items()
             if key[0] != EventTypes.Member or check_match(key[1])
         ])
 
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index fd09397226..7a57a69bd3 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -24,7 +24,7 @@ from synapse.push.action_generator import ActionGenerator
 from synapse.types import (
     UserID, RoomAlias, RoomStreamToken,
 )
-from synapse.util.async import run_on_reactor, ReadWriteLock
+from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
 from synapse.util.logcontext import preserve_fn
 from synapse.util.metrics import measure_func
 from synapse.visibility import filter_events_for_client
@@ -50,6 +50,10 @@ class MessageHandler(BaseHandler):
 
         self.pagination_lock = ReadWriteLock()
 
+        # We arbitrarily limit concurrent event creation for a room to 5.
+        # This is to stop us from diverging history *too* much.
+        self.limiter = Limiter(max_count=5)
+
     @defer.inlineCallbacks
     def purge_history(self, room_id, event_id):
         event = yield self.store.get_event(event_id)
@@ -191,36 +195,38 @@ class MessageHandler(BaseHandler):
         """
         builder = self.event_builder_factory.new(event_dict)
 
-        self.validator.validate_new(builder)
-
-        if builder.type == EventTypes.Member:
-            membership = builder.content.get("membership", None)
-            target = UserID.from_string(builder.state_key)
+        with (yield self.limiter.queue(builder.room_id)):
+            self.validator.validate_new(builder)
+
+            if builder.type == EventTypes.Member:
+                membership = builder.content.get("membership", None)
+                target = UserID.from_string(builder.state_key)
+
+                if membership in {Membership.JOIN, Membership.INVITE}:
+                    # If event doesn't include a display name, add one.
+                    profile = self.hs.get_handlers().profile_handler
+                    content = builder.content
+
+                    try:
+                        content["displayname"] = yield profile.get_displayname(target)
+                        content["avatar_url"] = yield profile.get_avatar_url(target)
+                    except Exception as e:
+                        logger.info(
+                            "Failed to get profile information for %r: %s",
+                            target, e
+                        )
 
-            if membership in {Membership.JOIN, Membership.INVITE}:
-                # If event doesn't include a display name, add one.
-                profile = self.hs.get_handlers().profile_handler
-                content = builder.content
+            if token_id is not None:
+                builder.internal_metadata.token_id = token_id
 
-                try:
-                    content["displayname"] = yield profile.get_displayname(target)
-                    content["avatar_url"] = yield profile.get_avatar_url(target)
-                except Exception as e:
-                    logger.info(
-                        "Failed to get profile information for %r: %s",
-                        target, e
-                    )
+            if txn_id is not None:
+                builder.internal_metadata.txn_id = txn_id
 
-        if token_id is not None:
-            builder.internal_metadata.token_id = token_id
-
-        if txn_id is not None:
-            builder.internal_metadata.txn_id = txn_id
+            event, context = yield self._create_new_client_event(
+                builder=builder,
+                prev_event_ids=prev_event_ids,
+            )
 
-        event, context = yield self._create_new_client_event(
-            builder=builder,
-            prev_event_ids=prev_event_ids,
-        )
         defer.returnValue((event, context))
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index b04aea0110..19eebbd43f 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -22,6 +22,7 @@ from synapse.api.constants import (
 )
 from synapse.util.async import concurrently_execute
 from synapse.util.caches.response_cache import ResponseCache
+from synapse.types import ThirdPartyInstanceID
 
 from collections import namedtuple
 from unpaddedbase64 import encode_base64, decode_base64
@@ -34,6 +35,10 @@ logger = logging.getLogger(__name__)
 REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000
 
 
+# This is used to indicate we should only return rooms published to the main list.
+EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
+
+
 class RoomListHandler(BaseHandler):
     def __init__(self, hs):
         super(RoomListHandler, self).__init__(hs)
@@ -41,22 +46,44 @@ class RoomListHandler(BaseHandler):
         self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000)
 
     def get_local_public_room_list(self, limit=None, since_token=None,
-                                   search_filter=None):
+                                   search_filter=None,
+                                   network_tuple=EMTPY_THIRD_PARTY_ID,):
+        """Generate a local public room list.
+
+        There are multiple different lists: the main one plus one per third
+        party network. A client can ask for a specific list or to return all.
+
+        Args:
+            limit (int)
+            since_token (str)
+            search_filter (dict)
+            network_tuple (ThirdPartyInstanceID): Which public list to use.
+                This can be (None, None) to indicate the main list, or a particular
+                appservice and network id to use an appservice specific one.
+                Setting to None returns all public rooms across all lists.
+        """
         if search_filter:
-            # We explicitly don't bother caching searches.
-            return self._get_public_room_list(limit, since_token, search_filter)
+            # We explicitly don't bother caching searches or requests for
+            # appservice specific lists.
+            return self._get_public_room_list(
+                limit, since_token, search_filter, network_tuple=network_tuple,
+            )
 
-        result = self.response_cache.get((limit, since_token))
+        key = (limit, since_token, network_tuple)
+        result = self.response_cache.get(key)
         if not result:
             result = self.response_cache.set(
-                (limit, since_token),
-                self._get_public_room_list(limit, since_token)
+                key,
+                self._get_public_room_list(
+                    limit, since_token, network_tuple=network_tuple
+                )
             )
         return result
 
     @defer.inlineCallbacks
     def _get_public_room_list(self, limit=None, since_token=None,
-                              search_filter=None):
+                              search_filter=None,
+                              network_tuple=EMTPY_THIRD_PARTY_ID,):
         if since_token and since_token != "END":
             since_token = RoomListNextBatch.from_token(since_token)
         else:
@@ -73,14 +100,15 @@ class RoomListHandler(BaseHandler):
             current_public_id = yield self.store.get_current_public_room_stream_id()
             public_room_stream_id = since_token.public_room_stream_id
             newly_visible, newly_unpublished = yield self.store.get_public_room_changes(
-                public_room_stream_id, current_public_id
+                public_room_stream_id, current_public_id,
+                network_tuple=network_tuple,
             )
         else:
             stream_token = yield self.store.get_room_max_stream_ordering()
             public_room_stream_id = yield self.store.get_current_public_room_stream_id()
 
         room_ids = yield self.store.get_public_room_ids_at_stream_id(
-            public_room_stream_id
+            public_room_stream_id, network_tuple=network_tuple,
         )
 
         # We want to return rooms in a particular order: the number of joined
@@ -311,7 +339,8 @@ class RoomListHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def get_remote_public_room_list(self, server_name, limit=None, since_token=None,
-                                    search_filter=None):
+                                    search_filter=None, include_all_networks=False,
+                                    third_party_instance_id=None,):
         if search_filter:
             # We currently don't support searching across federation, so we have
             # to do it manually without pagination
@@ -320,6 +349,8 @@ class RoomListHandler(BaseHandler):
 
         res = yield self._get_remote_list_cached(
             server_name, limit=limit, since_token=since_token,
+            include_all_networks=include_all_networks,
+            third_party_instance_id=third_party_instance_id,
         )
 
         if search_filter:
@@ -332,22 +363,30 @@ class RoomListHandler(BaseHandler):
         defer.returnValue(res)
 
     def _get_remote_list_cached(self, server_name, limit=None, since_token=None,
-                                search_filter=None):
+                                search_filter=None, include_all_networks=False,
+                                third_party_instance_id=None,):
         repl_layer = self.hs.get_replication_layer()
         if search_filter:
             # We can't cache when asking for search
             return repl_layer.get_public_rooms(
                 server_name, limit=limit, since_token=since_token,
-                search_filter=search_filter,
+                search_filter=search_filter, include_all_networks=include_all_networks,
+                third_party_instance_id=third_party_instance_id,
             )
 
-        result = self.remote_response_cache.get((server_name, limit, since_token))
+        key = (
+            server_name, limit, since_token, include_all_networks,
+            third_party_instance_id,
+        )
+        result = self.remote_response_cache.get(key)
         if not result:
             result = self.remote_response_cache.set(
-                (server_name, limit, since_token),
+                key,
                 repl_layer.get_public_rooms(
                     server_name, limit=limit, since_token=since_token,
                     search_filter=search_filter,
+                    include_all_networks=include_all_networks,
+                    third_party_instance_id=third_party_instance_id,
                 )
             )
         return result
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index b62773dcbe..c880f61685 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -576,16 +576,20 @@ class SyncHandler(object):
             # We only delete messages when a new message comes in, but that's
             # fine so long as we delete them at some point.
 
-            logger.debug("Deleting messages up to %d", since_stream_id)
-            yield self.store.delete_messages_for_device(
+            deleted = yield self.store.delete_messages_for_device(
                 user_id, device_id, since_stream_id
             )
+            logger.info("Deleted %d to-device messages up to %d",
+                        deleted, since_stream_id)
 
-            logger.debug("Getting messages up to %d", now_token.to_device_key)
             messages, stream_id = yield self.store.get_new_messages_for_device(
                 user_id, device_id, since_stream_id, now_token.to_device_key
             )
-            logger.debug("Got messages up to %d: %r", stream_id, messages)
+
+            logger.info(
+                "Returning %d to-device messages between %d and %d (current token: %d)",
+                len(messages), since_stream_id, stream_id, now_token.to_device_key
+            )
             sync_result_builder.now_token = now_token.copy_and_replace(
                 "to_device_key", stream_id
             )
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index 9346386238..8c22d6f00f 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -78,12 +78,16 @@ def parse_boolean(request, name, default=None, required=False):
             parameter is present and not one of "true" or "false".
     """
 
-    if name in request.args:
+    return parse_boolean_from_args(request.args, name, default, required)
+
+
+def parse_boolean_from_args(args, name, default=None, required=False):
+    if name in args:
         try:
             return {
                 "true": True,
                 "false": False,
-            }[request.args[name][0]]
+            }[args[name][0]]
         except:
             message = (
                 "Boolean query parameter %r must be one of"
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index d79b421cba..4616e9b34a 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -475,7 +475,7 @@ class ReplicationResource(Resource):
             )
             upto_token = _position_from_rows(public_rooms_rows, current_position)
             writer.write_header_and_rows("public_rooms", public_rooms_rows, (
-                "position", "room_id", "visibility"
+                "position", "room_id", "visibility", "appservice_id", "network_id",
             ), position=upto_token)
 
     def federation(self, writer, current_token, limit, request_streams, federation_ack):
diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py
index 23c613863f..6df9a25ef3 100644
--- a/synapse/replication/slave/storage/room.py
+++ b/synapse/replication/slave/storage/room.py
@@ -15,6 +15,7 @@
 
 from ._base import BaseSlavedStore
 from synapse.storage import DataStore
+from synapse.storage.room import RoomStore
 from ._slaved_id_tracker import SlavedIdTracker
 
 
@@ -30,7 +31,7 @@ class RoomStore(BaseSlavedStore):
         DataStore.get_current_public_room_stream_id.__func__
     )
     get_public_room_ids_at_stream_id = (
-        DataStore.get_public_room_ids_at_stream_id.__func__
+        RoomStore.__dict__["get_public_room_ids_at_stream_id"]
     )
     get_public_room_ids_at_stream_id_txn = (
         DataStore.get_public_room_ids_at_stream_id_txn.__func__
diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py
index 09d0831594..8930f1826f 100644
--- a/synapse/rest/client/v1/directory.py
+++ b/synapse/rest/client/v1/directory.py
@@ -31,6 +31,7 @@ logger = logging.getLogger(__name__)
 def register_servlets(hs, http_server):
     ClientDirectoryServer(hs).register(http_server)
     ClientDirectoryListServer(hs).register(http_server)
+    ClientAppserviceDirectoryListServer(hs).register(http_server)
 
 
 class ClientDirectoryServer(ClientV1RestServlet):
@@ -184,3 +185,36 @@ class ClientDirectoryListServer(ClientV1RestServlet):
         )
 
         defer.returnValue((200, {}))
+
+
+class ClientAppserviceDirectoryListServer(ClientV1RestServlet):
+    PATTERNS = client_path_patterns(
+        "/directory/list/appservice/(?P<network_id>[^/]*)/(?P<room_id>[^/]*)$"
+    )
+
+    def __init__(self, hs):
+        super(ClientAppserviceDirectoryListServer, self).__init__(hs)
+        self.store = hs.get_datastore()
+        self.handlers = hs.get_handlers()
+
+    def on_PUT(self, request, network_id, room_id):
+        content = parse_json_object_from_request(request)
+        visibility = content.get("visibility", "public")
+        return self._edit(request, network_id, room_id, visibility)
+
+    def on_DELETE(self, request, network_id, room_id):
+        return self._edit(request, network_id, room_id, "private")
+
+    @defer.inlineCallbacks
+    def _edit(self, request, network_id, room_id, visibility):
+        requester = yield self.auth.get_user_by_req(request)
+        if not requester.app_service:
+            raise AuthError(
+                403, "Only appservices can edit the appservice published room list"
+            )
+
+        yield self.handlers.directory_handler.edit_published_appservice_room_list(
+            requester.app_service.id, network_id, room_id, visibility,
+        )
+
+        defer.returnValue((200, {}))
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index a0bba1fa3b..eead435bfd 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -21,7 +21,7 @@ from synapse.api.errors import SynapseError, Codes, AuthError
 from synapse.streams.config import PaginationConfig
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.filtering import Filter
-from synapse.types import UserID, RoomID, RoomAlias
+from synapse.types import UserID, RoomID, RoomAlias, ThirdPartyInstanceID
 from synapse.events.utils import serialize_event, format_event_for_client_v2
 from synapse.http.servlet import (
     parse_json_object_from_request, parse_string, parse_integer
@@ -321,6 +321,20 @@ class PublicRoomListRestServlet(ClientV1RestServlet):
         since_token = content.get("since", None)
         search_filter = content.get("filter", None)
 
+        include_all_networks = content.get("include_all_networks", False)
+        third_party_instance_id = content.get("third_party_instance_id", None)
+
+        if include_all_networks:
+            network_tuple = None
+            if third_party_instance_id is not None:
+                raise SynapseError(
+                    400, "Can't use include_all_networks with an explicit network"
+                )
+        elif third_party_instance_id is None:
+            network_tuple = ThirdPartyInstanceID(None, None)
+        else:
+            network_tuple = ThirdPartyInstanceID.from_string(third_party_instance_id)
+
         handler = self.hs.get_room_list_handler()
         if server:
             data = yield handler.get_remote_public_room_list(
@@ -328,12 +342,15 @@ class PublicRoomListRestServlet(ClientV1RestServlet):
                 limit=limit,
                 since_token=since_token,
                 search_filter=search_filter,
+                include_all_networks=include_all_networks,
+                third_party_instance_id=third_party_instance_id,
             )
         else:
             data = yield handler.get_local_public_room_list(
                 limit=limit,
                 since_token=since_token,
                 search_filter=search_filter,
+                network_tuple=network_tuple,
             )
 
         defer.returnValue((200, data))
diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py
index 08b7c99d57..46789775b9 100644
--- a/synapse/rest/client/v2_alpha/keys.py
+++ b/synapse/rest/client/v2_alpha/keys.py
@@ -94,10 +94,6 @@ class KeyUploadServlet(RestServlet):
 
 class KeyQueryServlet(RestServlet):
     """
-    GET /keys/query/<user_id> HTTP/1.1
-
-    GET /keys/query/<user_id>/<device_id> HTTP/1.1
-
     POST /keys/query HTTP/1.1
     Content-Type: application/json
     {
@@ -131,11 +127,7 @@ class KeyQueryServlet(RestServlet):
     """
 
     PATTERNS = client_v2_patterns(
-        "/keys/query(?:"
-        "/(?P<user_id>[^/]*)(?:"
-        "/(?P<device_id>[^/]*)"
-        ")?"
-        ")?",
+        "/keys/query$",
         releases=()
     )
 
@@ -149,31 +141,16 @@ class KeyQueryServlet(RestServlet):
         self.e2e_keys_handler = hs.get_e2e_keys_handler()
 
     @defer.inlineCallbacks
-    def on_POST(self, request, user_id, device_id):
+    def on_POST(self, request):
         yield self.auth.get_user_by_req(request, allow_guest=True)
         timeout = parse_integer(request, "timeout", 10 * 1000)
         body = parse_json_object_from_request(request)
         result = yield self.e2e_keys_handler.query_devices(body, timeout)
         defer.returnValue((200, result))
 
-    @defer.inlineCallbacks
-    def on_GET(self, request, user_id, device_id):
-        requester = yield self.auth.get_user_by_req(request, allow_guest=True)
-        timeout = parse_integer(request, "timeout", 10 * 1000)
-        auth_user_id = requester.user.to_string()
-        user_id = user_id if user_id else auth_user_id
-        device_ids = [device_id] if device_id else []
-        result = yield self.e2e_keys_handler.query_devices(
-            {"device_keys": {user_id: device_ids}},
-            timeout,
-        )
-        defer.returnValue((200, result))
-
 
 class OneTimeKeyServlet(RestServlet):
     """
-    GET /keys/claim/<user-id>/<device-id>/<algorithm> HTTP/1.1
-
     POST /keys/claim HTTP/1.1
     {
       "one_time_keys": {
@@ -191,9 +168,7 @@ class OneTimeKeyServlet(RestServlet):
 
     """
     PATTERNS = client_v2_patterns(
-        "/keys/claim(?:/?|(?:/"
-        "(?P<user_id>[^/]*)/(?P<device_id>[^/]*)/(?P<algorithm>[^/]*)"
-        ")?)",
+        "/keys/claim$",
         releases=()
     )
 
@@ -203,17 +178,7 @@ class OneTimeKeyServlet(RestServlet):
         self.e2e_keys_handler = hs.get_e2e_keys_handler()
 
     @defer.inlineCallbacks
-    def on_GET(self, request, user_id, device_id, algorithm):
-        yield self.auth.get_user_by_req(request, allow_guest=True)
-        timeout = parse_integer(request, "timeout", 10 * 1000)
-        result = yield self.e2e_keys_handler.claim_one_time_keys(
-            {"one_time_keys": {user_id: {device_id: algorithm}}},
-            timeout,
-        )
-        defer.returnValue((200, result))
-
-    @defer.inlineCallbacks
-    def on_POST(self, request, user_id, device_id, algorithm):
+    def on_POST(self, request):
         yield self.auth.get_user_by_req(request, allow_guest=True)
         timeout = parse_integer(request, "timeout", 10 * 1000)
         body = parse_json_object_from_request(request)
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 6a5a57102f..99760d622f 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -381,7 +381,10 @@ def _calc_og(tree, media_uri):
     if 'og:title' not in og:
         # do some basic spidering of the HTML
         title = tree.xpath("(//title)[1] | (//h1)[1] | (//h2)[1] | (//h3)[1]")
-        og['og:title'] = title[0].text.strip() if title else None
+        if title and title[0].text is not None:
+            og['og:title'] = title[0].text.strip()
+        else:
+            og['og:title'] = None
 
     if 'og:image' not in og:
         # TODO: extract a favicon failing all else
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 87398d60bc..2821eb89c9 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -242,7 +242,7 @@ class DeviceInboxStore(SQLBaseStore):
             device_id(str): The recipient device_id.
             up_to_stream_id(int): Where to delete messages up to.
         Returns:
-            A deferred that resolves when the messages have been deleted.
+            A deferred that resolves to the number of messages deleted.
         """
         def delete_messages_for_device_txn(txn):
             sql = (
@@ -251,6 +251,7 @@ class DeviceInboxStore(SQLBaseStore):
                 " AND stream_id <= ?"
             )
             txn.execute(sql, (user_id, device_id, up_to_stream_id))
+            return txn.rowcount
 
         return self.runInteraction(
             "delete_messages_for_device", delete_messages_for_device_txn
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 11813b44f6..8a2fe2fdf5 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -16,6 +16,7 @@
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
+from synapse.util.caches.descriptors import cached
 
 from ._base import SQLBaseStore
 from .engines import PostgresEngine, Sqlite3Engine
@@ -106,7 +107,11 @@ class RoomStore(SQLBaseStore):
             entries = self._simple_select_list_txn(
                 txn,
                 table="public_room_list_stream",
-                keyvalues={"room_id": room_id},
+                keyvalues={
+                    "room_id": room_id,
+                    "appservice_id": None,
+                    "network_id": None,
+                },
                 retcols=("stream_id", "visibility"),
             )
 
@@ -124,6 +129,8 @@ class RoomStore(SQLBaseStore):
                         "stream_id": next_id,
                         "room_id": room_id,
                         "visibility": is_public,
+                        "appservice_id": None,
+                        "network_id": None,
                     }
                 )
 
@@ -132,6 +139,87 @@ class RoomStore(SQLBaseStore):
                 "set_room_is_public",
                 set_room_is_public_txn, next_id,
             )
+        self.hs.get_notifier().on_new_replication_data()
+
+    @defer.inlineCallbacks
+    def set_room_is_public_appservice(self, room_id, appservice_id, network_id,
+                                      is_public):
+        """Edit the appservice/network specific public room list.
+
+        Each appservice can have a number of published room lists associated
+        with them, keyed off of an appservice defined `network_id`, which
+        basically represents a single instance of a bridge to a third party
+        network.
+
+        Args:
+            room_id (str)
+            appservice_id (str)
+            network_id (str)
+            is_public (bool): Whether to publish or unpublish the room from the
+                list.
+        """
+        def set_room_is_public_appservice_txn(txn, next_id):
+            if is_public:
+                try:
+                    self._simple_insert_txn(
+                        txn,
+                        table="appservice_room_list",
+                        values={
+                            "appservice_id": appservice_id,
+                            "network_id": network_id,
+                            "room_id": room_id
+                        },
+                    )
+                except self.database_engine.module.IntegrityError:
+                    # We've already inserted, nothing to do.
+                    return
+            else:
+                self._simple_delete_txn(
+                    txn,
+                    table="appservice_room_list",
+                    keyvalues={
+                        "appservice_id": appservice_id,
+                        "network_id": network_id,
+                        "room_id": room_id
+                    },
+                )
+
+            entries = self._simple_select_list_txn(
+                txn,
+                table="public_room_list_stream",
+                keyvalues={
+                    "room_id": room_id,
+                    "appservice_id": appservice_id,
+                    "network_id": network_id,
+                },
+                retcols=("stream_id", "visibility"),
+            )
+
+            entries.sort(key=lambda r: r["stream_id"])
+
+            add_to_stream = True
+            if entries:
+                add_to_stream = bool(entries[-1]["visibility"]) != is_public
+
+            if add_to_stream:
+                self._simple_insert_txn(
+                    txn,
+                    table="public_room_list_stream",
+                    values={
+                        "stream_id": next_id,
+                        "room_id": room_id,
+                        "visibility": is_public,
+                        "appservice_id": appservice_id,
+                        "network_id": network_id,
+                    }
+                )
+
+        with self._public_room_id_gen.get_next() as next_id:
+            yield self.runInteraction(
+                "set_room_is_public_appservice",
+                set_room_is_public_appservice_txn, next_id,
+            )
+        self.hs.get_notifier().on_new_replication_data()
 
     def get_public_room_ids(self):
         return self._simple_select_onecol(
@@ -259,38 +347,96 @@ class RoomStore(SQLBaseStore):
     def get_current_public_room_stream_id(self):
         return self._public_room_id_gen.get_current_token()
 
-    def get_public_room_ids_at_stream_id(self, stream_id):
+    @cached(num_args=2, max_entries=100)
+    def get_public_room_ids_at_stream_id(self, stream_id, network_tuple):
+        """Get pulbic rooms for a particular list, or across all lists.
+
+        Args:
+            stream_id (int)
+            network_tuple (ThirdPartyInstanceID): The list to use (None, None)
+                means the main list, None means all lsits.
+        """
         return self.runInteraction(
             "get_public_room_ids_at_stream_id",
-            self.get_public_room_ids_at_stream_id_txn, stream_id
+            self.get_public_room_ids_at_stream_id_txn,
+            stream_id, network_tuple=network_tuple
         )
 
-    def get_public_room_ids_at_stream_id_txn(self, txn, stream_id):
+    def get_public_room_ids_at_stream_id_txn(self, txn, stream_id,
+                                             network_tuple):
         return {
             rm
-            for rm, vis in self.get_published_at_stream_id_txn(txn, stream_id).items()
+            for rm, vis in self.get_published_at_stream_id_txn(
+                txn, stream_id, network_tuple=network_tuple
+            ).items()
             if vis
         }
 
-    def get_published_at_stream_id_txn(self, txn, stream_id):
-        sql = ("""
-            SELECT room_id, visibility FROM public_room_list_stream
-            INNER JOIN (
-                SELECT room_id, max(stream_id) AS stream_id
+    def get_published_at_stream_id_txn(self, txn, stream_id, network_tuple):
+        if network_tuple:
+            # We want to get from a particular list. No aggregation required.
+
+            sql = ("""
+                SELECT room_id, visibility FROM public_room_list_stream
+                INNER JOIN (
+                    SELECT room_id, max(stream_id) AS stream_id
+                    FROM public_room_list_stream
+                    WHERE stream_id <= ? %s
+                    GROUP BY room_id
+                ) grouped USING (room_id, stream_id)
+            """)
+
+            if network_tuple.appservice_id is not None:
+                txn.execute(
+                    sql % ("AND appservice_id = ? AND network_id = ?",),
+                    (stream_id, network_tuple.appservice_id, network_tuple.network_id,)
+                )
+            else:
+                txn.execute(
+                    sql % ("AND appservice_id IS NULL",),
+                    (stream_id,)
+                )
+            return dict(txn.fetchall())
+        else:
+            # We want to get from all lists, so we need to aggregate the results
+
+            logger.info("Executing full list")
+
+            sql = ("""
+                SELECT room_id, visibility
                 FROM public_room_list_stream
-                WHERE stream_id <= ?
-                GROUP BY room_id
-            ) grouped USING (room_id, stream_id)
-        """)
+                INNER JOIN (
+                    SELECT
+                        room_id, max(stream_id) AS stream_id, appservice_id,
+                        network_id
+                    FROM public_room_list_stream
+                    WHERE stream_id <= ?
+                    GROUP BY room_id, appservice_id, network_id
+                ) grouped USING (room_id, stream_id)
+            """)
 
-        txn.execute(sql, (stream_id,))
-        return dict(txn.fetchall())
+            txn.execute(
+                sql,
+                (stream_id,)
+            )
+
+            results = {}
+            # A room is visible if its visible on any list.
+            for room_id, visibility in txn.fetchall():
+                results[room_id] = bool(visibility) or results.get(room_id, False)
 
-    def get_public_room_changes(self, prev_stream_id, new_stream_id):
+            return results
+
+    def get_public_room_changes(self, prev_stream_id, new_stream_id,
+                                network_tuple):
         def get_public_room_changes_txn(txn):
-            then_rooms = self.get_public_room_ids_at_stream_id_txn(txn, prev_stream_id)
+            then_rooms = self.get_public_room_ids_at_stream_id_txn(
+                txn, prev_stream_id, network_tuple
+            )
 
-            now_rooms_dict = self.get_published_at_stream_id_txn(txn, new_stream_id)
+            now_rooms_dict = self.get_published_at_stream_id_txn(
+                txn, new_stream_id, network_tuple
+            )
 
             now_rooms_visible = set(
                 rm for rm, vis in now_rooms_dict.items() if vis
@@ -311,7 +457,8 @@ class RoomStore(SQLBaseStore):
     def get_all_new_public_rooms(self, prev_id, current_id, limit):
         def get_all_new_public_rooms(txn):
             sql = ("""
-                SELECT stream_id, room_id, visibility FROM public_room_list_stream
+                SELECT stream_id, room_id, visibility, appservice_id, network_id
+                FROM public_room_list_stream
                 WHERE stream_id > ? AND stream_id <= ?
                 ORDER BY stream_id ASC
                 LIMIT ?
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index b2a45a38c1..946d5a81cc 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -483,9 +483,9 @@ class RoomMemberStore(SQLBaseStore):
 
         def add_membership_profile_txn(txn):
             sql = ("""
-                SELECT stream_ordering, event_id, room_id, content
+                SELECT stream_ordering, event_id, events.room_id, content
                 FROM events
-                INNER JOIN room_memberships USING (room_id, event_id)
+                INNER JOIN room_memberships USING (event_id)
                 WHERE ? <= stream_ordering AND stream_ordering < ?
                 AND type = 'm.room.member'
                 ORDER BY stream_ordering DESC
@@ -534,7 +534,7 @@ class RoomMemberStore(SQLBaseStore):
                 txn, _MEMBERSHIP_PROFILE_UPDATE_NAME, progress
             )
 
-            return len(to_update)
+            return len(rows)
 
         result = yield self.runInteraction(
             _MEMBERSHIP_PROFILE_UPDATE_NAME, add_membership_profile_txn
diff --git a/synapse/storage/schema/delta/39/appservice_room_list.sql b/synapse/storage/schema/delta/39/appservice_room_list.sql
new file mode 100644
index 0000000000..74bdc49073
--- /dev/null
+++ b/synapse/storage/schema/delta/39/appservice_room_list.sql
@@ -0,0 +1,29 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE appservice_room_list(
+    appservice_id TEXT NOT NULL,
+    network_id TEXT NOT NULL,
+    room_id TEXT NOT NULL
+);
+
+-- Each appservice can have multiple published room lists associated with them,
+-- keyed of a particular network_id
+CREATE UNIQUE INDEX appservice_room_list_idx ON appservice_room_list(
+    appservice_id, network_id, room_id
+);
+
+ALTER TABLE public_room_list_stream ADD COLUMN appservice_id TEXT;
+ALTER TABLE public_room_list_stream ADD COLUMN network_id TEXT;
diff --git a/synapse/types.py b/synapse/types.py
index ffab12df09..3a3ab21d17 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -274,3 +274,37 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
             return "t%d-%d" % (self.topological, self.stream)
         else:
             return "s%d" % (self.stream,)
+
+
+class ThirdPartyInstanceID(
+        namedtuple("ThirdPartyInstanceID", ("appservice_id", "network_id"))
+):
+    # Deny iteration because it will bite you if you try to create a singleton
+    # set by:
+    #    users = set(user)
+    def __iter__(self):
+        raise ValueError("Attempted to iterate a %s" % (type(self).__name__,))
+
+    # Because this class is a namedtuple of strings, it is deeply immutable.
+    def __copy__(self):
+        return self
+
+    def __deepcopy__(self, memo):
+        return self
+
+    @classmethod
+    def from_string(cls, s):
+        bits = s.split("|", 2)
+        if len(bits) != 2:
+            raise SynapseError(400, "Invalid ID %r" % (s,))
+
+        return cls(appservice_id=bits[0], network_id=bits[1])
+
+    def to_string(self):
+        return "%s|%s" % (self.appservice_id, self.network_id,)
+
+    __str__ = to_string
+
+    @classmethod
+    def create(cls, appservice_id, network_id,):
+        return cls(appservice_id=appservice_id, network_id=network_id)
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 347fb1e380..16ed183d4c 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -197,6 +197,64 @@ class Linearizer(object):
         defer.returnValue(_ctx_manager())
 
 
+class Limiter(object):
+    """Limits concurrent access to resources based on a key. Useful to ensure
+    only a few thing happen at a time on a given resource.
+
+    Example:
+
+        with (yield limiter.queue("test_key")):
+            # do some work.
+
+    """
+    def __init__(self, max_count):
+        """
+        Args:
+            max_count(int): The maximum number of concurrent access
+        """
+        self.max_count = max_count
+
+        # key_to_defer is a map from the key to a 2 element list where
+        # the first element is the number of things executing
+        # the second element is a list of deferreds for the things blocked from
+        # executing.
+        self.key_to_defer = {}
+
+    @defer.inlineCallbacks
+    def queue(self, key):
+        entry = self.key_to_defer.setdefault(key, [0, []])
+
+        # If the number of things executing is greater than the maximum
+        # then add a deferred to the list of blocked items
+        # When on of the things currently executing finishes it will callback
+        # this item so that it can continue executing.
+        if entry[0] >= self.max_count:
+            new_defer = defer.Deferred()
+            entry[1].append(new_defer)
+            with PreserveLoggingContext():
+                yield new_defer
+
+        entry[0] += 1
+
+        @contextmanager
+        def _ctx_manager():
+            try:
+                yield
+            finally:
+                # We've finished executing so check if there are any things
+                # blocked waiting to execute and start one of them
+                entry[0] -= 1
+                try:
+                    entry[1].pop(0).callback(None)
+                except IndexError:
+                    # If nothing else is executing for this key then remove it
+                    # from the map
+                    if entry[0] == 0:
+                        self.key_to_defer.pop(key, None)
+
+        defer.returnValue(_ctx_manager())
+
+
 class ReadWriteLock(object):
     """A deferred style read write lock.