diff options
59 files changed, 1036 insertions, 463 deletions
diff --git a/.gitignore b/.gitignore index 1718185384..3b2252ad8a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,9 +1,11 @@ *.pyc .*.swp *~ +*.lock .DS_Store _trial_temp/ +_trial_temp*/ logs/ dbs/ *.egg diff --git a/.travis.yml b/.travis.yml index b3ee66da8f..b6faca4b92 100644 --- a/.travis.yml +++ b/.travis.yml @@ -32,6 +32,11 @@ matrix: env: TOX_ENV=py36 - python: 3.6 + env: TOX_ENV=py36-postgres TRIAL_FLAGS="-j 4" + services: + - postgresql + + - python: 3.6 env: TOX_ENV=check_isort - python: 3.6 diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index f9de78a460..6ef7d48dc7 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -30,12 +30,28 @@ use github's pull request workflow to review the contribution, and either ask you to make any refinements needed or merge it and make them ourselves. The changes will then land on master when we next do a release. -We use `Jenkins <http://matrix.org/jenkins>`_ and -`Travis <https://travis-ci.org/matrix-org/synapse>`_ for continuous -integration. All pull requests to synapse get automatically tested by Travis; -the Jenkins builds require an adminstrator to start them. If your change -breaks the build, this will be shown in github, so please keep an eye on the -pull request for feedback. +We use `CircleCI <https://circleci.com/gh/matrix-org>`_ and `Travis CI +<https://travis-ci.org/matrix-org/synapse>`_ for continuous integration. All +pull requests to synapse get automatically tested by Travis and CircleCI. +If your change breaks the build, this will be shown in GitHub, so please +keep an eye on the pull request for feedback. + +To run unit tests in a local development environment, you can use: + +- ``tox -e py27`` (requires tox to be installed by ``pip install tox``) for + SQLite-backed Synapse on Python 2.7. +- ``tox -e py35`` for SQLite-backed Synapse on Python 3.5. +- ``tox -e py36`` for SQLite-backed Synapse on Python 3.6. +- ``tox -e py27-postgres`` for PostgreSQL-backed Synapse on Python 2.7 + (requires a running local PostgreSQL with access to create databases). +- ``./test_postgresql.sh`` for PostgreSQL-backed Synapse on Python 2.7 + (requires Docker). Entirely self-contained, recommended if you don't want to + set up PostgreSQL yourself. + +Docker images are available for running the integration tests (SyTest) locally, +see the `documentation in the SyTest repo +<https://github.com/matrix-org/sytest/blob/develop/docker/README.md>`_ for more +information. Code style ~~~~~~~~~~ @@ -77,7 +93,8 @@ AUTHORS.rst file for the project in question. Please feel free to include a change to AUTHORS.rst in your pull request to list yourself and a short description of the area(s) you've worked on. Also, we sometimes have swag to give away to contributors - if you feel that Matrix-branded apparel is missing -from your life, please mail us your shipping address to matrix at matrix.org and we'll try to fix it :) +from your life, please mail us your shipping address to matrix at matrix.org and +we'll try to fix it :) Sign off ~~~~~~~~ @@ -144,4 +161,9 @@ flag to ``git commit``, which uses the name and email set in your Conclusion ~~~~~~~~~~ -That's it! Matrix is a very open and collaborative project as you might expect given our obsession with open communication. If we're going to successfully matrix together all the fragmented communication technologies out there we are reliant on contributions and collaboration from the community to do so. So please get involved - and we hope you have as much fun hacking on Matrix as we do! +That's it! Matrix is a very open and collaborative project as you might expect +given our obsession with open communication. If we're going to successfully +matrix together all the fragmented communication technologies out there we are +reliant on contributions and collaboration from the community to do so. So +please get involved - and we hope you have as much fun hacking on Matrix as we +do! diff --git a/MANIFEST.in b/MANIFEST.in index e0826ba544..47ae5a77b9 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -28,6 +28,7 @@ exclude jenkins*.sh exclude jenkins* exclude Dockerfile exclude .dockerignore +exclude test_postgresql.sh recursive-exclude jenkins *.sh include pyproject.toml diff --git a/README.rst b/README.rst index cfcf8b5219..5547f617ba 100644 --- a/README.rst +++ b/README.rst @@ -157,7 +157,7 @@ if you prefer. In case of problems, please see the _`Troubleshooting` section below. -There is an offical synapse image available at +There is an offical synapse image available at https://hub.docker.com/r/matrixdotorg/synapse/tags/ which can be used with the docker-compose file available at `contrib/docker <contrib/docker>`_. Further information on this including configuration options is available in the README on @@ -459,37 +459,13 @@ https://github.com/NixOS/nixpkgs/blob/master/nixos/modules/services/misc/matrix- Windows Install --------------- -Synapse can be installed on Cygwin. It requires the following Cygwin packages: - -- gcc -- git -- libffi-devel -- openssl (and openssl-devel, python-openssl) -- python -- python-setuptools - -The content repository requires additional packages and will be unable to process -uploads without them: - -- libjpeg8 -- libjpeg8-devel -- zlib - -If you choose to install Synapse without these packages, you will need to reinstall -``pillow`` for changes to be applied, e.g. ``pip uninstall pillow`` ``pip install -pillow --user`` - -Troubleshooting: - -- You may need to upgrade ``setuptools`` to get this to work correctly: - ``pip install setuptools --upgrade``. -- You may encounter errors indicating that ``ffi.h`` is missing, even with - ``libffi-devel`` installed. If you do, copy the ``.h`` files: - ``cp /usr/lib/libffi-3.0.13/include/*.h /usr/include`` -- You may need to install libsodium from source in order to install PyNacl. If - you do, you may need to create a symlink to ``libsodium.a`` so ``ld`` can find - it: ``ln -s /usr/local/lib/libsodium.a /usr/lib/libsodium.a`` +If you wish to run or develop Synapse on Windows, the Windows Subsystem For +Linux provides a Linux environment on Windows 10 which is capable of using the +Debian, Fedora, or source installation methods. More information about WSL can +be found at https://docs.microsoft.com/en-us/windows/wsl/install-win10 for +Windows 10 and https://docs.microsoft.com/en-us/windows/wsl/install-on-server +for Windows Server. Troubleshooting =============== @@ -908,7 +884,7 @@ to install using pip and a virtualenv:: virtualenv -p python2.7 env source env/bin/activate - python synapse/python_dependencies.py | xargs pip install + python -m synapse.python_dependencies | xargs pip install pip install lxml mock This will run a process of downloading and installing all the needed @@ -968,7 +944,7 @@ improvement in overall amount, and especially in terms of giving back RAM to the OS. To use it, the library must simply be put in the LD_PRELOAD environment variable when launching Synapse. On Debian, this can be done by installing the ``libjemalloc1`` package and adding this line to -``/etc/default/matrix-synaspse``:: +``/etc/default/matrix-synapse``:: LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.1 diff --git a/changelog.d/3699.misc b/changelog.d/3699.misc new file mode 100644 index 0000000000..437efbd98f --- /dev/null +++ b/changelog.d/3699.misc @@ -0,0 +1,2 @@ +Unit tests can now be run under PostgreSQL in Docker using +``test_postgresql.sh``. diff --git a/changelog.d/3868.bugfix b/changelog.d/3868.bugfix new file mode 100644 index 0000000000..99da8389a5 --- /dev/null +++ b/changelog.d/3868.bugfix @@ -0,0 +1 @@ +Fix broken invite email links for self hosted riots diff --git a/changelog.d/3873.misc b/changelog.d/3873.misc new file mode 100644 index 0000000000..8104b5c085 --- /dev/null +++ b/changelog.d/3873.misc @@ -0,0 +1,2 @@ +Remove documentation regarding installation on Cygwin, the use of WSL is +recommended instead. diff --git a/changelog.d/3879.bugfix b/changelog.d/3879.bugfix new file mode 100644 index 0000000000..82cb2a8ebc --- /dev/null +++ b/changelog.d/3879.bugfix @@ -0,0 +1 @@ +Don't ratelimit autojoins diff --git a/changelog.d/3883.feature b/changelog.d/3883.feature new file mode 100644 index 0000000000..c11e5c5309 --- /dev/null +++ b/changelog.d/3883.feature @@ -0,0 +1 @@ +Adding the ability to change MAX_UPLOAD_SIZE for the docker container variables. \ No newline at end of file diff --git a/changelog.d/3889.bugfix b/changelog.d/3889.bugfix new file mode 100644 index 0000000000..e976425987 --- /dev/null +++ b/changelog.d/3889.bugfix @@ -0,0 +1 @@ +Fix 500 error when deleting unknown room alias diff --git a/changelog.d/3892.bugfix b/changelog.d/3892.bugfix new file mode 100644 index 0000000000..8b30afab04 --- /dev/null +++ b/changelog.d/3892.bugfix @@ -0,0 +1 @@ +Fix some b'abcd' noise in logs and metrics diff --git a/changelog.d/3894.feature b/changelog.d/3894.feature new file mode 100644 index 0000000000..1ed0cccdb2 --- /dev/null +++ b/changelog.d/3894.feature @@ -0,0 +1 @@ +Report "python_version" in the phone home stats diff --git a/changelog.d/3895.bugfix b/changelog.d/3895.bugfix new file mode 100644 index 0000000000..8b30afab04 --- /dev/null +++ b/changelog.d/3895.bugfix @@ -0,0 +1 @@ +Fix some b'abcd' noise in logs and metrics diff --git a/changelog.d/3897.misc b/changelog.d/3897.misc new file mode 100644 index 0000000000..87e7ac796e --- /dev/null +++ b/changelog.d/3897.misc @@ -0,0 +1 @@ +Fix typo in README, synaspse -> synapse \ No newline at end of file diff --git a/changelog.d/3899.bugfix b/changelog.d/3899.bugfix new file mode 100644 index 0000000000..5120e3a823 --- /dev/null +++ b/changelog.d/3899.bugfix @@ -0,0 +1 @@ +When we join a room, always try the server we used for the alias lookup first, to avoid unresponsive and out-of-date servers. diff --git a/changelog.d/3903.misc b/changelog.d/3903.misc new file mode 100644 index 0000000000..49b64bf333 --- /dev/null +++ b/changelog.d/3903.misc @@ -0,0 +1 @@ +Increase the timeout when filling missing events in federation requests \ No newline at end of file diff --git a/changelog.d/3904.misc b/changelog.d/3904.misc new file mode 100644 index 0000000000..1e3c8e1706 --- /dev/null +++ b/changelog.d/3904.misc @@ -0,0 +1 @@ +Improve the logging when handling a federation transaction \ No newline at end of file diff --git a/changelog.d/3906.misc b/changelog.d/3906.misc new file mode 100644 index 0000000000..11709186d3 --- /dev/null +++ b/changelog.d/3906.misc @@ -0,0 +1 @@ +Improve logging of outbound federation requests \ No newline at end of file diff --git a/changelog.d/3907.bugfix b/changelog.d/3907.bugfix new file mode 100644 index 0000000000..45e010c052 --- /dev/null +++ b/changelog.d/3907.bugfix @@ -0,0 +1 @@ +Fix incorrect server-name indication for outgoing federation requests \ No newline at end of file diff --git a/changelog.d/3908.bugfix b/changelog.d/3908.bugfix new file mode 100644 index 0000000000..518aee6c4d --- /dev/null +++ b/changelog.d/3908.bugfix @@ -0,0 +1 @@ +Fix adding client IPs to the database failing on Python 3. \ No newline at end of file diff --git a/changelog.d/3909.misc b/changelog.d/3909.misc new file mode 100644 index 0000000000..11709186d3 --- /dev/null +++ b/changelog.d/3909.misc @@ -0,0 +1 @@ +Improve logging of outbound federation requests \ No newline at end of file diff --git a/changelog.d/3910.bugfix b/changelog.d/3910.bugfix new file mode 100644 index 0000000000..22ec2adc33 --- /dev/null +++ b/changelog.d/3910.bugfix @@ -0,0 +1 @@ +Fix bug where things occaisonally were not being timed out correctly. diff --git a/changelog.d/3912.misc b/changelog.d/3912.misc new file mode 100644 index 0000000000..87d73697ea --- /dev/null +++ b/changelog.d/3912.misc @@ -0,0 +1 @@ +Add a regression test for logging failed HTTP requests on Python 3. \ No newline at end of file diff --git a/changelog.d/3914.bugfix b/changelog.d/3914.bugfix new file mode 100644 index 0000000000..27e6bad590 --- /dev/null +++ b/changelog.d/3914.bugfix @@ -0,0 +1 @@ +Fix bug where outbound federation would stop talking to some servers when using workers diff --git a/changelog.d/3916.feature b/changelog.d/3916.feature new file mode 100644 index 0000000000..13282d992b --- /dev/null +++ b/changelog.d/3916.feature @@ -0,0 +1 @@ +Always LL ourselves if we're in a room diff --git a/changelog.d/3927.misc b/changelog.d/3927.misc new file mode 100644 index 0000000000..4bd8e25f67 --- /dev/null +++ b/changelog.d/3927.misc @@ -0,0 +1 @@ +Log exceptions thrown by background tasks diff --git a/docker/Dockerfile-pgtests b/docker/Dockerfile-pgtests new file mode 100644 index 0000000000..7da8eeb9eb --- /dev/null +++ b/docker/Dockerfile-pgtests @@ -0,0 +1,12 @@ +# Use the Sytest image that comes with a lot of the build dependencies +# pre-installed +FROM matrixdotorg/sytest:latest + +# The Sytest image doesn't come with python, so install that +RUN apt-get -qq install -y python python-dev python-pip + +# We need tox to run the tests in run_pg_tests.sh +RUN pip install tox + +ADD run_pg_tests.sh /pg_tests.sh +ENTRYPOINT /pg_tests.sh diff --git a/docker/README.md b/docker/README.md index 038c78f7c0..3c00d1e948 100644 --- a/docker/README.md +++ b/docker/README.md @@ -88,6 +88,7 @@ variables are available for configuration: * ``SYNAPSE_TURN_URIS``, set this variable to the coma-separated list of TURN uris to enable TURN for this homeserver. * ``SYNAPSE_TURN_SECRET``, set this to the TURN shared secret if required. +* ``SYNAPSE_MAX_UPLOAD_SIZE``, set this variable to change the max upload size [default `10M`]. Shared secrets, that will be initialized to random values if not set: diff --git a/docker/conf/homeserver.yaml b/docker/conf/homeserver.yaml index 6bc25bb45f..cfe88788f2 100644 --- a/docker/conf/homeserver.yaml +++ b/docker/conf/homeserver.yaml @@ -85,7 +85,7 @@ federation_rc_concurrent: 3 media_store_path: "/data/media" uploads_path: "/data/uploads" -max_upload_size: "10M" +max_upload_size: "{{ SYNAPSE_MAX_UPLOAD_SIZE or "10M" }}" max_image_pixels: "32M" dynamic_thumbnails: false diff --git a/docker/run_pg_tests.sh b/docker/run_pg_tests.sh new file mode 100755 index 0000000000..e77424c41a --- /dev/null +++ b/docker/run_pg_tests.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +# This script runs the PostgreSQL tests inside a Docker container. It expects +# the relevant source files to be mounted into /src (done automatically by the +# caller script). It will set up the database, run it, and then use the tox +# configuration to run the tests. + +set -e + +# Set PGUSER so Synapse's tests know what user to connect to the database with +export PGUSER=postgres + +# Initialise & start the database +su -c '/usr/lib/postgresql/9.6/bin/initdb -D /var/lib/postgresql/data -E "UTF-8" --lc-collate="en_US.UTF-8" --lc-ctype="en_US.UTF-8" --username=postgres' postgres +su -c '/usr/lib/postgresql/9.6/bin/pg_ctl -w -D /var/lib/postgresql/data start' postgres + +# Run the tests +cd /src +export TRIAL_FLAGS="-j 4" +tox --workdir=/tmp -e py27-postgres diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index ac97e19649..3241ded188 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -457,6 +457,10 @@ def run(hs): stats["homeserver"] = hs.config.server_name stats["timestamp"] = now stats["uptime_seconds"] = uptime + version = sys.version_info + stats["python_version"] = "{}.{}.{}".format( + version.major, version.minor, version.micro + ) stats["total_users"] = yield hs.get_datastore().count_all_users() total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users() diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index ef866da1b6..18741c5fac 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -20,7 +20,14 @@ import string from twisted.internet import defer from synapse.api.constants import EventTypes -from synapse.api.errors import AuthError, CodeMessageException, Codes, SynapseError +from synapse.api.errors import ( + AuthError, + CodeMessageException, + Codes, + NotFoundError, + StoreError, + SynapseError, +) from synapse.types import RoomAlias, UserID, get_domain_from_id from ._base import BaseHandler @@ -109,7 +116,13 @@ class DirectoryHandler(BaseHandler): def delete_association(self, requester, user_id, room_alias): # association deletion for human users - can_delete = yield self._user_can_delete_alias(room_alias, user_id) + try: + can_delete = yield self._user_can_delete_alias(room_alias, user_id) + except StoreError as e: + if e.code == 404: + raise NotFoundError("Unknown room alias") + raise + if not can_delete: raise AuthError( 403, "You don't have permission to delete the alias.", @@ -320,7 +333,7 @@ class DirectoryHandler(BaseHandler): def _user_can_delete_alias(self, alias, user_id): creator = yield self.store.get_room_alias_creator(alias.to_string()) - if creator and creator == user_id: + if creator is not None and creator == user_id: defer.returnValue(True) is_admin = yield self.auth.is_server_admin(UserID.from_string(user_id)) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0c68e8a472..8d6bd7976d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -69,6 +69,27 @@ from ._base import BaseHandler logger = logging.getLogger(__name__) +def shortstr(iterable, maxitems=5): + """If iterable has maxitems or fewer, return the stringification of a list + containing those items. + + Otherwise, return the stringification of a a list with the first maxitems items, + followed by "...". + + Args: + iterable (Iterable): iterable to truncate + maxitems (int): number of items to return before truncating + + Returns: + unicode + """ + + items = list(itertools.islice(iterable, maxitems + 1)) + if len(items) <= maxitems: + return str(items) + return u"[" + u", ".join(repr(r) for r in items[:maxitems]) + u", ...]" + + class FederationHandler(BaseHandler): """Handles events that originated from federation. Responsible for: @@ -114,7 +135,6 @@ class FederationHandler(BaseHandler): self._room_pdu_linearizer = Linearizer("fed_room_pdu") @defer.inlineCallbacks - @log_function def on_receive_pdu( self, origin, pdu, get_missing=True, sent_to_us_directly=False, ): @@ -130,9 +150,17 @@ class FederationHandler(BaseHandler): Returns (Deferred): completes with None """ + room_id = pdu.room_id + event_id = pdu.event_id + + logger.info( + "[%s %s] handling received PDU: %s", + room_id, event_id, pdu, + ) + # We reprocess pdus when we have seen them only as outliers existing = yield self.store.get_event( - pdu.event_id, + event_id, allow_none=True, allow_rejected=True, ) @@ -147,7 +175,7 @@ class FederationHandler(BaseHandler): ) ) if already_seen: - logger.debug("Already seen pdu %s", pdu.event_id) + logger.debug("[%s %s]: Already seen pdu", room_id, event_id) return # do some initial sanity-checking of the event. In particular, make @@ -156,6 +184,7 @@ class FederationHandler(BaseHandler): try: self._sanity_check_event(pdu) except SynapseError as err: + logger.warn("[%s %s] Received event failed sanity checks", room_id, event_id) raise FederationError( "ERROR", err.code, @@ -165,10 +194,12 @@ class FederationHandler(BaseHandler): # If we are currently in the process of joining this room, then we # queue up events for later processing. - if pdu.room_id in self.room_queues: - logger.info("Ignoring PDU %s for room %s from %s for now; join " - "in progress", pdu.event_id, pdu.room_id, origin) - self.room_queues[pdu.room_id].append((pdu, origin)) + if room_id in self.room_queues: + logger.info( + "[%s %s] Queuing PDU from %s for now: join in progress", + room_id, event_id, origin, + ) + self.room_queues[room_id].append((pdu, origin)) return # If we're no longer in the room just ditch the event entirely. This @@ -179,7 +210,7 @@ class FederationHandler(BaseHandler): # we should check if we *are* in fact in the room. If we are then we # can magically rejoin the room. is_in_room = yield self.auth.check_host_in_room( - pdu.room_id, + room_id, self.server_name ) if not is_in_room: @@ -188,8 +219,8 @@ class FederationHandler(BaseHandler): ) if was_in_room: logger.info( - "Ignoring PDU %s for room %s from %s as we've left the room!", - pdu.event_id, pdu.room_id, origin, + "[%s %s] Ignoring PDU from %s as we've left the room", + room_id, event_id, origin, ) defer.returnValue(None) @@ -204,8 +235,8 @@ class FederationHandler(BaseHandler): ) logger.debug( - "_handle_new_pdu min_depth for %s: %d", - pdu.room_id, min_depth + "[%s %s] min_depth: %d", + room_id, event_id, min_depth, ) prevs = {e_id for e_id, _ in pdu.prev_events} @@ -218,17 +249,18 @@ class FederationHandler(BaseHandler): # send to the clients. pdu.internal_metadata.outlier = True elif min_depth and pdu.depth > min_depth: - if get_missing and prevs - seen: + missing_prevs = prevs - seen + if get_missing and missing_prevs: # If we're missing stuff, ensure we only fetch stuff one # at a time. logger.info( - "Acquiring lock for room %r to fetch %d missing events: %r...", - pdu.room_id, len(prevs - seen), list(prevs - seen)[:5], + "[%s %s] Acquiring room lock to fetch %d missing prev_events: %s", + room_id, event_id, len(missing_prevs), shortstr(missing_prevs), ) with (yield self._room_pdu_linearizer.queue(pdu.room_id)): logger.info( - "Acquired lock for room %r to fetch %d missing events", - pdu.room_id, len(prevs - seen), + "[%s %s] Acquired room lock to fetch %d missing prev_events", + room_id, event_id, len(missing_prevs), ) yield self._get_missing_events_for_pdu( @@ -241,19 +273,23 @@ class FederationHandler(BaseHandler): if not prevs - seen: logger.info( - "Found all missing prev events for %s", pdu.event_id + "[%s %s] Found all missing prev_events", + room_id, event_id, ) - elif prevs - seen: + elif missing_prevs: logger.info( - "Not fetching %d missing events for room %r,event %s: %r...", - len(prevs - seen), pdu.room_id, pdu.event_id, - list(prevs - seen)[:5], + "[%s %s] Not recursively fetching %d missing prev_events: %s", + room_id, event_id, len(missing_prevs), shortstr(missing_prevs), ) if sent_to_us_directly and prevs - seen: # If they have sent it to us directly, and the server # isn't telling us about the auth events that it's # made a message referencing, we explode + logger.warn( + "[%s %s] Failed to fetch %d prev events: rejecting", + room_id, event_id, len(prevs - seen), + ) raise FederationError( "ERROR", 403, @@ -270,15 +306,19 @@ class FederationHandler(BaseHandler): auth_chains = set() try: # Get the state of the events we know about - ours = yield self.store.get_state_groups(pdu.room_id, list(seen)) + ours = yield self.store.get_state_groups(room_id, list(seen)) state_groups.append(ours) # Ask the remote server for the states we don't # know about for p in prevs - seen: + logger.info( + "[%s %s] Requesting state at missing prev_event %s", + room_id, event_id, p, + ) state, got_auth_chain = ( yield self.federation_client.get_state_for_room( - origin, pdu.room_id, p + origin, room_id, p, ) ) auth_chains.update(got_auth_chain) @@ -291,19 +331,24 @@ class FederationHandler(BaseHandler): ev_ids, get_prev_content=False, check_redacted=False ) - room_version = yield self.store.get_room_version(pdu.room_id) + room_version = yield self.store.get_room_version(room_id) state_map = yield resolve_events_with_factory( - room_version, state_groups, {pdu.event_id: pdu}, fetch + room_version, state_groups, {event_id: pdu}, fetch ) state = (yield self.store.get_events(state_map.values())).values() auth_chain = list(auth_chains) except Exception: + logger.warn( + "[%s %s] Error attempting to resolve state at missing " + "prev_events", + room_id, event_id, exc_info=True, + ) raise FederationError( "ERROR", 403, "We can't get valid state history.", - affected=pdu.event_id, + affected=event_id, ) yield self._process_received_pdu( @@ -322,15 +367,16 @@ class FederationHandler(BaseHandler): prevs (set(str)): List of event ids which we are missing min_depth (int): Minimum depth of events to return. """ - # We recalculate seen, since it may have changed. + + room_id = pdu.room_id + event_id = pdu.event_id + seen = yield self.store.have_seen_events(prevs) if not prevs - seen: return - latest = yield self.store.get_latest_event_ids_in_room( - pdu.room_id - ) + latest = yield self.store.get_latest_event_ids_in_room(room_id) # We add the prev events that we have seen to the latest # list to ensure the remote server doesn't give them to us @@ -338,8 +384,8 @@ class FederationHandler(BaseHandler): latest |= seen logger.info( - "Missing %d events for room %r pdu %s: %r...", - len(prevs - seen), pdu.room_id, pdu.event_id, list(prevs - seen)[:5] + "[%s %s]: Requesting %d prev_events: %s", + room_id, event_id, len(prevs - seen), shortstr(prevs - seen) ) # XXX: we set timeout to 10s to help workaround @@ -360,49 +406,87 @@ class FederationHandler(BaseHandler): # apparently. # # see https://github.com/matrix-org/synapse/pull/1744 + # + # ---- + # + # Update richvdh 2018/09/18: There are a number of problems with timing this + # request out agressively on the client side: + # + # - it plays badly with the server-side rate-limiter, which starts tarpitting you + # if you send too many requests at once, so you end up with the server carefully + # working through the backlog of your requests, which you have already timed + # out. + # + # - for this request in particular, we now (as of + # https://github.com/matrix-org/synapse/pull/3456) reject any PDUs where the + # server can't produce a plausible-looking set of prev_events - so we becone + # much more likely to reject the event. + # + # - contrary to what it says above, we do *not* fall back to fetching fresh state + # for the room if get_missing_events times out. Rather, we give up processing + # the PDU whose prevs we are missing, which then makes it much more likely that + # we'll end up back here for the *next* PDU in the list, which exacerbates the + # problem. + # + # - the agressive 10s timeout was introduced to deal with incoming federation + # requests taking 8 hours to process. It's not entirely clear why that was going + # on; certainly there were other issues causing traffic storms which are now + # resolved, and I think in any case we may be more sensible about our locking + # now. We're *certainly* more sensible about our logging. + # + # All that said: Let's try increasing the timout to 60s and see what happens. missing_events = yield self.federation_client.get_missing_events( origin, - pdu.room_id, + room_id, earliest_events_ids=list(latest), latest_events=[pdu], limit=10, min_depth=min_depth, - timeout=10000, + timeout=60000, ) logger.info( - "Got %d events: %r...", - len(missing_events), [e.event_id for e in missing_events[:5]] + "[%s %s]: Got %d prev_events: %s", + room_id, event_id, len(missing_events), shortstr(missing_events), ) # We want to sort these by depth so we process them and # tell clients about them in order. missing_events.sort(key=lambda x: x.depth) - for e in missing_events: - logger.info("Handling found event %s", e.event_id) + for ev in missing_events: + logger.info( + "[%s %s] Handling received prev_event %s", + room_id, event_id, ev.event_id, + ) try: yield self.on_receive_pdu( origin, - e, + ev, get_missing=False ) except FederationError as e: if e.code == 403: - logger.warn("Event %s failed history check.") + logger.warn( + "[%s %s] Received prev_event %s failed history check.", + room_id, event_id, ev.event_id, + ) else: raise - @log_function @defer.inlineCallbacks - def _process_received_pdu(self, origin, pdu, state, auth_chain): + def _process_received_pdu(self, origin, event, state, auth_chain): """ Called when we have a new pdu. We need to do auth checks and put it through the StateHandler. """ - event = pdu + room_id = event.room_id + event_id = event.event_id - logger.debug("Processing event: %s", event) + logger.debug( + "[%s %s] Processing event: %s", + room_id, event_id, event, + ) # FIXME (erikj): Awful hack to make the case where we are not currently # in the room work @@ -411,15 +495,16 @@ class FederationHandler(BaseHandler): # event. if state and auth_chain and not event.internal_metadata.is_outlier(): is_in_room = yield self.auth.check_host_in_room( - event.room_id, + room_id, self.server_name ) else: is_in_room = True + if not is_in_room: logger.info( - "Got event for room we're not in: %r %r", - event.room_id, event.event_id + "[%s %s] Got event for room we're not in", + room_id, event_id, ) try: @@ -431,7 +516,7 @@ class FederationHandler(BaseHandler): "ERROR", e.code, e.msg, - affected=event.event_id, + affected=event_id, ) else: @@ -480,12 +565,12 @@ class FederationHandler(BaseHandler): affected=event.event_id, ) - room = yield self.store.get_room(event.room_id) + room = yield self.store.get_room(room_id) if not room: try: yield self.store.store_room( - room_id=event.room_id, + room_id=room_id, room_creator_user_id="", is_public=False, ) @@ -513,7 +598,7 @@ class FederationHandler(BaseHandler): if newly_joined: user = UserID.from_string(event.state_key) - yield self.user_joined_room(user, event.room_id) + yield self.user_joined_room(user, room_id) @log_function @defer.inlineCallbacks @@ -1430,12 +1515,10 @@ class FederationHandler(BaseHandler): else: defer.returnValue(None) - @log_function def get_min_depth_for_context(self, context): return self.store.get_min_depth(context) @defer.inlineCallbacks - @log_function def _handle_new_event(self, origin, event, state=None, auth_events=None, backfilled=False): context = yield self._prep_event( @@ -1635,8 +1718,8 @@ class FederationHandler(BaseHandler): ) except AuthError as e: logger.warn( - "Rejecting %s because %s", - event.event_id, e.msg + "[%s %s] Rejecting: %s", + event.room_id, event.event_id, e.msg ) context.rejected = RejectedReason.AUTH_ERROR diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 1e53f2c635..da914c46ff 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -534,4 +534,5 @@ class RegistrationHandler(BaseHandler): room_id=room_id, remote_room_hosts=remote_room_hosts, action="join", + ratelimit=False, ) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index f643619047..07fd3e82fc 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -583,6 +583,11 @@ class RoomMemberHandler(object): room_id = mapping["room_id"] servers = mapping["servers"] + # put the server which owns the alias at the front of the server list. + if room_alias.domain in servers: + servers.remove(room_alias.domain) + servers.insert(0, room_alias.domain) + defer.returnValue((RoomID.from_string(room_id), servers)) @defer.inlineCallbacks diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 9bca4e7067..b598916b21 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -713,6 +713,10 @@ class SyncHandler(object): ) ] + # always make sure we LL ourselves so we know we're in the room + # (if we are), to fix https://github.com/vector-im/riot-web/issues/7209 + types.append((EventTypes.Member, sync_config.user.to_string())) + # only apply the filtering to room members filtered_types = [EventTypes.Member] diff --git a/synapse/http/client.py b/synapse/http/client.py index ec339a92ad..3d05f83b8c 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -43,7 +43,7 @@ from twisted.web.http_headers import Headers from synapse.api.errors import Codes, HttpResponseException, SynapseError from synapse.http import cancelled_to_request_timed_out_error, redact_uri from synapse.http.endpoint import SpiderEndpoint -from synapse.util.async_helpers import add_timeout_to_deferred +from synapse.util.async_helpers import timeout_deferred from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.logcontext import make_deferred_yieldable @@ -99,7 +99,7 @@ class SimpleHttpClient(object): request_deferred = treq.request( method, uri, agent=self.agent, data=data, headers=headers ) - add_timeout_to_deferred( + request_deferred = timeout_deferred( request_deferred, 60, self.hs.get_reactor(), cancelled_to_request_timed_out_error, ) diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py index b0c9369519..91025037a3 100644 --- a/synapse/http/endpoint.py +++ b/synapse/http/endpoint.py @@ -108,7 +108,7 @@ def matrix_federation_endpoint(reactor, destination, tls_client_options_factory= Args: reactor: Twisted reactor. - destination (bytes): The name of the server to connect to. + destination (unicode): The name of the server to connect to. tls_client_options_factory (synapse.crypto.context_factory.ClientTLSOptionsFactory): Factory which generates TLS options for client connections. @@ -126,10 +126,17 @@ def matrix_federation_endpoint(reactor, destination, tls_client_options_factory= transport_endpoint = HostnameEndpoint default_port = 8008 else: + # the SNI string should be the same as the Host header, minus the port. + # as per https://github.com/matrix-org/synapse/issues/2525#issuecomment-336896777, + # the Host header and SNI should therefore be the server_name of the remote + # server. + tls_options = tls_client_options_factory.get_options(domain) + def transport_endpoint(reactor, host, port, timeout): return wrapClientTLS( - tls_client_options_factory.get_options(host), - HostnameEndpoint(reactor, host, port, timeout=timeout)) + tls_options, + HostnameEndpoint(reactor, host, port, timeout=timeout), + ) default_port = 8448 if port is None: diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 13b19f7626..14b12cd1c4 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -17,10 +17,12 @@ import cgi import logging import random import sys +from io import BytesIO from six import PY3, string_types from six.moves import urllib +import attr import treq from canonicaljson import encode_canonical_json from prometheus_client import Counter @@ -28,8 +30,9 @@ from signedjson.sign import sign_json from twisted.internet import defer, protocol from twisted.internet.error import DNSLookupError +from twisted.internet.task import _EPSILON, Cooperator from twisted.web._newclient import ResponseDone -from twisted.web.client import Agent, HTTPConnectionPool +from twisted.web.client import Agent, FileBodyProducer, HTTPConnectionPool from twisted.web.http_headers import Headers import synapse.metrics @@ -41,13 +44,11 @@ from synapse.api.errors import ( SynapseError, ) from synapse.http.endpoint import matrix_federation_endpoint -from synapse.util import logcontext -from synapse.util.async_helpers import timeout_no_seriously +from synapse.util.async_helpers import timeout_deferred from synapse.util.logcontext import make_deferred_yieldable from synapse.util.metrics import Measure logger = logging.getLogger(__name__) -outbound_logger = logging.getLogger("synapse.http.outbound") outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests", "", ["method"]) @@ -78,6 +79,99 @@ class MatrixFederationEndpointFactory(object): ) +_next_id = 1 + + +@attr.s +class MatrixFederationRequest(object): + method = attr.ib() + """HTTP method + :type: str + """ + + path = attr.ib() + """HTTP path + :type: str + """ + + destination = attr.ib() + """The remote server to send the HTTP request to. + :type: str""" + + json = attr.ib(default=None) + """JSON to send in the body. + :type: dict|None + """ + + json_callback = attr.ib(default=None) + """A callback to generate the JSON. + :type: func|None + """ + + query = attr.ib(default=None) + """Query arguments. + :type: dict|None + """ + + txn_id = attr.ib(default=None) + """Unique ID for this request (for logging) + :type: str|None + """ + + def __attrs_post_init__(self): + global _next_id + self.txn_id = "%s-O-%s" % (self.method, _next_id) + _next_id = (_next_id + 1) % (MAXINT - 1) + + def get_json(self): + if self.json_callback: + return self.json_callback() + return self.json + + +@defer.inlineCallbacks +def _handle_json_response(reactor, timeout_sec, request, response): + """ + Reads the JSON body of a response, with a timeout + + Args: + reactor (IReactor): twisted reactor, for the timeout + timeout_sec (float): number of seconds to wait for response to complete + request (MatrixFederationRequest): the request that triggered the response + response (IResponse): response to the request + + Returns: + dict: parsed JSON response + """ + try: + check_content_type_is_json(response.headers) + + d = treq.json_content(response) + d = timeout_deferred( + d, + timeout=timeout_sec, + reactor=reactor, + ) + + body = yield make_deferred_yieldable(d) + except Exception as e: + logger.warn( + "{%s} [%s] Error reading response: %s", + request.txn_id, + request.destination, + e, + ) + raise + logger.info( + "{%s} [%s] Completed: %d %s", + request.txn_id, + request.destination, + response.code, + response.phrase.decode('ascii', errors='replace'), + ) + defer.returnValue(body) + + class MatrixFederationHttpClient(object): """HTTP client used to talk to other homeservers over the federation protocol. Send client certificates and signs requests. @@ -102,34 +196,35 @@ class MatrixFederationHttpClient(object): self.clock = hs.get_clock() self._store = hs.get_datastore() self.version_string = hs.version_string.encode('ascii') - self._next_id = 1 self.default_timeout = 60 - def _create_url(self, destination, path_bytes, param_bytes, query_bytes): - return urllib.parse.urlunparse( - (b"matrix", destination, path_bytes, param_bytes, query_bytes, b"") - ) + def schedule(x): + reactor.callLater(_EPSILON, x) + + self._cooperator = Cooperator(scheduler=schedule) @defer.inlineCallbacks - def _request(self, destination, method, path, - json=None, json_callback=None, - param_bytes=b"", - query=None, retry_on_dns_fail=True, - timeout=None, long_retries=False, - ignore_backoff=False, - backoff_on_404=False): + def _send_request( + self, + request, + retry_on_dns_fail=True, + timeout=None, + long_retries=False, + ignore_backoff=False, + backoff_on_404=False + ): """ - Creates and sends a request to the given server. + Sends a request to the given server. Args: - destination (str): The remote server to send the HTTP request to. - method (str): HTTP method - path (str): The HTTP path - json (dict or None): JSON to send in the body. - json_callback (func or None): A callback to generate the JSON. - query (dict or None): Query arguments. + request (MatrixFederationRequest): details of request to be sent + + timeout (int|None): number of milliseconds to wait for the response headers + (including connecting to the server). 60s by default. + ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. + backoff_on_404 (bool): Back off if we get a 404 Returns: @@ -154,38 +249,32 @@ class MatrixFederationHttpClient(object): if ( self.hs.config.federation_domain_whitelist is not None and - destination not in self.hs.config.federation_domain_whitelist + request.destination not in self.hs.config.federation_domain_whitelist ): - raise FederationDeniedError(destination) + raise FederationDeniedError(request.destination) limiter = yield synapse.util.retryutils.get_retry_limiter( - destination, + request.destination, self.clock, self._store, backoff_on_404=backoff_on_404, ignore_backoff=ignore_backoff, ) - headers_dict = {} - path_bytes = path.encode("ascii") - if query: - query_bytes = encode_query_args(query) + method = request.method + destination = request.destination + path_bytes = request.path.encode("ascii") + if request.query: + query_bytes = encode_query_args(request.query) else: query_bytes = b"" headers_dict = { "User-Agent": [self.version_string], - "Host": [destination], + "Host": [request.destination], } with limiter: - url = self._create_url( - destination.encode("ascii"), path_bytes, param_bytes, query_bytes - ).decode('ascii') - - txn_id = "%s-O-%s" % (method, self._next_id) - self._next_id = (self._next_id + 1) % (MAXINT - 1) - # XXX: Would be much nicer to retry only at the transaction-layer # (once we have reliable transactions in place) if long_retries: @@ -193,16 +282,19 @@ class MatrixFederationHttpClient(object): else: retries_left = MAX_SHORT_RETRIES - http_url = urllib.parse.urlunparse( - (b"", b"", path_bytes, param_bytes, query_bytes, b"") - ).decode('ascii') + url = urllib.parse.urlunparse(( + b"matrix", destination.encode("ascii"), + path_bytes, None, query_bytes, b"", + )).decode('ascii') + + http_url = urllib.parse.urlunparse(( + b"", b"", + path_bytes, None, query_bytes, b"", + )).decode('ascii') - log_result = None while True: try: - if json_callback: - json = json_callback() - + json = request.get_json() if json: data = encode_canonical_json(json) headers_dict["Content-Type"] = ["application/json"] @@ -213,29 +305,32 @@ class MatrixFederationHttpClient(object): data = None self.sign_request(destination, method, http_url, headers_dict) - outbound_logger.info( + logger.info( "{%s} [%s] Sending request: %s %s", - txn_id, destination, method, url + request.txn_id, destination, method, url ) + if data: + producer = FileBodyProducer( + BytesIO(data), + cooperator=self._cooperator + ) + else: + producer = None + request_deferred = treq.request( method, url, headers=Headers(headers_dict), - data=data, + data=producer, agent=self.agent, reactor=self.hs.get_reactor(), unbuffered=True ) - request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor()) - # Sometimes the timeout above doesn't work, so lets hack yet - # another layer of timeouts in in the vain hope that at some - # point the world made sense and this really really really - # should work. - request_deferred = timeout_no_seriously( + request_deferred = timeout_deferred( request_deferred, - timeout=_sec_timeout * 2, + timeout=_sec_timeout, reactor=self.hs.get_reactor(), ) @@ -244,30 +339,19 @@ class MatrixFederationHttpClient(object): request_deferred, ) - log_result = "%d %s" % (response.code, response.phrase,) break except Exception as e: - if not retry_on_dns_fail and isinstance(e, DNSLookupError): - logger.warn( - "DNS Lookup failed to %s with %s", - destination, - e - ) - log_result = "DNS Lookup failed to %s with %s" % ( - destination, e - ) - raise - logger.warn( - "{%s} Sending request failed to %s: %s %s: %s", - txn_id, + "{%s} [%s] Request failed: %s %s: %s", + request.txn_id, destination, method, url, _flatten_response_never_received(e), ) - log_result = _flatten_response_never_received(e) + if not retry_on_dns_fail and isinstance(e, DNSLookupError): + raise if retries_left and not timeout: if long_retries: @@ -280,33 +364,37 @@ class MatrixFederationHttpClient(object): delay *= random.uniform(0.8, 1.4) logger.debug( - "{%s} Waiting %s before sending to %s...", - txn_id, + "{%s} [%s] Waiting %ss before re-sending...", + request.txn_id, + destination, delay, - destination ) yield self.clock.sleep(delay) retries_left -= 1 else: raise - finally: - outbound_logger.info( - "{%s} [%s] Result: %s", - txn_id, - destination, - log_result, - ) + + logger.info( + "{%s} [%s] Got response headers: %d %s", + request.txn_id, + destination, + response.code, + response.phrase.decode('ascii', errors='replace'), + ) if 200 <= response.code < 300: pass else: # :'( # Update transactions table? - with logcontext.PreserveLoggingContext(): - d = treq.content(response) - d.addTimeout(_sec_timeout, self.hs.get_reactor()) - body = yield make_deferred_yieldable(d) + d = treq.content(response) + d = timeout_deferred( + d, + timeout=_sec_timeout, + reactor=self.hs.get_reactor(), + ) + body = yield make_deferred_yieldable(d) raise HttpResponseException( response.code, response.phrase, body ) @@ -400,29 +488,26 @@ class MatrixFederationHttpClient(object): is not on our federation whitelist """ - if not json_data_callback: - json_data_callback = lambda: data - - response = yield self._request( - destination, - "PUT", - path, - json_callback=json_data_callback, + request = MatrixFederationRequest( + method="PUT", + destination=destination, + path=path, query=args, + json_callback=json_data_callback, + json=data, + ) + + response = yield self._send_request( + request, long_retries=long_retries, timeout=timeout, ignore_backoff=ignore_backoff, backoff_on_404=backoff_on_404, ) - if 200 <= response.code < 300: - # We need to update the transactions table to say it was sent? - check_content_type_is_json(response.headers) - - with logcontext.PreserveLoggingContext(): - d = treq.json_content(response) - d.addTimeout(self.default_timeout, self.hs.get_reactor()) - body = yield make_deferred_yieldable(d) + body = yield _handle_json_response( + self.hs.get_reactor(), self.default_timeout, request, response, + ) defer.returnValue(body) @defer.inlineCallbacks @@ -456,31 +541,30 @@ class MatrixFederationHttpClient(object): Fails with ``FederationDeniedError`` if this destination is not on our federation whitelist """ - response = yield self._request( - destination, - "POST", - path, + + request = MatrixFederationRequest( + method="POST", + destination=destination, + path=path, query=args, json=data, + ) + + response = yield self._send_request( + request, long_retries=long_retries, timeout=timeout, ignore_backoff=ignore_backoff, ) - if 200 <= response.code < 300: - # We need to update the transactions table to say it was sent? - check_content_type_is_json(response.headers) - - with logcontext.PreserveLoggingContext(): - d = treq.json_content(response) - if timeout: - _sec_timeout = timeout / 1000 - else: - _sec_timeout = self.default_timeout - - d.addTimeout(_sec_timeout, self.hs.get_reactor()) - body = yield make_deferred_yieldable(d) + if timeout: + _sec_timeout = timeout / 1000 + else: + _sec_timeout = self.default_timeout + body = yield _handle_json_response( + self.hs.get_reactor(), _sec_timeout, request, response, + ) defer.returnValue(body) @defer.inlineCallbacks @@ -516,25 +600,23 @@ class MatrixFederationHttpClient(object): logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail) - response = yield self._request( - destination, - "GET", - path, + request = MatrixFederationRequest( + method="GET", + destination=destination, + path=path, query=args, + ) + + response = yield self._send_request( + request, retry_on_dns_fail=retry_on_dns_fail, timeout=timeout, ignore_backoff=ignore_backoff, ) - if 200 <= response.code < 300: - # We need to update the transactions table to say it was sent? - check_content_type_is_json(response.headers) - - with logcontext.PreserveLoggingContext(): - d = treq.json_content(response) - d.addTimeout(self.default_timeout, self.hs.get_reactor()) - body = yield make_deferred_yieldable(d) - + body = yield _handle_json_response( + self.hs.get_reactor(), self.default_timeout, request, response, + ) defer.returnValue(body) @defer.inlineCallbacks @@ -565,25 +647,23 @@ class MatrixFederationHttpClient(object): Fails with ``FederationDeniedError`` if this destination is not on our federation whitelist """ - response = yield self._request( - destination, - "DELETE", - path, + request = MatrixFederationRequest( + method="DELETE", + destination=destination, + path=path, query=args, + ) + + response = yield self._send_request( + request, long_retries=long_retries, timeout=timeout, ignore_backoff=ignore_backoff, ) - if 200 <= response.code < 300: - # We need to update the transactions table to say it was sent? - check_content_type_is_json(response.headers) - - with logcontext.PreserveLoggingContext(): - d = treq.json_content(response) - d.addTimeout(self.default_timeout, self.hs.get_reactor()) - body = yield make_deferred_yieldable(d) - + body = yield _handle_json_response( + self.hs.get_reactor(), self.default_timeout, request, response, + ) defer.returnValue(body) @defer.inlineCallbacks @@ -611,11 +691,15 @@ class MatrixFederationHttpClient(object): Fails with ``FederationDeniedError`` if this destination is not on our federation whitelist """ - response = yield self._request( - destination, - "GET", - path, + request = MatrixFederationRequest( + method="GET", + destination=destination, + path=path, query=args, + ) + + response = yield self._send_request( + request, retry_on_dns_fail=retry_on_dns_fail, ignore_backoff=ignore_backoff, ) @@ -623,14 +707,25 @@ class MatrixFederationHttpClient(object): headers = dict(response.headers.getAllRawHeaders()) try: - with logcontext.PreserveLoggingContext(): - d = _readBodyToFile(response, output_stream, max_size) - d.addTimeout(self.default_timeout, self.hs.get_reactor()) - length = yield make_deferred_yieldable(d) - except Exception: - logger.exception("Failed to download body") + d = _readBodyToFile(response, output_stream, max_size) + d.addTimeout(self.default_timeout, self.hs.get_reactor()) + length = yield make_deferred_yieldable(d) + except Exception as e: + logger.warn( + "{%s} [%s] Error reading response: %s", + request.txn_id, + request.destination, + e, + ) raise - + logger.info( + "{%s} [%s] Completed: %d %s [%d bytes]", + request.txn_id, + request.destination, + response.code, + response.phrase.decode('ascii', errors='replace'), + length, + ) defer.returnValue((length, headers)) diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index 72c2654678..fedb4e6b18 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -162,7 +162,7 @@ class RequestMetrics(object): with _in_flight_requests_lock: _in_flight_requests.add(self) - def stop(self, time_sec, request): + def stop(self, time_sec, response_code, sent_bytes): with _in_flight_requests_lock: _in_flight_requests.discard(self) @@ -179,35 +179,35 @@ class RequestMetrics(object): ) return - response_code = str(request.code) + response_code = str(response_code) - outgoing_responses_counter.labels(request.method, response_code).inc() + outgoing_responses_counter.labels(self.method, response_code).inc() - response_count.labels(request.method, self.name, tag).inc() + response_count.labels(self.method, self.name, tag).inc() - response_timer.labels(request.method, self.name, tag, response_code).observe( + response_timer.labels(self.method, self.name, tag, response_code).observe( time_sec - self.start ) resource_usage = context.get_resource_usage() - response_ru_utime.labels(request.method, self.name, tag).inc( + response_ru_utime.labels(self.method, self.name, tag).inc( resource_usage.ru_utime, ) - response_ru_stime.labels(request.method, self.name, tag).inc( + response_ru_stime.labels(self.method, self.name, tag).inc( resource_usage.ru_stime, ) - response_db_txn_count.labels(request.method, self.name, tag).inc( + response_db_txn_count.labels(self.method, self.name, tag).inc( resource_usage.db_txn_count ) - response_db_txn_duration.labels(request.method, self.name, tag).inc( + response_db_txn_duration.labels(self.method, self.name, tag).inc( resource_usage.db_txn_duration_sec ) - response_db_sched_duration.labels(request.method, self.name, tag).inc( + response_db_sched_duration.labels(self.method, self.name, tag).inc( resource_usage.db_sched_duration_sec ) - response_size.labels(request.method, self.name, tag).inc(request.sentLength) + response_size.labels(self.method, self.name, tag).inc(sent_bytes) # We always call this at the end to ensure that we update the metrics # regardless of whether a call to /metrics while the request was in diff --git a/synapse/http/site.py b/synapse/http/site.py index e1e53e8ae5..50be2de3bb 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -75,14 +75,14 @@ class SynapseRequest(Request): return '<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>' % ( self.__class__.__name__, id(self), - self.method, + self.method.decode('ascii', errors='replace'), self.get_redacted_uri(), - self.clientproto, + self.clientproto.decode('ascii', errors='replace'), self.site.site_tag, ) def get_request_id(self): - return "%s-%i" % (self.method, self.request_seq) + return "%s-%i" % (self.method.decode('ascii'), self.request_seq) def get_redacted_uri(self): uri = self.uri @@ -119,7 +119,7 @@ class SynapseRequest(Request): # dispatching to the handler, so that the handler # can update the servlet name in the request # metrics - requests_counter.labels(self.method, + requests_counter.labels(self.method.decode('ascii'), self.request_metrics.name).inc() @contextlib.contextmanager @@ -280,15 +280,15 @@ class SynapseRequest(Request): int(usage.db_txn_count), self.sentLength, code, - self.method, + self.method.decode('ascii'), self.get_redacted_uri(), - self.clientproto, + self.clientproto.decode('ascii', errors='replace'), user_agent, usage.evt_db_fetch_count, ) try: - self.request_metrics.stop(self.finish_time, self) + self.request_metrics.stop(self.finish_time, self.code, self.sentLength) except Exception as e: logger.warn("Failed to stop metrics: %r", e) @@ -308,7 +308,7 @@ class XForwardedForRequest(SynapseRequest): C{b"-"}. """ return self.requestHeaders.getRawHeaders( - b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip() + b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip().decode('ascii') class SynapseRequestFactory(object): diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 167167be0a..173908299c 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import threading import six @@ -23,6 +24,9 @@ from twisted.internet import defer from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +logger = logging.getLogger(__name__) + + _background_process_start_count = Counter( "synapse_background_process_start_count", "Number of background processes started", @@ -191,6 +195,8 @@ def run_as_background_process(desc, func, *args, **kwargs): try: yield func(*args, **kwargs) + except Exception: + logger.exception("Background process '%s' threw an exception", desc) finally: proc.update_metrics() diff --git a/synapse/notifier.py b/synapse/notifier.py index 82f391481c..f1d92c1395 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -25,11 +25,7 @@ from synapse.api.errors import AuthError from synapse.handlers.presence import format_user_presence_state from synapse.metrics import LaterGauge from synapse.types import StreamToken -from synapse.util.async_helpers import ( - DeferredTimeoutError, - ObservableDeferred, - add_timeout_to_deferred, -) +from synapse.util.async_helpers import ObservableDeferred, timeout_deferred from synapse.util.logcontext import PreserveLoggingContext, run_in_background from synapse.util.logutils import log_function from synapse.util.metrics import Measure @@ -337,7 +333,7 @@ class Notifier(object): # Now we wait for the _NotifierUserStream to be told there # is a new token. listener = user_stream.new_listener(prev_token) - add_timeout_to_deferred( + listener.deferred = timeout_deferred( listener.deferred, (end_time - now) / 1000., self.hs.get_reactor(), @@ -354,7 +350,7 @@ class Notifier(object): # Update the prev_token to the current_token since nothing # has happened between the old prev_token and the current_token prev_token = current_token - except DeferredTimeoutError: + except defer.TimeoutError: break except defer.CancelledError: break @@ -559,15 +555,16 @@ class Notifier(object): if end_time <= now: break - add_timeout_to_deferred( - listener.deferred.addTimeout, - (end_time - now) / 1000., - self.hs.get_reactor(), + listener.deferred = timeout_deferred( + listener.deferred, + timeout=(end_time - now) / 1000., + reactor=self.hs.get_reactor(), ) + try: with PreserveLoggingContext(): yield listener.deferred - except DeferredTimeoutError: + except defer.TimeoutError: break except defer.CancelledError: break diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index b78ce10396..1a5a10d974 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -441,7 +441,7 @@ class Mailer(object): def make_room_link(self, room_id): if self.hs.config.email_riot_base_url: - base_url = self.hs.config.email_riot_base_url + base_url = "%s/#/room" % (self.hs.config.email_riot_base_url) elif self.app_name == "Vector": # need /beta for Universal Links to work on iOS base_url = "https://vector.im/beta/#/room" diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 8fc678fa67..9ad17b7c25 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -119,21 +119,25 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): for entry in iteritems(to_update): (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry - self._simple_upsert_txn( - txn, - table="user_ips", - keyvalues={ - "user_id": user_id, - "access_token": access_token, - "ip": ip, - "user_agent": user_agent, - "device_id": device_id, - }, - values={ - "last_seen": last_seen, - }, - lock=False, - ) + try: + self._simple_upsert_txn( + txn, + table="user_ips", + keyvalues={ + "user_id": user_id, + "access_token": access_token, + "ip": ip, + "user_agent": user_agent, + "device_id": device_id, + }, + values={ + "last_seen": last_seen, + }, + lock=False, + ) + except Exception as e: + # Failed to upsert, log and continue + logger.error("Failed to insert client IP %r: %r", entry, e) @defer.inlineCallbacks def get_last_client_ip_by_device(self, user_id, device_id): diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index 808194236a..cfb687cb53 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -75,7 +75,6 @@ class DirectoryWorkerStore(SQLBaseStore): }, retcol="creator", desc="get_room_alias_creator", - allow_none=True ) @cached(max_entries=5000) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 0c42bd3322..baf0379a68 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -23,7 +23,6 @@ from canonicaljson import encode_canonical_json from twisted.internet import defer from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.util.caches.descriptors import cached from ._base import SQLBaseStore, db_to_json @@ -156,7 +155,6 @@ class TransactionStore(SQLBaseStore): """ pass - @cached(max_entries=10000) def get_destination_retry_timings(self, destination): """Gets the current retry timings (if any) for a given destination. @@ -198,8 +196,6 @@ class TransactionStore(SQLBaseStore): retry_interval (int) - how long until next retry in ms """ - # XXX: we could chose to not bother persisting this if our cache thinks - # this is a NOOP return self.runInteraction( "set_destination_retry_timings", self._set_destination_retry_timings, @@ -212,10 +208,6 @@ class TransactionStore(SQLBaseStore): retry_last_ts, retry_interval): self.database_engine.lock_table(txn, "destinations") - self._invalidate_cache_and_stream( - txn, self.get_destination_retry_timings, (destination,) - ) - # We need to be careful here as the data may have changed from under us # due to a worker setting the timings. diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 40c2946129..ec7b2c9672 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -374,29 +374,25 @@ class ReadWriteLock(object): defer.returnValue(_ctx_manager()) -class DeferredTimeoutError(Exception): - """ - This error is raised by default when a L{Deferred} times out. - """ - +def _cancelled_to_timed_out_error(value, timeout): + if isinstance(value, failure.Failure): + value.trap(CancelledError) + raise defer.TimeoutError(timeout, "Deferred") + return value -def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None): - """ - Add a timeout to a deferred by scheduling it to be cancelled after - timeout seconds. - This is essentially a backport of deferred.addTimeout, which was introduced - in twisted 16.5. +def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None): + """The in built twisted `Deferred.addTimeout` fails to time out deferreds + that have a canceller that throws exceptions. This method creates a new + deferred that wraps and times out the given deferred, correctly handling + the case where the given deferred's canceller throws. - If the deferred gets timed out, it errbacks with a DeferredTimeoutError, - unless a cancelable function was passed to its initialization or unless - a different on_timeout_cancel callable is provided. + NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred Args: - deferred (defer.Deferred): deferred to be timed out - timeout (Number): seconds to time out after - reactor (twisted.internet.reactor): the Twisted reactor to use - + deferred (Deferred) + timeout (float): Timeout in seconds + reactor (twisted.internet.reactor): The twisted reactor to use on_timeout_cancel (callable): A callable which is called immediately after the deferred times out, and not if this deferred is otherwise cancelled before the timeout. @@ -406,48 +402,10 @@ def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None): the timeout. The default callable (if none is provided) will translate a - CancelledError Failure into a DeferredTimeoutError. - """ - timed_out = [False] - - def time_it_out(): - timed_out[0] = True - deferred.cancel() - - delayed_call = reactor.callLater(timeout, time_it_out) - - def convert_cancelled(value): - if timed_out[0]: - to_call = on_timeout_cancel or _cancelled_to_timed_out_error - return to_call(value, timeout) - return value - - deferred.addBoth(convert_cancelled) + CancelledError Failure into a defer.TimeoutError. - def cancel_timeout(result): - # stop the pending call to cancel the deferred if it's been fired - if delayed_call.active(): - delayed_call.cancel() - return result - - deferred.addBoth(cancel_timeout) - - -def _cancelled_to_timed_out_error(value, timeout): - if isinstance(value, failure.Failure): - value.trap(CancelledError) - raise DeferredTimeoutError(timeout, "Deferred") - return value - - -def timeout_no_seriously(deferred, timeout, reactor): - """The in build twisted deferred addTimeout (and the method above) - completely fail to time things out under some unknown circumstances. - - Lets try a different way of timing things out and maybe that will make - things work?! - - TODO: Kill this with fire. + Returns: + Deferred """ new_d = defer.Deferred() @@ -457,16 +415,20 @@ def timeout_no_seriously(deferred, timeout, reactor): def time_it_out(): timed_out[0] = True - if not new_d.called: - new_d.errback(DeferredTimeoutError(timeout, "Deferred")) + try: + deferred.cancel() + except: # noqa: E722, if we throw any exception it'll break time outs + logger.exception("Canceller failed during timeout") - deferred.cancel() + if not new_d.called: + new_d.errback(defer.TimeoutError(timeout, "Deferred")) delayed_call = reactor.callLater(timeout, time_it_out) def convert_cancelled(value): if timed_out[0]: - return _cancelled_to_timed_out_error(value, timeout) + to_call = on_timeout_cancel or _cancelled_to_timed_out_error + return to_call(value, timeout) return value deferred.addBoth(convert_cancelled) diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 8a3a06fd74..26cce7d197 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -188,7 +188,7 @@ class RetryDestinationLimiter(object): else: self.retry_interval = self.min_retry_interval - logger.debug( + logger.info( "Connection to %s was unsuccessful (%s(%s)); backoff now %i", self.destination, exc_type, exc_val, self.retry_interval ) diff --git a/test_postgresql.sh b/test_postgresql.sh new file mode 100755 index 0000000000..1ffcaabd31 --- /dev/null +++ b/test_postgresql.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +# This script builds the Docker image to run the PostgreSQL tests, and then runs +# the tests. + +set -e + +# Build, and tag +docker build docker/ -f docker/Dockerfile-pgtests -t synapsepgtests + +# Run, mounting the current directory into /src +docker run --rm -it -v $(pwd)\:/src synapsepgtests diff --git a/tests/http/test_fedclient.py b/tests/http/test_fedclient.py index 1c46c9cfeb..66c09f63b6 100644 --- a/tests/http/test_fedclient.py +++ b/tests/http/test_fedclient.py @@ -18,9 +18,14 @@ from mock import Mock from twisted.internet.defer import TimeoutError from twisted.internet.error import ConnectingCancelledError, DNSLookupError from twisted.web.client import ResponseNeverReceived +from twisted.web.http import HTTPChannel -from synapse.http.matrixfederationclient import MatrixFederationHttpClient +from synapse.http.matrixfederationclient import ( + MatrixFederationHttpClient, + MatrixFederationRequest, +) +from tests.server import FakeTransport from tests.unittest import HomeserverTestCase @@ -40,7 +45,7 @@ class FederationClientTests(HomeserverTestCase): """ If the DNS raising returns an error, it will bubble up. """ - d = self.cl._request("testserv2:8008", "GET", "foo/bar", timeout=10000) + d = self.cl.get_json("testserv2:8008", "foo/bar", timeout=10000) self.pump() f = self.failureResultOf(d) @@ -51,7 +56,7 @@ class FederationClientTests(HomeserverTestCase): If the HTTP request is not connected and is timed out, it'll give a ConnectingCancelledError. """ - d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000) + d = self.cl.get_json("testserv:8008", "foo/bar", timeout=10000) self.pump() @@ -78,7 +83,7 @@ class FederationClientTests(HomeserverTestCase): If the HTTP request is connected, but gets no response before being timed out, it'll give a ResponseNeverReceived. """ - d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000) + d = self.cl.get_json("testserv:8008", "foo/bar", timeout=10000) self.pump() @@ -108,7 +113,12 @@ class FederationClientTests(HomeserverTestCase): """ Once the client gets the headers, _request returns successfully. """ - d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000) + request = MatrixFederationRequest( + method="GET", + destination="testserv:8008", + path="foo/bar", + ) + d = self.cl._send_request(request, timeout=10000) self.pump() @@ -155,3 +165,26 @@ class FederationClientTests(HomeserverTestCase): f = self.failureResultOf(d) self.assertIsInstance(f.value, TimeoutError) + + def test_client_sends_body(self): + self.cl.post_json( + "testserv:8008", "foo/bar", timeout=10000, + data={"a": "b"} + ) + + self.pump() + + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + client = clients[0][2].buildProtocol(None) + server = HTTPChannel() + + client.makeConnection(FakeTransport(server, self.reactor)) + server.makeConnection(FakeTransport(client, self.reactor)) + + self.pump(0.1) + + self.assertEqual(len(server.requests), 1) + request = server.requests[0] + content = request.content.read() + self.assertEqual(content, b'{"a":"b"}') diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py index 089cecfbee..9e9fbbfe93 100644 --- a/tests/replication/slave/storage/_base.py +++ b/tests/replication/slave/storage/_base.py @@ -15,8 +15,6 @@ from mock import Mock, NonCallableMock -import attr - from synapse.replication.tcp.client import ( ReplicationClientFactory, ReplicationClientHandler, @@ -24,6 +22,7 @@ from synapse.replication.tcp.client import ( from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory from tests import unittest +from tests.server import FakeTransport class BaseSlavedStoreTestCase(unittest.HomeserverTestCase): @@ -56,36 +55,8 @@ class BaseSlavedStoreTestCase(unittest.HomeserverTestCase): server = server_factory.buildProtocol(None) client = client_factory.buildProtocol(None) - @attr.s - class FakeTransport(object): - - other = attr.ib() - disconnecting = False - buffer = attr.ib(default=b'') - - def registerProducer(self, producer, streaming): - - self.producer = producer - - def _produce(): - self.producer.resumeProducing() - reactor.callLater(0.1, _produce) - - reactor.callLater(0.0, _produce) - - def write(self, byt): - self.buffer = self.buffer + byt - - if getattr(self.other, "transport") is not None: - self.other.dataReceived(self.buffer) - self.buffer = b"" - - def writeSequence(self, seq): - for x in seq: - self.write(x) - - client.makeConnection(FakeTransport(server)) - server.makeConnection(FakeTransport(client)) + client.makeConnection(FakeTransport(server, reactor)) + server.makeConnection(FakeTransport(client, reactor)) def replicate(self): """Tell the master side of replication that something has happened, and then diff --git a/tests/server.py b/tests/server.py index 420ec4e088..7bee58dff1 100644 --- a/tests/server.py +++ b/tests/server.py @@ -98,7 +98,7 @@ class FakeSite: return FakeLogger() -def make_request(method, path, content=b"", access_token=None): +def make_request(method, path, content=b"", access_token=None, request=SynapseRequest): """ Make a web request using the given method and path, feed it the content, and return the Request and the Channel underneath. @@ -120,14 +120,16 @@ def make_request(method, path, content=b"", access_token=None): site = FakeSite() channel = FakeChannel() - req = SynapseRequest(site, channel) + req = request(site, channel) req.process = lambda: b"" req.content = BytesIO(content) if access_token: req.requestHeaders.addRawHeader(b"Authorization", b"Bearer " + access_token) - req.requestHeaders.addRawHeader(b"X-Forwarded-For", b"127.0.0.1") + if content: + req.requestHeaders.addRawHeader(b"Content-Type", b"application/json") + req.requestReceived(method, path, b"1.1") return req, channel @@ -280,3 +282,84 @@ def get_clock(): clock = ThreadedMemoryReactorClock() hs_clock = Clock(clock) return (clock, hs_clock) + + +@attr.s +class FakeTransport(object): + """ + A twisted.internet.interfaces.ITransport implementation which sends all its data + straight into an IProtocol object: it exists to connect two IProtocols together. + + To use it, instantiate it with the receiving IProtocol, and then pass it to the + sending IProtocol's makeConnection method: + + server = HTTPChannel() + client.makeConnection(FakeTransport(server, self.reactor)) + + If you want bidirectional communication, you'll need two instances. + """ + + other = attr.ib() + """The Protocol object which will receive any data written to this transport. + + :type: twisted.internet.interfaces.IProtocol + """ + + _reactor = attr.ib() + """Test reactor + + :type: twisted.internet.interfaces.IReactorTime + """ + + disconnecting = False + buffer = attr.ib(default=b'') + producer = attr.ib(default=None) + + def getPeer(self): + return None + + def getHost(self): + return None + + def loseConnection(self): + self.disconnecting = True + + def abortConnection(self): + self.disconnecting = True + + def pauseProducing(self): + self.producer.pauseProducing() + + def unregisterProducer(self): + if not self.producer: + return + + self.producer = None + + def registerProducer(self, producer, streaming): + self.producer = producer + self.producerStreaming = streaming + + def _produce(): + d = self.producer.resumeProducing() + d.addCallback(lambda x: self._reactor.callLater(0.1, _produce)) + + if not streaming: + self._reactor.callLater(0.0, _produce) + + def write(self, byt): + self.buffer = self.buffer + byt + + def _write(): + if getattr(self.other, "transport") is not None: + self.other.dataReceived(self.buffer) + self.buffer = b"" + return + + self._reactor.callLater(0.0, _write) + + _write() + + def writeSequence(self, seq): + for x in seq: + self.write(x) diff --git a/tests/storage/test_client_ips.py b/tests/storage/test_client_ips.py index c9b02a062b..2ffbb9f14f 100644 --- a/tests/storage/test_client_ips.py +++ b/tests/storage/test_client_ips.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,35 +13,45 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +import hashlib +import hmac +import json + from mock import Mock from twisted.internet import defer -import tests.unittest -import tests.utils +from synapse.http.site import XForwardedForRequest +from synapse.rest.client.v1 import admin, login + +from tests import unittest -class ClientIpStoreTestCase(tests.unittest.TestCase): - def __init__(self, *args, **kwargs): - super(ClientIpStoreTestCase, self).__init__(*args, **kwargs) - self.store = None # type: synapse.storage.DataStore - self.clock = None # type: tests.utils.MockClock +class ClientIpStoreTestCase(unittest.HomeserverTestCase): + def make_homeserver(self, reactor, clock): + hs = self.setup_test_homeserver() + return hs - @defer.inlineCallbacks - def setUp(self): - self.hs = yield tests.utils.setup_test_homeserver(self.addCleanup) + def prepare(self, hs, reactor, clock): self.store = self.hs.get_datastore() - self.clock = self.hs.get_clock() - @defer.inlineCallbacks def test_insert_new_client_ip(self): - self.clock.now = 12345678 + self.reactor.advance(12345678) + user_id = "@user:id" - yield self.store.insert_client_ip( - user_id, "access_token", "ip", "user_agent", "device_id" + self.get_success( + self.store.insert_client_ip( + user_id, "access_token", "ip", "user_agent", "device_id" + ) ) - result = yield self.store.get_last_client_ip_by_device(user_id, "device_id") + # Trigger the storage loop + self.reactor.advance(10) + + result = self.get_success( + self.store.get_last_client_ip_by_device(user_id, "device_id") + ) r = result[(user_id, "device_id")] self.assertDictContainsSubset( @@ -55,18 +66,18 @@ class ClientIpStoreTestCase(tests.unittest.TestCase): r, ) - @defer.inlineCallbacks def test_disabled_monthly_active_user(self): self.hs.config.limit_usage_by_mau = False self.hs.config.max_mau_value = 50 user_id = "@user:server" - yield self.store.insert_client_ip( - user_id, "access_token", "ip", "user_agent", "device_id" + self.get_success( + self.store.insert_client_ip( + user_id, "access_token", "ip", "user_agent", "device_id" + ) ) - active = yield self.store.user_last_seen_monthly_active(user_id) + active = self.get_success(self.store.user_last_seen_monthly_active(user_id)) self.assertFalse(active) - @defer.inlineCallbacks def test_adding_monthly_active_user_when_full(self): self.hs.config.limit_usage_by_mau = True self.hs.config.max_mau_value = 50 @@ -76,38 +87,159 @@ class ClientIpStoreTestCase(tests.unittest.TestCase): self.store.get_monthly_active_count = Mock( return_value=defer.succeed(lots_of_users) ) - yield self.store.insert_client_ip( - user_id, "access_token", "ip", "user_agent", "device_id" + self.get_success( + self.store.insert_client_ip( + user_id, "access_token", "ip", "user_agent", "device_id" + ) ) - active = yield self.store.user_last_seen_monthly_active(user_id) + active = self.get_success(self.store.user_last_seen_monthly_active(user_id)) self.assertFalse(active) - @defer.inlineCallbacks def test_adding_monthly_active_user_when_space(self): self.hs.config.limit_usage_by_mau = True self.hs.config.max_mau_value = 50 user_id = "@user:server" - active = yield self.store.user_last_seen_monthly_active(user_id) + active = self.get_success(self.store.user_last_seen_monthly_active(user_id)) self.assertFalse(active) - yield self.store.insert_client_ip( - user_id, "access_token", "ip", "user_agent", "device_id" + # Trigger the saving loop + self.reactor.advance(10) + + self.get_success( + self.store.insert_client_ip( + user_id, "access_token", "ip", "user_agent", "device_id" + ) ) - active = yield self.store.user_last_seen_monthly_active(user_id) + active = self.get_success(self.store.user_last_seen_monthly_active(user_id)) self.assertTrue(active) - @defer.inlineCallbacks def test_updating_monthly_active_user_when_space(self): self.hs.config.limit_usage_by_mau = True self.hs.config.max_mau_value = 50 user_id = "@user:server" - yield self.store.register(user_id=user_id, token="123", password_hash=None) + self.get_success( + self.store.register(user_id=user_id, token="123", password_hash=None) + ) - active = yield self.store.user_last_seen_monthly_active(user_id) + active = self.get_success(self.store.user_last_seen_monthly_active(user_id)) self.assertFalse(active) - yield self.store.insert_client_ip( - user_id, "access_token", "ip", "user_agent", "device_id" + # Trigger the saving loop + self.reactor.advance(10) + + self.get_success( + self.store.insert_client_ip( + user_id, "access_token", "ip", "user_agent", "device_id" + ) ) - active = yield self.store.user_last_seen_monthly_active(user_id) + active = self.get_success(self.store.user_last_seen_monthly_active(user_id)) self.assertTrue(active) + + +class ClientIpAuthTestCase(unittest.HomeserverTestCase): + + servlets = [admin.register_servlets, login.register_servlets] + + def make_homeserver(self, reactor, clock): + hs = self.setup_test_homeserver() + return hs + + def prepare(self, hs, reactor, clock): + self.hs.config.registration_shared_secret = u"shared" + self.store = self.hs.get_datastore() + + # Create the user + request, channel = self.make_request("GET", "/_matrix/client/r0/admin/register") + self.render(request) + nonce = channel.json_body["nonce"] + + want_mac = hmac.new(key=b"shared", digestmod=hashlib.sha1) + want_mac.update(nonce.encode('ascii') + b"\x00bob\x00abc123\x00admin") + want_mac = want_mac.hexdigest() + + body = json.dumps( + { + "nonce": nonce, + "username": "bob", + "password": "abc123", + "admin": True, + "mac": want_mac, + } + ) + request, channel = self.make_request( + "POST", "/_matrix/client/r0/admin/register", body.encode('utf8') + ) + self.render(request) + + self.assertEqual(channel.code, 200) + self.user_id = channel.json_body["user_id"] + + def test_request_with_xforwarded(self): + """ + The IP in X-Forwarded-For is entered into the client IPs table. + """ + self._runtest( + {b"X-Forwarded-For": b"127.9.0.1"}, + "127.9.0.1", + {"request": XForwardedForRequest}, + ) + + def test_request_from_getPeer(self): + """ + The IP returned by getPeer is entered into the client IPs table, if + there's no X-Forwarded-For header. + """ + self._runtest({}, "127.0.0.1", {}) + + def _runtest(self, headers, expected_ip, make_request_args): + device_id = "bleb" + + body = json.dumps( + { + "type": "m.login.password", + "user": "bob", + "password": "abc123", + "device_id": device_id, + } + ) + request, channel = self.make_request( + "POST", "/_matrix/client/r0/login", body.encode('utf8'), **make_request_args + ) + self.render(request) + self.assertEqual(channel.code, 200) + access_token = channel.json_body["access_token"].encode('ascii') + + # Advance to a known time + self.reactor.advance(123456 - self.reactor.seconds()) + + request, channel = self.make_request( + "GET", + "/_matrix/client/r0/admin/users/" + self.user_id, + body.encode('utf8'), + access_token=access_token, + **make_request_args + ) + request.requestHeaders.addRawHeader(b"User-Agent", b"Mozzila pizza") + + # Add the optional headers + for h, v in headers.items(): + request.requestHeaders.addRawHeader(h, v) + self.render(request) + + # Advance so the save loop occurs + self.reactor.advance(100) + + result = self.get_success( + self.store.get_last_client_ip_by_device(self.user_id, device_id) + ) + r = result[(self.user_id, device_id)] + self.assertDictContainsSubset( + { + "user_id": self.user_id, + "device_id": device_id, + "ip": expected_ip, + "user_agent": "Mozzila pizza", + "last_seen": 123456100, + }, + r, + ) diff --git a/tests/test_server.py b/tests/test_server.py index ef74544e93..4045fdadc3 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -1,14 +1,35 @@ +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging import re +from six import StringIO + from twisted.internet.defer import Deferred -from twisted.test.proto_helpers import MemoryReactorClock +from twisted.python.failure import Failure +from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactorClock +from twisted.web.resource import Resource +from twisted.web.server import NOT_DONE_YET from synapse.api.errors import Codes, SynapseError from synapse.http.server import JsonResource +from synapse.http.site import SynapseSite, logger from synapse.util import Clock from tests import unittest -from tests.server import make_request, render, setup_test_homeserver +from tests.server import FakeTransport, make_request, render, setup_test_homeserver class JsonResourceTests(unittest.TestCase): @@ -121,3 +142,52 @@ class JsonResourceTests(unittest.TestCase): self.assertEqual(channel.result["code"], b'400') self.assertEqual(channel.json_body["error"], "Unrecognized request") self.assertEqual(channel.json_body["errcode"], "M_UNRECOGNIZED") + + +class SiteTestCase(unittest.HomeserverTestCase): + def test_lose_connection(self): + """ + We log the URI correctly redacted when we lose the connection. + """ + + class HangingResource(Resource): + """ + A Resource that strategically hangs, as if it were processing an + answer. + """ + + def render(self, request): + return NOT_DONE_YET + + # Set up a logging handler that we can inspect afterwards + output = StringIO() + handler = logging.StreamHandler(output) + logger.addHandler(handler) + old_level = logger.level + logger.setLevel(10) + self.addCleanup(logger.setLevel, old_level) + self.addCleanup(logger.removeHandler, handler) + + # Make a resource and a Site, the resource will hang and allow us to + # time out the request while it's 'processing' + base_resource = Resource() + base_resource.putChild(b'', HangingResource()) + site = SynapseSite("test", "site_tag", {}, base_resource, "1.0") + + server = site.buildProtocol(None) + client = AccumulatingProtocol() + client.makeConnection(FakeTransport(server, self.reactor)) + server.makeConnection(FakeTransport(client, self.reactor)) + + # Send a request with an access token that will get redacted + server.dataReceived(b"GET /?access_token=bar HTTP/1.0\r\n\r\n") + self.pump() + + # Lose the connection + e = Failure(Exception("Failed123")) + server.connectionLost(e) + handler.flush() + + # Our access token is redacted and the failure reason is logged. + self.assertIn("/?access_token=<redacted>", output.getvalue()) + self.assertIn("Failed123", output.getvalue()) diff --git a/tests/unittest.py b/tests/unittest.py index a3d39920db..ef905e6389 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -26,6 +26,7 @@ from twisted.internet.defer import Deferred from twisted.trial import unittest from synapse.http.server import JsonResource +from synapse.http.site import SynapseRequest from synapse.server import HomeServer from synapse.types import UserID, create_requester from synapse.util.logcontext import LoggingContextFilter @@ -219,7 +220,8 @@ class HomeserverTestCase(TestCase): Function to be overridden in subclasses. """ - raise NotImplementedError() + hs = self.setup_test_homeserver() + return hs def prepare(self, reactor, clock, homeserver): """ @@ -236,7 +238,9 @@ class HomeserverTestCase(TestCase): Function to optionally be overridden in subclasses. """ - def make_request(self, method, path, content=b""): + def make_request( + self, method, path, content=b"", access_token=None, request=SynapseRequest + ): """ Create a SynapseRequest at the path using the method and containing the given content. @@ -254,7 +258,7 @@ class HomeserverTestCase(TestCase): if isinstance(content, dict): content = json.dumps(content).encode('utf8') - return make_request(method, path, content) + return make_request(method, path, content, access_token, request) def render(self, request): """ diff --git a/tests/utils.py b/tests/utils.py index 215226debf..aaed1149c3 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -16,7 +16,9 @@ import atexit import hashlib import os +import time import uuid +import warnings from inspect import getcallargs from mock import Mock, patch @@ -237,20 +239,41 @@ def setup_test_homeserver( else: # We need to do cleanup on PostgreSQL def cleanup(): + import psycopg2 + # Close all the db pools hs.get_db_pool().close() + dropped = False + # Drop the test database db_conn = db_engine.module.connect( database=POSTGRES_BASE_DB, user=POSTGRES_USER ) db_conn.autocommit = True cur = db_conn.cursor() - cur.execute("DROP DATABASE IF EXISTS %s;" % (test_db,)) - db_conn.commit() + + # Try a few times to drop the DB. Some things may hold on to the + # database for a few more seconds due to flakiness, preventing + # us from dropping it when the test is over. If we can't drop + # it, warn and move on. + for x in range(5): + try: + cur.execute("DROP DATABASE IF EXISTS %s;" % (test_db,)) + db_conn.commit() + dropped = True + except psycopg2.OperationalError as e: + warnings.warn( + "Couldn't drop old db: " + str(e), category=UserWarning + ) + time.sleep(0.5) + cur.close() db_conn.close() + if not dropped: + warnings.warn("Failed to drop old DB.", category=UserWarning) + if not LEAVE_DB: # Register the cleanup hook cleanup_func(cleanup) diff --git a/tox.ini b/tox.ini index 80ac9324df..e4db563b4b 100644 --- a/tox.ini +++ b/tox.ini @@ -4,7 +4,7 @@ envlist = packaging, py27, py36, pep8, check_isort [base] deps = coverage - Twisted>=15.1 + Twisted>=17.1 mock python-subunit junitxml @@ -70,6 +70,16 @@ usedevelop=true [testenv:py36] usedevelop=true +[testenv:py36-postgres] +usedevelop=true +deps = + {[base]deps} + psycopg2 +setenv = + {[base]setenv} + SYNAPSE_POSTGRES = 1 + + [testenv:packaging] deps = check-manifest |