diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 748548bbe2..a12c18f4df 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -53,6 +53,7 @@ class FederationRemoteSendQueue(object):
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
+ self.is_mine_id = hs.is_mine_id
self.presence_map = {}
self.presence_changed = sorteddict()
@@ -120,7 +121,9 @@ class FederationRemoteSendQueue(object):
del self.presence_changed[key]
user_ids = set(
- user_id for uids in self.presence_changed.values() for _, user_id in uids
+ user_id
+ for uids in self.presence_changed.itervalues()
+ for user_id in uids
)
to_del = [
@@ -187,18 +190,14 @@ class FederationRemoteSendQueue(object):
self.notifier.on_new_replication_data()
- def send_presence(self, destination, states):
+ def send_presence(self, states):
"""As per TransactionQueue"""
pos = self._next_pos()
- self.presence_map.update({
- state.user_id: state
- for state in states
- })
+ local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
- self.presence_changed[pos] = [
- (destination, state.user_id) for state in states
- ]
+ self.presence_map.update({state.user_id: state for state in local_states})
+ self.presence_changed[pos] = [state.user_id for state in local_states]
self.notifier.on_new_replication_data()
@@ -251,15 +250,14 @@ class FederationRemoteSendQueue(object):
keys = self.presence_changed.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
- dest_user_ids = set(
- (pos, dest_user_id)
+ dest_user_ids = [
+ (pos, user_id)
for pos in keys[i:j]
- for dest_user_id in self.presence_changed[pos]
- )
+ for user_id in self.presence_changed[pos]
+ ]
- for (key, (dest, user_id)) in dest_user_ids:
+ for (key, user_id) in dest_user_ids:
rows.append((key, PresenceRow(
- destination=dest,
state=self.presence_map[user_id],
)))
@@ -354,7 +352,6 @@ class BaseFederationRow(object):
class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
- "destination", # str
"state", # UserPresenceState
))):
TypeId = "p"
@@ -362,18 +359,14 @@ class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
@staticmethod
def from_data(data):
return PresenceRow(
- destination=data["destination"],
- state=UserPresenceState.from_dict(data["state"])
+ state=UserPresenceState.from_dict(data)
)
def to_data(self):
- return {
- "destination": self.destination,
- "state": self.state.as_dict()
- }
+ return self.state.as_dict()
def add_to_buffer(self, buff):
- buff.presence.setdefault(self.destination, []).append(self.state)
+ buff.presence.append(self.state)
class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
@@ -469,7 +462,7 @@ TypeToRow = {
ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
- "presence", # dict of destination -> [UserPresenceState]
+ "presence", # list(UserPresenceState)
"keyed_edus", # dict of destination -> { key -> Edu }
"edus", # dict of destination -> [Edu]
"failures", # dict of destination -> [failures]
@@ -491,7 +484,7 @@ def process_rows_for_federation(transaction_queue, rows):
# them into the appropriate collection and then send them off.
buff = ParsedFederationStreamData(
- presence={},
+ presence=[],
keyed_edus={},
edus={},
failures={},
@@ -508,8 +501,8 @@ def process_rows_for_federation(transaction_queue, rows):
parsed_row = RowType.from_data(row.data)
parsed_row.add_to_buffer(buff)
- for destination, states in buff.presence.iteritems():
- transaction_queue.send_presence(destination, states)
+ if buff.presence:
+ transaction_queue.send_presence(buff.presence)
for destination, edu_map in buff.keyed_edus.iteritems():
for key, edu in edu_map.items():
|