From 4ff0201e6235b8b2efc5ce5a7dc3c479ea96df53 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 1 Oct 2020 08:09:18 -0400 Subject: Enable mypy checking for unreachable code and fix instances. (#8432) --- synapse/push/push_rule_evaluator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/push') diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 709ace01e5..3a68ce636f 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -16,7 +16,7 @@ import logging import re -from typing import Any, Dict, List, Pattern, Union +from typing import Any, Dict, List, Optional, Pattern, Union from synapse.events import EventBase from synapse.types import UserID @@ -181,7 +181,7 @@ class PushRuleEvaluatorForEvent: return r.search(body) - def _get_value(self, dotted_key: str) -> str: + def _get_value(self, dotted_key: str) -> Optional[str]: return self._value_cache.get(dotted_key, None) -- cgit 1.5.1 From 921a3f8a59da0f8fe706a22627f464a74b54c992 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Oct 2020 13:27:51 +0100 Subject: Fix not sending events over federation when using sharded event persisters (#8536) * Fix outbound federaion with multiple event persisters. We incorrectly notified federation senders that the minimum persisted stream position had advanced when we got an `RDATA` from an event persister. Notifying of federation senders already correctly happens in the notifier, so we just delete the offending line. * Change some interfaces to use RoomStreamToken. By enforcing use of `RoomStreamTokens` we make it less likely that people pass in random ints that they got from somewhere random. --- changelog.d/8536.bugfix | 1 + synapse/app/generic_worker.py | 4 ---- synapse/federation/send_queue.py | 2 +- synapse/federation/sender/__init__.py | 9 +++++++-- synapse/handlers/appservice.py | 11 +++++++---- synapse/notifier.py | 6 +++--- synapse/push/emailpusher.py | 8 +++++++- synapse/push/httppusher.py | 8 +++++++- synapse/push/pusherpool.py | 10 ++++++++-- tests/handlers/test_appservice.py | 13 ++++++++++--- 10 files changed, 51 insertions(+), 21 deletions(-) create mode 100644 changelog.d/8536.bugfix (limited to 'synapse/push') diff --git a/changelog.d/8536.bugfix b/changelog.d/8536.bugfix new file mode 100644 index 0000000000..8d238cc008 --- /dev/null +++ b/changelog.d/8536.bugfix @@ -0,0 +1 @@ +Fix not sending events over federation when using sharded event writers. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index d53181deb1..1b511890aa 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -790,10 +790,6 @@ class FederationSenderHandler: send_queue.process_rows_for_federation(self.federation_sender, rows) await self.update_token(token) - # We also need to poke the federation sender when new events happen - elif stream_name == "events": - self.federation_sender.notify_new_events(token) - # ... and when new receipts happen elif stream_name == ReceiptsStream.NAME: await self._on_new_receipts(rows) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 8e46957d15..5f1bf492c1 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -188,7 +188,7 @@ class FederationRemoteSendQueue: for key in keys[:i]: del self.edus[key] - def notify_new_events(self, current_id): + def notify_new_events(self, max_token): """As per FederationSender""" # We don't need to replicate this as it gets sent down a different # stream. diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index e33b29a42c..604cfd1935 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -40,7 +40,7 @@ from synapse.metrics import ( events_processed_counter, ) from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.types import ReadReceipt +from synapse.types import ReadReceipt, RoomStreamToken from synapse.util.metrics import Measure, measure_func logger = logging.getLogger(__name__) @@ -154,10 +154,15 @@ class FederationSender: self._per_destination_queues[destination] = queue return queue - def notify_new_events(self, current_id: int) -> None: + def notify_new_events(self, max_token: RoomStreamToken) -> None: """This gets called when we have some new events we might want to send out to other servers. """ + # We just use the minimum stream ordering and ignore the vector clock + # component. This is safe to do as long as we *always* ignore the vector + # clock components. + current_id = max_token.stream + self._last_poked_id = max(current_id, self._last_poked_id) if self._is_processing: diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 9d4e87dad6..c8d5e58035 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -27,6 +27,7 @@ from synapse.metrics import ( event_processing_loop_room_count, ) from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.types import RoomStreamToken from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -47,15 +48,17 @@ class ApplicationServicesHandler: self.current_max = 0 self.is_processing = False - async def notify_interested_services(self, current_id): + async def notify_interested_services(self, max_token: RoomStreamToken): """Notifies (pushes) all application services interested in this event. Pushing is done asynchronously, so this method won't block for any prolonged length of time. - - Args: - current_id(int): The current maximum ID. """ + # We just use the minimum stream ordering and ignore the vector clock + # component. This is safe to do as long as we *always* ignore the vector + # clock components. + current_id = max_token.stream + services = self.store.get_app_services() if not services or not self.notify_appservices: return diff --git a/synapse/notifier.py b/synapse/notifier.py index 13adeed01e..51c830c91e 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -319,19 +319,19 @@ class Notifier: ) if self.federation_sender: - self.federation_sender.notify_new_events(max_room_stream_token.stream) + self.federation_sender.notify_new_events(max_room_stream_token) async def _notify_app_services(self, max_room_stream_token: RoomStreamToken): try: await self.appservice_handler.notify_interested_services( - max_room_stream_token.stream + max_room_stream_token ) except Exception: logger.exception("Error notifying application services of event") async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken): try: - await self._pusher_pool.on_new_notifications(max_room_stream_token.stream) + await self._pusher_pool.on_new_notifications(max_room_stream_token) except Exception: logger.exception("Error pusher pool of event") diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 28bd8ab748..c6763971ee 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -18,6 +18,7 @@ import logging from twisted.internet.error import AlreadyCalled, AlreadyCancelled from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.types import RoomStreamToken logger = logging.getLogger(__name__) @@ -91,7 +92,12 @@ class EmailPusher: pass self.timed_call = None - def on_new_notifications(self, max_stream_ordering): + def on_new_notifications(self, max_token: RoomStreamToken): + # We just use the minimum stream ordering and ignore the vector clock + # component. This is safe to do as long as we *always* ignore the vector + # clock components. + max_stream_ordering = max_token.stream + if self.max_stream_ordering: self.max_stream_ordering = max( max_stream_ordering, self.max_stream_ordering diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 26706bf3e1..793d0db2d9 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -23,6 +23,7 @@ from synapse.api.constants import EventTypes from synapse.logging import opentracing from synapse.metrics.background_process_metrics import run_as_background_process from synapse.push import PusherConfigException +from synapse.types import RoomStreamToken from . import push_rule_evaluator, push_tools @@ -114,7 +115,12 @@ class HttpPusher: if should_check_for_notifs: self._start_processing() - def on_new_notifications(self, max_stream_ordering): + def on_new_notifications(self, max_token: RoomStreamToken): + # We just use the minimum stream ordering and ignore the vector clock + # component. This is safe to do as long as we *always* ignore the vector + # clock components. + max_stream_ordering = max_token.stream + self.max_stream_ordering = max( max_stream_ordering, self.max_stream_ordering or 0 ) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 76150e117b..0080c68ce2 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -24,6 +24,7 @@ from synapse.push import PusherConfigException from synapse.push.emailpusher import EmailPusher from synapse.push.httppusher import HttpPusher from synapse.push.pusher import PusherFactory +from synapse.types import RoomStreamToken from synapse.util.async_helpers import concurrently_execute if TYPE_CHECKING: @@ -186,11 +187,16 @@ class PusherPool: ) await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"]) - async def on_new_notifications(self, max_stream_id: int): + async def on_new_notifications(self, max_token: RoomStreamToken): if not self.pushers: # nothing to do here. return + # We just use the minimum stream ordering and ignore the vector clock + # component. This is safe to do as long as we *always* ignore the vector + # clock components. + max_stream_id = max_token.stream + if max_stream_id < self._last_room_stream_id_seen: # Nothing to do return @@ -214,7 +220,7 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): - p.on_new_notifications(max_stream_id) + p.on_new_notifications(max_token) except Exception: logger.exception("Exception in pusher on_new_notifications") diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 2a0b7c1b56..ee4f3da31c 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -18,6 +18,7 @@ from mock import Mock from twisted.internet import defer from synapse.handlers.appservice import ApplicationServicesHandler +from synapse.types import RoomStreamToken from tests.test_utils import make_awaitable from tests.utils import MockClock @@ -61,7 +62,9 @@ class AppServiceHandlerTestCase(unittest.TestCase): defer.succeed((0, [event])), defer.succeed((0, [])), ] - yield defer.ensureDeferred(self.handler.notify_interested_services(0)) + yield defer.ensureDeferred( + self.handler.notify_interested_services(RoomStreamToken(None, 0)) + ) self.mock_scheduler.submit_event_for_as.assert_called_once_with( interested_service, event ) @@ -80,7 +83,9 @@ class AppServiceHandlerTestCase(unittest.TestCase): defer.succeed((0, [event])), defer.succeed((0, [])), ] - yield defer.ensureDeferred(self.handler.notify_interested_services(0)) + yield defer.ensureDeferred( + self.handler.notify_interested_services(RoomStreamToken(None, 0)) + ) self.mock_as_api.query_user.assert_called_once_with(services[0], user_id) @defer.inlineCallbacks @@ -97,7 +102,9 @@ class AppServiceHandlerTestCase(unittest.TestCase): defer.succeed((0, [event])), defer.succeed((0, [])), ] - yield defer.ensureDeferred(self.handler.notify_interested_services(0)) + yield defer.ensureDeferred( + self.handler.notify_interested_services(RoomStreamToken(None, 0)) + ) self.assertFalse( self.mock_as_api.query_user.called, "query_user called when it shouldn't have been.", -- cgit 1.5.1 From 3ee17585cd095e590096683395cfb9a017eac15e Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 16 Oct 2020 15:51:57 +0100 Subject: Make LruCache register its own metrics (#8561) rather than have everything that instantiates an LruCache manage metrics separately, have LruCache do it itself. --- changelog.d/8561.misc | 1 + synapse/api/auth.py | 4 +-- synapse/push/push_rule_evaluator.py | 4 +-- synapse/util/caches/__init__.py | 13 ++++++---- synapse/util/caches/deferred_cache.py | 43 ++++++++++-------------------- synapse/util/caches/dictionary_cache.py | 9 +------ synapse/util/caches/lrucache.py | 46 +++++++++++++++++++++++++-------- tests/util/test_lrucache.py | 4 +-- 8 files changed, 62 insertions(+), 62 deletions(-) create mode 100644 changelog.d/8561.misc (limited to 'synapse/push') diff --git a/changelog.d/8561.misc b/changelog.d/8561.misc new file mode 100644 index 0000000000..a40dedfa8e --- /dev/null +++ b/changelog.d/8561.misc @@ -0,0 +1 @@ +Move metric registration code down into `LruCache`. diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 1071a0576e..eb6f418b13 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -34,7 +34,6 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import EventBase from synapse.logging import opentracing as opentracing from synapse.types import StateMap, UserID -from synapse.util.caches import register_cache from synapse.util.caches.lrucache import LruCache from synapse.util.metrics import Measure @@ -70,8 +69,7 @@ class Auth: self.store = hs.get_datastore() self.state = hs.get_state_handler() - self.token_cache = LruCache(10000) - register_cache("cache", "token_cache", self.token_cache) + self.token_cache = LruCache(10000, "token_cache") self._auth_blocking = AuthBlocking(self.hs) diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 3a68ce636f..4c95b149c5 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -20,7 +20,6 @@ from typing import Any, Dict, List, Optional, Pattern, Union from synapse.events import EventBase from synapse.types import UserID -from synapse.util.caches import register_cache from synapse.util.caches.lrucache import LruCache logger = logging.getLogger(__name__) @@ -186,8 +185,7 @@ class PushRuleEvaluatorForEvent: # Caches (string, is_glob, word_boundary) -> regex for push. See _glob_matches -regex_cache = LruCache(50000) -register_cache("cache", "regex_push_cache", regex_cache) +regex_cache = LruCache(50000, "regex_push_cache") def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool: diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index 8fc05be278..89f0b38535 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -16,7 +16,7 @@ import logging from sys import intern -from typing import Callable, Dict, Optional +from typing import Callable, Dict, Optional, Sized import attr from prometheus_client.core import Gauge @@ -92,7 +92,7 @@ class CacheMetric: def register_cache( cache_type: str, cache_name: str, - cache, + cache: Sized, collect_callback: Optional[Callable] = None, resizable: bool = True, resize_callback: Optional[Callable] = None, @@ -100,12 +100,15 @@ def register_cache( """Register a cache object for metric collection and resizing. Args: - cache_type + cache_type: a string indicating the "type" of the cache. This is used + only for deduplication so isn't too important provided it's constant. cache_name: name of the cache - cache: cache itself + cache: cache itself, which must implement __len__(), and may optionally implement + a max_size property collect_callback: If given, a function which is called during metric collection to update additional metrics. - resizable: Whether this cache supports being resized. + resizable: Whether this cache supports being resized, in which case either + resize_callback must be provided, or the cache must support set_max_size(). resize_callback: A function which can be called to resize the cache. Returns: diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py index f728cd2cf2..91fdc8142d 100644 --- a/synapse/util/caches/deferred_cache.py +++ b/synapse/util/caches/deferred_cache.py @@ -24,7 +24,6 @@ from prometheus_client import Gauge from twisted.internet import defer from synapse.util.async_helpers import ObservableDeferred -from synapse.util.caches import register_cache from synapse.util.caches.lrucache import LruCache from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry @@ -54,10 +53,7 @@ class DeferredCache(Generic[KT, VT]): __slots__ = ( "cache", - "name", - "keylen", "thread", - "metrics", "_pending_deferred_cache", ) @@ -89,37 +85,27 @@ class DeferredCache(Generic[KT, VT]): cache_type() ) # type: MutableMapping[KT, CacheEntry] + def metrics_cb(): + cache_pending_metric.labels(name).set(len(self._pending_deferred_cache)) + # cache is used for completed results and maps to the result itself, rather than # a Deferred. self.cache = LruCache( max_size=max_entries, keylen=keylen, + cache_name=name, cache_type=cache_type, size_callback=(lambda d: len(d)) if iterable else None, - evicted_callback=self._on_evicted, + metrics_collection_callback=metrics_cb, apply_cache_factor_from_config=apply_cache_factor_from_config, ) - self.name = name - self.keylen = keylen self.thread = None # type: Optional[threading.Thread] - self.metrics = register_cache( - "cache", - name, - self.cache, - collect_callback=self._metrics_collection_callback, - ) @property def max_entries(self): return self.cache.max_size - def _on_evicted(self, evicted_count): - self.metrics.inc_evictions(evicted_count) - - def _metrics_collection_callback(self): - cache_pending_metric.labels(self.name).set(len(self._pending_deferred_cache)) - def check_thread(self): expected_thread = self.thread if expected_thread is None: @@ -154,21 +140,18 @@ class DeferredCache(Generic[KT, VT]): if val is not _Sentinel.sentinel: val.callbacks.update(callbacks) if update_metrics: - self.metrics.inc_hits() + m = self.cache.metrics + assert m # we always have a name, so should always have metrics + m.inc_hits() return val.deferred - val = self.cache.get(key, _Sentinel.sentinel, callbacks=callbacks) - if val is not _Sentinel.sentinel: - self.metrics.inc_hits() - return val - - if update_metrics: - self.metrics.inc_misses() - - if default is _Sentinel.sentinel: + val = self.cache.get( + key, default, callbacks=callbacks, update_metrics=update_metrics + ) + if val is _Sentinel.sentinel: raise KeyError() else: - return default + return val def set( self, diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py index 8592b93689..8b426c005b 100644 --- a/synapse/util/caches/dictionary_cache.py +++ b/synapse/util/caches/dictionary_cache.py @@ -19,8 +19,6 @@ from collections import namedtuple from synapse.util.caches.lrucache import LruCache -from . import register_cache - logger = logging.getLogger(__name__) @@ -46,18 +44,16 @@ class DictionaryCache: """ def __init__(self, name, max_entries=1000): - self.cache = LruCache(max_size=max_entries, size_callback=len) + self.cache = LruCache(max_size=max_entries, cache_name=name, size_callback=len) self.name = name self.sequence = 0 self.thread = None - # caches_by_name[name] = self.cache class Sentinel: __slots__ = [] self.sentinel = Sentinel() - self.metrics = register_cache("dictionary", name, self.cache) def check_thread(self): expected_thread = self.thread @@ -82,8 +78,6 @@ class DictionaryCache: """ entry = self.cache.get(key, self.sentinel) if entry is not self.sentinel: - self.metrics.inc_hits() - if dict_keys is None: return DictionaryEntry( entry.full, entry.known_absent, dict(entry.value) @@ -95,7 +89,6 @@ class DictionaryCache: {k: entry.value[k] for k in dict_keys if k in entry.value}, ) - self.metrics.inc_misses() return DictionaryEntry(False, set(), {}) def invalidate(self, key): diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index 33eae2b7c4..e4804f79e0 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -18,6 +18,7 @@ from functools import wraps from typing import Callable, Optional, Type, Union from synapse.config import cache as cache_config +from synapse.util.caches import CacheMetric, register_cache from synapse.util.caches.treecache import TreeCache @@ -43,27 +44,29 @@ class _Node: class LruCache: """ - Least-recently-used cache. + Least-recently-used cache, supporting prometheus metrics and invalidation callbacks. + Supports del_multi only if cache_type=TreeCache If cache_type=TreeCache, all keys must be tuples. - - Can also set callbacks on objects when getting/setting which are fired - when that key gets invalidated/evicted. """ def __init__( self, max_size: int, + cache_name: Optional[str] = None, keylen: int = 1, cache_type: Type[Union[dict, TreeCache]] = dict, size_callback: Optional[Callable] = None, - evicted_callback: Optional[Callable] = None, + metrics_collection_callback: Optional[Callable[[], None]] = None, apply_cache_factor_from_config: bool = True, ): """ Args: max_size: The maximum amount of entries the cache can hold + cache_name: The name of this cache, for the prometheus metrics. If unset, + no metrics will be reported on this cache. + keylen: The length of the tuple used as the cache key. Ignored unless cache_type is `TreeCache`. @@ -73,9 +76,13 @@ class LruCache: size_callback (func(V) -> int | None): - evicted_callback (func(int)|None): - if not None, called on eviction with the size of the evicted - entry + metrics_collection_callback: + metrics collection callback. This is called early in the metrics + collection process, before any of the metrics registered with the + prometheus Registry are collected, so can be used to update any dynamic + metrics. + + Ignored if cache_name is None. apply_cache_factor_from_config (bool): If true, `max_size` will be multiplied by a cache factor derived from the homeserver config @@ -94,6 +101,19 @@ class LruCache: else: self.max_size = int(max_size) + if cache_name is not None: + metrics = register_cache( + "lru_cache", + cache_name, + self, + collect_callback=metrics_collection_callback, + ) # type: Optional[CacheMetric] + else: + metrics = None + + # this is exposed for access from outside this class + self.metrics = metrics + list_root = _Node(None, None, None, None) list_root.next_node = list_root list_root.prev_node = list_root @@ -105,8 +125,8 @@ class LruCache: todelete = list_root.prev_node evicted_len = delete_node(todelete) cache.pop(todelete.key, None) - if evicted_callback: - evicted_callback(evicted_len) + if metrics: + metrics.inc_evictions(evicted_len) def synchronized(f): @wraps(f) @@ -169,13 +189,17 @@ class LruCache: return deleted_len @synchronized - def cache_get(key, default=None, callbacks=[]): + def cache_get(key, default=None, callbacks=[], update_metrics=True): node = cache.get(key, None) if node is not None: move_node_to_front(node) node.callbacks.update(callbacks) + if update_metrics and metrics: + metrics.inc_hits() return node.value else: + if update_metrics and metrics: + metrics.inc_misses() return default @synchronized diff --git a/tests/util/test_lrucache.py b/tests/util/test_lrucache.py index 0adb2174af..f12834edab 100644 --- a/tests/util/test_lrucache.py +++ b/tests/util/test_lrucache.py @@ -59,7 +59,7 @@ class LruCacheTestCase(unittest.HomeserverTestCase): self.assertEquals(cache.pop("key"), None) def test_del_multi(self): - cache = LruCache(4, 2, cache_type=TreeCache) + cache = LruCache(4, keylen=2, cache_type=TreeCache) cache[("animal", "cat")] = "mew" cache[("animal", "dog")] = "woof" cache[("vehicles", "car")] = "vroom" @@ -160,7 +160,7 @@ class LruCacheCallbacksTestCase(unittest.HomeserverTestCase): m2 = Mock() m3 = Mock() m4 = Mock() - cache = LruCache(4, 2, cache_type=TreeCache) + cache = LruCache(4, keylen=2, cache_type=TreeCache) cache.set(("a", "1"), "value", callbacks=[m1]) cache.set(("a", "2"), "value", callbacks=[m2]) -- cgit 1.5.1 From 0ec0bc3886bd72bdf2f64d455a7d777f4573a4f1 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 16 Oct 2020 15:56:39 +0100 Subject: type annotations for LruCache --- synapse/api/auth.py | 4 +- synapse/push/push_rule_evaluator.py | 16 ++++---- synapse/util/caches/deferred_cache.py | 5 ++- synapse/util/caches/dictionary_cache.py | 22 ++++++---- synapse/util/caches/lrucache.py | 73 +++++++++++++++++++++++++++------ 5 files changed, 89 insertions(+), 31 deletions(-) (limited to 'synapse/push') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index eb6f418b13..bff87fabde 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -69,7 +69,9 @@ class Auth: self.store = hs.get_datastore() self.state = hs.get_state_handler() - self.token_cache = LruCache(10000, "token_cache") + self.token_cache = LruCache( + 10000, "token_cache" + ) # type: LruCache[str, Tuple[str, bool]] self._auth_blocking = AuthBlocking(self.hs) diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 4c95b149c5..854ffd625e 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -16,7 +16,7 @@ import logging import re -from typing import Any, Dict, List, Optional, Pattern, Union +from typing import Any, Dict, List, Optional, Pattern, Tuple, Union from synapse.events import EventBase from synapse.types import UserID @@ -173,19 +173,21 @@ class PushRuleEvaluatorForEvent: # Similar to _glob_matches, but do not treat display_name as a glob. r = regex_cache.get((display_name, False, True), None) if not r: - r = re.escape(display_name) - r = _re_word_boundary(r) - r = re.compile(r, flags=re.IGNORECASE) + r1 = re.escape(display_name) + r1 = _re_word_boundary(r1) + r = re.compile(r1, flags=re.IGNORECASE) regex_cache[(display_name, False, True)] = r - return r.search(body) + return bool(r.search(body)) def _get_value(self, dotted_key: str) -> Optional[str]: return self._value_cache.get(dotted_key, None) # Caches (string, is_glob, word_boundary) -> regex for push. See _glob_matches -regex_cache = LruCache(50000, "regex_push_cache") +regex_cache = LruCache( + 50000, "regex_push_cache" +) # type: LruCache[Tuple[str, bool, bool],Pattern] def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool: @@ -203,7 +205,7 @@ def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool: if not r: r = _glob_to_re(glob, word_boundary) regex_cache[(glob, True, word_boundary)] = r - return r.search(value) + return bool(r.search(value)) except re.error: logger.warning("Failed to parse glob to regex: %r", glob) return False diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py index 91fdc8142d..4026e1f8fa 100644 --- a/synapse/util/caches/deferred_cache.py +++ b/synapse/util/caches/deferred_cache.py @@ -98,7 +98,7 @@ class DeferredCache(Generic[KT, VT]): size_callback=(lambda d: len(d)) if iterable else None, metrics_collection_callback=metrics_cb, apply_cache_factor_from_config=apply_cache_factor_from_config, - ) + ) # type: LruCache[KT, VT] self.thread = None # type: Optional[threading.Thread] @@ -240,11 +240,12 @@ class DeferredCache(Generic[KT, VT]): self.check_thread() if not isinstance(key, tuple): raise TypeError("The cache key must be a tuple not %r" % (type(key),)) + key = cast(KT, key) self.cache.del_multi(key) # if we have a pending lookup for this key, remove it from the # _pending_deferred_cache, as above - entry_dict = self._pending_deferred_cache.pop(cast(KT, key), None) + entry_dict = self._pending_deferred_cache.pop(key, None) if entry_dict is not None: for entry in iterate_tree_cache_entry(entry_dict): entry.invalidate() diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py index 8b426c005b..588d2d49f2 100644 --- a/synapse/util/caches/dictionary_cache.py +++ b/synapse/util/caches/dictionary_cache.py @@ -12,10 +12,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import enum import logging import threading from collections import namedtuple +from typing import Any from synapse.util.caches.lrucache import LruCache @@ -38,23 +39,26 @@ class DictionaryEntry(namedtuple("DictionaryEntry", ("full", "known_absent", "va return len(self.value) +class _Sentinel(enum.Enum): + # defining a sentinel in this way allows mypy to correctly handle the + # type of a dictionary lookup. + sentinel = object() + + class DictionaryCache: """Caches key -> dictionary lookups, supporting caching partial dicts, i.e. fetching a subset of dictionary keys for a particular key. """ def __init__(self, name, max_entries=1000): - self.cache = LruCache(max_size=max_entries, cache_name=name, size_callback=len) + self.cache = LruCache( + max_size=max_entries, cache_name=name, size_callback=len + ) # type: LruCache[Any, DictionaryEntry] self.name = name self.sequence = 0 self.thread = None - class Sentinel: - __slots__ = [] - - self.sentinel = Sentinel() - def check_thread(self): expected_thread = self.thread if expected_thread is None: @@ -76,8 +80,8 @@ class DictionaryCache: Returns: DictionaryEntry """ - entry = self.cache.get(key, self.sentinel) - if entry is not self.sentinel: + entry = self.cache.get(key, _Sentinel.sentinel) + if entry is not _Sentinel.sentinel: if dict_keys is None: return DictionaryEntry( entry.full, entry.known_absent, dict(entry.value) diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index e4804f79e0..0eed53d3f4 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -15,12 +15,30 @@ import threading from functools import wraps -from typing import Callable, Optional, Type, Union +from typing import ( + Any, + Callable, + Generic, + Iterable, + Optional, + Type, + TypeVar, + Union, + cast, + overload, +) + +from typing_extensions import Literal from synapse.config import cache as cache_config from synapse.util.caches import CacheMetric, register_cache from synapse.util.caches.treecache import TreeCache +T = TypeVar("T") +FT = TypeVar("FT", bound=Callable[..., Any]) +KT = TypeVar("KT") +VT = TypeVar("VT") + def enumerate_leaves(node, depth): if depth == 0: @@ -42,7 +60,7 @@ class _Node: self.callbacks = callbacks -class LruCache: +class LruCache(Generic[KT, VT]): """ Least-recently-used cache, supporting prometheus metrics and invalidation callbacks. @@ -128,13 +146,13 @@ class LruCache: if metrics: metrics.inc_evictions(evicted_len) - def synchronized(f): + def synchronized(f: FT) -> FT: @wraps(f) def inner(*args, **kwargs): with lock: return f(*args, **kwargs) - return inner + return cast(FT, inner) cached_cache_len = [0] if size_callback is not None: @@ -188,8 +206,31 @@ class LruCache: node.callbacks.clear() return deleted_len + @overload + def cache_get( + key: KT, + default: Literal[None] = None, + callbacks: Iterable[Callable[[], None]] = ..., + update_metrics: bool = ..., + ) -> Optional[VT]: + ... + + @overload + def cache_get( + key: KT, + default: T, + callbacks: Iterable[Callable[[], None]] = ..., + update_metrics: bool = ..., + ) -> Union[T, VT]: + ... + @synchronized - def cache_get(key, default=None, callbacks=[], update_metrics=True): + def cache_get( + key: KT, + default=None, + callbacks: Iterable[Callable[[], None]] = [], + update_metrics: bool = True, + ): node = cache.get(key, None) if node is not None: move_node_to_front(node) @@ -203,7 +244,7 @@ class LruCache: return default @synchronized - def cache_set(key, value, callbacks=[]): + def cache_set(key: KT, value: VT, callbacks: Iterable[Callable[[], None]] = []): node = cache.get(key, None) if node is not None: # We sometimes store large objects, e.g. dicts, which cause @@ -232,7 +273,7 @@ class LruCache: evict() @synchronized - def cache_set_default(key, value): + def cache_set_default(key: KT, value: VT) -> VT: node = cache.get(key, None) if node is not None: return node.value @@ -241,8 +282,16 @@ class LruCache: evict() return value + @overload + def cache_pop(key: KT, default: Literal[None] = None) -> Union[None, VT]: + ... + + @overload + def cache_pop(key: KT, default: T) -> Union[T, VT]: + ... + @synchronized - def cache_pop(key, default=None): + def cache_pop(key: KT, default=None): node = cache.get(key, None) if node: delete_node(node) @@ -252,18 +301,18 @@ class LruCache: return default @synchronized - def cache_del_multi(key): + def cache_del_multi(key: KT) -> None: """ This will only work if constructed with cache_type=TreeCache """ popped = cache.pop(key) if popped is None: return - for leaf in enumerate_leaves(popped, keylen - len(key)): + for leaf in enumerate_leaves(popped, keylen - len(cast(tuple, key))): delete_node(leaf) @synchronized - def cache_clear(): + def cache_clear() -> None: list_root.next_node = list_root list_root.prev_node = list_root for node in cache.values(): @@ -274,7 +323,7 @@ class LruCache: cached_cache_len[0] = 0 @synchronized - def cache_contains(key): + def cache_contains(key: KT) -> bool: return key in cache self.sentinel = object() -- cgit 1.5.1 From 995cc615a01bb11b70dbf8fdd0eb7f8b3d1fdc1e Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 16 Oct 2020 16:14:42 +0100 Subject: Apply suggestions from code review Co-authored-by: Patrick Cloke --- synapse/push/push_rule_evaluator.py | 2 +- synapse/util/caches/lrucache.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/push') diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 854ffd625e..2ce9e444ab 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -187,7 +187,7 @@ class PushRuleEvaluatorForEvent: # Caches (string, is_glob, word_boundary) -> regex for push. See _glob_matches regex_cache = LruCache( 50000, "regex_push_cache" -) # type: LruCache[Tuple[str, bool, bool],Pattern] +) # type: LruCache[Tuple[str, bool, bool], Pattern] def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool: diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index 0eed53d3f4..1a2c2d4c0b 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -283,7 +283,7 @@ class LruCache(Generic[KT, VT]): return value @overload - def cache_pop(key: KT, default: Literal[None] = None) -> Union[None, VT]: + def cache_pop(key: KT, default: Literal[None] = None) -> Optional[VT]: ... @overload -- cgit 1.5.1 From c356b4bf422430cd5769c9bf90756fca2efd8451 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 19 Oct 2020 09:12:39 -0400 Subject: Include a simple message in email notifications that include encrypted content (#8545) --- changelog.d/8545.bugfix | 1 + synapse/push/mailer.py | 16 ++++++---- synapse/res/templates/notif.html | 56 +++++++++++++++++++---------------- synapse/res/templates/notif.txt | 24 +++++++++------ synapse/res/templates/notif_mail.html | 26 ++++++++-------- synapse/res/templates/notif_mail.txt | 6 ++-- synapse/res/templates/room.html | 26 ++++++++-------- synapse/res/templates/room.txt | 12 ++++---- tests/push/test_email.py | 15 +++++++++- 9 files changed, 107 insertions(+), 75 deletions(-) create mode 100644 changelog.d/8545.bugfix (limited to 'synapse/push') diff --git a/changelog.d/8545.bugfix b/changelog.d/8545.bugfix new file mode 100644 index 0000000000..64ba307df0 --- /dev/null +++ b/changelog.d/8545.bugfix @@ -0,0 +1 @@ +Fix a long standing bug where email notifications for encrypted messages were blank. diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 455a1acb46..155791b754 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -387,8 +387,8 @@ class Mailer: return ret async def get_message_vars(self, notif, event, room_state_ids): - if event.type != EventTypes.Message: - return + if event.type != EventTypes.Message and event.type != EventTypes.Encrypted: + return None sender_state_event_id = room_state_ids[("m.room.member", event.sender)] sender_state_event = await self.store.get_event(sender_state_event_id) @@ -399,10 +399,8 @@ class Mailer: # sender_hash % the number of default images to choose from sender_hash = string_ordinal_total(event.sender) - msgtype = event.content.get("msgtype") - ret = { - "msgtype": msgtype, + "event_type": event.type, "is_historical": event.event_id != notif["event_id"], "id": event.event_id, "ts": event.origin_server_ts, @@ -411,6 +409,14 @@ class Mailer: "sender_hash": sender_hash, } + # Encrypted messages don't have any additional useful information. + if event.type == EventTypes.Encrypted: + return ret + + msgtype = event.content.get("msgtype") + + ret["msgtype"] = msgtype + if msgtype == "m.text": self.add_text_message_vars(ret, event) elif msgtype == "m.image": diff --git a/synapse/res/templates/notif.html b/synapse/res/templates/notif.html index 1a6c70b562..6d76064d13 100644 --- a/synapse/res/templates/notif.html +++ b/synapse/res/templates/notif.html @@ -1,41 +1,47 @@ -{% for message in notif.messages %} +{%- for message in notif.messages %} - {% if loop.index0 == 0 or notif.messages[loop.index0 - 1].sender_name != notif.messages[loop.index0].sender_name %} - {% if message.sender_avatar_url %} + {%- if loop.index0 == 0 or notif.messages[loop.index0 - 1].sender_name != notif.messages[loop.index0].sender_name %} + {%- if message.sender_avatar_url %} - {% else %} - {% if message.sender_hash % 3 == 0 %} + {%- else %} + {%- if message.sender_hash % 3 == 0 %} - {% elif message.sender_hash % 3 == 1 %} + {%- elif message.sender_hash % 3 == 1 %} - {% else %} + {%- else %} - {% endif %} - {% endif %} - {% endif %} + {%- endif %} + {%- endif %} + {%- endif %} - {% if loop.index0 == 0 or notif.messages[loop.index0 - 1].sender_name != notif.messages[loop.index0].sender_name %} -
{% if message.msgtype == "m.emote" %}*{% endif %} {{ message.sender_name }}
- {% endif %} + {%- if loop.index0 == 0 or notif.messages[loop.index0 - 1].sender_name != notif.messages[loop.index0].sender_name %} +
{%- if message.msgtype == "m.emote" %}*{%- endif %} {{ message.sender_name }}
+ {%- endif %}
- {% if message.msgtype == "m.text" %} - {{ message.body_text_html }} - {% elif message.msgtype == "m.emote" %} - {{ message.body_text_html }} - {% elif message.msgtype == "m.notice" %} - {{ message.body_text_html }} - {% elif message.msgtype == "m.image" %} - - {% elif message.msgtype == "m.file" %} - {{ message.body_text_plain }} - {% endif %} + {%- if message.event_type == "m.room.encrypted" %} + An encrypted message. + {%- elif message.event_type == "m.room.message" %} + {%- if message.msgtype == "m.text" %} + {{ message.body_text_html }} + {%- elif message.msgtype == "m.emote" %} + {{ message.body_text_html }} + {%- elif message.msgtype == "m.notice" %} + {{ message.body_text_html }} + {%- elif message.msgtype == "m.image" %} + + {%- elif message.msgtype == "m.file" %} + {{ message.body_text_plain }} + {%- else %} + A message with unrecognised content. + {%- endif %} + {%- endif %}
{{ message.ts|format_ts("%H:%M") }} -{% endfor %} +{%- endfor %} diff --git a/synapse/res/templates/notif.txt b/synapse/res/templates/notif.txt index a37bee9833..1ee7da3c50 100644 --- a/synapse/res/templates/notif.txt +++ b/synapse/res/templates/notif.txt @@ -1,16 +1,22 @@ -{% for message in notif.messages %} -{% if message.msgtype == "m.emote" %}* {% endif %}{{ message.sender_name }} ({{ message.ts|format_ts("%H:%M") }}) -{% if message.msgtype == "m.text" %} +{%- for message in notif.messages %} +{%- if message.event_type == "m.room.encrypted" %} +An encrypted message. +{%- elif message.event_type == "m.room.message" %} +{%- if message.msgtype == "m.emote" %}* {%- endif %}{{ message.sender_name }} ({{ message.ts|format_ts("%H:%M") }}) +{%- if message.msgtype == "m.text" %} {{ message.body_text_plain }} -{% elif message.msgtype == "m.emote" %} +{%- elif message.msgtype == "m.emote" %} {{ message.body_text_plain }} -{% elif message.msgtype == "m.notice" %} +{%- elif message.msgtype == "m.notice" %} {{ message.body_text_plain }} -{% elif message.msgtype == "m.image" %} +{%- elif message.msgtype == "m.image" %} {{ message.body_text_plain }} -{% elif message.msgtype == "m.file" %} +{%- elif message.msgtype == "m.file" %} {{ message.body_text_plain }} -{% endif %} -{% endfor %} +{%- else %} +A message with unrecognised content. +{%- endif %} +{%- endif %} +{%- endfor %} View {{ room.title }} at {{ notif.link }} diff --git a/synapse/res/templates/notif_mail.html b/synapse/res/templates/notif_mail.html index a2dfeb9e9f..27d4182790 100644 --- a/synapse/res/templates/notif_mail.html +++ b/synapse/res/templates/notif_mail.html @@ -2,8 +2,8 @@ @@ -18,21 +18,21 @@
{{ summary_text }}
- {% if app_name == "Riot" %} + {%- if app_name == "Riot" %} [Riot] - {% elif app_name == "Vector" %} + {%- elif app_name == "Vector" %} [Vector] - {% elif app_name == "Element" %} + {%- elif app_name == "Element" %} [Element] - {% else %} + {%- else %} [matrix] - {% endif %} + {%- endif %} - {% for room in rooms %} - {% include 'room.html' with context %} - {% endfor %} + {%- for room in rooms %} + {%- include 'room.html' with context %} + {%- endfor %} diff --git a/synapse/res/templates/notif_mail.txt b/synapse/res/templates/notif_mail.txt index 24843042a5..df3c253979 100644 --- a/synapse/res/templates/notif_mail.txt +++ b/synapse/res/templates/notif_mail.txt @@ -2,9 +2,9 @@ Hi {{ user_display_name }}, {{ summary_text }} -{% for room in rooms %} -{% include 'room.txt' with context %} -{% endfor %} +{%- for room in rooms %} +{%- include 'room.txt' with context %} +{%- endfor %} You can disable these notifications at {{ unsubscribe_link }} diff --git a/synapse/res/templates/room.html b/synapse/res/templates/room.html index b8525fef88..4fc6f6ac9b 100644 --- a/synapse/res/templates/room.html +++ b/synapse/res/templates/room.html @@ -1,23 +1,23 @@ - {% if room.invite %} + {%- if room.invite %} - {% else %} - {% for notif in room.notifs %} - {% include 'notif.html' with context %} - {% endfor %} - {% endif %} + {%- else %} + {%- for notif in room.notifs %} + {%- include 'notif.html' with context %} + {%- endfor %} + {%- endif %}
- {% if room.avatar_url %} + {%- if room.avatar_url %} - {% else %} - {% if room.hash % 3 == 0 %} + {%- else %} + {%- if room.hash % 3 == 0 %} - {% elif room.hash % 3 == 1 %} + {%- elif room.hash % 3 == 1 %} - {% else %} + {%- else %} - {% endif %} - {% endif %} + {%- endif %} + {%- endif %} {{ room.title }}
@@ -25,9 +25,9 @@
diff --git a/synapse/res/templates/room.txt b/synapse/res/templates/room.txt index 84648c710e..df841e9e6f 100644 --- a/synapse/res/templates/room.txt +++ b/synapse/res/templates/room.txt @@ -1,9 +1,9 @@ {{ room.title }} -{% if room.invite %} +{%- if room.invite %} You've been invited, join at {{ room.link }} -{% else %} - {% for notif in room.notifs %} - {% include 'notif.txt' with context %} - {% endfor %} -{% endif %} +{%- else %} + {%- for notif in room.notifs %} + {%- include 'notif.txt' with context %} + {%- endfor %} +{%- endif %} diff --git a/tests/push/test_email.py b/tests/push/test_email.py index 3224568640..55545d9341 100644 --- a/tests/push/test_email.py +++ b/tests/push/test_email.py @@ -158,8 +158,21 @@ class EmailPusherTests(HomeserverTestCase): # We should get emailed about those messages self._check_for_mail() + def test_encrypted_message(self): + room = self.helper.create_room_as(self.user_id, tok=self.access_token) + self.helper.invite( + room=room, src=self.user_id, tok=self.access_token, targ=self.others[0].id + ) + self.helper.join(room=room, user=self.others[0].id, tok=self.others[0].token) + + # The other user sends some messages + self.helper.send_event(room, "m.room.encrypted", {}, tok=self.others[0].token) + + # We should get emailed about that message + self._check_for_mail() + def _check_for_mail(self): - "Check that the user receives an email notification" + """Check that the user receives an email notification""" # Get the stream ordering before it gets sent pushers = self.get_success( -- cgit 1.5.1 From 903d11c43a5df9f704e5dad4d14506a6470524fc Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Mon, 19 Oct 2020 15:00:12 +0100 Subject: Add `DeferredCache.get_immediate` method (#8568) * Add `DeferredCache.get_immediate` method A bunch of things that are currently calling `DeferredCache.get` are only really interested in the result if it's completed. We can optimise and simplify this case. * Remove unused 'default' parameter to DeferredCache.get() * another get_immediate instance --- changelog.d/8568.misc | 1 + synapse/push/bulk_push_rule_evaluator.py | 2 +- synapse/storage/databases/main/pusher.py | 2 +- synapse/storage/databases/main/receipts.py | 11 +-------- synapse/storage/databases/main/roommember.py | 2 +- synapse/util/caches/deferred_cache.py | 35 ++++++++++++++++++++-------- tests/util/caches/test_deferred_cache.py | 27 +++++++++++++++++---- 7 files changed, 53 insertions(+), 27 deletions(-) create mode 100644 changelog.d/8568.misc (limited to 'synapse/push') diff --git a/changelog.d/8568.misc b/changelog.d/8568.misc new file mode 100644 index 0000000000..0ed7db92d3 --- /dev/null +++ b/changelog.d/8568.misc @@ -0,0 +1 @@ +Add `get_immediate` method to `DeferredCache`. diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index c440f2545c..a701defcdd 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -496,6 +496,6 @@ class _Invalidation(namedtuple("_Invalidation", ("cache", "room_id"))): # dedupe when we add callbacks to lru cache nodes, otherwise the number # of callbacks would grow. def __call__(self): - rules = self.cache.get(self.room_id, None, update_metrics=False) + rules = self.cache.get_immediate(self.room_id, None, update_metrics=False) if rules: rules.invalidate_all() diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index df8609b97b..7997242d90 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -303,7 +303,7 @@ class PusherStore(PusherWorkerStore): lock=False, ) - user_has_pusher = self.get_if_user_has_pusher.cache.get( + user_has_pusher = self.get_if_user_has_pusher.cache.get_immediate( (user_id,), None, update_metrics=False ) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 5cdf16521c..ca7917c989 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -25,7 +25,6 @@ from synapse.storage.database import DatabasePool from synapse.storage.util.id_generators import StreamIdGenerator from synapse.types import JsonDict from synapse.util import json_encoder -from synapse.util.async_helpers import ObservableDeferred from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -413,18 +412,10 @@ class ReceiptsWorkerStore(SQLBaseStore, metaclass=abc.ABCMeta): if receipt_type != "m.read": return - # Returns either an ObservableDeferred or the raw result - res = self.get_users_with_read_receipts_in_room.cache.get( + res = self.get_users_with_read_receipts_in_room.cache.get_immediate( room_id, None, update_metrics=False ) - # first handle the ObservableDeferred case - if isinstance(res, ObservableDeferred): - if res.has_called(): - res = res.get_result() - else: - res = None - if res and user_id in res: # We'd only be adding to the set, so no point invalidating if the # user is already there diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 20fcdaa529..9b08b49862 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -531,7 +531,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): # If we do then we can reuse that result and simply update it with # any membership changes in `delta_ids` if context.prev_group and context.delta_ids: - prev_res = self._get_joined_users_from_context.cache.get( + prev_res = self._get_joined_users_from_context.cache.get_immediate( (room_id, context.prev_group), None ) if prev_res and isinstance(prev_res, dict): diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py index 4026e1f8fa..faeef75506 100644 --- a/synapse/util/caches/deferred_cache.py +++ b/synapse/util/caches/deferred_cache.py @@ -17,7 +17,16 @@ import enum import threading -from typing import Callable, Generic, Iterable, MutableMapping, Optional, TypeVar, cast +from typing import ( + Callable, + Generic, + Iterable, + MutableMapping, + Optional, + TypeVar, + Union, + cast, +) from prometheus_client import Gauge @@ -33,7 +42,7 @@ cache_pending_metric = Gauge( ["name"], ) - +T = TypeVar("T") KT = TypeVar("KT") VT = TypeVar("VT") @@ -119,21 +128,21 @@ class DeferredCache(Generic[KT, VT]): def get( self, key: KT, - default=_Sentinel.sentinel, callback: Optional[Callable[[], None]] = None, update_metrics: bool = True, - ): + ) -> Union[ObservableDeferred, VT]: """Looks the key up in the caches. Args: key(tuple) - default: What is returned if key is not in the caches. If not - specified then function throws KeyError instead callback(fn): Gets called when the entry in the cache is invalidated update_metrics (bool): whether to update the cache hit rate metrics Returns: Either an ObservableDeferred or the result itself + + Raises: + KeyError if the key is not found in the cache """ callbacks = [callback] if callback else [] val = self._pending_deferred_cache.get(key, _Sentinel.sentinel) @@ -145,13 +154,19 @@ class DeferredCache(Generic[KT, VT]): m.inc_hits() return val.deferred - val = self.cache.get( - key, default, callbacks=callbacks, update_metrics=update_metrics + val2 = self.cache.get( + key, _Sentinel.sentinel, callbacks=callbacks, update_metrics=update_metrics ) - if val is _Sentinel.sentinel: + if val2 is _Sentinel.sentinel: raise KeyError() else: - return val + return val2 + + def get_immediate( + self, key: KT, default: T, update_metrics: bool = True + ) -> Union[VT, T]: + """If we have a *completed* cached value, return it.""" + return self.cache.get(key, default, update_metrics=update_metrics) def set( self, diff --git a/tests/util/caches/test_deferred_cache.py b/tests/util/caches/test_deferred_cache.py index 9717be56b6..8a08ab6661 100644 --- a/tests/util/caches/test_deferred_cache.py +++ b/tests/util/caches/test_deferred_cache.py @@ -38,6 +38,22 @@ class DeferredCacheTestCase(unittest.TestCase): self.assertEquals(cache.get("foo"), 123) + def test_get_immediate(self): + cache = DeferredCache("test") + d1 = defer.Deferred() + cache.set("key1", d1) + + # get_immediate should return default + v = cache.get_immediate("key1", 1) + self.assertEqual(v, 1) + + # now complete the set + d1.callback(2) + + # get_immediate should return result + v = cache.get_immediate("key1", 1) + self.assertEqual(v, 2) + def test_invalidate(self): cache = DeferredCache("test") cache.prefill(("foo",), 123) @@ -80,9 +96,11 @@ class DeferredCacheTestCase(unittest.TestCase): # now do the invalidation cache.invalidate_all() - # lookup should return none - self.assertIsNone(cache.get("key1", None)) - self.assertIsNone(cache.get("key2", None)) + # lookup should fail + with self.assertRaises(KeyError): + cache.get("key1") + with self.assertRaises(KeyError): + cache.get("key2") # both callbacks should have been callbacked self.assertTrue(callback_record[0], "Invalidation callback for key1 not called") @@ -90,7 +108,8 @@ class DeferredCacheTestCase(unittest.TestCase): # letting the other lookup complete should do nothing d1.callback("result1") - self.assertIsNone(cache.get("key1", None)) + with self.assertRaises(KeyError): + cache.get("key1", None) def test_eviction(self): cache = DeferredCache( -- cgit 1.5.1 From db9ef792f03aafa9f15d796a4004b0e0e551646d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 23 Oct 2020 10:41:32 +0100 Subject: Fix email notifications for invites without local state. (#8627) This can happen if e.g. the room invited into is no longer on the server (or if all users left the room). --- changelog.d/8627.bugfix | 1 + synapse/push/mailer.py | 41 ++++++++++++++++++++++++++++------------- tests/push/test_email.py | 29 +++++++++++++++++++++++++++++ 3 files changed, 58 insertions(+), 13 deletions(-) create mode 100644 changelog.d/8627.bugfix (limited to 'synapse/push') diff --git a/changelog.d/8627.bugfix b/changelog.d/8627.bugfix new file mode 100644 index 0000000000..143cf95f92 --- /dev/null +++ b/changelog.d/8627.bugfix @@ -0,0 +1 @@ +Fix email notifications for invites without local state. diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 155791b754..38195c8eea 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -24,7 +24,7 @@ from typing import Iterable, List, TypeVar import bleach import jinja2 -from synapse.api.constants import EventTypes +from synapse.api.constants import EventTypes, Membership from synapse.api.errors import StoreError from synapse.config.emailconfig import EmailSubjectConfig from synapse.logging.context import make_deferred_yieldable @@ -317,9 +317,14 @@ class Mailer: async def get_room_vars( self, room_id, user_id, notifs, notif_events, room_state_ids ): - my_member_event_id = room_state_ids[("m.room.member", user_id)] - my_member_event = await self.store.get_event(my_member_event_id) - is_invite = my_member_event.content["membership"] == "invite" + # Check if one of the notifs is an invite event for the user. + is_invite = False + for n in notifs: + ev = notif_events[n["event_id"]] + if ev.type == EventTypes.Member and ev.state_key == user_id: + if ev.content.get("membership") == Membership.INVITE: + is_invite = True + break room_name = await calculate_room_name(self.store, room_state_ids, user_id) @@ -461,16 +466,26 @@ class Mailer: self.store, room_state_ids[room_id], user_id, fallback_to_members=False ) - my_member_event_id = room_state_ids[room_id][("m.room.member", user_id)] - my_member_event = await self.store.get_event(my_member_event_id) - if my_member_event.content["membership"] == "invite": - inviter_member_event_id = room_state_ids[room_id][ - ("m.room.member", my_member_event.sender) - ] - inviter_member_event = await self.store.get_event( - inviter_member_event_id + # See if one of the notifs is an invite event for the user + invite_event = None + for n in notifs_by_room[room_id]: + ev = notif_events[n["event_id"]] + if ev.type == EventTypes.Member and ev.state_key == user_id: + if ev.content.get("membership") == Membership.INVITE: + invite_event = ev + break + + if invite_event: + inviter_member_event_id = room_state_ids[room_id].get( + ("m.room.member", invite_event.sender) ) - inviter_name = name_from_member_event(inviter_member_event) + inviter_name = invite_event.sender + if inviter_member_event_id: + inviter_member_event = await self.store.get_event( + inviter_member_event_id, allow_none=True + ) + if inviter_member_event: + inviter_name = name_from_member_event(inviter_member_event) if room_name is None: return self.email_subjects.invite_from_person % { diff --git a/tests/push/test_email.py b/tests/push/test_email.py index 55545d9341..d9993e6245 100644 --- a/tests/push/test_email.py +++ b/tests/push/test_email.py @@ -131,6 +131,35 @@ class EmailPusherTests(HomeserverTestCase): # We should get emailed about that message self._check_for_mail() + def test_invite_sends_email(self): + # Create a room and invite the user to it + room = self.helper.create_room_as(self.others[0].id, tok=self.others[0].token) + self.helper.invite( + room=room, + src=self.others[0].id, + tok=self.others[0].token, + targ=self.user_id, + ) + + # We should get emailed about the invite + self._check_for_mail() + + def test_invite_to_empty_room_sends_email(self): + # Create a room and invite the user to it + room = self.helper.create_room_as(self.others[0].id, tok=self.others[0].token) + self.helper.invite( + room=room, + src=self.others[0].id, + tok=self.others[0].token, + targ=self.user_id, + ) + + # Then have the original user leave + self.helper.leave(room, self.others[0].id, tok=self.others[0].token) + + # We should get emailed about the invite + self._check_for_mail() + def test_multiple_members_email(self): # We want to test multiple notifications, so we pause processing of push # while we send messages. -- cgit 1.5.1 From 34a5696f9338f1a1ec52203e3871a797a02138a9 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 23 Oct 2020 12:38:40 -0400 Subject: Fix typos and spelling errors. (#8639) --- changelog.d/8639.misc | 1 + docs/sample_config.yaml | 6 +++--- docs/sample_log_config.yaml | 2 +- synapse/config/jwt_config.py | 2 +- synapse/config/logger.py | 2 +- synapse/config/registration.py | 2 +- synapse/config/room_directory.py | 2 +- synapse/config/tracer.py | 2 +- synapse/crypto/context_factory.py | 2 +- synapse/events/__init__.py | 2 +- synapse/events/utils.py | 2 +- synapse/groups/attestations.py | 2 +- synapse/groups/groups_server.py | 4 ++-- synapse/handlers/admin.py | 4 ++-- synapse/handlers/auth.py | 2 +- synapse/handlers/federation.py | 14 +++++++------- synapse/handlers/groups_local.py | 4 ++-- synapse/handlers/message.py | 2 +- synapse/handlers/oidc_handler.py | 6 +++--- synapse/handlers/presence.py | 4 ++-- synapse/handlers/profile.py | 2 +- synapse/handlers/room.py | 2 +- synapse/handlers/search.py | 2 +- synapse/handlers/state_deltas.py | 2 +- synapse/handlers/sync.py | 4 ++-- synapse/handlers/typing.py | 2 +- synapse/handlers/user_directory.py | 2 +- synapse/http/federation/well_known_resolver.py | 2 +- synapse/http/matrixfederationclient.py | 6 +++--- synapse/http/request_metrics.py | 2 +- synapse/http/server.py | 6 +++--- synapse/http/site.py | 4 +++- synapse/metrics/background_process_metrics.py | 2 +- synapse/notifier.py | 2 +- synapse/push/baserules.py | 2 +- synapse/push/bulk_push_rule_evaluator.py | 4 ++-- synapse/server_notices/consent_server_notices.py | 2 +- synapse/state/__init__.py | 2 +- synapse/state/v1.py | 2 +- synapse/state/v2.py | 2 +- synapse/static/client/login/js/login.js | 2 +- 41 files changed, 63 insertions(+), 60 deletions(-) create mode 100644 changelog.d/8639.misc (limited to 'synapse/push') diff --git a/changelog.d/8639.misc b/changelog.d/8639.misc new file mode 100644 index 0000000000..20a213df39 --- /dev/null +++ b/changelog.d/8639.misc @@ -0,0 +1 @@ +Fix typos and spelling errors in the code. diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index 061226ea6f..07f1628568 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -1886,7 +1886,7 @@ sso: # and issued at ("iat") claims are validated if present. # # Note that this is a non-standard login type and client support is -# expected to be non-existant. +# expected to be non-existent. # # See https://github.com/matrix-org/synapse/blob/master/docs/jwt.md. # @@ -2402,7 +2402,7 @@ spam_checker: # # Options for the rules include: # -# user_id: Matches agaisnt the creator of the alias +# user_id: Matches against the creator of the alias # room_id: Matches against the room ID being published # alias: Matches against any current local or canonical aliases # associated with the room @@ -2448,7 +2448,7 @@ opentracing: # This is a list of regexes which are matched against the server_name of the # homeserver. # - # By defult, it is empty, so no servers are matched. + # By default, it is empty, so no servers are matched. # #homeserver_whitelist: # - ".*" diff --git a/docs/sample_log_config.yaml b/docs/sample_log_config.yaml index 55a48a9ed6..e26657f9fe 100644 --- a/docs/sample_log_config.yaml +++ b/docs/sample_log_config.yaml @@ -59,7 +59,7 @@ root: # then write them to a file. # # Replace "buffer" with "console" to log to stderr instead. (Note that you'll - # also need to update the configuation for the `twisted` logger above, in + # also need to update the configuration for the `twisted` logger above, in # this case.) # handlers: [buffer] diff --git a/synapse/config/jwt_config.py b/synapse/config/jwt_config.py index 3252ad9e7f..f30330abb6 100644 --- a/synapse/config/jwt_config.py +++ b/synapse/config/jwt_config.py @@ -63,7 +63,7 @@ class JWTConfig(Config): # and issued at ("iat") claims are validated if present. # # Note that this is a non-standard login type and client support is - # expected to be non-existant. + # expected to be non-existent. # # See https://github.com/matrix-org/synapse/blob/master/docs/jwt.md. # diff --git a/synapse/config/logger.py b/synapse/config/logger.py index 13d6f6a3ea..6b7be28aee 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -105,7 +105,7 @@ root: # then write them to a file. # # Replace "buffer" with "console" to log to stderr instead. (Note that you'll - # also need to update the configuation for the `twisted` logger above, in + # also need to update the configuration for the `twisted` logger above, in # this case.) # handlers: [buffer] diff --git a/synapse/config/registration.py b/synapse/config/registration.py index d7e3690a32..b0a77a2e43 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -143,7 +143,7 @@ class RegistrationConfig(Config): RoomCreationPreset.TRUSTED_PRIVATE_CHAT, } - # Pull the creater/inviter from the configuration, this gets used to + # Pull the creator/inviter from the configuration, this gets used to # send invites for invite-only rooms. mxid_localpart = config.get("auto_join_mxid_localpart") self.auto_join_user_id = None diff --git a/synapse/config/room_directory.py b/synapse/config/room_directory.py index 6de1f9d103..92e1b67528 100644 --- a/synapse/config/room_directory.py +++ b/synapse/config/room_directory.py @@ -99,7 +99,7 @@ class RoomDirectoryConfig(Config): # # Options for the rules include: # - # user_id: Matches agaisnt the creator of the alias + # user_id: Matches against the creator of the alias # room_id: Matches against the room ID being published # alias: Matches against any current local or canonical aliases # associated with the room diff --git a/synapse/config/tracer.py b/synapse/config/tracer.py index 8be1346113..0c1a854f09 100644 --- a/synapse/config/tracer.py +++ b/synapse/config/tracer.py @@ -67,7 +67,7 @@ class TracerConfig(Config): # This is a list of regexes which are matched against the server_name of the # homeserver. # - # By defult, it is empty, so no servers are matched. + # By default, it is empty, so no servers are matched. # #homeserver_whitelist: # - ".*" diff --git a/synapse/crypto/context_factory.py b/synapse/crypto/context_factory.py index 79668a402e..57fd426e87 100644 --- a/synapse/crypto/context_factory.py +++ b/synapse/crypto/context_factory.py @@ -149,7 +149,7 @@ class FederationPolicyForHTTPS: return SSLClientConnectionCreator(host, ssl_context, should_verify) def creatorForNetloc(self, hostname, port): - """Implements the IPolicyForHTTPS interace so that this can be passed + """Implements the IPolicyForHTTPS interface so that this can be passed directly to agents. """ return self.get_options(hostname) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 65df62107f..e203206865 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -59,7 +59,7 @@ class DictProperty: # # To exclude the KeyError from the traceback, we explicitly # 'raise from e1.__context__' (which is better than 'raise from None', - # becuase that would omit any *earlier* exceptions). + # because that would omit any *earlier* exceptions). # raise AttributeError( "'%s' has no '%s' property" % (type(instance), self.key) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 355cbe05f1..14f7f1156f 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -180,7 +180,7 @@ def only_fields(dictionary, fields): in 'fields'. If there are no event fields specified then all fields are included. - The entries may include '.' charaters to indicate sub-fields. + The entries may include '.' characters to indicate sub-fields. So ['content.body'] will include the 'body' field of the 'content' object. A literal '.' character in a field name may be escaped using a '\'. diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index a86b3debc5..41cf07cc88 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -22,7 +22,7 @@ attestations have a validity period so need to be periodically renewed. If a user leaves (or gets kicked out of) a group, either side can still use their attestation to "prove" their membership, until the attestation expires. Therefore attestations shouldn't be relied on to prove membership in important -cases, but can for less important situtations, e.g. showing a users membership +cases, but can for less important situations, e.g. showing a users membership of groups on their profile, showing flairs, etc. An attestation is a signed blob of json that looks like: diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index e5f85b472d..0d042cbfac 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -113,7 +113,7 @@ class GroupsServerWorkerHandler: entry = await self.room_list_handler.generate_room_entry( room_id, len(joined_users), with_alias=False, allow_private=True ) - entry = dict(entry) # so we don't change whats cached + entry = dict(entry) # so we don't change what's cached entry.pop("room_id", None) room_entry["profile"] = entry @@ -550,7 +550,7 @@ class GroupsServerHandler(GroupsServerWorkerHandler): group_id, room_id, is_public=is_public ) else: - raise SynapseError(400, "Uknown config option") + raise SynapseError(400, "Unknown config option") return {} diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index 1ce2091b46..a703944543 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -88,7 +88,7 @@ class AdminHandler(BaseHandler): # We only try and fetch events for rooms the user has been in. If # they've been e.g. invited to a room without joining then we handle - # those seperately. + # those separately. rooms_user_has_been_in = await self.store.get_rooms_user_has_been_in(user_id) for index, room in enumerate(rooms): @@ -226,7 +226,7 @@ class ExfiltrationWriter: """ def finished(self): - """Called when all data has succesfully been exported and written. + """Called when all data has successfully been exported and written. This functions return value is passed to the caller of `export_user_data`. diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 48d60feaab..dd14ab69d7 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -690,7 +690,7 @@ class AuthHandler(BaseHandler): Creates a new access token for the user with the given user ID. The user is assumed to have been authenticated by some other - machanism (e.g. CAS), and the user_id converted to the canonical case. + mechanism (e.g. CAS), and the user_id converted to the canonical case. The device will be recorded in the table if it is not there already. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index fde8f00531..c386957706 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -112,7 +112,7 @@ class FederationHandler(BaseHandler): """Handles events that originated from federation. Responsible for: a) handling received Pdus before handing them on as Events to the rest - of the homeserver (including auth and state conflict resoultion) + of the homeserver (including auth and state conflict resolutions) b) converting events that were produced by local clients that may need to be sent to remote homeservers. c) doing the necessary dances to invite remote users and join remote @@ -477,7 +477,7 @@ class FederationHandler(BaseHandler): # ---- # # Update richvdh 2018/09/18: There are a number of problems with timing this - # request out agressively on the client side: + # request out aggressively on the client side: # # - it plays badly with the server-side rate-limiter, which starts tarpitting you # if you send too many requests at once, so you end up with the server carefully @@ -495,13 +495,13 @@ class FederationHandler(BaseHandler): # we'll end up back here for the *next* PDU in the list, which exacerbates the # problem. # - # - the agressive 10s timeout was introduced to deal with incoming federation + # - the aggressive 10s timeout was introduced to deal with incoming federation # requests taking 8 hours to process. It's not entirely clear why that was going # on; certainly there were other issues causing traffic storms which are now # resolved, and I think in any case we may be more sensible about our locking # now. We're *certainly* more sensible about our logging. # - # All that said: Let's try increasing the timout to 60s and see what happens. + # All that said: Let's try increasing the timeout to 60s and see what happens. try: missing_events = await self.federation_client.get_missing_events( @@ -1120,7 +1120,7 @@ class FederationHandler(BaseHandler): logger.info(str(e)) continue except RequestSendFailed as e: - logger.info("Falied to get backfill from %s because %s", dom, e) + logger.info("Failed to get backfill from %s because %s", dom, e) continue except FederationDeniedError as e: logger.info(e) @@ -1545,7 +1545,7 @@ class FederationHandler(BaseHandler): # # The reasons we have the destination server rather than the origin # server send it are slightly mysterious: the origin server should have - # all the neccessary state once it gets the response to the send_join, + # all the necessary state once it gets the response to the send_join, # so it could send the event itself if it wanted to. It may be that # doing it this way reduces failure modes, or avoids certain attacks # where a new server selectively tells a subset of the federation that @@ -1649,7 +1649,7 @@ class FederationHandler(BaseHandler): event.internal_metadata.outlier = True event.internal_metadata.out_of_band_membership = True - # Try the host that we succesfully called /make_leave/ on first for + # Try the host that we successfully called /make_leave/ on first for # the /send_leave/ request. host_list = list(target_hosts) try: diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index b2def93bb1..abd8d2af44 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -349,7 +349,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler): server_name=get_domain_from_id(group_id), ) - # TODO: Check that the group is public and we're being added publically + # TODO: Check that the group is public and we're being added publicly is_publicised = content.get("publicise", False) token = await self.store.register_user_group_membership( @@ -394,7 +394,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler): server_name=get_domain_from_id(group_id), ) - # TODO: Check that the group is public and we're being added publically + # TODO: Check that the group is public and we're being added publicly is_publicised = content.get("publicise", False) token = await self.store.register_user_group_membership( diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index d6855c60ea..f1b4d35182 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -657,7 +657,7 @@ class EventCreationHandler: context: The event context. Returns: - The previous verion of the event is returned, if it is found in the + The previous version of the event is returned, if it is found in the event context. Otherwise, None is returned. """ prev_state_ids = await context.get_prev_state_ids() diff --git a/synapse/handlers/oidc_handler.py b/synapse/handlers/oidc_handler.py index a312610635..331d4e7e96 100644 --- a/synapse/handlers/oidc_handler.py +++ b/synapse/handlers/oidc_handler.py @@ -217,7 +217,7 @@ class OidcHandler: This is based on the requested scopes: if the scopes include ``openid``, the provider should give use an ID token containing the - user informations. If not, we should fetch them using the + user information. If not, we should fetch them using the ``access_token`` with the ``userinfo_endpoint``. """ @@ -426,7 +426,7 @@ class OidcHandler: return resp async def _fetch_userinfo(self, token: Token) -> UserInfo: - """Fetch user informations from the ``userinfo_endpoint``. + """Fetch user information from the ``userinfo_endpoint``. Args: token: the token given by the ``token_endpoint``. @@ -754,7 +754,7 @@ class OidcHandler: Defaults to an hour. Returns: - A signed macaroon token with the session informations. + A signed macaroon token with the session information. """ macaroon = pymacaroons.Macaroon( location=self._server_name, identifier="key", key=self._macaroon_secret_key, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 1000ac95ff..49a00eed9c 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -802,7 +802,7 @@ class PresenceHandler(BasePresenceHandler): between the requested tokens due to the limit. The token returned can be used in a subsequent call to this - function to get further updatees. + function to get further updates. The updates are a list of 2-tuples of stream ID and the row data """ @@ -977,7 +977,7 @@ def should_notify(old_state, new_state): new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY ): - # Only notify about last active bumps if we're not currently acive + # Only notify about last active bumps if we're not currently active if not new_state.currently_active: notify_reason_counter.labels("last_active_change_online").inc() return True diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 92700b589c..da5692e03e 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -102,7 +102,7 @@ class ProfileHandler(BaseHandler): async def get_profile_from_cache(self, user_id: str) -> JsonDict: """Get the profile information from our local cache. If the user is - ours then the profile information will always be corect. Otherwise, + ours then the profile information will always be correct. Otherwise, it may be out of date/missing. """ target_user = UserID.from_string(user_id) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index ec300d8877..c5b1f1f1e1 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1268,7 +1268,7 @@ class RoomShutdownHandler: ) # We now wait for the create room to come back in via replication so - # that we can assume that all the joins/invites have propogated before + # that we can assume that all the joins/invites have propagated before # we try and auto join below. await self._replication.wait_for_stream_position( self.hs.config.worker.events_shard_config.get_instance(new_room_id), diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index e9402e6e2e..66f1bbcfc4 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -139,7 +139,7 @@ class SearchHandler(BaseHandler): # Filter to apply to results filter_dict = room_cat.get("filter", {}) - # What to order results by (impacts whether pagination can be doen) + # What to order results by (impacts whether pagination can be done) order_by = room_cat.get("order_by", "rank") # Return the current state of the rooms? diff --git a/synapse/handlers/state_deltas.py b/synapse/handlers/state_deltas.py index 7a4ae0727a..fb4f70e8e2 100644 --- a/synapse/handlers/state_deltas.py +++ b/synapse/handlers/state_deltas.py @@ -32,7 +32,7 @@ class StateDeltasHandler: Returns: None if the field in the events either both match `public_value` or if neither do, i.e. there has been no change. - True if it didnt match `public_value` but now does + True if it didn't match `public_value` but now does False if it did match `public_value` but now doesn't """ prev_event = None diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b527724bc4..32e53c2d25 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -754,7 +754,7 @@ class SyncHandler: """ # TODO(mjark) Check if the state events were received by the server # after the previous sync, since we need to include those state - # updates even if they occured logically before the previous event. + # updates even if they occurred logically before the previous event. # TODO(mjark) Check for new redactions in the state events. with Measure(self.clock, "compute_state_delta"): @@ -1882,7 +1882,7 @@ class SyncHandler: # members (as the client otherwise doesn't have enough info to form # the name itself). if sync_config.filter_collection.lazy_load_members() and ( - # we recalulate the summary: + # we recalculate the summary: # if there are membership changes in the timeline, or # if membership has changed during a gappy sync, or # if this is an initial sync. diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index d3692842e3..8758066c74 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -371,7 +371,7 @@ class TypingWriterHandler(FollowerTypingHandler): between the requested tokens due to the limit. The token returned can be used in a subsequent call to this - function to get further updatees. + function to get further updates. The updates are a list of 2-tuples of stream ID and the row data """ diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 79393c8829..afbebfc200 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -31,7 +31,7 @@ class UserDirectoryHandler(StateDeltasHandler): N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY The user directory is filled with users who this server can see are joined to a - world_readable or publically joinable room. We keep a database table up to date + world_readable or publicly joinable room. We keep a database table up to date by streaming changes of the current state and recalculating whether users should be in the directory or not when necessary. """ diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py index a306faa267..1cc666fbf6 100644 --- a/synapse/http/federation/well_known_resolver.py +++ b/synapse/http/federation/well_known_resolver.py @@ -172,7 +172,7 @@ class WellKnownResolver: had_valid_well_known = self._had_valid_well_known_cache.get(server_name, False) # We do this in two steps to differentiate between possibly transient - # errors (e.g. can't connect to host, 503 response) and more permenant + # errors (e.g. can't connect to host, 503 response) and more permanent # errors (such as getting a 404 response). response, body = await self._make_well_known_request( server_name, retry=had_valid_well_known diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index c23a4d7c0c..04766ca965 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -587,7 +587,7 @@ class MatrixFederationHttpClient: """ Builds the Authorization headers for a federation request Args: - destination (bytes|None): The desination homeserver of the request. + destination (bytes|None): The destination homeserver of the request. May be None if the destination is an identity server, in which case destination_is must be non-None. method (bytes): The HTTP method of the request @@ -640,7 +640,7 @@ class MatrixFederationHttpClient: backoff_on_404=False, try_trailing_slash_on_400=False, ): - """ Sends the specifed json data using PUT + """ Sends the specified json data using PUT Args: destination (str): The remote server to send the HTTP request @@ -729,7 +729,7 @@ class MatrixFederationHttpClient: ignore_backoff=False, args={}, ): - """ Sends the specifed json data using POST + """ Sends the specified json data using POST Args: destination (str): The remote server to send the HTTP request diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index cd94e789e8..7c5defec82 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -109,7 +109,7 @@ in_flight_requests_db_sched_duration = Counter( # The set of all in flight requests, set[RequestMetrics] _in_flight_requests = set() -# Protects the _in_flight_requests set from concurrent accesss +# Protects the _in_flight_requests set from concurrent access _in_flight_requests_lock = threading.Lock() diff --git a/synapse/http/server.py b/synapse/http/server.py index 00b98af3d4..65dbd339ac 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -182,7 +182,7 @@ class HttpServer: """ Register a callback that gets fired if we receive a http request with the given method for a path that matches the given regex. - If the regex contains groups these gets passed to the calback via + If the regex contains groups these gets passed to the callback via an unpacked tuple. Args: @@ -241,7 +241,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta): async def _async_render(self, request: Request): """Delegates to `_async_render_` methods, or returns a 400 if - no appropriate method exists. Can be overriden in sub classes for + no appropriate method exists. Can be overridden in sub classes for different routing. """ # Treat HEAD requests as GET requests. @@ -386,7 +386,7 @@ class JsonResource(DirectServeJsonResource): async def _async_render(self, request): callback, servlet_classname, group_dict = self._get_handler_for_request(request) - # Make sure we have an appopriate name for this handler in prometheus + # Make sure we have an appropriate name for this handler in prometheus # (rather than the default of JsonResource). request.request_metrics.name = servlet_classname diff --git a/synapse/http/site.py b/synapse/http/site.py index ca673028e4..ddb1770b09 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -167,7 +167,9 @@ class SynapseRequest(Request): yield except Exception: # this should already have been caught, and sent back to the client as a 500. - logger.exception("Asynchronous messge handler raised an uncaught exception") + logger.exception( + "Asynchronous message handler raised an uncaught exception" + ) finally: # the request handler has finished its work and either sent the whole response # back, or handed over responsibility to a Producer. diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index ea5f1c7b62..08fbf78eee 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -266,7 +266,7 @@ class BackgroundProcessLoggingContext(LoggingContext): super().__exit__(type, value, traceback) - # The background process has finished. We explictly remove and manually + # The background process has finished. We explicitly remove and manually # update the metrics here so that if nothing is scraping metrics the set # doesn't infinitely grow. with _bg_metrics_lock: diff --git a/synapse/notifier.py b/synapse/notifier.py index 2e993411b9..858b487bec 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -393,7 +393,7 @@ class Notifier: ) def on_new_replication_data(self) -> None: - """Used to inform replication listeners that something has happend + """Used to inform replication listeners that something has happened without waking up any of the normal user event streams""" self.notify_replication() diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py index 8047873ff1..2858b61fb1 100644 --- a/synapse/push/baserules.py +++ b/synapse/push/baserules.py @@ -37,7 +37,7 @@ def list_with_base_rules(rawrules, use_new_defaults=False): modified_base_rules = {r["rule_id"]: r for r in rawrules if r["priority_class"] < 0} # Remove the modified base rules from the list, They'll be added back - # in the default postions in the list. + # in the default positions in the list. rawrules = [r for r in rawrules if r["priority_class"] >= 0] # shove the server default rules for each kind onto the end of each diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index a701defcdd..d9b5478b53 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -390,12 +390,12 @@ class RulesForRoom: continue # If a user has left a room we remove their push rule. If they - # joined then we readd it later in _update_rules_with_member_event_ids + # joined then we re-add it later in _update_rules_with_member_event_ids ret_rules_by_user.pop(user_id, None) missing_member_event_ids[user_id] = event_id if missing_member_event_ids: - # If we have some memebr events we haven't seen, look them up + # If we have some member events we haven't seen, look them up # and fetch push rules for them if appropriate. logger.debug("Found new member events %r", missing_member_event_ids) await self._update_rules_with_member_event_ids( diff --git a/synapse/server_notices/consent_server_notices.py b/synapse/server_notices/consent_server_notices.py index 3673e7f47e..9137c4edb1 100644 --- a/synapse/server_notices/consent_server_notices.py +++ b/synapse/server_notices/consent_server_notices.py @@ -104,7 +104,7 @@ class ConsentServerNotices: def copy_with_str_subst(x: Any, substitutions: Any) -> Any: - """Deep-copy a structure, carrying out string substitions on any strings + """Deep-copy a structure, carrying out string substitutions on any strings Args: x (object): structure to be copied diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 5b0900aa3c..1fa3b280b4 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -547,7 +547,7 @@ class StateResolutionHandler: event_map: a dict from event_id to event, for any events that we happen to have in flight (eg, those currently being persisted). This will be - used as a starting point fof finding the state we need; any missing + used as a starting point for finding the state we need; any missing events will be requested via state_res_store. If None, all events will be fetched via state_res_store. diff --git a/synapse/state/v1.py b/synapse/state/v1.py index a493279cbd..85edae053d 100644 --- a/synapse/state/v1.py +++ b/synapse/state/v1.py @@ -56,7 +56,7 @@ async def resolve_events_with_store( event_map: a dict from event_id to event, for any events that we happen to have in flight (eg, those currently being persisted). This will be - used as a starting point fof finding the state we need; any missing + used as a starting point for finding the state we need; any missing events will be requested via state_map_factory. If None, all events will be fetched via state_map_factory. diff --git a/synapse/state/v2.py b/synapse/state/v2.py index edf94e7ad6..f57df0d728 100644 --- a/synapse/state/v2.py +++ b/synapse/state/v2.py @@ -69,7 +69,7 @@ async def resolve_events_with_store( event_map: a dict from event_id to event, for any events that we happen to have in flight (eg, those currently being persisted). This will be - used as a starting point fof finding the state we need; any missing + used as a starting point for finding the state we need; any missing events will be requested via state_res_store. If None, all events will be fetched via state_res_store. diff --git a/synapse/static/client/login/js/login.js b/synapse/static/client/login/js/login.js index 3678670ec7..744800ec77 100644 --- a/synapse/static/client/login/js/login.js +++ b/synapse/static/client/login/js/login.js @@ -182,7 +182,7 @@ matrixLogin.passwordLogin = function() { }; /* - * The onLogin function gets called after a succesful login. + * The onLogin function gets called after a successful login. * * It is expected that implementations override this to be notified when the * login is complete. The response to the login call is provided as the single -- cgit 1.5.1 From 2b7c180879e5d62145feed88375ba55f18fc2ae5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 26 Oct 2020 09:30:19 +0000 Subject: Start fewer opentracing spans (#8640) #8567 started a span for every background process. This is good as it means all Synapse code that gets run should be in a span (unless in the sentinel logging context), but it means we generate about 15x the number of spans as we did previously. This PR attempts to reduce that number by a) not starting one for send commands to Redis, and b) deferring starting background processes until after we're sure they're necessary. I don't really know how much this will help. --- changelog.d/8640.misc | 1 + synapse/handlers/appservice.py | 50 +++++++++++++++++++++++---- synapse/logging/opentracing.py | 10 +++--- synapse/metrics/background_process_metrics.py | 12 +++++-- synapse/notifier.py | 34 ++++++------------ synapse/push/pusherpool.py | 18 ++++++++-- synapse/replication/tcp/redis.py | 4 ++- tests/handlers/test_appservice.py | 20 +++++------ 8 files changed, 96 insertions(+), 53 deletions(-) create mode 100644 changelog.d/8640.misc (limited to 'synapse/push') diff --git a/changelog.d/8640.misc b/changelog.d/8640.misc new file mode 100644 index 0000000000..cf6023f783 --- /dev/null +++ b/changelog.d/8640.misc @@ -0,0 +1 @@ +Reduce number of OpenTracing spans started. diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 07240d3a14..7826387e53 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -14,7 +14,7 @@ # limitations under the License. import logging -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Union from prometheus_client import Counter @@ -30,7 +30,10 @@ from synapse.metrics import ( event_processing_loop_counter, event_processing_loop_room_count, ) -from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.metrics.background_process_metrics import ( + run_as_background_process, + wrap_as_background_process, +) from synapse.types import Collection, JsonDict, RoomStreamToken, UserID from synapse.util.metrics import Measure @@ -53,7 +56,7 @@ class ApplicationServicesHandler: self.current_max = 0 self.is_processing = False - async def notify_interested_services(self, max_token: RoomStreamToken): + def notify_interested_services(self, max_token: RoomStreamToken): """Notifies (pushes) all application services interested in this event. Pushing is done asynchronously, so this method won't block for any @@ -72,6 +75,12 @@ class ApplicationServicesHandler: if self.is_processing: return + # We only start a new background process if necessary rather than + # optimistically (to cut down on overhead). + self._notify_interested_services(max_token) + + @wrap_as_background_process("notify_interested_services") + async def _notify_interested_services(self, max_token: RoomStreamToken): with Measure(self.clock, "notify_interested_services"): self.is_processing = True try: @@ -166,8 +175,11 @@ class ApplicationServicesHandler: finally: self.is_processing = False - async def notify_interested_services_ephemeral( - self, stream_key: str, new_token: Optional[int], users: Collection[UserID] = [], + def notify_interested_services_ephemeral( + self, + stream_key: str, + new_token: Optional[int], + users: Collection[Union[str, UserID]] = [], ): """This is called by the notifier in the background when a ephemeral event handled by the homeserver. @@ -183,13 +195,34 @@ class ApplicationServicesHandler: new_token: The latest stream token users: The user(s) involved with the event. """ + if not self.notify_appservices: + return + + if stream_key not in ("typing_key", "receipt_key", "presence_key"): + return + services = [ service for service in self.store.get_app_services() if service.supports_ephemeral ] - if not services or not self.notify_appservices: + if not services: return + + # We only start a new background process if necessary rather than + # optimistically (to cut down on overhead). + self._notify_interested_services_ephemeral( + services, stream_key, new_token, users + ) + + @wrap_as_background_process("notify_interested_services_ephemeral") + async def _notify_interested_services_ephemeral( + self, + services: List[ApplicationService], + stream_key: str, + new_token: Optional[int], + users: Collection[Union[str, UserID]], + ): logger.info("Checking interested services for %s" % (stream_key)) with Measure(self.clock, "notify_interested_services_ephemeral"): for service in services: @@ -237,7 +270,7 @@ class ApplicationServicesHandler: return receipts async def _handle_presence( - self, service: ApplicationService, users: Collection[UserID] + self, service: ApplicationService, users: Collection[Union[str, UserID]] ): events = [] # type: List[JsonDict] presence_source = self.event_sources.sources["presence"] @@ -245,6 +278,9 @@ class ApplicationServicesHandler: service, "presence" ) for user in users: + if isinstance(user, str): + user = UserID.from_string(user) + interested = await service.is_interested_in_presence(user, self.store) if not interested: continue diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index e58850faff..ab586c318c 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -317,7 +317,7 @@ def ensure_active_span(message, ret=None): @contextlib.contextmanager -def _noop_context_manager(*args, **kwargs): +def noop_context_manager(*args, **kwargs): """Does exactly what it says on the tin""" yield @@ -413,7 +413,7 @@ def start_active_span( """ if opentracing is None: - return _noop_context_manager() + return noop_context_manager() return opentracing.tracer.start_active_span( operation_name, @@ -428,7 +428,7 @@ def start_active_span( def start_active_span_follows_from(operation_name, contexts): if opentracing is None: - return _noop_context_manager() + return noop_context_manager() references = [opentracing.follows_from(context) for context in contexts] scope = start_active_span(operation_name, references=references) @@ -459,7 +459,7 @@ def start_active_span_from_request( # Also, twisted uses byte arrays while opentracing expects strings. if opentracing is None: - return _noop_context_manager() + return noop_context_manager() header_dict = { k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders() @@ -497,7 +497,7 @@ def start_active_span_from_edu( """ if opentracing is None: - return _noop_context_manager() + return noop_context_manager() carrier = json_decoder.decode(edu_content.get("context", "{}")).get( "opentracing", {} diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 08fbf78eee..658f6ecd72 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -24,7 +24,7 @@ from prometheus_client.core import REGISTRY, Counter, Gauge from twisted.internet import defer from synapse.logging.context import LoggingContext, PreserveLoggingContext -from synapse.logging.opentracing import start_active_span +from synapse.logging.opentracing import noop_context_manager, start_active_span if TYPE_CHECKING: import resource @@ -167,7 +167,7 @@ class _BackgroundProcess: ) -def run_as_background_process(desc: str, func, *args, **kwargs): +def run_as_background_process(desc: str, func, *args, bg_start_span=True, **kwargs): """Run the given function in its own logcontext, with resource metrics This should be used to wrap processes which are fired off to run in the @@ -181,6 +181,9 @@ def run_as_background_process(desc: str, func, *args, **kwargs): Args: desc: a description for this background process type func: a function, which may return a Deferred or a coroutine + bg_start_span: Whether to start an opentracing span. Defaults to True. + Should only be disabled for processes that will not log to or tag + a span. args: positional args for func kwargs: keyword args for func @@ -199,7 +202,10 @@ def run_as_background_process(desc: str, func, *args, **kwargs): with BackgroundProcessLoggingContext(desc) as context: context.request = "%s-%i" % (desc, count) try: - with start_active_span(desc, tags={"request_id": context.request}): + ctx = noop_context_manager() + if bg_start_span: + ctx = start_active_span(desc, tags={"request_id": context.request}) + with ctx: result = func(*args, **kwargs) if inspect.isawaitable(result): diff --git a/synapse/notifier.py b/synapse/notifier.py index 858b487bec..eb56b26f21 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -40,7 +40,6 @@ from synapse.handlers.presence import format_user_presence_state from synapse.logging.context import PreserveLoggingContext from synapse.logging.utils import log_function from synapse.metrics import LaterGauge -from synapse.metrics.background_process_metrics import run_as_background_process from synapse.streams.config import PaginationConfig from synapse.types import ( Collection, @@ -310,44 +309,37 @@ class Notifier: """ # poke any interested application service. - run_as_background_process( - "_notify_app_services", self._notify_app_services, max_room_stream_token - ) - - run_as_background_process( - "_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_token - ) + self._notify_app_services(max_room_stream_token) + self._notify_pusher_pool(max_room_stream_token) if self.federation_sender: self.federation_sender.notify_new_events(max_room_stream_token) - async def _notify_app_services(self, max_room_stream_token: RoomStreamToken): + def _notify_app_services(self, max_room_stream_token: RoomStreamToken): try: - await self.appservice_handler.notify_interested_services( - max_room_stream_token - ) + self.appservice_handler.notify_interested_services(max_room_stream_token) except Exception: logger.exception("Error notifying application services of event") - async def _notify_app_services_ephemeral( + def _notify_app_services_ephemeral( self, stream_key: str, new_token: Union[int, RoomStreamToken], - users: Collection[UserID] = [], + users: Collection[Union[str, UserID]] = [], ): try: stream_token = None if isinstance(new_token, int): stream_token = new_token - await self.appservice_handler.notify_interested_services_ephemeral( + self.appservice_handler.notify_interested_services_ephemeral( stream_key, stream_token, users ) except Exception: logger.exception("Error notifying application services of event") - async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken): + def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken): try: - await self._pusher_pool.on_new_notifications(max_room_stream_token) + self._pusher_pool.on_new_notifications(max_room_stream_token) except Exception: logger.exception("Error pusher pool of event") @@ -384,12 +376,8 @@ class Notifier: self.notify_replication() # Notify appservices - run_as_background_process( - "_notify_app_services_ephemeral", - self._notify_app_services_ephemeral, - stream_key, - new_token, - users, + self._notify_app_services_ephemeral( + stream_key, new_token, users, ) def on_new_replication_data(self) -> None: diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 0080c68ce2..f325964983 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -19,7 +19,10 @@ from typing import TYPE_CHECKING, Dict, Union from prometheus_client import Gauge -from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.metrics.background_process_metrics import ( + run_as_background_process, + wrap_as_background_process, +) from synapse.push import PusherConfigException from synapse.push.emailpusher import EmailPusher from synapse.push.httppusher import HttpPusher @@ -187,7 +190,7 @@ class PusherPool: ) await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"]) - async def on_new_notifications(self, max_token: RoomStreamToken): + def on_new_notifications(self, max_token: RoomStreamToken): if not self.pushers: # nothing to do here. return @@ -201,6 +204,17 @@ class PusherPool: # Nothing to do return + # We only start a new background process if necessary rather than + # optimistically (to cut down on overhead). + self._on_new_notifications(max_token) + + @wrap_as_background_process("on_new_notifications") + async def _on_new_notifications(self, max_token: RoomStreamToken): + # We just use the minimum stream ordering and ignore the vector clock + # component. This is safe to do as long as we *always* ignore the vector + # clock components. + max_stream_id = max_token.stream + prev_stream_id = self._last_room_stream_id_seen self._last_room_stream_id_seen = max_stream_id diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index de19705c1f..bc6ba709a7 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -166,7 +166,9 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection): Args: cmd (Command) """ - run_as_background_process("send-cmd", self._async_send_command, cmd) + run_as_background_process( + "send-cmd", self._async_send_command, cmd, bg_start_span=False + ) async def _async_send_command(self, cmd: Command): """Encode a replication command and send it over our outbound connection""" diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index ee4f3da31c..53763cd0f9 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -42,7 +42,6 @@ class AppServiceHandlerTestCase(unittest.TestCase): hs.get_clock.return_value = MockClock() self.handler = ApplicationServicesHandler(hs) - @defer.inlineCallbacks def test_notify_interested_services(self): interested_service = self._mkservice(is_interested=True) services = [ @@ -62,14 +61,12 @@ class AppServiceHandlerTestCase(unittest.TestCase): defer.succeed((0, [event])), defer.succeed((0, [])), ] - yield defer.ensureDeferred( - self.handler.notify_interested_services(RoomStreamToken(None, 0)) - ) + self.handler.notify_interested_services(RoomStreamToken(None, 0)) + self.mock_scheduler.submit_event_for_as.assert_called_once_with( interested_service, event ) - @defer.inlineCallbacks def test_query_user_exists_unknown_user(self): user_id = "@someone:anywhere" services = [self._mkservice(is_interested=True)] @@ -83,12 +80,11 @@ class AppServiceHandlerTestCase(unittest.TestCase): defer.succeed((0, [event])), defer.succeed((0, [])), ] - yield defer.ensureDeferred( - self.handler.notify_interested_services(RoomStreamToken(None, 0)) - ) + + self.handler.notify_interested_services(RoomStreamToken(None, 0)) + self.mock_as_api.query_user.assert_called_once_with(services[0], user_id) - @defer.inlineCallbacks def test_query_user_exists_known_user(self): user_id = "@someone:anywhere" services = [self._mkservice(is_interested=True)] @@ -102,9 +98,9 @@ class AppServiceHandlerTestCase(unittest.TestCase): defer.succeed((0, [event])), defer.succeed((0, [])), ] - yield defer.ensureDeferred( - self.handler.notify_interested_services(RoomStreamToken(None, 0)) - ) + + self.handler.notify_interested_services(RoomStreamToken(None, 0)) + self.assertFalse( self.mock_as_api.query_user.called, "query_user called when it shouldn't have been.", -- cgit 1.5.1 From cbc82aa09faa59acc20865e8b5c36561acb9a570 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 30 Oct 2020 11:43:17 +0000 Subject: Implement and use an @lru_cache decorator (#8595) We don't always need the full power of a DeferredCache. --- changelog.d/8595.misc | 1 + synapse/push/bulk_push_rule_evaluator.py | 37 +++-- synapse/util/caches/descriptors.py | 235 ++++++++++++++++++++++++------- tests/util/caches/test_descriptors.py | 60 +++++++- 4 files changed, 272 insertions(+), 61 deletions(-) create mode 100644 changelog.d/8595.misc (limited to 'synapse/push') diff --git a/changelog.d/8595.misc b/changelog.d/8595.misc new file mode 100644 index 0000000000..24fab65cda --- /dev/null +++ b/changelog.d/8595.misc @@ -0,0 +1 @@ +Implement and use an @lru_cache decorator. diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index d9b5478b53..82a72dc34f 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -15,8 +15,8 @@ # limitations under the License. import logging -from collections import namedtuple +import attr from prometheus_client import Counter from synapse.api.constants import EventTypes, Membership, RelationTypes @@ -26,7 +26,8 @@ from synapse.events.snapshot import EventContext from synapse.state import POWER_KEY from synapse.util.async_helpers import Linearizer from synapse.util.caches import register_cache -from synapse.util.caches.descriptors import cached +from synapse.util.caches.descriptors import lru_cache +from synapse.util.caches.lrucache import LruCache from .push_rule_evaluator import PushRuleEvaluatorForEvent @@ -120,7 +121,7 @@ class BulkPushRuleEvaluator: dict of user_id -> push_rules """ room_id = event.room_id - rules_for_room = await self._get_rules_for_room(room_id) + rules_for_room = self._get_rules_for_room(room_id) rules_by_user = await rules_for_room.get_rules(event, context) @@ -138,7 +139,7 @@ class BulkPushRuleEvaluator: return rules_by_user - @cached() + @lru_cache() def _get_rules_for_room(self, room_id): """Get the current RulesForRoom object for the given room id @@ -275,12 +276,14 @@ class RulesForRoom: the entire cache for the room. """ - def __init__(self, hs, room_id, rules_for_room_cache, room_push_rule_cache_metrics): + def __init__( + self, hs, room_id, rules_for_room_cache: LruCache, room_push_rule_cache_metrics + ): """ Args: hs (HomeServer) room_id (str) - rules_for_room_cache(Cache): The cache object that caches these + rules_for_room_cache: The cache object that caches these RoomsForUser objects. room_push_rule_cache_metrics (CacheMetric) """ @@ -489,13 +492,21 @@ class RulesForRoom: self.state_group = state_group -class _Invalidation(namedtuple("_Invalidation", ("cache", "room_id"))): - # We rely on _CacheContext implementing __eq__ and __hash__ sensibly, - # which namedtuple does for us (i.e. two _CacheContext are the same if - # their caches and keys match). This is important in particular to - # dedupe when we add callbacks to lru cache nodes, otherwise the number - # of callbacks would grow. +@attr.attrs(slots=True, frozen=True) +class _Invalidation: + # _Invalidation is passed as an `on_invalidate` callback to bulk_get_push_rules, + # which means that it it is stored on the bulk_get_push_rules cache entry. In order + # to ensure that we don't accumulate lots of redunant callbacks on the cache entry, + # we need to ensure that two _Invalidation objects are "equal" if they refer to the + # same `cache` and `room_id`. + # + # attrs provides suitable __hash__ and __eq__ methods, provided we remember to + # set `frozen=True`. + + cache = attr.ib(type=LruCache) + room_id = attr.ib(type=str) + def __call__(self): - rules = self.cache.get_immediate(self.room_id, None, update_metrics=False) + rules = self.cache.get(self.room_id, None, update_metrics=False) if rules: rules.invalidate_all() diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 5d7fffee66..a924140cdf 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -13,10 +13,23 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import enum import functools import inspect import logging -from typing import Any, Callable, Generic, Optional, Tuple, TypeVar, Union, cast +from typing import ( + Any, + Callable, + Generic, + Iterable, + Mapping, + Optional, + Sequence, + Tuple, + TypeVar, + Union, + cast, +) from weakref import WeakValueDictionary from twisted.internet import defer @@ -24,6 +37,7 @@ from twisted.internet import defer from synapse.logging.context import make_deferred_yieldable, preserve_fn from synapse.util import unwrapFirstError from synapse.util.caches.deferred_cache import DeferredCache +from synapse.util.caches.lrucache import LruCache logger = logging.getLogger(__name__) @@ -48,7 +62,7 @@ class _CachedFunction(Generic[F]): class _CacheDescriptorBase: - def __init__(self, orig: _CachedFunction, num_args, cache_context=False): + def __init__(self, orig: Callable[..., Any], num_args, cache_context=False): self.orig = orig arg_spec = inspect.getfullargspec(orig) @@ -97,8 +111,107 @@ class _CacheDescriptorBase: self.add_cache_context = cache_context + self.cache_key_builder = get_cache_key_builder( + self.arg_names, self.arg_defaults + ) + + +class _LruCachedFunction(Generic[F]): + cache = None # type: LruCache[CacheKey, Any] + __call__ = None # type: F + + +def lru_cache( + max_entries: int = 1000, cache_context: bool = False, +) -> Callable[[F], _LruCachedFunction[F]]: + """A method decorator that applies a memoizing cache around the function. + + This is more-or-less a drop-in equivalent to functools.lru_cache, although note + that the signature is slightly different. + + The main differences with functools.lru_cache are: + (a) the size of the cache can be controlled via the cache_factor mechanism + (b) the wrapped function can request a "cache_context" which provides a + callback mechanism to indicate that the result is no longer valid + (c) prometheus metrics are exposed automatically. + + The function should take zero or more arguments, which are used as the key for the + cache. Single-argument functions use that argument as the cache key; otherwise the + arguments are built into a tuple. + + Cached functions can be "chained" (i.e. a cached function can call other cached + functions and get appropriately invalidated when they called caches are + invalidated) by adding a special "cache_context" argument to the function + and passing that as a kwarg to all caches called. For example: + + @lru_cache(cache_context=True) + def foo(self, key, cache_context): + r1 = self.bar1(key, on_invalidate=cache_context.invalidate) + r2 = self.bar2(key, on_invalidate=cache_context.invalidate) + return r1 + r2 + + The wrapped function also has a 'cache' property which offers direct access to the + underlying LruCache. + """ + + def func(orig: F) -> _LruCachedFunction[F]: + desc = LruCacheDescriptor( + orig, max_entries=max_entries, cache_context=cache_context, + ) + return cast(_LruCachedFunction[F], desc) + + return func + + +class LruCacheDescriptor(_CacheDescriptorBase): + """Helper for @lru_cache""" + + class _Sentinel(enum.Enum): + sentinel = object() + + def __init__( + self, orig, max_entries: int = 1000, cache_context: bool = False, + ): + super().__init__(orig, num_args=None, cache_context=cache_context) + self.max_entries = max_entries + + def __get__(self, obj, owner): + cache = LruCache( + cache_name=self.orig.__name__, max_size=self.max_entries, + ) # type: LruCache[CacheKey, Any] + + get_cache_key = self.cache_key_builder + sentinel = LruCacheDescriptor._Sentinel.sentinel + + @functools.wraps(self.orig) + def _wrapped(*args, **kwargs): + invalidate_callback = kwargs.pop("on_invalidate", None) + callbacks = (invalidate_callback,) if invalidate_callback else () + + cache_key = get_cache_key(args, kwargs) -class CacheDescriptor(_CacheDescriptorBase): + ret = cache.get(cache_key, default=sentinel, callbacks=callbacks) + if ret != sentinel: + return ret + + # Add our own `cache_context` to argument list if the wrapped function + # has asked for one + if self.add_cache_context: + kwargs["cache_context"] = _CacheContext.get_instance(cache, cache_key) + + ret2 = self.orig(obj, *args, **kwargs) + cache.set(cache_key, ret2, callbacks=callbacks) + + return ret2 + + wrapped = cast(_CachedFunction, _wrapped) + wrapped.cache = cache + obj.__dict__[self.orig.__name__] = wrapped + + return wrapped + + +class DeferredCacheDescriptor(_CacheDescriptorBase): """ A method decorator that applies a memoizing cache around the function. This caches deferreds, rather than the results themselves. Deferreds that @@ -141,7 +254,6 @@ class CacheDescriptor(_CacheDescriptorBase): cache_context=False, iterable=False, ): - super().__init__(orig, num_args=num_args, cache_context=cache_context) self.max_entries = max_entries @@ -157,41 +269,7 @@ class CacheDescriptor(_CacheDescriptorBase): iterable=self.iterable, ) # type: DeferredCache[CacheKey, Any] - def get_cache_key_gen(args, kwargs): - """Given some args/kwargs return a generator that resolves into - the cache_key. - - We loop through each arg name, looking up if its in the `kwargs`, - otherwise using the next argument in `args`. If there are no more - args then we try looking the arg name up in the defaults - """ - pos = 0 - for nm in self.arg_names: - if nm in kwargs: - yield kwargs[nm] - elif pos < len(args): - yield args[pos] - pos += 1 - else: - yield self.arg_defaults[nm] - - # By default our cache key is a tuple, but if there is only one item - # then don't bother wrapping in a tuple. This is to save memory. - if self.num_args == 1: - nm = self.arg_names[0] - - def get_cache_key(args, kwargs): - if nm in kwargs: - return kwargs[nm] - elif len(args): - return args[0] - else: - return self.arg_defaults[nm] - - else: - - def get_cache_key(args, kwargs): - return tuple(get_cache_key_gen(args, kwargs)) + get_cache_key = self.cache_key_builder @functools.wraps(self.orig) def _wrapped(*args, **kwargs): @@ -223,7 +301,6 @@ class CacheDescriptor(_CacheDescriptorBase): wrapped.prefill = lambda key, val: cache.prefill(key[0], val) else: wrapped.invalidate = cache.invalidate - wrapped.invalidate_all = cache.invalidate_all wrapped.invalidate_many = cache.invalidate_many wrapped.prefill = cache.prefill @@ -236,7 +313,7 @@ class CacheDescriptor(_CacheDescriptorBase): return wrapped -class CacheListDescriptor(_CacheDescriptorBase): +class DeferredCacheListDescriptor(_CacheDescriptorBase): """Wraps an existing cache to support bulk fetching of keys. Given a list of keys it looks in the cache to find any hits, then passes @@ -382,11 +459,13 @@ class _CacheContext: on a lower level. """ + Cache = Union[DeferredCache, LruCache] + _cache_context_objects = ( WeakValueDictionary() - ) # type: WeakValueDictionary[Tuple[DeferredCache, CacheKey], _CacheContext] + ) # type: WeakValueDictionary[Tuple[_CacheContext.Cache, CacheKey], _CacheContext] - def __init__(self, cache, cache_key): # type: (DeferredCache, CacheKey) -> None + def __init__(self, cache: "_CacheContext.Cache", cache_key: CacheKey) -> None: self._cache = cache self._cache_key = cache_key @@ -396,8 +475,8 @@ class _CacheContext: @classmethod def get_instance( - cls, cache, cache_key - ): # type: (DeferredCache, CacheKey) -> _CacheContext + cls, cache: "_CacheContext.Cache", cache_key: CacheKey + ) -> "_CacheContext": """Returns an instance constructed with the given arguments. A new instance is only created if none already exists. @@ -418,7 +497,7 @@ def cached( cache_context: bool = False, iterable: bool = False, ) -> Callable[[F], _CachedFunction[F]]: - func = lambda orig: CacheDescriptor( + func = lambda orig: DeferredCacheDescriptor( orig, max_entries=max_entries, num_args=num_args, @@ -460,7 +539,7 @@ def cachedList( def batch_do_something(self, first_arg, second_args): ... """ - func = lambda orig: CacheListDescriptor( + func = lambda orig: DeferredCacheListDescriptor( orig, cached_method_name=cached_method_name, list_name=list_name, @@ -468,3 +547,65 @@ def cachedList( ) return cast(Callable[[F], _CachedFunction[F]], func) + + +def get_cache_key_builder( + param_names: Sequence[str], param_defaults: Mapping[str, Any] +) -> Callable[[Sequence[Any], Mapping[str, Any]], CacheKey]: + """Construct a function which will build cache keys suitable for a cached function + + Args: + param_names: list of formal parameter names for the cached function + param_defaults: a mapping from parameter name to default value for that param + + Returns: + A function which will take an (args, kwargs) pair and return a cache key + """ + + # By default our cache key is a tuple, but if there is only one item + # then don't bother wrapping in a tuple. This is to save memory. + + if len(param_names) == 1: + nm = param_names[0] + + def get_cache_key(args: Sequence[Any], kwargs: Mapping[str, Any]) -> CacheKey: + if nm in kwargs: + return kwargs[nm] + elif len(args): + return args[0] + else: + return param_defaults[nm] + + else: + + def get_cache_key(args: Sequence[Any], kwargs: Mapping[str, Any]) -> CacheKey: + return tuple(_get_cache_key_gen(param_names, param_defaults, args, kwargs)) + + return get_cache_key + + +def _get_cache_key_gen( + param_names: Iterable[str], + param_defaults: Mapping[str, Any], + args: Sequence[Any], + kwargs: Mapping[str, Any], +) -> Iterable[Any]: + """Given some args/kwargs return a generator that resolves into + the cache_key. + + This is essentially the same operation as `inspect.getcallargs`, but optimised so + that we don't need to inspect the target function for each call. + """ + + # We loop through each arg name, looking up if its in the `kwargs`, + # otherwise using the next argument in `args`. If there are no more + # args then we try looking the arg name up in the defaults. + pos = 0 + for nm in param_names: + if nm in kwargs: + yield kwargs[nm] + elif pos < len(args): + yield args[pos] + pos += 1 + else: + yield param_defaults[nm] diff --git a/tests/util/caches/test_descriptors.py b/tests/util/caches/test_descriptors.py index 2ad08f541b..cf1e3203a4 100644 --- a/tests/util/caches/test_descriptors.py +++ b/tests/util/caches/test_descriptors.py @@ -29,13 +29,46 @@ from synapse.logging.context import ( make_deferred_yieldable, ) from synapse.util.caches import descriptors -from synapse.util.caches.descriptors import cached +from synapse.util.caches.descriptors import cached, lru_cache from tests import unittest +from tests.test_utils import get_awaitable_result logger = logging.getLogger(__name__) +class LruCacheDecoratorTestCase(unittest.TestCase): + def test_base(self): + class Cls: + def __init__(self): + self.mock = mock.Mock() + + @lru_cache() + def fn(self, arg1, arg2): + return self.mock(arg1, arg2) + + obj = Cls() + obj.mock.return_value = "fish" + r = obj.fn(1, 2) + self.assertEqual(r, "fish") + obj.mock.assert_called_once_with(1, 2) + obj.mock.reset_mock() + + # a call with different params should call the mock again + obj.mock.return_value = "chips" + r = obj.fn(1, 3) + self.assertEqual(r, "chips") + obj.mock.assert_called_once_with(1, 3) + obj.mock.reset_mock() + + # the two values should now be cached + r = obj.fn(1, 2) + self.assertEqual(r, "fish") + r = obj.fn(1, 3) + self.assertEqual(r, "chips") + obj.mock.assert_not_called() + + def run_on_reactor(): d = defer.Deferred() reactor.callLater(0, d.callback, 0) @@ -362,6 +395,31 @@ class DescriptorTestCase(unittest.TestCase): d = obj.fn(1) self.failureResultOf(d, SynapseError) + def test_invalidate_cascade(self): + """Invalidations should cascade up through cache contexts""" + + class Cls: + @cached(cache_context=True) + async def func1(self, key, cache_context): + return await self.func2(key, on_invalidate=cache_context.invalidate) + + @cached(cache_context=True) + async def func2(self, key, cache_context): + return self.func3(key, on_invalidate=cache_context.invalidate) + + @lru_cache(cache_context=True) + def func3(self, key, cache_context): + self.invalidate = cache_context.invalidate + return 42 + + obj = Cls() + + top_invalidate = mock.Mock() + r = get_awaitable_result(obj.func1("k1", on_invalidate=top_invalidate)) + self.assertEqual(r, 42) + obj.invalidate() + top_invalidate.assert_called_once() + class CacheDecoratorTestCase(unittest.HomeserverTestCase): """More tests for @cached -- cgit 1.5.1 From 59cc2472b3ef6bd84919fabed1f65187556abe78 Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 2 Nov 2020 16:36:14 +0000 Subject: Add base pushrule to notify for jitsi conferences (#8286) This could be customised to trigger a different kind of notification in the future, but for now it's a normal non-highlight one. --- changelog.d/8286.feature | 1 + synapse/push/baserules.py | 24 ++++++++++++++++++++++++ 2 files changed, 25 insertions(+) create mode 100644 changelog.d/8286.feature (limited to 'synapse/push') diff --git a/changelog.d/8286.feature b/changelog.d/8286.feature new file mode 100644 index 0000000000..2c371419af --- /dev/null +++ b/changelog.d/8286.feature @@ -0,0 +1 @@ +Add a push rule that highlights when a jitsi conference is created in a room. diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py index 2858b61fb1..f5788c1de7 100644 --- a/synapse/push/baserules.py +++ b/synapse/push/baserules.py @@ -498,6 +498,30 @@ BASE_APPEND_UNDERRIDE_RULES = [ ], "actions": ["notify", {"set_tweak": "highlight", "value": False}], }, + { + "rule_id": "global/underride/.im.vector.jitsi", + "conditions": [ + { + "kind": "event_match", + "key": "type", + "pattern": "im.vector.modular.widgets", + "_id": "_type_modular_widgets", + }, + { + "kind": "event_match", + "key": "content.type", + "pattern": "jitsi", + "_id": "_content_type_jitsi", + }, + { + "kind": "event_match", + "key": "state_key", + "pattern": "*", + "_id": "_is_state_event", + }, + ], + "actions": ["notify", {"set_tweak": "highlight", "value": False}], + }, ] -- cgit 1.5.1 From 17fa58bdd1c23b9019d080fd98873aa5182f56c0 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Mon, 30 Nov 2020 18:43:54 +0000 Subject: Add a config option to change whether unread push notification counts are per-message or per-room (#8820) This PR adds a new config option to the `push` section of the homeserver config, `group_unread_count_by_room`. By default Synapse will group push notifications by room (so if you have 1000 unread messages, if they lie in 55 rooms, you'll see an unread count on your phone of 55). However, it is also useful to be able to send out the true count of unread messages if desired. If `group_unread_count_by_room` is set to `false`, then with the above example, one would see an unread count of 1000 (email anyone?). --- changelog.d/8820.feature | 1 + docs/sample_config.yaml | 10 +++ synapse/config/push.py | 13 ++++ synapse/push/httppusher.py | 13 +++- synapse/push/push_tools.py | 16 +++-- tests/push/test_http.py | 163 ++++++++++++++++++++++++++++++++++++++++++++- 6 files changed, 207 insertions(+), 9 deletions(-) create mode 100644 changelog.d/8820.feature (limited to 'synapse/push') diff --git a/changelog.d/8820.feature b/changelog.d/8820.feature new file mode 100644 index 0000000000..9e35861b11 --- /dev/null +++ b/changelog.d/8820.feature @@ -0,0 +1 @@ +Add a config option, `push.group_by_unread_count`, which controls whether unread message counts in push notifications are defined as "the number of rooms with unread messages" or "total unread messages". diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index df0f3e1d8e..394eb9a3ff 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -2271,6 +2271,16 @@ push: # #include_content: false + # When a push notification is received, an unread count is also sent. + # This number can either be calculated as the number of unread messages + # for the user, or the number of *rooms* the user has unread messages in. + # + # The default value is "true", meaning push clients will see the number of + # rooms with unread messages in them. Uncomment to instead send the number + # of unread messages. + # + #group_unread_count_by_room: false + # Spam checkers are third-party modules that can block specific actions # of local users, such as creating rooms and registering undesirable diff --git a/synapse/config/push.py b/synapse/config/push.py index a71baac89c..3adbfb73e6 100644 --- a/synapse/config/push.py +++ b/synapse/config/push.py @@ -23,6 +23,9 @@ class PushConfig(Config): def read_config(self, config, **kwargs): push_config = config.get("push") or {} self.push_include_content = push_config.get("include_content", True) + self.push_group_unread_count_by_room = push_config.get( + "group_unread_count_by_room", True + ) pusher_instances = config.get("pusher_instances") or [] self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances) @@ -68,4 +71,14 @@ class PushConfig(Config): # include the event ID and room ID in push notification payloads. # #include_content: false + + # When a push notification is received, an unread count is also sent. + # This number can either be calculated as the number of unread messages + # for the user, or the number of *rooms* the user has unread messages in. + # + # The default value is "true", meaning push clients will see the number of + # rooms with unread messages in them. Uncomment to instead send the number + # of unread messages. + # + #group_unread_count_by_room: false """ diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 793d0db2d9..eff0975b6a 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -75,6 +75,7 @@ class HttpPusher: self.failing_since = pusherdict["failing_since"] self.timed_call = None self._is_processing = False + self._group_unread_count_by_room = hs.config.push_group_unread_count_by_room # This is the highest stream ordering we know it's safe to process. # When new events arrive, we'll be given a window of new events: we @@ -136,7 +137,11 @@ class HttpPusher: async def _update_badge(self): # XXX as per https://github.com/matrix-org/matrix-doc/issues/2627, this seems # to be largely redundant. perhaps we can remove it. - badge = await push_tools.get_badge_count(self.hs.get_datastore(), self.user_id) + badge = await push_tools.get_badge_count( + self.hs.get_datastore(), + self.user_id, + group_by_room=self._group_unread_count_by_room, + ) await self._send_badge(badge) def on_timer(self): @@ -283,7 +288,11 @@ class HttpPusher: return True tweaks = push_rule_evaluator.tweaks_for_actions(push_action["actions"]) - badge = await push_tools.get_badge_count(self.hs.get_datastore(), self.user_id) + badge = await push_tools.get_badge_count( + self.hs.get_datastore(), + self.user_id, + group_by_room=self._group_unread_count_by_room, + ) event = await self.store.get_event(push_action["event_id"], allow_none=True) if event is None: diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py index d0145666bf..6e7c880dc0 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py @@ -12,12 +12,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - from synapse.push.presentable_names import calculate_room_name, name_from_member_event from synapse.storage import Storage +from synapse.storage.databases.main import DataStore -async def get_badge_count(store, user_id): +async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -> int: invites = await store.get_invited_rooms_for_local_user(user_id) joins = await store.get_rooms_for_user(user_id) @@ -34,9 +34,15 @@ async def get_badge_count(store, user_id): room_id, user_id, last_unread_event_id ) ) - # return one badge count per conversation, as count per - # message is so noisy as to be almost useless - badge += 1 if notifs["notify_count"] else 0 + if notifs["notify_count"] == 0: + continue + + if group_by_room: + # return one badge count per conversation + badge += 1 + else: + # increment the badge count by the number of unread messages in the room + badge += notifs["notify_count"] return badge diff --git a/tests/push/test_http.py b/tests/push/test_http.py index 8571924b29..f118430309 100644 --- a/tests/push/test_http.py +++ b/tests/push/test_http.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - from mock import Mock from twisted.internet.defer import Deferred @@ -20,8 +19,9 @@ from twisted.internet.defer import Deferred import synapse.rest.admin from synapse.logging.context import make_deferred_yieldable from synapse.rest.client.v1 import login, room +from synapse.rest.client.v2_alpha import receipts -from tests.unittest import HomeserverTestCase +from tests.unittest import HomeserverTestCase, override_config class HTTPPusherTests(HomeserverTestCase): @@ -29,6 +29,7 @@ class HTTPPusherTests(HomeserverTestCase): synapse.rest.admin.register_servlets_for_client_rest_resource, room.register_servlets, login.register_servlets, + receipts.register_servlets, ] user_id = True hijack_auth = False @@ -499,3 +500,161 @@ class HTTPPusherTests(HomeserverTestCase): # check that this is low-priority self.assertEqual(self.push_attempts[1][2]["notification"]["prio"], "low") + + def test_push_unread_count_group_by_room(self): + """ + The HTTP pusher will group unread count by number of unread rooms. + """ + # Carry out common push count tests and setup + self._test_push_unread_count() + + # Carry out our option-value specific test + # + # This push should still only contain an unread count of 1 (for 1 unread room) + self.assertEqual( + self.push_attempts[5][2]["notification"]["counts"]["unread"], 1 + ) + + @override_config({"push": {"group_unread_count_by_room": False}}) + def test_push_unread_count_message_count(self): + """ + The HTTP pusher will send the total unread message count. + """ + # Carry out common push count tests and setup + self._test_push_unread_count() + + # Carry out our option-value specific test + # + # We're counting every unread message, so there should now be 4 since the + # last read receipt + self.assertEqual( + self.push_attempts[5][2]["notification"]["counts"]["unread"], 4 + ) + + def _test_push_unread_count(self): + """ + Tests that the correct unread count appears in sent push notifications + + Note that: + * Sending messages will cause push notifications to go out to relevant users + * Sending a read receipt will cause a "badge update" notification to go out to + the user that sent the receipt + """ + # Register the user who gets notified + user_id = self.register_user("user", "pass") + access_token = self.login("user", "pass") + + # Register the user who sends the message + other_user_id = self.register_user("other_user", "pass") + other_access_token = self.login("other_user", "pass") + + # Create a room (as other_user) + room_id = self.helper.create_room_as(other_user_id, tok=other_access_token) + + # The user to get notified joins + self.helper.join(room=room_id, user=user_id, tok=access_token) + + # Register the pusher + user_tuple = self.get_success( + self.hs.get_datastore().get_user_by_access_token(access_token) + ) + token_id = user_tuple.token_id + + self.get_success( + self.hs.get_pusherpool().add_pusher( + user_id=user_id, + access_token=token_id, + kind="http", + app_id="m.http", + app_display_name="HTTP Push Notifications", + device_display_name="pushy push", + pushkey="a@example.com", + lang=None, + data={"url": "example.com"}, + ) + ) + + # Send a message + response = self.helper.send( + room_id, body="Hello there!", tok=other_access_token + ) + # To get an unread count, the user who is getting notified has to have a read + # position in the room. We'll set the read position to this event in a moment + first_message_event_id = response["event_id"] + + # Advance time a bit (so the pusher will register something has happened) and + # make the push succeed + self.push_attempts[0][0].callback({}) + self.pump() + + # Check our push made it + self.assertEqual(len(self.push_attempts), 1) + self.assertEqual(self.push_attempts[0][1], "example.com") + + # Check that the unread count for the room is 0 + # + # The unread count is zero as the user has no read receipt in the room yet + self.assertEqual( + self.push_attempts[0][2]["notification"]["counts"]["unread"], 0 + ) + + # Now set the user's read receipt position to the first event + # + # This will actually trigger a new notification to be sent out so that + # even if the user does not receive another message, their unread + # count goes down + request, channel = self.make_request( + "POST", + "/rooms/%s/receipt/m.read/%s" % (room_id, first_message_event_id), + {}, + access_token=access_token, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # Advance time and make the push succeed + self.push_attempts[1][0].callback({}) + self.pump() + + # Unread count is still zero as we've read the only message in the room + self.assertEqual(len(self.push_attempts), 2) + self.assertEqual( + self.push_attempts[1][2]["notification"]["counts"]["unread"], 0 + ) + + # Send another message + self.helper.send( + room_id, body="How's the weather today?", tok=other_access_token + ) + + # Advance time and make the push succeed + self.push_attempts[2][0].callback({}) + self.pump() + + # This push should contain an unread count of 1 as there's now been one + # message since our last read receipt + self.assertEqual(len(self.push_attempts), 3) + self.assertEqual( + self.push_attempts[2][2]["notification"]["counts"]["unread"], 1 + ) + + # Since we're grouping by room, sending more messages shouldn't increase the + # unread count, as they're all being sent in the same room + self.helper.send(room_id, body="Hello?", tok=other_access_token) + + # Advance time and make the push succeed + self.pump() + self.push_attempts[3][0].callback({}) + + self.helper.send(room_id, body="Hello??", tok=other_access_token) + + # Advance time and make the push succeed + self.pump() + self.push_attempts[4][0].callback({}) + + self.helper.send(room_id, body="HELLO???", tok=other_access_token) + + # Advance time and make the push succeed + self.pump() + self.push_attempts[5][0].callback({}) + + self.assertEqual(len(self.push_attempts), 6) -- cgit 1.5.1