diff options
author | Erik Johnston <erik@matrix.org> | 2016-09-12 12:37:09 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-09-12 12:37:09 +0100 |
commit | 7fe42cf9492273c68a4fede9c697c0c2fb6d020b (patch) | |
tree | 44d1b5bd52c26dfb9e8e0389b6bebe0e2d861ed3 /synapse/federation/transaction_queue.py | |
parent | Make reindex happen in bg (diff) | |
parent | Merge pull request #1103 from matrix-org/markjh/comment_on_create_index (diff) | |
download | synapse-7fe42cf9492273c68a4fede9c697c0c2fb6d020b.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/make_notif_highlight_query_fast
Diffstat (limited to 'synapse/federation/transaction_queue.py')
-rw-r--r-- | synapse/federation/transaction_queue.py | 50 |
1 files changed, 47 insertions, 3 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 1ac569b305..f8ca93e4c3 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -26,6 +26,7 @@ from synapse.util.retryutils import ( get_retry_limiter, NotRetryingDestination, ) from synapse.util.metrics import measure_func +from synapse.handlers.presence import format_user_presence_state import synapse.metrics import logging @@ -69,13 +70,21 @@ class TransactionQueue(object): # destination -> list of tuple(edu, deferred) self.pending_edus_by_dest = edus = {} + # Presence needs to be separate as we send single aggragate EDUs + self.pending_presence_by_dest = presence = {} + self.pending_edus_keyed_by_dest = edus_keyed = {} + metrics.register_callback( "pending_pdus", lambda: sum(map(len, pdus.values())), ) metrics.register_callback( "pending_edus", - lambda: sum(map(len, edus.values())), + lambda: ( + sum(map(len, edus.values())) + + sum(map(len, presence.values())) + + sum(map(len, edus_keyed.values())) + ), ) # destination -> list of tuple(failure, deferred) @@ -130,13 +139,27 @@ class TransactionQueue(object): self._attempt_new_transaction, destination ) - def enqueue_edu(self, edu): + def enqueue_presence(self, destination, states): + self.pending_presence_by_dest.setdefault(destination, {}).update({ + state.user_id: state for state in states + }) + + preserve_context_over_fn( + self._attempt_new_transaction, destination + ) + + def enqueue_edu(self, edu, key=None): destination = edu.destination if not self.can_send_to(destination): return - self.pending_edus_by_dest.setdefault(destination, []).append(edu) + if key: + self.pending_edus_keyed_by_dest.setdefault( + destination, {} + )[(edu.edu_type, key)] = edu + else: + self.pending_edus_by_dest.setdefault(destination, []).append(edu) preserve_context_over_fn( self._attempt_new_transaction, destination @@ -190,8 +213,13 @@ class TransactionQueue(object): while True: pending_pdus = self.pending_pdus_by_dest.pop(destination, []) pending_edus = self.pending_edus_by_dest.pop(destination, []) + pending_presence = self.pending_presence_by_dest.pop(destination, {}) pending_failures = self.pending_failures_by_dest.pop(destination, []) + pending_edus.extend( + self.pending_edus_keyed_by_dest.pop(destination, {}).values() + ) + limiter = yield get_retry_limiter( destination, self.clock, @@ -203,6 +231,22 @@ class TransactionQueue(object): ) pending_edus.extend(device_message_edus) + if pending_presence: + pending_edus.append( + Edu( + origin=self.server_name, + destination=destination, + edu_type="m.presence", + content={ + "push": [ + format_user_presence_state( + presence, self.clock.time_msec() + ) + for presence in pending_presence.values() + ] + }, + ) + ) if pending_pdus: logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", |