diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 9d257ecf31..084c1503da 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -254,7 +254,7 @@ class PaginationHandler(object):
})
state = None
- if event_filter and event_filter.lazy_load_members():
+ if event_filter and event_filter.lazy_load_members() and len(events) > 0:
# TODO: remove redundant members
# FIXME: we also care about invite targets etc.
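In pagination.py, the extra len(events) > 0 check means lazy-loaded membership state is only fetched when the filter asks for it and the returned page actually contains events, so an empty page no longer triggers a state lookup. A minimal sketch of the guard, with get_state_for_events standing in for the real state query (that callable is an assumption for illustration):

    def maybe_load_member_state(event_filter, events, get_state_for_events):
        """Fetch member state only when it can be useful to the client.

        Skips the potentially expensive state lookup when lazy-loading was
        not requested, or when the page of events is empty.
        """
        if event_filter and event_filter.lazy_load_members() and len(events) > 0:
            # Only hit the state store when there is something to annotate.
            return get_state_for_events([e.event_id for e in events])
        return None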
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 4c2690ba26..696469732c 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -16,8 +16,8 @@ import logging
from twisted.internet import defer
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import get_domain_from_id
-from synapse.util import logcontext
from ._base import BaseHandler
@@ -59,7 +59,9 @@ class ReceiptsHandler(BaseHandler):
if is_new:
# fire off a process in the background to send the receipt to
# remote servers
- self._push_remotes([receipt])
+ run_as_background_process(
+ 'push_receipts_to_remotes', self._push_remotes, receipt
+ )
@defer.inlineCallbacks
def _received_remote_receipt(self, origin, content):
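The receipt send is now kicked off with run_as_background_process (imported from synapse.metrics.background_process_metrics above) instead of relying on the logcontext.preserve_fn decorator, so the background work gets its own log context and shows up in the background-process metrics while the caller still does not wait on it. A rough asyncio analogue of that fire-and-forget shape, purely for illustration and not Synapse's implementation:

    import asyncio
    import logging

    logger = logging.getLogger(__name__)


    def run_in_background(desc, func, *args):
        """Schedule func(*args) without awaiting it, logging any failure.

        Loosely mirrors how run_as_background_process isolates errors in the
        background work from the caller, minus the metrics and log contexts.
        """
        async def wrapper():
            try:
                await func(*args)
            except Exception:
                logger.exception("Background task %r failed", desc)

        return asyncio.create_task(wrapper())


    async def push_receipt(receipt):
        # Stand-in for ReceiptsHandler._push_remotes.
        await asyncio.sleep(0)
        print("pushed", receipt["receipt_type"], "for", receipt["user_id"])


    async def main():
        run_in_background(
            "push_receipts_to_remotes",
            push_receipt,
            {"receipt_type": "m.read", "user_id": "@alice:example.org"},
        )
        # Give the background task a chance to finish before exiting.
        await asyncio.sleep(0.1)


    if __name__ == "__main__":
        asyncio.run(main())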
@@ -125,44 +127,42 @@ class ReceiptsHandler(BaseHandler):
defer.returnValue(True)
- @logcontext.preserve_fn # caller should not yield on this
@defer.inlineCallbacks
- def _push_remotes(self, receipts):
- """Given a list of receipts, works out which remote servers should be
+ def _push_remotes(self, receipt):
+ """Given a receipt, works out which remote servers should be
poked and pokes them.
"""
try:
- # TODO: Some of this stuff should be coallesced.
- for receipt in receipts:
- room_id = receipt["room_id"]
- receipt_type = receipt["receipt_type"]
- user_id = receipt["user_id"]
- event_ids = receipt["event_ids"]
- data = receipt["data"]
-
- users = yield self.state.get_current_user_in_room(room_id)
- remotedomains = set(get_domain_from_id(u) for u in users)
- remotedomains = remotedomains.copy()
- remotedomains.discard(self.server_name)
-
- logger.debug("Sending receipt to: %r", remotedomains)
-
- for domain in remotedomains:
- self.federation.send_edu(
- destination=domain,
- edu_type="m.receipt",
- content={
- room_id: {
- receipt_type: {
- user_id: {
- "event_ids": event_ids,
- "data": data,
- }
+ # TODO: optimise this to move some of the work to the workers.
+ room_id = receipt["room_id"]
+ receipt_type = receipt["receipt_type"]
+ user_id = receipt["user_id"]
+ event_ids = receipt["event_ids"]
+ data = receipt["data"]
+
+ users = yield self.state.get_current_user_in_room(room_id)
+ remotedomains = set(get_domain_from_id(u) for u in users)
+ remotedomains = remotedomains.copy()
+ remotedomains.discard(self.server_name)
+
+ logger.debug("Sending receipt to: %r", remotedomains)
+
+ for domain in remotedomains:
+ self.federation.send_edu(
+ destination=domain,
+ edu_type="m.receipt",
+ content={
+ room_id: {
+ receipt_type: {
+ user_id: {
+ "event_ids": event_ids,
+ "data": data,
}
- },
+ }
},
- key=(room_id, receipt_type, user_id),
- )
+ },
+ key=(room_id, receipt_type, user_id),
+ )
except Exception:
logger.exception("Error pushing receipts to remote servers")
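For reference, the m.receipt EDU assembled in _push_remotes nests the payload as room -> receipt type -> user, which is why the content dict above mirrors the shape of the stored receipt. A small self-contained helper (hypothetical, not part of the patch) that builds the same structure:

    def build_receipt_edu_content(receipt):
        """Build the content of an m.receipt EDU from a local receipt dict."""
        return {
            receipt["room_id"]: {
                receipt["receipt_type"]: {
                    receipt["user_id"]: {
                        "event_ids": receipt["event_ids"],
                        "data": receipt["data"],
                    }
                }
            }
        }


    example = {
        "room_id": "!room:example.org",
        "receipt_type": "m.read",
        "user_id": "@alice:example.org",
        "event_ids": ["$event1:example.org"],
        "data": {"ts": 1536333111111},
    }
    print(build_receipt_edu_content(example))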
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 1c045383eb..ffae376915 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -32,9 +32,25 @@ if six.PY3:
logger = logging.getLogger(__name__)
-http_push_processed_counter = Counter("synapse_http_httppusher_http_pushes_processed", "")
+http_push_processed_counter = Counter(
+ "synapse_http_httppusher_http_pushes_processed",
+ "Number of push notifications successfully sent",
+)
-http_push_failed_counter = Counter("synapse_http_httppusher_http_pushes_failed", "")
+http_push_failed_counter = Counter(
+ "synapse_http_httppusher_http_pushes_failed",
+ "Number of push notifications which failed",
+)
+
+http_badges_processed_counter = Counter(
+ "synapse_http_httppusher_badge_updates_processed",
+ "Number of badge updates successfully sent",
+)
+
+http_badges_failed_counter = Counter(
+ "synapse_http_httppusher_badge_updates_failed",
+ "Number of badge updates which failed",
+)
class HttpPusher(object):
@@ -81,6 +97,11 @@ class HttpPusher(object):
pusherdict['pushkey'],
)
+ if self.data is None:
+ raise PusherConfigException(
+ "data can not be null for HTTP pusher"
+ )
+
if 'url' not in self.data:
raise PusherConfigException(
"'url' required in data for HTTP pusher"
@@ -350,6 +371,10 @@ class HttpPusher(object):
@defer.inlineCallbacks
def _send_badge(self, badge):
+ """
+ Args:
+ badge (int): number of unread messages
+ """
logger.info("Sending updated badge count %d to %s", badge, self.name)
d = {
'notification': {
@@ -370,14 +395,11 @@ class HttpPusher(object):
}
}
try:
- resp = yield self.http_client.post_json_get_json(self.url, d)
+ yield self.http_client.post_json_get_json(self.url, d)
+ http_badges_processed_counter.inc()
except Exception as e:
logger.warning(
"Failed to send badge count to %s: %s %s",
self.name, type(e), e,
)
- defer.returnValue(False)
- rejected = []
- if 'rejected' in resp:
- rejected = resp['rejected']
- defer.returnValue(rejected)
+ http_badges_failed_counter.inc()
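_send_badge now follows the same pattern as the notification path: one Prometheus counter incremented on success and another on failure, and it no longer returns the response's 'rejected' list. A minimal sketch of that pattern with prometheus_client; the send callable, its failure behaviour, and the metric names here are assumptions for illustration:

    from prometheus_client import Counter

    badges_processed = Counter(
        "example_badge_updates_processed",
        "Number of badge updates successfully sent",
    )
    badges_failed = Counter(
        "example_badge_updates_failed",
        "Number of badge updates which failed",
    )


    def send_badge(send, url, payload):
        """Send a badge update and record the outcome without raising."""
        try:
            send(url, payload)
            badges_processed.inc()
        except Exception as e:
            # Failures are counted and logged, not retried here.
            print("Failed to send badge update to %s: %s %s" % (url, type(e), e))
            badges_failed.inc()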
diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py
index 368d5094be..b33f2a357b 100644
--- a/synapse/push/pusher.py
+++ b/synapse/push/pusher.py
@@ -56,7 +56,7 @@ class PusherFactory(object):
f = self.pusher_types.get(kind, None)
if not f:
return None
- logger.info("creating %s pusher for %r", kind, pusherdict)
+ logger.debug("creating %s pusher for %r", kind, pusherdict)
return f(self.hs, pusherdict)
def _create_email_pusher(self, _hs, pusherdict):
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 99f499a60e..abf1a1a9c1 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -19,6 +19,7 @@ import logging
from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.push import PusherConfigException
from synapse.push.pusher import PusherFactory
logger = logging.getLogger(__name__)
@@ -140,6 +141,10 @@ class PusherPool:
@defer.inlineCallbacks
def on_new_notifications(self, min_stream_id, max_stream_id):
+ if not self.pushers:
+ # nothing to do here.
+ return
+
try:
users_affected = yield self.store.get_push_action_users_in_range(
min_stream_id, max_stream_id
@@ -218,6 +223,15 @@ class PusherPool:
"""
try:
p = self.pusher_factory.create_pusher(pusherdict)
+ except PusherConfigException as e:
+ logger.warning(
+ "Pusher incorrectly configured user=%s, appid=%s, pushkey=%s: %s",
+ pusherdict.get('user_name'),
+ pusherdict.get('app_id'),
+ pusherdict.get('pushkey'),
+ e,
+ )
+ return
except Exception:
logger.exception("Couldn't start a pusher: caught Exception")
return
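In pusherpool.py, PusherConfigException is now caught ahead of the blanket Exception handler, so a misconfigured pusher (for example the null data rejected in httppusher.py above) is logged as a warning rather than as an unexpected traceback, and on_new_notifications returns early when the pool has no pushers at all. A generic sketch of that exception ordering; the class and function names below are illustrative rather than Synapse's own:

    import logging

    logger = logging.getLogger(__name__)


    class PusherConfigException(Exception):
        """Raised when a pusher row is misconfigured (illustrative stand-in)."""


    def start_pusher(create_pusher, pusherdict):
        """Create a pusher from its stored config, tolerating bad rows."""
        try:
            return create_pusher(pusherdict)
        except PusherConfigException as e:
            # A bad per-pusher config is an expected, user-triggerable state,
            # so log a warning rather than a stack trace.
            logger.warning(
                "Pusher incorrectly configured user=%s, appid=%s, pushkey=%s: %s",
                pusherdict.get("user_name"),
                pusherdict.get("app_id"),
                pusherdict.get("pushkey"),
                e,
            )
        except Exception:
            # Anything else is unexpected and deserves the full traceback.
            logger.exception("Couldn't start a pusher: caught Exception")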
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index 1353a32d00..817d1f67f9 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -59,12 +59,7 @@ class BaseSlavedStore(SQLBaseStore):
members_changed = set(row.keys[1:])
self._invalidate_state_caches(room_id, members_changed)
else:
- try:
- getattr(self, row.cache_func).invalidate(tuple(row.keys))
- except AttributeError:
- # We probably haven't pulled in the cache in this worker,
- # which is fine.
- pass
+ self._attempt_to_invalidate_cache(row.cache_func, tuple(row.keys))
def _invalidate_cache_and_stream(self, txn, cache_func, keys):
txn.call_after(cache_func.invalidate, keys)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 3d895da43c..5a80eef211 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -1342,15 +1342,43 @@ class SQLBaseStore(object):
changed
"""
for member in members_changed:
- self.get_rooms_for_user_with_stream_ordering.invalidate((member,))
+ self._attempt_to_invalidate_cache(
+ "get_rooms_for_user_with_stream_ordering", (member,),
+ )
for host in set(get_domain_from_id(u) for u in members_changed):
- self.is_host_joined.invalidate((room_id, host))
- self.was_host_joined.invalidate((room_id, host))
+ self._attempt_to_invalidate_cache(
+ "is_host_joined", (room_id, host,),
+ )
+ self._attempt_to_invalidate_cache(
+ "was_host_joined", (room_id, host,),
+ )
+
+ self._attempt_to_invalidate_cache(
+ "get_users_in_room", (room_id,),
+ )
+ self._attempt_to_invalidate_cache(
+ "get_room_summary", (room_id,),
+ )
+ self._attempt_to_invalidate_cache(
+ "get_current_state_ids", (room_id,),
+ )
+
+ def _attempt_to_invalidate_cache(self, cache_name, key):
+ """Attempts to invalidate the cache of the given name, ignoring if the
+ cache doesn't exist. Mainly used for invalidating caches on workers,
+        where they may not have the cache.
 
-        self.get_users_in_room.invalidate((room_id,))
- self.get_room_summary.invalidate((room_id,))
- self.get_current_state_ids.invalidate((room_id,))
+ Args:
+ cache_name (str)
+ key (tuple)
+ """
+ try:
+ getattr(self, cache_name).invalidate(key)
+ except AttributeError:
+ # We probably haven't pulled in the cache in this worker,
+ # which is fine.
+ pass
def _send_invalidation_to_replication(self, txn, cache_name, keys):
"""Notifies replication that given cache has been invalidated.
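The new _attempt_to_invalidate_cache helper centralises the getattr/AttributeError pattern that the slaved store in replication/slave/storage/_base.py previously open-coded, so both the master process and workers can request an invalidation by cache name without knowing whether that cache is registered locally. A stripped-down, self-contained sketch of the same pattern; the classes below are illustrative only:

    class FakeCache:
        """Minimal stand-in for a cached-function descriptor with invalidate()."""

        def __init__(self, name):
            self.name = name

        def invalidate(self, key):
            print("invalidated %s%r" % (self.name, key))


    class Store:
        def __init__(self):
            # Only some caches exist on any given worker process.
            self.get_users_in_room = FakeCache("get_users_in_room")

        def _attempt_to_invalidate_cache(self, cache_name, key):
            """Invalidate the named cache if this process actually has it."""
            try:
                getattr(self, cache_name).invalidate(key)
            except AttributeError:
                # This worker never pulled in that cache, which is fine.
                pass


    store = Store()
    store._attempt_to_invalidate_cache("get_users_in_room", ("!room:example.org",))
    store._attempt_to_invalidate_cache("get_room_summary", ("!room:example.org",))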