diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index b952e59518..93e5acebc1 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -53,18 +53,19 @@ class FederationRemoteSendQueue(object):
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
+ self.is_mine_id = hs.is_mine_id
- self.presence_map = {}
- self.presence_changed = sorteddict()
+ self.presence_map = {} # Pending presence map user_id -> UserPresenceState
+ self.presence_changed = sorteddict() # Stream position -> user_id
- self.keyed_edu = {}
- self.keyed_edu_changed = sorteddict()
+ self.keyed_edu = {} # (destination, key) -> EDU
+ self.keyed_edu_changed = sorteddict() # stream position -> (destination, key)
- self.edus = sorteddict()
+ self.edus = sorteddict() # stream position -> Edu
- self.failures = sorteddict()
+ self.failures = sorteddict() # stream position -> (destination, Failure)
- self.device_messages = sorteddict()
+ self.device_messages = sorteddict() # stream position -> destination
self.pos = 1
self.pos_time = sorteddict()
@@ -120,7 +121,9 @@ class FederationRemoteSendQueue(object):
del self.presence_changed[key]
user_ids = set(
- user_id for uids in self.presence_changed.values() for _, user_id in uids
+ user_id
+ for uids in self.presence_changed.itervalues()
+ for user_id in uids
)
to_del = [
@@ -187,18 +190,20 @@ class FederationRemoteSendQueue(object):
self.notifier.on_new_replication_data()
- def send_presence(self, destination, states):
- """As per TransactionQueue"""
+ def send_presence(self, states):
+ """As per TransactionQueue
+
+ Args:
+ states (list(UserPresenceState))
+ """
pos = self._next_pos()
- self.presence_map.update({
- state.user_id: state
- for state in states
- })
+ # We only want to send presence for our own users, so lets always just
+ # filter here just in case.
+ local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
- self.presence_changed[pos] = [
- (destination, state.user_id) for state in states
- ]
+ self.presence_map.update({state.user_id: state for state in local_states})
+ self.presence_changed[pos] = [state.user_id for state in local_states]
self.notifier.on_new_replication_data()
@@ -251,15 +256,14 @@ class FederationRemoteSendQueue(object):
keys = self.presence_changed.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
- dest_user_ids = set(
- (pos, dest_user_id)
+ dest_user_ids = [
+ (pos, user_id)
for pos in keys[i:j]
- for dest_user_id in self.presence_changed[pos]
- )
+ for user_id in self.presence_changed[pos]
+ ]
- for (key, (dest, user_id)) in dest_user_ids:
+ for (key, user_id) in dest_user_ids:
rows.append((key, PresenceRow(
- destination=dest,
state=self.presence_map[user_id],
)))
@@ -357,7 +361,6 @@ class BaseFederationRow(object):
class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
- "destination", # str
"state", # UserPresenceState
))):
TypeId = "p"
@@ -365,18 +368,14 @@ class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
@staticmethod
def from_data(data):
return PresenceRow(
- destination=data["destination"],
- state=UserPresenceState.from_dict(data["state"])
+ state=UserPresenceState.from_dict(data)
)
def to_data(self):
- return {
- "destination": self.destination,
- "state": self.state.as_dict()
- }
+ return self.state.as_dict()
def add_to_buffer(self, buff):
- buff.presence.setdefault(self.destination, []).append(self.state)
+ buff.presence.append(self.state)
class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
@@ -487,7 +486,7 @@ TypeToRow = {
ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
- "presence", # dict of destination -> [UserPresenceState]
+ "presence", # list(UserPresenceState)
"keyed_edus", # dict of destination -> { key -> Edu }
"edus", # dict of destination -> [Edu]
"failures", # dict of destination -> [failures]
@@ -509,7 +508,7 @@ def process_rows_for_federation(transaction_queue, rows):
# them into the appropriate collection and then send them off.
buff = ParsedFederationStreamData(
- presence={},
+ presence=[],
keyed_edus={},
edus={},
failures={},
@@ -526,8 +525,8 @@ def process_rows_for_federation(transaction_queue, rows):
parsed_row = RowType.from_data(row.data)
parsed_row.add_to_buffer(buff)
- for destination, states in buff.presence.iteritems():
- transaction_queue.send_presence(destination, states)
+ if buff.presence:
+ transaction_queue.send_presence(buff.presence)
for destination, edu_map in buff.keyed_edus.iteritems():
for key, edu in edu_map.items():
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 0e43891647..dee387eb7f 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -21,11 +21,11 @@ from .units import Transaction, Edu
from synapse.api.errors import HttpResponseException
from synapse.util.async import run_on_reactor
-from synapse.util.logcontext import preserve_context_over_fn
+from synapse.util.logcontext import preserve_context_over_fn, preserve_fn
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.util.metrics import measure_func
from synapse.types import get_domain_from_id
-from synapse.handlers.presence import format_user_presence_state
+from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
import synapse.metrics
import logging
@@ -79,8 +79,18 @@ class TransactionQueue(object):
# destination -> list of tuple(edu, deferred)
self.pending_edus_by_dest = edus = {}
- # Presence needs to be separate as we send single aggragate EDUs
+ # Map of user_id -> UserPresenceState for all the pending presence
+ # to be sent out by user_id. Entries here get processed and put in
+ # pending_presence_by_dest
+ self.pending_presence = {}
+
+ # Map of destination -> user_id -> UserPresenceState of pending presence
+ # to be sent to each destinations
self.pending_presence_by_dest = presence = {}
+
+ # Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
+ # based on their key (e.g. typing events by room_id)
+ # Map of destination -> (edu_type, key) -> Edu
self.pending_edus_keyed_by_dest = edus_keyed = {}
metrics.register_callback(
@@ -115,6 +125,8 @@ class TransactionQueue(object):
self._is_processing = False
self._last_poked_id = -1
+ self._processing_pending_presence = False
+
def can_send_to(self, destination):
"""Can we send messages to the given server?
@@ -226,17 +238,71 @@ class TransactionQueue(object):
self._attempt_new_transaction, destination
)
- def send_presence(self, destination, states):
- if not self.can_send_to(destination):
- return
+ @preserve_fn # the caller should not yield on this
+ @defer.inlineCallbacks
+ def send_presence(self, states):
+ """Send the new presence states to the appropriate destinations.
- self.pending_presence_by_dest.setdefault(destination, {}).update({
+ This actually queues up the presence states ready for sending and
+ triggers a background task to process them and send out the transactions.
+
+ Args:
+ states (list(UserPresenceState))
+ """
+
+ # First we queue up the new presence by user ID, so multiple presence
+ # updates in quick successtion are correctly handled
+ # We only want to send presence for our own users, so lets always just
+ # filter here just in case.
+ self.pending_presence.update({
state.user_id: state for state in states
+ if self.is_mine_id(state.user_id)
})
- preserve_context_over_fn(
- self._attempt_new_transaction, destination
- )
+ # We then handle the new pending presence in batches, first figuring
+ # out the destinations we need to send each state to and then poking it
+ # to attempt a new transaction. We linearize this so that we don't
+ # accidentally mess up the ordering and send multiple presence updates
+ # in the wrong order
+ if self._processing_pending_presence:
+ return
+
+ self._processing_pending_presence = True
+ try:
+ while True:
+ states_map = self.pending_presence
+ self.pending_presence = {}
+
+ if not states_map:
+ break
+
+ yield self._process_presence_inner(states_map.values())
+ finally:
+ self._processing_pending_presence = False
+
+ @measure_func("txnqueue._process_presence")
+ @defer.inlineCallbacks
+ def _process_presence_inner(self, states):
+ """Given a list of states populate self.pending_presence_by_dest and
+ poke to send a new transaction to each destination
+
+ Args:
+ states (list(UserPresenceState))
+ """
+ hosts_and_states = yield get_interested_remotes(self.store, states)
+
+ for destinations, states in hosts_and_states:
+ for destination in destinations:
+ if not self.can_send_to(destination):
+ continue
+
+ self.pending_presence_by_dest.setdefault(
+ destination, {}
+ ).update({
+ state.user_id: state for state in states
+ })
+
+ preserve_fn(self._attempt_new_transaction)(destination)
def send_edu(self, destination, edu_type, content, key=None):
edu = Edu(
|