diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 7d9f5a38d9..433ca2f416 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -400,11 +400,11 @@ class RulesForRoom(object):
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Found members %r: %r", self.room_id, members.values())
- interested_in_user_ids = set(
+ interested_in_user_ids = {
user_id
for user_id, membership in itervalues(members)
if membership == Membership.JOIN
- )
+ }
logger.debug("Joined: %r", interested_in_user_ids)
@@ -412,9 +412,9 @@ class RulesForRoom(object):
interested_in_user_ids, on_invalidate=self.invalidate_all_cb
)
- user_ids = set(
+ user_ids = {
uid for uid, have_pusher in iteritems(if_users_with_pushers) if have_pusher
- )
+ }
logger.debug("With pushers: %r", user_ids)
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 8c818a86bf..ba4551d619 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -204,7 +204,7 @@ class EmailPusher(object):
yield self.send_notification(unprocessed, reason)
yield self.save_last_stream_ordering_and_success(
- max([ea["stream_ordering"] for ea in unprocessed])
+ max(ea["stream_ordering"] for ea in unprocessed)
)
# we update the throttle on all the possible unprocessed push actions
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index b13b646bfd..73580c1c6c 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -526,12 +526,10 @@ class Mailer(object):
# If the room doesn't have a name, say who the messages
# are from explicitly to avoid, "messages in the Bob room"
sender_ids = list(
- set(
- [
- notif_events[n["event_id"]].sender
- for n in notifs_by_room[room_id]
- ]
- )
+ {
+ notif_events[n["event_id"]].sender
+ for n in notifs_by_room[room_id]
+ }
)
member_events = yield self.store.get_events(
@@ -557,13 +555,13 @@ class Mailer(object):
else:
# If the reason room doesn't have a name, say who the messages
# are from explicitly to avoid, "messages in the Bob room"
+ room_id = reason["room_id"]
+
sender_ids = list(
- set(
- [
- notif_events[n["event_id"]].sender
- for n in notifs_by_room[reason["room_id"]]
- ]
- )
+ {
+ notif_events[n["event_id"]].sender
+ for n in notifs_by_room[room_id]
+ }
)
member_events = yield self.store.get_events(
diff --git a/synapse/push/presentable_names.py b/synapse/push/presentable_names.py
index 16a7e8e31d..0644a13cfc 100644
--- a/synapse/push/presentable_names.py
+++ b/synapse/push/presentable_names.py
@@ -18,6 +18,8 @@ import re
from twisted.internet import defer
+from synapse.api.constants import EventTypes
+
logger = logging.getLogger(__name__)
# intentionally looser than what aliases we allow to be registered since
@@ -50,17 +52,17 @@ def calculate_room_name(
(string or None) A human readable name for the room.
"""
# does it have a name?
- if ("m.room.name", "") in room_state_ids:
+ if (EventTypes.Name, "") in room_state_ids:
m_room_name = yield store.get_event(
- room_state_ids[("m.room.name", "")], allow_none=True
+ room_state_ids[(EventTypes.Name, "")], allow_none=True
)
if m_room_name and m_room_name.content and m_room_name.content["name"]:
return m_room_name.content["name"]
# does it have a canonical alias?
- if ("m.room.canonical_alias", "") in room_state_ids:
+ if (EventTypes.CanonicalAlias, "") in room_state_ids:
canon_alias = yield store.get_event(
- room_state_ids[("m.room.canonical_alias", "")], allow_none=True
+ room_state_ids[(EventTypes.CanonicalAlias, "")], allow_none=True
)
if (
canon_alias
@@ -74,32 +76,22 @@ def calculate_room_name(
# for an event type, so rearrange the data structure
room_state_bytype_ids = _state_as_two_level_dict(room_state_ids)
- # right then, any aliases at all?
- if "m.room.aliases" in room_state_bytype_ids:
- m_room_aliases = room_state_bytype_ids["m.room.aliases"]
- for alias_id in m_room_aliases.values():
- alias_event = yield store.get_event(alias_id, allow_none=True)
- if alias_event and alias_event.content.get("aliases"):
- the_aliases = alias_event.content["aliases"]
- if len(the_aliases) > 0 and _looks_like_an_alias(the_aliases[0]):
- return the_aliases[0]
-
if not fallback_to_members:
return None
my_member_event = None
- if ("m.room.member", user_id) in room_state_ids:
+ if (EventTypes.Member, user_id) in room_state_ids:
my_member_event = yield store.get_event(
- room_state_ids[("m.room.member", user_id)], allow_none=True
+ room_state_ids[(EventTypes.Member, user_id)], allow_none=True
)
if (
my_member_event is not None
and my_member_event.content["membership"] == "invite"
):
- if ("m.room.member", my_member_event.sender) in room_state_ids:
+ if (EventTypes.Member, my_member_event.sender) in room_state_ids:
inviter_member_event = yield store.get_event(
- room_state_ids[("m.room.member", my_member_event.sender)],
+ room_state_ids[(EventTypes.Member, my_member_event.sender)],
allow_none=True,
)
if inviter_member_event:
@@ -114,9 +106,9 @@ def calculate_room_name(
# we're going to have to generate a name based on who's in the room,
# so find out who is in the room that isn't the user.
- if "m.room.member" in room_state_bytype_ids:
+ if EventTypes.Member in room_state_bytype_ids:
member_events = yield store.get_events(
- list(room_state_bytype_ids["m.room.member"].values())
+ list(room_state_bytype_ids[EventTypes.Member].values())
)
all_members = [
ev
@@ -138,9 +130,9 @@ def calculate_room_name(
# self-chat, peeked room with 1 participant,
# or inbound invite, or outbound 3PID invite.
if all_members[0].sender == user_id:
- if "m.room.third_party_invite" in room_state_bytype_ids:
+ if EventTypes.ThirdPartyInvite in room_state_bytype_ids:
third_party_invites = room_state_bytype_ids[
- "m.room.third_party_invite"
+ EventTypes.ThirdPartyInvite
].values()
if len(third_party_invites) > 0:
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index b9dca5bc63..88d203aa44 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -15,11 +15,17 @@
# limitations under the License.
import logging
+from collections import defaultdict
+from threading import Lock
+from typing import Dict, Tuple, Union
from twisted.internet import defer
+from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import PusherConfigException
+from synapse.push.emailpusher import EmailPusher
+from synapse.push.httppusher import HttpPusher
from synapse.push.pusher import PusherFactory
from synapse.util.async_helpers import concurrently_execute
@@ -47,7 +53,29 @@ class PusherPool:
self._should_start_pushers = _hs.config.start_pushers
self.store = self.hs.get_datastore()
self.clock = self.hs.get_clock()
- self.pushers = {}
+
+ # map from user id to app_id:pushkey to pusher
+ self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]]
+
+ # a lock for the pushers dict, since `count_pushers` is called from an different
+ # and we otherwise get concurrent modification errors
+ self._pushers_lock = Lock()
+
+ def count_pushers():
+ results = defaultdict(int) # type: Dict[Tuple[str, str], int]
+ with self._pushers_lock:
+ for pushers in self.pushers.values():
+ for pusher in pushers.values():
+ k = (type(pusher).__name__, pusher.app_id)
+ results[k] += 1
+ return results
+
+ LaterGauge(
+ name="synapse_pushers",
+ desc="the number of active pushers",
+ labels=["kind", "app_id"],
+ caller=count_pushers,
+ )
def start(self):
"""Starts the pushers off in a background process.
@@ -191,7 +219,7 @@ class PusherPool:
min_stream_id - 1, max_stream_id
)
# This returns a tuple, user_id is at index 3
- users_affected = set([r[3] for r in updated_receipts])
+ users_affected = {r[3] for r in updated_receipts}
for u in users_affected:
if u in self.pushers:
@@ -271,11 +299,12 @@ class PusherPool:
return
appid_pushkey = "%s:%s" % (pusherdict["app_id"], pusherdict["pushkey"])
- byuser = self.pushers.setdefault(pusherdict["user_name"], {})
- if appid_pushkey in byuser:
- byuser[appid_pushkey].on_stop()
- byuser[appid_pushkey] = p
+ with self._pushers_lock:
+ byuser = self.pushers.setdefault(pusherdict["user_name"], {})
+ if appid_pushkey in byuser:
+ byuser[appid_pushkey].on_stop()
+ byuser[appid_pushkey] = p
# Check if there *may* be push to process. We do this as this check is a
# lot cheaper to do than actually fetching the exact rows we need to
@@ -304,7 +333,9 @@ class PusherPool:
if appid_pushkey in byuser:
logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
byuser[appid_pushkey].on_stop()
- del byuser[appid_pushkey]
+ with self._pushers_lock:
+ del byuser[appid_pushkey]
+
yield self.store.delete_pusher_by_app_id_pushkey_user_id(
app_id, pushkey, user_id
)
|