diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 3fc625c4dd..99b5835780 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -158,10 +158,6 @@ class FederationRemoteSendQueue(object):
self.failures[pos] = (destination, str(failure))
- def send_pdu(self, pdu, destinations):
- # This gets sent down a separate path
- pass
-
def send_device_messages(self, destination):
pos = self._next_pos()
self.device_messages[pos] = destination
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index c864e12287..0b3fdc1067 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -19,6 +19,7 @@ from twisted.internet import defer
from .persistence import TransactionActions
from .units import Transaction, Edu
+from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import HttpResponseException
from synapse.util.async import run_on_reactor
from synapse.util.logcontext import preserve_context_over_fn
@@ -153,13 +154,17 @@ class TransactionQueue(object):
event.room_id, latest_event_ids=[event.event_id],
)
- destinations = [
+ destinations = set(
get_domain_from_id(user_id) for user_id in users_in_room
- ]
+ )
+
+ if event.type == EventTypes.Member:
+ if event.content["membership"] == Membership.JOIN:
+ destinations.add(get_domain_from_id(event.state_key))
logger.debug("Sending %s to %r", event, destinations)
- self.send_pdu(event, destinations)
+ self._send_pdu(event, destinations)
yield self.store.update_federation_out_pos(
"events", next_token
@@ -168,7 +173,7 @@ class TransactionQueue(object):
finally:
self._is_processing = False
- def send_pdu(self, pdu, destinations):
+ def _send_pdu(self, pdu, destinations):
# We loop through all destinations to see whether we already have
# a transaction in progress. If we do, stick it in the pending_pdus
# table and we'll get back to it later.
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 38592d5577..4ca563c85e 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -81,22 +81,6 @@ class FederationHandler(BaseHandler):
# When joining a room we need to queue any events for that room up
self.room_queues = {}
- def handle_new_event(self, event, destinations):
- """ Takes in an event from the client to server side, that has already
- been authed and handled by the state module, and sends it to any
- remote home servers that may be interested.
-
- Args:
- event: The event to send
- destinations: A list of destinations to send it to
-
- Returns:
- Deferred: Resolved when it has successfully been queued for
- processing.
- """
-
- return self.federation_sender.send_pdu(event, destinations)
-
@log_function
@defer.inlineCallbacks
def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None):
@@ -831,25 +815,6 @@ class FederationHandler(BaseHandler):
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)
- new_pdu = event
-
- users_in_room = yield self.store.get_joined_users_from_context(event, context)
-
- destinations = set(
- get_domain_from_id(user_id) for user_id in users_in_room
- if not self.hs.is_mine_id(user_id)
- )
-
- destinations.discard(origin)
-
- logger.debug(
- "on_send_join_request: Sending event: %s, signatures: %s",
- event.event_id,
- event.signatures,
- )
-
- self.federation_sender.send_pdu(new_pdu, destinations)
-
state_ids = context.prev_state_ids.values()
auth_chain = yield self.store.get_auth_chain(set(
[event.event_id] + state_ids
@@ -1056,24 +1021,6 @@ class FederationHandler(BaseHandler):
event, event_stream_id, max_stream_id, extra_users=extra_users
)
- new_pdu = event
-
- users_in_room = yield self.store.get_joined_users_from_context(event, context)
-
- destinations = set(
- get_domain_from_id(user_id) for user_id in users_in_room
- if not self.hs.is_mine_id(user_id)
- )
- destinations.discard(origin)
-
- logger.debug(
- "on_send_leave_request: Sending event: %s, signatures: %s",
- event.event_id,
- event.signatures,
- )
-
- self.federation_sender.send_pdu(new_pdu, destinations)
-
defer.returnValue(None)
@defer.inlineCallbacks
|