diff options
26 files changed, 825 insertions, 82 deletions
diff --git a/.gitignore b/.gitignore index 339a99e0d6..af90668c89 100644 --- a/.gitignore +++ b/.gitignore @@ -38,3 +38,5 @@ graph/*.dot **/webclient/test/environment-protractor.js uploads + +.idea/ diff --git a/docs/code_style.rst b/docs/code_style.rst index d7e2d5e69e..dc40a7ab7b 100644 --- a/docs/code_style.rst +++ b/docs/code_style.rst @@ -1,10 +1,14 @@ Basically, PEP8 -- Max line width: 80 chars. +- NEVER tabs. 4 spaces to indent. +- Max line width: 79 chars (with flexibility to overflow by a "few chars" if + the overflowing content is not semantically significant and avoids an + explosion of vertical whitespace). - Use camel case for class and type names - Use underscores for functions and variables. - Use double quotes. -- Use parentheses instead of '\' for line continuation where ever possible (which is pretty much everywhere) +- Use parentheses instead of '\\' for line continuation where ever possible + (which is pretty much everywhere) - There should be max a single new line between: - statements - functions in a class @@ -14,5 +18,32 @@ Basically, PEP8 - a single space after a comma - a single space before and after for '=' when used as assignment - no spaces before and after for '=' for default values and keyword arguments. +- Indenting must follow PEP8; either hanging indent or multiline-visual indent + depending on the size and shape of the arguments and what makes more sense to + the author. In other words, both this:: -Comments should follow the google code style. This is so that we can generate documentation with sphinx (http://sphinxcontrib-napoleon.readthedocs.org/en/latest/) + print("I am a fish %s" % "moo") + + and this:: + + print("I am a fish %s" % + "moo") + + and this:: + + print( + "I am a fish %s" % + "moo" + ) + + ...are valid, although given each one takes up 2x more vertical space than + the previous, it's up to the author's discretion as to which layout makes most + sense for their function invocation. (e.g. if they want to add comments + per-argument, or put expressions in the arguments, or group related arguments + together, or want to deliberately extend or preserve vertical/horizontal + space) + +Comments should follow the google code style. This is so that we can generate +documentation with sphinx (http://sphinxcontrib-napoleon.readthedocs.org/en/latest/) + +Code should pass pep8 --max-line-length=100 without any warnings. diff --git a/setup.py b/setup.py index 9b38f790b9..9bee64e368 100755 --- a/setup.py +++ b/setup.py @@ -32,7 +32,7 @@ setup( description="Reference Synapse Home Server", install_requires=[ "syutil==0.0.2", - "matrix_angular_sdk==0.5.1", + "matrix_angular_sdk==0.5.3b", "Twisted>=14.0.0", "service_identity>=1.0.0", "pyopenssl>=0.14", @@ -45,7 +45,7 @@ setup( dependency_links=[ "https://github.com/matrix-org/syutil/tarball/v0.0.2#egg=syutil-0.0.2", "https://github.com/pyca/pynacl/tarball/d4d3175589b892f6ea7c22f466e0e223853516fa#egg=pynacl-0.3.0", - "https://github.com/matrix-org/matrix-angular-sdk/tarball/v0.5.1/#egg=matrix_angular_sdk-0.5.1", + "https://github.com/matrix-org/matrix-angular-sdk/tarball/v0.5.3b/#egg=matrix_angular_sdk-0.5.3b", ], setup_requires=[ "setuptools_trial", @@ -59,6 +59,6 @@ setup( entry_points=""" [console_scripts] synctl=synapse.app.synctl:main - synapse-homeserver=synapse.app.homeserver:run + synapse-homeserver=synapse.app.homeserver:main """ ) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 01f87fe423..0cb632fb08 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -334,7 +334,7 @@ class ReplicationLayer(object): defer.returnValue(response) return - logger.debug("[%s] Transacition is new", transaction.transaction_id) + logger.debug("[%s] Transaction is new", transaction.transaction_id) with PreserveLoggingContext(): dl = [] @@ -685,6 +685,7 @@ class _TransactionQueue(object): self.transport_layer = transport_layer self._clock = hs.get_clock() + self.store = hs.get_datastore() # Is a mapping from destinations -> deferreds. Used to keep track # of which destinations have transactions in flight and when they are @@ -728,8 +729,14 @@ class _TransactionQueue(object): (pdu, deferred, order) ) + def eb(failure): + if not deferred.called: + deferred.errback(failure) + else: + logger.warn("Failed to send pdu", failure) + with PreserveLoggingContext(): - self._attempt_new_transaction(destination) + self._attempt_new_transaction(destination).addErrback(eb) deferreds.append(deferred) @@ -739,6 +746,9 @@ class _TransactionQueue(object): def enqueue_edu(self, edu): destination = edu.destination + if destination == self.server_name: + return + deferred = defer.Deferred() self.pending_edus_by_dest.setdefault(destination, []).append( (edu, deferred) @@ -748,7 +758,7 @@ class _TransactionQueue(object): if not deferred.called: deferred.errback(failure) else: - logger.exception("Failed to send edu", failure) + logger.warn("Failed to send edu", failure) with PreserveLoggingContext(): self._attempt_new_transaction(destination).addErrback(eb) @@ -770,10 +780,33 @@ class _TransactionQueue(object): @defer.inlineCallbacks @log_function def _attempt_new_transaction(self, destination): + + (retry_last_ts, retry_interval) = (0, 0) + retry_timings = yield self.store.get_destination_retry_timings( + destination + ) + if retry_timings: + (retry_last_ts, retry_interval) = ( + retry_timings.retry_last_ts, retry_timings.retry_interval + ) + if retry_last_ts + retry_interval > int(self._clock.time_msec()): + logger.info( + "TX [%s] not ready for retry yet - " + "dropping transaction for now", + destination, + ) + return + else: + logger.info("TX [%s] is ready for retry", destination) + if destination in self.pending_transactions: + # XXX: pending_transactions can get stuck on by a never-ending + # request at which point pending_pdus_by_dest just keeps growing. + # we need application-layer timeouts of some flavour of these + # requests return - # list of (pending_pdu, deferred, order) + # list of (pending_pdu, deferred, order) pending_pdus = self.pending_pdus_by_dest.pop(destination, []) pending_edus = self.pending_edus_by_dest.pop(destination, []) pending_failures = self.pending_failures_by_dest.pop(destination, []) @@ -781,7 +814,14 @@ class _TransactionQueue(object): if not pending_pdus and not pending_edus and not pending_failures: return - logger.debug("TX [%s] Attempting new transaction", destination) + logger.debug( + "TX [%s] Attempting new transaction " + "(pdus: %d, edus: %d, failures: %d)", + destination, + len(pending_pdus), + len(pending_edus), + len(pending_failures) + ) # Sort based on the order field pending_pdus.sort(key=lambda t: t[2]) @@ -814,7 +854,11 @@ class _TransactionQueue(object): yield self.transaction_actions.prepare_to_send(transaction) logger.debug("TX [%s] Persisted transaction", destination) - logger.debug("TX [%s] Sending transaction...", destination) + logger.info( + "TX [%s] Sending transaction [%s]", + destination, + transaction.transaction_id, + ) # Actually send the transaction @@ -835,6 +879,8 @@ class _TransactionQueue(object): transaction, json_data_cb ) + logger.info("TX [%s] got %d response", destination, code) + logger.debug("TX [%s] Sent transaction", destination) logger.debug("TX [%s] Marking as delivered...", destination) @@ -847,8 +893,14 @@ class _TransactionQueue(object): for deferred in deferreds: if code == 200: + if retry_last_ts: + # this host is alive! reset retry schedule + yield self.store.set_destination_retry_timings( + destination, 0, 0 + ) deferred.callback(None) else: + self.set_retrying(destination, retry_interval) deferred.errback(RuntimeError("Got status %d" % code)) # Ensures we don't continue until all callbacks on that @@ -861,11 +913,15 @@ class _TransactionQueue(object): logger.debug("TX [%s] Yielded to callbacks", destination) except Exception as e: - logger.error("TX Problem in _attempt_transaction") - # We capture this here as there as nothing actually listens # for this finishing functions deferred. - logger.exception(e) + logger.warn( + "TX [%s] Problem in _attempt_transaction: %s", + destination, + e, + ) + + self.set_retrying(destination, retry_interval) for deferred in deferreds: if not deferred.called: @@ -877,3 +933,22 @@ class _TransactionQueue(object): # Check to see if there is anything else to send. self._attempt_new_transaction(destination) + + @defer.inlineCallbacks + def set_retrying(self, destination, retry_interval): + # track that this destination is having problems and we should + # give it a chance to recover before trying it again + + if retry_interval: + retry_interval *= 2 + # plateau at hourly retries for now + if retry_interval >= 60 * 60 * 1000: + retry_interval = 60 * 60 * 1000 + else: + retry_interval = 2000 # try again at first after 2 seconds + + yield self.store.set_destination_retry_timings( + destination, + int(self._clock.time_msec()), + retry_interval + ) diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py index 8d86152085..0f11c6d491 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport.py @@ -155,7 +155,7 @@ class TransportLayer(object): @defer.inlineCallbacks @log_function def send_transaction(self, transaction, json_data_callback=None): - """ Sends the given Transaction to it's destination + """ Sends the given Transaction to its destination Args: transaction (Transaction) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 925eb5376e..cfb5029774 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -207,6 +207,13 @@ class FederationHandler(BaseHandler): e.msg, affected=event.event_id, ) + + # if we're receiving valid events from an origin, + # it's probably a good idea to mark it as not in retry-state + # for sending (although this is a bit of a leap) + retry_timings = yield self.store.get_destination_retry_timings(origin) + if (retry_timings and retry_timings.retry_last_ts): + self.store.set_destination_retry_timings(origin, 0, 0) room = yield self.store.get_room(event.room_id) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 815d40f166..84a039489f 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -659,10 +659,6 @@ class PresenceHandler(BaseHandler): if room_ids: logger.debug(" | %d interested room IDs %r", len(room_ids), room_ids) - if not observers and not room_ids: - logger.debug(" | no interested observers or room IDs") - continue - state = dict(push) del state["user_id"] @@ -683,6 +679,10 @@ class PresenceHandler(BaseHandler): self._user_cachemap_latest_serial += 1 statuscache.update(state, serial=self._user_cachemap_latest_serial) + if not observers and not room_ids: + logger.debug(" | no interested observers or room IDs") + continue + self.push_update_to_clients( observed_user=user, users_to_push=observers, @@ -804,6 +804,7 @@ class PresenceEventSource(object): ) @defer.inlineCallbacks + @log_function def get_new_events_for_user(self, user, from_key, limit): from_key = int(from_key) @@ -816,7 +817,8 @@ class PresenceEventSource(object): # TODO(paul): use a DeferredList ? How to limit concurrency. for observed_user in cachemap.keys(): cached = cachemap[observed_user] - if not (from_key < cached.serial): + + if cached.serial <= from_key: continue if (yield self.is_visible(observer_user, observed_user)): diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index a000b44036..c802e8f69d 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -260,6 +260,7 @@ class RoomMemberHandler(BaseHandler): self.distributor = hs.get_distributor() self.distributor.declare("user_joined_room") + self.distributor.declare("user_left_room") @defer.inlineCallbacks def get_room_members(self, room_id, membership=Membership.JOIN): @@ -387,6 +388,12 @@ class RoomMemberHandler(BaseHandler): do_auth=do_auth, ) + if prev_state and prev_state.membership == Membership.JOIN: + user = self.hs.parse_userid(event.user_id) + self.distributor.fire( + "user_left_room", user=user, room_id=event.room_id + ) + defer.returnValue({"room_id": room_id}) @defer.inlineCallbacks diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index d88a53242c..46a0b299a1 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -43,7 +43,23 @@ class TypingNotificationHandler(BaseHandler): self.federation.register_edu_handler("m.typing", self._recv_edu) - self._member_typing_until = {} + hs.get_distributor().observe("user_left_room", self.user_left_room) + + self._member_typing_until = {} # clock time we expect to stop + self._member_typing_timer = {} # deferreds to manage theabove + + # map room IDs to serial numbers + self._room_serials = {} + self._latest_room_serial = 0 + # map room IDs to sets of users currently typing + self._room_typing = {} + + def tearDown(self): + """Cancels all the pending timers. + Normally this shouldn't be needed, but it's required from unit tests + to avoid a "Reactor was unclean" warning.""" + for t in self._member_typing_timer.values(): + self.clock.cancel_call_later(t) @defer.inlineCallbacks def started_typing(self, target_user, auth_user, room_id, timeout): @@ -53,12 +69,24 @@ class TypingNotificationHandler(BaseHandler): if target_user != auth_user: raise AuthError(400, "Cannot set another user's typing state") + yield self.auth.check_joined_room(room_id, target_user.to_string()) + + logger.debug( + "%s has started typing in %s", target_user.to_string(), room_id + ) + until = self.clock.time_msec() + timeout member = RoomMember(room_id=room_id, user=target_user) was_present = member in self._member_typing_until + if member in self._member_typing_timer: + self.clock.cancel_call_later(self._member_typing_timer[member]) + self._member_typing_until[member] = until + self._member_typing_timer[member] = self.clock.call_later( + timeout / 1000, lambda: self._stopped_typing(member) + ) if was_present: # No point sending another notification @@ -78,18 +106,39 @@ class TypingNotificationHandler(BaseHandler): if target_user != auth_user: raise AuthError(400, "Cannot set another user's typing state") + yield self.auth.check_joined_room(room_id, target_user.to_string()) + + logger.debug( + "%s has stopped typing in %s", target_user.to_string(), room_id + ) + member = RoomMember(room_id=room_id, user=target_user) + yield self._stopped_typing(member) + + @defer.inlineCallbacks + def user_left_room(self, user, room_id): + if user.is_mine: + member = RoomMember(room_id=room_id, user=user) + yield self._stopped_typing(member) + + @defer.inlineCallbacks + def _stopped_typing(self, member): if member not in self._member_typing_until: # No point defer.returnValue(None) yield self._push_update( - room_id=room_id, - user=target_user, + room_id=member.room_id, + user=member.user, typing=False, ) + del self._member_typing_until[member] + + self.clock.cancel_call_later(self._member_typing_timer[member]) + del self._member_typing_timer[member] + @defer.inlineCallbacks def _push_update(self, room_id, user, typing): localusers = set() @@ -101,12 +150,11 @@ class TypingNotificationHandler(BaseHandler): ignore_user=user ) - for u in localusers: - self.push_update_to_clients( + if localusers: + self._push_update_local( room_id=room_id, - observer_user=u, - observed_user=user, - typing=typing, + user=user, + typing=typing ) deferreds = [] @@ -135,29 +183,65 @@ class TypingNotificationHandler(BaseHandler): room_id, localusers=localusers ) - for u in localusers: - self.push_update_to_clients( + if localusers: + self._push_update_local( room_id=room_id, - observer_user=u, - observed_user=user, + user=user, typing=content["typing"] ) - def push_update_to_clients(self, room_id, observer_user, observed_user, - typing): - # TODO(paul) steal this from presence.py - pass + def _push_update_local(self, room_id, user, typing): + if room_id not in self._room_serials: + self._room_serials[room_id] = 0 + self._room_typing[room_id] = set() + + room_set = self._room_typing[room_id] + if typing: + room_set.add(user) + elif user in room_set: + room_set.remove(user) + + self._latest_room_serial += 1 + self._room_serials[room_id] = self._latest_room_serial + + self.notifier.on_new_user_event(rooms=[room_id]) class TypingNotificationEventSource(object): def __init__(self, hs): self.hs = hs + self._handler = None + + def handler(self): + # Avoid cyclic dependency in handler setup + if not self._handler: + self._handler = self.hs.get_handlers().typing_notification_handler + return self._handler + + def _make_event_for(self, room_id): + typing = self.handler()._room_typing[room_id] + return { + "type": "m.typing", + "room_id": room_id, + "typing": [u.to_string() for u in typing], + } def get_new_events_for_user(self, user, from_key, limit): - return ([], from_key) + from_key = int(from_key) + handler = self.handler() + + events = [] + for room_id in handler._room_serials: + if handler._room_serials[room_id] <= from_key: + continue + + # TODO: check if user is in room + events.append(self._make_event_for(room_id)) + + return (events, handler._latest_room_serial) def get_current_key(self): - return 0 + return self.handler()._latest_room_serial def get_pagination_rows(self, user, pagination_config, key): return ([], pagination_config.from_key) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 510f07dd7b..fc5b5ab809 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -89,8 +89,8 @@ class MatrixFederationHttpClient(object): ("", "", path_bytes, param_bytes, query_bytes, "",) ) - logger.debug("Sending request to %s: %s %s", - destination, method, url_bytes) + logger.info("Sending request to %s: %s %s", + destination, method, url_bytes) logger.debug( "Types: %s", @@ -101,6 +101,8 @@ class MatrixFederationHttpClient(object): ] ) + # XXX: Would be much nicer to retry only at the transaction-layer + # (once we have reliable transactions in place) retries_left = 5 endpoint = self._getEndpoint(reactor, destination) @@ -127,11 +129,20 @@ class MatrixFederationHttpClient(object): break except Exception as e: if not retry_on_dns_fail and isinstance(e, DNSLookupError): - logger.warn("DNS Lookup failed to %s with %s", destination, - e) + logger.warn( + "DNS Lookup failed to %s with %s", + destination, + e + ) raise SynapseError(400, "Domain specified not found.") - logger.exception("Got error in _create_request") + logger.warn( + "Sending request failed to %s: %s %s : %s", + destination, + method, + url_bytes, + e + ) _print_ex(e) if retries_left: @@ -140,15 +151,21 @@ class MatrixFederationHttpClient(object): else: raise + logger.info( + "Received response %d %s for %s: %s %s", + response.code, + response.phrase, + destination, + method, + url_bytes + ) + if 200 <= response.code < 300: # We need to update the transactions table to say it was sent? pass else: # :'( # Update transactions table? - logger.error( - "Got response %d %s", response.code, response.phrase - ) raise CodeMessageException( response.code, response.phrase ) @@ -284,7 +301,7 @@ def _print_ex(e): for ex in e.reasons: _print_ex(ex) else: - logger.exception(e) + logger.warn(e) class _JsonProducer(object): diff --git a/synapse/rest/room.py b/synapse/rest/room.py index 3147d7a60b..e7f28c2784 100644 --- a/synapse/rest/room.py +++ b/synapse/rest/room.py @@ -483,6 +483,37 @@ class RoomRedactEventRestServlet(RestServlet): defer.returnValue(response) +class RoomTypingRestServlet(RestServlet): + PATTERN = client_path_pattern("/rooms/(?P<room_id>[^/]*)/typing/(?P<user_id>[^/]*)$") + + @defer.inlineCallbacks + def on_PUT(self, request, room_id, user_id): + auth_user = yield self.auth.get_user_by_req(request) + + room_id = urllib.unquote(room_id) + target_user = self.hs.parse_userid(urllib.unquote(user_id)) + + content = _parse_json(request) + + typing_handler = self.handlers.typing_notification_handler + + if content["typing"]: + yield typing_handler.started_typing( + target_user=target_user, + auth_user=auth_user, + room_id=room_id, + timeout=content.get("timeout", 30000), + ) + else: + yield typing_handler.stopped_typing( + target_user=target_user, + auth_user=auth_user, + room_id=room_id, + ) + + defer.returnValue((200, {})) + + def _parse_json(request): try: content = json.loads(request.content.read()) @@ -538,3 +569,4 @@ def register_servlets(hs, http_server): RoomStateRestServlet(hs).register(http_server) RoomInitialSyncRestServlet(hs).register(http_server) RoomRedactEventRestServlet(hs).register(http_server) + RoomTypingRestServlet(hs).register(http_server) diff --git a/synapse/rest/transactions.py b/synapse/rest/transactions.py index 93c0122f30..8c41ab4edb 100644 --- a/synapse/rest/transactions.py +++ b/synapse/rest/transactions.py @@ -19,7 +19,7 @@ import logging logger = logging.getLogger(__name__) - +# FIXME: elsewhere we use FooStore to indicate something in the storage layer... class HttpTransactionStore(object): def __init__(self): diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index f15e3dfe62..04ab39341d 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -67,7 +67,7 @@ SCHEMAS = [ # Remember to update this number every time an incompatible change is made to # database schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 8 +SCHEMA_VERSION = 9 class _RollbackButIsFineException(Exception): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 4881f03368..e72200e2f7 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -650,7 +650,7 @@ class JoinHelper(object): to dump the results into. Attributes: - taples (list): List of `Table` classes + tables (list): List of `Table` classes EntryType (type) """ diff --git a/synapse/storage/schema/delta/v9.sql b/synapse/storage/schema/delta/v9.sql new file mode 100644 index 0000000000..ad680c64da --- /dev/null +++ b/synapse/storage/schema/delta/v9.sql @@ -0,0 +1,23 @@ +/* Copyright 2014 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- To track destination health +CREATE TABLE IF NOT EXISTS destinations( + destination TEXT PRIMARY KEY, + retry_last_ts INTEGER, + retry_interval INTEGER +); + +PRAGMA user_version = 9; \ No newline at end of file diff --git a/synapse/storage/schema/transactions.sql b/synapse/storage/schema/transactions.sql index 88e3e4e04d..de461bfa15 100644 --- a/synapse/storage/schema/transactions.sql +++ b/synapse/storage/schema/transactions.sql @@ -59,3 +59,9 @@ CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_tx ON transaction_id_to_pdu(tra CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination); CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_index ON transaction_id_to_pdu(transaction_id, destination); +-- To track destination health +CREATE TABLE IF NOT EXISTS destinations( + destination TEXT PRIMARY KEY, + retry_last_ts INTEGER, + retry_interval INTEGER +); diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 00d0f48082..423cc3f02a 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -17,6 +17,8 @@ from ._base import SQLBaseStore, Table from collections import namedtuple +from twisted.internet import defer + import logging logger = logging.getLogger(__name__) @@ -26,6 +28,10 @@ class TransactionStore(SQLBaseStore): """A collection of queries for handling PDUs. """ + # a write-through cache of DestinationsTable.EntryType indexed by + # destination string + destination_retry_cache = {} + def get_received_txn_response(self, transaction_id, origin): """For an incoming transaction from a given origin, check if we have already responded to it. If so, return the response code and response @@ -114,7 +120,7 @@ class TransactionStore(SQLBaseStore): def _prep_send_transaction(self, txn, transaction_id, destination, origin_server_ts): - # First we find out what the prev_txs should be. + # First we find out what the prev_txns should be. # Since we know that we are only sending one transaction at a time, # we can simply take the last one. query = "%s ORDER BY id DESC LIMIT 1" % ( @@ -205,6 +211,92 @@ class TransactionStore(SQLBaseStore): return ReceivedTransactionsTable.decode_results(txn.fetchall()) + def get_destination_retry_timings(self, destination): + """Gets the current retry timings (if any) for a given destination. + + Args: + destination (str) + + Returns: + None if not retrying + Otherwise a DestinationsTable.EntryType for the retry scheme + """ + if destination in self.destination_retry_cache: + return defer.succeed(self.destination_retry_cache[destination]) + + return self.runInteraction( + "get_destination_retry_timings", + self._get_destination_retry_timings, destination) + + def _get_destination_retry_timings(cls, txn, destination): + query = DestinationsTable.select_statement("destination = ?") + txn.execute(query, (destination,)) + result = txn.fetchall() + if result: + result = DestinationsTable.decode_single_result(result) + if result.retry_last_ts > 0: + return result + else: + return None + + def set_destination_retry_timings(self, destination, + retry_last_ts, retry_interval): + """Sets the current retry timings for a given destination. + Both timings should be zero if retrying is no longer occuring. + + Args: + destination (str) + retry_last_ts (int) - time of last retry attempt in unix epoch ms + retry_interval (int) - how long until next retry in ms + """ + + self.destination_retry_cache[destination] = ( + DestinationsTable.EntryType( + destination, + retry_last_ts, + retry_interval + ) + ) + + # XXX: we could chose to not bother persisting this if our cache thinks + # this is a NOOP + return self.runInteraction( + "set_destination_retry_timings", + self._set_destination_retry_timings, + destination, + retry_last_ts, + retry_interval, + ) + + def _set_destination_retry_timings(cls, txn, destination, + retry_last_ts, retry_interval): + + query = ( + "INSERT OR REPLACE INTO %s " + "(destination, retry_last_ts, retry_interval) " + "VALUES (?, ?, ?) " + ) % DestinationsTable.table_name + + txn.execute(query, (destination, retry_last_ts, retry_interval)) + + def get_destinations_needing_retry(self): + """Get all destinations which are due a retry for sending a transaction. + + Returns: + list: A list of `DestinationsTable.EntryType` + """ + + return self.runInteraction( + "get_destinations_needing_retry", + self._get_destinations_needing_retry + ) + + def _get_destinations_needing_retry(cls, txn): + where = "retry_last_ts > 0 and retry_next_ts < now()" + query = DestinationsTable.select_statement(where) + txn.execute(query) + return DestinationsTable.decode_results(txn.fetchall()) + class ReceivedTransactionsTable(Table): table_name = "received_transactions" @@ -247,3 +339,15 @@ class TransactionsToPduTable(Table): ] EntryType = namedtuple("TransactionsToPduEntry", fields) + + +class DestinationsTable(Table): + table_name = "destinations" + + fields = [ + "destination", + "retry_last_ts", + "retry_interval", + ] + + EntryType = namedtuple("DestinationsEntry", fields) diff --git a/tests/federation/test_federation.py b/tests/federation/test_federation.py index 73dd289276..f6b41e2c4c 100644 --- a/tests/federation/test_federation.py +++ b/tests/federation/test_federation.py @@ -25,6 +25,7 @@ from synapse.server import HomeServer from synapse.federation import initialize_http_replication from synapse.api.events import SynapseEvent +from synapse.storage.transactions import DestinationsTable def make_pdu(prev_pdus=[], **kwargs): """Provide some default fields for making a PduTuple.""" @@ -55,10 +56,14 @@ class FederationTestCase(unittest.TestCase): "delivered_txn", "get_received_txn_response", "set_received_txn_response", + "get_destination_retry_timings", ]) self.mock_persistence.get_received_txn_response.return_value = ( defer.succeed(None) ) + self.mock_persistence.get_destination_retry_timings.return_value = ( + defer.succeed(DestinationsTable.EntryType("", 0, 0)) + ) self.mock_config = Mock() self.mock_config.signing_key = [MockKey()] self.clock = MockClock() diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index 33016c16ef..fae33716a3 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -53,6 +53,8 @@ class FederationTestCase(unittest.TestCase): "persist_event", "store_room", "get_room", + "get_destination_retry_timings", + "set_destination_retry_timings", ]), resource_for_federation=NonCallableMock(), http_client=NonCallableMock(spec_set=[]), diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index fe69ce47eb..b85a89052a 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -30,7 +30,7 @@ from synapse.api.constants import PresenceState from synapse.api.errors import SynapseError from synapse.handlers.presence import PresenceHandler, UserPresenceCache from synapse.streams.config import SourcePaginationConfig - +from synapse.storage.transactions import DestinationsTable OFFLINE = PresenceState.OFFLINE UNAVAILABLE = PresenceState.UNAVAILABLE @@ -528,6 +528,7 @@ class PresencePushTestCase(unittest.TestCase): "delivered_txn", "get_received_txn_response", "set_received_txn_response", + "get_destination_retry_timings", ]), handlers=None, resource_for_client=Mock(), @@ -539,6 +540,9 @@ class PresencePushTestCase(unittest.TestCase): hs.handlers = JustPresenceHandlers(hs) self.datastore = hs.get_datastore() + self.datastore.get_destination_retry_timings.return_value = ( + defer.succeed(DestinationsTable.EntryType("", 0, 0)) + ) def get_received_txn_response(*args): return defer.succeed(None) @@ -1037,6 +1041,7 @@ class PresencePollingTestCase(unittest.TestCase): "delivered_txn", "get_received_txn_response", "set_received_txn_response", + "get_destination_retry_timings", ]), handlers=None, resource_for_client=Mock(), @@ -1048,6 +1053,9 @@ class PresencePollingTestCase(unittest.TestCase): hs.handlers = JustPresenceHandlers(hs) self.datastore = hs.get_datastore() + self.datastore.get_destination_retry_timings.return_value = ( + defer.succeed(DestinationsTable.EntryType("", 0, 0)) + ) def get_received_txn_response(*args): return defer.succeed(None) diff --git a/tests/handlers/test_room.py b/tests/handlers/test_room.py index 0279ab703a..3a71ed0aed 100644 --- a/tests/handlers/test_room.py +++ b/tests/handlers/test_room.py @@ -222,14 +222,52 @@ class RoomMemberHandlerTestCase(unittest.TestCase): user=user, room_id=room_id ) - def _create_member(self, user_id, room_id): + @defer.inlineCallbacks + def test_simple_leave(self): + room_id = "!foo:red" + user_id = "@bob:red" + user = self.hs.parse_userid(user_id) + + event = self._create_member( + user_id=user_id, + room_id=room_id, + membership=Membership.LEAVE, + ) + + prev_state = NonCallableMock() + prev_state.membership = Membership.JOIN + prev_state.sender = user_id + self.datastore.get_room_member.return_value = defer.succeed(prev_state) + + event.state_events = { + (RoomMemberEvent.TYPE, user_id): event, + } + + event.old_state_events = { + (RoomMemberEvent.TYPE, user_id): self._create_member( + user_id=user_id, + room_id=room_id, + ), + } + + leave_signal_observer = Mock() + self.distributor.observe("user_left_room", leave_signal_observer) + + # Actual invocation + yield self.room_member_handler.change_membership(event) + + leave_signal_observer.assert_called_with( + user=user, room_id=room_id + ) + + def _create_member(self, user_id, room_id, membership=Membership.JOIN): return self.hs.get_event_factory().create_event( etype=RoomMemberEvent.TYPE, user_id=user_id, state_key=user_id, room_id=room_id, - membership=Membership.JOIN, - content={"membership": Membership.JOIN}, + membership=membership, + content={"membership": membership}, ) diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index adb5148351..bc19db8dfa 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -22,9 +22,12 @@ import json from ..utils import MockHttpResource, MockClock, DeferredMockCallable, MockKey +from synapse.api.errors import AuthError from synapse.server import HomeServer from synapse.handlers.typing import TypingNotificationHandler +from synapse.storage.transactions import DestinationsTable + def _expect_edu(destination, edu_type, content, origin="test"): return { @@ -63,7 +66,13 @@ class TypingNotificationsTestCase(unittest.TestCase): self.mock_config = Mock() self.mock_config.signing_key = [MockKey()] + mock_notifier = Mock(spec=["on_new_user_event"]) + self.on_new_user_event = mock_notifier.on_new_user_event + + self.auth = Mock(spec=[]) + hs = HomeServer("test", + auth=self.auth, clock=self.clock, db_pool=None, datastore=Mock(spec=[ @@ -72,8 +81,10 @@ class TypingNotificationsTestCase(unittest.TestCase): "delivered_txn", "get_received_txn_response", "set_received_txn_response", + "get_destination_retry_timings", ]), handlers=None, + notifier=mock_notifier, resource_for_client=Mock(), resource_for_federation=self.mock_federation_resource, http_client=self.mock_http_client, @@ -82,13 +93,14 @@ class TypingNotificationsTestCase(unittest.TestCase): ) hs.handlers = JustTypingNotificationHandlers(hs) - self.mock_update_client = Mock() - self.mock_update_client.return_value = defer.succeed(None) - self.handler = hs.get_handlers().typing_notification_handler - self.handler.push_update_to_clients = self.mock_update_client + + self.event_source = hs.get_event_sources().sources["typing"] self.datastore = hs.get_datastore() + self.datastore.get_destination_retry_timings.return_value = ( + defer.succeed(DestinationsTable.EntryType("", 0, 0)) + ) def get_received_txn_response(*args): return defer.succeed(None) @@ -134,6 +146,12 @@ class TypingNotificationsTestCase(unittest.TestCase): self.room_member_handler.fetch_room_distributions_into = ( fetch_room_distributions_into) + def check_joined_room(room_id, user_id): + if user_id not in [u.to_string() for u in self.room_members]: + raise AuthError(401, "User is not in the room") + + self.auth.check_joined_room = check_joined_room + # Some local users to test with self.u_apple = hs.parse_userid("@apple:test") self.u_banana = hs.parse_userid("@banana:test") @@ -145,6 +163,8 @@ class TypingNotificationsTestCase(unittest.TestCase): def test_started_typing_local(self): self.room_members = [self.u_apple, self.u_banana] + self.assertEquals(self.event_source.get_current_key(), 0) + yield self.handler.started_typing( target_user=self.u_apple, auth_user=self.u_apple, @@ -152,13 +172,20 @@ class TypingNotificationsTestCase(unittest.TestCase): timeout=20000, ) - self.mock_update_client.assert_has_calls([ - call(observer_user=self.u_banana, - observed_user=self.u_apple, - room_id=self.room_id, - typing=True), + self.on_new_user_event.assert_has_calls([ + call(rooms=[self.room_id]), ]) + self.assertEquals(self.event_source.get_current_key(), 1) + self.assertEquals( + self.event_source.get_new_events_for_user(self.u_apple, 0, None)[0], + [ + {"type": "m.typing", + "room_id": self.room_id, + "typing": [self.u_apple.to_string()]}, + ] + ) + @defer.inlineCallbacks def test_started_typing_remote_send(self): self.room_members = [self.u_apple, self.u_onion] @@ -192,6 +219,8 @@ class TypingNotificationsTestCase(unittest.TestCase): def test_started_typing_remote_recv(self): self.room_members = [self.u_apple, self.u_onion] + self.assertEquals(self.event_source.get_current_key(), 0) + yield self.mock_federation_resource.trigger("PUT", "/_matrix/federation/v1/send/1000000/", _make_edu_json("farm", "m.typing", @@ -203,13 +232,20 @@ class TypingNotificationsTestCase(unittest.TestCase): ) ) - self.mock_update_client.assert_has_calls([ - call(observer_user=self.u_apple, - observed_user=self.u_onion, - room_id=self.room_id, - typing=True), + self.on_new_user_event.assert_has_calls([ + call(rooms=[self.room_id]), ]) + self.assertEquals(self.event_source.get_current_key(), 1) + self.assertEquals( + self.event_source.get_new_events_for_user(self.u_apple, 0, None)[0], + [ + {"type": "m.typing", + "room_id": self.room_id, + "typing": [self.u_onion.to_string()]}, + ] + ) + @defer.inlineCallbacks def test_stopped_typing(self): self.room_members = [self.u_apple, self.u_banana, self.u_onion] @@ -232,9 +268,14 @@ class TypingNotificationsTestCase(unittest.TestCase): # Gut-wrenching from synapse.handlers.typing import RoomMember - self.handler._member_typing_until[ - RoomMember(self.room_id, self.u_apple) - ] = 1002000 + member = RoomMember(self.room_id, self.u_apple) + self.handler._member_typing_until[member] = 1002000 + self.handler._member_typing_timer[member] = ( + self.clock.call_later(1002, lambda: 0) + ) + self.handler._room_typing[self.room_id] = set((self.u_apple,)) + + self.assertEquals(self.event_source.get_current_key(), 0) yield self.handler.stopped_typing( target_user=self.u_apple, @@ -242,11 +283,62 @@ class TypingNotificationsTestCase(unittest.TestCase): room_id=self.room_id, ) - self.mock_update_client.assert_has_calls([ - call(observer_user=self.u_banana, - observed_user=self.u_apple, - room_id=self.room_id, - typing=False), + self.on_new_user_event.assert_has_calls([ + call(rooms=[self.room_id]), ]) yield put_json.await_calls() + + self.assertEquals(self.event_source.get_current_key(), 1) + self.assertEquals( + self.event_source.get_new_events_for_user(self.u_apple, 0, None)[0], + [ + {"type": "m.typing", + "room_id": self.room_id, + "typing": []}, + ] + ) + + @defer.inlineCallbacks + def test_typing_timeout(self): + self.room_members = [self.u_apple, self.u_banana] + + self.assertEquals(self.event_source.get_current_key(), 0) + + yield self.handler.started_typing( + target_user=self.u_apple, + auth_user=self.u_apple, + room_id=self.room_id, + timeout=10000, + ) + + self.on_new_user_event.assert_has_calls([ + call(rooms=[self.room_id]), + ]) + self.on_new_user_event.reset_mock() + + self.assertEquals(self.event_source.get_current_key(), 1) + self.assertEquals( + self.event_source.get_new_events_for_user(self.u_apple, 0, None)[0], + [ + {"type": "m.typing", + "room_id": self.room_id, + "typing": [self.u_apple.to_string()]}, + ] + ) + + self.clock.advance_time(11) + + self.on_new_user_event.assert_has_calls([ + call(rooms=[self.room_id]), + ]) + + self.assertEquals(self.event_source.get_current_key(), 2) + self.assertEquals( + self.event_source.get_new_events_for_user(self.u_apple, 1, None)[0], + [ + {"type": "m.typing", + "room_id": self.room_id, + "typing": []}, + ] + ) diff --git a/tests/rest/test_rooms.py b/tests/rest/test_rooms.py index ff7c9f0530..1f719beb0a 100644 --- a/tests/rest/test_rooms.py +++ b/tests/rest/test_rooms.py @@ -1066,7 +1066,3 @@ class RoomInitialSyncTestCase(RestTestCase): } self.assertTrue(self.user_id in presence_by_user) self.assertEquals("m.presence", presence_by_user[self.user_id]["type"]) - -# (code, response) = yield self.mock_resource.trigger("GET", path, None) -# self.assertEquals(200, code, msg=str(response)) -# self.assert_dict(json.loads(content), response) diff --git a/tests/rest/test_typing.py b/tests/rest/test_typing.py new file mode 100644 index 0000000000..0b95d70718 --- /dev/null +++ b/tests/rest/test_typing.py @@ -0,0 +1,113 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests REST events for /rooms paths.""" + +# twisted imports +from twisted.internet import defer + +import synapse.rest.room +from synapse.server import HomeServer + +from ..utils import MockHttpResource, SQLiteMemoryDbPool, MockKey +from .utils import RestTestCase + +from mock import Mock, NonCallableMock + + +PATH_PREFIX = "/_matrix/client/api/v1" + + +class RoomTypingTestCase(RestTestCase): + """ Tests /rooms/$room_id/typing/$user_id REST API. """ + user_id = "@sid:red" + + @defer.inlineCallbacks + def setUp(self): + self.mock_resource = MockHttpResource(prefix=PATH_PREFIX) + self.auth_user_id = self.user_id + + self.mock_config = NonCallableMock() + self.mock_config.signing_key = [MockKey()] + + db_pool = SQLiteMemoryDbPool() + yield db_pool.prepare() + + hs = HomeServer( + "red", + db_pool=db_pool, + http_client=None, + replication_layer=Mock(), + ratelimiter=NonCallableMock(spec_set=[ + "send_message", + ]), + config=self.mock_config, + ) + self.hs = hs + + self.event_source = hs.get_event_sources().sources["typing"] + + self.ratelimiter = hs.get_ratelimiter() + self.ratelimiter.send_message.return_value = (True, 0) + + hs.get_handlers().federation_handler = Mock() + + def _get_user_by_token(token=None): + return { + "user": hs.parse_userid(self.auth_user_id), + "admin": False, + "device_id": None, + } + + hs.get_auth().get_user_by_token = _get_user_by_token + + def _insert_client_ip(*args, **kwargs): + return defer.succeed(None) + hs.get_datastore().insert_client_ip = _insert_client_ip + + synapse.rest.room.register_servlets(hs, self.mock_resource) + + self.room_id = yield self.create_room_as(self.user_id) + # Need another user to make notifications actually work + yield self.join(self.room_id, user="@jim:red") + + def tearDown(self): + self.hs.get_handlers().typing_notification_handler.tearDown() + + @defer.inlineCallbacks + def test_set_typing(self): + (code, _) = yield self.mock_resource.trigger("PUT", + "/rooms/%s/typing/%s" % (self.room_id, self.user_id), + '{"typing": true, "timeout": 30000}' + ) + self.assertEquals(200, code) + + self.assertEquals(self.event_source.get_current_key(), 1) + self.assertEquals( + self.event_source.get_new_events_for_user(self.user_id, 0, None)[0], + [ + {"type": "m.typing", + "room_id": self.room_id, + "typing": [self.user_id]}, + ] + ) + + @defer.inlineCallbacks + def test_set_not_typing(self): + (code, _) = yield self.mock_resource.trigger("PUT", + "/rooms/%s/typing/%s" % (self.room_id, self.user_id), + '{"typing": false}' + ) + self.assertEquals(200, code) diff --git a/tests/test_test_utils.py b/tests/test_test_utils.py new file mode 100644 index 0000000000..b42787dd25 --- /dev/null +++ b/tests/test_test_utils.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from tests import unittest + +from tests.utils import MockClock + +class MockClockTestCase(unittest.TestCase): + + def setUp(self): + self.clock = MockClock() + + def test_advance_time(self): + start_time = self.clock.time() + + self.clock.advance_time(20) + + self.assertEquals(20, self.clock.time() - start_time) + + def test_later(self): + invoked = [0, 0] + + def _cb0(): + invoked[0] = 1 + self.clock.call_later(10, _cb0) + + def _cb1(): + invoked[1] = 1 + self.clock.call_later(20, _cb1) + + self.assertFalse(invoked[0]) + + self.clock.advance_time(15) + + self.assertTrue(invoked[0]) + self.assertFalse(invoked[1]) + + self.clock.advance_time(5) + + self.assertTrue(invoked[1]) + + def test_cancel_later(self): + invoked = [0, 0] + + def _cb0(): + invoked[0] = 1 + t0 = self.clock.call_later(10, _cb0) + + def _cb1(): + invoked[1] = 1 + t1 = self.clock.call_later(20, _cb1) + + self.clock.cancel_call_later(t0) + + self.clock.advance_time(30) + + self.assertFalse(invoked[0]) + self.assertTrue(invoked[1]) diff --git a/tests/utils.py b/tests/utils.py index d8be73dba8..f9a34748cd 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -18,6 +18,8 @@ from synapse.api.errors import cs_error, CodeMessageException, StoreError from synapse.api.constants import Membership from synapse.storage import prepare_database +from synapse.util.logcontext import LoggingContext + from synapse.api.events.room import ( RoomMemberEvent, MessageEvent ) @@ -134,16 +136,43 @@ class MockKey(object): class MockClock(object): now = 1000 + def __init__(self): + # list of tuples of (absolute_time, callback) in no particular order + self.timers = [] + def time(self): return self.now def time_msec(self): return self.time() * 1000 + def call_later(self, delay, callback): + current_context = LoggingContext.current_context() + + def wrapped_callback(): + LoggingContext.thread_local.current_context = current_context + callback() + + t = (self.now + delay, wrapped_callback) + self.timers.append(t) + return t + + def cancel_call_later(self, timer): + self.timers = [t for t in self.timers if t != timer] + # For unit testing def advance_time(self, secs): self.now += secs + timers = self.timers + self.timers = [] + + for time, callback in timers: + if self.now >= time: + callback() + else: + self.timers.append((time, callback)) + class SQLiteMemoryDbPool(ConnectionPool, object): def __init__(self): |