diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 24329dd0e3..23278e36b7 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -22,7 +22,6 @@ from typing import (
Callable,
Dict,
List,
- Match,
Optional,
Tuple,
Union,
@@ -100,10 +99,15 @@ class FederationServer(FederationBase):
super().__init__(hs)
self.auth = hs.get_auth()
- self.handler = hs.get_handlers().federation_handler
+ self.handler = hs.get_federation_handler()
self.state = hs.get_state_handler()
self.device_handler = hs.get_device_handler()
+
+ # Ensure the following handlers are loaded since they register callbacks
+ # with FederationHandlerRegistry.
+ hs.get_directory_handler()
+
self._federation_ratelimiter = hs.get_federation_ratelimiter()
self._server_linearizer = Linearizer("fed_server")
@@ -112,7 +116,7 @@ class FederationServer(FederationBase):
# We cache results for transaction with the same ID
self._transaction_resp_cache = ResponseCache(
hs, "fed_txn_handler", timeout_ms=30000
- )
+ ) # type: ResponseCache[Tuple[str, str]]
self.transaction_actions = TransactionActions(self.store)
@@ -120,10 +124,12 @@ class FederationServer(FederationBase):
# We cache responses to state queries, as they take a while and often
# come in waves.
- self._state_resp_cache = ResponseCache(hs, "state_resp", timeout_ms=30000)
+ self._state_resp_cache = ResponseCache(
+ hs, "state_resp", timeout_ms=30000
+ ) # type: ResponseCache[Tuple[str, str]]
self._state_ids_resp_cache = ResponseCache(
hs, "state_ids_resp", timeout_ms=30000
- )
+ ) # type: ResponseCache[Tuple[str, str]]
self._federation_metrics_domains = (
hs.get_config().federation.federation_metrics_domains
@@ -825,14 +831,14 @@ def server_matches_acl_event(server_name: str, acl_event: EventBase) -> bool:
return False
-def _acl_entry_matches(server_name: str, acl_entry: str) -> Match:
+def _acl_entry_matches(server_name: str, acl_entry: Any) -> bool:
if not isinstance(acl_entry, str):
logger.warning(
"Ignoring non-str ACL entry '%s' (is %s)", acl_entry, type(acl_entry)
)
return False
regex = glob_to_regex(acl_entry)
- return regex.match(server_name)
+ return bool(regex.match(server_name))
class FederationHandlerRegistry:
@@ -862,7 +868,7 @@ class FederationHandlerRegistry:
self._edu_type_to_instance = {} # type: Dict[str, str]
def register_edu_handler(
- self, edu_type: str, handler: Callable[[str, dict], Awaitable[None]]
+ self, edu_type: str, handler: Callable[[str, JsonDict], Awaitable[None]]
):
"""Sets the handler callable that will be used to handle an incoming
federation EDU of the given type.
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 8e46957d15..5f1bf492c1 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -188,7 +188,7 @@ class FederationRemoteSendQueue:
for key in keys[:i]:
del self.edus[key]
- def notify_new_events(self, current_id):
+ def notify_new_events(self, max_token):
"""As per FederationSender"""
# We don't need to replicate this as it gets sent down a different
# stream.
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 8bb17b3a05..604cfd1935 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -40,7 +40,7 @@ from synapse.metrics import (
events_processed_counter,
)
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import ReadReceipt
+from synapse.types import ReadReceipt, RoomStreamToken
from synapse.util.metrics import Measure, measure_func
logger = logging.getLogger(__name__)
@@ -154,10 +154,15 @@ class FederationSender:
self._per_destination_queues[destination] = queue
return queue
- def notify_new_events(self, current_id: int) -> None:
+ def notify_new_events(self, max_token: RoomStreamToken) -> None:
"""This gets called when we have some new events we might want to
send out to other servers.
"""
+ # We just use the minimum stream ordering and ignore the vector clock
+ # component. This is safe to do as long as we *always* ignore the vector
+ # clock components.
+ current_id = max_token.stream
+
self._last_poked_id = max(current_id, self._last_poked_id)
if self._is_processing:
@@ -297,6 +302,8 @@ class FederationSender:
sent_pdus_destination_dist_total.inc(len(destinations))
sent_pdus_destination_dist_count.inc()
+ assert pdu.internal_metadata.stream_ordering
+
# track the fact that we have a PDU for these destinations,
# to allow us to perform catch-up later on if the remote is unreachable
# for a while.
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index bc99af3fdd..db8e456fe8 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -158,6 +158,7 @@ class PerDestinationQueue:
# yet know if we have anything to catch up (None)
self._pending_pdus.append(pdu)
else:
+ assert pdu.internal_metadata.stream_ordering
self._catchup_last_skipped = pdu.internal_metadata.stream_ordering
self.attempt_new_transaction()
@@ -361,6 +362,7 @@ class PerDestinationQueue:
last_successful_stream_ordering = (
final_pdu.internal_metadata.stream_ordering
)
+ assert last_successful_stream_ordering
await self._store.set_destination_last_successful_stream_ordering(
self._destination, last_successful_stream_ordering
)
|