diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 04d04a4457..0240b339b0 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -55,7 +55,12 @@ class FederationRemoteSendQueue(object):
self.is_mine_id = hs.is_mine_id
self.presence_map = {} # Pending presence map user_id -> UserPresenceState
- self.presence_changed = SortedDict() # Stream position -> user_id
+ self.presence_changed = SortedDict() # Stream position -> list[user_id]
+
+ # Stores the destinations we need to explicitly send presence to about a
+ # given user.
+ # Stream position -> (user_id, destinations)
+ self.presence_destinations = SortedDict()
self.keyed_edu = {} # (destination, key) -> EDU
self.keyed_edu_changed = SortedDict() # stream position -> (destination, key)
@@ -77,7 +82,7 @@ class FederationRemoteSendQueue(object):
for queue_name in [
"presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
- "edus", "device_messages", "pos_time",
+ "edus", "device_messages", "pos_time", "presence_destinations",
]:
register(queue_name, getattr(self, queue_name))
@@ -121,6 +126,15 @@ class FederationRemoteSendQueue(object):
for user_id in uids
)
+ keys = self.presence_destinations.keys()
+ i = self.presence_destinations.bisect_left(position_to_delete)
+ for key in keys[:i]:
+ del self.presence_destinations[key]
+
+ user_ids.update(
+ user_id for user_id, _ in self.presence_destinations.values()
+ )
+
to_del = [
user_id for user_id in self.presence_map if user_id not in user_ids
]
@@ -209,6 +223,20 @@ class FederationRemoteSendQueue(object):
self.notifier.on_new_replication_data()
+ def send_presence_to_destinations(self, states, destinations):
+ """As per FederationSender
+
+ Args:
+ states (list[UserPresenceState])
+ destinations (list[str])
+ """
+ for state in states:
+ pos = self._next_pos()
+ self.presence_map.update({state.user_id: state for state in states})
+ self.presence_destinations[pos] = (state.user_id, destinations)
+
+ self.notifier.on_new_replication_data()
+
def send_device_messages(self, destination):
"""As per FederationSender"""
pos = self._next_pos()
@@ -261,6 +289,16 @@ class FederationRemoteSendQueue(object):
state=self.presence_map[user_id],
)))
+ # Fetch presence to send to destinations
+ i = self.presence_destinations.bisect_right(from_token)
+ j = self.presence_destinations.bisect_right(to_token) + 1
+
+ for pos, (user_id, dests) in self.presence_destinations.items()[i:j]:
+ rows.append((pos, PresenceDestinationsRow(
+ state=self.presence_map[user_id],
+ destinations=list(dests),
+ )))
+
# Fetch changes keyed edus
i = self.keyed_edu_changed.bisect_right(from_token)
j = self.keyed_edu_changed.bisect_right(to_token) + 1
@@ -357,6 +395,29 @@ class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
buff.presence.append(self.state)
+class PresenceDestinationsRow(BaseFederationRow, namedtuple("PresenceDestinationsRow", (
+ "state", # UserPresenceState
+ "destinations", # list[str]
+))):
+ TypeId = "pd"
+
+ @staticmethod
+ def from_data(data):
+ return PresenceDestinationsRow(
+ state=UserPresenceState.from_dict(data["state"]),
+ destinations=data["dests"],
+ )
+
+ def to_data(self):
+ return {
+ "state": self.state.as_dict(),
+ "dests": self.destinations,
+ }
+
+ def add_to_buffer(self, buff):
+ buff.presence_destinations.append((self.state, self.destinations))
+
+
class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
"key", # tuple(str) - the edu key passed to send_edu
"edu", # Edu
@@ -428,6 +489,7 @@ TypeToRow = {
Row.TypeId: Row
for Row in (
PresenceRow,
+ PresenceDestinationsRow,
KeyedEduRow,
EduRow,
DeviceRow,
@@ -437,6 +499,7 @@ TypeToRow = {
ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
"presence", # list(UserPresenceState)
+ "presence_destinations", # list of tuples of UserPresenceState and destinations
"keyed_edus", # dict of destination -> { key -> Edu }
"edus", # dict of destination -> [Edu]
"device_destinations", # set of destinations
@@ -458,6 +521,7 @@ def process_rows_for_federation(transaction_queue, rows):
buff = ParsedFederationStreamData(
presence=[],
+ presence_destinations=[],
keyed_edus={},
edus={},
device_destinations=set(),
@@ -476,6 +540,11 @@ def process_rows_for_federation(transaction_queue, rows):
if buff.presence:
transaction_queue.send_presence(buff.presence)
+ for state, destinations in buff.presence_destinations:
+ transaction_queue.send_presence_to_destinations(
+ states=[state], destinations=destinations,
+ )
+
for destination, edu_map in iteritems(buff.keyed_edus):
for key, edu in edu_map.items():
transaction_queue.send_edu(edu, key)
|