diff --git a/MANIFEST.in b/MANIFEST.in
index 216df265b5..981698143f 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -24,5 +24,7 @@ recursive-include synapse/static *.js
exclude jenkins.sh
exclude jenkins*.sh
+exclude jenkins*
+recursive-exclude jenkins *.sh
prune demo/etc
diff --git a/jenkins-dendron-postgres.sh b/jenkins-dendron-postgres.sh
index f715cd559a..68912a8967 100755
--- a/jenkins-dendron-postgres.sh
+++ b/jenkins-dendron-postgres.sh
@@ -4,85 +4,19 @@ set -eux
: ${WORKSPACE:="$(pwd)"}
+export WORKSPACE
export PYTHONDONTWRITEBYTECODE=yep
export SYNAPSE_CACHE_FACTOR=1
-# Output test results as junit xml
-export TRIAL_FLAGS="--reporter=subunit"
-export TOXSUFFIX="| subunit-1to2 | subunit2junitxml --no-passthrough --output-to=results.xml"
-# Write coverage reports to a separate file for each process
-export COVERAGE_OPTS="-p"
-export DUMP_COVERAGE_COMMAND="coverage help"
-
-# Output flake8 violations to violations.flake8.log
-# Don't exit with non-0 status code on Jenkins,
-# so that the build steps continue and a later step can decided whether to
-# UNSTABLE or FAILURE this build.
-export PEP8SUFFIX="--output-file=violations.flake8.log || echo flake8 finished with status code \$?"
-
-rm .coverage* || echo "No coverage files to remove"
-
-tox --notest -e py27
-
-TOX_BIN=$WORKSPACE/.tox/py27/bin
-python synapse/python_dependencies.py | xargs -n1 $TOX_BIN/pip install
-$TOX_BIN/pip install psycopg2
-$TOX_BIN/pip install lxml
-
-: ${GIT_BRANCH:="origin/$(git rev-parse --abbrev-ref HEAD)"}
-
-if [[ ! -e .dendron-base ]]; then
- git clone https://github.com/matrix-org/dendron.git .dendron-base --mirror
-else
- (cd .dendron-base; git fetch -p)
-fi
-
-rm -rf dendron
-git clone .dendron-base dendron --shared
-cd dendron
-
-: ${GOPATH:=${WORKSPACE}/.gopath}
-if [[ "${GOPATH}" != *:* ]]; then
- mkdir -p "${GOPATH}"
- export PATH="${GOPATH}/bin:${PATH}"
-fi
-export GOPATH
-
-git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop)
-
-go get github.com/constabulary/gb/...
-gb generate
-gb build
-
-cd ..
-
-
-if [[ ! -e .sytest-base ]]; then
- git clone https://github.com/matrix-org/sytest.git .sytest-base --mirror
-else
- (cd .sytest-base; git fetch -p)
-fi
-
-rm -rf sytest
-git clone .sytest-base sytest --shared
-cd sytest
-
-git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop)
-
-: ${PORT_BASE:=20000}
-: ${PORT_COUNT=100}
-
-./jenkins/prep_sytest_for_postgres.sh
-
-mkdir -p var
-
-echo >&2 "Running sytest with PostgreSQL";
-./jenkins/install_and_run.sh --python $TOX_BIN/python \
- --synapse-directory $WORKSPACE \
- --dendron $WORKSPACE/dendron/bin/dendron \
- --pusher \
- --synchrotron \
- --federation-reader \
- --port-range ${PORT_BASE}:$((PORT_BASE+PORT_COUNT-1))
-
-cd ..
+./jenkins/prepare_synapse.sh
+./jenkins/clone.sh sytest https://github.com/matrix-org/sytest.git
+./jenkins/clone.sh dendron https://github.com/matrix-org/dendron.git
+./dendron/jenkins/build_dendron.sh
+./sytest/jenkins/prep_sytest_for_postgres.sh
+
+./sytest/jenkins/install_and_run.sh \
+ --synapse-directory $WORKSPACE \
+ --dendron $WORKSPACE/dendron/bin/dendron \
+ --pusher \
+ --synchrotron \
+ --federation-reader \
diff --git a/jenkins-postgres.sh b/jenkins-postgres.sh
index 7a43df0d58..f2ca8ccdff 100755
--- a/jenkins-postgres.sh
+++ b/jenkins-postgres.sh
@@ -4,61 +4,14 @@ set -eux
: ${WORKSPACE:="$(pwd)"}
+export WORKSPACE
export PYTHONDONTWRITEBYTECODE=yep
export SYNAPSE_CACHE_FACTOR=1
-# Output test results as junit xml
-export TRIAL_FLAGS="--reporter=subunit"
-export TOXSUFFIX="| subunit-1to2 | subunit2junitxml --no-passthrough --output-to=results.xml"
-# Write coverage reports to a separate file for each process
-export COVERAGE_OPTS="-p"
-export DUMP_COVERAGE_COMMAND="coverage help"
+./jenkins/prepare_synapse.sh
+./jenkins/clone.sh sytest https://github.com/matrix-org/sytest.git
-# Output flake8 violations to violations.flake8.log
-# Don't exit with non-0 status code on Jenkins,
-# so that the build steps continue and a later step can decided whether to
-# UNSTABLE or FAILURE this build.
-export PEP8SUFFIX="--output-file=violations.flake8.log || echo flake8 finished with status code \$?"
+./sytest/jenkins/prep_sytest_for_postgres.sh
-rm .coverage* || echo "No coverage files to remove"
-
-tox --notest -e py27
-
-TOX_BIN=$WORKSPACE/.tox/py27/bin
-python synapse/python_dependencies.py | xargs -n1 $TOX_BIN/pip install
-$TOX_BIN/pip install psycopg2
-$TOX_BIN/pip install lxml
-
-: ${GIT_BRANCH:="origin/$(git rev-parse --abbrev-ref HEAD)"}
-
-if [[ ! -e .sytest-base ]]; then
- git clone https://github.com/matrix-org/sytest.git .sytest-base --mirror
-else
- (cd .sytest-base; git fetch -p)
-fi
-
-rm -rf sytest
-git clone .sytest-base sytest --shared
-cd sytest
-
-git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop)
-
-: ${PORT_BASE:=20000}
-: ${PORT_COUNT=100}
-
-./jenkins/prep_sytest_for_postgres.sh
-
-echo >&2 "Running sytest with PostgreSQL";
-./jenkins/install_and_run.sh --coverage \
- --python $TOX_BIN/python \
- --synapse-directory $WORKSPACE \
- --port-range ${PORT_BASE}:$((PORT_BASE+PORT_COUNT-1)) \
-
-cd ..
-cp sytest/.coverage.* .
-
-# Combine the coverage reports
-echo "Combining:" .coverage.*
-$TOX_BIN/python -m coverage combine
-# Output coverage to coverage.xml
-$TOX_BIN/coverage xml -o coverage.xml
+./sytest/jenkins/install_and_run.sh \
+ --synapse-directory $WORKSPACE \
diff --git a/jenkins-sqlite.sh b/jenkins-sqlite.sh
index 27e61af6ee..84613d979c 100755
--- a/jenkins-sqlite.sh
+++ b/jenkins-sqlite.sh
@@ -4,56 +4,12 @@ set -eux
: ${WORKSPACE:="$(pwd)"}
+export WORKSPACE
export PYTHONDONTWRITEBYTECODE=yep
export SYNAPSE_CACHE_FACTOR=1
-# Output test results as junit xml
-export TRIAL_FLAGS="--reporter=subunit"
-export TOXSUFFIX="| subunit-1to2 | subunit2junitxml --no-passthrough --output-to=results.xml"
-# Write coverage reports to a separate file for each process
-export COVERAGE_OPTS="-p"
-export DUMP_COVERAGE_COMMAND="coverage help"
+./jenkins/prepare_synapse.sh
+./jenkins/clone.sh sytest https://github.com/matrix-org/sytest.git
-# Output flake8 violations to violations.flake8.log
-# Don't exit with non-0 status code on Jenkins,
-# so that the build steps continue and a later step can decided whether to
-# UNSTABLE or FAILURE this build.
-export PEP8SUFFIX="--output-file=violations.flake8.log || echo flake8 finished with status code \$?"
-
-rm .coverage* || echo "No coverage files to remove"
-
-tox --notest -e py27
-TOX_BIN=$WORKSPACE/.tox/py27/bin
-python synapse/python_dependencies.py | xargs -n1 $TOX_BIN/pip install
-$TOX_BIN/pip install lxml
-
-: ${GIT_BRANCH:="origin/$(git rev-parse --abbrev-ref HEAD)"}
-
-if [[ ! -e .sytest-base ]]; then
- git clone https://github.com/matrix-org/sytest.git .sytest-base --mirror
-else
- (cd .sytest-base; git fetch -p)
-fi
-
-rm -rf sytest
-git clone .sytest-base sytest --shared
-cd sytest
-
-git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop)
-
-: ${PORT_BASE:=20000}
-: ${PORT_COUNT=100}
-
-./jenkins/install_and_run.sh --coverage \
- --python $TOX_BIN/python \
- --synapse-directory $WORKSPACE \
- --port-range ${PORT_BASE}:$((PORT_BASE+PORT_COUNT-1)) \
-
-cd ..
-cp sytest/.coverage.* .
-
-# Combine the coverage reports
-echo "Combining:" .coverage.*
-$TOX_BIN/python -m coverage combine
-# Output coverage to coverage.xml
-$TOX_BIN/coverage xml -o coverage.xml
+./sytest/jenkins/install_and_run.sh \
+ --synapse-directory $WORKSPACE \
diff --git a/jenkins/clone.sh b/jenkins/clone.sh
new file mode 100755
index 0000000000..ab30ac7782
--- /dev/null
+++ b/jenkins/clone.sh
@@ -0,0 +1,44 @@
+#! /bin/bash
+
+# This clones a project from github into a named subdirectory
+# If the project has a branch with the same name as this branch
+# then it will checkout that branch after cloning.
+# Otherwise it will checkout "origin/develop."
+# The first argument is the name of the directory to checkout
+# the branch into.
+# The second argument is the URL of the remote repository to checkout.
+# Usually something like https://github.com/matrix-org/sytest.git
+
+set -eux
+
+NAME=$1
+PROJECT=$2
+BASE=".$NAME-base"
+
+# Update our mirror.
+if [ ! -d ".$NAME-base" ]; then
+ # Create a local mirror of the source repository.
+ # This saves us from having to download the entire repository
+ # when this script is next run.
+ git clone "$PROJECT" "$BASE" --mirror
+else
+ # Fetch any updates from the source repository.
+ (cd "$BASE"; git fetch -p)
+fi
+
+# Remove the existing repository so that we have a clean copy
+rm -rf "$NAME"
+# Cloning with --shared means that we will share portions of the
+# .git directory with our local mirror.
+git clone "$BASE" "$NAME" --shared
+
+# Jenkins may have supplied us with the name of the branch in the
+# environment. Otherwise we will have to guess based on the current
+# commit.
+: ${GIT_BRANCH:="origin/$(git rev-parse --abbrev-ref HEAD)"}
+cd "$NAME"
+# check out the relevant branch
+git checkout "${GIT_BRANCH}" || (
+ echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop"
+ git checkout "origin/develop"
+)
diff --git a/jenkins/prepare_synapse.sh b/jenkins/prepare_synapse.sh
new file mode 100755
index 0000000000..237223c81b
--- /dev/null
+++ b/jenkins/prepare_synapse.sh
@@ -0,0 +1,19 @@
+#! /bin/bash
+
+cd "`dirname $0`/.."
+
+TOX_DIR=$WORKSPACE/.tox
+
+mkdir -p $TOX_DIR
+
+if ! [ $TOX_DIR -ef .tox ]; then
+ ln -s "$TOX_DIR" .tox
+fi
+
+# set up the virtualenv
+tox -e py27 --notest -v
+
+TOX_BIN=$TOX_DIR/py27/bin
+python synapse/python_dependencies.py | xargs -n1 $TOX_BIN/pip install
+$TOX_BIN/pip install lxml
+$TOX_BIN/pip install psycopg2
diff --git a/scripts-dev/federation_client.py b/scripts-dev/federation_client.py
index caa3cee4e7..59c3dce3d7 100644
--- a/scripts-dev/federation_client.py
+++ b/scripts-dev/federation_client.py
@@ -128,6 +128,7 @@ def get_json(origin_name, origin_key, destination, path):
headers={"Authorization": authorization_headers[0]},
verify=False,
)
+ sys.stderr.write("Status Code: %d\n" % (result.status_code,))
return result.json()
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index efd04da2d6..66c61b0198 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -34,7 +34,7 @@ logger = logging.getLogger("synapse_port_db")
BOOLEAN_COLUMNS = {
- "events": ["processed", "outlier"],
+ "events": ["processed", "outlier", "contains_url"],
"rooms": ["is_public"],
"event_edges": ["is_state"],
"presence_list": ["accepted"],
@@ -92,8 +92,12 @@ class Store(object):
_simple_select_onecol_txn = SQLBaseStore.__dict__["_simple_select_onecol_txn"]
_simple_select_onecol = SQLBaseStore.__dict__["_simple_select_onecol"]
+ _simple_select_one = SQLBaseStore.__dict__["_simple_select_one"]
+ _simple_select_one_txn = SQLBaseStore.__dict__["_simple_select_one_txn"]
_simple_select_one_onecol = SQLBaseStore.__dict__["_simple_select_one_onecol"]
- _simple_select_one_onecol_txn = SQLBaseStore.__dict__["_simple_select_one_onecol_txn"]
+ _simple_select_one_onecol_txn = SQLBaseStore.__dict__[
+ "_simple_select_one_onecol_txn"
+ ]
_simple_update_one = SQLBaseStore.__dict__["_simple_update_one"]
_simple_update_one_txn = SQLBaseStore.__dict__["_simple_update_one_txn"]
@@ -158,31 +162,40 @@ class Porter(object):
def setup_table(self, table):
if table in APPEND_ONLY_TABLES:
# It's safe to just carry on inserting.
- next_chunk = yield self.postgres_store._simple_select_one_onecol(
+ row = yield self.postgres_store._simple_select_one(
table="port_from_sqlite3",
keyvalues={"table_name": table},
- retcol="rowid",
+ retcols=("forward_rowid", "backward_rowid"),
allow_none=True,
)
total_to_port = None
- if next_chunk is None:
+ if row is None:
if table == "sent_transactions":
- next_chunk, already_ported, total_to_port = (
+ forward_chunk, already_ported, total_to_port = (
yield self._setup_sent_transactions()
)
+ backward_chunk = 0
else:
yield self.postgres_store._simple_insert(
table="port_from_sqlite3",
- values={"table_name": table, "rowid": 1}
+ values={
+ "table_name": table,
+ "forward_rowid": 1,
+ "backward_rowid": 0,
+ }
)
- next_chunk = 1
+ forward_chunk = 1
+ backward_chunk = 0
already_ported = 0
+ else:
+ forward_chunk = row["forward_rowid"]
+ backward_chunk = row["backward_rowid"]
if total_to_port is None:
already_ported, total_to_port = yield self._get_total_count_to_port(
- table, next_chunk
+ table, forward_chunk, backward_chunk
)
else:
def delete_all(txn):
@@ -196,46 +209,85 @@ class Porter(object):
yield self.postgres_store._simple_insert(
table="port_from_sqlite3",
- values={"table_name": table, "rowid": 0}
+ values={
+ "table_name": table,
+ "forward_rowid": 1,
+ "backward_rowid": 0,
+ }
)
- next_chunk = 1
+ forward_chunk = 1
+ backward_chunk = 0
already_ported, total_to_port = yield self._get_total_count_to_port(
- table, next_chunk
+ table, forward_chunk, backward_chunk
)
- defer.returnValue((table, already_ported, total_to_port, next_chunk))
+ defer.returnValue(
+ (table, already_ported, total_to_port, forward_chunk, backward_chunk)
+ )
@defer.inlineCallbacks
- def handle_table(self, table, postgres_size, table_size, next_chunk):
+ def handle_table(self, table, postgres_size, table_size, forward_chunk,
+ backward_chunk):
if not table_size:
return
self.progress.add_table(table, postgres_size, table_size)
if table == "event_search":
- yield self.handle_search_table(postgres_size, table_size, next_chunk)
+ yield self.handle_search_table(
+ postgres_size, table_size, forward_chunk, backward_chunk
+ )
return
- select = (
+ forward_select = (
"SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?"
% (table,)
)
+ backward_select = (
+ "SELECT rowid, * FROM %s WHERE rowid <= ? ORDER BY rowid LIMIT ?"
+ % (table,)
+ )
+
+ do_forward = [True]
+ do_backward = [True]
+
while True:
def r(txn):
- txn.execute(select, (next_chunk, self.batch_size,))
- rows = txn.fetchall()
- headers = [column[0] for column in txn.description]
+ forward_rows = []
+ backward_rows = []
+ if do_forward[0]:
+ txn.execute(forward_select, (forward_chunk, self.batch_size,))
+ forward_rows = txn.fetchall()
+ if not forward_rows:
+ do_forward[0] = False
+
+ if do_backward[0]:
+ txn.execute(backward_select, (backward_chunk, self.batch_size,))
+ backward_rows = txn.fetchall()
+ if not backward_rows:
+ do_backward[0] = False
+
+ if forward_rows or backward_rows:
+ headers = [column[0] for column in txn.description]
+ else:
+ headers = None
- return headers, rows
+ return headers, forward_rows, backward_rows
- headers, rows = yield self.sqlite_store.runInteraction("select", r)
+ headers, frows, brows = yield self.sqlite_store.runInteraction(
+ "select", r
+ )
- if rows:
- next_chunk = rows[-1][0] + 1
+ if frows or brows:
+ if frows:
+ forward_chunk = max(row[0] for row in frows) + 1
+ if brows:
+ backward_chunk = min(row[0] for row in brows) - 1
+ rows = frows + brows
self._convert_rows(table, headers, rows)
def insert(txn):
@@ -247,7 +299,10 @@ class Porter(object):
txn,
table="port_from_sqlite3",
keyvalues={"table_name": table},
- updatevalues={"rowid": next_chunk},
+ updatevalues={
+ "forward_rowid": forward_chunk,
+ "backward_rowid": backward_chunk,
+ },
)
yield self.postgres_store.execute(insert)
@@ -259,7 +314,8 @@ class Porter(object):
return
@defer.inlineCallbacks
- def handle_search_table(self, postgres_size, table_size, next_chunk):
+ def handle_search_table(self, postgres_size, table_size, forward_chunk,
+ backward_chunk):
select = (
"SELECT es.rowid, es.*, e.origin_server_ts, e.stream_ordering"
" FROM event_search as es"
@@ -270,7 +326,7 @@ class Porter(object):
while True:
def r(txn):
- txn.execute(select, (next_chunk, self.batch_size,))
+ txn.execute(select, (forward_chunk, self.batch_size,))
rows = txn.fetchall()
headers = [column[0] for column in txn.description]
@@ -279,7 +335,7 @@ class Porter(object):
headers, rows = yield self.sqlite_store.runInteraction("select", r)
if rows:
- next_chunk = rows[-1][0] + 1
+ forward_chunk = rows[-1][0] + 1
# We have to treat event_search differently since it has a
# different structure in the two different databases.
@@ -312,7 +368,10 @@ class Porter(object):
txn,
table="port_from_sqlite3",
keyvalues={"table_name": "event_search"},
- updatevalues={"rowid": next_chunk},
+ updatevalues={
+ "forward_rowid": forward_chunk,
+ "backward_rowid": backward_chunk,
+ },
)
yield self.postgres_store.execute(insert)
@@ -324,7 +383,6 @@ class Porter(object):
else:
return
-
def setup_db(self, db_config, database_engine):
db_conn = database_engine.module.connect(
**{
@@ -395,10 +453,32 @@ class Porter(object):
txn.execute(
"CREATE TABLE port_from_sqlite3 ("
" table_name varchar(100) NOT NULL UNIQUE,"
- " rowid bigint NOT NULL"
+ " forward_rowid bigint NOT NULL,"
+ " backward_rowid bigint NOT NULL"
")"
)
+ # The old port script created a table with just a "rowid" column.
+ # We want people to be able to rerun this script from an old port
+ # so that they can pick up any missing events that were not
+ # ported across.
+ def alter_table(txn):
+ txn.execute(
+ "ALTER TABLE IF EXISTS port_from_sqlite3"
+ " RENAME rowid TO forward_rowid"
+ )
+ txn.execute(
+ "ALTER TABLE IF EXISTS port_from_sqlite3"
+ " ADD backward_rowid bigint NOT NULL DEFAULT 0"
+ )
+
+ try:
+ yield self.postgres_store.runInteraction(
+ "alter_table", alter_table
+ )
+ except Exception as e:
+ logger.info("Failed to create port table: %s", e)
+
try:
yield self.postgres_store.runInteraction(
"create_port_table", create_port_table
@@ -458,7 +538,7 @@ class Porter(object):
@defer.inlineCallbacks
def _setup_sent_transactions(self):
# Only save things from the last day
- yesterday = int(time.time()*1000) - 86400000
+ yesterday = int(time.time() * 1000) - 86400000
# And save the max transaction id from each destination
select = (
@@ -514,7 +594,11 @@ class Porter(object):
yield self.postgres_store._simple_insert(
table="port_from_sqlite3",
- values={"table_name": "sent_transactions", "rowid": next_chunk}
+ values={
+ "table_name": "sent_transactions",
+ "forward_rowid": next_chunk,
+ "backward_rowid": 0,
+ }
)
def get_sent_table_size(txn):
@@ -535,13 +619,18 @@ class Porter(object):
defer.returnValue((next_chunk, inserted_rows, total_count))
@defer.inlineCallbacks
- def _get_remaining_count_to_port(self, table, next_chunk):
- rows = yield self.sqlite_store.execute_sql(
+ def _get_remaining_count_to_port(self, table, forward_chunk, backward_chunk):
+ frows = yield self.sqlite_store.execute_sql(
"SELECT count(*) FROM %s WHERE rowid >= ?" % (table,),
- next_chunk,
+ forward_chunk,
)
- defer.returnValue(rows[0][0])
+ brows = yield self.sqlite_store.execute_sql(
+ "SELECT count(*) FROM %s WHERE rowid <= ?" % (table,),
+ backward_chunk,
+ )
+
+ defer.returnValue(frows[0][0] + brows[0][0])
@defer.inlineCallbacks
def _get_already_ported_count(self, table):
@@ -552,10 +641,10 @@ class Porter(object):
defer.returnValue(rows[0][0])
@defer.inlineCallbacks
- def _get_total_count_to_port(self, table, next_chunk):
+ def _get_total_count_to_port(self, table, forward_chunk, backward_chunk):
remaining, done = yield defer.gatherResults(
[
- self._get_remaining_count_to_port(table, next_chunk),
+ self._get_remaining_count_to_port(table, forward_chunk, backward_chunk),
self._get_already_ported_count(table),
],
consumeErrors=True,
@@ -686,7 +775,7 @@ class CursesProgress(Progress):
color = curses.color_pair(2) if perc == 100 else curses.color_pair(1)
self.stdscr.addstr(
- i+2, left_margin + max_len - len(table),
+ i + 2, left_margin + max_len - len(table),
table,
curses.A_BOLD | color,
)
@@ -694,18 +783,18 @@ class CursesProgress(Progress):
size = 20
progress = "[%s%s]" % (
- "#" * int(perc*size/100),
- " " * (size - int(perc*size/100)),
+ "#" * int(perc * size / 100),
+ " " * (size - int(perc * size / 100)),
)
self.stdscr.addstr(
- i+2, left_margin + max_len + middle_space,
+ i + 2, left_margin + max_len + middle_space,
"%s %3d%% (%d/%d)" % (progress, perc, data["num_done"], data["total"]),
)
if self.finished:
self.stdscr.addstr(
- rows-1, 0,
+ rows - 1, 0,
"Press any key to exit...",
)
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index b06387051c..c6ed720166 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -314,6 +314,40 @@ class FederationClient(FederationBase):
Deferred: Results in a list of PDUs.
"""
+ try:
+ # First we try and ask for just the IDs, as thats far quicker if
+ # we have most of the state and auth_chain already.
+ # However, this may 404 if the other side has an old synapse.
+ result = yield self.transport_layer.get_room_state_ids(
+ destination, room_id, event_id=event_id,
+ )
+
+ state_event_ids = result["pdu_ids"]
+ auth_event_ids = result.get("auth_chain_ids", [])
+
+ fetched_events, failed_to_fetch = yield self.get_events(
+ [destination], room_id, set(state_event_ids + auth_event_ids)
+ )
+
+ if failed_to_fetch:
+ logger.warn("Failed to get %r", failed_to_fetch)
+
+ event_map = {
+ ev.event_id: ev for ev in fetched_events
+ }
+
+ pdus = [event_map[e_id] for e_id in state_event_ids]
+ auth_chain = [event_map[e_id] for e_id in auth_event_ids]
+
+ auth_chain.sort(key=lambda e: e.depth)
+
+ defer.returnValue((pdus, auth_chain))
+ except HttpResponseException as e:
+ if e.code == 404:
+ logger.info("Failed to use get_room_state_ids API, falling back")
+ else:
+ raise e
+
result = yield self.transport_layer.get_room_state(
destination, room_id, event_id=event_id,
)
@@ -340,6 +374,67 @@ class FederationClient(FederationBase):
defer.returnValue((signed_pdus, signed_auth))
@defer.inlineCallbacks
+ def get_events(self, destinations, room_id, event_ids, return_local=True):
+ """Fetch events from some remote destinations, checking if we already
+ have them.
+
+ Args:
+ destinations (list)
+ room_id (str)
+ event_ids (list)
+ return_local (bool): Whether to include events we already have in
+ the DB in the returned list of events
+
+ Returns:
+ Deferred: A deferred resolving to a 2-tuple where the first is a list of
+ events and the second is a list of event ids that we failed to fetch.
+ """
+ if return_local:
+ seen_events = yield self.store.get_events(event_ids)
+ signed_events = seen_events.values()
+ else:
+ seen_events = yield self.store.have_events(event_ids)
+ signed_events = []
+
+ failed_to_fetch = set()
+
+ missing_events = set(event_ids)
+ for k in seen_events:
+ missing_events.discard(k)
+
+ if not missing_events:
+ defer.returnValue((signed_events, failed_to_fetch))
+
+ def random_server_list():
+ srvs = list(destinations)
+ random.shuffle(srvs)
+ return srvs
+
+ batch_size = 20
+ missing_events = list(missing_events)
+ for i in xrange(0, len(missing_events), batch_size):
+ batch = set(missing_events[i:i + batch_size])
+
+ deferreds = [
+ self.get_pdu(
+ destinations=random_server_list(),
+ event_id=e_id,
+ )
+ for e_id in batch
+ ]
+
+ res = yield defer.DeferredList(deferreds, consumeErrors=True)
+ for success, result in res:
+ if success:
+ signed_events.append(result)
+ batch.discard(result.event_id)
+
+ # We removed all events we successfully fetched from `batch`
+ failed_to_fetch.update(batch)
+
+ defer.returnValue((signed_events, failed_to_fetch))
+
+ @defer.inlineCallbacks
@log_function
def get_event_auth(self, destination, room_id, event_id):
res = yield self.transport_layer.get_event_auth(
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 612d274bdb..aba19639c7 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -215,6 +215,27 @@ class FederationServer(FederationBase):
defer.returnValue((200, resp))
@defer.inlineCallbacks
+ def on_state_ids_request(self, origin, room_id, event_id):
+ if not event_id:
+ raise NotImplementedError("Specify an event")
+
+ in_room = yield self.auth.check_host_in_room(room_id, origin)
+ if not in_room:
+ raise AuthError(403, "Host not in room.")
+
+ pdus = yield self.handler.get_state_for_pdu(
+ room_id, event_id,
+ )
+ auth_chain = yield self.store.get_auth_chain(
+ [pdu.event_id for pdu in pdus]
+ )
+
+ defer.returnValue((200, {
+ "pdu_ids": [pdu.event_id for pdu in pdus],
+ "auth_chain_ids": [pdu.event_id for pdu in auth_chain],
+ }))
+
+ @defer.inlineCallbacks
def _on_context_state_request_compute(self, room_id, event_id):
pdus = yield self.handler.get_state_for_pdu(
room_id, event_id,
@@ -372,27 +393,9 @@ class FederationServer(FederationBase):
(200, send_content)
)
- @defer.inlineCallbacks
@log_function
def on_query_client_keys(self, origin, content):
- query = []
- for user_id, device_ids in content.get("device_keys", {}).items():
- if not device_ids:
- query.append((user_id, None))
- else:
- for device_id in device_ids:
- query.append((user_id, device_id))
-
- results = yield self.store.get_e2e_device_keys(query)
-
- json_result = {}
- for user_id, device_keys in results.items():
- for device_id, json_bytes in device_keys.items():
- json_result.setdefault(user_id, {})[device_id] = json.loads(
- json_bytes
- )
-
- defer.returnValue({"device_keys": json_result})
+ return self.on_query_request("client_keys", content)
@defer.inlineCallbacks
@log_function
@@ -602,7 +605,7 @@ class FederationServer(FederationBase):
origin, pdu.room_id, pdu.event_id,
)
except:
- logger.warn("Failed to get state for event: %s", pdu.event_id)
+ logger.exception("Failed to get state for event: %s", pdu.event_id)
yield self.handler.on_receive_pdu(
origin,
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index ebb698e278..3d088e43cb 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -55,6 +55,28 @@ class TransportLayerClient(object):
)
@log_function
+ def get_room_state_ids(self, destination, room_id, event_id):
+ """ Requests all state for a given room from the given server at the
+ given event. Returns the state's event_id's
+
+ Args:
+ destination (str): The host name of the remote home server we want
+ to get the state from.
+ context (str): The name of the context we want the state of
+ event_id (str): The event we want the context at.
+
+ Returns:
+ Deferred: Results in a dict received from the remote homeserver.
+ """
+ logger.debug("get_room_state_ids dest=%s, room=%s",
+ destination, room_id)
+
+ path = PREFIX + "/state_ids/%s/" % room_id
+ return self.client.get_json(
+ destination, path=path, args={"event_id": event_id},
+ )
+
+ @log_function
def get_event(self, destination, event_id, timeout=None):
""" Requests the pdu with give id and origin from the given server.
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 26fa88ae84..0bc6e0801d 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -271,6 +271,17 @@ class FederationStateServlet(BaseFederationServlet):
)
+class FederationStateIdsServlet(BaseFederationServlet):
+ PATH = "/state_ids/(?P<room_id>[^/]*)/"
+
+ def on_GET(self, origin, content, query, room_id):
+ return self.handler.on_state_ids_request(
+ origin,
+ room_id,
+ query.get("event_id", [None])[0],
+ )
+
+
class FederationBackfillServlet(BaseFederationServlet):
PATH = "/backfill/(?P<context>[^/]*)/"
@@ -367,10 +378,8 @@ class FederationThirdPartyInviteExchangeServlet(BaseFederationServlet):
class FederationClientKeysQueryServlet(BaseFederationServlet):
PATH = "/user/keys/query"
- @defer.inlineCallbacks
def on_POST(self, origin, content, query):
- response = yield self.handler.on_query_client_keys(origin, content)
- defer.returnValue((200, response))
+ return self.handler.on_query_client_keys(origin, content)
class FederationClientKeysClaimServlet(BaseFederationServlet):
@@ -538,6 +547,7 @@ SERVLET_CLASSES = (
FederationPullServlet,
FederationEventServlet,
FederationStateServlet,
+ FederationStateIdsServlet,
FederationBackfillServlet,
FederationQueryServlet,
FederationMakeJoinServlet,
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index f4bf159bb5..8d630c6b1a 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -29,7 +29,7 @@ class DeviceHandler(BaseHandler):
@defer.inlineCallbacks
def check_device_registered(self, user_id, device_id,
- initial_device_display_name):
+ initial_device_display_name=None):
"""
If the given device has not been registered, register it with the
supplied display name.
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
new file mode 100644
index 0000000000..2c7bfd91ed
--- /dev/null
+++ b/synapse/handlers/e2e_keys.py
@@ -0,0 +1,139 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import json
+import logging
+
+from twisted.internet import defer
+
+from synapse.api import errors
+import synapse.types
+
+logger = logging.getLogger(__name__)
+
+
+class E2eKeysHandler(object):
+ def __init__(self, hs):
+ self.store = hs.get_datastore()
+ self.federation = hs.get_replication_layer()
+ self.is_mine_id = hs.is_mine_id
+ self.server_name = hs.hostname
+
+ # doesn't really work as part of the generic query API, because the
+ # query request requires an object POST, but we abuse the
+ # "query handler" interface.
+ self.federation.register_query_handler(
+ "client_keys", self.on_federation_query_client_keys
+ )
+
+ @defer.inlineCallbacks
+ def query_devices(self, query_body):
+ """ Handle a device key query from a client
+
+ {
+ "device_keys": {
+ "<user_id>": ["<device_id>"]
+ }
+ }
+ ->
+ {
+ "device_keys": {
+ "<user_id>": {
+ "<device_id>": {
+ ...
+ }
+ }
+ }
+ }
+ """
+ device_keys_query = query_body.get("device_keys", {})
+
+ # separate users by domain.
+ # make a map from domain to user_id to device_ids
+ queries_by_domain = collections.defaultdict(dict)
+ for user_id, device_ids in device_keys_query.items():
+ user = synapse.types.UserID.from_string(user_id)
+ queries_by_domain[user.domain][user_id] = device_ids
+
+ # do the queries
+ # TODO: do these in parallel
+ results = {}
+ for destination, destination_query in queries_by_domain.items():
+ if destination == self.server_name:
+ res = yield self.query_local_devices(destination_query)
+ else:
+ res = yield self.federation.query_client_keys(
+ destination, {"device_keys": destination_query}
+ )
+ res = res["device_keys"]
+ for user_id, keys in res.items():
+ if user_id in destination_query:
+ results[user_id] = keys
+
+ defer.returnValue((200, {"device_keys": results}))
+
+ @defer.inlineCallbacks
+ def query_local_devices(self, query):
+ """Get E2E device keys for local users
+
+ Args:
+ query (dict[string, list[string]|None): map from user_id to a list
+ of devices to query (None for all devices)
+
+ Returns:
+ defer.Deferred: (resolves to dict[string, dict[string, dict]]):
+ map from user_id -> device_id -> device details
+ """
+ local_query = []
+
+ result_dict = {}
+ for user_id, device_ids in query.items():
+ if not self.is_mine_id(user_id):
+ logger.warning("Request for keys for non-local user %s",
+ user_id)
+ raise errors.SynapseError(400, "Not a user here")
+
+ if not device_ids:
+ local_query.append((user_id, None))
+ else:
+ for device_id in device_ids:
+ local_query.append((user_id, device_id))
+
+ # make sure that each queried user appears in the result dict
+ result_dict[user_id] = {}
+
+ results = yield self.store.get_e2e_device_keys(local_query)
+
+ # Build the result structure, un-jsonify the results, and add the
+ # "unsigned" section
+ for user_id, device_keys in results.items():
+ for device_id, device_info in device_keys.items():
+ r = json.loads(device_info["key_json"])
+ r["unsigned"] = {}
+ display_name = device_info["device_display_name"]
+ if display_name is not None:
+ r["unsigned"]["device_display_name"] = display_name
+ result_dict[user_id][device_id] = r
+
+ defer.returnValue(result_dict)
+
+ @defer.inlineCallbacks
+ def on_federation_query_client_keys(self, query_body):
+ """ Handle a device key query from a federated server
+ """
+ device_keys_query = query_body.get("device_keys", {})
+ res = yield self.query_local_devices(device_keys_query)
+ defer.returnValue({"device_keys": res})
diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py
index dc1d4d8fc6..c5ff16adf3 100644
--- a/synapse/rest/client/v2_alpha/keys.py
+++ b/synapse/rest/client/v2_alpha/keys.py
@@ -130,9 +130,7 @@ class KeyUploadServlet(RestServlet):
# old access_token without an associated device_id. Either way, we
# need to double-check the device is registered to avoid ending up with
# keys without a corresponding device.
- self.device_handler.check_device_registered(
- user_id, device_id, "unknown device"
- )
+ self.device_handler.check_device_registered(user_id, device_id)
result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
defer.returnValue((200, {"one_time_key_counts": result}))
@@ -186,17 +184,19 @@ class KeyQueryServlet(RestServlet):
)
def __init__(self, hs):
+ """
+ Args:
+ hs (synapse.server.HomeServer):
+ """
super(KeyQueryServlet, self).__init__()
- self.store = hs.get_datastore()
self.auth = hs.get_auth()
- self.federation = hs.get_replication_layer()
- self.is_mine = hs.is_mine
+ self.e2e_keys_handler = hs.get_e2e_keys_handler()
@defer.inlineCallbacks
def on_POST(self, request, user_id, device_id):
yield self.auth.get_user_by_req(request)
body = parse_json_object_from_request(request)
- result = yield self.handle_request(body)
+ result = yield self.e2e_keys_handler.query_devices(body)
defer.returnValue(result)
@defer.inlineCallbacks
@@ -205,45 +205,11 @@ class KeyQueryServlet(RestServlet):
auth_user_id = requester.user.to_string()
user_id = user_id if user_id else auth_user_id
device_ids = [device_id] if device_id else []
- result = yield self.handle_request(
+ result = yield self.e2e_keys_handler.query_devices(
{"device_keys": {user_id: device_ids}}
)
defer.returnValue(result)
- @defer.inlineCallbacks
- def handle_request(self, body):
- local_query = []
- remote_queries = {}
- for user_id, device_ids in body.get("device_keys", {}).items():
- user = UserID.from_string(user_id)
- if self.is_mine(user):
- if not device_ids:
- local_query.append((user_id, None))
- else:
- for device_id in device_ids:
- local_query.append((user_id, device_id))
- else:
- remote_queries.setdefault(user.domain, {})[user_id] = list(
- device_ids
- )
- results = yield self.store.get_e2e_device_keys(local_query)
-
- json_result = {}
- for user_id, device_keys in results.items():
- for device_id, json_bytes in device_keys.items():
- json_result.setdefault(user_id, {})[device_id] = json.loads(
- json_bytes
- )
-
- for destination, device_keys in remote_queries.items():
- remote_result = yield self.federation.query_client_keys(
- destination, {"device_keys": device_keys}
- )
- for user_id, keys in remote_result["device_keys"].items():
- if user_id in device_keys:
- json_result[user_id] = keys
- defer.returnValue((200, {"device_keys": json_result}))
-
class OneTimeKeyServlet(RestServlet):
"""
diff --git a/synapse/server.py b/synapse/server.py
index e8b166990d..6bb4988309 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -19,39 +19,38 @@
# partial one for unit test mocking.
# Imports required for the default HomeServer() implementation
-from twisted.web.client import BrowserLikePolicyForHTTPS
+import logging
+
from twisted.enterprise import adbapi
+from twisted.web.client import BrowserLikePolicyForHTTPS
-from synapse.appservice.scheduler import ApplicationServiceScheduler
+from synapse.api.auth import Auth
+from synapse.api.filtering import Filtering
+from synapse.api.ratelimiting import Ratelimiter
from synapse.appservice.api import ApplicationServiceApi
+from synapse.appservice.scheduler import ApplicationServiceScheduler
+from synapse.crypto.keyring import Keyring
+from synapse.events.builder import EventBuilderFactory
from synapse.federation import initialize_http_replication
-from synapse.handlers.device import DeviceHandler
-from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
-from synapse.notifier import Notifier
-from synapse.api.auth import Auth
from synapse.handlers import Handlers
+from synapse.handlers.appservice import ApplicationServicesHandler
+from synapse.handlers.auth import AuthHandler
+from synapse.handlers.device import DeviceHandler
+from synapse.handlers.e2e_keys import E2eKeysHandler
from synapse.handlers.presence import PresenceHandler
+from synapse.handlers.room import RoomListHandler
from synapse.handlers.sync import SyncHandler
from synapse.handlers.typing import TypingHandler
-from synapse.handlers.room import RoomListHandler
-from synapse.handlers.auth import AuthHandler
-from synapse.handlers.appservice import ApplicationServicesHandler
+from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
+from synapse.http.matrixfederationclient import MatrixFederationHttpClient
+from synapse.notifier import Notifier
+from synapse.push.pusherpool import PusherPool
+from synapse.rest.media.v1.media_repository import MediaRepository
from synapse.state import StateHandler
from synapse.storage import DataStore
+from synapse.streams.events import EventSources
from synapse.util import Clock
from synapse.util.distributor import Distributor
-from synapse.streams.events import EventSources
-from synapse.api.ratelimiting import Ratelimiter
-from synapse.crypto.keyring import Keyring
-from synapse.push.pusherpool import PusherPool
-from synapse.events.builder import EventBuilderFactory
-from synapse.api.filtering import Filtering
-from synapse.rest.media.v1.media_repository import MediaRepository
-
-from synapse.http.matrixfederationclient import MatrixFederationHttpClient
-
-import logging
-
logger = logging.getLogger(__name__)
@@ -94,6 +93,7 @@ class HomeServer(object):
'room_list_handler',
'auth_handler',
'device_handler',
+ 'e2e_keys_handler',
'application_service_api',
'application_service_scheduler',
'application_service_handler',
@@ -202,6 +202,9 @@ class HomeServer(object):
def build_device_handler(self):
return DeviceHandler(self)
+ def build_e2e_keys_handler(self):
+ return E2eKeysHandler(self)
+
def build_application_service_api(self):
return ApplicationServiceApi(self)
diff --git a/synapse/server.pyi b/synapse/server.pyi
index 902f725c06..c0aa868c4f 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -1,6 +1,7 @@
import synapse.handlers
import synapse.handlers.auth
import synapse.handlers.device
+import synapse.handlers.e2e_keys
import synapse.storage
import synapse.state
@@ -14,6 +15,9 @@ class HomeServer(object):
def get_device_handler(self) -> synapse.handlers.device.DeviceHandler:
pass
+ def get_e2e_keys_handler(self) -> synapse.handlers.e2e_keys.E2eKeysHandler:
+ pass
+
def get_handlers(self) -> synapse.handlers.Handlers:
pass
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 62b7790e91..385d607056 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import collections
import twisted.internet.defer
@@ -38,24 +39,49 @@ class EndToEndKeyStore(SQLBaseStore):
query_list(list): List of pairs of user_ids and device_ids.
Returns:
Dict mapping from user-id to dict mapping from device_id to
- key json byte strings.
+ dict containing "key_json", "device_display_name".
"""
- def _get_e2e_device_keys(txn):
- result = {}
- for user_id, device_id in query_list:
- user_result = result.setdefault(user_id, {})
- keyvalues = {"user_id": user_id}
- if device_id:
- keyvalues["device_id"] = device_id
- rows = self._simple_select_list_txn(
- txn, table="e2e_device_keys_json",
- keyvalues=keyvalues,
- retcols=["device_id", "key_json"]
- )
- for row in rows:
- user_result[row["device_id"]] = row["key_json"]
- return result
- return self.runInteraction("get_e2e_device_keys", _get_e2e_device_keys)
+ if not query_list:
+ return {}
+
+ return self.runInteraction(
+ "get_e2e_device_keys", self._get_e2e_device_keys_txn, query_list
+ )
+
+ def _get_e2e_device_keys_txn(self, txn, query_list):
+ query_clauses = []
+ query_params = []
+
+ for (user_id, device_id) in query_list:
+ query_clause = "k.user_id = ?"
+ query_params.append(user_id)
+
+ if device_id:
+ query_clause += " AND k.device_id = ?"
+ query_params.append(device_id)
+
+ query_clauses.append(query_clause)
+
+ sql = (
+ "SELECT k.user_id, k.device_id, "
+ " d.display_name AS device_display_name, "
+ " k.key_json"
+ " FROM e2e_device_keys_json k"
+ " LEFT JOIN devices d ON d.user_id = k.user_id"
+ " AND d.device_id = k.device_id"
+ " WHERE %s"
+ ) % (
+ " OR ".join("(" + q + ")" for q in query_clauses)
+ )
+
+ txn.execute(sql, query_params)
+ rows = self.cursor_to_dict(txn)
+
+ result = collections.defaultdict(dict)
+ for row in rows:
+ result[row["user_id"]][row["device_id"]] = row
+
+ return result
def add_e2e_one_time_keys(self, user_id, device_id, time_now, key_list):
def _add_e2e_one_time_keys(txn):
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index c63ca36df6..4664cfe6d9 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -26,7 +26,7 @@ from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
from canonicaljson import encode_canonical_json
-from collections import deque, namedtuple
+from collections import deque, namedtuple, OrderedDict
import synapse
import synapse.metrics
@@ -403,6 +403,23 @@ class EventsStore(SQLBaseStore):
and the rejections table. Things reading from those table will need to check
whether the event was rejected.
"""
+ # Ensure that we don't have the same event twice.
+ # Pick the earliest non-outlier if there is one, else the earliest one.
+ new_events_and_contexts = OrderedDict()
+ for event, context in events_and_contexts:
+ prev_event_context = new_events_and_contexts.get(event.event_id)
+ if prev_event_context:
+ if not event.internal_metadata.is_outlier():
+ if prev_event_context[0].internal_metadata.is_outlier():
+ # To ensure correct ordering we pop, as OrderedDict is
+ # ordered by first insertion.
+ new_events_and_contexts.pop(event.event_id, None)
+ new_events_and_contexts[event.event_id] = (event, context)
+ else:
+ new_events_and_contexts[event.event_id] = (event, context)
+
+ events_and_contexts = new_events_and_contexts.values()
+
depth_updates = {}
for event, context in events_and_contexts:
# Remove the any existing cache entries for the event_ids
@@ -433,8 +450,6 @@ class EventsStore(SQLBaseStore):
for event_id, outlier in txn.fetchall()
}
- # Remove the events that we've seen before.
- event_map = {}
to_remove = set()
for event, context in events_and_contexts:
if context.rejected:
@@ -445,23 +460,6 @@ class EventsStore(SQLBaseStore):
to_remove.add(event)
continue
- # Handle the case of the list including the same event multiple
- # times. The tricky thing here is when they differ by whether
- # they are an outlier.
- if event.event_id in event_map:
- other = event_map[event.event_id]
-
- if not other.internal_metadata.is_outlier():
- to_remove.add(event)
- continue
- elif not event.internal_metadata.is_outlier():
- to_remove.add(event)
- continue
- else:
- to_remove.add(other)
-
- event_map[event.event_id] = event
-
if event.event_id not in have_persisted:
continue
diff --git a/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql b/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql
index 140f2b63e0..aa4a3b9f2f 100644
--- a/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql
+++ b/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql
@@ -16,4 +16,4 @@
-- make sure that we have a device record for each set of E2E keys, so that the
-- user can delete them if they like.
INSERT INTO devices
- SELECT user_id, device_id, 'unknown device' FROM e2e_device_keys_json;
+ SELECT user_id, device_id, NULL FROM e2e_device_keys_json;
diff --git a/synapse/storage/schema/delta/33/devices_for_e2e_keys_clear_unknown_device.sql b/synapse/storage/schema/delta/33/devices_for_e2e_keys_clear_unknown_device.sql
new file mode 100644
index 0000000000..6671573398
--- /dev/null
+++ b/synapse/storage/schema/delta/33/devices_for_e2e_keys_clear_unknown_device.sql
@@ -0,0 +1,20 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- a previous version of the "devices_for_e2e_keys" delta set all the device
+-- names to "unknown device". This wasn't terribly helpful
+UPDATE devices
+ SET display_name = NULL
+ WHERE display_name = 'unknown device';
diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py
new file mode 100644
index 0000000000..878a54dc34
--- /dev/null
+++ b/tests/handlers/test_e2e_keys.py
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+from twisted.internet import defer
+
+import synapse.api.errors
+import synapse.handlers.e2e_keys
+
+import synapse.storage
+from tests import unittest, utils
+
+
+class E2eKeysHandlerTestCase(unittest.TestCase):
+ def __init__(self, *args, **kwargs):
+ super(E2eKeysHandlerTestCase, self).__init__(*args, **kwargs)
+ self.hs = None # type: synapse.server.HomeServer
+ self.handler = None # type: synapse.handlers.e2e_keys.E2eKeysHandler
+
+ @defer.inlineCallbacks
+ def setUp(self):
+ self.hs = yield utils.setup_test_homeserver(
+ handlers=None,
+ replication_layer=mock.Mock(),
+ )
+ self.handler = synapse.handlers.e2e_keys.E2eKeysHandler(self.hs)
+
+ @defer.inlineCallbacks
+ def test_query_local_devices_no_devices(self):
+ """If the user has no devices, we expect an empty list.
+ """
+ local_user = "@boris:" + self.hs.hostname
+ res = yield self.handler.query_local_devices({local_user: None})
+ self.assertDictEqual(res, {local_user: {}})
diff --git a/tests/storage/test_end_to_end_keys.py b/tests/storage/test_end_to_end_keys.py
new file mode 100644
index 0000000000..453bc61438
--- /dev/null
+++ b/tests/storage/test_end_to_end_keys.py
@@ -0,0 +1,90 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+import tests.unittest
+import tests.utils
+
+
+class EndToEndKeyStoreTestCase(tests.unittest.TestCase):
+ def __init__(self, *args, **kwargs):
+ super(EndToEndKeyStoreTestCase, self).__init__(*args, **kwargs)
+ self.store = None # type: synapse.storage.DataStore
+
+ @defer.inlineCallbacks
+ def setUp(self):
+ hs = yield tests.utils.setup_test_homeserver()
+
+ self.store = hs.get_datastore()
+
+ @defer.inlineCallbacks
+ def test_key_without_device_name(self):
+ now = 1470174257070
+ json = '{ "key": "value" }'
+
+ yield self.store.set_e2e_device_keys(
+ "user", "device", now, json)
+
+ res = yield self.store.get_e2e_device_keys((("user", "device"),))
+ self.assertIn("user", res)
+ self.assertIn("device", res["user"])
+ dev = res["user"]["device"]
+ self.assertDictContainsSubset({
+ "key_json": json,
+ "device_display_name": None,
+ }, dev)
+
+ @defer.inlineCallbacks
+ def test_get_key_with_device_name(self):
+ now = 1470174257070
+ json = '{ "key": "value" }'
+
+ yield self.store.set_e2e_device_keys(
+ "user", "device", now, json)
+ yield self.store.store_device(
+ "user", "device", "display_name"
+ )
+
+ res = yield self.store.get_e2e_device_keys((("user", "device"),))
+ self.assertIn("user", res)
+ self.assertIn("device", res["user"])
+ dev = res["user"]["device"]
+ self.assertDictContainsSubset({
+ "key_json": json,
+ "device_display_name": "display_name",
+ }, dev)
+
+ @defer.inlineCallbacks
+ def test_multiple_devices(self):
+ now = 1470174257070
+
+ yield self.store.set_e2e_device_keys(
+ "user1", "device1", now, 'json11')
+ yield self.store.set_e2e_device_keys(
+ "user1", "device2", now, 'json12')
+ yield self.store.set_e2e_device_keys(
+ "user2", "device1", now, 'json21')
+ yield self.store.set_e2e_device_keys(
+ "user2", "device2", now, 'json22')
+
+ res = yield self.store.get_e2e_device_keys((("user1", "device1"),
+ ("user2", "device2")))
+ self.assertIn("user1", res)
+ self.assertIn("device1", res["user1"])
+ self.assertNotIn("device2", res["user1"])
+ self.assertIn("user2", res)
+ self.assertNotIn("device1", res["user2"])
+ self.assertIn("device2", res["user2"])
|