diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 748548bbe2..93e5acebc1 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -53,18 +53,19 @@ class FederationRemoteSendQueue(object):
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
+ self.is_mine_id = hs.is_mine_id
- self.presence_map = {}
- self.presence_changed = sorteddict()
+ self.presence_map = {} # Pending presence map user_id -> UserPresenceState
+ self.presence_changed = sorteddict() # Stream position -> user_id
- self.keyed_edu = {}
- self.keyed_edu_changed = sorteddict()
+ self.keyed_edu = {} # (destination, key) -> EDU
+ self.keyed_edu_changed = sorteddict() # stream position -> (destination, key)
- self.edus = sorteddict()
+ self.edus = sorteddict() # stream position -> Edu
- self.failures = sorteddict()
+ self.failures = sorteddict() # stream position -> (destination, Failure)
- self.device_messages = sorteddict()
+ self.device_messages = sorteddict() # stream position -> destination
self.pos = 1
self.pos_time = sorteddict()
@@ -120,7 +121,9 @@ class FederationRemoteSendQueue(object):
del self.presence_changed[key]
user_ids = set(
- user_id for uids in self.presence_changed.values() for _, user_id in uids
+ user_id
+ for uids in self.presence_changed.itervalues()
+ for user_id in uids
)
to_del = [
@@ -187,18 +190,20 @@ class FederationRemoteSendQueue(object):
self.notifier.on_new_replication_data()
- def send_presence(self, destination, states):
- """As per TransactionQueue"""
+ def send_presence(self, states):
+ """As per TransactionQueue
+
+ Args:
+ states (list(UserPresenceState))
+ """
pos = self._next_pos()
- self.presence_map.update({
- state.user_id: state
- for state in states
- })
+ # We only want to send presence for our own users, so lets always just
+ # filter here just in case.
+ local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
- self.presence_changed[pos] = [
- (destination, state.user_id) for state in states
- ]
+ self.presence_map.update({state.user_id: state for state in local_states})
+ self.presence_changed[pos] = [state.user_id for state in local_states]
self.notifier.on_new_replication_data()
@@ -251,15 +256,14 @@ class FederationRemoteSendQueue(object):
keys = self.presence_changed.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
- dest_user_ids = set(
- (pos, dest_user_id)
+ dest_user_ids = [
+ (pos, user_id)
for pos in keys[i:j]
- for dest_user_id in self.presence_changed[pos]
- )
+ for user_id in self.presence_changed[pos]
+ ]
- for (key, (dest, user_id)) in dest_user_ids:
+ for (key, user_id) in dest_user_ids:
rows.append((key, PresenceRow(
- destination=dest,
state=self.presence_map[user_id],
)))
@@ -267,9 +271,12 @@ class FederationRemoteSendQueue(object):
keys = self.keyed_edu_changed.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
- keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:j])
+ # We purposefully clobber based on the key here, python dict comprehensions
+ # always use the last value, so this will correctly point to the last
+ # stream position.
+ keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}
- for (pos, (destination, edu_key)) in keyed_edus:
+ for ((destination, edu_key), pos) in keyed_edus.iteritems():
rows.append((pos, KeyedEduRow(
key=edu_key,
edu=self.keyed_edu[(destination, edu_key)],
@@ -279,7 +286,7 @@ class FederationRemoteSendQueue(object):
keys = self.edus.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
- edus = set((k, self.edus[k]) for k in keys[i:j])
+ edus = ((k, self.edus[k]) for k in keys[i:j])
for (pos, edu) in edus:
rows.append((pos, EduRow(edu)))
@@ -288,7 +295,7 @@ class FederationRemoteSendQueue(object):
keys = self.failures.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
- failures = set((k, self.failures[k]) for k in keys[i:j])
+ failures = ((k, self.failures[k]) for k in keys[i:j])
for (pos, (destination, failure)) in failures:
rows.append((pos, FailureRow(
@@ -300,9 +307,9 @@ class FederationRemoteSendQueue(object):
keys = self.device_messages.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
- device_messages = set((k, self.device_messages[k]) for k in keys[i:j])
+ device_messages = {self.device_messages[k]: k for k in keys[i:j]}
- for (pos, destination) in device_messages:
+ for (destination, pos) in device_messages.iteritems():
rows.append((pos, DeviceRow(
destination=destination,
)))
@@ -354,7 +361,6 @@ class BaseFederationRow(object):
class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
- "destination", # str
"state", # UserPresenceState
))):
TypeId = "p"
@@ -362,24 +368,24 @@ class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
@staticmethod
def from_data(data):
return PresenceRow(
- destination=data["destination"],
- state=UserPresenceState.from_dict(data["state"])
+ state=UserPresenceState.from_dict(data)
)
def to_data(self):
- return {
- "destination": self.destination,
- "state": self.state.as_dict()
- }
+ return self.state.as_dict()
def add_to_buffer(self, buff):
- buff.presence.setdefault(self.destination, []).append(self.state)
+ buff.presence.append(self.state)
class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
"key", # tuple(str) - the edu key passed to send_edu
"edu", # Edu
))):
+ """Streams EDUs that have an associated key that is ued to clobber. For example,
+ typing EDUs clobber based on room_id.
+ """
+
TypeId = "k"
@staticmethod
@@ -404,6 +410,8 @@ class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
class EduRow(BaseFederationRow, namedtuple("EduRow", (
"edu", # Edu
))):
+ """Streams EDUs that don't have keys. See KeyedEduRow
+ """
TypeId = "e"
@staticmethod
@@ -421,6 +429,11 @@ class FailureRow(BaseFederationRow, namedtuple("FailureRow", (
"destination", # str
"failure",
))):
+ """Streams failures to a remote server. Failures are issued when there was
+ something wrong with a transaction the remote sent us, e.g. it included
+ an event that was invalid.
+ """
+
TypeId = "f"
@staticmethod
@@ -443,6 +456,10 @@ class FailureRow(BaseFederationRow, namedtuple("FailureRow", (
class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", (
"destination", # str
))):
+ """Streams the fact that either a) there is pending to device messages for
+ users on the remote, or b) a local users device has changed and needs to
+ be sent to the remote.
+ """
TypeId = "d"
@staticmethod
@@ -469,7 +486,7 @@ TypeToRow = {
ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
- "presence", # dict of destination -> [UserPresenceState]
+ "presence", # list(UserPresenceState)
"keyed_edus", # dict of destination -> { key -> Edu }
"edus", # dict of destination -> [Edu]
"failures", # dict of destination -> [failures]
@@ -491,7 +508,7 @@ def process_rows_for_federation(transaction_queue, rows):
# them into the appropriate collection and then send them off.
buff = ParsedFederationStreamData(
- presence={},
+ presence=[],
keyed_edus={},
edus={},
failures={},
@@ -508,8 +525,8 @@ def process_rows_for_federation(transaction_queue, rows):
parsed_row = RowType.from_data(row.data)
parsed_row.add_to_buffer(buff)
- for destination, states in buff.presence.iteritems():
- transaction_queue.send_presence(destination, states)
+ if buff.presence:
+ transaction_queue.send_presence(buff.presence)
for destination, edu_map in buff.keyed_edus.iteritems():
for key, edu in edu_map.items():
|