diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 1ac569b305..f8ca93e4c3 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -26,6 +26,7 @@ from synapse.util.retryutils import (
get_retry_limiter, NotRetryingDestination,
)
from synapse.util.metrics import measure_func
+from synapse.handlers.presence import format_user_presence_state
import synapse.metrics
import logging
@@ -69,13 +70,21 @@ class TransactionQueue(object):
# destination -> list of tuple(edu, deferred)
self.pending_edus_by_dest = edus = {}
+ # Presence needs to be separate as we send single aggragate EDUs
+ self.pending_presence_by_dest = presence = {}
+ self.pending_edus_keyed_by_dest = edus_keyed = {}
+
metrics.register_callback(
"pending_pdus",
lambda: sum(map(len, pdus.values())),
)
metrics.register_callback(
"pending_edus",
- lambda: sum(map(len, edus.values())),
+ lambda: (
+ sum(map(len, edus.values()))
+ + sum(map(len, presence.values()))
+ + sum(map(len, edus_keyed.values()))
+ ),
)
# destination -> list of tuple(failure, deferred)
@@ -130,13 +139,27 @@ class TransactionQueue(object):
self._attempt_new_transaction, destination
)
- def enqueue_edu(self, edu):
+ def enqueue_presence(self, destination, states):
+ self.pending_presence_by_dest.setdefault(destination, {}).update({
+ state.user_id: state for state in states
+ })
+
+ preserve_context_over_fn(
+ self._attempt_new_transaction, destination
+ )
+
+ def enqueue_edu(self, edu, key=None):
destination = edu.destination
if not self.can_send_to(destination):
return
- self.pending_edus_by_dest.setdefault(destination, []).append(edu)
+ if key:
+ self.pending_edus_keyed_by_dest.setdefault(
+ destination, {}
+ )[(edu.edu_type, key)] = edu
+ else:
+ self.pending_edus_by_dest.setdefault(destination, []).append(edu)
preserve_context_over_fn(
self._attempt_new_transaction, destination
@@ -190,8 +213,13 @@ class TransactionQueue(object):
while True:
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
pending_edus = self.pending_edus_by_dest.pop(destination, [])
+ pending_presence = self.pending_presence_by_dest.pop(destination, {})
pending_failures = self.pending_failures_by_dest.pop(destination, [])
+ pending_edus.extend(
+ self.pending_edus_keyed_by_dest.pop(destination, {}).values()
+ )
+
limiter = yield get_retry_limiter(
destination,
self.clock,
@@ -203,6 +231,22 @@ class TransactionQueue(object):
)
pending_edus.extend(device_message_edus)
+ if pending_presence:
+ pending_edus.append(
+ Edu(
+ origin=self.server_name,
+ destination=destination,
+ edu_type="m.presence",
+ content={
+ "push": [
+ format_user_presence_state(
+ presence, self.clock.time_msec()
+ )
+ for presence in pending_presence.values()
+ ]
+ },
+ )
+ )
if pending_pdus:
logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
|