From 0e830d377065b35f7430415288c54e13d9a6f0c6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 16 Nov 2016 11:32:16 +0000 Subject: Rename transaction queue functions to send_* --- synapse/federation/federation_client.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 94e76b1978..783ccf12f6 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -111,19 +111,19 @@ class FederationClient(FederationBase): sent_pdus_destination_dist.inc_by(len(destinations)) - logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id) + logger.debug("[%s] transaction_layer.send_pdu... ", pdu.event_id) # TODO, add errback, etc. - self._transaction_queue.enqueue_pdu(pdu, destinations, order) + self._transaction_queue.send_pdu(pdu, destinations, order) logger.debug( - "[%s] transaction_layer.enqueue_pdu... done", + "[%s] transaction_layer.send_pdu... done", pdu.event_id ) def send_presence(self, destination, states): if destination != self.server_name: - self._transaction_queue.enqueue_presence(destination, states) + self._transaction_queue.send_presence(destination, states) @log_function def send_edu(self, destination, edu_type, content, key=None): @@ -136,17 +136,17 @@ class FederationClient(FederationBase): sent_edus_counter.inc() - self._transaction_queue.enqueue_edu(edu, key=key) + self._transaction_queue.send_edu(edu, key=key) @log_function def send_device_messages(self, destination): """Sends the device messages in the local database to the remote destination""" - self._transaction_queue.enqueue_device_messages(destination) + self._transaction_queue.send_device_messages(destination) @log_function def send_failure(self, failure, destination): - self._transaction_queue.enqueue_failure(failure, destination) + self._transaction_queue.send_failure(failure, destination) return defer.succeed(None) @log_function -- cgit 1.5.1 From daec6fc355517b70c159526e20e739fa09c8e443 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 16 Nov 2016 13:41:21 +0000 Subject: Move logic into transaction_queue --- synapse/federation/federation_client.py | 16 ++-------------- synapse/federation/replication.py | 2 -- synapse/federation/transaction_queue.py | 19 ++++++++++++++++--- 3 files changed, 18 insertions(+), 19 deletions(-) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 783ccf12f6..9c69fe511c 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -106,15 +106,12 @@ class FederationClient(FederationBase): Deferred: Completes when we have successfully processed the PDU and replicated it to any interested remote home servers. """ - order = self._order - self._order += 1 - sent_pdus_destination_dist.inc_by(len(destinations)) logger.debug("[%s] transaction_layer.send_pdu... ", pdu.event_id) # TODO, add errback, etc. - self._transaction_queue.send_pdu(pdu, destinations, order) + self._transaction_queue.send_pdu(pdu, destinations) logger.debug( "[%s] transaction_layer.send_pdu... done", @@ -127,16 +124,7 @@ class FederationClient(FederationBase): @log_function def send_edu(self, destination, edu_type, content, key=None): - edu = Edu( - origin=self.server_name, - destination=destination, - edu_type=edu_type, - content=content, - ) - - sent_edus_counter.inc() - - self._transaction_queue.send_edu(edu, key=key) + self._transaction_queue.send_edu(destination, edu_type, content, key=key) @log_function def send_device_messages(self, destination): diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index ea66a5dcbc..043baef13f 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -68,8 +68,6 @@ class ReplicationLayer(FederationClient, FederationServer): self.transaction_actions = TransactionActions(self.store) self._transaction_queue = TransactionQueue(hs, transport_layer) - self._order = 0 - self.hs = hs super(ReplicationLayer, self).__init__(hs) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index e0abe4b40b..69e01d6521 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -95,6 +95,8 @@ class TransactionQueue(object): # HACK to get unique tx id self._next_txn_id = int(self.clock.time_msec()) + self._order = 1 + def can_send_to(self, destination): """Can we send messages to the given server? @@ -115,11 +117,14 @@ class TransactionQueue(object): else: return not destination.startswith("localhost") - def send_pdu(self, pdu, destinations, order): + def send_pdu(self, pdu, destinations): # We loop through all destinations to see whether we already have # a transaction in progress. If we do, stick it in the pending_pdus # table and we'll get back to it later. + order = self._order + self._order += 1 + destinations = set(destinations) destinations = set( dest for dest in destinations if self.can_send_to(dest) @@ -140,6 +145,9 @@ class TransactionQueue(object): ) def send_presence(self, destination, states): + if not self.can_send_to(destination): + return + self.pending_presence_by_dest.setdefault(destination, {}).update({ state.user_id: state for state in states }) @@ -148,8 +156,13 @@ class TransactionQueue(object): self._attempt_new_transaction, destination ) - def send_edu(self, edu, key=None): - destination = edu.destination + def send_edu(self, destination, edu_type, content, key=None): + edu = Edu( + origin=self.server_name, + destination=destination, + edu_type=edu_type, + content=content, + ) if not self.can_send_to(destination): return -- cgit 1.5.1 From 847d5db1d1aa30fd6a8166e36fe04e6d94533521 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 16 Nov 2016 14:15:50 +0000 Subject: Add transaction queue and transport layer to DI --- synapse/federation/__init__.py | 7 +++---- synapse/federation/federation_client.py | 1 - synapse/federation/replication.py | 4 +--- synapse/federation/transaction_queue.py | 4 ++-- synapse/server.py | 10 ++++++++++ 5 files changed, 16 insertions(+), 10 deletions(-) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/__init__.py b/synapse/federation/__init__.py index 979fdf2431..2e32d245ba 100644 --- a/synapse/federation/__init__.py +++ b/synapse/federation/__init__.py @@ -17,10 +17,9 @@ """ from .replication import ReplicationLayer -from .transport.client import TransportLayerClient -def initialize_http_replication(homeserver): - transport = TransportLayerClient(homeserver) +def initialize_http_replication(hs): + transport = hs.get_federation_transport_client() - return ReplicationLayer(homeserver, transport) + return ReplicationLayer(hs, transport) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 9c69fe511c..0fe21ac8d7 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -18,7 +18,6 @@ from twisted.internet import defer from .federation_base import FederationBase from synapse.api.constants import Membership -from .units import Edu from synapse.api.errors import ( CodeMessageException, HttpResponseException, SynapseError, diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 043baef13f..797c4bedbf 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -20,8 +20,6 @@ a given transport. from .federation_client import FederationClient from .federation_server import FederationServer -from .transaction_queue import TransactionQueue - from .persistence import TransactionActions import logging @@ -66,7 +64,7 @@ class ReplicationLayer(FederationClient, FederationServer): self._clock = hs.get_clock() self.transaction_actions = TransactionActions(self.store) - self._transaction_queue = TransactionQueue(hs, transport_layer) + self._transaction_queue = hs.get_federation_sender() self.hs = hs diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 69e01d6521..eb504055f8 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -44,13 +44,13 @@ class TransactionQueue(object): It batches pending PDUs into single transactions. """ - def __init__(self, hs, transport_layer): + def __init__(self, hs): self.server_name = hs.hostname self.store = hs.get_datastore() self.transaction_actions = TransactionActions(self.store) - self.transport_layer = transport_layer + self.transport_layer = hs.get_federation_transport_client() self.clock = hs.get_clock() diff --git a/synapse/server.py b/synapse/server.py index 374124a147..faab617b4f 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -32,6 +32,8 @@ from synapse.appservice.scheduler import ApplicationServiceScheduler from synapse.crypto.keyring import Keyring from synapse.events.builder import EventBuilderFactory from synapse.federation import initialize_http_replication +from synapse.federation.transport.client import TransportLayerClient +from synapse.federation.transaction_queue import TransactionQueue from synapse.handlers import Handlers from synapse.handlers.appservice import ApplicationServicesHandler from synapse.handlers.auth import AuthHandler @@ -124,6 +126,8 @@ class HomeServer(object): 'http_client_context_factory', 'simple_http_client', 'media_repository', + 'federation_transport_client', + 'federation_sender', ] def __init__(self, hostname, **kwargs): @@ -265,6 +269,12 @@ class HomeServer(object): def build_media_repository(self): return MediaRepository(self) + def build_federation_transport_client(self): + return TransportLayerClient(self) + + def build_federation_sender(self): + return TransactionQueue(self) + def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) -- cgit 1.5.1 From 59ef517e6bc63b2613f18c9b85356a0f973f5698 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 16 Nov 2016 14:28:03 +0000 Subject: Use new federation_sender DI --- synapse/federation/federation_client.py | 49 --------------------------------- synapse/federation/transaction_queue.py | 10 +++++++ synapse/handlers/devicemessage.py | 4 +-- synapse/handlers/federation.py | 7 +++-- synapse/handlers/presence.py | 11 ++++---- synapse/handlers/receipts.py | 4 +-- synapse/handlers/typing.py | 4 +-- 7 files changed, 26 insertions(+), 63 deletions(-) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 0fe21ac8d7..b255709165 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -44,10 +44,6 @@ logger = logging.getLogger(__name__) # synapse.federation.federation_client is a silly name metrics = synapse.metrics.get_metrics_for("synapse.federation.client") -sent_pdus_destination_dist = metrics.register_distribution("sent_pdu_destinations") - -sent_edus_counter = metrics.register_counter("sent_edus") - sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"]) @@ -91,51 +87,6 @@ class FederationClient(FederationBase): self._get_pdu_cache.start() - @log_function - def send_pdu(self, pdu, destinations): - """Informs the replication layer about a new PDU generated within the - home server that should be transmitted to others. - - TODO: Figure out when we should actually resolve the deferred. - - Args: - pdu (Pdu): The new Pdu. - - Returns: - Deferred: Completes when we have successfully processed the PDU - and replicated it to any interested remote home servers. - """ - sent_pdus_destination_dist.inc_by(len(destinations)) - - logger.debug("[%s] transaction_layer.send_pdu... ", pdu.event_id) - - # TODO, add errback, etc. - self._transaction_queue.send_pdu(pdu, destinations) - - logger.debug( - "[%s] transaction_layer.send_pdu... done", - pdu.event_id - ) - - def send_presence(self, destination, states): - if destination != self.server_name: - self._transaction_queue.send_presence(destination, states) - - @log_function - def send_edu(self, destination, edu_type, content, key=None): - self._transaction_queue.send_edu(destination, edu_type, content, key=key) - - @log_function - def send_device_messages(self, destination): - """Sends the device messages in the local database to the remote - destination""" - self._transaction_queue.send_device_messages(destination) - - @log_function - def send_failure(self, failure, destination): - self._transaction_queue.send_failure(failure, destination) - return defer.succeed(None) - @log_function def make_query(self, destination, query_type, args, retry_on_dns_fail=False): diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index eb504055f8..5d4f244377 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -36,6 +36,12 @@ logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) +client_metrics = synapse.metrics.get_metrics_for("synapse.federation.client") +sent_pdus_destination_dist = client_metrics.register_distribution( + "sent_pdu_destinations" +) +sent_edus_counter = client_metrics.register_counter("sent_edus") + class TransactionQueue(object): """This class makes sure we only have one transaction in flight at @@ -135,6 +141,8 @@ class TransactionQueue(object): if not destinations: return + sent_pdus_destination_dist.inc_by(len(destinations)) + for destination in destinations: self.pending_pdus_by_dest.setdefault(destination, []).append( (pdu, order) @@ -167,6 +175,8 @@ class TransactionQueue(object): if not self.can_send_to(destination): return + sent_edus_counter.inc() + if key: self.pending_edus_keyed_by_dest.setdefault( destination, {} diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index c5368e5df2..f7fad15c62 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -34,9 +34,9 @@ class DeviceMessageHandler(object): self.store = hs.get_datastore() self.notifier = hs.get_notifier() self.is_mine_id = hs.is_mine_id - self.federation = hs.get_replication_layer() + self.federation = hs.get_federation_sender() - self.federation.register_edu_handler( + hs.get_replication_layer().register_edu_handler( "m.direct_to_device", self.on_direct_to_device_edu ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2d801bad47..38592d5577 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -71,6 +71,7 @@ class FederationHandler(BaseHandler): self.store = hs.get_datastore() self.replication_layer = hs.get_replication_layer() + self.federation_sender = hs.get_federation_sender() self.state_handler = hs.get_state_handler() self.server_name = hs.hostname self.keyring = hs.get_keyring() @@ -94,7 +95,7 @@ class FederationHandler(BaseHandler): processing. """ - return self.replication_layer.send_pdu(event, destinations) + return self.federation_sender.send_pdu(event, destinations) @log_function @defer.inlineCallbacks @@ -847,7 +848,7 @@ class FederationHandler(BaseHandler): event.signatures, ) - self.replication_layer.send_pdu(new_pdu, destinations) + self.federation_sender.send_pdu(new_pdu, destinations) state_ids = context.prev_state_ids.values() auth_chain = yield self.store.get_auth_chain(set( @@ -1071,7 +1072,7 @@ class FederationHandler(BaseHandler): event.signatures, ) - self.replication_layer.send_pdu(new_pdu, destinations) + self.federation_sender.send_pdu(new_pdu, destinations) defer.returnValue(None) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index b047ae2250..1b89dc6274 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -91,28 +91,29 @@ class PresenceHandler(object): self.store = hs.get_datastore() self.wheel_timer = WheelTimer() self.notifier = hs.get_notifier() - self.federation = hs.get_replication_layer() + self.replication = hs.get_replication_layer() + self.federation = hs.get_federation_sender() self.state = hs.get_state_handler() - self.federation.register_edu_handler( + self.replication.register_edu_handler( "m.presence", self.incoming_presence ) - self.federation.register_edu_handler( + self.replication.register_edu_handler( "m.presence_invite", lambda origin, content: self.invite_presence( observed_user=UserID.from_string(content["observed_user"]), observer_user=UserID.from_string(content["observer_user"]), ) ) - self.federation.register_edu_handler( + self.replication.register_edu_handler( "m.presence_accept", lambda origin, content: self.accept_presence( observed_user=UserID.from_string(content["observed_user"]), observer_user=UserID.from_string(content["observer_user"]), ) ) - self.federation.register_edu_handler( + self.replication.register_edu_handler( "m.presence_deny", lambda origin, content: self.deny_presence( observed_user=UserID.from_string(content["observed_user"]), diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index e536a909d0..916e80a48e 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -33,8 +33,8 @@ class ReceiptsHandler(BaseHandler): self.server_name = hs.config.server_name self.store = hs.get_datastore() self.hs = hs - self.federation = hs.get_replication_layer() - self.federation.register_edu_handler( + self.federation = hs.get_federation_sender() + hs.get_replication_layer().register_edu_handler( "m.receipt", self._received_remote_receipt ) self.clock = self.hs.get_clock() diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 27ee715ff0..0eea7f8f9c 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -55,9 +55,9 @@ class TypingHandler(object): self.clock = hs.get_clock() self.wheel_timer = WheelTimer(bucket_size=5000) - self.federation = hs.get_replication_layer() + self.federation = hs.get_federation_sender() - self.federation.register_edu_handler("m.typing", self._recv_edu) + hs.get_replication_layer().register_edu_handler("m.typing", self._recv_edu) hs.get_distributor().observe("user_left_room", self.user_left_room) -- cgit 1.5.1