From 69054e3d4c46d99f877b3242707bbeaa43485f17 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Sep 2016 14:12:11 +0100 Subject: Record why we have chosen to notify --- synapse/handlers/presence.py | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'synapse/handlers/presence.py') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index cf82a2336e..7ae05603f5 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -52,6 +52,8 @@ bump_active_time_counter = metrics.register_counter("bump_active_time") get_updates_counter = metrics.register_counter("get_updates", labels=["type"]) +notify_reason_counter = metrics.register_counter("notify_reason", labels=["reason"]) + # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them # "currently_active" @@ -940,26 +942,32 @@ def should_notify(old_state, new_state): """Decides if a presence state change should be sent to interested parties. """ if old_state.status_msg != new_state.status_msg: + notify_reason_counter.inc("status_msg_change") return True if old_state.state == PresenceState.ONLINE: if new_state.state != PresenceState.ONLINE: # Always notify for online -> anything + notify_reason_counter.inc("online_to_not") return True if new_state.currently_active != old_state.currently_active: + notify_reason_counter.inc("current_active_change") return True if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY: # Only notify about last active bumps if we're not currently acive if not (old_state.currently_active and new_state.currently_active): + notify_reason_counter.inc("last_active_change") return True elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY: # Always notify for a transition where last active gets bumped. + notify_reason_counter.inc("last_active_change") return True if old_state.state != new_state.state: + notify_reason_counter.inc("state_change") return True return False -- cgit 1.5.1 From 74a3b4a650022a669f528c332f1913161562c7d0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Sep 2016 10:23:38 +0100 Subject: Fiddle should_notify to better report stats --- synapse/handlers/presence.py | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) (limited to 'synapse/handlers/presence.py') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 7ae05603f5..af389b590f 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -945,29 +945,24 @@ def should_notify(old_state, new_state): notify_reason_counter.inc("status_msg_change") return True - if old_state.state == PresenceState.ONLINE: - if new_state.state != PresenceState.ONLINE: - # Always notify for online -> anything - notify_reason_counter.inc("online_to_not") - return True + if old_state.state != new_state.state: + notify_reason_counter.inc("state_change") + return True + if old_state.state == PresenceState.ONLINE: if new_state.currently_active != old_state.currently_active: notify_reason_counter.inc("current_active_change") return True if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY: # Only notify about last active bumps if we're not currently acive - if not (old_state.currently_active and new_state.currently_active): - notify_reason_counter.inc("last_active_change") + if not new_state.currently_active: + notify_reason_counter.inc("last_active_change_online") return True elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY: # Always notify for a transition where last active gets bumped. - notify_reason_counter.inc("last_active_change") - return True - - if old_state.state != new_state.state: - notify_reason_counter.inc("state_change") + notify_reason_counter.inc("last_active_change_not_online") return True return False -- cgit 1.5.1 From 438ef4763704fb90c3aa0b7aa0c688607e60b010 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Sep 2016 10:28:35 +0100 Subject: Short circuit if presence is the same --- synapse/handlers/presence.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/handlers/presence.py') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index af389b590f..a9f5233119 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -941,6 +941,9 @@ class PresenceHandler(object): def should_notify(old_state, new_state): """Decides if a presence state change should be sent to interested parties. """ + if old_state == new_state: + return False + if old_state.status_msg != new_state.status_msg: notify_reason_counter.inc("status_msg_change") return True -- cgit 1.5.1 From 3c4208a0570fb7410a8d12e11999a78ee35700a6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Sep 2016 11:31:01 +0100 Subject: Record counts of state changes --- synapse/handlers/presence.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse/handlers/presence.py') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index a9f5233119..da9f0da69e 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -53,6 +53,9 @@ bump_active_time_counter = metrics.register_counter("bump_active_time") get_updates_counter = metrics.register_counter("get_updates", labels=["type"]) notify_reason_counter = metrics.register_counter("notify_reason", labels=["reason"]) +state_transition_counter = metrics.register_counter( + "state_transition", labels=["from", "to"] +) # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them @@ -950,6 +953,7 @@ def should_notify(old_state, new_state): if old_state.state != new_state.state: notify_reason_counter.inc("state_change") + state_transition_counter.inc(old_state.state, new_state.state) return True if old_state.state == PresenceState.ONLINE: -- cgit 1.5.1 From 8b93af662d432cf6b3d36cbbcbd4dd2427bde658 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Sep 2016 15:04:46 +0100 Subject: Check the user_id for presence/typing matches origin --- synapse/handlers/presence.py | 7 +++++++ synapse/handlers/typing.py | 9 ++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) (limited to 'synapse/handlers/presence.py') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index da9f0da69e..7a3c16a8aa 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -651,6 +651,13 @@ class PresenceHandler(object): ) continue + if get_domain_from_id(user_id) != origin: + logger.info( + "Got presence update from %r with bad 'user_id': %r", + origin, user_id, + ) + continue + presence_state = push.get("presence", None) if not presence_state: logger.info( diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 0b530b9034..3b687957dd 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -199,7 +199,14 @@ class TypingHandler(object): user_id = content["user_id"] # Check that the string is a valid user id - UserID.from_string(user_id) + user = UserID.from_string(user_id) + + if user.domain != origin: + logger.info( + "Got typing update from %r with bad 'user_id': %r", + origin, user_id, + ) + return users = yield self.state.get_current_user_in_room(room_id) domains = set(get_domain_from_id(u) for u in users) -- cgit 1.5.1 From 6c4d5821446c861c0448a8d952a7aa40897b1ebd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Sep 2016 14:26:05 +0100 Subject: Deduplicate presence in _update_states --- synapse/handlers/presence.py | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'synapse/handlers/presence.py') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 7a3c16a8aa..16dbddee03 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -265,6 +265,12 @@ class PresenceHandler(object): to_notify = {} # Changes we want to notify everyone about to_federation_ping = {} # These need sending keep-alives + # Only bother handling the last presence change for each user + new_states_dict = {} + for new_state in new_states: + new_states_dict[new_state.user_id] = new_state + new_state = new_states_dict.values() + for new_state in new_states: user_id = new_state.user_id -- cgit 1.5.1 From 52b2318777ac334480316b8a8ac2778367dcf53d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Sep 2016 15:59:08 +0100 Subject: Clobber EDUs in send queue --- synapse/federation/federation_client.py | 8 ++++-- synapse/federation/transaction_queue.py | 48 ++++++++++++++++++++++++++++++--- synapse/handlers/presence.py | 20 ++++---------- synapse/handlers/receipts.py | 1 + synapse/handlers/typing.py | 1 + 5 files changed, 58 insertions(+), 20 deletions(-) (limited to 'synapse/handlers/presence.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 78719eed25..3395c9e41e 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -122,8 +122,12 @@ class FederationClient(FederationBase): pdu.event_id ) + def send_presence(self, destination, states): + if destination != self.server_name: + self._transaction_queue.enqueue_presence(destination, states) + @log_function - def send_edu(self, destination, edu_type, content): + def send_edu(self, destination, edu_type, content, key=None): edu = Edu( origin=self.server_name, destination=destination, @@ -134,7 +138,7 @@ class FederationClient(FederationBase): sent_edus_counter.inc() # TODO, add errback, etc. - self._transaction_queue.enqueue_edu(edu) + self._transaction_queue.enqueue_edu(edu, key=key) return defer.succeed(None) @log_function diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 1ac569b305..bd2a04af9e 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -26,6 +26,7 @@ from synapse.util.retryutils import ( get_retry_limiter, NotRetryingDestination, ) from synapse.util.metrics import measure_func +from synapse.handlers.presence import format_user_presence_state import synapse.metrics import logging @@ -69,13 +70,20 @@ class TransactionQueue(object): # destination -> list of tuple(edu, deferred) self.pending_edus_by_dest = edus = {} + self.pending_presence_by_dest = presence = {} + self.pending_edus_keyed_by_dest = edus_keyed = {} + metrics.register_callback( "pending_pdus", lambda: sum(map(len, pdus.values())), ) metrics.register_callback( "pending_edus", - lambda: sum(map(len, edus.values())), + lambda: ( + sum(map(len, edus.values())) + + sum(map(len, presence.values())) + + sum(map(len, edus_keyed.values())) + ), ) # destination -> list of tuple(failure, deferred) @@ -130,13 +138,25 @@ class TransactionQueue(object): self._attempt_new_transaction, destination ) - def enqueue_edu(self, edu): + def enqueue_presence(self, destination, states): + self.pending_presence_by_dest.setdefault(destination, {}).update({ + state.user_id: state for state in states + }) + + preserve_context_over_fn( + self._attempt_new_transaction, destination + ) + + def enqueue_edu(self, edu, key=None): destination = edu.destination if not self.can_send_to(destination): return - self.pending_edus_by_dest.setdefault(destination, []).append(edu) + if key: + self.pending_edus_keyed_by_dest.setdefault(destination, {})[key] = edu + else: + self.pending_edus_by_dest.setdefault(destination, []).append(edu) preserve_context_over_fn( self._attempt_new_transaction, destination @@ -190,8 +210,13 @@ class TransactionQueue(object): while True: pending_pdus = self.pending_pdus_by_dest.pop(destination, []) pending_edus = self.pending_edus_by_dest.pop(destination, []) + pending_presence = self.pending_presence_by_dest.pop(destination, {}) pending_failures = self.pending_failures_by_dest.pop(destination, []) + pending_edus.extend( + self.pending_edus_keyed_by_dest.pop(destination, {}).values() + ) + limiter = yield get_retry_limiter( destination, self.clock, @@ -203,6 +228,23 @@ class TransactionQueue(object): ) pending_edus.extend(device_message_edus) + logger.info("Sending presence: %r", pending_presence) + if pending_presence: + pending_edus.append( + Edu( + origin=self.server_name, + destination=destination, + edu_type="m.presence", + content={ + "push": [ + format_user_presence_state( + presence, self.clock.time_msec() + ) + for presence in pending_presence.values() + ] + }, + ) + ) if pending_pdus: logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 16dbddee03..a949e39bda 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -625,18 +625,8 @@ class PresenceHandler(object): Args: hosts_to_states (dict): Mapping `server_name` -> `[UserPresenceState]` """ - now = self.clock.time_msec() for host, states in hosts_to_states.items(): - self.federation.send_edu( - destination=host, - edu_type="m.presence", - content={ - "push": [ - _format_user_presence_state(state, now) - for state in states - ] - } - ) + self.federation.send_presence(host, states) @defer.inlineCallbacks def incoming_presence(self, origin, content): @@ -723,13 +713,13 @@ class PresenceHandler(object): defer.returnValue([ { "type": "m.presence", - "content": _format_user_presence_state(state, now), + "content": format_user_presence_state(state, now), } for state in updates ]) else: defer.returnValue([ - _format_user_presence_state(state, now) for state in updates + format_user_presence_state(state, now) for state in updates ]) @defer.inlineCallbacks @@ -988,7 +978,7 @@ def should_notify(old_state, new_state): return False -def _format_user_presence_state(state, now): +def format_user_presence_state(state, now): """Convert UserPresenceState to a format that can be sent down to clients and to other servers. """ @@ -1101,7 +1091,7 @@ class PresenceEventSource(object): defer.returnValue(([ { "type": "m.presence", - "content": _format_user_presence_state(s, now), + "content": format_user_presence_state(s, now), } for s in updates.values() if include_offline or s.state != PresenceState.OFFLINE diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 726f7308d2..e536a909d0 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -156,6 +156,7 @@ class ReceiptsHandler(BaseHandler): } }, }, + key=(room_id, receipt_type, user_id), ) @defer.inlineCallbacks diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 3b687957dd..0548b81c34 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -187,6 +187,7 @@ class TypingHandler(object): "user_id": user_id, "typing": typing, }, + key=(room_id, user_id), )) yield preserve_context_over_deferred( -- cgit 1.5.1 From ca35e54d6b080fce04bb92536977d48504933561 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Tue, 13 Sep 2016 13:26:33 +0100 Subject: Fix typo "persiting" --- synapse/handlers/presence.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/handlers/presence.py') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index a949e39bda..b047ae2250 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -217,7 +217,7 @@ class PresenceHandler(object): is some spurious presence changes that will self-correct. """ logger.info( - "Performing _on_shutdown. Persiting %d unpersisted changes", + "Performing _on_shutdown. Persisting %d unpersisted changes", len(self.user_to_current_state) ) @@ -234,7 +234,7 @@ class PresenceHandler(object): may stack up and slow down shutdown times. """ logger.info( - "Performing _persist_unpersisted_changes. Persiting %d unpersisted changes", + "Performing _persist_unpersisted_changes. Persisting %d unpersisted changes", len(self.unpersisted_users_changes) ) -- cgit 1.5.1