summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/app/federation_sender.py12
-rw-r--r--synapse/app/synchrotron.py6
-rw-r--r--synapse/federation/send_queue.py47
-rw-r--r--synapse/federation/transaction_queue.py99
-rw-r--r--synapse/handlers/presence.py54
-rw-r--r--synapse/replication/slave/storage/events.py1
6 files changed, 139 insertions, 80 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 477e16e0fa..49efb602bc 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -32,6 +32,7 @@ from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.storage.engines import create_engine
+from synapse.storage.presence import PresenceStore
 from synapse.util.async import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
@@ -80,6 +81,17 @@ class FederationSenderSlaveStore(
 
         return rows[0][0] if rows else -1
 
+    # XXX: This is a bit broken because we don't persist the accepted list in a
+    # way that can be replicated. This means that we don't have a way to
+    # invalidate the cache correctly.
+    # This is fine since in practice nobody uses the presence list stuff...
+    get_presence_list_accepted = PresenceStore.__dict__[
+        "get_presence_list_accepted"
+    ]
+    get_presence_list_observers_accepted = PresenceStore.__dict__[
+        "get_presence_list_observers_accepted"
+    ]
+
 
 class FederationSenderServer(HomeServer):
     def get_db_conn(self, run_new_connection=True):
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index d39e3161fe..7b6f82abdc 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -206,10 +206,8 @@ class SynchrotronPresence(object):
 
     @defer.inlineCallbacks
     def notify_from_replication(self, states, stream_id):
-        parties = yield self._get_interested_parties(
-            states, calculate_remote_hosts=False
-        )
-        room_ids_to_states, users_to_states, _ = parties
+        parties = yield self._get_interested_parties(states)
+        room_ids_to_states, users_to_states = parties
 
         self.notifier.on_new_event(
             "presence_key", stream_id, rooms=room_ids_to_states.keys(),
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 748548bbe2..a12c18f4df 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -53,6 +53,7 @@ class FederationRemoteSendQueue(object):
         self.server_name = hs.hostname
         self.clock = hs.get_clock()
         self.notifier = hs.get_notifier()
+        self.is_mine_id = hs.is_mine_id
 
         self.presence_map = {}
         self.presence_changed = sorteddict()
@@ -120,7 +121,9 @@ class FederationRemoteSendQueue(object):
                 del self.presence_changed[key]
 
             user_ids = set(
-                user_id for uids in self.presence_changed.values() for _, user_id in uids
+                user_id
+                for uids in self.presence_changed.itervalues()
+                for user_id in uids
             )
 
             to_del = [
@@ -187,18 +190,14 @@ class FederationRemoteSendQueue(object):
 
         self.notifier.on_new_replication_data()
 
-    def send_presence(self, destination, states):
+    def send_presence(self, states):
         """As per TransactionQueue"""
         pos = self._next_pos()
 
-        self.presence_map.update({
-            state.user_id: state
-            for state in states
-        })
+        local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
 
-        self.presence_changed[pos] = [
-            (destination, state.user_id) for state in states
-        ]
+        self.presence_map.update({state.user_id: state for state in local_states})
+        self.presence_changed[pos] = [state.user_id for state in local_states]
 
         self.notifier.on_new_replication_data()
 
@@ -251,15 +250,14 @@ class FederationRemoteSendQueue(object):
         keys = self.presence_changed.keys()
         i = keys.bisect_right(from_token)
         j = keys.bisect_right(to_token) + 1
-        dest_user_ids = set(
-            (pos, dest_user_id)
+        dest_user_ids = [
+            (pos, user_id)
             for pos in keys[i:j]
-            for dest_user_id in self.presence_changed[pos]
-        )
+            for user_id in self.presence_changed[pos]
+        ]
 
-        for (key, (dest, user_id)) in dest_user_ids:
+        for (key, user_id) in dest_user_ids:
             rows.append((key, PresenceRow(
-                destination=dest,
                 state=self.presence_map[user_id],
             )))
 
@@ -354,7 +352,6 @@ class BaseFederationRow(object):
 
 
 class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
-    "destination",  # str
     "state",  # UserPresenceState
 ))):
     TypeId = "p"
@@ -362,18 +359,14 @@ class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
     @staticmethod
     def from_data(data):
         return PresenceRow(
-            destination=data["destination"],
-            state=UserPresenceState.from_dict(data["state"])
+            state=UserPresenceState.from_dict(data)
         )
 
     def to_data(self):
-        return {
-            "destination": self.destination,
-            "state": self.state.as_dict()
-        }
+        return self.state.as_dict()
 
     def add_to_buffer(self, buff):
-        buff.presence.setdefault(self.destination, []).append(self.state)
+        buff.presence.append(self.state)
 
 
 class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
@@ -469,7 +462,7 @@ TypeToRow = {
 
 
 ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
-    "presence",  # dict of destination -> [UserPresenceState]
+    "presence",  # list(UserPresenceState)
     "keyed_edus",  # dict of destination -> { key -> Edu }
     "edus",  # dict of destination -> [Edu]
     "failures",  # dict of destination -> [failures]
@@ -491,7 +484,7 @@ def process_rows_for_federation(transaction_queue, rows):
     # them into the appropriate collection and then send them off.
 
     buff = ParsedFederationStreamData(
-        presence={},
+        presence=[],
         keyed_edus={},
         edus={},
         failures={},
@@ -508,8 +501,8 @@ def process_rows_for_federation(transaction_queue, rows):
         parsed_row = RowType.from_data(row.data)
         parsed_row.add_to_buffer(buff)
 
-    for destination, states in buff.presence.iteritems():
-        transaction_queue.send_presence(destination, states)
+    if buff.presence:
+        transaction_queue.send_presence(buff.presence)
 
     for destination, edu_map in buff.keyed_edus.iteritems():
         for key, edu in edu_map.items():
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index c27ce7c5f3..fd9e1fa01c 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -21,7 +21,7 @@ from .units import Transaction, Edu
 
 from synapse.api.errors import HttpResponseException
 from synapse.util.async import run_on_reactor
-from synapse.util.logcontext import preserve_context_over_fn
+from synapse.util.logcontext import preserve_context_over_fn, preserve_fn
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
 from synapse.util.metrics import measure_func
 from synapse.types import get_domain_from_id
@@ -78,6 +78,7 @@ class TransactionQueue(object):
         self.pending_edus_by_dest = edus = {}
 
         # Presence needs to be separate as we send single aggragate EDUs
+        self.pending_presence = {}
         self.pending_presence_by_dest = presence = {}
         self.pending_edus_keyed_by_dest = edus_keyed = {}
 
@@ -113,6 +114,8 @@ class TransactionQueue(object):
         self._is_processing = False
         self._last_poked_id = -1
 
+        self._processing_pending_presence = False
+
     def can_send_to(self, destination):
         """Can we send messages to the given server?
 
@@ -224,17 +227,95 @@ class TransactionQueue(object):
                 self._attempt_new_transaction, destination
             )
 
-    def send_presence(self, destination, states):
-        if not self.can_send_to(destination):
+    @preserve_fn
+    @defer.inlineCallbacks
+    def send_presence(self, states):
+        """Send the new presence states to the appropriate destinations.
+
+        Args:
+            states (list(UserPresenceState))
+        """
+
+        # First we queue up the new presence by user ID, so multiple presence
+        # updates in quick successtion are correctly handled
+        self.pending_presence.update({state.user_id: state for state in states})
+
+        # We then handle the new pending presence in batches, first figuring
+        # out the destinations we need to send each state to and then poking it
+        # to attempt a new transaction. We linearize this so that we don't
+        # accidentally mess up the ordering and send multiple presence updates
+        # in the wrong order
+        if self._processing_pending_presence:
             return
 
-        self.pending_presence_by_dest.setdefault(destination, {}).update({
-            state.user_id: state for state in states
-        })
+        self._processing_pending_presence = True
+        try:
+            while True:
+                states = self.pending_presence
+                self.pending_presence = {}
 
-        preserve_context_over_fn(
-            self._attempt_new_transaction, destination
-        )
+                if not states:
+                    break
+
+                yield self._process_presence_inner(states)
+        finally:
+            self._processing_pending_presence = False
+
+    @measure_func("txnqueue._process_presence")
+    @defer.inlineCallbacks
+    def _process_presence_inner(self, states):
+        """Given a list of states populate self.pending_presence_by_dest and
+        poke to send a new transaction to each destination
+
+        Args:
+            states (list(UserPresenceState))
+        """
+        # First we look up the rooms each user is in (as well as any explicit
+        # subscriptions), then for each distinct room we look up the remote
+        # hosts in those rooms.
+        room_ids_to_states = {}
+        users_to_states = {}
+        for state in states.itervalues():
+            room_ids = yield self.store.get_rooms_for_user(state.user_id)
+            for room_id in room_ids:
+                room_ids_to_states.setdefault(room_id, []).append(state)
+
+            plist = yield self.store.get_presence_list_observers_accepted(
+                state.user_id,
+            )
+            for u in plist:
+                users_to_states.setdefault(u, []).append(state)
+
+        hosts_and_states = []
+        for room_id, states in room_ids_to_states.items():
+            local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
+            if not local_states:
+                continue
+
+            hosts = yield self.store.get_hosts_in_room(room_id)
+            hosts_and_states.append((hosts, local_states))
+
+        for user_id, states in users_to_states.items():
+            local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
+            if not local_states:
+                continue
+
+            host = get_domain_from_id(user_id)
+            hosts_and_states.append(([host], local_states))
+
+        # And now finally queue up new transactions
+        for destinations, states in hosts_and_states:
+            for destination in destinations:
+                if not self.can_send_to(destination):
+                    continue
+
+                self.pending_presence_by_dest.setdefault(
+                    destination, {}
+                ).update({
+                    state.user_id: state for state in states
+                })
+
+                preserve_fn(self._attempt_new_transaction)(destination)
 
     def send_edu(self, destination, edu_type, content, key=None):
         edu = Edu(
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 9ed5af3cb4..c1c0dd4d3d 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -318,11 +318,7 @@ class PresenceHandler(object):
             if to_federation_ping:
                 federation_presence_out_counter.inc_by(len(to_federation_ping))
 
-                _, _, hosts_to_states = yield self._get_interested_parties(
-                    to_federation_ping.values()
-                )
-
-                self._push_to_remotes(hosts_to_states)
+                self._push_to_remotes(to_federation_ping.values())
 
     def _handle_timeouts(self):
         """Checks the presence of users that have timed out and updates as
@@ -615,12 +611,12 @@ class PresenceHandler(object):
         defer.returnValue(states)
 
     @defer.inlineCallbacks
-    def _get_interested_parties(self, states, calculate_remote_hosts=True):
+    def _get_interested_parties(self, states):
         """Given a list of states return which entities (rooms, users, servers)
         are interested in the given states.
 
         Returns:
-            3-tuple: `(room_ids_to_states, users_to_states, hosts_to_states)`,
+            2-tuple: `(room_ids_to_states, users_to_states)`,
             with each item being a dict of `entity_name` -> `[UserPresenceState]`
         """
         room_ids_to_states = {}
@@ -637,30 +633,10 @@ class PresenceHandler(object):
             # Always notify self
             users_to_states.setdefault(state.user_id, []).append(state)
 
-        hosts_to_states = {}
-        if calculate_remote_hosts:
-            for room_id, states in room_ids_to_states.items():
-                local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
-                if not local_states:
-                    continue
-
-                hosts = yield self.store.get_hosts_in_room(room_id)
-
-                for host in hosts:
-                    hosts_to_states.setdefault(host, []).extend(local_states)
-
-        for user_id, states in users_to_states.items():
-            local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
-            if not local_states:
-                continue
-
-            host = get_domain_from_id(user_id)
-            hosts_to_states.setdefault(host, []).extend(local_states)
-
         # TODO: de-dup hosts_to_states, as a single host might have multiple
         # of same presence
 
-        defer.returnValue((room_ids_to_states, users_to_states, hosts_to_states))
+        defer.returnValue((room_ids_to_states, users_to_states))
 
     @defer.inlineCallbacks
     def _persist_and_notify(self, states):
@@ -670,33 +646,32 @@ class PresenceHandler(object):
         stream_id, max_token = yield self.store.update_presence(states)
 
         parties = yield self._get_interested_parties(states)
-        room_ids_to_states, users_to_states, hosts_to_states = parties
+        room_ids_to_states, users_to_states = parties
 
         self.notifier.on_new_event(
             "presence_key", stream_id, rooms=room_ids_to_states.keys(),
-            users=[UserID.from_string(u) for u in users_to_states.keys()]
+            users=[UserID.from_string(u) for u in users_to_states]
         )
 
-        self._push_to_remotes(hosts_to_states)
+        self._push_to_remotes(states)
 
     @defer.inlineCallbacks
     def notify_for_states(self, state, stream_id):
         parties = yield self._get_interested_parties([state])
-        room_ids_to_states, users_to_states, hosts_to_states = parties
+        room_ids_to_states, users_to_states = parties
 
         self.notifier.on_new_event(
             "presence_key", stream_id, rooms=room_ids_to_states.keys(),
-            users=[UserID.from_string(u) for u in users_to_states.keys()]
+            users=[UserID.from_string(u) for u in users_to_states]
         )
 
-    def _push_to_remotes(self, hosts_to_states):
+    def _push_to_remotes(self, states):
         """Sends state updates to remote servers.
 
         Args:
-            hosts_to_states (dict): Mapping `server_name` -> `[UserPresenceState]`
+            hosts_to_states (list): list(state)
         """
-        for host, states in hosts_to_states.items():
-            self.federation.send_presence(host, states)
+        self.federation.send_presence(states)
 
     @defer.inlineCallbacks
     def incoming_presence(self, origin, content):
@@ -837,14 +812,13 @@ class PresenceHandler(object):
         if self.is_mine(user):
             state = yield self.current_state_for_user(user.to_string())
 
-            hosts = set(get_domain_from_id(u) for u in user_ids)
-            self._push_to_remotes({host: (state,) for host in hosts})
+            self._push_to_remotes([state])
         else:
             user_ids = filter(self.is_mine_id, user_ids)
 
             states = yield self.current_state_for_users(user_ids)
 
-            self._push_to_remotes({user.domain: states.values()})
+            self._push_to_remotes(states.values())
 
     @defer.inlineCallbacks
     def get_presence_list(self, observer_user, accepted=None):
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 5fd47706ef..4ca1e5aa8c 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -71,6 +71,7 @@ class SlavedEventStore(BaseSlavedStore):
     # to reach inside the __dict__ to extract them.
     get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"]
     get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"]
+    get_hosts_in_room = RoomMemberStore.__dict__["get_hosts_in_room"]
     get_users_who_share_room_with_user = (
         RoomMemberStore.__dict__["get_users_who_share_room_with_user"]
     )