From 52b2318777ac334480316b8a8ac2778367dcf53d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Sep 2016 15:59:08 +0100 Subject: Clobber EDUs in send queue --- synapse/federation/federation_client.py | 8 ++++-- synapse/federation/transaction_queue.py | 48 ++++++++++++++++++++++++++++++--- 2 files changed, 51 insertions(+), 5 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 78719eed25..3395c9e41e 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -122,8 +122,12 @@ class FederationClient(FederationBase): pdu.event_id ) + def send_presence(self, destination, states): + if destination != self.server_name: + self._transaction_queue.enqueue_presence(destination, states) + @log_function - def send_edu(self, destination, edu_type, content): + def send_edu(self, destination, edu_type, content, key=None): edu = Edu( origin=self.server_name, destination=destination, @@ -134,7 +138,7 @@ class FederationClient(FederationBase): sent_edus_counter.inc() # TODO, add errback, etc. - self._transaction_queue.enqueue_edu(edu) + self._transaction_queue.enqueue_edu(edu, key=key) return defer.succeed(None) @log_function diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 1ac569b305..bd2a04af9e 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -26,6 +26,7 @@ from synapse.util.retryutils import ( get_retry_limiter, NotRetryingDestination, ) from synapse.util.metrics import measure_func +from synapse.handlers.presence import format_user_presence_state import synapse.metrics import logging @@ -69,13 +70,20 @@ class TransactionQueue(object): # destination -> list of tuple(edu, deferred) self.pending_edus_by_dest = edus = {} + self.pending_presence_by_dest = presence = {} + self.pending_edus_keyed_by_dest = edus_keyed = {} + metrics.register_callback( "pending_pdus", lambda: sum(map(len, pdus.values())), ) metrics.register_callback( "pending_edus", - lambda: sum(map(len, edus.values())), + lambda: ( + sum(map(len, edus.values())) + + sum(map(len, presence.values())) + + sum(map(len, edus_keyed.values())) + ), ) # destination -> list of tuple(failure, deferred) @@ -130,13 +138,25 @@ class TransactionQueue(object): self._attempt_new_transaction, destination ) - def enqueue_edu(self, edu): + def enqueue_presence(self, destination, states): + self.pending_presence_by_dest.setdefault(destination, {}).update({ + state.user_id: state for state in states + }) + + preserve_context_over_fn( + self._attempt_new_transaction, destination + ) + + def enqueue_edu(self, edu, key=None): destination = edu.destination if not self.can_send_to(destination): return - self.pending_edus_by_dest.setdefault(destination, []).append(edu) + if key: + self.pending_edus_keyed_by_dest.setdefault(destination, {})[key] = edu + else: + self.pending_edus_by_dest.setdefault(destination, []).append(edu) preserve_context_over_fn( self._attempt_new_transaction, destination @@ -190,8 +210,13 @@ class TransactionQueue(object): while True: pending_pdus = self.pending_pdus_by_dest.pop(destination, []) pending_edus = self.pending_edus_by_dest.pop(destination, []) + pending_presence = self.pending_presence_by_dest.pop(destination, {}) pending_failures = self.pending_failures_by_dest.pop(destination, []) + pending_edus.extend( + self.pending_edus_keyed_by_dest.pop(destination, {}).values() + ) + limiter = yield get_retry_limiter( destination, self.clock, @@ -203,6 +228,23 @@ class TransactionQueue(object): ) pending_edus.extend(device_message_edus) + logger.info("Sending presence: %r", pending_presence) + if pending_presence: + pending_edus.append( + Edu( + origin=self.server_name, + destination=destination, + edu_type="m.presence", + content={ + "push": [ + format_user_presence_state( + presence, self.clock.time_msec() + ) + for presence in pending_presence.values() + ] + }, + ) + ) if pending_pdus: logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", -- cgit 1.5.1 From 327425764e44ea299ea4d85859035f3052c7b8b1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Sep 2016 17:13:30 +0100 Subject: Add edu.type as part of key. Remove debug logging --- synapse/federation/transaction_queue.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index bd2a04af9e..4f8315e59d 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -154,7 +154,9 @@ class TransactionQueue(object): return if key: - self.pending_edus_keyed_by_dest.setdefault(destination, {})[key] = edu + self.pending_edus_keyed_by_dest.setdefault( + destination, {} + )[(edu.type, key)] = edu else: self.pending_edus_by_dest.setdefault(destination, []).append(edu) @@ -228,7 +230,6 @@ class TransactionQueue(object): ) pending_edus.extend(device_message_edus) - logger.info("Sending presence: %r", pending_presence) if pending_presence: pending_edus.append( Edu( -- cgit 1.5.1 From 464ffd1b5efd30e59ee3d0adef0fa1541130781f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Sep 2016 17:17:23 +0100 Subject: Comment --- synapse/federation/transaction_queue.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 4f8315e59d..1898e4b44b 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -70,6 +70,7 @@ class TransactionQueue(object): # destination -> list of tuple(edu, deferred) self.pending_edus_by_dest = edus = {} + # Presence needs to be separate as we send single aggragate EDUs self.pending_presence_by_dest = presence = {} self.pending_edus_keyed_by_dest = edus_keyed = {} -- cgit 1.5.1 From af4701b311f60e6410d98ff8526ff16db5d22142 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Sep 2016 17:36:56 +0100 Subject: Fix incorrect attribute name --- synapse/federation/transaction_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 1898e4b44b..f8ca93e4c3 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -157,7 +157,7 @@ class TransactionQueue(object): if key: self.pending_edus_keyed_by_dest.setdefault( destination, {} - )[(edu.type, key)] = edu + )[(edu.edu_type, key)] = edu else: self.pending_edus_by_dest.setdefault(destination, []).append(edu) -- cgit 1.5.1