diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 37ee469fa2..d835c1b038 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -24,6 +24,7 @@ from synapse.api.errors import (
CodeMessageException, HttpResponseException, SynapseError,
)
from synapse.util import unwrapFirstError
+from synapse.util.async import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logutils import log_function
from synapse.events import FrozenEvent
@@ -551,6 +552,25 @@ class FederationClient(FederationBase):
raise RuntimeError("Failed to send to any server.")
@defer.inlineCallbacks
+ def get_public_rooms(self, destinations):
+ results_by_server = {}
+
+ @defer.inlineCallbacks
+ def _get_result(s):
+ if s == self.server_name:
+ defer.returnValue()
+
+ try:
+ result = yield self.transport_layer.get_public_rooms(s)
+ results_by_server[s] = result
+ except:
+ logger.exception("Error getting room list from server %r", s)
+
+ yield concurrently_execute(_get_result, destinations, 3)
+
+ defer.returnValue(results_by_server)
+
+ @defer.inlineCallbacks
def query_auth(self, destination, room_id, event_id, local_auth):
"""
Params:
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 429ab6ddec..f1d231b9d8 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -388,6 +388,11 @@ class FederationServer(FederationBase):
})
@log_function
+ def on_openid_userinfo(self, token):
+ ts_now_ms = self._clock.time_msec()
+ return self.store.get_user_id_for_open_id_token(token, ts_now_ms)
+
+ @log_function
def _get_persisted_pdu(self, origin, event_id, do_auth=True):
""" Get a PDU from the database with given origin and id.
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 1928da03b3..5787f854d4 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -20,6 +20,7 @@ from .persistence import TransactionActions
from .units import Transaction
from synapse.api.errors import HttpResponseException
+from synapse.util.async import run_on_reactor
from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.retryutils import (
@@ -199,6 +200,8 @@ class TransactionQueue(object):
@defer.inlineCallbacks
@log_function
def _attempt_new_transaction(self, destination):
+ yield run_on_reactor()
+
# list of (pending_pdu, deferred, order)
if destination in self.pending_transactions:
# XXX: pending_transactions can get stuck on by a never-ending
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 2237e3413c..ebb698e278 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -179,7 +179,8 @@ class TransportLayerClient(object):
content = yield self.client.get_json(
destination=destination,
path=path,
- retry_on_dns_fail=True,
+ retry_on_dns_fail=False,
+ timeout=20000,
)
defer.returnValue(content)
@@ -225,6 +226,18 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
+ def get_public_rooms(self, remote_server):
+ path = PREFIX + "/publicRooms"
+
+ response = yield self.client.get_json(
+ destination=remote_server,
+ path=path,
+ )
+
+ defer.returnValue(response)
+
+ @defer.inlineCallbacks
+ @log_function
def exchange_third_party_invite(self, destination, room_id, event_dict):
path = PREFIX + "/exchange_third_party_invite/%s" % (room_id,)
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index d65a7893d8..a1a334955f 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -18,7 +18,7 @@ from twisted.internet import defer
from synapse.api.urls import FEDERATION_PREFIX as PREFIX
from synapse.api.errors import Codes, SynapseError
from synapse.http.server import JsonResource
-from synapse.http.servlet import parse_json_object_from_request
+from synapse.http.servlet import parse_json_object_from_request, parse_string
from synapse.util.ratelimitutils import FederationRateLimiter
import functools
@@ -134,10 +134,12 @@ class Authenticator(object):
class BaseFederationServlet(object):
- def __init__(self, handler, authenticator, ratelimiter, server_name):
+ def __init__(self, handler, authenticator, ratelimiter, server_name,
+ room_list_handler):
self.handler = handler
self.authenticator = authenticator
self.ratelimiter = ratelimiter
+ self.room_list_handler = room_list_handler
def _wrap(self, code):
authenticator = self.authenticator
@@ -323,7 +325,7 @@ class FederationSendLeaveServlet(BaseFederationServlet):
class FederationEventAuthServlet(BaseFederationServlet):
- PATH = "/event_auth(?P<context>[^/]*)/(?P<event_id>[^/]*)"
+ PATH = "/event_auth/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
def on_GET(self, origin, content, query, context, event_id):
return self.handler.on_event_auth(origin, context, event_id)
@@ -448,6 +450,94 @@ class On3pidBindServlet(BaseFederationServlet):
return code
+class OpenIdUserInfo(BaseFederationServlet):
+ """
+ Exchange a bearer token for information about a user.
+
+ The response format should be compatible with:
+ http://openid.net/specs/openid-connect-core-1_0.html#UserInfoResponse
+
+ GET /openid/userinfo?access_token=ABDEFGH HTTP/1.1
+
+ HTTP/1.1 200 OK
+ Content-Type: application/json
+
+ {
+ "sub": "@userpart:example.org",
+ }
+ """
+
+ PATH = "/openid/userinfo"
+
+ @defer.inlineCallbacks
+ def on_GET(self, request):
+ token = parse_string(request, "access_token")
+ if token is None:
+ defer.returnValue((401, {
+ "errcode": "M_MISSING_TOKEN", "error": "Access Token required"
+ }))
+ return
+
+ user_id = yield self.handler.on_openid_userinfo(token)
+
+ if user_id is None:
+ defer.returnValue((401, {
+ "errcode": "M_UNKNOWN_TOKEN",
+ "error": "Access Token unknown or expired"
+ }))
+
+ defer.returnValue((200, {"sub": user_id}))
+
+ # Avoid doing remote HS authorization checks which are done by default by
+ # BaseFederationServlet.
+ def _wrap(self, code):
+ return code
+
+
+class PublicRoomList(BaseFederationServlet):
+ """
+ Fetch the public room list for this server.
+
+ This API returns information in the same format as /publicRooms on the
+ client API, but will only ever include local public rooms and hence is
+ intended for consumption by other home servers.
+
+ GET /publicRooms HTTP/1.1
+
+ HTTP/1.1 200 OK
+ Content-Type: application/json
+
+ {
+ "chunk": [
+ {
+ "aliases": [
+ "#test:localhost"
+ ],
+ "guest_can_join": false,
+ "name": "test room",
+ "num_joined_members": 3,
+ "room_id": "!whkydVegtvatLfXmPN:localhost",
+ "world_readable": false
+ }
+ ],
+ "end": "END",
+ "start": "START"
+ }
+ """
+
+ PATH = "/publicRooms"
+
+ @defer.inlineCallbacks
+ def on_GET(self, request):
+ data = yield self.room_list_handler.get_local_public_room_list()
+ defer.returnValue((200, data))
+
+ # Avoid doing remote HS authorization checks which are done by default by
+ # BaseFederationServlet.
+ def _wrap(self, code):
+ return code
+
+
SERVLET_CLASSES = (
FederationSendServlet,
FederationPullServlet,
@@ -468,6 +558,8 @@ SERVLET_CLASSES = (
FederationClientKeysClaimServlet,
FederationThirdPartyInviteExchangeServlet,
On3pidBindServlet,
+ OpenIdUserInfo,
+ PublicRoomList,
)
@@ -478,4 +570,5 @@ def register_servlets(hs, resource, authenticator, ratelimiter):
authenticator=authenticator,
ratelimiter=ratelimiter,
server_name=hs.hostname,
+ room_list_handler=hs.get_room_list_handler(),
).register(resource)
|