diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index 79eaa31031..4cc98a3fe8 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -14,7 +14,10 @@
# limitations under the License.
import logging
-from synapse.api.errors import SynapseError
+import six
+
+from synapse.api.constants import MAX_DEPTH
+from synapse.api.errors import SynapseError, Codes
from synapse.crypto.event_signing import check_event_content_hash
from synapse.events import FrozenEvent
from synapse.events.utils import prune_event
@@ -190,11 +193,23 @@ def event_from_pdu_json(pdu_json, outlier=False):
FrozenEvent
Raises:
- SynapseError: if the pdu is missing required fields
+ SynapseError: if the pdu is missing required fields or is otherwise
+ not a valid matrix event
"""
# we could probably enforce a bunch of other fields here (room_id, sender,
# origin, etc etc)
- assert_params_in_request(pdu_json, ('event_id', 'type'))
+ assert_params_in_request(pdu_json, ('event_id', 'type', 'depth'))
+
+ depth = pdu_json['depth']
+ if not isinstance(depth, six.integer_types):
+ raise SynapseError(400, "Depth %r not an intger" % (depth, ),
+ Codes.BAD_JSON)
+
+ if depth < 0:
+ raise SynapseError(400, "Depth too small", Codes.BAD_JSON)
+ elif depth > MAX_DEPTH:
+ raise SynapseError(400, "Depth too large", Codes.BAD_JSON)
+
event = FrozenEvent(
pdu_json
)
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 38440da5b5..2761ffae07 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -19,6 +19,8 @@ import itertools
import logging
import random
+from six.moves import range
+
from twisted.internet import defer
from synapse.api.constants import Membership
@@ -30,20 +32,17 @@ from synapse.federation.federation_base import (
FederationBase,
event_from_pdu_json,
)
-import synapse.metrics
from synapse.util import logcontext, unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.logutils import log_function
from synapse.util.retryutils import NotRetryingDestination
-logger = logging.getLogger(__name__)
-
+from prometheus_client import Counter
-# synapse.federation.federation_client is a silly name
-metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
+logger = logging.getLogger(__name__)
-sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"])
+sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["type"])
PDU_RETRY_TIME_MS = 1 * 60 * 1000
@@ -106,7 +105,7 @@ class FederationClient(FederationBase):
a Deferred which will eventually yield a JSON object from the
response
"""
- sent_queries_counter.inc(query_type)
+ sent_queries_counter.labels(query_type).inc()
return self.transport_layer.make_query(
destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail,
@@ -125,7 +124,7 @@ class FederationClient(FederationBase):
a Deferred which will eventually yield a JSON object from the
response
"""
- sent_queries_counter.inc("client_device_keys")
+ sent_queries_counter.labels("client_device_keys").inc()
return self.transport_layer.query_client_keys(
destination, content, timeout
)
@@ -135,7 +134,7 @@ class FederationClient(FederationBase):
"""Query the device keys for a list of user ids hosted on a remote
server.
"""
- sent_queries_counter.inc("user_devices")
+ sent_queries_counter.labels("user_devices").inc()
return self.transport_layer.query_user_devices(
destination, user_id, timeout
)
@@ -152,7 +151,7 @@ class FederationClient(FederationBase):
a Deferred which will eventually yield a JSON object from the
response
"""
- sent_queries_counter.inc("client_one_time_keys")
+ sent_queries_counter.labels("client_one_time_keys").inc()
return self.transport_layer.claim_client_keys(
destination, content, timeout
)
@@ -394,7 +393,7 @@ class FederationClient(FederationBase):
seen_events = yield self.store.get_events(event_ids, allow_rejected=True)
signed_events = seen_events.values()
else:
- seen_events = yield self.store.have_events(event_ids)
+ seen_events = yield self.store.have_seen_events(event_ids)
signed_events = []
failed_to_fetch = set()
@@ -413,11 +412,12 @@ class FederationClient(FederationBase):
batch_size = 20
missing_events = list(missing_events)
- for i in xrange(0, len(missing_events), batch_size):
+ for i in range(0, len(missing_events), batch_size):
batch = set(missing_events[i:i + batch_size])
deferreds = [
- preserve_fn(self.get_pdu)(
+ run_in_background(
+ self.get_pdu,
destinations=random_server_list(),
event_id=e_id,
)
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index bea7fd0b71..2d420a58a2 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -26,30 +27,32 @@ from synapse.federation.federation_base import (
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
-import synapse.metrics
from synapse.types import get_domain_from_id
from synapse.util import async
from synapse.util.caches.response_cache import ResponseCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.logutils import log_function
+from prometheus_client import Counter
+
+from six import iteritems
+
# when processing incoming transactions, we try to handle multiple rooms in
# parallel, up to this limit.
TRANSACTION_CONCURRENCY_LIMIT = 10
logger = logging.getLogger(__name__)
-# synapse.federation.federation_server is a silly name
-metrics = synapse.metrics.get_metrics_for("synapse.federation.server")
+received_pdus_counter = Counter("synapse_federation_server_received_pdus", "")
-received_pdus_counter = metrics.register_counter("received_pdus")
+received_edus_counter = Counter("synapse_federation_server_received_edus", "")
-received_edus_counter = metrics.register_counter("received_edus")
-
-received_queries_counter = metrics.register_counter("received_queries", labels=["type"])
+received_queries_counter = Counter(
+ "synapse_federation_server_received_queries", "", ["type"]
+)
class FederationServer(FederationBase):
+
def __init__(self, hs):
super(FederationServer, self).__init__(hs)
@@ -65,7 +68,7 @@ class FederationServer(FederationBase):
# We cache responses to state queries, as they take a while and often
# come in waves.
- self._state_resp_cache = ResponseCache(hs, timeout_ms=30000)
+ self._state_resp_cache = ResponseCache(hs, "state_resp", timeout_ms=30000)
@defer.inlineCallbacks
@log_function
@@ -129,7 +132,7 @@ class FederationServer(FederationBase):
logger.debug("[%s] Transaction is new", transaction.transaction_id)
- received_pdus_counter.inc_by(len(transaction.pdus))
+ received_pdus_counter.inc(len(transaction.pdus))
pdus_by_room = {}
@@ -212,16 +215,17 @@ class FederationServer(FederationBase):
if not in_room:
raise AuthError(403, "Host not in room.")
- result = self._state_resp_cache.get((room_id, event_id))
- if not result:
- with (yield self._server_linearizer.queue((origin, room_id))):
- d = self._state_resp_cache.set(
- (room_id, event_id),
- preserve_fn(self._on_context_state_request_compute)(room_id, event_id)
- )
- resp = yield make_deferred_yieldable(d)
- else:
- resp = yield make_deferred_yieldable(result)
+ # we grab the linearizer to protect ourselves from servers which hammer
+ # us. In theory we might already have the response to this query
+ # in the cache so we could return it without waiting for the linearizer
+ # - but that's non-trivial to get right, and anyway somewhat defeats
+ # the point of the linearizer.
+ with (yield self._server_linearizer.queue((origin, room_id))):
+ resp = yield self._state_resp_cache.wrap(
+ (room_id, event_id),
+ self._on_context_state_request_compute,
+ room_id, event_id,
+ )
defer.returnValue((200, resp))
@@ -289,7 +293,7 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
def on_query_request(self, query_type, args):
- received_queries_counter.inc(query_type)
+ received_queries_counter.labels(query_type).inc()
resp = yield self.registry.on_query(query_type, args)
defer.returnValue((200, resp))
@@ -425,9 +429,9 @@ class FederationServer(FederationBase):
"Claimed one-time-keys: %s",
",".join((
"%s for %s:%s" % (key_id, user_id, device_id)
- for user_id, user_keys in json_result.iteritems()
- for device_id, device_keys in user_keys.iteritems()
- for key_id, _ in device_keys.iteritems()
+ for user_id, user_keys in iteritems(json_result)
+ for device_id, device_keys in iteritems(user_keys)
+ for key_id, _ in iteritems(device_keys)
)),
)
@@ -494,13 +498,33 @@ class FederationServer(FederationBase):
def _handle_received_pdu(self, origin, pdu):
""" Process a PDU received in a federation /send/ transaction.
+ If the event is invalid, then this method throws a FederationError.
+ (The error will then be logged and sent back to the sender (which
+ probably won't do anything with it), and other events in the
+ transaction will be processed as normal).
+
+ It is likely that we'll then receive other events which refer to
+ this rejected_event in their prev_events, etc. When that happens,
+ we'll attempt to fetch the rejected event again, which will presumably
+ fail, so those second-generation events will also get rejected.
+
+ Eventually, we get to the point where there are more than 10 events
+ between any new events and the original rejected event. Since we
+ only try to backfill 10 events deep on received pdu, we then accept the
+ new event, possibly introducing a discontinuity in the DAG, with new
+ forward extremities, so normal service is approximately returned,
+ until we try to backfill across the discontinuity.
+
Args:
origin (str): server which sent the pdu
pdu (FrozenEvent): received pdu
Returns (Deferred): completes with None
- Raises: FederationError if the signatures / hash do not match
- """
+
+ Raises: FederationError if the signatures / hash do not match, or
+ if the event was unacceptable for any other reason (eg, too large,
+ too many prev_events, couldn't find the prev_events)
+ """
# check that it's actually being sent from a valid destination to
# workaround bug #1753 in 0.18.5 and 0.18.6
if origin != get_domain_from_id(pdu.event_id):
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 93e5acebc1..c7ed465617 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -33,17 +33,16 @@ from .units import Edu
from synapse.storage.presence import UserPresenceState
from synapse.util.metrics import Measure
-import synapse.metrics
+from synapse.metrics import LaterGauge
from blist import sorteddict
from collections import namedtuple
import logging
-logger = logging.getLogger(__name__)
-
+from six import itervalues, iteritems
-metrics = synapse.metrics.get_metrics_for(__name__)
+logger = logging.getLogger(__name__)
class FederationRemoteSendQueue(object):
@@ -75,10 +74,8 @@ class FederationRemoteSendQueue(object):
# lambda binds to the queue rather than to the name of the queue which
# changes. ARGH.
def register(name, queue):
- metrics.register_callback(
- queue_name + "_size",
- lambda: len(queue),
- )
+ LaterGauge("synapse_federation_send_queue_%s_size" % (queue_name,),
+ "", lambda: len(queue))
for queue_name in [
"presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
@@ -122,7 +119,7 @@ class FederationRemoteSendQueue(object):
user_ids = set(
user_id
- for uids in self.presence_changed.itervalues()
+ for uids in itervalues(self.presence_changed)
for user_id in uids
)
@@ -276,7 +273,7 @@ class FederationRemoteSendQueue(object):
# stream position.
keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}
- for ((destination, edu_key), pos) in keyed_edus.iteritems():
+ for ((destination, edu_key), pos) in iteritems(keyed_edus):
rows.append((pos, KeyedEduRow(
key=edu_key,
edu=self.keyed_edu[(destination, edu_key)],
@@ -309,7 +306,7 @@ class FederationRemoteSendQueue(object):
j = keys.bisect_right(to_token) + 1
device_messages = {self.device_messages[k]: k for k in keys[i:j]}
- for (destination, pos) in device_messages.iteritems():
+ for (destination, pos) in iteritems(device_messages):
rows.append((pos, DeviceRow(
destination=destination,
)))
@@ -528,19 +525,19 @@ def process_rows_for_federation(transaction_queue, rows):
if buff.presence:
transaction_queue.send_presence(buff.presence)
- for destination, edu_map in buff.keyed_edus.iteritems():
+ for destination, edu_map in iteritems(buff.keyed_edus):
for key, edu in edu_map.items():
transaction_queue.send_edu(
edu.destination, edu.edu_type, edu.content, key=key,
)
- for destination, edu_list in buff.edus.iteritems():
+ for destination, edu_list in iteritems(buff.edus):
for edu in edu_list:
transaction_queue.send_edu(
edu.destination, edu.edu_type, edu.content, key=None,
)
- for destination, failure_list in buff.failures.iteritems():
+ for destination, failure_list in iteritems(buff.failures):
for failure in failure_list:
transaction_queue.send_failure(destination, failure)
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index a141ec9953..69312ec233 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -26,23 +26,23 @@ from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.util.metrics import measure_func
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
import synapse.metrics
+from synapse.metrics import LaterGauge
+from synapse.metrics import (
+ sent_edus_counter,
+ sent_transactions_counter,
+ events_processed_counter,
+)
+
+from prometheus_client import Counter
import logging
logger = logging.getLogger(__name__)
-metrics = synapse.metrics.get_metrics_for(__name__)
-
-client_metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
-sent_pdus_destination_dist = client_metrics.register_distribution(
- "sent_pdu_destinations"
+sent_pdus_destination_dist = Counter(
+ "synapse_federation_transaction_queue_sent_pdu_destinations", ""
)
-sent_edus_counter = client_metrics.register_counter("sent_edus")
-
-sent_transactions_counter = client_metrics.register_counter("sent_transactions")
-
-events_processed_counter = client_metrics.register_counter("events_processed")
class TransactionQueue(object):
@@ -69,8 +69,10 @@ class TransactionQueue(object):
# done
self.pending_transactions = {}
- metrics.register_callback(
- "pending_destinations",
+ LaterGauge(
+ "synapse_federation_transaction_queue_pending_destinations",
+ "",
+ [],
lambda: len(self.pending_transactions),
)
@@ -94,12 +96,16 @@ class TransactionQueue(object):
# Map of destination -> (edu_type, key) -> Edu
self.pending_edus_keyed_by_dest = edus_keyed = {}
- metrics.register_callback(
- "pending_pdus",
+ LaterGauge(
+ "synapse_federation_transaction_queue_pending_pdus",
+ "",
+ [],
lambda: sum(map(len, pdus.values())),
)
- metrics.register_callback(
- "pending_edus",
+ LaterGauge(
+ "synapse_federation_transaction_queue_pending_edus",
+ "",
+ [],
lambda: (
sum(map(len, edus.values()))
+ sum(map(len, presence.values()))
@@ -169,7 +175,7 @@ class TransactionQueue(object):
while True:
last_token = yield self.store.get_federation_out_pos("events")
next_token, events = yield self.store.get_all_new_events_stream(
- last_token, self._last_poked_id, limit=20,
+ last_token, self._last_poked_id, limit=100,
)
logger.debug("Handling %s -> %s", last_token, next_token)
@@ -177,24 +183,33 @@ class TransactionQueue(object):
if not events and next_token >= self._last_poked_id:
break
- for event in events:
+ @defer.inlineCallbacks
+ def handle_event(event):
# Only send events for this server.
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
is_mine = self.is_mine_id(event.event_id)
if not is_mine and send_on_behalf_of is None:
- continue
-
- # Get the state from before the event.
- # We need to make sure that this is the state from before
- # the event and not from after it.
- # Otherwise if the last member on a server in a room is
- # banned then it won't receive the event because it won't
- # be in the room after the ban.
- destinations = yield self.state.get_current_hosts_in_room(
- event.room_id, latest_event_ids=[
- prev_id for prev_id, _ in event.prev_events
- ],
- )
+ return
+
+ try:
+ # Get the state from before the event.
+ # We need to make sure that this is the state from before
+ # the event and not from after it.
+ # Otherwise if the last member on a server in a room is
+ # banned then it won't receive the event because it won't
+ # be in the room after the ban.
+ destinations = yield self.state.get_current_hosts_in_room(
+ event.room_id, latest_event_ids=[
+ prev_id for prev_id, _ in event.prev_events
+ ],
+ )
+ except Exception:
+ logger.exception(
+ "Failed to calculate hosts in room for event: %s",
+ event.event_id,
+ )
+ return
+
destinations = set(destinations)
if send_on_behalf_of is not None:
@@ -207,12 +222,41 @@ class TransactionQueue(object):
self._send_pdu(event, destinations)
- events_processed_counter.inc_by(len(events))
+ @defer.inlineCallbacks
+ def handle_room_events(events):
+ for event in events:
+ yield handle_event(event)
+
+ events_by_room = {}
+ for event in events:
+ events_by_room.setdefault(event.room_id, []).append(event)
+
+ yield logcontext.make_deferred_yieldable(defer.gatherResults(
+ [
+ logcontext.run_in_background(handle_room_events, evs)
+ for evs in events_by_room.itervalues()
+ ],
+ consumeErrors=True
+ ))
yield self.store.update_federation_out_pos(
"events", next_token
)
+ if events:
+ now = self.clock.time_msec()
+ ts = yield self.store.get_received_ts(events[-1].event_id)
+
+ synapse.metrics.event_processing_lag.labels(
+ "federation_sender").set(now - ts)
+ synapse.metrics.event_processing_last_ts.labels(
+ "federation_sender").set(ts)
+
+ events_processed_counter.inc(len(events))
+
+ synapse.metrics.event_processing_positions.labels(
+ "federation_sender").set(next_token)
+
finally:
self._is_processing = False
@@ -234,7 +278,7 @@ class TransactionQueue(object):
if not destinations:
return
- sent_pdus_destination_dist.inc_by(len(destinations))
+ sent_pdus_destination_dist.inc(len(destinations))
for destination in destinations:
self.pending_pdus_by_dest.setdefault(destination, []).append(
@@ -282,6 +326,8 @@ class TransactionQueue(object):
break
yield self._process_presence_inner(states_map.values())
+ except Exception:
+ logger.exception("Error sending presence states to servers")
finally:
self._processing_pending_presence = False
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 5488e82985..6db8efa6dd 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -20,6 +21,7 @@ from synapse.api.urls import FEDERATION_PREFIX as PREFIX
from synapse.util.logutils import log_function
import logging
+import urllib
logger = logging.getLogger(__name__)
@@ -49,7 +51,7 @@ class TransportLayerClient(object):
logger.debug("get_room_state dest=%s, room=%s",
destination, room_id)
- path = PREFIX + "/state/%s/" % room_id
+ path = _create_path(PREFIX, "/state/%s/", room_id)
return self.client.get_json(
destination, path=path, args={"event_id": event_id},
)
@@ -71,7 +73,7 @@ class TransportLayerClient(object):
logger.debug("get_room_state_ids dest=%s, room=%s",
destination, room_id)
- path = PREFIX + "/state_ids/%s/" % room_id
+ path = _create_path(PREFIX, "/state_ids/%s/", room_id)
return self.client.get_json(
destination, path=path, args={"event_id": event_id},
)
@@ -93,7 +95,7 @@ class TransportLayerClient(object):
logger.debug("get_pdu dest=%s, event_id=%s",
destination, event_id)
- path = PREFIX + "/event/%s/" % (event_id, )
+ path = _create_path(PREFIX, "/event/%s/", event_id)
return self.client.get_json(destination, path=path, timeout=timeout)
@log_function
@@ -119,7 +121,7 @@ class TransportLayerClient(object):
# TODO: raise?
return
- path = PREFIX + "/backfill/%s/" % (room_id,)
+ path = _create_path(PREFIX, "/backfill/%s/", room_id)
args = {
"v": event_tuples,
@@ -157,9 +159,11 @@ class TransportLayerClient(object):
# generated by the json_data_callback.
json_data = transaction.get_dict()
+ path = _create_path(PREFIX, "/send/%s/", transaction.transaction_id)
+
response = yield self.client.put_json(
transaction.destination,
- path=PREFIX + "/send/%s/" % transaction.transaction_id,
+ path=path,
data=json_data,
json_data_callback=json_data_callback,
long_retries=True,
@@ -177,7 +181,7 @@ class TransportLayerClient(object):
@log_function
def make_query(self, destination, query_type, args, retry_on_dns_fail,
ignore_backoff=False):
- path = PREFIX + "/query/%s" % query_type
+ path = _create_path(PREFIX, "/query/%s", query_type)
content = yield self.client.get_json(
destination=destination,
@@ -222,7 +226,7 @@ class TransportLayerClient(object):
"make_membership_event called with membership='%s', must be one of %s" %
(membership, ",".join(valid_memberships))
)
- path = PREFIX + "/make_%s/%s/%s" % (membership, room_id, user_id)
+ path = _create_path(PREFIX, "/make_%s/%s/%s", membership, room_id, user_id)
ignore_backoff = False
retry_on_dns_fail = False
@@ -248,7 +252,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def send_join(self, destination, room_id, event_id, content):
- path = PREFIX + "/send_join/%s/%s" % (room_id, event_id)
+ path = _create_path(PREFIX, "/send_join/%s/%s", room_id, event_id)
response = yield self.client.put_json(
destination=destination,
@@ -261,7 +265,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def send_leave(self, destination, room_id, event_id, content):
- path = PREFIX + "/send_leave/%s/%s" % (room_id, event_id)
+ path = _create_path(PREFIX, "/send_leave/%s/%s", room_id, event_id)
response = yield self.client.put_json(
destination=destination,
@@ -280,7 +284,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def send_invite(self, destination, room_id, event_id, content):
- path = PREFIX + "/invite/%s/%s" % (room_id, event_id)
+ path = _create_path(PREFIX, "/invite/%s/%s", room_id, event_id)
response = yield self.client.put_json(
destination=destination,
@@ -322,7 +326,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def exchange_third_party_invite(self, destination, room_id, event_dict):
- path = PREFIX + "/exchange_third_party_invite/%s" % (room_id,)
+ path = _create_path(PREFIX, "/exchange_third_party_invite/%s", room_id,)
response = yield self.client.put_json(
destination=destination,
@@ -335,7 +339,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def get_event_auth(self, destination, room_id, event_id):
- path = PREFIX + "/event_auth/%s/%s" % (room_id, event_id)
+ path = _create_path(PREFIX, "/event_auth/%s/%s", room_id, event_id)
content = yield self.client.get_json(
destination=destination,
@@ -347,7 +351,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def send_query_auth(self, destination, room_id, event_id, content):
- path = PREFIX + "/query_auth/%s/%s" % (room_id, event_id)
+ path = _create_path(PREFIX, "/query_auth/%s/%s", room_id, event_id)
content = yield self.client.post_json(
destination=destination,
@@ -409,7 +413,7 @@ class TransportLayerClient(object):
Returns:
A dict containg the device keys.
"""
- path = PREFIX + "/user/devices/" + user_id
+ path = _create_path(PREFIX, "/user/devices/%s", user_id)
content = yield self.client.get_json(
destination=destination,
@@ -459,7 +463,7 @@ class TransportLayerClient(object):
@log_function
def get_missing_events(self, destination, room_id, earliest_events,
latest_events, limit, min_depth, timeout):
- path = PREFIX + "/get_missing_events/%s" % (room_id,)
+ path = _create_path(PREFIX, "/get_missing_events/%s", room_id,)
content = yield self.client.post_json(
destination=destination,
@@ -479,7 +483,7 @@ class TransportLayerClient(object):
def get_group_profile(self, destination, group_id, requester_user_id):
"""Get a group profile
"""
- path = PREFIX + "/groups/%s/profile" % (group_id,)
+ path = _create_path(PREFIX, "/groups/%s/profile", group_id,)
return self.client.get_json(
destination=destination,
@@ -498,7 +502,7 @@ class TransportLayerClient(object):
requester_user_id (str)
content (dict): The new profile of the group
"""
- path = PREFIX + "/groups/%s/profile" % (group_id,)
+ path = _create_path(PREFIX, "/groups/%s/profile", group_id,)
return self.client.post_json(
destination=destination,
@@ -512,7 +516,7 @@ class TransportLayerClient(object):
def get_group_summary(self, destination, group_id, requester_user_id):
"""Get a group summary
"""
- path = PREFIX + "/groups/%s/summary" % (group_id,)
+ path = _create_path(PREFIX, "/groups/%s/summary", group_id,)
return self.client.get_json(
destination=destination,
@@ -525,7 +529,7 @@ class TransportLayerClient(object):
def get_rooms_in_group(self, destination, group_id, requester_user_id):
"""Get all rooms in a group
"""
- path = PREFIX + "/groups/%s/rooms" % (group_id,)
+ path = _create_path(PREFIX, "/groups/%s/rooms", group_id,)
return self.client.get_json(
destination=destination,
@@ -538,7 +542,7 @@ class TransportLayerClient(object):
content):
"""Add a room to a group
"""
- path = PREFIX + "/groups/%s/room/%s" % (group_id, room_id,)
+ path = _create_path(PREFIX, "/groups/%s/room/%s", group_id, room_id,)
return self.client.post_json(
destination=destination,
@@ -552,7 +556,10 @@ class TransportLayerClient(object):
config_key, content):
"""Update room in group
"""
- path = PREFIX + "/groups/%s/room/%s/config/%s" % (group_id, room_id, config_key,)
+ path = _create_path(
+ PREFIX, "/groups/%s/room/%s/config/%s",
+ group_id, room_id, config_key,
+ )
return self.client.post_json(
destination=destination,
@@ -565,7 +572,7 @@ class TransportLayerClient(object):
def remove_room_from_group(self, destination, group_id, requester_user_id, room_id):
"""Remove a room from a group
"""
- path = PREFIX + "/groups/%s/room/%s" % (group_id, room_id,)
+ path = _create_path(PREFIX, "/groups/%s/room/%s", group_id, room_id,)
return self.client.delete_json(
destination=destination,
@@ -578,7 +585,7 @@ class TransportLayerClient(object):
def get_users_in_group(self, destination, group_id, requester_user_id):
"""Get users in a group
"""
- path = PREFIX + "/groups/%s/users" % (group_id,)
+ path = _create_path(PREFIX, "/groups/%s/users", group_id,)
return self.client.get_json(
destination=destination,
@@ -591,7 +598,7 @@ class TransportLayerClient(object):
def get_invited_users_in_group(self, destination, group_id, requester_user_id):
"""Get users that have been invited to a group
"""
- path = PREFIX + "/groups/%s/invited_users" % (group_id,)
+ path = _create_path(PREFIX, "/groups/%s/invited_users", group_id,)
return self.client.get_json(
destination=destination,
@@ -604,7 +611,23 @@ class TransportLayerClient(object):
def accept_group_invite(self, destination, group_id, user_id, content):
"""Accept a group invite
"""
- path = PREFIX + "/groups/%s/users/%s/accept_invite" % (group_id, user_id)
+ path = _create_path(
+ PREFIX, "/groups/%s/users/%s/accept_invite",
+ group_id, user_id,
+ )
+
+ return self.client.post_json(
+ destination=destination,
+ path=path,
+ data=content,
+ ignore_backoff=True,
+ )
+
+ @log_function
+ def join_group(self, destination, group_id, user_id, content):
+ """Attempts to join a group
+ """
+ path = _create_path(PREFIX, "/groups/%s/users/%s/join", group_id, user_id)
return self.client.post_json(
destination=destination,
@@ -617,7 +640,7 @@ class TransportLayerClient(object):
def invite_to_group(self, destination, group_id, user_id, requester_user_id, content):
"""Invite a user to a group
"""
- path = PREFIX + "/groups/%s/users/%s/invite" % (group_id, user_id)
+ path = _create_path(PREFIX, "/groups/%s/users/%s/invite", group_id, user_id)
return self.client.post_json(
destination=destination,
@@ -633,7 +656,7 @@ class TransportLayerClient(object):
invited.
"""
- path = PREFIX + "/groups/local/%s/users/%s/invite" % (group_id, user_id)
+ path = _create_path(PREFIX, "/groups/local/%s/users/%s/invite", group_id, user_id)
return self.client.post_json(
destination=destination,
@@ -647,7 +670,7 @@ class TransportLayerClient(object):
user_id, content):
"""Remove a user fron a group
"""
- path = PREFIX + "/groups/%s/users/%s/remove" % (group_id, user_id)
+ path = _create_path(PREFIX, "/groups/%s/users/%s/remove", group_id, user_id)
return self.client.post_json(
destination=destination,
@@ -664,7 +687,7 @@ class TransportLayerClient(object):
kicked from the group.
"""
- path = PREFIX + "/groups/local/%s/users/%s/remove" % (group_id, user_id)
+ path = _create_path(PREFIX, "/groups/local/%s/users/%s/remove", group_id, user_id)
return self.client.post_json(
destination=destination,
@@ -679,7 +702,7 @@ class TransportLayerClient(object):
the attestations
"""
- path = PREFIX + "/groups/%s/renew_attestation/%s" % (group_id, user_id)
+ path = _create_path(PREFIX, "/groups/%s/renew_attestation/%s", group_id, user_id)
return self.client.post_json(
destination=destination,
@@ -694,11 +717,12 @@ class TransportLayerClient(object):
"""Update a room entry in a group summary
"""
if category_id:
- path = PREFIX + "/groups/%s/summary/categories/%s/rooms/%s" % (
+ path = _create_path(
+ PREFIX, "/groups/%s/summary/categories/%s/rooms/%s",
group_id, category_id, room_id,
)
else:
- path = PREFIX + "/groups/%s/summary/rooms/%s" % (group_id, room_id,)
+ path = _create_path(PREFIX, "/groups/%s/summary/rooms/%s", group_id, room_id,)
return self.client.post_json(
destination=destination,
@@ -714,11 +738,12 @@ class TransportLayerClient(object):
"""Delete a room entry in a group summary
"""
if category_id:
- path = PREFIX + "/groups/%s/summary/categories/%s/rooms/%s" % (
+ path = _create_path(
+ PREFIX + "/groups/%s/summary/categories/%s/rooms/%s",
group_id, category_id, room_id,
)
else:
- path = PREFIX + "/groups/%s/summary/rooms/%s" % (group_id, room_id,)
+ path = _create_path(PREFIX, "/groups/%s/summary/rooms/%s", group_id, room_id,)
return self.client.delete_json(
destination=destination,
@@ -731,7 +756,7 @@ class TransportLayerClient(object):
def get_group_categories(self, destination, group_id, requester_user_id):
"""Get all categories in a group
"""
- path = PREFIX + "/groups/%s/categories" % (group_id,)
+ path = _create_path(PREFIX, "/groups/%s/categories", group_id,)
return self.client.get_json(
destination=destination,
@@ -744,7 +769,7 @@ class TransportLayerClient(object):
def get_group_category(self, destination, group_id, requester_user_id, category_id):
"""Get category info in a group
"""
- path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,)
+ path = _create_path(PREFIX, "/groups/%s/categories/%s", group_id, category_id,)
return self.client.get_json(
destination=destination,
@@ -758,7 +783,7 @@ class TransportLayerClient(object):
content):
"""Update a category in a group
"""
- path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,)
+ path = _create_path(PREFIX, "/groups/%s/categories/%s", group_id, category_id,)
return self.client.post_json(
destination=destination,
@@ -773,7 +798,7 @@ class TransportLayerClient(object):
category_id):
"""Delete a category in a group
"""
- path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,)
+ path = _create_path(PREFIX, "/groups/%s/categories/%s", group_id, category_id,)
return self.client.delete_json(
destination=destination,
@@ -786,7 +811,7 @@ class TransportLayerClient(object):
def get_group_roles(self, destination, group_id, requester_user_id):
"""Get all roles in a group
"""
- path = PREFIX + "/groups/%s/roles" % (group_id,)
+ path = _create_path(PREFIX, "/groups/%s/roles", group_id,)
return self.client.get_json(
destination=destination,
@@ -799,7 +824,7 @@ class TransportLayerClient(object):
def get_group_role(self, destination, group_id, requester_user_id, role_id):
"""Get a roles info
"""
- path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,)
+ path = _create_path(PREFIX, "/groups/%s/roles/%s", group_id, role_id,)
return self.client.get_json(
destination=destination,
@@ -813,7 +838,7 @@ class TransportLayerClient(object):
content):
"""Update a role in a group
"""
- path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,)
+ path = _create_path(PREFIX, "/groups/%s/roles/%s", group_id, role_id,)
return self.client.post_json(
destination=destination,
@@ -827,7 +852,7 @@ class TransportLayerClient(object):
def delete_group_role(self, destination, group_id, requester_user_id, role_id):
"""Delete a role in a group
"""
- path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,)
+ path = _create_path(PREFIX, "/groups/%s/roles/%s", group_id, role_id,)
return self.client.delete_json(
destination=destination,
@@ -842,11 +867,12 @@ class TransportLayerClient(object):
"""Update a users entry in a group
"""
if role_id:
- path = PREFIX + "/groups/%s/summary/roles/%s/users/%s" % (
+ path = _create_path(
+ PREFIX, "/groups/%s/summary/roles/%s/users/%s",
group_id, role_id, user_id,
)
else:
- path = PREFIX + "/groups/%s/summary/users/%s" % (group_id, user_id,)
+ path = _create_path(PREFIX, "/groups/%s/summary/users/%s", group_id, user_id,)
return self.client.post_json(
destination=destination,
@@ -857,16 +883,32 @@ class TransportLayerClient(object):
)
@log_function
+ def set_group_join_policy(self, destination, group_id, requester_user_id,
+ content):
+ """Sets the join policy for a group
+ """
+ path = _create_path(PREFIX, "/groups/%s/settings/m.join_policy", group_id,)
+
+ return self.client.put_json(
+ destination=destination,
+ path=path,
+ args={"requester_user_id": requester_user_id},
+ data=content,
+ ignore_backoff=True,
+ )
+
+ @log_function
def delete_group_summary_user(self, destination, group_id, requester_user_id,
user_id, role_id):
"""Delete a users entry in a group
"""
if role_id:
- path = PREFIX + "/groups/%s/summary/roles/%s/users/%s" % (
+ path = _create_path(
+ PREFIX, "/groups/%s/summary/roles/%s/users/%s",
group_id, role_id, user_id,
)
else:
- path = PREFIX + "/groups/%s/summary/users/%s" % (group_id, user_id,)
+ path = _create_path(PREFIX, "/groups/%s/summary/users/%s", group_id, user_id,)
return self.client.delete_json(
destination=destination,
@@ -889,3 +931,22 @@ class TransportLayerClient(object):
data=content,
ignore_backoff=True,
)
+
+
+def _create_path(prefix, path, *args):
+ """Creates a path from the prefix, path template and args. Ensures that
+ all args are url encoded.
+
+ Example:
+
+ _create_path(PREFIX, "/event/%s/", event_id)
+
+ Args:
+ prefix (str)
+ path (str): String template for the path
+ args: ([str]): Args to insert into path. Each arg will be url encoded
+
+ Returns:
+ str
+ """
+ return prefix + path % tuple(urllib.quote(arg, "") for arg in args)
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index a66a6b0692..19d09f5422 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -24,7 +25,7 @@ from synapse.http.servlet import (
)
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.versionstring import get_version_string
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import run_in_background
from synapse.types import ThirdPartyInstanceID, get_domain_from_id
import functools
@@ -93,12 +94,6 @@ class Authenticator(object):
"signatures": {},
}
- if (
- self.federation_domain_whitelist is not None and
- self.server_name not in self.federation_domain_whitelist
- ):
- raise FederationDeniedError(self.server_name)
-
if content is not None:
json_request["content"] = content
@@ -137,6 +132,12 @@ class Authenticator(object):
json_request["origin"] = origin
json_request["signatures"].setdefault(origin, {})[key] = sig
+ if (
+ self.federation_domain_whitelist is not None and
+ origin not in self.federation_domain_whitelist
+ ):
+ raise FederationDeniedError(origin)
+
if not json_request["signatures"]:
raise NoAuthenticationError(
401, "Missing Authorization headers", Codes.UNAUTHORIZED,
@@ -151,11 +152,18 @@ class Authenticator(object):
# alive
retry_timings = yield self.store.get_destination_retry_timings(origin)
if retry_timings and retry_timings["retry_last_ts"]:
- logger.info("Marking origin %r as up", origin)
- preserve_fn(self.store.set_destination_retry_timings)(origin, 0, 0)
+ run_in_background(self._reset_retry_timings, origin)
defer.returnValue(origin)
+ @defer.inlineCallbacks
+ def _reset_retry_timings(self, origin):
+ try:
+ logger.info("Marking origin %r as up", origin)
+ yield self.store.set_destination_retry_timings(origin, 0, 0)
+ except Exception:
+ logger.exception("Error resetting retry timings on %s", origin)
+
class BaseFederationServlet(object):
REQUIRE_AUTH = True
@@ -802,6 +810,23 @@ class FederationGroupsAcceptInviteServlet(BaseFederationServlet):
defer.returnValue((200, new_content))
+class FederationGroupsJoinServlet(BaseFederationServlet):
+ """Attempt to join a group
+ """
+ PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/join$"
+
+ @defer.inlineCallbacks
+ def on_POST(self, origin, content, query, group_id, user_id):
+ if get_domain_from_id(user_id) != origin:
+ raise SynapseError(403, "user_id doesn't match origin")
+
+ new_content = yield self.handler.join_group(
+ group_id, user_id, content,
+ )
+
+ defer.returnValue((200, new_content))
+
+
class FederationGroupsRemoveUserServlet(BaseFederationServlet):
"""Leave or kick a user from the group
"""
@@ -1124,6 +1149,24 @@ class FederationGroupsBulkPublicisedServlet(BaseFederationServlet):
defer.returnValue((200, resp))
+class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet):
+ """Sets whether a group is joinable without an invite or knock
+ """
+ PATH = "/groups/(?P<group_id>[^/]*)/settings/m.join_policy$"
+
+ @defer.inlineCallbacks
+ def on_PUT(self, origin, content, query, group_id):
+ requester_user_id = parse_string_from_args(query, "requester_user_id")
+ if get_domain_from_id(requester_user_id) != origin:
+ raise SynapseError(403, "requester_user_id doesn't match origin")
+
+ new_content = yield self.handler.set_group_join_policy(
+ group_id, requester_user_id, content
+ )
+
+ defer.returnValue((200, new_content))
+
+
FEDERATION_SERVLET_CLASSES = (
FederationSendServlet,
FederationPullServlet,
@@ -1163,6 +1206,7 @@ GROUP_SERVER_SERVLET_CLASSES = (
FederationGroupsInvitedUsersServlet,
FederationGroupsInviteServlet,
FederationGroupsAcceptInviteServlet,
+ FederationGroupsJoinServlet,
FederationGroupsRemoveUserServlet,
FederationGroupsSummaryRoomsServlet,
FederationGroupsCategoriesServlet,
@@ -1172,6 +1216,7 @@ GROUP_SERVER_SERVLET_CLASSES = (
FederationGroupsSummaryUsersServlet,
FederationGroupsAddRoomsServlet,
FederationGroupsAddRoomsConfigServlet,
+ FederationGroupsSettingJoinPolicyServlet,
)
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index 3f645acc43..01c5b8fe17 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -74,8 +74,6 @@ class Transaction(JsonEncodedObject):
"previous_ids",
"pdus",
"edus",
- "transaction_id",
- "destination",
"pdu_failures",
]
|