diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 5a92428f56..d05ed91d64 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -209,8 +209,6 @@ class FederationClient(FederationBase):
Will attempt to get the PDU from each destination in the list until
one succeeds.
- This will persist the PDU locally upon receipt.
-
Args:
destinations (list): Which home servers to query
event_id (str): event to fetch
@@ -289,8 +287,7 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks
@log_function
def get_state_for_room(self, destination, room_id, event_id):
- """Requests all of the `current` state PDUs for a given room from
- a remote home server.
+ """Requests all of the room state at a given event from a remote home server.
Args:
destination (str): The remote homeserver to query for the state.
@@ -298,9 +295,10 @@ class FederationClient(FederationBase):
event_id (str): The id of the event we want the state at.
Returns:
- Deferred: Results in a list of PDUs.
+ Deferred[Tuple[List[EventBase], List[EventBase]]]:
+ A list of events in the state, and a list of events in the auth chain
+ for the given event.
"""
-
try:
# First we try and ask for just the IDs, as thats far quicker if
# we have most of the state and auth_chain already.
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 9a571e4fc7..819e8f7331 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -46,6 +46,7 @@ from synapse.replication.http.federation import (
from synapse.types import get_domain_from_id
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
+from synapse.util.logcontext import nested_logging_context
from synapse.util.logutils import log_function
# when processing incoming transactions, we try to handle multiple rooms in
@@ -187,21 +188,22 @@ class FederationServer(FederationBase):
for pdu in pdus_by_room[room_id]:
event_id = pdu.event_id
- try:
- yield self._handle_received_pdu(
- origin, pdu
- )
- pdu_results[event_id] = {}
- except FederationError as e:
- logger.warn("Error handling PDU %s: %s", event_id, e)
- pdu_results[event_id] = {"error": str(e)}
- except Exception as e:
- f = failure.Failure()
- pdu_results[event_id] = {"error": str(e)}
- logger.error(
- "Failed to handle PDU %s: %s",
- event_id, f.getTraceback().rstrip(),
- )
+ with nested_logging_context(event_id):
+ try:
+ yield self._handle_received_pdu(
+ origin, pdu
+ )
+ pdu_results[event_id] = {}
+ except FederationError as e:
+ logger.warn("Error handling PDU %s: %s", event_id, e)
+ pdu_results[event_id] = {"error": str(e)}
+ except Exception as e:
+ f = failure.Failure()
+ pdu_results[event_id] = {"error": str(e)}
+ logger.error(
+ "Failed to handle PDU %s: %s",
+ event_id, f.getTraceback().rstrip(),
+ )
yield concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(),
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 8cbf8c4f7f..98b5950800 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -137,26 +137,6 @@ class TransactionQueue(object):
self._processing_pending_presence = False
- def can_send_to(self, destination):
- """Can we send messages to the given server?
-
- We can't send messages to ourselves. If we are running on localhost
- then we can only federation with other servers running on localhost.
- Otherwise we only federate with servers on a public domain.
-
- Args:
- destination(str): The server we are possibly trying to send to.
- Returns:
- bool: True if we can send to the server.
- """
-
- if destination == self.server_name:
- return False
- if self.server_name.startswith("localhost"):
- return destination.startswith("localhost")
- else:
- return not destination.startswith("localhost")
-
def notify_new_events(self, current_id):
"""This gets called when we have some new events we might want to
send out to other servers.
@@ -279,10 +259,7 @@ class TransactionQueue(object):
self._order += 1
destinations = set(destinations)
- destinations = set(
- dest for dest in destinations if self.can_send_to(dest)
- )
-
+ destinations.discard(self.server_name)
logger.debug("Sending to: %s", str(destinations))
if not destinations:
@@ -358,7 +335,7 @@ class TransactionQueue(object):
for destinations, states in hosts_and_states:
for destination in destinations:
- if not self.can_send_to(destination):
+ if destination == self.server_name:
continue
self.pending_presence_by_dest.setdefault(
@@ -377,7 +354,8 @@ class TransactionQueue(object):
content=content,
)
- if not self.can_send_to(destination):
+ if destination == self.server_name:
+ logger.info("Not sending EDU to ourselves")
return
sent_edus_counter.inc()
@@ -392,10 +370,8 @@ class TransactionQueue(object):
self._attempt_new_transaction(destination)
def send_device_messages(self, destination):
- if destination == self.server_name or destination == "localhost":
- return
-
- if not self.can_send_to(destination):
+ if destination == self.server_name:
+ logger.info("Not sending device update to ourselves")
return
self._attempt_new_transaction(destination)
|