diff options
author | Erik Johnston <erik@matrix.org> | 2016-08-05 11:15:48 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-08-05 11:15:48 +0100 |
commit | 499dc1b349988e23ab26f829b2005f617134e6c0 (patch) | |
tree | 866dec5ab910b42fd22b5826721c0ec7f8ad7cff | |
parent | Bump version and changelog (diff) | |
parent | Merge pull request #986 from matrix-org/erikj/state (diff) | |
download | synapse-499dc1b349988e23ab26f829b2005f617134e6c0.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into release-v0.17.0
Diffstat (limited to '')
25 files changed, 1070 insertions, 369 deletions
diff --git a/MANIFEST.in b/MANIFEST.in index 216df265b5..981698143f 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -24,5 +24,7 @@ recursive-include synapse/static *.js exclude jenkins.sh exclude jenkins*.sh +exclude jenkins* +recursive-exclude jenkins *.sh prune demo/etc diff --git a/jenkins-dendron-postgres.sh b/jenkins-dendron-postgres.sh index f715cd559a..68912a8967 100755 --- a/jenkins-dendron-postgres.sh +++ b/jenkins-dendron-postgres.sh @@ -4,85 +4,19 @@ set -eux : ${WORKSPACE:="$(pwd)"} +export WORKSPACE export PYTHONDONTWRITEBYTECODE=yep export SYNAPSE_CACHE_FACTOR=1 -# Output test results as junit xml -export TRIAL_FLAGS="--reporter=subunit" -export TOXSUFFIX="| subunit-1to2 | subunit2junitxml --no-passthrough --output-to=results.xml" -# Write coverage reports to a separate file for each process -export COVERAGE_OPTS="-p" -export DUMP_COVERAGE_COMMAND="coverage help" - -# Output flake8 violations to violations.flake8.log -# Don't exit with non-0 status code on Jenkins, -# so that the build steps continue and a later step can decided whether to -# UNSTABLE or FAILURE this build. -export PEP8SUFFIX="--output-file=violations.flake8.log || echo flake8 finished with status code \$?" - -rm .coverage* || echo "No coverage files to remove" - -tox --notest -e py27 - -TOX_BIN=$WORKSPACE/.tox/py27/bin -python synapse/python_dependencies.py | xargs -n1 $TOX_BIN/pip install -$TOX_BIN/pip install psycopg2 -$TOX_BIN/pip install lxml - -: ${GIT_BRANCH:="origin/$(git rev-parse --abbrev-ref HEAD)"} - -if [[ ! -e .dendron-base ]]; then - git clone https://github.com/matrix-org/dendron.git .dendron-base --mirror -else - (cd .dendron-base; git fetch -p) -fi - -rm -rf dendron -git clone .dendron-base dendron --shared -cd dendron - -: ${GOPATH:=${WORKSPACE}/.gopath} -if [[ "${GOPATH}" != *:* ]]; then - mkdir -p "${GOPATH}" - export PATH="${GOPATH}/bin:${PATH}" -fi -export GOPATH - -git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop) - -go get github.com/constabulary/gb/... -gb generate -gb build - -cd .. - - -if [[ ! -e .sytest-base ]]; then - git clone https://github.com/matrix-org/sytest.git .sytest-base --mirror -else - (cd .sytest-base; git fetch -p) -fi - -rm -rf sytest -git clone .sytest-base sytest --shared -cd sytest - -git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop) - -: ${PORT_BASE:=20000} -: ${PORT_COUNT=100} - -./jenkins/prep_sytest_for_postgres.sh - -mkdir -p var - -echo >&2 "Running sytest with PostgreSQL"; -./jenkins/install_and_run.sh --python $TOX_BIN/python \ - --synapse-directory $WORKSPACE \ - --dendron $WORKSPACE/dendron/bin/dendron \ - --pusher \ - --synchrotron \ - --federation-reader \ - --port-range ${PORT_BASE}:$((PORT_BASE+PORT_COUNT-1)) - -cd .. +./jenkins/prepare_synapse.sh +./jenkins/clone.sh sytest https://github.com/matrix-org/sytest.git +./jenkins/clone.sh dendron https://github.com/matrix-org/dendron.git +./dendron/jenkins/build_dendron.sh +./sytest/jenkins/prep_sytest_for_postgres.sh + +./sytest/jenkins/install_and_run.sh \ + --synapse-directory $WORKSPACE \ + --dendron $WORKSPACE/dendron/bin/dendron \ + --pusher \ + --synchrotron \ + --federation-reader \ diff --git a/jenkins-postgres.sh b/jenkins-postgres.sh index 7a43df0d58..f2ca8ccdff 100755 --- a/jenkins-postgres.sh +++ b/jenkins-postgres.sh @@ -4,61 +4,14 @@ set -eux : ${WORKSPACE:="$(pwd)"} +export WORKSPACE export PYTHONDONTWRITEBYTECODE=yep export SYNAPSE_CACHE_FACTOR=1 -# Output test results as junit xml -export TRIAL_FLAGS="--reporter=subunit" -export TOXSUFFIX="| subunit-1to2 | subunit2junitxml --no-passthrough --output-to=results.xml" -# Write coverage reports to a separate file for each process -export COVERAGE_OPTS="-p" -export DUMP_COVERAGE_COMMAND="coverage help" +./jenkins/prepare_synapse.sh +./jenkins/clone.sh sytest https://github.com/matrix-org/sytest.git -# Output flake8 violations to violations.flake8.log -# Don't exit with non-0 status code on Jenkins, -# so that the build steps continue and a later step can decided whether to -# UNSTABLE or FAILURE this build. -export PEP8SUFFIX="--output-file=violations.flake8.log || echo flake8 finished with status code \$?" +./sytest/jenkins/prep_sytest_for_postgres.sh -rm .coverage* || echo "No coverage files to remove" - -tox --notest -e py27 - -TOX_BIN=$WORKSPACE/.tox/py27/bin -python synapse/python_dependencies.py | xargs -n1 $TOX_BIN/pip install -$TOX_BIN/pip install psycopg2 -$TOX_BIN/pip install lxml - -: ${GIT_BRANCH:="origin/$(git rev-parse --abbrev-ref HEAD)"} - -if [[ ! -e .sytest-base ]]; then - git clone https://github.com/matrix-org/sytest.git .sytest-base --mirror -else - (cd .sytest-base; git fetch -p) -fi - -rm -rf sytest -git clone .sytest-base sytest --shared -cd sytest - -git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop) - -: ${PORT_BASE:=20000} -: ${PORT_COUNT=100} - -./jenkins/prep_sytest_for_postgres.sh - -echo >&2 "Running sytest with PostgreSQL"; -./jenkins/install_and_run.sh --coverage \ - --python $TOX_BIN/python \ - --synapse-directory $WORKSPACE \ - --port-range ${PORT_BASE}:$((PORT_BASE+PORT_COUNT-1)) \ - -cd .. -cp sytest/.coverage.* . - -# Combine the coverage reports -echo "Combining:" .coverage.* -$TOX_BIN/python -m coverage combine -# Output coverage to coverage.xml -$TOX_BIN/coverage xml -o coverage.xml +./sytest/jenkins/install_and_run.sh \ + --synapse-directory $WORKSPACE \ diff --git a/jenkins-sqlite.sh b/jenkins-sqlite.sh index 27e61af6ee..84613d979c 100755 --- a/jenkins-sqlite.sh +++ b/jenkins-sqlite.sh @@ -4,56 +4,12 @@ set -eux : ${WORKSPACE:="$(pwd)"} +export WORKSPACE export PYTHONDONTWRITEBYTECODE=yep export SYNAPSE_CACHE_FACTOR=1 -# Output test results as junit xml -export TRIAL_FLAGS="--reporter=subunit" -export TOXSUFFIX="| subunit-1to2 | subunit2junitxml --no-passthrough --output-to=results.xml" -# Write coverage reports to a separate file for each process -export COVERAGE_OPTS="-p" -export DUMP_COVERAGE_COMMAND="coverage help" +./jenkins/prepare_synapse.sh +./jenkins/clone.sh sytest https://github.com/matrix-org/sytest.git -# Output flake8 violations to violations.flake8.log -# Don't exit with non-0 status code on Jenkins, -# so that the build steps continue and a later step can decided whether to -# UNSTABLE or FAILURE this build. -export PEP8SUFFIX="--output-file=violations.flake8.log || echo flake8 finished with status code \$?" - -rm .coverage* || echo "No coverage files to remove" - -tox --notest -e py27 -TOX_BIN=$WORKSPACE/.tox/py27/bin -python synapse/python_dependencies.py | xargs -n1 $TOX_BIN/pip install -$TOX_BIN/pip install lxml - -: ${GIT_BRANCH:="origin/$(git rev-parse --abbrev-ref HEAD)"} - -if [[ ! -e .sytest-base ]]; then - git clone https://github.com/matrix-org/sytest.git .sytest-base --mirror -else - (cd .sytest-base; git fetch -p) -fi - -rm -rf sytest -git clone .sytest-base sytest --shared -cd sytest - -git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop) - -: ${PORT_BASE:=20000} -: ${PORT_COUNT=100} - -./jenkins/install_and_run.sh --coverage \ - --python $TOX_BIN/python \ - --synapse-directory $WORKSPACE \ - --port-range ${PORT_BASE}:$((PORT_BASE+PORT_COUNT-1)) \ - -cd .. -cp sytest/.coverage.* . - -# Combine the coverage reports -echo "Combining:" .coverage.* -$TOX_BIN/python -m coverage combine -# Output coverage to coverage.xml -$TOX_BIN/coverage xml -o coverage.xml +./sytest/jenkins/install_and_run.sh \ + --synapse-directory $WORKSPACE \ diff --git a/jenkins/clone.sh b/jenkins/clone.sh new file mode 100755 index 0000000000..ab30ac7782 --- /dev/null +++ b/jenkins/clone.sh @@ -0,0 +1,44 @@ +#! /bin/bash + +# This clones a project from github into a named subdirectory +# If the project has a branch with the same name as this branch +# then it will checkout that branch after cloning. +# Otherwise it will checkout "origin/develop." +# The first argument is the name of the directory to checkout +# the branch into. +# The second argument is the URL of the remote repository to checkout. +# Usually something like https://github.com/matrix-org/sytest.git + +set -eux + +NAME=$1 +PROJECT=$2 +BASE=".$NAME-base" + +# Update our mirror. +if [ ! -d ".$NAME-base" ]; then + # Create a local mirror of the source repository. + # This saves us from having to download the entire repository + # when this script is next run. + git clone "$PROJECT" "$BASE" --mirror +else + # Fetch any updates from the source repository. + (cd "$BASE"; git fetch -p) +fi + +# Remove the existing repository so that we have a clean copy +rm -rf "$NAME" +# Cloning with --shared means that we will share portions of the +# .git directory with our local mirror. +git clone "$BASE" "$NAME" --shared + +# Jenkins may have supplied us with the name of the branch in the +# environment. Otherwise we will have to guess based on the current +# commit. +: ${GIT_BRANCH:="origin/$(git rev-parse --abbrev-ref HEAD)"} +cd "$NAME" +# check out the relevant branch +git checkout "${GIT_BRANCH}" || ( + echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" + git checkout "origin/develop" +) diff --git a/jenkins/prepare_synapse.sh b/jenkins/prepare_synapse.sh new file mode 100755 index 0000000000..237223c81b --- /dev/null +++ b/jenkins/prepare_synapse.sh @@ -0,0 +1,19 @@ +#! /bin/bash + +cd "`dirname $0`/.." + +TOX_DIR=$WORKSPACE/.tox + +mkdir -p $TOX_DIR + +if ! [ $TOX_DIR -ef .tox ]; then + ln -s "$TOX_DIR" .tox +fi + +# set up the virtualenv +tox -e py27 --notest -v + +TOX_BIN=$TOX_DIR/py27/bin +python synapse/python_dependencies.py | xargs -n1 $TOX_BIN/pip install +$TOX_BIN/pip install lxml +$TOX_BIN/pip install psycopg2 diff --git a/scripts-dev/federation_client.py b/scripts-dev/federation_client.py index caa3cee4e7..59c3dce3d7 100644 --- a/scripts-dev/federation_client.py +++ b/scripts-dev/federation_client.py @@ -128,6 +128,7 @@ def get_json(origin_name, origin_key, destination, path): headers={"Authorization": authorization_headers[0]}, verify=False, ) + sys.stderr.write("Status Code: %d\n" % (result.status_code,)) return result.json() diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index efd04da2d6..66c61b0198 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -34,7 +34,7 @@ logger = logging.getLogger("synapse_port_db") BOOLEAN_COLUMNS = { - "events": ["processed", "outlier"], + "events": ["processed", "outlier", "contains_url"], "rooms": ["is_public"], "event_edges": ["is_state"], "presence_list": ["accepted"], @@ -92,8 +92,12 @@ class Store(object): _simple_select_onecol_txn = SQLBaseStore.__dict__["_simple_select_onecol_txn"] _simple_select_onecol = SQLBaseStore.__dict__["_simple_select_onecol"] + _simple_select_one = SQLBaseStore.__dict__["_simple_select_one"] + _simple_select_one_txn = SQLBaseStore.__dict__["_simple_select_one_txn"] _simple_select_one_onecol = SQLBaseStore.__dict__["_simple_select_one_onecol"] - _simple_select_one_onecol_txn = SQLBaseStore.__dict__["_simple_select_one_onecol_txn"] + _simple_select_one_onecol_txn = SQLBaseStore.__dict__[ + "_simple_select_one_onecol_txn" + ] _simple_update_one = SQLBaseStore.__dict__["_simple_update_one"] _simple_update_one_txn = SQLBaseStore.__dict__["_simple_update_one_txn"] @@ -158,31 +162,40 @@ class Porter(object): def setup_table(self, table): if table in APPEND_ONLY_TABLES: # It's safe to just carry on inserting. - next_chunk = yield self.postgres_store._simple_select_one_onecol( + row = yield self.postgres_store._simple_select_one( table="port_from_sqlite3", keyvalues={"table_name": table}, - retcol="rowid", + retcols=("forward_rowid", "backward_rowid"), allow_none=True, ) total_to_port = None - if next_chunk is None: + if row is None: if table == "sent_transactions": - next_chunk, already_ported, total_to_port = ( + forward_chunk, already_ported, total_to_port = ( yield self._setup_sent_transactions() ) + backward_chunk = 0 else: yield self.postgres_store._simple_insert( table="port_from_sqlite3", - values={"table_name": table, "rowid": 1} + values={ + "table_name": table, + "forward_rowid": 1, + "backward_rowid": 0, + } ) - next_chunk = 1 + forward_chunk = 1 + backward_chunk = 0 already_ported = 0 + else: + forward_chunk = row["forward_rowid"] + backward_chunk = row["backward_rowid"] if total_to_port is None: already_ported, total_to_port = yield self._get_total_count_to_port( - table, next_chunk + table, forward_chunk, backward_chunk ) else: def delete_all(txn): @@ -196,46 +209,85 @@ class Porter(object): yield self.postgres_store._simple_insert( table="port_from_sqlite3", - values={"table_name": table, "rowid": 0} + values={ + "table_name": table, + "forward_rowid": 1, + "backward_rowid": 0, + } ) - next_chunk = 1 + forward_chunk = 1 + backward_chunk = 0 already_ported, total_to_port = yield self._get_total_count_to_port( - table, next_chunk + table, forward_chunk, backward_chunk ) - defer.returnValue((table, already_ported, total_to_port, next_chunk)) + defer.returnValue( + (table, already_ported, total_to_port, forward_chunk, backward_chunk) + ) @defer.inlineCallbacks - def handle_table(self, table, postgres_size, table_size, next_chunk): + def handle_table(self, table, postgres_size, table_size, forward_chunk, + backward_chunk): if not table_size: return self.progress.add_table(table, postgres_size, table_size) if table == "event_search": - yield self.handle_search_table(postgres_size, table_size, next_chunk) + yield self.handle_search_table( + postgres_size, table_size, forward_chunk, backward_chunk + ) return - select = ( + forward_select = ( "SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?" % (table,) ) + backward_select = ( + "SELECT rowid, * FROM %s WHERE rowid <= ? ORDER BY rowid LIMIT ?" + % (table,) + ) + + do_forward = [True] + do_backward = [True] + while True: def r(txn): - txn.execute(select, (next_chunk, self.batch_size,)) - rows = txn.fetchall() - headers = [column[0] for column in txn.description] + forward_rows = [] + backward_rows = [] + if do_forward[0]: + txn.execute(forward_select, (forward_chunk, self.batch_size,)) + forward_rows = txn.fetchall() + if not forward_rows: + do_forward[0] = False + + if do_backward[0]: + txn.execute(backward_select, (backward_chunk, self.batch_size,)) + backward_rows = txn.fetchall() + if not backward_rows: + do_backward[0] = False + + if forward_rows or backward_rows: + headers = [column[0] for column in txn.description] + else: + headers = None - return headers, rows + return headers, forward_rows, backward_rows - headers, rows = yield self.sqlite_store.runInteraction("select", r) + headers, frows, brows = yield self.sqlite_store.runInteraction( + "select", r + ) - if rows: - next_chunk = rows[-1][0] + 1 + if frows or brows: + if frows: + forward_chunk = max(row[0] for row in frows) + 1 + if brows: + backward_chunk = min(row[0] for row in brows) - 1 + rows = frows + brows self._convert_rows(table, headers, rows) def insert(txn): @@ -247,7 +299,10 @@ class Porter(object): txn, table="port_from_sqlite3", keyvalues={"table_name": table}, - updatevalues={"rowid": next_chunk}, + updatevalues={ + "forward_rowid": forward_chunk, + "backward_rowid": backward_chunk, + }, ) yield self.postgres_store.execute(insert) @@ -259,7 +314,8 @@ class Porter(object): return @defer.inlineCallbacks - def handle_search_table(self, postgres_size, table_size, next_chunk): + def handle_search_table(self, postgres_size, table_size, forward_chunk, + backward_chunk): select = ( "SELECT es.rowid, es.*, e.origin_server_ts, e.stream_ordering" " FROM event_search as es" @@ -270,7 +326,7 @@ class Porter(object): while True: def r(txn): - txn.execute(select, (next_chunk, self.batch_size,)) + txn.execute(select, (forward_chunk, self.batch_size,)) rows = txn.fetchall() headers = [column[0] for column in txn.description] @@ -279,7 +335,7 @@ class Porter(object): headers, rows = yield self.sqlite_store.runInteraction("select", r) if rows: - next_chunk = rows[-1][0] + 1 + forward_chunk = rows[-1][0] + 1 # We have to treat event_search differently since it has a # different structure in the two different databases. @@ -312,7 +368,10 @@ class Porter(object): txn, table="port_from_sqlite3", keyvalues={"table_name": "event_search"}, - updatevalues={"rowid": next_chunk}, + updatevalues={ + "forward_rowid": forward_chunk, + "backward_rowid": backward_chunk, + }, ) yield self.postgres_store.execute(insert) @@ -324,7 +383,6 @@ class Porter(object): else: return - def setup_db(self, db_config, database_engine): db_conn = database_engine.module.connect( **{ @@ -395,10 +453,32 @@ class Porter(object): txn.execute( "CREATE TABLE port_from_sqlite3 (" " table_name varchar(100) NOT NULL UNIQUE," - " rowid bigint NOT NULL" + " forward_rowid bigint NOT NULL," + " backward_rowid bigint NOT NULL" ")" ) + # The old port script created a table with just a "rowid" column. + # We want people to be able to rerun this script from an old port + # so that they can pick up any missing events that were not + # ported across. + def alter_table(txn): + txn.execute( + "ALTER TABLE IF EXISTS port_from_sqlite3" + " RENAME rowid TO forward_rowid" + ) + txn.execute( + "ALTER TABLE IF EXISTS port_from_sqlite3" + " ADD backward_rowid bigint NOT NULL DEFAULT 0" + ) + + try: + yield self.postgres_store.runInteraction( + "alter_table", alter_table + ) + except Exception as e: + logger.info("Failed to create port table: %s", e) + try: yield self.postgres_store.runInteraction( "create_port_table", create_port_table @@ -458,7 +538,7 @@ class Porter(object): @defer.inlineCallbacks def _setup_sent_transactions(self): # Only save things from the last day - yesterday = int(time.time()*1000) - 86400000 + yesterday = int(time.time() * 1000) - 86400000 # And save the max transaction id from each destination select = ( @@ -514,7 +594,11 @@ class Porter(object): yield self.postgres_store._simple_insert( table="port_from_sqlite3", - values={"table_name": "sent_transactions", "rowid": next_chunk} + values={ + "table_name": "sent_transactions", + "forward_rowid": next_chunk, + "backward_rowid": 0, + } ) def get_sent_table_size(txn): @@ -535,13 +619,18 @@ class Porter(object): defer.returnValue((next_chunk, inserted_rows, total_count)) @defer.inlineCallbacks - def _get_remaining_count_to_port(self, table, next_chunk): - rows = yield self.sqlite_store.execute_sql( + def _get_remaining_count_to_port(self, table, forward_chunk, backward_chunk): + frows = yield self.sqlite_store.execute_sql( "SELECT count(*) FROM %s WHERE rowid >= ?" % (table,), - next_chunk, + forward_chunk, ) - defer.returnValue(rows[0][0]) + brows = yield self.sqlite_store.execute_sql( + "SELECT count(*) FROM %s WHERE rowid <= ?" % (table,), + backward_chunk, + ) + + defer.returnValue(frows[0][0] + brows[0][0]) @defer.inlineCallbacks def _get_already_ported_count(self, table): @@ -552,10 +641,10 @@ class Porter(object): defer.returnValue(rows[0][0]) @defer.inlineCallbacks - def _get_total_count_to_port(self, table, next_chunk): + def _get_total_count_to_port(self, table, forward_chunk, backward_chunk): remaining, done = yield defer.gatherResults( [ - self._get_remaining_count_to_port(table, next_chunk), + self._get_remaining_count_to_port(table, forward_chunk, backward_chunk), self._get_already_ported_count(table), ], consumeErrors=True, @@ -686,7 +775,7 @@ class CursesProgress(Progress): color = curses.color_pair(2) if perc == 100 else curses.color_pair(1) self.stdscr.addstr( - i+2, left_margin + max_len - len(table), + i + 2, left_margin + max_len - len(table), table, curses.A_BOLD | color, ) @@ -694,18 +783,18 @@ class CursesProgress(Progress): size = 20 progress = "[%s%s]" % ( - "#" * int(perc*size/100), - " " * (size - int(perc*size/100)), + "#" * int(perc * size / 100), + " " * (size - int(perc * size / 100)), ) self.stdscr.addstr( - i+2, left_margin + max_len + middle_space, + i + 2, left_margin + max_len + middle_space, "%s %3d%% (%d/%d)" % (progress, perc, data["num_done"], data["total"]), ) if self.finished: self.stdscr.addstr( - rows-1, 0, + rows - 1, 0, "Press any key to exit...", ) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index b06387051c..65778fd4ee 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -314,6 +314,40 @@ class FederationClient(FederationBase): Deferred: Results in a list of PDUs. """ + try: + # First we try and ask for just the IDs, as thats far quicker if + # we have most of the state and auth_chain already. + # However, this may 404 if the other side has an old synapse. + result = yield self.transport_layer.get_room_state_ids( + destination, room_id, event_id=event_id, + ) + + state_event_ids = result["pdu_ids"] + auth_event_ids = result.get("auth_chain_ids", []) + + fetched_events, failed_to_fetch = yield self.get_events( + [destination], room_id, set(state_event_ids + auth_event_ids) + ) + + if failed_to_fetch: + logger.warn("Failed to get %r", failed_to_fetch) + + event_map = { + ev.event_id: ev for ev in fetched_events + } + + pdus = [event_map[e_id] for e_id in state_event_ids] + auth_chain = [event_map[e_id] for e_id in auth_event_ids] + + auth_chain.sort(key=lambda e: e.depth) + + defer.returnValue((pdus, auth_chain)) + except HttpResponseException as e: + if e.code == 400 or e.code == 404: + logger.info("Failed to use get_room_state_ids API, falling back") + else: + raise e + result = yield self.transport_layer.get_room_state( destination, room_id, event_id=event_id, ) @@ -327,12 +361,26 @@ class FederationClient(FederationBase): for p in result.get("auth_chain", []) ] + seen_events = yield self.store.get_events([ + ev.event_id for ev in itertools.chain(pdus, auth_chain) + ]) + signed_pdus = yield self._check_sigs_and_hash_and_fetch( - destination, pdus, outlier=True + destination, + [p for p in pdus if p.event_id not in seen_events], + outlier=True + ) + signed_pdus.extend( + seen_events[p.event_id] for p in pdus if p.event_id in seen_events ) signed_auth = yield self._check_sigs_and_hash_and_fetch( - destination, auth_chain, outlier=True + destination, + [p for p in auth_chain if p.event_id not in seen_events], + outlier=True + ) + signed_auth.extend( + seen_events[p.event_id] for p in auth_chain if p.event_id in seen_events ) signed_auth.sort(key=lambda e: e.depth) @@ -340,6 +388,67 @@ class FederationClient(FederationBase): defer.returnValue((signed_pdus, signed_auth)) @defer.inlineCallbacks + def get_events(self, destinations, room_id, event_ids, return_local=True): + """Fetch events from some remote destinations, checking if we already + have them. + + Args: + destinations (list) + room_id (str) + event_ids (list) + return_local (bool): Whether to include events we already have in + the DB in the returned list of events + + Returns: + Deferred: A deferred resolving to a 2-tuple where the first is a list of + events and the second is a list of event ids that we failed to fetch. + """ + if return_local: + seen_events = yield self.store.get_events(event_ids) + signed_events = seen_events.values() + else: + seen_events = yield self.store.have_events(event_ids) + signed_events = [] + + failed_to_fetch = set() + + missing_events = set(event_ids) + for k in seen_events: + missing_events.discard(k) + + if not missing_events: + defer.returnValue((signed_events, failed_to_fetch)) + + def random_server_list(): + srvs = list(destinations) + random.shuffle(srvs) + return srvs + + batch_size = 20 + missing_events = list(missing_events) + for i in xrange(0, len(missing_events), batch_size): + batch = set(missing_events[i:i + batch_size]) + + deferreds = [ + self.get_pdu( + destinations=random_server_list(), + event_id=e_id, + ) + for e_id in batch + ] + + res = yield defer.DeferredList(deferreds, consumeErrors=True) + for success, result in res: + if success: + signed_events.append(result) + batch.discard(result.event_id) + + # We removed all events we successfully fetched from `batch` + failed_to_fetch.update(batch) + + defer.returnValue((signed_events, failed_to_fetch)) + + @defer.inlineCallbacks @log_function def get_event_auth(self, destination, room_id, event_id): res = yield self.transport_layer.get_event_auth( diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 612d274bdb..aba19639c7 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -215,6 +215,27 @@ class FederationServer(FederationBase): defer.returnValue((200, resp)) @defer.inlineCallbacks + def on_state_ids_request(self, origin, room_id, event_id): + if not event_id: + raise NotImplementedError("Specify an event") + + in_room = yield self.auth.check_host_in_room(room_id, origin) + if not in_room: + raise AuthError(403, "Host not in room.") + + pdus = yield self.handler.get_state_for_pdu( + room_id, event_id, + ) + auth_chain = yield self.store.get_auth_chain( + [pdu.event_id for pdu in pdus] + ) + + defer.returnValue((200, { + "pdu_ids": [pdu.event_id for pdu in pdus], + "auth_chain_ids": [pdu.event_id for pdu in auth_chain], + })) + + @defer.inlineCallbacks def _on_context_state_request_compute(self, room_id, event_id): pdus = yield self.handler.get_state_for_pdu( room_id, event_id, @@ -372,27 +393,9 @@ class FederationServer(FederationBase): (200, send_content) ) - @defer.inlineCallbacks @log_function def on_query_client_keys(self, origin, content): - query = [] - for user_id, device_ids in content.get("device_keys", {}).items(): - if not device_ids: - query.append((user_id, None)) - else: - for device_id in device_ids: - query.append((user_id, device_id)) - - results = yield self.store.get_e2e_device_keys(query) - - json_result = {} - for user_id, device_keys in results.items(): - for device_id, json_bytes in device_keys.items(): - json_result.setdefault(user_id, {})[device_id] = json.loads( - json_bytes - ) - - defer.returnValue({"device_keys": json_result}) + return self.on_query_request("client_keys", content) @defer.inlineCallbacks @log_function @@ -602,7 +605,7 @@ class FederationServer(FederationBase): origin, pdu.room_id, pdu.event_id, ) except: - logger.warn("Failed to get state for event: %s", pdu.event_id) + logger.exception("Failed to get state for event: %s", pdu.event_id) yield self.handler.on_receive_pdu( origin, diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index ebb698e278..3d088e43cb 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -55,6 +55,28 @@ class TransportLayerClient(object): ) @log_function + def get_room_state_ids(self, destination, room_id, event_id): + """ Requests all state for a given room from the given server at the + given event. Returns the state's event_id's + + Args: + destination (str): The host name of the remote home server we want + to get the state from. + context (str): The name of the context we want the state of + event_id (str): The event we want the context at. + + Returns: + Deferred: Results in a dict received from the remote homeserver. + """ + logger.debug("get_room_state_ids dest=%s, room=%s", + destination, room_id) + + path = PREFIX + "/state_ids/%s/" % room_id + return self.client.get_json( + destination, path=path, args={"event_id": event_id}, + ) + + @log_function def get_event(self, destination, event_id, timeout=None): """ Requests the pdu with give id and origin from the given server. diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 26fa88ae84..0bc6e0801d 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -271,6 +271,17 @@ class FederationStateServlet(BaseFederationServlet): ) +class FederationStateIdsServlet(BaseFederationServlet): + PATH = "/state_ids/(?P<room_id>[^/]*)/" + + def on_GET(self, origin, content, query, room_id): + return self.handler.on_state_ids_request( + origin, + room_id, + query.get("event_id", [None])[0], + ) + + class FederationBackfillServlet(BaseFederationServlet): PATH = "/backfill/(?P<context>[^/]*)/" @@ -367,10 +378,8 @@ class FederationThirdPartyInviteExchangeServlet(BaseFederationServlet): class FederationClientKeysQueryServlet(BaseFederationServlet): PATH = "/user/keys/query" - @defer.inlineCallbacks def on_POST(self, origin, content, query): - response = yield self.handler.on_query_client_keys(origin, content) - defer.returnValue((200, response)) + return self.handler.on_query_client_keys(origin, content) class FederationClientKeysClaimServlet(BaseFederationServlet): @@ -538,6 +547,7 @@ SERVLET_CLASSES = ( FederationPullServlet, FederationEventServlet, FederationStateServlet, + FederationStateIdsServlet, FederationBackfillServlet, FederationQueryServlet, FederationMakeJoinServlet, diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index f4bf159bb5..8d630c6b1a 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -29,7 +29,7 @@ class DeviceHandler(BaseHandler): @defer.inlineCallbacks def check_device_registered(self, user_id, device_id, - initial_device_display_name): + initial_device_display_name=None): """ If the given device has not been registered, register it with the supplied display name. diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py new file mode 100644 index 0000000000..2c7bfd91ed --- /dev/null +++ b/synapse/handlers/e2e_keys.py @@ -0,0 +1,139 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import json +import logging + +from twisted.internet import defer + +from synapse.api import errors +import synapse.types + +logger = logging.getLogger(__name__) + + +class E2eKeysHandler(object): + def __init__(self, hs): + self.store = hs.get_datastore() + self.federation = hs.get_replication_layer() + self.is_mine_id = hs.is_mine_id + self.server_name = hs.hostname + + # doesn't really work as part of the generic query API, because the + # query request requires an object POST, but we abuse the + # "query handler" interface. + self.federation.register_query_handler( + "client_keys", self.on_federation_query_client_keys + ) + + @defer.inlineCallbacks + def query_devices(self, query_body): + """ Handle a device key query from a client + + { + "device_keys": { + "<user_id>": ["<device_id>"] + } + } + -> + { + "device_keys": { + "<user_id>": { + "<device_id>": { + ... + } + } + } + } + """ + device_keys_query = query_body.get("device_keys", {}) + + # separate users by domain. + # make a map from domain to user_id to device_ids + queries_by_domain = collections.defaultdict(dict) + for user_id, device_ids in device_keys_query.items(): + user = synapse.types.UserID.from_string(user_id) + queries_by_domain[user.domain][user_id] = device_ids + + # do the queries + # TODO: do these in parallel + results = {} + for destination, destination_query in queries_by_domain.items(): + if destination == self.server_name: + res = yield self.query_local_devices(destination_query) + else: + res = yield self.federation.query_client_keys( + destination, {"device_keys": destination_query} + ) + res = res["device_keys"] + for user_id, keys in res.items(): + if user_id in destination_query: + results[user_id] = keys + + defer.returnValue((200, {"device_keys": results})) + + @defer.inlineCallbacks + def query_local_devices(self, query): + """Get E2E device keys for local users + + Args: + query (dict[string, list[string]|None): map from user_id to a list + of devices to query (None for all devices) + + Returns: + defer.Deferred: (resolves to dict[string, dict[string, dict]]): + map from user_id -> device_id -> device details + """ + local_query = [] + + result_dict = {} + for user_id, device_ids in query.items(): + if not self.is_mine_id(user_id): + logger.warning("Request for keys for non-local user %s", + user_id) + raise errors.SynapseError(400, "Not a user here") + + if not device_ids: + local_query.append((user_id, None)) + else: + for device_id in device_ids: + local_query.append((user_id, device_id)) + + # make sure that each queried user appears in the result dict + result_dict[user_id] = {} + + results = yield self.store.get_e2e_device_keys(local_query) + + # Build the result structure, un-jsonify the results, and add the + # "unsigned" section + for user_id, device_keys in results.items(): + for device_id, device_info in device_keys.items(): + r = json.loads(device_info["key_json"]) + r["unsigned"] = {} + display_name = device_info["device_display_name"] + if display_name is not None: + r["unsigned"]["device_display_name"] = display_name + result_dict[user_id][device_id] = r + + defer.returnValue(result_dict) + + @defer.inlineCallbacks + def on_federation_query_client_keys(self, query_body): + """ Handle a device key query from a federated server + """ + device_keys_query = query_body.get("device_keys", {}) + res = yield self.query_local_devices(device_keys_query) + defer.returnValue({"device_keys": res}) diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index dc1d4d8fc6..c5ff16adf3 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -130,9 +130,7 @@ class KeyUploadServlet(RestServlet): # old access_token without an associated device_id. Either way, we # need to double-check the device is registered to avoid ending up with # keys without a corresponding device. - self.device_handler.check_device_registered( - user_id, device_id, "unknown device" - ) + self.device_handler.check_device_registered(user_id, device_id) result = yield self.store.count_e2e_one_time_keys(user_id, device_id) defer.returnValue((200, {"one_time_key_counts": result})) @@ -186,17 +184,19 @@ class KeyQueryServlet(RestServlet): ) def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): + """ super(KeyQueryServlet, self).__init__() - self.store = hs.get_datastore() self.auth = hs.get_auth() - self.federation = hs.get_replication_layer() - self.is_mine = hs.is_mine + self.e2e_keys_handler = hs.get_e2e_keys_handler() @defer.inlineCallbacks def on_POST(self, request, user_id, device_id): yield self.auth.get_user_by_req(request) body = parse_json_object_from_request(request) - result = yield self.handle_request(body) + result = yield self.e2e_keys_handler.query_devices(body) defer.returnValue(result) @defer.inlineCallbacks @@ -205,45 +205,11 @@ class KeyQueryServlet(RestServlet): auth_user_id = requester.user.to_string() user_id = user_id if user_id else auth_user_id device_ids = [device_id] if device_id else [] - result = yield self.handle_request( + result = yield self.e2e_keys_handler.query_devices( {"device_keys": {user_id: device_ids}} ) defer.returnValue(result) - @defer.inlineCallbacks - def handle_request(self, body): - local_query = [] - remote_queries = {} - for user_id, device_ids in body.get("device_keys", {}).items(): - user = UserID.from_string(user_id) - if self.is_mine(user): - if not device_ids: - local_query.append((user_id, None)) - else: - for device_id in device_ids: - local_query.append((user_id, device_id)) - else: - remote_queries.setdefault(user.domain, {})[user_id] = list( - device_ids - ) - results = yield self.store.get_e2e_device_keys(local_query) - - json_result = {} - for user_id, device_keys in results.items(): - for device_id, json_bytes in device_keys.items(): - json_result.setdefault(user_id, {})[device_id] = json.loads( - json_bytes - ) - - for destination, device_keys in remote_queries.items(): - remote_result = yield self.federation.query_client_keys( - destination, {"device_keys": device_keys} - ) - for user_id, keys in remote_result["device_keys"].items(): - if user_id in device_keys: - json_result[user_id] = keys - defer.returnValue((200, {"device_keys": json_result})) - class OneTimeKeyServlet(RestServlet): """ diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 74c64f1371..4060593f7f 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -29,6 +29,8 @@ from synapse.http.server import ( from synapse.util.async import ObservableDeferred from synapse.util.stringutils import is_ascii +from copy import deepcopy + import os import re import fnmatch @@ -329,20 +331,23 @@ class PreviewUrlResource(Resource): # ...or if they are within a <script/> or <style/> tag. # This is a very very very coarse approximation to a plain text # render of the page. - text_nodes = tree.xpath("//text()[not(ancestor::header | ancestor::nav | " - "ancestor::aside | ancestor::footer | " - "ancestor::script | ancestor::style)]" + - "[ancestor::body]") - text = '' - for text_node in text_nodes: - if len(text) < 500: - text += text_node + ' ' - else: - break - text = re.sub(r'[\t ]+', ' ', text) - text = re.sub(r'[\t \r\n]*[\r\n]+', '\n', text) - text = text.strip()[:500] - og['og:description'] = text if text else None + + # We don't just use XPATH here as that is slow on some machines. + + # We clone `tree` as we modify it. + cloned_tree = deepcopy(tree.find("body")) + + TAGS_TO_REMOVE = ("header", "nav", "aside", "footer", "script", "style",) + for el in cloned_tree.iter(TAGS_TO_REMOVE): + el.getparent().remove(el) + + # Split all the text nodes into paragraphs (by splitting on new + # lines) + text_nodes = ( + re.sub(r'\s+', '\n', el.text).strip() + for el in cloned_tree.iter() if el.text + ) + og['og:description'] = summarize_paragraphs(text_nodes) # TODO: delete the url downloads to stop diskfilling, # as we only ever cared about its OG @@ -450,3 +455,56 @@ class PreviewUrlResource(Resource): content_type.startswith("application/xhtml") ): return True + + +def summarize_paragraphs(text_nodes, min_size=200, max_size=500): + # Try to get a summary of between 200 and 500 words, respecting + # first paragraph and then word boundaries. + # TODO: Respect sentences? + + description = '' + + # Keep adding paragraphs until we get to the MIN_SIZE. + for text_node in text_nodes: + if len(description) < min_size: + text_node = re.sub(r'[\t \r\n]+', ' ', text_node) + description += text_node + '\n\n' + else: + break + + description = description.strip() + description = re.sub(r'[\t ]+', ' ', description) + description = re.sub(r'[\t \r\n]*[\r\n]+', '\n\n', description) + + # If the concatenation of paragraphs to get above MIN_SIZE + # took us over MAX_SIZE, then we need to truncate mid paragraph + if len(description) > max_size: + new_desc = "" + + # This splits the paragraph into words, but keeping the + # (preceeding) whitespace intact so we can easily concat + # words back together. + for match in re.finditer("\s*\S+", description): + word = match.group() + + # Keep adding words while the total length is less than + # MAX_SIZE. + if len(word) + len(new_desc) < max_size: + new_desc += word + else: + # At this point the next word *will* take us over + # MAX_SIZE, but we also want to ensure that its not + # a huge word. If it is add it anyway and we'll + # truncate later. + if len(new_desc) < min_size: + new_desc += word + break + + # Double check that we're not over the limit + if len(new_desc) > max_size: + new_desc = new_desc[:max_size] + + # We always add an ellipsis because at the very least + # we chopped mid paragraph. + description = new_desc.strip() + "…" + return description if description else None diff --git a/synapse/server.py b/synapse/server.py index e8b166990d..6bb4988309 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -19,39 +19,38 @@ # partial one for unit test mocking. # Imports required for the default HomeServer() implementation -from twisted.web.client import BrowserLikePolicyForHTTPS +import logging + from twisted.enterprise import adbapi +from twisted.web.client import BrowserLikePolicyForHTTPS -from synapse.appservice.scheduler import ApplicationServiceScheduler +from synapse.api.auth import Auth +from synapse.api.filtering import Filtering +from synapse.api.ratelimiting import Ratelimiter from synapse.appservice.api import ApplicationServiceApi +from synapse.appservice.scheduler import ApplicationServiceScheduler +from synapse.crypto.keyring import Keyring +from synapse.events.builder import EventBuilderFactory from synapse.federation import initialize_http_replication -from synapse.handlers.device import DeviceHandler -from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory -from synapse.notifier import Notifier -from synapse.api.auth import Auth from synapse.handlers import Handlers +from synapse.handlers.appservice import ApplicationServicesHandler +from synapse.handlers.auth import AuthHandler +from synapse.handlers.device import DeviceHandler +from synapse.handlers.e2e_keys import E2eKeysHandler from synapse.handlers.presence import PresenceHandler +from synapse.handlers.room import RoomListHandler from synapse.handlers.sync import SyncHandler from synapse.handlers.typing import TypingHandler -from synapse.handlers.room import RoomListHandler -from synapse.handlers.auth import AuthHandler -from synapse.handlers.appservice import ApplicationServicesHandler +from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory +from synapse.http.matrixfederationclient import MatrixFederationHttpClient +from synapse.notifier import Notifier +from synapse.push.pusherpool import PusherPool +from synapse.rest.media.v1.media_repository import MediaRepository from synapse.state import StateHandler from synapse.storage import DataStore +from synapse.streams.events import EventSources from synapse.util import Clock from synapse.util.distributor import Distributor -from synapse.streams.events import EventSources -from synapse.api.ratelimiting import Ratelimiter -from synapse.crypto.keyring import Keyring -from synapse.push.pusherpool import PusherPool -from synapse.events.builder import EventBuilderFactory -from synapse.api.filtering import Filtering -from synapse.rest.media.v1.media_repository import MediaRepository - -from synapse.http.matrixfederationclient import MatrixFederationHttpClient - -import logging - logger = logging.getLogger(__name__) @@ -94,6 +93,7 @@ class HomeServer(object): 'room_list_handler', 'auth_handler', 'device_handler', + 'e2e_keys_handler', 'application_service_api', 'application_service_scheduler', 'application_service_handler', @@ -202,6 +202,9 @@ class HomeServer(object): def build_device_handler(self): return DeviceHandler(self) + def build_e2e_keys_handler(self): + return E2eKeysHandler(self) + def build_application_service_api(self): return ApplicationServiceApi(self) diff --git a/synapse/server.pyi b/synapse/server.pyi index 902f725c06..c0aa868c4f 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -1,6 +1,7 @@ import synapse.handlers import synapse.handlers.auth import synapse.handlers.device +import synapse.handlers.e2e_keys import synapse.storage import synapse.state @@ -14,6 +15,9 @@ class HomeServer(object): def get_device_handler(self) -> synapse.handlers.device.DeviceHandler: pass + def get_e2e_keys_handler(self) -> synapse.handlers.e2e_keys.E2eKeysHandler: + pass + def get_handlers(self) -> synapse.handlers.Handlers: pass diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 62b7790e91..385d607056 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import collections import twisted.internet.defer @@ -38,24 +39,49 @@ class EndToEndKeyStore(SQLBaseStore): query_list(list): List of pairs of user_ids and device_ids. Returns: Dict mapping from user-id to dict mapping from device_id to - key json byte strings. + dict containing "key_json", "device_display_name". """ - def _get_e2e_device_keys(txn): - result = {} - for user_id, device_id in query_list: - user_result = result.setdefault(user_id, {}) - keyvalues = {"user_id": user_id} - if device_id: - keyvalues["device_id"] = device_id - rows = self._simple_select_list_txn( - txn, table="e2e_device_keys_json", - keyvalues=keyvalues, - retcols=["device_id", "key_json"] - ) - for row in rows: - user_result[row["device_id"]] = row["key_json"] - return result - return self.runInteraction("get_e2e_device_keys", _get_e2e_device_keys) + if not query_list: + return {} + + return self.runInteraction( + "get_e2e_device_keys", self._get_e2e_device_keys_txn, query_list + ) + + def _get_e2e_device_keys_txn(self, txn, query_list): + query_clauses = [] + query_params = [] + + for (user_id, device_id) in query_list: + query_clause = "k.user_id = ?" + query_params.append(user_id) + + if device_id: + query_clause += " AND k.device_id = ?" + query_params.append(device_id) + + query_clauses.append(query_clause) + + sql = ( + "SELECT k.user_id, k.device_id, " + " d.display_name AS device_display_name, " + " k.key_json" + " FROM e2e_device_keys_json k" + " LEFT JOIN devices d ON d.user_id = k.user_id" + " AND d.device_id = k.device_id" + " WHERE %s" + ) % ( + " OR ".join("(" + q + ")" for q in query_clauses) + ) + + txn.execute(sql, query_params) + rows = self.cursor_to_dict(txn) + + result = collections.defaultdict(dict) + for row in rows: + result[row["user_id"]][row["device_id"]] = row + + return result def add_e2e_one_time_keys(self, user_id, device_id, time_now, key_list): def _add_e2e_one_time_keys(txn): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c63ca36df6..e4dbaa3547 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -26,7 +26,8 @@ from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from canonicaljson import encode_canonical_json -from collections import deque, namedtuple +from collections import deque, namedtuple, OrderedDict +from functools import wraps import synapse import synapse.metrics @@ -150,6 +151,26 @@ class _EventPeristenceQueue(object): _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) +def _retry_on_integrity_error(func): + """Wraps a database function so that it gets retried on IntegrityError, + with `delete_existing=True` passed in. + + Args: + func: function that returns a Deferred and accepts a `delete_existing` arg + """ + @wraps(func) + @defer.inlineCallbacks + def f(self, *args, **kwargs): + try: + res = yield func(self, *args, **kwargs) + except self.database_engine.module.IntegrityError: + logger.exception("IntegrityError, retrying.") + res = yield func(self, *args, delete_existing=True, **kwargs) + defer.returnValue(res) + + return f + + class EventsStore(SQLBaseStore): EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url" @@ -229,8 +250,10 @@ class EventsStore(SQLBaseStore): self._event_persist_queue.handle_queue(room_id, persisting_queue) + @_retry_on_integrity_error @defer.inlineCallbacks - def _persist_events(self, events_and_contexts, backfilled=False): + def _persist_events(self, events_and_contexts, backfilled=False, + delete_existing=False): if not events_and_contexts: return @@ -273,12 +296,15 @@ class EventsStore(SQLBaseStore): self._persist_events_txn, events_and_contexts=chunk, backfilled=backfilled, + delete_existing=delete_existing, ) persist_event_counter.inc_by(len(chunk)) + @_retry_on_integrity_error @defer.inlineCallbacks @log_function - def _persist_event(self, event, context, current_state=None, backfilled=False): + def _persist_event(self, event, context, current_state=None, backfilled=False, + delete_existing=False): try: with self._stream_id_gen.get_next() as stream_ordering: with self._state_groups_id_gen.get_next() as state_group_id: @@ -291,6 +317,7 @@ class EventsStore(SQLBaseStore): context=context, current_state=current_state, backfilled=backfilled, + delete_existing=delete_existing, ) persist_event_counter.inc() except _RollbackButIsFineException: @@ -353,7 +380,8 @@ class EventsStore(SQLBaseStore): defer.returnValue({e.event_id: e for e in events}) @log_function - def _persist_event_txn(self, txn, event, context, current_state, backfilled=False): + def _persist_event_txn(self, txn, event, context, current_state, backfilled=False, + delete_existing=False): # We purposefully do this first since if we include a `current_state` # key, we *want* to update the `current_state_events` table if current_state: @@ -393,16 +421,38 @@ class EventsStore(SQLBaseStore): txn, [(event, context)], backfilled=backfilled, + delete_existing=delete_existing, ) @log_function - def _persist_events_txn(self, txn, events_and_contexts, backfilled): + def _persist_events_txn(self, txn, events_and_contexts, backfilled, + delete_existing=False): """Insert some number of room events into the necessary database tables. Rejected events are only inserted into the events table, the events_json table, and the rejections table. Things reading from those table will need to check whether the event was rejected. + + If delete_existing is True then existing events will be purged from the + database before insertion. This is useful when retrying due to IntegrityError. """ + # Ensure that we don't have the same event twice. + # Pick the earliest non-outlier if there is one, else the earliest one. + new_events_and_contexts = OrderedDict() + for event, context in events_and_contexts: + prev_event_context = new_events_and_contexts.get(event.event_id) + if prev_event_context: + if not event.internal_metadata.is_outlier(): + if prev_event_context[0].internal_metadata.is_outlier(): + # To ensure correct ordering we pop, as OrderedDict is + # ordered by first insertion. + new_events_and_contexts.pop(event.event_id, None) + new_events_and_contexts[event.event_id] = (event, context) + else: + new_events_and_contexts[event.event_id] = (event, context) + + events_and_contexts = new_events_and_contexts.values() + depth_updates = {} for event, context in events_and_contexts: # Remove the any existing cache entries for the event_ids @@ -433,8 +483,6 @@ class EventsStore(SQLBaseStore): for event_id, outlier in txn.fetchall() } - # Remove the events that we've seen before. - event_map = {} to_remove = set() for event, context in events_and_contexts: if context.rejected: @@ -445,23 +493,6 @@ class EventsStore(SQLBaseStore): to_remove.add(event) continue - # Handle the case of the list including the same event multiple - # times. The tricky thing here is when they differ by whether - # they are an outlier. - if event.event_id in event_map: - other = event_map[event.event_id] - - if not other.internal_metadata.is_outlier(): - to_remove.add(event) - continue - elif not event.internal_metadata.is_outlier(): - to_remove.add(event) - continue - else: - to_remove.add(other) - - event_map[event.event_id] = event - if event.event_id not in have_persisted: continue @@ -539,6 +570,43 @@ class EventsStore(SQLBaseStore): ] } + if delete_existing: + # For paranoia reasons, we go and delete all the existing entries + # for these events so we can reinsert them. + # This gets around any problems with some tables already having + # entries. + + logger.info("Deleting existing") + + for table in ( + "events", + "event_auth", + "event_json", + "event_content_hashes", + "event_destinations", + "event_edge_hashes", + "event_edges", + "event_forward_extremities", + "event_push_actions", + "event_reference_hashes", + "event_search", + "event_signatures", + "event_to_state_groups", + "guest_access", + "history_visibility", + "local_invites", + "room_names", + "state_events", + "rejections", + "redactions", + "room_memberships", + "state_events" + ): + txn.executemany( + "DELETE FROM %s WHERE event_id = ?" % (table,), + [(ev.event_id,) for ev, _ in events_and_contexts] + ) + self._simple_insert_many_txn( txn, table="event_json", diff --git a/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql b/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql index 140f2b63e0..aa4a3b9f2f 100644 --- a/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql +++ b/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql @@ -16,4 +16,4 @@ -- make sure that we have a device record for each set of E2E keys, so that the -- user can delete them if they like. INSERT INTO devices - SELECT user_id, device_id, 'unknown device' FROM e2e_device_keys_json; + SELECT user_id, device_id, NULL FROM e2e_device_keys_json; diff --git a/synapse/storage/schema/delta/33/devices_for_e2e_keys_clear_unknown_device.sql b/synapse/storage/schema/delta/33/devices_for_e2e_keys_clear_unknown_device.sql new file mode 100644 index 0000000000..6671573398 --- /dev/null +++ b/synapse/storage/schema/delta/33/devices_for_e2e_keys_clear_unknown_device.sql @@ -0,0 +1,20 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- a previous version of the "devices_for_e2e_keys" delta set all the device +-- names to "unknown device". This wasn't terribly helpful +UPDATE devices + SET display_name = NULL + WHERE display_name = 'unknown device'; diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py new file mode 100644 index 0000000000..878a54dc34 --- /dev/null +++ b/tests/handlers/test_e2e_keys.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock +from twisted.internet import defer + +import synapse.api.errors +import synapse.handlers.e2e_keys + +import synapse.storage +from tests import unittest, utils + + +class E2eKeysHandlerTestCase(unittest.TestCase): + def __init__(self, *args, **kwargs): + super(E2eKeysHandlerTestCase, self).__init__(*args, **kwargs) + self.hs = None # type: synapse.server.HomeServer + self.handler = None # type: synapse.handlers.e2e_keys.E2eKeysHandler + + @defer.inlineCallbacks + def setUp(self): + self.hs = yield utils.setup_test_homeserver( + handlers=None, + replication_layer=mock.Mock(), + ) + self.handler = synapse.handlers.e2e_keys.E2eKeysHandler(self.hs) + + @defer.inlineCallbacks + def test_query_local_devices_no_devices(self): + """If the user has no devices, we expect an empty list. + """ + local_user = "@boris:" + self.hs.hostname + res = yield self.handler.query_local_devices({local_user: None}) + self.assertDictEqual(res, {local_user: {}}) diff --git a/tests/storage/test_end_to_end_keys.py b/tests/storage/test_end_to_end_keys.py new file mode 100644 index 0000000000..453bc61438 --- /dev/null +++ b/tests/storage/test_end_to_end_keys.py @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +import tests.unittest +import tests.utils + + +class EndToEndKeyStoreTestCase(tests.unittest.TestCase): + def __init__(self, *args, **kwargs): + super(EndToEndKeyStoreTestCase, self).__init__(*args, **kwargs) + self.store = None # type: synapse.storage.DataStore + + @defer.inlineCallbacks + def setUp(self): + hs = yield tests.utils.setup_test_homeserver() + + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def test_key_without_device_name(self): + now = 1470174257070 + json = '{ "key": "value" }' + + yield self.store.set_e2e_device_keys( + "user", "device", now, json) + + res = yield self.store.get_e2e_device_keys((("user", "device"),)) + self.assertIn("user", res) + self.assertIn("device", res["user"]) + dev = res["user"]["device"] + self.assertDictContainsSubset({ + "key_json": json, + "device_display_name": None, + }, dev) + + @defer.inlineCallbacks + def test_get_key_with_device_name(self): + now = 1470174257070 + json = '{ "key": "value" }' + + yield self.store.set_e2e_device_keys( + "user", "device", now, json) + yield self.store.store_device( + "user", "device", "display_name" + ) + + res = yield self.store.get_e2e_device_keys((("user", "device"),)) + self.assertIn("user", res) + self.assertIn("device", res["user"]) + dev = res["user"]["device"] + self.assertDictContainsSubset({ + "key_json": json, + "device_display_name": "display_name", + }, dev) + + @defer.inlineCallbacks + def test_multiple_devices(self): + now = 1470174257070 + + yield self.store.set_e2e_device_keys( + "user1", "device1", now, 'json11') + yield self.store.set_e2e_device_keys( + "user1", "device2", now, 'json12') + yield self.store.set_e2e_device_keys( + "user2", "device1", now, 'json21') + yield self.store.set_e2e_device_keys( + "user2", "device2", now, 'json22') + + res = yield self.store.get_e2e_device_keys((("user1", "device1"), + ("user2", "device2"))) + self.assertIn("user1", res) + self.assertIn("device1", res["user1"]) + self.assertNotIn("device2", res["user1"]) + self.assertIn("user2", res) + self.assertNotIn("device1", res["user2"]) + self.assertIn("device2", res["user2"]) diff --git a/tests/test_preview.py b/tests/test_preview.py new file mode 100644 index 0000000000..2a801173a0 --- /dev/null +++ b/tests/test_preview.py @@ -0,0 +1,139 @@ +# -*- coding: utf-8 -*- +# Copyright 2014-2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from . import unittest + +from synapse.rest.media.v1.preview_url_resource import summarize_paragraphs + + +class PreviewTestCase(unittest.TestCase): + + def test_long_summarize(self): + example_paras = [ + """Tromsø (Norwegian pronunciation: [ˈtrʊmsœ] ( listen); Northern Sami: + Romsa; Finnish: Tromssa[2] Kven: Tromssa) is a city and municipality in + Troms county, Norway. The administrative centre of the municipality is + the city of Tromsø. Outside of Norway, Tromso and Tromsö are + alternative spellings of the city.Tromsø is considered the northernmost + city in the world with a population above 50,000. The most populous town + north of it is Alta, Norway, with a population of 14,272 (2013).""", + + """Tromsø lies in Northern Norway. The municipality has a population of + (2015) 72,066, but with an annual influx of students it has over 75,000 + most of the year. It is the largest urban area in Northern Norway and the + third largest north of the Arctic Circle (following Murmansk and Norilsk). + Most of Tromsø, including the city centre, is located on the island of + Tromsøya, 350 kilometres (217 mi) north of the Arctic Circle. In 2012, + Tromsøya had a population of 36,088. Substantial parts of the urban area + are also situated on the mainland to the east, and on parts of Kvaløya—a + large island to the west. Tromsøya is connected to the mainland by the Tromsø + Bridge and the Tromsøysund Tunnel, and to the island of Kvaløya by the + Sandnessund Bridge. Tromsø Airport connects the city to many destinations + in Europe. The city is warmer than most other places located on the same + latitude, due to the warming effect of the Gulf Stream.""", + + """The city centre of Tromsø contains the highest number of old wooden + houses in Northern Norway, the oldest house dating from 1789. The Arctic + Cathedral, a modern church from 1965, is probably the most famous landmark + in Tromsø. The city is a cultural centre for its region, with several + festivals taking place in the summer. Some of Norway's best-known + musicians, Torbjørn Brundtland and Svein Berge of the electronica duo + Röyksopp and Lene Marlin grew up and started their careers in Tromsø. + Noted electronic musician Geir Jenssen also hails from Tromsø.""", + ] + + desc = summarize_paragraphs(example_paras, min_size=200, max_size=500) + + self.assertEquals( + desc, + "Tromsø (Norwegian pronunciation: [ˈtrʊmsœ] ( listen); Northern Sami:" + " Romsa; Finnish: Tromssa[2] Kven: Tromssa) is a city and municipality in" + " Troms county, Norway. The administrative centre of the municipality is" + " the city of Tromsø. Outside of Norway, Tromso and Tromsö are" + " alternative spellings of the city.Tromsø is considered the northernmost" + " city in the world with a population above 50,000. The most populous town" + " north of it is Alta, Norway, with a population of 14,272 (2013)." + ) + + desc = summarize_paragraphs(example_paras[1:], min_size=200, max_size=500) + + self.assertEquals( + desc, + "Tromsø lies in Northern Norway. The municipality has a population of" + " (2015) 72,066, but with an annual influx of students it has over 75,000" + " most of the year. It is the largest urban area in Northern Norway and the" + " third largest north of the Arctic Circle (following Murmansk and Norilsk)." + " Most of Tromsø, including the city centre, is located on the island of" + " Tromsøya, 350 kilometres (217 mi) north of the Arctic Circle. In 2012," + " Tromsøya had a population of 36,088. Substantial parts of the…" + ) + + def test_short_summarize(self): + example_paras = [ + "Tromsø (Norwegian pronunciation: [ˈtrʊmsœ] ( listen); Northern Sami:" + " Romsa; Finnish: Tromssa[2] Kven: Tromssa) is a city and municipality in" + " Troms county, Norway.", + + "Tromsø lies in Northern Norway. The municipality has a population of" + " (2015) 72,066, but with an annual influx of students it has over 75,000" + " most of the year.", + + "The city centre of Tromsø contains the highest number of old wooden" + " houses in Northern Norway, the oldest house dating from 1789. The Arctic" + " Cathedral, a modern church from 1965, is probably the most famous landmark" + " in Tromsø.", + ] + + desc = summarize_paragraphs(example_paras, min_size=200, max_size=500) + + self.assertEquals( + desc, + "Tromsø (Norwegian pronunciation: [ˈtrʊmsœ] ( listen); Northern Sami:" + " Romsa; Finnish: Tromssa[2] Kven: Tromssa) is a city and municipality in" + " Troms county, Norway.\n" + "\n" + "Tromsø lies in Northern Norway. The municipality has a population of" + " (2015) 72,066, but with an annual influx of students it has over 75,000" + " most of the year." + ) + + def test_small_then_large_summarize(self): + example_paras = [ + "Tromsø (Norwegian pronunciation: [ˈtrʊmsœ] ( listen); Northern Sami:" + " Romsa; Finnish: Tromssa[2] Kven: Tromssa) is a city and municipality in" + " Troms county, Norway.", + + "Tromsø lies in Northern Norway. The municipality has a population of" + " (2015) 72,066, but with an annual influx of students it has over 75,000" + " most of the year." + " The city centre of Tromsø contains the highest number of old wooden" + " houses in Northern Norway, the oldest house dating from 1789. The Arctic" + " Cathedral, a modern church from 1965, is probably the most famous landmark" + " in Tromsø.", + ] + + desc = summarize_paragraphs(example_paras, min_size=200, max_size=500) + self.assertEquals( + desc, + "Tromsø (Norwegian pronunciation: [ˈtrʊmsœ] ( listen); Northern Sami:" + " Romsa; Finnish: Tromssa[2] Kven: Tromssa) is a city and municipality in" + " Troms county, Norway.\n" + "\n" + "Tromsø lies in Northern Norway. The municipality has a population of" + " (2015) 72,066, but with an annual influx of students it has over 75,000" + " most of the year. The city centre of Tromsø contains the highest number" + " of old wooden houses in Northern Norway, the oldest house dating from" + " 1789. The Arctic Cathedral, a modern church…" + ) |