diff options
author | Erik Johnston <erik@matrix.org> | 2016-11-04 15:35:25 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-11-16 14:47:52 +0000 |
commit | 1587b5a0339d485c8b078024269a5d888ac5e652 (patch) | |
tree | 0441ac0cb049b79bd20ae248b9e1a6aa22713fae | |
parent | Use new federation_sender DI (diff) | |
download | synapse-1587b5a0339d485c8b078024269a5d888ac5e652.tar.xz |
Add initial cut of federation send queue
-rw-r--r-- | synapse/federation/send_queue.py | 174 |
1 files changed, 174 insertions, 0 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py new file mode 100644 index 0000000000..3d3c3d98ff --- /dev/null +++ b/synapse/federation/send_queue.py @@ -0,0 +1,174 @@ +# -*- coding: utf-8 -*- +# Copyright 2014-2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from blist import sorteddict + + +class FederationRemoteSendQueue(object): + def __init__(self, hs): + self.clock = hs.get_clock() + + # TODO: Add metrics for size of lists below + + self.presence_map = {} + self.presence_changed = sorteddict() + + self.keyed_edu = {} + self.keyed_edu_changed = sorteddict() + + self.edus = sorteddict() + + self.failures = sorteddict() + + self.pos = 1 + self.pos_time = sorteddict() + + self.clock.looping_call(self._clear_queue, 30 * 1000) + + def _next_pos(self): + pos = self.pos + self.pos += 1 + self.pos_time[self.clock.time_msec()] = pos + return pos + + def _clear_queue(self): + # TODO measure this function time. + + FIVE_MINUTES_AGO = 5 * 60 * 1000 + now = self.clock.time_msec() + + keys = self.pos_time.keys() + time = keys.bisect_left(now - FIVE_MINUTES_AGO) + if not keys[:time]: + return + + position_to_delete = max(keys[:time]) + for key in keys[:time]: + del self.pos_time[key] + + self._clear_queue_before_pos(position_to_delete) + + def _clear_queue_before_pos(self, position_to_delete): + # Delete things out of presence maps + keys = self.presence_changed.keys() + i = keys.bisect_left(position_to_delete) + for key in keys[:i]: + del self.presence_changed[key] + + user_ids = set() + for _, states in self.presence_changed.values(): + user_ids.update(s.user_id for s in user_ids) + + to_del = [user_id for user_id in self.presence_map if user_id not in user_ids] + for user_id in self.to_del: + del self.presence_map[user_id] + + # Delete things out of keyed edus + keys = self.keyed_edu_changed.keys() + i = keys.bisect_left(position_to_delete) + for key in keys[:i]: + del self.keyed_edu_changed[key] + + live_keys = set() + for edu_key in self.keyed_edu_changed.values(): + live_keys.add(edu_key) + + to_del = [edu_key for edu_key in self.keyed_edu if edu_key not in live_keys] + for edu_key in to_del: + del self.keyed_edu[edu_key] + + # Delete things out of edu map + keys = self.edus.keys() + i = keys.bisect_left(position_to_delete) + for key in keys[:i]: + del self.edus[key] + + # Delete things out of failure map + keys = self.failures.keys() + i = keys.bisect_left(position_to_delete) + for key in keys[:i]: + del self.failures[key] + + def send_edu(self, edu, key=None): + pos = self._next_pos() + + if key: + self.keyed_edu[(edu.destination, key)] = edu + self.keyed_edu_changed[pos] = (edu.destination, key) + else: + self.edus[pos] = edu + + def send_presence(self, destination, states): + pos = self._next_pos() + + self.presence_map.presence_map.update({ + state.user_id: state + for state in states + }) + + self.presence_changed[pos] = (destination, [ + state.user_id for state in states + ]) + + def send_failure(self, failure, destination): + pos = self._next_pos() + + self.failures[pos] = (destination, failure) + + def notify_new_device_message(self, destination): + # TODO + pass + + def get_replication_rows(self, token): + rows = [] + + # Fetch changed presence + keys = self.presence_changed.keys() + i = keys.bisect_right(token) + dest_user_ids = set((k, self.presence_changed[k]) for k in keys[i:]) + + for (key, (dest, user_ids)) in dest_user_ids: + for user_id in user_ids: + rows.append((key, dest, "p", self.presence_map[user_id])) + + # Fetch changes keyed edus + keys = self.keyed_edu_changed.keys() + i = keys.bisect_right(token) + keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:]) + + for (pos, edu_key) in keyed_edus: + rows.append((pos, edu_key, "k", self.keyed_edu[edu_key])) + + # Fetch changed edus + keys = self.edus.keys() + i = keys.bisect_right(token) + edus = set((k, self.edus[k]) for k in keys[i:]) + + for (pos, edu) in edus: + rows.append((pos, edu.destination, "e", edu)) + + # Fetch changed failures + keys = self.failures.keys() + i = keys.bisect_right(token) + failures = set((k, self.failures[k]) for k in keys[i:]) + + for (pos, (destination, failure)) in failures: + rows.append((pos, destination, "f", failure)) + + # Sort rows based on pos + rows.sort() + + return rows |