diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index a7a2ec4523..cffa831d80 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -20,8 +20,9 @@ import six
from twisted.internet import defer
from twisted.internet.defer import DeferredList
-from synapse.api.constants import MAX_DEPTH, EventTypes, Membership, RoomVersions
+from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
from synapse.api.errors import Codes, SynapseError
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, EventFormatVersions
from synapse.crypto.event_signing import check_event_content_hash
from synapse.events import event_type_from_format_version
from synapse.events.utils import prune_event
@@ -268,15 +269,29 @@ def _check_sigs_on_pdus(keyring, room_version, pdus):
for p in pdus_to_check_sender
])
+ def sender_err(e, pdu_to_check):
+ errmsg = "event id %s: unable to verify signature for sender %s: %s" % (
+ pdu_to_check.pdu.event_id,
+ pdu_to_check.sender_domain,
+ e.getErrorMessage(),
+ )
+ # XX not really sure if these are the right codes, but they are what
+ # we've done for ages
+ raise SynapseError(400, errmsg, Codes.UNAUTHORIZED)
+
for p, d in zip(pdus_to_check_sender, more_deferreds):
+ d.addErrback(sender_err, p)
p.deferreds.append(d)
# now let's look for events where the sender's domain is different to the
# event id's domain (normally only the case for joins/leaves), and add additional
# checks. Only do this if the room version has a concept of event ID domain
- if room_version in (
- RoomVersions.V1, RoomVersions.V2, RoomVersions.STATE_V2_TEST,
- ):
+ # (ie, the room version uses old-style non-hash event IDs).
+ v = KNOWN_ROOM_VERSIONS.get(room_version)
+ if not v:
+ raise RuntimeError("Unrecognized room version %s" % (room_version,))
+
+ if v.event_format == EventFormatVersions.V1:
pdus_to_check_event_id = [
p for p in pdus_to_check
if p.sender_domain != get_domain_from_id(p.pdu.event_id)
@@ -287,12 +302,19 @@ def _check_sigs_on_pdus(keyring, room_version, pdus):
for p in pdus_to_check_event_id
])
+ def event_err(e, pdu_to_check):
+ errmsg = (
+ "event id %s: unable to verify signature for event id domain: %s" % (
+ pdu_to_check.pdu.event_id,
+ e.getErrorMessage(),
+ )
+ )
+ # XX as above: not really sure if these are the right codes
+ raise SynapseError(400, errmsg, Codes.UNAUTHORIZED)
+
for p, d in zip(pdus_to_check_event_id, more_deferreds):
+ d.addErrback(event_err, p)
p.deferreds.append(d)
- elif room_version in (RoomVersions.V3,):
- pass # No further checks needed, as event IDs are hashes here
- else:
- raise RuntimeError("Unrecognized room version %s" % (room_version,))
# replace lists of deferreds with single Deferreds
return [_flatten_deferred_list(p.deferreds) for p in pdus_to_check]
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 58e04d81ab..f3fc897a0a 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -25,12 +25,7 @@ from prometheus_client import Counter
from twisted.internet import defer
-from synapse.api.constants import (
- KNOWN_ROOM_VERSIONS,
- EventTypes,
- Membership,
- RoomVersions,
-)
+from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import (
CodeMessageException,
Codes,
@@ -38,6 +33,11 @@ from synapse.api.errors import (
HttpResponseException,
SynapseError,
)
+from synapse.api.room_versions import (
+ KNOWN_ROOM_VERSIONS,
+ EventFormatVersions,
+ RoomVersions,
+)
from synapse.events import builder, room_version_to_event_format
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.util import logcontext, unwrapFirstError
@@ -570,7 +570,7 @@ class FederationClient(FederationBase):
Deferred[tuple[str, FrozenEvent, int]]: resolves to a tuple of
`(origin, event, event_format)` where origin is the remote
homeserver which generated the event, and event_format is one of
- `synapse.api.constants.EventFormatVersions`.
+ `synapse.api.room_versions.EventFormatVersions`.
Fails with a ``SynapseError`` if the chosen remote server
returns a 300/400 code.
@@ -592,7 +592,7 @@ class FederationClient(FederationBase):
# Note: If not supplied, the room version may be either v1 or v2,
# however either way the event format version will be v1.
- room_version = ret.get("room_version", RoomVersions.V1)
+ room_version = ret.get("room_version", RoomVersions.V1.identifier)
event_format = room_version_to_event_format(room_version)
pdu_dict = ret.get("event", None)
@@ -695,7 +695,9 @@ class FederationClient(FederationBase):
room_version = None
for e in state:
if (e.type, e.state_key) == (EventTypes.Create, ""):
- room_version = e.content.get("room_version", RoomVersions.V1)
+ room_version = e.content.get(
+ "room_version", RoomVersions.V1.identifier
+ )
break
if room_version is None:
@@ -802,11 +804,10 @@ class FederationClient(FederationBase):
raise err
# Otherwise, we assume that the remote server doesn't understand
- # the v2 invite API.
-
- if room_version in (RoomVersions.V1, RoomVersions.V2):
- pass # We'll fall through
- else:
+ # the v2 invite API. That's ok provided the room uses old-style event
+ # IDs.
+ v = KNOWN_ROOM_VERSIONS.get(room_version)
+ if v.event_format != EventFormatVersions.V1:
raise SynapseError(
400,
"User's homeserver does not support this room version",
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 81f3b4b1ff..df60828dba 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -25,7 +25,7 @@ from twisted.internet import defer
from twisted.internet.abstract import isIPAddress
from twisted.python import failure
-from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership
+from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import (
AuthError,
Codes,
@@ -34,6 +34,7 @@ from synapse.api.errors import (
NotFoundError,
SynapseError,
)
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.crypto.event_signing import compute_event_signature
from synapse.events import room_version_to_event_format
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 04d04a4457..0240b339b0 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -55,7 +55,12 @@ class FederationRemoteSendQueue(object):
self.is_mine_id = hs.is_mine_id
self.presence_map = {} # Pending presence map user_id -> UserPresenceState
- self.presence_changed = SortedDict() # Stream position -> user_id
+ self.presence_changed = SortedDict() # Stream position -> list[user_id]
+
+ # Stores the destinations we need to explicitly send presence to about a
+ # given user.
+ # Stream position -> (user_id, destinations)
+ self.presence_destinations = SortedDict()
self.keyed_edu = {} # (destination, key) -> EDU
self.keyed_edu_changed = SortedDict() # stream position -> (destination, key)
@@ -77,7 +82,7 @@ class FederationRemoteSendQueue(object):
for queue_name in [
"presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
- "edus", "device_messages", "pos_time",
+ "edus", "device_messages", "pos_time", "presence_destinations",
]:
register(queue_name, getattr(self, queue_name))
@@ -121,6 +126,15 @@ class FederationRemoteSendQueue(object):
for user_id in uids
)
+ keys = self.presence_destinations.keys()
+ i = self.presence_destinations.bisect_left(position_to_delete)
+ for key in keys[:i]:
+ del self.presence_destinations[key]
+
+ user_ids.update(
+ user_id for user_id, _ in self.presence_destinations.values()
+ )
+
to_del = [
user_id for user_id in self.presence_map if user_id not in user_ids
]
@@ -209,6 +223,20 @@ class FederationRemoteSendQueue(object):
self.notifier.on_new_replication_data()
+ def send_presence_to_destinations(self, states, destinations):
+ """As per FederationSender
+
+ Args:
+ states (list[UserPresenceState])
+ destinations (list[str])
+ """
+ for state in states:
+ pos = self._next_pos()
+ self.presence_map.update({state.user_id: state for state in states})
+ self.presence_destinations[pos] = (state.user_id, destinations)
+
+ self.notifier.on_new_replication_data()
+
def send_device_messages(self, destination):
"""As per FederationSender"""
pos = self._next_pos()
@@ -261,6 +289,16 @@ class FederationRemoteSendQueue(object):
state=self.presence_map[user_id],
)))
+ # Fetch presence to send to destinations
+ i = self.presence_destinations.bisect_right(from_token)
+ j = self.presence_destinations.bisect_right(to_token) + 1
+
+ for pos, (user_id, dests) in self.presence_destinations.items()[i:j]:
+ rows.append((pos, PresenceDestinationsRow(
+ state=self.presence_map[user_id],
+ destinations=list(dests),
+ )))
+
# Fetch changes keyed edus
i = self.keyed_edu_changed.bisect_right(from_token)
j = self.keyed_edu_changed.bisect_right(to_token) + 1
@@ -357,6 +395,29 @@ class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
buff.presence.append(self.state)
+class PresenceDestinationsRow(BaseFederationRow, namedtuple("PresenceDestinationsRow", (
+ "state", # UserPresenceState
+ "destinations", # list[str]
+))):
+ TypeId = "pd"
+
+ @staticmethod
+ def from_data(data):
+ return PresenceDestinationsRow(
+ state=UserPresenceState.from_dict(data["state"]),
+ destinations=data["dests"],
+ )
+
+ def to_data(self):
+ return {
+ "state": self.state.as_dict(),
+ "dests": self.destinations,
+ }
+
+ def add_to_buffer(self, buff):
+ buff.presence_destinations.append((self.state, self.destinations))
+
+
class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
"key", # tuple(str) - the edu key passed to send_edu
"edu", # Edu
@@ -428,6 +489,7 @@ TypeToRow = {
Row.TypeId: Row
for Row in (
PresenceRow,
+ PresenceDestinationsRow,
KeyedEduRow,
EduRow,
DeviceRow,
@@ -437,6 +499,7 @@ TypeToRow = {
ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
"presence", # list(UserPresenceState)
+ "presence_destinations", # list of tuples of UserPresenceState and destinations
"keyed_edus", # dict of destination -> { key -> Edu }
"edus", # dict of destination -> [Edu]
"device_destinations", # set of destinations
@@ -458,6 +521,7 @@ def process_rows_for_federation(transaction_queue, rows):
buff = ParsedFederationStreamData(
presence=[],
+ presence_destinations=[],
keyed_edus={},
edus={},
device_destinations=set(),
@@ -476,6 +540,11 @@ def process_rows_for_federation(transaction_queue, rows):
if buff.presence:
transaction_queue.send_presence(buff.presence)
+ for state, destinations in buff.presence_destinations:
+ transaction_queue.send_presence_to_destinations(
+ states=[state], destinations=destinations,
+ )
+
for destination, edu_map in iteritems(buff.keyed_edus):
for key, edu in edu_map.items():
transaction_queue.send_edu(edu, key)
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 1dc041752b..4f0f939102 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -371,7 +371,7 @@ class FederationSender(object):
return
# First we queue up the new presence by user ID, so multiple presence
- # updates in quick successtion are correctly handled
+ # updates in quick succession are correctly handled.
# We only want to send presence for our own users, so lets always just
# filter here just in case.
self.pending_presence.update({
@@ -402,6 +402,23 @@ class FederationSender(object):
finally:
self._processing_pending_presence = False
+ def send_presence_to_destinations(self, states, destinations):
+ """Send the given presence states to the given destinations.
+
+ Args:
+ states (list[UserPresenceState])
+ destinations (list[str])
+ """
+
+ if not states or not self.hs.config.use_presence:
+ # No-op if presence is disabled.
+ return
+
+ for destination in destinations:
+ if destination == self.server_name:
+ continue
+ self._get_per_destination_queue(destination).send_presence(states)
+
@measure_func("txnqueue._process_presence")
@defer.inlineCallbacks
def _process_presence_inner(self, states):
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index be99211003..fae8bea392 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -33,12 +33,14 @@ from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage import UserPresenceState
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
+# This is defined in the Matrix spec and enforced by the receiver.
+MAX_EDUS_PER_TRANSACTION = 100
+
logger = logging.getLogger(__name__)
sent_edus_counter = Counter(
- "synapse_federation_client_sent_edus",
- "Total number of EDUs successfully sent",
+ "synapse_federation_client_sent_edus", "Total number of EDUs successfully sent"
)
sent_edus_by_type = Counter(
@@ -58,6 +60,7 @@ class PerDestinationQueue(object):
destination (str): the server_name of the destination that we are managing
transmission for.
"""
+
def __init__(self, hs, transaction_manager, destination):
self._server_name = hs.hostname
self._clock = hs.get_clock()
@@ -68,17 +71,17 @@ class PerDestinationQueue(object):
self.transmission_loop_running = False
# a list of tuples of (pending pdu, order)
- self._pending_pdus = [] # type: list[tuple[EventBase, int]]
- self._pending_edus = [] # type: list[Edu]
+ self._pending_pdus = [] # type: list[tuple[EventBase, int]]
+ self._pending_edus = [] # type: list[Edu]
# Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
# based on their key (e.g. typing events by room_id)
# Map of (edu_type, key) -> Edu
- self._pending_edus_keyed = {} # type: dict[tuple[str, str], Edu]
+ self._pending_edus_keyed = {} # type: dict[tuple[str, str], Edu]
# Map of user_id -> UserPresenceState of pending presence to be sent to this
# destination
- self._pending_presence = {} # type: dict[str, UserPresenceState]
+ self._pending_presence = {} # type: dict[str, UserPresenceState]
# room_id -> receipt_type -> user_id -> receipt_dict
self._pending_rrs = {}
@@ -120,9 +123,7 @@ class PerDestinationQueue(object):
Args:
states (iterable[UserPresenceState]): presence to send
"""
- self._pending_presence.update({
- state.user_id: state for state in states
- })
+ self._pending_presence.update({state.user_id: state for state in states})
self.attempt_new_transaction()
def queue_read_receipt(self, receipt):
@@ -132,14 +133,9 @@ class PerDestinationQueue(object):
Args:
receipt (synapse.api.receipt_info.ReceiptInfo): receipt to be queued
"""
- self._pending_rrs.setdefault(
- receipt.room_id, {},
- ).setdefault(
+ self._pending_rrs.setdefault(receipt.room_id, {}).setdefault(
receipt.receipt_type, {}
- )[receipt.user_id] = {
- "event_ids": receipt.event_ids,
- "data": receipt.data,
- }
+ )[receipt.user_id] = {"event_ids": receipt.event_ids, "data": receipt.data}
def flush_read_receipts_for_room(self, room_id):
# if we don't have any read-receipts for this room, it may be that we've already
@@ -170,10 +166,7 @@ class PerDestinationQueue(object):
# request at which point pending_pdus just keeps growing.
# we need application-layer timeouts of some flavour of these
# requests
- logger.debug(
- "TX [%s] Transaction already in progress",
- self._destination
- )
+ logger.debug("TX [%s] Transaction already in progress", self._destination)
return
logger.debug("TX [%s] Starting transaction loop", self._destination)
@@ -197,7 +190,8 @@ class PerDestinationQueue(object):
pending_pdus = []
while True:
device_message_edus, device_stream_id, dev_list_id = (
- yield self._get_new_device_messages()
+ # We have to keep 2 free slots for presence and rr_edus
+ yield self._get_new_device_messages(MAX_EDUS_PER_TRANSACTION - 2)
)
# BEGIN CRITICAL SECTION
@@ -216,19 +210,9 @@ class PerDestinationQueue(object):
pending_edus = []
- pending_edus.extend(self._get_rr_edus(force_flush=False))
-
# We can only include at most 100 EDUs per transactions
- pending_edus.extend(self._pop_pending_edus(100 - len(pending_edus)))
-
- pending_edus.extend(
- self._pending_edus_keyed.values()
- )
-
- self._pending_edus_keyed = {}
-
- pending_edus.extend(device_message_edus)
-
+ # rr_edus and pending_presence take at most one slot each
+ pending_edus.extend(self._get_rr_edus(force_flush=False))
pending_presence = self._pending_presence
self._pending_presence = {}
if pending_presence:
@@ -248,9 +232,23 @@ class PerDestinationQueue(object):
)
)
+ pending_edus.extend(device_message_edus)
+ pending_edus.extend(
+ self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))
+ )
+ while (
+ len(pending_edus) < MAX_EDUS_PER_TRANSACTION
+ and self._pending_edus_keyed
+ ):
+ _, val = self._pending_edus_keyed.popitem()
+ pending_edus.append(val)
+
if pending_pdus:
- logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
- self._destination, len(pending_pdus))
+ logger.debug(
+ "TX [%s] len(pending_pdus_by_dest[dest]) = %d",
+ self._destination,
+ len(pending_pdus),
+ )
if not pending_pdus and not pending_edus:
logger.debug("TX [%s] Nothing to send", self._destination)
@@ -259,7 +257,7 @@ class PerDestinationQueue(object):
# if we've decided to send a transaction anyway, and we have room, we
# may as well send any pending RRs
- if len(pending_edus) < 100:
+ if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
pending_edus.extend(self._get_rr_edus(force_flush=True))
# END CRITICAL SECTION
@@ -303,22 +301,25 @@ class PerDestinationQueue(object):
except HttpResponseException as e:
logger.warning(
"TX [%s] Received %d response to transaction: %s",
- self._destination, e.code, e,
+ self._destination,
+ e.code,
+ e,
)
except RequestSendFailed as e:
- logger.warning("TX [%s] Failed to send transaction: %s", self._destination, e)
+ logger.warning(
+ "TX [%s] Failed to send transaction: %s", self._destination, e
+ )
for p, _ in pending_pdus:
- logger.info("Failed to send event %s to %s", p.event_id,
- self._destination)
+ logger.info(
+ "Failed to send event %s to %s", p.event_id, self._destination
+ )
except Exception:
- logger.exception(
- "TX [%s] Failed to send transaction",
- self._destination,
- )
+ logger.exception("TX [%s] Failed to send transaction", self._destination)
for p, _ in pending_pdus:
- logger.info("Failed to send event %s to %s", p.event_id,
- self._destination)
+ logger.info(
+ "Failed to send event %s to %s", p.event_id, self._destination
+ )
finally:
# We want to be *very* sure we clear this after we stop processing
self.transmission_loop_running = False
@@ -346,33 +347,40 @@ class PerDestinationQueue(object):
return pending_edus
@defer.inlineCallbacks
- def _get_new_device_messages(self):
- last_device_stream_id = self._last_device_stream_id
- to_device_stream_id = self._store.get_to_device_stream_token()
- contents, stream_id = yield self._store.get_new_device_msgs_for_remote(
- self._destination, last_device_stream_id, to_device_stream_id
+ def _get_new_device_messages(self, limit):
+ last_device_list = self._last_device_list_stream_id
+ # Will return at most 20 entries
+ now_stream_id, results = yield self._store.get_devices_by_remote(
+ self._destination, last_device_list
)
edus = [
Edu(
origin=self._server_name,
destination=self._destination,
- edu_type="m.direct_to_device",
+ edu_type="m.device_list_update",
content=content,
)
- for content in contents
+ for content in results
]
- last_device_list = self._last_device_list_stream_id
- now_stream_id, results = yield self._store.get_devices_by_remote(
- self._destination, last_device_list
+ assert len(edus) <= limit, "get_devices_by_remote returned too many EDUs"
+
+ last_device_stream_id = self._last_device_stream_id
+ to_device_stream_id = self._store.get_to_device_stream_token()
+ contents, stream_id = yield self._store.get_new_device_msgs_for_remote(
+ self._destination,
+ last_device_stream_id,
+ to_device_stream_id,
+ limit - len(edus),
)
edus.extend(
Edu(
origin=self._server_name,
destination=self._destination,
- edu_type="m.device_list_update",
+ edu_type="m.direct_to_device",
content=content,
)
- for content in results
+ for content in contents
)
+
defer.returnValue((edus, stream_id, now_stream_id))
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index efb6bdca48..9030eb18c5 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -21,8 +21,8 @@ import re
from twisted.internet import defer
import synapse
-from synapse.api.constants import RoomVersions
from synapse.api.errors import Codes, FederationDeniedError, SynapseError
+from synapse.api.room_versions import RoomVersions
from synapse.api.urls import FEDERATION_V1_PREFIX, FEDERATION_V2_PREFIX
from synapse.http.endpoint import parse_and_validate_server_name
from synapse.http.server import JsonResource
@@ -513,7 +513,7 @@ class FederationV1InviteServlet(BaseFederationServlet):
# state resolution algorithm, and we don't use that for processing
# invites
content = yield self.handler.on_invite_request(
- origin, content, room_version=RoomVersions.V1,
+ origin, content, room_version=RoomVersions.V1.identifier,
)
# V1 federation API is defined to return a content of `[200, {...}]`
@@ -716,8 +716,17 @@ class PublicRoomList(BaseFederationServlet):
PATH = "/publicRooms"
+ def __init__(self, handler, authenticator, ratelimiter, server_name, deny_access):
+ super(PublicRoomList, self).__init__(
+ handler, authenticator, ratelimiter, server_name,
+ )
+ self.deny_access = deny_access
+
@defer.inlineCallbacks
def on_GET(self, origin, content, query):
+ if self.deny_access:
+ raise FederationDeniedError(origin)
+
limit = parse_integer_from_args(query, "limit", 0)
since_token = parse_string_from_args(query, "since", None)
include_all_networks = parse_boolean_from_args(
@@ -1417,6 +1426,7 @@ def register_servlets(hs, resource, authenticator, ratelimiter, servlet_groups=N
authenticator=authenticator,
ratelimiter=ratelimiter,
server_name=hs.hostname,
+ deny_access=hs.config.restrict_public_rooms_to_local_users,
).register(resource)
if "group_server" in servlet_groups:
|