From 53cb17366391a13e8d8297c9f4fb3f77fd6b85b7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 25 Jan 2016 13:53:05 +0000 Subject: Push: Use storage apis that are cached --- synapse/push/__init__.py | 30 +++++++++++++----------------- 1 file changed, 13 insertions(+), 17 deletions(-) (limited to 'synapse/push') diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index e6a28bd8c0..9bc0b356f4 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -17,7 +17,6 @@ from twisted.internet import defer from synapse.streams.config import PaginationConfig from synapse.types import StreamToken -from synapse.api.constants import Membership import synapse.util.async import push_rule_evaluator as push_rule_evaluator @@ -296,31 +295,28 @@ class Pusher(object): @defer.inlineCallbacks def _get_badge_count(self): - room_list = yield self.store.get_rooms_for_user_where_membership_is( - user_id=self.user_id, - membership_list=(Membership.INVITE, Membership.JOIN) - ) + invites, joins = yield defer.gatherResults([ + self.store.get_invites_for_user(self.user_id), + self.store.get_rooms_for_user(self.user_id), + ], consumeErrors=True) my_receipts_by_room = yield self.store.get_receipts_for_user( self.user_id, "m.read", ) - badge = 0 + badge = len(invites) - for r in room_list: - if r.membership == Membership.INVITE: - badge += 1 - else: - if r.room_id in my_receipts_by_room: - last_unread_event_id = my_receipts_by_room[r.room_id] + for r in joins: + if r.room_id in my_receipts_by_room: + last_unread_event_id = my_receipts_by_room[r.room_id] - notifs = yield ( - self.store.get_unread_event_push_actions_by_room_for_user( - r.room_id, self.user_id, last_unread_event_id - ) + notifs = yield ( + self.store.get_unread_event_push_actions_by_room_for_user( + r.room_id, self.user_id, last_unread_event_id ) - badge += len(notifs) + ) + badge += len(notifs) defer.returnValue(badge) -- cgit 1.4.1 From 766c24b2e6efbb5d250f5a36809caf3026140593 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 26 Jan 2016 10:21:41 +0000 Subject: Only notify for messages in one to one rooms, not every event Fixes the fact that candidate events and hangups generated notifications. --- synapse/push/baserules.py | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'synapse/push') diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py index 186281dfa3..0832c77cb4 100644 --- a/synapse/push/baserules.py +++ b/synapse/push/baserules.py @@ -173,6 +173,12 @@ BASE_APPEND_UNDERRIDE_RULES = [ 'kind': 'room_member_count', 'is': '2', '_id': 'member_count', + }, + { + 'kind': 'event_match', + 'key': 'type', + 'pattern': 'm.room.message', + '_id': '_message', } ], 'actions': [ -- cgit 1.4.1 From d83d004ccdb7ace1dcb51b8acf7645bc176b10a5 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Tue, 2 Feb 2016 17:18:50 +0000 Subject: Fix flake8 warnings for new flake8 --- setup.cfg | 1 + synapse/api/auth.py | 2 +- synapse/app/__init__.py | 19 ++++++++++++++++ synapse/app/homeserver.py | 38 +++++++++----------------------- synapse/appservice/api.py | 2 +- synapse/federation/federation_client.py | 2 +- synapse/handlers/_base.py | 2 +- synapse/handlers/directory.py | 4 ++-- synapse/handlers/events.py | 2 +- synapse/handlers/presence.py | 2 +- synapse/handlers/register.py | 2 +- synapse/handlers/room.py | 2 +- synapse/http/matrixfederationclient.py | 2 +- synapse/notifier.py | 2 +- synapse/push/push_rule_evaluator.py | 2 +- synapse/rest/client/v1/login.py | 2 +- synapse/rest/client/v1/pusher.py | 4 ++-- synapse/rest/client/v1/register.py | 3 ++- synapse/rest/client/v2_alpha/register.py | 3 ++- synapse/rest/client/versions.py | 4 +--- synapse/server.py | 2 +- synapse/state.py | 2 +- synapse/storage/__init__.py | 2 +- synapse/storage/_base.py | 7 ++++-- synapse/storage/engines/sqlite3.py | 2 +- synapse/storage/event_federation.py | 2 +- synapse/storage/events.py | 6 ++--- synapse/storage/stream.py | 2 +- synapse/util/__init__.py | 2 +- synapse/util/caches/descriptors.py | 4 ++-- synapse/util/caches/expiringcache.py | 2 +- synapse/util/caches/treecache.py | 2 +- synapse/util/logutils.py | 2 +- synapse/util/ratelimitutils.py | 2 +- 34 files changed, 73 insertions(+), 66 deletions(-) (limited to 'synapse/push') diff --git a/setup.cfg b/setup.cfg index ba027c7d13..e7fc5ffe78 100644 --- a/setup.cfg +++ b/setup.cfg @@ -16,3 +16,4 @@ ignore = [flake8] max-line-length = 90 +ignore = W503 diff --git a/synapse/api/auth.py b/synapse/api/auth.py index b5536e8565..c5a2865e26 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -574,7 +574,7 @@ class Auth(object): raise AuthError( 403, "Application service has not registered this user" - ) + ) defer.returnValue(user_id) @defer.inlineCallbacks diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py index bfebb0f644..1bc4279807 100644 --- a/synapse/app/__init__.py +++ b/synapse/app/__init__.py @@ -12,3 +12,22 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +import sys +sys.dont_write_bytecode = True + +from synapse.python_dependencies import ( + check_requirements, MissingRequirementError +) # NOQA + +try: + check_requirements() +except MissingRequirementError as e: + message = "\n".join([ + "Missing Requirement: %s" % (e.message,), + "To install run:", + " pip install --upgrade --force \"%s\"" % (e.dependency,), + "", + ]) + sys.stderr.writelines(message) + sys.exit(1) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index e5066c48ef..c3066d6a0d 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -14,27 +14,22 @@ # See the License for the specific language governing permissions and # limitations under the License. +import synapse + +import contextlib +import logging +import os +import re +import resource +import subprocess import sys -from synapse.rest import ClientRestResource +import time -sys.dont_write_bytecode = True from synapse.python_dependencies import ( - check_requirements, DEPENDENCY_LINKS, MissingRequirementError + check_requirements, DEPENDENCY_LINKS ) -if __name__ == '__main__': - try: - check_requirements() - except MissingRequirementError as e: - message = "\n".join([ - "Missing Requirement: %s" % (e.message,), - "To install run:", - " pip install --upgrade --force \"%s\"" % (e.dependency,), - "", - ]) - sys.stderr.writelines(message) - sys.exit(1) - +from synapse.rest import ClientRestResource from synapse.storage.engines import create_engine, IncorrectDatabaseSetup from synapse.storage import are_all_users_on_domain from synapse.storage.prepare_database import UpgradeDatabaseException @@ -73,17 +68,6 @@ from synapse import events from daemonize import Daemonize -import synapse - -import contextlib -import logging -import os -import re -import resource -import subprocess -import time - - logger = logging.getLogger("synapse.app.homeserver") diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index e1c07028e8..bc90605324 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -29,7 +29,7 @@ class ApplicationServiceApi(SimpleHttpClient): pushing. """ - def __init__(self, hs): + def __init__(self, hs): super(ApplicationServiceApi, self).__init__(hs) self.clock = hs.get_clock() diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index c6259f9dc8..e30e2da58d 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -57,7 +57,7 @@ class FederationClient(FederationBase): cache_name="get_pdu_cache", clock=self._clock, max_len=1000, - expiry_ms=120*1000, + expiry_ms=120 * 1000, reset_expiry_on_get=False, ) diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 744a9ee507..1423df6cf3 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -147,7 +147,7 @@ class BaseHandler(object): ) if not allowed: raise LimitExceededError( - retry_after_ms=int(1000*(time_allowed - time_now)), + retry_after_ms=int(1000 * (time_allowed - time_now)), ) @defer.inlineCallbacks diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 691564c651..4efecb1ffd 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -175,8 +175,8 @@ class DirectoryHandler(BaseHandler): # If this server is in the list of servers, return it first. if self.server_name in servers: servers = ( - [self.server_name] - + [s for s in servers if s != self.server_name] + [self.server_name] + + [s for s in servers if s != self.server_name] ) else: servers = list(servers) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 254b483da6..5ad8f3779a 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -130,7 +130,7 @@ class EventStreamHandler(BaseHandler): # Add some randomness to this value to try and mitigate against # thundering herds on restart. - timeout = random.randint(int(timeout*0.9), int(timeout*1.1)) + timeout = random.randint(int(timeout * 0.9), int(timeout * 1.1)) events, tokens = yield self.notifier.get_events_for( auth_user, pagin_config, timeout, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index d36eb3b8d7..d0c21ff5c9 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -34,7 +34,7 @@ metrics = synapse.metrics.get_metrics_for(__name__) # Don't bother bumping "last active" time if it differs by less than 60 seconds -LAST_ACTIVE_GRANULARITY = 60*1000 +LAST_ACTIVE_GRANULARITY = 60 * 1000 # Keep no more than this number of offline serial revisions MAX_OFFLINE_SERIALS = 1000 diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index abd1a16a41..b8fbcf9233 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -213,7 +213,7 @@ class RegistrationHandler(BaseHandler): 400, "User ID must only contain characters which do not" " require URL encoding." - ) + ) user = UserID(localpart, self.hs.hostname) user_id = user.to_string() diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 799221c198..088b76d237 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -927,7 +927,7 @@ class RoomContextHandler(BaseHandler): Returns: dict, or None if the event isn't found """ - before_limit = math.floor(limit/2.) + before_limit = math.floor(limit / 2.) after_limit = limit - before_limit now_token = yield self.hs.get_event_sources().get_current_token() diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index da13e32e78..c3589534f8 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -152,7 +152,7 @@ class MatrixFederationHttpClient(object): return self.clock.time_bound_deferred( request_deferred, - time_out=timeout/1000. if timeout else 60, + time_out=timeout / 1000. if timeout else 60, ) response = yield preserve_context_over_fn( diff --git a/synapse/notifier.py b/synapse/notifier.py index 29965a9ab5..1a90bd55cd 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -308,7 +308,7 @@ class Notifier(object): def timed_out(): if listener: listener.deferred.cancel() - timer = self.clock.call_later(timeout/1000., timed_out) + timer = self.clock.call_later(timeout / 1000., timed_out) prev_token = from_token while not result: diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index dca018af95..2a2b4437dc 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -304,7 +304,7 @@ def _flatten_dict(d, prefix=[], result={}): if isinstance(value, basestring): result[".".join(prefix + [key])] = value.lower() elif hasattr(value, "items"): - _flatten_dict(value, prefix=(prefix+[key]), result=result) + _flatten_dict(value, prefix=(prefix + [key]), result=result) return result diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index 07836709fb..7199113dac 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -89,7 +89,7 @@ class LoginRestServlet(ClientV1RestServlet): LoginRestServlet.SAML2_TYPE): relay_state = "" if "relay_state" in login_submission: - relay_state = "&RelayState="+urllib.quote( + relay_state = "&RelayState=" + urllib.quote( login_submission["relay_state"]) result = { "uri": "%s%s" % (self.idp_redirect_url, relay_state) diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py index e218ed215c..5547f1b112 100644 --- a/synapse/rest/client/v1/pusher.py +++ b/synapse/rest/client/v1/pusher.py @@ -52,7 +52,7 @@ class PusherRestServlet(ClientV1RestServlet): if i not in content: missing.append(i) if len(missing): - raise SynapseError(400, "Missing parameters: "+','.join(missing), + raise SynapseError(400, "Missing parameters: " + ','.join(missing), errcode=Codes.MISSING_PARAM) logger.debug("set pushkey %s to kind %s", content['pushkey'], content['kind']) @@ -83,7 +83,7 @@ class PusherRestServlet(ClientV1RestServlet): data=content['data'] ) except PusherConfigException as pce: - raise SynapseError(400, "Config Error: "+pce.message, + raise SynapseError(400, "Config Error: " + pce.message, errcode=Codes.MISSING_PARAM) defer.returnValue((200, {})) diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py index 5378a9a938..2bfd4d96bf 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1/register.py @@ -38,7 +38,8 @@ logger = logging.getLogger(__name__) if hasattr(hmac, "compare_digest"): compare_digest = hmac.compare_digest else: - compare_digest = lambda a, b: a == b + def compare_digest(a, b): + return a == b class RegisterRestServlet(ClientV1RestServlet): diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 5d50dd9e3d..56a5bbec30 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -34,7 +34,8 @@ from synapse.util.async import run_on_reactor if hasattr(hmac, "compare_digest"): compare_digest = hmac.compare_digest else: - compare_digest = lambda a, b: a == b + def compare_digest(a, b): + return a == b logger = logging.getLogger(__name__) diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py index 349ef6b396..ca5468c402 100644 --- a/synapse/rest/client/versions.py +++ b/synapse/rest/client/versions.py @@ -26,9 +26,7 @@ class VersionsRestServlet(RestServlet): def on_GET(self, request): return (200, { - "versions": [ - "r0.0.1", - ] + "versions": ["r0.0.1"] }) diff --git a/synapse/server.py b/synapse/server.py index 5fee7fe130..368d615576 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -23,7 +23,7 @@ from twisted.web.client import BrowserLikePolicyForHTTPS from twisted.enterprise import adbapi from synapse.federation import initialize_http_replication -from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory +from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory from synapse.notifier import Notifier from synapse.api.auth import Auth from synapse.handlers import Handlers diff --git a/synapse/state.py b/synapse/state.py index 0acf309fe0..b9a1387520 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -63,7 +63,7 @@ class StateHandler(object): cache_name="state_cache", clock=self.clock, max_len=SIZE_OF_CACHE, - expiry_ms=EVICTION_TIMEOUT_SECONDS*1000, + expiry_ms=EVICTION_TIMEOUT_SECONDS * 1000, reset_expiry_on_get=True, ) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index c91c7a3729..5a9e7720d9 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -59,7 +59,7 @@ logger = logging.getLogger(__name__) # Number of msec of granularity to store the user IP 'last seen' time. Smaller # times give more inserts into the database even for readonly API hits # 120 seconds == 2 minutes -LAST_SEEN_GRANULARITY = 120*1000 +LAST_SEEN_GRANULARITY = 120 * 1000 class DataStore(RoomMemberStore, RoomStore, diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 5e77320540..cfb87d9328 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -185,7 +185,7 @@ class SQLBaseStore(object): time_then = self._previous_loop_ts self._previous_loop_ts = time_now - ratio = (curr - prev)/(time_now - time_then) + ratio = (curr - prev) / (time_now - time_then) top_three_counters = self._txn_perf_counters.interval( time_now - time_then, limit=3 @@ -643,7 +643,10 @@ class SQLBaseStore(object): if not iterable: defer.returnValue(results) - chunks = [iterable[i:i+batch_size] for i in xrange(0, len(iterable), batch_size)] + chunks = [ + iterable[i:i + batch_size] + for i in xrange(0, len(iterable), batch_size) + ] for chunk in chunks: rows = yield self.runInteraction( desc, diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py index 400c10103c..91fac33b8b 100644 --- a/synapse/storage/engines/sqlite3.py +++ b/synapse/storage/engines/sqlite3.py @@ -54,7 +54,7 @@ class Sqlite3Engine(object): def _parse_match_info(buf): bufsize = len(buf) - return [struct.unpack('@I', buf[i:i+4])[0] for i in range(0, bufsize, 4)] + return [struct.unpack('@I', buf[i:i + 4])[0] for i in range(0, bufsize, 4)] def _rank(raw_match_info): diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 5f32eec6f8..ce2c794025 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -58,7 +58,7 @@ class EventFederationStore(SQLBaseStore): new_front = set() front_list = list(front) chunks = [ - front_list[x:x+100] + front_list[x:x + 100] for x in xrange(0, len(front), 100) ] for chunk in chunks: diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5e85552029..4d7cdd00d0 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -84,7 +84,7 @@ class EventsStore(SQLBaseStore): event.internal_metadata.stream_ordering = stream chunks = [ - events_and_contexts[x:x+100] + events_and_contexts[x:x + 100] for x in xrange(0, len(events_and_contexts), 100) ] @@ -740,7 +740,7 @@ class EventsStore(SQLBaseStore): rows = [] N = 200 for i in range(1 + len(events) / N): - evs = events[i*N:(i + 1)*N] + evs = events[i * N:(i + 1) * N] if not evs: break @@ -755,7 +755,7 @@ class EventsStore(SQLBaseStore): " LEFT JOIN rejections as rej USING (event_id)" " LEFT JOIN redactions as r ON e.event_id = r.redacts" " WHERE e.event_id IN (%s)" - ) % (",".join(["?"]*len(evs)),) + ) % (",".join(["?"] * len(evs)),) txn.execute(sql, evs) rows.extend(self.cursor_to_dict(txn)) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 338a9d40d5..2c49a5e499 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -168,7 +168,7 @@ class StreamStore(SQLBaseStore): results = {} room_ids = list(room_ids) - for rm_ids in (room_ids[i:i+20] for i in xrange(0, len(room_ids), 20)): + for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)): res = yield defer.gatherResults([ self.get_room_events_stream_for_room( room_id, from_key, to_key, limit diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index f1fe963adf..7566d9eb33 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -46,7 +46,7 @@ class Clock(object): def looping_call(self, f, msec): l = task.LoopingCall(f) - l.start(msec/1000.0, now=False) + l.start(msec / 1000.0, now=False) return l def stop_looping_call(self, loop): diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 88e56e3302..e27917c63a 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -149,7 +149,7 @@ class CacheDescriptor(object): self.lru = lru self.tree = tree - self.arg_names = inspect.getargspec(orig).args[1:num_args+1] + self.arg_names = inspect.getargspec(orig).args[1:num_args + 1] if len(self.arg_names) < self.num_args: raise Exception( @@ -250,7 +250,7 @@ class CacheListDescriptor(object): self.num_args = num_args self.list_name = list_name - self.arg_names = inspect.getargspec(orig).args[1:num_args+1] + self.arg_names = inspect.getargspec(orig).args[1:num_args + 1] self.list_pos = self.arg_names.index(self.list_name) self.cache = cache diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 494226f5ea..62cae99649 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -55,7 +55,7 @@ class ExpiringCache(object): def f(): self._prune_cache() - self._clock.looping_call(f, self._expiry_ms/2) + self._clock.looping_call(f, self._expiry_ms / 2) def __setitem__(self, key, value): now = self._clock.time_msec() diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py index 29d02f7e95..03bc1401b7 100644 --- a/synapse/util/caches/treecache.py +++ b/synapse/util/caches/treecache.py @@ -58,7 +58,7 @@ class TreeCache(object): if n: break - node_and_keys[i+1][0].pop(k) + node_and_keys[i + 1][0].pop(k) popped, cnt = _strip_and_count_entires(popped) self.size -= cnt diff --git a/synapse/util/logutils.py b/synapse/util/logutils.py index d5b1a37eff..c37a157787 100644 --- a/synapse/util/logutils.py +++ b/synapse/util/logutils.py @@ -111,7 +111,7 @@ def time_function(f): _log_debug_as_f( f, "[FUNC END] {%s-%d} %f", - (func_name, id, end-start,), + (func_name, id, end - start,), ) return r diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index c37d6f12e3..ea321bc6a9 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -163,7 +163,7 @@ class _PerHostRatelimiter(object): "Ratelimit [%s]: sleeping req", id(request_id), ) - ret_defer = sleep(self.sleep_msec/1000.0) + ret_defer = sleep(self.sleep_msec / 1000.0) self.sleeping_requests.add(request_id) -- cgit 1.4.1 From 771528ab1323715271b9e968d2d337b88910fb2f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Feb 2016 10:50:49 +0000 Subject: Change event_push_actions_rm_tokens schema --- synapse/handlers/sync.py | 6 +-- synapse/push/__init__.py | 2 +- synapse/storage/event_push_actions.py | 47 ++++++++++++++++-------- synapse/storage/prepare_database.py | 2 +- synapse/storage/schema/delta/29/push_actions.sql | 31 ++++++++++++++++ 5 files changed, 67 insertions(+), 21 deletions(-) create mode 100644 synapse/storage/schema/delta/29/push_actions.sql (limited to 'synapse/push') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index dc686db541..0292e06733 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -706,10 +706,8 @@ class SyncHandler(BaseHandler): ) if notifs is not None: - unread_notifications["notification_count"] = len(notifs) - unread_notifications["highlight_count"] = len([ - 1 for notif in notifs if _action_has_highlight(notif["actions"]) - ]) + unread_notifications["notification_count"] = notifs["notify_count"] + unread_notifications["highlight_count"] = notifs["highlight_count"] logger.debug("Room sync: %r", room_sync) diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 9bc0b356f4..8b9d0f03e5 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -316,7 +316,7 @@ class Pusher(object): r.room_id, self.user_id, last_unread_event_id ) ) - badge += len(notifs) + badge += notifs["notify_count"] defer.returnValue(badge) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index a05c4f84cf..aca3219206 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -37,7 +37,11 @@ class EventPushActionsStore(SQLBaseStore): 'event_id': event.event_id, 'user_id': uid, 'profile_tag': profile_tag, - 'actions': json.dumps(actions) + 'actions': json.dumps(actions), + 'stream_ordering': event.internal_metadata.stream_ordering, + 'topological_ordering': event.depth, + 'notif': 1, + 'highlight': 1 if _action_has_highlight(actions) else 0, }) def f(txn): @@ -74,26 +78,28 @@ class EventPushActionsStore(SQLBaseStore): topological_ordering = results[0][1] sql = ( - "SELECT ea.event_id, ea.actions" - " FROM event_push_actions ea, events e" - " WHERE ea.room_id = e.room_id" - " AND ea.event_id = e.event_id" - " AND ea.user_id = ?" - " AND ea.room_id = ?" + "SELECT sum(notif), sum(highlight)" + " FROM event_push_actions ea" + " WHERE" + " user_id = ?" + " AND room_id = ?" " AND (" - " e.topological_ordering > ?" - " OR (e.topological_ordering = ? AND e.stream_ordering > ?)" + " topological_ordering > ?" + " OR (topological_ordering = ? AND stream_ordering > ?)" ")" ) txn.execute(sql, ( user_id, room_id, topological_ordering, topological_ordering, stream_ordering - ) - ) - return [ - {"event_id": row[0], "actions": json.loads(row[1])} - for row in txn.fetchall() - ] + )) + row = txn.fetchone() + if row: + return { + "notify_count": row[0] or 0, + "highlight_count": row[1] or 0, + } + else: + return {"notify_count": 0, "highlight_count": 0} ret = yield self.runInteraction( "get_unread_event_push_actions_by_room", @@ -117,3 +123,14 @@ class EventPushActionsStore(SQLBaseStore): "remove_push_actions_for_event_id", f ) + + +def _action_has_highlight(actions): + for action in actions: + try: + if action.get("set_tweak", None) == "highlight": + return action.get("value", True) + except AttributeError: + pass + + return False diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index c1f5f99789..d782b8e25b 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 28 +SCHEMA_VERSION = 29 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/schema/delta/29/push_actions.sql b/synapse/storage/schema/delta/29/push_actions.sql new file mode 100644 index 0000000000..7e7b09820a --- /dev/null +++ b/synapse/storage/schema/delta/29/push_actions.sql @@ -0,0 +1,31 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE event_push_actions ADD COLUMN topological_ordering BIGINT; +ALTER TABLE event_push_actions ADD COLUMN stream_ordering BIGINT; +ALTER TABLE event_push_actions ADD COLUMN notif SMALLINT; +ALTER TABLE event_push_actions ADD COLUMN highlight SMALLINT; + +UPDATE event_push_actions SET stream_ordering = ( + SELECT stream_ordering FROM events WHERE event_id = event_push_actions.event_id +), topological_ordering = ( + SELECT topological_ordering FROM events WHERE event_id = event_push_actions.event_id +); + +UPDATE event_push_actions SET notif = 1, highlight = 0; + +CREATE INDEX event_push_actions_rm_tokens on event_push_actions( + user_id, room_id, topological_ordering, stream_ordering +); -- cgit 1.4.1 From 13e6262659fd544155875e18c0a3351c12d7651d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 Feb 2016 10:15:56 +0000 Subject: Add metrics to pushers --- synapse/http/server.py | 10 ++++++ synapse/push/__init__.py | 84 +++++++++++++++++++++++++++++----------------- synapse/util/metrics.py | 86 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 149 insertions(+), 31 deletions(-) create mode 100644 synapse/util/metrics.py (limited to 'synapse/push') diff --git a/synapse/http/server.py b/synapse/http/server.py index c250a4604f..06935783ca 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -208,6 +208,9 @@ class JsonResource(HttpServer, resource.Resource): if request.method == "OPTIONS": self._send_response(request, 200, {}) return + + start_context = LoggingContext.current_context() + # Loop through all the registered callbacks to check if the method # and path regex match for path_entry in self.path_regexs.get(request.method, []): @@ -243,6 +246,13 @@ class JsonResource(HttpServer, resource.Resource): if context: tag = context.tag + if context != start_context: + logger.warn( + "Context have unexpectedly changed %r, %r", + context, self.start_context + ) + return + incoming_requests_counter.inc(request.method, servlet_classname, tag) response_timer.inc_by( diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 8b9d0f03e5..64e581b8ba 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -17,6 +17,8 @@ from twisted.internet import defer from synapse.streams.config import PaginationConfig from synapse.types import StreamToken +from synapse.util.logcontext import LoggingContext +from synapse.util.metrics import Measure import synapse.util.async import push_rule_evaluator as push_rule_evaluator @@ -27,6 +29,16 @@ import random logger = logging.getLogger(__name__) +_NEXT_ID = 1 + + +def _get_next_id(): + global _NEXT_ID + _id = _NEXT_ID + _NEXT_ID += 1 + return _id + + # Pushers could now be moved to pull out of the event_push_actions table instead # of listening on the event stream: this would avoid them having to run the # rules again. @@ -57,6 +69,8 @@ class Pusher(object): self.alive = True self.badge = None + self.name = "Pusher-%d" % (_get_next_id(),) + # The last value of last_active_time that we saw self.last_last_active_time = 0 self.has_unread = True @@ -86,38 +100,46 @@ class Pusher(object): @defer.inlineCallbacks def start(self): - if not self.last_token: - # First-time setup: get a token to start from (we can't - # just start from no token, ie. 'now' - # because we need the result to be reproduceable in case - # we fail to dispatch the push) - config = PaginationConfig(from_token=None, limit='1') - chunk = yield self.evStreamHandler.get_stream( - self.user_id, config, timeout=0, affect_presence=False - ) - self.last_token = chunk['end'] - self.store.update_pusher_last_token( - self.app_id, self.pushkey, self.user_id, self.last_token - ) - logger.info("Pusher %s for user %s starting from token %s", - self.pushkey, self.user_id, self.last_token) - - wait = 0 - while self.alive: - try: - if wait > 0: - yield synapse.util.async.sleep(wait) - yield self.get_and_dispatch() - wait = 0 - except: - if wait == 0: - wait = 1 - else: - wait = min(wait * 2, 1800) - logger.exception( - "Exception in pusher loop for pushkey %s. Pausing for %ds", - self.pushkey, wait + with LoggingContext(self.name): + if not self.last_token: + # First-time setup: get a token to start from (we can't + # just start from no token, ie. 'now' + # because we need the result to be reproduceable in case + # we fail to dispatch the push) + config = PaginationConfig(from_token=None, limit='1') + chunk = yield self.evStreamHandler.get_stream( + self.user_id, config, timeout=0, affect_presence=False ) + self.last_token = chunk['end'] + self.store.update_pusher_last_token( + self.app_id, self.pushkey, self.user_id, self.last_token + ) + logger.info("New pusher %s for user %s starting from token %s", + self.pushkey, self.user_id, self.last_token) + + else: + logger.info( + "Old pusher %s for user %s starting", + self.pushkey, self.user_id, + ) + + wait = 0 + while self.alive: + try: + if wait > 0: + yield synapse.util.async.sleep(wait) + with Measure(self.clock, "push"): + yield self.get_and_dispatch() + wait = 0 + except: + if wait == 0: + wait = 1 + else: + wait = min(wait * 2, 1800) + logger.exception( + "Exception in pusher loop for pushkey %s. Pausing for %ds", + self.pushkey, wait + ) @defer.inlineCallbacks def get_and_dispatch(self): diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py new file mode 100644 index 0000000000..daf6087fe0 --- /dev/null +++ b/synapse/util/metrics.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from synapse.util.logcontext import LoggingContext +import synapse.metrics + +import logging + + +logger = logging.getLogger(__name__) + + +metrics = synapse.metrics.get_metrics_for(__name__) + +block_timer = metrics.register_distribution( + "block_timer", + labels=["block_name"] +) + +block_ru_utime = metrics.register_distribution( + "block_ru_utime", labels=["block_name"] +) + +block_ru_stime = metrics.register_distribution( + "block_ru_stime", labels=["block_name"] +) + +block_db_txn_count = metrics.register_distribution( + "block_db_txn_count", labels=["block_name"] +) + +block_db_txn_duration = metrics.register_distribution( + "block_db_txn_duration", labels=["block_name"] +) + + +class Measure(object): + __slots__ = ["clock", "name", "start_context", "start"] + + def __init__(self, clock, name): + self.clock = clock + self.name = name + self.start_context = None + self.start = None + + def __enter__(self): + self.start = self.clock.time_msec() + self.start_context = LoggingContext.current_context() + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_type is not None: + return + + duration = self.clock.time_msec() - self.start + block_timer.inc_by(duration, self.name) + + context = LoggingContext.current_context() + if not context: + return + + if context != self.start_context: + logger.warn( + "Context have unexpectedly changed %r, %r", + context, self.start_context + ) + return + + ru_utime, ru_stime = context.get_resource_usage() + + block_ru_utime.inc_by(ru_utime, self.name) + block_ru_stime.inc_by(ru_stime, self.name) + block_db_txn_count.inc_by(context.db_txn_count, self.name) + block_db_txn_duration.inc_by(context.db_txn_duration, self.name) -- cgit 1.4.1 From 2c1fbea5319db2c64fa486adb32b5e66680b6daf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 Feb 2016 10:22:44 +0000 Subject: Fix up logcontexts --- synapse/api/auth.py | 4 +- synapse/app/homeserver.py | 2 + synapse/crypto/keyring.py | 83 ++++++++++++----------- synapse/federation/federation_server.py | 4 +- synapse/federation/transaction_queue.py | 3 - synapse/handlers/_base.py | 10 +-- synapse/handlers/events.py | 11 +++- synapse/handlers/federation.py | 50 ++------------ synapse/handlers/presence.py | 20 +++--- synapse/handlers/register.py | 2 +- synapse/handlers/room.py | 11 +++- synapse/handlers/sync.py | 40 ++++++------ synapse/http/server.py | 5 +- synapse/notifier.py | 58 ++++++++-------- synapse/push/__init__.py | 2 +- synapse/push/pusherpool.py | 9 +-- synapse/rest/client/v2_alpha/account_data.py | 4 +- synapse/rest/client/v2_alpha/tags.py | 4 +- synapse/storage/_base.py | 18 ++--- synapse/storage/events.py | 34 ++++++---- synapse/storage/presence.py | 5 +- synapse/storage/stream.py | 9 +-- synapse/util/__init__.py | 6 +- synapse/util/async.py | 11 +++- synapse/util/caches/descriptors.py | 16 +++-- synapse/util/caches/snapshot_cache.py | 3 +- synapse/util/distributor.py | 15 +++-- synapse/util/logcontext.py | 98 ++++++++++++++++++++++++++-- synapse/util/logutils.py | 35 ++++++++++ synapse/util/metrics.py | 10 +-- synapse/util/ratelimitutils.py | 3 +- 31 files changed, 356 insertions(+), 229 deletions(-) (limited to 'synapse/push') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 5bba9343f6..e2f84c4d57 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -24,6 +24,7 @@ from synapse.api.constants import EventTypes, Membership, JoinRules from synapse.api.errors import AuthError, Codes, SynapseError, EventSizeError from synapse.types import Requester, RoomID, UserID, EventID from synapse.util.logutils import log_function +from synapse.util.logcontext import preserve_context_over_fn from unpaddedbase64 import decode_base64 import logging @@ -529,7 +530,8 @@ class Auth(object): default=[""] )[0] if user and access_token and ip_addr: - self.store.insert_client_ip( + preserve_context_over_fn( + self.store.insert_client_ip, user=user, access_token=access_token, ip=ip_addr, diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index e5c7e39cf9..2b4be7bdd0 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -709,6 +709,8 @@ def run(hs): phone_home_task.start(60 * 60 * 24, now=False) def in_thread(): + # Uncomment to enable tracing of log context changes. + # sys.settrace(logcontext_tracer) with LoggingContext("run"): change_resource_limit(hs.config.soft_file_limit) reactor.run() diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index cddec0b2bc..d08ee0aa91 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -18,6 +18,10 @@ from synapse.api.errors import SynapseError, Codes from synapse.util.retryutils import get_retry_limiter from synapse.util import unwrapFirstError from synapse.util.async import ObservableDeferred +from synapse.util.logcontext import ( + preserve_context_over_deferred, preserve_context_over_fn, PreserveLoggingContext, + preserve_fn +) from twisted.internet import defer @@ -142,40 +146,43 @@ class Keyring(object): for server_name, _ in server_and_json } - # We want to wait for any previous lookups to complete before - # proceeding. - wait_on_deferred = self.wait_for_previous_lookups( - [server_name for server_name, _ in server_and_json], - server_to_deferred, - ) + with PreserveLoggingContext(): - # Actually start fetching keys. - wait_on_deferred.addBoth( - lambda _: self.get_server_verify_keys(group_id_to_group, deferreds) - ) + # We want to wait for any previous lookups to complete before + # proceeding. + wait_on_deferred = self.wait_for_previous_lookups( + [server_name for server_name, _ in server_and_json], + server_to_deferred, + ) - # When we've finished fetching all the keys for a given server_name, - # resolve the deferred passed to `wait_for_previous_lookups` so that - # any lookups waiting will proceed. - server_to_gids = {} + # Actually start fetching keys. + wait_on_deferred.addBoth( + lambda _: self.get_server_verify_keys(group_id_to_group, deferreds) + ) + + # When we've finished fetching all the keys for a given server_name, + # resolve the deferred passed to `wait_for_previous_lookups` so that + # any lookups waiting will proceed. + server_to_gids = {} - def remove_deferreds(res, server_name, group_id): - server_to_gids[server_name].discard(group_id) - if not server_to_gids[server_name]: - d = server_to_deferred.pop(server_name, None) - if d: - d.callback(None) - return res + def remove_deferreds(res, server_name, group_id): + server_to_gids[server_name].discard(group_id) + if not server_to_gids[server_name]: + d = server_to_deferred.pop(server_name, None) + if d: + d.callback(None) + return res - for g_id, deferred in deferreds.items(): - server_name = group_id_to_group[g_id].server_name - server_to_gids.setdefault(server_name, set()).add(g_id) - deferred.addBoth(remove_deferreds, server_name, g_id) + for g_id, deferred in deferreds.items(): + server_name = group_id_to_group[g_id].server_name + server_to_gids.setdefault(server_name, set()).add(g_id) + deferred.addBoth(remove_deferreds, server_name, g_id) # Pass those keys to handle_key_deferred so that the json object # signatures can be verified return [ - handle_key_deferred( + preserve_context_over_fn( + handle_key_deferred, group_id_to_group[g_id], deferreds[g_id], ) @@ -198,12 +205,13 @@ class Keyring(object): if server_name in self.key_downloads ] if wait_on: - yield defer.DeferredList(wait_on) + with PreserveLoggingContext(): + yield defer.DeferredList(wait_on) else: break for server_name, deferred in server_to_deferred.items(): - d = ObservableDeferred(deferred) + d = ObservableDeferred(preserve_context_over_deferred(deferred)) self.key_downloads[server_name] = d def rm(r, server_name): @@ -244,12 +252,13 @@ class Keyring(object): for group in group_id_to_group.values(): for key_id in group.key_ids: if key_id in merged_results[group.server_name]: - group_id_to_deferred[group.group_id].callback(( - group.group_id, - group.server_name, - key_id, - merged_results[group.server_name][key_id], - )) + with PreserveLoggingContext(): + group_id_to_deferred[group.group_id].callback(( + group.group_id, + group.server_name, + key_id, + merged_results[group.server_name][key_id], + )) break else: missing_groups.setdefault( @@ -504,7 +513,7 @@ class Keyring(object): yield defer.gatherResults( [ - self.store_keys( + preserve_fn(self.store_keys)( server_name=key_server_name, from_server=server_name, verify_keys=verify_keys, @@ -573,7 +582,7 @@ class Keyring(object): yield defer.gatherResults( [ - self.store.store_server_keys_json( + preserve_fn(self.store.store_server_keys_json)( server_name=server_name, key_id=key_id, from_server=server_name, @@ -675,7 +684,7 @@ class Keyring(object): # TODO(markjh): Store whether the keys have expired. yield defer.gatherResults( [ - self.store.store_server_verify_key( + preserve_fn(self.store.store_server_verify_key)( server_name, server_name, key.time_added, key ) for key_id, key in verify_keys.items() diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index a97aa0c94a..90718192dd 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -126,10 +126,8 @@ class FederationServer(FederationBase): results = [] for pdu in pdu_list: - d = self._handle_new_pdu(transaction.origin, pdu) - try: - yield d + yield self._handle_new_pdu(transaction.origin, pdu) results.append({}) except FederationError as e: self.send_failure(e, transaction.origin) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 622adad3ae..1928da03b3 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -103,7 +103,6 @@ class TransactionQueue(object): else: return not destination.startswith("localhost") - @defer.inlineCallbacks def enqueue_pdu(self, pdu, destinations, order): # We loop through all destinations to see whether we already have # a transaction in progress. If we do, stick it in the pending_pdus @@ -141,8 +140,6 @@ class TransactionQueue(object): deferreds.append(deferred) - yield defer.DeferredList(deferreds, consumeErrors=True) - # NO inlineCallbacks def enqueue_edu(self, edu): destination = edu.destination diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 1423df6cf3..fa83d3e464 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -293,19 +293,11 @@ class BaseHandler(object): with PreserveLoggingContext(): # Don't block waiting on waking up all the listeners. - notify_d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - notify_d.addErrback(log_failure) - # If invite, remove room_state from unsigned before sending. event.unsigned.pop("invite_room_state", None) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 5ad8f3779a..4933c31c19 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.types import UserID from synapse.events.utils import serialize_event +from synapse.util.logcontext import preserve_context_over_fn from ._base import BaseHandler @@ -29,11 +30,17 @@ logger = logging.getLogger(__name__) def started_user_eventstream(distributor, user): - return distributor.fire("started_user_eventstream", user) + return preserve_context_over_fn( + distributor.fire, + "started_user_eventstream", user + ) def stopped_user_eventstream(distributor, user): - return distributor.fire("stopped_user_eventstream", user) + return preserve_context_over_fn( + distributor.fire, + "stopped_user_eventstream", user + ) class EventStreamHandler(BaseHandler): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2ce1e9d6c7..b78b0502d9 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -221,19 +221,11 @@ class FederationHandler(BaseHandler): extra_users.append(target_user) with PreserveLoggingContext(): - d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - d.addErrback(log_failure) - if event.type == EventTypes.Member: if event.membership == Membership.JOIN: prev_state = context.current_state.get((event.type, event.state_key)) @@ -643,19 +635,11 @@ class FederationHandler(BaseHandler): ) with PreserveLoggingContext(): - d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=[joinee] ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - d.addErrback(log_failure) - logger.debug("Finished joining %s to %s", joinee, room_id) finally: room_queue = self.room_queues[room_id] @@ -730,18 +714,10 @@ class FederationHandler(BaseHandler): extra_users.append(target_user) with PreserveLoggingContext(): - d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - d.addErrback(log_failure) - if event.type == EventTypes.Member: if event.content["membership"] == Membership.JOIN: user = UserID.from_string(event.state_key) @@ -811,19 +787,11 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(event.state_key) with PreserveLoggingContext(): - d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=[target_user], ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - d.addErrback(log_failure) - defer.returnValue(event) @defer.inlineCallbacks @@ -948,18 +916,10 @@ class FederationHandler(BaseHandler): extra_users.append(target_user) with PreserveLoggingContext(): - d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - d.addErrback(log_failure) - new_pdu = event destinations = set() diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index d0c21ff5c9..b61394f2b5 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -378,9 +378,9 @@ class PresenceHandler(BaseHandler): was_polling = target_user in self._user_cachemap if now_online and not was_polling: - self.start_polling_presence(target_user, state=state) + yield self.start_polling_presence(target_user, state=state) elif not now_online and was_polling: - self.stop_polling_presence(target_user) + yield self.stop_polling_presence(target_user) # TODO(paul): perform a presence push as part of start/stop poll so # we don't have to do this all the time @@ -394,7 +394,8 @@ class PresenceHandler(BaseHandler): if now - prev_state.state.get("last_active", 0) < LAST_ACTIVE_GRANULARITY: return - self.changed_presencelike_data(user, {"last_active": now}) + with PreserveLoggingContext(): + self.changed_presencelike_data(user, {"last_active": now}) def get_joined_rooms_for_user(self, user): """Get the list of rooms a user is joined to. @@ -466,11 +467,12 @@ class PresenceHandler(BaseHandler): local_user, room_ids=[room_id], add_to_cache=False ) - self.push_update_to_local_and_remote( - observed_user=local_user, - users_to_push=[user], - statuscache=statuscache, - ) + with PreserveLoggingContext(): + self.push_update_to_local_and_remote( + observed_user=local_user, + users_to_push=[user], + statuscache=statuscache, + ) @defer.inlineCallbacks def send_presence_invite(self, observer_user, observed_user): @@ -556,7 +558,7 @@ class PresenceHandler(BaseHandler): observer_user.localpart, observed_user.to_string() ) - self.start_polling_presence( + yield self.start_polling_presence( observer_user, target_user=observed_user ) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 2660fd21a2..24c850ae9b 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -186,7 +186,7 @@ class RegistrationHandler(BaseHandler): token=token, password_hash="" ) - registered_user(self.distributor, user) + yield registered_user(self.distributor, user) defer.returnValue((user_id, token)) @defer.inlineCallbacks diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index bfd7e44e9f..a8e3a9029c 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -25,6 +25,7 @@ from synapse.api.constants import ( from synapse.api.errors import AuthError, StoreError, SynapseError, Codes from synapse.util import stringutils, unwrapFirstError from synapse.util.async import run_on_reactor +from synapse.util.logcontext import preserve_context_over_fn from signedjson.sign import verify_signed_json from signedjson.key import decode_verify_key_bytes @@ -46,11 +47,17 @@ def collect_presencelike_data(distributor, user, content): def user_left_room(distributor, user, room_id): - return distributor.fire("user_left_room", user=user, room_id=room_id) + return preserve_context_over_fn( + distributor.fire, + "user_left_room", user=user, room_id=room_id + ) def user_joined_room(distributor, user, room_id): - return distributor.fire("user_joined_room", user=user, room_id=room_id) + return preserve_context_over_fn( + distributor.fire, + "user_joined_room", user=user, room_id=room_id + ) class RoomCreationHandler(BaseHandler): diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 72271f2626..3f1cda5b0b 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -18,7 +18,7 @@ from ._base import BaseHandler from synapse.streams.config import PaginationConfig from synapse.api.constants import Membership, EventTypes from synapse.util import unwrapFirstError -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from twisted.internet import defer @@ -241,15 +241,16 @@ class SyncHandler(BaseHandler): deferreds = [] for event in room_list: if event.membership == Membership.JOIN: - room_sync_deferred = self.full_state_sync_for_joined_room( - room_id=event.room_id, - sync_config=sync_config, - now_token=now_token, - timeline_since_token=timeline_since_token, - ephemeral_by_room=ephemeral_by_room, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - ) + with PreserveLoggingContext(LoggingContext.current_context()): + room_sync_deferred = self.full_state_sync_for_joined_room( + room_id=event.room_id, + sync_config=sync_config, + now_token=now_token, + timeline_since_token=timeline_since_token, + ephemeral_by_room=ephemeral_by_room, + tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, + ) room_sync_deferred.addCallback(joined.append) deferreds.append(room_sync_deferred) elif event.membership == Membership.INVITE: @@ -262,15 +263,16 @@ class SyncHandler(BaseHandler): leave_token = now_token.copy_and_replace( "room_key", "s%d" % (event.stream_ordering,) ) - room_sync_deferred = self.full_state_sync_for_archived_room( - sync_config=sync_config, - room_id=event.room_id, - leave_event_id=event.event_id, - leave_token=leave_token, - timeline_since_token=timeline_since_token, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - ) + with PreserveLoggingContext(LoggingContext.current_context()): + room_sync_deferred = self.full_state_sync_for_archived_room( + sync_config=sync_config, + room_id=event.room_id, + leave_event_id=event.event_id, + leave_token=leave_token, + timeline_since_token=timeline_since_token, + tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, + ) room_sync_deferred.addCallback(archived.append) deferreds.append(room_sync_deferred) diff --git a/synapse/http/server.py b/synapse/http/server.py index 06935783ca..a90e2e1125 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -99,9 +99,8 @@ def request_handler(request_handler): request_context.request = request_id with request.processing(): try: - d = request_handler(self, request) - with PreserveLoggingContext(): - yield d + with PreserveLoggingContext(request_context): + yield request_handler(self, request) except CodeMessageException as e: code = e.code if isinstance(e, SynapseError): diff --git a/synapse/notifier.py b/synapse/notifier.py index 1a90bd55cd..560866b26e 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -18,7 +18,8 @@ from synapse.api.constants import EventTypes from synapse.api.errors import AuthError from synapse.util.logutils import log_function -from synapse.util.async import run_on_reactor, ObservableDeferred +from synapse.util.async import ObservableDeferred +from synapse.util.logcontext import PreserveLoggingContext from synapse.types import StreamToken import synapse.metrics @@ -73,7 +74,8 @@ class _NotifierUserStream(object): self.current_token = current_token self.last_notified_ms = time_now_ms - self.notify_deferred = ObservableDeferred(defer.Deferred()) + with PreserveLoggingContext(): + self.notify_deferred = ObservableDeferred(defer.Deferred()) def notify(self, stream_key, stream_id, time_now_ms): """Notify any listeners for this user of a new event from an @@ -88,8 +90,10 @@ class _NotifierUserStream(object): ) self.last_notified_ms = time_now_ms noify_deferred = self.notify_deferred - self.notify_deferred = ObservableDeferred(defer.Deferred()) - noify_deferred.callback(self.current_token) + + with PreserveLoggingContext(): + self.notify_deferred = ObservableDeferred(defer.Deferred()) + noify_deferred.callback(self.current_token) def remove(self, notifier): """ Remove this listener from all the indexes in the Notifier @@ -184,8 +188,6 @@ class Notifier(object): lambda: count(bool, self.appservice_to_user_streams.values()), ) - @log_function - @defer.inlineCallbacks def on_new_room_event(self, event, room_stream_id, max_room_stream_id, extra_users=[]): """ Used by handlers to inform the notifier something has happened @@ -199,12 +201,11 @@ class Notifier(object): until all previous events have been persisted before notifying the client streams. """ - yield run_on_reactor() - - self.pending_new_room_events.append(( - room_stream_id, event, extra_users - )) - self._notify_pending_new_room_events(max_room_stream_id) + with PreserveLoggingContext(): + self.pending_new_room_events.append(( + room_stream_id, event, extra_users + )) + self._notify_pending_new_room_events(max_room_stream_id) def _notify_pending_new_room_events(self, max_room_stream_id): """Notify for the room events that were queued waiting for a previous @@ -251,31 +252,29 @@ class Notifier(object): extra_streams=app_streams, ) - @defer.inlineCallbacks - @log_function def on_new_event(self, stream_key, new_token, users=[], rooms=[], extra_streams=set()): """ Used to inform listeners that something has happend event wise. Will wake up all listeners for the given users and rooms. """ - yield run_on_reactor() - user_streams = set() + with PreserveLoggingContext(): + user_streams = set() - for user in users: - user_stream = self.user_to_user_stream.get(str(user)) - if user_stream is not None: - user_streams.add(user_stream) + for user in users: + user_stream = self.user_to_user_stream.get(str(user)) + if user_stream is not None: + user_streams.add(user_stream) - for room in rooms: - user_streams |= self.room_to_user_streams.get(room, set()) + for room in rooms: + user_streams |= self.room_to_user_streams.get(room, set()) - time_now_ms = self.clock.time_msec() - for user_stream in user_streams: - try: - user_stream.notify(stream_key, new_token, time_now_ms) - except: - logger.exception("Failed to notify listener") + time_now_ms = self.clock.time_msec() + for user_stream in user_streams: + try: + user_stream.notify(stream_key, new_token, time_now_ms) + except: + logger.exception("Failed to notify listener") @defer.inlineCallbacks def wait_for_events(self, user_id, timeout, callback, room_ids=None, @@ -325,7 +324,8 @@ class Notifier(object): # that we don't miss any current_token updates. prev_token = current_token listener = user_stream.new_listener(prev_token) - yield listener.deferred + with PreserveLoggingContext(): + yield listener.deferred except defer.CancelledError: break diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 64e581b8ba..8da2d8716c 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -111,7 +111,7 @@ class Pusher(object): self.user_id, config, timeout=0, affect_presence=False ) self.last_token = chunk['end'] - self.store.update_pusher_last_token( + yield self.store.update_pusher_last_token( self.app_id, self.pushkey, self.user_id, self.last_token ) logger.info("New pusher %s for user %s starting from token %s", diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index d1b7c0802f..d7dcb2de4b 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -18,6 +18,7 @@ from twisted.internet import defer from httppusher import HttpPusher from synapse.push import PusherConfigException +from synapse.util.logcontext import preserve_fn import logging @@ -76,7 +77,7 @@ class PusherPool: "Removing pusher for app id %s, pushkey %s, user %s", app_id, pushkey, p['user_name'] ) - self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) + yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) @defer.inlineCallbacks def remove_pushers_by_user(self, user_id): @@ -91,7 +92,7 @@ class PusherPool: "Removing pusher for app id %s, pushkey %s, user %s", p['app_id'], p['pushkey'], p['user_name'] ) - self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) + yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) @defer.inlineCallbacks def _add_pusher_to_store(self, user_id, access_token, profile_tag, kind, @@ -110,7 +111,7 @@ class PusherPool: lang=lang, data=data, ) - self._refresh_pusher(app_id, pushkey, user_id) + yield self._refresh_pusher(app_id, pushkey, user_id) def _create_pusher(self, pusherdict): if pusherdict['kind'] == 'http': @@ -166,7 +167,7 @@ class PusherPool: if fullid in self.pushers: self.pushers[fullid].stop() self.pushers[fullid] = p - p.start() + preserve_fn(p.start)() logger.info("Started pushers") diff --git a/synapse/rest/client/v2_alpha/account_data.py b/synapse/rest/client/v2_alpha/account_data.py index 985efe2a62..1456881c1a 100644 --- a/synapse/rest/client/v2_alpha/account_data.py +++ b/synapse/rest/client/v2_alpha/account_data.py @@ -57,7 +57,7 @@ class AccountDataServlet(RestServlet): user_id, account_data_type, body ) - yield self.notifier.on_new_event( + self.notifier.on_new_event( "account_data_key", max_id, users=[user_id] ) @@ -99,7 +99,7 @@ class RoomAccountDataServlet(RestServlet): user_id, room_id, account_data_type, body ) - yield self.notifier.on_new_event( + self.notifier.on_new_event( "account_data_key", max_id, users=[user_id] ) diff --git a/synapse/rest/client/v2_alpha/tags.py b/synapse/rest/client/v2_alpha/tags.py index 42f2203f3d..79c436a8cf 100644 --- a/synapse/rest/client/v2_alpha/tags.py +++ b/synapse/rest/client/v2_alpha/tags.py @@ -80,7 +80,7 @@ class TagServlet(RestServlet): max_id = yield self.store.add_tag_to_room(user_id, room_id, tag, body) - yield self.notifier.on_new_event( + self.notifier.on_new_event( "account_data_key", max_id, users=[user_id] ) @@ -94,7 +94,7 @@ class TagServlet(RestServlet): max_id = yield self.store.remove_tag_from_room(user_id, room_id, tag) - yield self.notifier.on_new_event( + self.notifier.on_new_event( "account_data_key", max_id, users=[user_id] ) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index cfb87d9328..2e97ac84a8 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -15,7 +15,7 @@ import logging from synapse.api.errors import StoreError -from synapse.util.logcontext import preserve_context_over_fn, LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.caches.dictionary_cache import DictionaryCache from synapse.util.caches.descriptors import Cache import synapse.metrics @@ -298,10 +298,10 @@ class SQLBaseStore(object): func, *args, **kwargs ) - result = yield preserve_context_over_fn( - self._db_pool.runWithConnection, - inner_func, *args, **kwargs - ) + with PreserveLoggingContext(): + result = yield self._db_pool.runWithConnection( + inner_func, *args, **kwargs + ) for after_callback, after_args in after_callbacks: after_callback(*after_args) @@ -326,10 +326,10 @@ class SQLBaseStore(object): return func(conn, *args, **kwargs) - result = yield preserve_context_over_fn( - self._db_pool.runWithConnection, - inner_func, *args, **kwargs - ) + with PreserveLoggingContext(): + result = yield self._db_pool.runWithConnection( + inner_func, *args, **kwargs + ) defer.returnValue(result) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 4d7cdd00d0..c6ed54721c 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -19,7 +19,7 @@ from twisted.internet import defer, reactor from synapse.events import FrozenEvent, USE_FROZEN_DICTS from synapse.events.utils import prune_event -from synapse.util.logcontext import preserve_context_over_deferred +from synapse.util.logcontext import preserve_fn, PreserveLoggingContext from synapse.util.logutils import log_function from synapse.api.constants import EventTypes @@ -664,14 +664,16 @@ class EventsStore(SQLBaseStore): for ids, d in lst: if not d.called: try: - d.callback([ - res[i] - for i in ids - if i in res - ]) + with PreserveLoggingContext(): + d.callback([ + res[i] + for i in ids + if i in res + ]) except: logger.exception("Failed to callback") - reactor.callFromThread(fire, event_list, row_dict) + with PreserveLoggingContext(): + reactor.callFromThread(fire, event_list, row_dict) except Exception as e: logger.exception("do_fetch") @@ -679,10 +681,12 @@ class EventsStore(SQLBaseStore): def fire(evs): for _, d in evs: if not d.called: - d.errback(e) + with PreserveLoggingContext(): + d.errback(e) if event_list: - reactor.callFromThread(fire, event_list) + with PreserveLoggingContext(): + reactor.callFromThread(fire, event_list) @defer.inlineCallbacks def _enqueue_events(self, events, check_redacted=True, @@ -709,18 +713,20 @@ class EventsStore(SQLBaseStore): should_start = False if should_start: - self.runWithConnection( - self._do_fetch - ) + with PreserveLoggingContext(): + self.runWithConnection( + self._do_fetch + ) - rows = yield preserve_context_over_deferred(events_d) + with PreserveLoggingContext(): + rows = yield events_d if not allow_rejected: rows[:] = [r for r in rows if not r["rejects"]] res = yield defer.gatherResults( [ - self._get_event_from_row( + preserve_fn(self._get_event_from_row)( row["internal_metadata"], row["json"], row["redacts"], check_redacted=check_redacted, get_prev_content=get_prev_content, diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 9b3aecaf8c..ef525f34c5 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -68,8 +68,9 @@ class PresenceStore(SQLBaseStore): for row in rows }) + @defer.inlineCallbacks def set_presence_state(self, user_localpart, new_state): - res = self._simple_update_one( + res = yield self._simple_update_one( table="presence", keyvalues={"user_id": user_localpart}, updatevalues={"state": new_state["state"], @@ -79,7 +80,7 @@ class PresenceStore(SQLBaseStore): ) self.get_presence_state.invalidate((user_localpart,)) - return res + defer.returnValue(res) def allow_presence_visible(self, observed_localpart, observer_userid): return self._simple_insert( diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 50436cb2d2..367ffc9543 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -39,6 +39,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken +from synapse.util.logcontext import preserve_fn import logging @@ -170,12 +171,12 @@ class StreamStore(SQLBaseStore): room_ids = list(room_ids) for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)): res = yield defer.gatherResults([ - self.get_room_events_stream_for_room( - room_id, from_key, to_key, limit - ).addCallback(lambda r, rm: (rm, r), room_id) + preserve_fn(self.get_room_events_stream_for_room)( + room_id, from_key, to_key, limit, + ) for room_id in room_ids ]) - results.update(dict(res)) + results.update(dict(zip(rm_ids, res))) defer.returnValue(results) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 7566d9eb33..133671e238 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.logcontext import PreserveLoggingContext from twisted.internet import defer, reactor, task @@ -61,10 +61,8 @@ class Clock(object): *args: Postional arguments to pass to function. **kwargs: Key arguments to pass to function. """ - current_context = LoggingContext.current_context() - def wrapped_callback(*args, **kwargs): - with PreserveLoggingContext(current_context): + with PreserveLoggingContext(): callback(*args, **kwargs) with PreserveLoggingContext(): diff --git a/synapse/util/async.py b/synapse/util/async.py index 200edd404c..640fae3890 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -16,13 +16,16 @@ from twisted.internet import defer, reactor -from .logcontext import preserve_context_over_deferred +from .logcontext import PreserveLoggingContext +@defer.inlineCallbacks def sleep(seconds): d = defer.Deferred() - reactor.callLater(seconds, d.callback, seconds) - return preserve_context_over_deferred(d) + with PreserveLoggingContext(): + reactor.callLater(seconds, d.callback, seconds) + res = yield d + defer.returnValue(res) def run_on_reactor(): @@ -54,6 +57,7 @@ class ObservableDeferred(object): object.__setattr__(self, "_result", (True, r)) while self._observers: try: + # TODO: Handle errors here. self._observers.pop().callback(r) except: pass @@ -63,6 +67,7 @@ class ObservableDeferred(object): object.__setattr__(self, "_result", (False, f)) while self._observers: try: + # TODO: Handle errors here. self._observers.pop().errback(f) except: pass diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index e27917c63a..277854ccbc 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -18,6 +18,9 @@ from synapse.util.async import ObservableDeferred from synapse.util import unwrapFirstError from synapse.util.caches.lrucache import LruCache from synapse.util.caches.treecache import TreeCache +from synapse.util.logcontext import ( + PreserveLoggingContext, preserve_context_over_deferred, preserve_context_over_fn +) from . import caches_by_name, DEBUG_CACHES, cache_counter @@ -190,7 +193,7 @@ class CacheDescriptor(object): defer.returnValue(cached_result) observer.addCallback(check_result) - return observer + return preserve_context_over_deferred(observer) except KeyError: # Get the sequence number of the cache before reading from the # database so that we can tell if the cache is invalidated @@ -198,6 +201,7 @@ class CacheDescriptor(object): sequence = self.cache.sequence ret = defer.maybeDeferred( + preserve_context_over_fn, self.function_to_call, obj, *args, **kwargs ) @@ -211,7 +215,7 @@ class CacheDescriptor(object): ret = ObservableDeferred(ret, consumeErrors=True) self.cache.update(sequence, cache_key, ret) - return ret.observe() + return preserve_context_over_deferred(ret.observe()) wrapped.invalidate = self.cache.invalidate wrapped.invalidate_all = self.cache.invalidate_all @@ -299,6 +303,7 @@ class CacheListDescriptor(object): args_to_call[self.list_name] = missing ret_d = defer.maybeDeferred( + preserve_context_over_fn, self.function_to_call, **args_to_call ) @@ -308,7 +313,8 @@ class CacheListDescriptor(object): # We need to create deferreds for each arg in the list so that # we can insert the new deferred into the cache. for arg in missing: - observer = ret_d.observe() + with PreserveLoggingContext(): + observer = ret_d.observe() observer.addCallback(lambda r, arg: r.get(arg, None), arg) observer = ObservableDeferred(observer) @@ -327,10 +333,10 @@ class CacheListDescriptor(object): cached[arg] = res - return defer.gatherResults( + return preserve_context_over_deferred(defer.gatherResults( cached.values(), consumeErrors=True, - ).addErrback(unwrapFirstError).addCallback(lambda res: dict(res)) + ).addErrback(unwrapFirstError).addCallback(lambda res: dict(res))) obj.__dict__[self.orig.__name__] = wrapped diff --git a/synapse/util/caches/snapshot_cache.py b/synapse/util/caches/snapshot_cache.py index b1e40417fd..d03678b8c8 100644 --- a/synapse/util/caches/snapshot_cache.py +++ b/synapse/util/caches/snapshot_cache.py @@ -87,7 +87,8 @@ class SnapshotCache(object): # expire from the rotation of that cache. self.next_result_cache[key] = result self.pending_result_cache.pop(key, None) + return r - result.observe().addBoth(shuffle_along) + result.addBoth(shuffle_along) return result.observe() diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index 4ebfebf701..8875813de4 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -15,9 +15,7 @@ from twisted.internet import defer -from synapse.util.logcontext import ( - PreserveLoggingContext, preserve_context_over_deferred, -) +from synapse.util.logcontext import PreserveLoggingContext from synapse.util import unwrapFirstError @@ -97,6 +95,7 @@ class Signal(object): Each observer callable may return a Deferred.""" self.observers.append(observer) + @defer.inlineCallbacks def fire(self, *args, **kwargs): """Invokes every callable in the observer list, passing in the args and kwargs. Exceptions thrown by observers are logged but ignored. It is @@ -116,6 +115,7 @@ class Signal(object): failure.getTracebackObject())) if not self.suppress_failures: return failure + return defer.maybeDeferred(observer, *args, **kwargs).addErrback(eb) with PreserveLoggingContext(): @@ -124,8 +124,11 @@ class Signal(object): for observer in self.observers ] - d = defer.gatherResults(deferreds, consumeErrors=True) + res = yield defer.gatherResults( + deferreds, consumeErrors=True + ).addErrback(unwrapFirstError) - d.addErrback(unwrapFirstError) + defer.returnValue(res) - return preserve_context_over_deferred(d) + def __repr__(self): + return "" % (self.name,) diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index e701092cd8..9134e67908 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -48,7 +48,7 @@ class LoggingContext(object): __slots__ = [ "parent_context", "name", "usage_start", "usage_end", "main_thread", - "__dict__", "tag", + "__dict__", "tag", "alive", ] thread_local = threading.local() @@ -88,6 +88,7 @@ class LoggingContext(object): self.usage_start = None self.main_thread = threading.current_thread() self.tag = "" + self.alive = True def __str__(self): return "%s@%x" % (self.name, id(self)) @@ -106,6 +107,7 @@ class LoggingContext(object): The context that was previously active """ current = cls.current_context() + if current is not context: current.stop() cls.thread_local.current_context = context @@ -117,6 +119,7 @@ class LoggingContext(object): if self.parent_context is not None: raise Exception("Attempt to enter logging context multiple times") self.parent_context = self.set_current_context(self) + self.alive = True return self def __exit__(self, type, value, traceback): @@ -136,6 +139,7 @@ class LoggingContext(object): self ) self.parent_context = None + self.alive = False def __getattr__(self, name): """Delegate member lookup to parent context""" @@ -213,7 +217,7 @@ class PreserveLoggingContext(object): exited. Used to restore the context after a function using @defer.inlineCallbacks is resumed by a callback from the reactor.""" - __slots__ = ["current_context", "new_context"] + __slots__ = ["current_context", "new_context", "has_parent"] def __init__(self, new_context=LoggingContext.sentinel): self.new_context = new_context @@ -224,11 +228,26 @@ class PreserveLoggingContext(object): self.new_context ) + if self.current_context: + self.has_parent = self.current_context.parent_context is not None + if not self.current_context.alive: + logger.warn( + "Entering dead context: %s", + self.current_context, + ) + def __exit__(self, type, value, traceback): """Restores the current logging context""" - LoggingContext.set_current_context(self.current_context) + context = LoggingContext.set_current_context(self.current_context) + + if context != self.new_context: + logger.warn( + "Unexpected logging context: %s is not %s", + context, self.new_context, + ) + if self.current_context is not LoggingContext.sentinel: - if self.current_context.parent_context is None: + if not self.current_context.alive: logger.warn( "Restoring dead context: %s", self.current_context, @@ -289,3 +308,74 @@ def preserve_context_over_deferred(deferred): d = _PreservingContextDeferred(current_context) deferred.chainDeferred(d) return d + + +def preserve_fn(f): + """Ensures that function is called with correct context and that context is + restored after return. Useful for wrapping functions that return a deferred + which you don't yield on. + """ + current = LoggingContext.current_context() + + def g(*args, **kwargs): + with PreserveLoggingContext(current): + return f(*args, **kwargs) + + return g + + +# modules to ignore in `logcontext_tracer` +_to_ignore = [ + "synapse.util.logcontext", + "synapse.http.server", + "synapse.storage._base", + "synapse.util.async", +] + + +def logcontext_tracer(frame, event, arg): + """A tracer that logs whenever a logcontext "unexpectedly" changes within + a function. Probably inaccurate. + + Use by calling `sys.settrace(logcontext_tracer)` in the main thread. + """ + if event == 'call': + name = frame.f_globals["__name__"] + if name.startswith("synapse"): + if name == "synapse.util.logcontext": + if frame.f_code.co_name in ["__enter__", "__exit__"]: + tracer = frame.f_back.f_trace + if tracer: + tracer.just_changed = True + + tracer = frame.f_trace + if tracer: + return tracer + + if not any(name.startswith(ig) for ig in _to_ignore): + return LineTracer() + + +class LineTracer(object): + __slots__ = ["context", "just_changed"] + + def __init__(self): + self.context = LoggingContext.current_context() + self.just_changed = False + + def __call__(self, frame, event, arg): + if event in 'line': + if self.just_changed: + self.context = LoggingContext.current_context() + self.just_changed = False + else: + c = LoggingContext.current_context() + if c != self.context: + logger.info( + "Context changed! %s -> %s, %s, %s", + self.context, c, + frame.f_code.co_filename, frame.f_lineno + ) + self.context = c + + return self diff --git a/synapse/util/logutils.py b/synapse/util/logutils.py index c37a157787..3a83828d25 100644 --- a/synapse/util/logutils.py +++ b/synapse/util/logutils.py @@ -168,3 +168,38 @@ def trace_function(f): wrapped.__name__ = func_name return wrapped + + +def get_previous_frames(): + s = inspect.currentframe().f_back.f_back + to_return = [] + while s: + if s.f_globals["__name__"].startswith("synapse"): + filename, lineno, function, _, _ = inspect.getframeinfo(s) + args_string = inspect.formatargvalues(*inspect.getargvalues(s)) + + to_return.append("{{ %s:%d %s - Args: %s }}" % ( + filename, lineno, function, args_string + )) + + s = s.f_back + + return ", ". join(to_return) + + +def get_previous_frame(ignore=[]): + s = inspect.currentframe().f_back.f_back + + while s: + if s.f_globals["__name__"].startswith("synapse"): + if not any(s.f_globals["__name__"].startswith(ig) for ig in ignore): + filename, lineno, function, _, _ = inspect.getframeinfo(s) + args_string = inspect.formatargvalues(*inspect.getargvalues(s)) + + return "{{ %s:%d %s - Args: %s }}" % ( + filename, lineno, function, args_string + ) + + s = s.f_back + + return None diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index daf6087fe0..ca48007218 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -68,16 +68,18 @@ class Measure(object): block_timer.inc_by(duration, self.name) context = LoggingContext.current_context() - if not context: - return if context != self.start_context: logger.warn( - "Context have unexpectedly changed %r, %r", - context, self.start_context + "Context have unexpectedly changed from '%s' to '%s'. (%r)", + context, self.start_context, self.name ) return + if not context: + logger.warn("Expected context. (%r)", self.name) + return + ru_utime, ru_stime = context.get_resource_usage() block_ru_utime.inc_by(ru_utime, self.name) diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index ea321bc6a9..4076eed269 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.api.errors import LimitExceededError from synapse.util.async import sleep +from synapse.util.logcontext import preserve_fn import collections import contextlib @@ -163,7 +164,7 @@ class _PerHostRatelimiter(object): "Ratelimit [%s]: sleeping req", id(request_id), ) - ret_defer = sleep(self.sleep_msec / 1000.0) + ret_defer = preserve_fn(sleep)(self.sleep_msec / 1000.0) self.sleeping_requests.add(request_id) -- cgit 1.4.1 From 549698b1e025263b2c4d8aa3f9b20f1c8761dc59 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 9 Feb 2016 09:37:09 +0000 Subject: Don't measure across event stream call, as it lasts for a long time. --- synapse/push/__init__.py | 248 +++++++++++++++++++++++------------------------ 1 file changed, 124 insertions(+), 124 deletions(-) (limited to 'synapse/push') diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 8da2d8716c..b9522a8050 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -128,8 +128,7 @@ class Pusher(object): try: if wait > 0: yield synapse.util.async.sleep(wait) - with Measure(self.clock, "push"): - yield self.get_and_dispatch() + yield self.get_and_dispatch() wait = 0 except: if wait == 0: @@ -151,115 +150,27 @@ class Pusher(object): only_keys=("room", "receipt",), ) - # limiting to 1 may get 1 event plus 1 presence event, so - # pick out the actual event - single_event = None - read_receipt = None - for c in chunk['chunk']: - if 'event_id' in c: # Hmmm... - single_event = c - elif c['type'] == 'm.receipt': - read_receipt = c - - have_updated_badge = False - if read_receipt: - for receipt_part in read_receipt['content'].values(): - if 'm.read' in receipt_part: - if self.user_id in receipt_part['m.read'].keys(): - have_updated_badge = True - - if not single_event: - if have_updated_badge: - yield self.update_badge() - self.last_token = chunk['end'] - yield self.store.update_pusher_last_token( - self.app_id, - self.pushkey, - self.user_id, - self.last_token - ) - return - - if not self.alive: - return - - processed = False - - rule_evaluator = yield \ - push_rule_evaluator.evaluator_for_user_id_and_profile_tag( - self.user_id, self.profile_tag, single_event['room_id'], self.store - ) - - actions = yield rule_evaluator.actions_for_event(single_event) - tweaks = rule_evaluator.tweaks_for_actions(actions) - - if 'notify' in actions: - self.badge = yield self._get_badge_count() - rejected = yield self.dispatch_push(single_event, tweaks, self.badge) - self.has_unread = True - if isinstance(rejected, list) or isinstance(rejected, tuple): - processed = True - for pk in rejected: - if pk != self.pushkey: - # for sanity, we only remove the pushkey if it - # was the one we actually sent... - logger.warn( - ("Ignoring rejected pushkey %s because we" - " didn't send it"), pk - ) - else: - logger.info( - "Pushkey %s was rejected: removing", - pk - ) - yield self.hs.get_pusherpool().remove_pusher( - self.app_id, pk, self.user_id - ) - else: - if have_updated_badge: - yield self.update_badge() - processed = True - - if not self.alive: - return - - if processed: - self.backoff_delay = Pusher.INITIAL_BACKOFF - self.last_token = chunk['end'] - yield self.store.update_pusher_last_token_and_success( - self.app_id, - self.pushkey, - self.user_id, - self.last_token, - self.clock.time_msec() - ) - if self.failing_since: - self.failing_since = None - yield self.store.update_pusher_failing_since( - self.app_id, - self.pushkey, - self.user_id, - self.failing_since) - else: - if not self.failing_since: - self.failing_since = self.clock.time_msec() - yield self.store.update_pusher_failing_since( - self.app_id, - self.pushkey, - self.user_id, - self.failing_since - ) - - if (self.failing_since and - self.failing_since < - self.clock.time_msec() - Pusher.GIVE_UP_AFTER): - # we really only give up so that if the URL gets - # fixed, we don't suddenly deliver a load - # of old notifications. - logger.warn("Giving up on a notification to user %s, " - "pushkey %s", - self.user_id, self.pushkey) - self.backoff_delay = Pusher.INITIAL_BACKOFF + with Measure(self.clock, "push"): + # limiting to 1 may get 1 event plus 1 presence event, so + # pick out the actual event + single_event = None + read_receipt = None + for c in chunk['chunk']: + if 'event_id' in c: # Hmmm... + single_event = c + elif c['type'] == 'm.receipt': + read_receipt = c + + have_updated_badge = False + if read_receipt: + for receipt_part in read_receipt['content'].values(): + if 'm.read' in receipt_part: + if self.user_id in receipt_part['m.read'].keys(): + have_updated_badge = True + + if not single_event: + if have_updated_badge: + yield self.update_badge() self.last_token = chunk['end'] yield self.store.update_pusher_last_token( self.app_id, @@ -267,25 +178,114 @@ class Pusher(object): self.user_id, self.last_token ) + return + + if not self.alive: + return + + processed = False + + rule_evaluator = yield \ + push_rule_evaluator.evaluator_for_user_id_and_profile_tag( + self.user_id, self.profile_tag, single_event['room_id'], self.store + ) - self.failing_since = None - yield self.store.update_pusher_failing_since( + actions = yield rule_evaluator.actions_for_event(single_event) + tweaks = rule_evaluator.tweaks_for_actions(actions) + + if 'notify' in actions: + self.badge = yield self._get_badge_count() + rejected = yield self.dispatch_push(single_event, tweaks, self.badge) + self.has_unread = True + if isinstance(rejected, list) or isinstance(rejected, tuple): + processed = True + for pk in rejected: + if pk != self.pushkey: + # for sanity, we only remove the pushkey if it + # was the one we actually sent... + logger.warn( + ("Ignoring rejected pushkey %s because we" + " didn't send it"), pk + ) + else: + logger.info( + "Pushkey %s was rejected: removing", + pk + ) + yield self.hs.get_pusherpool().remove_pusher( + self.app_id, pk, self.user_id + ) + else: + if have_updated_badge: + yield self.update_badge() + processed = True + + if not self.alive: + return + + if processed: + self.backoff_delay = Pusher.INITIAL_BACKOFF + self.last_token = chunk['end'] + yield self.store.update_pusher_last_token_and_success( self.app_id, self.pushkey, self.user_id, - self.failing_since + self.last_token, + self.clock.time_msec() ) + if self.failing_since: + self.failing_since = None + yield self.store.update_pusher_failing_since( + self.app_id, + self.pushkey, + self.user_id, + self.failing_since) else: - logger.warn("Failed to dispatch push for user %s " - "(failing for %dms)." - "Trying again in %dms", - self.user_id, - self.clock.time_msec() - self.failing_since, - self.backoff_delay) - yield synapse.util.async.sleep(self.backoff_delay / 1000.0) - self.backoff_delay *= 2 - if self.backoff_delay > Pusher.MAX_BACKOFF: - self.backoff_delay = Pusher.MAX_BACKOFF + if not self.failing_since: + self.failing_since = self.clock.time_msec() + yield self.store.update_pusher_failing_since( + self.app_id, + self.pushkey, + self.user_id, + self.failing_since + ) + + if (self.failing_since and + self.failing_since < + self.clock.time_msec() - Pusher.GIVE_UP_AFTER): + # we really only give up so that if the URL gets + # fixed, we don't suddenly deliver a load + # of old notifications. + logger.warn("Giving up on a notification to user %s, " + "pushkey %s", + self.user_id, self.pushkey) + self.backoff_delay = Pusher.INITIAL_BACKOFF + self.last_token = chunk['end'] + yield self.store.update_pusher_last_token( + self.app_id, + self.pushkey, + self.user_id, + self.last_token + ) + + self.failing_since = None + yield self.store.update_pusher_failing_since( + self.app_id, + self.pushkey, + self.user_id, + self.failing_since + ) + else: + logger.warn("Failed to dispatch push for user %s " + "(failing for %dms)." + "Trying again in %dms", + self.user_id, + self.clock.time_msec() - self.failing_since, + self.backoff_delay) + yield synapse.util.async.sleep(self.backoff_delay / 1000.0) + self.backoff_delay *= 2 + if self.backoff_delay > Pusher.MAX_BACKOFF: + self.backoff_delay = Pusher.MAX_BACKOFF def stop(self): self.alive = False -- cgit 1.4.1 From 31a2b892d8935dcb49cbc3c71a0ccf8aa34eeb97 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 9 Feb 2016 11:25:09 +0000 Subject: Revert to putting it around the entire block --- synapse/push/__init__.py | 248 +++++++++++++++++++++++------------------------ 1 file changed, 124 insertions(+), 124 deletions(-) (limited to 'synapse/push') diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index b9522a8050..8da2d8716c 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -128,7 +128,8 @@ class Pusher(object): try: if wait > 0: yield synapse.util.async.sleep(wait) - yield self.get_and_dispatch() + with Measure(self.clock, "push"): + yield self.get_and_dispatch() wait = 0 except: if wait == 0: @@ -150,27 +151,115 @@ class Pusher(object): only_keys=("room", "receipt",), ) - with Measure(self.clock, "push"): - # limiting to 1 may get 1 event plus 1 presence event, so - # pick out the actual event - single_event = None - read_receipt = None - for c in chunk['chunk']: - if 'event_id' in c: # Hmmm... - single_event = c - elif c['type'] == 'm.receipt': - read_receipt = c - - have_updated_badge = False - if read_receipt: - for receipt_part in read_receipt['content'].values(): - if 'm.read' in receipt_part: - if self.user_id in receipt_part['m.read'].keys(): - have_updated_badge = True - - if not single_event: - if have_updated_badge: - yield self.update_badge() + # limiting to 1 may get 1 event plus 1 presence event, so + # pick out the actual event + single_event = None + read_receipt = None + for c in chunk['chunk']: + if 'event_id' in c: # Hmmm... + single_event = c + elif c['type'] == 'm.receipt': + read_receipt = c + + have_updated_badge = False + if read_receipt: + for receipt_part in read_receipt['content'].values(): + if 'm.read' in receipt_part: + if self.user_id in receipt_part['m.read'].keys(): + have_updated_badge = True + + if not single_event: + if have_updated_badge: + yield self.update_badge() + self.last_token = chunk['end'] + yield self.store.update_pusher_last_token( + self.app_id, + self.pushkey, + self.user_id, + self.last_token + ) + return + + if not self.alive: + return + + processed = False + + rule_evaluator = yield \ + push_rule_evaluator.evaluator_for_user_id_and_profile_tag( + self.user_id, self.profile_tag, single_event['room_id'], self.store + ) + + actions = yield rule_evaluator.actions_for_event(single_event) + tweaks = rule_evaluator.tweaks_for_actions(actions) + + if 'notify' in actions: + self.badge = yield self._get_badge_count() + rejected = yield self.dispatch_push(single_event, tweaks, self.badge) + self.has_unread = True + if isinstance(rejected, list) or isinstance(rejected, tuple): + processed = True + for pk in rejected: + if pk != self.pushkey: + # for sanity, we only remove the pushkey if it + # was the one we actually sent... + logger.warn( + ("Ignoring rejected pushkey %s because we" + " didn't send it"), pk + ) + else: + logger.info( + "Pushkey %s was rejected: removing", + pk + ) + yield self.hs.get_pusherpool().remove_pusher( + self.app_id, pk, self.user_id + ) + else: + if have_updated_badge: + yield self.update_badge() + processed = True + + if not self.alive: + return + + if processed: + self.backoff_delay = Pusher.INITIAL_BACKOFF + self.last_token = chunk['end'] + yield self.store.update_pusher_last_token_and_success( + self.app_id, + self.pushkey, + self.user_id, + self.last_token, + self.clock.time_msec() + ) + if self.failing_since: + self.failing_since = None + yield self.store.update_pusher_failing_since( + self.app_id, + self.pushkey, + self.user_id, + self.failing_since) + else: + if not self.failing_since: + self.failing_since = self.clock.time_msec() + yield self.store.update_pusher_failing_since( + self.app_id, + self.pushkey, + self.user_id, + self.failing_since + ) + + if (self.failing_since and + self.failing_since < + self.clock.time_msec() - Pusher.GIVE_UP_AFTER): + # we really only give up so that if the URL gets + # fixed, we don't suddenly deliver a load + # of old notifications. + logger.warn("Giving up on a notification to user %s, " + "pushkey %s", + self.user_id, self.pushkey) + self.backoff_delay = Pusher.INITIAL_BACKOFF self.last_token = chunk['end'] yield self.store.update_pusher_last_token( self.app_id, @@ -178,114 +267,25 @@ class Pusher(object): self.user_id, self.last_token ) - return - - if not self.alive: - return - - processed = False - - rule_evaluator = yield \ - push_rule_evaluator.evaluator_for_user_id_and_profile_tag( - self.user_id, self.profile_tag, single_event['room_id'], self.store - ) - actions = yield rule_evaluator.actions_for_event(single_event) - tweaks = rule_evaluator.tweaks_for_actions(actions) - - if 'notify' in actions: - self.badge = yield self._get_badge_count() - rejected = yield self.dispatch_push(single_event, tweaks, self.badge) - self.has_unread = True - if isinstance(rejected, list) or isinstance(rejected, tuple): - processed = True - for pk in rejected: - if pk != self.pushkey: - # for sanity, we only remove the pushkey if it - # was the one we actually sent... - logger.warn( - ("Ignoring rejected pushkey %s because we" - " didn't send it"), pk - ) - else: - logger.info( - "Pushkey %s was rejected: removing", - pk - ) - yield self.hs.get_pusherpool().remove_pusher( - self.app_id, pk, self.user_id - ) - else: - if have_updated_badge: - yield self.update_badge() - processed = True - - if not self.alive: - return - - if processed: - self.backoff_delay = Pusher.INITIAL_BACKOFF - self.last_token = chunk['end'] - yield self.store.update_pusher_last_token_and_success( + self.failing_since = None + yield self.store.update_pusher_failing_since( self.app_id, self.pushkey, self.user_id, - self.last_token, - self.clock.time_msec() + self.failing_since ) - if self.failing_since: - self.failing_since = None - yield self.store.update_pusher_failing_since( - self.app_id, - self.pushkey, - self.user_id, - self.failing_since) else: - if not self.failing_since: - self.failing_since = self.clock.time_msec() - yield self.store.update_pusher_failing_since( - self.app_id, - self.pushkey, - self.user_id, - self.failing_since - ) - - if (self.failing_since and - self.failing_since < - self.clock.time_msec() - Pusher.GIVE_UP_AFTER): - # we really only give up so that if the URL gets - # fixed, we don't suddenly deliver a load - # of old notifications. - logger.warn("Giving up on a notification to user %s, " - "pushkey %s", - self.user_id, self.pushkey) - self.backoff_delay = Pusher.INITIAL_BACKOFF - self.last_token = chunk['end'] - yield self.store.update_pusher_last_token( - self.app_id, - self.pushkey, - self.user_id, - self.last_token - ) - - self.failing_since = None - yield self.store.update_pusher_failing_since( - self.app_id, - self.pushkey, - self.user_id, - self.failing_since - ) - else: - logger.warn("Failed to dispatch push for user %s " - "(failing for %dms)." - "Trying again in %dms", - self.user_id, - self.clock.time_msec() - self.failing_since, - self.backoff_delay) - yield synapse.util.async.sleep(self.backoff_delay / 1000.0) - self.backoff_delay *= 2 - if self.backoff_delay > Pusher.MAX_BACKOFF: - self.backoff_delay = Pusher.MAX_BACKOFF + logger.warn("Failed to dispatch push for user %s " + "(failing for %dms)." + "Trying again in %dms", + self.user_id, + self.clock.time_msec() - self.failing_since, + self.backoff_delay) + yield synapse.util.async.sleep(self.backoff_delay / 1000.0) + self.backoff_delay *= 2 + if self.backoff_delay > Pusher.MAX_BACKOFF: + self.backoff_delay = Pusher.MAX_BACKOFF def stop(self): self.alive = False -- cgit 1.4.1 From f28cc4518368955f692a1262a688e907cb22d226 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 9 Feb 2016 16:01:40 +0000 Subject: Pass in current state to push action handler --- synapse/handlers/_base.py | 31 +++++++++++++------------------ synapse/push/action_generator.py | 6 ++++-- synapse/push/bulk_push_rule_evaluator.py | 10 +++------- 3 files changed, 20 insertions(+), 27 deletions(-) (limited to 'synapse/push') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index fa83d3e464..d3f722b22e 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -53,25 +53,10 @@ class BaseHandler(object): self.event_builder_factory = hs.get_event_builder_factory() @defer.inlineCallbacks - def _filter_events_for_clients(self, user_tuples, events): + def _filter_events_for_clients(self, user_tuples, events, event_id_to_state): """ Returns dict of user_id -> list of events that user is allowed to see. """ - # If there is only one user, just get the state for that one user, - # otherwise just get all the state. - if len(user_tuples) == 1: - types = ( - (EventTypes.RoomHistoryVisibility, ""), - (EventTypes.Member, user_tuples[0][0]), - ) - else: - types = None - - event_id_to_state = yield self.store.get_state_for_events( - frozenset(e.event_id for e in events), - types=types - ) - forgotten = yield defer.gatherResults([ self.store.who_forgot_in_room( room_id, @@ -135,7 +120,17 @@ class BaseHandler(object): @defer.inlineCallbacks def _filter_events_for_client(self, user_id, events, is_peeking=False): # Assumes that user has at some point joined the room if not is_guest. - res = yield self._filter_events_for_clients([(user_id, is_peeking)], events) + types = ( + (EventTypes.RoomHistoryVisibility, ""), + (EventTypes.Member, user_id), + ) + event_id_to_state = yield self.store.get_state_for_events( + frozenset(e.event_id for e in events), + types=types + ) + res = yield self._filter_events_for_clients( + [(user_id, is_peeking)], events, event_id_to_state + ) defer.returnValue(res.get(user_id, [])) def ratelimit(self, user_id): @@ -275,7 +270,7 @@ class BaseHandler(object): action_generator = ActionGenerator(self.hs) yield action_generator.handle_push_actions_for_event( - event, self + event, self, context.current_state ) destinations = set() diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 1d2e558f9a..d8f8256a1f 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -36,7 +36,7 @@ class ActionGenerator: # tag (ie. we just need all the users). @defer.inlineCallbacks - def handle_push_actions_for_event(self, event, handler): + def handle_push_actions_for_event(self, event, handler, current_state): if event.type == EventTypes.Redaction and event.redacts is not None: yield self.store.remove_push_actions_for_event_id( event.room_id, event.redacts @@ -46,7 +46,9 @@ class ActionGenerator: event.room_id, self.hs, self.store ) - actions_by_user = yield bulk_evaluator.action_for_event_by_user(event, handler) + actions_by_user = yield bulk_evaluator.action_for_event_by_user( + event, handler, current_state + ) yield self.store.set_push_actions_for_event_and_users( event, diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 20c60422bf..8ac5ceb9ef 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -98,25 +98,21 @@ class BulkPushRuleEvaluator: self.store = store @defer.inlineCallbacks - def action_for_event_by_user(self, event, handler): + def action_for_event_by_user(self, event, handler, current_state): actions_by_user = {} users_dict = yield self.store.are_guests(self.rules_by_user.keys()) filtered_by_user = yield handler._filter_events_for_clients( - users_dict.items(), [event] + users_dict.items(), [event], {event.event_id: current_state} ) evaluator = PushRuleEvaluatorForEvent(event, len(self.users_in_room)) condition_cache = {} - member_state = yield self.store.get_state_for_event( - event.event_id, - ) - display_names = {} - for ev in member_state.values(): + for ev in current_state.values(): nm = ev.content.get("displayname", None) if nm and ev.type == EventTypes.Member: display_names[ev.state_key] = nm -- cgit 1.4.1 From 7b0d846407a593ccd204f82aaa1090b8af8df84c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 9 Feb 2016 16:19:15 +0000 Subject: Atomically persit push actions when we persist the event --- synapse/events/snapshot.py | 1 + synapse/handlers/_base.py | 10 ++++---- synapse/handlers/federation.py | 12 +++++----- synapse/push/action_generator.py | 20 ++++------------ synapse/storage/event_push_actions.py | 45 +++++++++++++---------------------- synapse/storage/events.py | 26 ++++++++++++-------- 6 files changed, 49 insertions(+), 65 deletions(-) (limited to 'synapse/push') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index f51200d18e..8a475417a6 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -20,3 +20,4 @@ class EventContext(object): self.current_state = current_state self.state_group = None self.rejected = False + self.push_actions = [] diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index d3f722b22e..064e8723c8 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -264,13 +264,13 @@ class BaseHandler(object): "You don't have permission to redact events" ) - (event_stream_id, max_stream_id) = yield self.store.persist_event( - event, context=context - ) - action_generator = ActionGenerator(self.hs) yield action_generator.handle_push_actions_for_event( - event, self, context.current_state + event, context, self + ) + + (event_stream_id, max_stream_id) = yield self.store.persist_event( + event, context=context ) destinations = set() diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b78b0502d9..da55d43541 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -236,12 +236,6 @@ class FederationHandler(BaseHandler): user = UserID.from_string(event.state_key) yield user_joined_room(self.distributor, user, event.room_id) - if not backfilled and not event.internal_metadata.is_outlier(): - action_generator = ActionGenerator(self.hs) - yield action_generator.handle_push_actions_for_event( - event, self - ) - @defer.inlineCallbacks def _filter_events_for_server(self, server_name, room_id, events): event_to_state = yield self.store.get_state_for_events( @@ -1073,6 +1067,12 @@ class FederationHandler(BaseHandler): auth_events=auth_events, ) + if not backfilled and not event.internal_metadata.is_outlier(): + action_generator = ActionGenerator(self.hs) + yield action_generator.handle_push_actions_for_event( + event, context, self + ) + event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index d8f8256a1f..e0da0868ec 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -19,8 +19,6 @@ import bulk_push_rule_evaluator import logging -from synapse.api.constants import EventTypes - logger = logging.getLogger(__name__) @@ -36,23 +34,15 @@ class ActionGenerator: # tag (ie. we just need all the users). @defer.inlineCallbacks - def handle_push_actions_for_event(self, event, handler, current_state): - if event.type == EventTypes.Redaction and event.redacts is not None: - yield self.store.remove_push_actions_for_event_id( - event.room_id, event.redacts - ) - + def handle_push_actions_for_event(self, event, context, handler): bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id( event.room_id, self.hs, self.store ) actions_by_user = yield bulk_evaluator.action_for_event_by_user( - event, handler, current_state + event, handler, context.current_state ) - yield self.store.set_push_actions_for_event_and_users( - event, - [ - (uid, None, actions) for uid, actions in actions_by_user.items() - ] - ) + context.push_actions = [ + (uid, None, actions) for uid, actions in actions_by_user.items() + ] diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index d0a969f50b..466f07a1c4 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -24,8 +24,7 @@ logger = logging.getLogger(__name__) class EventPushActionsStore(SQLBaseStore): - @defer.inlineCallbacks - def set_push_actions_for_event_and_users(self, event, tuples): + def _set_push_actions_for_event_and_users(self, txn, event, tuples): """ :param event: the event set actions for :param tuples: list of tuples of (user_id, profile_tag, actions) @@ -44,18 +43,12 @@ class EventPushActionsStore(SQLBaseStore): 'highlight': 1 if _action_has_highlight(actions) else 0, }) - def f(txn): - for uid, _, __ in tuples: - txn.call_after( - self.get_unread_event_push_actions_by_room_for_user.invalidate_many, - (event.room_id, uid) - ) - return self._simple_insert_many_txn(txn, "event_push_actions", values) - - yield self.runInteraction( - "set_actions_for_event_and_users", - f, - ) + for uid, _, __ in tuples: + txn.call_after( + self.get_unread_event_push_actions_by_room_for_user.invalidate_many, + (event.room_id, uid) + ) + self._simple_insert_many_txn(txn, "event_push_actions", values) @cachedInlineCallbacks(num_args=3, lru=True, tree=True) def get_unread_event_push_actions_by_room_for_user( @@ -107,21 +100,15 @@ class EventPushActionsStore(SQLBaseStore): ) defer.returnValue(ret) - @defer.inlineCallbacks - def remove_push_actions_for_event_id(self, room_id, event_id): - def f(txn): - # Sad that we have to blow away the cache for the whole room here - txn.call_after( - self.get_unread_event_push_actions_by_room_for_user.invalidate_many, - (room_id,) - ) - txn.execute( - "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?", - (room_id, event_id) - ) - yield self.runInteraction( - "remove_push_actions_for_event_id", - f + def _remove_push_actions_for_event_id(self, txn, room_id, event_id): + # Sad that we have to blow away the cache for the whole room here + txn.call_after( + self.get_unread_event_push_actions_by_room_for_user.invalidate_many, + (room_id,) + ) + txn.execute( + "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?", + (room_id, event_id) ) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c6ed54721c..7d4012c414 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -205,23 +205,29 @@ class EventsStore(SQLBaseStore): @log_function def _persist_events_txn(self, txn, events_and_contexts, backfilled, is_new_state=True): - - # Remove the any existing cache entries for the event_ids - for event, _ in events_and_contexts: + depth_updates = {} + for event, context in events_and_contexts: + # Remove the any existing cache entries for the event_ids txn.call_after(self._invalidate_get_event_cache, event.event_id) - if not backfilled: txn.call_after( self._events_stream_cache.entity_has_changed, event.room_id, event.internal_metadata.stream_ordering, ) - depth_updates = {} - for event, _ in events_and_contexts: - if event.internal_metadata.is_outlier(): - continue - depth_updates[event.room_id] = max( - event.depth, depth_updates.get(event.room_id, event.depth) + if not event.internal_metadata.is_outlier(): + depth_updates[event.room_id] = max( + event.depth, depth_updates.get(event.room_id, event.depth) + ) + + if context.push_actions: + self._set_push_actions_for_event_and_users( + txn, event, context.push_actions + ) + + if event.type == EventTypes.Redaction and event.redacts is not None: + self._remove_push_actions_for_event_id( + txn, event.room_id, event.redacts ) for room_id, depth in depth_updates.items(): -- cgit 1.4.1 From b9977ea667889f6cf89464c92fc57cbcae7cca28 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 18 Feb 2016 16:05:13 +0000 Subject: Remove dead code for setting device specific rules. It wasn't possible to hit the code from the API because of a typo in parsing the request path. Since no-one was using the feature we might as well remove the dead code. --- synapse/push/__init__.py | 7 ++- synapse/push/action_generator.py | 2 +- synapse/push/bulk_push_rule_evaluator.py | 2 +- synapse/push/httppusher.py | 3 +- synapse/push/push_rule_evaluator.py | 15 ++---- synapse/push/pusherpool.py | 48 +++++++---------- synapse/rest/client/v1/push_rule.py | 90 ++------------------------------ synapse/rest/client/v1/pusher.py | 6 +-- synapse/storage/event_push_actions.py | 7 ++- synapse/storage/pusher.py | 6 +-- 10 files changed, 45 insertions(+), 141 deletions(-) (limited to 'synapse/push') diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 8da2d8716c..4c6c3b83a2 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -47,14 +47,13 @@ class Pusher(object): MAX_BACKOFF = 60 * 60 * 1000 GIVE_UP_AFTER = 24 * 60 * 60 * 1000 - def __init__(self, _hs, profile_tag, user_id, app_id, + def __init__(self, _hs, user_id, app_id, app_display_name, device_display_name, pushkey, pushkey_ts, data, last_token, last_success, failing_since): self.hs = _hs self.evStreamHandler = self.hs.get_handlers().event_stream_handler self.store = self.hs.get_datastore() self.clock = self.hs.get_clock() - self.profile_tag = profile_tag self.user_id = user_id self.app_id = app_id self.app_display_name = app_display_name @@ -186,8 +185,8 @@ class Pusher(object): processed = False rule_evaluator = yield \ - push_rule_evaluator.evaluator_for_user_id_and_profile_tag( - self.user_id, self.profile_tag, single_event['room_id'], self.store + push_rule_evaluator.evaluator_for_user_id( + self.user_id, single_event['room_id'], self.store ) actions = yield rule_evaluator.actions_for_event(single_event) diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index e0da0868ec..c6c1dc769e 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -44,5 +44,5 @@ class ActionGenerator: ) context.push_actions = [ - (uid, None, actions) for uid, actions in actions_by_user.items() + (uid, actions) for uid, actions in actions_by_user.items() ] diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 8ac5ceb9ef..0a23b3f102 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -152,7 +152,7 @@ def _condition_checker(evaluator, conditions, uid, display_name, cache): elif res is True: continue - res = evaluator.matches(cond, uid, display_name, None) + res = evaluator.matches(cond, uid, display_name) if _id: cache[_id] = bool(res) diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index cdc4494928..9be4869360 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -23,12 +23,11 @@ logger = logging.getLogger(__name__) class HttpPusher(Pusher): - def __init__(self, _hs, profile_tag, user_id, app_id, + def __init__(self, _hs, user_id, app_id, app_display_name, device_display_name, pushkey, pushkey_ts, data, last_token, last_success, failing_since): super(HttpPusher, self).__init__( _hs, - profile_tag, user_id, app_id, app_display_name, diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 2a2b4437dc..98e2a2015e 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -33,7 +33,7 @@ INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$") @defer.inlineCallbacks -def evaluator_for_user_id_and_profile_tag(user_id, profile_tag, room_id, store): +def evaluator_for_user_id(user_id, room_id, store): rawrules = yield store.get_push_rules_for_user(user_id) enabled_map = yield store.get_push_rules_enabled_for_user(user_id) our_member_event = yield store.get_current_state( @@ -43,7 +43,7 @@ def evaluator_for_user_id_and_profile_tag(user_id, profile_tag, room_id, store): ) defer.returnValue(PushRuleEvaluator( - user_id, profile_tag, rawrules, enabled_map, + user_id, rawrules, enabled_map, room_id, our_member_event, store )) @@ -77,10 +77,9 @@ def _room_member_count(ev, condition, room_member_count): class PushRuleEvaluator: DEFAULT_ACTIONS = [] - def __init__(self, user_id, profile_tag, raw_rules, enabled_map, room_id, + def __init__(self, user_id, raw_rules, enabled_map, room_id, our_member_event, store): self.user_id = user_id - self.profile_tag = profile_tag self.room_id = room_id self.our_member_event = our_member_event self.store = store @@ -152,7 +151,7 @@ class PushRuleEvaluator: matches = True for c in conditions: matches = evaluator.matches( - c, self.user_id, my_display_name, self.profile_tag + c, self.user_id, my_display_name ) if not matches: break @@ -189,13 +188,9 @@ class PushRuleEvaluatorForEvent(object): # Maps strings of e.g. 'content.body' -> event["content"]["body"] self._value_cache = _flatten_dict(event) - def matches(self, condition, user_id, display_name, profile_tag): + def matches(self, condition, user_id, display_name): if condition['kind'] == 'event_match': return self._event_match(condition, user_id) - elif condition['kind'] == 'device': - if 'profile_tag' not in condition: - return True - return condition['profile_tag'] == profile_tag elif condition['kind'] == 'contains_display_name': return self._contains_display_name(display_name) elif condition['kind'] == 'room_member_count': diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index d7dcb2de4b..a05aa5f661 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -29,6 +29,7 @@ class PusherPool: def __init__(self, _hs): self.hs = _hs self.store = self.hs.get_datastore() + self.clock = self.hs.get_clock() self.pushers = {} self.last_pusher_started = -1 @@ -38,8 +39,11 @@ class PusherPool: self._start_pushers(pushers) @defer.inlineCallbacks - def add_pusher(self, user_id, access_token, profile_tag, kind, app_id, - app_display_name, device_display_name, pushkey, lang, data): + def add_pusher(self, user_id, access_token, kind, app_id, + app_display_name, device_display_name, pushkey, lang, data, + profile_tag=""): + time_now_msec = self.clock.time_msec() + # we try to create the pusher just to validate the config: it # will then get pulled out of the database, # recreated, added and started: this means we have only one @@ -47,23 +51,31 @@ class PusherPool: self._create_pusher({ "user_name": user_id, "kind": kind, - "profile_tag": profile_tag, "app_id": app_id, "app_display_name": app_display_name, "device_display_name": device_display_name, "pushkey": pushkey, - "ts": self.hs.get_clock().time_msec(), + "ts": time_now_msec, "lang": lang, "data": data, "last_token": None, "last_success": None, "failing_since": None }) - yield self._add_pusher_to_store( - user_id, access_token, profile_tag, kind, app_id, - app_display_name, device_display_name, - pushkey, lang, data + yield self.store.add_pusher( + user_id=user_id, + access_token=access_token, + kind=kind, + app_id=app_id, + app_display_name=app_display_name, + device_display_name=device_display_name, + pushkey=pushkey, + pushkey_ts=time_now_msec, + lang=lang, + data=data, + profile_tag=profile_tag, ) + yield self._refresh_pusher(app_id, pushkey, user_id) @defer.inlineCallbacks def remove_pushers_by_app_id_and_pushkey_not_user(self, app_id, pushkey, @@ -94,30 +106,10 @@ class PusherPool: ) yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) - @defer.inlineCallbacks - def _add_pusher_to_store(self, user_id, access_token, profile_tag, kind, - app_id, app_display_name, device_display_name, - pushkey, lang, data): - yield self.store.add_pusher( - user_id=user_id, - access_token=access_token, - profile_tag=profile_tag, - kind=kind, - app_id=app_id, - app_display_name=app_display_name, - device_display_name=device_display_name, - pushkey=pushkey, - pushkey_ts=self.hs.get_clock().time_msec(), - lang=lang, - data=data, - ) - yield self._refresh_pusher(app_id, pushkey, user_id) - def _create_pusher(self, pusherdict): if pusherdict['kind'] == 'http': return HttpPusher( self.hs, - profile_tag=pusherdict['profile_tag'], user_id=pusherdict['user_name'], app_id=pusherdict['app_id'], app_display_name=pusherdict['app_display_name'], diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index 7766b8be1d..5db2805d68 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -60,7 +60,6 @@ class PushRuleRestServlet(ClientV1RestServlet): spec['template'], spec['rule_id'], content, - device=spec['device'] if 'device' in spec else None ) except InvalidRuleException as e: raise SynapseError(400, e.message) @@ -153,23 +152,7 @@ class PushRuleRestServlet(ClientV1RestServlet): elif pattern_type == "user_localpart": c["pattern"] = user.localpart - if r['priority_class'] > PRIORITY_CLASS_MAP['override']: - # per-device rule - profile_tag = _profile_tag_from_conditions(r["conditions"]) - r = _strip_device_condition(r) - if not profile_tag: - continue - if profile_tag not in rules['device']: - rules['device'][profile_tag] = {} - rules['device'][profile_tag] = ( - _add_empty_priority_class_arrays( - rules['device'][profile_tag] - ) - ) - - rulearray = rules['device'][profile_tag][template_name] - else: - rulearray = rules['global'][template_name] + rulearray = rules['global'][template_name] template_rule = _rule_to_template(r) if template_rule: @@ -195,24 +178,6 @@ class PushRuleRestServlet(ClientV1RestServlet): path = path[1:] result = _filter_ruleset_with_path(rules['global'], path) defer.returnValue((200, result)) - elif path[0] == 'device': - path = path[1:] - if path == []: - raise UnrecognizedRequestError( - PushRuleRestServlet.SLIGHTLY_PEDANTIC_TRAILING_SLASH_ERROR - ) - if path[0] == '': - defer.returnValue((200, rules['device'])) - - profile_tag = path[0] - path = path[1:] - if profile_tag not in rules['device']: - ret = {} - ret = _add_empty_priority_class_arrays(ret) - defer.returnValue((200, ret)) - ruleset = rules['device'][profile_tag] - result = _filter_ruleset_with_path(ruleset, path) - defer.returnValue((200, result)) else: raise UnrecognizedRequestError() @@ -252,16 +217,9 @@ def _rule_spec_from_path(path): scope = path[1] path = path[2:] - if scope not in ['global', 'device']: + if scope != 'global': raise UnrecognizedRequestError() - device = None - if scope == 'device': - if len(path) == 0: - raise UnrecognizedRequestError() - device = path[0] - path = path[1:] - if len(path) == 0: raise UnrecognizedRequestError() @@ -278,8 +236,6 @@ def _rule_spec_from_path(path): 'template': template, 'rule_id': rule_id } - if device: - spec['profile_tag'] = device path = path[1:] @@ -289,7 +245,7 @@ def _rule_spec_from_path(path): return spec -def _rule_tuple_from_request_object(rule_template, rule_id, req_obj, device=None): +def _rule_tuple_from_request_object(rule_template, rule_id, req_obj): if rule_template in ['override', 'underride']: if 'conditions' not in req_obj: raise InvalidRuleException("Missing 'conditions'") @@ -322,12 +278,6 @@ def _rule_tuple_from_request_object(rule_template, rule_id, req_obj, device=None else: raise InvalidRuleException("Unknown rule template: %s" % (rule_template,)) - if device: - conditions.append({ - 'kind': 'device', - 'profile_tag': device - }) - if 'actions' not in req_obj: raise InvalidRuleException("No actions found") actions = req_obj['actions'] @@ -349,17 +299,6 @@ def _add_empty_priority_class_arrays(d): return d -def _profile_tag_from_conditions(conditions): - """ - Given a list of conditions, return the profile tag of the - device rule if there is one - """ - for c in conditions: - if c['kind'] == 'device': - return c['profile_tag'] - return None - - def _filter_ruleset_with_path(ruleset, path): if path == []: raise UnrecognizedRequestError( @@ -403,19 +342,11 @@ def _priority_class_from_spec(spec): raise InvalidRuleException("Unknown template: %s" % (spec['template'])) pc = PRIORITY_CLASS_MAP[spec['template']] - if spec['scope'] == 'device': - pc += len(PRIORITY_CLASS_MAP) - return pc def _priority_class_to_template_name(pc): - if pc > PRIORITY_CLASS_MAP['override']: - # per-device - prio_class_index = pc - len(PRIORITY_CLASS_MAP) - return PRIORITY_CLASS_INVERSE_MAP[prio_class_index] - else: - return PRIORITY_CLASS_INVERSE_MAP[pc] + return PRIORITY_CLASS_INVERSE_MAP[pc] def _rule_to_template(rule): @@ -445,23 +376,12 @@ def _rule_to_template(rule): return templaterule -def _strip_device_condition(rule): - for i, c in enumerate(rule['conditions']): - if c['kind'] == 'device': - del rule['conditions'][i] - return rule - - def _namespaced_rule_id_from_spec(spec): return _namespaced_rule_id(spec, spec['rule_id']) def _namespaced_rule_id(spec, rule_id): - if spec['scope'] == 'global': - scope = 'global' - else: - scope = 'device/%s' % (spec['profile_tag']) - return "%s/%s/%s" % (scope, spec['template'], rule_id) + return "global/%s/%s" % (spec['template'], rule_id) def _rule_id_from_namespaced(in_rule_id): diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py index 5547f1b112..4c662e6e3c 100644 --- a/synapse/rest/client/v1/pusher.py +++ b/synapse/rest/client/v1/pusher.py @@ -45,7 +45,7 @@ class PusherRestServlet(ClientV1RestServlet): ) defer.returnValue((200, {})) - reqd = ['profile_tag', 'kind', 'app_id', 'app_display_name', + reqd = ['kind', 'app_id', 'app_display_name', 'device_display_name', 'pushkey', 'lang', 'data'] missing = [] for i in reqd: @@ -73,14 +73,14 @@ class PusherRestServlet(ClientV1RestServlet): yield pusher_pool.add_pusher( user_id=user.to_string(), access_token=requester.access_token_id, - profile_tag=content['profile_tag'], kind=content['kind'], app_id=content['app_id'], app_display_name=content['app_display_name'], device_display_name=content['device_display_name'], pushkey=content['pushkey'], lang=content['lang'], - data=content['data'] + data=content['data'], + profile_tag=content.get('profile_tag', ""), ) except PusherConfigException as pce: raise SynapseError(400, "Config Error: " + pce.message, diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index d77a817682..5820539a92 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -27,15 +27,14 @@ class EventPushActionsStore(SQLBaseStore): def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): """ :param event: the event set actions for - :param tuples: list of tuples of (user_id, profile_tag, actions) + :param tuples: list of tuples of (user_id, actions) """ values = [] - for uid, profile_tag, actions in tuples: + for uid, actions in tuples: values.append({ 'room_id': event.room_id, 'event_id': event.event_id, 'user_id': uid, - 'profile_tag': profile_tag, 'actions': json.dumps(actions), 'stream_ordering': event.internal_metadata.stream_ordering, 'topological_ordering': event.depth, @@ -43,7 +42,7 @@ class EventPushActionsStore(SQLBaseStore): 'highlight': 1 if _action_has_highlight(actions) else 0, }) - for uid, _, __ in tuples: + for uid, __ in tuples: txn.call_after( self.get_unread_event_push_actions_by_room_for_user.invalidate_many, (event.room_id, uid) diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 8ec706178a..c23648cdbc 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -80,9 +80,9 @@ class PusherStore(SQLBaseStore): defer.returnValue(rows) @defer.inlineCallbacks - def add_pusher(self, user_id, access_token, profile_tag, kind, app_id, + def add_pusher(self, user_id, access_token, kind, app_id, app_display_name, device_display_name, - pushkey, pushkey_ts, lang, data): + pushkey, pushkey_ts, lang, data, profile_tag=""): try: next_id = yield self._pushers_id_gen.get_next() yield self._simple_upsert( @@ -95,12 +95,12 @@ class PusherStore(SQLBaseStore): dict( access_token=access_token, kind=kind, - profile_tag=profile_tag, app_display_name=app_display_name, device_display_name=device_display_name, ts=pushkey_ts, lang=lang, data=encode_canonical_json(data), + profile_tag=profile_tag, ), insertion_values=dict( id=next_id, -- cgit 1.4.1 From b71ca2b0140fa4d7866ebb10ee49556de7eff44f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 19 Feb 2016 11:41:02 +0000 Subject: Allow guest users access to messages in rooms they have joined There should be no difference between guest users and non-guest users in terms of access to messages. Define the semantics of the is_peeking argument to filter_events_for_clients (slightly) better; interpret it appropriately, and set it correctly from /sync. --- synapse/handlers/_base.py | 52 +++++++++++++++++++++++++------- synapse/handlers/sync.py | 2 -- synapse/push/bulk_push_rule_evaluator.py | 2 +- 3 files changed, 42 insertions(+), 14 deletions(-) (limited to 'synapse/push') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 064e8723c8..da219184c5 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -53,9 +53,15 @@ class BaseHandler(object): self.event_builder_factory = hs.get_event_builder_factory() @defer.inlineCallbacks - def _filter_events_for_clients(self, user_tuples, events, event_id_to_state): + def filter_events_for_clients(self, user_tuples, events, event_id_to_state): """ Returns dict of user_id -> list of events that user is allowed to see. + + :param (str, bool) user_tuples: (user id, is_peeking) for each + user to be checked. is_peeking should be true if: + * the user is not currently a member of the room, and: + * the user has not been a member of the room since the given + events """ forgotten = yield defer.gatherResults([ self.store.who_forgot_in_room( @@ -72,18 +78,20 @@ class BaseHandler(object): def allowed(event, user_id, is_peeking): state = event_id_to_state[event.event_id] + # get the room_visibility at the time of the event. visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None) if visibility_event: visibility = visibility_event.content.get("history_visibility", "shared") else: visibility = "shared" + # if it was world_readable, it's easy: everyone can read it if visibility == "world_readable": return True - if is_peeking: - return False - + # get the user's membership at the time of the event. (or rather, + # just *after* the event. Which means that people can see their + # own join events, but not (currently) their own leave events.) membership_event = state.get((EventTypes.Member, user_id), None) if membership_event: if membership_event.event_id in event_id_forgotten: @@ -93,20 +101,32 @@ class BaseHandler(object): else: membership = None + # if the user was a member of the room at the time of the event, + # they can see it. if membership == Membership.JOIN: return True if event.type == EventTypes.RoomHistoryVisibility: - return not is_peeking + # XXX why are m.room.history_visibility events special? + # return True + pass if visibility == "shared": - return True - elif visibility == "joined": - return membership == Membership.JOIN + # user can also see the event if he has become a member since + # the event + # + # XXX: if the user has subsequently joined and then left again, + # ideally we would share history up to the point they left. But + # we don't know when they left. + return not is_peeking elif visibility == "invited": + # user can also see the event if he was *invited* at the time + # of the event. return membership == Membership.INVITE - return True + # presumably visibility is "joined"; we weren't a member at the + # time of the event, so we're done. + return False defer.returnValue({ user_id: [ @@ -119,7 +139,17 @@ class BaseHandler(object): @defer.inlineCallbacks def _filter_events_for_client(self, user_id, events, is_peeking=False): - # Assumes that user has at some point joined the room if not is_guest. + """ + Check which events a user is allowed to see + + :param str user_id: user id to be checked + :param [synapse.events.EventBase] events: list of events to be checked + :param bool is_peeking should be True if: + * the user is not currently a member of the room, and: + * the user has not been a member of the room since the given + events + :rtype [synapse.events.EventBase] + """ types = ( (EventTypes.RoomHistoryVisibility, ""), (EventTypes.Member, user_id), @@ -128,7 +158,7 @@ class BaseHandler(object): frozenset(e.event_id for e in events), types=types ) - res = yield self._filter_events_for_clients( + res = yield self.filter_events_for_clients( [(user_id, is_peeking)], events, event_id_to_state ) defer.returnValue(res.get(user_id, [])) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 1d0f0058a2..f5122b5fb1 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -623,7 +623,6 @@ class SyncHandler(BaseHandler): recents = yield self._filter_events_for_client( sync_config.user.to_string(), recents, - is_peeking=sync_config.is_guest, ) else: recents = [] @@ -645,7 +644,6 @@ class SyncHandler(BaseHandler): loaded_recents = yield self._filter_events_for_client( sync_config.user.to_string(), loaded_recents, - is_peeking=sync_config.is_guest, ) loaded_recents.extend(recents) recents = loaded_recents diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 8ac5ceb9ef..206b20e15f 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -103,7 +103,7 @@ class BulkPushRuleEvaluator: users_dict = yield self.store.are_guests(self.rules_by_user.keys()) - filtered_by_user = yield handler._filter_events_for_clients( + filtered_by_user = yield handler.filter_events_for_clients( users_dict.items(), [event], {event.event_id: current_state} ) -- cgit 1.4.1 From de27f7fc79b785961181d13749468ae3e2019772 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 26 Feb 2016 14:28:19 +0000 Subject: Add support for changing the actions for default rules See matrix-org/matrix-doc#283 Works by adding dummy rules to the push rules table with a negative priority class and then using those rules to clobber the default rule actions when adding the default rules in ``list_with_base_rules`` --- synapse/push/baserules.py | 57 ++++++++++++++++++++++++++++++++----- synapse/rest/client/v1/push_rule.py | 31 +++++++++++++++++--- synapse/storage/push_rule.py | 25 ++++++++++++++++ 3 files changed, 102 insertions(+), 11 deletions(-) (limited to 'synapse/push') diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py index 0832c77cb4..86a2998bcc 100644 --- a/synapse/push/baserules.py +++ b/synapse/push/baserules.py @@ -13,46 +13,67 @@ # limitations under the License. from synapse.push.rulekinds import PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP +import copy def list_with_base_rules(rawrules): + """Combine the list of rules set by the user with the default push rules + + :param list rawrules: The rules the user has modified or set. + :returns: A new list with the rules set by the user combined with the + defaults. + """ ruleslist = [] + # Grab the base rules that the user has modified. + # The modified base rules have a priority_class of -1. + modified_base_rules = { + r['rule_id']: r for r in rawrules if r['priority_class'] < 0 + } + + # Remove the modified base rules from the list, They'll be added back + # in the default postions in the list. + rawrules = [r for r in rawrules if r['priority_class'] >= 0] + # shove the server default rules for each kind onto the end of each current_prio_class = PRIORITY_CLASS_INVERSE_MAP.keys()[-1] ruleslist.extend(make_base_prepend_rules( - PRIORITY_CLASS_INVERSE_MAP[current_prio_class] + PRIORITY_CLASS_INVERSE_MAP[current_prio_class], modified_base_rules )) for r in rawrules: if r['priority_class'] < current_prio_class: while r['priority_class'] < current_prio_class: ruleslist.extend(make_base_append_rules( - PRIORITY_CLASS_INVERSE_MAP[current_prio_class] + PRIORITY_CLASS_INVERSE_MAP[current_prio_class], + modified_base_rules, )) current_prio_class -= 1 if current_prio_class > 0: ruleslist.extend(make_base_prepend_rules( - PRIORITY_CLASS_INVERSE_MAP[current_prio_class] + PRIORITY_CLASS_INVERSE_MAP[current_prio_class], + modified_base_rules, )) ruleslist.append(r) while current_prio_class > 0: ruleslist.extend(make_base_append_rules( - PRIORITY_CLASS_INVERSE_MAP[current_prio_class] + PRIORITY_CLASS_INVERSE_MAP[current_prio_class], + modified_base_rules, )) current_prio_class -= 1 if current_prio_class > 0: ruleslist.extend(make_base_prepend_rules( - PRIORITY_CLASS_INVERSE_MAP[current_prio_class] + PRIORITY_CLASS_INVERSE_MAP[current_prio_class], + modified_base_rules, )) return ruleslist -def make_base_append_rules(kind): +def make_base_append_rules(kind, modified_base_rules): rules = [] if kind == 'override': @@ -62,15 +83,31 @@ def make_base_append_rules(kind): elif kind == 'content': rules = BASE_APPEND_CONTENT_RULES + # Copy the rules before modifying them + rules = copy.deepcopy(rules) + for r in rules: + # Only modify the actions, keep the conditions the same. + modified = modified_base_rules.get(r['rule_id']) + if modified: + r['actions'] = modified['actions'] + return rules -def make_base_prepend_rules(kind): +def make_base_prepend_rules(kind, modified_base_rules): rules = [] if kind == 'override': rules = BASE_PREPEND_OVERRIDE_RULES + # Copy the rules before modifying them + rules = copy.deepcopy(rules) + for r in rules: + # Only modify the actions, keep the conditions the same. + modified = modified_base_rules.get(r['rule_id']) + if modified: + r['actions'] = modified['actions'] + return rules @@ -263,18 +300,24 @@ BASE_APPEND_UNDERRIDE_RULES = [ ] +BASE_RULE_IDS = set() + for r in BASE_APPEND_CONTENT_RULES: r['priority_class'] = PRIORITY_CLASS_MAP['content'] r['default'] = True + BASE_RULE_IDS.add(r['rule_id']) for r in BASE_PREPEND_OVERRIDE_RULES: r['priority_class'] = PRIORITY_CLASS_MAP['override'] r['default'] = True + BASE_RULE_IDS.add(r['rule_id']) for r in BASE_APPEND_OVRRIDE_RULES: r['priority_class'] = PRIORITY_CLASS_MAP['override'] r['default'] = True + BASE_RULE_IDS.add(r['rule_id']) for r in BASE_APPEND_UNDERRIDE_RULES: r['priority_class'] = PRIORITY_CLASS_MAP['underride'] r['default'] = True + BASE_RULE_IDS.add(r['rule_id']) diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index d26e4cde3e..970a019223 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -22,7 +22,7 @@ from .base import ClientV1RestServlet, client_path_patterns from synapse.storage.push_rule import ( InconsistentRuleException, RuleNotFoundException ) -import synapse.push.baserules as baserules +from synapse.push.baserules import list_with_base_rules, BASE_RULE_IDS from synapse.push.rulekinds import ( PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP ) @@ -55,6 +55,10 @@ class PushRuleRestServlet(ClientV1RestServlet): yield self.set_rule_attr(requester.user.to_string(), spec, content) defer.returnValue((200, {})) + if spec['rule_id'].startswith('.'): + # Rule ids starting with '.' are reserved for server default rules. + raise SynapseError(400, "cannot add new rule_ids that start with '.'") + try: (conditions, actions) = _rule_tuple_from_request_object( spec['template'], @@ -128,7 +132,7 @@ class PushRuleRestServlet(ClientV1RestServlet): ruleslist.append(rule) # We're going to be mutating this a lot, so do a deep copy - ruleslist = copy.deepcopy(baserules.list_with_base_rules(ruleslist)) + ruleslist = copy.deepcopy(list_with_base_rules(ruleslist)) rules = {'global': {}, 'device': {}} @@ -197,6 +201,18 @@ class PushRuleRestServlet(ClientV1RestServlet): return self.hs.get_datastore().set_push_rule_enabled( user_id, namespaced_rule_id, val ) + elif spec['attr'] == 'actions': + actions = val.get('actions') + _check_actions(actions) + namespaced_rule_id = _namespaced_rule_id_from_spec(spec) + rule_id = spec['rule_id'] + is_default_rule = rule_id.startswith(".") + if is_default_rule: + if namespaced_rule_id not in BASE_RULE_IDS: + raise SynapseError(404, "Unknown rule %r" % (namespaced_rule_id,)) + return self.hs.get_datastore().set_push_rule_actions( + user_id, namespaced_rule_id, actions, is_default_rule + ) else: raise UnrecognizedRequestError() @@ -274,6 +290,15 @@ def _rule_tuple_from_request_object(rule_template, rule_id, req_obj): raise InvalidRuleException("No actions found") actions = req_obj['actions'] + _check_actions(actions) + + return conditions, actions + + +def _check_actions(actions): + if not isinstance(actions, list): + raise InvalidRuleException("No actions found") + for a in actions: if a in ['notify', 'dont_notify', 'coalesce']: pass @@ -282,8 +307,6 @@ def _rule_tuple_from_request_object(rule_template, rule_id, req_obj): else: raise InvalidRuleException("Unrecognised action") - return conditions, actions - def _add_empty_priority_class_arrays(d): for pc in PRIORITY_CLASS_MAP.keys(): diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index e19a81e41f..bb5c14d912 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -294,6 +294,31 @@ class PushRuleStore(SQLBaseStore): self.get_push_rules_enabled_for_user.invalidate, (user_id,) ) + def set_push_rule_actions(self, user_id, rule_id, actions, is_default_rule): + actions_json = json.dumps(actions) + + def set_push_rule_actions_txn(txn): + if is_default_rule: + # Add a dummy rule to the rules table with the user specified + # actions. + priority_class = -1 + priority = 1 + self._upsert_push_rule_txn( + txn, user_id, rule_id, priority_class, priority, + "[]", actions_json + ) + else: + self._simple_update_one_txn( + txn, + "push_rules", + {'user_name': user_id, 'rule_id': rule_id}, + {'actions': actions_json}, + ) + + return self.runInteraction( + "set_push_rule_actions", set_push_rule_actions_txn, + ) + class RuleNotFoundException(Exception): pass -- cgit 1.4.1 From 3406eba4ef40de888ebb5b22c0ea4925b2dddeb1 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 3 Mar 2016 16:11:59 +0000 Subject: Move the code for formatting push rules into a separate function --- synapse/push/clientformat.py | 112 ++++++++++++++++++++++++++++++++++++ synapse/rest/client/v1/push_rule.py | 90 ++--------------------------- 2 files changed, 116 insertions(+), 86 deletions(-) create mode 100644 synapse/push/clientformat.py (limited to 'synapse/push') diff --git a/synapse/push/clientformat.py b/synapse/push/clientformat.py new file mode 100644 index 0000000000..ae9db9ec2f --- /dev/null +++ b/synapse/push/clientformat.py @@ -0,0 +1,112 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.push.baserules import list_with_base_rules + +from synapse.push.rulekinds import ( + PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP +) + +import copy +import simplejson as json + + +def format_push_rules_for_user(user, rawrules, enabled_map): + """Converts a list of rawrules and a enabled map into nested dictionaries + to match the Matrix client-server format for push rules""" + + ruleslist = [] + for rawrule in rawrules: + rule = dict(rawrule) + rule["conditions"] = json.loads(rawrule["conditions"]) + rule["actions"] = json.loads(rawrule["actions"]) + ruleslist.append(rule) + + # We're going to be mutating this a lot, so do a deep copy + ruleslist = copy.deepcopy(list_with_base_rules(ruleslist)) + + rules = {'global': {}, 'device': {}} + + rules['global'] = _add_empty_priority_class_arrays(rules['global']) + + for r in ruleslist: + rulearray = None + + template_name = _priority_class_to_template_name(r['priority_class']) + + # Remove internal stuff. + for c in r["conditions"]: + c.pop("_id", None) + + pattern_type = c.pop("pattern_type", None) + if pattern_type == "user_id": + c["pattern"] = user.to_string() + elif pattern_type == "user_localpart": + c["pattern"] = user.localpart + + rulearray = rules['global'][template_name] + + template_rule = _rule_to_template(r) + if template_rule: + if r['rule_id'] in enabled_map: + template_rule['enabled'] = enabled_map[r['rule_id']] + elif 'enabled' in r: + template_rule['enabled'] = r['enabled'] + else: + template_rule['enabled'] = True + rulearray.append(template_rule) + + return rules + + +def _add_empty_priority_class_arrays(d): + for pc in PRIORITY_CLASS_MAP.keys(): + d[pc] = [] + return d + + +def _rule_to_template(rule): + unscoped_rule_id = None + if 'rule_id' in rule: + unscoped_rule_id = _rule_id_from_namespaced(rule['rule_id']) + + template_name = _priority_class_to_template_name(rule['priority_class']) + if template_name in ['override', 'underride']: + templaterule = {k: rule[k] for k in ["conditions", "actions"]} + elif template_name in ["sender", "room"]: + templaterule = {'actions': rule['actions']} + unscoped_rule_id = rule['conditions'][0]['pattern'] + elif template_name == 'content': + if len(rule["conditions"]) != 1: + return None + thecond = rule["conditions"][0] + if "pattern" not in thecond: + return None + templaterule = {'actions': rule['actions']} + templaterule["pattern"] = thecond["pattern"] + + if unscoped_rule_id: + templaterule['rule_id'] = unscoped_rule_id + if 'default' in rule: + templaterule['default'] = rule['default'] + return templaterule + + +def _rule_id_from_namespaced(in_rule_id): + return in_rule_id.split('/')[-1] + + +def _priority_class_to_template_name(pc): + return PRIORITY_CLASS_INVERSE_MAP[pc] diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index cf68725ca1..edfe28c79b 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -22,12 +22,10 @@ from .base import ClientV1RestServlet, client_path_patterns from synapse.storage.push_rule import ( InconsistentRuleException, RuleNotFoundException ) -from synapse.push.baserules import list_with_base_rules, BASE_RULE_IDS -from synapse.push.rulekinds import ( - PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP -) +from synapse.push.clientformat import format_push_rules_for_user +from synapse.push.baserules import BASE_RULE_IDS +from synapse.push.rulekinds import PRIORITY_CLASS_MAP -import copy import simplejson as json @@ -133,48 +131,9 @@ class PushRuleRestServlet(ClientV1RestServlet): # is probably not going to make a whole lot of difference rawrules = yield self.store.get_push_rules_for_user(user_id) - ruleslist = [] - for rawrule in rawrules: - rule = dict(rawrule) - rule["conditions"] = json.loads(rawrule["conditions"]) - rule["actions"] = json.loads(rawrule["actions"]) - ruleslist.append(rule) - - # We're going to be mutating this a lot, so do a deep copy - ruleslist = copy.deepcopy(list_with_base_rules(ruleslist)) - - rules = {'global': {}, 'device': {}} - - rules['global'] = _add_empty_priority_class_arrays(rules['global']) - enabled_map = yield self.store.get_push_rules_enabled_for_user(user_id) - for r in ruleslist: - rulearray = None - - template_name = _priority_class_to_template_name(r['priority_class']) - - # Remove internal stuff. - for c in r["conditions"]: - c.pop("_id", None) - - pattern_type = c.pop("pattern_type", None) - if pattern_type == "user_id": - c["pattern"] = user_id - elif pattern_type == "user_localpart": - c["pattern"] = requester.user.localpart - - rulearray = rules['global'][template_name] - - template_rule = _rule_to_template(r) - if template_rule: - if r['rule_id'] in enabled_map: - template_rule['enabled'] = enabled_map[r['rule_id']] - elif 'enabled' in r: - template_rule['enabled'] = r['enabled'] - else: - template_rule['enabled'] = True - rulearray.append(template_rule) + rules = format_push_rules_for_user(requester.user, rawrules, enabled_map) path = request.postpath[1:] @@ -322,12 +281,6 @@ def _check_actions(actions): raise InvalidRuleException("Unrecognised action") -def _add_empty_priority_class_arrays(d): - for pc in PRIORITY_CLASS_MAP.keys(): - d[pc] = [] - return d - - def _filter_ruleset_with_path(ruleset, path): if path == []: raise UnrecognizedRequestError( @@ -376,37 +329,6 @@ def _priority_class_from_spec(spec): return pc -def _priority_class_to_template_name(pc): - return PRIORITY_CLASS_INVERSE_MAP[pc] - - -def _rule_to_template(rule): - unscoped_rule_id = None - if 'rule_id' in rule: - unscoped_rule_id = _rule_id_from_namespaced(rule['rule_id']) - - template_name = _priority_class_to_template_name(rule['priority_class']) - if template_name in ['override', 'underride']: - templaterule = {k: rule[k] for k in ["conditions", "actions"]} - elif template_name in ["sender", "room"]: - templaterule = {'actions': rule['actions']} - unscoped_rule_id = rule['conditions'][0]['pattern'] - elif template_name == 'content': - if len(rule["conditions"]) != 1: - return None - thecond = rule["conditions"][0] - if "pattern" not in thecond: - return None - templaterule = {'actions': rule['actions']} - templaterule["pattern"] = thecond["pattern"] - - if unscoped_rule_id: - templaterule['rule_id'] = unscoped_rule_id - if 'default' in rule: - templaterule['default'] = rule['default'] - return templaterule - - def _namespaced_rule_id_from_spec(spec): return _namespaced_rule_id(spec, spec['rule_id']) @@ -415,10 +337,6 @@ def _namespaced_rule_id(spec, rule_id): return "global/%s/%s" % (spec['template'], rule_id) -def _rule_id_from_namespaced(in_rule_id): - return in_rule_id.split('/')[-1] - - class InvalidRuleException(Exception): pass -- cgit 1.4.1 From 7076082ae677b280c5b68df37b1fee2fc72752ff Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 8 Mar 2016 11:45:50 +0000 Subject: Fix relative imports so they work in both py3 and py27 --- synapse/push/__init__.py | 4 ++-- synapse/push/action_generator.py | 4 ++-- synapse/push/bulk_push_rule_evaluator.py | 6 +++--- synapse/push/push_rule_evaluator.py | 4 ++-- synapse/push/pusherpool.py | 2 +- synapse/rest/client/v1/admin.py | 2 +- synapse/rest/client/v1/initial_sync.py | 2 +- synapse/rest/client/v1/login.py | 2 +- synapse/rest/client/v1/register.py | 2 +- synapse/rest/client/v1/room.py | 2 +- synapse/rest/client/v1/voip.py | 2 +- synapse/storage/__init__.py | 2 +- synapse/storage/end_to_end_keys.py | 2 +- synapse/storage/events.py | 2 +- synapse/storage/keys.py | 2 +- synapse/storage/media_repository.py | 2 +- synapse/storage/signatures.py | 2 +- 17 files changed, 22 insertions(+), 22 deletions(-) (limited to 'synapse/push') diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 4c6c3b83a2..65ef1b68a3 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -21,7 +21,7 @@ from synapse.util.logcontext import LoggingContext from synapse.util.metrics import Measure import synapse.util.async -import push_rule_evaluator as push_rule_evaluator +from .push_rule_evaluator import evaluator_for_user_id import logging import random @@ -185,7 +185,7 @@ class Pusher(object): processed = False rule_evaluator = yield \ - push_rule_evaluator.evaluator_for_user_id( + evaluator_for_user_id( self.user_id, single_event['room_id'], self.store ) diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index c6c1dc769e..84efcdd184 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -15,7 +15,7 @@ from twisted.internet import defer -import bulk_push_rule_evaluator +from .bulk_push_rule_evaluator import evaluator_for_room_id import logging @@ -35,7 +35,7 @@ class ActionGenerator: @defer.inlineCallbacks def handle_push_actions_for_event(self, event, context, handler): - bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id( + bulk_evaluator = yield evaluator_for_room_id( event.room_id, self.hs, self.store ) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 5d8be483e5..87d5061fb0 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -18,8 +18,8 @@ import ujson as json from twisted.internet import defer -import baserules -from push_rule_evaluator import PushRuleEvaluatorForEvent +from .baserules import list_with_base_rules +from .push_rule_evaluator import PushRuleEvaluatorForEvent from synapse.api.constants import EventTypes @@ -39,7 +39,7 @@ def _get_rules(room_id, user_ids, store): rules_enabled_by_user = yield store.bulk_get_push_rules_enabled(user_ids) rules_by_user = { - uid: baserules.list_with_base_rules([ + uid: list_with_base_rules([ decode_rule_json(rule_list) for rule_list in rules_by_user.get(uid, []) ]) diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 98e2a2015e..51f73a5b78 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -15,7 +15,7 @@ from twisted.internet import defer -import baserules +from .baserules import list_with_base_rules import logging import simplejson as json @@ -91,7 +91,7 @@ class PushRuleEvaluator: rule['actions'] = json.loads(raw_rule['actions']) rules.append(rule) - self.rules = baserules.list_with_base_rules(rules) + self.rules = list_with_base_rules(rules) self.enabled_map = enabled_map diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index a05aa5f661..772a095f8b 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -16,7 +16,7 @@ from twisted.internet import defer -from httppusher import HttpPusher +from .httppusher import HttpPusher from synapse.push import PusherConfigException from synapse.util.logcontext import preserve_fn diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index e2f5eb7b29..aa05b3f023 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.api.errors import AuthError, SynapseError from synapse.types import UserID -from base import ClientV1RestServlet, client_path_patterns +from .base import ClientV1RestServlet, client_path_patterns import logging diff --git a/synapse/rest/client/v1/initial_sync.py b/synapse/rest/client/v1/initial_sync.py index ad161bdbab..36c3520567 100644 --- a/synapse/rest/client/v1/initial_sync.py +++ b/synapse/rest/client/v1/initial_sync.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.streams.config import PaginationConfig -from base import ClientV1RestServlet, client_path_patterns +from .base import ClientV1RestServlet, client_path_patterns # TODO: Needs unit testing diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index c14e8af00e..f6902a60a8 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -19,7 +19,7 @@ from synapse.api.errors import SynapseError, LoginError, Codes from synapse.types import UserID from synapse.http.server import finish_request -from base import ClientV1RestServlet, client_path_patterns +from .base import ClientV1RestServlet, client_path_patterns import simplejson as json import urllib diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py index 6d6d03c34c..040a7a7ffa 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1/register.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, Codes from synapse.api.constants import LoginType -from base import ClientV1RestServlet, client_path_patterns +from .base import ClientV1RestServlet, client_path_patterns import synapse.util.stringutils as stringutils from synapse.util.async import run_on_reactor diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index cbf3673eff..4b7d198c52 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -16,7 +16,7 @@ """ This module contains REST servlets to do with rooms: /rooms/ """ from twisted.internet import defer -from base import ClientV1RestServlet, client_path_patterns +from .base import ClientV1RestServlet, client_path_patterns from synapse.api.errors import SynapseError, Codes, AuthError from synapse.streams.config import PaginationConfig from synapse.api.constants import EventTypes, Membership diff --git a/synapse/rest/client/v1/voip.py b/synapse/rest/client/v1/voip.py index ec4cf8db79..c40442f958 100644 --- a/synapse/rest/client/v1/voip.py +++ b/synapse/rest/client/v1/voip.py @@ -15,7 +15,7 @@ from twisted.internet import defer -from base import ClientV1RestServlet, client_path_patterns +from .base import ClientV1RestServlet, client_path_patterns import hmac diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 6f37a85d09..168eb27b03 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -45,7 +45,7 @@ from .search import SearchStore from .tags import TagsStore from .account_data import AccountDataStore -from util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator +from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator from synapse.api.constants import PresenceState from synapse.util.caches.stream_change_cache import StreamChangeCache diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 5dd32b1413..2e89066515 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from _base import SQLBaseStore +from ._base import SQLBaseStore class EndToEndKeyStore(SQLBaseStore): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 60936500d8..552e7ca35b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from _base import SQLBaseStore, _RollbackButIsFineException +from ._base import SQLBaseStore, _RollbackButIsFineException from twisted.internet import defer, reactor diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index fd05bfe54e..a495a8a7d9 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from _base import SQLBaseStore +from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks from twisted.internet import defer diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index 0894384780..9d3ba32478 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from _base import SQLBaseStore +from ._base import SQLBaseStore class MediaRepositoryStore(SQLBaseStore): diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index 70c6a06cd1..b10f2a5787 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -15,7 +15,7 @@ from twisted.internet import defer -from _base import SQLBaseStore +from ._base import SQLBaseStore from unpaddedbase64 import encode_base64 from synapse.crypto.event_signing import compute_event_reference_hash -- cgit 1.4.1 From aa11db5f119b9fa88242b0df95cfddd00e196ca1 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 11 Mar 2016 13:14:18 +0000 Subject: Fix cache invalidation so deleting access tokens (which we did when changing password) actually takes effect without HS restart. Reinstate the code to avoid logging out the session that changed the password, removed in 415c2f05491ce65a4fc34326519754cd1edd9c54 --- synapse/handlers/auth.py | 13 +++++++++---- synapse/push/pusherpool.py | 8 ++++---- synapse/rest/client/v2_alpha/account.py | 2 +- synapse/storage/registration.py | 28 ++++++++++++++++++++-------- 4 files changed, 34 insertions(+), 17 deletions(-) (limited to 'synapse/push') diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 7a4afe446d..a740cc3da3 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -432,13 +432,18 @@ class AuthHandler(BaseHandler): ) @defer.inlineCallbacks - def set_password(self, user_id, newpassword): + def set_password(self, user_id, newpassword, requester=None): password_hash = self.hash(newpassword) + except_access_token_ids = [requester.access_token_id] if requester else [] + yield self.store.user_set_password_hash(user_id, password_hash) - yield self.store.user_delete_access_tokens(user_id) - yield self.hs.get_pusherpool().remove_pushers_by_user(user_id) - yield self.store.flush_user(user_id) + yield self.store.user_delete_access_tokens_except( + user_id, except_access_token_ids + ) + yield self.hs.get_pusherpool().remove_pushers_by_user_except_access_tokens( + user_id, except_access_token_ids + ) @defer.inlineCallbacks def add_threepid(self, user_id, medium, address, validated_at): diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 772a095f8b..28ec94d866 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -92,14 +92,14 @@ class PusherPool: yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) @defer.inlineCallbacks - def remove_pushers_by_user(self, user_id): + def remove_pushers_by_user_except_access_tokens(self, user_id, except_token_ids): all = yield self.store.get_all_pushers() logger.info( - "Removing all pushers for user %s", - user_id, + "Removing all pushers for user %s except access tokens ids %r", + user_id, except_token_ids ) for p in all: - if p['user_name'] == user_id: + if p['user_name'] == user_id and p['access_token'] not in except_token_ids: logger.info( "Removing pusher for app id %s, pushkey %s, user %s", p['app_id'], p['pushkey'], p['user_name'] diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index 688b051580..dd4ea45588 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -79,7 +79,7 @@ class PasswordRestServlet(RestServlet): new_password = params['new_password'] yield self.auth_handler.set_password( - user_id, new_password + user_id, new_password, requester ) defer.returnValue((200, {})) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index aa49f53458..5eef7ebcc7 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -208,14 +208,26 @@ class RegistrationStore(SQLBaseStore): ) @defer.inlineCallbacks - def flush_user(self, user_id): - rows = yield self._execute( - 'flush_user', None, - "SELECT token FROM access_tokens WHERE user_id = ?", - user_id - ) - for r in rows: - self.get_user_by_access_token.invalidate((r,)) + def user_delete_access_tokens_except(self, user_id, except_token_ids): + def f(txn): + txn.execute( + "SELECT id, token FROM access_tokens WHERE user_id = ? LIMIT 50", + (user_id,) + ) + rows = txn.fetchall() + for r in rows: + if r[0] in except_token_ids: + continue + + txn.call_after(self.get_user_by_access_token.invalidate, (r[1],)) + txn.execute( + "DELETE FROM access_tokens WHERE id in (%s)" % ",".join( + ["?" for _ in rows] + ), [r[0] for r in rows] + ) + return len(rows) == 50 + while (yield self.runInteraction("user_delete_access_tokens_except", f)): + pass @cached() def get_user_by_access_token(self, token): -- cgit 1.4.1 From af59826a2fda31a6382f60659796a1c8db6f21ce Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 11 Mar 2016 14:34:09 +0000 Subject: Make select more sensible when dseleting access tokens, rename pusher deletion to match access token deletion and make exception arg optional. --- synapse/handlers/auth.py | 2 +- synapse/push/pusherpool.py | 2 +- synapse/storage/registration.py | 8 +++----- 3 files changed, 5 insertions(+), 7 deletions(-) (limited to 'synapse/push') diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 43f2cdc2c4..5c0ea636bc 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -441,7 +441,7 @@ class AuthHandler(BaseHandler): yield self.store.user_delete_access_tokens( user_id, except_access_token_ids ) - yield self.hs.get_pusherpool().remove_pushers_by_user_except_access_tokens( + yield self.hs.get_pusherpool().remove_pushers_by_user( user_id, except_access_token_ids ) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 28ec94d866..0b463c6fdb 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -92,7 +92,7 @@ class PusherPool: yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) @defer.inlineCallbacks - def remove_pushers_by_user_except_access_tokens(self, user_id, except_token_ids): + def remove_pushers_by_user(self, user_id, except_token_ids=[]): all = yield self.store.get_all_pushers() logger.info( "Removing all pushers for user %s except access tokens ids %r", diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 266e29f5bc..3a050621d9 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -198,14 +198,12 @@ class RegistrationStore(SQLBaseStore): def user_delete_access_tokens(self, user_id, except_token_ids): def f(txn): txn.execute( - "SELECT id, token FROM access_tokens WHERE user_id = ? LIMIT 50", - (user_id,) + "SELECT id, token FROM access_tokens " + "WHERE user_id = ? AND id not in LIMIT 50", + (user_id,except_token_ids) ) rows = txn.fetchall() for r in rows: - if r[0] in except_token_ids: - continue - txn.call_after(self.get_user_by_access_token.invalidate, (r[1],)) txn.execute( "DELETE FROM access_tokens WHERE id in (%s)" % ",".join( -- cgit 1.4.1