diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py
index e7517cac4d..95c40c6c1b 100644
--- a/synapse/federation/transport.py
+++ b/synapse/federation/transport.py
@@ -72,7 +72,7 @@ class TransportLayer(object):
self.received_handler = None
@log_function
- def get_context_state(self, destination, context):
+ def get_context_state(self, destination, context, event_id=None):
""" Requests all state for a given context (i.e. room) from the
given server.
@@ -89,54 +89,62 @@ class TransportLayer(object):
subpath = "/state/%s/" % context
- return self._do_request_for_transaction(destination, subpath)
+ args = {}
+ if event_id:
+ args["event_id"] = event_id
+
+ return self._do_request_for_transaction(
+ destination, subpath, args=args
+ )
@log_function
- def get_pdu(self, destination, pdu_origin, pdu_id):
+ def get_event(self, destination, event_id):
""" Requests the pdu with give id and origin from the given server.
Args:
destination (str): The host name of the remote home server we want
to get the state from.
- pdu_origin (str): The home server which created the PDU.
- pdu_id (str): The id of the PDU being requested.
+ event_id (str): The id of the event being requested.
Returns:
Deferred: Results in a dict received from the remote homeserver.
"""
- logger.debug("get_pdu dest=%s, pdu_origin=%s, pdu_id=%s",
- destination, pdu_origin, pdu_id)
+ logger.debug("get_pdu dest=%s, event_id=%s",
+ destination, event_id)
- subpath = "/pdu/%s/%s/" % (pdu_origin, pdu_id)
+ subpath = "/event/%s/" % (event_id, )
return self._do_request_for_transaction(destination, subpath)
@log_function
- def backfill(self, dest, context, pdu_tuples, limit):
+ def backfill(self, dest, context, event_tuples, limit):
""" Requests `limit` previous PDUs in a given context before list of
PDUs.
Args:
dest (str)
context (str)
- pdu_tuples (list)
+ event_tuples (list)
limt (int)
Returns:
Deferred: Results in a dict received from the remote homeserver.
"""
logger.debug(
- "backfill dest=%s, context=%s, pdu_tuples=%s, limit=%s",
- dest, context, repr(pdu_tuples), str(limit)
+ "backfill dest=%s, context=%s, event_tuples=%s, limit=%s",
+ dest, context, repr(event_tuples), str(limit)
)
- if not pdu_tuples:
+ if not event_tuples:
+ # TODO: raise?
return
- subpath = "/backfill/%s/" % context
+ subpath = "/backfill/%s/" % (context,)
- args = {"v": ["%s,%s" % (i, o) for i, o in pdu_tuples]}
- args["limit"] = limit
+ args = {
+ "v": event_tuples,
+ "limit": limit,
+ }
return self._do_request_for_transaction(
dest,
@@ -198,6 +206,72 @@ class TransportLayer(object):
defer.returnValue(response)
@defer.inlineCallbacks
+ @log_function
+ def make_join(self, destination, context, user_id, retry_on_dns_fail=True):
+ path = PREFIX + "/make_join/%s/%s" % (context, user_id,)
+
+ response = yield self.client.get_json(
+ destination=destination,
+ path=path,
+ retry_on_dns_fail=retry_on_dns_fail,
+ )
+
+ defer.returnValue(response)
+
+ @defer.inlineCallbacks
+ @log_function
+ def send_join(self, destination, context, event_id, content):
+ path = PREFIX + "/send_join/%s/%s" % (
+ context,
+ event_id,
+ )
+
+ code, content = yield self.client.put_json(
+ destination=destination,
+ path=path,
+ data=content,
+ )
+
+ if not 200 <= code < 300:
+ raise RuntimeError("Got %d from send_join", code)
+
+ defer.returnValue(json.loads(content))
+
+ @defer.inlineCallbacks
+ @log_function
+ def send_invite(self, destination, context, event_id, content):
+ path = PREFIX + "/invite/%s/%s" % (
+ context,
+ event_id,
+ )
+
+ code, content = yield self.client.put_json(
+ destination=destination,
+ path=path,
+ data=content,
+ )
+
+ if not 200 <= code < 300:
+ raise RuntimeError("Got %d from send_invite", code)
+
+ defer.returnValue(json.loads(content))
+
+ @defer.inlineCallbacks
+ @log_function
+ def get_event_auth(self, destination, context, event_id):
+ path = PREFIX + "/event_auth/%s/%s" % (
+ context,
+ event_id,
+ )
+
+ response = yield self.client.get_json(
+ destination=destination,
+ path=path,
+ )
+
+ defer.returnValue(response)
+
+ @defer.inlineCallbacks
def _authenticate_request(self, request):
json_request = {
"method": request.method,
@@ -210,7 +284,7 @@ class TransportLayer(object):
origin = None
if request.method == "PUT":
- #TODO: Handle other method types? other content types?
+ # TODO: Handle other method types? other content types?
try:
content_bytes = request.content.read()
content = json.loads(content_bytes)
@@ -222,11 +296,13 @@ class TransportLayer(object):
try:
params = auth.split(" ")[1].split(",")
param_dict = dict(kv.split("=") for kv in params)
+
def strip_quotes(value):
if value.startswith("\""):
return value[1:-1]
else:
return value
+
origin = strip_quotes(param_dict["origin"])
key = strip_quotes(param_dict["key"])
sig = strip_quotes(param_dict["sig"])
@@ -247,7 +323,7 @@ class TransportLayer(object):
if auth.startswith("X-Matrix"):
(origin, key, sig) = parse_auth_header(auth)
json_request["origin"] = origin
- json_request["signatures"].setdefault(origin,{})[key] = sig
+ json_request["signatures"].setdefault(origin, {})[key] = sig
if not json_request["signatures"]:
raise SynapseError(
@@ -313,10 +389,10 @@ class TransportLayer(object):
# data_id pair.
self.server.register_path(
"GET",
- re.compile("^" + PREFIX + "/pdu/([^/]*)/([^/]*)/$"),
+ re.compile("^" + PREFIX + "/event/([^/]*)/$"),
self._with_authentication(
- lambda origin, content, query, pdu_origin, pdu_id:
- handler.on_pdu_request(pdu_origin, pdu_id)
+ lambda origin, content, query, event_id:
+ handler.on_pdu_request(origin, event_id)
)
)
@@ -326,7 +402,11 @@ class TransportLayer(object):
re.compile("^" + PREFIX + "/state/([^/]*)/$"),
self._with_authentication(
lambda origin, content, query, context:
- handler.on_context_state_request(context)
+ handler.on_context_state_request(
+ origin,
+ context,
+ query.get("event_id", [None])[0],
+ )
)
)
@@ -336,28 +416,63 @@ class TransportLayer(object):
self._with_authentication(
lambda origin, content, query, context:
self._on_backfill_request(
- context, query["v"], query["limit"]
+ origin, context, query["v"], query["limit"]
)
)
)
+ # This is when we receive a server-server Query
self.server.register_path(
"GET",
- re.compile("^" + PREFIX + "/context/([^/]*)/$"),
+ re.compile("^" + PREFIX + "/query/([^/]*)$"),
self._with_authentication(
- lambda origin, content, query, context:
- handler.on_context_pdus_request(context)
+ lambda origin, content, query, query_type:
+ handler.on_query_request(
+ query_type, {k: v[0] for k, v in query.items()}
+ )
)
)
- # This is when we receive a server-server Query
self.server.register_path(
"GET",
- re.compile("^" + PREFIX + "/query/([^/]*)$"),
+ re.compile("^" + PREFIX + "/make_join/([^/]*)/([^/]*)$"),
self._with_authentication(
- lambda origin, content, query, query_type:
- handler.on_query_request(
- query_type, {k: v[0] for k, v in query.items()}
+ lambda origin, content, query, context, user_id:
+ self._on_make_join_request(
+ origin, content, query, context, user_id
+ )
+ )
+ )
+
+ self.server.register_path(
+ "GET",
+ re.compile("^" + PREFIX + "/event_auth/([^/]*)/([^/]*)$"),
+ self._with_authentication(
+ lambda origin, content, query, context, event_id:
+ handler.on_event_auth(
+ origin, context, event_id,
+ )
+ )
+ )
+
+ self.server.register_path(
+ "PUT",
+ re.compile("^" + PREFIX + "/send_join/([^/]*)/([^/]*)$"),
+ self._with_authentication(
+ lambda origin, content, query, context, event_id:
+ self._on_send_join_request(
+ origin, content, query,
+ )
+ )
+ )
+
+ self.server.register_path(
+ "PUT",
+ re.compile("^" + PREFIX + "/invite/([^/]*)/([^/]*)$"),
+ self._with_authentication(
+ lambda origin, content, query, context, event_id:
+ self._on_invite_request(
+ origin, content, query,
)
)
)
@@ -402,7 +517,8 @@ class TransportLayer(object):
return
try:
- code, response = yield self.received_handler.on_incoming_transaction(
+ handler = self.received_handler
+ code, response = yield handler.on_incoming_transaction(
transaction_data
)
except:
@@ -440,7 +556,7 @@ class TransportLayer(object):
defer.returnValue(data)
@log_function
- def _on_backfill_request(self, context, v_list, limits):
+ def _on_backfill_request(self, origin, context, v_list, limits):
if not limits:
return defer.succeed(
(400, {"error": "Did not include limit param"})
@@ -448,124 +564,34 @@ class TransportLayer(object):
limit = int(limits[-1])
- versions = [v.split(",", 1) for v in v_list]
+ versions = v_list
return self.request_handler.on_backfill_request(
- context, versions, limit)
-
-
-class TransportReceivedHandler(object):
- """ Callbacks used when we receive a transaction
- """
- def on_incoming_transaction(self, transaction):
- """ Called on PUT /send/<transaction_id>, or on response to a request
- that we sent (e.g. a backfill request)
-
- Args:
- transaction (synapse.transaction.Transaction): The transaction that
- was sent to us.
-
- Returns:
- twisted.internet.defer.Deferred: A deferred that gets fired when
- the transaction has finished being processed.
-
- The result should be a tuple in the form of
- `(response_code, respond_body)`, where `response_body` is a python
- dict that will get serialized to JSON.
-
- On errors, the dict should have an `error` key with a brief message
- of what went wrong.
- """
- pass
-
-
-class TransportRequestHandler(object):
- """ Handlers used when someone want's data from us
- """
- def on_pull_request(self, versions):
- """ Called on GET /pull/?v=...
-
- This is hit when a remote home server wants to get all data
- after a given transaction. Mainly used when a home server comes back
- online and wants to get everything it has missed.
-
- Args:
- versions (list): A list of transaction_ids that should be used to
- determine what PDUs the remote side have not yet seen.
-
- Returns:
- Deferred: Resultsin a tuple in the form of
- `(response_code, respond_body)`, where `response_body` is a python
- dict that will get serialized to JSON.
-
- On errors, the dict should have an `error` key with a brief message
- of what went wrong.
- """
- pass
-
- def on_pdu_request(self, pdu_origin, pdu_id):
- """ Called on GET /pdu/<pdu_origin>/<pdu_id>/
-
- Someone wants a particular PDU. This PDU may or may not have originated
- from us.
-
- Args:
- pdu_origin (str)
- pdu_id (str)
-
- Returns:
- Deferred: Resultsin a tuple in the form of
- `(response_code, respond_body)`, where `response_body` is a python
- dict that will get serialized to JSON.
-
- On errors, the dict should have an `error` key with a brief message
- of what went wrong.
- """
- pass
-
- def on_context_state_request(self, context):
- """ Called on GET /state/<context>/
-
- Gets hit when someone wants all the *current* state for a given
- contexts.
-
- Args:
- context (str): The name of the context that we're interested in.
-
- Returns:
- twisted.internet.defer.Deferred: A deferred that gets fired when
- the transaction has finished being processed.
-
- The result should be a tuple in the form of
- `(response_code, respond_body)`, where `response_body` is a python
- dict that will get serialized to JSON.
-
- On errors, the dict should have an `error` key with a brief message
- of what went wrong.
- """
- pass
-
- def on_backfill_request(self, context, versions, limit):
- """ Called on GET /backfill/<context>/?v=...&limit=...
+ origin, context, versions, limit
+ )
- Gets hit when we want to backfill backwards on a given context from
- the given point.
+ @defer.inlineCallbacks
+ @log_function
+ def _on_make_join_request(self, origin, content, query, context, user_id):
+ content = yield self.request_handler.on_make_join_request(
+ context, user_id,
+ )
+ defer.returnValue((200, content))
- Args:
- context (str): The context to backfill
- versions (list): A list of 2-tuples representing where to backfill
- from, in the form `(pdu_id, origin)`
- limit (int): How many pdus to return.
+ @defer.inlineCallbacks
+ @log_function
+ def _on_send_join_request(self, origin, content, query):
+ content = yield self.request_handler.on_send_join_request(
+ origin, content,
+ )
- Returns:
- Deferred: Results in a tuple in the form of
- `(response_code, respond_body)`, where `response_body` is a python
- dict that will get serialized to JSON.
+ defer.returnValue((200, content))
- On errors, the dict should have an `error` key with a brief message
- of what went wrong.
- """
- pass
+ @defer.inlineCallbacks
+ @log_function
+ def _on_invite_request(self, origin, content, query):
+ content = yield self.request_handler.on_invite_request(
+ origin, content,
+ )
- def on_query_request(self):
- """ Called on a GET /query/<query_type> request. """
+ defer.returnValue((200, content))
|