diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index af652a7659..f99d17a7de 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -17,6 +17,7 @@
import copy
import itertools
import logging
+from typing import Dict, Iterable
from prometheus_client import Counter
@@ -29,6 +30,7 @@ from synapse.api.errors import (
FederationDeniedError,
HttpResponseException,
SynapseError,
+ UnsupportedRoomVersionError,
)
from synapse.api.room_versions import (
KNOWN_ROOM_VERSIONS,
@@ -196,7 +198,7 @@ class FederationClient(FederationBase):
logger.debug("backfill transaction_data=%r", transaction_data)
- room_version = yield self.store.get_room_version(room_id)
+ room_version = yield self.store.get_room_version_id(room_id)
format_ver = room_version_to_event_format(room_version)
pdus = [
@@ -334,7 +336,7 @@ class FederationClient(FederationBase):
def get_event_auth(self, destination, room_id, event_id):
res = yield self.transport_layer.get_event_auth(destination, room_id, event_id)
- room_version = yield self.store.get_room_version(room_id)
+ room_version = yield self.store.get_room_version_id(room_id)
format_ver = room_version_to_event_format(room_version)
auth_chain = [
@@ -385,6 +387,8 @@ class FederationClient(FederationBase):
return res
except InvalidResponseError as e:
logger.warning("Failed to %s via %s: %s", description, destination, e)
+ except UnsupportedRoomVersionError:
+ raise
except HttpResponseException as e:
if not 500 <= e.code < 600:
raise e.to_synapse_error()
@@ -404,7 +408,13 @@ class FederationClient(FederationBase):
raise SynapseError(502, "Failed to %s via any server" % (description,))
def make_membership_event(
- self, destinations, room_id, user_id, membership, content, params
+ self,
+ destinations: Iterable[str],
+ room_id: str,
+ user_id: str,
+ membership: str,
+ content: dict,
+ params: Dict[str, str],
):
"""
Creates an m.room.member event, with context, without participating in the room.
@@ -417,21 +427,23 @@ class FederationClient(FederationBase):
Note that this does not append any events to any graphs.
Args:
- destinations (Iterable[str]): Candidate homeservers which are probably
+ destinations: Candidate homeservers which are probably
participating in the room.
- room_id (str): The room in which the event will happen.
- user_id (str): The user whose membership is being evented.
- membership (str): The "membership" property of the event. Must be
- one of "join" or "leave".
- content (dict): Any additional data to put into the content field
- of the event.
- params (dict[str, str|Iterable[str]]): Query parameters to include in the
- request.
+ room_id: The room in which the event will happen.
+ user_id: The user whose membership is being evented.
+ membership: The "membership" property of the event. Must be one of
+ "join" or "leave".
+ content: Any additional data to put into the content field of the
+ event.
+ params: Query parameters to include in the request.
Return:
- Deferred[tuple[str, FrozenEvent, int]]: resolves to a tuple of
- `(origin, event, event_format)` where origin is the remote
- homeserver which generated the event, and event_format is one of
- `synapse.api.room_versions.EventFormatVersions`.
+ Deferred[Tuple[str, FrozenEvent, RoomVersion]]: resolves to a tuple of
+ `(origin, event, room_version)` where origin is the remote
+ homeserver which generated the event, and room_version is the
+ version of the room.
+
+ Fails with a `UnsupportedRoomVersionError` if remote responds with
+ a room version we don't understand.
Fails with a ``SynapseError`` if the chosen remote server
returns a 300/400 code.
@@ -453,8 +465,10 @@ class FederationClient(FederationBase):
# Note: If not supplied, the room version may be either v1 or v2,
# however either way the event format version will be v1.
- room_version = ret.get("room_version", RoomVersions.V1.identifier)
- event_format = room_version_to_event_format(room_version)
+ room_version_id = ret.get("room_version", RoomVersions.V1.identifier)
+ room_version = KNOWN_ROOM_VERSIONS.get(room_version_id)
+ if not room_version:
+ raise UnsupportedRoomVersionError()
pdu_dict = ret.get("event", None)
if not isinstance(pdu_dict, dict):
@@ -474,11 +488,11 @@ class FederationClient(FederationBase):
self._clock,
self.hostname,
self.signing_key,
- format_version=event_format,
+ room_version=room_version,
event_dict=pdu_dict,
)
- return (destination, ev, event_format)
+ return (destination, ev, room_version)
return self._try_destination_list(
"make_" + membership, destinations, send_request
@@ -633,7 +647,7 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks
def send_invite(self, destination, room_id, event_id, pdu):
- room_version = yield self.store.get_room_version(room_id)
+ room_version = yield self.store.get_room_version_id(room_id)
content = yield self._do_send_invite(destination, pdu, room_version)
@@ -641,7 +655,7 @@ class FederationClient(FederationBase):
logger.debug("Got response to send_invite: %s", pdu_dict)
- room_version = yield self.store.get_room_version(room_id)
+ room_version = yield self.store.get_room_version_id(room_id)
format_ver = room_version_to_event_format(room_version)
pdu = event_from_pdu_json(pdu_dict, format_ver)
@@ -843,7 +857,7 @@ class FederationClient(FederationBase):
timeout=timeout,
)
- room_version = yield self.store.get_room_version(room_id)
+ room_version = yield self.store.get_room_version_id(room_id)
format_ver = room_version_to_event_format(room_version)
events = [
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 8eddb3bf2c..a4c97ed458 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -234,7 +234,7 @@ class FederationServer(FederationBase):
continue
try:
- room_version = await self.store.get_room_version(room_id)
+ room_version = await self.store.get_room_version_id(room_id)
except NotFoundError:
logger.info("Ignoring PDU for unknown room_id: %s", room_id)
continue
@@ -334,7 +334,7 @@ class FederationServer(FederationBase):
)
)
- room_version = await self.store.get_room_version(room_id)
+ room_version = await self.store.get_room_version_id(room_id)
resp["room_version"] = room_version
return 200, resp
@@ -385,7 +385,7 @@ class FederationServer(FederationBase):
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)
- room_version = await self.store.get_room_version(room_id)
+ room_version = await self.store.get_room_version_id(room_id)
if room_version not in supported_versions:
logger.warning(
"Room version %s not in %s", room_version, supported_versions
@@ -410,14 +410,14 @@ class FederationServer(FederationBase):
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, pdu.room_id)
pdu = await self._check_sigs_and_hash(room_version, pdu)
- ret_pdu = await self.handler.on_invite_request(origin, pdu)
+ ret_pdu = await self.handler.on_invite_request(origin, pdu, room_version)
time_now = self._clock.time_msec()
return {"event": ret_pdu.get_pdu_json(time_now)}
async def on_send_join_request(self, origin, content, room_id):
logger.debug("on_send_join_request: content: %s", content)
- room_version = await self.store.get_room_version(room_id)
+ room_version = await self.store.get_room_version_id(room_id)
format_ver = room_version_to_event_format(room_version)
pdu = event_from_pdu_json(content, format_ver)
@@ -440,7 +440,7 @@ class FederationServer(FederationBase):
await self.check_server_matches_acl(origin_host, room_id)
pdu = await self.handler.on_make_leave_request(origin, room_id, user_id)
- room_version = await self.store.get_room_version(room_id)
+ room_version = await self.store.get_room_version_id(room_id)
time_now = self._clock.time_msec()
return {"event": pdu.get_pdu_json(time_now), "room_version": room_version}
@@ -448,7 +448,7 @@ class FederationServer(FederationBase):
async def on_send_leave_request(self, origin, content, room_id):
logger.debug("on_send_leave_request: content: %s", content)
- room_version = await self.store.get_room_version(room_id)
+ room_version = await self.store.get_room_version_id(room_id)
format_ver = room_version_to_event_format(room_version)
pdu = event_from_pdu_json(content, format_ver)
@@ -495,7 +495,7 @@ class FederationServer(FederationBase):
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)
- room_version = await self.store.get_room_version(room_id)
+ room_version = await self.store.get_room_version_id(room_id)
format_ver = room_version_to_event_format(room_version)
auth_chain = [
@@ -664,7 +664,7 @@ class FederationServer(FederationBase):
logger.info("Accepting join PDU %s from %s", pdu.event_id, origin)
# We've already checked that we know the room version by this point
- room_version = await self.store.get_room_version(pdu.room_id)
+ room_version = await self.store.get_room_version_id(pdu.room_id)
# Check signature.
try:
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 174f6e42be..001bb304ae 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -69,8 +69,6 @@ class FederationRemoteSendQueue(object):
self.edus = SortedDict() # stream position -> Edu
- self.device_messages = SortedDict() # stream position -> destination
-
self.pos = 1
self.pos_time = SortedDict()
@@ -92,7 +90,6 @@ class FederationRemoteSendQueue(object):
"keyed_edu",
"keyed_edu_changed",
"edus",
- "device_messages",
"pos_time",
"presence_destinations",
]:
@@ -171,12 +168,6 @@ class FederationRemoteSendQueue(object):
for key in keys[:i]:
del self.edus[key]
- # Delete things out of device map
- keys = self.device_messages.keys()
- i = self.device_messages.bisect_left(position_to_delete)
- for key in keys[:i]:
- del self.device_messages[key]
-
def notify_new_events(self, current_id):
"""As per FederationSender"""
# We don't need to replicate this as it gets sent down a different
@@ -249,9 +240,8 @@ class FederationRemoteSendQueue(object):
def send_device_messages(self, destination):
"""As per FederationSender"""
- pos = self._next_pos()
- self.device_messages[pos] = destination
- self.notifier.on_new_replication_data()
+ # We don't need to replicate this as it gets sent down a different
+ # stream.
def get_current_token(self):
return self.pos - 1
@@ -339,14 +329,6 @@ class FederationRemoteSendQueue(object):
for (pos, edu) in edus:
rows.append((pos, EduRow(edu)))
- # Fetch changed device messages
- i = self.device_messages.bisect_right(from_token)
- j = self.device_messages.bisect_right(to_token) + 1
- device_messages = {v: k for k, v in self.device_messages.items()[i:j]}
-
- for (destination, pos) in iteritems(device_messages):
- rows.append((pos, DeviceRow(destination=destination)))
-
# Sort rows based on pos
rows.sort()
@@ -472,28 +454,9 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))): # Edu
buff.edus.setdefault(self.edu.destination, []).append(self.edu)
-class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", ("destination",))): # str
- """Streams the fact that either a) there is pending to device messages for
- users on the remote, or b) a local users device has changed and needs to
- be sent to the remote.
- """
-
- TypeId = "d"
-
- @staticmethod
- def from_data(data):
- return DeviceRow(destination=data["destination"])
-
- def to_data(self):
- return {"destination": self.destination}
-
- def add_to_buffer(self, buff):
- buff.device_destinations.add(self.destination)
-
-
TypeToRow = {
Row.TypeId: Row
- for Row in (PresenceRow, PresenceDestinationsRow, KeyedEduRow, EduRow, DeviceRow)
+ for Row in (PresenceRow, PresenceDestinationsRow, KeyedEduRow, EduRow,)
}
@@ -504,7 +467,6 @@ ParsedFederationStreamData = namedtuple(
"presence_destinations", # list of tuples of UserPresenceState and destinations
"keyed_edus", # dict of destination -> { key -> Edu }
"edus", # dict of destination -> [Edu]
- "device_destinations", # set of destinations
),
)
@@ -523,11 +485,7 @@ def process_rows_for_federation(transaction_queue, rows):
# them into the appropriate collection and then send them off.
buff = ParsedFederationStreamData(
- presence=[],
- presence_destinations=[],
- keyed_edus={},
- edus={},
- device_destinations=set(),
+ presence=[], presence_destinations=[], keyed_edus={}, edus={},
)
# Parse the rows in the stream and add to the buffer
@@ -555,6 +513,3 @@ def process_rows_for_federation(transaction_queue, rows):
for destination, edu_list in iteritems(buff.edus):
for edu in edu_list:
transaction_queue.send_edu(edu, None)
-
- for destination in buff.device_destinations:
- transaction_queue.send_device_messages(destination)
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 198257414b..dc563538de 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -15,6 +15,7 @@
# limitations under the License.
import logging
+from typing import Any, Dict
from six.moves import urllib
@@ -352,7 +353,9 @@ class TransportLayerClient(object):
else:
path = _create_v1_path("/publicRooms")
- args = {"include_all_networks": "true" if include_all_networks else "false"}
+ args = {
+ "include_all_networks": "true" if include_all_networks else "false"
+ } # type: Dict[str, Any]
if third_party_instance_id:
args["third_party_instance_id"] = (third_party_instance_id,)
if limit:
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index d8cf9ed299..125eadd796 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -18,6 +18,7 @@
import functools
import logging
import re
+from typing import Optional, Tuple, Type
from twisted.internet.defer import maybeDeferred
@@ -267,6 +268,8 @@ class BaseFederationServlet(object):
returned.
"""
+ PATH = "" # Overridden in subclasses, the regex to match against the path.
+
REQUIRE_AUTH = True
PREFIX = FEDERATION_V1_PREFIX # Allows specifying the API version
@@ -347,9 +350,6 @@ class BaseFederationServlet(object):
return response
- # Extra logic that functools.wraps() doesn't finish
- new_func.__self__ = func.__self__
-
return new_func
def register(self, server):
@@ -824,7 +824,7 @@ class PublicRoomList(BaseFederationServlet):
if not self.allow_access:
raise FederationDeniedError(origin)
- limit = int(content.get("limit", 100))
+ limit = int(content.get("limit", 100)) # type: Optional[int]
since_token = content.get("since", None)
search_filter = content.get("filter", None)
@@ -971,7 +971,7 @@ class FederationGroupsAddRoomsConfigServlet(BaseFederationServlet):
if get_domain_from_id(requester_user_id) != origin:
raise SynapseError(403, "requester_user_id doesn't match origin")
- result = await self.groups_handler.update_room_in_group(
+ result = await self.handler.update_room_in_group(
group_id, requester_user_id, room_id, config_key, content
)
@@ -1422,11 +1422,13 @@ FEDERATION_SERVLET_CLASSES = (
On3pidBindServlet,
FederationVersionServlet,
RoomComplexityServlet,
-)
+) # type: Tuple[Type[BaseFederationServlet], ...]
-OPENID_SERVLET_CLASSES = (OpenIdUserInfo,)
+OPENID_SERVLET_CLASSES = (
+ OpenIdUserInfo,
+) # type: Tuple[Type[BaseFederationServlet], ...]
-ROOM_LIST_CLASSES = (PublicRoomList,)
+ROOM_LIST_CLASSES = (PublicRoomList,) # type: Tuple[Type[PublicRoomList], ...]
GROUP_SERVER_SERVLET_CLASSES = (
FederationGroupsProfileServlet,
@@ -1447,17 +1449,19 @@ GROUP_SERVER_SERVLET_CLASSES = (
FederationGroupsAddRoomsServlet,
FederationGroupsAddRoomsConfigServlet,
FederationGroupsSettingJoinPolicyServlet,
-)
+) # type: Tuple[Type[BaseFederationServlet], ...]
GROUP_LOCAL_SERVLET_CLASSES = (
FederationGroupsLocalInviteServlet,
FederationGroupsRemoveLocalUserServlet,
FederationGroupsBulkPublicisedServlet,
-)
+) # type: Tuple[Type[BaseFederationServlet], ...]
-GROUP_ATTESTATION_SERVLET_CLASSES = (FederationGroupsRenewAttestaionServlet,)
+GROUP_ATTESTATION_SERVLET_CLASSES = (
+ FederationGroupsRenewAttestaionServlet,
+) # type: Tuple[Type[BaseFederationServlet], ...]
DEFAULT_SERVLET_GROUPS = (
"federation",
|