diff options
author | Erik Johnston <erik@matrix.org> | 2015-06-19 11:50:34 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2015-06-19 11:50:34 +0100 |
commit | 63141e77e769c4f8693c681cd4eaf65d0852387b (patch) | |
tree | d28cf93a44d7cbbe1db4bead36ce4a28ab2bc3f9 | |
parent | Don't bother storing things we don't need (diff) | |
parent | Merge pull request #190 from matrix-org/erikj/syn-412 (diff) | |
download | synapse-63141e77e769c4f8693c681cd4eaf65d0852387b.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/persist_event_perf
-rw-r--r-- | AUTHORS.rst | 4 | ||||
-rw-r--r-- | synapse/api/auth.py | 2 | ||||
-rwxr-xr-x | synapse/app/homeserver.py | 106 | ||||
-rw-r--r-- | synapse/config/_base.py | 4 | ||||
-rw-r--r-- | synapse/config/repository.py | 5 | ||||
-rw-r--r-- | synapse/federation/transport/server.py | 1 | ||||
-rw-r--r-- | synapse/handlers/appservice.py | 2 | ||||
-rw-r--r-- | synapse/handlers/message.py | 52 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 28 | ||||
-rw-r--r-- | synapse/http/client.py | 18 | ||||
-rw-r--r-- | synapse/http/matrixfederationclient.py | 128 | ||||
-rw-r--r-- | synapse/http/server.py | 78 | ||||
-rw-r--r-- | synapse/notifier.py | 127 | ||||
-rw-r--r-- | synapse/rest/client/v1/transactions.py | 6 | ||||
-rw-r--r-- | synapse/util/__init__.py | 8 | ||||
-rw-r--r-- | synapse/util/async.py | 16 | ||||
-rw-r--r-- | tests/handlers/test_appservice.py | 43 |
17 files changed, 388 insertions, 240 deletions
diff --git a/AUTHORS.rst b/AUTHORS.rst index 3a457cd9fc..d7224ff5de 100644 --- a/AUTHORS.rst +++ b/AUTHORS.rst @@ -38,3 +38,7 @@ Brabo <brabo at riseup.net> Ivan Shapovalov <intelfx100 at gmail.com> * contrib/systemd: a sample systemd unit file and a logger configuration + +Eric Myhre <hash at exultant.us> + * Fix bug where ``media_store_path`` config option was ignored by v0 content + repository API. diff --git a/synapse/api/auth.py b/synapse/api/auth.py index d5bf0be85c..4da62e5d8d 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -370,6 +370,8 @@ class Auth(object): user_agent=user_agent ) + request.authenticated_entity = user.to_string() + defer.returnValue((user, ClientInfo(device_id, token_id))) except KeyError: raise AuthError( diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 95e9122d3e..49e27c9e11 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -35,7 +35,6 @@ from twisted.enterprise import adbapi from twisted.web.resource import Resource, EncodingResourceWrapper from twisted.web.static import File from twisted.web.server import Site, GzipEncoderFactory, Request -from twisted.web.http import proxiedLogFormatter, combinedLogFormatter from synapse.http.server import JsonResource, RootRedirect from synapse.rest.media.v0.content_repository import ContentRepoResource from synapse.rest.media.v1.media_repository import MediaRepositoryResource @@ -61,10 +60,13 @@ import twisted.manhole.telnet import synapse +import contextlib import logging import os +import re import resource import subprocess +import time logger = logging.getLogger("synapse.app.homeserver") @@ -112,7 +114,7 @@ class SynapseHomeServer(HomeServer): def build_resource_for_content_repo(self): return ContentRepoResource( - self, self.upload_dir, self.auth, self.content_addr + self, self.config.uploads_path, self.auth, self.content_addr ) def build_resource_for_media_repository(self): @@ -142,6 +144,7 @@ class SynapseHomeServer(HomeServer): port = listener_config["port"] bind_address = listener_config.get("bind_address", "") tls = listener_config.get("tls", False) + site_tag = listener_config.get("tag", port) if tls and config.no_tls: return @@ -197,7 +200,8 @@ class SynapseHomeServer(HomeServer): reactor.listenSSL( port, SynapseSite( - "synapse.access.https", + "synapse.access.https.%s" % (site_tag,), + site_tag, listener_config, root_resource, ), @@ -208,7 +212,8 @@ class SynapseHomeServer(HomeServer): reactor.listenTCP( port, SynapseSite( - "synapse.access.https", + "synapse.access.http.%s" % (site_tag,), + site_tag, listener_config, root_resource, ), @@ -375,7 +380,6 @@ def setup(config_options): hs = SynapseHomeServer( config.server_name, - upload_dir=os.path.abspath("uploads"), db_config=config.database_config, tls_context_factory=tls_context_factory, config=config, @@ -433,9 +437,70 @@ class SynapseService(service.Service): return self._port.stopListening() -class XForwardedForRequest(Request): - def __init__(self, *args, **kw): +class SynapseRequest(Request): + def __init__(self, site, *args, **kw): Request.__init__(self, *args, **kw) + self.site = site + self.authenticated_entity = None + self.start_time = 0 + + def __repr__(self): + # We overwrite this so that we don't log ``access_token`` + return '<%s at 0x%x method=%s uri=%s clientproto=%s site=%s>' % ( + self.__class__.__name__, + id(self), + self.method, + self.get_redacted_uri(), + self.clientproto, + self.site.site_tag, + ) + + def get_redacted_uri(self): + return re.sub( + r'(\?.*access_token=)[^&]*(.*)$', + r'\1<redacted>\2', + self.uri + ) + + def get_user_agent(self): + return self.requestHeaders.getRawHeaders("User-Agent", [None])[-1] + + def started_processing(self): + self.site.access_logger.info( + "%s - %s - Received request: %s %s", + self.getClientIP(), + self.site.site_tag, + self.method, + self.get_redacted_uri() + ) + self.start_time = int(time.time() * 1000) + + def finished_processing(self): + self.site.access_logger.info( + "%s - %s - {%s}" + " Processed request: %dms %sB %s \"%s %s %s\" \"%s\"", + self.getClientIP(), + self.site.site_tag, + self.authenticated_entity, + int(time.time() * 1000) - self.start_time, + self.sentLength, + self.code, + self.method, + self.get_redacted_uri(), + self.clientproto, + self.get_user_agent(), + ) + + @contextlib.contextmanager + def processing(self): + self.started_processing() + yield + self.finished_processing() + + +class XForwardedForRequest(SynapseRequest): + def __init__(self, *args, **kw): + SynapseRequest.__init__(self, *args, **kw) """ Add a layer on top of another request that only uses the value of an @@ -451,8 +516,16 @@ class XForwardedForRequest(Request): b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip() -def XForwardedFactory(*args, **kwargs): - return XForwardedForRequest(*args, **kwargs) +class SynapseRequestFactory(object): + def __init__(self, site, x_forwarded_for): + self.site = site + self.x_forwarded_for = x_forwarded_for + + def __call__(self, *args, **kwargs): + if self.x_forwarded_for: + return XForwardedForRequest(self.site, *args, **kwargs) + else: + return SynapseRequest(self.site, *args, **kwargs) class SynapseSite(Site): @@ -460,18 +533,17 @@ class SynapseSite(Site): Subclass of a twisted http Site that does access logging with python's standard logging """ - def __init__(self, logger_name, config, resource, *args, **kwargs): + def __init__(self, logger_name, site_tag, config, resource, *args, **kwargs): Site.__init__(self, resource, *args, **kwargs) - if config.get("x_forwarded", False): - self.requestFactory = XForwardedFactory - self._log_formatter = proxiedLogFormatter - else: - self._log_formatter = combinedLogFormatter + + self.site_tag = site_tag + + proxied = config.get("x_forwarded", False) + self.requestFactory = SynapseRequestFactory(self, proxied) self.access_logger = logging.getLogger(logger_name) def log(self, request): - line = self._log_formatter(self._logDateTime, request) - self.access_logger.info(line) + pass def create_resource_tree(desired_tree, redirect_root_to_web_client=True): diff --git a/synapse/config/_base.py b/synapse/config/_base.py index d4163d6272..d483c67c6a 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -148,7 +148,7 @@ class Config(object): if not config_args.config_path: config_parser.error( "Must supply a config file.\nA config file can be automatically" - " generated using \"--generate-config -h SERVER_NAME" + " generated using \"--generate-config -H SERVER_NAME" " -c CONFIG-FILE\"" ) @@ -209,7 +209,7 @@ class Config(object): if not config_args.config_path: config_parser.error( "Must supply a config file.\nA config file can be automatically" - " generated using \"--generate-config -h SERVER_NAME" + " generated using \"--generate-config -H SERVER_NAME" " -c CONFIG-FILE\"" ) diff --git a/synapse/config/repository.py b/synapse/config/repository.py index adaf4e4bb2..6891abd71d 100644 --- a/synapse/config/repository.py +++ b/synapse/config/repository.py @@ -21,13 +21,18 @@ class ContentRepositoryConfig(Config): self.max_upload_size = self.parse_size(config["max_upload_size"]) self.max_image_pixels = self.parse_size(config["max_image_pixels"]) self.media_store_path = self.ensure_directory(config["media_store_path"]) + self.uploads_path = self.ensure_directory(config["uploads_path"]) def default_config(self, config_dir_path, server_name): media_store = self.default_path("media_store") + uploads_path = self.default_path("uploads") return """ # Directory where uploaded images and attachments are stored. media_store_path: "%(media_store)s" + # Directory where in-progress uploads are stored. + uploads_path: "%(uploads_path)s" + # The largest allowed upload size in bytes max_upload_size: "10M" diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 31190e700a..bad93c6b2f 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -94,6 +94,7 @@ class TransportLayerServer(object): yield self.keyring.verify_json_for_server(origin, json_request) logger.info("Request from %s", origin) + request.authenticated_entity = origin defer.returnValue((origin, content)) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 8269482e47..1240e51649 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -177,7 +177,7 @@ class ApplicationServicesHandler(object): return user_info = yield self.store.get_user_by_id(user_id) - if not user_info: + if user_info: defer.returnValue(False) return diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 867fdbefb0..e324662f18 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -380,15 +380,6 @@ class MessageHandler(BaseHandler): if limit is None: limit = 10 - messages, token = yield self.store.get_recent_events_for_room( - room_id, - limit=limit, - end_token=now_token.room_key, - ) - - start_token = now_token.copy_and_replace("room_key", token[0]) - end_token = now_token.copy_and_replace("room_key", token[1]) - room_members = [ m for m in current_state.values() if m.type == EventTypes.Member @@ -396,19 +387,38 @@ class MessageHandler(BaseHandler): ] presence_handler = self.hs.get_handlers().presence_handler - presence = [] - for m in room_members: - try: - member_presence = yield presence_handler.get_state( - target_user=UserID.from_string(m.user_id), - auth_user=auth_user, - as_event=True, - ) - presence.append(member_presence) - except SynapseError: - logger.exception( - "Failed to get member presence of %r", m.user_id + + @defer.inlineCallbacks + def get_presence(): + presence_defs = yield defer.DeferredList( + [ + presence_handler.get_state( + target_user=UserID.from_string(m.user_id), + auth_user=auth_user, + as_event=True, + check_auth=False, + ) + for m in room_members + ], + consumeErrors=True, + ) + + defer.returnValue([p for success, p in presence_defs if success]) + + presence, (messages, token) = yield defer.gatherResults( + [ + get_presence(), + self.store.get_recent_events_for_room( + room_id, + limit=limit, + end_token=now_token.room_key, ) + ], + consumeErrors=True, + ).addErrback(unwrapFirstError) + + start_token = now_token.copy_and_replace("room_key", token[0]) + end_token = now_token.copy_and_replace("room_key", token[1]) time_now = self.clock.time_msec() diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 023ad33ab0..7c03198313 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -191,24 +191,24 @@ class PresenceHandler(BaseHandler): defer.returnValue(False) @defer.inlineCallbacks - def get_state(self, target_user, auth_user, as_event=False): + def get_state(self, target_user, auth_user, as_event=False, check_auth=True): if self.hs.is_mine(target_user): - visible = yield self.is_presence_visible( - observer_user=auth_user, - observed_user=target_user - ) + if check_auth: + visible = yield self.is_presence_visible( + observer_user=auth_user, + observed_user=target_user + ) - if not visible: - raise SynapseError(404, "Presence information not visible") - state = yield self.store.get_presence_state(target_user.localpart) - if "mtime" in state: - del state["mtime"] - state["presence"] = state.pop("state") + if not visible: + raise SynapseError(404, "Presence information not visible") if target_user in self._user_cachemap: - cached_state = self._user_cachemap[target_user].get_state() - if "last_active" in cached_state: - state["last_active"] = cached_state["last_active"] + state = self._user_cachemap[target_user].get_state() + else: + state = yield self.store.get_presence_state(target_user.localpart) + if "mtime" in state: + del state["mtime"] + state["presence"] = state.pop("state") else: # TODO(paul): Have remote server send us permissions set state = self._get_or_offline_usercache(target_user).get_state() diff --git a/synapse/http/client.py b/synapse/http/client.py index e746f2416e..9091ae2d38 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -61,21 +61,31 @@ class SimpleHttpClient(object): self.agent = Agent(reactor, pool=pool) self.version_string = hs.version_string - def request(self, method, *args, **kwargs): + def request(self, method, uri, *args, **kwargs): # A small wrapper around self.agent.request() so we can easily attach # counters to it outgoing_requests_counter.inc(method) d = preserve_context_over_fn( self.agent.request, - method, *args, **kwargs + method, uri, *args, **kwargs ) + logger.info("Sending request %s %s", method, uri) + def _cb(response): incoming_responses_counter.inc(method, response.code) + logger.info( + "Received response to %s %s: %s", + method, uri, response.code + ) return response def _eb(failure): incoming_responses_counter.inc(method, "ERR") + logger.info( + "Error sending request to %s %s: %s %s", + method, uri, failure.type, failure.getErrorMessage() + ) return failure d.addCallbacks(_cb, _eb) @@ -84,7 +94,9 @@ class SimpleHttpClient(object): @defer.inlineCallbacks def post_urlencoded_get_json(self, uri, args={}): + # TODO: Do we ever want to log message contents? logger.debug("post_urlencoded_get_json args: %s", args) + query_bytes = urllib.urlencode(args, True) response = yield self.request( @@ -105,7 +117,7 @@ class SimpleHttpClient(object): def post_json_get_json(self, uri, post_json): json_str = encode_canonical_json(post_json) - logger.info("HTTP POST %s -> %s", json_str, uri) + logger.debug("HTTP POST %s -> %s", json_str, uri) response = yield self.request( "POST", diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 7f3d8fc884..1b90692731 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -35,11 +35,13 @@ from syutil.crypto.jsonsign import sign_json import simplejson as json import logging +import sys import urllib import urlparse logger = logging.getLogger(__name__) +outbound_logger = logging.getLogger("synapse.http.outbound") metrics = synapse.metrics.get_metrics_for(__name__) @@ -109,6 +111,8 @@ class MatrixFederationHttpClient(object): self.clock = hs.get_clock() self.version_string = hs.version_string + self._next_id = 1 + @defer.inlineCallbacks def _create_request(self, destination, method, path_bytes, body_callback, headers_dict={}, param_bytes=b"", @@ -123,16 +127,12 @@ class MatrixFederationHttpClient(object): ("", "", path_bytes, param_bytes, query_bytes, "",) ) - logger.info("Sending request to %s: %s %s", - destination, method, url_bytes) + txn_id = "%s-%s" % (method, self._next_id) + self._next_id = (self._next_id + 1) % (sys.maxint - 1) - logger.debug( - "Types: %s", - [ - type(destination), type(method), type(path_bytes), - type(param_bytes), - type(query_bytes) - ] + outbound_logger.info( + "{%s} [%s] Sending request: %s %s", + txn_id, destination, method, url_bytes ) # XXX: Would be much nicer to retry only at the transaction-layer @@ -141,63 +141,71 @@ class MatrixFederationHttpClient(object): endpoint = self._getEndpoint(reactor, destination) - while True: - producer = None - if body_callback: - producer = body_callback(method, url_bytes, headers_dict) - - try: - request_deferred = preserve_context_over_fn( - self.agent.request, - destination, - endpoint, - method, - path_bytes, - param_bytes, - query_bytes, - Headers(headers_dict), - producer - ) + log_result = None + try: + while True: + producer = None + if body_callback: + producer = body_callback(method, url_bytes, headers_dict) + + try: + request_deferred = preserve_context_over_fn( + self.agent.request, + destination, + endpoint, + method, + path_bytes, + param_bytes, + query_bytes, + Headers(headers_dict), + producer + ) - response = yield self.clock.time_bound_deferred( - request_deferred, - time_out=timeout/1000. if timeout else 60, - ) + response = yield self.clock.time_bound_deferred( + request_deferred, + time_out=timeout/1000. if timeout else 60, + ) + + log_result = "%d %s" % (response.code, response.phrase,) + break + except Exception as e: + if not retry_on_dns_fail and isinstance(e, DNSLookupError): + logger.warn( + "DNS Lookup failed to %s with %s", + destination, + e + ) + log_result = "DNS Lookup failed to %s with %s" % ( + destination, e + ) + raise - logger.debug("Got response to %s", method) - break - except Exception as e: - if not retry_on_dns_fail and isinstance(e, DNSLookupError): logger.warn( - "DNS Lookup failed to %s with %s", + "{%s} Sending request failed to %s: %s %s: %s - %s", + txn_id, destination, - e + method, + url_bytes, + type(e).__name__, + _flatten_response_never_received(e), ) - raise - - logger.warn( - "Sending request failed to %s: %s %s: %s - %s", - destination, - method, - url_bytes, - type(e).__name__, - _flatten_response_never_received(e), - ) - if retries_left and not timeout: - yield sleep(2 ** (5 - retries_left)) - retries_left -= 1 - else: - raise - - logger.info( - "Received response %d %s for %s: %s %s", - response.code, - response.phrase, - destination, - method, - url_bytes - ) + log_result = "%s - %s" % ( + type(e).__name__, _flatten_response_never_received(e), + ) + + if retries_left and not timeout: + yield sleep(2 ** (5 - retries_left)) + retries_left -= 1 + else: + raise + finally: + outbound_logger.info( + "{%s} [%s] Result: %s", + txn_id, + destination, + log_result, + ) if 200 <= response.code < 300: pass diff --git a/synapse/http/server.py b/synapse/http/server.py index ae8f3b3972..807ff95c65 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -79,53 +79,39 @@ def request_handler(request_handler): _next_request_id += 1 with LoggingContext(request_id) as request_context: request_context.request = request_id - code = None - start = self.clock.time_msec() - try: - logger.info( - "Received request: %s %s", - request.method, request.path - ) - d = request_handler(self, request) - with PreserveLoggingContext(): - yield d - code = request.code - except CodeMessageException as e: - code = e.code - if isinstance(e, SynapseError): - logger.info( - "%s SynapseError: %s - %s", request, code, e.msg + with request.processing(): + try: + d = request_handler(self, request) + with PreserveLoggingContext(): + yield d + except CodeMessageException as e: + code = e.code + if isinstance(e, SynapseError): + logger.info( + "%s SynapseError: %s - %s", request, code, e.msg + ) + else: + logger.exception(e) + outgoing_responses_counter.inc(request.method, str(code)) + respond_with_json( + request, code, cs_exception(e), send_cors=True, + pretty_print=_request_user_agent_is_curl(request), + version_string=self.version_string, + ) + except: + logger.exception( + "Failed handle request %s.%s on %r: %r", + request_handler.__module__, + request_handler.__name__, + self, + request + ) + respond_with_json( + request, + 500, + {"error": "Internal server error"}, + send_cors=True ) - else: - logger.exception(e) - outgoing_responses_counter.inc(request.method, str(code)) - respond_with_json( - request, code, cs_exception(e), send_cors=True, - pretty_print=_request_user_agent_is_curl(request), - version_string=self.version_string, - ) - except: - code = 500 - logger.exception( - "Failed handle request %s.%s on %r: %r", - request_handler.__module__, - request_handler.__name__, - self, - request - ) - respond_with_json( - request, - 500, - {"error": "Internal server error"}, - send_cors=True - ) - finally: - code = str(code) if code else "-" - end = self.clock.time_msec() - logger.info( - "Processed request: %dms %s %s %s", - end-start, code, request.method, request.path - ) return wrapped_request_handler diff --git a/synapse/notifier.py b/synapse/notifier.py index 078abfc56d..d6655f3f5a 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.util.logutils import log_function -from synapse.util.async import run_on_reactor +from synapse.util.async import run_on_reactor, ObservableDeferred from synapse.types import StreamToken import synapse.metrics @@ -45,21 +45,11 @@ class _NotificationListener(object): The events stream handler will have yielded to the deferred, so to notify the handler it is sufficient to resolve the deferred. """ + __slots__ = ["deferred"] def __init__(self, deferred): self.deferred = deferred - def notified(self): - return self.deferred.called - - def notify(self, token): - """ Inform whoever is listening about the new events. - """ - try: - self.deferred.callback(token) - except defer.AlreadyCalledError: - pass - class _NotifierUserStream(object): """This represents a user connected to the event stream. @@ -75,11 +65,12 @@ class _NotifierUserStream(object): appservice=None): self.user = str(user) self.appservice = appservice - self.listeners = set() self.rooms = set(rooms) self.current_token = current_token self.last_notified_ms = time_now_ms + self.notify_deferred = ObservableDeferred(defer.Deferred()) + def notify(self, stream_key, stream_id, time_now_ms): """Notify any listeners for this user of a new event from an event source. @@ -91,12 +82,10 @@ class _NotifierUserStream(object): self.current_token = self.current_token.copy_and_advance( stream_key, stream_id ) - if self.listeners: - self.last_notified_ms = time_now_ms - listeners = self.listeners - self.listeners = set() - for listener in listeners: - listener.notify(self.current_token) + self.last_notified_ms = time_now_ms + noify_deferred = self.notify_deferred + self.notify_deferred = ObservableDeferred(defer.Deferred()) + noify_deferred.callback(self.current_token) def remove(self, notifier): """ Remove this listener from all the indexes in the Notifier @@ -114,6 +103,18 @@ class _NotifierUserStream(object): self.appservice, set() ).discard(self) + def count_listeners(self): + return len(self.noify_deferred.observers()) + + def new_listener(self, token): + """Returns a deferred that is resolved when there is a new token + greater than the given token. + """ + if self.current_token.is_after(token): + return _NotificationListener(defer.succeed(self.current_token)) + else: + return _NotificationListener(self.notify_deferred.observe()) + class Notifier(object): """ This class is responsible for notifying any listeners when there are @@ -158,7 +159,7 @@ class Notifier(object): for x in self.appservice_to_user_streams.values(): all_user_streams |= x - return sum(len(stream.listeners) for stream in all_user_streams) + return sum(stream.count_listeners() for stream in all_user_streams) metrics.register_callback("listeners", count_listeners) metrics.register_callback( @@ -286,10 +287,6 @@ class Notifier(object): """Wait until the callback returns a non empty response or the timeout fires. """ - - deferred = defer.Deferred() - time_now_ms = self.clock.time_msec() - user = str(user) user_stream = self.user_to_user_stream.get(user) if user_stream is None: @@ -302,55 +299,44 @@ class Notifier(object): rooms=rooms, appservice=appservice, current_token=current_token, - time_now_ms=time_now_ms, + time_now_ms=self.clock.time_msec(), ) self._register_with_keys(user_stream) + + result = None + if timeout: + # Will be set to a _NotificationListener that we'll be waiting on. + # Allows us to cancel it. + listener = None + + def timed_out(): + if listener: + listener.deferred.cancel() + timer = self.clock.call_later(timeout/1000., timed_out) + + prev_token = from_token + while not result: + try: + current_token = user_stream.current_token + + result = yield callback(prev_token, current_token) + if result: + break + + # Now we wait for the _NotifierUserStream to be told there + # is a new token. + # We need to supply the token we supplied to callback so + # that we don't miss any current_token updates. + prev_token = current_token + listener = user_stream.new_listener(prev_token) + yield listener.deferred + except defer.CancelledError: + break + + self.clock.cancel_call_later(timer, ignore_errs=True) else: current_token = user_stream.current_token - - listener = [_NotificationListener(deferred)] - - if timeout and not current_token.is_after(from_token): - user_stream.listeners.add(listener[0]) - - if current_token.is_after(from_token): result = yield callback(from_token, current_token) - else: - result = None - - timer = [None] - - if result: - user_stream.listeners.discard(listener[0]) - defer.returnValue(result) - return - - if timeout: - timed_out = [False] - - def _timeout_listener(): - timed_out[0] = True - timer[0] = None - user_stream.listeners.discard(listener[0]) - listener[0].notify(current_token) - - # We create multiple notification listeners so we have to manage - # canceling the timeout ourselves. - timer[0] = self.clock.call_later(timeout/1000., _timeout_listener) - - while not result and not timed_out[0]: - new_token = yield deferred - deferred = defer.Deferred() - listener[0] = _NotificationListener(deferred) - user_stream.listeners.add(listener[0]) - result = yield callback(current_token, new_token) - current_token = new_token - - if timer[0] is not None: - try: - self.clock.cancel_call_later(timer[0]) - except: - logger.exception("Failed to cancel notifer timer") defer.returnValue(result) @@ -368,6 +354,9 @@ class Notifier(object): @defer.inlineCallbacks def check_for_updates(before_token, after_token): + if not after_token.is_after(before_token): + defer.returnValue(None) + events = [] end_token = from_token for name, source in self.event_sources.sources.items(): @@ -402,7 +391,7 @@ class Notifier(object): expired_streams = [] expire_before_ts = time_now_ms - self.UNUSED_STREAM_EXPIRY_MS for stream in self.user_to_user_stream.values(): - if stream.listeners: + if stream.count_listeners(): continue if stream.last_notified_ms < expire_before_ts: expired_streams.append(stream) diff --git a/synapse/rest/client/v1/transactions.py b/synapse/rest/client/v1/transactions.py index d933fea18a..b861069b89 100644 --- a/synapse/rest/client/v1/transactions.py +++ b/synapse/rest/client/v1/transactions.py @@ -39,10 +39,10 @@ class HttpTransactionStore(object): A tuple of (HTTP response code, response content) or None. """ try: - logger.debug("get_response Key: %s TxnId: %s", key, txn_id) + logger.debug("get_response TxnId: %s", txn_id) (last_txn_id, response) = self.transactions[key] if txn_id == last_txn_id: - logger.info("get_response: Returning a response for %s", key) + logger.info("get_response: Returning a response for %s", txn_id) return response except KeyError: pass @@ -58,7 +58,7 @@ class HttpTransactionStore(object): txn_id (str): The transaction ID for this request. response (tuple): A tuple of (HTTP response code, response content) """ - logger.debug("store_response Key: %s TxnId: %s", key, txn_id) + logger.debug("store_response TxnId: %s", txn_id) self.transactions[key] = (txn_id, response) def store_client_transaction(self, request, txn_id, response): diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 260714ccc2..07ff25cef3 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -91,8 +91,12 @@ class Clock(object): with PreserveLoggingContext(): return reactor.callLater(delay, wrapped_callback, *args, **kwargs) - def cancel_call_later(self, timer): - timer.cancel() + def cancel_call_later(self, timer, ignore_errs=False): + try: + timer.cancel() + except: + if not ignore_errs: + raise def time_bound_deferred(self, given_deferred, time_out): if given_deferred.called: diff --git a/synapse/util/async.py b/synapse/util/async.py index 1c2044e5b4..5a1d545c96 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -38,6 +38,9 @@ class ObservableDeferred(object): deferred. If consumeErrors is true errors will be captured from the origin deferred. + + Cancelling or otherwise resolving an observer will not affect the original + ObservableDeferred. """ __slots__ = ["_deferred", "_observers", "_result"] @@ -45,7 +48,7 @@ class ObservableDeferred(object): def __init__(self, deferred, consumeErrors=False): object.__setattr__(self, "_deferred", deferred) object.__setattr__(self, "_result", None) - object.__setattr__(self, "_observers", []) + object.__setattr__(self, "_observers", set()) def callback(r): self._result = (True, r) @@ -74,12 +77,21 @@ class ObservableDeferred(object): def observe(self): if not self._result: d = defer.Deferred() - self._observers.append(d) + + def remove(r): + self._observers.discard(d) + return r + d.addBoth(remove) + + self._observers.add(d) return d else: success, res = self._result return defer.succeed(res) if success else defer.fail(res) + def observers(self): + return self._observers + def __getattr__(self, name): return getattr(self._deferred, name) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 06cb1dd4cf..9e95d1e532 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -58,6 +58,49 @@ class AppServiceHandlerTestCase(unittest.TestCase): ) @defer.inlineCallbacks + def test_query_user_exists_unknown_user(self): + user_id = "@someone:anywhere" + services = [self._mkservice(is_interested=True)] + services[0].is_interested_in_user = Mock(return_value=True) + self.mock_store.get_app_services = Mock(return_value=services) + self.mock_store.get_user_by_id = Mock(return_value=None) + + event = Mock( + sender=user_id, + type="m.room.message", + room_id="!foo:bar" + ) + self.mock_as_api.push = Mock() + self.mock_as_api.query_user = Mock() + yield self.handler.notify_interested_services(event) + self.mock_as_api.query_user.assert_called_once_with( + services[0], user_id + ) + + @defer.inlineCallbacks + def test_query_user_exists_known_user(self): + user_id = "@someone:anywhere" + services = [self._mkservice(is_interested=True)] + services[0].is_interested_in_user = Mock(return_value=True) + self.mock_store.get_app_services = Mock(return_value=services) + self.mock_store.get_user_by_id = Mock(return_value={ + "name": user_id + }) + + event = Mock( + sender=user_id, + type="m.room.message", + room_id="!foo:bar" + ) + self.mock_as_api.push = Mock() + self.mock_as_api.query_user = Mock() + yield self.handler.notify_interested_services(event) + self.assertFalse( + self.mock_as_api.query_user.called, + "query_user called when it shouldn't have been." + ) + + @defer.inlineCallbacks def test_query_room_alias_exists(self): room_alias_str = "#foo:bar" room_alias = Mock() |